EMM A2A Phase 3: Stream task status¶
EMM A2A cycle:
· Phase 1: Process Manager as A2A Server
· Phase 2: Task Manager (list_board)
· Phase 2+: TaskStore and tasks/status
In Phase 2+ I covered TaskStore and polling (POST tasks/status). Below - SSE as an alternative: one GET instead of periodic POSTs.
Theory: Streaming in A2A¶
A2A spec supports streaming for task status. Instead of polling (client sends POST every N seconds) - server pushes events. Less network load, faster reaction to changes. Options: SSE (Server-Sent Events), WebSocket, push notifications. I chose SSE: plain HTTP GET, one-way (server → client), no extra protocols. WebSocket - overkill for one-way flow; SSE built into browser and curl.
SSE format: each event - lines event: <name> and data: <payload>, separated by double \n\n. Client opens connection and reads stream. Browser has EventSource API; curl with -N (no buffer) works too. Content-Type: text/event-stream, Cache-Control: no-cache - so proxies don't cache.
Agent Card capabilities: streaming: true - client knows it can use stream endpoint instead of polling. Discovery once - then choice: POST tasks/status or GET /stream.
Architecture: Three Ways to Get Status¶
flowchart TB
subgraph Client[A2A Client]
C1[Submit]
C2[Poll: tasks/status]
C3[Stream: GET /stream]
end
subgraph Backend[viz/backend]
ROUTER[A2A Router]
STORE[(TaskStore)]
STREAM[SSE Stream]
end
C1 -->|"POST tasks/submit"| ROUTER
ROUTER --> STORE
C2 -->|"POST tasks/status"| ROUTER
ROUTER -->|"get(taskId)"| STORE
STORE --> ROUTER
ROUTER --> C2
C3 -->|"GET /tasks/{id}/stream"| STREAM
STREAM -->|"read from store"| STORE
STORE --> STREAM
STREAM -->|"event: task/status\ndata: {...}"| C3
Sequence: SSE Stream¶
sequenceDiagram
participant C as A2A Client
participant API as FastAPI
participant Stream as SSE Handler
participant Store as TaskStore
C->>API: GET /api/a2a/tasks/t-1/stream
API->>Stream: StreamingResponse
Stream->>Store: get(t-1)
Store-->>Stream: task (or null)
alt task exists
Stream-->>C: event: task/status\ndata: {"id":"t-1","status":"completed",...}\n\n
else task not found
Stream-->>C: event: task/status\ndata: {"id":"t-1","status":"not_found"}\n\n
end
Note over C,Stream: Connection closes (one event for sync tasks)
What I Implemented¶
- GET /api/a2a/tasks/{task_id}/stream - endpoint. FastAPI
StreamingResponse,Content-Type: text/event-stream,Cache-Control: no-cache. Reads task from TaskStore, formats SSE event, sends and closes connection. For sync tasks - one event with final status. - Event -
event: task/status,data: {json}. JSON - task from store (id, status, artifact) or{"id":"...","status":"not_found"}. Client parses data as JSON. Format compatible with tasks/status response. - Agent Card -
capabilities.streaming = true. Client that supports streaming can use GET instead of POST to fetch status.
Code: _stream_task_events - async generator: reads task from store, formats SSE lines (event: task/status, data: {json}), yields. StreamingResponse with media_type="text/event-stream", headers={"Cache-Control": "no-cache"}. Endpoint GET /api/a2a/tasks/{task_id}/stream invokes generator and returns stream.
# viz/backend/viz_backend/routers/a2a.py
async def _stream_task_events(task_id: str):
task = a2a_task_store.get(task_id)
payload = task or {"id": task_id, "status": "not_found"}
yield f"event: task/status\ndata: {json.dumps(payload)}\n\n"
@router.get("/tasks/{task_id}/stream")
async def stream_task(task_id: str):
return StreamingResponse(
_stream_task_events(task_id),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache"},
)
Example¶
curl -N http://localhost:8000/api/a2a/tasks/test-1/stream
Response: event: task/status\ndata: {"id":"test-1","status":"not_found"}\n\n
If task exists in store - you get task with artifact. -N in curl disables buffering - events arrive immediately.
Limitations¶
- So far one event. Process Manager and Task Manager - sync. Result right after submit. Stream returns one event with final status. Not real-time token-by-token (that would be for LLM streaming, not task status).
- For future async tasks - can push multiple events (running → completed). Will need to keep connection open and periodically read store, or integrate with event queue.
Why I Chose SSE¶
Simple. HTTP GET, no WebSocket. EventSource in JS, curl -N. No extra libraries. Infrastructure ready for async tasks: when long-running appear - add event push to stream. For now one event is enough for sync scenario.
What's Next¶
Full Phase 3 - Calendar, Finance, Content Classifier as A2A skills. Phase 4 - auth, rate limiting, observability. Then - async submit and multi-event streaming for long tasks.
