A2A Agent Orchestration System
by Khaledayman9
A distributed multi-agent system that orchestrates complex tasks across specialized AI agents using A2A (Agent-to-Agent) protocol, MCP (Model Context Protocol) and LangGraph.
Architecture Overview
User Query → Orchestrator Agent → [Math Agent | Weather Agent | ...] → Coordinated Response
System Flow
- User Input: Natural language queries are received by the orchestrator
- Planning: Orchestrator analyzes the query and creates an execution plan
- Task Distribution: Tasks are distributed to appropriate specialized agents
- Parallel/Sequential Execution: Tasks execute based on dependencies
- Response Coordination: Results are combined and returned to the user
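The five-step flow above can be sketched as a simple pipeline. The function names here are illustrative stand-ins, not the project's actual API:

```python
# Illustrative sketch of the orchestration flow; all names are
# hypothetical, not the project's real classes or functions.

def orchestrate(query: str, plan, distribute, combine) -> str:
    """Run a query through plan -> distribute/execute -> combine."""
    tasks = plan(query)          # Planning: decompose the query into tasks
    results = distribute(tasks)  # Distribution + execution by agents
    return combine(results)      # Coordination: merge into one response

# Toy stand-ins for the real planner and agents:
answer = orchestrate(
    "What is 5 + 7?",
    plan=lambda q: [("math", q)],
    distribute=lambda ts: ["5 + 7 = 12" for _ in ts],
    combine=lambda rs: "; ".join(rs),
)
print(answer)  # 5 + 7 = 12
```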
Core Components
Common Utilities:
1. The BaseAgent:
- Features:
- LLM Initialization: Supports both OpenAI (GPT) and Google (Gemini) models
- Memory Management: Uses LangGraph's `MemorySaver` for conversation persistence
- Async Lifecycle: Handles async initialization with `_ensure_initialized()`
- Tool Integration: Abstract methods for tool and prompt definition
- Response Processing: Standardized response handling pipeline
2. BaseAgentExecutor:
- Features:
- Context Management: Processes `RequestContext` with user input and session data
- Event Streaming: Uses `EventQueue` to stream responses back to the client
- Error Handling: Converts exceptions to proper A2A error formats
- Lifecycle Management: Ensures agents are properly initialized before execution
3. BaseAgentServer:
- Features:
- Agent Card Loading: Dynamically loads configuration from JSON files
- Server Lifecycle: Manages uvicorn server startup/shutdown
- A2A Integration: Creates `A2AStarletteApplication` with proper handlers
- Request Routing: Uses `DefaultRequestHandler` for A2A protocol compliance
4. Agent Card System:
- Agent cards define:
- Capabilities: What the agent can do (streaming, multimodal, etc.)
- Skills: Specific functions with examples and tags
- Endpoints: URL and communication preferences
- Metadata: Version, description, supported modes
Agents
1. Orchestrator Agent (Port 10003)
- Purpose: Central coordinator that plans and executes complex multi-agent workflows
- Capabilities:
- Query analysis and task decomposition
- Intelligent agent routing
- Parallel and sequential task execution
- Dependency management
- Model: GPT-4.1
- Skills: Task planning, agent routing
2. Math Agent (Port 10004)
- Purpose: Specialized mathematical computation agent
- Capabilities: Arithmetic operations and power calculations
- Tools: add, subtract, multiply, divide, square, cube, power
- Model: GPT-4.1
- Response Format: Structured math output with step-by-step solutions
3. Weather Agent (Port 10005)
- Purpose: Weather information retrieval using MCP (Model Context Protocol)
- Capabilities: Current weather and forecasts
- Tools: MCP weather server integration
- Model: GPT-4.1
- Skills: Weather queries for any location
Inter-Agent Communication
RemoteAgentConnection
The orchestrator communicates with other agents via HTTP using the A2A protocol:
Communication flow:
- Discovery: `create_from_url()` fetches the agent card from the `/agent-card` endpoint
- Connection: Establishes a persistent HTTP client with a 600-second timeout
- Message Sending: `send_message()` sends A2A-formatted requests
- Response Processing: Extracts text from the structured A2A response format
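The discovery step can be sketched with the stdlib, using a pluggable fetcher in place of the SDK's HTTP client. The `/agent-card` path follows the flow above; everything else here is illustrative:

```python
import json

def discover_agent(base_url: str, fetch) -> dict:
    """Fetch and parse an agent card from the /agent-card endpoint.

    `fetch` is any callable mapping a URL to a response body string; in
    the real system this would be an async HTTP client configured with
    the 600-second timeout mentioned above.
    """
    url = base_url.rstrip("/") + "/agent-card"
    return json.loads(fetch(url))

# Simulate a Math Agent answering the discovery request:
fake_fetch = lambda url: json.dumps(
    {"name": "Math Agent", "url": "http://localhost:10004/"}
)
card = discover_agent("http://localhost:10004", fake_fetch)
print(card["name"])  # Math Agent
```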
Message Flow
Orchestrator → HTTP POST /send-message → Agent Server
↓
A2A Protocol Message
↓
{
"id": "unique-id",
"params": {
"message": {
"role": "user",
"parts": [{"text": "user input"}]
}
}
}
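Constructing the message shown above is plain dict-building; generating the unique id with `uuid4` is an assumption, not necessarily what the SDK does:

```python
import json
from uuid import uuid4

def build_a2a_message(user_input: str) -> dict:
    """Build an A2A-style request payload matching the shape above."""
    return {
        "id": str(uuid4()),  # assumed id scheme
        "params": {
            "message": {
                "role": "user",
                "parts": [{"text": user_input}],
            }
        },
    }

payload = build_a2a_message("What is 5 + 7?")
print(json.dumps(payload, indent=2))
```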
Response Processing
Agents return structured responses that get processed through multiple layers:
- LangGraph Output: Returns a structured format (e.g., `MathResponseFormat`)
- Agent Processing: `_process_response()` extracts the relevant content
- A2A Wrapping: Content gets wrapped in the A2A message format
- HTTP Response: Final JSON response sent over HTTP
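A minimal sketch of the extraction step, with plain dataclasses standing in for the SDK's response types (the field names mirror the `Part`/`TextPart` shapes visible in the example outputs later in this README):

```python
from dataclasses import dataclass, field

# Stand-ins for the SDK's structured response types; only the fields
# needed for text extraction are modeled here.
@dataclass
class TextPart:
    text: str
    kind: str = "text"

@dataclass
class Message:
    parts: list = field(default_factory=list)
    role: str = "agent"

def extract_text(message: Message) -> str:
    """Pull the plain text out of a structured A2A-style message."""
    return "".join(p.text for p in message.parts if p.kind == "text")

msg = Message(parts=[TextPart("Task 1 (Math Agent): 5 + 7 = 12")])
print(extract_text(msg))  # Task 1 (Math Agent): 5 + 7 = 12
```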
Execution Pipeline
Orchestrator Workflow
1. Planning Phase:
- LLM analyzes the query and creates an `ExecutionPlan`
- Tasks assigned to appropriate agents based on capabilities
- Dependencies calculated for proper ordering
2. Execution Phase:
- Dependency graph built from task relationships
- Ready tasks (no pending dependencies) identified
- Parallel execution using `asyncio.gather()`
- Results collected and dependencies updated
3. Coordination Phase:
- Results from dependent tasks passed to subsequent tasks
- Final response assembled from all task outputs
- Summary and status returned to the user
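The execution phase can be sketched with stdlib asyncio: repeatedly pick the tasks whose dependencies are resolved and run them concurrently. The task and dependency shapes here are illustrative, not the project's `ExecutionPlan` model:

```python
import asyncio

async def run_plan(tasks: dict, deps: dict) -> dict:
    """Execute a DAG of tasks; independent tasks run concurrently.

    tasks: task_id -> async callable taking its dependencies' results
    deps:  task_id -> set of task_ids it depends on
    """
    results, pending = {}, set(tasks)
    while pending:
        # Ready tasks: all dependencies already have results.
        ready = [t for t in pending if deps.get(t, set()) <= results.keys()]
        outs = await asyncio.gather(
            *(tasks[t]({d: results[d] for d in deps.get(t, set())})
              for t in ready)
        )
        results.update(zip(ready, outs))
        pending -= set(ready)
    return results

async def main():
    tasks = {
        "math": lambda _: asyncio.sleep(0, result=12),  # 3 * 4
        "weather": lambda r: asyncio.sleep(0, result=f"day {r['math']}"),
    }
    # "weather" depends on "math", so it runs in a second wave.
    return await run_plan(tasks, {"weather": {"math"}})

print(asyncio.run(main()))  # {'math': 12, 'weather': 'day 12'}
```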
Parallel vs Sequential Execution Examples
Case 1: Testing a single agent
- Input: `What is 5 + 7?`
- Result:
context_id=None extensions=None kind='message' message_id='6a628346-4caa-4f2e-be2b-ac75dfc7f01b' metadata=None parts=[Part(root=TextPart(kind='text', metadata=None, text='Execution Summary: A single math calculation task to compute the sum of 5 and 7.\n\nTask 1 (Math Agent): 5 + 7 = 12\n'))] reference_task_ids=None role=<Role.agent: 'agent'> task_id=None
Case 2: Testing multiple agents with concurrent tasks
- Input: `Calculate 3 * 4 and tell me the weather in New York`
- Result:
context_id=None extensions=None kind='message' message_id='d59c464a-b0e7-4ef2-9a2b-394a518c7bec' metadata=None parts=[Part(root=TextPart(kind='text', metadata=None, text='Execution Summary: First, calculate 3 * 4 using the Math Agent. Second, get the current weather in New York using the Weather Agent. Both tasks are independent and can be executed in parallel.\n\nTask 1 (Math Agent): 3 * 4 = 12\nTask 2 (Weather Agent): It seems there was an issue retrieving the weather for New York. Could you please try again later?\n'))] reference_task_ids=None role=<Role.agent: 'agent'> task_id=None
Case 3: Testing multiple agents with sequential (dependent) tasks
- Input: `First calculate 3 × 4. Then, using that result as the day number of this month, tell me the weather in Cairo on that day.`
- Result:
context_id=None extensions=None kind='message' message_id='406de694-0deb-45be-a360-6f22345e0219' metadata=None parts=[Part(root=TextPart(kind='text', metadata=None, text='Execution Summary: First, calculate 3 × 4 to get 12. Then, get the weather in Cairo on the 12th day of this month.\n\nTask 1 (Math Agent): 3 × 4 = 12\nTask 2 (Weather Agent): The weather forecast for Cairo on the 12th day of this month is currently unavailable. Please try again later or provide additional details for assistance.\n'))] reference_task_ids=None role=<Role.agent: 'agent'> task_id=None
Directory Structure
A2A-Orchestrator/
├── a2a_server/ # Core package
│ ├── agent_cards/ # Agent configuration
│ │ ├── math_agent_card.json
│ │ ├── orchestrator_agent_card.json
│ │ └── weather_agent_card.json
│ ├── agents/ # Agent implementations
│ │ ├── math_agent_server/
│ │ ├── orchestrator_agent_server/
│ │ └── weather_agent_server/
│ ├── common/ # Shared utilities
│ │ ├── agent_card_loader.py
│ │ ├── base_agent.py
│ │ ├── base_agent_executor.py
│ │ ├── base_agent_server.py
│ │ ├── models.py
│ │ ├── prompts.py
│ │ └── remote_agent_connection.py
│ └── mcp/ # Model Context Protocol
│ ├── servers/
│ │ └── weather.py
│ └── servers.json
├── a2a_server_manager.py # Main server manager
├── test_a2a_server.py # Integration tests
├── logger.py # Logging utilities
├── requirements.txt
├── pyproject.toml
├── README.md
└── settings.py
Key Features
Intelligent Orchestration
- Dynamic Planning: Automatically breaks down complex queries into executable tasks
- Dependency Management: Handles sequential and parallel task execution
- Agent Discovery: Automatically discovers and utilizes available specialized agents
Parallel Execution
- Independent Tasks: Run simultaneously for optimal performance
- Dependency Resolution: Sequential execution when tasks depend on previous results
- Mixed Execution: Combines parallel and sequential patterns as needed
Extensible Architecture
- Plugin System: Easy to add new specialized agents
- MCP Integration: Supports Model Context Protocol for external tool integration
- Agent Cards: JSON-based agent capability descriptions
Installation & Setup
Prerequisites
- Python 3.10+
- UV package manager (recommended) or pip
Environment Setup
1. Clone and navigate to the project:
git clone <repository-url>
cd A2A-Orchestrator
2. Set up environment variables: Create a `.env` file and set environment variables:
# settings.py
OPENAI_API_KEY = "your-openai-key"
OPENAI_BASE_URL = "https://api.openai.com/v1"  # Optional
GOOGLE_API_KEY = "your-google-key"  # For Gemini models
Installation Methods
Option 1: Using UV (Recommended)
# Install UV if not already installed
curl -LsSf https://astral.sh/uv/install.sh | sh
# Install dependencies
uv sync
# Activate virtual environment
source .venv/bin/activate # On Windows: .venv\Scripts\activate
Option 2: Using Python/Pip
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
Running the System
Start All Servers
# Using UV
uv run python a2a_server_manager.py
# Using Python
python a2a_server_manager.py
This starts all agent servers simultaneously:
- Orchestrator Agent: localhost:10003
- Math Agent: localhost:10004
- Weather Agent: localhost:10005
Start Individual Agents (Alternative)
# Math Agent only
uv run python -m a2a_server.agents.math_agent_server
# Weather Agent only
uv run python -m a2a_server.agents.weather_agent_server
# Orchestrator only
uv run python -m a2a_server.agents.orchestrator_agent_server
Testing
Run Integration Tests
# Make sure all servers are running first
python test_a2a_server.py
Manual Testing
# Test individual agents via HTTP API
curl -X POST http://localhost:10004/send-message \
-H "Content-Type: application/json" \
-d '{"message": "Calculate 5 + 7"}'
Configuration
Agent Cards
Each agent has a JSON configuration card defining:
- Capabilities and skills
- Supported input/output modes
- Tool descriptions and examples
- API endpoints
Example structure:
{
"name": "Math Agent",
"description": "Mathematical computation specialist",
"url": "http://localhost:10004/",
"skills": [
{
"id": "add",
"name": "Addition",
"description": "Add two numbers",
"examples": ["5 + 7", "add 10 and 20"]
}
]
}
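Loading such a card is straightforward with the stdlib. The required-field set below is an assumption for illustration, not the actual logic in `agent_card_loader.py`:

```python
import json

REQUIRED_FIELDS = {"name", "description", "url", "skills"}  # assumed

def load_agent_card(raw: str) -> dict:
    """Parse an agent card and check the fields used in the example above."""
    card = json.loads(raw)
    missing = REQUIRED_FIELDS - card.keys()
    if missing:
        raise ValueError(f"agent card missing fields: {sorted(missing)}")
    return card

card = load_agent_card(json.dumps({
    "name": "Math Agent",
    "description": "Mathematical computation specialist",
    "url": "http://localhost:10004/",
    "skills": [{"id": "add", "name": "Addition"}],
}))
print(card["skills"][0]["id"])  # add
```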
MCP Server Configuration
Weather agent uses MCP for external tool integration:
{
"Weather": {
"command": "python",
"args": ["-m", "a2a_server.mcp.servers.weather"],
"transport": "stdio"
},
"Weather (UV)": {
"command": "uv",
"args": ["run", "python", "-m", "a2a_server.mcp.servers.weather"],
"transport": "stdio"
}
}
Development
Adding New Agents
- Create agent card: Add a JSON configuration to `agent_cards/`
- Implement agent: Extend `BaseAgent` in `agents/`
- Create server: Extend `BaseAgentServer`
- Add to manager: Register in `a2a_server_manager.py`
- Update orchestrator: The agent will be auto-discovered
Extending Capabilities
- Add Tools: Implement LangChain tools for new capabilities
- MCP Integration: Add external tools via Model Context Protocol
- Custom Prompts: Define agent-specific behavior in `prompts.py`
- Response Formats: Add structured output models in `models.py`
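New tools for the math agent might start as plain Python functions with docstrings; whether they are wrapped with LangChain's `@tool` decorator or registered another way depends on the implementation, so this sketch keeps them decorator-free. The function names and registry are hypothetical:

```python
def power(base: float, exponent: float) -> float:
    """Raise base to the given exponent."""
    return base ** exponent

def percent_of(part: float, whole: float) -> float:
    """Return part as a percentage of whole."""
    return 100.0 * part / whole

# A registry the agent could expose to its LLM as callable tools:
TOOLS = {"power": power, "percent_of": percent_of}
print(TOOLS["power"](2, 10))  # 1024
```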
Dependencies
Core Stack
- a2a-sdk: Agent-to-Agent communication protocol
- langgraph: Graph-based agent orchestration
- langchain: LLM framework and tool integration
- fastmcp: Model Context Protocol implementation
- pydantic: Data validation and serialization
LLM Providers
- OpenAI: GPT models (primary)
- Google: Gemini models (optional)
Web Framework
- FastAPI/Uvicorn: HTTP server infrastructure
- httpx: Async HTTP client for inter-agent communication
Troubleshooting
Common Issues
- Port conflicts: Check if ports 10003-10005 are available
- Rate Limiting: All agents share the single OpenAI API key configured in `settings.py`, so high request volume can trigger 429 "Too Many Requests" errors
- Parallel Execution Amplification: The orchestrator's parallel task execution can send multiple simultaneous requests, multiplying rate-limit pressure during complex queries
- No Streaming Support: Current implementation lacks real-time streaming
- Memory Management: Uses in-memory storage only
Debugging
- Enable debug logging in agents
- Check individual agent health endpoints
- Use `test_a2a_server.py` for integration testing
Performance Considerations
- Parallel Execution: Independent tasks run simultaneously
- Connection Pooling: HTTP clients reuse connections
- Memory Management: Agents use memory savers for conversation state
- Timeout Handling: 10-minute timeout for long-running operations
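The timeout behaviour can be illustrated with stdlib asyncio; the 600-second default matches the 10-minute figure above, while the function names and fallback message are hypothetical:

```python
import asyncio

async def call_agent_with_timeout(coro, timeout_s: float = 600.0):
    """Wrap an agent call so a hung remote agent can't stall the plan."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_s)
    except asyncio.TimeoutError:
        return "Error: agent did not respond in time"

async def main():
    # One fast agent and one that exceeds a (deliberately tiny) timeout:
    fast = call_agent_with_timeout(asyncio.sleep(0, result="ok"))
    slow = call_agent_with_timeout(asyncio.sleep(10), timeout_s=0.01)
    return await asyncio.gather(fast, slow)

print(asyncio.run(main()))  # ['ok', 'Error: agent did not respond in time']
```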
Security Notes
- Local Development: Currently configured for localhost only
- API Keys: Store securely and never commit to version control
- Network Access: Consider firewall rules for production deployment
Future Enhancements
- Additional specialized agents (code, research, etc.)
- Enhanced dependency resolution algorithms
- Monitoring and observability features
- Production-ready deployment configurations
- WebSocket support for real-time communication
License
This project is licensed under the Apache License, Version 2.0 - see the LICENSE file for details.
The code in the common/ directory is from the Google A2A project and is also licensed under the Apache License, Version 2.0.
This project also makes use of other open-source libraries (e.g., LangGraph, MCP, FastMCP), which are subject to their respective licenses.