diff --git a/AGENT.md b/AGENT.md index aacc2d5..773864c 100644 --- a/AGENT.md +++ b/AGENT.md @@ -21,6 +21,28 @@ copied to any other operation. --- +## syva-core CP Mode vs Legacy Mode + +`syva-core` supports two zone ingestion paths that share the same in-process +`ZoneRegistry` and `EnforceEbpf`: + +1. Local gRPC surface. Adapters push zones to `syva-core` directly. This is + the v0.2 architecture and remains the default. +2. CP mode (`--cp-endpoint`). `syva-core` connects to a remote `syva-cp`, + registers as a node, and consumes assignments via server-streaming. The + reconcile loop lives in `syva-core/src/cp_reconcile/`. + +Both paths call the same in-process mutation helpers. CP mode is additive, not +a replacement yet. Session 4b will migrate adapters to push to `syva-cp` +instead of `syva-core`. + +The reconciler keeps in-memory state only (`AppliedState`). On restart, a fresh +subscription receives a `FULL_SNAPSHOT` from `syva-cp` and reconstructs desired +state. The only local persistence in CP mode is the `node-id` file used for +re-registration. + +--- + ## Mental Model: How to Think About This Codebase Syvä is a kernel enforcement boundary. Every line of eBPF code runs in a diff --git a/Cargo.lock b/Cargo.lock index da9bd11..095d0d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -371,8 +371,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-link", ] @@ -3092,6 +3094,7 @@ dependencies = [ "prost", "serde", "serde_json", + "syva-cp-client", "syva-ebpf-common", "syva-proto", "tokio", @@ -3100,6 +3103,7 @@ dependencies = [ "tonic", "tracing", "tracing-subscriber", + "uuid", ] [[package]] @@ -3131,6 +3135,29 @@ dependencies = [ "uuid", ] +[[package]] +name = "syva-cp-client" +version = "0.2.0" +dependencies = [ + "anyhow", + "chrono", + "prost", + "prost-types", + "serde", + "serde_json", + "sqlx", + "syva-cp", + "syva-proto", + "tempfile", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "ulid", + "uuid", +] + [[package]] name = "syva-ebpf-common" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 51c8ee3..df0266f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ resolver = "2" members = [ "syva-proto", + "syva-cp-client", "syva-core", "syva-cp", "syva-adapter-file", diff --git a/syva-core/Cargo.toml b/syva-core/Cargo.toml index ce57be0..56218af 100644 --- a/syva-core/Cargo.toml +++ b/syva-core/Cargo.toml @@ -9,6 +9,7 @@ path = "src/main.rs" [dependencies] syva-proto = { path = "../syva-proto" } +syva-cp-client = { path = "../syva-cp-client" } syva-ebpf-common = { path = "../syva-ebpf-common", features = ["userspace"] } aya = { workspace = true } tonic = { workspace = true } @@ -24,3 +25,4 @@ serde_json = { workspace = true } clap = { workspace = true } libc = "0.2" prost = { workspace = true } +uuid = { version = "1", features = ["v4", "serde"] } diff --git a/syva-core/README.md b/syva-core/README.md new file mode 100644 index 0000000..fca39d6 --- /dev/null +++ b/syva-core/README.md @@ -0,0 +1,40 @@ +# syva-core + +Kernel enforcement engine for Syva. `syva-core` owns the in-process +`ZoneRegistry` and `EnforceEbpf` state and exposes a local gRPC surface for +adapters. + +## Operational Modes + +`syva-core` runs in one of two modes: + +### Legacy Mode + +Adapters (`syva-adapter-file`, `syva-adapter-k8s`, `syva-adapter-api`) connect +to the local gRPC surface and push zones directly. + +Start: + +```bash +syva-core --socket-path /run/syva/syva-core.sock +``` + +### CP Mode + +`syva-core` connects to a remote `syva-cp`, registers as a node, subscribes to +assignment updates, and reconciles its BPF maps to match desired state. Legacy +adapters can still push to the local gRPC surface in addition. + +Start: + +```bash +syva-core \ + --cp-endpoint http://syva-cp.cluster.local:50051 \ + --node-name "$(hostname)" \ + --node-labels "tier=prod,region=eu" \ + --fingerprint-path /etc/machine-id \ + --node-id-path /var/lib/syva/node-id +``` + +The node ID is persisted to `--node-id-path` so restarts appear as +re-registration of the same node rather than fresh registration. diff --git a/syva-core/src/cp_reconcile/mod.rs b/syva-core/src/cp_reconcile/mod.rs new file mode 100644 index 0000000..dcb756f --- /dev/null +++ b/syva-core/src/cp_reconcile/mod.rs @@ -0,0 +1,317 @@ +//! CP-mode reconciler. +//! +//! When syva-core runs with `--cp-endpoint`, this module drives the +//! assignment-to-BPF loop: +//! +//! 1. Receive a `NodeAssignmentUpdate` from syva-cp +//! 2. Diff against the last applied state +//! 3. Reuse the same registry/BPF mutation helpers as the local gRPC path +//! 4. Report applied or failed status back to syva-cp + +#![allow(dead_code)] + +pub mod state; + +use crate::ebpf::EnforceEbpf; +use crate::health::SharedHealth; +use crate::rpc::{ + allow_comm_local, deny_comm_local, register_zone_local, remove_zone_local, CoreZonePolicyInput, +}; +use crate::types::ZoneType; +use crate::zone::ZoneRegistry; +use serde::Deserialize; +use state::AppliedState; +use std::sync::Arc; +use syva_cp_client::{ + AppliedReport, CpClient, FailedReport, NodeAssignmentUpdate, ZoneAssignment, +}; +use syva_proto::syva_control::v1::node_assignment_update::Kind as UpdateKind; +use tokio::sync::{Mutex, RwLock}; +use tracing::{debug, error, info, warn}; +use uuid::Uuid; + +pub struct Reconciler { + cp: CpClient, + registry: Arc>, + ebpf: Arc>, + health: SharedHealth, + applied: Arc>, +} + +impl Reconciler { + pub fn new( + cp: CpClient, + registry: Arc>, + ebpf: Arc>, + health: SharedHealth, + ) -> Self { + Self { + cp, + registry, + ebpf, + health, + applied: Arc::new(Mutex::new(AppliedState::new())), + } + } + + pub async fn run(self) { + let mut backoff_ms = 250_u64; + let max_backoff_ms = 30_000_u64; + + loop { + match self.run_once().await { + Ok(()) => { + info!("reconcile stream closed by server, reconnecting"); + backoff_ms = 250; + } + Err(error) => { + warn!("reconcile stream error: {error}; reconnecting in {backoff_ms}ms"); + tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await; + backoff_ms = (backoff_ms * 2).min(max_backoff_ms); + } + } + } + } + + async fn run_once(&self) -> anyhow::Result<()> { + let mut stream = self.cp.subscribe_assignments().await?; + + while let Some(update) = stream.message().await? { + self.handle_update(update).await; + } + + Ok(()) + } + + async fn handle_update(&self, update: NodeAssignmentUpdate) { + let kind = UpdateKind::try_from(update.kind).unwrap_or(UpdateKind::FullSnapshot); + + match kind { + UpdateKind::FullSnapshot => self.apply_full_snapshot(update.assignments).await, + UpdateKind::Upsert => self.apply_upserts(update.assignments).await, + UpdateKind::Remove => self.apply_remove(&update.removed_zone_id).await, + } + } + + async fn apply_full_snapshot(&self, desired: Vec) { + info!("applying FULL_SNAPSHOT with {} assignments", desired.len()); + + let (to_apply, to_remove) = { + let applied = self.applied.lock().await; + applied.diff_against_snapshot(&desired) + }; + + self.apply_upserts(to_apply).await; + for zone_id in to_remove { + self.apply_remove(&zone_id).await; + } + } + + async fn apply_upserts(&self, assignments: Vec) { + let mut applied_reports = Vec::new(); + let mut failed_reports = Vec::new(); + + for assignment in assignments { + match self.apply_one_upsert(&assignment).await { + Ok(policy) => { + let assignment_id = match Uuid::parse_str(&assignment.assignment_id) { + Ok(value) => value, + Err(error) => { + error!( + assignment_id = %assignment.assignment_id, + error = %error, + "bad assignment_id from control plane" + ); + continue; + } + }; + + let desired_policy_id = match Uuid::parse_str(&assignment.desired_policy_id) { + Ok(value) => value, + Err(error) => { + error!( + assignment_id = %assignment.assignment_id, + error = %error, + "bad desired_policy_id from control plane" + ); + continue; + } + }; + + { + let mut applied = self.applied.lock().await; + applied.record_applied_policy( + &assignment, + policy.allowed_zones.iter().cloned(), + ); + } + + self.sync_allowed_comms().await; + + applied_reports.push(AppliedReport { + assignment_id, + actual_zone_version: assignment.desired_zone_version, + actual_policy_id: desired_policy_id, + }); + } + Err(error) => { + error!(zone_id = %assignment.zone_id, error = %error, "apply failed"); + + if let Ok(assignment_id) = Uuid::parse_str(&assignment.assignment_id) { + failed_reports.push(FailedReport { + assignment_id, + error_json: serde_json::json!({ + "message": error.to_string(), + "zone_id": assignment.zone_id, + }), + }); + } + } + } + } + + if !applied_reports.is_empty() || !failed_reports.is_empty() { + if let Err(error) = self + .cp + .report_assignment_state(applied_reports, failed_reports) + .await + { + warn!("report_assignment_state failed: {error}"); + } + } + } + + async fn apply_remove(&self, zone_id_str: &str) { + let zone_id = match Uuid::parse_str(zone_id_str) { + Ok(value) => value, + Err(_) => { + warn!("remove with invalid zone_id: {zone_id_str}"); + return; + } + }; + + let zone_name = { + let applied = self.applied.lock().await; + applied.zone_name_for(&zone_id) + }; + let Some(zone_name) = zone_name else { + debug!(zone_id = %zone_id, "remove for unknown zone ignored"); + return; + }; + + match remove_zone_local( + &self.registry, + &self.ebpf, + &self.health, + &zone_name, + true, + ) + .await + { + Ok(result) if result.ok => { + let mut applied = self.applied.lock().await; + applied.record_removed(&zone_id); + drop(applied); + self.sync_allowed_comms().await; + info!(zone_name, "zone removed"); + } + Ok(result) => { + warn!(zone_name, message = result.message, "zone remove rejected"); + } + Err(error) => { + warn!(zone_name, error = %error, "zone remove failed"); + } + } + } + + async fn apply_one_upsert( + &self, + assignment: &ZoneAssignment, + ) -> Result { + let policy = parse_policy_json(&assignment.policy_json)?; + + register_zone_local( + &self.registry, + &self.ebpf, + &self.health, + &assignment.zone_name, + Some(policy.clone()), + ) + .await + .map_err(|error| anyhow::anyhow!("registry/BPF apply failed: {error}"))?; + + Ok(policy) + } + + async fn sync_allowed_comms(&self) { + let (to_allow, to_deny) = { + let applied = self.applied.lock().await; + applied.diff_comm_pairs() + }; + + for (zone_a, zone_b) in to_deny { + match deny_comm_local(&self.registry, &self.ebpf, &zone_a, &zone_b).await { + Ok(()) => { + let mut applied = self.applied.lock().await; + applied.record_denied_pair(&zone_a, &zone_b); + } + Err(error) => { + warn!(zone_a, zone_b, error = %error, "failed to deny comm pair"); + } + } + } + + for (zone_a, zone_b) in to_allow { + match allow_comm_local(&self.registry, &self.ebpf, &zone_a, &zone_b).await { + Ok(()) => { + let mut applied = self.applied.lock().await; + applied.record_allowed_pair(&zone_a, &zone_b); + } + Err(error) => { + warn!(zone_a, zone_b, error = %error, "failed to allow comm pair"); + } + } + } + } +} + +#[derive(Debug, Clone, Default, Deserialize)] +struct CompactPolicy { + #[serde(default)] + host_paths: Vec, + #[serde(default)] + allowed_zones: Vec, + #[serde(default)] + allow_ptrace: bool, + #[serde(default)] + zone_type: Option, +} + +fn parse_policy_json(policy_json: &str) -> Result { + if let Ok(full_policy) = serde_json::from_str::(policy_json) { + return Ok(CoreZonePolicyInput { + host_paths: full_policy.filesystem.host_paths, + allowed_zones: full_policy.network.allowed_zones, + allow_ptrace: full_policy + .capabilities + .allowed + .iter() + .any(|capability| capability == "CAP_SYS_PTRACE"), + zone_type: ZoneType::NonGlobal, + }); + } + + let compact = serde_json::from_str::(policy_json) + .map_err(|error| anyhow::anyhow!("policy parse: {error}"))?; + let zone_type = match compact.zone_type.as_deref() { + Some("privileged") => ZoneType::Privileged, + _ => ZoneType::NonGlobal, + }; + + Ok(CoreZonePolicyInput { + host_paths: compact.host_paths, + allowed_zones: compact.allowed_zones, + allow_ptrace: compact.allow_ptrace, + zone_type, + }) +} diff --git a/syva-core/src/cp_reconcile/state.rs b/syva-core/src/cp_reconcile/state.rs new file mode 100644 index 0000000..23af20b --- /dev/null +++ b/syva-core/src/cp_reconcile/state.rs @@ -0,0 +1,281 @@ +//! Tracks what this node has applied in CP mode. In-memory only. + +#![allow(dead_code)] + +use std::collections::{BTreeSet, HashMap, HashSet}; +use syva_cp_client::ZoneAssignment; +use uuid::Uuid; + +type CommPair = (String, String); + +pub struct AppliedState { + by_zone_id: HashMap, + active_comm_pairs: HashSet, +} + +pub struct AppliedEntry { + pub zone_name: String, + pub policy_id: Uuid, + pub zone_version: i64, + pub allowed_zones: BTreeSet, +} + +impl AppliedState { + pub fn new() -> Self { + Self { + by_zone_id: HashMap::new(), + active_comm_pairs: HashSet::new(), + } + } + + pub fn diff_against_snapshot( + &self, + desired: &[ZoneAssignment], + ) -> (Vec, Vec) { + let mut to_apply = Vec::new(); + let mut desired_ids = HashSet::new(); + + for assignment in desired { + let Ok(zone_id) = Uuid::parse_str(&assignment.zone_id) else { + continue; + }; + let Ok(policy_id) = Uuid::parse_str(&assignment.desired_policy_id) else { + continue; + }; + + desired_ids.insert(zone_id); + + let changed = match self.by_zone_id.get(&zone_id) { + None => true, + Some(existing) => { + existing.policy_id != policy_id + || existing.zone_version != assignment.desired_zone_version + } + }; + + if changed { + to_apply.push(assignment.clone()); + } + } + + let to_remove = self + .by_zone_id + .keys() + .filter(|zone_id| !desired_ids.contains(zone_id)) + .map(Uuid::to_string) + .collect(); + + (to_apply, to_remove) + } + + pub fn record_applied(&mut self, assignment: &ZoneAssignment) { + let Ok(zone_id) = Uuid::parse_str(&assignment.zone_id) else { + return; + }; + let Ok(policy_id) = Uuid::parse_str(&assignment.desired_policy_id) else { + return; + }; + + self.by_zone_id.insert( + zone_id, + AppliedEntry { + zone_name: assignment.zone_name.clone(), + policy_id, + zone_version: assignment.desired_zone_version, + allowed_zones: BTreeSet::new(), + }, + ); + } + + pub fn record_applied_policy( + &mut self, + assignment: &ZoneAssignment, + allowed_zones: impl IntoIterator, + ) { + let Ok(zone_id) = Uuid::parse_str(&assignment.zone_id) else { + return; + }; + let Ok(policy_id) = Uuid::parse_str(&assignment.desired_policy_id) else { + return; + }; + + self.by_zone_id.insert( + zone_id, + AppliedEntry { + zone_name: assignment.zone_name.clone(), + policy_id, + zone_version: assignment.desired_zone_version, + allowed_zones: allowed_zones.into_iter().collect(), + }, + ); + } + + pub fn record_removed(&mut self, zone_id: &Uuid) { + self.by_zone_id.remove(zone_id); + let active_zone_names: HashSet = self + .by_zone_id + .values() + .map(|entry| entry.zone_name.clone()) + .collect(); + self.active_comm_pairs.retain(|(zone_a, zone_b)| { + active_zone_names.contains(zone_a) && active_zone_names.contains(zone_b) + }); + } + + pub fn zone_name_for(&self, zone_id: &Uuid) -> Option { + self.by_zone_id.get(zone_id).map(|entry| entry.zone_name.clone()) + } + + pub fn diff_comm_pairs(&self) -> (Vec, Vec) { + let desired = self.desired_comm_pairs(); + + let to_allow = desired + .difference(&self.active_comm_pairs) + .cloned() + .collect(); + let to_deny = self + .active_comm_pairs + .difference(&desired) + .cloned() + .collect(); + + (to_allow, to_deny) + } + + pub fn record_allowed_pair(&mut self, zone_a: &str, zone_b: &str) { + self.active_comm_pairs + .insert(canonical_pair(zone_a, zone_b)); + } + + pub fn record_denied_pair(&mut self, zone_a: &str, zone_b: &str) { + self.active_comm_pairs + .remove(&canonical_pair(zone_a, zone_b)); + } + + fn desired_comm_pairs(&self) -> HashSet { + let mut pairs = HashSet::new(); + + for entry in self.by_zone_id.values() { + for peer in &entry.allowed_zones { + let Some(peer_entry) = self + .by_zone_id + .values() + .find(|candidate| candidate.zone_name == *peer) + else { + continue; + }; + + if peer_entry.allowed_zones.contains(&entry.zone_name) { + pairs.insert(canonical_pair(&entry.zone_name, peer)); + } + } + } + + pairs + } +} + +fn canonical_pair(zone_a: &str, zone_b: &str) -> CommPair { + if zone_a <= zone_b { + (zone_a.to_string(), zone_b.to_string()) + } else { + (zone_b.to_string(), zone_a.to_string()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn assignment(zone_id: Uuid, policy_id: Uuid, version: i64, name: &str) -> ZoneAssignment { + ZoneAssignment { + assignment_id: Uuid::new_v4().to_string(), + zone_id: zone_id.to_string(), + zone_name: name.to_string(), + desired_zone_version: version, + desired_policy_id: policy_id.to_string(), + desired_policy_version: version, + policy_json: "{}".to_string(), + team_id: Uuid::new_v4().to_string(), + } + } + + #[test] + fn empty_state_treats_all_as_new() { + let state = AppliedState::new(); + let zone_id = Uuid::new_v4(); + let policy_id = Uuid::new_v4(); + + let desired = vec![assignment(zone_id, policy_id, 1, "z1")]; + let (apply, remove) = state.diff_against_snapshot(&desired); + + assert_eq!(apply.len(), 1); + assert!(remove.is_empty()); + } + + #[test] + fn unchanged_snapshot_is_noop() { + let mut state = AppliedState::new(); + let zone_id = Uuid::new_v4(); + let policy_id = Uuid::new_v4(); + let current = assignment(zone_id, policy_id, 1, "z1"); + state.record_applied(¤t); + + let (apply, remove) = state.diff_against_snapshot(std::slice::from_ref(¤t)); + + assert!(apply.is_empty()); + assert!(remove.is_empty()); + } + + #[test] + fn policy_change_triggers_apply() { + let mut state = AppliedState::new(); + let zone_id = Uuid::new_v4(); + let policy_id = Uuid::new_v4(); + let new_policy_id = Uuid::new_v4(); + state.record_applied(&assignment(zone_id, policy_id, 1, "z1")); + + let desired = vec![assignment(zone_id, new_policy_id, 2, "z1")]; + let (apply, remove) = state.diff_against_snapshot(&desired); + + assert_eq!(apply.len(), 1); + assert!(remove.is_empty()); + } + + #[test] + fn missing_from_snapshot_triggers_remove() { + let mut state = AppliedState::new(); + let zone_id = Uuid::new_v4(); + let policy_id = Uuid::new_v4(); + state.record_applied(&assignment(zone_id, policy_id, 1, "z1")); + + let (apply, remove) = state.diff_against_snapshot(&[]); + + assert!(apply.is_empty()); + assert_eq!(remove, vec![zone_id.to_string()]); + } + + #[test] + fn mixed_scenario() { + let mut state = AppliedState::new(); + let zone_one = Uuid::new_v4(); + let zone_two = Uuid::new_v4(); + let zone_three = Uuid::new_v4(); + let policy_id = Uuid::new_v4(); + let new_policy_id = Uuid::new_v4(); + + state.record_applied(&assignment(zone_one, policy_id, 1, "z1")); + state.record_applied(&assignment(zone_two, policy_id, 1, "z2")); + + let desired = vec![ + assignment(zone_one, policy_id, 1, "z1"), + assignment(zone_two, new_policy_id, 2, "z2"), + assignment(zone_three, policy_id, 1, "z3"), + ]; + + let (apply, remove) = state.diff_against_snapshot(&desired); + + assert_eq!(apply.len(), 2); + assert!(remove.is_empty()); + } +} diff --git a/syva-core/src/ebpf.rs b/syva-core/src/ebpf.rs index b76f9d8..12be7c7 100644 --- a/syva-core/src/ebpf.rs +++ b/syva-core/src/ebpf.rs @@ -3,7 +3,6 @@ //! Loads and attaches the 5 LSM programs. Provides typed //! wrappers for BPF map operations (zone membership, policy, comms). -use std::collections::HashMap; use std::fs; use std::os::unix::fs::MetadataExt; use std::path::{Path, PathBuf}; @@ -11,7 +10,7 @@ use std::path::{Path, PathBuf}; use aya::maps::HashMap as AyaHashMap; use aya::maps::RingBuf; use aya::programs::Lsm; -use aya::{Bpf, BpfLoader, Btf}; +use aya::{Btf, Ebpf, EbpfLoader}; use crate::types::{ZonePolicy, ZoneType, NetworkMode}; use syva_ebpf_common::{ ZoneInfoKernel, ZonePolicyKernel, ZoneCommKey, SelfTestResult, SelfTestInodeResult, @@ -44,7 +43,7 @@ const MAP_NAMES: &[&str] = &[ /// eBPF manager for the standalone enforce agent. pub struct EnforceEbpf { - bpf: Bpf, + bpf: Ebpf, pin_path: PathBuf, } @@ -83,7 +82,7 @@ impl EnforceEbpf { let obj_data = fs::read(&obj_path) .map_err(|e| anyhow::anyhow!("failed to read eBPF object {}: {e}", obj_path.display()))?; - let mut loader = BpfLoader::new(); + let mut loader = EbpfLoader::new(); loader.btf(Some(&btf)).map_pin_path(&pin_path); for (name, val) in &offsets { @@ -394,19 +393,18 @@ impl EnforceEbpf { /// Remove all ZONE_ALLOWED_COMMS entries involving a zone. pub fn remove_zone_comms(&mut self, zone_id: u32) -> anyhow::Result<()> { - let map: AyaHashMap<_, ZoneCommKey, u8> = AyaHashMap::try_from( - self.bpf.map_mut("ZONE_ALLOWED_COMMS") - .ok_or_else(|| anyhow::anyhow!("ZONE_ALLOWED_COMMS map not found"))?, - )?; - // Collect keys to remove (can't mutate while iterating). - let keys_to_remove: Vec = map - .keys() - .filter_map(|k| k.ok()) - .filter(|k| k.src_zone == zone_id || k.dst_zone == zone_id) - .collect(); - - drop(map); + let keys_to_remove: Vec = { + let map: AyaHashMap<_, ZoneCommKey, u8> = AyaHashMap::try_from( + self.bpf.map_mut("ZONE_ALLOWED_COMMS") + .ok_or_else(|| anyhow::anyhow!("ZONE_ALLOWED_COMMS map not found"))?, + )?; + + map.keys() + .filter_map(|k| k.ok()) + .filter(|k| k.src_zone == zone_id || k.dst_zone == zone_id) + .collect() + }; let mut map: AyaHashMap<_, ZoneCommKey, u8> = AyaHashMap::try_from( self.bpf.map_mut("ZONE_ALLOWED_COMMS") @@ -436,19 +434,18 @@ impl EnforceEbpf { /// Remove all INODE_ZONE_MAP entries for a given zone. pub fn remove_zone_inodes(&mut self, zone_id: u32) -> anyhow::Result<()> { - let map: AyaHashMap<_, u64, u32> = AyaHashMap::try_from( - self.bpf.map_mut("INODE_ZONE_MAP") - .ok_or_else(|| anyhow::anyhow!("INODE_ZONE_MAP map not found"))?, - )?; - - let keys_to_remove: Vec = map - .iter() - .filter_map(|r| r.ok()) - .filter(|(_, v)| *v == zone_id) - .map(|(k, _)| k) - .collect(); - - drop(map); + let keys_to_remove: Vec = { + let map: AyaHashMap<_, u64, u32> = AyaHashMap::try_from( + self.bpf.map_mut("INODE_ZONE_MAP") + .ok_or_else(|| anyhow::anyhow!("INODE_ZONE_MAP map not found"))?, + )?; + + map.iter() + .filter_map(|r| r.ok()) + .filter(|(_, v)| *v == zone_id) + .map(|(k, _)| k) + .collect() + }; let mut map: AyaHashMap<_, u64, u32> = AyaHashMap::try_from( self.bpf.map_mut("INODE_ZONE_MAP") diff --git a/syva-core/src/events.rs b/syva-core/src/events.rs index fda88db..7ca661d 100644 --- a/syva-core/src/events.rs +++ b/syva-core/src/events.rs @@ -18,8 +18,10 @@ pub const HOOK_NAMES: [&str; 7] = [ /// Maximum events to drain per tick. Prevents the blocking task from /// holding the thread for too long under high deny rates. +#[allow(dead_code)] const MAX_EVENTS_PER_TICK: usize = 1000; +#[allow(dead_code)] pub fn spawn_event_reader(ring_buf: RingBuf, cancel: CancellationToken) { tokio::spawn(async move { let mut ring_buf = ring_buf; diff --git a/syva-core/src/health.rs b/syva-core/src/health.rs index 8189a93..458cca6 100644 --- a/syva-core/src/health.rs +++ b/syva-core/src/health.rs @@ -100,6 +100,8 @@ async fn metrics(State(state): State) -> String { /// Pure function — testable without HTTP server. pub fn render_metrics(health: &HealthState) -> String { + type HookMetric = (&'static str, &'static str, fn(&HookCounters) -> u64); + let mut out = String::with_capacity(2048); out.push_str("# HELP syva_up Whether syva is attached and enforcing.\n"); @@ -121,7 +123,7 @@ pub fn render_metrics(health: &HealthState) -> String { // Per-hook enforcement counters — always emitted (default 0 before first // snapshot) so Prometheus series exist from the start. let hook_names = &crate::events::HOOK_NAMES; - let metrics: [(&str, &str, fn(&HookCounters) -> u64); 4] = [ + let metrics: [HookMetric; 4] = [ ("syva_hook_allow_total", "Events allowed per hook", |c: &HookCounters| c.allow), ("syva_hook_deny_total", "Events denied per hook", |c: &HookCounters| c.deny), ("syva_hook_error_total", "Hook errors (fail-open) per hook", |c: &HookCounters| c.error), diff --git a/syva-core/src/main.rs b/syva-core/src/main.rs index 9b10f3f..1c86a54 100644 --- a/syva-core/src/main.rs +++ b/syva-core/src/main.rs @@ -9,6 +9,7 @@ //! syva-core status # Show enforcement counters //! syva-core events --follow # Stream deny events +mod cp_reconcile; mod btf; mod ebpf; mod events; @@ -22,6 +23,7 @@ use std::sync::Arc; use std::time::Instant; use clap::{Parser, Subcommand}; +use syva_cp_client::CpClientConfig; use tokio::sync::{Mutex, RwLock}; use tonic::transport::Server; use tracing_subscriber::EnvFilter; @@ -45,6 +47,44 @@ struct Cli { /// Unix socket path for the gRPC server. #[arg(long, default_value = "/run/syva/syva-core.sock")] socket_path: String, + + /// Optional syva-cp endpoint. When set, syva-core registers with + /// syva-cp and consumes assignment updates in addition to its local + /// adapter-facing gRPC surface. + #[arg(long, env = "SYVA_CP_ENDPOINT")] + cp_endpoint: Option, + + /// Hostname to report to syva-cp. Defaults to the system hostname. + #[arg(long, env = "SYVA_NODE_NAME")] + node_name: Option, + + /// Path to the stable node fingerprint file (for example /etc/machine-id). + #[arg( + long, + env = "SYVA_NODE_FINGERPRINT_PATH", + default_value = "/etc/machine-id" + )] + fingerprint_path: PathBuf, + + /// Optional cluster identifier to report at node registration time. + #[arg(long, env = "SYVA_CLUSTER_ID")] + cluster_id: Option, + + /// Node labels to send at registration. Format: key=value,key=value + #[arg(long, env = "SYVA_NODE_LABELS", value_delimiter = ',')] + node_labels: Vec, + + /// Path where the registered node ID is persisted across restarts. + #[arg( + long, + env = "SYVA_NODE_ID_PATH", + default_value = "/var/lib/syva/node-id" + )] + node_id_path: PathBuf, + + /// Heartbeat interval in seconds for CP mode. + #[arg(long, env = "SYVA_HEARTBEAT_SECS", default_value = "15")] + heartbeat_secs: u64, } #[derive(Subcommand)] @@ -81,16 +121,12 @@ async fn main() -> anyhow::Result<()> { match cli.command { Some(Commands::Status) => cmd_status().await, Some(Commands::Events { follow, format }) => cmd_events(follow, format).await, - None => cmd_run(cli.ebpf_obj, cli.health_port, cli.socket_path).await, + None => cmd_run(cli).await, } } /// Main enforcement engine loop. -async fn cmd_run( - ebpf_obj: Option, - health_port: u16, - socket_path: String, -) -> anyhow::Result<()> { +async fn cmd_run(config: Cli) -> anyhow::Result<()> { tracing::info!("syva-core starting"); let start_time = Instant::now(); @@ -100,10 +136,10 @@ async fn cmd_run( let health_state = health::SharedHealth::new(RwLock::new( health::HealthState::new(), )); - health::spawn_health_server(health_port, health_state.clone()).await?; + health::spawn_health_server(config.health_port, health_state.clone()).await?; // Load eBPF programs (but do NOT attach — no enforcement yet). - let mut mgr = ebpf::EnforceEbpf::load(ebpf_obj.as_deref())?; + let mut mgr = ebpf::EnforceEbpf::load(config.ebpf_obj.as_deref())?; // Do not take the ENFORCEMENT_EVENTS ring buffer here — it is single-consumer // and the gRPC WatchEvents RPC needs to acquire it. Event logging for the core @@ -132,7 +168,7 @@ async fn cmd_run( tracing::info!("startup complete — enforcement active, awaiting gRPC connections"); // Ensure parent directory for socket path exists. - if let Some(parent) = std::path::Path::new(&socket_path).parent() { + if let Some(parent) = std::path::Path::new(&config.socket_path).parent() { if !parent.exists() { std::fs::create_dir_all(parent) .map_err(|e| anyhow::anyhow!("failed to create socket directory {}: {e}", parent.display()))?; @@ -140,9 +176,9 @@ async fn cmd_run( } // Remove stale socket file if it exists. - if std::path::Path::new(&socket_path).exists() { - std::fs::remove_file(&socket_path) - .map_err(|e| anyhow::anyhow!("failed to remove stale socket {}: {e}", socket_path))?; + if std::path::Path::new(&config.socket_path).exists() { + std::fs::remove_file(&config.socket_path) + .map_err(|e| anyhow::anyhow!("failed to remove stale socket {}: {e}", config.socket_path))?; } // Build gRPC service. @@ -154,11 +190,11 @@ async fn cmd_run( }; // Start gRPC server on Unix socket. - let uds = tokio::net::UnixListener::bind(&socket_path) - .map_err(|e| anyhow::anyhow!("failed to bind Unix socket {}: {e}", socket_path))?; + let uds = tokio::net::UnixListener::bind(&config.socket_path) + .map_err(|e| anyhow::anyhow!("failed to bind Unix socket {}: {e}", config.socket_path))?; let uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds); - tracing::info!(socket = socket_path, "gRPC server listening"); + tracing::info!(socket = config.socket_path, "gRPC server listening"); // Shutdown on SIGINT (ctrl-c) or SIGTERM (Kubernetes pod termination). let mut sigterm = tokio::signal::unix::signal( @@ -216,6 +252,54 @@ async fn cmd_run( } }); + if let Some(endpoint) = config.cp_endpoint.as_ref() { + let node_name = config + .node_name + .clone() + .or_else(system_hostname) + .unwrap_or_else(|| "unknown".to_string()); + let cp_config = CpClientConfig { + endpoint: endpoint.clone(), + node_name, + cluster_id: config.cluster_id.clone(), + fingerprint: read_fingerprint(&config.fingerprint_path), + labels: parse_labels(&config.node_labels), + node_id_path: config.node_id_path.clone(), + heartbeat_interval: std::time::Duration::from_secs(config.heartbeat_secs), + ..Default::default() + }; + + match syva_cp_client::CpClient::connect(cp_config).await { + Ok(cp) => match cp.register().await { + Ok(registration) => { + tracing::info!(node_id = %registration.node_id, "registered with syva-cp"); + + let heartbeat_handle = cp.spawn_heartbeat_loop(); + let reconciler = cp_reconcile::Reconciler::new( + cp, + registry.clone(), + ebpf.clone(), + health_state.clone(), + ); + tokio::spawn(async move { + let _heartbeat = heartbeat_handle; + reconciler.run().await; + }); + + tracing::info!("syva-core CP mode active"); + } + Err(error) => { + tracing::error!("could not register with syva-cp at startup: {error}"); + tracing::warn!("syva-core running in degraded mode (local adapters only)"); + } + }, + Err(error) => { + tracing::error!("could not connect to syva-cp at startup: {error}"); + tracing::warn!("syva-core running in degraded mode (local adapters only)"); + } + } + } + // Run gRPC server with graceful shutdown. let grpc_server = Server::builder() .add_service(SyvaCoreServer::new(service)) @@ -239,6 +323,44 @@ async fn cmd_run( Ok(()) } +pub(crate) fn parse_labels(entries: &[String]) -> std::collections::BTreeMap { + let mut labels = std::collections::BTreeMap::new(); + + for entry in entries { + if let Some((key, value)) = entry.split_once('=') { + labels.insert(key.to_string(), value.to_string()); + } + } + + labels +} + +pub(crate) fn read_fingerprint(path: &std::path::Path) -> Option { + std::fs::read_to_string(path) + .ok() + .map(|contents| contents.trim().to_string()) + .filter(|contents| !contents.is_empty()) +} + +fn system_hostname() -> Option { + let mut buffer = [0_u8; 256]; + let result = unsafe { libc::gethostname(buffer.as_mut_ptr().cast(), buffer.len()) }; + if result != 0 { + return None; + } + + let length = buffer + .iter() + .position(|byte| *byte == 0) + .unwrap_or(buffer.len()); + let hostname = String::from_utf8_lossy(&buffer[..length]).into_owned(); + if hostname.is_empty() { + None + } else { + Some(hostname) + } +} + async fn cmd_status() -> anyhow::Result<()> { use aya::maps::PerCpuArray; use syva_ebpf_common::EnforcementCounters; diff --git a/syva-core/src/rpc/mod.rs b/syva-core/src/rpc/mod.rs index 539714f..11f3728 100644 --- a/syva-core/src/rpc/mod.rs +++ b/syva-core/src/rpc/mod.rs @@ -30,7 +30,7 @@ use syva_proto::syva_core::{ use crate::ebpf::EnforceEbpf; use crate::events::HOOK_NAMES; -use crate::health::{HookCounters, SharedHealth}; +use crate::health::SharedHealth; use crate::types::ZoneType; use crate::zone::{ZoneRegistry, ZoneState, ZoneTransition}; @@ -49,6 +49,192 @@ pub struct SyvaCoreService { pub start_time: Instant, } +#[derive(Debug, Clone)] +pub(crate) struct CoreZonePolicyInput { + pub host_paths: Vec, + pub allowed_zones: Vec, + pub allow_ptrace: bool, + pub zone_type: ZoneType, +} + +#[derive(Debug, Clone)] +pub(crate) struct RemoveZoneResult { + pub ok: bool, + pub message: String, +} + +pub(crate) async fn register_zone_local( + registry: &Arc>, + ebpf: &Arc>, + health: &SharedHealth, + zone_name: &str, + policy: Option, +) -> anyhow::Result { + let mut registry = registry.write().await; + let zone_id = registry.register_zone(zone_name)?; + + if let Some(policy) = policy { + let mut ebpf = ebpf.lock().await; + + let mut internal_policy = crate::types::ZonePolicy::default(); + if policy.allow_ptrace { + internal_policy + .capabilities + .allowed + .push("CAP_SYS_PTRACE".to_string()); + } + internal_policy.filesystem.host_paths = policy.host_paths.clone(); + internal_policy.network.allowed_zones = policy.allowed_zones; + + ebpf.set_zone_policy(zone_id, &internal_policy)?; + + if !policy.host_paths.is_empty() { + match ebpf.populate_inode_zone_map(zone_id, &policy.host_paths) { + Ok(inodes) => { + tracing::info!(zone = zone_name, zone_id, inodes, "inode map populated"); + } + Err(error) => { + tracing::warn!(zone = zone_name, %error, "inode map population failed"); + } + } + } + + let _ = policy.zone_type; + } + + let mut health = health.write().await; + health.zones_loaded = registry.zone_count(); + + Ok(zone_id) +} + +pub(crate) async fn remove_zone_local( + registry: &Arc>, + ebpf: &Arc>, + health: &SharedHealth, + zone_name: &str, + drain: bool, +) -> anyhow::Result { + let mut registry = registry.write().await; + + if drain { + registry.mark_draining(zone_name)?; + + let refcount = registry.refcount(zone_name); + if refcount == 0 { + let zone_id = registry.unregister_zone(zone_name)?; + + let mut ebpf = ebpf.lock().await; + let _ = ebpf.remove_zone_policy(zone_id); + let _ = ebpf.remove_zone_comms(zone_id); + let _ = ebpf.remove_zone_inodes(zone_id); + + tracing::info!(zone = zone_name, zone_id, "zone drained and removed"); + } else { + tracing::info!(zone = zone_name, refcount, "zone marked as draining"); + } + + let mut health = health.write().await; + health.zones_loaded = registry.zone_count(); + + return Ok(RemoveZoneResult { + ok: true, + message: String::new(), + }); + } + + let refcount = registry.refcount(zone_name); + if refcount > 0 { + return Ok(RemoveZoneResult { + ok: false, + message: format!( + "zone '{}' has {} active containers — use drain=true or detach them first", + zone_name, refcount + ), + }); + } + + let zone_id = registry.unregister_zone(zone_name)?; + + let mut ebpf = ebpf.lock().await; + let _ = ebpf.remove_zone_policy(zone_id); + let _ = ebpf.remove_zone_comms(zone_id); + let _ = ebpf.remove_zone_inodes(zone_id); + + let mut health = health.write().await; + health.zones_loaded = registry.zone_count(); + + tracing::info!(zone = zone_name, zone_id, "zone removed"); + Ok(RemoveZoneResult { + ok: true, + message: String::new(), + }) +} + +pub(crate) async fn allow_comm_local( + registry: &Arc>, + ebpf: &Arc>, + zone_a: &str, + zone_b: &str, +) -> anyhow::Result<()> { + let (zone_a_id, zone_b_id) = { + let registry = registry.read().await; + let zone_a_id = registry + .zone_id(zone_a) + .ok_or_else(|| anyhow::anyhow!("zone '{}' not registered", zone_a))?; + let zone_b_id = registry + .zone_id(zone_b) + .ok_or_else(|| anyhow::anyhow!("zone '{}' not registered", zone_b))?; + (zone_a_id, zone_b_id) + }; + + { + let mut ebpf = ebpf.lock().await; + ebpf.set_zone_allowed_comms(zone_a_id, zone_b_id)?; + } + + { + let mut registry = registry.write().await; + if registry.zone_id(zone_a).is_some() && registry.zone_id(zone_b).is_some() { + registry.record_allow_comm(zone_a, zone_b); + } + } + + tracing::info!(zone_a, zone_b, "cross-zone comm allowed"); + Ok(()) +} + +pub(crate) async fn deny_comm_local( + registry: &Arc>, + ebpf: &Arc>, + zone_a: &str, + zone_b: &str, +) -> anyhow::Result<()> { + let (zone_a_id, zone_b_id) = { + let registry = registry.read().await; + let zone_a_id = registry + .zone_id(zone_a) + .ok_or_else(|| anyhow::anyhow!("zone '{}' not registered", zone_a))?; + let zone_b_id = registry + .zone_id(zone_b) + .ok_or_else(|| anyhow::anyhow!("zone '{}' not registered", zone_b))?; + (zone_a_id, zone_b_id) + }; + + { + let mut ebpf = ebpf.lock().await; + ebpf.remove_zone_comm_pair(zone_a_id, zone_b_id)?; + } + + { + let mut registry = registry.write().await; + registry.record_deny_comm(zone_a, zone_b); + } + + tracing::info!(zone_a, zone_b, "cross-zone comm denied"); + Ok(()) +} + #[tonic::async_trait] impl SyvaCore for SyvaCoreService { async fn register_zone( @@ -62,52 +248,24 @@ impl SyvaCore for SyvaCoreService { return Err(Status::invalid_argument("zone_name is required")); } - let mut registry = self.registry.write().await; - let zone_id = registry.register_zone(&zone_name) - .map_err(|e| Status::internal(format!("failed to register zone: {e}")))?; - - // If a policy was provided, set it in BPF maps. - if let Some(proto_policy) = req.policy { - let mut ebpf = self.ebpf.lock().await; - - // Convert proto policy to internal ZonePolicy for BPF map population. - let allow_ptrace = proto_policy.allow_ptrace; - let zone_type = match proto_policy.zone_type { + let policy = req.policy.map(|proto_policy| CoreZonePolicyInput { + host_paths: proto_policy.host_paths, + allowed_zones: proto_policy.allowed_zones, + allow_ptrace: proto_policy.allow_ptrace, + zone_type: match proto_policy.zone_type { 1 => ZoneType::Privileged, _ => ZoneType::NonGlobal, - }; - - // Build a minimal internal ZonePolicy for set_zone_policy. - let mut policy = crate::types::ZonePolicy::default(); - if allow_ptrace { - policy.capabilities.allowed.push("CAP_SYS_PTRACE".to_string()); - } - policy.filesystem.host_paths = proto_policy.host_paths.clone(); - policy.network.allowed_zones = proto_policy.allowed_zones; - - ebpf.set_zone_policy(zone_id, &policy) - .map_err(|e| Status::internal(format!("failed to set zone policy: {e}")))?; - - // Populate inode map for host_paths. - if !proto_policy.host_paths.is_empty() { - match ebpf.populate_inode_zone_map(zone_id, &proto_policy.host_paths) { - Ok(n) => { - tracing::info!(zone = zone_name, zone_id, inodes = n, "inode map populated"); - } - Err(e) => { - tracing::warn!(zone = zone_name, %e, "inode map population failed"); - } - } - } - - drop(ebpf); - } - - // Update health state. - { - let mut h = self.health.write().await; - h.zones_loaded = registry.zone_count(); - } + }, + }); + let zone_id = register_zone_local( + &self.registry, + &self.ebpf, + &self.health, + &zone_name, + policy, + ) + .await + .map_err(|e| Status::internal(format!("failed to register zone: {e}")))?; tracing::info!(zone = zone_name, zone_id, "zone registered via gRPC"); Ok(Response::new(RegisterZoneResponse { zone_id })) @@ -123,67 +281,20 @@ impl SyvaCore for SyvaCoreService { if zone_name.is_empty() { return Err(Status::invalid_argument("zone_name is required")); } - - let mut registry = self.registry.write().await; - - if req.drain { - // Mark as draining — enforcement continues for existing containers. - registry.mark_draining(&zone_name) - .map_err(|e| Status::not_found(format!("{e}")))?; - - // If refcount is already 0, clean up immediately. - let refcount = registry.refcount(&zone_name); - if refcount == 0 { - let zone_id = registry.unregister_zone(&zone_name) - .map_err(|e| Status::internal(format!("{e}")))?; - - let mut ebpf = self.ebpf.lock().await; - let _ = ebpf.remove_zone_policy(zone_id); - let _ = ebpf.remove_zone_comms(zone_id); - let _ = ebpf.remove_zone_inodes(zone_id); - - tracing::info!(zone = zone_name, zone_id, "zone drained and removed"); - } else { - tracing::info!(zone = zone_name, refcount, "zone marked as draining"); - } - - let mut h = self.health.write().await; - h.zones_loaded = registry.zone_count(); - - Ok(Response::new(RemoveZoneResponse { - ok: true, - message: String::new(), - })) - } else { - // Immediate removal — reject if containers are attached. - let refcount = registry.refcount(&zone_name); - if refcount > 0 { - return Ok(Response::new(RemoveZoneResponse { - ok: false, - message: format!( - "zone '{}' has {} active containers — use drain=true or detach them first", - zone_name, refcount - ), - })); - } - - let zone_id = registry.unregister_zone(&zone_name) - .map_err(|e| Status::not_found(format!("{e}")))?; - - let mut ebpf = self.ebpf.lock().await; - let _ = ebpf.remove_zone_policy(zone_id); - let _ = ebpf.remove_zone_comms(zone_id); - let _ = ebpf.remove_zone_inodes(zone_id); - - let mut h = self.health.write().await; - h.zones_loaded = registry.zone_count(); - - tracing::info!(zone = zone_name, zone_id, "zone removed via gRPC"); - Ok(Response::new(RemoveZoneResponse { - ok: true, - message: String::new(), - })) - } + let result = remove_zone_local( + &self.registry, + &self.ebpf, + &self.health, + &zone_name, + req.drain, + ) + .await + .map_err(|e| Status::not_found(format!("{e}")))?; + + Ok(Response::new(RemoveZoneResponse { + ok: result.ok, + message: result.message, + })) } async fn attach_container( @@ -304,27 +415,9 @@ impl SyvaCore for SyvaCoreService { // mirror entry. If a zone is unregistered in the window, the BPF // entry will be cleared by remove_zone_comms, and the mirror // re-check below skips the stale record. - let (zone_a_id, zone_b_id) = { - let registry = self.registry.read().await; - let a = registry.zone_id(&req.zone_a) - .ok_or_else(|| Status::not_found(format!("zone '{}' not registered", req.zone_a)))?; - let b = registry.zone_id(&req.zone_b) - .ok_or_else(|| Status::not_found(format!("zone '{}' not registered", req.zone_b)))?; - (a, b) - }; - - { - let mut ebpf = self.ebpf.lock().await; - ebpf.set_zone_allowed_comms(zone_a_id, zone_b_id) - .map_err(|e| Status::internal(format!("failed to set allowed comms: {e}")))?; - } - - { - let mut registry = self.registry.write().await; - if registry.zone_id(&req.zone_a).is_some() && registry.zone_id(&req.zone_b).is_some() { - registry.record_allow_comm(&req.zone_a, &req.zone_b); - } - } + allow_comm_local(&self.registry, &self.ebpf, &req.zone_a, &req.zone_b) + .await + .map_err(|e| Status::internal(format!("failed to set allowed comms: {e}")))?; tracing::info!(zone_a = req.zone_a, zone_b = req.zone_b, "cross-zone comm allowed via gRPC"); Ok(Response::new(AllowCommResponse { ok: true })) @@ -343,31 +436,12 @@ impl SyvaCore for SyvaCoreService { // Same locking shape as allow_comm — resolve IDs under a read-lock, // release before the eBPF await, take a brief write-lock for the // mirror update afterwards. - let (zone_a_id, zone_b_id) = { - let registry = self.registry.read().await; - let a = registry.zone_id(&req.zone_a) - .ok_or_else(|| Status::not_found(format!("zone '{}' not registered", req.zone_a)))?; - let b = registry.zone_id(&req.zone_b) - .ok_or_else(|| Status::not_found(format!("zone '{}' not registered", req.zone_b)))?; - (a, b) - }; - - { - // Remove only the requested pair in both directions, preserving - // any unrelated comm entries involving either zone. - let mut ebpf = self.ebpf.lock().await; - ebpf.remove_zone_comm_pair(zone_a_id, zone_b_id) - .map_err(|e| Status::internal(format!( - "failed to remove comms between '{}' and '{}': {e}", req.zone_a, req.zone_b - )))?; - } - - { - // record_deny_comm is a HashSet remove — safe even if the zone - // was unregistered meanwhile (already wiped by unregister_zone). - let mut registry = self.registry.write().await; - registry.record_deny_comm(&req.zone_a, &req.zone_b); - } + deny_comm_local(&self.registry, &self.ebpf, &req.zone_a, &req.zone_b) + .await + .map_err(|e| Status::internal(format!( + "failed to remove comms between '{}' and '{}': {e}", + req.zone_a, req.zone_b + )))?; tracing::info!(zone_a = req.zone_a, zone_b = req.zone_b, "cross-zone comm denied via gRPC"); Ok(Response::new(DenyCommResponse { ok: true })) diff --git a/syva-core/src/types.rs b/syva-core/src/types.rs index db3d9b6..966b901 100644 --- a/syva-core/src/types.rs +++ b/syva-core/src/types.rs @@ -104,7 +104,7 @@ pub struct ZoneMetadata { } /// Declarative policy defining what a zone can do. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)] #[serde(deny_unknown_fields)] pub struct ZonePolicy { /// Optional metadata section — parsed but not used for enforcement. @@ -118,20 +118,6 @@ pub struct ZonePolicy { pub syscalls: SyscallPolicy, } -impl Default for ZonePolicy { - fn default() -> Self { - Self { - zone: ZoneMetadata::default(), - capabilities: CapabilityPolicy::default(), - resources: ResourcePolicy::default(), - network: NetworkPolicy::default(), - filesystem: FilesystemPolicy::default(), - devices: DevicePolicy::default(), - syscalls: SyscallPolicy::default(), - } - } -} - impl ZonePolicy { /// Validate policy values against kernel constraints. pub fn validate(&self, zone_name: &str) -> anyhow::Result<()> { diff --git a/syva-core/src/zone.rs b/syva-core/src/zone.rs index 2d7af37..6e64a10 100644 --- a/syva-core/src/zone.rs +++ b/syva-core/src/zone.rs @@ -402,7 +402,7 @@ mod tests { #[test] fn duplicate_container_id_returns_error() { let mut reg = ZoneRegistry::new(); - reg.register_zone("frontend"); + let _ = reg.register_zone("frontend"); reg.add_container("c1", "frontend", 1000).unwrap(); // Second add with same container_id must fail. @@ -416,8 +416,8 @@ mod tests { #[test] fn duplicate_container_id_different_zone_returns_error() { let mut reg = ZoneRegistry::new(); - reg.register_zone("frontend"); - reg.register_zone("database"); + let _ = reg.register_zone("frontend"); + let _ = reg.register_zone("database"); reg.add_container("c1", "frontend", 1000).unwrap(); // Same container_id in a different zone must also fail. diff --git a/syva-cp-client/Cargo.toml b/syva-cp-client/Cargo.toml new file mode 100644 index 0000000..2b2d48e --- /dev/null +++ b/syva-cp-client/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "syva-cp-client" +version.workspace = true +edition.workspace = true +license.workspace = true +description = "Client library for the syva control plane" + +[dependencies] +syva-proto = { path = "../syva-proto" } + +tokio = { workspace = true, features = ["full"] } +tokio-stream = { workspace = true } +tonic = { workspace = true } +prost = { workspace = true } +prost-types = { workspace = true } + +tracing = { workspace = true } +anyhow = { workspace = true } +thiserror = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +uuid = { version = "1", features = ["v4", "serde"] } +chrono = { version = "0.4", features = ["serde"] } +ulid = "1.1" + +[dev-dependencies] +syva-cp = { path = "../syva-cp" } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tempfile = "3" +sqlx = { version = "0.8", features = ["runtime-tokio", "postgres"] } diff --git a/syva-cp-client/README.md b/syva-cp-client/README.md new file mode 100644 index 0000000..ddeb5bd --- /dev/null +++ b/syva-cp-client/README.md @@ -0,0 +1,12 @@ +# syva-cp-client + +Client library for the syva control plane. Used by `syva-core` to: + +- register as a node +- send heartbeats +- subscribe to assignment updates +- report applied or failed assignment state + +This crate is a thin typed wrapper over the tonic-generated client from +`syva-proto`. It does not implement reconcile logic. That belongs to the +consumer. diff --git a/syva-cp-client/src/client.rs b/syva-cp-client/src/client.rs new file mode 100644 index 0000000..8d160bb --- /dev/null +++ b/syva-cp-client/src/client.rs @@ -0,0 +1,246 @@ +use crate::error::CpClientError; +use std::collections::BTreeMap; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::RwLock; +use tonic::transport::{Channel, Endpoint}; +use tracing::{debug, info, warn}; +use uuid::Uuid; + +use syva_proto::syva_control::v1::assignment_service_client::AssignmentServiceClient; +use syva_proto::syva_control::v1::node_service_client::NodeServiceClient; +use syva_proto::syva_control::v1::{ + AppliedAssignment, FailedAssignment, HeartbeatRequest, NodeAssignmentUpdate, + RegisterNodeRequest, ReportAssignmentStateRequest, SubscribeAssignmentsRequest, +}; + +#[derive(Debug, Clone)] +pub struct NodeRegistration { + pub node_id: Uuid, + pub node_name: String, +} + +#[derive(Debug, Clone)] +pub struct CpClientConfig { + pub endpoint: String, + pub node_name: String, + pub cluster_id: Option, + pub fingerprint: Option, + pub labels: BTreeMap, + pub node_id_path: PathBuf, + pub heartbeat_interval: Duration, + pub connect_timeout: Duration, +} + +impl Default for CpClientConfig { + fn default() -> Self { + Self { + endpoint: "http://127.0.0.1:50051".to_string(), + node_name: "unknown".to_string(), + cluster_id: None, + fingerprint: None, + labels: BTreeMap::new(), + node_id_path: PathBuf::from("/var/lib/syva/node-id"), + heartbeat_interval: Duration::from_secs(15), + connect_timeout: Duration::from_secs(5), + } + } +} + +#[derive(Clone)] +pub struct CpClient { + config: CpClientConfig, + channel: Channel, + registration: Arc>>, +} + +impl CpClient { + pub async fn connect(config: CpClientConfig) -> Result { + let endpoint = Endpoint::from_shared(config.endpoint.clone()) + .map_err(|error| CpClientError::InvalidEndpoint(error.to_string()))? + .connect_timeout(config.connect_timeout) + .tcp_keepalive(Some(Duration::from_secs(30))); + + let channel = endpoint.connect().await?; + info!(endpoint = %config.endpoint, "connected to syva-cp"); + + Ok(Self { + config, + channel, + registration: Arc::new(RwLock::new(None)), + }) + } + + pub async fn register(&self) -> Result { + let proposed_id = self.read_or_generate_node_id().await; + + let mut client = NodeServiceClient::new(self.channel.clone()); + let request = RegisterNodeRequest { + node_name: self.config.node_name.clone(), + fingerprint: self.config.fingerprint.clone().unwrap_or_default(), + cluster_id: self.config.cluster_id.clone().unwrap_or_default(), + labels: self.config.labels.clone().into_iter().collect(), + capabilities_json: "{}".to_string(), + proposed_id: proposed_id.to_string(), + }; + let response = client.register_node(request).await?.into_inner(); + + let assigned_id = Uuid::parse_str(&response.assigned_id) + .map_err(|error| CpClientError::Internal(format!("bad assigned_id: {error}")))?; + + self.persist_node_id(assigned_id).await; + + let registration = NodeRegistration { + node_id: assigned_id, + node_name: self.config.node_name.clone(), + }; + + *self.registration.write().await = Some(registration.clone()); + info!(node_id = %assigned_id, "registered with syva-cp"); + Ok(registration) + } + + pub async fn heartbeat(&self, status_hint: &str) -> Result<(), CpClientError> { + let node_id = self.require_registered().await?; + let mut client = NodeServiceClient::new(self.channel.clone()); + client + .heartbeat(HeartbeatRequest { + node_id: node_id.to_string(), + status_hint: status_hint.to_string(), + }) + .await?; + Ok(()) + } + + pub fn spawn_heartbeat_loop(&self) -> tokio::task::JoinHandle<()> { + let client = self.clone(); + let interval = client.config.heartbeat_interval; + + tokio::spawn(async move { + let mut ticker = tokio::time::interval(interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + + loop { + ticker.tick().await; + match client.heartbeat("online").await { + Ok(()) => debug!("heartbeat sent"), + Err(error) => warn!("heartbeat failed: {error}"), + } + } + }) + } + + pub async fn subscribe_assignments( + &self, + ) -> Result, CpClientError> { + let node_id = self.require_registered().await?; + let mut client = AssignmentServiceClient::new(self.channel.clone()); + let stream = client + .subscribe_assignments(SubscribeAssignmentsRequest { + node_id: node_id.to_string(), + }) + .await? + .into_inner(); + Ok(stream) + } + + pub async fn report_assignment_state( + &self, + applied: Vec, + failed: Vec, + ) -> Result<(usize, usize), CpClientError> { + let node_id = self.require_registered().await?; + let mut client = AssignmentServiceClient::new(self.channel.clone()); + let response = client + .report_assignment_state(ReportAssignmentStateRequest { + node_id: node_id.to_string(), + applied: applied + .into_iter() + .map(|item| AppliedAssignment { + assignment_id: item.assignment_id.to_string(), + actual_zone_version: item.actual_zone_version, + actual_policy_id: item.actual_policy_id.to_string(), + }) + .collect(), + failed: failed + .into_iter() + .map(|item| FailedAssignment { + assignment_id: item.assignment_id.to_string(), + error_json: item.error_json.to_string(), + }) + .collect(), + }) + .await? + .into_inner(); + + Ok(( + response.accepted_count as usize, + response.rejected_count as usize, + )) + } + + async fn require_registered(&self) -> Result { + self.registration + .read() + .await + .as_ref() + .map(|registration| registration.node_id) + .ok_or(CpClientError::NotRegistered) + } + + async fn read_or_generate_node_id(&self) -> Uuid { + match tokio::fs::read_to_string(&self.config.node_id_path).await { + Ok(text) => { + let trimmed = text.trim(); + if let Ok(uuid) = Uuid::parse_str(trimmed) { + return uuid; + } + } + Err(error) => { + debug!( + path = %self.config.node_id_path.display(), + error = %error, + "node id file not readable; generating a fresh id" + ); + } + } + + Uuid::from_u128(ulid::Ulid::new().0) + } + + async fn persist_node_id(&self, id: Uuid) { + if let Some(parent) = self.config.node_id_path.parent() { + if let Err(error) = tokio::fs::create_dir_all(parent).await { + warn!( + path = %parent.display(), + error = %error, + "could not create node-id directory" + ); + return; + } + } + + if let Err(error) = tokio::fs::write(&self.config.node_id_path, id.to_string()).await { + warn!( + path = %self.config.node_id_path.display(), + error = %error, + "could not persist node_id" + ); + } + } +} + +#[derive(Debug, Clone)] +pub struct AppliedReport { + pub assignment_id: Uuid, + pub actual_zone_version: i64, + pub actual_policy_id: Uuid, +} + +#[derive(Debug, Clone)] +pub struct FailedReport { + pub assignment_id: Uuid, + pub error_json: serde_json::Value, +} + diff --git a/syva-cp-client/src/error.rs b/syva-cp-client/src/error.rs new file mode 100644 index 0000000..9644b57 --- /dev/null +++ b/syva-cp-client/src/error.rs @@ -0,0 +1,23 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum CpClientError { + #[error("connection failed: {0}")] + Connection(#[from] tonic::transport::Error), + + #[error("grpc error: {0}")] + Grpc(#[from] tonic::Status), + + #[error("invalid endpoint: {0}")] + InvalidEndpoint(String), + + #[error("not registered: call register() before this operation")] + NotRegistered, + + #[error("serialization error: {0}")] + Serde(#[from] serde_json::Error), + + #[error("internal: {0}")] + Internal(String), +} + diff --git a/syva-cp-client/src/lib.rs b/syva-cp-client/src/lib.rs new file mode 100644 index 0000000..8dc50eb --- /dev/null +++ b/syva-cp-client/src/lib.rs @@ -0,0 +1,22 @@ +//! Client library for the syva control plane. +//! +//! Used by `syva-core` (and eventually the adapters) to connect to syva-cp, +//! register as a node, subscribe to assignments, and report state back. +//! +//! This crate is a thin, typed wrapper over the tonic-generated client in +//! `syva-proto`. It does not add behavior beyond: +//! +//! - connection bootstrap +//! - typed error conversion +//! - background heartbeat task +//! - assignment stream exposed as tonic's typed stream +//! +//! It does NOT implement the reconcile loop. That belongs in the consumer. + +pub mod client; +pub mod error; + +pub use client::{AppliedReport, CpClient, CpClientConfig, FailedReport, NodeRegistration}; +pub use error::CpClientError; +pub use syva_proto::syva_control::v1::{NodeAssignmentUpdate, ZoneAssignment}; + diff --git a/syva-cp-client/tests/integration_end_to_end.rs b/syva-cp-client/tests/integration_end_to_end.rs new file mode 100644 index 0000000..ea2ec9e --- /dev/null +++ b/syva-cp-client/tests/integration_end_to_end.rs @@ -0,0 +1,234 @@ +use std::collections::BTreeMap; +use std::str::FromStr; +use std::time::Duration; + +use sqlx::Connection; +use syva_cp::config::Config as CpConfig; +use syva_cp_client::{AppliedReport, CpClient, CpClientConfig}; +use syva_proto::syva_control::v1::assignment_service_client::AssignmentServiceClient; +use syva_proto::syva_control::v1::team_service_client::TeamServiceClient; +use syva_proto::syva_control::v1::zone_service_client::ZoneServiceClient; +use syva_proto::syva_control::v1::{ + node_assignment_update::Kind as UpdateKind, CreateTeamRequest, CreateZoneRequest, + ListAssignmentsRequest, +}; +use tempfile::TempDir; +use tokio::net::TcpListener; +use uuid::Uuid; + +async fn free_local_addr() -> std::net::SocketAddr { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind ephemeral listener"); + let addr = listener.local_addr().expect("read local addr"); + drop(listener); + addr +} + +async fn wait_for_grpc(endpoint: &str) { + let target = endpoint.trim_start_matches("http://"); + + for _ in 0..50 { + if tokio::net::TcpStream::connect(target).await.is_ok() { + return; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + + panic!("timed out waiting for syva-cp at {endpoint}"); +} + +fn split_database_url(database_url: &str) -> (&str, &str, String) { + let (prefix, database_and_suffix) = database_url + .rsplit_once('/') + .expect("database url must contain a database name"); + match database_and_suffix.split_once('?') { + Some((database_name, query)) => (prefix, database_name, format!("?{query}")), + None => (prefix, database_and_suffix, String::new()), + } +} + +fn replace_database_name(database_url: &str, database_name: &str) -> String { + let (prefix, _, suffix) = split_database_url(database_url); + format!("{prefix}/{database_name}{suffix}") +} + +async fn create_isolated_database(base_database_url: &str) -> (String, String) { + let isolated_name = format!("syva_cp_client_{}", Uuid::new_v4().simple()); + let admin_url = replace_database_name(base_database_url, "postgres"); + + let mut connection = sqlx::PgConnection::connect(&admin_url) + .await + .expect("connect postgres admin database"); + sqlx::query(&format!("CREATE DATABASE {isolated_name}")) + .execute(&mut connection) + .await + .expect("create isolated database"); + + ( + replace_database_name(base_database_url, &isolated_name), + isolated_name, + ) +} + +async fn drop_isolated_database(base_database_url: &str, isolated_name: &str) { + let admin_url = replace_database_name(base_database_url, "postgres"); + let mut connection = match sqlx::PgConnection::connect(&admin_url).await { + Ok(connection) => connection, + Err(_) => return, + }; + + let terminate = format!( + "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{isolated_name}'" + ); + let _ = sqlx::query(&terminate).execute(&mut connection).await; + let _ = sqlx::query(&format!("DROP DATABASE IF EXISTS {isolated_name}")) + .execute(&mut connection) + .await; +} + +#[tokio::test] +#[ignore = "requires postgres; run with --ignored"] +async fn end_to_end_register_subscribe_report() { + let base_database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); + let (database_url, isolated_name) = create_isolated_database(&base_database_url).await; + let grpc_addr = free_local_addr().await; + let health_addr = free_local_addr().await; + let endpoint = format!("http://{grpc_addr}"); + + let cp_handle = tokio::spawn(async move { + syva_cp::run(CpConfig { + database_url, + grpc_addr, + health_addr, + db_max_connections: 16, + db_timeout_secs: 5, + }) + .await + }); + + wait_for_grpc(&endpoint).await; + + let test_result = async { + let tmp = TempDir::new().expect("tempdir"); + let node_id_path = tmp.path().join("node-id"); + + let suffix = Uuid::new_v4().simple().to_string(); + let mut labels = BTreeMap::new(); + labels.insert("session".to_string(), suffix.clone()); + let node_name = format!("test-node-{suffix}"); + let fingerprint = format!("fingerprint-{suffix}"); + + let cp = CpClient::connect(CpClientConfig { + endpoint: endpoint.clone(), + node_name, + fingerprint: Some(fingerprint), + labels, + node_id_path, + heartbeat_interval: Duration::from_secs(5), + connect_timeout: Duration::from_secs(2), + ..Default::default() + }) + .await?; + + let registration = cp.register().await?; + assert_ne!(registration.node_id, Uuid::nil()); + + cp.heartbeat("online").await?; + + let mut stream = cp.subscribe_assignments().await?; + let first = tokio::time::timeout(Duration::from_secs(5), stream.message()) + .await + .expect("subscribe timeout")? + .expect("stream closed"); + assert_eq!(first.kind, UpdateKind::FullSnapshot as i32); + + let team_name = format!("platform-{suffix}"); + let zone_name = format!("agents-{suffix}"); + + let mut team_client = TeamServiceClient::connect(endpoint.clone()).await?; + let team = team_client + .create_team(CreateTeamRequest { + name: team_name, + display_name: "Platform".to_string(), + }) + .await? + .into_inner() + .team + .expect("team response"); + + let mut zone_client = ZoneServiceClient::connect(endpoint.clone()).await?; + let create_zone = zone_client + .create_zone(CreateZoneRequest { + team_id: team.id.clone(), + name: zone_name.clone(), + display_name: "Agents".to_string(), + policy_json: "{\"allowed_zones\":[]}".to_string(), + summary_json: String::new(), + selector_json: format!("{{\"match_labels\":{{\"session\":\"{suffix}\"}}}}"), + metadata_json: String::new(), + }) + .await? + .into_inner(); + let created_zone = create_zone.zone.expect("zone response"); + assert_eq!(created_zone.name, zone_name); + + let update = tokio::time::timeout(Duration::from_secs(5), async { + loop { + let Some(message) = stream.message().await.expect("stream error") else { + panic!("assignment stream closed"); + }; + if message.kind == UpdateKind::Upsert as i32 { + break message; + } + } + }) + .await + .expect("waiting for upsert"); + + assert_eq!(update.assignments.len(), 1); + let assignment = &update.assignments[0]; + assert_eq!(assignment.zone_name, zone_name); + + let (accepted, rejected) = cp + .report_assignment_state( + vec![AppliedReport { + assignment_id: Uuid::from_str(&assignment.assignment_id) + .expect("assignment uuid"), + actual_zone_version: assignment.desired_zone_version, + actual_policy_id: Uuid::from_str(&assignment.desired_policy_id) + .expect("policy uuid"), + }], + Vec::new(), + ) + .await?; + assert_eq!(accepted, 1); + assert_eq!(rejected, 0); + + let mut assignment_client = AssignmentServiceClient::connect(endpoint.clone()).await?; + let list = assignment_client + .list_assignments(ListAssignmentsRequest { + zone_id: String::new(), + node_id: registration.node_id.to_string(), + status: String::new(), + limit: 10, + }) + .await? + .into_inner(); + + assert_eq!(list.assignments.len(), 1); + let reported = &list.assignments[0]; + assert_eq!(reported.status, "applied"); + assert_eq!(reported.actual_policy_id, assignment.desired_policy_id); + assert_eq!(reported.actual_zone_version, assignment.desired_zone_version); + + Ok::<(), anyhow::Error>(()) + } + .await; + + cp_handle.abort(); + let _ = cp_handle.await; + drop_isolated_database(&base_database_url, &isolated_name).await; + + test_result.expect("end-to-end assertion failure"); +}