vendor: update cargo-cxcloud-api-gateway-0.1.0

This commit is contained in:
cx-git-agent
2026-04-26 16:48:18 +00:00
committed by GitHub
parent 1c3663e48a
commit fd036d832b
10 changed files with 3525 additions and 3 deletions
+7
View File
@@ -0,0 +1,7 @@
{
"git": {
"sha1": "927a31cbf65f78c3ef6b729631b2fc35335afe06",
"dirty": true
},
"path_in_vcs": "services/cxcloud-rs/crates/api-gateway"
}
Generated
+3027
View File
File diff suppressed because it is too large Load Diff
+100
View File
@@ -0,0 +1,100 @@
# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
#
# When uploading crates to the registry Cargo will automatically
# "normalize" Cargo.toml files for maximal compatibility
# with all versions of Cargo and also rewrite `path` dependencies
# to registry (e.g., crates.io) dependencies.
#
# If you are reading this file be aware that the original Cargo.toml
# will likely look very different (and much more reasonable).
# See Cargo.toml.orig for the original contents.
[package]
edition = "2021"
name = "cxcloud-api-gateway"
version = "0.1.0"
build = false
publish = ["cxai"]
autolib = false
autobins = false
autoexamples = false
autotests = false
autobenches = false
readme = false
[[bin]]
name = "api-gateway"
path = "src/main.rs"
[dependencies.anyhow]
version = "1"
[dependencies.axum]
version = "0.7"
features = ["macros"]
[dependencies.chrono]
version = "0.4"
features = ["serde"]
[dependencies.cxcloud-common]
version = "0.1.0"
registry-index = "sparse+https://git.cxllm-studio.com/api/packages/CxAI-LLM/cargo/"
[dependencies.cxcloud-proto]
version = "0.1.0"
registry-index = "sparse+https://git.cxllm-studio.com/api/packages/CxAI-LLM/cargo/"
[dependencies.governor]
version = "0.6"
[dependencies.jsonwebtoken]
version = "9"
[dependencies.redis]
version = "0.27"
features = [
"tokio-comp",
"connection-manager",
]
[dependencies.reqwest]
version = "0.12"
features = [
"json",
"rustls-tls",
]
default-features = false
[dependencies.serde]
version = "1"
features = ["derive"]
[dependencies.serde_json]
version = "1"
[dependencies.tokio]
version = "1"
features = ["full"]
[dependencies.tower]
version = "0.4"
[dependencies.tower-http]
version = "0.5"
features = [
"fs",
"cors",
"trace",
"timeout",
]
[dependencies.tracing]
version = "0.1"
[dependencies.uuid]
version = "1"
features = [
"v7",
"serde",
]
+27
View File
@@ -0,0 +1,27 @@
[package]
name = "cxcloud-api-gateway"
version.workspace = true
edition.workspace = true
publish.workspace = true
[[bin]]
name = "api-gateway"
path = "src/main.rs"
[dependencies]
cxcloud-common = { workspace = true }
cxcloud-proto = { workspace = true }
tokio = { workspace = true }
axum = { workspace = true }
tower = { workspace = true }
tower-http = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
reqwest = { workspace = true }
redis = { workspace = true }
jsonwebtoken = { workspace = true }
governor = { workspace = true }
tracing = { workspace = true }
chrono = { workspace = true }
uuid = { workspace = true }
anyhow = { workspace = true }
-3
View File
@@ -1,3 +0,0 @@
# cargo-cxcloud-api-gateway-0.1.0
Cargo crate: cxcloud-api-gateway-0.1.0
+79
View File
@@ -0,0 +1,79 @@
use std::sync::Arc;
use axum::{
body::Body,
extract::{Request, State},
http::{HeaderMap, StatusCode},
middleware::Next,
response::Response,
};
use chrono::Utc;
use jsonwebtoken::{encode, EncodingKey, Header};
use serde::{Deserialize, Serialize};
use tracing::warn;
use crate::AppState;
#[derive(Debug, Serialize, Deserialize)]
struct Claims {
sub: String,
iss: String,
exp: usize,
iat: usize,
}
/// Middleware that validates X-API-Key and injects a short-lived JWT.
pub async fn auth_middleware(
State(state): State<Arc<AppState>>,
mut req: Request<Body>,
next: Next,
) -> Result<Response, StatusCode> {
let path = req.uri().path();
// Skip auth for health/ready/metrics
if matches!(path, "/health" | "/ready" | "/metrics") {
return Ok(next.run(req).await);
}
let api_key = extract_api_key(req.headers());
match api_key {
Some(key) if key == state.config.api_key => {
// Mint a short-lived JWT for downstream services
let now = Utc::now().timestamp() as usize;
let claims = Claims {
sub: "api-client".to_string(),
iss: state.config.jwt_issuer.clone(),
exp: now + state.config.jwt_expiry_seconds as usize,
iat: now,
};
if let Ok(token) = encode(
&Header::default(),
&claims,
&EncodingKey::from_secret(state.config.jwt_secret.as_bytes()),
) {
req.headers_mut().insert(
"Authorization",
format!("Bearer {token}").parse().unwrap(),
);
}
Ok(next.run(req).await)
}
Some(_) => {
warn!("Invalid API key");
Err(StatusCode::UNAUTHORIZED)
}
None => {
warn!("Missing X-API-Key header");
Err(StatusCode::UNAUTHORIZED)
}
}
}
fn extract_api_key(headers: &HeaderMap) -> Option<String> {
headers
.get("x-api-key")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string())
}
+60
View File
@@ -0,0 +1,60 @@
use cxcloud_common::config::{env_or, env_port, env_u64, ServiceConfig};
#[derive(Debug, Clone)]
pub struct GatewayConfig {
pub base: ServiceConfig,
pub port: u16,
pub jwt_secret: String,
pub jwt_issuer: String,
pub jwt_expiry_seconds: u64,
pub api_key: String,
pub rate_limit_rps: u32,
pub rate_limit_burst: u32,
pub upstream_input_gateway: String,
pub upstream_agent: String,
pub upstream_bot: String,
pub upstream_human_simulator: String,
pub upstream_output: String,
}
impl GatewayConfig {
pub fn from_env() -> Self {
let agent_host = env_or("AGENT_HOST", "localhost");
let agent_port = env_port("AGENT_HTTP_PORT", 8001);
let bot_host = env_or("BOT_HOST", "localhost");
let bot_port = env_port("BOT_HTTP_PORT", 8002);
let input_host = env_or("INPUT_GATEWAY_HOST", "localhost");
let input_port = env_port("INPUT_GATEWAY_PORT", 3001);
let human_host = env_or("HUMAN_SIMULATOR_HOST", "localhost");
let human_port = env_port("HUMAN_SIMULATOR_HTTP_PORT", 3002);
let output_host = env_or("OUTPUT_HOST", "localhost");
let output_port = env_port("OUTPUT_HTTP_PORT", 8003);
Self {
base: ServiceConfig::from_env("api-gateway"),
port: env_port("API_GATEWAY_PORT", 8080),
jwt_secret: env_or("JWT_SECRET", "dev-secret-change-me"),
jwt_issuer: env_or("JWT_ISSUER", "cxcloud-api-gateway"),
jwt_expiry_seconds: env_u64("JWT_EXPIRY_SECONDS", 300),
api_key: env_or("API_KEY_DEV", "dev-test-key"),
rate_limit_rps: env_u64("RATE_LIMIT_RPS", 10) as u32,
rate_limit_burst: env_u64("RATE_LIMIT_BURST", 20) as u32,
upstream_input_gateway: format!("http://{input_host}:{input_port}"),
upstream_agent: format!("http://{agent_host}:{agent_port}"),
upstream_bot: format!("http://{bot_host}:{bot_port}"),
upstream_human_simulator: format!("http://{human_host}:{human_port}"),
upstream_output: format!("http://{output_host}:{output_port}"),
}
}
pub fn upstream_for_prefix(&self, prefix: &str) -> Option<&str> {
match prefix {
"webhook" => Some(&self.upstream_input_gateway),
"agent" => Some(&self.upstream_agent),
"bot" => Some(&self.upstream_bot),
"policies" => Some(&self.upstream_human_simulator),
"deliver" => Some(&self.upstream_output),
_ => None,
}
}
}
+97
View File
@@ -0,0 +1,97 @@
mod auth;
mod config;
mod proxy;
mod ratelimit;
use axum::{
extract::State,
http::StatusCode,
response::Json,
routing::{any, get},
Router,
};
use std::sync::Arc;
use tracing::info;
use cxcloud_common::{config::env_port, health::HealthResponse, telemetry};
#[derive(Clone)]
pub struct AppState {
pub config: config::GatewayConfig,
pub redis: redis::aio::ConnectionManager,
pub http_client: reqwest::Client,
pub rate_limiter: ratelimit::RateLimiter,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cfg = config::GatewayConfig::from_env();
telemetry::init("api-gateway", &cfg.base.otel_endpoint, &cfg.base.log_level);
let redis_conn = cxcloud_common::redis_streams::connect(&cfg.base.redis_url).await?;
let http_client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()?;
let rate_limiter = ratelimit::new(cfg.rate_limit_rps, cfg.rate_limit_burst);
let state = Arc::new(AppState {
config: cfg.clone(),
redis: redis_conn,
http_client,
rate_limiter,
});
let app = Router::new()
.route("/health", get(health))
.route("/ready", get(ready))
.route("/metrics", get(metrics))
.route("/webhook/{*path}", any(proxy::proxy_handler))
.route("/agent/{*path}", any(proxy::proxy_handler))
.route("/bot/{*path}", any(proxy::proxy_handler))
.route("/policies/{*path}", any(proxy::proxy_handler))
.route("/deliver/{*path}", any(proxy::proxy_handler))
.layer(axum::middleware::from_fn_with_state(
state.clone(),
auth::auth_middleware,
))
.layer(axum::middleware::from_fn_with_state(
state.clone(),
ratelimit::rate_limit_middleware,
))
.with_state(state);
let port = env_port("API_GATEWAY_PORT", 8080);
let addr = format!("0.0.0.0:{port}");
info!(port, "API Gateway starting");
let listener = tokio::net::TcpListener::bind(&addr).await?;
axum::serve(listener, app).await?;
telemetry::shutdown();
Ok(())
}
async fn health(State(state): State<Arc<AppState>>) -> Json<HealthResponse> {
let redis_ok = cxcloud_common::redis_streams::ping(&state.redis).await;
let resp = HealthResponse::healthy("api-gateway")
.with_dependency("redis", if redis_ok { "healthy" } else { "unhealthy" })
.compute_status();
Json(resp)
}
async fn ready(State(state): State<Arc<AppState>>) -> StatusCode {
if cxcloud_common::redis_streams::ping(&state.redis).await {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
}
}
async fn metrics() -> Json<serde_json::Value> {
Json(serde_json::json!({
"service": "api-gateway",
"requests_total": 0,
"auth_failures": 0,
"rate_limited": 0,
}))
}
+88
View File
@@ -0,0 +1,88 @@
use std::sync::Arc;
use axum::{
body::Body,
extract::{Request, State},
http::StatusCode,
response::Response,
};
use tracing::{error, info};
use crate::AppState;
/// Reverse proxy handler: routes requests to upstream services based on path prefix.
pub async fn proxy_handler(
State(state): State<Arc<AppState>>,
req: Request<Body>,
) -> Result<Response, StatusCode> {
let path = req.uri().path();
let prefix = path
.trim_start_matches('/')
.split('/')
.next()
.unwrap_or("");
let upstream_base = state
.config
.upstream_for_prefix(prefix)
.ok_or(StatusCode::NOT_FOUND)?;
// Strip the prefix and forward the rest
let remaining = path
.trim_start_matches('/')
.strip_prefix(prefix)
.unwrap_or("");
let upstream_url = format!("{upstream_base}{remaining}");
info!(upstream_url, method = %req.method(), "Proxying request");
let method = req.method().clone();
let headers = req.headers().clone();
// Read request body
let body_bytes = axum::body::to_bytes(req.into_body(), 10 * 1024 * 1024)
.await
.map_err(|_| StatusCode::BAD_REQUEST)?;
// Build upstream request
let mut upstream_req = state.http_client.request(method, &upstream_url);
// Forward relevant headers
for (name, value) in headers.iter() {
if !matches!(
name.as_str(),
"host" | "connection" | "transfer-encoding" | "x-api-key"
) {
upstream_req = upstream_req.header(name, value);
}
}
if !body_bytes.is_empty() {
upstream_req = upstream_req.body(body_bytes);
}
// Send to upstream
let upstream_resp = upstream_req.send().await.map_err(|e| {
error!(error = %e, upstream_url, "Upstream request failed");
StatusCode::BAD_GATEWAY
})?;
// Convert upstream response back to axum response
let status = StatusCode::from_u16(upstream_resp.status().as_u16())
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let mut response_builder = Response::builder().status(status);
for (name, value) in upstream_resp.headers() {
response_builder = response_builder.header(name, value);
}
let body = upstream_resp.bytes().await.map_err(|e| {
error!(error = %e, "Failed to read upstream response body");
StatusCode::BAD_GATEWAY
})?;
response_builder
.body(Body::from(body))
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
}
+40
View File
@@ -0,0 +1,40 @@
use std::num::NonZeroU32;
use std::sync::Arc;
use axum::{
body::Body,
extract::{Request, State},
http::StatusCode,
middleware::Next,
response::Response,
};
use governor::{Quota, RateLimiter as GovLimiter, clock::DefaultClock, state::{InMemoryState, NotKeyed}};
pub type RateLimiter = Arc<GovLimiter<NotKeyed, InMemoryState, DefaultClock>>;
pub fn new(rps: u32, burst: u32) -> RateLimiter {
let quota = Quota::per_second(NonZeroU32::new(rps).unwrap_or(NonZeroU32::new(10).unwrap()))
.allow_burst(NonZeroU32::new(burst).unwrap_or(NonZeroU32::new(20).unwrap()));
Arc::new(GovLimiter::direct(quota))
}
pub async fn rate_limit_middleware(
State(state): State<Arc<crate::AppState>>,
req: Request<Body>,
next: Next,
) -> Result<Response, StatusCode> {
let path = req.uri().path();
// Skip rate limiting for health/ready/metrics
if matches!(path, "/health" | "/ready" | "/metrics") {
return Ok(next.run(req).await);
}
match state.rate_limiter.check() {
Ok(_) => Ok(next.run(req).await),
Err(_) => {
tracing::warn!("Rate limit exceeded");
Err(StatusCode::TOO_MANY_REQUESTS)
}
}
}