Kafka Backend
This guide explains how to use Kincir with Apache Kafka as a message broker backend.
System Dependencies
If you’re using the Kafka backend (the kafka feature), you’ll need:
- librdkafka 1.8.0 or later
- OpenSSL development libraries (for secure connections)
Ubuntu/Debian:
sudo apt-get install librdkafka-dev libssl-dev
macOS (using Homebrew):
brew install librdkafka openssl
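The 1.8.0 minimum can be checked programmatically once you have the installed version string. The sketch below is a minimal, standard-library-only comparison. The helper names `parse_version` and `meets_minimum` are hypothetical and not part of Kincir, and obtaining the string (for example from `pkg-config --modversion rdkafka`, assuming your package ships a pkg-config file) is left to the caller.

```rust
/// Parse a dotted version such as "1.8.0" into (major, minor, patch).
/// Missing components default to 0; suffixes like "-RC1" are not handled.
fn parse_version(s: &str) -> Option<(u32, u32, u32)> {
    let mut parts = s.trim().split('.');
    let major = parts.next()?.parse().ok()?;
    let minor = match parts.next() {
        Some(p) => p.parse().ok()?,
        None => 0,
    };
    let patch = match parts.next() {
        Some(p) => p.parse().ok()?,
        None => 0,
    };
    Some((major, minor, patch))
}

/// Returns true when `installed` is at least the 1.8.0 minimum listed above.
/// Unparseable input is treated as not meeting the minimum.
fn meets_minimum(installed: &str) -> bool {
    const MINIMUM: (u32, u32, u32) = (1, 8, 0);
    parse_version(installed).map_or(false, |v| v >= MINIMUM)
}
```

Tuples compare lexicographically in Rust, so `(1, 10, 0) >= (1, 8, 0)` holds where a plain string comparison of "1.10.0" and "1.8.0" would not.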
Configuration
To use Kafka with Kincir, enable the kafka feature in your Cargo.toml:
[dependencies]
kincir = { version = "0.1.0", features = ["kafka"] }
Publisher Example
Here’s an example of publishing messages to a Kafka topic:
use kincir::prelude::*;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Create a Kafka publisher configuration
    let config = KafkaPublisherConfig::new()
        .bootstrap_servers("localhost:9092")
        .client_id("my-publisher")
        .build()?;

    // Create a publisher instance
    let publisher = KafkaPublisher::new(config)?;

    // Create a simple message
    let message = Message::new()
        .topic("my-topic")
        .key("user-123")
        .payload("Hello, Kincir!");

    // Publish the message
    let result = publisher.publish(message).await?;
    println!("Message published successfully: {:?}", result);

    Ok(())
}
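The configuration above is assembled with chained setter calls and finished with a fallible `build()?`. The following is a minimal sketch of that builder pattern using only the standard library. `DemoPublisherConfig`, `DemoPublisherConfigBuilder`, `MissingField`, and the `"demo-client"` default are all hypothetical stand-ins chosen for illustration; they do not describe how Kincir implements `KafkaPublisherConfig`.

```rust
use std::fmt;

/// Hypothetical stand-in for a publisher configuration; not Kincir's type.
#[derive(Debug, Clone, PartialEq)]
struct DemoPublisherConfig {
    bootstrap_servers: String,
    client_id: String,
}

/// Hypothetical error returned when a required setting is missing.
#[derive(Debug, PartialEq)]
struct MissingField(&'static str);

impl fmt::Display for MissingField {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "missing required setting: {}", self.0)
    }
}

impl std::error::Error for MissingField {}

/// Builder that collects optional settings and validates them in `build`.
#[derive(Debug, Default)]
struct DemoPublisherConfigBuilder {
    bootstrap_servers: Option<String>,
    client_id: Option<String>,
}

impl DemoPublisherConfigBuilder {
    fn new() -> Self {
        Self::default()
    }

    /// Each setter takes `self` by value and returns it, which enables chaining.
    fn bootstrap_servers(mut self, servers: &str) -> Self {
        self.bootstrap_servers = Some(servers.to_string());
        self
    }

    fn client_id(mut self, id: &str) -> Self {
        self.client_id = Some(id.to_string());
        self
    }

    /// Validation happens once, here, so callers can propagate failures with `?`.
    fn build(self) -> Result<DemoPublisherConfig, MissingField> {
        let bootstrap_servers = self
            .bootstrap_servers
            .filter(|s| !s.trim().is_empty())
            .ok_or(MissingField("bootstrap_servers"))?;
        // Assumed default for illustration only.
        let client_id = self.client_id.unwrap_or_else(|| "demo-client".to_string());
        Ok(DemoPublisherConfig { bootstrap_servers, client_id })
    }
}
```

Deferring validation to `build` keeps the setters infallible, so a misconfiguration surfaces as a single error at one call site rather than at every setter.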
Subscriber Example
Here’s how to create a Kafka subscriber:
use kincir::prelude::*;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Create a Kafka subscriber configuration
    let config = KafkaSubscriberConfig::new()
        .bootstrap_servers("localhost:9092")
        .group_id("my-consumer-group")
        .auto_offset_reset(AutoOffsetReset::Earliest)
        .build()?;

    // Create a subscriber instance
    let subscriber = KafkaSubscriber::new(config)?;

    // Subscribe to a topic
    subscriber.subscribe("my-topic")?;
    println!("Waiting for messages...");

    // Process messages
    subscriber.start(|message| {
        println!("Received message: {:?}", message);
        // Return Ok to acknowledge the message
        Ok(())
    }).await?;

    Ok(())
}
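The handler passed to `start` signals acknowledgement through its return value. The sketch below illustrates that contract in isolation, using only the standard library. `DemoMessage`, `dispatch`, and `handle` are hypothetical names, and what Kincir actually does with a message whose handler returns `Err` (retry, skip, or stop) is not specified here.

```rust
use std::error::Error;

/// Hypothetical stand-in for a received message; not Kincir's `Message` type.
#[derive(Debug, Clone)]
struct DemoMessage {
    key: String,
    payload: String,
}

/// Feed each message to `handler` and return the keys it acknowledged.
/// A handler that returns `Err` leaves its message unacknowledged.
fn dispatch<F>(messages: &[DemoMessage], mut handler: F) -> Vec<String>
where
    F: FnMut(&DemoMessage) -> Result<(), Box<dyn Error>>,
{
    let mut acked = Vec::new();
    for message in messages {
        match handler(message) {
            Ok(()) => acked.push(message.key.clone()),
            Err(err) => eprintln!("handler failed for {}: {}", message.key, err),
        }
    }
    acked
}

/// Example handler: accepts any message with a non-empty payload.
fn handle(message: &DemoMessage) -> Result<(), Box<dyn Error>> {
    if message.payload.is_empty() {
        return Err("empty payload".into());
    }
    println!("Received message: {:?}", message);
    Ok(())
}
```

Because acknowledgement is tied to `Ok(())`, a handler should return it only after its own work (for example a database write) has succeeded.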
Advanced Configuration
Kincir allows you to set additional Kafka-specific configuration options:
use kincir::prelude::*;
use std::error::Error;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Create a Kafka publisher with advanced configuration
    let config = KafkaPublisherConfig::new()
        .bootstrap_servers("kafka1:9092,kafka2:9092,kafka3:9092")
        .client_id("advanced-publisher")
        .message_timeout(Duration::from_secs(30))
        .security_protocol(SecurityProtocol::Ssl)
        .ssl_ca_location("/path/to/ca.pem")
        .ssl_certificate_location("/path/to/cert.pem")
        .ssl_key_location("/path/to/key.pem")
        .build()?;

    let publisher = KafkaPublisher::new(config)?;

    // Use the publisher...

    Ok(())
}
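The bootstrap server list above is a single comma-separated string of `host:port` entries, so a typo in it may only surface at connection time. A small pre-flight check can catch malformed entries earlier. This is a sketch using only the standard library; `parse_bootstrap_servers` is a hypothetical helper, not a Kincir or librdkafka function, and it does not handle bracketed IPv6 literals.

```rust
/// Split a comma-separated bootstrap list into (host, port) pairs.
/// Returns an error message naming the first entry that is not `host:port`.
fn parse_bootstrap_servers(list: &str) -> Result<Vec<(String, u16)>, String> {
    let mut servers = Vec::new();
    for entry in list.split(',') {
        let entry = entry.trim();
        // Split on the last ':' so everything before it is treated as the host.
        let (host, port) = entry
            .rsplit_once(':')
            .ok_or_else(|| format!("missing port in '{}'", entry))?;
        if host.is_empty() {
            return Err(format!("missing host in '{}'", entry));
        }
        let port: u16 = port
            .parse()
            .map_err(|_| format!("invalid port in '{}'", entry))?;
        servers.push((host.to_string(), port));
    }
    Ok(servers)
}
```

Running this on the configured string before building the publisher turns a malformed entry such as `kafka2` (no port) into an immediate, descriptive error.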
Error Handling
Kincir provides specific error types for Kafka-related errors:
use kincir::prelude::*;
use kincir::kafka::error::KafkaError;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let config = KafkaPublisherConfig::new()
        .bootstrap_servers("localhost:9092")
        .build()?;

    let publisher = match KafkaPublisher::new(config) {
        Ok(publisher) => publisher,
        Err(err) => {
            if let Some(kafka_err) = err.downcast_ref::<KafkaError>() {
                match kafka_err {
                    KafkaError::Configuration(conf_err) => {
                        eprintln!("Kafka configuration error: {}", conf_err);
                    },
                    KafkaError::Connection(conn_err) => {
                        eprintln!("Kafka connection error: {}", conn_err);
                    },
                    // Handle other specific errors
                    _ => eprintln!("Other Kafka error: {}", kafka_err),
                }
            }
            return Err(err);
        }
    };

    // Use the publisher...

    Ok(())
}
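The example above relies on `downcast_ref` to recover a concrete error type from a `Box<dyn Error>`. The sketch below isolates that technique with a stand-in enum so it can be compiled without Kincir. `DemoKafkaError` and `classify` are hypothetical; only the two variant names are borrowed from the example, and the real `KafkaError` may carry different payloads and additional variants.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical stand-in for a backend error enum; not Kincir's `KafkaError`.
#[derive(Debug)]
enum DemoKafkaError {
    Configuration(String),
    Connection(String),
}

impl fmt::Display for DemoKafkaError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DemoKafkaError::Configuration(msg) => write!(f, "configuration: {}", msg),
            DemoKafkaError::Connection(msg) => write!(f, "connection: {}", msg),
        }
    }
}

impl Error for DemoKafkaError {}

/// Classify a type-erased error by attempting a downcast to the concrete type.
/// Errors of any other type fall through to "other".
fn classify(err: &(dyn Error + 'static)) -> &'static str {
    match err.downcast_ref::<DemoKafkaError>() {
        Some(DemoKafkaError::Configuration(_)) => "configuration",
        Some(DemoKafkaError::Connection(_)) => "connection",
        None => "other",
    }
}
```

Given a `Box<dyn Error>` named `err`, the call is `classify(&*err)`. The `None` arm matters: a downcast only succeeds when the boxed value is exactly the requested type, so unrelated errors still need a fallback path.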
Next Steps
Learn more about other available backends in Kincir: