Перейти до змісту

EMM A2A Phase 2+: TaskStore і tasks/status

English version

EMM A2A Phase 2+ TaskStore

Цикл EMM A2A:
· Phase 1: Process Manager як A2A сервер
· Phase 2: Task Manager (list_board)

Теорія: Task lifecycle в A2A

A2A spec передбачає асинхронну модель. Клієнт відправляє task — отримує taskId. Статус: submittedworkingcompleted | failed | input-required. Для long-running tasks клієнт не чекає на submit — він polling'ом або streaming'ом отримує оновлення. Це дозволяє оркестратору запустити кілька задач паралельно і збирати результати по мірі готовності.

У мене поки sync: submit блокується до завершення. Process Manager і Task Manager виконуються швидко (секунди), тому sync прийнятний для Phase 1–2. Але я додав TaskStore і tasks/status — щоб результат був доступний після submit. Інший агент може відправити task, отримати taskId у response, і пізніше (наприклад, через 10 хвилин) запитати status — отримає результат з store. Це підготовка до async submit: коли з'являться довгі задачі, інфраструктура вже готова.

Архітектура TaskStore

flowchart TB
    subgraph Client[A2A Client]
        C1[Submit]
        C2[Poll status]
    end

    subgraph Backend[viz/backend]
        ROUTER[A2A Router]
        STORE[(InMemoryTaskStore)]
        ADAPTER[A2A Adapter]
    end

    subgraph Agents[Agents]
        PM[Process Manager]
        TM[Task Manager]
    end

    C1 -->|"tasks/submit"| ROUTER
    ROUTER --> ADAPTER
    ADAPTER --> PM
    ADAPTER --> TM
    ADAPTER -->|"save(taskId, task)"| STORE
    ADAPTER --> C1

    C2 -->|"tasks/status"| ROUTER
    ROUTER -->|"get(taskId)"| STORE
    STORE -->|"task or not_found"| ROUTER
    ROUTER --> C2

Послідовність: submit + poll

sequenceDiagram
    participant C as A2A Client
    participant API as FastAPI
    participant Adapter as A2A Adapter
    participant Store as InMemoryTaskStore
    participant PM as Process Manager

    C->>API: POST tasks/submit (taskId: t-1)
    API->>Adapter: route, invoke
    Adapter->>PM: graph.ainvoke(...)
    PM-->>Adapter: result
    Adapter->>Store: put(t-1, task)
    Adapter-->>C: JSON-RPC {result: {task}}

    Note over C: Через час, інший процес...

    C->>API: POST tasks/status (taskId: t-1)
    API->>Store: get(t-1)
    Store-->>API: task (or null)
    API-->>C: {result: {task}} or {task: {status: "not_found"}}

Що я реалізував

  1. InMemoryTaskStore — dict taskId → task. Після submit результат зберігається. TTL 1 година — потім видаляється. Реалізація проста: Python dict з timestamp; background cleanup або перевірка при get. Для production планую Redis або DB — in-memory не переживає рестарт.

  2. tasks/status — JSON-RPC method. params: { taskId }. Повертає task з store або {"id":"...","status":"not_found"}. Клієнт може викликати кілька разів — ідемпотентно. Якщо task ще виконується (у майбутньому async) — поверне status: "working".

  3. tasks/submit записує результат у store одразу після завершення. Навіть sync-запит — результат доступний для polling. Тобто submit повертає task у response, і той самий task можна отримати пізніше через tasks/status. Корисно, якщо клієнт втратив connection або хоче передати taskId іншому процесу.

Код: InMemoryTaskStore — thread-safe dict з TTL. Lazy cleanup при get: якщо monotonic() - created_at > ttl — видаляємо. tasks/status — get(taskId) з store, якщо null — {"status":"not_found"}. Після submit — _store_task() зберігає task перед return.

# viz/backend/viz_backend/a2a/task_store.py
class InMemoryTaskStore:
    DEFAULT_TTL_SECONDS = 3600

    def store(self, task_id, status, artifact=None, error=None):
        with self._lock:
            self._store[task_id] = {
                "id": task_id, "status": status, "artifact": artifact,
                "created_at": time.monotonic(),
            }

    def get(self, task_id):
        entry = self._store.get(task_id)
        if entry and time.monotonic() - entry["created_at"] > self._ttl:
            del self._store[task_id]
            return None
        return {"id": entry["id"], "status": entry["status"], "artifact": entry.get("artifact")}

# tasks/status handler
task = a2a_task_store.get(task_id)
if task is None:
    return {"task": {"id": task_id, "status": "not_found"}}
return {"task": task}

Приклад

Submit:

curl -X POST http://localhost:8000/api/a2a/tasks \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"tasks/submit","params":{"taskId":"test-1","message":{...}},"id":1}'

Poll status:

curl -X POST http://localhost:8000/api/a2a/tasks \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"tasks/status","params":{"taskId":"test-1"},"id":2}'

Response: {"result":{"task":{"id":"test-1","status":"completed","artifact":{...}}}}

Типовий сценарій: Агент A відправляє submit, отримує taskId. Агент B (або той самий A через хвилину) викликає tasks/status з цим taskId — отримує результат. Не потрібно передавати великий artifact по каналах — достатньо taskId.

Обмеження

  • In-memory. Рестарт backend — store очищається. Production — Redis або DB. Поки для dev достатньо.
  • Submit все ще sync: чекає завершення. Але результат можна отримати пізніше. Async submit — окремий етап, потребує черги задач.
  • Відсутній taskId в tasks/status → JSON-RPC error -32602 (Invalid params). Клієнт повинен завжди передавати taskId.

Навіщо це зробив

Підготовка до streaming і async submit. Клієнт може polling замість довгого sync. Інфраструктура для майбутніх long-running tasks. Навіть для sync-задач TaskStore дає гнучкість: результат можна забрати пізніше, передати taskId між процесами.

Що далі

Phase 3 — streaming. SSE endpoint замість polling. Клієнт відкриває GET /stream і отримує події, без періодичних POST.