EMM A2A Phase 3: Stream task status¶
Цикл EMM A2A:
· Phase 1: Process Manager як A2A сервер
· Phase 2: Task Manager (list_board)
· Phase 2+: TaskStore і tasks/status
У Phase 2+ я описав TaskStore і отримання статусу через polling (POST tasks/status). Нижче - SSE як альтернатива: один GET замість періодичних POST.
Теорія: Streaming в A2A¶
A2A spec підтримує streaming для task status. Замість polling (клієнт кожні N секунд відправляє POST) - сервер пушить події. Менше навантаження на мережу, швидша реакція на зміни. Варіанти: SSE (Server-Sent Events), WebSocket, push notifications. Я обрав SSE: простий HTTP GET, один напрямок (server → client), без додаткових протоколів. WebSocket - надлишковий для one-way потоку; SSE вбудований у браузер і curl.
SSE формат: кожна подія - рядки event: <name> та data: <payload>, розділені подвійним \n\n. Клієнт відкриває з'єднання і читає потік. Браузер має EventSource API; curl з -N (no buffer) теж працює. Content-Type: text/event-stream, Cache-Control: no-cache - щоб проксі не кешували.
Agent Card capabilities: streaming: true - клієнт знає, що може використовувати stream endpoint замість polling. Discovery один раз - далі вибір: POST tasks/status або GET /stream.
Архітектура: три способи отримати status¶
flowchart TB
subgraph Client[A2A Client]
C1[Submit]
C2[Poll: tasks/status]
C3[Stream: GET /stream]
end
subgraph Backend[viz/backend]
ROUTER[A2A Router]
STORE[(TaskStore)]
STREAM[SSE Stream]
end
C1 -->|"POST tasks/submit"| ROUTER
ROUTER --> STORE
C2 -->|"POST tasks/status"| ROUTER
ROUTER -->|"get(taskId)"| STORE
STORE --> ROUTER
ROUTER --> C2
C3 -->|"GET /tasks/{id}/stream"| STREAM
STREAM -->|"read from store"| STORE
STORE --> STREAM
STREAM -->|"event: task/status\ndata: {...}"| C3
Послідовність: SSE stream¶
sequenceDiagram
participant C as A2A Client
participant API as FastAPI
participant Stream as SSE Handler
participant Store as TaskStore
C->>API: GET /api/a2a/tasks/t-1/stream
API->>Stream: StreamingResponse
Stream->>Store: get(t-1)
Store-->>Stream: task (or null)
alt task exists
Stream-->>C: event: task/status\ndata: {"id":"t-1","status":"completed",...}\n\n
else task not found
Stream-->>C: event: task/status\ndata: {"id":"t-1","status":"not_found"}\n\n
end
Note over C,Stream: Connection closes (one event for sync tasks)
Що я реалізував¶
-
GET /api/a2a/tasks/{task_id}/stream - endpoint. FastAPI
StreamingResponse,Content-Type: text/event-stream,Cache-Control: no-cache. Читає task з TaskStore, формує SSE-подію, відправляє і закриває з'єднання. Для sync-задач - одна подія з фінальним статусом. -
Подія -
event: task/status,data: {json}. JSON - task з store (id, status, artifact) або{"id":"...","status":"not_found"}. Клієнт парсить data як JSON. Формат сумісний з tasks/status response. -
Agent Card -
capabilities.streaming = true. Клієнт, який підтримує streaming, може використовувати GET замість POST для отримання статусу.
Код: _stream_task_events - async generator: читає task з store, формує SSE-рядки (event: task/status, data: {json}), yield. StreamingResponse з media_type="text/event-stream", headers={"Cache-Control": "no-cache"}. Endpoint GET /api/a2a/tasks/{task_id}/stream викликає generator і повертає stream.
# viz/backend/viz_backend/routers/a2a.py
async def _stream_task_events(task_id: str):
task = a2a_task_store.get(task_id)
payload = task or {"id": task_id, "status": "not_found"}
yield f"event: task/status\ndata: {json.dumps(payload)}\n\n"
@router.get("/tasks/{task_id}/stream")
async def stream_task(task_id: str):
return StreamingResponse(
_stream_task_events(task_id),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache"},
)
Приклад¶
curl -N http://localhost:8000/api/a2a/tasks/test-1/stream
Відповідь: event: task/status\ndata: {"id":"test-1","status":"not_found"}\n\n
Якщо task є в store - отримаєш task з artifact. -N у curl вимикає буферизацію - події приходять одразу.
Обмеження¶
- Поки одна подія. Process Manager і Task Manager - sync. Результат одразу після submit. Stream віддає один event з фінальним статусом. Не real-time token-by-token (це було б для LLM streaming, не для task status).
- Для майбутніх async tasks - можна пушити кілька подій (running → completed). Потрібно буде тримати з'єднання відкритим і періодично читати store, або інтегрувати з чергою подій.
Чому обрав SSE¶
Просто. HTTP GET, без WebSocket. EventSource в JS, curl -N. Не потрібні додаткові бібліотеки. Інфраструктура готова для async tasks: коли з'являться long-running - додам push подій у stream. Поки один event достатній для sync-сценарію.
Що далі¶
Phase 3 повністю - Calendar, Finance, Content Classifier як A2A skills. Phase 4 - auth, rate limiting, observability. Потім - async submit і multi-event streaming для довгих задач.
