
rabbitmq-operations

npx machina-cli add skill othmane55/claude-collective-intelligence/rabbitmq-ops --openclaw

RabbitMQ Operations

Comprehensive skill for managing RabbitMQ in multi-agent orchestration systems.

Quick Start

Basic Connection

import { RabbitMQClient } from './scripts/rabbitmq-client.js';

const client = new RabbitMQClient({
  url: 'amqp://localhost:5672'
});

await client.connect();

Send Message to Queue

await client.publishTask({
  title: "Process data",
  description: "Transform CSV to JSON",
  priority: "high"
});

Consume Messages

await client.consumeTasks('agent.tasks', async (msg, { ack, nack }) => {
  console.log('Received:', msg.task);

  // Process task
  const result = await processTask(msg.task);

  // Acknowledge
  ack();
});

Core Concepts

Queues

Point-to-point messaging with load balancing:

// Setup durable queue
await client.setupTaskQueue('agent.tasks');

// Multiple consumers share work
// Each message delivered to ONE consumer

Exchanges

Publish-subscribe with routing:

// Fanout - broadcast to all
await client.setupBrainstormExchange('agent.brainstorm');

// Topic - selective routing
await client.setupStatusExchange('agent.status');

Message Patterns

Work Queue (Load Balancing):

Producer → Queue → Consumer 1
                 → Consumer 2
                 → Consumer 3

Each message to ONE consumer

Pub/Sub (Broadcasting):

Publisher → Exchange → Queue 1 → Consumer 1
                    → Queue 2 → Consumer 2
                    → Queue 3 → Consumer 3

Each message to ALL consumers

Topic (Selective):

Publisher → Topic Exchange
   ↓ (routing key: agent.status.connected)
   → Queue (pattern: agent.status.#) → Consumer 1
   → Queue (pattern: agent.*.connected) → Consumer 2

Advanced Operations

Message Persistence

// Durable queue + persistent messages = survive restarts
await channel.assertQueue('agent.tasks', {
  durable: true  // Queue survives broker restart
});

await channel.sendToQueue('agent.tasks', message, {
  persistent: true  // Message written to disk
});
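The `message` passed to `sendToQueue` above has to be a Buffer. A minimal sketch of one way to build it, assuming JSON payloads; `encodeTask` and `decodeTask` are hypothetical helper names, not part of the skill's client:

```javascript
// Hypothetical helpers (not part of RabbitMQClient): JSON <-> Buffer.
function encodeTask(task) {
  return {
    content: Buffer.from(JSON.stringify(task)),
    options: {
      persistent: true,                // ask the broker to store the message on disk
      contentType: 'application/json'  // tells consumers how to decode the body
    }
  };
}

function decodeTask(msg) {
  // msg.content is the Buffer handed to the consumer callback
  return JSON.parse(msg.content.toString('utf8'));
}

const { content, options } = encodeTask({ title: 'Process data', priority: 'high' });
// channel.sendToQueue('agent.tasks', content, options);
```

Keeping encoding and publish options in one place makes it harder to forget `persistent: true` on an individual publish.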

Prefetch (QoS)

// Fair dispatch - each worker gets 1 task at a time
await channel.prefetch(1);

// Or allow multiple concurrent tasks per worker
await channel.prefetch(5);

Message TTL

// Messages expire after 1 hour
await channel.assertQueue('agent.tasks', {
  arguments: {
    'x-message-ttl': 3600000
  }
});

Dead Letter Exchange

// Failed messages go to dead letter queue
await channel.assertQueue('agent.tasks', {
  arguments: {
    'x-dead-letter-exchange': 'dlx',
    'x-dead-letter-routing-key': 'failed'
  }
});

Priority Queues

await channel.assertQueue('agent.tasks', {
  arguments: {
    'x-max-priority': 10
  }
});

// Send with priority
await channel.sendToQueue('agent.tasks', message, {
  priority: 8
});
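The TTL, dead-letter, and priority snippets above each declare `agent.tasks` with different arguments. RabbitMQ rejects a redeclaration of an existing queue whose arguments differ (PRECONDITION_FAILED), so in practice the arguments usually go into a single declaration. A sketch of that idea, where `buildTaskQueueOptions` is a hypothetical helper and the values are the ones used above:

```javascript
// Hypothetical helper: collect all queue arguments into one assertQueue options object.
function buildTaskQueueOptions({ ttlMs, deadLetterExchange, deadLetterRoutingKey, maxPriority } = {}) {
  const args = {};
  if (ttlMs !== undefined) args['x-message-ttl'] = ttlMs;
  if (deadLetterExchange !== undefined) args['x-dead-letter-exchange'] = deadLetterExchange;
  if (deadLetterRoutingKey !== undefined) args['x-dead-letter-routing-key'] = deadLetterRoutingKey;
  if (maxPriority !== undefined) args['x-max-priority'] = maxPriority;
  return { durable: true, arguments: args };
}

const taskQueueOptions = buildTaskQueueOptions({
  ttlMs: 60 * 60 * 1000,          // 1 hour
  deadLetterExchange: 'dlx',
  deadLetterRoutingKey: 'failed',
  maxPriority: 10
});
// await channel.assertQueue('agent.tasks', taskQueueOptions);
```

Every process that declares the queue would need to pass the same options, which is one reason to build them in a shared helper.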

Connection Management

Auto-Reconnect

const client = new RabbitMQClient({
  autoReconnect: true,
  maxReconnectAttempts: 10
});

client.on('disconnected', () => {
  console.log('Connection lost, will retry...');
});

client.on('connected', () => {
  console.log('Reconnected successfully!');
});
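The source does not say how `RabbitMQClient` spaces its reconnect attempts. Capped exponential backoff is one common choice; the sketch below shows the delay calculation only, with `reconnectDelayMs`, `baseMs`, and `maxMs` as assumed names:

```javascript
// Hypothetical helper: delay before reconnect attempt `attempt` (1-based).
function reconnectDelayMs(attempt, { baseMs = 1000, maxMs = 30000 } = {}) {
  const delay = baseMs * 2 ** (attempt - 1); // 1x, 2x, 4x, ... the base delay
  return Math.min(delay, maxMs);             // never wait longer than maxMs
}

// Delays for the first six attempts with the defaults:
// 1000, 2000, 4000, 8000, 16000, 30000 (the last one is capped)
const delays = [1, 2, 3, 4, 5, 6].map((n) => reconnectDelayMs(n));
console.log(delays);
```

Capping the delay keeps a long outage from pushing the next attempt minutes into the future.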

Heartbeat

// Keep connection alive
const client = new RabbitMQClient({
  heartbeat: 30  // Send heartbeat every 30s
});

Connection Pooling

// For high-throughput scenarios
const pool = new ConnectionPool({
  min: 2,
  max: 10,
  url: 'amqp://localhost:5672'
});

const channel = await pool.acquire();
// Use channel
await pool.release(channel);

Message Acknowledgment

Manual Ack

await client.consumeTasks('queue', async (msg, { ack, nack, reject }) => {
  try {
    await processMessage(msg);
    ack();  // Success
  } catch (error) {
    if (error.isTransient) {
      nack(true);  // Requeue for retry
    } else {
      reject();  // Dead letter
    }
  }
}, { noAck: false });

Auto-Ack (Risky)

// Message lost if processing fails!
await channel.consume('queue', handler, { noAck: true });

Routing Patterns

Direct Exchange

await channel.assertExchange('direct_logs', 'direct');

// Bind with routing key
await channel.bindQueue(queue, 'direct_logs', 'error');
await channel.bindQueue(queue, 'direct_logs', 'warning');

// Publish with routing key
await channel.publish('direct_logs', 'error', message);

Topic Exchange

await channel.assertExchange('logs', 'topic');

// Wildcards in binding:
// * matches one word
// # matches zero or more words

await channel.bindQueue(queue, 'logs', 'agent.status.*');
await channel.bindQueue(queue, 'logs', 'agent.#');

// Publish
await channel.publish('logs', 'agent.status.connected', message);
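To make the wildcard rules concrete, here is a small client-side matcher that mimics them. It is an illustration only: the broker does the real matching, and `topicMatches` is a hypothetical function that does not exist in any RabbitMQ client library.

```javascript
// Illustration of topic binding semantics: '*' = exactly one word, '#' = zero or more words.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  function match(i, j) {
    if (i === p.length) return j === k.length; // pattern used up: key must be too
    if (p[i] === '#') {
      // '#' may absorb zero or more words; try every possible split
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true;
      }
      return false;
    }
    if (j === k.length) return false;          // key used up but pattern expects a word
    if (p[i] === '*' || p[i] === k[j]) return match(i + 1, j + 1);
    return false;
  }

  return match(0, 0);
}

console.log(topicMatches('agent.status.*', 'agent.status.connected')); // true
console.log(topicMatches('agent.status.*', 'agent.status'));           // false
console.log(topicMatches('agent.#', 'agent'));                         // true
```

A queue bound with both `agent.status.*` and `agent.#` still receives a matching message only once.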

Fanout Exchange

await channel.assertExchange('notifications', 'fanout');

// All bound queues receive message
await channel.bindQueue(queue1, 'notifications', '');
await channel.bindQueue(queue2, 'notifications', '');

await channel.publish('notifications', '', message);
// Both queue1 and queue2 receive it

Monitoring and Debugging

Queue Inspection

// Check queue status
const info = await channel.checkQueue('agent.tasks');
console.log('Messages:', info.messageCount);
console.log('Consumers:', info.consumerCount);

Management API

// Use RabbitMQ Management HTTP API
const response = await fetch('http://localhost:15672/api/queues', {
  headers: {
    'Authorization': 'Basic ' + btoa('guest:guest')
  }
});

const queues = await response.json();
queues.forEach(q => {
  console.log(`${q.name}: ${q.messages} messages`);
});
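The response handling above can be pulled into two small helpers. This is a sketch with hypothetical names (`basicAuthHeader`, `findDeepQueues`); it assumes each entry returned by `/api/queues` carries `name` and `messages`, as the loop above does.

```javascript
// Hypothetical helpers around the Management API call shown above.
function basicAuthHeader(user, password) {
  // Buffer-based equivalent of btoa() for building the Basic auth header
  return 'Basic ' + Buffer.from(`${user}:${password}`).toString('base64');
}

function findDeepQueues(queues, threshold) {
  // Keep only queues whose backlog exceeds the threshold
  return queues
    .filter((q) => (q.messages ?? 0) > threshold)
    .map((q) => ({ name: q.name, messages: q.messages }));
}

// Sample data shaped like the API response (values are made up)
const sample = [
  { name: 'agent.tasks', messages: 250 },
  { name: 'agent.results', messages: 3 }
];
console.log(findDeepQueues(sample, 100)); // only agent.tasks exceeds the threshold
```

In real deployments the credentials would come from configuration rather than the default `guest` account.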

Consumer Tracking

const consumer = await channel.consume('queue', handler);

// Cancel consumer
await channel.cancel(consumer.consumerTag);

Performance Optimization

Batch Publishing

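// Publish multiple messages efficiently.
// Assumes `channel` was created with connection.createConfirmChannel();
// waitForConfirms() is only available on confirm channels.
const messages = [/* ... */];

for (const msg of messages) {
  channel.sendToQueue('queue', Buffer.from(JSON.stringify(msg)));
}

// Wait until the broker has confirmed every message
await channel.waitForConfirms();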

Publisher Confirms

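// Assuming amqplib: confirm mode is enabled by creating a confirm channel
const channel = await connection.createConfirmChannel();

channel.sendToQueue('queue', message);

await channel.waitForConfirms();
// Resolves once the broker has confirmed the message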

Consumer Concurrency

// Multiple channels for parallel processing
const channels = await Promise.all([
  connection.createChannel(),
  connection.createChannel(),
  connection.createChannel()
]);

// Start one consumer per channel and wait until all are registered
await Promise.all(channels.map((ch) => ch.consume('queue', handler)));

Error Handling

Channel Errors

channel.on('error', (err) => {
  console.error('Channel error:', err);
  // Channel is closed, create new one
});

channel.on('close', () => {
  console.log('Channel closed');
});

Connection Errors

connection.on('error', (err) => {
  console.error('Connection error:', err);
});

connection.on('close', () => {
  console.log('Connection closed');
  // Implement reconnection logic
});

Message Handling Errors

try {
  await processMessage(msg);
  ack();
} catch (error) {
  console.error('Processing error:', error);

  // Log error with context
  await logError({
    messageId: msg.properties.messageId,
    error: error.message,
    stack: error.stack,
    timestamp: Date.now()
  });

  // Decide: retry or dead letter
  if (shouldRetry(error)) {
    nack(true);  // Requeue
  } else {
    reject();  // Dead letter
  }
}
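`shouldRetry` is not defined in the source. Because `nack` with requeue puts the message straight back on the queue, a message that always fails can loop forever, so some retry limit is needed. The sketch below is one possible policy, not the skill's own: it retries transient errors once, using the `redelivered` flag the broker sets on repeat deliveries. `decideFailureAction` is a hypothetical name, and `msg.fields` assumes an amqplib-style message.

```javascript
// Hypothetical policy: what to do with a message whose handler threw.
// Returns 'requeue' or 'dead-letter'.
function decideFailureAction(error, msg) {
  const transient = error.isTransient === true;
  // fields.redelivered is true when the broker has delivered this message before
  const alreadyRetried = msg.fields.redelivered === true;
  return transient && !alreadyRetried ? 'requeue' : 'dead-letter';
}

// Example: a transient error on first delivery is requeued
const timeout = Object.assign(new Error('timeout'), { isTransient: true });
console.log(decideFailureAction(timeout, { fields: { redelivered: false } })); // requeue
```

In the catch block this would replace the `shouldRetry(error)` check: call `nack(true)` for `'requeue'` and `reject()` otherwise.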

Best Practices

  1. Use Persistent Messages for Critical Data

    channel.sendToQueue(queue, msg, { persistent: true });
    
  2. Set Reasonable Prefetch

    await channel.prefetch(1);  // Fair dispatch
    
  3. Always Close Connections

    process.on('SIGINT', async () => {
      await channel.close();
      await connection.close();
    });
    
  4. Use Confirm Channels for Reliability

    // Assuming amqplib: confirm mode comes from createConfirmChannel()
    const channel = await connection.createConfirmChannel();
    await channel.waitForConfirms();
    
  5. Monitor Queue Depths

    setInterval(async () => {
      const info = await channel.checkQueue('queue');
      if (info.messageCount > 100) {
        console.warn('High queue depth!');
      }
    }, 60000);
    
  6. Implement Dead Letter Queues

    // Capture and analyze failed messages
    
  7. Use Message TTL

    // Prevent old messages from clogging queues
    
  8. Set Max Length

    await channel.assertQueue('queue', {
      arguments: { 'x-max-length': 10000 }
    });
    

Common Patterns

Request-Reply

import { randomUUID } from 'node:crypto';

// Request
const correlationId = randomUUID();
const replyQueue = await channel.assertQueue('', { exclusive: true });

// Start listening for the reply before sending the request
await channel.consume(replyQueue.queue, (reply) => {
  if (reply.properties.correlationId === correlationId) {
    // Process reply
  }
}, { noAck: true });

channel.sendToQueue('rpc_queue', msg, {
  correlationId,
  replyTo: replyQueue.queue
});
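When several requests are in flight on the same reply queue, each reply has to be matched to its caller by `correlationId`. A minimal sketch of that bookkeeping, with `createReplyRouter` as a hypothetical helper that is independent of any client library:

```javascript
// Hypothetical helper: route replies to the caller that is waiting for them.
function createReplyRouter() {
  const pending = new Map(); // correlationId -> callback

  return {
    // Register interest in a reply before sending the request
    expect(correlationId, callback) {
      pending.set(correlationId, callback);
    },
    // Call from the reply-queue consumer; returns false for unknown or duplicate replies
    dispatch(reply) {
      const id = reply.properties.correlationId;
      const callback = pending.get(id);
      if (!callback) return false;
      pending.delete(id);
      callback(reply.content);
      return true;
    },
    size() {
      return pending.size;
    }
  };
}
```

A production version would also need a timeout that removes entries whose reply never arrives.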

Work Stealing

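// RabbitMQ has no true work stealing; with prefetch(1) an idle worker
// simply receives the next message, so faster workers take on more work
await channel.prefetch(1);
await channel.consume('queue', async (msg) => {
  // Process
  channel.ack(msg);
});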

Circuit Breaker

let failureCount = 0;
const threshold = 5;

await channel.consume('queue', async (msg) => {
  try {
    await processMessage(msg);
    failureCount = 0;
    channel.ack(msg);
  } catch (error) {
    failureCount++;
    channel.nack(msg, false, true);  // Requeue so the message is not lost

    if (failureCount >= threshold) {
      console.error('Circuit open, stopping consumption');
      await channel.cancel(msg.fields.consumerTag);
    }
  }
});
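The snippet above only ever opens the circuit. A fuller breaker also needs a way to close again. The sketch below tracks the state separately from the consumer so it can be tested without a broker; the class name, the `coolDownMs` option, and the half-open step are assumptions, not part of the source.

```javascript
// Hypothetical circuit breaker: closed -> open after N consecutive failures,
// half-open once the cool-down has passed, closed again after a success.
class CircuitBreaker {
  constructor({ threshold = 5, coolDownMs = 30000, now = Date.now } = {}) {
    this.threshold = threshold;
    this.coolDownMs = coolDownMs;
    this.now = now;        // injectable clock, handy for tests
    this.failures = 0;
    this.openedAt = null;  // timestamp of the last trip, or null when closed
  }

  get state() {
    if (this.openedAt === null) return 'closed';
    return this.now() - this.openedAt >= this.coolDownMs ? 'half-open' : 'open';
  }

  recordSuccess() {
    this.failures = 0;
    this.openedAt = null;
  }

  recordFailure() {
    this.failures++;
    // A failure at or past the threshold (including one in half-open) restarts the cool-down
    if (this.failures >= this.threshold) this.openedAt = this.now();
  }
}
```

A consumer could cancel its subscription when `state` becomes `'open'` and call `channel.consume` again once it reports `'half-open'`.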

Resources

Source

https://github.com/othmane55/claude-collective-intelligence/blob/main/skills/rabbitmq-ops/SKILL.md

Overview

Manage RabbitMQ in multi-agent orchestration systems. Create durable queues, configure fanout or topic exchanges, and publish or consume messages with proper acknowledgments. Enables work queues, pub/sub, and topic routing for distributed messaging.

How This Skill Works

Connect with a RabbitMQClient, declare queues and exchanges with helpers such as setupTaskQueue, setupBrainstormExchange, and setupStatusExchange, and publish messages to them. Consume messages with consumeTasks and settle each one with ack, nack, or reject. Persistence, prefetch, TTL, dead-lettering, and priorities control how messages are stored and delivered, while auto-reconnect, heartbeats, and connection pooling keep the connection itself robust.

When to Use It

  • Building a task processing pipeline with multiple workers (work queue) using agent.tasks
  • Broadcasting events to multiple consumers via a brainstorming fanout exchange
  • Selective routing of status updates with a topic-based exchange and routing keys
  • Ensuring reliability with durable queues and persistent messages, plus message TTL (x-message-ttl) and dead-letter exchanges for expired or failed messages
  • High-throughput scenarios requiring connection pooling and auto-reconnect

Quick Start

  1. Create a RabbitMQClient and connect to amqp://localhost:5672
  2. Set up queues and exchanges (e.g., setupTaskQueue, setupBrainstormExchange) and publish a task with publishTask
  3. Consume tasks with consumeTasks and acknowledge (ack) after processing

Best Practices

  • Use durable queues and persistent messages to survive broker restarts
  • Tune prefetch to balance load and achieve fair dispatch across workers
  • Configure dead-letter exchanges and routing keys for failed messages
  • Choose fanout for pub/sub and topic for selective routing in exchanges
  • Prefer manual acks and handle nack/reject to maintain processing integrity

Example Use Cases

  • Work queue where multiple agents consume from agent.tasks and each message goes to a single worker
  • Brainstorm exchange used to broadcast events to all bound queues and consumers
  • Status updates routed with agent.status.* patterns on a topic exchange
  • Messages directed to a dead-letter queue on failure to enable retries or inspection
  • Priority queues handling urgent tasks by assigning high priority to critical messages
