EMM A2A Phase 2+: TaskStore and tasks/status¶
EMM A2A cycle:
· Phase 1: Process Manager as A2A Server
· Phase 2: Task Manager (list_board)
Theory: Task Lifecycle in A2A¶
A2A spec assumes async model. Client sends task — gets taskId. Status: submitted → working → completed | failed | input-required. For long-running tasks client doesn't wait on submit — it gets updates via polling or streaming. This lets orchestrator run multiple tasks in parallel and collect results as they complete.
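The lifecycle above can be sketched as a small state machine. A minimal sketch: the state names follow the spec, but the transition table is my reading of the paragraph, not spec text.

```python
# States per the A2A lifecycle; transitions are my interpretation.
TERMINAL = {"completed", "failed"}
TRANSITIONS = {
    "submitted": {"working"},
    "working": {"completed", "failed", "input-required"},
    "input-required": {"working"},  # resumes once the client supplies input
}

def advance(status: str, new_status: str) -> str:
    """Move a task to new_status, rejecting transitions the lifecycle forbids."""
    if status in TERMINAL:
        raise ValueError(f"{status} is terminal")
    if new_status not in TRANSITIONS.get(status, set()):
        raise ValueError(f"illegal transition {status} -> {new_status}")
    return new_status
```

For the current sync setup only submitted → working → completed/failed matters; input-required becomes relevant with async submit.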
I'm still sync: submit blocks until done. Process Manager and Task Manager run fast (seconds), so sync is fine for Phase 1–2. But I added TaskStore and tasks/status — so result is available after submit. Another agent can send task, get taskId in response, and later (e.g. 10 min) ask status — gets result from store. Prep for async submit: when long tasks appear, infrastructure is ready.
TaskStore Architecture¶
flowchart TB
subgraph Client[A2A Client]
C1[Submit]
C2[Poll status]
end
subgraph Backend[viz/backend]
ROUTER[A2A Router]
STORE[(InMemoryTaskStore)]
ADAPTER[A2A Adapter]
end
subgraph Agents[Agents]
PM[Process Manager]
TM[Task Manager]
end
C1 -->|"tasks/submit"| ROUTER
ROUTER --> ADAPTER
ADAPTER --> PM
ADAPTER --> TM
ADAPTER -->|"save(taskId, task)"| STORE
ADAPTER --> C1
C2 -->|"tasks/status"| ROUTER
ROUTER -->|"get(taskId)"| STORE
STORE -->|"task or not_found"| ROUTER
ROUTER --> C2
Sequence: submit + poll¶
sequenceDiagram
participant C as A2A Client
participant API as FastAPI
participant Adapter as A2A Adapter
participant Store as InMemoryTaskStore
participant PM as Process Manager
C->>API: POST tasks/submit (taskId: t-1)
API->>Adapter: route, invoke
Adapter->>PM: graph.ainvoke(...)
PM-->>Adapter: result
Adapter->>Store: put(t-1, task)
Adapter-->>C: JSON-RPC {result: {task}}
Note over C: An hour later, another process...
C->>API: POST tasks/status (taskId: t-1)
API->>Store: get(t-1)
Store-->>API: task (or null)
API-->>C: {result: {task}} or {result: {task: {status: "not_found"}}}
What I Implemented¶
- InMemoryTaskStore — dict taskId → task. After submit the result is stored. TTL is 1 hour — then the entry is removed. Simple impl: Python dict with a timestamp; background cleanup or a check on get. For production — Redis or a DB; in-memory doesn't survive a restart.
- tasks/status — JSON-RPC method. params: { taskId }. Returns the task from the store or {"id": "...", "status": "not_found"}. Client can call it multiple times — idempotent. If the task is still running (future async) — returns status: "working".
- tasks/submit writes the result to the store right after completion. Even for a sync request the result is available for polling. So submit returns the task in its response, and the same task can be fetched later via tasks/status. Useful if the client lost the connection or wants to pass the taskId to another process.
Code: InMemoryTaskStore — thread-safe dict with TTL. Lazy cleanup on get: if monotonic() - created_at > ttl — delete. tasks/status — get(taskId) from store, if null — {"status":"not_found"}. After submit — _store_task() saves task before return.
# viz/backend/viz_backend/a2a/task_store.py
import threading
import time

class InMemoryTaskStore:
    DEFAULT_TTL_SECONDS = 3600

    def __init__(self, ttl=DEFAULT_TTL_SECONDS):
        self._store = {}
        self._lock = threading.Lock()
        self._ttl = ttl

    def store(self, task_id, status, artifact=None, error=None):
        with self._lock:
            self._store[task_id] = {
                "id": task_id, "status": status,
                "artifact": artifact, "error": error,
                "created_at": time.monotonic(),
            }

    def get(self, task_id):
        with self._lock:
            entry = self._store.get(task_id)
            if entry is None:
                return None
            # lazy TTL cleanup on read
            if time.monotonic() - entry["created_at"] > self._ttl:
                del self._store[task_id]
                return None
            return {"id": entry["id"], "status": entry["status"],
                    "artifact": entry.get("artifact")}
# tasks/status handler
task = a2a_task_store.get(task_id)
if task is None:
return {"task": {"id": task_id, "status": "not_found"}}
return {"task": task}
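The lazy-TTL behavior is easy to check in isolation. A self-contained sketch with a trimmed re-implementation (not the project module) and a deliberately tiny TTL:

```python
import threading
import time

class TTLStore:
    """Trimmed stand-in for InMemoryTaskStore, just enough to show lazy expiry."""
    def __init__(self, ttl):
        self._ttl = ttl
        self._data = {}
        self._lock = threading.Lock()

    def store(self, task_id, status):
        with self._lock:
            self._data[task_id] = {"id": task_id, "status": status,
                                   "created_at": time.monotonic()}

    def get(self, task_id):
        with self._lock:
            entry = self._data.get(task_id)
            if entry is None:
                return None
            if time.monotonic() - entry["created_at"] > self._ttl:
                del self._data[task_id]  # lazy cleanup on access
                return None
            return {"id": entry["id"], "status": entry["status"]}

store = TTLStore(ttl=0.05)
store.store("t-1", "completed")
fresh = store.get("t-1")    # within TTL: entry returned
time.sleep(0.1)
expired = store.get("t-1")  # past TTL: entry dropped, None
```

No background thread needed: expired entries linger in memory until the next get, which is fine at this scale.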
Example¶
Submit:
curl -X POST http://localhost:8000/api/a2a/tasks \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"tasks/submit","params":{"taskId":"test-1","message":{...}},"id":1}'
Poll status:
curl -X POST http://localhost:8000/api/a2a/tasks \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"tasks/status","params":{"taskId":"test-1"},"id":2}'
Response: {"result":{"task":{"id":"test-1","status":"completed","artifact":{...}}}}
Typical scenario: Agent A sends submit, gets taskId. Agent B (or same A minutes later) calls tasks/status with that taskId — gets result. No need to pass large artifact over channels — taskId is enough.
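The two payloads in that scenario can be built with a couple of hypothetical helpers (the method names and params follow the curl examples above; the helpers themselves are mine). Agent B needs only the taskId, nothing else:

```python
import json

def submit_payload(task_id, message, req_id=1):
    """JSON-RPC body Agent A sends to tasks/submit."""
    return {"jsonrpc": "2.0", "method": "tasks/submit",
            "params": {"taskId": task_id, "message": message}, "id": req_id}

def status_payload(task_id, req_id=2):
    """JSON-RPC body Agent B sends later; only the taskId is shared."""
    return {"jsonrpc": "2.0", "method": "tasks/status",
            "params": {"taskId": task_id}, "id": req_id}

body = json.dumps(status_payload("test-1"))  # ready to POST to /api/a2a/tasks
```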
Limitations¶
- In-memory. Backend restart — store cleared. Production — Redis or DB. Fine for dev for now.
- Submit still sync: waits for completion. But result can be retrieved later. Async submit — separate phase, needs task queue.
- Missing taskId in tasks/status → JSON-RPC error -32602 (Invalid params). Client must always pass taskId.
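The -32602 case from the last point can be sketched as follows. The error code is the standard JSON-RPC "Invalid params"; the handler and store shapes here are assumptions, not the project code:

```python
def handle_status(params, store):
    """tasks/status sketch: validate params, then consult the store."""
    task_id = (params or {}).get("taskId")
    if not task_id:
        return {"error": {"code": -32602,
                          "message": "Invalid params: taskId is required"}}
    task = store.get(task_id)
    if task is None:
        return {"result": {"task": {"id": task_id, "status": "not_found"}}}
    return {"result": {"task": task}}

class FakeStore:  # stand-in for the real task store in this sketch
    def __init__(self, data):
        self._data = data
    def get(self, task_id):
        return self._data.get(task_id)
```

Note that not_found is a normal result, not an error: the request was well-formed, the store just has no (or an expired) entry.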
Why I Did This¶
Prep for streaming and async submit. Client can poll instead of long sync. Infrastructure for future long-running tasks. Even for sync tasks TaskStore adds flexibility: result can be fetched later, taskId passed between processes.
What's Next¶
Phase 3 — streaming. SSE endpoint instead of polling. Client opens GET /stream and receives events, no periodic POSTs.
