
EMM A2A Phase 2+: TaskStore and tasks/status



EMM A2A cycle:
· Phase 1: Process Manager as A2A Server
· Phase 2: Task Manager (list_board)
· Phase 3: Stream task status
· Phase 4: Auth, Rate Limiting, Observability

Theory: Task Lifecycle in A2A

The A2A spec assumes an async model. The client sends a task and gets a taskId. Status flow: submitted → working → completed | failed | input-required. For long-running tasks the client doesn't wait on submit; it gets updates via polling or streaming. This lets an orchestrator run multiple tasks in parallel and collect results as they complete.
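The lifecycle above can be modeled as a small state machine. A minimal sketch: the state names come from the flow above, but the exact transition rules are my assumption, not taken from the spec:

```python
from enum import Enum

class TaskState(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    COMPLETED = "completed"
    FAILED = "failed"
    INPUT_REQUIRED = "input-required"

# Allowed transitions; terminal states have no outgoing edges.
TRANSITIONS = {
    TaskState.SUBMITTED: {TaskState.WORKING},
    TaskState.WORKING: {TaskState.COMPLETED, TaskState.FAILED,
                        TaskState.INPUT_REQUIRED},
    TaskState.INPUT_REQUIRED: {TaskState.WORKING},
    TaskState.COMPLETED: set(),
    TaskState.FAILED: set(),
}

def can_transition(frm: TaskState, to: TaskState) -> bool:
    return to in TRANSITIONS[frm]
```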

I'm still sync: submit blocks until done. Process Manager and Task Manager run fast (seconds), so sync is fine for Phases 1-2. But I added a TaskStore and tasks/status, so the result stays available after submit. Another agent can send a task, get the taskId in the response, and ask for status later (say 10 minutes on), getting the result from the store. This is prep for async submit: when long-running tasks appear, the infrastructure is ready.

TaskStore Architecture

flowchart TB
    subgraph Client[A2A Client]
        C1[Submit]
        C2[Poll status]
    end

    subgraph Backend[viz/backend]
        ROUTER[A2A Router]
        STORE[(InMemoryTaskStore)]
        ADAPTER[A2A Adapter]
    end

    subgraph Agents[Agents]
        PM[Process Manager]
        TM[Task Manager]
    end

    C1 -->|"tasks/submit"| ROUTER
    ROUTER --> ADAPTER
    ADAPTER --> PM
    ADAPTER --> TM
    ADAPTER -->|"save(taskId, task)"| STORE
    ADAPTER --> C1

    C2 -->|"tasks/status"| ROUTER
    ROUTER -->|"get(taskId)"| STORE
    STORE -->|"task or not_found"| ROUTER
    ROUTER --> C2

Sequence: submit + poll

sequenceDiagram
    participant C as A2A Client
    participant API as FastAPI
    participant Adapter as A2A Adapter
    participant Store as InMemoryTaskStore
    participant PM as Process Manager

    C->>API: POST tasks/submit (taskId: t-1)
    API->>Adapter: route, invoke
    Adapter->>PM: graph.ainvoke(...)
    PM-->>Adapter: result
    Adapter->>Store: put(t-1, task)
    Adapter-->>C: JSON-RPC {result: {task}}

    Note over C: An hour later, another process...

    C->>API: POST tasks/status (taskId: t-1)
    API->>Store: get(t-1)
    Store-->>API: task (or null)
    API-->>C: {result: {task}} (status "not_found" if missing)

What I Implemented

  1. InMemoryTaskStore: a dict mapping taskId → task. After submit the result is stored; TTL is 1 hour, then the entry is removed. Simple implementation: a Python dict with timestamps, plus background cleanup or a check on get. For production, Redis or a DB; in-memory doesn't survive a restart.

  2. tasks/status: a JSON-RPC method with params: { taskId }. Returns the task from the store or {"id":"...","status":"not_found"}. The client can call it multiple times; it's idempotent. If the task is still running (future async mode), it returns status: "working".

  3. tasks/submit writes the result to the store right after completion. Even for a sync request, the result stays available for polling. So submit returns the task in its response, and the same task can be fetched later via tasks/status. Useful if the client lost the connection or wants to pass the taskId to another process.
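Item 3 as a sketch. This is simplified: `run_agent` and the dict-based `a2a_task_store` are stand-ins for the real adapter call (`graph.ainvoke`) and store wiring:

```python
import asyncio

# Stand-ins for the real wiring: the actual store is InMemoryTaskStore,
# and run_agent is a placeholder for the adapter's graph.ainvoke(...).
a2a_task_store = {}

async def run_agent(message):
    return {"echo": message}

async def handle_submit(params: dict) -> dict:
    task_id = params["taskId"]
    artifact = await run_agent(params["message"])  # blocks the request (still sync)
    task = {"id": task_id, "status": "completed", "artifact": artifact}
    a2a_task_store[task_id] = task  # saved before returning, so polling works later
    return {"task": task}
```

The key detail is the order: the store write happens before the response is returned, so a client that loses the connection mid-response can still recover the result via tasks/status.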

Code: InMemoryTaskStore is a thread-safe dict with TTL. Lazy cleanup on get: if monotonic() - created_at > ttl, the entry is deleted. tasks/status does get(taskId) from the store; if null, it returns {"status": "not_found"}. After submit, _store_task() saves the task before returning.

# viz/backend/viz_backend/a2a/task_store.py
import threading
import time

class InMemoryTaskStore:
    DEFAULT_TTL_SECONDS = 3600

    def __init__(self, ttl=DEFAULT_TTL_SECONDS):
        self._store = {}
        self._lock = threading.Lock()
        self._ttl = ttl

    def store(self, task_id, status, artifact=None, error=None):
        with self._lock:
            self._store[task_id] = {
                "id": task_id, "status": status, "artifact": artifact,
                "error": error, "created_at": time.monotonic(),
            }

    def get(self, task_id):
        with self._lock:
            entry = self._store.get(task_id)
            if entry is None:
                return None
            # Lazy TTL cleanup: expired entries are removed on read.
            if time.monotonic() - entry["created_at"] > self._ttl:
                del self._store[task_id]
                return None
            return {"id": entry["id"], "status": entry["status"],
                    "artifact": entry.get("artifact")}

# tasks/status handler
task = a2a_task_store.get(task_id)
if task is None:
    return {"task": {"id": task_id, "status": "not_found"}}
return {"task": task}

Example

Submit:

curl -X POST http://localhost:8000/api/a2a/tasks \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"tasks/submit","params":{"taskId":"test-1","message":{...}},"id":1}'

Poll status:

curl -X POST http://localhost:8000/api/a2a/tasks \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"tasks/status","params":{"taskId":"test-1"},"id":2}'

Response: {"result":{"task":{"id":"test-1","status":"completed","artifact":{...}}}}

Typical scenario: Agent A sends submit, gets taskId. Agent B (or same A minutes later) calls tasks/status with that taskId, gets result. No need to pass large artifact over channels, taskId is enough.
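The hand-off above only needs the taskId. A polling helper on the client side might look like this; the transport is abstracted into a `post` callable (my choice, to keep it testable), and the payload shape matches the curl examples:

```python
import time

def poll_status(post, task_id, interval=2.0, timeout=60.0):
    """Poll tasks/status until a terminal state or timeout.

    `post` is any callable that sends the JSON-RPC request dict and
    returns the decoded response dict (e.g. a thin requests.post wrapper).
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        resp = post({"jsonrpc": "2.0", "method": "tasks/status",
                     "params": {"taskId": task_id}, "id": 1})
        task = resp["result"]["task"]
        # Stop on any terminal status, including an expired/unknown task.
        if task["status"] in ("completed", "failed", "not_found"):
            return task
        time.sleep(interval)
    raise TimeoutError(f"task {task_id} still not done after {timeout}s")
```

Since tasks/status is idempotent, retrying the same request on a flaky network is safe.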

Limitations

  • In-memory: a backend restart clears the store. Production needs Redis or a DB; fine for dev for now.
  • Submit is still sync: it waits for completion, but the result can be retrieved later. Async submit is a separate phase and needs a task queue.
  • Missing taskId in tasks/status → JSON-RPC error -32602 (Invalid params). The client must always pass taskId.

Why I Did This

Prep for streaming and async submit. The client can poll instead of holding a long sync request. It's infrastructure for future long-running tasks. Even for sync tasks the TaskStore adds flexibility: the result can be fetched later, and the taskId can be passed between processes.

What's Next

Phase 3: stream task status via an SSE endpoint instead of polling. The client opens GET /stream and receives events, with no periodic POSTs.