A2A MQTT
Agent-to-agent protocol using MQTT
1 star. Updated 2025-12-17. License: Apache-2.0.
Getting Started
1. Clone the repository
$ git clone https://github.com/JonathanGrocott/A2A-MQTT
2. Navigate to the project
$ cd A2A-MQTT
3. Install dependencies
$ pip install -r requirements.txt
4. Run the agent
$ python main.py
README
A2A MQTT
A Python implementation of the Agent-to-Agent (A2A) Protocol using MQTT as the transport layer.
This library enables AI agents to discover each other, negotiate capabilities, and collaborate on tasks using an event-driven architecture suitable for scalable, distributed systems.
Features
- MQTT Transport: Replaces HTTP/JSON-RPC with lightweight, event-driven MQTT messaging.
- Protocol Compliance: Implements core A2A concepts including Agent Cards, Task Requests, and Task Results.
- Automatic Discovery: Agents automatically publish retained "Agent Cards" to a common discovery topic.
- Graceful Shutdown: Automatically removes Agent Cards (clears retained message) upon stopping.
- Type Safety: Uses Pydantic models for strict schema validation.
Installation
pip install -r requirements.txt
Quick Start
1. Start an Agent
Create an agent that exposes a simple tool.
from a2a_mqtt import A2AAgent

# A tool is a plain function that receives the task parameters as a dict.
def echo(params: dict):
    return f"Echo: {params.get('message')}"

agent = A2AAgent(
    org_id="my-org",
    agent_id="agent-001",
    name="Echo Agent",
    description="Echoes messages",
    broker_address="test.mosquitto.org"
)

agent.register_tool(echo)
agent.start()
2. Send a Task
Use the MQTTTransport directly or another Agent to send a task.
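The sending side is not shown here, so the following is only a minimal sketch. It assumes the Task Request shape from the Message Formats section and the `{org_id}/{agent_id}/tasks` topic pattern, and it publishes with the `publish.single` helper from `paho-mqtt` rather than the library's own `MQTTTransport`, whose API is not documented in this README. The names `build_task_request` and `send_task` are hypothetical, and how the agent maps the payload onto a tool's `params` is not specified.

```python
import json
import uuid
from datetime import datetime, timezone


def build_task_request(operation: str, data: dict) -> dict:
    """Build a Task Request payload shaped like the README's example."""
    return {
        "id": f"req-{uuid.uuid4().hex[:8]}",
        "taskId": f"task-{uuid.uuid4().hex[:8]}",
        "operation": operation,
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "params": {
            "message": {
                "role": "user",
                "content": {"parts": [{"data": data}]},
            }
        },
    }


def send_task(org_id: str, agent_id: str, request: dict,
              broker: str = "test.mosquitto.org") -> str:
    """Publish the request to the target agent's tasks topic (needs paho-mqtt)."""
    import paho.mqtt.publish as publish  # third-party: pip install paho-mqtt

    topic = f"{org_id}/{agent_id}/tasks"
    publish.single(topic, payload=json.dumps(request), qos=1, hostname=broker)
    return topic


request = build_task_request("echo", {"message": "hello"})
print(json.dumps(request, indent=2))
# send_task("my-org", "agent-001", request)  # uncomment when a broker is reachable
```

To see the response, a client would also subscribe to the agent's results topic before publishing.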
Topic Schema
This implementation follows a strict topic hierarchy to organize traffic:
| Purpose | Topic Pattern | Description |
|---|---|---|
| Discovery | discovery/{org_id}/{agent_id} | Retained Agent Cards. Subscribers listen here to find available agents. |
| Tasks | {org_id}/{agent_id}/tasks | Incoming Task Requests. Agents subscribe here to receive work. |
| Results | {org_id}/{agent_id}/results | Outgoing Task Results. Clients subscribe here to get responses. |
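The patterns in the table can be captured in a few small helpers. This is a sketch rather than the library's own code: the function names are hypothetical, and it assumes `org_id` and `agent_id` never contain `/`, `+`, or `#`.

```python
def discovery_topic(org_id: str, agent_id: str) -> str:
    """Topic where an agent's retained Agent Card lives."""
    return f"discovery/{org_id}/{agent_id}"


def tasks_topic(org_id: str, agent_id: str) -> str:
    """Topic an agent subscribes to for incoming Task Requests."""
    return f"{org_id}/{agent_id}/tasks"


def results_topic(org_id: str, agent_id: str) -> str:
    """Topic an agent publishes Task Results to."""
    return f"{org_id}/{agent_id}/results"


def parse_discovery_topic(topic: str):
    """Return (org_id, agent_id) for a discovery topic, or None if it does not match."""
    parts = topic.split("/")
    if len(parts) == 3 and parts[0] == "discovery" and all(parts[1:]):
        return parts[1], parts[2]
    return None


# Standard MQTT wildcards: "+" matches one level, "#" matches all remaining levels.
ALL_AGENTS_FILTER = "discovery/#"
ORG_AGENTS_FILTER = discovery_topic("my-org", "+")  # every agent in one org

print(tasks_topic("my-org", "agent-001"))                   # my-org/agent-001/tasks
print(parse_discovery_topic("discovery/my-org/agent-001"))  # ('my-org', 'agent-001')
```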
Message Formats
Messages use JSON payloads compatible with the A2A spec.
Agent Card (Discovery)
Published to discovery/... with retain=True.
{
  "protocolVersion": "1.0",
  "name": "Supply Chain Agent",
  "url": "mqtt://global-industries/supply-chain-01",
  "preferredTransport": "MQTT",
  "skills": [
    {
      "id": "inventory-check",
      "name": "Check Inventory",
      "description": "Returns stock levels for a product ID."
    }
  ]
}
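A sketch of how such a card might be announced and later withdrawn. It relies on standard MQTT behavior: publishing a zero-length payload with the retain flag set clears the retained message on that topic. The helper names are hypothetical, the `mqtt://{org_id}/{agent_id}` URL form is inferred from the example above, and `client` is assumed to be any object with a paho-style `publish(topic, payload, qos, retain)` method.

```python
import json


def make_agent_card(org_id: str, agent_id: str, name: str, skills: list) -> dict:
    """Build an Agent Card dict with the fields shown in the README example."""
    return {
        "protocolVersion": "1.0",
        "name": name,
        "url": f"mqtt://{org_id}/{agent_id}",  # assumption: URL mirrors org/agent IDs
        "preferredTransport": "MQTT",
        "skills": skills,
    }


def announce(client, org_id: str, agent_id: str, card: dict, qos: int = 1) -> None:
    """Publish the card as a retained message so late subscribers still receive it."""
    topic = f"discovery/{org_id}/{agent_id}"
    client.publish(topic, json.dumps(card), qos=qos, retain=True)


def withdraw(client, org_id: str, agent_id: str, qos: int = 1) -> None:
    """Clear the retained card by publishing an empty retained payload."""
    topic = f"discovery/{org_id}/{agent_id}"
    client.publish(topic, "", qos=qos, retain=True)


card = make_agent_card(
    "global-industries",
    "supply-chain-01",
    "Supply Chain Agent",
    [{"id": "inventory-check", "name": "Check Inventory",
      "description": "Returns stock levels for a product ID."}],
)
print(json.dumps(card, indent=2))
```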
Task Request
Published to .../tasks.
{
  "id": "req-123",
  "taskId": "task-abc-789",
  "operation": "inventory-check",
  "timestamp": "2024-01-01T12:00:00Z",
  "params": {
    "message": {
      "role": "user",
      "content": {
        "parts": [{ "data": { "product_id": "WIDGET-A" } }]
      }
    }
  }
}
Task Result
Published to .../results.
{
  "id": "req-123",
  "taskId": "task-abc-789",
  "status": "completed",
  "result": {
    "status": "completed",
    "result": { "stock": 500 }
  }
}
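To make the request/result correlation concrete, here is a sketch of turning a Task Request into a Task Result that reuses the same `id` and `taskId`. It is not the library's dispatcher: `extract_data` and `handle_request` are hypothetical names, and the `"failed"` status and `"error"` field are assumptions, since only the `"completed"` shape appears above.

```python
import json


def extract_data(request: dict) -> dict:
    """Merge the `data` objects found in the request's message parts."""
    parts = request["params"]["message"]["content"]["parts"]
    merged = {}
    for part in parts:
        merged.update(part.get("data", {}))
    return merged


def handle_request(raw: str, tools: dict) -> dict:
    """Run the tool named by `operation` and wrap its output as a Task Result."""
    request = json.loads(raw)
    base = {"id": request["id"], "taskId": request["taskId"]}
    tool = tools.get(request["operation"])
    if tool is None:
        # Assumption: unknown operations are reported as a failed result.
        return {**base, "status": "failed", "error": "unknown operation"}
    try:
        output = tool(extract_data(request))
    except Exception as exc:  # report tool errors instead of crashing the agent
        return {**base, "status": "failed", "error": str(exc)}
    return {
        **base,
        "status": "completed",
        "result": {"status": "completed", "result": output},
    }


raw_request = json.dumps({
    "id": "req-123",
    "taskId": "task-abc-789",
    "operation": "inventory-check",
    "timestamp": "2024-01-01T12:00:00Z",
    "params": {"message": {"role": "user", "content": {
        "parts": [{"data": {"product_id": "WIDGET-A"}}]}}},
})
tools = {"inventory-check": lambda data: {"stock": 500}}
print(json.dumps(handle_request(raw_request, tools), indent=2))
```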
Security & Best Practices
For production deployments (non-demo), follow these guidelines:
- TLS Encryption: Always use MQTTS (typically port 8883) to encrypt traffic between agents and the broker.
- Authentication: Configure the broker to require username/password or client certificate authentication.
- Access Control Lists (ACLs): Restrict agents so they can only:
  - Publish to their own tasks/results topics.
  - Subscribe to their own tasks topic.
  - Read discovery/# (if they need to find others).
- Graceful Shutdown: The A2AAgent class attempts to clear its retained Agent Card on stop(). Ensure your application handles signals (SIGINT/SIGTERM) to call agent.stop().
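The TLS and shutdown guidance above could look roughly like the sketch below. It assumes `agent.start()` returns without blocking and that `agent` exposes `start()` and `stop()` as in the Quick Start; `make_tls_context` and `run_until_signalled` are hypothetical helpers. How `A2AAgent` accepts TLS settings is not documented here; with a raw paho client, the context would be passed to `client.tls_set_context(ctx)` before connecting on port 8883.

```python
import signal
import ssl
import threading


def make_tls_context(ca_file=None) -> ssl.SSLContext:
    """TLS context that verifies the broker's certificate and hostname."""
    # With ca_file=None the system's default CA bundle is used.
    return ssl.create_default_context(cafile=ca_file)


def run_until_signalled(agent, stop_event=None) -> None:
    """Start the agent, wait for SIGINT/SIGTERM, then always call agent.stop()."""
    stop_event = stop_event or threading.Event()

    def _request_stop(signum, frame):
        # Keep the handler tiny: only flag the main thread to shut down.
        stop_event.set()

    # signal.signal must be called from the main thread.
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, _request_stop)

    agent.start()  # assumption: returns immediately
    try:
        stop_event.wait()
    finally:
        agent.stop()  # lets the agent clear its retained Agent Card


# Usage (with the agent from the Quick Start):
# run_until_signalled(agent)
```

Setting an event in the handler and calling `stop()` from the main thread keeps network I/O out of the signal handler itself.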
Development
Running tests:
pip install pytest
pytest tests/