EMM A2A Phase 2+: TaskStore and tasks/status

EMM A2A cycle:
· Phase 1: Process Manager as A2A Server
· Phase 2: Task Manager (list_board)

Theory: Task Lifecycle in A2A

The A2A spec assumes an async model. The client sends a task and gets a taskId back. Status moves through submitted → working → completed | failed | input-required. For long-running tasks the client doesn't wait on submit; it gets updates via polling or streaming. This lets an orchestrator run multiple tasks in parallel and collect results as they complete.

I'm still sync: submit blocks until the task is done. Process Manager and Task Manager run fast (seconds), so sync is fine for Phase 1–2. But I added TaskStore and tasks/status, so the result stays available after submit. Another agent can send a task, get the taskId in the response, and later (e.g. 10 minutes) ask for status and get the result from the store. This is prep for async submit: when long tasks appear, the infrastructure is ready.
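To make the lifecycle concrete, here is a minimal sketch of the statuses listed above as a Python enum. The names TaskStatus and is_terminal are mine, for illustration only. Treating completed and failed as the only states where a poller can stop is an assumption; check the A2A spec for the exact state set.

```python
from enum import Enum


class TaskStatus(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    COMPLETED = "completed"
    FAILED = "failed"
    INPUT_REQUIRED = "input-required"


# Assumption: a poller can stop once the task reaches one of these.
TERMINAL = {TaskStatus.COMPLETED, TaskStatus.FAILED}


def is_terminal(status: str) -> bool:
    """Return True when polling can stop for this status string."""
    return TaskStatus(status) in TERMINAL
```

An unknown string such as "not_found" raises ValueError here, so a real client would handle that case separately.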

TaskStore Architecture

flowchart TB
    subgraph Client[A2A Client]
        C1[Submit]
        C2[Poll status]
    end

    subgraph Backend[viz/backend]
        ROUTER[A2A Router]
        STORE[(InMemoryTaskStore)]
        ADAPTER[A2A Adapter]
    end

    subgraph Agents[Agents]
        PM[Process Manager]
        TM[Task Manager]
    end

    C1 -->|"tasks/submit"| ROUTER
    ROUTER --> ADAPTER
    ADAPTER --> PM
    ADAPTER --> TM
    ADAPTER -->|"store(taskId, task)"| STORE
    ADAPTER --> C1

    C2 -->|"tasks/status"| ROUTER
    ROUTER -->|"get(taskId)"| STORE
    STORE -->|"task or not_found"| ROUTER
    ROUTER --> C2

Sequence: submit + poll

sequenceDiagram
    participant C as A2A Client
    participant API as FastAPI
    participant Adapter as A2A Adapter
    participant Store as InMemoryTaskStore
    participant PM as Process Manager

    C->>API: POST tasks/submit (taskId: t-1)
    API->>Adapter: route, invoke
    Adapter->>PM: graph.ainvoke(...)
    PM-->>Adapter: result
    Adapter->>Store: store(t-1, task)
    Adapter-->>C: JSON-RPC {result: {task}}

    Note over C: An hour later, another process...

    C->>API: POST tasks/status (taskId: t-1)
    API->>Store: get(t-1)
    Store-->>API: task (or null)
    API-->>C: {result: {task}} or {result: {task: {status: "not_found"}}}

What I Implemented

  1. InMemoryTaskStore: a dict mapping taskId → task. After submit, the result is stored with a TTL of 1 hour, then removed. Simple implementation: a Python dict with a timestamp per entry, expired either by background cleanup or by a check on get. For production, use Redis or a DB; an in-memory store doesn't survive a restart.

  2. tasks/status: a JSON-RPC method. params: { taskId }. Returns the task from the store or {"id":"...","status":"not_found"}. The client can call it multiple times; it is idempotent. If the task is still running (future async), it returns status: "working".

  3. tasks/submit writes the result to the store right after completion. Even for a sync request, the result is available for polling. So submit returns the task in the response, and the same task can be fetched later via tasks/status. Useful if the client lost the connection or wants to pass the taskId to another process.
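The three items above fit together roughly like this. It is a sketch, not the backend code: a plain dict stands in for the store, run_agent is a hypothetical placeholder for the real agent call, and the handler names are made up. Failure handling is left out.

```python
import asyncio

# Stand-in for InMemoryTaskStore: a plain dict is enough for the sketch.
_tasks = {}


async def run_agent(message):
    # Hypothetical agent call; the real backend invokes the agent graph here.
    return {"echo": message}


async def handle_submit(params):
    task_id = params["taskId"]
    # Sync submit: wait for the agent to finish before answering.
    artifact = await run_agent(params.get("message"))
    task = {"id": task_id, "status": "completed", "artifact": artifact}
    _tasks[task_id] = task  # save before returning, so tasks/status can find it
    return {"task": task}


def handle_status(params):
    task_id = params["taskId"]
    task = _tasks.get(task_id)
    if task is None:
        return {"task": {"id": task_id, "status": "not_found"}}
    return {"task": task}


submitted = asyncio.run(handle_submit({"taskId": "t-1", "message": "hello"}))
later = handle_status({"taskId": "t-1"})
```

In this sketch the later status call returns the same task that submit returned, which is the whole point of writing to the store before the response goes out.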

Code: InMemoryTaskStore is a thread-safe dict with TTL. Cleanup is lazy, on get: if monotonic() - created_at > ttl, the entry is deleted. tasks/status calls get(taskId) on the store and returns {"status":"not_found"} if the result is null. After submit, _store_task() saves the task before returning.

# viz/backend/viz_backend/a2a/task_store.py
import threading
import time


class InMemoryTaskStore:
    DEFAULT_TTL_SECONDS = 3600

    def __init__(self, ttl_seconds=DEFAULT_TTL_SECONDS):
        self._ttl = ttl_seconds
        self._store = {}
        self._lock = threading.Lock()

    def store(self, task_id, status, artifact=None, error=None):
        with self._lock:
            self._store[task_id] = {
                "id": task_id, "status": status, "artifact": artifact,
                "created_at": time.monotonic(),
            }

    def get(self, task_id):
        with self._lock:
            entry = self._store.get(task_id)
            if entry is None:
                return None  # unknown taskId
            # Lazy cleanup: drop the entry once it outlives the TTL.
            if time.monotonic() - entry["created_at"] > self._ttl:
                del self._store[task_id]
                return None
            return {"id": entry["id"], "status": entry["status"], "artifact": entry.get("artifact")}

# tasks/status handler
task = a2a_task_store.get(task_id)
if task is None:
    return {"task": {"id": task_id, "status": "not_found"}}
return {"task": task}
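One consequence of lazy cleanup on get: a task that nobody ever polls is never deleted. The background cleanup option could look roughly like the sketch below. SweepingStore and purge_expired are hypothetical names, not part of the backend; the now parameter exists only to make the sweep easy to test.

```python
import threading
import time


class SweepingStore:
    """Hypothetical variant: same dict-with-timestamp idea plus an explicit sweep."""

    def __init__(self, ttl_seconds=3600.0):
        self._ttl = ttl_seconds
        self._store = {}
        self._lock = threading.Lock()

    def store(self, task_id, status, artifact=None):
        with self._lock:
            self._store[task_id] = {
                "id": task_id, "status": status, "artifact": artifact,
                "created_at": time.monotonic(),
            }

    def purge_expired(self, now=None):
        """Delete every entry older than the TTL; return how many were removed."""
        now = time.monotonic() if now is None else now
        with self._lock:
            expired = [key for key, entry in self._store.items()
                       if now - entry["created_at"] > self._ttl]
            for key in expired:
                del self._store[key]
        return len(expired)

    def __len__(self):
        with self._lock:
            return len(self._store)
```

Something would still have to call purge_expired() periodically, for example a timer thread or an asyncio task started with the app. That wiring is not shown here.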

Example

Submit:

curl -X POST http://localhost:8000/api/a2a/tasks \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"tasks/submit","params":{"taskId":"test-1","message":{...}},"id":1}'

Poll status:

curl -X POST http://localhost:8000/api/a2a/tasks \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"tasks/status","params":{"taskId":"test-1"},"id":2}'

Response: {"result":{"task":{"id":"test-1","status":"completed","artifact":{...}}}}

Typical scenario: Agent A sends submit and gets a taskId. Agent B (or the same A minutes later) calls tasks/status with that taskId and gets the result. No need to pass a large artifact over channels; the taskId is enough.
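The same poll from Python, as a stdlib-only sketch. The URL and JSON-RPC body come from the curl examples above; the function names, the retry interval, and the attempt limit are my assumptions. The send parameter is there so the loop can be exercised without a running backend.

```python
import json
import time
import urllib.request

A2A_URL = "http://localhost:8000/api/a2a/tasks"  # endpoint from the curl examples


def status_request(task_id, request_id=1):
    """Build the JSON-RPC body for tasks/status."""
    return {"jsonrpc": "2.0", "method": "tasks/status",
            "params": {"taskId": task_id}, "id": request_id}


def post_json(body, url=A2A_URL):
    """POST a JSON body and decode the JSON reply."""
    req = urllib.request.Request(
        url, data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"}, method="POST")
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))


def poll(task_id, send=post_json, interval=2.0, attempts=30):
    """Ask for status until the task leaves submitted/working, or give up."""
    for attempt in range(attempts):
        task = send(status_request(task_id, attempt + 1))["result"]["task"]
        if task["status"] not in ("submitted", "working"):
            return task
        time.sleep(interval)
    raise TimeoutError(f"task {task_id} still running after {attempts} polls")
```

With the current sync backend the first call already returns completed or not_found; the loop only starts to matter once submit becomes async. Note that poll returns a not_found task as-is, so the caller has to check the status.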

Limitations

  • In-memory. On backend restart the store is cleared. Production needs Redis or a DB. Fine for dev for now.
  • Submit is still sync: it waits for completion. But the result can be retrieved later. Async submit is a separate phase and needs a task queue.
  • Missing taskId in tasks/status → JSON-RPC error -32602 (Invalid params). The client must always pass taskId.
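The last limitation can be sketched as a small validation step. The code -32602 and the error object shape are from JSON-RPC 2.0; the function names and the exact message text are illustrative, and store is anything with a get method (a dict works for the sketch).

```python
def jsonrpc_error(request_id, code, message):
    """Build a JSON-RPC 2.0 error response."""
    return {"jsonrpc": "2.0", "error": {"code": code, "message": message}, "id": request_id}


def handle_status_request(request, store):
    """Validate params, then look the task up in `store`."""
    request_id = request.get("id")
    params = request.get("params")
    # params may be missing or not an object; both count as invalid here.
    task_id = params.get("taskId") if isinstance(params, dict) else None
    if not isinstance(task_id, str) or not task_id:
        return jsonrpc_error(request_id, -32602, "Invalid params: taskId is required")
    task = store.get(task_id)
    if task is None:
        task = {"id": task_id, "status": "not_found"}
    return {"jsonrpc": "2.0", "result": {"task": task}, "id": request_id}
```

An unknown taskId is still a normal result with status not_found; only a missing or malformed taskId becomes a JSON-RPC error.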

Why I Did This

Prep for streaming and async submit. The client can poll instead of holding a long sync request. This is infrastructure for future long-running tasks. Even for sync tasks, TaskStore adds flexibility: the result can be fetched later, and the taskId can be passed between processes.

What's Next

Phase 3: streaming. An SSE endpoint instead of polling. The client opens GET /stream and receives events, with no periodic POSTs.