RabbitMQ Integration Example
RabbitMQ is a robust, enterprise-grade message broker that provides reliable message queuing with advanced routing capabilities. Kincir provides seamless integration with RabbitMQ through its unified interface.
Prerequisites
Before running these examples, ensure you have RabbitMQ installed and running:
# Using Docker
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# Or install locally (Ubuntu/Debian)
sudo apt-get install rabbitmq-server
sudo systemctl start rabbitmq-server
Basic Usage
Simple Publisher-Subscriber
use kincir::rabbitmq::{RabbitMQPublisher, RabbitMQSubscriber};
use kincir::{Publisher, Subscriber, Message};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Create RabbitMQ publisher and subscriber
let publisher = RabbitMQPublisher::new("amqp://localhost:5672");
let mut subscriber = RabbitMQSubscriber::new("amqp://localhost:5672", "my-queue");
// Subscribe to an exchange
subscriber.subscribe("user-events").await?;
// Publish a message
let message = Message::new(b"User registered: john@example.com".to_vec())
.with_metadata("event_type", "user_registration")
.with_metadata("user_id", "12345")
.with_metadata("timestamp", &chrono::Utc::now().to_rfc3339());
publisher.publish("user-events", vec![message]).await?;
// Receive and process the message
let received = subscriber.receive().await?;
println!("Received: {:?}", String::from_utf8_lossy(&received.payload));
println!("Event type: {:?}", received.get_metadata("event_type"));
Ok(())
}
Message Acknowledgments
RabbitMQ supports message acknowledgments for reliable processing:
use kincir::rabbitmq::RabbitMQAckSubscriber;
use kincir::{AckSubscriber, Message};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut subscriber = RabbitMQAckSubscriber::new("amqp://localhost:5672", "orders-queue");
subscriber.subscribe("orders").await?;
loop {
// Receive message with acknowledgment handle
let (message, ack_handle) = subscriber.receive_with_ack().await?;
// Process the message
match process_order(&message).await {
Ok(()) => {
println!("Order processed successfully");
// Acknowledge successful processing
ack_handle.ack().await?;
}
Err(e) => {
eprintln!("Failed to process order: {}", e);
// Reject and requeue the message
ack_handle.nack(true).await?;
}
}
}
}
async fn process_order(message: &Message) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Simulate order processing
println!("Processing order: {:?}", String::from_utf8_lossy(&message.payload));
// Simulate potential failure
if message.get_metadata("order_id") == Some("FAIL") {
return Err("Simulated processing failure".into());
}
// Simulate processing time
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
Ok(())
}
Advanced Configuration
Custom RabbitMQ Configuration
use kincir::rabbitmq::{RabbitMQConfig, RabbitMQPublisher, RabbitMQSubscriber};
use kincir::{Publisher, Subscriber, Message};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Create custom configuration
let config = RabbitMQConfig {
connection_url: "amqp://user:password@localhost:5672/vhost".to_string(),
exchange_name: "my-exchange".to_string(),
exchange_type: "topic".to_string(),
durable: true,
auto_delete: false,
routing_key: Some("events.user.*".to_string()),
queue_options: Some(QueueOptions {
durable: true,
exclusive: false,
auto_delete: false,
arguments: HashMap::new(),
}),
};
// Create publisher and subscriber with custom config
let publisher = RabbitMQPublisher::with_config(config.clone());
let mut subscriber = RabbitMQSubscriber::with_config(config, "user-events-queue");
// Use with routing keys
subscriber.subscribe("events.user.registered").await?;
let message = Message::new(b"User John registered".to_vec());
publisher.publish_with_routing_key("events.user.registered", vec![message]).await?;
let received = subscriber.receive().await?;
println!("Received: {:?}", String::from_utf8_lossy(&received.payload));
Ok(())
}
Topic-Based Routing
use kincir::rabbitmq::{RabbitMQPublisher, RabbitMQSubscriber};
use kincir::{Publisher, Subscriber, Message};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let publisher = RabbitMQPublisher::new("amqp://localhost:5672");
// Create subscribers for different routing patterns
let mut user_subscriber = RabbitMQSubscriber::new("amqp://localhost:5672", "user-events");
let mut order_subscriber = RabbitMQSubscriber::new("amqp://localhost:5672", "order-events");
let mut all_subscriber = RabbitMQSubscriber::new("amqp://localhost:5672", "all-events");
// Subscribe to different routing patterns
user_subscriber.subscribe("events.user.*").await?;
order_subscriber.subscribe("events.order.*").await?;
all_subscriber.subscribe("events.*").await?;
// Publish messages with different routing keys
let user_message = Message::new(b"User registered".to_vec());
let order_message = Message::new(b"Order placed".to_vec());
let system_message = Message::new(b"System maintenance".to_vec());
publisher.publish_with_routing_key("events.user.registered", vec![user_message]).await?;
publisher.publish_with_routing_key("events.order.placed", vec![order_message]).await?;
publisher.publish_with_routing_key("events.system.maintenance", vec![system_message]).await?;
// Each subscriber will receive relevant messages
println!("User subscriber: {:?}", user_subscriber.receive().await?);
println!("Order subscriber: {:?}", order_subscriber.receive().await?);
// All subscriber receives all messages
for _ in 0..3 {
println!("All subscriber: {:?}", all_subscriber.receive().await?);
}
Ok(())
}
Work Queue Pattern
use kincir::rabbitmq::{RabbitMQPublisher, RabbitMQSubscriber};
use kincir::{Publisher, Subscriber, Message};
use std::sync::Arc;
use tokio::task;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let publisher = RabbitMQPublisher::new("amqp://localhost:5672");
// Create multiple workers
let mut worker_handles = vec![];
for worker_id in 0..3 {
let handle = task::spawn(async move {
let mut subscriber = RabbitMQSubscriber::new("amqp://localhost:5672", "work-queue");
subscriber.subscribe("tasks").await.unwrap();
loop {
match subscriber.receive().await {
Ok(message) => {
let task_data = String::from_utf8_lossy(&message.payload);
println!("Worker {} processing: {}", worker_id, task_data);
// Simulate work
let work_duration = message.get_metadata("duration")
.and_then(|d| d.parse::<u64>().ok())
.unwrap_or(1000);
tokio::time::sleep(tokio::time::Duration::from_millis(work_duration)).await;
println!("Worker {} completed: {}", worker_id, task_data);
}
Err(e) => {
eprintln!("Worker {} error: {}", worker_id, e);
break;
}
}
}
});
worker_handles.push(handle);
}
// Publish work tasks
for i in 0..10 {
let task = Message::new(format!("Task #{}", i).into_bytes())
.with_metadata("task_id", &i.to_string())
.with_metadata("duration", &(500 + i * 100).to_string()); // Varying work duration
publisher.publish("tasks", vec![task]).await?;
println!("Published task #{}", i);
}
// Let workers process for a while
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
// Cancel workers
for handle in worker_handles {
handle.abort();
}
Ok(())
}
RPC Pattern
use kincir::rabbitmq::{RabbitMQPublisher, RabbitMQSubscriber};
use kincir::{Publisher, Subscriber, Message};
use uuid::Uuid;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// RPC Server
let server_handle = tokio::spawn(async {
let mut subscriber = RabbitMQSubscriber::new("amqp://localhost:5672", "rpc-requests");
let publisher = RabbitMQPublisher::new("amqp://localhost:5672");
subscriber.subscribe("rpc_queue").await.unwrap();
loop {
match subscriber.receive().await {
Ok(request) => {
let request_data = String::from_utf8_lossy(&request.payload);
println!("RPC Server received: {}", request_data);
// Process request (simulate calculation)
let result = if let Some(num_str) = request.get_metadata("number") {
num_str.parse::<i32>().unwrap_or(0) * 2
} else {
0
};
// Send response
if let Some(reply_to) = request.get_metadata("reply_to") {
if let Some(correlation_id) = request.get_metadata("correlation_id") {
let response = Message::new(result.to_string().into_bytes())
.with_metadata("correlation_id", correlation_id);
publisher.publish(reply_to, vec![response]).await.unwrap();
}
}
}
Err(e) => {
eprintln!("RPC Server error: {}", e);
break;
}
}
}
});
// RPC Client
let client_handle = tokio::spawn(async {
let publisher = RabbitMQPublisher::new("amqp://localhost:5672");
let mut subscriber = RabbitMQSubscriber::new("amqp://localhost:5672", "rpc-responses");
let reply_queue = "rpc_reply_queue";
subscriber.subscribe(reply_queue).await.unwrap();
// Send RPC requests
for i in 1..=5 {
let correlation_id = Uuid::new_v4().to_string();
let request = Message::new(format!("Calculate double of {}", i).into_bytes())
.with_metadata("number", &i.to_string())
.with_metadata("reply_to", reply_queue)
.with_metadata("correlation_id", &correlation_id);
publisher.publish("rpc_queue", vec![request]).await.unwrap();
println!("RPC Client sent request for number: {}", i);
// Wait for response
let response = subscriber.receive().await.unwrap();
if response.get_metadata("correlation_id") == Some(&correlation_id) {
let result = String::from_utf8_lossy(&response.payload);
println!("RPC Client received result: {}", result);
}
}
});
// Wait for both client and server
tokio::select! {
_ = server_handle => {},
_ = client_handle => {},
}
Ok(())
}
Error Handling and Reconnection
use kincir::rabbitmq::{RabbitMQPublisher, RabbitMQSubscriber};
use kincir::{Publisher, Subscriber, Message, KincirError};
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut retry_count = 0;
let max_retries = 5;
loop {
match run_rabbitmq_client().await {
Ok(()) => {
println!("RabbitMQ client completed successfully");
break;
}
Err(KincirError::ConnectionError(e)) => {
retry_count += 1;
if retry_count >= max_retries {
eprintln!("Max retries reached. Giving up. Error: {}", e);
return Err(e.into());
}
println!("Connection failed (attempt {}). Retrying in 5 seconds...", retry_count);
sleep(Duration::from_secs(5)).await;
}
Err(e) => {
eprintln!("Non-recoverable error: {}", e);
return Err(e.into());
}
}
}
Ok(())
}
async fn run_rabbitmq_client() -> Result<(), KincirError> {
let publisher = RabbitMQPublisher::new("amqp://localhost:5672");
let mut subscriber = RabbitMQSubscriber::new("amqp://localhost:5672", "test-queue");
// This will fail if RabbitMQ is not running
subscriber.subscribe("test-exchange").await?;
let message = Message::new(b"Test message".to_vec());
publisher.publish("test-exchange", vec![message]).await?;
let received = subscriber.receive().await?;
println!("Received: {:?}", String::from_utf8_lossy(&received.payload));
Ok(())
}
Performance Considerations
Connection Pooling
use kincir::rabbitmq::{RabbitMQConnectionPool, RabbitMQPublisher};
use kincir::{Publisher, Message};
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Create connection pool for better performance
let pool = Arc::new(RabbitMQConnectionPool::new("amqp://localhost:5672", 10).await?);
// Create multiple publishers sharing the pool
let mut handles = vec![];
for i in 0..5 {
let pool_clone = pool.clone();
let handle = tokio::spawn(async move {
let publisher = RabbitMQPublisher::with_pool(pool_clone);
for j in 0..100 {
let message = Message::new(format!("Message {}-{}", i, j).into_bytes());
publisher.publish("high-throughput", vec![message]).await.unwrap();
}
});
handles.push(handle);
}
// Wait for all publishers to complete
for handle in handles {
handle.await?;
}
println!("Published 500 messages using connection pool");
Ok(())
}
Testing with RabbitMQ
#[cfg(test)]
mod tests {
use super::*;
use testcontainers::{clients::Cli, images::rabbitmq::RabbitMq, Container};
#[tokio::test]
async fn test_rabbitmq_integration() {
// Start RabbitMQ container for testing
let docker = Cli::default();
let rabbitmq_container = docker.run(RabbitMq::default());
let connection_string = format!(
"amqp://guest:guest@127.0.0.1:{}",
rabbitmq_container.get_host_port_ipv4(5672)
);
let publisher = RabbitMQPublisher::new(&connection_string);
let mut subscriber = RabbitMQSubscriber::new(&connection_string, "test-queue");
subscriber.subscribe("test-exchange").await.unwrap();
let message = Message::new(b"integration test message".to_vec());
publisher.publish("test-exchange", vec![message]).await.unwrap();
let received = subscriber.receive().await.unwrap();
assert_eq!(received.payload, b"integration test message");
}
}
Next Steps
- Kafka Integration - High-throughput streaming
- MQTT Support - IoT messaging
- Message Acknowledgments - Reliable processing
- Microservices Communication - Service integration