vendor: update cargo-cxcloud-input-gateway-0.1.0

This commit is contained in:
cx-git-agent
2026-04-26 16:48:23 +00:00
committed by GitHub
parent bf96f4bc43
commit aa7563292f
10 changed files with 3432 additions and 3 deletions
+7
View File
@@ -0,0 +1,7 @@
{
"git": {
"sha1": "927a31cbf65f78c3ef6b729631b2fc35335afe06",
"dirty": true
},
"path_in_vcs": "services/cxcloud-rs/crates/input-gateway"
}
Generated
+2972
View File
File diff suppressed because it is too large Load Diff
+97
View File
@@ -0,0 +1,97 @@
# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
#
# When uploading crates to the registry Cargo will automatically
# "normalize" Cargo.toml files for maximal compatibility
# with all versions of Cargo and also rewrite `path` dependencies
# to registry (e.g., crates.io) dependencies.
#
# If you are reading this file be aware that the original Cargo.toml
# will likely look very different (and much more reasonable).
# See Cargo.toml.orig for the original contents.
[package]
edition = "2021"
name = "cxcloud-input-gateway"
version = "0.1.0"
build = false
publish = ["cxai"]
autolib = false
autobins = false
autoexamples = false
autotests = false
autobenches = false
readme = false
[[bin]]
name = "input-gateway"
path = "src/main.rs"
[dependencies.anyhow]
version = "1"
[dependencies.axum]
version = "0.7"
features = ["macros"]
[dependencies.chrono]
version = "0.4"
features = ["serde"]
[dependencies.cxcloud-common]
version = "0.1.0"
registry-index = "sparse+https://git.cxllm-studio.com/api/packages/CxAI-LLM/cargo/"
[dependencies.cxcloud-proto]
version = "0.1.0"
registry-index = "sparse+https://git.cxllm-studio.com/api/packages/CxAI-LLM/cargo/"
[dependencies.notify]
version = "7"
[dependencies.redis]
version = "0.27"
features = [
"tokio-comp",
"connection-manager",
]
[dependencies.reqwest]
version = "0.12"
features = [
"json",
"rustls-tls",
]
default-features = false
[dependencies.serde]
version = "1"
features = ["derive"]
[dependencies.serde_json]
version = "1"
[dependencies.tokio]
version = "1"
features = ["full"]
[dependencies.tower]
version = "0.4"
[dependencies.tower-http]
version = "0.5"
features = [
"fs",
"cors",
"trace",
"timeout",
]
[dependencies.tracing]
version = "0.1"
[dependencies.uuid]
version = "1"
features = [
"v7",
"serde",
]
+26
View File
@@ -0,0 +1,26 @@
[package]
name = "cxcloud-input-gateway"
version.workspace = true
edition.workspace = true
publish.workspace = true
[[bin]]
name = "input-gateway"
path = "src/main.rs"
[dependencies]
cxcloud-common = { workspace = true }
cxcloud-proto = { workspace = true }
tokio = { workspace = true }
axum = { workspace = true }
tower = { workspace = true }
tower-http = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
reqwest = { workspace = true }
redis = { workspace = true }
notify = { workspace = true }
tracing = { workspace = true }
chrono = { workspace = true }
uuid = { workspace = true }
anyhow = { workspace = true }
-3
View File
@@ -1,3 +0,0 @@
# cargo-cxcloud-input-gateway-0.1.0
Cargo crate: cxcloud-input-gateway-0.1.0
+60
View File
@@ -0,0 +1,60 @@
use notify::{Event as NotifyEvent, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::Path;
use tokio::sync::mpsc;
use tracing::{error, info, warn};
use cxcloud_common::event::Event;
/// Watch a directory for new/modified files and emit events.
pub async fn watch(watch_path: &str, tx: mpsc::Sender<Event>) -> anyhow::Result<()> {
let path = Path::new(watch_path);
if !path.exists() {
warn!(path = %watch_path, "Watch path does not exist, skipping filesystem watcher");
// Keep the task alive but idle
loop {
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
}
}
info!(path = %watch_path, "Starting filesystem watcher");
let (notify_tx, mut notify_rx) = tokio::sync::mpsc::channel::<NotifyEvent>(100);
let mut watcher = RecommendedWatcher::new(
move |result: Result<NotifyEvent, notify::Error>| {
match result {
Ok(event) => {
let _ = notify_tx.blocking_send(event);
}
Err(e) => error!(error = %e, "Filesystem watch error"),
}
},
notify::Config::default(),
)?;
watcher.watch(path, RecursiveMode::Recursive)?;
while let Some(event) = notify_rx.recv().await {
match event.kind {
EventKind::Create(_) | EventKind::Modify(_) => {
for path in &event.paths {
let file_path = path.to_string_lossy().to_string();
info!(file = %file_path, "File change detected");
let payload = serde_json::json!({
"file_path": file_path,
"event_kind": format!("{:?}", event.kind),
});
let evt = Event::new("filesystem", "file_changed", payload);
if let Err(e) = tx.send(evt).await {
error!(error = %e, "Failed to send filesystem event");
}
}
}
_ => {} // Ignore deletes, access, etc.
}
}
Ok(())
}
+118
View File
@@ -0,0 +1,118 @@
mod filesystem;
mod normalizer;
mod poller;
mod webhook;
use axum::{
extract::State,
http::StatusCode,
response::Json,
routing::{get, post},
Router,
};
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::info;
use cxcloud_common::{
config::{env_or, env_port, env_u64},
event::Event,
health::HealthResponse,
redis_streams, telemetry,
};
/// Shared application state handed to every axum handler.
#[derive(Clone)]
pub struct AppState {
    // Multiplexed Redis connection; used by the health/readiness probes.
    pub redis: redis::aio::ConnectionManager,
    // Producer side of the channel that feeds the normalizer task
    // (the webhook handler pushes events through this).
    pub event_tx: mpsc::Sender<Event>,
}
/// Service entry point: wires Redis, telemetry, the input sources
/// (filesystem watcher, API poller, webhook endpoint) and the normalizer,
/// then serves HTTP until the listener fails.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Configuration comes from the environment with sensible local defaults.
    let redis_url = env_or("REDIS_URL", "redis://localhost:6379");
    let otel_endpoint = env_or("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317");
    let log_level = env_or("LOG_LEVEL", "info");
    telemetry::init("input-gateway", &otel_endpoint, &log_level);
    let redis_conn = redis_streams::connect(&redis_url).await?;
    // Channel for all input sources → normalizer → Redis
    let (event_tx, event_rx) = mpsc::channel::<Event>(1000);
    // Spawn normalizer: reads from channel, publishes to Redis stream
    let norm_redis = redis_conn.clone();
    tokio::spawn(async move {
        normalizer::run(event_rx, &norm_redis).await;
    });
    // Spawn filesystem watcher
    let watch_paths = env_or("WATCH_PATHS", "/data/watched");
    let fs_tx = event_tx.clone();
    tokio::spawn(async move {
        if let Err(e) = filesystem::watch(&watch_paths, fs_tx).await {
            tracing::error!(error = %e, "Filesystem watcher failed");
        }
    });
    // Spawn API poller (only when endpoints are configured)
    let poll_endpoints = env_or("POLL_ENDPOINTS", "");
    let poll_interval = env_u64("POLL_INTERVAL_SECONDS", 60);
    if !poll_endpoints.is_empty() {
        let poll_tx = event_tx.clone();
        tokio::spawn(async move {
            poller::run(&poll_endpoints, poll_interval, poll_tx).await;
        });
    }
    // Spawn feedback consumer
    let feedback_redis = redis_conn.clone();
    tokio::spawn(async move {
        normalizer::consume_feedback(&feedback_redis).await;
    });
    let state = Arc::new(AppState {
        redis: redis_conn,
        event_tx,
    });
    let app = Router::new()
        .route("/health", get(health))
        .route("/ready", get(ready))
        .route("/metrics", get(metrics))
        .route("/webhook", post(webhook::webhook_handler))
        .with_state(state);
    let port = env_port("INPUT_GATEWAY_PORT", 3001);
    info!(port, "Input Gateway starting");
    let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{port}")).await?;
    // Run the server, then flush telemetry on BOTH exit paths. The previous
    // `axum::serve(...).await?` skipped `telemetry::shutdown()` on error
    // because `?` returned before the shutdown line ran.
    let served = axum::serve(listener, app).await;
    telemetry::shutdown();
    served?;
    Ok(())
}
/// Liveness probe: reports service health plus the state of the Redis dependency.
async fn health(State(state): State<Arc<AppState>>) -> Json<HealthResponse> {
    let redis_status = if redis_streams::ping(&state.redis).await {
        "healthy"
    } else {
        "unhealthy"
    };
    Json(
        HealthResponse::healthy("input-gateway")
            .with_dependency("redis", redis_status)
            .compute_status(),
    )
}
/// Readiness probe: 200 only while the Redis connection answers PING.
async fn ready(State(state): State<Arc<AppState>>) -> StatusCode {
    match redis_streams::ping(&state.redis).await {
        true => StatusCode::OK,
        false => StatusCode::SERVICE_UNAVAILABLE,
    }
}
/// Minimal metrics endpoint; counters are hard-coded placeholders (always zero).
async fn metrics() -> Json<serde_json::Value> {
    let body = serde_json::json!({
        "service": "input-gateway",
        "events_published": 0,
        "errors": 0,
    });
    Json(body)
}
+63
View File
@@ -0,0 +1,63 @@
use redis::aio::ConnectionManager;
use tokio::sync::mpsc;
use tracing::{debug, error, info};
use cxcloud_common::{event::Event, redis_streams};
const STREAM_RAW: &str = "events.raw";
const STREAM_FEEDBACK: &str = "events.feedback";
const FEEDBACK_GROUP: &str = "input-feedback-group";
/// Consume events from the channel and publish to Redis stream.
pub async fn run(mut rx: mpsc::Receiver<Event>, redis: &ConnectionManager) {
info!("Normalizer started, publishing to {STREAM_RAW}");
while let Some(event) = rx.recv().await {
let fields = event.to_stream_fields();
match redis_streams::xadd(redis, STREAM_RAW, &fields).await {
Ok(id) => {
debug!(
stream_id = %id,
event_id = %event.id,
source = %event.source,
"Published event to stream"
);
}
Err(e) => {
error!(
error = %e,
event_id = %event.id,
"Failed to publish event to Redis stream"
);
}
}
}
}
/// Consume feedback events from Redis (events.feedback stream).
pub async fn consume_feedback(redis: &ConnectionManager) {
redis_streams::ensure_consumer_group(redis, STREAM_FEEDBACK, FEEDBACK_GROUP).await;
info!("Feedback consumer started on {STREAM_FEEDBACK}");
loop {
match redis_streams::xreadgroup(redis, FEEDBACK_GROUP, "input-gw-1", STREAM_FEEDBACK, 10, 5000).await {
Ok(messages) => {
for (stream_id, fields) in messages {
if let Some(event) = Event::from_stream_fields(&fields) {
info!(
event_id = %event.id,
source = %event.source,
"Received feedback event"
);
}
let _ = redis_streams::xack(redis, STREAM_FEEDBACK, FEEDBACK_GROUP, &stream_id).await;
}
}
Err(e) => {
error!(error = %e, "Feedback consumer error");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
}
+47
View File
@@ -0,0 +1,47 @@
use tokio::sync::mpsc;
use tracing::{error, info, warn};
use cxcloud_common::event::Event;
/// Periodically poll external API endpoints and emit events.
pub async fn run(endpoints: &str, interval_secs: u64, tx: mpsc::Sender<Event>) {
let urls: Vec<&str> = endpoints.split(',').map(|s| s.trim()).filter(|s| !s.is_empty()).collect();
if urls.is_empty() {
info!("No poll endpoints configured, poller idle");
return;
}
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.expect("Failed to create HTTP client");
let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
info!(count = urls.len(), interval_secs, "Starting API poller");
loop {
interval.tick().await;
for url in &urls {
match client.get(*url).send().await {
Ok(resp) if resp.status().is_success() => {
match resp.json::<serde_json::Value>().await {
Ok(body) => {
let event = Event::new("api_poll", "poll_response", body);
if let Err(e) = tx.send(event).await {
error!(error = %e, url, "Failed to send poll event");
}
}
Err(e) => warn!(error = %e, url, "Failed to parse poll response"),
}
}
Ok(resp) => {
warn!(status = %resp.status(), url, "Poll endpoint returned error");
}
Err(e) => {
warn!(error = %e, url, "Failed to reach poll endpoint");
}
}
}
}
}
+42
View File
@@ -0,0 +1,42 @@
use std::sync::Arc;
use axum::{extract::State, http::StatusCode, response::Json};
use serde::Deserialize;
use tracing::{error, info};
use cxcloud_common::event::Event;
use crate::AppState;
/// JSON body accepted by the webhook endpoint.
#[derive(Deserialize)]
pub struct WebhookPayload {
    // Event type; serde fills in "webhook_event" when the field is absent.
    #[serde(default = "default_type")]
    pub r#type: String,
    // Arbitrary JSON payload, passed through to the created Event.
    pub payload: serde_json::Value,
    // Optional string-to-string metadata; defaults to an empty map.
    #[serde(default)]
    pub metadata: std::collections::HashMap<String, String>,
}
/// Serde default for `WebhookPayload::type`.
fn default_type() -> String {
    String::from("webhook_event")
}
/// Handle an incoming webhook: wrap the request body in an `Event` and hand
/// it to the normalizer channel.
///
/// Returns `500 INTERNAL_SERVER_ERROR` if the normalizer channel is closed;
/// otherwise a JSON acknowledgement containing the new event's id.
pub async fn webhook_handler(
    State(state): State<Arc<AppState>>,
    Json(body): Json<WebhookPayload>,
) -> Result<Json<serde_json::Value>, StatusCode> {
    let mut event = Event::new("webhook", &body.r#type, body.payload);
    event.metadata = body.metadata;
    info!(event_id = %event.id, event_type = %event.r#type, "Received webhook event");
    // Keep only the id for the response instead of cloning the whole event
    // (the previous version cloned the full payload JSON just to read `id`
    // after `send` consumed the event).
    let event_id = event.id.clone();
    state.event_tx.send(event).await.map_err(|e| {
        error!(error = %e, "Failed to send event to normalizer");
        StatusCode::INTERNAL_SERVER_ERROR
    })?;
    Ok(Json(serde_json::json!({
        "id": event_id,
        "status": "accepted",
    })))
}