Перейти до змісту

EMM A2A Phase 2: Task Manager (list_board)

English version

Схема маршрутизації EMM A2A Phase 2

Контекст: мульти-агентний Agent Card

У A2A один Agent Card може описувати кілька skills від різних агентів. Це не «один endpoint = один агент» — це «один endpoint = один A2A server», який маршрутизує по skillId до відповідного LangGraph. Клієнт отримує один JSON з усіма skills і сам вирішує, який викликати. Не потрібні окремі URL для кожного агента.

Phase 1 мав тільки Process Manager. У Phase 2 я додав Task Manager. Один POST /api/a2a/tasks, але тепер Adapter дивиться на params.skillId: якщо list_board — викликає Task Manager; якщо немає або weekly_report — Process Manager. Default fallback — Process Manager, щоб старі клієнти без skillId продовжували працювати.

Схема маршрутизації

flowchart LR
    subgraph Client[A2A Client]
        R[Request]
    end

    subgraph Router[A2A Router]
        P[JSON-RPC Parser]
        ROUTE{skillId?}
        PM_MAP[ProcessManagerMapper]
        TM_MAP[TaskManagerMapper]
    end

    subgraph Agents[Agents]
        PM[Process Manager\nweekly_report]
        TM[Task Manager\nlist_board]
    end

    R --> P
    P --> ROUTE
    ROUTE -->|"default / weekly_report"| PM_MAP
    ROUTE -->|"list_board"| TM_MAP
    PM_MAP --> PM
    TM_MAP --> TM

Послідовність: list_board

sequenceDiagram
    participant C as A2A Client
    participant API as FastAPI
    participant Adapter as A2A Adapter
    participant TM as Task Manager
    participant Store as Task Store / lakeFS

    C->>API: GET /.well-known/agent.json
    API->>C: Agent Card (skills: weekly_report, list_board)

    C->>API: POST /api/a2a/tasks
    Note over C,API: skillId: list_board, message.parts[0].text: {"operation":"list_board","board_id":"..."}

    API->>Adapter: parse, skillId = list_board
    Adapter->>Adapter: TaskManagerMapper: A2A → LangGraph input

    Adapter->>TM: graph.ainvoke({"operation":"list_board","board_id":"..."})
    TM->>Store: read boards, lists, cards
    Store-->>TM: board JSON
    TM-->>Adapter: result (board structure)

    Adapter->>Adapter: map → A2A Task + Artifact
    Adapter-->>C: JSON-RPC {result: {task: {artifact: board}}}

Що я додав

  • skillId — опційний параметр. Відсутній або weekly_report → Process Manager. list_board → Task Manager. Якщо передати невідомий skillId — зараз fallback на Process Manager (може не знайти operation і повернути помилку; явну валідацію додам пізніше).

  • TaskManagerMapper — аналог ProcessManagerMapper, але для Task Manager. Input: {"operation":"list_board","board_id":"..."}. Output — artifact з JSON дошки: структура з lists, у кожному list — cards з id, title, description. Формат такий самий, як у Task Manager API, щоб клієнт міг одразу відображати або обробляти.

  • Agent Card — тепер два skills. Клієнт бачить обидва і вибирає. Discovery один раз — далі виклики з потрібним skillId.

Код: Маршрутизація по skill_id в _handle_tasks_submit. Default — weekly_report (Process Manager). list_board → Task Manager. TaskManagerMapper — такий самий патерн як ProcessManagerMapper: a2a_message_to_input парсить JSON з message.parts, result_to_a2a_task обгортає в A2A Task.

# viz/backend/viz_backend/routers/a2a.py
async def _handle_tasks_submit(params: dict, req_id) -> dict:
    skill_id = params.get("skillId") or "weekly_report"
    if skill_id == "weekly_report":
        return await _handle_process_manager(params, req_id)
    if skill_id == "list_board":
        return await _handle_task_manager(params, req_id)
    # unknown skillId → failed task

# viz/backend/viz_backend/a2a/task_manager_mapper.py
def a2a_message_to_input(message: dict) -> dict:
    merged = {"operation": "list_board", "board_id": None}
    for t in text_parts:
        data = json.loads(t) if t.startswith("{") else {"board_id": t}
        for k, v in data.items():
            if v is not None and k in merged:
                merged[k] = v
    return merged

Приклад

curl -X POST http://localhost:8000/api/a2a/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "method": "tasks/submit",
    "params": {
      "taskId": "test-2",
      "skillId": "list_board",
      "message": {
        "role": "user",
        "parts": [{
          "type": "text",
          "text": "{\"operation\":\"list_board\",\"board_id\":\"<BOARD_ID>\"}"
        }]
      }
    },
    "id": 2
  }'

Замість <BOARD_ID> — реальний ID дошки з Task Manager. Його можна взяти з frontend (URL при відкритті board) або через list_workspaceslist_boards у MCP Task Manager. Response містить повну структуру дошки: lists з cards, можна парсити для подальшої обробки.

Обмеження

  • Поки тільки list_board. Create card, move card, list workspaces — не в A2A. Read-only. Записні операції потребують більше параметрів (list_id, card title, position) — додам окремим skill.
  • Невалідний board_id → status: "failed" з текстом помилки в artifact. Task Manager повертає помилку — Adapter прокидує її в A2A-форматі.

Чому обрав list_board

Найпростіша операція: один input (board_id), один output (JSON дошки). Інші операції Task Manager мають більше параметрів: create_card потребує list_id, title; move_card — card_id, target_list_id. list_board — мінімальний пілот для другого агента: перевірити маршрутизацію і маппінг без складних схем.

Що далі

Phase 2+ — TaskStore і tasks/status. Щоб отримати taskId і питати статус пізніше, без блокування на submit. Потім Phase 3 — streaming через SSE.