Kafka Integration Example
Apache Kafka is a distributed streaming platform designed for high-throughput, fault-tolerant message streaming. Kincir provides seamless integration with Kafka through its unified Publisher/Subscriber interface.
Prerequisites
Before running these examples, ensure you have Kafka installed and running:
# Using Docker Compose (docker-compose.yml)
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

# Start with: docker-compose up -d
Basic Usage
Simple Producer-Consumer
use kincir::kafka::{KafkaPublisher, KafkaSubscriber};
use kincir::{Publisher, Subscriber, Message};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Create Kafka publisher and subscriber
    let publisher = KafkaPublisher::new("localhost:9092");
    let mut subscriber = KafkaSubscriber::new("localhost:9092", "my-consumer-group");

    // Subscribe to a topic
    subscriber.subscribe("user-events").await?;

    // Publish messages
    let messages = vec![
        Message::new(b"User registered: alice@example.com".to_vec())
            .with_metadata("event_type", "user_registration")
            .with_metadata("user_id", "alice123")
            .with_metadata("timestamp", &chrono::Utc::now().to_rfc3339()),
        Message::new(b"User logged in: alice@example.com".to_vec())
            .with_metadata("event_type", "user_login")
            .with_metadata("user_id", "alice123")
            .with_metadata("ip_address", "192.168.1.100"),
    ];
    publisher.publish("user-events", messages).await?;

    // Consume the two messages we just published
    for _ in 0..2 {
        let message = subscriber.receive().await?;
        println!("Received: {:?}", String::from_utf8_lossy(&message.payload));
        println!("Event type: {:?}", message.get_metadata("event_type"));
    }

    Ok(())
}
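Note that receive() blocks until a message arrives, so a loop like the one above hangs if fewer messages are delivered than expected. A minimal sketch, assuming the same KafkaSubscriber API and that receive() is safe to race against a timer, bounds each receive with tokio::time::timeout:

use kincir::kafka::KafkaSubscriber;
use kincir::Subscriber;
use tokio::time::{timeout, Duration};

// Sketch: stop consuming once the topic goes quiet for 5 seconds,
// instead of blocking forever on the next receive().
async fn drain_with_timeout(
    subscriber: &mut KafkaSubscriber,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    loop {
        match timeout(Duration::from_secs(5), subscriber.receive()).await {
            Ok(Ok(message)) => {
                println!("Received: {}", String::from_utf8_lossy(&message.payload));
            }
            Ok(Err(e)) => return Err(e.into()),
            // No message within 5 seconds: treat the stream as drained.
            Err(_elapsed) => break,
        }
    }
    Ok(())
}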
High-Throughput Publishing
use kincir::kafka::{KafkaPublisher, KafkaConfig};
use kincir::{Publisher, Message};
use std::time::Instant;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Configure Kafka for high throughput
    let config = KafkaConfig {
        bootstrap_servers: "localhost:9092".to_string(),
        batch_size: 16384,
        linger_ms: 5,
        compression_type: "snappy".to_string(),
        acks: "1".to_string(),
        retries: 3,
        max_in_flight_requests: 5,
    };
    let publisher = KafkaPublisher::with_config(config);

    let message_count = 100_000;
    let batch_size = 1000;
    println!("Publishing {} messages in batches of {}", message_count, batch_size);

    let start = Instant::now();
    for batch in 0..(message_count / batch_size) {
        let mut messages = Vec::with_capacity(batch_size);
        for i in 0..batch_size {
            let message_id = batch * batch_size + i;
            let message = Message::new(format!("High throughput message #{}", message_id).into_bytes())
                .with_metadata("batch", &batch.to_string())
                .with_metadata("message_id", &message_id.to_string())
                .with_metadata("timestamp", &chrono::Utc::now().timestamp().to_string());
            messages.push(message);
        }
        publisher.publish("high-throughput-topic", messages).await?;

        if batch % 10 == 0 {
            println!("Published batch {} ({} messages so far)", batch, (batch + 1) * batch_size);
        }
    }

    let duration = start.elapsed();
    println!("Published {} messages in {:?}", message_count, duration);
    println!("Throughput: {:.2} messages/second",
        message_count as f64 / duration.as_secs_f64());

    Ok(())
}
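Batching amortizes per-request overhead; when a single task still cannot saturate the broker, the work can be fanned out across tasks. A sketch under the assumption that KafkaPublisher is Send + Sync and can be shared behind an Arc (not confirmed by the API above; if it is not, create one publisher per task instead):

use kincir::kafka::KafkaPublisher;
use kincir::{Publisher, Message};
use std::sync::Arc;

// Sketch: fan publishing out over several concurrent tasks.
async fn publish_concurrently(
    publisher: Arc<KafkaPublisher>,
    tasks: usize,
    batches_per_task: usize,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let mut handles = Vec::with_capacity(tasks);
    for task_id in 0..tasks {
        let publisher = Arc::clone(&publisher);
        handles.push(tokio::spawn(async move {
            for batch in 0..batches_per_task {
                // Each task builds and publishes its own batches.
                let messages: Vec<Message> = (0..1000)
                    .map(|i| Message::new(
                        format!("task {} batch {} msg {}", task_id, batch, i).into_bytes()))
                    .collect();
                publisher.publish("high-throughput-topic", messages).await?;
            }
            Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
        }));
    }
    // Propagate the first panic or publish error.
    for handle in handles {
        handle.await??;
    }
    Ok(())
}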
Consumer Groups
use kincir::kafka::{KafkaSubscriber, KafkaConsumerConfig};
use kincir::Subscriber;
use tokio::task;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Create multiple consumers in the same group; Kafka assigns each
    // consumer a disjoint subset of the topic's partitions.
    let consumer_group = "order-processing-group";
    let topic = "orders";

    let mut consumer_handles = vec![];
    for consumer_id in 0..3 {
        let handle = task::spawn(async move {
            let config = KafkaConsumerConfig {
                bootstrap_servers: "localhost:9092".to_string(),
                group_id: consumer_group.to_string(),
                auto_offset_reset: "earliest".to_string(),
                enable_auto_commit: true,
                auto_commit_interval_ms: 1000,
                session_timeout_ms: 30000,
                max_poll_records: 500,
            };
            let mut subscriber = KafkaSubscriber::with_config(config);
            subscriber.subscribe(topic).await.unwrap();

            let mut message_count = 0;
            loop {
                match subscriber.receive().await {
                    Ok(message) => {
                        message_count += 1;
                        let order_data = String::from_utf8_lossy(&message.payload);
                        println!("Consumer {} processed order: {} (total: {})",
                            consumer_id, order_data, message_count);
                        // Simulate processing time
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    }
                    Err(e) => {
                        eprintln!("Consumer {} error: {}", consumer_id, e);
                        break;
                    }
                }
            }
        });
        consumer_handles.push(handle);
    }

    // Let consumers run for a while
    tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;

    // Cancel consumers
    for handle in consumer_handles {
        handle.abort();
    }

    Ok(())
}
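abort() cancels a task at its next await point, which can drop a message mid-processing. A gentler pattern lets each consumer finish its current message first. The shutdown wiring below is an assumption layered on the API above (it also assumes receive() is cancellation-safe inside select!), not a Kincir feature:

use kincir::kafka::KafkaSubscriber;
use kincir::Subscriber;
use tokio::sync::watch;

// Sketch: cooperative shutdown. The consumer checks a watch channel
// between messages and exits cleanly instead of being aborted.
async fn run_consumer(mut subscriber: KafkaSubscriber, mut shutdown: watch::Receiver<bool>) {
    loop {
        tokio::select! {
            _ = shutdown.changed() => break, // shutdown requested
            result = subscriber.receive() => match result {
                Ok(message) => {
                    println!("processed: {}", String::from_utf8_lossy(&message.payload));
                }
                Err(e) => {
                    eprintln!("consumer error: {}", e);
                    break;
                }
            }
        }
    }
}

The spawning side keeps the watch::Sender and calls send(true) when it wants consumers to stop, then awaits the join handles rather than aborting them.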
Partitioned Topics
use kincir::kafka::KafkaPublisher;
use kincir::{Publisher, Message};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let publisher = KafkaPublisher::new("localhost:9092");

    // Publish messages with specific partitioning
    let user_ids = vec!["user1", "user2", "user3", "user4", "user5"];
    for user_id in user_ids {
        let messages = vec![
            Message::new(format!("Login event for {}", user_id).into_bytes())
                .with_metadata("user_id", user_id)
                .with_metadata("event_type", "login")
                .with_metadata("partition_key", user_id), // use user_id as the partition key
            Message::new(format!("Purchase event for {}", user_id).into_bytes())
                .with_metadata("user_id", user_id)
                .with_metadata("event_type", "purchase")
                .with_metadata("partition_key", user_id),
        ];
        // Messages with the same key are routed to the same partition,
        // so all events for one user stay in order.
        publisher.publish_with_key("user-events", messages, Some(user_id)).await?;
    }

    println!("Published events for all users with partitioning");
    Ok(())
}
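Key-based routing works because the producer hashes the key and takes it modulo the topic's partition count. The sketch below illustrates the idea with the standard library's DefaultHasher; Kafka's default partitioner actually uses murmur2, so treat this as an illustration of the mechanism, not the exact mapping:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustration only: same key -> same hash -> same partition.
fn partition_for(key: &str, partition_count: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % partition_count
}

fn main() {
    for user_id in ["user1", "user2", "user3"] {
        // Every call with the same key lands on the same partition.
        println!("{} -> partition {}", user_id, partition_for(user_id, 6));
    }
}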
Exactly-Once Semantics
use kincir::kafka::{KafkaPublisher, KafkaTransactionalConfig};
use kincir::{Publisher, Message};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Configure for exactly-once semantics
    let config = KafkaTransactionalConfig {
        bootstrap_servers: "localhost:9092".to_string(),
        transactional_id: "order-processor-1".to_string(),
        enable_idempotence: true,
        acks: "all".to_string(),
        retries: i32::MAX,
        max_in_flight_requests: 1,
    };
    let publisher = KafkaPublisher::with_transactional_config(config);

    // Begin transaction
    publisher.begin_transaction().await?;

    match process_order_batch().await {
        Ok(processed_orders) => {
            // Publish all processed orders in a single transaction
            for order in processed_orders {
                let message = Message::new(order.to_json().into_bytes())
                    .with_metadata("order_id", &order.id)
                    .with_metadata("status", "processed");
                // Note: if this publish fails, `?` returns early without
                // reaching the abort below; production code should abort
                // the transaction on publish errors too.
                publisher.publish("processed-orders", vec![message]).await?;
            }
            // Commit transaction
            publisher.commit_transaction().await?;
            println!("Successfully processed and published order batch");
        }
        Err(e) => {
            // Abort transaction on error
            publisher.abort_transaction().await?;
            eprintln!("Failed to process orders, transaction aborted: {}", e);
        }
    }

    Ok(())
}

#[derive(Debug)]
struct Order {
    id: String,
    amount: f64,
    customer_id: String,
}

impl Order {
    fn to_json(&self) -> String {
        format!(r#"{{"id":"{}","amount":{},"customer_id":"{}"}}"#,
            self.id, self.amount, self.customer_id)
    }
}

async fn process_order_batch() -> Result<Vec<Order>, Box<dyn std::error::Error + Send + Sync>> {
    // Simulate order processing
    let orders = vec![
        Order { id: "ORD-001".to_string(), amount: 99.99, customer_id: "CUST-123".to_string() },
        Order { id: "ORD-002".to_string(), amount: 149.50, customer_id: "CUST-456".to_string() },
        Order { id: "ORD-003".to_string(), amount: 75.25, customer_id: "CUST-789".to_string() },
    ];
    // Simulate processing time
    tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
    Ok(orders)
}
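The begin/commit/abort choreography is easy to get wrong (note the early-return caveat flagged above). One way to keep it in one place is a wrapper that runs a closure inside a transaction. The with_transaction helper below is a hypothetical addition, assuming only the transactional methods shown above and that Kincir's errors convert into a boxed error:

use std::future::Future;
use kincir::kafka::KafkaPublisher;

// Hypothetical helper (not part of Kincir): run `work` inside a
// transaction, committing on success and aborting on any error.
async fn with_transaction<F, Fut, T, E>(
    publisher: &KafkaPublisher,
    work: F,
) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = Result<T, E>>,
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    publisher.begin_transaction().await?;
    match work().await {
        Ok(value) => {
            publisher.commit_transaction().await?;
            Ok(value)
        }
        Err(e) => {
            // Abort first, then surface the original error.
            publisher.abort_transaction().await?;
            Err(e.into())
        }
    }
}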
Stream Processing
use kincir::kafka::{KafkaSubscriber, KafkaPublisher};
use kincir::{Subscriber, Publisher, Message};
use serde_json::{Value, json};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Input stream: raw events
    let mut input_subscriber = KafkaSubscriber::new("localhost:9092", "stream-processor");
    input_subscriber.subscribe("raw-events").await?;

    // Output streams: processed events
    let metrics_publisher = KafkaPublisher::new("localhost:9092");
    let alerts_publisher = KafkaPublisher::new("localhost:9092");

    println!("Starting stream processor...");
    loop {
        match input_subscriber.receive().await {
            Ok(message) => {
                // Parse incoming event
                if let Ok(event) = parse_event(&message) {
                    // Process the event and fan out to the relevant topics
                    if let Some(metric) = generate_metric(&event) {
                        metrics_publisher.publish("metrics", vec![metric]).await?;
                    }
                    if let Some(alert) = check_for_alert(&event) {
                        alerts_publisher.publish("alerts", vec![alert]).await?;
                    }
                    // Log processing
                    println!("Processed event: {}", event["event_type"]);
                }
            }
            Err(e) => {
                eprintln!("Stream processing error: {}", e);
                // Back off briefly before retrying
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            }
        }
    }
}

fn parse_event(message: &Message) -> Result<Value, serde_json::Error> {
    let payload = String::from_utf8_lossy(&message.payload);
    serde_json::from_str(&payload)
}

fn generate_metric(event: &Value) -> Option<Message> {
    if let Some(event_type) = event["event_type"].as_str() {
        let metric = json!({
            "metric_name": format!("{}_count", event_type),
            "value": 1,
            "timestamp": chrono::Utc::now().timestamp(),
            "tags": {
                "event_type": event_type
            }
        });
        Some(Message::new(metric.to_string().into_bytes())
            .with_metadata("metric_type", "counter")
            .with_metadata("source", "stream_processor"))
    } else {
        None
    }
}

fn check_for_alert(event: &Value) -> Option<Message> {
    // Flag error events with a high-severity alert
    if event["event_type"] == "error" {
        let alert = json!({
            "alert_type": "error_detected",
            "severity": "high",
            "message": format!("Error event detected: {}", event["message"]),
            "timestamp": chrono::Utc::now().to_rfc3339(),
            "source_event": event
        });
        Some(Message::new(alert.to_string().into_bytes())
            .with_metadata("alert_level", "high")
            .with_metadata("alert_type", "error_detected"))
    } else {
        None
    }
}
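Because generate_metric and check_for_alert are pure functions of the event, the routing logic can be sanity-checked without a broker by feeding them synthetic events. A small test module sketch, assuming serde_json as above (the module itself is an addition, not part of the original example):

#[cfg(test)]
mod stream_tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn error_events_raise_alerts_and_metrics() {
        let event = json!({ "event_type": "error", "message": "disk full" });
        // An error event should produce both a metric and an alert.
        assert!(generate_metric(&event).is_some());
        assert!(check_for_alert(&event).is_some());

        let benign = json!({ "event_type": "login" });
        // A non-error event produces a metric but no alert.
        assert!(generate_metric(&benign).is_some());
        assert!(check_for_alert(&benign).is_none());
    }
}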
Kafka Streams-Style Processing
use kincir::kafka::{KafkaSubscriber, KafkaPublisher};
use kincir::{Subscriber, Publisher, Message};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Word count stream processing example
    let word_counts: Arc<Mutex<HashMap<String, u64>>> = Arc::new(Mutex::new(HashMap::new()));

    let mut subscriber = KafkaSubscriber::new("localhost:9092", "word-count-processor");
    subscriber.subscribe("text-input").await?;
    let publisher = KafkaPublisher::new("localhost:9092");

    println!("Starting word count stream processor...");
    loop {
        match subscriber.receive().await {
            Ok(message) => {
                let text = String::from_utf8_lossy(&message.payload);
                let words: Vec<&str> = text.split_whitespace().collect();

                // Update counts inside the lock, but collect the results and
                // release the lock before awaiting: holding a std Mutex guard
                // across an await point can block the runtime.
                let mut updates = Vec::with_capacity(words.len());
                {
                    let mut counts = word_counts.lock().unwrap();
                    for word in &words {
                        let word = word.to_lowercase();
                        let count = counts.entry(word.clone()).or_insert(0);
                        *count += 1;
                        updates.push((word, *count));
                    }
                }

                // Publish the updated counts with the lock released
                for (word, count) in updates {
                    let count_message = Message::new(count.to_string().into_bytes())
                        .with_metadata("word", &word)
                        .with_metadata("count", &count.to_string())
                        .with_metadata("timestamp", &chrono::Utc::now().timestamp().to_string());
                    publisher.publish("word-counts", vec![count_message]).await?;
                }

                println!("Processed text with {} words", words.len());
            }
            Err(e) => {
                eprintln!("Stream processing error: {}", e);
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            }
        }
    }
}
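Publishing one message per word amplifies output traffic considerably; a common alternative is to emit snapshots of the counter table on a timer instead. A sketch of the flush side, assuming the same publisher API as above (the periodic-flush design is an addition, driven from a tokio::time::interval tick in the caller):

use kincir::kafka::KafkaPublisher;
use kincir::{Publisher, Message};
use std::collections::HashMap;

// Sketch: emit the whole counter table in one batched publish
// per flush instead of one message per word.
async fn flush_counts(
    publisher: &KafkaPublisher,
    counts: &HashMap<String, u64>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let messages: Vec<Message> = counts
        .iter()
        .map(|(word, count)| {
            Message::new(count.to_string().into_bytes())
                .with_metadata("word", word)
                .with_metadata("count", &count.to_string())
        })
        .collect();
    if !messages.is_empty() {
        publisher.publish("word-counts", messages).await?;
    }
    Ok(())
}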
Performance Monitoring
use kincir::kafka::{KafkaPublisher, KafkaSubscriber};
use kincir::{Publisher, Subscriber, Message};
use std::time::Instant;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let publisher = KafkaPublisher::new("localhost:9092");
    let mut subscriber = KafkaSubscriber::new("localhost:9092", "perf-test-group");
    subscriber.subscribe("performance-test").await?;

    // Performance test parameters
    let message_count = 10_000;
    let message_size = 1024; // 1 KB messages
    let payload = vec![0u8; message_size];

    println!("Starting Kafka performance test...");
    println!("Messages: {}, Size: {} bytes each", message_count, message_size);

    // Measure publishing performance
    let start = Instant::now();
    let mut messages = Vec::with_capacity(100);
    for i in 0..message_count {
        let message = Message::new(payload.clone())
            .with_metadata("sequence", &i.to_string())
            .with_metadata("timestamp", &chrono::Utc::now().timestamp_nanos().to_string());
        messages.push(message);

        // Publish in batches of 100, swapping in a fresh buffer
        // instead of cloning the batch
        if messages.len() == 100 || i == message_count - 1 {
            let batch = std::mem::replace(&mut messages, Vec::with_capacity(100));
            publisher.publish("performance-test", batch).await?;
        }
        if i % 1000 == 0 {
            println!("Published {} messages", i + 1);
        }
    }
    let publish_duration = start.elapsed();
    println!("Publishing completed in {:?}", publish_duration);
    println!("Publishing rate: {:.2} messages/second",
        message_count as f64 / publish_duration.as_secs_f64());

    // Measure consuming performance
    let start = Instant::now();
    let mut received_count = 0;
    while received_count < message_count {
        match subscriber.receive().await {
            Ok(_message) => {
                received_count += 1;
                if received_count % 1000 == 0 {
                    println!("Received {} messages", received_count);
                }
            }
            Err(e) => {
                eprintln!("Error receiving message: {}", e);
                break;
            }
        }
    }
    let consume_duration = start.elapsed();
    println!("Consuming completed in {:?}", consume_duration);
    println!("Consuming rate: {:.2} messages/second",
        received_count as f64 / consume_duration.as_secs_f64());

    // Get Kafka metrics
    if let Ok(metrics) = publisher.get_metrics().await {
        println!("Publisher metrics: {:?}", metrics);
    }
    if let Ok(metrics) = subscriber.get_metrics().await {
        println!("Consumer metrics: {:?}", metrics);
    }

    Ok(())
}
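Throughput is only half the picture: the timestamp metadata stamped into each message above can also drive a per-message latency measurement on the consuming side. A sketch, assuming get_metadata(&str) -> Option<&str> as used in the first example on this page:

use kincir::Message;

// Sketch: recover the publish-time nanosecond timestamp from the
// message metadata and compare it with the local clock.
fn end_to_end_latency_ms(message: &Message) -> Option<f64> {
    let sent_nanos: i64 = message.get_metadata("timestamp")?.parse().ok()?;
    let now_nanos = chrono::Utc::now().timestamp_nanos_opt()?;
    // Clock skew between producer and consumer hosts will bias this;
    // it is only trustworthy when both run on the same machine.
    Some((now_nanos - sent_nanos) as f64 / 1_000_000.0)
}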
Error Handling and Resilience
use kincir::kafka::{KafkaPublisher, KafkaSubscriber, KafkaError};
use kincir::{Publisher, Subscriber, Message};
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let mut retry_count = 0;
    let max_retries = 5;

    loop {
        match run_kafka_client().await {
            Ok(()) => {
                println!("Kafka client completed successfully");
                break;
            }
            Err(KafkaError::BrokerNotAvailable) => {
                retry_count += 1;
                if retry_count >= max_retries {
                    eprintln!("Max retries reached. Kafka broker not available.");
                    return Err("Kafka broker not available".into());
                }
                println!("Kafka broker not available (attempt {}). Retrying in 10 seconds...", retry_count);
                sleep(Duration::from_secs(10)).await;
            }
            Err(KafkaError::TopicNotFound(topic)) => {
                println!("Topic '{}' not found. Creating topic...", topic);
                // In a real application, you might want to create the topic here
                sleep(Duration::from_secs(5)).await;
            }
            Err(e) => {
                eprintln!("Non-recoverable Kafka error: {}", e);
                return Err(e.into());
            }
        }
    }

    Ok(())
}

async fn run_kafka_client() -> Result<(), KafkaError> {
    let publisher = KafkaPublisher::new("localhost:9092");
    let mut subscriber = KafkaSubscriber::new("localhost:9092", "resilient-consumer");
    subscriber.subscribe("resilient-topic").await?;

    let message = Message::new(b"Resilient message".to_vec());
    publisher.publish("resilient-topic", vec![message]).await?;

    let received = subscriber.receive().await?;
    println!("Received: {:?}", String::from_utf8_lossy(&received.payload));

    Ok(())
}
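A fixed 10-second sleep recovers slowly from short blips and keeps hammering a struggling broker during long outages; exponential backoff with a cap handles both. A small helper sketch (the retry policy is an addition, not a Kincir feature):

use tokio::time::{sleep, Duration};

// Sketch: exponential backoff with a cap. Attempt 0 waits 1s,
// then 2s, 4s, ... up to a maximum of 60s.
async fn backoff(attempt: u32) {
    let secs = 2u64.saturating_pow(attempt).min(60);
    sleep(Duration::from_secs(secs)).await;
}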
Testing with Kafka
#[cfg(test)]
mod tests {
    use super::*;
    use testcontainers::{clients::Cli, images::kafka::Kafka};

    #[tokio::test]
    async fn test_kafka_integration() {
        // Start a throwaway Kafka container for the test
        let docker = Cli::default();
        let kafka_container = docker.run(Kafka::default());
        let bootstrap_servers = format!(
            "127.0.0.1:{}",
            kafka_container.get_host_port_ipv4(9092)
        );

        let publisher = KafkaPublisher::new(&bootstrap_servers);
        let mut subscriber = KafkaSubscriber::new(&bootstrap_servers, "test-group");
        subscriber.subscribe("test-topic").await.unwrap();

        let message = Message::new(b"integration test message".to_vec());
        publisher.publish("test-topic", vec![message]).await.unwrap();

        let received = subscriber.receive().await.unwrap();
        assert_eq!(received.payload, b"integration test message");
    }
}
Next Steps
- MQTT Support - IoT messaging
- Message Acknowledgments - Reliable processing
- Stream Processing - Real-time data processing
- Performance Optimization - High-throughput tuning