StackA2A
orchestrationtypescript

A2A Amqp

27

by cloudamqp

Production-ready AMQP adapter for A2A protocol enabling distributed task processing, event sourcing with LavinMQ streams, and horizontal worker scaling.

3 starsUpdated 2025-11-18
Quality Score27/100
Community
13
Freshness
48
Official
30
Skills
10
Protocol
30
🔒 Security
20

Getting Started

1Clone the repository
$ git clone https://github.com/cloudamqp/a2a-amqp
2Navigate to the project
$ cd a2a-amqp
3Install dependencies
$ npm install
4Run the agent
$ npm start

README

a2a-amqp

AMQP-backed EventBus and WorkQueue for scaling A2A agents with long-running tasks.

Why?

A2A agents often need to handle long-running tasks (LLM calls, complex processing, etc.). Running these tasks inline in HTTP handlers causes:

  • Timeout issues: HTTP connections timeout on long tasks
  • Scaling problems: Single server bottlenecks
  • Resource waste: Servers blocked waiting for tasks to complete

This library solves these problems by:

  1. Queuing tasks via AMQP instead of processing inline
  2. Distributing work across multiple worker processes
  3. Event sourcing all task events for replay and recovery
  4. Streaming results back via SSE while workers process in the background

Architecture

HTTP Request → Server (enqueues task) → Returns immediately
                    ↓
               AMQP Queue
                    ↓
          Worker Pool (scales horizontally)
                    ↓
          Process task & publish events
                    ↓
          AMQP Stream (event sourcing)
                    ↓
          Client streams results via SSE

Installation

# Installing using bun
bun add @cloudamqp/a2a-amqp @a2a-js/sdk @cloudamqp/amqp-client

# Or with npm
npm install @cloudamqp/a2a-amqp @a2a-js/sdk @cloudamqp/amqp-client

Requires: LavinMQ or RabbitMQ with stream support

docker run -d -p 5672:5672 -p 15672:15672 cloudamqp/lavinmq:latest

Quick Start

1. HTTP Server (enqueues tasks)

import { AMQPAgentBackend, QueuingRequestHandler } from "@84codes/a2a-amqp";
import { A2AExpressApp } from "@a2a-js/sdk/server/express";
import express from "express";

// Create AMQP backend
const backend = await AMQPAgentBackend.create({
  url: "amqp://localhost:5672",
  agentName: "my-agent",
});

// Create request handler (handles task queuing + event projection)
const requestHandler = new QueuingRequestHandler(agentCard, backend);
await requestHandler.initialize();

// Setup Express with A2A routes
const app = express();
new A2AExpressApp(requestHandler).setupRoutes(app, "/");
app.listen(3000);

2. Worker Process (processes tasks)

import { AMQPAgentBackend, WorkerEventBus } from "@84codes/a2a-amqp";
import { AgentExecutor, RequestContext } from "@a2a-js/sdk/server";

// Create backend with same agent name as server
const backend = await AMQPAgentBackend.create({
  url: "amqp://localhost:5672",
  agentName: "my-agent",
});

// Initialize work queue
await backend.workQueue.initialize();

class MyExecutor implements AgentExecutor {
  async execute(context: RequestContext, eventBus: ExecutionEventBus) {
    // Your long-running task logic here
    eventBus.publish({
      kind: "status-update",
      taskId: context.taskId,
      contextId: context.contextId,
      status: { state: "working", timestamp: new Date().toISOString() },
      final: false,
    });

    // ... do work ...

    eventBus.publish({
      kind: "status-update",
      taskId: context.taskId,
      contextId: context.contextId,
      status: { state: "completed", timestamp: new Date().toISOString() },
      final: true,
    });
    eventBus.finished();
  }
}

const executor = new MyExecutor();

// Start consuming with async generator pattern
const messages = backend.workQueue.start();

for await (const taskMessage of messages) {
  const { taskId, contextId, requestContext } = taskMessage;

  // Create request context
  const context = new RequestContext(
    requestContext.userMessage,
    taskId,
    contextId,
    requestContext.task,
    requestContext.referenceTasks
  );

  // Create event bus for publishing task events
  const eventBus = new WorkerEventBus(backend.amqpConnection, taskId, contextId);

  // Execute task
  await executor.execute(context, eventBus);
}

3. Scale horizontally

Run multiple workers to process tasks in parallel:

# Terminal 1: HTTP Server
bun run server

# Terminal 2-N: Workers (scale as needed)
bun run worker
bun run worker  # Add more workers for higher throughput

Features

  • Work Queue: Distribute tasks across multiple worker processes
  • Event Sourcing: All task events stored in AMQP streams for replay
  • In-Memory Projection: Fast task lookups with automatic recovery from streams
  • SSE Streaming: Automatic streaming of task events back to clients
  • Horizontal Scaling: Add more workers to increase throughput
  • Graceful Shutdown: Clean consumer and connection handling
  • Type-Safe: Full TypeScript support with Zod validation

Configuration

interface AMQPAgentBackendConfig {
  url: string;                    // AMQP broker URL
  agentName: string;              // Agent identifier
  streamRetention?: string;       // Event retention (default: "7d")
  streamMaxBytes?: number;        // Max stream size (default: 1GB)
  workQueueName?: string;         // Custom work queue name
  exchangeName?: string;          // Custom exchange name
  logger?: Logger;                // Custom logger
  connection?: {
    heartbeat?: number;           // Heartbeat interval in seconds
    reconnectDelay?: number;      // Reconnection delay in ms
    maxReconnectAttempts?: number;// Max reconnection attempts
  };
  publishing?: {
    persistent?: boolean;         // Persistent messages (default: true)
    confirmMode?: boolean;        // Publisher confirms (default: true)
    messageTtl?: number;          // Message TTL in ms (0 = no expiration)
  };
}

Examples

See complete working examples:

  • src/examples/http-server.ts - HTTP server with queuing
  • src/examples/worker.ts - Worker process
# Run the example
bun run server  # Terminal 1
bun run worker  # Terminal 2

# Send a request
curl -X POST http://localhost:3000/ \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","id":1,"method":"messages/send","params":{"message":{"kind":"message","role":"user","messageId":"1","contextId":"ctx-1","parts":[{"kind":"text","text":"Hello"}]}}}'

Testing

bun run test             # Run all tests (unit + integration)
bun run test:unit        # Unit tests only
bun run test:integration # Integration tests only
bun run test:watch       # Watch mode
bun run test:coverage    # With coverage

License

MIT

Capabilities

StreamingPush NotificationsMulti-TurnAuth: none
agent-orchestrationai-agentsamqpevent-drivenlavinmqtask-queue
View on GitHub