vendor: update cargo-cxcloud-common-0.1.0
This commit is contained in:
@@ -0,0 +1,7 @@
|
||||
{
|
||||
"git": {
|
||||
"sha1": "927a31cbf65f78c3ef6b729631b2fc35335afe06",
|
||||
"dirty": true
|
||||
},
|
||||
"path_in_vcs": "services/cxcloud-rs/crates/common"
|
||||
}
|
||||
Generated
+2282
File diff suppressed because it is too large
Load Diff
+92
@@ -0,0 +1,92 @@
|
||||
# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
|
||||
#
|
||||
# When uploading crates to the registry Cargo will automatically
|
||||
# "normalize" Cargo.toml files for maximal compatibility
|
||||
# with all versions of Cargo and also rewrite `path` dependencies
|
||||
# to registry (e.g., crates.io) dependencies.
|
||||
#
|
||||
# If you are reading this file be aware that the original Cargo.toml
|
||||
# will likely look very different (and much more reasonable).
|
||||
# See Cargo.toml.orig for the original contents.
|
||||
|
||||
[package]
|
||||
edition = "2021"
|
||||
name = "cxcloud-common"
|
||||
version = "0.1.0"
|
||||
build = false
|
||||
publish = ["cxai"]
|
||||
autolib = false
|
||||
autobins = false
|
||||
autoexamples = false
|
||||
autotests = false
|
||||
autobenches = false
|
||||
readme = false
|
||||
|
||||
[lib]
|
||||
name = "cxcloud_common"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies.anyhow]
|
||||
version = "1"
|
||||
|
||||
[dependencies.chrono]
|
||||
version = "0.4"
|
||||
features = ["serde"]
|
||||
|
||||
[dependencies.cxcloud-proto]
|
||||
version = "0.1.0"
|
||||
registry-index = "sparse+https://git.cxllm-studio.com/api/packages/CxAI-LLM/cargo/"
|
||||
|
||||
[dependencies.deadpool-redis]
|
||||
version = "0.18"
|
||||
|
||||
[dependencies.opentelemetry]
|
||||
version = "0.24"
|
||||
|
||||
[dependencies.opentelemetry-otlp]
|
||||
version = "0.17"
|
||||
|
||||
[dependencies.opentelemetry_sdk]
|
||||
version = "0.24"
|
||||
features = ["rt-tokio"]
|
||||
|
||||
[dependencies.redis]
|
||||
version = "0.27"
|
||||
features = [
|
||||
"tokio-comp",
|
||||
"connection-manager",
|
||||
]
|
||||
|
||||
[dependencies.serde]
|
||||
version = "1"
|
||||
features = ["derive"]
|
||||
|
||||
[dependencies.serde_json]
|
||||
version = "1"
|
||||
|
||||
[dependencies.thiserror]
|
||||
version = "2"
|
||||
|
||||
[dependencies.tokio]
|
||||
version = "1"
|
||||
features = ["full"]
|
||||
|
||||
[dependencies.tracing]
|
||||
version = "0.1"
|
||||
|
||||
[dependencies.tracing-opentelemetry]
|
||||
version = "0.25"
|
||||
|
||||
[dependencies.tracing-subscriber]
|
||||
version = "0.3"
|
||||
features = [
|
||||
"env-filter",
|
||||
"json",
|
||||
]
|
||||
|
||||
[dependencies.uuid]
|
||||
version = "1"
|
||||
features = [
|
||||
"v7",
|
||||
"serde",
|
||||
]
|
||||
Generated
+23
@@ -0,0 +1,23 @@
|
||||
[package]
|
||||
name = "cxcloud-common"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
publish.workspace = true
|
||||
|
||||
[dependencies]
|
||||
cxcloud-proto = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
redis = { workspace = true }
|
||||
deadpool-redis = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
opentelemetry = { workspace = true }
|
||||
opentelemetry_sdk = { workspace = true }
|
||||
opentelemetry-otlp = { workspace = true }
|
||||
tracing-opentelemetry = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
@@ -1,3 +0,0 @@
|
||||
# cargo-cxcloud-common-0.1.0
|
||||
|
||||
Cargo crate: cxcloud-common-0.1.0
|
||||
+127
@@ -0,0 +1,127 @@
|
||||
use std::env;
|
||||
|
||||
/// Read an environment variable with a default fallback.
|
||||
pub fn env_or(key: &str, default: &str) -> String {
|
||||
env::var(key).unwrap_or_else(|_| default.to_string())
|
||||
}
|
||||
|
||||
/// Read a required environment variable. Panics if missing.
|
||||
pub fn env_required(key: &str) -> String {
|
||||
env::var(key).unwrap_or_else(|_| panic!("{key} environment variable is required"))
|
||||
}
|
||||
|
||||
/// Read an environment variable as a u16 port number.
|
||||
pub fn env_port(key: &str, default: u16) -> u16 {
|
||||
env::var(key)
|
||||
.ok()
|
||||
.and_then(|v| v.parse().ok())
|
||||
.unwrap_or(default)
|
||||
}
|
||||
|
||||
/// Read an environment variable as a u64.
|
||||
pub fn env_u64(key: &str, default: u64) -> u64 {
|
||||
env::var(key)
|
||||
.ok()
|
||||
.and_then(|v| v.parse().ok())
|
||||
.unwrap_or(default)
|
||||
}
|
||||
|
||||
/// Common service configuration shared by all services.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ServiceConfig {
|
||||
pub redis_url: String,
|
||||
pub otel_endpoint: String,
|
||||
pub otel_service_name: String,
|
||||
pub log_level: String,
|
||||
}
|
||||
|
||||
impl ServiceConfig {
|
||||
pub fn from_env(service_name: &str) -> Self {
|
||||
Self {
|
||||
redis_url: env_or("REDIS_URL", "redis://localhost:6379"),
|
||||
otel_endpoint: env_or("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"),
|
||||
otel_service_name: env_or("OTEL_SERVICE_NAME", service_name),
|
||||
log_level: env_or("LOG_LEVEL", "info"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn env_or_returns_default_when_unset() {
|
||||
// Use a key unlikely to be set
|
||||
let val = env_or("CXCLOUD_TEST_UNSET_VAR_12345", "fallback");
|
||||
assert_eq!(val, "fallback");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn env_or_returns_value_when_set() {
|
||||
env::set_var("CXCLOUD_TEST_ENV_OR", "custom");
|
||||
let val = env_or("CXCLOUD_TEST_ENV_OR", "default");
|
||||
assert_eq!(val, "custom");
|
||||
env::remove_var("CXCLOUD_TEST_ENV_OR");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn env_port_parses_valid_port() {
|
||||
env::set_var("CXCLOUD_TEST_PORT", "9090");
|
||||
let port = env_port("CXCLOUD_TEST_PORT", 3000);
|
||||
assert_eq!(port, 9090);
|
||||
env::remove_var("CXCLOUD_TEST_PORT");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn env_port_returns_default_for_invalid() {
|
||||
env::set_var("CXCLOUD_TEST_PORT_BAD", "not-a-number");
|
||||
let port = env_port("CXCLOUD_TEST_PORT_BAD", 3000);
|
||||
assert_eq!(port, 3000);
|
||||
env::remove_var("CXCLOUD_TEST_PORT_BAD");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn env_port_returns_default_when_unset() {
|
||||
let port = env_port("CXCLOUD_TEST_PORT_UNSET_99999", 8080);
|
||||
assert_eq!(port, 8080);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn env_u64_parses_valid_value() {
|
||||
env::set_var("CXCLOUD_TEST_U64", "42");
|
||||
let val = env_u64("CXCLOUD_TEST_U64", 10);
|
||||
assert_eq!(val, 42);
|
||||
env::remove_var("CXCLOUD_TEST_U64");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn env_u64_returns_default_when_unset() {
|
||||
let val = env_u64("CXCLOUD_TEST_U64_UNSET_99999", 100);
|
||||
assert_eq!(val, 100);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic(expected = "environment variable is required")]
|
||||
fn env_required_panics_when_missing() {
|
||||
env_required("CXCLOUD_TEST_REQUIRED_UNSET_99999");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn env_required_returns_value_when_set() {
|
||||
env::set_var("CXCLOUD_TEST_REQUIRED", "present");
|
||||
let val = env_required("CXCLOUD_TEST_REQUIRED");
|
||||
assert_eq!(val, "present");
|
||||
env::remove_var("CXCLOUD_TEST_REQUIRED");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn service_config_from_env_defaults() {
|
||||
let cfg = ServiceConfig::from_env("test-svc");
|
||||
assert!(!cfg.redis_url.is_empty());
|
||||
assert!(!cfg.otel_endpoint.is_empty());
|
||||
assert!(!cfg.log_level.is_empty());
|
||||
// service name should default if OTEL_SERVICE_NAME not set
|
||||
assert!(!cfg.otel_service_name.is_empty());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,95 @@
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum CxError {
|
||||
#[error("Redis error: {0}")]
|
||||
Redis(#[from] redis::RedisError),
|
||||
|
||||
#[error("Serialization error: {0}")]
|
||||
Serialization(#[from] serde_json::Error),
|
||||
|
||||
#[error("Configuration error: {0}")]
|
||||
Config(String),
|
||||
|
||||
#[error("Service unavailable: {0}")]
|
||||
Unavailable(String),
|
||||
|
||||
#[error("Not found: {0}")]
|
||||
NotFound(String),
|
||||
|
||||
#[error("Unauthorized: {0}")]
|
||||
Unauthorized(String),
|
||||
|
||||
#[error("Rate limited")]
|
||||
RateLimited,
|
||||
|
||||
#[error("Upstream error: {status} {message}")]
|
||||
Upstream { status: u16, message: String },
|
||||
|
||||
#[error("Internal error: {0}")]
|
||||
Internal(String),
|
||||
}
|
||||
|
||||
impl CxError {
|
||||
pub fn status_code(&self) -> u16 {
|
||||
match self {
|
||||
Self::Unauthorized(_) => 401,
|
||||
Self::RateLimited => 429,
|
||||
Self::NotFound(_) => 404,
|
||||
Self::Unavailable(_) => 503,
|
||||
Self::Upstream { status, .. } => *status,
|
||||
_ => 500,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn status_codes_are_correct() {
|
||||
assert_eq!(CxError::Unauthorized("x".into()).status_code(), 401);
|
||||
assert_eq!(CxError::RateLimited.status_code(), 429);
|
||||
assert_eq!(CxError::NotFound("x".into()).status_code(), 404);
|
||||
assert_eq!(CxError::Unavailable("x".into()).status_code(), 503);
|
||||
assert_eq!(
|
||||
CxError::Upstream { status: 502, message: "bad".into() }.status_code(),
|
||||
502
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn internal_errors_default_to_500() {
|
||||
assert_eq!(CxError::Internal("oops".into()).status_code(), 500);
|
||||
assert_eq!(CxError::Config("bad config".into()).status_code(), 500);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn error_display_messages() {
|
||||
let e = CxError::NotFound("item-123".into());
|
||||
assert_eq!(e.to_string(), "Not found: item-123");
|
||||
|
||||
let e = CxError::RateLimited;
|
||||
assert_eq!(e.to_string(), "Rate limited");
|
||||
|
||||
let e = CxError::Upstream { status: 503, message: "timeout".into() };
|
||||
assert_eq!(e.to_string(), "Upstream error: 503 timeout");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn redis_error_converts() {
|
||||
let redis_err = redis::RedisError::from((redis::ErrorKind::IoError, "connection refused"));
|
||||
let cx_err = CxError::from(redis_err);
|
||||
assert_eq!(cx_err.status_code(), 500);
|
||||
assert!(cx_err.to_string().contains("Redis error"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn json_error_converts() {
|
||||
let json_err = serde_json::from_str::<serde_json::Value>("{{bad}}").unwrap_err();
|
||||
let cx_err = CxError::from(json_err);
|
||||
assert_eq!(cx_err.status_code(), 500);
|
||||
assert!(cx_err.to_string().contains("Serialization error"));
|
||||
}
|
||||
}
|
||||
+152
@@ -0,0 +1,152 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Unified event envelope matching cxcloud.common.Event proto.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Event {
|
||||
pub id: String,
|
||||
pub source: String,
|
||||
pub r#type: String,
|
||||
pub timestamp: DateTime<Utc>,
|
||||
pub payload: serde_json::Value,
|
||||
pub metadata: HashMap<String, String>,
|
||||
}
|
||||
|
||||
impl Event {
|
||||
pub fn new(source: &str, event_type: &str, payload: serde_json::Value) -> Self {
|
||||
Self {
|
||||
id: Uuid::now_v7().to_string(),
|
||||
source: source.to_string(),
|
||||
r#type: event_type.to_string(),
|
||||
timestamp: Utc::now(),
|
||||
payload,
|
||||
metadata: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert to a flat HashMap for Redis XADD.
|
||||
pub fn to_stream_fields(&self) -> Vec<(String, String)> {
|
||||
vec![
|
||||
("id".into(), self.id.clone()),
|
||||
("source".into(), self.source.clone()),
|
||||
("type".into(), self.r#type.clone()),
|
||||
("timestamp".into(), self.timestamp.to_rfc3339()),
|
||||
("payload".into(), serde_json::to_string(&self.payload).unwrap_or_default()),
|
||||
("metadata".into(), serde_json::to_string(&self.metadata).unwrap_or_default()),
|
||||
]
|
||||
}
|
||||
|
||||
/// Parse from Redis stream fields.
|
||||
pub fn from_stream_fields(fields: &HashMap<String, String>) -> Option<Self> {
|
||||
Some(Self {
|
||||
id: fields.get("id")?.clone(),
|
||||
source: fields.get("source")?.clone(),
|
||||
r#type: fields.get("type")?.clone(),
|
||||
timestamp: fields
|
||||
.get("timestamp")?
|
||||
.parse::<DateTime<Utc>>()
|
||||
.ok()?,
|
||||
payload: fields
|
||||
.get("payload")
|
||||
.and_then(|p| serde_json::from_str(p).ok())
|
||||
.unwrap_or(serde_json::Value::Null),
|
||||
metadata: fields
|
||||
.get("metadata")
|
||||
.and_then(|m| serde_json::from_str(m).ok())
|
||||
.unwrap_or_default(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn new_event_has_valid_uuid_and_timestamp() {
|
||||
let event = Event::new("test-source", "test.created", serde_json::json!({"key": "val"}));
|
||||
assert!(!event.id.is_empty());
|
||||
assert_eq!(event.source, "test-source");
|
||||
assert_eq!(event.r#type, "test.created");
|
||||
assert_eq!(event.payload["key"], "val");
|
||||
assert!(event.metadata.is_empty());
|
||||
// UUID v7 is time-ordered, starts with a recent timestamp
|
||||
assert!(uuid::Uuid::parse_str(&event.id).is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn roundtrip_stream_fields() {
|
||||
let mut event = Event::new("svc", "evt.type", serde_json::json!({"n": 42}));
|
||||
event.metadata.insert("k".into(), "v".into());
|
||||
|
||||
let fields = event.to_stream_fields();
|
||||
let field_map: HashMap<String, String> = fields.into_iter().collect();
|
||||
|
||||
let restored = Event::from_stream_fields(&field_map).expect("should parse");
|
||||
assert_eq!(restored.id, event.id);
|
||||
assert_eq!(restored.source, "svc");
|
||||
assert_eq!(restored.r#type, "evt.type");
|
||||
assert_eq!(restored.payload["n"], 42);
|
||||
assert_eq!(restored.metadata["k"], "v");
|
||||
assert_eq!(restored.timestamp.timestamp(), event.timestamp.timestamp());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn from_stream_fields_missing_id_returns_none() {
|
||||
let mut fields = HashMap::new();
|
||||
fields.insert("source".into(), "src".into());
|
||||
fields.insert("type".into(), "t".into());
|
||||
fields.insert("timestamp".into(), Utc::now().to_rfc3339());
|
||||
assert!(Event::from_stream_fields(&fields).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn from_stream_fields_bad_timestamp_returns_none() {
|
||||
let mut fields = HashMap::new();
|
||||
fields.insert("id".into(), "abc".into());
|
||||
fields.insert("source".into(), "src".into());
|
||||
fields.insert("type".into(), "t".into());
|
||||
fields.insert("timestamp".into(), "not-a-date".into());
|
||||
assert!(Event::from_stream_fields(&fields).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn from_stream_fields_missing_payload_defaults_to_null() {
|
||||
let mut fields = HashMap::new();
|
||||
fields.insert("id".into(), "abc".into());
|
||||
fields.insert("source".into(), "src".into());
|
||||
fields.insert("type".into(), "t".into());
|
||||
fields.insert("timestamp".into(), Utc::now().to_rfc3339());
|
||||
let event = Event::from_stream_fields(&fields).unwrap();
|
||||
assert_eq!(event.payload, serde_json::Value::Null);
|
||||
assert!(event.metadata.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_serializes_to_json() {
|
||||
let event = Event::new("src", "t", serde_json::json!(null));
|
||||
let json = serde_json::to_value(&event).unwrap();
|
||||
assert_eq!(json["source"], "src");
|
||||
assert_eq!(json["type"], "t");
|
||||
assert!(json["id"].is_string());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_deserializes_from_json() {
|
||||
let json_str = r#"{
|
||||
"id": "test-id",
|
||||
"source": "api",
|
||||
"type": "user.login",
|
||||
"timestamp": "2025-01-01T00:00:00Z",
|
||||
"payload": {"user": "alice"},
|
||||
"metadata": {"env": "prod"}
|
||||
}"#;
|
||||
let event: Event = serde_json::from_str(json_str).unwrap();
|
||||
assert_eq!(event.id, "test-id");
|
||||
assert_eq!(event.r#type, "user.login");
|
||||
assert_eq!(event.payload["user"], "alice");
|
||||
assert_eq!(event.metadata["env"], "prod");
|
||||
}
|
||||
}
|
||||
+110
@@ -0,0 +1,110 @@
|
||||
use chrono::Utc;
|
||||
use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct HealthResponse {
|
||||
pub service: String,
|
||||
pub status: HealthStatus,
|
||||
pub version: String,
|
||||
pub checked_at: String,
|
||||
pub dependencies: HashMap<String, String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum HealthStatus {
|
||||
Healthy,
|
||||
Degraded,
|
||||
Unhealthy,
|
||||
}
|
||||
|
||||
impl HealthResponse {
|
||||
pub fn healthy(service: &str) -> Self {
|
||||
Self {
|
||||
service: service.to_string(),
|
||||
status: HealthStatus::Healthy,
|
||||
version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
checked_at: Utc::now().to_rfc3339(),
|
||||
dependencies: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_dependency(mut self, name: &str, status: &str) -> Self {
|
||||
self.dependencies.insert(name.to_string(), status.to_string());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn compute_status(mut self) -> Self {
|
||||
let has_unhealthy = self.dependencies.values().any(|v| v == "unhealthy");
|
||||
let has_degraded = self.dependencies.values().any(|v| v == "degraded");
|
||||
self.status = if has_unhealthy {
|
||||
HealthStatus::Unhealthy
|
||||
} else if has_degraded {
|
||||
HealthStatus::Degraded
|
||||
} else {
|
||||
HealthStatus::Healthy
|
||||
};
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn healthy_response_has_correct_defaults() {
|
||||
let resp = HealthResponse::healthy("test-svc");
|
||||
assert_eq!(resp.service, "test-svc");
|
||||
assert!(matches!(resp.status, HealthStatus::Healthy));
|
||||
assert!(resp.dependencies.is_empty());
|
||||
assert!(!resp.version.is_empty());
|
||||
assert!(!resp.checked_at.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn with_dependency_adds_entry() {
|
||||
let resp = HealthResponse::healthy("svc")
|
||||
.with_dependency("redis", "healthy")
|
||||
.with_dependency("ollama", "degraded");
|
||||
assert_eq!(resp.dependencies.len(), 2);
|
||||
assert_eq!(resp.dependencies["redis"], "healthy");
|
||||
assert_eq!(resp.dependencies["ollama"], "degraded");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compute_status_unhealthy_takes_priority() {
|
||||
let resp = HealthResponse::healthy("svc")
|
||||
.with_dependency("redis", "healthy")
|
||||
.with_dependency("db", "unhealthy")
|
||||
.with_dependency("cache", "degraded")
|
||||
.compute_status();
|
||||
assert!(matches!(resp.status, HealthStatus::Unhealthy));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compute_status_degraded_when_no_unhealthy() {
|
||||
let resp = HealthResponse::healthy("svc")
|
||||
.with_dependency("redis", "healthy")
|
||||
.with_dependency("cache", "degraded")
|
||||
.compute_status();
|
||||
assert!(matches!(resp.status, HealthStatus::Degraded));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compute_status_stays_healthy() {
|
||||
let resp = HealthResponse::healthy("svc")
|
||||
.with_dependency("redis", "healthy")
|
||||
.with_dependency("db", "healthy")
|
||||
.compute_status();
|
||||
assert!(matches!(resp.status, HealthStatus::Healthy));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serializes_status_lowercase() {
|
||||
let resp = HealthResponse::healthy("svc");
|
||||
let json = serde_json::to_value(&resp).unwrap();
|
||||
assert_eq!(json["status"], "healthy");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
pub mod config;
|
||||
pub mod error;
|
||||
pub mod event;
|
||||
pub mod health;
|
||||
pub mod redis_streams;
|
||||
pub mod telemetry;
|
||||
@@ -0,0 +1,170 @@
|
||||
use redis::aio::ConnectionManager;
|
||||
use redis::{Client, RedisResult};
|
||||
use std::collections::HashMap;
|
||||
use tracing::{debug, error};
|
||||
|
||||
/// Create a Redis ConnectionManager from a URL.
|
||||
pub async fn connect(url: &str) -> RedisResult<ConnectionManager> {
|
||||
let client = Client::open(url)?;
|
||||
let conn = client.get_connection_manager().await?;
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
/// Publish an event to a Redis stream (XADD).
|
||||
pub async fn xadd(
|
||||
conn: &ConnectionManager,
|
||||
stream: &str,
|
||||
fields: &[(String, String)],
|
||||
) -> RedisResult<String> {
|
||||
let mut conn = conn.clone();
|
||||
let field_refs: Vec<(&str, &str)> = fields.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect();
|
||||
let id: String = redis::cmd("XADD")
|
||||
.arg(stream)
|
||||
.arg("*")
|
||||
.arg(&field_refs)
|
||||
.query_async(&mut conn)
|
||||
.await?;
|
||||
debug!(stream, id, "Published to stream");
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
/// Create a consumer group if it doesn't exist.
|
||||
pub async fn ensure_consumer_group(
|
||||
conn: &ConnectionManager,
|
||||
stream: &str,
|
||||
group: &str,
|
||||
) {
|
||||
let mut conn = conn.clone();
|
||||
let result: RedisResult<String> = redis::cmd("XGROUP")
|
||||
.arg("CREATE")
|
||||
.arg(stream)
|
||||
.arg(group)
|
||||
.arg("0")
|
||||
.arg("MKSTREAM")
|
||||
.query_async(&mut conn)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(_) => debug!(stream, group, "Created consumer group"),
|
||||
Err(e) if e.to_string().contains("BUSYGROUP") => {
|
||||
debug!(stream, group, "Consumer group already exists");
|
||||
}
|
||||
Err(e) => {
|
||||
error!(stream, group, error = %e, "Failed to create consumer group");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Read messages from a consumer group (XREADGROUP).
|
||||
pub async fn xreadgroup(
|
||||
conn: &ConnectionManager,
|
||||
group: &str,
|
||||
consumer: &str,
|
||||
stream: &str,
|
||||
count: usize,
|
||||
block_ms: usize,
|
||||
) -> RedisResult<Vec<(String, HashMap<String, String>)>> {
|
||||
let mut conn = conn.clone();
|
||||
let result: RedisResult<Vec<redis::Value>> = redis::cmd("XREADGROUP")
|
||||
.arg("GROUP")
|
||||
.arg(group)
|
||||
.arg(consumer)
|
||||
.arg("COUNT")
|
||||
.arg(count)
|
||||
.arg("BLOCK")
|
||||
.arg(block_ms)
|
||||
.arg("STREAMS")
|
||||
.arg(stream)
|
||||
.arg(">")
|
||||
.query_async(&mut conn)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(values) => Ok(parse_stream_response(values)),
|
||||
Err(e) => {
|
||||
if e.to_string().contains("nil") {
|
||||
Ok(vec![])
|
||||
} else {
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Acknowledge a message (XACK).
|
||||
pub async fn xack(
|
||||
conn: &ConnectionManager,
|
||||
stream: &str,
|
||||
group: &str,
|
||||
id: &str,
|
||||
) -> RedisResult<()> {
|
||||
let mut conn = conn.clone();
|
||||
let _: i64 = redis::cmd("XACK")
|
||||
.arg(stream)
|
||||
.arg(group)
|
||||
.arg(id)
|
||||
.query_async(&mut conn)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check Redis connectivity.
|
||||
pub async fn ping(conn: &ConnectionManager) -> bool {
|
||||
let mut conn = conn.clone();
|
||||
let result: RedisResult<String> = redis::cmd("PING").query_async(&mut conn).await;
|
||||
result.is_ok()
|
||||
}
|
||||
|
||||
/// Parse the nested Redis stream response into flat (id, fields) pairs.
|
||||
fn parse_stream_response(values: Vec<redis::Value>) -> Vec<(String, HashMap<String, String>)> {
|
||||
let mut results = Vec::new();
|
||||
|
||||
// Response structure: [[stream_name, [[id, [field, value, ...]], ...]]]
|
||||
for stream_data in values {
|
||||
if let redis::Value::Array(stream_entries) = stream_data {
|
||||
// stream_entries = [stream_name, entries_array]
|
||||
if stream_entries.len() < 2 {
|
||||
continue;
|
||||
}
|
||||
if let redis::Value::Array(entries) = &stream_entries[1] {
|
||||
for entry in entries {
|
||||
if let redis::Value::Array(entry_parts) = entry {
|
||||
if entry_parts.len() < 2 {
|
||||
continue;
|
||||
}
|
||||
let id = match &entry_parts[0] {
|
||||
redis::Value::BulkString(b) => String::from_utf8_lossy(b).to_string(),
|
||||
_ => continue,
|
||||
};
|
||||
let mut fields = HashMap::new();
|
||||
if let redis::Value::Array(field_values) = &entry_parts[1] {
|
||||
let mut i = 0;
|
||||
while i + 1 < field_values.len() {
|
||||
let key = match &field_values[i] {
|
||||
redis::Value::BulkString(b) => {
|
||||
String::from_utf8_lossy(b).to_string()
|
||||
}
|
||||
_ => {
|
||||
i += 2;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let val = match &field_values[i + 1] {
|
||||
redis::Value::BulkString(b) => {
|
||||
String::from_utf8_lossy(b).to_string()
|
||||
}
|
||||
_ => String::new(),
|
||||
};
|
||||
fields.insert(key, val);
|
||||
i += 2;
|
||||
}
|
||||
}
|
||||
results.push((id, fields));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
results
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
use opentelemetry::trace::TracerProvider;
|
||||
use opentelemetry_otlp::WithExportConfig;
|
||||
use opentelemetry_sdk::runtime;
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
|
||||
|
||||
/// Initialize tracing with OpenTelemetry OTLP export and structured JSON logging.
|
||||
pub fn init(service_name: &str, otlp_endpoint: &str, log_level: &str) {
|
||||
let env_filter = EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| EnvFilter::new(log_level));
|
||||
|
||||
let fmt_layer = tracing_subscriber::fmt::layer()
|
||||
.json()
|
||||
.with_target(true)
|
||||
.with_thread_ids(true);
|
||||
|
||||
// Try to set up OTel; fall back to fmt-only if it fails
|
||||
let otel_layer = setup_otel(service_name, otlp_endpoint).ok();
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(otel_layer)
|
||||
.with(env_filter)
|
||||
.with(fmt_layer)
|
||||
.init();
|
||||
}
|
||||
|
||||
fn setup_otel(
|
||||
service_name: &str,
|
||||
endpoint: &str,
|
||||
) -> Result<tracing_opentelemetry::OpenTelemetryLayer<tracing_subscriber::Registry, opentelemetry_sdk::trace::Tracer>, Box<dyn std::error::Error>> {
|
||||
let provider = opentelemetry_otlp::new_pipeline()
|
||||
.tracing()
|
||||
.with_exporter(
|
||||
opentelemetry_otlp::new_exporter()
|
||||
.tonic()
|
||||
.with_endpoint(endpoint),
|
||||
)
|
||||
.with_trace_config(
|
||||
opentelemetry_sdk::trace::Config::default().with_resource(
|
||||
opentelemetry_sdk::Resource::new(vec![
|
||||
opentelemetry::KeyValue::new("service.name", service_name.to_string()),
|
||||
]),
|
||||
),
|
||||
)
|
||||
.install_batch(runtime::Tokio)?;
|
||||
|
||||
let tracer = provider.tracer(service_name.to_string());
|
||||
opentelemetry::global::set_tracer_provider(provider);
|
||||
|
||||
Ok(tracing_opentelemetry::layer().with_tracer(tracer))
|
||||
}
|
||||
|
||||
/// Shutdown the OpenTelemetry tracer provider, flushing pending spans.
|
||||
pub fn shutdown() {
|
||||
opentelemetry::global::shutdown_tracer_provider();
|
||||
}
|
||||
Reference in New Issue
Block a user