EMM A2A Phase 4: Auth, Rate Limiting, Observability¶
EMM A2A series:
· Phase 1: Process Manager as A2A Server
· Phase 2: Task Manager (list_board)
· Phase 2+: TaskStore and tasks/status
· Phase 3: Stream task status
· Phase 4: Auth, Rate Limiting, Observability (this article)
Context: Unprotected A2A¶
In Phases 1-3 the A2A endpoints were public: Agent Card, tasks/submit, tasks/status, and stream required no authentication. That is convenient for local dev, but unsafe for production. Other APIs (bookmarks, workflows, search) already used X-API-Key and rate limiting; Phase 4 aligns A2A with them: one auth mechanism for the whole backend.
What Changed¶
1. Auth, X-API-Key¶
All A2A endpoints now require the X-API-Key header. Same API_KEY from .env as for bookmarks/workflows. Missing or invalid key → 401 Unauthorized.
Approach: FastAPI Depends(), each endpoint declares dependencies=[Depends(verify_api_key), ...]. The verify_api_key dependency checks the header before the handler runs. Two formats supported: X-API-Key: <key> or Authorization: Bearer <key>. Key comparison uses constant-time hmac.compare_digest to prevent timing attacks.
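The check itself is simple enough to sketch. A minimal version of the logic, assuming illustrative helper names (`extract_api_key`, `key_is_valid` are not from the project; the real `verify_api_key` raises `HTTPException(401)` on failure):

```python
import hmac


def extract_api_key(headers: dict) -> str:
    """Pull the key from X-API-Key, falling back to Authorization: Bearer."""
    candidate = headers.get("X-API-Key", "")
    if not candidate:
        auth = headers.get("Authorization", "")
        if auth.startswith("Bearer "):
            candidate = auth[len("Bearer "):]
    return candidate


def key_is_valid(candidate: str, expected: str) -> bool:
    """Constant-time comparison; an empty configured key never matches."""
    return bool(expected) and hmac.compare_digest(candidate, expected)
```

`hmac.compare_digest` always scans the full input, so response time does not leak how many leading characters of the key were correct.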
# viz/backend/viz_backend/routers/a2a.py
from viz_backend.middleware.auth import rate_limit_read, rate_limit_write, verify_api_key

@router.get(
    "/.well-known/agent.json",
    dependencies=[Depends(verify_api_key), Depends(rate_limit_read)],
)
async def get_agent_card(request: Request):
    ...

@router.post(
    "/api/a2a/tasks",
    dependencies=[Depends(verify_api_key), Depends(rate_limit_write)],
)
async def handle_a2a_tasks(request: Request):
    ...
flowchart LR
subgraph Client[A2A Client]
R[Request + X-API-Key]
end
subgraph Router[A2A Router]
AUTH{verify_api_key}
RATE[rate_limit]
HANDLER[Handler]
end
R --> AUTH
AUTH -->|"valid"| RATE
AUTH -->|"missing/invalid"| 401[401 Unauthorized]
RATE --> HANDLER
2. Rate Limiting¶
- GET (Agent Card, stream): rate_limit_read
- POST (tasks/submit, tasks/status): rate_limit_write
Same limits as other APIs. A2A-specific limits can be added later.
Implementation: Sliding-window rate limiter by client_id (X-Forwarded-For or IP). Limits from env: RATE_LIMIT_READ (default 120/min), RATE_LIMIT_WRITE (60/min). On exceed, 429 with Retry-After and X-RateLimit-Remaining headers.
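The project's limiter is not shown in full, but a sliding-window limiter of this kind can be sketched as follows (class and method names here mirror the `rate_limiter.is_allowed` call below; the deque-based bookkeeping is an assumption about the implementation):

```python
import time
from collections import defaultdict, deque
from typing import Optional


class SlidingWindowRateLimiter:
    """Allow at most `limit` requests per client within a sliding window."""

    def __init__(self, window_seconds: float = 60.0) -> None:
        self.window = window_seconds
        self._hits = defaultdict(deque)  # client_id -> request timestamps

    def is_allowed(self, client_id: str, limit: int, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        hits = self._hits[client_id]
        # Evict timestamps that have fallen out of the window.
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) >= limit:
            return False  # over the limit; the denied request is not recorded
        hits.append(now)
        return True
```

Unlike a fixed-window counter, this never admits a double burst at a window boundary: the window slides with each request.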
# viz/backend/viz_backend/middleware/auth.py
async def rate_limit_read(request: Request) -> None:
    """Rate limiter for read endpoints."""
    client_id = _get_client_id(request)
    if not rate_limiter.is_allowed(client_id, RATE_LIMIT_READ):
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded",
            headers={
                "Retry-After": str(RATE_LIMIT_WINDOW),
                # Limit exhausted, so no requests remain in this window.
                "X-RateLimit-Remaining": "0",
            },
        )
3. Observability, Structured Logging¶
Each request is logged with context:
| Event | When | Fields |
|---|---|---|
| a2a_agent_card | GET /.well-known/agent.json | method |
| a2a_request | Start of POST /api/a2a/tasks | method, task_id |
| a2a_request_complete | End of processing | method, task_id, duration_ms |
| a2a_stream | GET /stream | task_id |
Logs can be filtered in CloudWatch/Datadog by event or task_id; duration_ms enables latency analysis.
Approach: logger.info(..., extra={...}), structured fields go to the JSON logger. Pattern: at handler start, a2a_request, at end, a2a_request_complete with duration_ms.
# viz/backend/viz_backend/routers/a2a.py
async def handle_a2a_tasks(request: Request):
    t0 = time.perf_counter()
    body = await request.json()
    method, params, req_id = _parse_json_rpc_request(body)
    task_id = (params or {}).get("taskId")
    logger.info("a2a_request", extra={"event": "a2a_request", "method": method, "task_id": task_id})
    # ... processing ...
    duration_ms = (time.perf_counter() - t0) * 1000
    logger.info(
        "a2a_request_complete",
        extra={
            "event": "a2a_request_complete",
            "method": method,
            "task_id": task_id,
            "duration_ms": round(duration_ms, 2),
        },
    )
    return result
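Fields passed via extra= only reach the output if the formatter serializes them. The project's formatter is not shown here, but a minimal JSON formatter along these lines (the field allowlist is an assumption) emits one JSON object per record:

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Serialize the record plus selected extra fields as one JSON line."""

    EXTRA_FIELDS = ("event", "method", "task_id", "duration_ms")

    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname, "message": record.getMessage()}
        # extra= kwargs become attributes on the LogRecord; copy the known ones.
        for field in self.EXTRA_FIELDS:
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        return json.dumps(payload)
```

Attach it with `handler.setFormatter(JsonFormatter())` and every `logger.info(..., extra={...})` call above becomes a machine-parseable line.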
Sequence: Request with Auth¶
sequenceDiagram
participant C as A2A Client
participant API as FastAPI
participant Auth as verify_api_key
participant Rate as rate_limit
participant Handler as A2A Handler
C->>API: GET /.well-known/agent.json
Note over C,API: X-API-Key: <key>
API->>Auth: Depends(verify_api_key)
alt key valid
Auth-->>API: OK
API->>Rate: Depends(rate_limit_read)
Rate-->>API: OK
API->>Handler: get_agent_card()
Handler-->>C: Agent Card JSON
else key missing/invalid
Auth-->>API: 401
API-->>C: 401 Unauthorized
end
Example¶
# Without key, 401
curl -s -o /dev/null -w "%{http_code}" http://localhost:8000/.well-known/agent.json
# 401
# With key, 200
curl -H "X-API-Key: $API_KEY" http://localhost:8000/.well-known/agent.json
# {"name":"Expert Memory Machine",...}
# tasks/submit
curl -X POST http://localhost:8000/api/a2a/tasks \
-H "Content-Type: application/json" \
-H "X-API-Key: $API_KEY" \
-d '{"jsonrpc":"2.0","method":"tasks/submit","params":{...},"id":1}'
# stream
curl -N -H "X-API-Key: $API_KEY" http://localhost:8000/api/a2a/tasks/t-1/stream
Tests¶
Added 3 tests for 401 without key:
- test_agent_card_returns_401_without_api_key
- test_tasks_submit_returns_401_without_api_key
- test_stream_returns_401_without_api_key
Fixture a2a_api_key(monkeypatch) sets API_KEY for tests; all A2A requests use headers={"X-API-Key": "test-api-key-for-a2a"}.
# viz/backend/tests/test_a2a_router.py
A2A_HEADERS = {"X-API-Key": "test-api-key-for-a2a"}

@pytest.fixture(autouse=True)
def a2a_api_key(monkeypatch):
    """Set API_KEY for A2A tests (auth required since Phase 4)."""
    monkeypatch.setenv("API_KEY", "test-api-key-for-a2a")

def test_agent_card_returns_401_without_api_key(self, client):
    """GET /.well-known/agent.json without X-API-Key returns 401."""
    r = client.get("/.well-known/agent.json")  # no headers
    assert r.status_code == 401
Breaking Change¶
Yes. A2A clients must now send X-API-Key. Previously endpoints were unauthenticated.
Limitations¶
- Same rate limits as other APIs (no A2A-specific tuning)
- Logging via Python logging (OpenTelemetry later)
What's Next¶
Phase 5: Calendar, Finance, and Content Classifier as A2A skills. Then async submit and multi-event streaming for long tasks. OpenTelemetry once distributed tracing is needed.
