From e2c9bf94f956103bddb9d32a24faac7af37e7be5 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Mon, 27 Apr 2026 01:34:32 +0200 Subject: [PATCH 1/2] migration: phase 2 adapters local core mode --- Cargo.lock | 16 ++ Cargo.toml | 1 + syva-adapter-api/Cargo.toml | 1 + syva-adapter-api/src/main.rs | 12 +- syva-adapter-api/src/routes.rs | 442 ++++++++++++++++++++++------- syva-adapter-file/Cargo.toml | 1 + syva-adapter-file/src/diff.rs | 88 ++++++ syva-adapter-file/src/lib.rs | 1 + syva-adapter-file/src/main.rs | 17 +- syva-adapter-file/src/run.rs | 189 +++++++++++- syva-adapter-file/src/translate.rs | 90 ++++++ syva-adapter-k8s/Cargo.toml | 1 + syva-adapter-k8s/src/main.rs | 12 +- syva-adapter-k8s/src/mapper.rs | 80 ++++++ syva-adapter-k8s/src/watcher.rs | 166 +++++++++-- syva-core-client/Cargo.toml | 13 + syva-core-client/src/lib.rs | 64 +++++ 17 files changed, 1052 insertions(+), 142 deletions(-) create mode 100644 syva-adapter-file/src/diff.rs create mode 100644 syva-core-client/Cargo.toml create mode 100644 syva-core-client/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 8ec8626..5206015 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3027,6 +3027,7 @@ dependencies = [ "clap", "serde", "serde_json", + "syva-core-client", "syva-cp-client", "tokio", "tonic", @@ -3043,6 +3044,7 @@ dependencies = [ "clap", "serde", "serde_json", + "syva-core-client", "syva-cp-client", "syva-ebpf-common", "tokio", @@ -3065,6 +3067,7 @@ dependencies = [ "schemars", "serde", "serde_json", + "syva-core-client", "syva-cp-client", "tokio", "tonic", @@ -3099,6 +3102,19 @@ dependencies = [ "uuid", ] +[[package]] +name = "syva-core-client" +version = "0.2.0" +dependencies = [ + "hyper-util", + "syva-proto", + "thiserror 1.0.69", + "tokio", + "tonic", + "tower 0.4.13", + "tracing", +] + [[package]] name = "syva-cp" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index df0266f..5ba24b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ resolver = "2" members = [ "syva-proto", + "syva-core-client", "syva-cp-client", "syva-core", "syva-cp", diff --git a/syva-adapter-api/Cargo.toml b/syva-adapter-api/Cargo.toml index c49d9b8..ae32768 100644 --- a/syva-adapter-api/Cargo.toml +++ b/syva-adapter-api/Cargo.toml @@ -8,6 +8,7 @@ name = "syva-api" path = "src/main.rs" [dependencies] +syva-core-client = { path = "../syva-core-client" } syva-cp-client = { path = "../syva-cp-client" } tokio = { workspace = true } tracing = { workspace = true } diff --git a/syva-adapter-api/src/main.rs b/syva-adapter-api/src/main.rs index 108852b..1e1ee36 100644 --- a/syva-adapter-api/src/main.rs +++ b/syva-adapter-api/src/main.rs @@ -3,6 +3,7 @@ mod routes; use anyhow::Result; use clap::Parser; use std::net::SocketAddr; +use std::path::PathBuf; use uuid::Uuid; #[derive(Parser, Debug)] @@ -13,12 +14,16 @@ struct Cli { listen: SocketAddr, /// syva-cp gRPC endpoint. - #[arg(long, env = "SYVA_CP_ENDPOINT")] - cp_endpoint: String, + #[arg(long, env = "SYVA_CP_ENDPOINT", conflicts_with = "core_socket")] + cp_endpoint: Option, + + /// Local syva-core Unix socket. + #[arg(long, env = "SYVA_CORE_SOCKET", conflicts_with = "cp_endpoint")] + core_socket: Option, /// Team UUID this proxy creates and updates zones in. #[arg(long, env = "SYVA_TEAM_ID")] - team_id: Uuid, + team_id: Option, } #[tokio::main] @@ -34,6 +39,7 @@ async fn main() -> Result<()> { routes::serve(routes::Config { listen: cli.listen, cp_endpoint: cli.cp_endpoint, + core_socket: cli.core_socket, team_id: cli.team_id, }) .await diff --git a/syva-adapter-api/src/routes.rs b/syva-adapter-api/src/routes.rs index 498c872..3ba38af 100644 --- a/syva-adapter-api/src/routes.rs +++ b/syva-adapter-api/src/routes.rs @@ -9,21 +9,36 @@ use axum::{ use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use std::net::SocketAddr; +use std::path::PathBuf; use std::time::Duration; +use syva_core_client::syva_core::{ + ListZonesRequest, RegisterZoneRequest, RemoveZoneRequest, ZonePolicy, ZoneSummary, +}; use syva_cp_client::{CpClient, CpClientConfig, CreateZoneArgs, DeleteZoneArgs, UpdateZoneArgs}; use tracing::warn; use uuid::Uuid; #[derive(Clone)] pub struct AppState { - cp: CpClient, - team_id: Uuid, + mode: ClientMode, +} + +#[derive(Clone)] +enum ClientMode { + Cp { + client: CpClient, + team_id: Uuid, + }, + Core { + client: syva_core_client::SyvaCoreClient, + }, } pub struct Config { pub listen: SocketAddr, - pub cp_endpoint: String, - pub team_id: Uuid, + pub cp_endpoint: Option, + pub core_socket: Option, + pub team_id: Option, } #[derive(Debug)] @@ -52,6 +67,34 @@ pub struct UpdateZoneBody { pub selector_json: Option, } +#[derive(Deserialize)] +#[serde(default, deny_unknown_fields)] +struct CorePolicyJson { + host_paths: Vec, + allowed_zones: Vec, + allow_ptrace: bool, + zone_type: CoreZoneType, +} + +impl Default for CorePolicyJson { + fn default() -> Self { + Self { + host_paths: Vec::new(), + allowed_zones: Vec::new(), + allow_ptrace: false, + zone_type: CoreZoneType::Standard, + } + } +} + +#[derive(Clone, Copy, Deserialize)] +#[serde(rename_all = "lowercase")] +enum CoreZoneType { + Standard, + Privileged, + Isolated, +} + #[derive(Deserialize)] pub struct ListZonesQuery { pub status: Option, @@ -85,11 +128,29 @@ pub struct HealthOut { } pub async fn serve(config: Config) -> Result<()> { - let cp = connect_with_retry(&config.cp_endpoint).await; - let app = router(AppState { - cp, - team_id: config.team_id, - }); + let state = match (&config.cp_endpoint, &config.core_socket) { + (Some(_), Some(_)) => { + anyhow::bail!("--cp-endpoint and --core-socket are mutually exclusive") + } + (None, None) => anyhow::bail!("exactly one of --cp-endpoint or --core-socket is required"), + (Some(endpoint), None) => { + let team_id = config + .team_id + .context("--team-id is required when using --cp-endpoint")?; + AppState { + mode: ClientMode::Cp { + client: connect_with_retry(endpoint).await, + team_id, + }, + } + } + (None, Some(socket_path)) => AppState { + mode: ClientMode::Core { + client: syva_core_client::connect_unix_socket_with_retry(socket_path.clone()).await, + }, + }, + }; + let app = router(state); let listener = tokio::net::TcpListener::bind(config.listen) .await @@ -103,7 +164,10 @@ pub async fn serve(config: Config) -> Result<()> { pub fn router(state: AppState) -> Router { Router::new() .route("/v1/zones", post(create_zone).get(list_zones)) - .route("/v1/zones/{name}", get(get_zone).put(update_zone).delete(delete_zone)) + .route( + "/v1/zones/{name}", + get(get_zone).put(update_zone).delete(delete_zone), + ) .route("/healthz", get(healthz)) .with_state(state) } @@ -138,59 +202,104 @@ pub async fn create_zone( State(state): State, Json(body): Json, ) -> Result<(StatusCode, Json), ApiError> { - let output = state - .cp - .create_zone(CreateZoneArgs { - team_id: state.team_id, - name: body.name, - display_name: body.display_name, - policy_json: body.policy_json, - summary_json: None, - selector_json: body.selector_json, - metadata_json: None, - }) - .await - .map_err(ApiError::from_cp)?; - - Ok(( - StatusCode::CREATED, - Json(CreateZoneOut { - zone_id: output.zone_id.to_string(), - policy_id: output.policy_id.to_string(), - version: output.version, - }), - )) + match state.mode { + ClientMode::Cp { client, team_id } => { + let output = client + .create_zone(CreateZoneArgs { + team_id, + name: body.name, + display_name: body.display_name, + policy_json: body.policy_json, + summary_json: None, + selector_json: body.selector_json, + metadata_json: None, + }) + .await + .map_err(ApiError::from_cp)?; + + Ok(( + StatusCode::CREATED, + Json(CreateZoneOut { + zone_id: output.zone_id.to_string(), + policy_id: output.policy_id.to_string(), + version: output.version, + }), + )) + } + ClientMode::Core { mut client } => { + let response = client + .register_zone(core_register_request(&body.name, body.policy_json)?) + .await + .map_err(ApiError::from_core)? + .into_inner(); + + Ok(( + StatusCode::CREATED, + Json(CreateZoneOut { + zone_id: response.zone_id.to_string(), + policy_id: String::new(), + version: 0, + }), + )) + } + } } pub async fn list_zones( State(state): State, Query(query): Query, ) -> Result>, ApiError> { - let zones = state - .cp - .list_zones( - state.team_id, - query.status.as_deref(), - query.limit.unwrap_or(100), - ) - .await - .map_err(ApiError::from_cp)?; - - Ok(Json(zones.into_iter().map(zone_to_out).collect())) + match state.mode { + ClientMode::Cp { client, team_id } => { + let zones = client + .list_zones(team_id, query.status.as_deref(), query.limit.unwrap_or(100)) + .await + .map_err(ApiError::from_cp)?; + Ok(Json(zones.into_iter().map(zone_to_out).collect())) + } + ClientMode::Core { mut client } => { + let mut zones = client + .list_zones(ListZonesRequest {}) + .await + .map_err(ApiError::from_core)? + .into_inner() + .zones; + if let Some(status) = query.status { + zones.retain(|zone| zone.state == status); + } + zones.truncate(query.limit.unwrap_or(100).max(0) as usize); + Ok(Json(zones.into_iter().map(core_zone_to_out).collect())) + } + } } pub async fn get_zone( State(state): State, Path(name): Path, ) -> Result, ApiError> { - let zone = state - .cp - .get_zone_by_name(state.team_id, &name) - .await - .map_err(ApiError::from_cp)? - .ok_or_else(|| ApiError::not_found(format!("zone '{name}' not found")))?; - - Ok(Json(zone_to_out(zone))) + match state.mode { + ClientMode::Cp { client, team_id } => { + let zone = client + .get_zone_by_name(team_id, &name) + .await + .map_err(ApiError::from_cp)? + .ok_or_else(|| ApiError::not_found(format!("zone '{name}' not found")))?; + Ok(Json(zone_to_out(zone))) + } + ClientMode::Core { mut client } => { + let zones = client + .list_zones(ListZonesRequest {}) + .await + .map_err(ApiError::from_core)? + .into_inner() + .zones; + let zone = zones + .into_iter() + .find(|zone| zone.name == name) + .ok_or_else(|| ApiError::not_found(format!("zone '{name}' not found")))?; + Ok(Json(core_zone_to_out(zone))) + } + } } pub async fn update_zone( @@ -198,57 +307,91 @@ pub async fn update_zone( Path(name): Path, Json(body): Json, ) -> Result, ApiError> { - let snapshot = state - .cp - .get_zone_by_name(state.team_id, &name) - .await - .map_err(ApiError::from_cp)? - .ok_or_else(|| ApiError::not_found(format!("zone '{name}' not found")))?; - - state - .cp - .update_zone(UpdateZoneArgs { - zone_id: snapshot.zone_id, - if_version: body.if_version, - policy_json: body.policy_json, - selector_json: body.selector_json, - metadata_json: None, - }) - .await - .map_err(ApiError::from_cp)?; - - let refreshed = state - .cp - .get_zone_by_name(state.team_id, &name) - .await - .map_err(ApiError::from_cp)? - .ok_or_else(|| ApiError::not_found(format!("zone '{name}' not found after update")))?; - - Ok(Json(zone_to_out(refreshed))) + match state.mode { + ClientMode::Cp { client, team_id } => { + let snapshot = client + .get_zone_by_name(team_id, &name) + .await + .map_err(ApiError::from_cp)? + .ok_or_else(|| ApiError::not_found(format!("zone '{name}' not found")))?; + + client + .update_zone(UpdateZoneArgs { + zone_id: snapshot.zone_id, + if_version: body.if_version, + policy_json: body.policy_json, + selector_json: body.selector_json, + metadata_json: None, + }) + .await + .map_err(ApiError::from_cp)?; + + let refreshed = client + .get_zone_by_name(team_id, &name) + .await + .map_err(ApiError::from_cp)? + .ok_or_else(|| { + ApiError::not_found(format!("zone '{name}' not found after update")) + })?; + Ok(Json(zone_to_out(refreshed))) + } + ClientMode::Core { mut client } => { + if let Some(policy_json) = body.policy_json { + client + .register_zone(core_register_request(&name, policy_json)?) + .await + .map_err(ApiError::from_core)?; + } + let zones = client + .list_zones(ListZonesRequest {}) + .await + .map_err(ApiError::from_core)? + .into_inner() + .zones; + let zone = zones + .into_iter() + .find(|zone| zone.name == name) + .ok_or_else(|| { + ApiError::not_found(format!("zone '{name}' not found after update")) + })?; + Ok(Json(core_zone_to_out(zone))) + } + } } pub async fn delete_zone( State(state): State, Path(name): Path, ) -> Result { - let snapshot = state - .cp - .get_zone_by_name(state.team_id, &name) - .await - .map_err(ApiError::from_cp)? - .ok_or_else(|| ApiError::not_found(format!("zone '{name}' not found")))?; - - state - .cp - .delete_zone(DeleteZoneArgs { - zone_id: snapshot.zone_id, - if_version: snapshot.version, - drain: true, - }) - .await - .map_err(ApiError::from_cp)?; - - Ok(StatusCode::NO_CONTENT) + match state.mode { + ClientMode::Cp { client, team_id } => { + let snapshot = client + .get_zone_by_name(team_id, &name) + .await + .map_err(ApiError::from_cp)? + .ok_or_else(|| ApiError::not_found(format!("zone '{name}' not found")))?; + + client + .delete_zone(DeleteZoneArgs { + zone_id: snapshot.zone_id, + if_version: snapshot.version, + drain: true, + }) + .await + .map_err(ApiError::from_cp)?; + Ok(StatusCode::NO_CONTENT) + } + ClientMode::Core { mut client } => { + client + .remove_zone(RemoveZoneRequest { + zone_name: name, + drain: true, + }) + .await + .map_err(ApiError::from_core)?; + Ok(StatusCode::NO_CONTENT) + } + } } pub async fn healthz() -> Json { @@ -270,6 +413,48 @@ fn zone_to_out(zone: syva_cp_client::ZoneSnapshot) -> ZoneOut { } } +fn core_zone_to_out(zone: ZoneSummary) -> ZoneOut { + ZoneOut { + zone_id: zone.zone_id.to_string(), + team_id: String::new(), + name: zone.name, + display_name: None, + status: zone.state, + version: 0, + current_policy_id: None, + current_policy_json: None, + selector_json: None, + metadata_json: None, + } +} + +fn core_register_request( + name: &str, + policy_json: JsonValue, +) -> Result { + // Local-core mode intentionally accepts only the fields represented by + // syva.core.v1.ZonePolicy; CP-only request fields are ignored by handlers. + let policy: CorePolicyJson = serde_json::from_value(policy_json).map_err(|error| ApiError { + status: StatusCode::BAD_REQUEST, + message: format!( + "local-core mode expects policy_json with host_paths, allowed_zones, allow_ptrace, and zone_type only: {error}" + ), + })?; + + Ok(RegisterZoneRequest { + zone_name: name.to_string(), + policy: Some(ZonePolicy { + host_paths: policy.host_paths, + allowed_zones: policy.allowed_zones, + allow_ptrace: policy.allow_ptrace, + zone_type: match policy.zone_type { + CoreZoneType::Privileged => 1, + CoreZoneType::Standard | CoreZoneType::Isolated => 0, + }, + }), + }) +} + impl ApiError { fn not_found(message: String) -> Self { Self { @@ -298,13 +483,29 @@ impl ApiError { syva_cp_client::CpClientError::NotRegistered => StatusCode::INTERNAL_SERVER_ERROR, }; - Self { status, message: error.to_string() } + Self { + status, + message: error.to_string(), + } + } + + fn from_core(error: impl std::fmt::Display) -> Self { + Self { + status: StatusCode::BAD_GATEWAY, + message: error.to_string(), + } } } impl IntoResponse for ApiError { fn into_response(self) -> Response { - (self.status, Json(ErrorBody { error: self.message })).into_response() + ( + self.status, + Json(ErrorBody { + error: self.message, + }), + ) + .into_response() } } @@ -324,7 +525,10 @@ mod tests { assert_eq!(body.name, "web"); assert_eq!(body.display_name.as_deref(), Some("Web")); assert_eq!(body.policy_json["host_paths"], serde_json::json!(["/data"])); - assert_eq!(body.selector_json.unwrap()["all_nodes"], serde_json::json!(true)); + assert_eq!( + body.selector_json.unwrap()["all_nodes"], + serde_json::json!(true) + ); } #[test] @@ -332,6 +536,44 @@ mod tests { let json = r#"{"if_version":7,"policy_json":{"allow_ptrace":true}}"#; let body: UpdateZoneBody = serde_json::from_str(json).unwrap(); assert_eq!(body.if_version, 7); - assert_eq!(body.policy_json.unwrap()["allow_ptrace"], serde_json::json!(true)); + assert_eq!( + body.policy_json.unwrap()["allow_ptrace"], + serde_json::json!(true) + ); + } + + #[test] + fn core_register_request_maps_narrow_local_policy_json() { + let request = core_register_request( + "web", + serde_json::json!({ + "host_paths": ["/data"], + "allowed_zones": ["db"], + "allow_ptrace": true, + "zone_type": "privileged" + }), + ) + .expect("request"); + + let policy = request.policy.expect("policy"); + assert_eq!(request.zone_name, "web"); + assert_eq!(policy.host_paths, vec!["/data"]); + assert_eq!(policy.allowed_zones, vec!["db"]); + assert!(policy.allow_ptrace); + assert_eq!(policy.zone_type, 1); + } + + #[test] + fn core_register_request_rejects_cp_only_policy_json_fields() { + let error = core_register_request( + "web", + serde_json::json!({ + "host_paths": [], + "selector_json": {"all_nodes": true} + }), + ) + .expect_err("unknown fields should be rejected in local mode"); + + assert_eq!(error.status, StatusCode::BAD_REQUEST); } } diff --git a/syva-adapter-file/Cargo.toml b/syva-adapter-file/Cargo.toml index 467baf0..70bd606 100644 --- a/syva-adapter-file/Cargo.toml +++ b/syva-adapter-file/Cargo.toml @@ -12,6 +12,7 @@ name = "syva-file" path = "src/main.rs" [dependencies] +syva-core-client = { path = "../syva-core-client" } syva-cp-client = { path = "../syva-cp-client" } tokio = { workspace = true } tracing = { workspace = true } diff --git a/syva-adapter-file/src/diff.rs b/syva-adapter-file/src/diff.rs new file mode 100644 index 0000000..c46b46e --- /dev/null +++ b/syva-adapter-file/src/diff.rs @@ -0,0 +1,88 @@ +use std::collections::{BTreeSet, HashMap}; + +use crate::policy::FilePolicy; +use syva_core_client::syva_core::ZoneSummary; + +#[derive(Debug, Default, PartialEq, Eq)] +pub struct CoreDiff { + pub create: Vec, + pub update: Vec, + pub remove: Vec, +} + +pub fn diff_against_core( + desired: &HashMap, + applied: &[ZoneSummary], +) -> CoreDiff { + let desired_names: BTreeSet<&str> = desired.keys().map(String::as_str).collect(); + let applied_names: BTreeSet<&str> = applied.iter().map(|zone| zone.name.as_str()).collect(); + + let create = desired_names + .difference(&applied_names) + .map(|name| (*name).to_string()) + .collect(); + let update = desired_names + .intersection(&applied_names) + .map(|name| (*name).to_string()) + .collect(); + let remove = applied_names + .difference(&desired_names) + .map(|name| (*name).to_string()) + .collect(); + + CoreDiff { + create, + update, + remove, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn diffs_desired_against_core_snapshot() { + let mut desired = HashMap::new(); + desired.insert( + "web".to_string(), + FilePolicy { + display_name: None, + selector: None, + policy: Default::default(), + }, + ); + desired.insert( + "api".to_string(), + FilePolicy { + display_name: None, + selector: None, + policy: Default::default(), + }, + ); + + let applied = vec![ + ZoneSummary { + name: "web".to_string(), + zone_id: 1, + state: "active".to_string(), + containers_active: 1, + }, + ZoneSummary { + name: "old".to_string(), + zone_id: 2, + state: "pending".to_string(), + containers_active: 0, + }, + ]; + + assert_eq!( + diff_against_core(&desired, &applied), + CoreDiff { + create: vec!["api".to_string()], + update: vec!["web".to_string()], + remove: vec!["old".to_string()], + } + ); + } +} diff --git a/syva-adapter-file/src/lib.rs b/syva-adapter-file/src/lib.rs index f345e10..a0edef5 100644 --- a/syva-adapter-file/src/lib.rs +++ b/syva-adapter-file/src/lib.rs @@ -1,3 +1,4 @@ +pub mod diff; pub mod policy; pub mod run; pub mod translate; diff --git a/syva-adapter-file/src/main.rs b/syva-adapter-file/src/main.rs index 442a91c..1a5044a 100644 --- a/syva-adapter-file/src/main.rs +++ b/syva-adapter-file/src/main.rs @@ -8,16 +8,24 @@ use uuid::Uuid; struct Cli { /// Directory containing TOML zone policies. One file per zone. /// Filename (without .toml) becomes the zone name. - #[arg(long, env = "SYVA_FILE_POLICY_DIR", default_value = "/etc/syva/policies")] + #[arg( + long, + env = "SYVA_FILE_POLICY_DIR", + default_value = "/etc/syva/policies" + )] policy_dir: PathBuf, /// syva-cp gRPC endpoint. - #[arg(long, env = "SYVA_CP_ENDPOINT")] - cp_endpoint: String, + #[arg(long, env = "SYVA_CP_ENDPOINT", conflicts_with = "core_socket")] + cp_endpoint: Option, + + /// Local syva-core Unix socket. + #[arg(long, env = "SYVA_CORE_SOCKET", conflicts_with = "cp_endpoint")] + core_socket: Option, /// Team UUID this adapter manages zones for. #[arg(long, env = "SYVA_TEAM_ID")] - team_id: Uuid, + team_id: Option, /// Reconcile interval in seconds. #[arg(long, env = "SYVA_RECONCILE_SECS", default_value = "5")] @@ -51,6 +59,7 @@ async fn main() -> Result<()> { syva_file::run::run(syva_file::run::Config { policy_dir: cli.policy_dir, cp_endpoint: cli.cp_endpoint, + core_socket: cli.core_socket, team_id: cli.team_id, reconcile_interval: std::time::Duration::from_secs(cli.reconcile_secs), }) diff --git a/syva-adapter-file/src/run.rs b/syva-adapter-file/src/run.rs index af3effb..9616fcf 100644 --- a/syva-adapter-file/src/run.rs +++ b/syva-adapter-file/src/run.rs @@ -1,26 +1,52 @@ use crate::policy::{load_policies_from_dir, FilePolicy}; -use crate::translate::{policy_to_create_args, policy_to_update_args}; +use crate::translate::{ + policy_to_core_register, policy_to_core_update, policy_to_create_args, policy_to_update_args, +}; use anyhow::{Context, Result}; use std::collections::HashMap; use std::path::PathBuf; use std::time::Duration; +use syva_core_client::syva_core::{ + AllowCommRequest, ListZonesRequest, RegisterHostPathRequest, RemoveZoneRequest, +}; use syva_cp_client::{CpClient, CpClientConfig, DeleteZoneArgs, ZoneSnapshot}; use tracing::{debug, info, warn}; use uuid::Uuid; pub struct Config { pub policy_dir: PathBuf, - pub cp_endpoint: String, - pub team_id: Uuid, + pub cp_endpoint: Option, + pub core_socket: Option, + pub team_id: Option, pub reconcile_interval: Duration, } pub async fn run(config: Config) -> Result<()> { - let cp = connect_with_retry(&config.cp_endpoint).await; + match (&config.cp_endpoint, &config.core_socket) { + (Some(_), Some(_)) => { + anyhow::bail!("--cp-endpoint and --core-socket are mutually exclusive") + } + (None, None) => anyhow::bail!("exactly one of --cp-endpoint or --core-socket is required"), + (Some(_), None) if config.team_id.is_none() => { + anyhow::bail!("--team-id is required when using --cp-endpoint") + } + _ => {} + } + + match (&config.cp_endpoint, &config.core_socket) { + (Some(endpoint), None) => run_cp_mode(endpoint, &config).await, + (None, Some(socket_path)) => run_core_mode(socket_path.clone(), &config).await, + _ => unreachable!("validated above"), + } +} + +async fn run_cp_mode(endpoint: &str, config: &Config) -> Result<()> { + let cp = connect_with_retry(endpoint).await; + let team_id = config.team_id.context("missing team_id")?; info!( policy_dir = %config.policy_dir.display(), - team_id = %config.team_id, + team_id = %team_id, "syva-file starting" ); info!( @@ -33,7 +59,47 @@ pub async fn run(config: Config) -> Result<()> { loop { tokio::select! { _ = ticker.tick() => { - match reconcile_once(&cp, &config).await { + match reconcile_once_cp(&cp, config, team_id).await { + Ok(stats) if stats.changed > 0 => { + info!( + created = stats.created, + updated = stats.updated, + deleted = stats.deleted, + "reconcile done" + ); + } + Ok(_) => debug!("reconcile done, no changes"), + Err(error) => warn!("reconcile failed: {error:#}"), + } + } + result = tokio::signal::ctrl_c() => { + result.context("wait for ctrl-c")?; + info!("received shutdown signal"); + return Ok(()); + } + } + } +} + +async fn run_core_mode(socket_path: PathBuf, config: &Config) -> Result<()> { + let mut core = syva_core_client::connect_unix_socket_with_retry(socket_path.clone()).await; + + info!( + policy_dir = %config.policy_dir.display(), + socket = %socket_path.display(), + "syva-file starting in local-core mode" + ); + info!( + "container watcher and container membership reconciliation are deferred until ContainerService is implemented" + ); + + let mut ticker = tokio::time::interval(config.reconcile_interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + _ = ticker.tick() => { + match reconcile_once_core(&mut core, config).await { Ok(stats) if stats.changed > 0 => { info!( created = stats.created, @@ -89,19 +155,27 @@ async fn connect_with_retry(endpoint: &str) -> CpClient { } } -async fn reconcile_once(cp: &CpClient, config: &Config) -> Result { +async fn reconcile_once_cp( + cp: &CpClient, + config: &Config, + team_id: Uuid, +) -> Result { let on_disk = load_policies_from_dir(&config.policy_dir) .with_context(|| format!("load policies from {}", config.policy_dir.display()))?; - let in_cp = cp.list_zones(config.team_id, None, 500).await?; - let in_cp_by_name: HashMap = - in_cp.into_iter().map(|zone| (zone.name.clone(), zone)).collect(); + let in_cp = cp.list_zones(team_id, None, 500).await?; + let in_cp_by_name: HashMap = in_cp + .into_iter() + .map(|zone| (zone.name.clone(), zone)) + .collect(); let mut stats = ReconcileStats::default(); for (name, policy) in &on_disk { - match cp.get_zone_by_name(config.team_id, name).await? { - None => match cp.create_zone(policy_to_create_args(config.team_id, name, policy)?).await + match cp.get_zone_by_name(team_id, name).await? { + None => match cp + .create_zone(policy_to_create_args(team_id, name, policy)?) + .await { Ok(output) => { stats.created += 1; @@ -111,7 +185,8 @@ async fn reconcile_once(cp: &CpClient, config: &Config) -> Result warn!(zone = %name, error = %error, "create_zone failed"), }, Some(snapshot) => match policy_to_update_args(&snapshot, policy)? { - Some(args) => match update_zone_with_refresh(cp, config.team_id, name, policy, args).await { + Some(args) => match update_zone_with_refresh(cp, team_id, name, policy, args).await + { Ok(output) => { stats.updated += 1; stats.changed += 1; @@ -148,7 +223,7 @@ async fn reconcile_once(cp: &CpClient, config: &Config) -> Result { - match cp.get_zone_by_name(config.team_id, name).await? { + match cp.get_zone_by_name(team_id, name).await? { Some(refreshed) if refreshed.status != "deleted" => { match cp .delete_zone(DeleteZoneArgs { @@ -178,6 +253,92 @@ async fn reconcile_once(cp: &CpClient, config: &Config) -> Result Result { + let on_disk = load_policies_from_dir(&config.policy_dir) + .with_context(|| format!("load policies from {}", config.policy_dir.display()))?; + let in_core = core + .list_zones(ListZonesRequest {}) + .await? + .into_inner() + .zones; + let diff = crate::diff::diff_against_core(&on_disk, &in_core); + + let mut stats = ReconcileStats::default(); + + for name in diff.create { + let Some(policy) = on_disk.get(&name) else { + continue; + }; + apply_core_register(core, &name, policy, true).await?; + stats.created += 1; + stats.changed += 1; + info!(zone = %name, "zone registered in local core"); + } + + for name in diff.update { + let Some(policy) = on_disk.get(&name) else { + continue; + }; + apply_core_register(core, &name, policy, false).await?; + stats.updated += 1; + stats.changed += 1; + debug!(zone = %name, "zone refreshed in local core"); + } + + for name in diff.remove { + core.remove_zone(RemoveZoneRequest { + zone_name: name.clone(), + drain: true, + }) + .await?; + stats.deleted += 1; + stats.changed += 1; + info!(zone = %name, "zone removal requested in local core"); + } + + Ok(stats) +} + +async fn apply_core_register( + core: &mut syva_core_client::SyvaCoreClient, + name: &str, + policy: &FilePolicy, + create: bool, +) -> Result<()> { + let request = if create { + policy_to_core_register(name, policy) + } else { + policy_to_core_update(name, policy) + }; + core.register_zone(request).await?; + + for allowed_zone in &policy.policy.network.allowed_zones { + if let Err(error) = core + .allow_comm(AllowCommRequest { + zone_a: name.to_string(), + zone_b: allowed_zone.clone(), + }) + .await + { + warn!(zone = %name, peer = %allowed_zone, error = %error, "allow_comm failed"); + } + } + + for path in &policy.policy.filesystem.host_paths { + core.register_host_path(RegisterHostPathRequest { + zone_name: name.to_string(), + path: path.clone(), + recursive: true, + }) + .await?; + } + + Ok(()) +} + async fn update_zone_with_refresh( cp: &CpClient, team_id: Uuid, diff --git a/syva-adapter-file/src/translate.rs b/syva-adapter-file/src/translate.rs index 01e53f8..bcf73f8 100644 --- a/syva-adapter-file/src/translate.rs +++ b/syva-adapter-file/src/translate.rs @@ -1,5 +1,6 @@ use crate::policy::FilePolicy; use anyhow::Result; +use syva_core_client::syva_core::{RegisterZoneRequest, ZonePolicy}; use syva_cp_client::{CreateZoneArgs, UpdateZoneArgs, ZoneSnapshot}; use uuid::Uuid; @@ -48,3 +49,92 @@ pub fn policy_to_update_args( metadata_json: None, })) } + +/// Translate a TOML policy into the node-local core API. +/// +/// Local core has no team, display-name, node-selector, or metadata concepts: +/// it writes desired state into one node's BPF maps, so CP-only fields are +/// intentionally dropped here. Updates use the same RegisterZone RPC because +/// `syva.core.v1` has no separate UpdateZone operation. +pub fn policy_to_core_register(name: &str, policy: &FilePolicy) -> RegisterZoneRequest { + RegisterZoneRequest { + zone_name: name.to_string(), + policy: Some(ZonePolicy { + host_paths: policy.policy.filesystem.host_paths.clone(), + allowed_zones: policy.policy.network.allowed_zones.clone(), + allow_ptrace: policy + .policy + .capabilities + .allowed + .iter() + .any(|cap| is_ptrace_cap(cap)), + zone_type: if policy + .policy + .zone + .zone_type + .eq_ignore_ascii_case("privileged") + { + 1 + } else { + 0 + }, + }), + } +} + +pub fn policy_to_core_update(name: &str, policy: &FilePolicy) -> RegisterZoneRequest { + policy_to_core_register(name, policy) +} + +fn is_ptrace_cap(capability: &str) -> bool { + let normalized = capability.to_ascii_uppercase(); + normalized == "CAP_SYS_PTRACE" || normalized == "SYS_PTRACE" +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::ZonePolicy as FileZonePolicy; + + #[test] + fn maps_file_policy_to_core_register() { + let mut policy = FileZonePolicy::default(); + policy.filesystem.host_paths = vec!["/srv/web".to_string()]; + policy.network.allowed_zones = vec!["db".to_string()]; + policy.capabilities.allowed = vec!["CAP_SYS_PTRACE".to_string()]; + policy.zone.zone_type = "privileged".to_string(); + + let request = policy_to_core_register( + "web", + &FilePolicy { + display_name: Some("Web".to_string()), + selector: Some(serde_json::json!({"all_nodes": true})), + policy, + }, + ); + + let core_policy = request.policy.expect("policy"); + assert_eq!(request.zone_name, "web"); + assert_eq!(core_policy.host_paths, vec!["/srv/web"]); + assert_eq!(core_policy.allowed_zones, vec!["db"]); + assert!(core_policy.allow_ptrace); + assert_eq!(core_policy.zone_type, 1); + } + + #[test] + fn maps_non_privileged_file_policy_to_standard_core_zone() { + let mut policy = FileZonePolicy::default(); + policy.zone.zone_type = "isolated".to_string(); + + let request = policy_to_core_register( + "worker", + &FilePolicy { + display_name: None, + selector: None, + policy, + }, + ); + + assert_eq!(request.policy.expect("policy").zone_type, 0); + } +} diff --git a/syva-adapter-k8s/Cargo.toml b/syva-adapter-k8s/Cargo.toml index 9f9a7cc..fcc6cff 100644 --- a/syva-adapter-k8s/Cargo.toml +++ b/syva-adapter-k8s/Cargo.toml @@ -8,6 +8,7 @@ name = "syva-k8s" path = "src/main.rs" [dependencies] +syva-core-client = { path = "../syva-core-client" } syva-cp-client = { path = "../syva-cp-client" } tokio = { workspace = true } tracing = { workspace = true } diff --git a/syva-adapter-k8s/src/main.rs b/syva-adapter-k8s/src/main.rs index 8d59d37..d6334ce 100644 --- a/syva-adapter-k8s/src/main.rs +++ b/syva-adapter-k8s/src/main.rs @@ -4,6 +4,7 @@ mod watcher; use anyhow::Result; use clap::Parser; +use std::path::PathBuf; use uuid::Uuid; #[derive(Parser, Debug)] @@ -14,12 +15,16 @@ struct Cli { namespace: String, /// syva-cp gRPC endpoint. - #[arg(long, env = "SYVA_CP_ENDPOINT")] - cp_endpoint: String, + #[arg(long, env = "SYVA_CP_ENDPOINT", conflicts_with = "core_socket")] + cp_endpoint: Option, + + /// Local syva-core Unix socket. + #[arg(long, env = "SYVA_CORE_SOCKET", conflicts_with = "cp_endpoint")] + core_socket: Option, /// Team UUID this adapter manages zones for. #[arg(long, env = "SYVA_TEAM_ID")] - team_id: Uuid, + team_id: Option, } #[tokio::main] @@ -35,6 +40,7 @@ async fn main() -> Result<()> { watcher::run(watcher::Config { namespace: cli.namespace, cp_endpoint: cli.cp_endpoint, + core_socket: cli.core_socket, team_id: cli.team_id, }) .await diff --git a/syva-adapter-k8s/src/mapper.rs b/syva-adapter-k8s/src/mapper.rs index c5b00ee..1924395 100644 --- a/syva-adapter-k8s/src/mapper.rs +++ b/syva-adapter-k8s/src/mapper.rs @@ -1,5 +1,6 @@ use crate::crd::{SyvaZonePolicy, ZoneTypeSpec}; use anyhow::Result; +use syva_core_client::syva_core::{RegisterZoneRequest, ZonePolicy}; use syva_cp_client::{CreateZoneArgs, UpdateZoneArgs, ZoneSnapshot}; use uuid::Uuid; @@ -46,6 +47,40 @@ pub fn spec_to_update_args( })) } +/// Translate a SyvaZonePolicy CRD into the node-local core API. +/// +/// Local core applies policy to one node and has no team ownership, selector, +/// display-name, or metadata concepts. The CRD selector is intentionally +/// dropped here; Kubernetes scheduling decides which adapter instance sees the +/// CRD, not the local core. +pub fn spec_to_core_register(name: &str, crd: &SyvaZonePolicy) -> RegisterZoneRequest { + let spec = &crd.spec; + RegisterZoneRequest { + zone_name: name.to_string(), + policy: Some(ZonePolicy { + host_paths: spec + .filesystem + .as_ref() + .map(|filesystem| filesystem.host_paths.clone()) + .unwrap_or_default(), + allowed_zones: spec + .network + .as_ref() + .map(|network| network.allowed_zones.clone()) + .unwrap_or_default(), + allow_ptrace: spec + .process + .as_ref() + .map(|process| process.allow_ptrace) + .unwrap_or(false), + zone_type: match spec.zone_type.as_ref().unwrap_or(&ZoneTypeSpec::Standard) { + ZoneTypeSpec::Privileged => 1, + ZoneTypeSpec::Standard | ZoneTypeSpec::Isolated => 0, + }, + }), + } +} + fn spec_to_policy_json(crd: &SyvaZonePolicy) -> Result { let spec = &crd.spec; Ok(serde_json::json!({ @@ -126,4 +161,49 @@ mod tests { assert_eq!(value["nodeNames"], serde_json::json!(["n1"])); assert_eq!(value["matchLabels"]["tier"], serde_json::json!("prod")); } + + #[test] + fn maps_crd_to_core_register_and_drops_selector() { + let mut labels = BTreeMap::new(); + labels.insert("tier".to_string(), "prod".to_string()); + let resource = crd(SyvaZonePolicySpec { + filesystem: Some(FilesystemSpec { + host_paths: vec!["/data".into()], + }), + network: Some(NetworkSpec { + allowed_zones: vec!["db".into()], + }), + process: Some(ProcessSpec { allow_ptrace: true }), + selector: Some(SelectorSpec { + all_nodes: false, + node_names: vec!["node-a".into()], + match_labels: labels, + }), + zone_type: Some(ZoneTypeSpec::Privileged), + }); + + let request = spec_to_core_register("web", &resource); + let policy = request.policy.expect("policy"); + + assert_eq!(request.zone_name, "web"); + assert_eq!(policy.host_paths, vec!["/data"]); + assert_eq!(policy.allowed_zones, vec!["db"]); + assert!(policy.allow_ptrace); + assert_eq!(policy.zone_type, 1); + } + + #[test] + fn maps_isolated_crd_to_standard_core_zone_until_core_accepts_isolated() { + let resource = crd(SyvaZonePolicySpec { + filesystem: None, + network: None, + process: None, + selector: None, + zone_type: Some(ZoneTypeSpec::Isolated), + }); + + let request = spec_to_core_register("worker", &resource); + + assert_eq!(request.policy.expect("policy").zone_type, 0); + } } diff --git a/syva-adapter-k8s/src/watcher.rs b/syva-adapter-k8s/src/watcher.rs index be8d568..499adcc 100644 --- a/syva-adapter-k8s/src/watcher.rs +++ b/syva-adapter-k8s/src/watcher.rs @@ -1,48 +1,106 @@ use crate::crd::SyvaZonePolicy; -use crate::mapper::{spec_to_create_args, spec_to_update_args}; +use crate::mapper::{spec_to_core_register, spec_to_create_args, spec_to_update_args}; use anyhow::{Context, Result}; use futures::StreamExt; use kube::runtime::watcher::{watcher, Config as WatcherConfig, Event}; use kube::{Api, Client as KubeClient}; use std::collections::{HashMap, HashSet}; +use std::path::PathBuf; use std::time::Duration; +use syva_core_client::syva_core::{ListZonesRequest, RemoveZoneRequest}; use syva_cp_client::{CpClient, CpClientConfig, DeleteZoneArgs, ZoneSnapshot}; use tracing::{info, warn}; use uuid::Uuid; pub struct Config { pub namespace: String, - pub cp_endpoint: String, - pub team_id: Uuid, + pub cp_endpoint: Option, + pub core_socket: Option, + pub team_id: Option, } pub async fn run(config: Config) -> Result<()> { - let cp = connect_with_retry(&config.cp_endpoint).await; + match (&config.cp_endpoint, &config.core_socket) { + (Some(_), Some(_)) => { + anyhow::bail!("--cp-endpoint and --core-socket are mutually exclusive") + } + (None, None) => anyhow::bail!("exactly one of --cp-endpoint or --core-socket is required"), + (Some(_), None) if config.team_id.is_none() => { + anyhow::bail!("--team-id is required when using --cp-endpoint") + } + _ => {} + } let kube = KubeClient::try_default().await?; let crds: Api = Api::namespaced(kube.clone(), &config.namespace); + match (config.cp_endpoint.clone(), config.core_socket.clone()) { + (Some(endpoint), None) => run_cp_mode(config, crds, &endpoint).await, + (None, Some(socket_path)) => run_core_mode(config, crds, socket_path).await, + _ => unreachable!("validated above"), + } +} + +async fn run_cp_mode(config: Config, crds: Api, endpoint: &str) -> Result<()> { + let cp = connect_with_retry(endpoint).await; + let team_id = config.team_id.context("missing team_id")?; + + info!(namespace = %config.namespace, team_id = %team_id, "syva-k8s starting"); + info!( + "pod annotation and container membership reconciliation are deferred until ContainerService is implemented" + ); + + initial_reconcile(&cp, &crds, team_id).await?; + + let mut stream = watcher(crds, WatcherConfig::default()).boxed(); + while let Some(event) = stream.next().await { + match event { + Ok(Event::Apply(crd)) => { + if let Err(error) = handle_apply(&cp, team_id, &crd).await { + warn!(name = ?crd.metadata.name, error = %error, "apply failed"); + } + } + Ok(Event::Delete(crd)) => { + if let Err(error) = handle_delete(&cp, team_id, &crd).await { + warn!(name = ?crd.metadata.name, error = %error, "delete failed"); + } + } + Ok(Event::Init) | Ok(Event::InitDone) | Ok(Event::InitApply(_)) => {} + Err(error) => warn!("watcher error: {error}"), + } + } + + Ok(()) +} + +async fn run_core_mode( + config: Config, + crds: Api, + socket_path: PathBuf, +) -> Result<()> { + let mut core = syva_core_client::connect_unix_socket_with_retry(socket_path.clone()).await; + info!( namespace = %config.namespace, - team_id = %config.team_id, - "syva-k8s starting" + socket = %socket_path.display(), + "syva-k8s starting in local-core mode" ); info!( "pod annotation and container membership reconciliation are deferred until ContainerService is implemented" ); - initial_reconcile(&cp, &crds, config.team_id).await?; + initial_reconcile_core(&mut core, &crds).await?; let mut stream = watcher(crds, WatcherConfig::default()).boxed(); while let Some(event) = stream.next().await { match event { Ok(Event::Apply(crd)) => { - if let Err(error) = handle_apply(&cp, config.team_id, &crd).await { + if let Err(error) = handle_apply_core(&mut core, &crd).await { warn!(name = ?crd.metadata.name, error = %error, "apply failed"); } } Ok(Event::Delete(crd)) => { - if let Err(error) = handle_delete(&cp, config.team_id, &crd).await { + if let Err(error) = handle_delete_core(&mut core, &crd).await { warn!(name = ?crd.metadata.name, error = %error, "delete failed"); } } @@ -80,15 +138,13 @@ async fn connect_with_retry(endpoint: &str) -> CpClient { } } -async fn initial_reconcile( - cp: &CpClient, - crds: &Api, - team_id: Uuid, -) -> Result<()> { +async fn initial_reconcile(cp: &CpClient, crds: &Api, team_id: Uuid) -> Result<()> { let crd_list = crds.list(&Default::default()).await?; let in_cp = cp.list_zones(team_id, None, 500).await?; - let in_cp_by_name: HashMap = - in_cp.into_iter().map(|zone| (zone.name.clone(), zone)).collect(); + let in_cp_by_name: HashMap = in_cp + .into_iter() + .map(|zone| (zone.name.clone(), zone)) + .collect(); let mut crd_names = HashSet::new(); for crd in &crd_list { @@ -140,8 +196,12 @@ async fn initial_reconcile( }) .await { - Ok(()) => info!(zone = %name, "zone deleted (no matching CRD) after refresh"), - Err(retry_error) => warn!(zone = %name, error = %retry_error, "initial delete failed after refresh"), + Ok(()) => { + info!(zone = %name, "zone deleted (no matching CRD) after refresh") + } + Err(retry_error) => { + warn!(zone = %name, error = %retry_error, "initial delete failed after refresh") + } } } _ => {} @@ -154,6 +214,41 @@ async fn initial_reconcile( Ok(()) } +async fn initial_reconcile_core( + core: &mut syva_core_client::SyvaCoreClient, + crds: &Api, +) -> Result<()> { + let crd_list = crds.list(&Default::default()).await?; + let in_core = core + .list_zones(ListZonesRequest {}) + .await? + .into_inner() + .zones; + let in_core_by_name: HashSet = in_core.into_iter().map(|zone| zone.name).collect(); + + let mut crd_names = HashSet::new(); + for crd in &crd_list { + let Some(name) = crd.metadata.name.clone() else { + continue; + }; + crd_names.insert(name.clone()); + core.register_zone(spec_to_core_register(&name, crd)) + .await?; + info!(zone = %name, "zone registered from CRD (initial)"); + } + + for name in in_core_by_name.difference(&crd_names) { + core.remove_zone(RemoveZoneRequest { + zone_name: name.clone(), + drain: true, + }) + .await?; + info!(zone = %name, "zone removed from local core (no matching CRD)"); + } + + Ok(()) +} + async fn handle_apply(cp: &CpClient, team_id: Uuid, crd: &SyvaZonePolicy) -> Result<()> { let name = crd .metadata @@ -215,6 +310,41 @@ async fn handle_delete(cp: &CpClient, team_id: Uuid, crd: &SyvaZonePolicy) -> Re Ok(()) } +pub(crate) async fn handle_apply_core( + core: &mut syva_core_client::SyvaCoreClient, + crd: &SyvaZonePolicy, +) -> Result<()> { + let name = crd + .metadata + .name + .clone() + .context("CRD missing metadata.name")?; + + core.register_zone(spec_to_core_register(&name, crd)) + .await?; + info!(zone = %name, "zone registered from CRD"); + Ok(()) +} + +pub(crate) async fn handle_delete_core( + core: &mut syva_core_client::SyvaCoreClient, + crd: &SyvaZonePolicy, +) -> Result<()> { + let name = crd + .metadata + .name + .clone() + .context("CRD missing metadata.name")?; + + core.remove_zone(RemoveZoneRequest { + zone_name: name.clone(), + drain: true, + }) + .await?; + info!(zone = %name, "zone deleted (CRD removed)"); + Ok(()) +} + async fn update_zone_with_refresh( cp: &CpClient, team_id: Uuid, diff --git a/syva-core-client/Cargo.toml b/syva-core-client/Cargo.toml new file mode 100644 index 0000000..e7f789c --- /dev/null +++ b/syva-core-client/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "syva-core-client" +version.workspace = true +edition.workspace = true + +[dependencies] +syva-proto = { path = "../syva-proto" } +tokio = { workspace = true } +tonic = { workspace = true } +tower = { version = "0.4", features = ["util"] } +hyper-util = { version = "0.1", features = ["tokio"] } +thiserror = { workspace = true } +tracing = { workspace = true } diff --git a/syva-core-client/src/lib.rs b/syva-core-client/src/lib.rs new file mode 100644 index 0000000..87acec5 --- /dev/null +++ b/syva-core-client/src/lib.rs @@ -0,0 +1,64 @@ +//! Unix-socket client for the node-local `syva.core.v1` API. + +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use syva_proto::syva_core::syva_core_client::SyvaCoreClient as ProtoSyvaCoreClient; +use tonic::transport::{Channel, Endpoint}; + +pub use syva_proto::syva_core; + +#[derive(Debug, thiserror::Error)] +pub enum CoreClientError { + #[error("invalid endpoint: {0}")] + InvalidEndpoint(String), + + #[error("connection failed: {0}")] + Connection(#[from] tonic::transport::Error), + + #[error("grpc error: {0}")] + Grpc(#[from] tonic::Status), +} + +pub type SyvaCoreClient = ProtoSyvaCoreClient; + +pub async fn connect_unix_socket( + socket_path: impl AsRef, +) -> Result { + let path = socket_path.as_ref().to_path_buf(); + let endpoint = Endpoint::try_from("http://[::]:50051") + .map_err(|error| CoreClientError::InvalidEndpoint(error.to_string()))?; + + let channel = endpoint + .connect_with_connector(tower::service_fn(move |_: tonic::transport::Uri| { + let path = path.clone(); + async move { + let stream = tokio::net::UnixStream::connect(&path).await?; + Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(stream)) + } + })) + .await?; + + Ok(ProtoSyvaCoreClient::new(channel)) +} + +pub async fn connect_unix_socket_with_retry(socket_path: PathBuf) -> SyvaCoreClient { + let mut backoff = Duration::from_millis(250); + let max_backoff = Duration::from_secs(30); + + loop { + match connect_unix_socket(&socket_path).await { + Ok(client) => return client, + Err(error) => { + tracing::warn!( + socket = %socket_path.display(), + error = %error, + backoff_ms = backoff.as_millis(), + "could not connect to syva-core; retrying" + ); + tokio::time::sleep(backoff).await; + backoff = (backoff * 2).min(max_backoff); + } + } + } +} From ba1609bf01952d7c2b00d259c7041c78b00eb53b Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Tue, 28 Apr 2026 01:36:58 +0200 Subject: [PATCH 2/2] fix phase 2 local mode review issues --- syva-adapter-api/src/routes.rs | 31 +++++--- syva-adapter-file/src/diff.rs | 38 ++++++++- syva-adapter-file/src/run.rs | 137 ++++++++++++++++++++++++++++---- syva-adapter-k8s/src/watcher.rs | 136 ++++++++++++++++++++++++++++++- 4 files changed, 315 insertions(+), 27 deletions(-) diff --git a/syva-adapter-api/src/routes.rs b/syva-adapter-api/src/routes.rs index 3ba38af..64917de 100644 --- a/syva-adapter-api/src/routes.rs +++ b/syva-adapter-api/src/routes.rs @@ -71,7 +71,6 @@ pub struct UpdateZoneBody { #[serde(default, deny_unknown_fields)] struct CorePolicyJson { host_paths: Vec, - allowed_zones: Vec, allow_ptrace: bool, zone_type: CoreZoneType, } @@ -80,7 +79,6 @@ impl Default for CorePolicyJson { fn default() -> Self { Self { host_paths: Vec::new(), - allowed_zones: Vec::new(), allow_ptrace: false, zone_type: CoreZoneType::Standard, } @@ -267,7 +265,9 @@ pub async fn list_zones( if let Some(status) = query.status { zones.retain(|zone| zone.state == status); } - zones.truncate(query.limit.unwrap_or(100).max(0) as usize); + let limit = query.limit.unwrap_or(100).max(0); + let limit = usize::try_from(limit).unwrap_or(usize::MAX); + zones.truncate(limit); Ok(Json(zones.into_iter().map(core_zone_to_out).collect())) } } @@ -432,12 +432,12 @@ fn core_register_request( name: &str, policy_json: JsonValue, ) -> Result { - // Local-core mode intentionally accepts only the fields represented by - // syva.core.v1.ZonePolicy; CP-only request fields are ignored by handlers. + // Local-core API calls have no global policy view, so cross-zone comms must + // come from reconciling adapters that can derive mutual allow pairs. let policy: CorePolicyJson = serde_json::from_value(policy_json).map_err(|error| ApiError { status: StatusCode::BAD_REQUEST, message: format!( - "local-core mode expects policy_json with host_paths, allowed_zones, allow_ptrace, and zone_type only: {error}" + "local-core mode expects policy_json with host_paths, allow_ptrace, and zone_type only: {error}" ), })?; @@ -445,7 +445,7 @@ fn core_register_request( zone_name: name.to_string(), policy: Some(ZonePolicy { host_paths: policy.host_paths, - allowed_zones: policy.allowed_zones, + allowed_zones: Vec::new(), allow_ptrace: policy.allow_ptrace, zone_type: match policy.zone_type { CoreZoneType::Privileged => 1, @@ -548,7 +548,6 @@ mod tests { "web", serde_json::json!({ "host_paths": ["/data"], - "allowed_zones": ["db"], "allow_ptrace": true, "zone_type": "privileged" }), @@ -558,7 +557,7 @@ mod tests { let policy = request.policy.expect("policy"); assert_eq!(request.zone_name, "web"); assert_eq!(policy.host_paths, vec!["/data"]); - assert_eq!(policy.allowed_zones, vec!["db"]); + assert!(policy.allowed_zones.is_empty()); assert!(policy.allow_ptrace); assert_eq!(policy.zone_type, 1); } @@ -576,4 +575,18 @@ mod tests { assert_eq!(error.status, StatusCode::BAD_REQUEST); } + + #[test] + fn core_register_request_rejects_allowed_zones_without_global_reconcile() { + let error = core_register_request( + "web", + serde_json::json!({ + "host_paths": [], + "allowed_zones": ["db"] + }), + ) + .expect_err("API local mode should not accept comm policy"); + + assert_eq!(error.status, StatusCode::BAD_REQUEST); + } } diff --git a/syva-adapter-file/src/diff.rs b/syva-adapter-file/src/diff.rs index c46b46e..f87367a 100644 --- a/syva-adapter-file/src/diff.rs +++ b/syva-adapter-file/src/diff.rs @@ -13,6 +13,7 @@ pub struct CoreDiff { pub fn diff_against_core( desired: &HashMap, applied: &[ZoneSummary], + last_applied: &HashMap, ) -> CoreDiff { let desired_names: BTreeSet<&str> = desired.keys().map(String::as_str).collect(); let applied_names: BTreeSet<&str> = applied.iter().map(|zone| zone.name.as_str()).collect(); @@ -23,6 +24,14 @@ pub fn diff_against_core( .collect(); let update = desired_names .intersection(&applied_names) + .filter(|name| { + let name = **name; + desired + .get(name) + .and_then(|policy| serde_json::to_value(policy).ok()) + .as_ref() + != last_applied.get(name) + }) .map(|name| (*name).to_string()) .collect(); let remove = applied_names @@ -77,7 +86,7 @@ mod tests { ]; assert_eq!( - diff_against_core(&desired, &applied), + diff_against_core(&desired, &applied, &HashMap::new()), CoreDiff { create: vec!["api".to_string()], update: vec!["web".to_string()], @@ -85,4 +94,31 @@ mod tests { } ); } + + #[test] + fn skips_updates_when_desired_policy_matches_last_applied_snapshot() { + let policy = FilePolicy { + display_name: None, + selector: None, + policy: Default::default(), + }; + let mut desired = HashMap::new(); + desired.insert("web".to_string(), policy.clone()); + let applied = vec![ZoneSummary { + name: "web".to_string(), + zone_id: 1, + state: "active".to_string(), + containers_active: 0, + }]; + let mut last_applied = HashMap::new(); + last_applied.insert( + "web".to_string(), + serde_json::to_value(policy).expect("policy"), + ); + + assert_eq!( + diff_against_core(&desired, &applied, &last_applied), + CoreDiff::default() + ); + } } diff --git a/syva-adapter-file/src/run.rs b/syva-adapter-file/src/run.rs index 9616fcf..30dc1cb 100644 --- a/syva-adapter-file/src/run.rs +++ b/syva-adapter-file/src/run.rs @@ -3,11 +3,12 @@ use crate::translate::{ policy_to_core_register, policy_to_core_update, policy_to_create_args, policy_to_update_args, }; use anyhow::{Context, Result}; -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap}; use std::path::PathBuf; use std::time::Duration; use syva_core_client::syva_core::{ - AllowCommRequest, ListZonesRequest, RegisterHostPathRequest, RemoveZoneRequest, + AllowCommRequest, DenyCommRequest, ListCommsRequest, ListZonesRequest, RegisterHostPathRequest, + RemoveZoneRequest, }; use syva_cp_client::{CpClient, CpClientConfig, DeleteZoneArgs, ZoneSnapshot}; use tracing::{debug, info, warn}; @@ -83,6 +84,7 @@ async fn run_cp_mode(endpoint: &str, config: &Config) -> Result<()> { async fn run_core_mode(socket_path: PathBuf, config: &Config) -> Result<()> { let mut core = syva_core_client::connect_unix_socket_with_retry(socket_path.clone()).await; + let mut last_applied = HashMap::new(); info!( policy_dir = %config.policy_dir.display(), @@ -99,7 +101,7 @@ async fn run_core_mode(socket_path: PathBuf, config: &Config) -> Result<()> { loop { tokio::select! { _ = ticker.tick() => { - match reconcile_once_core(&mut core, config).await { + match reconcile_once_core(&mut core, config, &mut last_applied).await { Ok(stats) if stats.changed > 0 => { info!( created = stats.created, @@ -256,6 +258,7 @@ async fn reconcile_once_cp( async fn reconcile_once_core( core: &mut syva_core_client::SyvaCoreClient, config: &Config, + last_applied: &mut HashMap, ) -> Result { let on_disk = load_policies_from_dir(&config.policy_dir) .with_context(|| format!("load policies from {}", config.policy_dir.display()))?; @@ -264,7 +267,7 @@ async fn reconcile_once_core( .await? .into_inner() .zones; - let diff = crate::diff::diff_against_core(&on_disk, &in_core); + let diff = crate::diff::diff_against_core(&on_disk, &in_core, last_applied); let mut stats = ReconcileStats::default(); @@ -273,6 +276,7 @@ async fn reconcile_once_core( continue; }; apply_core_register(core, &name, policy, true).await?; + last_applied.insert(name.clone(), serde_json::to_value(policy)?); stats.created += 1; stats.changed += 1; info!(zone = %name, "zone registered in local core"); @@ -283,6 +287,7 @@ async fn reconcile_once_core( continue; }; apply_core_register(core, &name, policy, false).await?; + last_applied.insert(name.clone(), serde_json::to_value(policy)?); stats.updated += 1; stats.changed += 1; debug!(zone = %name, "zone refreshed in local core"); @@ -294,11 +299,14 @@ async fn reconcile_once_core( drain: true, }) .await?; + last_applied.remove(&name); stats.deleted += 1; stats.changed += 1; info!(zone = %name, "zone removal requested in local core"); } + reconcile_core_comms(core, &on_disk).await?; + Ok(stats) } @@ -313,19 +321,22 @@ async fn apply_core_register( } else { policy_to_core_update(name, policy) }; - core.register_zone(request).await?; - - for allowed_zone in &policy.policy.network.allowed_zones { - if let Err(error) = core - .allow_comm(AllowCommRequest { - zone_a: name.to_string(), - zone_b: allowed_zone.clone(), + if !create { + let removed = core + .remove_zone(RemoveZoneRequest { + zone_name: name.to_string(), + drain: false, }) - .await - { - warn!(zone = %name, peer = %allowed_zone, error = %error, "allow_comm failed"); + .await? + .into_inner(); + if !removed.ok { + anyhow::bail!( + "cannot update local-core zone '{name}' authoritatively: {}", + removed.message + ); } } + core.register_zone(request).await?; for path in &policy.policy.filesystem.host_paths { core.register_host_path(RegisterHostPathRequest { @@ -339,6 +350,72 @@ async fn apply_core_register( Ok(()) } +async fn reconcile_core_comms( + core: &mut syva_core_client::SyvaCoreClient, + policies: &HashMap, +) -> Result<()> { + let desired = desired_mutual_comm_pairs(policies); + let current = core + .list_comms(ListCommsRequest { + zone_name: String::new(), + }) + .await? + .into_inner() + .pairs + .into_iter() + .map(|pair| canonical_pair(&pair.zone_a, &pair.zone_b)) + .collect::>(); + + for (zone_a, zone_b) in desired.difference(¤t) { + core.allow_comm(AllowCommRequest { + zone_a: zone_a.clone(), + zone_b: zone_b.clone(), + }) + .await?; + } + + for (zone_a, zone_b) in current.difference(&desired) { + if policies.contains_key(zone_a) && policies.contains_key(zone_b) { + core.deny_comm(DenyCommRequest { + zone_a: zone_a.clone(), + zone_b: zone_b.clone(), + }) + .await?; + } + } + + Ok(()) +} + +fn desired_mutual_comm_pairs(policies: &HashMap) -> BTreeSet<(String, String)> { + let mut pairs = BTreeSet::new(); + for (zone, policy) in policies { + for peer in &policy.policy.network.allowed_zones { + let Some(peer_policy) = policies.get(peer) else { + continue; + }; + if peer_policy + .policy + .network + .allowed_zones + .iter() + .any(|candidate| candidate == zone) + { + pairs.insert(canonical_pair(zone, peer)); + } + } + } + pairs +} + +fn canonical_pair(zone_a: &str, zone_b: &str) -> (String, String) { + if zone_a <= zone_b { + (zone_a.to_string(), zone_b.to_string()) + } else { + (zone_b.to_string(), zone_a.to_string()) + } +} + async fn update_zone_with_refresh( cp: &CpClient, team_id: Uuid, @@ -375,3 +452,35 @@ fn is_retryable_conflict(error: &syva_cp_client::CpClientError) -> bool { _ => false, } } + +#[cfg(test)] +mod tests { + use super::*; + + fn policy(allowed_zones: &[&str]) -> FilePolicy { + let mut policy = crate::types::ZonePolicy::default(); + policy.network.allowed_zones = allowed_zones + .iter() + .map(|zone| (*zone).to_string()) + .collect(); + FilePolicy { + display_name: None, + selector: None, + policy, + } + } + + #[test] + fn derives_only_mutual_comm_pairs() { + let policies = HashMap::from([ + ("web".to_string(), policy(&["api", "db"])), + ("api".to_string(), policy(&["web"])), + ("db".to_string(), policy(&[])), + ]); + + assert_eq!( + desired_mutual_comm_pairs(&policies), + BTreeSet::from([("api".to_string(), "web".to_string())]) + ); + } +} diff --git a/syva-adapter-k8s/src/watcher.rs b/syva-adapter-k8s/src/watcher.rs index 499adcc..590cd48 100644 --- a/syva-adapter-k8s/src/watcher.rs +++ b/syva-adapter-k8s/src/watcher.rs @@ -4,10 +4,12 @@ use anyhow::{Context, Result}; use futures::StreamExt; use kube::runtime::watcher::{watcher, Config as WatcherConfig, Event}; use kube::{Api, Client as KubeClient}; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::path::PathBuf; use std::time::Duration; -use syva_core_client::syva_core::{ListZonesRequest, RemoveZoneRequest}; +use syva_core_client::syva_core::{ + AllowCommRequest, DenyCommRequest, ListCommsRequest, ListZonesRequest, RemoveZoneRequest, +}; use syva_cp_client::{CpClient, CpClientConfig, DeleteZoneArgs, ZoneSnapshot}; use tracing::{info, warn}; use uuid::Uuid; @@ -91,17 +93,21 @@ async fn run_core_mode( initial_reconcile_core(&mut core, &crds).await?; - let mut stream = watcher(crds, WatcherConfig::default()).boxed(); + let mut stream = watcher(crds.clone(), WatcherConfig::default()).boxed(); while let Some(event) = stream.next().await { match event { Ok(Event::Apply(crd)) => { if let Err(error) = handle_apply_core(&mut core, &crd).await { warn!(name = ?crd.metadata.name, error = %error, "apply failed"); + } else if let Err(error) = reconcile_core_comms(&mut core, &crds).await { + warn!(error = %error, "communication reconcile failed"); } } Ok(Event::Delete(crd)) => { if let Err(error) = handle_delete_core(&mut core, &crd).await { warn!(name = ?crd.metadata.name, error = %error, "delete failed"); + } else if let Err(error) = reconcile_core_comms(&mut core, &crds).await { + warn!(error = %error, "communication reconcile failed"); } } Ok(Event::Init) | Ok(Event::InitDone) | Ok(Event::InitApply(_)) => {} @@ -246,6 +252,8 @@ async fn initial_reconcile_core( info!(zone = %name, "zone removed from local core (no matching CRD)"); } + reconcile_core_comms(core, crds).await?; + Ok(()) } @@ -345,6 +353,87 @@ pub(crate) async fn handle_delete_core( Ok(()) } +async fn reconcile_core_comms( + core: &mut syva_core_client::SyvaCoreClient, + crds: &Api, +) -> Result<()> { + let crd_list = crds.list(&Default::default()).await?; + let policies = crd_list + .iter() + .filter_map(|crd| crd.metadata.name.as_ref().map(|name| (name.clone(), crd))) + .collect::>(); + let desired = desired_mutual_comm_pairs(&policies); + let current = core + .list_comms(ListCommsRequest { + zone_name: String::new(), + }) + .await? + .into_inner() + .pairs + .into_iter() + .map(|pair| canonical_pair(&pair.zone_a, &pair.zone_b)) + .collect::>(); + + for (zone_a, zone_b) in desired.difference(¤t) { + core.allow_comm(AllowCommRequest { + zone_a: zone_a.clone(), + zone_b: zone_b.clone(), + }) + .await?; + } + + for (zone_a, zone_b) in current.difference(&desired) { + if policies.contains_key(zone_a) && policies.contains_key(zone_b) { + core.deny_comm(DenyCommRequest { + zone_a: zone_a.clone(), + zone_b: zone_b.clone(), + }) + .await?; + } + } + + Ok(()) +} + +fn desired_mutual_comm_pairs( + policies: &HashMap, +) -> BTreeSet<(String, String)> { + let mut pairs = BTreeSet::new(); + for (zone, policy) in policies { + let Some(network) = policy.spec.network.as_ref() else { + continue; + }; + for peer in &network.allowed_zones { + let Some(peer_policy) = policies.get(peer) else { + continue; + }; + let mutual = peer_policy + .spec + .network + .as_ref() + .map(|network| { + network + .allowed_zones + .iter() + .any(|candidate| candidate == zone) + }) + .unwrap_or(false); + if mutual { + pairs.insert(canonical_pair(zone, peer)); + } + } + } + pairs +} + +fn canonical_pair(zone_a: &str, zone_b: &str) -> (String, String) { + if zone_a <= zone_b { + (zone_a.to_string(), zone_b.to_string()) + } else { + (zone_b.to_string(), zone_a.to_string()) + } +} + async fn update_zone_with_refresh( cp: &CpClient, team_id: Uuid, @@ -381,3 +470,44 @@ fn is_retryable_conflict(error: &syva_cp_client::CpClientError) -> bool { _ => false, } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::crd::{NetworkSpec, SyvaZonePolicySpec}; + + fn crd(name: &str, allowed_zones: &[&str]) -> SyvaZonePolicy { + SyvaZonePolicy::new( + name, + SyvaZonePolicySpec { + filesystem: None, + network: Some(NetworkSpec { + allowed_zones: allowed_zones + .iter() + .map(|zone| (*zone).to_string()) + .collect(), + }), + process: None, + selector: None, + zone_type: None, + }, + ) + } + + #[test] + fn derives_only_mutual_comm_pairs() { + let web = crd("web", &["api", "db"]); + let api = crd("api", &["web"]); + let db = crd("db", &[]); + let policies = HashMap::from([ + ("web".to_string(), &web), + ("api".to_string(), &api), + ("db".to_string(), &db), + ]); + + assert_eq!( + desired_mutual_comm_pairs(&policies), + BTreeSet::from([("api".to_string(), "web".to_string())]) + ); + } +}