
EMM A2A Phase 3: Stream task status



EMM A2A series:
· Phase 1: Process Manager as A2A Server
· Phase 2: Task Manager (list_board)
· Phase 2+: TaskStore and tasks/status

In Phase 2+ I covered TaskStore and polling (POST tasks/status). Below - SSE as an alternative: one GET instead of periodic POSTs.

Theory: Streaming in A2A

A2A spec supports streaming for task status. Instead of polling (client sends POST every N seconds) - server pushes events. Less network load, faster reaction to changes. Options: SSE (Server-Sent Events), WebSocket, push notifications. I chose SSE: plain HTTP GET, one-way (server → client), no extra protocols. WebSocket - overkill for one-way flow; SSE built into browser and curl.

SSE format: each event consists of an event: <name> line and a data: <payload> line, terminated by a blank line (\n\n). The client opens one connection and reads the stream. Browsers have the EventSource API; curl with -N (no buffering) works too. Content-Type: text/event-stream, Cache-Control: no-cache - so proxies don't cache or buffer the stream.
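A minimal sketch of that wire format in Python (the helper name is mine, not from the codebase):

```python
import json

def format_sse_event(name: str, payload: dict) -> str:
    """Build one SSE event: an `event:` line, a `data:` line, a blank-line terminator."""
    return f"event: {name}\ndata: {json.dumps(payload)}\n\n"

# Two such events concatenated form a valid stream; the blank line
# is what lets the client split them apart.
print(format_sse_event("task/status", {"id": "t-1", "status": "completed"}))
```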

Agent Card capabilities: streaming: true - client knows it can use stream endpoint instead of polling. Discovery once - then choice: POST tasks/status or GET /stream.
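That client-side choice can be sketched as a pure function over the card. The function name and return labels are mine; only the capabilities.streaming field comes from the spec:

```python
def pick_status_transport(card: dict) -> str:
    """Choose how to fetch task status based on Agent Card capabilities.

    Assumes the card exposes `capabilities.streaming` per the A2A spec;
    "sse" / "poll" are labels for this sketch, not spec terms.
    """
    caps = card.get("capabilities", {})
    if caps.get("streaming"):
        return "sse"   # GET /tasks/{id}/stream
    return "poll"      # POST tasks/status

print(pick_status_transport({"capabilities": {"streaming": True}}))
```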

Architecture: Three Ways to Get Status

flowchart TB
    subgraph Client[A2A Client]
        C1[Submit]
        C2[Poll: tasks/status]
        C3[Stream: GET /stream]
    end

    subgraph Backend[viz/backend]
        ROUTER[A2A Router]
        STORE[(TaskStore)]
        STREAM[SSE Stream]
    end

    C1 -->|"POST tasks/submit"| ROUTER
    ROUTER --> STORE

    C2 -->|"POST tasks/status"| ROUTER
    ROUTER -->|"get(taskId)"| STORE
    STORE --> ROUTER
    ROUTER --> C2

    C3 -->|"GET /tasks/{id}/stream"| STREAM
    STREAM -->|"read from store"| STORE
    STORE --> STREAM
    STREAM -->|"event: task/status\ndata: {...}"| C3

Sequence: SSE Stream

sequenceDiagram
    participant C as A2A Client
    participant API as FastAPI
    participant Stream as SSE Handler
    participant Store as TaskStore

    C->>API: GET /api/a2a/tasks/t-1/stream
    API->>Stream: StreamingResponse
    Stream->>Store: get(t-1)
    Store-->>Stream: task (or null)

    alt task exists
        Stream-->>C: event: task/status\ndata: {"id":"t-1","status":"completed",...}\n\n
    else task not found
        Stream-->>C: event: task/status\ndata: {"id":"t-1","status":"not_found"}\n\n
    end

    Note over C,Stream: Connection closes (one event for sync tasks)

What I Implemented

  1. GET /api/a2a/tasks/{task_id}/stream - endpoint. FastAPI StreamingResponse, Content-Type: text/event-stream, Cache-Control: no-cache. Reads task from TaskStore, formats SSE event, sends and closes connection. For sync tasks - one event with final status.
  2. Event - event: task/status, data: {json}. JSON - task from store (id, status, artifact) or {"id":"...","status":"not_found"}. Client parses data as JSON. Format compatible with tasks/status response.
  3. Agent Card - capabilities.streaming = true. Client that supports streaming can use GET instead of POST to fetch status.
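For item 3, the card change is a one-field flip. A sketch of the relevant shape - the name and other card contents here are illustrative, not the actual viz/backend card:

```python
# Illustrative Agent Card fragment; only capabilities.streaming is the point.
AGENT_CARD = {
    "name": "EMM Process Manager",        # assumed name, not from the source
    "capabilities": {
        "streaming": True,                # clients may use GET /tasks/{id}/stream
        "pushNotifications": False,       # not implemented in this phase
    },
    "skills": [],                         # omitted for brevity
}

print(AGENT_CARD["capabilities"]["streaming"])
```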

Code: _stream_task_events - async generator: reads task from store, formats SSE lines (event: task/status, data: {json}), yields. StreamingResponse with media_type="text/event-stream", headers={"Cache-Control": "no-cache"}. Endpoint GET /api/a2a/tasks/{task_id}/stream invokes generator and returns stream.

# viz/backend/viz_backend/routers/a2a.py
import json

from fastapi.responses import StreamingResponse

# router (APIRouter) and a2a_task_store are defined elsewhere in this module

async def _stream_task_events(task_id: str):
    # Sync tasks: read the final state from the store, emit one event, close.
    task = a2a_task_store.get(task_id)
    payload = task or {"id": task_id, "status": "not_found"}
    yield f"event: task/status\ndata: {json.dumps(payload)}\n\n"

@router.get("/tasks/{task_id}/stream")
async def stream_task(task_id: str):
    return StreamingResponse(
        _stream_task_events(task_id),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache"},
    )

Example

curl -N http://localhost:8000/api/a2a/tasks/test-1/stream

Response (two lines plus a blank-line terminator):

event: task/status
data: {"id":"test-1","status":"not_found"}

If task exists in store - you get task with artifact. -N in curl disables buffering - events arrive immediately.
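On the client side, parsing boils down to splitting the stream on blank lines. A minimal parser sketch (not from the codebase; real clients would use EventSource or an SSE library):

```python
import json

def parse_sse(raw: str) -> list[tuple[str, dict]]:
    """Parse a raw SSE stream into (event_name, payload) pairs."""
    events = []
    for block in raw.split("\n\n"):
        name, data = None, None
        for line in block.splitlines():
            if line.startswith("event: "):
                name = line[len("event: "):]
            elif line.startswith("data: "):
                data = line[len("data: "):]
        if name and data:
            events.append((name, json.loads(data)))
    return events

raw = 'event: task/status\ndata: {"id":"test-1","status":"not_found"}\n\n'
print(parse_sse(raw))
```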

Limitations

  • So far one event. Process Manager and Task Manager - sync. Result right after submit. Stream returns one event with final status. Not real-time token-by-token (that would be for LLM streaming, not task status).
  • For future async tasks - can push multiple events (running → completed). Will need to keep connection open and periodically read store, or integrate with event queue.
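For that future async scenario, the generator could keep the connection open and poll the store until a terminal status. A sketch against a plain dict standing in for TaskStore - the function, the interval, and the terminal-status set are my assumptions, not the current implementation:

```python
import asyncio
import json

TERMINAL = {"completed", "failed", "canceled", "not_found"}  # assumed set

async def stream_until_done(store: dict, task_id: str, interval: float = 0.1):
    """Yield one SSE event per status change until a terminal status is seen."""
    last = None
    while True:
        task = store.get(task_id) or {"id": task_id, "status": "not_found"}
        if task["status"] != last:
            last = task["status"]
            yield f"event: task/status\ndata: {json.dumps(task)}\n\n"
        if last in TERMINAL:
            return
        await asyncio.sleep(interval)
```

It plugs into StreamingResponse exactly like the sync generator above; only the event loop inside changes.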

Why I Chose SSE

Simple. HTTP GET, no WebSocket. EventSource in JS, curl -N. No extra libraries. Infrastructure ready for async tasks: when long-running appear - add event push to stream. For now one event is enough for sync scenario.

What's Next

Full Phase 3 - Calendar, Finance, Content Classifier as A2A skills. Phase 4 - auth, rate limiting, observability. Then - async submit and multi-event streaming for long tasks.