In-Memory Message Broker

The in-memory broker is a lightweight, high-performance message broker implementation that runs entirely in memory. It suits testing, development, and lightweight production scenarios where external broker dependencies are not desired.

Table of contents

  1. Overview
  2. Quick Start
    1. Basic Usage
  3. Configuration
    1. Basic Configuration
    2. Pre-configured Profiles
    3. Configuration Options
  4. Advanced Features
    1. Message Ordering
    2. Message Time-To-Live (TTL)
    3. Health Monitoring
    4. Statistics Collection
    5. Graceful Shutdown
  5. Error Handling
  6. Performance Characteristics
    1. Benchmarks
  7. Testing Integration
  8. Migration from External Brokers
  9. Best Practices
    1. For Development
    2. For Production
    3. For High Throughput
  10. Limitations
  11. API Reference
    1. InMemoryBroker
    2. InMemoryPublisher
    3. InMemorySubscriber
    4. InMemoryConfig
    5. InMemoryError

Overview

The in-memory broker is a complete implementation of Kincir’s Publisher and Subscriber traits that operates entirely within your application’s memory space.

Quick Start

Add Kincir to your Cargo.toml:

[dependencies]
kincir = "0.1.6"
tokio = { version = "1.0", features = ["full"] }

Basic Usage

use kincir::memory::{InMemoryBroker, InMemoryPublisher, InMemorySubscriber, InMemoryConfig};
use kincir::{Publisher, Subscriber, Message};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create broker with default configuration
    let broker = Arc::new(InMemoryBroker::with_default_config());
    
    // Create publisher and subscriber
    let publisher = InMemoryPublisher::new(broker.clone());
    let mut subscriber = InMemorySubscriber::new(broker.clone());
    
    // Subscribe to a topic
    subscriber.subscribe("orders").await?;
    
    // Publish a message
    let message = Message::new(b"Order #1234".to_vec())
        .with_metadata("customer", "Alice")
        .with_metadata("priority", "high");
    
    publisher.publish("orders", vec![message]).await?;
    
    // Receive the message
    let received = subscriber.receive().await?;
    println!("Received: {}", String::from_utf8_lossy(&received.payload));
    
    Ok(())
}

Configuration

The in-memory broker supports extensive configuration through InMemoryConfig:

Basic Configuration

use kincir::memory::InMemoryConfig;
use std::time::Duration;

let config = InMemoryConfig::new()
    .with_max_queue_size(Some(1000))      // Limit queue size per topic
    .with_max_topics(Some(100))           // Limit number of topics
    .with_stats(true)                     // Enable statistics collection
    .with_maintain_order(true)            // Enable message ordering
    .with_message_ttl(Some(Duration::from_secs(300))); // 5-minute TTL

Pre-configured Profiles

// High-performance configuration
let config = InMemoryConfig::high_performance();

// Testing configuration (optimized for tests)
let config = InMemoryConfig::for_testing();

// Default configuration
let config = InMemoryConfig::default();

Configuration Options

| Option              | Type             | Default | Description                          |
|---------------------|------------------|---------|--------------------------------------|
| max_queue_size      | Option<usize>    | None    | Maximum messages per topic queue     |
| max_topics          | Option<usize>    | None    | Maximum number of topics             |
| enable_persistence  | bool             | true    | Enable message persistence in queues |
| maintain_order      | bool             | true    | Maintain message ordering            |
| enable_stats        | bool             | false   | Enable statistics collection         |
| default_message_ttl | Option<Duration> | None    | Default message time-to-live         |
| cleanup_interval    | Duration         | 60s     | TTL cleanup task interval            |

Advanced Features

Message Ordering

When maintain_order is enabled, messages are automatically assigned sequence numbers:

let config = InMemoryConfig::new().with_maintain_order(true);
let broker = Arc::new(InMemoryBroker::new(config));

// Messages will have _sequence metadata: "1", "2", "3", etc.
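
To make the sequence-number idea concrete, here is a minimal, std-only sketch of how a broker could stamp each message with a `_sequence` metadata entry. `SketchMessage` and `Sequencer` are hypothetical names invented for this illustration; they are not Kincir types, and Kincir's internal mechanism may differ.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical stand-in for a message; not Kincir's actual `Message` type.
#[derive(Debug, Default)]
struct SketchMessage {
    payload: Vec<u8>,
    metadata: HashMap<String, String>,
}

// Hypothetical sequencer that hands out 1, 2, 3, ... in publish order.
struct Sequencer {
    next: AtomicU64,
}

impl Sequencer {
    fn new() -> Self {
        Sequencer { next: AtomicU64::new(1) }
    }

    // Stamp the message with the next sequence number under `_sequence`.
    fn stamp(&self, mut msg: SketchMessage) -> SketchMessage {
        let seq = self.next.fetch_add(1, Ordering::SeqCst);
        msg.metadata.insert("_sequence".to_string(), seq.to_string());
        msg
    }
}

fn main() {
    let sequencer = Sequencer::new();
    for body in ["a", "b", "c"] {
        let msg = sequencer.stamp(SketchMessage {
            payload: body.as_bytes().to_vec(),
            metadata: HashMap::new(),
        });
        println!(
            "{} -> _sequence = {}",
            String::from_utf8_lossy(&msg.payload),
            msg.metadata["_sequence"]
        );
    }
}
```

An atomic counter keeps the numbering gap-free even when several publishers share the sequencer, which is one plausible reason to tie ordering to a single broker-side counter.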

Message Time-To-Live (TTL)

Configure automatic message expiration:

use std::time::Duration;

let config = InMemoryConfig::new()
    .with_message_ttl(Some(Duration::from_secs(300))) // 5 minutes
    .with_cleanup_interval(Duration::from_secs(60));  // Check every minute

let broker = Arc::new(InMemoryBroker::new(config));
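
The interplay between the TTL and the cleanup interval can be sketched with the standard library alone. The example below assumes each queued entry records when it was enqueued and that a periodic task removes entries whose age has reached the TTL. `Entry` and `purge_expired` are hypothetical names for this illustration, not part of Kincir.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

// Hypothetical queued entry: payload plus the instant it was enqueued.
struct Entry {
    payload: Vec<u8>,
    enqueued_at: Instant,
}

// Drop every entry whose age has reached `ttl` as of `now`;
// return how many entries were removed.
fn purge_expired(queue: &mut VecDeque<Entry>, ttl: Duration, now: Instant) -> usize {
    let before = queue.len();
    queue.retain(|e| now.duration_since(e.enqueued_at) < ttl);
    before - queue.len()
}

fn main() {
    let start = Instant::now();
    let ttl = Duration::from_secs(300);

    let mut queue = VecDeque::new();
    queue.push_back(Entry { payload: b"old".to_vec(), enqueued_at: start });
    queue.push_back(Entry {
        payload: b"new".to_vec(),
        enqueued_at: start + Duration::from_secs(200),
    });

    // Pretend the cleanup task runs 301 seconds after `start`.
    let removed = purge_expired(&mut queue, ttl, start + Duration::from_secs(301));
    println!("removed {}, remaining {}", removed, queue.len());
    for e in &queue {
        println!("kept: {}", String::from_utf8_lossy(&e.payload));
    }
}
```

Under this model an expired message can linger for up to one cleanup interval before it is removed, so a shorter interval trades a little background work for tighter expiry.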

Health Monitoring

Monitor broker health and performance:

// Get broker health information
let health = broker.health_check();
println!("Healthy: {}", health.is_healthy);
println!("Topics: {}", health.topic_count);
println!("Queued messages: {}", health.total_queued_messages);
println!("Memory usage: {} bytes", health.memory_usage_estimate);

// Get detailed topic information
let topics = broker.list_topic_info();
for topic in topics {
    println!("Topic: {}, Messages: {}, Age: {:?}", 
             topic.name, topic.queue_size, topic.age());
}

Statistics Collection

Enable comprehensive statistics:

let config = InMemoryConfig::new().with_stats(true);
let broker = Arc::new(InMemoryBroker::new(config));
let publisher = InMemoryPublisher::new(broker.clone());

// Publish some messages...

// Get statistics
if let Some(stats) = publisher.stats() {
    println!("Messages published: {}", stats.messages_published());
    println!("Messages consumed: {}", stats.messages_consumed());
    println!("Active topics: {}", stats.active_topics());
    println!("Uptime: {:?}", stats.uptime());
}

Graceful Shutdown

Shut down the broker properly:

// Graceful shutdown (waits for in-flight operations)
broker.shutdown()?;

// Force shutdown (immediate)
broker.force_shutdown()?;

Error Handling

The in-memory broker provides comprehensive error handling:

use kincir::memory::InMemoryError;

match publisher.publish("topic", messages).await {
    Ok(()) => println!("Published successfully"),
    Err(InMemoryError::QueueFull { topic }) => {
        println!("Queue full for topic: {}", topic);
    },
    Err(InMemoryError::MaxTopicsReached { limit }) => {
        println!("Cannot create more topics, limit: {}", limit);
    },
    Err(InMemoryError::BrokerShutdown) => {
        println!("Broker is shut down");
    },
    Err(e) => println!("Other error: {}", e),
}
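
To show where errors such as QueueFull and MaxTopicsReached could come from, the following std-only sketch enforces the two limits from the configuration section on a toy broker. `SketchError` and `SketchBroker` are hypothetical and only mirror the variant names used above; the real InMemoryError may carry different fields and be raised under different conditions.

```rust
use std::collections::{HashMap, VecDeque};
use std::fmt;

// Hypothetical error type mirroring two of the variant names used above.
#[derive(Debug, PartialEq)]
enum SketchError {
    QueueFull { topic: String },
    MaxTopicsReached { limit: usize },
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SketchError::QueueFull { topic } => write!(f, "queue full for topic: {}", topic),
            SketchError::MaxTopicsReached { limit } => write!(f, "topic limit reached: {}", limit),
        }
    }
}

impl std::error::Error for SketchError {}

// Hypothetical toy broker: one bounded queue per topic.
struct SketchBroker {
    max_queue_size: Option<usize>,
    max_topics: Option<usize>,
    queues: HashMap<String, VecDeque<Vec<u8>>>,
}

impl SketchBroker {
    fn new(max_queue_size: Option<usize>, max_topics: Option<usize>) -> Self {
        SketchBroker { max_queue_size, max_topics, queues: HashMap::new() }
    }

    fn publish(&mut self, topic: &str, payload: Vec<u8>) -> Result<(), SketchError> {
        // Refuse to create a new topic once the topic limit is reached.
        if !self.queues.contains_key(topic) {
            if let Some(limit) = self.max_topics {
                if self.queues.len() >= limit {
                    return Err(SketchError::MaxTopicsReached { limit });
                }
            }
        }
        let queue = self.queues.entry(topic.to_string()).or_default();
        // Refuse to enqueue once the per-topic queue is at capacity.
        if let Some(limit) = self.max_queue_size {
            if queue.len() >= limit {
                return Err(SketchError::QueueFull { topic: topic.to_string() });
            }
        }
        queue.push_back(payload);
        Ok(())
    }
}

fn main() {
    let mut broker = SketchBroker::new(Some(1), Some(1));
    for (topic, body) in [("orders", "first"), ("orders", "second"), ("billing", "third")] {
        match broker.publish(topic, body.as_bytes().to_vec()) {
            Ok(()) => println!("accepted {} on {}", body, topic),
            Err(e) => println!("rejected {} on {}: {}", body, topic, e),
        }
    }
}
```

Carrying the topic name or the limit inside the error variant lets callers log or react without consulting the broker again, which matches how the match arms above destructure `topic` and `limit`.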

Performance Characteristics

The in-memory broker is optimized for high performance.

Benchmarks

Typical performance on modern hardware:

| Operation      | Latency   | Throughput    |
|----------------|-----------|---------------|
| Publish        | < 0.1ms   | 10,000+ msg/s |
| Subscribe      | < 0.1ms   | 10,000+ msg/s |
| Topic Creation | < 0.05ms  | 20,000+ ops/s |
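
Figures like these depend heavily on hardware and workload, so it can help to measure on your own machine. The sketch below shows one way to derive throughput and mean latency from a timed loop using only the standard library. It times a plain VecDeque as a stand-in, so its numbers say nothing about the broker itself; to benchmark Kincir, the loop body would be replaced with real publish calls. `measure_enqueue` is a hypothetical helper.

```rust
use std::collections::VecDeque;
use std::hint::black_box;
use std::time::{Duration, Instant};

// Time `n` enqueue operations on a plain VecDeque (a stand-in, not the broker)
// and return the elapsed time together with the derived operations per second.
fn measure_enqueue(n: usize) -> (Duration, f64) {
    let mut queue: VecDeque<Vec<u8>> = VecDeque::with_capacity(n);
    let start = Instant::now();
    for i in 0..n {
        // black_box discourages the optimizer from removing the work.
        queue.push_back(black_box(format!("message {}", i).into_bytes()));
    }
    let elapsed = start.elapsed();
    black_box(&queue);
    let per_second = n as f64 / elapsed.as_secs_f64();
    (elapsed, per_second)
}

fn main() {
    let n = 100_000;
    let (elapsed, per_second) = measure_enqueue(n);
    let mean_latency_ms = elapsed.as_secs_f64() * 1000.0 / n as f64;
    println!("{} enqueues in {:?}", n, elapsed);
    println!(
        "throughput: {:.0} ops/s, mean latency: {:.6} ms",
        per_second, mean_latency_ms
    );
}
```

Build with optimizations (for example `cargo run --release`) before comparing results against the table, since debug builds can be much slower.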

Testing Integration

The in-memory broker needs no external services, which makes it a good fit for unit and integration tests:

#[cfg(test)]
mod tests {
    use super::*;
    use kincir::memory::{InMemoryBroker, InMemoryPublisher, InMemorySubscriber, InMemoryConfig};
    use kincir::{Message, Publisher, Subscriber};
    use std::sync::Arc;
    
    #[tokio::test]
    async fn test_message_flow() {
        let broker = Arc::new(InMemoryBroker::new(InMemoryConfig::for_testing()));
        let publisher = InMemoryPublisher::new(broker.clone());
        let mut subscriber = InMemorySubscriber::new(broker.clone());
        
        subscriber.subscribe("test").await.unwrap();
        
        let message = Message::new(b"test data".to_vec());
        publisher.publish("test", vec![message.clone()]).await.unwrap();
        
        let received = subscriber.receive().await.unwrap();
        assert_eq!(received.payload, message.payload);
    }
}

Migration from External Brokers

The in-memory broker implements the same Publisher and Subscriber traits as other Kincir backends, making migration seamless:

// Before: Using Kafka
let publisher = KafkaPublisher::new("localhost:9092");

// After: Using in-memory broker
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = InMemoryPublisher::new(broker);

// Same API: the publish call itself needs no changes
publisher.publish("topic", messages).await?;
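
The reason a backend swap stays local is that application code can depend on the trait rather than on a concrete publisher. The sketch below illustrates that pattern with a simplified, synchronous stand-in trait. `SketchPublisher`, `RemoteBackend`, `MemoryBackend`, and `place_order` are all hypothetical; Kincir's real Publisher trait is async and its signature differs.

```rust
use std::collections::VecDeque;

// Hypothetical, synchronous stand-in for a publisher trait.
trait SketchPublisher {
    fn publish(&mut self, topic: &str, payloads: Vec<Vec<u8>>) -> Result<(), String>;
}

// Stand-in for an external backend: only counts what it would send.
#[derive(Default)]
struct RemoteBackend {
    sent_count: usize,
}

impl SketchPublisher for RemoteBackend {
    fn publish(&mut self, _topic: &str, payloads: Vec<Vec<u8>>) -> Result<(), String> {
        self.sent_count += payloads.len();
        Ok(())
    }
}

// Stand-in for an in-memory backend: keeps messages in a local queue.
#[derive(Default)]
struct MemoryBackend {
    queue: VecDeque<(String, Vec<u8>)>,
}

impl SketchPublisher for MemoryBackend {
    fn publish(&mut self, topic: &str, payloads: Vec<Vec<u8>>) -> Result<(), String> {
        for payload in payloads {
            self.queue.push_back((topic.to_string(), payload));
        }
        Ok(())
    }
}

// Application code depends only on the trait, so either backend works here.
fn place_order<P: SketchPublisher>(publisher: &mut P, order_id: u32) -> Result<(), String> {
    let payload = format!("Order #{}", order_id).into_bytes();
    publisher.publish("orders", vec![payload])
}

fn main() -> Result<(), String> {
    let mut remote = RemoteBackend::default();
    place_order(&mut remote, 1234)?;
    println!("remote backend counted {} message(s)", remote.sent_count);

    let mut memory = MemoryBackend::default();
    place_order(&mut memory, 1234)?;
    println!("memory backend holds {} message(s)", memory.queue.len());
    Ok(())
}
```

With this structure only the construction site changes when switching backends; functions like `place_order` stay untouched.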

Best Practices

For Development

For Production

For High Throughput

Limitations

While the in-memory broker is feature-rich, it keeps all state in the memory of a single application process. For scenarios requiring persistence or distribution, consider using the Kafka or RabbitMQ backends.

API Reference

InMemoryBroker

Core broker implementation with topic management and health monitoring.

InMemoryPublisher

Publisher implementation for sending messages to topics.

InMemorySubscriber

Subscriber implementation for receiving messages from topics.

InMemoryConfig

Configuration builder for customizing broker behavior.

InMemoryError

Error types for comprehensive error handling.

For detailed API documentation, see the Rust docs.