Monitoring and Observability with Kincir
This guide demonstrates how to add comprehensive monitoring, metrics, and distributed tracing to Kincir applications.
Table of Contents

- Metrics Collection
- Health Checks
- Distributed Tracing
- Logging
- Alerting
Metrics Collection
Prometheus Metrics
use prometheus::{Counter, Histogram, HistogramOpts, Gauge, Registry, Encoder, TextEncoder};
use kincir::{Publisher, Subscriber, Message};
use std::sync::Arc;
use std::time::Instant;
pub struct MetricsCollector {
messages_published: Counter,
messages_received: Counter,
message_processing_duration: Histogram,
active_connections: Gauge,
error_count: Counter,
registry: Registry,
}
impl MetricsCollector {
pub fn new() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let registry = Registry::new();
let messages_published = Counter::new(
"kincir_messages_published_total",
"Total number of messages published"
)?;
let messages_received = Counter::new(
"kincir_messages_received_total",
"Total number of messages received"
)?;
let message_processing_duration = Histogram::with_opts(HistogramOpts::new(
"kincir_message_processing_duration_seconds",
"Time spent processing messages",
))?;
let active_connections = Gauge::new(
"kincir_active_connections",
"Number of active connections"
)?;
let error_count = Counter::new(
"kincir_errors_total",
"Total number of errors"
)?;
registry.register(Box::new(messages_published.clone()))?;
registry.register(Box::new(messages_received.clone()))?;
registry.register(Box::new(message_processing_duration.clone()))?;
registry.register(Box::new(active_connections.clone()))?;
registry.register(Box::new(error_count.clone()))?;
Ok(Self {
messages_published,
messages_received,
message_processing_duration,
active_connections,
error_count,
registry,
})
}
pub fn record_message_published(&self) {
self.messages_published.inc();
}
pub fn record_message_received(&self) {
self.messages_received.inc();
}
pub fn record_processing_time(&self, duration: std::time::Duration) {
self.message_processing_duration.observe(duration.as_secs_f64());
}
pub fn set_active_connections(&self, count: i64) {
self.active_connections.set(count as f64);
}
pub fn record_error(&self) {
self.error_count.inc();
}
pub fn export_metrics(&self) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let encoder = TextEncoder::new();
let metric_families = self.registry.gather();
let mut buffer = Vec::new();
encoder.encode(&metric_families, &mut buffer)?;
Ok(String::from_utf8(buffer)?)
}
}
// Instrumented Publisher
pub struct InstrumentedPublisher<P> {
publisher: P,
metrics: Arc<MetricsCollector>,
}
impl<P> InstrumentedPublisher<P>
where
P: Publisher,
{
pub fn new(publisher: P, metrics: Arc<MetricsCollector>) -> Self {
Self { publisher, metrics }
}
}
#[async_trait::async_trait]
impl<P> Publisher for InstrumentedPublisher<P>
where
P: Publisher + Send + Sync,
{
type Error = P::Error;
async fn publish(&self, topic: &str, messages: Vec<Message>) -> Result<(), Self::Error> {
let start = Instant::now();
let message_count = messages.len();
match self.publisher.publish(topic, messages).await {
Ok(()) => {
for _ in 0..message_count {
self.metrics.record_message_published();
}
self.metrics.record_processing_time(start.elapsed());
Ok(())
}
Err(e) => {
self.metrics.record_error();
Err(e)
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let metrics = Arc::new(MetricsCollector::new()?);
let broker = Arc::new(kincir::memory::InMemoryBroker::with_default_config());
let base_publisher = kincir::memory::InMemoryPublisher::new(broker);
let instrumented_publisher = InstrumentedPublisher::new(base_publisher, metrics.clone());
// Publish some messages
for i in 0..100 {
let message = Message::new(format!("Message {}", i).into_bytes());
instrumented_publisher.publish("test-topic", vec![message]).await?;
}
// Export metrics
println!("Metrics:\n{}", metrics.export_metrics()?);
Ok(())
}
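Prometheus collects these metrics by scraping an HTTP endpoint, so the collector is usually exposed on a /metrics route. The sketch below assumes the axum crate (version 0.7, not a Kincir dependency) and an illustrative port of 9090; any HTTP framework that can return the encoded text works the same way.

use axum::{extract::State, routing::get, Router};
use std::sync::Arc;

async fn metrics_handler(State(metrics): State<Arc<MetricsCollector>>) -> String {
    // Fall back to an empty body if encoding fails; a real service might
    // return a 500 status instead.
    metrics.export_metrics().unwrap_or_default()
}

pub async fn serve_metrics(metrics: Arc<MetricsCollector>) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let app = Router::new()
        .route("/metrics", get(metrics_handler))
        .with_state(metrics);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:9090").await?;
    axum::serve(listener, app).await?;
    Ok(())
}

With this running, point a Prometheus scrape job at the host's port 9090 to pick up the kincir_* series.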
Health Checks
Application Health Monitoring
use kincir::{Message, Publisher};
use serde::{Serialize, Deserialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use std::time::{Duration, Instant};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum HealthStatus {
Healthy,
Degraded,
Unhealthy,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct HealthCheck {
pub name: String,
pub status: HealthStatus,
pub message: String,
pub last_checked: String,
pub response_time_ms: u64,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct HealthReport {
pub overall_status: HealthStatus,
pub checks: Vec<HealthCheck>,
pub timestamp: String,
}
pub struct HealthMonitor {
checks: Arc<RwLock<HashMap<String, HealthCheck>>>,
}
impl HealthMonitor {
pub fn new() -> Self {
Self {
checks: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn register_check<F, Fut>(&self, name: String, check_fn: F)
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = Result<String, String>> + Send + 'static,
{
let checks = self.checks.clone();
let check_name = name.clone();
tokio::spawn(async move {
loop {
let start = Instant::now();
let (status, message) = match check_fn().await {
Ok(msg) => (HealthStatus::Healthy, msg),
Err(err) => (HealthStatus::Unhealthy, err),
};
let health_check = HealthCheck {
name: check_name.clone(),
status,
message,
last_checked: chrono::Utc::now().to_rfc3339(),
response_time_ms: start.elapsed().as_millis() as u64,
};
{
let mut checks_map = checks.write().await;
checks_map.insert(check_name.clone(), health_check);
}
tokio::time::sleep(Duration::from_secs(30)).await;
}
});
}
pub async fn get_health_report(&self) -> HealthReport {
let checks_map = self.checks.read().await;
let checks: Vec<HealthCheck> = checks_map.values().cloned().collect();
let overall_status = if checks.iter().any(|c| matches!(c.status, HealthStatus::Unhealthy)) {
HealthStatus::Unhealthy
} else if checks.iter().any(|c| matches!(c.status, HealthStatus::Degraded)) {
HealthStatus::Degraded
} else {
HealthStatus::Healthy
};
HealthReport {
overall_status,
checks,
timestamp: chrono::Utc::now().to_rfc3339(),
}
}
}
// Kincir-specific health checks
#[derive(Clone)]
pub struct KincirHealthChecks {
broker: Arc<kincir::memory::InMemoryBroker>,
}
impl KincirHealthChecks {
pub fn new(broker: Arc<kincir::memory::InMemoryBroker>) -> Self {
Self { broker }
}
pub async fn check_broker_health(&self) -> Result<String, String> {
// Check if broker is responsive
let publisher = kincir::memory::InMemoryPublisher::new(self.broker.clone());
let test_message = Message::new(b"health-check".to_vec());
match publisher.publish("health-check", vec![test_message]).await {
Ok(_) => Ok("Broker is healthy".to_string()),
Err(e) => Err(format!("Broker health check failed: {}", e)),
}
}
pub async fn check_memory_usage(&self) -> Result<String, String> {
// Approximate check based on message volume; a real implementation would track actual memory usage
let stats = self.broker.get_statistics().await;
let total_messages = stats.total_messages_published + stats.total_messages_received;
if total_messages > 1_000_000 {
Err("High message volume detected".to_string())
} else {
Ok(format!("Memory usage normal (total messages: {})", total_messages))
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let broker = Arc::new(kincir::memory::InMemoryBroker::with_default_config());
let health_monitor = HealthMonitor::new();
let kincir_checks = KincirHealthChecks::new(broker.clone());
// Register health checks
// Each closure clones the checker and returns a future that owns it,
// so the spawned health-check loop can call it repeatedly.
health_monitor.register_check(
"broker_health".to_string(),
{
let kincir_checks = kincir_checks.clone();
move || {
let checks = kincir_checks.clone();
async move { checks.check_broker_health().await }
}
}
).await;
health_monitor.register_check(
"memory_usage".to_string(),
{
let kincir_checks = kincir_checks.clone();
move || {
let checks = kincir_checks.clone();
async move { checks.check_memory_usage().await }
}
}
).await;
// Wait for health checks to run
tokio::time::sleep(Duration::from_secs(2)).await;
// Get health report
let report = health_monitor.get_health_report().await;
println!("Health Report: {}", serde_json::to_string_pretty(&report)?);
Ok(())
}
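Health reports are most useful when exposed over HTTP so orchestrators can probe them. Below is a minimal sketch, again assuming axum 0.7 and an illustrative port of 8080 (neither prescribed by Kincir): the handler serializes the HealthReport as JSON and returns HTTP 503 when any check is unhealthy, so Kubernetes-style readiness probes fail automatically.

use axum::{extract::State, http::StatusCode, routing::get, Json, Router};
use std::sync::Arc;

async fn health_handler(State(monitor): State<Arc<HealthMonitor>>) -> (StatusCode, Json<HealthReport>) {
    let report = monitor.get_health_report().await;
    // Map the overall status to an HTTP status code so orchestrators
    // (readiness probes, load balancers) can act on it directly.
    let status = match report.overall_status {
        HealthStatus::Healthy | HealthStatus::Degraded => StatusCode::OK,
        HealthStatus::Unhealthy => StatusCode::SERVICE_UNAVAILABLE,
    };
    (status, Json(report))
}

pub async fn serve_health(monitor: Arc<HealthMonitor>) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let app = Router::new()
        .route("/health", get(health_handler))
        .with_state(monitor);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
    axum::serve(listener, app).await?;
    Ok(())
}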
Distributed Tracing
OpenTelemetry Integration
use kincir::{Message, Publisher};
use opentelemetry::{
global,
trace::{Span, TraceContextExt, Tracer},
Context, KeyValue,
};
use opentelemetry_jaeger::new_agent_pipeline;
use std::time::Duration;
use tracing::{info, instrument};
pub struct TracedPublisher<P> {
publisher: P,
tracer: opentelemetry::sdk::trace::Tracer,
}
impl<P> TracedPublisher<P>
where
P: Publisher,
{
pub fn new(publisher: P) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let tracer = new_agent_pipeline()
.with_service_name("kincir-publisher")
.install_simple()?;
Ok(Self {
publisher,
tracer,
})
}
}
#[async_trait::async_trait]
impl<P> Publisher for TracedPublisher<P>
where
P: Publisher + Send + Sync,
{
type Error = P::Error;
#[instrument(skip(self, messages))]
async fn publish(&self, topic: &str, messages: Vec<Message>) -> Result<(), Self::Error> {
let span = self.tracer
.span_builder("kincir.publish")
.with_attributes(vec![
KeyValue::new("messaging.destination", topic.to_string()),
KeyValue::new("messaging.message_count", messages.len() as i64),
])
.start(&self.tracer);
let cx = Context::current_with_span(span);
// Capture the span context before attaching so it can be propagated
// in message metadata, letting consumers continue the same trace.
let span_context = cx.span().span_context().clone();
let _guard = cx.attach();
let traced_messages: Vec<Message> = messages
.into_iter()
.map(|mut msg| {
msg.set_metadata("trace_id", &span_context.trace_id().to_string());
msg.set_metadata("span_id", &span_context.span_id().to_string());
msg
})
.collect();
info!("Publishing {} messages to topic '{}'", traced_messages.len(), topic);
match self.publisher.publish(topic, traced_messages).await {
Ok(()) => {
info!("Successfully published messages");
Ok(())
}
Err(e) => {
tracing::error!("Failed to publish messages: {:?}", e);
Err(e)
}
}
}
}
// Traced message processor
pub struct TracedMessageProcessor {
tracer: opentelemetry::sdk::trace::Tracer,
}
impl TracedMessageProcessor {
pub fn new() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let tracer = new_agent_pipeline()
.with_service_name("kincir-processor")
.install_simple()?;
Ok(Self {
tracer,
})
}
#[instrument(skip(self, message))]
pub async fn process_message(&self, message: Message) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Extract trace context from message
let trace_id = message.metadata.get("trace_id");
let span_id = message.metadata.get("span_id");
let mut span = self.tracer
.span_builder("kincir.process_message")
.with_attributes(vec![
KeyValue::new("message.uuid", message.uuid.clone()),
KeyValue::new("message.size", message.payload.len() as i64),
])
.start(&self.tracer);
if let (Some(trace_id), Some(span_id)) = (trace_id, span_id) {
span.add_event(
"trace_context_extracted",
vec![
KeyValue::new("parent.trace_id", trace_id.clone()),
KeyValue::new("parent.span_id", span_id.clone()),
],
);
}
let cx = Context::current_with_span(span);
let _guard = cx.attach();
info!("Processing message: {}", message.uuid);
// Simulate message processing
tokio::time::sleep(Duration::from_millis(10)).await;
info!("Message processed successfully");
Ok(())
}
}
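The #[instrument] attributes above create tracing spans, which are only exported if a tracing-opentelemetry layer is installed on the subscriber. A minimal initialization sketch, assuming the tracing-subscriber and tracing-opentelemetry crates and a local Jaeger agent on its default port (the service name kincir-app is illustrative):

use opentelemetry_jaeger::new_agent_pipeline;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

pub fn init_tracing() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Exports spans to a Jaeger agent on localhost:6831 (the crate's default).
    let tracer = new_agent_pipeline()
        .with_service_name("kincir-app")
        .install_simple()?;

    // Bridge tracing spans (created by #[instrument]) into OpenTelemetry.
    let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);

    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer())
        .with(otel_layer)
        .try_init()?;
    Ok(())
}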
Logging
Structured Logging
use kincir::{Message, Publisher};
use std::sync::Arc;
use tracing::{info, warn, error, debug, instrument};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
pub struct StructuredLogger;
impl StructuredLogger {
pub fn init() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "kincir=debug,info".into()),
)
.with(tracing_subscriber::fmt::layer().json())
.try_init()?;
Ok(())
}
}
pub struct LoggingPublisher<P> {
publisher: P,
service_name: String,
}
impl<P> LoggingPublisher<P>
where
P: Publisher,
{
pub fn new(publisher: P, service_name: String) -> Self {
Self { publisher, service_name }
}
}
#[async_trait::async_trait]
impl<P> Publisher for LoggingPublisher<P>
where
P: Publisher + Send + Sync,
{
type Error = P::Error;
#[instrument(skip(self, messages), fields(service = %self.service_name, topic = %topic, message_count = messages.len()))]
async fn publish(&self, topic: &str, messages: Vec<Message>) -> Result<(), Self::Error> {
debug!("Starting message publish operation");
let start = std::time::Instant::now();
match self.publisher.publish(topic, messages.clone()).await {
Ok(()) => {
let duration = start.elapsed();
info!(
duration_ms = duration.as_millis(),
"Messages published successfully"
);
// Log individual message details in debug mode
for (i, message) in messages.iter().enumerate() {
debug!(
message_index = i,
message_uuid = %message.uuid,
message_size = message.payload.len(),
metadata = ?message.metadata,
"Message details"
);
}
Ok(())
}
Err(e) => {
error!(
error = %e,
duration_ms = start.elapsed().as_millis(),
"Failed to publish messages"
);
Err(e)
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
StructuredLogger::init()?;
let broker = Arc::new(kincir::memory::InMemoryBroker::with_default_config());
let base_publisher = kincir::memory::InMemoryPublisher::new(broker);
let logging_publisher = LoggingPublisher::new(base_publisher, "order-service".to_string());
// Publish messages with structured logging
for i in 0..5 {
let message = Message::new(format!("Order {}", i).into_bytes())
.with_metadata("order_id", &format!("ORD-{:03}", i))
.with_metadata("customer_id", &format!("CUST-{}", i % 3));
logging_publisher.publish("orders", vec![message]).await?;
}
Ok(())
}
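The same field-based logging works on the processing side. The handler below is a hypothetical example (handle_order is not a Kincir API); #[instrument] opens a span per message so every log line emitted inside it carries the message UUID and payload size in the JSON output.

use kincir::Message;
use tracing::{info, instrument};

// Hypothetical message handler; the span fields appear on every log line
// emitted inside, which makes per-message log correlation straightforward.
#[instrument(skip(message), fields(message_uuid = %message.uuid, payload_len = message.payload.len()))]
async fn handle_order(message: Message) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    info!("handling order message");
    // ... application-specific processing ...
    Ok(())
}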
Alerting
Alert Manager Integration
use serde::{Serialize, Deserialize};
use reqwest::Client;
use std::collections::HashMap;
use std::sync::Arc;
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")] // Alertmanager's API expects startsAt / endsAt
pub struct Alert {
pub labels: HashMap<String, String>,
pub annotations: HashMap<String, String>,
pub starts_at: String,
pub ends_at: Option<String>,
}
pub struct AlertManager {
client: Client,
webhook_url: String,
}
impl AlertManager {
pub fn new(webhook_url: String) -> Self {
Self {
client: Client::new(),
webhook_url,
}
}
pub async fn send_alert(&self, alert: Alert) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let alerts = vec![alert];
let response = self.client
.post(&self.webhook_url)
.json(&alerts)
.send()
.await?;
if response.status().is_success() {
println!("Alert sent successfully");
} else {
eprintln!("Failed to send alert: {}", response.status());
}
Ok(())
}
pub fn create_high_error_rate_alert(&self, service: &str, error_rate: f64) -> Alert {
let mut labels = HashMap::new();
labels.insert("alertname".to_string(), "HighErrorRate".to_string());
labels.insert("service".to_string(), service.to_string());
labels.insert("severity".to_string(), "critical".to_string());
let mut annotations = HashMap::new();
annotations.insert(
"summary".to_string(),
format!("High error rate detected in {}", service),
);
annotations.insert(
"description".to_string(),
format!("Error rate is {:.2}% which exceeds the threshold", error_rate * 100.0),
);
Alert {
labels,
annotations,
starts_at: chrono::Utc::now().to_rfc3339(),
ends_at: None,
}
}
}
// Monitoring service that triggers alerts
pub struct MonitoringService {
metrics: Arc<MetricsCollector>,
alert_manager: AlertManager,
error_rate_threshold: f64,
}
impl MonitoringService {
pub fn new(
metrics: Arc<MetricsCollector>,
alert_manager: AlertManager,
error_rate_threshold: f64,
) -> Self {
Self {
metrics,
alert_manager,
error_rate_threshold,
}
}
pub async fn check_and_alert(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// This would typically get metrics from Prometheus
// For demo purposes, we'll simulate checking error rate
let error_rate = 0.15; // 15% error rate
if error_rate > self.error_rate_threshold {
let alert = self.alert_manager.create_high_error_rate_alert("kincir-service", error_rate);
self.alert_manager.send_alert(alert).await?;
}
Ok(())
}
}
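To run these checks continuously, the monitoring service can be driven by a periodic task. A minimal sketch follows; the 60-second interval, the 5% threshold, and the Alertmanager URL (its default listen port is 9093) are illustrative values, not defaults from Kincir.

use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let metrics = Arc::new(MetricsCollector::new()?);
    let alert_manager = AlertManager::new("http://localhost:9093/api/v2/alerts".to_string());
    // Alert when more than 5% of operations fail.
    let monitoring = MonitoringService::new(metrics, alert_manager, 0.05);

    // Evaluate the error rate on a fixed schedule and fire alerts as needed.
    let mut interval = tokio::time::interval(Duration::from_secs(60));
    loop {
        interval.tick().await;
        if let Err(e) = monitoring.check_and_alert().await {
            eprintln!("Monitoring check failed: {}", e);
        }
    }
}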
Together, these patterns — Prometheus metrics, health checks, distributed tracing, structured logging, and alerting — give Kincir applications the observability they need to run reliably in production.