Перейти до змісту

EMM A2A Phase 3: Stream task status

English version

EMM A2A Phase 3 SSE streaming

Цикл EMM A2A:
· Phase 1: Process Manager як A2A сервер
· Phase 2: Task Manager (list_board)
· Phase 2+: TaskStore і tasks/status

У Phase 2+ я описав TaskStore і отримання статусу через polling (POST tasks/status). Нижче - SSE як альтернатива: один GET замість періодичних POST.

Теорія: Streaming в A2A

A2A spec підтримує streaming для task status. Замість polling (клієнт кожні N секунд відправляє POST) - сервер пушить події. Менше навантаження на мережу, швидша реакція на зміни. Варіанти: SSE (Server-Sent Events), WebSocket, push notifications. Я обрав SSE: простий HTTP GET, один напрямок (server → client), без додаткових протоколів. WebSocket - надлишковий для one-way потоку; SSE вбудований у браузер і curl.

SSE формат: кожна подія - рядки event: <name> та data: <payload>, розділені подвійним \n\n. Клієнт відкриває з'єднання і читає потік. Браузер має EventSource API; curl з -N (no buffer) теж працює. Content-Type: text/event-stream, Cache-Control: no-cache - щоб проксі не кешували.

Agent Card capabilities: streaming: true - клієнт знає, що може використовувати stream endpoint замість polling. Discovery один раз - далі вибір: POST tasks/status або GET /stream.

Архітектура: три способи отримати status

flowchart TB
    subgraph Client[A2A Client]
        C1[Submit]
        C2[Poll: tasks/status]
        C3[Stream: GET /stream]
    end

    subgraph Backend[viz/backend]
        ROUTER[A2A Router]
        STORE[(TaskStore)]
        STREAM[SSE Stream]
    end

    C1 -->|"POST tasks/submit"| ROUTER
    ROUTER --> STORE

    C2 -->|"POST tasks/status"| ROUTER
    ROUTER -->|"get(taskId)"| STORE
    STORE --> ROUTER
    ROUTER --> C2

    C3 -->|"GET /tasks/{id}/stream"| STREAM
    STREAM -->|"read from store"| STORE
    STORE --> STREAM
    STREAM -->|"event: task/status\ndata: {...}"| C3

Послідовність: SSE stream

sequenceDiagram
    participant C as A2A Client
    participant API as FastAPI
    participant Stream as SSE Handler
    participant Store as TaskStore

    C->>API: GET /api/a2a/tasks/t-1/stream
    API->>Stream: StreamingResponse
    Stream->>Store: get(t-1)
    Store-->>Stream: task (or null)

    alt task exists
        Stream-->>C: event: task/status\ndata: {"id":"t-1","status":"completed",...}\n\n
    else task not found
        Stream-->>C: event: task/status\ndata: {"id":"t-1","status":"not_found"}\n\n
    end

    Note over C,Stream: Connection closes (one event for sync tasks)

Що я реалізував

  1. GET /api/a2a/tasks/{task_id}/stream - endpoint. FastAPI StreamingResponse, Content-Type: text/event-stream, Cache-Control: no-cache. Читає task з TaskStore, формує SSE-подію, відправляє і закриває з'єднання. Для sync-задач - одна подія з фінальним статусом.

  2. Подія - event: task/status, data: {json}. JSON - task з store (id, status, artifact) або {"id":"...","status":"not_found"}. Клієнт парсить data як JSON. Формат сумісний з tasks/status response.

  3. Agent Card - capabilities.streaming = true. Клієнт, який підтримує streaming, може використовувати GET замість POST для отримання статусу.

Код: _stream_task_events - async generator: читає task з store, формує SSE-рядки (event: task/status, data: {json}), yield. StreamingResponse з media_type="text/event-stream", headers={"Cache-Control": "no-cache"}. Endpoint GET /api/a2a/tasks/{task_id}/stream викликає generator і повертає stream.

# viz/backend/viz_backend/routers/a2a.py
async def _stream_task_events(task_id: str):
    task = a2a_task_store.get(task_id)
    payload = task or {"id": task_id, "status": "not_found"}
    yield f"event: task/status\ndata: {json.dumps(payload)}\n\n"

@router.get("/tasks/{task_id}/stream")
async def stream_task(task_id: str):
    return StreamingResponse(
        _stream_task_events(task_id),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache"},
    )

Приклад

curl -N http://localhost:8000/api/a2a/tasks/test-1/stream

Відповідь: event: task/status\ndata: {"id":"test-1","status":"not_found"}\n\n

Якщо task є в store - отримаєш task з artifact. -N у curl вимикає буферизацію - події приходять одразу.

Обмеження

  • Поки одна подія. Process Manager і Task Manager - sync. Результат одразу після submit. Stream віддає один event з фінальним статусом. Не real-time token-by-token (це було б для LLM streaming, не для task status).
  • Для майбутніх async tasks - можна пушити кілька подій (running → completed). Потрібно буде тримати з'єднання відкритим і періодично читати store, або інтегрувати з чергою подій.

Чому обрав SSE

Просто. HTTP GET, без WebSocket. EventSource в JS, curl -N. Не потрібні додаткові бібліотеки. Інфраструктура готова для async tasks: коли з'являться long-running - додам push подій у stream. Поки один event достатній для sync-сценарію.

Що далі

Phase 3 повністю - Calendar, Finance, Content Classifier як A2A skills. Phase 4 - auth, rate limiting, observability. Потім - async submit і multi-event streaming для довгих задач.