Skip to content

EMM A2A Phase 4: Auth, Rate Limiting, Observability

English version

EMM A2A Phase 4: автентифікація, rate limiting, observability

Цикл EMM A2A:
· Phase 1: Process Manager як A2A сервер
· Phase 2: Task Manager (list_board)
· Phase 2+: TaskStore і tasks/status
· Phase 3: Stream task status
· Phase 4: Auth, Rate Limiting, Observability (ця стаття)

Контекст: незахищений A2A

У Phase 1-3 A2A endpoints були публічними: Agent Card, tasks/submit, tasks/status, stream, без автентифікації. Це зручно для локальної розробки, але небезпечно для production. Інші API (bookmarks, workflows, search) вже використовували X-API-Key і rate limiting. Phase 4 вирівнює A2A з ними: один механізм auth для всього backend.

Що змінено

1. Auth, X-API-Key

Усі A2A endpoints тепер вимагають заголовок X-API-Key. Той самий API_KEY з .env, що й для bookmarks/workflows. Відсутній або невалідний ключ → 401 Unauthorized.

Підхід: FastAPI Depends(), кожен endpoint оголошує dependencies=[Depends(verify_api_key), ...]. Dependency verify_api_key перевіряє заголовок до виконання handler'а. Підтримуються два формати: X-API-Key: <key> або Authorization: Bearer <key>. Порівняння ключів, constant-time (hmac.compare_digest), щоб уникнути timing attacks.

# viz/backend/viz_backend/routers/a2a.py
from viz_backend.middleware.auth import rate_limit_read, rate_limit_write, verify_api_key

@router.get(
    "/.well-known/agent.json",
    dependencies=[Depends(verify_api_key), Depends(rate_limit_read)],
)
async def get_agent_card(request: Request):
    ...

@router.post(
    "/api/a2a/tasks",
    dependencies=[Depends(verify_api_key), Depends(rate_limit_write)],
)
async def handle_a2a_tasks(request: Request):
    ...
flowchart LR
    subgraph Client[A2A Client]
        R[Request + X-API-Key]
    end

    subgraph Router[A2A Router]
        AUTH{verify_api_key}
        RATE[rate_limit]
        HANDLER[Handler]
    end

    R --> AUTH
    AUTH -->|"valid"| RATE
    AUTH -->|"missing/invalid"| 401[401 Unauthorized]
    RATE --> HANDLER

2. Rate Limiting

  • GET (Agent Card, stream), rate_limit_read
  • POST (tasks/submit, tasks/status), rate_limit_write

Ті самі ліміти, що й для інших API. A2A-специфічні ліміти можна додати пізніше.

Реалізація: Sliding-window rate limiter за client_id (X-Forwarded-For або IP). Ліміти з env: RATE_LIMIT_READ (default 120/min), RATE_LIMIT_WRITE (60/min). При перевищенні, 429 з заголовками Retry-After та X-RateLimit-Remaining.

# viz/backend/viz_backend/middleware/auth.py
async def rate_limit_read(request: Request) -> None:
    """Rate limiter for read endpoints."""
    client_id = _get_client_id(request)
    if not rate_limiter.is_allowed(client_id, RATE_LIMIT_READ):
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded",
            headers={"Retry-After": str(RATE_LIMIT_WINDOW), "X-RateLimit-Remaining": str(remaining)},
        )

3. Observability, структуроване логування

Кожен запит логується з контекстом:

Event Коли Поля
a2a_agent_card GET /.well-known/agent.json method
a2a_request Початок POST /api/a2a/tasks method, task_id
a2a_request_complete Кінець обробки method, task_id, duration_ms
a2a_stream GET /stream task_id

Логи можна фільтрувати в CloudWatch/Datadog за event або task_id. duration_ms, для аналізу латентності.

Підхід: logger.info(..., extra={...}), структуровані поля потрапляють у JSON-логер. Паттерн: на початку handler'а, a2a_request, в кінці, a2a_request_complete з duration_ms.

# viz/backend/viz_backend/routers/a2a.py
async def handle_a2a_tasks(request: Request):
    t0 = time.perf_counter()
    method, params, req_id = _parse_json_rpc_request(body)
    task_id = (params or {}).get("taskId") if params else None

    logger.info("a2a_request", extra={"event": "a2a_request", "method": method, "task_id": task_id})
    # ... обробка ...
    duration_ms = (time.perf_counter() - t0) * 1000
    logger.info(
        "a2a_request_complete",
        extra={
            "event": "a2a_request_complete",
            "method": method,
            "task_id": task_id,
            "duration_ms": round(duration_ms, 2),
        },
    )
    return result

Послідовність: запит з auth

sequenceDiagram
    participant C as A2A Client
    participant API as FastAPI
    participant Auth as verify_api_key
    participant Rate as rate_limit
    participant Handler as A2A Handler

    C->>API: GET /.well-known/agent.json
    Note over C,API: X-API-Key: <key>

    API->>Auth: Depends(verify_api_key)
    alt key valid
        Auth-->>API: OK
        API->>Rate: Depends(rate_limit_read)
        Rate-->>API: OK
        API->>Handler: get_agent_card()
        Handler-->>C: Agent Card JSON
    else key missing/invalid
        Auth-->>API: 401
        API-->>C: 401 Unauthorized
    end

Приклад

# Без ключа, 401
curl -s -o /dev/null -w "%{http_code}" http://localhost:8000/.well-known/agent.json
# 401

# З ключем, 200
curl -H "X-API-Key: $API_KEY" http://localhost:8000/.well-known/agent.json
# {"name":"Expert Memory Machine",...}

# tasks/submit
curl -X POST http://localhost:8000/api/a2a/tasks \
  -H "Content-Type: application/json" \
  -H "X-API-Key: $API_KEY" \
  -d '{"jsonrpc":"2.0","method":"tasks/submit","params":{...},"id":1}'

# stream
curl -N -H "X-API-Key: $API_KEY" http://localhost:8000/api/a2a/tasks/t-1/stream

Тести

Додано 3 тести на 401 без ключа:

  • test_agent_card_returns_401_without_api_key
  • test_tasks_submit_returns_401_without_api_key
  • test_stream_returns_401_without_api_key

Fixture a2a_api_key(monkeypatch) встановлює API_KEY для тестів; усі A2A-запити використовують headers={"X-API-Key": "test-api-key-for-a2a"}.

# viz/backend/tests/test_a2a_router.py
A2A_HEADERS = {"X-API-Key": "test-api-key-for-a2a"}

@pytest.fixture(autouse=True)
def a2a_api_key(monkeypatch):
    """Set API_KEY for A2A tests (auth required since Phase 4)."""
    monkeypatch.setenv("API_KEY", "test-api-key-for-a2a")

def test_agent_card_returns_401_without_api_key(self, client):
    """GET /.well-known/agent.json without X-API-Key returns 401."""
    r = client.get("/.well-known/agent.json")  # без headers
    assert r.status_code == 401

Breaking Change

Так. A2A-клієнти тепер повинні передавати X-API-Key. Раніше endpoints були без auth.

Обмеження

  • Ті самі rate limits, що й інші API (немає A2A-специфічної настройки)
  • Логування через Python logging (OpenTelemetry, пізніше)

Що далі

Phase 5, Calendar, Finance, Content Classifier як A2A skills. Потім, async submit і multi-event streaming для довгих задач. OpenTelemetry, коли з'явиться потреба в distributed tracing.