Performance Optimization in Kincir
This guide covers techniques and best practices for optimizing Kincir performance in high-load production scenarios.
Table of Contents
- Benchmarking Basics
- In-Memory Broker Optimization
- Batch Processing
- Connection Pooling
- Memory Management
- Async Optimization
- Monitoring Performance
Benchmarking Basics
Simple Throughput Test
use kincir::{Publisher, Subscriber, Message};
use kincir::memory::{InMemoryBroker, InMemoryPublisher, InMemorySubscriber};
use std::sync::Arc;
use std::time::Instant;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let broker = Arc::new(InMemoryBroker::with_default_config());
let publisher = InMemoryPublisher::new(broker.clone());
let mut subscriber = InMemorySubscriber::new(broker);
subscriber.subscribe("benchmark").await?;
let message_count = 100_000;
let message_size = 1024; // 1KB messages
// Create test messages
let messages: Vec<Message> = (0..message_count)
.map(|i| {
let payload = vec![0u8; message_size];
Message::new(payload).with_metadata("id", &i.to_string())
})
.collect();
println!("Starting throughput test with {} messages of {} bytes each",
message_count, message_size);
// Measure publish throughput
let start = Instant::now();
publisher.publish("benchmark", messages).await?;
let publish_duration = start.elapsed();
let publish_throughput = message_count as f64 / publish_duration.as_secs_f64();
println!("Publish throughput: {:.2} messages/second", publish_throughput);
// Measure receive throughput
let start = Instant::now();
for _ in 0..message_count {
let _ = subscriber.receive().await?;
}
let receive_duration = start.elapsed();
let receive_throughput = message_count as f64 / receive_duration.as_secs_f64();
println!("Receive throughput: {:.2} messages/second", receive_throughput);
Ok(())
}
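Throughput numbers alone can hide tail latency. The sketch below, a minimal example assuming the same in-memory API as above, measures per-message round-trip latency and reports rough p50/p99 values via a simple sort; for long-running measurements an HDR histogram is a better fit.

use kincir::{Publisher, Subscriber, Message};
use kincir::memory::{InMemoryBroker, InMemoryPublisher, InMemorySubscriber};
use std::sync::Arc;
use std::time::Instant;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let broker = Arc::new(InMemoryBroker::with_default_config());
    let publisher = InMemoryPublisher::new(broker.clone());
    let mut subscriber = InMemorySubscriber::new(broker);
    subscriber.subscribe("latency").await?;

    let samples = 10_000;
    let mut latencies = Vec::with_capacity(samples);
    for i in 0..samples {
        let start = Instant::now();
        let message = Message::new(format!("ping {}", i).into_bytes());
        publisher.publish("latency", vec![message]).await?;
        let _ = subscriber.receive().await?;
        latencies.push(start.elapsed());
    }

    // Sort-based percentiles: fine for a smoke test.
    latencies.sort();
    println!("p50: {:?}", latencies[samples / 2]);
    println!("p99: {:?}", latencies[samples * 99 / 100]);
    Ok(())
}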
In-Memory Broker Optimization
High-Performance Configuration
use kincir::memory::{InMemoryBroker, BrokerConfig};
use std::sync::Arc;
use std::time::Duration;
fn create_optimized_broker() -> Arc<InMemoryBroker> {
let config = BrokerConfig {
max_queue_size: 1_000_000, // Large queue for high throughput
enable_message_ordering: false, // Disable if ordering not needed
enable_ttl: false, // Disable TTL for better performance
enable_health_monitoring: false, // Disable built-in health monitoring if you collect metrics externally
cleanup_interval: Duration::from_secs(300), // Less frequent cleanup
max_subscribers_per_topic: 1000,
enable_statistics: false, // Disable stats collection
};
Arc::new(InMemoryBroker::new(config))
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let broker = create_optimized_broker();
// Use the optimized broker
let publisher = kincir::memory::InMemoryPublisher::new(broker.clone());
let mut subscriber = kincir::memory::InMemorySubscriber::new(broker);
// Your high-performance application logic here
Ok(())
}
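Whether these settings help is workload-dependent, so measure the default and tuned configurations side by side before committing to one. A minimal A/B sketch, assuming the create_optimized_broker helper above:

use kincir::{Publisher, Subscriber, Message};
use kincir::memory::{InMemoryBroker, InMemoryPublisher, InMemorySubscriber};
use std::sync::Arc;
use std::time::Instant;

// Publish and drain `count` messages, returning the elapsed time.
async fn run_workload(
    broker: Arc<InMemoryBroker>,
    count: usize,
) -> Result<std::time::Duration, Box<dyn std::error::Error + Send + Sync>> {
    let publisher = InMemoryPublisher::new(broker.clone());
    let mut subscriber = InMemorySubscriber::new(broker);
    subscriber.subscribe("ab-test").await?;

    let messages: Vec<Message> = (0..count)
        .map(|i| Message::new(i.to_le_bytes().to_vec()))
        .collect();

    let start = Instant::now();
    publisher.publish("ab-test", messages).await?;
    for _ in 0..count {
        let _ = subscriber.receive().await?;
    }
    Ok(start.elapsed())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let default_time = run_workload(Arc::new(InMemoryBroker::with_default_config()), 100_000).await?;
    let tuned_time = run_workload(create_optimized_broker(), 100_000).await?;
    println!("default: {:?}, tuned: {:?}", default_time, tuned_time);
    Ok(())
}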
Concurrent Publishers and Subscribers
use kincir::memory::{InMemoryBroker, InMemoryPublisher, InMemorySubscriber};
use kincir::{Publisher, Subscriber, Message};
use std::sync::Arc;
use tokio::task::JoinSet;
use std::time::Instant;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let broker = Arc::new(InMemoryBroker::with_default_config());
let num_publishers = 10;
let num_subscribers = 5;
let messages_per_publisher = 10_000;
let mut tasks = JoinSet::new();
// Spawn multiple publishers
for publisher_id in 0..num_publishers {
let broker = broker.clone();
tasks.spawn(async move {
let publisher = InMemoryPublisher::new(broker);
let start = Instant::now();
for i in 0..messages_per_publisher {
let message = Message::new(
format!("Publisher {} - Message {}", publisher_id, i).into_bytes()
);
publisher.publish("high-throughput", vec![message]).await?;
}
let duration = start.elapsed();
println!("Publisher {} completed in {:?}", publisher_id, duration);
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
});
}
// Spawn multiple subscribers
for subscriber_id in 0..num_subscribers {
let broker = broker.clone();
tasks.spawn(async move {
let mut subscriber = InMemorySubscriber::new(broker);
subscriber.subscribe("high-throughput").await?;
let mut received_count = 0;
let start = Instant::now();
// Each subscriber processes messages for a fixed duration
let timeout = tokio::time::sleep(std::time::Duration::from_secs(10));
tokio::pin!(timeout);
loop {
tokio::select! {
result = subscriber.receive() => {
match result {
Ok(_) => received_count += 1,
Err(e) => eprintln!("Subscriber {} error: {}", subscriber_id, e),
}
}
_ = &mut timeout => break,
}
}
let duration = start.elapsed();
let throughput = received_count as f64 / duration.as_secs_f64();
println!("Subscriber {} processed {} messages ({:.2} msg/s)",
subscriber_id, received_count, throughput);
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
});
}
// Wait for all tasks to complete
while let Some(result) = tasks.join_next().await {
if let Err(e) = result? {
eprintln!("Task error: {}", e);
}
}
Ok(())
}
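The fixed task counts above are only illustrative. A common starting point is to derive them from the host's core count and then tune from measurements; a std-only sketch:

// Derive starting task counts from available cores; the ratios here
// are illustrative and should be adjusted from measured throughput.
let cores = std::thread::available_parallelism()
    .map(|n| n.get())
    .unwrap_or(4);
let num_publishers = cores;      // drive the broker from every core
let num_subscribers = cores / 2; // leave headroom for processing work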
Batch Processing
Efficient Batch Publishing
use kincir::{Publisher, Message};
use kincir::memory::InMemoryPublisher;
use std::sync::Arc;
use std::time::Instant;
pub struct BatchPublisher {
publisher: InMemoryPublisher,
batch_size: usize,
// Stored for future flush-on-timeout logic; publish_batch below
// currently batches by size only.
batch_timeout: std::time::Duration,
}
impl BatchPublisher {
pub fn new(
broker: Arc<kincir::memory::InMemoryBroker>,
batch_size: usize,
batch_timeout: std::time::Duration,
) -> Self {
Self {
publisher: InMemoryPublisher::new(broker),
batch_size,
batch_timeout,
}
}
pub async fn publish_batch(
&self,
topic: &str,
messages: Vec<Message>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let start = Instant::now();
// Process messages in batches
for chunk in messages.chunks(self.batch_size) {
self.publisher.publish(topic, chunk.to_vec()).await?;
// Optional: brief pause between full batches so downstream
// consumers are not overwhelmed
if chunk.len() == self.batch_size {
tokio::time::sleep(std::time::Duration::from_micros(100)).await;
}
}
let duration = start.elapsed();
println!("Published {} messages in {} batches ({:?})",
messages.len(),
(messages.len() + self.batch_size - 1) / self.batch_size,
duration);
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let broker = Arc::new(kincir::memory::InMemoryBroker::with_default_config());
let batch_publisher = BatchPublisher::new(
broker,
1000, // Batch size
std::time::Duration::from_millis(100), // Batch timeout
);
// Create a large number of messages
let messages: Vec<Message> = (0..50_000)
.map(|i| Message::new(format!("Batch message {}", i).into_bytes()))
.collect();
batch_publisher.publish_batch("batch-topic", messages).await?;
Ok(())
}
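The best batch size depends on payload size and consumer speed, so measure rather than guess. A sketch of a simple sweep over candidate sizes, meant to run inside an async main like the one above (the sizes are illustrative):

// Sweep candidate batch sizes to find the throughput/latency knee.
for batch_size in [100, 500, 1_000, 5_000] {
    let broker = Arc::new(kincir::memory::InMemoryBroker::with_default_config());
    let publisher = BatchPublisher::new(
        broker,
        batch_size,
        std::time::Duration::from_millis(100),
    );
    let messages: Vec<Message> = (0..50_000)
        .map(|i| Message::new(format!("msg {}", i).into_bytes()))
        .collect();
    publisher.publish_batch("sweep", messages).await?;
}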
Batch Message Processing
use kincir::{Subscriber, Message};
use kincir::memory::InMemorySubscriber;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::time::timeout;
pub struct BatchProcessor {
subscriber: InMemorySubscriber,
batch_size: usize,
batch_timeout: Duration,
}
impl BatchProcessor {
pub fn new(
broker: Arc<kincir::memory::InMemoryBroker>,
batch_size: usize,
batch_timeout: Duration,
) -> Self {
Self {
subscriber: InMemorySubscriber::new(broker),
batch_size,
batch_timeout,
}
}
pub async fn subscribe(&mut self, topic: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
self.subscriber.subscribe(topic).await
}
/// Runs until the task is cancelled: batches are flushed by size or age,
/// and the loop itself never returns. Wrap the call in `tokio::select!`
/// with a shutdown signal to stop it cleanly.
pub async fn process_batches<F>(&mut self, mut processor: F) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
F: FnMut(Vec<Message>) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send>>,
{
let mut batch = Vec::with_capacity(self.batch_size);
let mut last_batch_time = Instant::now();
loop {
// Try to receive a message with timeout
let receive_timeout = Duration::from_millis(100);
match timeout(receive_timeout, self.subscriber.receive()).await {
Ok(Ok(message)) => {
batch.push(message);
// Process batch if it's full or timeout exceeded
if batch.len() >= self.batch_size ||
last_batch_time.elapsed() >= self.batch_timeout {
if !batch.is_empty() {
let batch_to_process = std::mem::take(&mut batch);
processor(batch_to_process).await?;
last_batch_time = Instant::now();
}
}
}
Ok(Err(e)) => {
eprintln!("Error receiving message: {}", e);
tokio::time::sleep(Duration::from_millis(100)).await;
}
Err(_) => {
// Timeout - process any pending messages
if !batch.is_empty() && last_batch_time.elapsed() >= self.batch_timeout {
let batch_to_process = std::mem::take(&mut batch);
processor(batch_to_process).await?;
last_batch_time = Instant::now();
}
}
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let broker = Arc::new(kincir::memory::InMemoryBroker::with_default_config());
let mut batch_processor = BatchProcessor::new(
broker,
100, // Process 100 messages at a time
Duration::from_millis(500), // Or every 500ms
);
batch_processor.subscribe("batch-topic").await?;
batch_processor.process_batches(|batch| {
Box::pin(async move {
println!("Processing batch of {} messages", batch.len());
// Simulate batch processing
for (i, message) in batch.iter().enumerate() {
println!(" Message {}: {:?}", i, String::from_utf8_lossy(&message.payload));
}
// Simulate processing time
tokio::time::sleep(Duration::from_millis(10)).await;
Ok(())
})
}).await?;
Ok(())
}
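Because process_batches runs until cancelled, a real service usually races it against a shutdown signal rather than awaiting it directly. In the main above you could instead write the following sketch, which assumes Tokio's signal feature is enabled:

// Run the batch loop until Ctrl-C, then exit cleanly.
tokio::select! {
    result = batch_processor.process_batches(|batch| {
        Box::pin(async move {
            println!("Processing batch of {} messages", batch.len());
            Ok(())
        })
    }) => result?,
    _ = tokio::signal::ctrl_c() => {
        println!("Shutting down batch processor");
    }
}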
Connection Pooling
RabbitMQ Connection Pool
use kincir::rabbitmq::RabbitMQPublisher;
use std::sync::Arc;
use tokio::sync::Semaphore;
use std::collections::VecDeque;
use tokio::sync::Mutex;
pub struct RabbitMQConnectionPool {
publishers: Arc<Mutex<VecDeque<RabbitMQPublisher>>>,
semaphore: Arc<Semaphore>,
connection_string: String,
max_connections: usize,
}
impl RabbitMQConnectionPool {
pub fn new(connection_string: String, max_connections: usize) -> Self {
Self {
publishers: Arc::new(Mutex::new(VecDeque::new())),
semaphore: Arc::new(Semaphore::new(max_connections)),
connection_string,
max_connections,
}
}
pub async fn get_publisher(&self) -> Result<PooledPublisher, Box<dyn std::error::Error + Send + Sync>> {
// Acquire semaphore permit
let permit = self.semaphore.clone().acquire_owned().await?;
// Try to get existing publisher from pool
let mut publishers = self.publishers.lock().await;
if let Some(publisher) = publishers.pop_front() {
return Ok(PooledPublisher {
publisher: Some(publisher),
pool: self.publishers.clone(),
_permit: permit,
});
}
drop(publishers);
// Create a new publisher if the pool is empty. Depending on your Kincir
// version this constructor may be async and fallible; add `.await?` here
// if so.
let publisher = RabbitMQPublisher::new(&self.connection_string);
Ok(PooledPublisher {
publisher: Some(publisher),
pool: self.publishers.clone(),
_permit: permit,
})
}
}
pub struct PooledPublisher {
publisher: Option<RabbitMQPublisher>,
pool: Arc<Mutex<VecDeque<RabbitMQPublisher>>>,
_permit: tokio::sync::OwnedSemaphorePermit,
}
impl PooledPublisher {
pub fn get(&self) -> &RabbitMQPublisher {
self.publisher.as_ref().unwrap()
}
}
impl Drop for PooledPublisher {
fn drop(&mut self) {
if let Some(publisher) = self.publisher.take() {
let pool = self.pool.clone();
// Return the publisher to the pool off the destructor's critical path.
// Note: tokio::spawn requires an active runtime, so dropping a
// PooledPublisher outside a runtime context will panic.
tokio::spawn(async move {
let mut publishers = pool.lock().await;
publishers.push_back(publisher);
});
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let pool = Arc::new(RabbitMQConnectionPool::new(
"amqp://localhost:5672".to_string(),
10, // Max 10 connections
));
// Use the connection pool
let pooled_publisher = pool.get_publisher().await?;
let message = kincir::Message::new(b"Pooled message".to_vec());
pooled_publisher.get().publish("test-topic", vec![message]).await?;
// Publisher is automatically returned to pool when dropped
Ok(())
}
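Under concurrent load the semaphore caps live connections while the queue recycles idle ones. Continuing the main above, a sketch of many tasks sharing the pool (it assumes the types above and a reachable RabbitMQ broker):

use tokio::task::JoinSet;

let mut tasks = JoinSet::new();
for i in 0..100 {
    let pool = pool.clone();
    tasks.spawn(async move {
        // At most 10 publishers exist at once; the rest wait on the semaphore.
        let publisher = pool.get_publisher().await?;
        let message = kincir::Message::new(format!("job {}", i).into_bytes());
        publisher.get().publish("jobs", vec![message]).await?;
        Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
    });
}
while let Some(result) = tasks.join_next().await {
    result??;
}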
Memory Management
Memory-Efficient Message Handling
use kincir::Message;
use std::sync::Arc;
use bytes::Bytes;
// Use Bytes for zero-copy message payloads
pub struct EfficientMessage {
pub uuid: String,
pub payload: Bytes, // Zero-copy byte buffer
pub metadata: std::collections::HashMap<String, String>,
}
impl EfficientMessage {
pub fn new(payload: Bytes) -> Self {
Self {
uuid: uuid::Uuid::new_v4().to_string(),
payload,
metadata: std::collections::HashMap::new(),
}
}
pub fn from_slice(data: &[u8]) -> Self {
Self::new(Bytes::copy_from_slice(data))
}
pub fn from_static(data: &'static [u8]) -> Self {
Self::new(Bytes::from_static(data))
}
}
// Memory pool for message reuse
pub struct MessagePool {
pool: Arc<tokio::sync::Mutex<Vec<Message>>>,
max_size: usize,
}
impl MessagePool {
pub fn new(max_size: usize) -> Self {
Self {
pool: Arc::new(tokio::sync::Mutex::new(Vec::with_capacity(max_size))),
max_size,
}
}
pub async fn get_message(&self, payload: Vec<u8>) -> Message {
let mut pool = self.pool.lock().await;
if let Some(mut message) = pool.pop() {
// Reuse the pooled message shell (retains the metadata map's allocation)
message.payload = payload;
message.metadata.clear();
message.uuid = uuid::Uuid::new_v4().to_string();
message
} else {
// Create new message
Message::new(payload)
}
}
pub async fn return_message(&self, message: Message) {
let mut pool = self.pool.lock().await;
if pool.len() < self.max_size {
pool.push(message);
}
// If pool is full, message is dropped and memory is freed
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let message_pool = Arc::new(MessagePool::new(1000));
// Use the message pool
let message = message_pool.get_message(b"Pooled message".to_vec()).await;
// Process the message...
// Return to pool when done
message_pool.return_message(message).await;
Ok(())
}
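Because Bytes is reference-counted, cloning a payload for fan-out copies a pointer rather than the buffer. A sketch using the EfficientMessage type above:

use bytes::Bytes;

// One 1 MB allocation shared by every clone; Bytes::clone is O(1).
let payload = Bytes::from(vec![0u8; 1024 * 1024]);
let fanout: Vec<EfficientMessage> = (0..100)
    .map(|_| EfficientMessage::new(payload.clone()))
    .collect();
println!("created {} messages over one shared buffer", fanout.len());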
Async Optimization
Efficient Async Patterns
use kincir::Message;
use std::sync::Arc;
use tokio::sync::mpsc;
use futures::stream::{StreamExt, FuturesUnordered};
// Producer-consumer pattern with channels
pub struct AsyncMessageProcessor {
sender: mpsc::Sender<Message>,
receiver: Option<mpsc::Receiver<Message>>,
}
impl AsyncMessageProcessor {
pub fn new(buffer_size: usize) -> Self {
let (sender, receiver) = mpsc::channel(buffer_size);
Self {
sender,
receiver: Some(receiver),
}
}
pub async fn send_message(&self, message: Message) -> Result<(), mpsc::error::SendError<Message>> {
self.sender.send(message).await
}
pub async fn process_messages<F>(&mut self, mut processor: F) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
F: FnMut(Message) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send>>,
{
// The receiver is moved out here, so this method may only be called once.
let mut receiver = self.receiver.take().unwrap();
while let Some(message) = receiver.recv().await {
processor(message).await?;
}
Ok(())
}
}
// Parallel message processing
pub async fn process_messages_parallel<F>(
messages: Vec<Message>,
concurrency: usize,
processor: F,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
F: Fn(Message) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send>> + Send + Sync + 'static,
{
let processor = Arc::new(processor);
let mut futures = FuturesUnordered::new();
let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency));
for message in messages {
let processor = processor.clone();
let semaphore = semaphore.clone();
futures.push(tokio::spawn(async move {
let _permit = semaphore.acquire().await?;
processor(message).await
}));
}
while let Some(result) = futures.next().await {
result??;
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Example: Process 1000 messages with max 10 concurrent tasks
let messages: Vec<Message> = (0..1000)
.map(|i| Message::new(format!("Message {}", i).into_bytes()))
.collect();
process_messages_parallel(
messages,
10, // Max concurrency
|message| {
Box::pin(async move {
// Simulate processing
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
println!("Processed: {:?}", String::from_utf8_lossy(&message.payload));
Ok(())
})
},
).await?;
Ok(())
}
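The same channel-based pattern bridges a Kincir subscriber to worker tasks: one task receives from the broker and forwards over a bounded channel, which gives natural backpressure. A self-contained sketch using the in-memory broker:

use kincir::{Publisher, Subscriber, Message};
use kincir::memory::{InMemoryBroker, InMemoryPublisher, InMemorySubscriber};
use std::sync::Arc;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let broker = Arc::new(InMemoryBroker::with_default_config());
    let publisher = InMemoryPublisher::new(broker.clone());
    let mut subscriber = InMemorySubscriber::new(broker);
    subscriber.subscribe("pipeline").await?;

    // Bounded channel: if the worker falls behind, the bridge task waits,
    // applying backpressure instead of buffering without limit.
    let (tx, mut rx) = mpsc::channel::<Message>(256);

    // Bridge task: broker -> channel.
    let bridge = tokio::spawn(async move {
        for _ in 0..1000 {
            let message = subscriber.receive().await?;
            if tx.send(message).await.is_err() {
                break; // worker side has shut down
            }
        }
        Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
    });

    let messages: Vec<Message> = (0..1000)
        .map(|i| Message::new(format!("event {}", i).into_bytes()))
        .collect();
    publisher.publish("pipeline", messages).await?;

    // Worker: drain until the bridge drops its sender.
    let mut handled = 0;
    while rx.recv().await.is_some() {
        handled += 1;
    }
    bridge.await??;
    println!("handled {} messages", handled);
    Ok(())
}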
Monitoring Performance
Performance Metrics Collection
use kincir::{Publisher, Subscriber, Message};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
pub messages_published: u64,
pub messages_received: u64,
pub total_publish_time: Duration,
pub total_receive_time: Duration,
pub error_count: u64,
pub start_time: Instant,
}
impl PerformanceMetrics {
pub fn new() -> Self {
Self {
messages_published: 0,
messages_received: 0,
total_publish_time: Duration::ZERO,
total_receive_time: Duration::ZERO,
error_count: 0,
start_time: Instant::now(),
}
}
pub fn publish_throughput(&self) -> f64 {
if self.total_publish_time.is_zero() {
0.0
} else {
self.messages_published as f64 / self.total_publish_time.as_secs_f64()
}
}
pub fn receive_throughput(&self) -> f64 {
if self.total_receive_time.is_zero() {
0.0
} else {
self.messages_received as f64 / self.total_receive_time.as_secs_f64()
}
}
pub fn overall_throughput(&self) -> f64 {
let elapsed = self.start_time.elapsed().as_secs_f64();
if elapsed == 0.0 {
0.0
} else {
(self.messages_published + self.messages_received) as f64 / elapsed
}
}
pub fn error_rate(&self) -> f64 {
let total_operations = self.messages_published + self.messages_received;
if total_operations == 0 {
0.0
} else {
self.error_count as f64 / total_operations as f64
}
}
}
pub struct MetricsCollector {
metrics: Arc<RwLock<PerformanceMetrics>>,
}
impl MetricsCollector {
pub fn new() -> Self {
Self {
metrics: Arc::new(RwLock::new(PerformanceMetrics::new())),
}
}
pub async fn record_publish(&self, duration: Duration) {
let mut metrics = self.metrics.write().await;
metrics.messages_published += 1;
metrics.total_publish_time += duration;
}
pub async fn record_receive(&self, duration: Duration) {
let mut metrics = self.metrics.write().await;
metrics.messages_received += 1;
metrics.total_receive_time += duration;
}
pub async fn record_error(&self) {
let mut metrics = self.metrics.write().await;
metrics.error_count += 1;
}
pub async fn get_metrics(&self) -> PerformanceMetrics {
self.metrics.read().await.clone()
}
pub async fn print_summary(&self) {
let metrics = self.get_metrics().await;
println!("=== Performance Summary ===");
println!("Messages Published: {}", metrics.messages_published);
println!("Messages Received: {}", metrics.messages_received);
println!("Publish Throughput: {:.2} msg/s", metrics.publish_throughput());
println!("Receive Throughput: {:.2} msg/s", metrics.receive_throughput());
println!("Overall Throughput: {:.2} msg/s", metrics.overall_throughput());
println!("Error Rate: {:.4}%", metrics.error_rate() * 100.0);
println!("Total Runtime: {:?}", metrics.start_time.elapsed());
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let metrics = Arc::new(MetricsCollector::new());
let broker = Arc::new(kincir::memory::InMemoryBroker::with_default_config());
let publisher = kincir::memory::InMemoryPublisher::new(broker.clone());
let mut subscriber = kincir::memory::InMemorySubscriber::new(broker);
subscriber.subscribe("perf-test").await?;
// Publish messages with timing
for i in 0..1000 {
let start = Instant::now();
let message = Message::new(format!("Message {}", i).into_bytes());
match publisher.publish("perf-test", vec![message]).await {
Ok(_) => {
metrics.record_publish(start.elapsed()).await;
}
Err(_) => {
metrics.record_error().await;
}
}
}
// Receive messages with timing
for _ in 0..1000 {
let start = Instant::now();
match subscriber.receive().await {
Ok(_) => {
metrics.record_receive(start.elapsed()).await;
}
Err(_) => {
metrics.record_error().await;
}
}
}
// Print performance summary
metrics.print_summary().await;
Ok(())
}
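For long-running services, snapshot the collector periodically instead of waiting for shutdown. A sketch of a background reporter task built on the MetricsCollector above:

use std::sync::Arc;
use std::time::Duration;

// Print a metrics summary every 30 seconds until the task is aborted.
fn spawn_metrics_reporter(metrics: Arc<MetricsCollector>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(30));
        loop {
            interval.tick().await;
            metrics.print_summary().await;
        }
    })
}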
Best Practices Summary
- Use appropriate batch sizes - Balance latency vs throughput
- Configure broker settings - Disable unnecessary features for production
- Implement connection pooling - Reuse connections for external brokers
- Monitor memory usage - Use object pools for high-frequency operations
- Optimize async patterns - Use channels and semaphores for concurrency control
- Measure everything - Collect metrics to identify bottlenecks
- Test under load - Benchmark with realistic workloads (a criterion harness sketch follows this list)
- Profile your application - Use tools like perf and flamegraph
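For repeatable numbers, a dedicated harness beats ad-hoc timing loops. A sketch using the criterion crate (an assumed dev-dependency with its async_tokio feature enabled, not part of Kincir):

// benches/publish.rs (hypothetical bench target)
use criterion::{criterion_group, criterion_main, Criterion};
use kincir::{Publisher, Message};
use kincir::memory::{InMemoryBroker, InMemoryPublisher};
use std::sync::Arc;

fn publish_benchmark(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    c.bench_function("publish_1k_messages", |b| {
        b.to_async(&rt).iter(|| async {
            let broker = Arc::new(InMemoryBroker::with_default_config());
            let publisher = InMemoryPublisher::new(broker);
            let messages: Vec<Message> = (0..1_000)
                .map(|i: i32| Message::new(i.to_le_bytes().to_vec()))
                .collect();
            publisher.publish("bench", messages).await.unwrap();
        });
    });
}

criterion_group!(benches, publish_benchmark);
criterion_main!(benches);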
Running Performance Tests
# Clone the repository
git clone https://github.com/rezacute/kincir.git
cd kincir
# Run performance benchmarks
cargo run --release --example performance-benchmark
# Profile with flamegraph (requires cargo-flamegraph)
cargo install flamegraph
cargo flamegraph --example performance-benchmark
# Memory profiling with valgrind (Linux)
valgrind --tool=massif cargo run --release --example performance-benchmark
This performance guide provides the foundation for building high-throughput, low-latency applications with Kincir.