StackA2A
infrastructurepython

A2A MQTT

30

by JonathanGrocott

Agent to agent protocol using MQTT

1 starsUpdated 2025-12-17Apache-2.0
Quality Score30/100
Community
7
Freshness
61
Official
30
Skills
10
Protocol
30
🔒 Security
20

Getting Started

1Clone the repository
$ git clone https://github.com/JonathanGrocott/A2A-MQTT
2Navigate to the project
$ cd A2A-MQTT
3Install dependencies
$ pip install -r requirements.txt
4Run the agent
$ python main.py

README

A2A MQTT

A Python implementation of the Agent-to-Agent (A2A) Protocol using MQTT as the transport layer.

This library enables AI agents to discover each other, negotiate capabilities, and collaborate on tasks using an event-driven architecture suitable for scalable, distributed systems.

Features

  • MQTT Transport: Replaces HTTP/JSON-RPC with lightweight, event-driven MQTT messaging.
  • Protocol Compliance: Implements core A2A concepts including Agent Cards, Task Requests, and Task Results.
  • Automatic Discovery: Agents automatically publish retained "Agent Cards" to a common discovery topic.
  • Graceful Shutdown: Automatically removes Agent Cards (clears retained message) upon stopping.
  • Type Safety: Uses Pydantic models for strict schema validation.

Installation

pip install -r requirements.txt

Quick Start

1. Start an Agent

Create an agent that exposes a simple tool.

from a2a_mqtt import A2AAgent

def echo(params: dict):
    return f"Echo: {params.get('message')}"

agent = A2AAgent(
    org_id="my-org",
    agent_id="agent-001", 
    name="Echo Agent", 
    description="Echoes messages",
    broker_address="test.mosquitto.org"
)

agent.register_tool(echo)
agent.start()

2. Send a Task

Use the MQTTTransport directly or another Agent to send a task.

Topic Schema

This implementation follows a strict topic hierarchy to organize traffic:

Purpose Topic Pattern Description
Discovery discovery/{org_id}/{agent_id} Retained Agent Cards. Subscribers listen here to find available agents.
Tasks {org_id}/{agent_id}/tasks Incoming Task Requests. Agents subscribe here to receive work.
Results {org_id}/{agent_id}/results Outgoing Task Results. Clients subscribe here to get responses.

Message Formats

Messages use JSON payloads compatible with the A2A spec.

Agent Card (Discovery)

Published to discovery/... with retain=True.

{
  "protocolVersion": "1.0",
  "name": "Supply Chain Agent",
  "url": "mqtt://global-industries/supply-chain-01",
  "preferredTransport": "MQTT",
  "skills": [
    {
      "id": "inventory-check",
      "name": "Check Inventory",
      "description": "Returns stock levels for a product ID."
    }
  ]
}

Task Request

Published to .../tasks.

{
  "id": "req-123",
  "taskId": "task-abc-789",
  "operation": "inventory-check",
  "timestamp": "2024-01-01T12:00:00Z",
  "params": {
    "message": {
      "role": "user",
      "content": {
        "parts": [{ "data": { "product_id": "WIDGET-A" } }]
      }
    }
  }
}

Task Result

Published to .../results.

{
  "id": "req-123",
  "taskId": "task-abc-789",
  "status": "completed",
  "result": {
    "status": "completed",
    "result": { "stock": 500 }
  }
}

Security & Best Practices

For production deployments (non-demo), follow these guidelines:

  1. TLS Encryption: Always use MQTTS (typically port 8883) to encrypt traffic between agents and the broker.
  2. Authentication: Configure the broker to require username/password or client certificate authentication.
  3. Access Control Lists (ACLs): Restrict agents so they can only:
    • Publish to their own tasks/results topics.
    • Subscribe to their own tasks topic.
    • Read discovery/# (if they need to find others).
  4. Graceful Shutdown: The A2AAgent class attempts to clear its retained Agent Card on stop(). Ensure your application handles signals (SIGINT/SIGTERM) to call agent.stop().

Development

Running tests:

pip install pytest
pytest tests/

Capabilities

StreamingPush NotificationsMulti-TurnAuth: none
mcpmqtt
View on GitHub