58 lines
1.9 KiB
Rust
58 lines
1.9 KiB
Rust
use std::sync::Arc;
|
|
use tracing::{error, info};
|
|
|
|
use cxcloud_common::{event::Event, redis_streams};
|
|
|
|
use crate::{router, AppState};
|
|
|
|
/// Continuously consume from events.output and route to delivery handlers.
|
|
pub async fn run(state: Arc<AppState>) {
|
|
let consumer_name = format!("output-{}", uuid::Uuid::now_v7());
|
|
info!(consumer = %consumer_name, "Output consumer starting");
|
|
|
|
loop {
|
|
let entries = match redis_streams::xreadgroup(
|
|
&state.redis,
|
|
"output-group",
|
|
&consumer_name,
|
|
"events.output",
|
|
1,
|
|
5000,
|
|
)
|
|
.await
|
|
{
|
|
Ok(entries) => entries,
|
|
Err(e) => {
|
|
error!(error = %e, "Error reading from events.output");
|
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
|
continue;
|
|
}
|
|
};
|
|
|
|
for (msg_id, fields) in &entries {
|
|
{
|
|
match Event::from_stream_fields(&fields) {
|
|
Some(event) => {
|
|
info!(event_id = %event.id, event_type = %event.r#type, "Processing output event");
|
|
|
|
if let Err(e) = router::route_and_deliver(&state, &event).await {
|
|
error!(event_id = %event.id, error = %e, "Delivery failed");
|
|
}
|
|
|
|
// ACK the message
|
|
if let Err(e) =
|
|
redis_streams::xack(&state.redis, "events.output", "output-group", &msg_id)
|
|
.await
|
|
{
|
|
error!(msg_id = %msg_id, error = %e, "Failed to ACK");
|
|
}
|
|
}
|
|
None => {
|
|
error!(msg_id = %msg_id, "Failed to parse event");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|