vendor: update cxos-vendor-cargo

This commit is contained in:
cx-git-agent
2026-04-26 16:35:09 +00:00
committed by GitHub
parent 26cfdea312
commit dbba8e3da4
152 changed files with 39354 additions and 3 deletions
+7
View File
@@ -0,0 +1,7 @@
{
"git": {
"sha1": "927a31cbf65f78c3ef6b729631b2fc35335afe06",
"dirty": true
},
"path_in_vcs": "services/cxcloud-rs/crates/agent"
}
+2767
View File
File diff suppressed because it is too large Load Diff
+94
View File
@@ -0,0 +1,94 @@
# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
#
# When uploading crates to the registry Cargo will automatically
# "normalize" Cargo.toml files for maximal compatibility
# with all versions of Cargo and also rewrite `path` dependencies
# to registry (e.g., crates.io) dependencies.
#
# If you are reading this file be aware that the original Cargo.toml
# will likely look very different (and much more reasonable).
# See Cargo.toml.orig for the original contents.
[package]
edition = "2021"
name = "cxcloud-agent"
version = "0.1.0"
build = false
publish = ["cxai"]
autolib = false
autobins = false
autoexamples = false
autotests = false
autobenches = false
readme = false
[[bin]]
name = "agent-service"
path = "src/main.rs"
[dependencies.anyhow]
version = "1"
[dependencies.axum]
version = "0.7"
features = ["macros"]
[dependencies.chrono]
version = "0.4"
features = ["serde"]
[dependencies.cxcloud-common]
version = "0.1.0"
registry-index = "sparse+https://git.cxllm-studio.com/api/packages/CxAI-LLM/cargo/"
[dependencies.cxcloud-proto]
version = "0.1.0"
registry-index = "sparse+https://git.cxllm-studio.com/api/packages/CxAI-LLM/cargo/"
[dependencies.prost]
version = "0.13"
[dependencies.prost-types]
version = "0.13"
[dependencies.redis]
version = "0.27"
features = [
"tokio-comp",
"connection-manager",
]
[dependencies.reqwest]
version = "0.12"
features = [
"json",
"rustls-tls",
]
default-features = false
[dependencies.serde]
version = "1"
features = ["derive"]
[dependencies.serde_json]
version = "1"
[dependencies.thiserror]
version = "2"
[dependencies.tokio]
version = "1"
features = ["full"]
[dependencies.tonic]
version = "0.12"
[dependencies.tracing]
version = "0.1"
[dependencies.uuid]
version = "1"
features = [
"v7",
"serde",
]
+27
View File
@@ -0,0 +1,27 @@
[package]
name = "cxcloud-agent"
version.workspace = true
edition.workspace = true
publish.workspace = true
[[bin]]
name = "agent-service"
path = "src/main.rs"
[dependencies]
cxcloud-common = { workspace = true }
cxcloud-proto = { workspace = true }
tokio = { workspace = true }
axum = { workspace = true }
tonic = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
reqwest = { workspace = true }
redis = { workspace = true }
tracing = { workspace = true }
chrono = { workspace = true }
uuid = { workspace = true }
anyhow = { workspace = true }
thiserror = { workspace = true }
+95
View File
@@ -0,0 +1,95 @@
use std::sync::Arc;
use tracing::{error, info, warn};
use cxcloud_common::{event::Event, redis_streams};
use crate::{grpc_clients, memory, planner, verifier, AppState};
const STREAM: &str = "events.raw";
const GROUP: &str = "agent-group";
const CONSUMER: &str = "agent-rs-1";
const STREAM_OUTPUT: &str = "events.output";
/// Main autonomous processing loop: consume events → plan → approve → execute → verify → output.
///
/// Runs forever; errors on individual events are logged and the event is
/// acknowledged anyway so the stream does not back up on poison messages.
pub async fn run(state: Arc<AppState>) {
    // Idempotently create the consumer group before reading.
    redis_streams::ensure_consumer_group(&state.redis, STREAM, GROUP).await;
    info!("Agent consumer started on {STREAM} (group: {GROUP})");
    loop {
        // Read up to 1 message, blocking at most 5000 ms per iteration.
        let messages = match redis_streams::xreadgroup(&state.redis, GROUP, CONSUMER, STREAM, 1, 5000).await {
            Ok(msgs) => msgs,
            Err(e) => {
                error!(error = %e, "Stream read error");
                // Back off so an unreachable Redis doesn't spin this loop hot.
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                continue;
            }
        };
        for (stream_id, fields) in messages {
            let event = match Event::from_stream_fields(&fields) {
                Some(e) => e,
                None => {
                    // Unparseable entries are acked and skipped so they don't
                    // remain in the pending-entries list forever.
                    warn!(stream_id, "Failed to parse event from stream");
                    let _ = redis_streams::xack(&state.redis, STREAM, GROUP, &stream_id).await;
                    continue;
                }
            };
            info!(event_id = %event.id, event_type = %event.r#type, "Processing event");
            match process_event(&state, &event).await {
                Ok(()) => {
                    info!(event_id = %event.id, "Event processed successfully");
                }
                Err(e) => {
                    // Failure is logged but NOT retried: the xack below runs
                    // regardless of outcome (at-most-once processing).
                    error!(event_id = %event.id, error = %e, "Event processing failed");
                }
            }
            // Ack after processing (success or failure); ack errors are ignored.
            let _ = redis_streams::xack(&state.redis, STREAM, GROUP, &stream_id).await;
        }
    }
}
/// Drive one event through the full pipeline:
/// memory retrieval → LLM planning → approval gate → execution → verification → output publish.
async fn process_event(state: &AppState, event: &Event) -> anyhow::Result<()> {
    // Pull prior knowledge (RAG context) from ChromaDB.
    let context = memory::context::retrieve(&state.http_client, &state.chromadb_url, event).await;

    // Ask the local LLM (Ollama) for an execution plan.
    let plan = planner::create_plan(&state.http_client, &state.ollama_url, event, &context).await?;

    // Gate the plan through the Human Simulator. A hard rejection stops the
    // pipeline; a transport failure is treated as an implicit approval.
    match grpc_clients::request_approval(&plan).await {
        Ok(decision) if decision == "REJECTED" => {
            warn!(event_id = %event.id, "Plan rejected by policy engine");
            return Ok(());
        }
        Err(e) => {
            warn!(event_id = %event.id, error = %e, "Approval request failed, auto-approving");
        }
        Ok(_) => {}
    }

    // Execute via Bot Service, then let the verifier judge the outcome.
    let execution = grpc_clients::execute_plan(&plan).await;
    let verified = verifier::verify(&state.http_client, &state.ollama_url, &plan, &execution).await;

    // Only verified outcomes are worth remembering for future retrieval.
    if verified {
        memory::store::save_outcome(&state.http_client, &state.chromadb_url, event, &plan).await;
    }

    // Publish the result to the output stream either way.
    let status = if verified { "completed" } else { "failed" };
    let output_fields = vec![
        ("event_id".to_string(), event.id.clone()),
        ("plan".to_string(), plan.clone()),
        ("status".to_string(), status.to_string()),
    ];
    redis_streams::xadd(&state.redis, STREAM_OUTPUT, &output_fields).await?;
    Ok(())
}
+61
View File
@@ -0,0 +1,61 @@
use cxcloud_common::config::{env_or, env_port};
use tracing::info;
/// Request an approval decision for `plan` from the Human Simulator service.
///
/// NOTE(review): this currently speaks HTTP only — no gRPC attempt is made
/// despite the previous comment claiming a gRPC-with-HTTP-fallback; confirm
/// whether the gRPC path is still planned.
///
/// The endpoint is resolved from `HUMAN_SIMULATOR_HOST` /
/// `HUMAN_SIMULATOR_HTTP_PORT`. Returns the decision string (defaults to
/// "APPROVED" when the response omits one) or a human-readable error on
/// transport/parse failure.
pub async fn request_approval(plan: &str) -> Result<String, String> {
    // Reuse one connection-pooled client across calls instead of building a
    // fresh reqwest::Client (pool + TLS state) on every request.
    static CLIENT: std::sync::OnceLock<reqwest::Client> = std::sync::OnceLock::new();
    let client = CLIENT.get_or_init(reqwest::Client::new);

    let host = env_or("HUMAN_SIMULATOR_HOST", "localhost");
    let port = env_port("HUMAN_SIMULATOR_HTTP_PORT", 3002);
    let url = format!("http://{host}:{port}/approve");
    let resp = client
        .post(&url)
        .json(&serde_json::json!({
            "request_id": uuid::Uuid::now_v7().to_string(),
            "plan": plan,
        }))
        .send()
        .await
        .map_err(|e| format!("Human Simulator request failed: {e}"))?;
    if resp.status().is_success() {
        let body: serde_json::Value = resp.json().await.map_err(|e| format!("Parse error: {e}"))?;
        let decision = body
            .get("decision")
            .and_then(|d| d.as_str())
            .unwrap_or("APPROVED")
            .to_string();
        info!(decision, "Approval decision received");
        Ok(decision)
    } else {
        Err(format!("Human Simulator returned {}", resp.status()))
    }
}
/// Execute a plan via the Bot Service.
///
/// NOTE(review): HTTP only — no gRPC attempt exists in this code despite the
/// previous comment claiming a fallback; confirm intent.
///
/// Endpoint resolved from `BOT_HOST` / `BOT_HTTP_PORT`. On success returns the
/// bot's JSON response re-serialized to a string; on failure returns a
/// human-readable error description.
pub async fn execute_plan(plan: &str) -> Result<String, String> {
    // Reuse one connection-pooled client across calls instead of building a
    // fresh reqwest::Client (pool + TLS state) on every request.
    static CLIENT: std::sync::OnceLock<reqwest::Client> = std::sync::OnceLock::new();
    let client = CLIENT.get_or_init(reqwest::Client::new);

    let host = env_or("BOT_HOST", "localhost");
    let port = env_port("BOT_HTTP_PORT", 8002);
    let url = format!("http://{host}:{port}/execute");
    let resp = client
        .post(&url)
        .json(&serde_json::json!({
            "plan_id": uuid::Uuid::now_v7().to_string(),
            "plan": plan,
        }))
        .send()
        .await
        .map_err(|e| format!("Bot execution request failed: {e}"))?;
    if resp.status().is_success() {
        let body: serde_json::Value = resp.json().await.map_err(|e| format!("Parse error: {e}"))?;
        info!("Execution completed");
        Ok(serde_json::to_string(&body).unwrap_or_default())
    } else {
        Err(format!("Bot Service returned {}", resp.status()))
    }
}
+68
View File
@@ -0,0 +1,68 @@
use std::sync::Arc;
use tonic::{Request, Response, Status};
use tracing::info;
use cxcloud_proto::agent::{
agent_service_server::{AgentService, AgentServiceServer},
GetEventStatusRequest, GetEventStatusResponse, ProcessEventRequest, ProcessEventResponse,
};
use crate::AppState;
/// gRPC implementation of the Agent service.
///
/// Holds shared application state; the stub handlers below do not read it yet,
/// hence the `dead_code` allowance on the field.
#[allow(dead_code)]
pub struct AgentServiceImpl {
    state: Arc<AppState>,
}
#[tonic::async_trait]
impl AgentService for AgentServiceImpl {
    /// Accept an event over gRPC and return a freshly minted plan id.
    ///
    /// NOTE(review): stub — it does not run the consumer pipeline and always
    /// returns `plan: None`.
    async fn process_event(
        &self,
        request: Request<ProcessEventRequest>,
    ) -> Result<Response<ProcessEventResponse>, Status> {
        let req = request.into_inner();
        let event = req
            .event
            .ok_or_else(|| Status::invalid_argument("event is required"))?;
        info!(event_id = %event.id, "Processing event via gRPC");
        // In a full implementation, this would trigger the same pipeline as the consumer.
        let plan_id = uuid::Uuid::now_v7().to_string();
        Ok(Response::new(ProcessEventResponse {
            // Move the id directly; the previous `.clone()` was redundant
            // (the value was never used again).
            plan_id,
            plan: None, // Plan would be populated after LLM generation
        }))
    }

    /// Status lookup stub: echoes the event id back with an UNSPECIFIED status
    /// and a placeholder summary.
    async fn get_event_status(
        &self,
        request: Request<GetEventStatusRequest>,
    ) -> Result<Response<GetEventStatusResponse>, Status> {
        let req = request.into_inner();
        Ok(Response::new(GetEventStatusResponse {
            event_id: req.event_id,
            plan_id: String::new(),
            status: 0, // UNSPECIFIED
            summary: "Status lookup not yet implemented".to_string(),
            retry_count: 0,
        }))
    }
}
/// Bind on 0.0.0.0:`port` and serve the Agent gRPC API until shutdown or error.
pub async fn serve(state: Arc<AppState>, port: u16) -> anyhow::Result<()> {
    let bind_addr = format!("0.0.0.0:{port}").parse()?;
    let svc = AgentServiceServer::new(AgentServiceImpl { state });
    info!(port, "Agent gRPC server starting");
    tonic::transport::Server::builder()
        .add_service(svc)
        .serve(bind_addr)
        .await?;
    Ok(())
}
+115
View File
@@ -0,0 +1,115 @@
mod consumer;
mod grpc_clients;
mod grpc_server;
pub mod memory;
mod planner;
mod verifier;
use axum::{extract::State, response::Json, routing::get, Router};
use std::sync::Arc;
use tracing::info;
use cxcloud_common::{
config::{env_or, env_port},
health::HealthResponse,
redis_streams, telemetry,
};
/// Shared application state, cloned into the HTTP router and spawned tasks
/// behind an `Arc`.
#[derive(Clone)]
pub struct AppState {
    // Multiplexed Redis connection (cheap to clone).
    pub redis: redis::aio::ConnectionManager,
    // Base URL of the Ollama LLM API.
    pub ollama_url: String,
    // Base URL of the ChromaDB vector store.
    pub chromadb_url: String,
    // Shared HTTP client used for Ollama/ChromaDB calls.
    pub http_client: reqwest::Client,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // All configuration comes from the environment, with local-dev defaults.
    let redis_url = env_or("REDIS_URL", "redis://localhost:6379");
    let otel_endpoint = env_or("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317");
    let log_level = env_or("LOG_LEVEL", "info");
    let ollama_url = env_or("OLLAMA_URL", "http://localhost:11434");
    let chromadb_url = env_or("CHROMADB_URL", "http://localhost:8000");
    // Tracing/metrics must be initialized before anything logs.
    telemetry::init("agent-service", &otel_endpoint, &log_level);
    let redis_conn = redis_streams::connect(&redis_url).await?;
    // Generous timeout: LLM generation calls can take a long time.
    let http_client = reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(120))
        .build()?;
    let state = Arc::new(AppState {
        redis: redis_conn.clone(),
        ollama_url: ollama_url.clone(),
        chromadb_url: chromadb_url.clone(),
        http_client: http_client.clone(),
    });
    // Spawn Redis stream consumer (autonomous processing loop)
    let consumer_state = state.clone();
    tokio::spawn(async move {
        consumer::run(consumer_state).await;
    });
    // Spawn gRPC server
    let grpc_port = env_port("AGENT_GRPC_PORT", 50051);
    let grpc_state = state.clone();
    tokio::spawn(async move {
        if let Err(e) = grpc_server::serve(grpc_state, grpc_port).await {
            tracing::error!(error = %e, "gRPC server failed");
        }
    });
    // HTTP server (health + metrics) runs on the main task.
    let app = Router::new()
        .route("/health", get(health))
        .route("/metrics", get(metrics))
        .with_state(state);
    let http_port = env_port("AGENT_HTTP_PORT", 8001);
    info!(http_port, grpc_port, "Agent service starting");
    let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{http_port}")).await?;
    axum::serve(listener, app).await?;
    // NOTE(review): axum::serve normally runs until the process dies, so this
    // shutdown is only reached if the HTTP server exits cleanly — confirm a
    // graceful-shutdown signal is wired up if telemetry flushing matters.
    telemetry::shutdown();
    Ok(())
}
async fn health(State(state): State<Arc<AppState>>) -> Json<HealthResponse> {
let mut conn = state.redis.clone();
let redis_ok = redis_streams::ping(&mut conn).await;
let ollama_ok = state
.http_client
.get(format!("{}/api/tags", state.ollama_url))
.send()
.await
.map(|r| r.status().is_success())
.unwrap_or(false);
let chromadb_ok = state
.http_client
.get(format!("{}/api/v1/heartbeat", state.chromadb_url))
.send()
.await
.map(|r| r.status().is_success())
.unwrap_or(false);
let resp = HealthResponse::healthy("agent-service")
.with_dependency("redis", if redis_ok { "healthy" } else { "unhealthy" })
.with_dependency("ollama", if ollama_ok { "healthy" } else { "unhealthy" })
.with_dependency("chromadb", if chromadb_ok { "healthy" } else { "unhealthy" })
.compute_status();
Json(resp)
}
/// Placeholder metrics endpoint; the counters are not wired up yet and always
/// report zero.
async fn metrics() -> Json<serde_json::Value> {
    let body = serde_json::json!({
        "service": "agent-service",
        "events_processed": 0,
        "plans_created": 0,
        "plans_rejected": 0,
    });
    Json(body)
}
+63
View File
@@ -0,0 +1,63 @@
use cxcloud_common::event::Event;
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
const COLLECTION: &str = "cxcloud_memory";
/// Request body for ChromaDB's collection `/query` endpoint.
#[derive(Debug, Serialize)]
struct QueryRequest {
    // Free-text queries to search with.
    query_texts: Vec<String>,
    // Maximum number of documents to return.
    n_results: usize,
}
/// Subset of ChromaDB's query response this module consumes.
#[derive(Debug, Deserialize)]
struct QueryResponse {
    // One inner Vec of matched documents per query text; Option because the
    // field may be absent from the response.
    documents: Option<Vec<Vec<String>>>,
}
/// Retrieve relevant context from ChromaDB via similarity search.
///
/// Best-effort: any transport, status, or parse failure is logged and an
/// empty string is returned so callers can proceed without context.
pub async fn retrieve(
    client: &reqwest::Client,
    chromadb_url: &str,
    event: &Event,
) -> String {
    let payload_json = serde_json::to_string(&event.payload).unwrap_or_default();
    let search_text = format!(
        "Event type: {} source: {} payload: {}",
        event.r#type, event.source, payload_json
    );
    let endpoint = format!("{chromadb_url}/api/v1/collections/{COLLECTION}/query");
    let request = QueryRequest {
        query_texts: vec![search_text],
        n_results: 5,
    };
    match client.post(endpoint).json(&request).send().await {
        Err(e) => {
            warn!(error = %e, "Failed to query ChromaDB for context");
        }
        Ok(r) if !r.status().is_success() => {
            debug!(status = %r.status(), "ChromaDB query returned non-success");
        }
        Ok(r) => {
            // Flatten the per-query document lists into one context block.
            let docs: Vec<String> = r
                .json::<QueryResponse>()
                .await
                .ok()
                .and_then(|body| body.documents)
                .map(|d| d.into_iter().flatten().collect())
                .unwrap_or_default();
            if !docs.is_empty() {
                debug!(count = docs.len(), "Retrieved context from memory");
                return docs.join("\n---\n");
            }
        }
    }
    String::new()
}
+2
View File
@@ -0,0 +1,2 @@
pub mod context;
pub mod store;
+57
View File
@@ -0,0 +1,57 @@
use cxcloud_common::event::Event;
use serde::Serialize;
use tracing::{debug, warn};
use uuid::Uuid;
const COLLECTION: &str = "cxcloud_memory";
/// Request body for ChromaDB's collection `/add` endpoint.
#[derive(Debug, Serialize)]
struct AddRequest {
    // Unique ids for the documents being added (one per document).
    ids: Vec<String>,
    // Document texts to embed and store.
    documents: Vec<String>,
    // Arbitrary JSON metadata, parallel to `documents`.
    metadatas: Vec<serde_json::Value>,
}
/// Save a successful outcome to ChromaDB for future RAG retrieval.
///
/// Best-effort fire-and-forget: failures are logged, never propagated.
pub async fn save_outcome(
    client: &reqwest::Client,
    chromadb_url: &str,
    event: &Event,
    plan: &str,
) {
    let document = format!(
        "Event: {} ({})\nPlan: {}",
        event.r#type, event.source, plan
    );
    let meta = serde_json::json!({
        "event_id": event.id,
        "event_type": event.r#type,
        "source": event.source,
        "timestamp": event.timestamp.to_rfc3339(),
    });
    let endpoint = format!("{chromadb_url}/api/v1/collections/{COLLECTION}/add");
    let body = AddRequest {
        ids: vec![Uuid::now_v7().to_string()],
        documents: vec![document],
        metadatas: vec![meta],
    };
    match client.post(endpoint).json(&body).send().await {
        Ok(r) if r.status().is_success() => {
            debug!(event_id = %event.id, "Saved outcome to memory");
        }
        Ok(r) => {
            warn!(status = %r.status(), "Failed to save to ChromaDB");
        }
        Err(e) => {
            warn!(error = %e, "Failed to connect to ChromaDB for save");
        }
    }
}
+73
View File
@@ -0,0 +1,73 @@
use cxcloud_common::event::Event;
use serde::{Deserialize, Serialize};
use tracing::info;
/// Request body for Ollama's `/api/generate` endpoint.
#[derive(Debug, Serialize)]
struct OllamaRequest {
    // Model name, e.g. "mistral:7b".
    model: String,
    // Full prompt text sent to the model.
    prompt: String,
    // false = return the complete response in one body (no streaming).
    stream: bool,
}
/// Subset of Ollama's generate response this module consumes.
#[derive(Debug, Deserialize)]
struct OllamaResponse {
    // The generated completion text.
    response: String,
}
/// Create a plan by sending the event + context to the local LLM (Ollama).
///
/// The model is taken from `OLLAMA_MODEL` (default "mistral:7b"). Returns the
/// raw completion text; propagates transport errors and non-success statuses.
pub async fn create_plan(
    client: &reqwest::Client,
    ollama_url: &str,
    event: &Event,
    context: &str,
) -> anyhow::Result<String> {
    let model = std::env::var("OLLAMA_MODEL").unwrap_or_else(|_| "mistral:7b".to_string());
    let context_block = if context.is_empty() {
        "No prior context available."
    } else {
        context
    };
    let payload_pretty = serde_json::to_string_pretty(&event.payload).unwrap_or_default();
    let prompt = format!(
        r#"You are an autonomous agent. Analyze this event and create an execution plan.
Event:
- ID: {}
- Source: {}
- Type: {}
- Payload: {}
Retrieved Context:
{}
Respond with a JSON plan containing:
- "goal": high-level description
- "reasoning": array of reasoning steps
- "tool_calls": array of tool invocations (tool_name, parameters)
Allowed tools: http_request, file_write, file_read, db_query, notification"#,
        event.id, event.source, event.r#type, payload_pretty, context_block,
    );
    info!(event_id = %event.id, model, "Generating plan via LLM");
    let request = OllamaRequest {
        model,
        prompt,
        stream: false,
    };
    let resp = client
        .post(format!("{ollama_url}/api/generate"))
        .json(&request)
        .send()
        .await?;
    if !resp.status().is_success() {
        let status = resp.status();
        let body = resp.text().await.unwrap_or_default();
        anyhow::bail!("Ollama returned {status}: {body}");
    }
    let parsed: OllamaResponse = resp.json().await?;
    info!(event_id = %event.id, "Plan generated");
    Ok(parsed.response)
}
+70
View File
@@ -0,0 +1,70 @@
use serde::{Deserialize, Serialize};
use tracing::info;
/// Request body for Ollama's `/api/generate` endpoint.
#[derive(Debug, Serialize)]
struct OllamaRequest {
    // Model name, e.g. "mistral:7b".
    model: String,
    // Full prompt text sent to the model.
    prompt: String,
    // false = return the complete response in one body (no streaming).
    stream: bool,
}
/// Subset of Ollama's generate response this module consumes.
#[derive(Debug, Deserialize)]
struct OllamaResponse {
    // The generated completion text.
    response: String,
}
/// Verify execution results by asking the LLM to evaluate success.
///
/// The model answers "PASS"/"FAIL"; only a response starting with "PASS"
/// counts as success. If the verifier itself is unreachable or errors, this
/// fails open and returns `true`.
pub async fn verify(
    client: &reqwest::Client,
    ollama_url: &str,
    plan: &str,
    execution_result: &Result<String, String>,
) -> bool {
    let model = std::env::var("OLLAMA_MODEL").unwrap_or_else(|_| "mistral:7b".to_string());
    let exec_summary = match execution_result {
        Ok(result) => format!("Execution succeeded:\n{result}"),
        Err(err) => format!("Execution failed:\n{err}"),
    };
    let prompt = format!(
        r#"You are a verification agent. Determine if this plan was executed successfully.
Plan:
{plan}
Execution Result:
{exec_summary}
Respond with ONLY "PASS" or "FAIL" followed by a brief reason."#,
    );
    let request = OllamaRequest {
        model,
        prompt,
        stream: false,
    };
    let outcome = client
        .post(format!("{ollama_url}/api/generate"))
        .json(&request)
        .send()
        .await;
    match outcome {
        Err(e) => {
            tracing::warn!(error = %e, "Verification request failed");
        }
        Ok(r) if !r.status().is_success() => {
            tracing::warn!(status = %r.status(), "Verification LLM returned error");
        }
        Ok(r) => {
            if let Ok(body) = r.json::<OllamaResponse>().await {
                let passed = body.response.trim().starts_with("PASS");
                info!(passed, reason = %body.response.lines().next().unwrap_or(""), "Verification result");
                return passed;
            }
        }
    }
    // Default to pass if verification is unavailable
    true
}