Перейти до змісту

EMM A2A Phase 4: Auth, Rate Limiting, Observability

Українська версія

EMM A2A Phase 4: Auth, rate limiting, observability

EMM A2A cycle:
· Phase 1: Process Manager as A2A Server
· Phase 2: Task Manager (list_board)
· Phase 2+: TaskStore and tasks/status
· Phase 3: Stream task status
· Phase 4: Auth, Rate Limiting, Observability (this article)

Context: Unprotected A2A

In Phase 1-3 A2A endpoints were public: Agent Card, tasks/submit, tasks/status, stream, no authentication. Convenient for local dev, but unsafe for production. Other APIs (bookmarks, workflows, search) already used X-API-Key and rate limiting. Phase 4 aligns A2A with them: one auth mechanism for the whole backend.

What Changed

1. Auth, X-API-Key

All A2A endpoints now require the X-API-Key header. Same API_KEY from .env as for bookmarks/workflows. Missing or invalid key → 401 Unauthorized.

Approach: FastAPI Depends(), each endpoint declares dependencies=[Depends(verify_api_key), ...]. The verify_api_key dependency checks the header before the handler runs. Two formats supported: X-API-Key: <key> or Authorization: Bearer <key>. Key comparison uses constant-time hmac.compare_digest to prevent timing attacks.

# viz/backend/viz_backend/routers/a2a.py
from viz_backend.middleware.auth import rate_limit_read, rate_limit_write, verify_api_key

@router.get(
    "/.well-known/agent.json",
    dependencies=[Depends(verify_api_key), Depends(rate_limit_read)],
)
async def get_agent_card(request: Request):
    ...

@router.post(
    "/api/a2a/tasks",
    dependencies=[Depends(verify_api_key), Depends(rate_limit_write)],
)
async def handle_a2a_tasks(request: Request):
    ...
flowchart LR
    subgraph Client[A2A Client]
        R[Request + X-API-Key]
    end

    subgraph Router[A2A Router]
        AUTH{verify_api_key}
        RATE[rate_limit]
        HANDLER[Handler]
    end

    R --> AUTH
    AUTH -->|"valid"| RATE
    AUTH -->|"missing/invalid"| 401[401 Unauthorized]
    RATE --> HANDLER

2. Rate Limiting

  • GET (Agent Card, stream), rate_limit_read
  • POST (tasks/submit, tasks/status), rate_limit_write

Same limits as other APIs. A2A-specific limits can be added later.

Implementation: Sliding-window rate limiter by client_id (X-Forwarded-For or IP). Limits from env: RATE_LIMIT_READ (default 120/min), RATE_LIMIT_WRITE (60/min). On exceed, 429 with Retry-After and X-RateLimit-Remaining headers.

# viz/backend/viz_backend/middleware/auth.py
async def rate_limit_read(request: Request) -> None:
    """Rate limiter for read endpoints."""
    client_id = _get_client_id(request)
    if not rate_limiter.is_allowed(client_id, RATE_LIMIT_READ):
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded",
            headers={"Retry-After": str(RATE_LIMIT_WINDOW), "X-RateLimit-Remaining": str(remaining)},
        )

3. Observability, Structured Logging

Each request is logged with context:

Event When Fields
a2a_agent_card GET /.well-known/agent.json method
a2a_request Start of POST /api/a2a/tasks method, task_id
a2a_request_complete End of processing method, task_id, duration_ms
a2a_stream GET /stream task_id

Logs can be filtered in CloudWatch/Datadog by event or task_id. duration_ms, for latency analysis.

Approach: logger.info(..., extra={...}), structured fields go to the JSON logger. Pattern: at handler start, a2a_request, at end, a2a_request_complete with duration_ms.

# viz/backend/viz_backend/routers/a2a.py
async def handle_a2a_tasks(request: Request):
    t0 = time.perf_counter()
    method, params, req_id = _parse_json_rpc_request(body)
    task_id = (params or {}).get("taskId") if params else None

    logger.info("a2a_request", extra={"event": "a2a_request", "method": method, "task_id": task_id})
    # ... processing ...
    duration_ms = (time.perf_counter() - t0) * 1000
    logger.info(
        "a2a_request_complete",
        extra={
            "event": "a2a_request_complete",
            "method": method,
            "task_id": task_id,
            "duration_ms": round(duration_ms, 2),
        },
    )
    return result

Sequence: Request with Auth

sequenceDiagram
    participant C as A2A Client
    participant API as FastAPI
    participant Auth as verify_api_key
    participant Rate as rate_limit
    participant Handler as A2A Handler

    C->>API: GET /.well-known/agent.json
    Note over C,API: X-API-Key: <key>

    API->>Auth: Depends(verify_api_key)
    alt key valid
        Auth-->>API: OK
        API->>Rate: Depends(rate_limit_read)
        Rate-->>API: OK
        API->>Handler: get_agent_card()
        Handler-->>C: Agent Card JSON
    else key missing/invalid
        Auth-->>API: 401
        API-->>C: 401 Unauthorized
    end

Example

# Without key, 401
curl -s -o /dev/null -w "%{http_code}" http://localhost:8000/.well-known/agent.json
# 401

# With key, 200
curl -H "X-API-Key: $API_KEY" http://localhost:8000/.well-known/agent.json
# {"name":"Expert Memory Machine",...}

# tasks/submit
curl -X POST http://localhost:8000/api/a2a/tasks \
  -H "Content-Type: application/json" \
  -H "X-API-Key: $API_KEY" \
  -d '{"jsonrpc":"2.0","method":"tasks/submit","params":{...},"id":1}'

# stream
curl -N -H "X-API-Key: $API_KEY" http://localhost:8000/api/a2a/tasks/t-1/stream

Tests

Added 3 tests for 401 without key:

  • test_agent_card_returns_401_without_api_key
  • test_tasks_submit_returns_401_without_api_key
  • test_stream_returns_401_without_api_key

Fixture a2a_api_key(monkeypatch) sets API_KEY for tests; all A2A requests use headers={"X-API-Key": "test-api-key-for-a2a"}.

# viz/backend/tests/test_a2a_router.py
A2A_HEADERS = {"X-API-Key": "test-api-key-for-a2a"}

@pytest.fixture(autouse=True)
def a2a_api_key(monkeypatch):
    """Set API_KEY for A2A tests (auth required since Phase 4)."""
    monkeypatch.setenv("API_KEY", "test-api-key-for-a2a")

def test_agent_card_returns_401_without_api_key(self, client):
    """GET /.well-known/agent.json without X-API-Key returns 401."""
    r = client.get("/.well-known/agent.json")  # no headers
    assert r.status_code == 401

Breaking Change

Yes. A2A clients must now send X-API-Key. Previously endpoints were unauthenticated.

Limitations

  • Same rate limits as other APIs (no A2A-specific tuning)
  • Logging via Python logging (OpenTelemetry, later)

What's Next

Phase 5, Calendar, Finance, Content Classifier as A2A skills. Then, async submit and multi-event streaming for long tasks. OpenTelemetry, when distributed tracing is needed.