Skip to content

EMM A2A Phase 1: Process Manager як A2A сервер

Архітектура EMM A2A Phase 1

English version available

У Expert Memory Machine вже є кілька LangGraph-агентів: Process Manager, Task Manager, Calendar, Finance, Content Classifier. Вони спілкуються через MCP та task_store. Але якщо зовнішній агент (наприклад, Claude Code або інший оркестратор) захоче делегувати їм роботу — потрібен стандартизований інтерфейс. Саме для цього я почав інтегрувати A2A.

Що таке A2A

A2A (Agent-to-Agent) — відкритий протокол від Google (квітень 2025) для взаємодії AI-агентів. Транспорт: JSON-RPC 2.0 поверх HTTP(S). Ідея: один агент може делегувати роботу іншому через стандартизований інтерфейс, без point-to-point інтеграцій. Замість того щоб писати окремий REST API для кожного агента, клієнт знаходить агента через discovery, бачить його skills і відправляє task у єдиному форматі.

A2A vs MCP — не конкуренти. MCP (Model Context Protocol) — для model-to-tools: виклики функцій, ресурси. A2A — для колаборації агентів: делегація, multi-turn, stateful. MCP: «модель викликає інструмент». A2A: «агент делегує задачу іншому агенту». У EMM я використовую обидва: MCP для Task Manager, Confluence тощо; A2A — для того, щоб зовнішній агент міг викликати Process Manager як peer.

Ключові елементи A2A:

  1. Agent CardGET /.well-known/agent.json. JSON з описом агента: name, skills, capabilities, URL. Discovery: інший агент знаходить цей і бачить, що він вміє. Це як OpenAPI, але для агентів.
  2. Task lifecycle — submitted → working → completed | failed. Клієнт відправляє task, отримує taskId, може питати статус. Для long-running задач — polling або streaming.
  3. Message format — multi-modal: text, images. У мене поки тільки text parts; images планую в майбутньому.

Архітектура EMM + A2A

flowchart TB
    subgraph Client["A2A Client (зовнішній агент)"]
        C[Client Agent]
    end

    subgraph Backend["viz/backend (FastAPI)"]
        AC["GET /.well-known/agent.json"]
        TASK["POST /api/a2a/tasks"]
        ADAPTER[A2A Adapter]
        AC --> ADAPTER
        TASK --> ADAPTER
    end

    subgraph Agents["LangGraph Agents"]
        PM[Process Manager]
        TM[Task Manager]
    end

    C -->|"1. Discovery"| AC
    C -->|"2. tasks/submit (JSON-RPC)"| TASK
    ADAPTER -->|"graph.ainvoke()"| PM
    PM -->|"task_store, MCP"| TM

Послідовність взаємодії (Phase 1)

sequenceDiagram
    participant C as A2A Client
    participant API as FastAPI
    participant Adapter as A2A Adapter
    participant PM as Process Manager
    participant TM as Task Manager

    C->>API: GET /.well-known/agent.json
    API->>C: Agent Card (skills: weekly_report)

    C->>API: POST /api/a2a/tasks (tasks/submit)
    Note over C,API: JSON-RPC: method, params.taskId, params.message

    API->>Adapter: parse JSON-RPC, route by skillId
    Adapter->>Adapter: ProcessManagerMapper: message → LangGraph input

    Adapter->>PM: graph.ainvoke({"operation":"weekly_report","board_id":"..."})
    PM->>TM: get_board_metrics, task_store (існуючий)
    TM-->>PM: board data
    PM-->>Adapter: result (report_md)

    Adapter->>Adapter: map result → A2A Task + Artifact
    Adapter-->>API: Task {status: completed, artifact}
    API-->>C: JSON-RPC response {result: {task}}

Що я реалізував у Phase 1

  1. Agent CardGET /.well-known/agent.json. Повертає JSON зі skills (поки weekly_report), capabilities, url. Клієнт може спочатку зробити GET, побачити доступні skills, і тільки потім відправити submit з потрібним operation у message.

  2. tasks/submitPOST /api/a2a/tasks з JSON-RPC. Body: method: "tasks/submit", params: { taskId, message }. Під капотом — Process Manager graph.ainvoke(). Один endpoint для всіх A2A-запитів; маршрутизація по skillId (у Phase 2) або за замовчуванням — Process Manager.

  3. ProcessManagerMapper — конвертує A2A message в LangGraph input і назад. Я не чіпав Process Manager — він приймає {"operation":"weekly_report","board_id":"..."}. Adapter мапить: витягує JSON з message.parts[0].text, передає в graph, результат обгортає в A2A Task з artifact. Розділення відповідальності: агент не знає про A2A.

Код: Agent Card — статичний JSON з skills. Mapper — витягує parts[].text, парсить JSON, мерджить з defaults. Результат — result_to_a2a_task обгортає в A2A Task.

# viz/backend/viz_backend/a2a/agent_card.py
def build_agent_card(base_url: str) -> dict:
    return {
        "name": "Expert Memory Machine",
        "skills": [{"id": "weekly_report", "name": "Weekly Report", ...}],
        "capabilities": {"streaming": True, ...},
    }

# viz/backend/viz_backend/a2a/process_manager_mapper.py
def a2a_message_to_input(message: dict) -> dict:
    parts = message.get("parts") or []
    text_parts = [p.get("text") for p in parts if p.get("type") == "text"]
    merged = {"operation": "weekly_report", "board_id": None, ...}
    for t in text_parts:
        data = json.loads(t) if t.startswith("{") else {"board_id": t}
        for k, v in data.items():
            if v is not None and k in merged:
                merged[k] = v
    return merged

Приклад запиту

curl -X POST http://localhost:8000/api/a2a/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "method": "tasks/submit",
    "params": {
      "taskId": "test-1",
      "message": {
        "role": "user",
        "parts": [{"type": "text", "text": "{\"operation\":\"weekly_report\",\"board_id\":\"<BOARD_ID>\"}"}]
      }
    },
    "id": 1
  }'

Response: JSON-RPC з result.task.status = "completed" або "failed", result.task.artifact — результат (report markdown). При помилці — status: "failed", текст помилки в artifact.

Що потрібно для запиту: board_id — UUID дошки з Task Manager. Його можна взяти з frontend (URL при відкритті board) або через MCP Task Manager. Process Manager збирає метрики з цієї дошки (velocity, blocked, cycle time) і формує weekly report.

Обмеження

  • Написав власний JSON-RPC парсер замість a2a-langchain-adapters. Spec ще draft — бібліотеки можуть змінитись. Ризик: доведеться оновлювати вручну. Але поки spec не стабільний, власний парсер дає контроль.
  • Один skill — weekly_report. Calendar, Finance, Content Classifier — поки не в A2A. Їх додам пізніше, коли adapter layer стабілізується.
  • Немає auth, rate limiting, observability (Phase 4). Для локальної розробки достатньо; для production — окремий етап.

Чому обрав Process Manager

Чіткий input/output; вже використовує task_store (делегація до Task Manager). Task Manager складніший — багато операцій (list_board, create_card, move_card, list_workspaces). Process Manager має одну операцію weekly_report з двома параметрами. Phase 1 — пілот для adapter layer: перевірити, що маппінг A2A ↔ LangGraph працює, перш ніж масштабувати на інших агентів.

Що далі

Phase 2 — Task Manager (list_board): другий skill, маршрутизація по skillId.
Phase 2+ — TaskStore і tasks/status: результат доступний після submit, можна питати статус пізніше.
Phase 3 — streaming: SSE замість polling. Детальніше в наступних статтях.