EMM A2A Phase 2+: TaskStore and tasks/status¶
EMM A2A cycle:
· Phase 1: Process Manager as A2A Server
· Phase 2: Task Manager (list_board)
· Phase 3: Stream task status
· Phase 4: Auth, Rate Limiting, Observability
Theory: Task Lifecycle in A2A¶
The A2A spec assumes an async model. The client submits a task and gets a taskId. Statuses: submitted → working → completed | failed | input-required. For long-running tasks the client doesn't wait on submit; it gets updates via polling or streaming. This lets an orchestrator run multiple tasks in parallel and collect results as they complete.
I'm still sync: submit blocks until done. Process Manager and Task Manager run fast (seconds), so sync is fine for Phases 1-2. But I added a TaskStore and tasks/status, so the result stays available after submit. Another agent can submit a task, get a taskId in the response, and later (e.g. 10 minutes on) ask for the status and get the result from the store. This is prep for async submit: when long tasks appear, the infrastructure is ready.
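The lifecycle above can be sketched as a small state machine. This is a hypothetical helper for illustration, not part of the backend; the allowed transitions are only those spelled out in the paragraph above.

```python
from enum import Enum


class TaskStatus(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    COMPLETED = "completed"
    FAILED = "failed"
    INPUT_REQUIRED = "input-required"


# Transitions per the lifecycle: submitted → working → completed | failed | input-required.
TRANSITIONS = {
    TaskStatus.SUBMITTED: {TaskStatus.WORKING},
    TaskStatus.WORKING: {TaskStatus.COMPLETED, TaskStatus.FAILED,
                         TaskStatus.INPUT_REQUIRED},
}


def can_transition(src: TaskStatus, dst: TaskStatus) -> bool:
    """Check whether a status change is legal; terminal states have no exits here."""
    return dst in TRANSITIONS.get(src, set())
```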
TaskStore Architecture¶
flowchart TB
subgraph Client[A2A Client]
C1[Submit]
C2[Poll status]
end
subgraph Backend[viz/backend]
ROUTER[A2A Router]
STORE[(InMemoryTaskStore)]
ADAPTER[A2A Adapter]
end
subgraph Agents[Agents]
PM[Process Manager]
TM[Task Manager]
end
C1 -->|"tasks/submit"| ROUTER
ROUTER --> ADAPTER
ADAPTER --> PM
ADAPTER --> TM
ADAPTER -->|"save(taskId, task)"| STORE
ADAPTER --> C1
C2 -->|"tasks/status"| ROUTER
ROUTER -->|"get(taskId)"| STORE
STORE -->|"task or not_found"| ROUTER
ROUTER --> C2
Sequence: submit + poll¶
sequenceDiagram
participant C as A2A Client
participant API as FastAPI
participant Adapter as A2A Adapter
participant Store as InMemoryTaskStore
participant PM as Process Manager
C->>API: POST tasks/submit (taskId: t-1)
API->>Adapter: route, invoke
Adapter->>PM: graph.ainvoke(...)
PM-->>Adapter: result
Adapter->>Store: put(t-1, task)
Adapter-->>C: JSON-RPC {result: {task}}
Note over C: An hour later, another process...
C->>API: POST tasks/status (taskId: t-1)
API->>Store: get(t-1)
Store-->>API: task (or null)
API-->>C: {result: {task}} (status: "completed" or "not_found")
What I Implemented¶
- InMemoryTaskStore: a dict mapping taskId → task. After submit, the result is stored. TTL is 1 hour, then the entry is removed. Simple implementation: a Python dict with a timestamp; cleanup runs in the background or on get. For production, Redis or a DB; in-memory doesn't survive a restart.
- tasks/status: a JSON-RPC method. params: { taskId }. Returns the task from the store, or {"id":"...","status":"not_found"}. The client can call it multiple times; it's idempotent. If the task is still running (future async), it returns status: "working".
- tasks/submit writes the result to the store right after completion. Even for a sync request, the result stays available for polling. So submit returns the task in the response, and the same task can be fetched later via tasks/status. Useful if the client lost the connection or wants to pass the taskId to another process.
Code: InMemoryTaskStore, thread-safe dict with TTL. Lazy cleanup on get: if monotonic() - created_at > ttl, delete. tasks/status, get(taskId) from store, if null, {"status":"not_found"}. After submit, _store_task() saves task before return.
# viz/backend/viz_backend/a2a/task_store.py
import time
import threading

class InMemoryTaskStore:
    DEFAULT_TTL_SECONDS = 3600

    def __init__(self, ttl: float = DEFAULT_TTL_SECONDS):
        self._store: dict[str, dict] = {}
        self._ttl = ttl
        self._lock = threading.Lock()

    def store(self, task_id, status, artifact=None, error=None):
        with self._lock:
            self._store[task_id] = {
                "id": task_id, "status": status, "artifact": artifact,
                "error": error,
                "created_at": time.monotonic(),
            }

    def get(self, task_id):
        with self._lock:
            entry = self._store.get(task_id)
            if entry is None:
                return None
            # Lazy cleanup: expire the entry on read once the TTL has passed.
            if time.monotonic() - entry["created_at"] > self._ttl:
                del self._store[task_id]
                return None
            return {"id": entry["id"], "status": entry["status"],
                    "artifact": entry.get("artifact")}
# tasks/status handler
task = a2a_task_store.get(task_id)
if task is None:
return {"task": {"id": task_id, "status": "not_found"}}
return {"task": task}
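The lazy-TTL behavior is easy to check in isolation. Below is a stripped-down standalone store (the class name and a deliberately short TTL are illustrative, not the real code) that demonstrates the expire-on-read path:

```python
import time


class TinyTaskStore:
    """Minimal sketch of the store above: dict + timestamp, lazy TTL on get."""

    def __init__(self, ttl: float):
        self._ttl = ttl
        self._store: dict[str, dict] = {}

    def store(self, task_id: str, status: str) -> None:
        self._store[task_id] = {"id": task_id, "status": status,
                                "created_at": time.monotonic()}

    def get(self, task_id: str):
        entry = self._store.get(task_id)
        if entry is None:
            return None
        if time.monotonic() - entry["created_at"] > self._ttl:
            del self._store[task_id]  # lazy cleanup on read
            return None
        return {"id": entry["id"], "status": entry["status"]}


store = TinyTaskStore(ttl=0.05)  # 50 ms TTL, just for the demo
store.store("t-1", "completed")
assert store.get("t-1")["status"] == "completed"  # fresh entry is returned
time.sleep(0.1)
assert store.get("t-1") is None  # expired entry is dropped on read
```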
Example¶
Submit:
curl -X POST http://localhost:8000/api/a2a/tasks \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"tasks/submit","params":{"taskId":"test-1","message":{...}},"id":1}'
Poll status:
curl -X POST http://localhost:8000/api/a2a/tasks \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"tasks/status","params":{"taskId":"test-1"},"id":2}'
Response: {"result":{"task":{"id":"test-1","status":"completed","artifact":{...}}}}
Typical scenario: Agent A submits a task and gets a taskId. Agent B (or A itself, minutes later) calls tasks/status with that taskId and gets the result. No need to pass a large artifact between processes; the taskId is enough.
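The client side of this scenario reduces to building the JSON-RPC payload from the curl example and checking whether the task has settled. A network-free sketch (the endpoint constant and helper names are mine, and a real poller would POST the payload in a loop with a sleep):

```python
import json

A2A_ENDPOINT = "http://localhost:8000/api/a2a/tasks"  # from the curl examples above


def status_request(task_id: str, request_id: int = 1) -> str:
    """Build the tasks/status JSON-RPC payload shown in the curl example."""
    return json.dumps({
        "jsonrpc": "2.0",
        "method": "tasks/status",
        "params": {"taskId": task_id},
        "id": request_id,
    })


def is_done(response: dict) -> bool:
    """True once the task reached a terminal status and polling can stop."""
    task = response.get("result", {}).get("task", {})
    return task.get("status") in {"completed", "failed", "not_found"}
```

With requests or urllib, Agent B would POST `status_request(task_id)` to `A2A_ENDPOINT` every few seconds until `is_done(...)` returns True.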
Limitations¶
- In-memory: a backend restart clears the store. Production needs Redis or a DB. Fine for dev for now.
- Submit is still sync: it waits for completion. But the result can be retrieved later. Async submit is a separate phase and needs a task queue.
- Missing taskId in tasks/status → JSON-RPC error -32602 (Invalid params). The client must always pass taskId.
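The Invalid params case can be sketched as a small validator mirroring the JSON-RPC 2.0 error object (a hypothetical helper, not the actual router code):

```python
def validate_status_params(params: dict):
    """Return a JSON-RPC error dict if taskId is missing or empty, else None."""
    task_id = params.get("taskId")
    if not isinstance(task_id, str) or not task_id:
        return {"code": -32602, "message": "Invalid params: taskId is required"}
    return None
```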
Why I Did This¶
Prep for streaming and async submit. The client can poll instead of holding a long sync request open. This is infrastructure for future long-running tasks. Even for sync tasks the TaskStore adds flexibility: the result can be fetched later, and the taskId passed between processes.
What's Next¶
Phase 3: Stream task status, an SSE endpoint instead of polling. The client opens GET /stream and receives events, with no periodic POSTs.
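As a preview, the SSE wire format (text/event-stream) that Phase 3 will emit is trivial to produce. A sketch of frame formatting only, not the Phase 3 implementation:

```python
import json


def sse_event(event: str, data: dict) -> str:
    """Format one Server-Sent Events frame: event name, JSON data, blank-line terminator."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"
```

A FastAPI streaming response would yield one such frame per status change, and the client's EventSource fires a handler per `event:` name.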
