EMM A2A Phase 2+: TaskStore і tasks/status¶
Цикл EMM A2A:
· Phase 1: Process Manager як A2A сервер
· Phase 2: Task Manager (list_board)
Теорія: Task lifecycle в A2A¶
A2A spec передбачає асинхронну модель. Клієнт відправляє task — отримує taskId. Статус: submitted → working → completed | failed | input-required. Для long-running tasks клієнт не чекає на submit — він polling'ом або streaming'ом отримує оновлення. Це дозволяє оркестратору запустити кілька задач паралельно і збирати результати по мірі готовності.
У мене поки sync: submit блокується до завершення. Process Manager і Task Manager виконуються швидко (секунди), тому sync прийнятний для Phase 1–2. Але я додав TaskStore і tasks/status — щоб результат був доступний після submit. Інший агент може відправити task, отримати taskId у response, і пізніше (наприклад, через 10 хвилин) запитати status — отримає результат з store. Це підготовка до async submit: коли з'являться довгі задачі, інфраструктура вже готова.
Архітектура TaskStore¶
flowchart TB
subgraph Client[A2A Client]
C1[Submit]
C2[Poll status]
end
subgraph Backend[viz/backend]
ROUTER[A2A Router]
STORE[(InMemoryTaskStore)]
ADAPTER[A2A Adapter]
end
subgraph Agents[Agents]
PM[Process Manager]
TM[Task Manager]
end
C1 -->|"tasks/submit"| ROUTER
ROUTER --> ADAPTER
ADAPTER --> PM
ADAPTER --> TM
ADAPTER -->|"save(taskId, task)"| STORE
ADAPTER --> C1
C2 -->|"tasks/status"| ROUTER
ROUTER -->|"get(taskId)"| STORE
STORE -->|"task or not_found"| ROUTER
ROUTER --> C2
Послідовність: submit + poll¶
sequenceDiagram
participant C as A2A Client
participant API as FastAPI
participant Adapter as A2A Adapter
participant Store as InMemoryTaskStore
participant PM as Process Manager
C->>API: POST tasks/submit (taskId: t-1)
API->>Adapter: route, invoke
Adapter->>PM: graph.ainvoke(...)
PM-->>Adapter: result
Adapter->>Store: put(t-1, task)
Adapter-->>C: JSON-RPC {result: {task}}
Note over C: Через час, інший процес...
C->>API: POST tasks/status (taskId: t-1)
API->>Store: get(t-1)
Store-->>API: task (or null)
API-->>C: {result: {task}} or {task: {status: "not_found"}}
Що я реалізував¶
-
InMemoryTaskStore — dict
taskId → task. Після submit результат зберігається. TTL 1 година — потім видаляється. Реалізація проста: Python dict з timestamp; background cleanup або перевірка при get. Для production планую Redis або DB — in-memory не переживає рестарт. -
tasks/status — JSON-RPC method.
params: { taskId }. Повертає task з store або{"id":"...","status":"not_found"}. Клієнт може викликати кілька разів — ідемпотентно. Якщо task ще виконується (у майбутньому async) — повернеstatus: "working". -
tasks/submit записує результат у store одразу після завершення. Навіть sync-запит — результат доступний для polling. Тобто submit повертає task у response, і той самий task можна отримати пізніше через tasks/status. Корисно, якщо клієнт втратив connection або хоче передати taskId іншому процесу.
Код: InMemoryTaskStore — thread-safe dict з TTL. Lazy cleanup при get: якщо monotonic() - created_at > ttl — видаляємо. tasks/status — get(taskId) з store, якщо null — {"status":"not_found"}. Після submit — _store_task() зберігає task перед return.
# viz/backend/viz_backend/a2a/task_store.py
class InMemoryTaskStore:
DEFAULT_TTL_SECONDS = 3600
def store(self, task_id, status, artifact=None, error=None):
with self._lock:
self._store[task_id] = {
"id": task_id, "status": status, "artifact": artifact,
"created_at": time.monotonic(),
}
def get(self, task_id):
entry = self._store.get(task_id)
if entry and time.monotonic() - entry["created_at"] > self._ttl:
del self._store[task_id]
return None
return {"id": entry["id"], "status": entry["status"], "artifact": entry.get("artifact")}
# tasks/status handler
task = a2a_task_store.get(task_id)
if task is None:
return {"task": {"id": task_id, "status": "not_found"}}
return {"task": task}
Приклад¶
Submit:
curl -X POST http://localhost:8000/api/a2a/tasks \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"tasks/submit","params":{"taskId":"test-1","message":{...}},"id":1}'
Poll status:
curl -X POST http://localhost:8000/api/a2a/tasks \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"tasks/status","params":{"taskId":"test-1"},"id":2}'
Response: {"result":{"task":{"id":"test-1","status":"completed","artifact":{...}}}}
Типовий сценарій: Агент A відправляє submit, отримує taskId. Агент B (або той самий A через хвилину) викликає tasks/status з цим taskId — отримує результат. Не потрібно передавати великий artifact по каналах — достатньо taskId.
Обмеження¶
- In-memory. Рестарт backend — store очищається. Production — Redis або DB. Поки для dev достатньо.
- Submit все ще sync: чекає завершення. Але результат можна отримати пізніше. Async submit — окремий етап, потребує черги задач.
- Відсутній taskId в tasks/status → JSON-RPC error -32602 (Invalid params). Клієнт повинен завжди передавати taskId.
Навіщо це зробив¶
Підготовка до streaming і async submit. Клієнт може polling замість довгого sync. Інфраструктура для майбутніх long-running tasks. Навіть для sync-задач TaskStore дає гнучкість: результат можна забрати пізніше, передати taskId між процесами.
Що далі¶
Phase 3 — streaming. SSE endpoint замість polling. Клієнт відкриває GET /stream і отримує події, без періодичних POST.
