EMM A2A Phase 1: Process Manager як A2A сервер¶
У Expert Memory Machine вже є кілька LangGraph-агентів: Process Manager, Task Manager, Calendar, Finance, Content Classifier. Вони спілкуються через MCP та task_store. Але якщо зовнішній агент (наприклад, Claude Code або інший оркестратор) захоче делегувати їм роботу — потрібен стандартизований інтерфейс. Саме для цього я почав інтегрувати A2A.
Що таке A2A¶
A2A (Agent-to-Agent) — відкритий протокол від Google (квітень 2025) для взаємодії AI-агентів. Транспорт: JSON-RPC 2.0 поверх HTTP(S). Ідея: один агент може делегувати роботу іншому через стандартизований інтерфейс, без point-to-point інтеграцій. Замість того щоб писати окремий REST API для кожного агента, клієнт знаходить агента через discovery, бачить його skills і відправляє task у єдиному форматі.
A2A vs MCP — не конкуренти. MCP (Model Context Protocol) — для model-to-tools: виклики функцій, ресурси. A2A — для колаборації агентів: делегація, multi-turn, stateful. MCP: «модель викликає інструмент». A2A: «агент делегує задачу іншому агенту». У EMM я використовую обидва: MCP для Task Manager, Confluence тощо; A2A — для того, щоб зовнішній агент міг викликати Process Manager як peer.
Ключові елементи A2A:
- Agent Card —
GET /.well-known/agent.json. JSON з описом агента: name, skills, capabilities, URL. Discovery: інший агент знаходить цей і бачить, що він вміє. Це як OpenAPI, але для агентів. - Task lifecycle — submitted → working → completed | failed. Клієнт відправляє task, отримує taskId, може питати статус. Для long-running задач — polling або streaming.
- Message format — multi-modal: text, images. У мене поки тільки text parts; images планую в майбутньому.
Архітектура EMM + A2A¶
flowchart TB
subgraph Client["A2A Client (зовнішній агент)"]
C[Client Agent]
end
subgraph Backend["viz/backend (FastAPI)"]
AC["GET /.well-known/agent.json"]
TASK["POST /api/a2a/tasks"]
ADAPTER[A2A Adapter]
AC --> ADAPTER
TASK --> ADAPTER
end
subgraph Agents["LangGraph Agents"]
PM[Process Manager]
TM[Task Manager]
end
C -->|"1. Discovery"| AC
C -->|"2. tasks/submit (JSON-RPC)"| TASK
ADAPTER -->|"graph.ainvoke()"| PM
PM -->|"task_store, MCP"| TM
Послідовність взаємодії (Phase 1)¶
sequenceDiagram
participant C as A2A Client
participant API as FastAPI
participant Adapter as A2A Adapter
participant PM as Process Manager
participant TM as Task Manager
C->>API: GET /.well-known/agent.json
API->>C: Agent Card (skills: weekly_report)
C->>API: POST /api/a2a/tasks (tasks/submit)
Note over C,API: JSON-RPC: method, params.taskId, params.message
API->>Adapter: parse JSON-RPC, route by skillId
Adapter->>Adapter: ProcessManagerMapper: message → LangGraph input
Adapter->>PM: graph.ainvoke({"operation":"weekly_report","board_id":"..."})
PM->>TM: get_board_metrics, task_store (існуючий)
TM-->>PM: board data
PM-->>Adapter: result (report_md)
Adapter->>Adapter: map result → A2A Task + Artifact
Adapter-->>API: Task {status: completed, artifact}
API-->>C: JSON-RPC response {result: {task}}
Що я реалізував у Phase 1¶
-
Agent Card —
GET /.well-known/agent.json. Повертає JSON зі skills (покиweekly_report), capabilities, url. Клієнт може спочатку зробити GET, побачити доступні skills, і тільки потім відправити submit з потрібним operation у message. -
tasks/submit —
POST /api/a2a/tasksз JSON-RPC. Body:method: "tasks/submit",params: { taskId, message }. Під капотом — Process Managergraph.ainvoke(). Один endpoint для всіх A2A-запитів; маршрутизація по skillId (у Phase 2) або за замовчуванням — Process Manager. -
ProcessManagerMapper — конвертує A2A message в LangGraph input і назад. Я не чіпав Process Manager — він приймає
{"operation":"weekly_report","board_id":"..."}. Adapter мапить: витягує JSON зmessage.parts[0].text, передає в graph, результат обгортає в A2A Task з artifact. Розділення відповідальності: агент не знає про A2A.
Код: Agent Card — статичний JSON з skills. Mapper — витягує parts[].text, парсить JSON, мерджить з defaults. Результат — result_to_a2a_task обгортає в A2A Task.
# viz/backend/viz_backend/a2a/agent_card.py
def build_agent_card(base_url: str) -> dict:
return {
"name": "Expert Memory Machine",
"skills": [{"id": "weekly_report", "name": "Weekly Report", ...}],
"capabilities": {"streaming": True, ...},
}
# viz/backend/viz_backend/a2a/process_manager_mapper.py
def a2a_message_to_input(message: dict) -> dict:
parts = message.get("parts") or []
text_parts = [p.get("text") for p in parts if p.get("type") == "text"]
merged = {"operation": "weekly_report", "board_id": None, ...}
for t in text_parts:
data = json.loads(t) if t.startswith("{") else {"board_id": t}
for k, v in data.items():
if v is not None and k in merged:
merged[k] = v
return merged
Приклад запиту¶
curl -X POST http://localhost:8000/api/a2a/tasks \
-H "Content-Type: application/json" \
-d '{
"jsonrpc": "2.0",
"method": "tasks/submit",
"params": {
"taskId": "test-1",
"message": {
"role": "user",
"parts": [{"type": "text", "text": "{\"operation\":\"weekly_report\",\"board_id\":\"<BOARD_ID>\"}"}]
}
},
"id": 1
}'
Response: JSON-RPC з result.task.status = "completed" або "failed", result.task.artifact — результат (report markdown). При помилці — status: "failed", текст помилки в artifact.
Що потрібно для запиту: board_id — UUID дошки з Task Manager. Його можна взяти з frontend (URL при відкритті board) або через MCP Task Manager. Process Manager збирає метрики з цієї дошки (velocity, blocked, cycle time) і формує weekly report.
Обмеження¶
- Написав власний JSON-RPC парсер замість a2a-langchain-adapters. Spec ще draft — бібліотеки можуть змінитись. Ризик: доведеться оновлювати вручну. Але поки spec не стабільний, власний парсер дає контроль.
- Один skill —
weekly_report. Calendar, Finance, Content Classifier — поки не в A2A. Їх додам пізніше, коли adapter layer стабілізується. - Немає auth, rate limiting, observability (Phase 4). Для локальної розробки достатньо; для production — окремий етап.
Чому обрав Process Manager¶
Чіткий input/output; вже використовує task_store (делегація до Task Manager). Task Manager складніший — багато операцій (list_board, create_card, move_card, list_workspaces). Process Manager має одну операцію weekly_report з двома параметрами. Phase 1 — пілот для adapter layer: перевірити, що маппінг A2A ↔ LangGraph працює, перш ніж масштабувати на інших агентів.
Що далі¶
Phase 2 — Task Manager (list_board): другий skill, маршрутизація по skillId.
Phase 2+ — TaskStore і tasks/status: результат доступний після submit, можна питати статус пізніше.
Phase 3 — streaming: SSE замість polling. Детальніше в наступних статтях.
