A2A Python SDK: Complete Guide with Examples
Everything you need to build A2A agents and clients in Python. Server setup, client usage, streaming, push notifications, error handling, testing, and Google ADK integration.
The a2a-sdk Python package gives you both sides of the A2A protocol: server components for building agents and client components for consuming them. This guide covers everything in the SDK with working code.
Installation
pip install a2a-sdk fastapi uvicorn
For Google ADK integration:
pip install "google-adk[a2a]"
Creating a server
An A2A server needs four components: an AgentExecutor, an AgentCard, a TaskStore, and a request handler:
# server.py
import asyncio
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.server.apps import A2AFastAPIApplication
from a2a.types import (
AgentCard, AgentCapabilities, AgentSkill,
Task, TaskStatus, TaskState, TaskStatusUpdateEvent,
)
from a2a.utils.message import new_agent_text_message
class SummaryAgent(AgentExecutor):
async def execute(self, context: RequestContext, event_queue: EventQueue):
user_text = context.message.parts[0].root.text
await event_queue.put(TaskStatusUpdateEvent(
task_id=context.task_id,
context_id=context.context_id,
status=TaskStatus(state=TaskState.working),
final=False,
))
words = user_text.split()
summary = " ".join(words[: max(len(words) // 3, 5)]) + "..."
await event_queue.put(Task(
id=context.task_id,
context_id=context.context_id,
status=TaskStatus(
state=TaskState.completed,
message=new_agent_text_message(
f"Summary: {summary}",
context.context_id, context.task_id,
),
),
))
async def cancel(self, context: RequestContext, event_queue: EventQueue):
await event_queue.put(Task(
id=context.task_id,
context_id=context.context_id,
status=TaskStatus(state=TaskState.canceled),
))
agent_card = AgentCard(
name="Summary Agent",
description="Summarizes text input into shorter form.",
url="http://localhost:8000",
version="1.0.0",
capabilities=AgentCapabilities(streaming=True, pushNotifications=False),
skills=[AgentSkill(
id="summarize", name="Summarize Text",
description="Takes long text and returns a shorter summary.",
tags=["text", "summarization"],
)],
default_input_modes=["text/plain"],
default_output_modes=["text/plain"],
)
handler = DefaultRequestHandler(
agent_executor=SummaryAgent(),
task_store=InMemoryTaskStore(),
)
app = A2AFastAPIApplication(agent_card, handler).build()
uvicorn server:app --port 8000
The AgentExecutor interface
Two methods to implement:
execute(context, event_queue)— handles incoming messages, puts results on the queuecancel(context, event_queue)— handles task cancellation
The RequestContext gives you task_id, context_id (for multi-turn), message (the incoming Message), and task (current Task if resuming).
Task states
submitted -> working -> completed
-> failed
-> input-required -> working -> completed
-> canceled
Report states by putting events on the queue. Use TaskStatusUpdateEvent with final=False for intermediate updates, and Task for terminal states:
# Ask for more input (multi-turn)
await event_queue.put(Task(
id=context.task_id, context_id=context.context_id,
status=TaskStatus(
state=TaskState.input_required,
message=new_agent_text_message(
"Which format? JSON or plain text?",
context.context_id, context.task_id,
),
),
))
The input-required state is how you build multi-turn agents. The client sees it and sends a follow-up with the same context_id.
Creating a client
Basic send
import asyncio
from a2a.client import A2AClient
async def main():
async with A2AClient(url="http://localhost:8000") as client:
card = await client.get_agent_card()
print(f"Agent: {card.name}, streaming={card.capabilities.streaming}")
response = await client.send_message(message={
"role": "user",
"parts": [{"kind": "text", "text": "Summarize the A2A protocol."}],
})
task = response.result
print(f"Result: {task.status.message.parts[0].root.text}")
asyncio.run(main())
Using ClientFactory
ClientFactory auto-discovers the agent card and negotiates the best transport:
from a2a.client import ClientFactory, create_text_message_object
async def main():
client = await ClientFactory.connect("http://localhost:8000")
try:
message = create_text_message_object(content="Explain A2A in one sentence.")
async for event in client.send_message(message):
if isinstance(event, tuple):
task, update = event
if task.status.state == "completed":
print(task.status.message.parts[0].root.text)
finally:
await client.close()
Streaming
Streaming uses Server-Sent Events. Put multiple events for chunked responses:
class StreamingAgent(AgentExecutor):
async def execute(self, context: RequestContext, event_queue: EventQueue):
user_text = context.message.parts[0].root.text
await event_queue.put(TaskStatusUpdateEvent(
task_id=context.task_id, context_id=context.context_id,
status=TaskStatus(state=TaskState.working), final=False,
))
chunks = ["Processing... ", "Analyzing... ", "Done."]
accumulated = ""
for i, chunk in enumerate(chunks):
await asyncio.sleep(0.3)
accumulated += chunk
if i == len(chunks) - 1:
await event_queue.put(Task(
id=context.task_id, context_id=context.context_id,
status=TaskStatus(
state=TaskState.completed,
message=new_agent_text_message(
accumulated, context.context_id, context.task_id,
),
),
))
else:
await event_queue.put(TaskStatusUpdateEvent(
task_id=context.task_id, context_id=context.context_id,
status=TaskStatus(
state=TaskState.working,
message=new_agent_text_message(
accumulated, context.context_id, context.task_id,
),
),
final=False,
))
Client-side streaming
from a2a.client import ClientFactory, create_text_message_object
from a2a.types import TaskState
async def stream_response():
client = await ClientFactory.connect("http://localhost:8000")
try:
message = create_text_message_object(content="Detailed analysis please.")
async for event in client.send_message(message):
if isinstance(event, tuple):
task, update = event
if task.status.state == TaskState.working and task.status.message:
print(f"[working] {task.status.message.parts[0].root.text}")
elif task.status.state == TaskState.completed:
print(f"[done] {task.status.message.parts[0].root.text}")
break
finally:
await client.close()
Push notifications
For long-running tasks, register a webhook instead of holding an SSE connection:
from a2a.types import PushNotificationConfig
# Set pushNotifications=True in your AgentCard capabilities, then:
config = PushNotificationConfig(
url="https://my-app.example.com/webhooks/a2a",
events=["task.completed", "task.failed"],
)
await client.create_push_notification_config(task_id=task.id, config=config)
The agent POSTs task updates to your webhook URL as JSON.
Error handling
Catch errors on the client side. Report errors through task state on the server side:
# Client-side
from a2a.types import A2AError
import httpx
async def safe_send(url: str, text: str):
try:
async with A2AClient(url=url) as client:
response = await client.send_message(
message={"role": "user", "parts": [{"kind": "text", "text": text}]}
)
task = response.result
if task.status.state == "failed":
print(f"Task failed: {task.status.message}")
return None
return task
except httpx.ConnectError:
print(f"Cannot connect to {url}")
except A2AError as e:
print(f"A2A protocol error: {e}")
return None
# Server-side — report errors as failed tasks, don't raise
class RobustAgent(AgentExecutor):
async def execute(self, context: RequestContext, event_queue: EventQueue):
try:
user_text = context.message.parts[0].root.text
result = self.process(user_text)
await event_queue.put(Task(
id=context.task_id, context_id=context.context_id,
status=TaskStatus(
state=TaskState.completed,
message=new_agent_text_message(result, context.context_id, context.task_id),
),
))
except Exception as e:
await event_queue.put(Task(
id=context.task_id, context_id=context.context_id,
status=TaskStatus(
state=TaskState.failed,
message=new_agent_text_message(
f"Error: {e}", context.context_id, context.task_id,
),
),
))
Testing agents
# test_agent.py
import pytest
import httpx
BASE_URL = "http://localhost:8000"
def test_agent_card():
response = httpx.get(f"{BASE_URL}/.well-known/agent-card.json")
assert response.status_code == 200
card = response.json()
assert "name" in card
assert len(card["skills"]) > 0
def test_message_send():
payload = {
"jsonrpc": "2.0", "id": "test-1",
"method": "message/send",
"params": {
"message": {
"role": "user",
"parts": [{"kind": "text", "text": "Test input"}],
}
},
}
response = httpx.post(BASE_URL, json=payload, timeout=30)
assert response.status_code == 200
result = response.json()
assert result["result"]["status"]["state"] == "completed"
Google ADK integration
The ADK wraps all server boilerplate into one function call:
from google.adk import Agent
from google.adk.a2a.utils.agent_to_a2a import to_a2a
agent = Agent(
model="gemini-2.0-flash",
name="research_helper",
description="Answers research questions with structured analysis.",
instruction="Provide clear, well-structured answers.",
)
app = to_a2a(agent, port=8001)
To consume an ADK agent from another ADK agent:
from google.adk import Agent
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent
remote = RemoteA2aAgent(
name="remote_research",
description="Remote research assistant",
agent_card_url="http://localhost:8001/.well-known/agent-card.json",
)
coordinator = Agent(
model="gemini-2.0-flash",
name="coordinator",
description="Delegates research tasks to remote agents.",
instruction="Delegate research questions to the remote_research agent.",
sub_agents=[remote],
)
See the Google ADK stack page for full setup and advanced patterns.
Next steps
- A2A Protocol Tutorial — the quick-start version
- A2A TypeScript SDK Guide — TypeScript server and client implementations
- Deploy to Production — Docker, TLS, health checks, monitoring
- Agent Card JSON Schema Reference — every field documented
- Browse agents and stacks for real-world examples
Related Stacks
Related posts
Google ADK Tutorial: Build and Deploy A2A Agents
Comprehensive Google ADK tutorial for 2026. Installation, agent definition, tools, sub-agents, A2A exposure, custom Agent Cards, deployment, and consuming remote agents.
A2A TypeScript SDK: Server and Client Examples
Build A2A agents and clients in TypeScript. Server setup with Express and Hono, client usage, streaming with ReadableStream, type safety with the protocol types.
A2A Protocol Tutorial: Your First Agent in 15 Minutes
Build your first A2A agent from scratch. Install the SDK, create an agent, expose it over A2A, test with curl, and consume it from another agent — all working code.