Kafka Backend

This guide explains how to use Kincir with Apache Kafka as a message broker backend.

System Dependencies

The Kafka backend (the kafka feature) needs the librdkafka and OpenSSL development libraries installed on your system:

Ubuntu/Debian:

sudo apt-get install librdkafka-dev libssl-dev

macOS (using Homebrew):

brew install librdkafka openssl

Configuration

To use Kafka with Kincir, enable the kafka feature in your Cargo.toml:

[dependencies]
kincir = { version = "0.1.0", features = ["kafka"] }
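
The examples in this guide use #[tokio::main], so the project also needs Tokio with its macro and multi-threaded runtime features enabled. A fuller manifest might look like the sketch below; the Tokio version shown is an assumption, not something Kincir prescribes:

```toml
[dependencies]
kincir = { version = "0.1.0", features = ["kafka"] }
# Assumed version; #[tokio::main] requires the "macros" and "rt-multi-thread" features
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
```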

Publisher Example

Here’s an example of publishing messages to a Kafka topic:

use kincir::prelude::*;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Create a Kafka publisher configuration
    let config = KafkaPublisherConfig::new()
        .bootstrap_servers("localhost:9092")
        .client_id("my-publisher")
        .build()?;
    
    // Create a publisher instance
    let publisher = KafkaPublisher::new(config)?;
    
    // Create a simple message
    let message = Message::new()
        .topic("my-topic")
        .key("user-123")
        .payload("Hello, Kincir!");
    
    // Publish the message
    let result = publisher.publish(message).await?;
    
    println!("Message published successfully: {:?}", result);
    
    Ok(())
}

Subscriber Example

Here’s how to create a Kafka subscriber and process messages from a topic:

use kincir::prelude::*;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Create a Kafka subscriber configuration
    let config = KafkaSubscriberConfig::new()
        .bootstrap_servers("localhost:9092")
        .group_id("my-consumer-group")
        .auto_offset_reset(AutoOffsetReset::Earliest)
        .build()?;
    
    // Create a subscriber instance
    let subscriber = KafkaSubscriber::new(config)?;
    
    // Subscribe to a topic
    subscriber.subscribe("my-topic")?;
    
    println!("Waiting for messages...");
    
    // Process messages
    subscriber.start(|message| {
        println!("Received message: {:?}", message);
        
        // Return Ok to acknowledge the message
        Ok(())
    }).await?;
    
    Ok(())
}
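
If the handler's return value decides whether a message is acknowledged, as the comment in the example above suggests, it can help to keep the handler logic in a named function that is testable without a broker. The following is a minimal sketch; IncomingMessage and handle are hypothetical stand-ins for illustration and are not part of Kincir's API:

```rust
use std::error::Error;

// Hypothetical stand-in for a received message; Kincir's real type may differ.
#[derive(Debug)]
struct IncomingMessage {
    key: Option<String>,
    payload: Vec<u8>,
}

/// Validates and logs one message.
/// Returns Ok(()) when the message was processed, and Err when the payload
/// is not valid UTF-8 or is empty.
fn handle(message: &IncomingMessage) -> Result<(), Box<dyn Error>> {
    // Fails with a Utf8Error (boxed by `?`) if the payload is not valid UTF-8
    let text = std::str::from_utf8(&message.payload)?;
    if text.is_empty() {
        return Err("empty payload".into());
    }
    let key = message.key.as_deref().unwrap_or("<no key>");
    println!("Received message [{}]: {}", key, text);
    Ok(())
}
```

Under that assumption, the closure passed to the subscriber would only convert the received message and forward it to a function like handle, so failures surface as Err instead of being silently acknowledged.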

Advanced Configuration

Kincir allows you to set additional Kafka-specific configuration options, such as a multi-broker bootstrap list, the message timeout, and SSL settings:

use kincir::prelude::*;
use std::error::Error;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Create a Kafka publisher with advanced configuration
    let config = KafkaPublisherConfig::new()
        .bootstrap_servers("kafka1:9092,kafka2:9092,kafka3:9092")
        .client_id("advanced-publisher")
        .message_timeout(Duration::from_secs(30))
        .security_protocol(SecurityProtocol::Ssl)
        .ssl_ca_location("/path/to/ca.pem")
        .ssl_certificate_location("/path/to/cert.pem")
        .ssl_key_location("/path/to/key.pem")
        .build()?;
    
    let publisher = KafkaPublisher::new(config)?;
    
    // Use the publisher...
    
    Ok(())
}
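
The bootstrap server list above is a single comma-separated string. When broker addresses come from a list (for example, parsed from deployment configuration), a small helper can build that string. This is a sketch only; join_bootstrap_servers is a hypothetical helper and not a Kincir function:

```rust
/// Joins broker addresses into the comma-separated form used for a
/// bootstrap server list. Surrounding whitespace is trimmed and empty
/// entries are dropped.
fn join_bootstrap_servers(brokers: &[&str]) -> String {
    brokers
        .iter()
        .map(|b| b.trim())
        .filter(|b| !b.is_empty())
        .collect::<Vec<_>>()
        .join(",")
}
```

The resulting string could then be passed wherever the examples pass a literal such as "kafka1:9092,kafka2:9092,kafka3:9092".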

Error Handling

Kincir provides a dedicated error type for Kafka-related errors, which you can match on to handle configuration and connection failures separately:

use kincir::prelude::*;
use kincir::kafka::error::KafkaError;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = KafkaPublisherConfig::new()
        .bootstrap_servers("localhost:9092")
        .build()?;
    
    let publisher = match KafkaPublisher::new(config) {
        Ok(publisher) => publisher,
        Err(err) => {
            if let Some(kafka_err) = err.downcast_ref::<KafkaError>() {
                match kafka_err {
                    KafkaError::Configuration(conf_err) => {
                        eprintln!("Kafka configuration error: {}", conf_err);
                    },
                    KafkaError::Connection(conn_err) => {
                        eprintln!("Kafka connection error: {}", conn_err);
                    },
                    // Handle other specific errors
                    _ => eprintln!("Other Kafka error: {}", kafka_err),
                }
            }
            return Err(err);
        }
    };
    
    // Use the publisher...
    
    Ok(())
}
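
The downcast pattern used above can be tried out without a broker. The sketch below shows how downcast_ref recovers a concrete error type from a boxed dyn Error; BackendError and classify are hypothetical stand-ins written for this illustration, not Kincir's actual types:

```rust
use std::error::Error;
use std::fmt;

// Hypothetical stand-in for a backend error enum; not Kincir's actual type.
#[derive(Debug)]
enum BackendError {
    Configuration(String),
    Connection(String),
}

impl fmt::Display for BackendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BackendError::Configuration(msg) => write!(f, "configuration: {}", msg),
            BackendError::Connection(msg) => write!(f, "connection: {}", msg),
        }
    }
}

impl Error for BackendError {}

/// Classifies an error by attempting to downcast it to `BackendError`.
/// Errors of any other concrete type are reported as "unknown".
fn classify(err: &(dyn Error + 'static)) -> &'static str {
    match err.downcast_ref::<BackendError>() {
        Some(BackendError::Configuration(_)) => "configuration",
        Some(BackendError::Connection(_)) => "connection",
        None => "unknown",
    }
}
```

Handling the None case explicitly matters: an error that is not of the expected type would otherwise pass through without any diagnostic output.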

Next Steps

Learn more about other available backends in Kincir: