EMM A2A Phase 4: Auth, Rate Limiting, Observability¶
Цикл EMM A2A:
· Phase 1: Process Manager як A2A сервер
· Phase 2: Task Manager (list_board)
· Phase 2+: TaskStore і tasks/status
· Phase 3: Stream task status
· Phase 4: Auth, Rate Limiting, Observability (ця стаття)
Контекст: незахищений A2A¶
У Phase 1-3 A2A endpoints були публічними: Agent Card, tasks/submit, tasks/status, stream, без автентифікації. Це зручно для локальної розробки, але небезпечно для production. Інші API (bookmarks, workflows, search) вже використовували X-API-Key і rate limiting. Phase 4 вирівнює A2A з ними: один механізм auth для всього backend.
Що змінено¶
1. Auth, X-API-Key¶
Усі A2A endpoints тепер вимагають заголовок X-API-Key. Той самий API_KEY з .env, що й для bookmarks/workflows. Відсутній або невалідний ключ → 401 Unauthorized.
Підхід: FastAPI Depends(), кожен endpoint оголошує dependencies=[Depends(verify_api_key), ...]. Dependency verify_api_key перевіряє заголовок до виконання handler'а. Підтримуються два формати: X-API-Key: <key> або Authorization: Bearer <key>. Порівняння ключів, constant-time (hmac.compare_digest), щоб уникнути timing attacks.
# viz/backend/viz_backend/routers/a2a.py
from viz_backend.middleware.auth import rate_limit_read, rate_limit_write, verify_api_key
@router.get(
"/.well-known/agent.json",
dependencies=[Depends(verify_api_key), Depends(rate_limit_read)],
)
async def get_agent_card(request: Request):
...
@router.post(
"/api/a2a/tasks",
dependencies=[Depends(verify_api_key), Depends(rate_limit_write)],
)
async def handle_a2a_tasks(request: Request):
...
flowchart LR
subgraph Client[A2A Client]
R[Request + X-API-Key]
end
subgraph Router[A2A Router]
AUTH{verify_api_key}
RATE[rate_limit]
HANDLER[Handler]
end
R --> AUTH
AUTH -->|"valid"| RATE
AUTH -->|"missing/invalid"| 401[401 Unauthorized]
RATE --> HANDLER
2. Rate Limiting¶
- GET (Agent Card, stream),
rate_limit_read - POST (tasks/submit, tasks/status),
rate_limit_write
Ті самі ліміти, що й для інших API. A2A-специфічні ліміти можна додати пізніше.
Реалізація: Sliding-window rate limiter за client_id (X-Forwarded-For або IP). Ліміти з env: RATE_LIMIT_READ (default 120/min), RATE_LIMIT_WRITE (60/min). При перевищенні, 429 з заголовками Retry-After та X-RateLimit-Remaining.
# viz/backend/viz_backend/middleware/auth.py
async def rate_limit_read(request: Request) -> None:
"""Rate limiter for read endpoints."""
client_id = _get_client_id(request)
if not rate_limiter.is_allowed(client_id, RATE_LIMIT_READ):
raise HTTPException(
status_code=429,
detail="Rate limit exceeded",
headers={"Retry-After": str(RATE_LIMIT_WINDOW), "X-RateLimit-Remaining": str(remaining)},
)
3. Observability, структуроване логування¶
Кожен запит логується з контекстом:
| Event | Коли | Поля |
|---|---|---|
a2a_agent_card |
GET /.well-known/agent.json | method |
a2a_request |
Початок POST /api/a2a/tasks | method, task_id |
a2a_request_complete |
Кінець обробки | method, task_id, duration_ms |
a2a_stream |
GET /stream | task_id |
Логи можна фільтрувати в CloudWatch/Datadog за event або task_id. duration_ms, для аналізу латентності.
Підхід: logger.info(..., extra={...}), структуровані поля потрапляють у JSON-логер. Паттерн: на початку handler'а, a2a_request, в кінці, a2a_request_complete з duration_ms.
# viz/backend/viz_backend/routers/a2a.py
async def handle_a2a_tasks(request: Request):
t0 = time.perf_counter()
method, params, req_id = _parse_json_rpc_request(body)
task_id = (params or {}).get("taskId") if params else None
logger.info("a2a_request", extra={"event": "a2a_request", "method": method, "task_id": task_id})
# ... обробка ...
duration_ms = (time.perf_counter() - t0) * 1000
logger.info(
"a2a_request_complete",
extra={
"event": "a2a_request_complete",
"method": method,
"task_id": task_id,
"duration_ms": round(duration_ms, 2),
},
)
return result
Послідовність: запит з auth¶
sequenceDiagram
participant C as A2A Client
participant API as FastAPI
participant Auth as verify_api_key
participant Rate as rate_limit
participant Handler as A2A Handler
C->>API: GET /.well-known/agent.json
Note over C,API: X-API-Key: <key>
API->>Auth: Depends(verify_api_key)
alt key valid
Auth-->>API: OK
API->>Rate: Depends(rate_limit_read)
Rate-->>API: OK
API->>Handler: get_agent_card()
Handler-->>C: Agent Card JSON
else key missing/invalid
Auth-->>API: 401
API-->>C: 401 Unauthorized
end
Приклад¶
# Без ключа, 401
curl -s -o /dev/null -w "%{http_code}" http://localhost:8000/.well-known/agent.json
# 401
# З ключем, 200
curl -H "X-API-Key: $API_KEY" http://localhost:8000/.well-known/agent.json
# {"name":"Expert Memory Machine",...}
# tasks/submit
curl -X POST http://localhost:8000/api/a2a/tasks \
-H "Content-Type: application/json" \
-H "X-API-Key: $API_KEY" \
-d '{"jsonrpc":"2.0","method":"tasks/submit","params":{...},"id":1}'
# stream
curl -N -H "X-API-Key: $API_KEY" http://localhost:8000/api/a2a/tasks/t-1/stream
Тести¶
Додано 3 тести на 401 без ключа:
test_agent_card_returns_401_without_api_keytest_tasks_submit_returns_401_without_api_keytest_stream_returns_401_without_api_key
Fixture a2a_api_key(monkeypatch) встановлює API_KEY для тестів; усі A2A-запити використовують headers={"X-API-Key": "test-api-key-for-a2a"}.
# viz/backend/tests/test_a2a_router.py
A2A_HEADERS = {"X-API-Key": "test-api-key-for-a2a"}
@pytest.fixture(autouse=True)
def a2a_api_key(monkeypatch):
"""Set API_KEY for A2A tests (auth required since Phase 4)."""
monkeypatch.setenv("API_KEY", "test-api-key-for-a2a")
def test_agent_card_returns_401_without_api_key(self, client):
"""GET /.well-known/agent.json without X-API-Key returns 401."""
r = client.get("/.well-known/agent.json") # без headers
assert r.status_code == 401
Breaking Change¶
Так. A2A-клієнти тепер повинні передавати X-API-Key. Раніше endpoints були без auth.
Обмеження¶
- Ті самі rate limits, що й інші API (немає A2A-специфічної настройки)
- Логування через Python
logging(OpenTelemetry, пізніше)
Що далі¶
Phase 5, Calendar, Finance, Content Classifier як A2A skills. Потім, async submit і multi-event streaming для довгих задач. OpenTelemetry, коли з'явиться потреба в distributed tracing.
