Microservices Communication with Kincir
This guide demonstrates how to use Kincir for inter-service communication in a microservices architecture.
Table of Contents
- Service Architecture
- Event-Driven Communication
- Request-Response Pattern
- Service Discovery
- Load Balancing
Service Architecture
Order Processing System
use kincir::{Publisher, Subscriber, Message};
use kincir::memory::{InMemoryBroker, InMemoryPublisher, InMemorySubscriber};
use serde::{Serialize, Deserialize};
use std::sync::Arc;
/// Order payload exchanged between the services in this guide.
///
/// The services serialize this struct to JSON and use it as the message body.
#[derive(Debug, Serialize, Deserialize)]
pub struct OrderEvent {
    /// Identifier of the order (for example `"ORD-001"`).
    pub order_id: String,
    /// Identifier of the customer who placed the order.
    pub customer_id: String,
    /// Order amount; the payment service prints it with two decimals.
    pub amount: f64,
    /// Lifecycle state carried with the event.
    pub status: OrderStatus,
}
/// Lifecycle states an order can be in.
#[derive(Debug, Serialize, Deserialize)]
pub enum OrderStatus {
    /// Status given to a freshly created order.
    Created,
    /// Status set once a payment has been processed for the order.
    PaymentProcessed,
    Shipped,
    Delivered,
    Cancelled,
}
// Order Service

/// Service that creates orders and reacts to payment and shipping events.
///
/// Owns one publisher and one subscriber, both attached to the same
/// in-memory broker.
pub struct OrderService {
    publisher: InMemoryPublisher,
    subscriber: InMemorySubscriber,
}

impl OrderService {
    /// Creates a service whose publisher and subscriber share `broker`.
    pub fn new(broker: Arc<InMemoryBroker>) -> Self {
        Self {
            publisher: InMemoryPublisher::new(broker.clone()),
            subscriber: InMemorySubscriber::new(broker),
        }
    }

    /// Publishes `order` on the `order_events` topic as an `order_created` event.
    ///
    /// # Errors
    ///
    /// Returns an error if the order cannot be serialized to JSON or if the
    /// publisher fails.
    pub async fn create_order(&self, order: OrderEvent) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        println!("Order Service: Creating order {}", order.order_id);
        let message = Message::new(serde_json::to_vec(&order)?)
            .with_metadata("event_type", "order_created")
            .with_metadata("service", "order_service");
        self.publisher.publish("order_events", vec![message]).await?;
        Ok(())
    }

    /// Subscribes to `payment_events` and `shipping_events` and handles every
    /// received message until an error occurs.
    ///
    /// # Errors
    ///
    /// The loop only ends by returning an error: from subscribing, from
    /// receiving, or from handling a message (for example a payload that is
    /// not a valid `OrderEvent`).
    pub async fn listen_for_events(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        self.subscriber.subscribe("payment_events").await?;
        self.subscriber.subscribe("shipping_events").await?;
        loop {
            let message = self.subscriber.receive().await?;
            self.handle_event(message).await?;
        }
    }

    /// Dispatches one message according to its `event_type` metadata entry.
    ///
    /// * `payment_processed`: republishes the order on `order_events` with
    ///   status `PaymentProcessed`, tagged `order_updated`.
    /// * `shipment_created`: logs the shipment.
    /// * anything else, including a missing `event_type`: logs it as unknown.
    async fn handle_event(&self, message: Message) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // Borrow the metadata value as `&str` and fall back to a static literal.
        // Borrowing a temporary `String` here would not outlive the statement.
        let event_type = message
            .metadata
            .get("event_type")
            .map(String::as_str)
            .unwrap_or("unknown");
        match event_type {
            "payment_processed" => {
                let order_event: OrderEvent = serde_json::from_slice(&message.payload)?;
                println!("Order Service: Payment processed for order {}", order_event.order_id);
                // Update order status and publish event
                let updated_order = OrderEvent {
                    status: OrderStatus::PaymentProcessed,
                    ..order_event
                };
                let response_message = Message::new(serde_json::to_vec(&updated_order)?)
                    .with_metadata("event_type", "order_updated")
                    .with_metadata("service", "order_service");
                self.publisher.publish("order_events", vec![response_message]).await?;
            }
            "shipment_created" => {
                let order_event: OrderEvent = serde_json::from_slice(&message.payload)?;
                println!("Order Service: Shipment created for order {}", order_event.order_id);
            }
            _ => {
                println!("Order Service: Unknown event type: {}", event_type);
            }
        }
        Ok(())
    }
}
// Payment Service

/// Service that listens for newly created orders and emits payment events.
pub struct PaymentService {
    publisher: InMemoryPublisher,
    subscriber: InMemorySubscriber,
}

impl PaymentService {
    /// Creates a service whose publisher and subscriber share `broker`.
    pub fn new(broker: Arc<InMemoryBroker>) -> Self {
        let publisher = InMemoryPublisher::new(Arc::clone(&broker));
        let subscriber = InMemorySubscriber::new(broker);
        Self { publisher, subscriber }
    }

    /// Subscribes to `order_events` and processes a payment for every message
    /// tagged `order_created`; all other messages are ignored.
    ///
    /// The loop only ends by returning an error from subscribing, receiving,
    /// or processing a payment.
    pub async fn listen_for_orders(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        self.subscriber.subscribe("order_events").await?;
        loop {
            let incoming = self.subscriber.receive().await?;
            let is_new_order = incoming
                .metadata
                .get("event_type")
                .map_or(false, |kind| kind == "order_created");
            if is_new_order {
                self.process_payment(incoming).await?;
            }
        }
    }

    /// Decodes the order in `message`, waits 100 ms to simulate processing,
    /// and publishes the order with status `PaymentProcessed` on
    /// `payment_events`.
    async fn process_payment(&self, message: Message) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let mut order: OrderEvent = serde_json::from_slice(&message.payload)?;
        println!(
            "Payment Service: Processing payment for order {} (${:.2})",
            order.order_id, order.amount
        );

        // Stand-in for the real payment work.
        let simulated_latency = std::time::Duration::from_millis(100);
        tokio::time::sleep(simulated_latency).await;

        order.status = OrderStatus::PaymentProcessed;
        let body = serde_json::to_vec(&order)?;
        let outgoing = Message::new(body)
            .with_metadata("event_type", "payment_processed")
            .with_metadata("service", "payment_service");
        self.publisher.publish("payment_events", vec![outgoing]).await?;
        Ok(())
    }
}
/// Wires an order service and a payment service to one in-memory broker,
/// starts their listeners, and publishes a single sample order.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let broker = Arc::new(InMemoryBroker::with_default_config());

    // Instance used only to publish the sample order. It is never shared,
    // so it needs no `Arc`.
    let order_service = OrderService::new(broker.clone());

    // Start event listeners. Each listener is its own instance because the
    // listen loops take `&mut self` and run until they fail.
    let mut order_listener = OrderService::new(broker.clone());
    let mut payment_listener = PaymentService::new(broker);

    // The join handles are dropped on purpose: the tasks run detached and end
    // when `main` returns.
    tokio::spawn(async move {
        if let Err(e) = order_listener.listen_for_events().await {
            eprintln!("Order listener error: {}", e);
        }
    });
    tokio::spawn(async move {
        if let Err(e) = payment_listener.listen_for_orders().await {
            eprintln!("Payment listener error: {}", e);
        }
    });

    // Simulate order creation.
    // NOTE(review): the pause presumably gives the spawned listeners time to
    // subscribe before the first event is published; confirm against the broker.
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    let order = OrderEvent {
        order_id: "ORD-001".to_string(),
        customer_id: "CUST-123".to_string(),
        amount: 99.99,
        status: OrderStatus::Created,
    };
    order_service.create_order(order).await?;

    // Keep the application running long enough for the events to flow.
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
    Ok(())
}
Event-Driven Communication
Saga Pattern Implementation
use serde::{Serialize, Deserialize};
use std::collections::HashMap;
/// One step of a saga, serialized to JSON and sent as a command to a service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SagaStep {
    /// Identifier of the step; sent as the `step_id` metadata entry.
    pub step_id: String,
    /// Name of the target service; commands go to the `<service>_commands` topic.
    pub service: String,
    /// Action to perform; sent as the `action` metadata entry.
    pub action: String,
    /// Name of the compensating action (not used by the code shown in this guide).
    pub compensation_action: String,
    /// Current state of the step.
    pub status: StepStatus,
    /// Arbitrary JSON data attached to the step.
    pub data: serde_json::Value,
}
/// State of a single saga step.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StepStatus {
    Pending,
    Completed,
    Failed,
    Compensated,
}
/// Tracks active sagas and sends their steps to services as commands.
pub struct SagaOrchestrator {
    publisher: InMemoryPublisher,
    // Not used by the methods shown here.
    subscriber: InMemorySubscriber,
    /// Steps of every started saga, keyed by saga id.
    active_sagas: HashMap<String, Vec<SagaStep>>,
}

impl SagaOrchestrator {
    /// Creates an orchestrator with no active sagas, attached to `broker`.
    pub fn new(broker: Arc<InMemoryBroker>) -> Self {
        Self {
            publisher: InMemoryPublisher::new(broker.clone()),
            subscriber: InMemorySubscriber::new(broker),
            active_sagas: HashMap::new(),
        }
    }

    /// Registers `steps` under `saga_id` and sends the first step, if any.
    ///
    /// An existing saga with the same id is replaced. A saga with no steps is
    /// registered but nothing is published.
    ///
    /// # Errors
    ///
    /// Returns an error if the first step cannot be serialized or published.
    /// The saga stays registered in that case.
    pub async fn start_saga(&mut self, saga_id: String, steps: Vec<SagaStep>) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        println!("Starting saga: {}", saga_id);
        // Move the steps into the map and borrow the first one back, rather
        // than cloning the whole list just to read its head.
        self.active_sagas.insert(saga_id.clone(), steps);
        let first_step = self
            .active_sagas
            .get(&saga_id)
            .and_then(|registered| registered.first());
        // Execute first step
        if let Some(first_step) = first_step {
            self.execute_step(&saga_id, first_step).await?;
        }
        Ok(())
    }

    /// Publishes `step` as JSON on the `<service>_commands` topic, tagged with
    /// `saga_id`, `step_id` and `action` metadata.
    async fn execute_step(&self, saga_id: &str, step: &SagaStep) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        println!("Executing step {} for saga {}", step.step_id, saga_id);
        let message = Message::new(serde_json::to_vec(step)?)
            .with_metadata("saga_id", saga_id)
            .with_metadata("step_id", &step.step_id)
            .with_metadata("action", &step.action);
        let topic = format!("{}_commands", step.service);
        self.publisher.publish(&topic, vec![message]).await?;
        Ok(())
    }
}
Request-Response Pattern
use tokio::sync::oneshot;
use std::collections::HashMap;
use uuid::Uuid;
/// Request-response helper built on publish/subscribe.
///
/// `request` publishes a message carrying a fresh `request_id` and waits on a
/// one-shot channel. `listen_for_responses` completes that channel when a
/// message with the same `request_id` arrives on `response_topic`.
///
/// Both methods take `&self`, so one instance (for example behind an `Arc`)
/// can run the listener in a background task while other tasks send requests.
pub struct RequestResponseService {
    publisher: InMemoryPublisher,
    // Behind an async mutex so the listener can run through `&self`.
    subscriber: tokio::sync::Mutex<InMemorySubscriber>,
    /// Senders for requests that are still waiting for a reply, keyed by request id.
    pending_requests: Arc<tokio::sync::Mutex<HashMap<String, oneshot::Sender<Message>>>>,
}

impl RequestResponseService {
    /// Creates a service attached to `broker`.
    ///
    /// `service_name` is accepted but currently unused.
    pub fn new(broker: Arc<InMemoryBroker>, service_name: &str) -> Self {
        let _ = service_name;
        let subscriber = InMemorySubscriber::new(broker.clone());
        Self {
            publisher: InMemoryPublisher::new(broker),
            subscriber: tokio::sync::Mutex::new(subscriber),
            pending_requests: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
        }
    }

    /// Sends `request_data` to the `<target_service>_requests` topic and waits
    /// for the matching response.
    ///
    /// The call waits indefinitely, so `listen_for_responses` must be running
    /// on this instance. Wrap the call in a timeout if a bound is needed.
    ///
    /// # Errors
    ///
    /// Returns an error if publishing fails, or if the pending sender is
    /// dropped before a response is delivered.
    pub async fn request(&self, target_service: &str, request_data: Vec<u8>) -> Result<Message, Box<dyn std::error::Error + Send + Sync>> {
        let request_id = Uuid::new_v4().to_string();
        let (sender, receiver) = oneshot::channel();

        // Store the pending request before publishing, so a fast reply always
        // finds its entry. The lock guard is released at the end of the statement.
        self.pending_requests
            .lock()
            .await
            .insert(request_id.clone(), sender);

        // Send the request
        let request_message = Message::new(request_data)
            .with_metadata("request_id", &request_id)
            .with_metadata("reply_to", "response_topic");
        let topic = format!("{}_requests", target_service);
        if let Err(err) = self.publisher.publish(&topic, vec![request_message]).await {
            // No reply can arrive for an unsent request; drop its entry so it
            // does not stay in the map forever.
            self.pending_requests.lock().await.remove(&request_id);
            return Err(err.into());
        }

        // Wait for response
        let response = receiver.await?;
        Ok(response)
    }

    /// Subscribes to `response_topic` and hands each received message to the
    /// request waiting on its `request_id`.
    ///
    /// Messages without a `request_id`, or with one that has no pending
    /// request, are dropped. The loop only ends by returning an error from
    /// subscribing or receiving.
    pub async fn listen_for_responses(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // Held for the lifetime of the loop; only this method uses the subscriber.
        let mut subscriber = self.subscriber.lock().await;
        subscriber.subscribe("response_topic").await?;
        loop {
            let message = subscriber.receive().await?;
            if let Some(request_id) = message.metadata.get("request_id") {
                let mut pending = self.pending_requests.lock().await;
                if let Some(sender) = pending.remove(request_id) {
                    // The requester may already have gone away; ignoring the
                    // send error is intentional.
                    let _ = sender.send(message);
                }
            }
        }
    }
}
This guide demonstrates core patterns for building distributed systems with Kincir: event-driven communication between services, saga orchestration, and request-response messaging.