From f4fe3647f8509d15841168f00ea7b891f81ffd59 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Fri, 24 Apr 2026 01:02:49 +0200 Subject: [PATCH 01/16] feat(db): nodes, node_labels, assignments, assignment_versions schema --- .../20260420140000_nodes_and_assignments.sql | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 syva-cp/migrations/20260420140000_nodes_and_assignments.sql diff --git a/syva-cp/migrations/20260420140000_nodes_and_assignments.sql b/syva-cp/migrations/20260420140000_nodes_and_assignments.sql new file mode 100644 index 0000000..78d04f1 --- /dev/null +++ b/syva-cp/migrations/20260420140000_nodes_and_assignments.sql @@ -0,0 +1,79 @@ +-- Nodes: registered node agents +CREATE TABLE nodes ( + id UUID PRIMARY KEY, + node_name TEXT UNIQUE NOT NULL, + cluster_id TEXT NULL, + status TEXT NOT NULL CHECK (status IN ('online', 'offline', 'decommissioning', 'decommissioned')), + fingerprint TEXT NULL, + last_seen_at TIMESTAMPTZ NULL, + last_heartbeat_event_id UUID NULL REFERENCES control_plane_events(id), + current_token_expires_at TIMESTAMPTZ NULL, + capabilities_json JSONB NOT NULL DEFAULT '{}', + metadata_json JSONB NOT NULL DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL, + updated_at TIMESTAMPTZ NOT NULL, + version BIGINT NOT NULL, + caused_by_event_id UUID NULL REFERENCES control_plane_events(id) +); + +CREATE INDEX idx_nodes_status ON nodes(status) WHERE status IN ('online', 'offline'); +CREATE INDEX idx_nodes_last_seen ON nodes(last_seen_at DESC); +CREATE UNIQUE INDEX idx_nodes_fingerprint ON nodes(fingerprint) WHERE fingerprint IS NOT NULL; + +-- Node labels: key/value pairs used by selector matching. +CREATE TABLE node_labels ( + node_id UUID NOT NULL REFERENCES nodes(id) ON DELETE CASCADE, + key TEXT NOT NULL, + value TEXT NOT NULL, + PRIMARY KEY (node_id, key) +); + +CREATE INDEX idx_node_labels_key_value ON node_labels(key, value); + +-- Assignments: desired state, which zones should be present on which nodes. 
+CREATE TABLE assignments ( + id UUID PRIMARY KEY, + zone_id UUID NOT NULL REFERENCES zones(id), + node_id UUID NOT NULL REFERENCES nodes(id), + status TEXT NOT NULL CHECK (status IN ( + 'desired', 'applying', 'applied', 'drifted', 'removing', 'removed', 'failed' + )), + desired_policy_id UUID NOT NULL REFERENCES policies(id), + desired_zone_version BIGINT NOT NULL, + actual_policy_id UUID NULL REFERENCES policies(id), + actual_zone_version BIGINT NULL, + last_reported_at TIMESTAMPTZ NULL, + error_json JSONB NULL, + created_at TIMESTAMPTZ NOT NULL, + updated_at TIMESTAMPTZ NOT NULL, + version BIGINT NOT NULL, + caused_by_event_id UUID NOT NULL REFERENCES control_plane_events(id), + UNIQUE (zone_id, node_id) +); + +CREATE INDEX idx_assignments_node_status ON assignments(node_id, status); +CREATE INDEX idx_assignments_zone ON assignments(zone_id); +CREATE INDEX idx_assignments_status ON assignments(status) + WHERE status IN ('desired', 'applying', 'drifted', 'failed'); + +-- assignment_versions: append-only history snapshots. 
+CREATE TABLE assignment_versions ( + id UUID PRIMARY KEY, + assignment_id UUID NOT NULL REFERENCES assignments(id), + version BIGINT NOT NULL, + snapshot_json JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL, + caused_by_event_id UUID NOT NULL REFERENCES control_plane_events(id), + UNIQUE (assignment_id, version) +); + +CREATE INDEX idx_assignment_versions_assignment_version + ON assignment_versions(assignment_id, version DESC); + +CREATE TRIGGER assignment_versions_no_update + BEFORE UPDATE ON assignment_versions + FOR EACH ROW EXECUTE FUNCTION reject_mutation(); + +CREATE TRIGGER assignment_versions_no_delete + BEFORE DELETE ON assignment_versions + FOR EACH ROW EXECUTE FUNCTION reject_mutation(); From bf661209075dfdcd9e427cb8c6e17ede429f5b6f Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Fri, 24 Apr 2026 01:05:44 +0200 Subject: [PATCH 02/16] feat(syva-cp): Node, Assignment, NodeSelector types with matching logic --- Cargo.lock | 70 ++++++++++++++++++++++++++++----- syva-cp/Cargo.toml | 1 + syva-cp/src/db/types.rs | 86 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 147 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5123749..da9bd11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -283,7 +283,7 @@ checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" dependencies = [ "getrandom 0.2.17", "instant", - "rand", + "rand 0.8.5", ] [[package]] @@ -1770,7 +1770,7 @@ dependencies = [ "num-integer", "num-iter", "num-traits", - "rand", + "rand 0.8.5", "smallvec", "zeroize", ] @@ -2164,8 +2164,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"44c5af06bb1b7d3216d91932aed5265164bf384dc89cd6ba05cf59a35f5f76ea" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.5", ] [[package]] @@ -2175,7 +2185,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.5", ] [[package]] @@ -2187,6 +2207,15 @@ dependencies = [ "getrandom 0.2.17", ] +[[package]] +name = "rand_core" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c" +dependencies = [ + "getrandom 0.3.4", +] + [[package]] name = "raw-cpuid" version = "11.6.0" @@ -2270,7 +2299,7 @@ dependencies = [ "num-traits", "pkcs1", "pkcs8", - "rand_core", + "rand_core 0.6.4", "signature", "spki", "subtle", @@ -2636,7 +2665,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" dependencies = [ "digest", - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -2818,7 +2847,7 @@ dependencies = [ "memchr", "once_cell", "percent-encoding", - "rand", + "rand 0.8.5", "rsa", "serde", "sha1", @@ -2858,7 +2887,7 @@ dependencies = [ "md-5", "memchr", "once_cell", - "rand", + "rand 0.8.5", "serde", "serde_json", "sha2", @@ -3098,6 +3127,7 @@ dependencies = [ "tonic", "tracing", "tracing-subscriber", + "ulid", "uuid", ] @@ -3365,7 +3395,7 @@ dependencies = [ "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", + "rand 0.8.5", "slab", "tokio", "tokio-util", @@ -3501,6 +3531,16 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" 
checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" +[[package]] +name = "ulid" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe" +dependencies = [ + "rand 0.9.4", + "web-time", +] + [[package]] name = "unicode-bidi" version = "0.3.18" @@ -3738,6 +3778,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.26.11" diff --git a/syva-cp/Cargo.toml b/syva-cp/Cargo.toml index cb43926..e9b8d18 100644 --- a/syva-cp/Cargo.toml +++ b/syva-cp/Cargo.toml @@ -53,6 +53,7 @@ thiserror = { workspace = true } clap = { workspace = true } sha2 = "0.10" hex = "0.4" +ulid = "1.1" [dev-dependencies] # sqlx::test macro runs each test in an isolated database. 
diff --git a/syva-cp/src/db/types.rs b/syva-cp/src/db/types.rs index 2e25516..c24c653 100644 --- a/syva-cp/src/db/types.rs +++ b/syva-cp/src/db/types.rs @@ -114,3 +114,89 @@ impl PolicyInput { format!("sha256:{}", hex::encode(hash)) } } + +#[derive(Debug, Clone, FromRow, Serialize, Deserialize)] +pub struct Node { + pub id: Uuid, + pub node_name: String, + pub cluster_id: Option, + pub status: String, + pub fingerprint: Option, + pub last_seen_at: Option>, + pub last_heartbeat_event_id: Option, + pub current_token_expires_at: Option>, + pub capabilities_json: JsonValue, + pub metadata_json: JsonValue, + pub created_at: DateTime, + pub updated_at: DateTime, + pub version: i64, + pub caused_by_event_id: Option, +} + +#[derive(Debug, Clone, FromRow, Serialize, Deserialize)] +pub struct Assignment { + pub id: Uuid, + pub zone_id: Uuid, + pub node_id: Uuid, + pub status: String, + pub desired_policy_id: Uuid, + pub desired_zone_version: i64, + pub actual_policy_id: Option, + pub actual_zone_version: Option, + pub last_reported_at: Option>, + pub error_json: Option, + pub created_at: DateTime, + pub updated_at: DateTime, + pub version: i64, + pub caused_by_event_id: Uuid, +} + +/// Node label set, keyed for deterministic selector matching. +pub type NodeLabels = BTreeMap; + +/// NodeSelector — how a zone chooses its nodes. +/// +/// Stored in `zones.selector_json` and matched in Rust. +/// +/// If all three fields are empty/default, the selector matches nothing. +/// Use `all_nodes: true` to target every node. 
+#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] +pub struct NodeSelector { + #[serde(default)] + pub all_nodes: bool, + + #[serde(default)] + pub node_names: Vec, + + #[serde(default)] + pub match_labels: BTreeMap, +} + +impl NodeSelector { + pub fn from_json(value: &JsonValue) -> Result { + if value.is_null() { + return Ok(Self::default()); + } + + serde_json::from_value(value.clone()) + } + + pub fn matches(&self, node_name: &str, labels: &NodeLabels) -> bool { + if self.all_nodes { + return true; + } + + if !self.node_names.is_empty() && self.node_names.iter().any(|n| n == node_name) { + return true; + } + + if !self.match_labels.is_empty() { + return self + .match_labels + .iter() + .all(|(k, v)| labels.get(k).map(|x| x == v).unwrap_or(false)); + } + + false + } +} From 8415706934517f6b107b87490da2dc89d00052ca Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Fri, 24 Apr 2026 01:06:38 +0200 Subject: [PATCH 03/16] feat(proto): NodeService, AssignmentService, ZoneAssignment streaming messages --- syva-proto/proto/syva_control.proto | 183 ++++++++++++++++++++++++++++ 1 file changed, 183 insertions(+) diff --git a/syva-proto/proto/syva_control.proto b/syva-proto/proto/syva_control.proto index 80264da..0870c87 100644 --- a/syva-proto/proto/syva_control.proto +++ b/syva-proto/proto/syva_control.proto @@ -198,3 +198,186 @@ message GetZoneContextRequest { message GetZoneContextResponse { string context_json = 1; } + +// ---- Node service ---- + +service NodeService { + rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeResponse); + rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse); + rpc DecommissionNode(DecommissionNodeRequest) returns (DecommissionNodeResponse); + rpc SetNodeLabels(SetNodeLabelsRequest) returns (SetNodeLabelsResponse); + rpc GetNode(GetNodeRequest) returns (GetNodeResponse); + rpc ListNodes(ListNodesRequest) returns (ListNodesResponse); +} + +message Node { + string id = 1; + string node_name = 2; + string 
cluster_id = 3; + string status = 4; + string fingerprint = 5; + google.protobuf.Timestamp last_seen_at = 6; + google.protobuf.Timestamp created_at = 7; + google.protobuf.Timestamp updated_at = 8; + int64 version = 9; + map labels = 10; + string capabilities_json = 11; + string metadata_json = 12; +} + +message RegisterNodeRequest { + string node_name = 1; + string fingerprint = 2; + string cluster_id = 3; + map labels = 4; + string capabilities_json = 5; + string proposed_id = 6; +} + +message RegisterNodeResponse { + Node node = 1; + string assigned_id = 2; +} + +message HeartbeatRequest { + string node_id = 1; + string status_hint = 2; +} + +message HeartbeatResponse { + google.protobuf.Timestamp server_time = 1; +} + +message DecommissionNodeRequest { + string node_id = 1; + int64 if_version = 2; +} + +message DecommissionNodeResponse { + Node node = 1; +} + +message SetNodeLabelsRequest { + string node_id = 1; + int64 if_version = 2; + map labels = 3; +} + +message SetNodeLabelsResponse { + Node node = 1; +} + +message GetNodeRequest { + oneof identifier { + string id = 1; + string node_name = 2; + } +} + +message GetNodeResponse { + Node node = 1; +} + +message ListNodesRequest { + string status = 1; + int64 limit = 2; +} + +message ListNodesResponse { + repeated Node nodes = 1; +} + +// ---- Assignment service ---- + +service AssignmentService { + rpc SubscribeAssignments(SubscribeAssignmentsRequest) returns (stream NodeAssignmentUpdate); + rpc ReportAssignmentState(ReportAssignmentStateRequest) returns (ReportAssignmentStateResponse); + rpc GetAssignment(GetAssignmentRequest) returns (GetAssignmentResponse); + rpc ListAssignments(ListAssignmentsRequest) returns (ListAssignmentsResponse); +} + +message SubscribeAssignmentsRequest { + string node_id = 1; +} + +message NodeAssignmentUpdate { + enum Kind { + FULL_SNAPSHOT = 0; + UPSERT = 1; + REMOVE = 2; + } + + Kind kind = 1; + repeated ZoneAssignment assignments = 2; + string removed_zone_id = 3; + int64 
server_revision = 4; +} + +message ZoneAssignment { + string assignment_id = 1; + string zone_id = 2; + string zone_name = 3; + int64 desired_zone_version = 4; + string desired_policy_id = 5; + int64 desired_policy_version = 6; + string policy_json = 7; + string team_id = 8; +} + +message ReportAssignmentStateRequest { + string node_id = 1; + repeated AppliedAssignment applied = 2; + repeated FailedAssignment failed = 3; +} + +message AppliedAssignment { + string assignment_id = 1; + int64 actual_zone_version = 2; + string actual_policy_id = 3; +} + +message FailedAssignment { + string assignment_id = 1; + string error_json = 2; +} + +message ReportAssignmentStateResponse { + int32 accepted_count = 1; + int32 rejected_count = 2; +} + +message Assignment { + string id = 1; + string zone_id = 2; + string node_id = 3; + string status = 4; + string desired_policy_id = 5; + int64 desired_zone_version = 6; + string actual_policy_id = 7; + int64 actual_zone_version = 8; + google.protobuf.Timestamp last_reported_at = 9; + string error_json = 10; + google.protobuf.Timestamp created_at = 11; + google.protobuf.Timestamp updated_at = 12; + int64 version = 13; + string caused_by_event_id = 14; +} + +message GetAssignmentRequest { + string assignment_id = 1; +} + +message GetAssignmentResponse { + Assignment assignment = 1; +} + +message ListAssignmentsRequest { + string zone_id = 1; + string node_id = 2; + string status = 3; + int64 limit = 4; +} + +message ListAssignmentsResponse { + repeated Assignment assignments = 1; +} From 218ac96e15e5f4f39b9ff371d806bcf7eacf9ae5 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Fri, 24 Apr 2026 01:07:51 +0200 Subject: [PATCH 04/16] feat(syva-cp): pure assignment engine with selector matching and diff --- syva-cp/src/engine/assignment.rs | 257 +++++++++++++++++++++++++++++++ syva-cp/src/engine/mod.rs | 7 + syva-cp/src/lib.rs | 1 + 3 files changed, 265 insertions(+) create mode 100644 syva-cp/src/engine/assignment.rs create mode 100644 
syva-cp/src/engine/mod.rs diff --git a/syva-cp/src/engine/assignment.rs b/syva-cp/src/engine/assignment.rs new file mode 100644 index 0000000..a147c8e --- /dev/null +++ b/syva-cp/src/engine/assignment.rs @@ -0,0 +1,257 @@ +use crate::db::types::{NodeLabels, NodeSelector}; +use std::collections::HashSet; +use uuid::Uuid; + +/// Input: a zone + its current policy + every online node. +/// Output: the desired set of (zone_id, node_id) assignments. +pub fn compute_zone_assignments( + zone: &ZoneForAssignment, + nodes: &[NodeForAssignment], +) -> Vec { + let selector = match NodeSelector::from_json(&zone.selector_json) { + Ok(selector) => selector, + Err(_) => return Vec::new(), + }; + + nodes + .iter() + .filter(|node| node.status == "online") + .filter(|node| selector.matches(&node.node_name, &node.labels)) + .map(|node| DesiredAssignment { + zone_id: zone.zone_id, + node_id: node.node_id, + desired_policy_id: zone.current_policy_id, + desired_zone_version: zone.zone_version, + }) + .collect() +} + +/// For a single node, across all active zones, compute every assignment that +/// should exist. +pub fn compute_node_assignments( + node: &NodeForAssignment, + zones: &[ZoneForAssignment], +) -> Vec { + if node.status != "online" { + return Vec::new(); + } + + zones + .iter() + .filter_map(|zone| { + let selector = NodeSelector::from_json(&zone.selector_json).ok()?; + if selector.matches(&node.node_name, &node.labels) { + Some(DesiredAssignment { + zone_id: zone.zone_id, + node_id: node.node_id, + desired_policy_id: zone.current_policy_id, + desired_zone_version: zone.zone_version, + }) + } else { + None + } + }) + .collect() +} + +/// Diff current vs desired. Returns (to_upsert, to_remove). 
+pub fn diff_assignments( + current: &[ExistingAssignment], + desired: &[DesiredAssignment], +) -> (Vec, Vec) { + let desired_ids: HashSet<(Uuid, Uuid)> = + desired.iter().map(|a| (a.zone_id, a.node_id)).collect(); + + let to_upsert = desired + .iter() + .filter(|desired_assignment| { + match current.iter().find(|existing| { + existing.zone_id == desired_assignment.zone_id + && existing.node_id == desired_assignment.node_id + }) { + None => true, + Some(existing) => { + existing.desired_policy_id != desired_assignment.desired_policy_id + || existing.desired_zone_version + != desired_assignment.desired_zone_version + } + } + }) + .cloned() + .collect(); + + let to_remove = current + .iter() + .filter(|existing| !desired_ids.contains(&(existing.zone_id, existing.node_id))) + .map(|existing| existing.id) + .collect(); + + (to_upsert, to_remove) +} + +pub struct ZoneForAssignment { + pub zone_id: Uuid, + pub selector_json: serde_json::Value, + pub current_policy_id: Uuid, + pub zone_version: i64, +} + +pub struct NodeForAssignment { + pub node_id: Uuid, + pub node_name: String, + pub status: String, + pub labels: NodeLabels, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DesiredAssignment { + pub zone_id: Uuid, + pub node_id: Uuid, + pub desired_policy_id: Uuid, + pub desired_zone_version: i64, +} + +pub struct ExistingAssignment { + pub id: Uuid, + pub zone_id: Uuid, + pub node_id: Uuid, + pub desired_policy_id: Uuid, + pub desired_zone_version: i64, +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use std::collections::BTreeMap; + + fn zone(zid: Uuid, pid: Uuid, selector: serde_json::Value, ver: i64) -> ZoneForAssignment { + ZoneForAssignment { + zone_id: zid, + selector_json: selector, + current_policy_id: pid, + zone_version: ver, + } + } + + fn node(nid: Uuid, name: &str, labels: &[(&str, &str)]) -> NodeForAssignment { + let mut label_map = BTreeMap::new(); + for (key, value) in labels { + label_map.insert((*key).to_string(), 
(*value).to_string()); + } + + NodeForAssignment { + node_id: nid, + node_name: name.to_string(), + status: "online".into(), + labels: label_map, + } + } + + #[test] + fn all_nodes_matches_everything() { + let zone = zone(Uuid::new_v4(), Uuid::new_v4(), json!({"all_nodes": true}), 1); + let node_a = node(Uuid::new_v4(), "a", &[]); + let node_b = node(Uuid::new_v4(), "b", &[("tier", "prod")]); + + let out = compute_zone_assignments(&zone, &[node_a, node_b]); + assert_eq!(out.len(), 2); + } + + #[test] + fn node_names_matches_specific() { + let zone = zone( + Uuid::new_v4(), + Uuid::new_v4(), + json!({"node_names": ["a", "c"]}), + 1, + ); + let node_a = node(Uuid::new_v4(), "a", &[]); + let node_b = node(Uuid::new_v4(), "b", &[]); + let node_c = node(Uuid::new_v4(), "c", &[]); + + let out = compute_zone_assignments(&zone, &[node_a, node_b, node_c]); + assert_eq!(out.len(), 2); + } + + #[test] + fn match_labels_requires_all_pairs() { + let zone = zone( + Uuid::new_v4(), + Uuid::new_v4(), + json!({"match_labels": {"tier": "prod", "region": "eu"}}), + 1, + ); + let matching = node(Uuid::new_v4(), "a", &[("tier", "prod"), ("region", "eu")]); + let missing_one = node(Uuid::new_v4(), "b", &[("tier", "prod")]); + let wrong_value = node(Uuid::new_v4(), "c", &[("tier", "dev"), ("region", "eu")]); + + let out = compute_zone_assignments(&zone, &[matching, missing_one, wrong_value]); + assert_eq!(out.len(), 1); + } + + #[test] + fn empty_selector_matches_nothing() { + let zone = zone(Uuid::new_v4(), Uuid::new_v4(), json!({}), 1); + let node = node(Uuid::new_v4(), "a", &[]); + + let out = compute_zone_assignments(&zone, &[node]); + assert_eq!(out.len(), 0); + } + + #[test] + fn offline_nodes_excluded() { + let zone = zone(Uuid::new_v4(), Uuid::new_v4(), json!({"all_nodes": true}), 1); + let mut node = node(Uuid::new_v4(), "a", &[]); + node.status = "offline".into(); + + let out = compute_zone_assignments(&zone, &[node]); + assert_eq!(out.len(), 0); + } + + #[test] + fn 
diff_finds_additions_removals_updates() { + let zone_id = Uuid::new_v4(); + let node_one = Uuid::new_v4(); + let node_two = Uuid::new_v4(); + let old_policy = Uuid::new_v4(); + let new_policy = Uuid::new_v4(); + let assignment_id = Uuid::new_v4(); + + let current = vec![ + ExistingAssignment { + id: assignment_id, + zone_id, + node_id: node_one, + desired_policy_id: old_policy, + desired_zone_version: 1, + }, + ExistingAssignment { + id: Uuid::new_v4(), + zone_id, + node_id: node_two, + desired_policy_id: old_policy, + desired_zone_version: 1, + }, + ]; + + let desired = vec![ + DesiredAssignment { + zone_id, + node_id: node_one, + desired_policy_id: new_policy, + desired_zone_version: 2, + }, + DesiredAssignment { + zone_id, + node_id: Uuid::new_v4(), + desired_policy_id: new_policy, + desired_zone_version: 2, + }, + ]; + + let (upsert, remove) = diff_assignments(&current, &desired); + assert_eq!(upsert.len(), 2); + assert_eq!(remove.len(), 1); + } +} diff --git a/syva-cp/src/engine/mod.rs b/syva-cp/src/engine/mod.rs new file mode 100644 index 0000000..16646f3 --- /dev/null +++ b/syva-cp/src/engine/mod.rs @@ -0,0 +1,7 @@ +//! Engine modules — pure computation (no DB I/O). +//! +//! Functions here are pure functions of their inputs. DB writes live in +//! `crate::write`. The engine produces what should happen; the writer +//! applies it inside a transaction. 
+ +pub mod assignment; diff --git a/syva-cp/src/lib.rs b/syva-cp/src/lib.rs index 6bd458d..9b94d21 100644 --- a/syva-cp/src/lib.rs +++ b/syva-cp/src/lib.rs @@ -7,6 +7,7 @@ pub mod config; pub mod db; +pub mod engine; pub mod error; pub mod health; pub mod metrics; From 1da45d9215e3ae51d28eb8e8d6a9dbfb98d144c8 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Fri, 24 Apr 2026 01:10:11 +0200 Subject: [PATCH 05/16] feat(syva-cp): register_node with fingerprint-based re-registration and synchronous assignment recomputation --- syva-cp/src/write/mod.rs | 1 + syva-cp/src/write/node.rs | 540 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 541 insertions(+) create mode 100644 syva-cp/src/write/node.rs diff --git a/syva-cp/src/write/mod.rs b/syva-cp/src/write/mod.rs index 66e955e..4ec513b 100644 --- a/syva-cp/src/write/mod.rs +++ b/syva-cp/src/write/mod.rs @@ -35,6 +35,7 @@ use sqlx::postgres::PgPool; use uuid::Uuid; pub mod team; +pub mod node; pub mod zone; pub struct TransactionalWriter<'a> { diff --git a/syva-cp/src/write/node.rs b/syva-cp/src/write/node.rs new file mode 100644 index 0000000..595bb99 --- /dev/null +++ b/syva-cp/src/write/node.rs @@ -0,0 +1,540 @@ +use crate::db::types::{Actor, Node, NodeLabels}; +use crate::engine::assignment::{ + compute_node_assignments, diff_assignments, DesiredAssignment, ExistingAssignment, + NodeForAssignment, ZoneForAssignment, +}; +use crate::error::CpError; +use crate::metrics; +use crate::write::TransactionalWriter; +use chrono::Utc; +use serde_json::{json, Value as JsonValue}; +use sqlx::Row; +use std::time::Instant; +use uuid::Uuid; + +#[derive(Debug, Clone, serde::Serialize)] +pub struct RegisterNodeInput { + pub node_name: String, + pub fingerprint: Option, + pub cluster_id: Option, + pub labels: NodeLabels, + pub capabilities_json: JsonValue, + pub proposed_id: Uuid, +} + +pub struct RegisterNodeOutput { + pub node: Node, + pub labels: NodeLabels, + pub is_new: bool, + pub assignments_upserted: usize, + pub 
assignments_removed: usize, +} + +impl<'a> TransactionalWriter<'a> { + pub async fn register_node( + &self, + input: RegisterNodeInput, + actor: &Actor, + ) -> Result { + const OPERATION: &str = "node.register"; + + if let Err(err) = validate_register_node(&input) { + self.record_rejected_audit(OPERATION, actor, &input, &err).await; + return Err(err); + } + + match self.try_register_node(input.clone(), actor).await { + Ok(out) => Ok(out), + Err(err) => { + self.record_rejected_audit(OPERATION, actor, &input, &err).await; + Err(err) + } + } + } + + async fn try_register_node( + &self, + input: RegisterNodeInput, + actor: &Actor, + ) -> Result { + const OPERATION: &str = "node.register"; + let start = Instant::now(); + let event_id = Uuid::new_v4(); + let audit_id = Uuid::new_v4(); + let now = Utc::now(); + + let mut tx = self.pool.begin().await.map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "begin_failed"); + CpError::Database(e) + })?; + + let existing: Option<(Uuid, i64)> = if let Some(fingerprint) = input.fingerprint.as_deref() + { + sqlx::query_as("SELECT id, version FROM nodes WHERE fingerprint = $1 FOR UPDATE") + .bind(fingerprint) + .fetch_optional(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "node_lookup_failed"); + CpError::Database(e) + })? + } else { + sqlx::query_as("SELECT id, version FROM nodes WHERE node_name = $1 FOR UPDATE") + .bind(&input.node_name) + .fetch_optional(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "node_lookup_failed"); + CpError::Database(e) + })? 
+ }; + + let resource_id = existing.map(|(id, _)| id).unwrap_or(input.proposed_id); + + sqlx::query( + r#"INSERT INTO control_plane_events + (id, event_type, source, subject_type, subject_id, + resource_type, resource_id, occurred_at, payload_json) + VALUES ($1, 'node.registered', 'node-agent', $2, $3, + 'node', $4, $5, $6)"#, + ) + .bind(event_id) + .bind(&actor.subject_type) + .bind(&actor.subject_id) + .bind(resource_id) + .bind(now) + .bind(json!({ + "node_name": &input.node_name, + "is_new": existing.is_none(), + })) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "event_insert_failed"); + CpError::Database(e) + })?; + + let (node_row, is_new) = match existing { + Some((node_id, current_version)) => { + let row = sqlx::query( + r#"UPDATE nodes + SET node_name = $1, + cluster_id = $2, + fingerprint = $3, + status = 'online', + capabilities_json = $4, + last_seen_at = $5, + updated_at = $5, + version = version + 1, + caused_by_event_id = $6 + WHERE id = $7 AND version = $8 + RETURNING *"#, + ) + .bind(&input.node_name) + .bind(&input.cluster_id) + .bind(&input.fingerprint) + .bind(&input.capabilities_json) + .bind(now) + .bind(event_id) + .bind(node_id) + .bind(current_version) + .fetch_optional(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "node_update_failed"); + CpError::Database(e) + })? 
+ .ok_or_else(|| { + metrics::record_transaction_rollback(OPERATION, "version_race"); + CpError::Internal("node version race during re-registration".into()) + })?; + + (row, false) + } + None => { + let row = match sqlx::query( + r#"INSERT INTO nodes + (id, node_name, cluster_id, fingerprint, status, + capabilities_json, last_seen_at, created_at, updated_at, + version, caused_by_event_id) + VALUES ($1, $2, $3, $4, 'online', $5, $6, $6, $6, 1, $7) + RETURNING *"#, + ) + .bind(input.proposed_id) + .bind(&input.node_name) + .bind(&input.cluster_id) + .bind(&input.fingerprint) + .bind(&input.capabilities_json) + .bind(now) + .bind(event_id) + .fetch_one(&mut *tx) + .await + { + Ok(row) => row, + Err(sqlx::Error::Database(db_err)) if db_err.is_unique_violation() => { + metrics::record_transaction_rollback(OPERATION, "name_or_id_conflict"); + return Err(CpError::Conflict { + message: format!( + "node_name '{}' or id is already taken", + input.node_name + ), + }); + } + Err(e) => { + metrics::record_transaction_rollback(OPERATION, "node_insert_failed"); + return Err(CpError::Database(e)); + } + }; + + (row, true) + } + }; + + let node = node_from_row(&node_row); + + sqlx::query("DELETE FROM node_labels WHERE node_id = $1") + .bind(node.id) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "label_delete_failed"); + CpError::Database(e) + })?; + + for (key, value) in &input.labels { + sqlx::query("INSERT INTO node_labels (node_id, key, value) VALUES ($1, $2, $3)") + .bind(node.id) + .bind(key) + .bind(value) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "label_insert_failed"); + CpError::Database(e) + })?; + } + + let (upserted, removed) = + recompute_node_assignments_in_tx(&mut tx, &node, &input.labels, event_id, now).await?; + + let request_json = serde_json::to_value(&input).unwrap_or_else(|_| json!({})); + sqlx::query( + r#"INSERT INTO audit_log + (id, occurred_at, 
actor_type, actor_id, action, + resource_type, resource_id, result, request_json, + control_plane_event_id) + VALUES ($1, $2, $3, $4, 'node.register', 'node', $5, + 'success', $6, $7)"#, + ) + .bind(audit_id) + .bind(now) + .bind(&actor.actor_type) + .bind(&actor.actor_id) + .bind(node.id) + .bind(&request_json) + .bind(event_id) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "audit_insert_failed"); + CpError::Database(e) + })?; + + sqlx::query("SELECT pg_notify('syva_cp_assignments', $1)") + .bind(node.id.to_string()) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "notify_failed"); + CpError::Database(e) + })?; + + tx.commit().await.map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "commit_failed"); + CpError::Database(e) + })?; + + metrics::record_transaction_duration(OPERATION, start.elapsed()); + + Ok(RegisterNodeOutput { + node, + labels: input.labels, + is_new, + assignments_upserted: upserted, + assignments_removed: removed, + }) + } +} + +fn validate_register_node(input: &RegisterNodeInput) -> Result<(), CpError> { + if input.node_name.is_empty() || input.node_name.len() > 253 { + return Err(CpError::InvalidArgument( + "node_name must be 1..=253 chars".into(), + )); + } + + if input.capabilities_json.is_null() { + return Err(CpError::InvalidArgument( + "capabilities_json must be valid non-null JSON".into(), + )); + } + + for key in input.labels.keys() { + if key.is_empty() || key.len() > 253 { + return Err(CpError::InvalidArgument( + "label keys must be 1..=253 chars".into(), + )); + } + } + + Ok(()) +} + +pub(crate) fn node_from_row(row: &sqlx::postgres::PgRow) -> Node { + Node { + id: row.get("id"), + node_name: row.get("node_name"), + cluster_id: row.get("cluster_id"), + status: row.get("status"), + fingerprint: row.get("fingerprint"), + last_seen_at: row.get("last_seen_at"), + last_heartbeat_event_id: row.get("last_heartbeat_event_id"), + 
current_token_expires_at: row.get("current_token_expires_at"), + capabilities_json: row.get("capabilities_json"), + metadata_json: row.get("metadata_json"), + created_at: row.get("created_at"), + updated_at: row.get("updated_at"), + version: row.get("version"), + caused_by_event_id: row.get("caused_by_event_id"), + } +} + +#[allow(dead_code)] +pub(crate) fn node_advisory_lock_key(node_id: Uuid) -> i64 { + let mut hash: u64 = 0xcbf29ce484222325; + for byte in node_id.as_bytes() { + hash ^= *byte as u64; + hash = hash.wrapping_mul(0x100000001b3); + } + hash as i64 +} + +pub(crate) async fn recompute_node_assignments_in_tx( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + node: &Node, + labels: &NodeLabels, + event_id: Uuid, + now: chrono::DateTime, +) -> Result<(usize, usize), CpError> { + let zone_rows = sqlx::query( + r#"SELECT id, selector_json, current_policy_id, version + FROM zones + WHERE deleted_at IS NULL + AND status = 'active' + AND current_policy_id IS NOT NULL"#, + ) + .fetch_all(&mut **tx) + .await + .map_err(CpError::Database)?; + + let zones = zone_rows + .iter() + .map(|row| ZoneForAssignment { + zone_id: row.get("id"), + selector_json: row + .get::, _>("selector_json") + .unwrap_or(JsonValue::Null), + current_policy_id: row.get("current_policy_id"), + zone_version: row.get("version"), + }) + .collect::>(); + + let node_input = NodeForAssignment { + node_id: node.id, + node_name: node.node_name.clone(), + status: node.status.clone(), + labels: labels.clone(), + }; + + let desired = compute_node_assignments(&node_input, &zones); + + let current_rows = sqlx::query( + r#"SELECT id, zone_id, node_id, desired_policy_id, desired_zone_version + FROM assignments + WHERE node_id = $1 AND status NOT IN ('removed', 'failed')"#, + ) + .bind(node.id) + .fetch_all(&mut **tx) + .await + .map_err(CpError::Database)?; + + let current = current_rows + .iter() + .map(|row| ExistingAssignment { + id: row.get("id"), + zone_id: row.get("zone_id"), + node_id: 
row.get("node_id"), + desired_policy_id: row.get("desired_policy_id"), + desired_zone_version: row.get("desired_zone_version"), + }) + .collect::>(); + + let (upserts, removes) = diff_assignments(¤t, &desired); + + for assignment in &upserts { + apply_assignment_upsert(tx, assignment, event_id, now).await?; + } + + for assignment_id in &removes { + let _ = apply_assignment_remove_and_return_node(tx, *assignment_id, event_id, now).await?; + } + + Ok((upserts.len(), removes.len())) +} + +#[allow(dead_code)] +pub(crate) async fn apply_assignment_upsert_exposed( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + desired: &DesiredAssignment, + event_id: Uuid, + now: chrono::DateTime, +) -> Result<(), CpError> { + apply_assignment_upsert(tx, desired, event_id, now).await +} + +#[allow(dead_code)] +pub(crate) async fn apply_assignment_remove_exposed( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + assignment_id: Uuid, + event_id: Uuid, + now: chrono::DateTime, +) -> Result, CpError> { + apply_assignment_remove_and_return_node(tx, assignment_id, event_id, now).await +} + +async fn apply_assignment_upsert( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + desired: &DesiredAssignment, + event_id: Uuid, + now: chrono::DateTime, +) -> Result<(), CpError> { + let assignment_id = Uuid::new_v4(); + + let row = sqlx::query( + r#"INSERT INTO assignments + (id, zone_id, node_id, status, desired_policy_id, + desired_zone_version, created_at, updated_at, + version, caused_by_event_id) + VALUES ($1, $2, $3, 'desired', $4, $5, $6, $6, 1, $7) + ON CONFLICT (zone_id, node_id) DO UPDATE SET + desired_policy_id = EXCLUDED.desired_policy_id, + desired_zone_version = EXCLUDED.desired_zone_version, + status = CASE + WHEN assignments.actual_policy_id = EXCLUDED.desired_policy_id + AND assignments.actual_zone_version = EXCLUDED.desired_zone_version + THEN 'applied' + ELSE 'desired' + END, + updated_at = EXCLUDED.updated_at, + version = assignments.version + 1, + caused_by_event_id = 
EXCLUDED.caused_by_event_id + RETURNING *"#, + ) + .bind(assignment_id) + .bind(desired.zone_id) + .bind(desired.node_id) + .bind(desired.desired_policy_id) + .bind(desired.desired_zone_version) + .bind(now) + .bind(event_id) + .fetch_one(&mut **tx) + .await + .map_err(CpError::Database)?; + + let resulting_id: Uuid = row.get("id"); + let resulting_version: i64 = row.get("version"); + let snapshot = assignment_snapshot_json(&row); + + sqlx::query( + r#"INSERT INTO assignment_versions + (id, assignment_id, version, snapshot_json, created_at, + caused_by_event_id) + VALUES ($1, $2, $3, $4, $5, $6)"#, + ) + .bind(Uuid::new_v4()) + .bind(resulting_id) + .bind(resulting_version) + .bind(&snapshot) + .bind(now) + .bind(event_id) + .execute(&mut **tx) + .await + .map_err(CpError::Database)?; + + Ok(()) +} + +async fn apply_assignment_remove_and_return_node( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + assignment_id: Uuid, + event_id: Uuid, + now: chrono::DateTime, +) -> Result, CpError> { + let row = sqlx::query( + r#"UPDATE assignments + SET status = 'removing', + updated_at = $1, + version = version + 1, + caused_by_event_id = $2 + WHERE id = $3 AND status NOT IN ('removed', 'failed') + RETURNING *"#, + ) + .bind(now) + .bind(event_id) + .bind(assignment_id) + .fetch_optional(&mut **tx) + .await + .map_err(CpError::Database)?; + + Ok(match row { + Some(row) => { + let node_id: Uuid = row.get("node_id"); + let version: i64 = row.get("version"); + let snapshot = assignment_snapshot_json(&row); + + sqlx::query( + r#"INSERT INTO assignment_versions + (id, assignment_id, version, snapshot_json, created_at, + caused_by_event_id) + VALUES ($1, $2, $3, $4, $5, $6)"#, + ) + .bind(Uuid::new_v4()) + .bind(assignment_id) + .bind(version) + .bind(&snapshot) + .bind(now) + .bind(event_id) + .execute(&mut **tx) + .await + .map_err(CpError::Database)?; + + Some(node_id) + } + None => None, + }) +} + +fn assignment_snapshot_json(row: &sqlx::postgres::PgRow) -> JsonValue { + 
json!({ + "id": row.get::("id").to_string(), + "zone_id": row.get::("zone_id").to_string(), + "node_id": row.get::("node_id").to_string(), + "status": row.get::("status"), + "version": row.get::("version"), + "desired_policy_id": row.get::("desired_policy_id").to_string(), + "desired_zone_version": row.get::("desired_zone_version"), + "actual_policy_id": row.get::, _>("actual_policy_id").map(|id| id.to_string()), + "actual_zone_version": row.get::, _>("actual_zone_version"), + "error_json": row.get::, _>("error_json"), + }) +} From b3bc4d1e8f9e2a51316e8d30d80bf2f8323326db Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Fri, 24 Apr 2026 01:11:53 +0200 Subject: [PATCH 06/16] feat(syva-cp): heartbeat, set_node_labels, decommission_node with assignment recomputation --- syva-cp/src/write/node.rs | 460 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 460 insertions(+) diff --git a/syva-cp/src/write/node.rs b/syva-cp/src/write/node.rs index 595bb99..efd8c46 100644 --- a/syva-cp/src/write/node.rs +++ b/syva-cp/src/write/node.rs @@ -9,6 +9,7 @@ use crate::write::TransactionalWriter; use chrono::Utc; use serde_json::{json, Value as JsonValue}; use sqlx::Row; +use std::collections::BTreeMap; use std::time::Instant; use uuid::Uuid; @@ -266,6 +267,465 @@ impl<'a> TransactionalWriter<'a> { } } +#[derive(Debug, Clone, serde::Serialize)] +pub struct HeartbeatInput { + pub node_id: Uuid, + pub status_hint: Option, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct SetNodeLabelsInput { + pub node_id: Uuid, + pub if_version: i64, + pub labels: NodeLabels, +} + +pub struct SetNodeLabelsOutput { + pub node: Node, + pub labels: NodeLabels, + pub assignments_upserted: usize, + pub assignments_removed: usize, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct DecommissionNodeInput { + pub node_id: Uuid, + pub if_version: i64, +} + +impl<'a> TransactionalWriter<'a> { + /// Heartbeat is a telemetry write, not a policy mutation. 
+ /// + /// It still writes a control-plane event, but intentionally skips audit to + /// avoid turning periodic liveness pings into the dominant audit volume. + /// This exception is specific to heartbeats and must not be copied to + /// policy-changing operations. + pub async fn heartbeat_node( + &self, + input: HeartbeatInput, + actor: &Actor, + ) -> Result<(), CpError> { + const OPERATION: &str = "node.heartbeat"; + let start = Instant::now(); + let event_id = Uuid::new_v4(); + let now = Utc::now(); + + let mut tx = self.pool.begin().await.map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "begin_failed"); + CpError::Database(e) + })?; + + sqlx::query( + r#"INSERT INTO control_plane_events + (id, event_type, source, subject_type, subject_id, + resource_type, resource_id, occurred_at, payload_json) + VALUES ($1, 'node.heartbeat', 'node-agent', $2, $3, + 'node', $4, $5, $6)"#, + ) + .bind(event_id) + .bind(&actor.subject_type) + .bind(&actor.subject_id) + .bind(input.node_id) + .bind(now) + .bind(json!({ "status_hint": &input.status_hint })) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "event_insert_failed"); + CpError::Database(e) + })?; + + let updated = sqlx::query( + r#"UPDATE nodes + SET last_seen_at = $1, + last_heartbeat_event_id = $2, + status = CASE WHEN status = 'offline' THEN 'online' ELSE status END + WHERE id = $3 + RETURNING id"#, + ) + .bind(now) + .bind(event_id) + .bind(input.node_id) + .fetch_optional(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "node_update_failed"); + CpError::Database(e) + })?; + + if updated.is_none() { + metrics::record_transaction_rollback(OPERATION, "node_not_found"); + return Err(CpError::NotFound { + resource: "node", + identifier: input.node_id.to_string(), + }); + } + + tx.commit().await.map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "commit_failed"); + CpError::Database(e) + })?; + + 
metrics::record_transaction_duration(OPERATION, start.elapsed()); + Ok(()) + } + + pub async fn set_node_labels( + &self, + input: SetNodeLabelsInput, + actor: &Actor, + ) -> Result { + const OPERATION: &str = "node.set_labels"; + + match self.try_set_node_labels(input.clone(), actor).await { + Ok(out) => Ok(out), + Err(err) => { + self.record_rejected_audit(OPERATION, actor, &input, &err).await; + Err(err) + } + } + } + + async fn try_set_node_labels( + &self, + input: SetNodeLabelsInput, + actor: &Actor, + ) -> Result { + const OPERATION: &str = "node.set_labels"; + let start = Instant::now(); + let event_id = Uuid::new_v4(); + let audit_id = Uuid::new_v4(); + let now = Utc::now(); + + let mut tx = self.pool.begin().await.map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "begin_failed"); + CpError::Database(e) + })?; + + sqlx::query("SELECT pg_advisory_xact_lock($1)") + .bind(node_advisory_lock_key(input.node_id)) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "advisory_lock_failed"); + CpError::Database(e) + })?; + + let current: Option = + sqlx::query_scalar("SELECT version FROM nodes WHERE id = $1 FOR UPDATE") + .bind(input.node_id) + .fetch_optional(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "node_lookup_failed"); + CpError::Database(e) + })?; + + match current { + None => { + metrics::record_transaction_rollback(OPERATION, "node_not_found"); + return Err(CpError::NotFound { + resource: "node", + identifier: input.node_id.to_string(), + }); + } + Some(version) if version != input.if_version => { + metrics::record_transaction_rollback(OPERATION, "version_conflict"); + return Err(CpError::VersionConflict { + resource: "node", + resource_id: input.node_id, + expected: input.if_version, + }); + } + Some(_) => {} + } + + sqlx::query( + r#"INSERT INTO control_plane_events + (id, event_type, source, subject_type, subject_id, + resource_type, resource_id, 
occurred_at, payload_json) + VALUES ($1, 'node.labeled', 'api', $2, $3, + 'node', $4, $5, $6)"#, + ) + .bind(event_id) + .bind(&actor.subject_type) + .bind(&actor.subject_id) + .bind(input.node_id) + .bind(now) + .bind(json!({ "label_count": input.labels.len() })) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "event_insert_failed"); + CpError::Database(e) + })?; + + let node_row = sqlx::query( + r#"UPDATE nodes + SET version = version + 1, + updated_at = $1, + caused_by_event_id = $2 + WHERE id = $3 AND version = $4 + RETURNING *"#, + ) + .bind(now) + .bind(event_id) + .bind(input.node_id) + .bind(input.if_version) + .fetch_optional(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "node_update_failed"); + CpError::Database(e) + })? + .ok_or_else(|| { + metrics::record_transaction_rollback(OPERATION, "version_conflict_race"); + CpError::VersionConflict { + resource: "node", + resource_id: input.node_id, + expected: input.if_version, + } + })?; + + let node = node_from_row(&node_row); + + sqlx::query("DELETE FROM node_labels WHERE node_id = $1") + .bind(input.node_id) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "label_delete_failed"); + CpError::Database(e) + })?; + + for (key, value) in &input.labels { + sqlx::query("INSERT INTO node_labels (node_id, key, value) VALUES ($1, $2, $3)") + .bind(input.node_id) + .bind(key) + .bind(value) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "label_insert_failed"); + CpError::Database(e) + })?; + } + + let (upserts, removes) = + recompute_node_assignments_in_tx(&mut tx, &node, &input.labels, event_id, now).await?; + + let request_json = serde_json::to_value(&input).unwrap_or_else(|_| json!({})); + sqlx::query( + r#"INSERT INTO audit_log + (id, occurred_at, actor_type, actor_id, action, + resource_type, resource_id, result, request_json, + 
control_plane_event_id) + VALUES ($1, $2, $3, $4, 'node.set_labels', 'node', $5, + 'success', $6, $7)"#, + ) + .bind(audit_id) + .bind(now) + .bind(&actor.actor_type) + .bind(&actor.actor_id) + .bind(input.node_id) + .bind(&request_json) + .bind(event_id) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "audit_insert_failed"); + CpError::Database(e) + })?; + + sqlx::query("SELECT pg_notify('syva_cp_assignments', $1)") + .bind(input.node_id.to_string()) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "notify_failed"); + CpError::Database(e) + })?; + + tx.commit().await.map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "commit_failed"); + CpError::Database(e) + })?; + + metrics::record_transaction_duration(OPERATION, start.elapsed()); + + Ok(SetNodeLabelsOutput { + node, + labels: input.labels, + assignments_upserted: upserts, + assignments_removed: removes, + }) + } + + pub async fn decommission_node( + &self, + input: DecommissionNodeInput, + actor: &Actor, + ) -> Result { + const OPERATION: &str = "node.decommission"; + + match self.try_decommission_node(input.clone(), actor).await { + Ok(node) => Ok(node), + Err(err) => { + self.record_rejected_audit(OPERATION, actor, &input, &err).await; + Err(err) + } + } + } + + async fn try_decommission_node( + &self, + input: DecommissionNodeInput, + actor: &Actor, + ) -> Result { + const OPERATION: &str = "node.decommission"; + let start = Instant::now(); + let event_id = Uuid::new_v4(); + let audit_id = Uuid::new_v4(); + let now = Utc::now(); + + let mut tx = self.pool.begin().await.map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "begin_failed"); + CpError::Database(e) + })?; + + sqlx::query("SELECT pg_advisory_xact_lock($1)") + .bind(node_advisory_lock_key(input.node_id)) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "advisory_lock_failed"); + 
CpError::Database(e) + })?; + + let current: Option = + sqlx::query_scalar("SELECT version FROM nodes WHERE id = $1 FOR UPDATE") + .bind(input.node_id) + .fetch_optional(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "node_lookup_failed"); + CpError::Database(e) + })?; + + match current { + None => { + metrics::record_transaction_rollback(OPERATION, "node_not_found"); + return Err(CpError::NotFound { + resource: "node", + identifier: input.node_id.to_string(), + }); + } + Some(version) if version != input.if_version => { + metrics::record_transaction_rollback(OPERATION, "version_conflict"); + return Err(CpError::VersionConflict { + resource: "node", + resource_id: input.node_id, + expected: input.if_version, + }); + } + Some(_) => {} + } + + sqlx::query( + r#"INSERT INTO control_plane_events + (id, event_type, source, subject_type, subject_id, + resource_type, resource_id, occurred_at, payload_json) + VALUES ($1, 'node.decommissioned', 'api', $2, $3, + 'node', $4, $5, $6)"#, + ) + .bind(event_id) + .bind(&actor.subject_type) + .bind(&actor.subject_id) + .bind(input.node_id) + .bind(now) + .bind(json!({})) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "event_insert_failed"); + CpError::Database(e) + })?; + + let node_row = sqlx::query( + r#"UPDATE nodes + SET status = 'decommissioned', + version = version + 1, + updated_at = $1, + caused_by_event_id = $2 + WHERE id = $3 AND version = $4 + RETURNING *"#, + ) + .bind(now) + .bind(event_id) + .bind(input.node_id) + .bind(input.if_version) + .fetch_optional(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "node_update_failed"); + CpError::Database(e) + })? 
+ .ok_or_else(|| { + metrics::record_transaction_rollback(OPERATION, "version_conflict_race"); + CpError::VersionConflict { + resource: "node", + resource_id: input.node_id, + expected: input.if_version, + } + })?; + + let node = node_from_row(&node_row); + let empty_labels: NodeLabels = BTreeMap::new(); + let _ = recompute_node_assignments_in_tx(&mut tx, &node, &empty_labels, event_id, now) + .await?; + + let request_json = serde_json::to_value(&input).unwrap_or_else(|_| json!({})); + sqlx::query( + r#"INSERT INTO audit_log + (id, occurred_at, actor_type, actor_id, action, + resource_type, resource_id, result, request_json, + control_plane_event_id) + VALUES ($1, $2, $3, $4, 'node.decommission', 'node', $5, + 'success', $6, $7)"#, + ) + .bind(audit_id) + .bind(now) + .bind(&actor.actor_type) + .bind(&actor.actor_id) + .bind(input.node_id) + .bind(&request_json) + .bind(event_id) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "audit_insert_failed"); + CpError::Database(e) + })?; + + sqlx::query("SELECT pg_notify('syva_cp_assignments', $1)") + .bind(input.node_id.to_string()) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "notify_failed"); + CpError::Database(e) + })?; + + tx.commit().await.map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "commit_failed"); + CpError::Database(e) + })?; + + metrics::record_transaction_duration(OPERATION, start.elapsed()); + Ok(node) + } +} + fn validate_register_node(input: &RegisterNodeInput) -> Result<(), CpError> { if input.node_name.is_empty() || input.node_name.len() > 253 { return Err(CpError::InvalidArgument( From 5c98a02a467a0e1c51b683a99bdaa6ff8d9984ef Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Fri, 24 Apr 2026 01:13:11 +0200 Subject: [PATCH 07/16] feat(syva-cp): report_assignment_state with status transition and error handling --- syva-cp/src/write/assignment.rs | 260 ++++++++++++++++++++++++++++++++ 
syva-cp/src/write/mod.rs | 1 + 2 files changed, 261 insertions(+) create mode 100644 syva-cp/src/write/assignment.rs diff --git a/syva-cp/src/write/assignment.rs b/syva-cp/src/write/assignment.rs new file mode 100644 index 0000000..b22227a --- /dev/null +++ b/syva-cp/src/write/assignment.rs @@ -0,0 +1,260 @@ +use crate::db::types::Actor; +use crate::error::CpError; +use crate::metrics; +use crate::write::TransactionalWriter; +use chrono::Utc; +use serde_json::{json, Value as JsonValue}; +use sqlx::Row; +use std::time::Instant; +use uuid::Uuid; + +#[derive(Debug, Clone, serde::Serialize)] +pub struct AppliedReport { + pub assignment_id: Uuid, + pub actual_zone_version: i64, + pub actual_policy_id: Uuid, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct FailedReport { + pub assignment_id: Uuid, + pub error_json: JsonValue, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct ReportAssignmentStateInput { + pub node_id: Uuid, + pub applied: Vec, + pub failed: Vec, +} + +pub struct ReportAssignmentStateOutput { + pub accepted: usize, + pub rejected: usize, +} + +impl<'a> TransactionalWriter<'a> { + pub async fn report_assignment_state( + &self, + input: ReportAssignmentStateInput, + actor: &Actor, + ) -> Result { + const OPERATION: &str = "assignment.report"; + + match self.try_report_assignment_state(input.clone(), actor).await { + Ok(out) => Ok(out), + Err(err) => { + self.record_rejected_audit(OPERATION, actor, &input, &err).await; + Err(err) + } + } + } + + async fn try_report_assignment_state( + &self, + input: ReportAssignmentStateInput, + actor: &Actor, + ) -> Result { + const OPERATION: &str = "assignment.report"; + let start = Instant::now(); + let event_id = Uuid::new_v4(); + let audit_id = Uuid::new_v4(); + let now = Utc::now(); + + let mut tx = self.pool.begin().await.map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "begin_failed"); + CpError::Database(e) + })?; + + sqlx::query( + r#"INSERT INTO control_plane_events + (id, 
event_type, source, subject_type, subject_id, + resource_type, resource_id, occurred_at, payload_json) + VALUES ($1, 'assignment.reported', 'node-agent', $2, $3, + 'node', $4, $5, $6)"#, + ) + .bind(event_id) + .bind(&actor.subject_type) + .bind(&actor.subject_id) + .bind(input.node_id) + .bind(now) + .bind(json!({ + "applied_count": input.applied.len(), + "failed_count": input.failed.len(), + })) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "event_insert_failed"); + CpError::Database(e) + })?; + + let mut accepted = 0usize; + let mut rejected = 0usize; + + for applied in &input.applied { + let row = sqlx::query( + r#"UPDATE assignments + SET actual_policy_id = $1, + actual_zone_version = $2, + last_reported_at = $3, + status = CASE + WHEN $1 = desired_policy_id + AND $2 = desired_zone_version + THEN 'applied' + ELSE 'drifted' + END, + updated_at = $3, + version = version + 1, + caused_by_event_id = $4, + error_json = NULL + WHERE id = $5 AND node_id = $6 + RETURNING *"#, + ) + .bind(applied.actual_policy_id) + .bind(applied.actual_zone_version) + .bind(now) + .bind(event_id) + .bind(applied.assignment_id) + .bind(input.node_id) + .fetch_optional(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "applied_update_failed"); + CpError::Database(e) + })?; + + if let Some(row) = row { + accepted += 1; + insert_assignment_version( + &mut tx, + row.get("id"), + row.get("version"), + assignment_snapshot_json(&row), + now, + event_id, + ) + .await?; + } else { + rejected += 1; + } + } + + for failed in &input.failed { + let row = sqlx::query( + r#"UPDATE assignments + SET status = 'failed', + error_json = $1, + last_reported_at = $2, + updated_at = $2, + version = version + 1, + caused_by_event_id = $3 + WHERE id = $4 AND node_id = $5 + RETURNING *"#, + ) + .bind(&failed.error_json) + .bind(now) + .bind(event_id) + .bind(failed.assignment_id) + .bind(input.node_id) + .fetch_optional(&mut 
*tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "failed_update_failed"); + CpError::Database(e) + })?; + + if let Some(row) = row { + accepted += 1; + insert_assignment_version( + &mut tx, + row.get("id"), + row.get("version"), + assignment_snapshot_json(&row), + now, + event_id, + ) + .await?; + } else { + rejected += 1; + } + } + + let request_json = serde_json::to_value(&input).unwrap_or_else(|_| json!({})); + sqlx::query( + r#"INSERT INTO audit_log + (id, occurred_at, actor_type, actor_id, action, + resource_type, resource_id, result, request_json, + control_plane_event_id) + VALUES ($1, $2, $3, $4, 'assignment.report', 'node', $5, + 'success', $6, $7)"#, + ) + .bind(audit_id) + .bind(now) + .bind(&actor.actor_type) + .bind(&actor.actor_id) + .bind(input.node_id) + .bind(&request_json) + .bind(event_id) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "audit_insert_failed"); + CpError::Database(e) + })?; + + tx.commit().await.map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "commit_failed"); + CpError::Database(e) + })?; + + metrics::record_transaction_duration(OPERATION, start.elapsed()); + + Ok(ReportAssignmentStateOutput { accepted, rejected }) + } +} + +async fn insert_assignment_version( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + assignment_id: Uuid, + version: i64, + snapshot_json: JsonValue, + now: chrono::DateTime, + event_id: Uuid, +) -> Result<(), CpError> { + sqlx::query( + r#"INSERT INTO assignment_versions + (id, assignment_id, version, snapshot_json, + created_at, caused_by_event_id) + VALUES ($1, $2, $3, $4, $5, $6)"#, + ) + .bind(Uuid::new_v4()) + .bind(assignment_id) + .bind(version) + .bind(&snapshot_json) + .bind(now) + .bind(event_id) + .execute(&mut **tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback("assignment.report", "version_insert_failed"); + CpError::Database(e) + })?; + + Ok(()) +} + +fn 
assignment_snapshot_json(row: &sqlx::postgres::PgRow) -> JsonValue { + json!({ + "id": row.get::("id").to_string(), + "zone_id": row.get::("zone_id").to_string(), + "node_id": row.get::("node_id").to_string(), + "status": row.get::("status"), + "version": row.get::("version"), + "desired_policy_id": row.get::("desired_policy_id").to_string(), + "desired_zone_version": row.get::("desired_zone_version"), + "actual_policy_id": row.get::, _>("actual_policy_id").map(|id| id.to_string()), + "actual_zone_version": row.get::, _>("actual_zone_version"), + "error_json": row.get::, _>("error_json"), + }) +} diff --git a/syva-cp/src/write/mod.rs b/syva-cp/src/write/mod.rs index 4ec513b..f87aa12 100644 --- a/syva-cp/src/write/mod.rs +++ b/syva-cp/src/write/mod.rs @@ -34,6 +34,7 @@ use serde_json::{json, Value}; use sqlx::postgres::PgPool; use uuid::Uuid; +pub mod assignment; pub mod team; pub mod node; pub mod zone; From 4213d56522064b525bb6d355e4c65184b8691874 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Fri, 24 Apr 2026 01:15:17 +0200 Subject: [PATCH 08/16] feat(syva-cp): synchronous assignment recomputation inside zone lifecycle writes --- syva-cp/src/write/zone.rs | 173 +++++++++++++++++++++++++++++++++++++- 1 file changed, 172 insertions(+), 1 deletion(-) diff --git a/syva-cp/src/write/zone.rs b/syva-cp/src/write/zone.rs index 6474747..edc76cf 100644 --- a/syva-cp/src/write/zone.rs +++ b/syva-cp/src/write/zone.rs @@ -1,10 +1,15 @@ -use crate::db::types::{Actor, Policy, PolicyInput, Zone}; +use crate::db::types::{Actor, NodeLabels, Policy, PolicyInput, Zone}; +use crate::engine::assignment::{ + compute_zone_assignments, diff_assignments, ExistingAssignment, NodeForAssignment, + ZoneForAssignment, +}; use crate::error::CpError; use crate::metrics; use crate::write::TransactionalWriter; use chrono::Utc; use serde_json::{json, Value as JsonValue}; use sqlx::Row; +use std::collections::{BTreeMap, HashSet}; use std::time::Instant; use uuid::Uuid; @@ -255,6 +260,9 @@ 
impl<'a> TransactionalWriter<'a> { CpError::Database(e) })?; + let affected_nodes = + recompute_zone_assignments_in_tx(&mut tx, zone.id, event_id, now, false).await?; + let request_json = serde_json::to_value(&input).unwrap_or_else(|_| json!({})); sqlx::query( r#"INSERT INTO audit_log @@ -279,6 +287,17 @@ impl<'a> TransactionalWriter<'a> { CpError::Database(e) })?; + for node_id in &affected_nodes { + sqlx::query("SELECT pg_notify('syva_cp_assignments', $1)") + .bind(node_id.to_string()) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "notify_failed"); + CpError::Database(e) + })?; + } + tx.commit().await.map_err(|e| { metrics::record_transaction_rollback(OPERATION, "commit_failed"); CpError::Database(e) @@ -537,6 +556,12 @@ impl<'a> TransactionalWriter<'a> { CpError::Database(e) })?; + let affected_nodes = if input.policy.is_some() || input.selector_json.is_some() { + recompute_zone_assignments_in_tx(&mut tx, input.zone_id, event_id, now, false).await? 
+ } else { + HashSet::new() + }; + let request_json = serde_json::to_value(&input).unwrap_or_else(|_| json!({})); sqlx::query( r#"INSERT INTO audit_log @@ -561,6 +586,17 @@ impl<'a> TransactionalWriter<'a> { CpError::Database(e) })?; + for node_id in &affected_nodes { + sqlx::query("SELECT pg_notify('syva_cp_assignments', $1)") + .bind(node_id.to_string()) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "notify_failed"); + CpError::Database(e) + })?; + } + tx.commit().await.map_err(|e| { metrics::record_transaction_rollback(OPERATION, "commit_failed"); CpError::Database(e) @@ -731,6 +767,15 @@ impl<'a> TransactionalWriter<'a> { CpError::Database(e) })?; + let affected_nodes = recompute_zone_assignments_in_tx( + &mut tx, + input.zone_id, + event_id, + now, + zone.status == "deleted", + ) + .await?; + let request_json = serde_json::to_value(&input).unwrap_or_else(|_| json!({})); sqlx::query( r#"INSERT INTO audit_log @@ -755,6 +800,17 @@ impl<'a> TransactionalWriter<'a> { CpError::Database(e) })?; + for node_id in &affected_nodes { + sqlx::query("SELECT pg_notify('syva_cp_assignments', $1)") + .bind(node_id.to_string()) + .execute(&mut *tx) + .await + .map_err(|e| { + metrics::record_transaction_rollback(OPERATION, "notify_failed"); + CpError::Database(e) + })?; + } + tx.commit().await.map_err(|e| { metrics::record_transaction_rollback(OPERATION, "commit_failed"); CpError::Database(e) @@ -802,6 +858,121 @@ fn hash_name_lock(team_id: Uuid, name: &str) -> i64 { hash as i64 } +pub(crate) async fn recompute_zone_assignments_in_tx( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + zone_id: Uuid, + event_id: Uuid, + now: chrono::DateTime, + zone_is_deleted: bool, +) -> Result, CpError> { + let desired = if zone_is_deleted { + Vec::new() + } else { + let zone_row = sqlx::query( + r#"SELECT id, selector_json, current_policy_id, version, status + FROM zones WHERE id = $1"#, + ) + .bind(zone_id) + .fetch_one(&mut **tx) + .await 
+ .map_err(CpError::Database)?; + + let status: String = zone_row.get("status"); + let policy_id: Option = zone_row.get("current_policy_id"); + if status != "active" { + Vec::new() + } else { + let Some(current_policy_id) = policy_id else { + return Ok(HashSet::new()); + }; + + let zone = ZoneForAssignment { + zone_id, + selector_json: zone_row + .get::, _>("selector_json") + .unwrap_or(JsonValue::Null), + current_policy_id, + zone_version: zone_row.get("version"), + }; + + let node_rows = sqlx::query( + r#"SELECT id, node_name, status + FROM nodes + WHERE status = 'online'"#, + ) + .fetch_all(&mut **tx) + .await + .map_err(CpError::Database)?; + + let mut nodes = Vec::with_capacity(node_rows.len()); + for node_row in node_rows { + let node_id: Uuid = node_row.get("id"); + let label_rows = sqlx::query( + "SELECT key, value FROM node_labels WHERE node_id = $1", + ) + .bind(node_id) + .fetch_all(&mut **tx) + .await + .map_err(CpError::Database)?; + + let mut labels: NodeLabels = BTreeMap::new(); + for label_row in label_rows { + labels.insert(label_row.get("key"), label_row.get("value")); + } + + nodes.push(NodeForAssignment { + node_id, + node_name: node_row.get("node_name"), + status: node_row.get("status"), + labels, + }); + } + + compute_zone_assignments(&zone, &nodes) + } + }; + + let current_rows = sqlx::query( + r#"SELECT id, zone_id, node_id, desired_policy_id, desired_zone_version + FROM assignments + WHERE zone_id = $1 AND status NOT IN ('removed', 'failed')"#, + ) + .bind(zone_id) + .fetch_all(&mut **tx) + .await + .map_err(CpError::Database)?; + + let current = current_rows + .iter() + .map(|row| ExistingAssignment { + id: row.get("id"), + zone_id: row.get("zone_id"), + node_id: row.get("node_id"), + desired_policy_id: row.get("desired_policy_id"), + desired_zone_version: row.get("desired_zone_version"), + }) + .collect::>(); + + let (upserts, removes) = diff_assignments(¤t, &desired); + + let mut affected_nodes = HashSet::new(); + for desired in &upserts 
{ + crate::write::node::apply_assignment_upsert_exposed(tx, desired, event_id, now).await?; + affected_nodes.insert(desired.node_id); + } + + for assignment_id in &removes { + if let Some(node_id) = + crate::write::node::apply_assignment_remove_exposed(tx, *assignment_id, event_id, now) + .await? + { + affected_nodes.insert(node_id); + } + } + + Ok(affected_nodes) +} + #[allow(dead_code)] pub(crate) fn zone_advisory_lock_key(zone_id: Uuid) -> i64 { let bytes = zone_id.as_bytes(); From ffaffc7558fe651b1ae49c7b9bdf9306553d9368 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Fri, 24 Apr 2026 01:16:14 +0200 Subject: [PATCH 09/16] feat(syva-cp): read queries for nodes, node labels, and assignments --- syva-cp/src/read/assignment.rs | 125 +++++++++++++++++++++++++++++++++ syva-cp/src/read/mod.rs | 2 + syva-cp/src/read/node.rs | 85 ++++++++++++++++++++++ 3 files changed, 212 insertions(+) create mode 100644 syva-cp/src/read/assignment.rs create mode 100644 syva-cp/src/read/node.rs diff --git a/syva-cp/src/read/assignment.rs b/syva-cp/src/read/assignment.rs new file mode 100644 index 0000000..af81745 --- /dev/null +++ b/syva-cp/src/read/assignment.rs @@ -0,0 +1,125 @@ +use crate::db::types::Assignment; +use crate::error::CpError; +use serde_json::Value as JsonValue; +use sqlx::postgres::PgPool; +use sqlx::Row; +use uuid::Uuid; + +pub async fn get_assignment(pool: &PgPool, id: Uuid) -> Result { + let row = sqlx::query("SELECT * FROM assignments WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await? 
+ .ok_or(CpError::NotFound { + resource: "assignment", + identifier: id.to_string(), + })?; + + Ok(assignment_from_row(&row)) +} + +pub async fn list_for_node(pool: &PgPool, node_id: Uuid) -> Result, CpError> { + let rows = sqlx::query( + r#"SELECT * FROM assignments + WHERE node_id = $1 AND status NOT IN ('removed', 'failed') + ORDER BY created_at ASC"#, + ) + .bind(node_id) + .fetch_all(pool) + .await?; + + Ok(rows.iter().map(assignment_from_row).collect()) +} + +pub async fn list_filtered( + pool: &PgPool, + zone_id: Option, + node_id: Option, + status: Option<&str>, + limit: i64, +) -> Result, CpError> { + let limit = limit.clamp(1, 500); + let rows = match (zone_id, node_id, status) { + (Some(zone_id), Some(node_id), Some(status)) => sqlx::query( + "SELECT * FROM assignments WHERE zone_id=$1 AND node_id=$2 AND status=$3 ORDER BY updated_at DESC LIMIT $4", + ) + .bind(zone_id) + .bind(node_id) + .bind(status) + .bind(limit) + .fetch_all(pool) + .await?, + (Some(zone_id), Some(node_id), None) => sqlx::query( + "SELECT * FROM assignments WHERE zone_id=$1 AND node_id=$2 ORDER BY updated_at DESC LIMIT $3", + ) + .bind(zone_id) + .bind(node_id) + .bind(limit) + .fetch_all(pool) + .await?, + (Some(zone_id), None, Some(status)) => sqlx::query( + "SELECT * FROM assignments WHERE zone_id=$1 AND status=$2 ORDER BY updated_at DESC LIMIT $3", + ) + .bind(zone_id) + .bind(status) + .bind(limit) + .fetch_all(pool) + .await?, + (None, Some(node_id), Some(status)) => sqlx::query( + "SELECT * FROM assignments WHERE node_id=$1 AND status=$2 ORDER BY updated_at DESC LIMIT $3", + ) + .bind(node_id) + .bind(status) + .bind(limit) + .fetch_all(pool) + .await?, + (Some(zone_id), None, None) => sqlx::query( + "SELECT * FROM assignments WHERE zone_id=$1 ORDER BY updated_at DESC LIMIT $2", + ) + .bind(zone_id) + .bind(limit) + .fetch_all(pool) + .await?, + (None, Some(node_id), None) => sqlx::query( + "SELECT * FROM assignments WHERE node_id=$1 ORDER BY updated_at DESC LIMIT $2", + ) + 
.bind(node_id) + .bind(limit) + .fetch_all(pool) + .await?, + (None, None, Some(status)) => sqlx::query( + "SELECT * FROM assignments WHERE status=$1 ORDER BY updated_at DESC LIMIT $2", + ) + .bind(status) + .bind(limit) + .fetch_all(pool) + .await?, + (None, None, None) => sqlx::query( + "SELECT * FROM assignments ORDER BY updated_at DESC LIMIT $1", + ) + .bind(limit) + .fetch_all(pool) + .await?, + }; + + Ok(rows.iter().map(assignment_from_row).collect()) +} + +pub(crate) fn assignment_from_row(row: &sqlx::postgres::PgRow) -> Assignment { + Assignment { + id: row.get("id"), + zone_id: row.get("zone_id"), + node_id: row.get("node_id"), + status: row.get("status"), + desired_policy_id: row.get("desired_policy_id"), + desired_zone_version: row.get("desired_zone_version"), + actual_policy_id: row.get("actual_policy_id"), + actual_zone_version: row.get("actual_zone_version"), + last_reported_at: row.get("last_reported_at"), + error_json: row.get::, _>("error_json"), + created_at: row.get("created_at"), + updated_at: row.get("updated_at"), + version: row.get("version"), + caused_by_event_id: row.get("caused_by_event_id"), + } +} diff --git a/syva-cp/src/read/mod.rs b/syva-cp/src/read/mod.rs index 6b1947e..535fe78 100644 --- a/syva-cp/src/read/mod.rs +++ b/syva-cp/src/read/mod.rs @@ -1,4 +1,6 @@ //! Read queries. Never imports from `crate::write` and never mutates. +pub mod assignment; +pub mod node; pub mod team; pub mod zone; diff --git a/syva-cp/src/read/node.rs b/syva-cp/src/read/node.rs new file mode 100644 index 0000000..244b4be --- /dev/null +++ b/syva-cp/src/read/node.rs @@ -0,0 +1,85 @@ +use crate::db::types::{Node, NodeLabels}; +use crate::error::CpError; +use sqlx::postgres::PgPool; +use sqlx::Row; +use std::collections::BTreeMap; +use uuid::Uuid; + +pub async fn get_node(pool: &PgPool, node_id: Uuid) -> Result<(Node, NodeLabels), CpError> { + let row = sqlx::query("SELECT * FROM nodes WHERE id = $1") + .bind(node_id) + .fetch_optional(pool) + .await? 
+ .ok_or(CpError::NotFound { + resource: "node", + identifier: node_id.to_string(), + })?; + + let node = crate::write::node::node_from_row(&row); + let labels = load_labels(pool, node_id).await?; + Ok((node, labels)) +} + +pub async fn get_node_by_name( + pool: &PgPool, + node_name: &str, +) -> Result<(Node, NodeLabels), CpError> { + let row = sqlx::query("SELECT * FROM nodes WHERE node_name = $1") + .bind(node_name) + .fetch_optional(pool) + .await? + .ok_or_else(|| CpError::NotFound { + resource: "node", + identifier: node_name.to_string(), + })?; + + let node = crate::write::node::node_from_row(&row); + let labels = load_labels(pool, node.id).await?; + Ok((node, labels)) +} + +pub async fn list_nodes( + pool: &PgPool, + status_filter: Option<&str>, + limit: i64, +) -> Result, CpError> { + let limit = limit.clamp(1, 500); + let rows = match status_filter { + Some(status) => { + sqlx::query("SELECT * FROM nodes WHERE status = $1 ORDER BY created_at DESC LIMIT $2") + .bind(status) + .bind(limit) + .fetch_all(pool) + .await? + } + None => { + sqlx::query("SELECT * FROM nodes ORDER BY created_at DESC LIMIT $1") + .bind(limit) + .fetch_all(pool) + .await? 
+ } + }; + + let mut out = Vec::with_capacity(rows.len()); + for row in rows { + let node = crate::write::node::node_from_row(&row); + let labels = load_labels(pool, node.id).await?; + out.push((node, labels)); + } + + Ok(out) +} + +pub async fn load_labels(pool: &PgPool, node_id: Uuid) -> Result { + let rows = sqlx::query("SELECT key, value FROM node_labels WHERE node_id = $1") + .bind(node_id) + .fetch_all(pool) + .await?; + + let mut labels = BTreeMap::new(); + for row in rows { + labels.insert(row.get::("key"), row.get::("value")); + } + + Ok(labels) +} From ce13a08c098b319859dc3f3376fe50192666a437 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Fri, 24 Apr 2026 01:17:11 +0200 Subject: [PATCH 10/16] feat(syva-cp): assignment bus using pg_notify and tokio broadcast --- syva-cp/src/bus.rs | 69 ++++++++++++++++++++++++++++++++++++++++++ syva-cp/src/lib.rs | 7 ++++- syva-cp/src/rpc/mod.rs | 3 +- 3 files changed, 77 insertions(+), 2 deletions(-) create mode 100644 syva-cp/src/bus.rs diff --git a/syva-cp/src/bus.rs b/syva-cp/src/bus.rs new file mode 100644 index 0000000..377ff90 --- /dev/null +++ b/syva-cp/src/bus.rs @@ -0,0 +1,69 @@ +use anyhow::Result; +use sqlx::postgres::{PgListener, PgPool}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::{broadcast, RwLock}; +use tokio::task::JoinHandle; +use uuid::Uuid; + +const CHANNEL: &str = "syva_cp_assignments"; + +#[derive(Clone)] +pub struct AssignmentBus { + inner: Arc>>>, +} + +impl AssignmentBus { + pub fn new() -> Self { + Self { + inner: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub async fn subscribe(&self, node_id: Uuid) -> broadcast::Receiver<()> { + let mut map = self.inner.write().await; + let entry = map + .entry(node_id) + .or_insert_with(|| broadcast::channel(64).0); + entry.subscribe() + } + + async fn notify(&self, node_id: Uuid) { + let map = self.inner.read().await; + if let Some(sender) = map.get(&node_id) { + let _ = sender.send(()); + } + } +} + +impl Default for 
AssignmentBus { + fn default() -> Self { + Self::new() + } +} + +pub async fn spawn_listener(pool: PgPool, bus: AssignmentBus) -> Result> { + let mut listener = PgListener::connect_with(&pool).await?; + listener.listen(CHANNEL).await?; + + let handle = tokio::spawn(async move { + loop { + match listener.recv().await { + Ok(notification) => { + let payload = notification.payload(); + if let Ok(node_id) = Uuid::parse_str(payload) { + bus.notify(node_id).await; + } else { + tracing::warn!("bus: bad notify payload: {payload}"); + } + } + Err(err) => { + tracing::error!("bus: listener error: {err}"); + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + } + } + } + }); + + Ok(handle) +} diff --git a/syva-cp/src/lib.rs b/syva-cp/src/lib.rs index 9b94d21..fceb370 100644 --- a/syva-cp/src/lib.rs +++ b/syva-cp/src/lib.rs @@ -5,6 +5,7 @@ //! `docs/adr/0003-transactional-write-discipline.md` for the rules that //! govern every mutating operation in this crate. +pub mod bus; pub mod config; pub mod db; pub mod engine; @@ -16,6 +17,7 @@ pub mod rpc; pub mod write; use anyhow::Result; +use bus::AssignmentBus; use config::Config; pub async fn run(config: Config) -> Result<()> { @@ -28,7 +30,9 @@ pub async fn run(config: Config) -> Result<()> { // Mirrors syva-core's pattern: observability before enforcement. 
let (health_handle, health_ready) = health::spawn(config.health_addr, metrics_handle); - let rpc_handle = rpc::spawn(pool.clone(), config.grpc_addr).await?; + let bus = AssignmentBus::new(); + let listener_handle = bus::spawn_listener(pool.clone(), bus.clone()).await?; + let rpc_handle = rpc::spawn(pool.clone(), bus.clone(), config.grpc_addr).await?; health_ready.set(true); tracing::info!("syva-cp ready"); @@ -40,6 +44,7 @@ pub async fn run(config: Config) -> Result<()> { } health_ready.set(false); + listener_handle.abort(); rpc_handle.abort(); health_handle.abort(); Ok(()) diff --git a/syva-cp/src/rpc/mod.rs b/syva-cp/src/rpc/mod.rs index 99c74fe..c967bb8 100644 --- a/syva-cp/src/rpc/mod.rs +++ b/syva-cp/src/rpc/mod.rs @@ -1,6 +1,7 @@ //! gRPC server. Only wires services to the underlying TransactionalWriter //! and read module — no business logic here, no direct DB writes. +use crate::bus::AssignmentBus; use anyhow::Result; use sqlx::postgres::PgPool; use std::net::SocketAddr; @@ -10,7 +11,7 @@ use tonic::transport::Server; pub mod team_service; pub mod zone_service; -pub async fn spawn(pool: PgPool, addr: SocketAddr) -> Result> { +pub async fn spawn(pool: PgPool, _bus: AssignmentBus, addr: SocketAddr) -> Result> { let team_svc = team_service::TeamServiceImpl { pool: pool.clone() }; let zone_svc = zone_service::ZoneServiceImpl { pool: pool.clone() }; From 05754780db9f2c7ec3e0b4ba7e9111db79443aad Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Fri, 24 Apr 2026 01:19:26 +0200 Subject: [PATCH 11/16] feat(syva-cp): NodeService gRPC wiring --- syva-cp/src/read/node.rs | 15 +- syva-cp/src/rpc/mod.rs | 7 + syva-cp/src/rpc/node_service.rs | 247 ++++++++++++++++++++++++++++++++ 3 files changed, 263 insertions(+), 6 deletions(-) create mode 100644 syva-cp/src/rpc/node_service.rs diff --git a/syva-cp/src/read/node.rs b/syva-cp/src/read/node.rs index 244b4be..9a07efc 100644 --- a/syva-cp/src/read/node.rs +++ b/syva-cp/src/read/node.rs @@ -76,10 +76,13 @@ pub async fn 
load_labels(pool: &PgPool, node_id: Uuid) -> Result("key"), row.get::("value")); - } - - Ok(labels) + Ok(rows + .into_iter() + .map(|row| { + ( + row.get::("key"), + row.get::("value"), + ) + }) + .collect::>()) } diff --git a/syva-cp/src/rpc/mod.rs b/syva-cp/src/rpc/mod.rs index c967bb8..cc80484 100644 --- a/syva-cp/src/rpc/mod.rs +++ b/syva-cp/src/rpc/mod.rs @@ -8,15 +8,22 @@ use std::net::SocketAddr; use tokio::task::JoinHandle; use tonic::transport::Server; +pub mod node_service; pub mod team_service; pub mod zone_service; pub async fn spawn(pool: PgPool, _bus: AssignmentBus, addr: SocketAddr) -> Result> { + let node_svc = node_service::NodeServiceImpl { pool: pool.clone() }; let team_svc = team_service::TeamServiceImpl { pool: pool.clone() }; let zone_svc = zone_service::ZoneServiceImpl { pool: pool.clone() }; let handle = tokio::spawn(async move { let result = Server::builder() + .add_service( + syva_proto::syva_control::v1::node_service_server::NodeServiceServer::new( + node_svc, + ), + ) .add_service( syva_proto::syva_control::v1::team_service_server::TeamServiceServer::new( team_svc, diff --git a/syva-cp/src/rpc/node_service.rs b/syva-cp/src/rpc/node_service.rs new file mode 100644 index 0000000..c014eda --- /dev/null +++ b/syva-cp/src/rpc/node_service.rs @@ -0,0 +1,247 @@ +use crate::db::types::Actor; +use crate::read; +use crate::write::node::{ + DecommissionNodeInput, HeartbeatInput, RegisterNodeInput, SetNodeLabelsInput, +}; +use crate::write::TransactionalWriter; +use serde_json::Value as JsonValue; +use sqlx::postgres::PgPool; +use std::collections::BTreeMap; +use syva_proto::syva_control::v1::node_service_server::NodeService; +use syva_proto::syva_control::v1::{ + get_node_request::Identifier, DecommissionNodeRequest, DecommissionNodeResponse, + GetNodeRequest, GetNodeResponse, HeartbeatRequest, HeartbeatResponse, ListNodesRequest, + ListNodesResponse, Node as NodeProto, RegisterNodeRequest, RegisterNodeResponse, + SetNodeLabelsRequest, 
SetNodeLabelsResponse, +}; +use tonic::{Request, Response, Status}; +use uuid::Uuid; + +pub struct NodeServiceImpl { + pub pool: PgPool, +} + +#[tonic::async_trait] +impl NodeService for NodeServiceImpl { + async fn register_node( + &self, + request: Request, + ) -> Result, Status> { + grpc_request_counter("RegisterNode"); + let req = request.into_inner(); + let proposed_id = parse_uuid(&req.proposed_id, "proposed_id")?; + let capabilities_json = parse_optional_json(&req.capabilities_json, "capabilities_json")? + .unwrap_or_else(|| JsonValue::Object(serde_json::Map::new())); + + let writer = TransactionalWriter::new(&self.pool); + let out = writer + .register_node( + RegisterNodeInput { + node_name: req.node_name, + fingerprint: non_empty(req.fingerprint), + cluster_id: non_empty(req.cluster_id), + labels: map_to_labels(req.labels), + capabilities_json, + proposed_id, + }, + &dev_actor(), + ) + .await?; + + let assigned_id = out.node.id.to_string(); + + Ok(Response::new(RegisterNodeResponse { + node: Some(node_to_proto(out.node, out.labels)), + assigned_id, + })) + } + + async fn heartbeat( + &self, + request: Request, + ) -> Result, Status> { + grpc_request_counter("Heartbeat"); + let req = request.into_inner(); + let node_id = parse_uuid(&req.node_id, "node_id")?; + + let writer = TransactionalWriter::new(&self.pool); + writer + .heartbeat_node( + HeartbeatInput { + node_id, + status_hint: non_empty(req.status_hint), + }, + &dev_actor(), + ) + .await?; + + Ok(Response::new(HeartbeatResponse { + server_time: Some(to_ts(chrono::Utc::now())), + })) + } + + async fn decommission_node( + &self, + request: Request, + ) -> Result, Status> { + grpc_request_counter("DecommissionNode"); + let req = request.into_inner(); + let node_id = parse_uuid(&req.node_id, "node_id")?; + + let writer = TransactionalWriter::new(&self.pool); + let node = writer + .decommission_node( + DecommissionNodeInput { + node_id, + if_version: req.if_version, + }, + &dev_actor(), + ) + .await?; + + 
let labels = read::node::load_labels(&self.pool, node.id).await?; + + Ok(Response::new(DecommissionNodeResponse { + node: Some(node_to_proto(node, labels)), + })) + } + + async fn set_node_labels( + &self, + request: Request, + ) -> Result, Status> { + grpc_request_counter("SetNodeLabels"); + let req = request.into_inner(); + let node_id = parse_uuid(&req.node_id, "node_id")?; + + let writer = TransactionalWriter::new(&self.pool); + let out = writer + .set_node_labels( + SetNodeLabelsInput { + node_id, + if_version: req.if_version, + labels: map_to_labels(req.labels), + }, + &dev_actor(), + ) + .await?; + + Ok(Response::new(SetNodeLabelsResponse { + node: Some(node_to_proto(out.node, out.labels)), + })) + } + + async fn get_node( + &self, + request: Request, + ) -> Result, Status> { + grpc_request_counter("GetNode"); + let req = request.into_inner(); + + let (node, labels) = match req.identifier { + Some(Identifier::Id(id)) => read::node::get_node(&self.pool, parse_uuid(&id, "id")?).await?, + Some(Identifier::NodeName(node_name)) => { + read::node::get_node_by_name(&self.pool, &node_name).await? 
+ } + None => return Err(Status::invalid_argument("identifier required")), + }; + + Ok(Response::new(GetNodeResponse { + node: Some(node_to_proto(node, labels)), + })) + } + + async fn list_nodes( + &self, + request: Request, + ) -> Result, Status> { + grpc_request_counter("ListNodes"); + let req = request.into_inner(); + let status = if req.status.is_empty() { + None + } else { + Some(req.status.as_str()) + }; + let limit = if req.limit <= 0 { 50 } else { req.limit }; + + let nodes = read::node::list_nodes(&self.pool, status, limit).await?; + + Ok(Response::new(ListNodesResponse { + nodes: nodes + .into_iter() + .map(|(node, labels)| node_to_proto(node, labels)) + .collect(), + })) + } +} + +fn grpc_request_counter(method: &'static str) { + ::metrics::counter!( + "syva_cp_grpc_requests_total", + "service" => "NodeService", + "method" => method + ) + .increment(1); +} + +#[allow(clippy::result_large_err)] +fn parse_uuid(s: &str, field: &'static str) -> Result { + Uuid::parse_str(s).map_err(|_| Status::invalid_argument(format!("invalid {field}"))) +} + +#[allow(clippy::result_large_err)] +fn parse_optional_json(s: &str, field: &'static str) -> Result, Status> { + if s.is_empty() { + Ok(None) + } else { + serde_json::from_str(s) + .map(Some) + .map_err(|_| Status::invalid_argument(format!("{field} must be valid JSON"))) + } +} + +fn map_to_labels(labels: std::collections::HashMap) -> BTreeMap { + labels.into_iter().collect() +} + +fn non_empty(s: String) -> Option { + if s.is_empty() { + None + } else { + Some(s) + } +} + +fn dev_actor() -> Actor { + Actor { + actor_type: "user".into(), + actor_id: "dev".into(), + team_id: None, + subject_type: "user".into(), + subject_id: "dev".into(), + } +} + +fn node_to_proto(node: crate::db::types::Node, labels: BTreeMap) -> NodeProto { + NodeProto { + id: node.id.to_string(), + node_name: node.node_name, + cluster_id: node.cluster_id.unwrap_or_default(), + status: node.status, + fingerprint: 
node.fingerprint.unwrap_or_default(), + last_seen_at: node.last_seen_at.map(to_ts), + created_at: Some(to_ts(node.created_at)), + updated_at: Some(to_ts(node.updated_at)), + version: node.version, + labels: labels.into_iter().collect(), + capabilities_json: node.capabilities_json.to_string(), + metadata_json: node.metadata_json.to_string(), + } +} + +fn to_ts(ts: chrono::DateTime) -> prost_types::Timestamp { + prost_types::Timestamp { + seconds: ts.timestamp(), + nanos: ts.timestamp_subsec_nanos() as i32, + } +} From 04a74987faf10b06b5191ed60cb8861e7f43c157 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Fri, 24 Apr 2026 01:24:08 +0200 Subject: [PATCH 12/16] feat(syva-cp): AssignmentService with streaming subscribe and report --- syva-cp/src/rpc/assignment_service.rs | 342 ++++++++++++++++++++++++++ syva-cp/src/rpc/mod.rs | 12 +- 2 files changed, 353 insertions(+), 1 deletion(-) create mode 100644 syva-cp/src/rpc/assignment_service.rs diff --git a/syva-cp/src/rpc/assignment_service.rs b/syva-cp/src/rpc/assignment_service.rs new file mode 100644 index 0000000..a5ef330 --- /dev/null +++ b/syva-cp/src/rpc/assignment_service.rs @@ -0,0 +1,342 @@ +use crate::bus::AssignmentBus; +use crate::db::types::Actor; +use crate::read; +use crate::write::assignment::{ + AppliedReport, FailedReport, ReportAssignmentStateInput, +}; +use crate::write::TransactionalWriter; +use serde_json::json; +use sqlx::postgres::PgPool; +use sqlx::Row; +use std::collections::HashMap; +use syva_proto::syva_control::v1::assignment_service_server::AssignmentService; +use syva_proto::syva_control::v1::node_assignment_update::Kind as UpdateKind; +use syva_proto::syva_control::v1::{ + Assignment as AssignmentProto, GetAssignmentRequest, GetAssignmentResponse, + ListAssignmentsRequest, ListAssignmentsResponse, NodeAssignmentUpdate, + ReportAssignmentStateRequest, ReportAssignmentStateResponse, + SubscribeAssignmentsRequest, ZoneAssignment, +}; +use tokio::sync::mpsc; +use 
tokio_stream::wrappers::ReceiverStream; +use tonic::{Request, Response, Status}; +use uuid::Uuid; + +pub struct AssignmentServiceImpl { + pub pool: PgPool, + pub bus: AssignmentBus, +} + +#[tonic::async_trait] +impl AssignmentService for AssignmentServiceImpl { + type SubscribeAssignmentsStream = ReceiverStream>; + + async fn subscribe_assignments( + &self, + request: Request, + ) -> Result, Status> { + grpc_request_counter("SubscribeAssignments"); + let node_id = parse_uuid(&request.into_inner().node_id, "node_id")?; + + let (tx, rx) = mpsc::channel::>(32); + let pool = self.pool.clone(); + let bus = self.bus.clone(); + + tokio::spawn(async move { + let initial = match build_assignments_for_node(&pool, node_id).await { + Ok(assignments) => assignments, + Err(err) => { + let _ = tx.send(Err(Status::from(err))).await; + return; + } + }; + + let mut last_sent: HashMap = initial + .iter() + .filter_map(|assignment| { + let zone_id = Uuid::parse_str(&assignment.zone_id).ok()?; + let policy_id = Uuid::parse_str(&assignment.desired_policy_id).ok()?; + Some((zone_id, (policy_id, assignment.desired_zone_version))) + }) + .collect(); + + if tx + .send(Ok(NodeAssignmentUpdate { + kind: UpdateKind::FullSnapshot as i32, + assignments: initial, + removed_zone_id: String::new(), + server_revision: 1, + })) + .await + .is_err() + { + return; + } + + let mut bus_rx = bus.subscribe(node_id).await; + let mut server_revision = 2_i64; + + while bus_rx.recv().await.is_ok() { + let current = match build_assignments_for_node(&pool, node_id).await { + Ok(assignments) => assignments, + Err(err) => { + let _ = tx.send(Err(Status::from(err))).await; + return; + } + }; + + let current_map: HashMap = current + .into_iter() + .filter_map(|assignment| { + let zone_id = Uuid::parse_str(&assignment.zone_id).ok()?; + let policy_id = + Uuid::parse_str(&assignment.desired_policy_id).ok()?; + Some(( + zone_id, + (policy_id, assignment.desired_zone_version, assignment), + )) + }) + .collect(); + + for 
(zone_id, (policy_id, zone_version, assignment)) in ¤t_map { + let changed = last_sent + .get(zone_id) + .map(|(old_policy_id, old_zone_version)| { + old_policy_id != policy_id || old_zone_version != zone_version + }) + .unwrap_or(true); + + if changed + && tx + .send(Ok(NodeAssignmentUpdate { + kind: UpdateKind::Upsert as i32, + assignments: vec![assignment.clone()], + removed_zone_id: String::new(), + server_revision, + })) + .await + .is_err() + { + return; + } + + if changed { + server_revision += 1; + } + } + + for zone_id in last_sent.keys() { + if current_map.contains_key(zone_id) { + continue; + } + + if tx + .send(Ok(NodeAssignmentUpdate { + kind: UpdateKind::Remove as i32, + assignments: Vec::new(), + removed_zone_id: zone_id.to_string(), + server_revision, + })) + .await + .is_err() + { + return; + } + server_revision += 1; + } + + last_sent = current_map + .into_iter() + .map(|(zone_id, (policy_id, zone_version, _))| { + (zone_id, (policy_id, zone_version)) + }) + .collect(); + } + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } + + async fn report_assignment_state( + &self, + request: Request, + ) -> Result, Status> { + grpc_request_counter("ReportAssignmentState"); + let req = request.into_inner(); + let node_id = parse_uuid(&req.node_id, "node_id")?; + + let mut applied = Vec::with_capacity(req.applied.len()); + for report in req.applied { + applied.push(AppliedReport { + assignment_id: parse_uuid(&report.assignment_id, "assignment_id")?, + actual_zone_version: report.actual_zone_version, + actual_policy_id: parse_uuid(&report.actual_policy_id, "actual_policy_id")?, + }); + } + + let mut failed = Vec::with_capacity(req.failed.len()); + for report in req.failed { + let error_json = serde_json::from_str(&report.error_json) + .unwrap_or_else(|_| json!({ "raw": report.error_json })); + failed.push(FailedReport { + assignment_id: parse_uuid(&report.assignment_id, "assignment_id")?, + error_json, + }); + } + + let writer = 
TransactionalWriter::new(&self.pool); + let out = writer + .report_assignment_state( + ReportAssignmentStateInput { + node_id, + applied, + failed, + }, + &dev_actor(), + ) + .await?; + + Ok(Response::new(ReportAssignmentStateResponse { + accepted_count: out.accepted as i32, + rejected_count: out.rejected as i32, + })) + } + + async fn get_assignment( + &self, + request: Request, + ) -> Result, Status> { + grpc_request_counter("GetAssignment"); + let assignment_id = parse_uuid(&request.into_inner().assignment_id, "assignment_id")?; + let assignment = read::assignment::get_assignment(&self.pool, assignment_id).await?; + + Ok(Response::new(GetAssignmentResponse { + assignment: Some(assignment_to_proto(assignment)), + })) + } + + async fn list_assignments( + &self, + request: Request, + ) -> Result, Status> { + grpc_request_counter("ListAssignments"); + let req = request.into_inner(); + let zone_id = parse_optional_uuid(&req.zone_id, "zone_id")?; + let node_id = parse_optional_uuid(&req.node_id, "node_id")?; + let status = if req.status.is_empty() { + None + } else { + Some(req.status.as_str()) + }; + let limit = if req.limit <= 0 { 50 } else { req.limit }; + + let assignments = + read::assignment::list_filtered(&self.pool, zone_id, node_id, status, limit).await?; + + Ok(Response::new(ListAssignmentsResponse { + assignments: assignments.into_iter().map(assignment_to_proto).collect(), + })) + } +} + +async fn build_assignments_for_node( + pool: &PgPool, + node_id: Uuid, +) -> Result, crate::error::CpError> { + let rows = sqlx::query( + r#"SELECT a.id AS assignment_id, + a.zone_id, + a.desired_policy_id, + a.desired_zone_version, + z.name AS zone_name, + z.team_id, + p.version AS desired_policy_version, + p.policy_json + FROM assignments a + INNER JOIN zones z ON z.id = a.zone_id AND z.deleted_at IS NULL + INNER JOIN policies p ON p.id = a.desired_policy_id + WHERE a.node_id = $1 AND a.status NOT IN ('removed', 'failed')"#, + ) + .bind(node_id) + .fetch_all(pool) + 
.await?; + + Ok(rows + .into_iter() + .map(|row| ZoneAssignment { + assignment_id: row.get::("assignment_id").to_string(), + zone_id: row.get::("zone_id").to_string(), + zone_name: row.get("zone_name"), + desired_zone_version: row.get("desired_zone_version"), + desired_policy_id: row.get::("desired_policy_id").to_string(), + desired_policy_version: row.get("desired_policy_version"), + policy_json: row.get::("policy_json").to_string(), + team_id: row.get::("team_id").to_string(), + }) + .collect()) +} + +fn grpc_request_counter(method: &'static str) { + ::metrics::counter!( + "syva_cp_grpc_requests_total", + "service" => "AssignmentService", + "method" => method + ) + .increment(1); +} + +#[allow(clippy::result_large_err)] +fn parse_uuid(s: &str, field: &'static str) -> Result { + Uuid::parse_str(s).map_err(|_| Status::invalid_argument(format!("invalid {field}"))) +} + +#[allow(clippy::result_large_err)] +fn parse_optional_uuid(s: &str, field: &'static str) -> Result, Status> { + if s.is_empty() { + Ok(None) + } else { + parse_uuid(s, field).map(Some) + } +} + +fn dev_actor() -> Actor { + Actor { + actor_type: "node".into(), + actor_id: "agent".into(), + team_id: None, + subject_type: "node".into(), + subject_id: "agent".into(), + } +} + +fn assignment_to_proto(assignment: crate::db::types::Assignment) -> AssignmentProto { + AssignmentProto { + id: assignment.id.to_string(), + zone_id: assignment.zone_id.to_string(), + node_id: assignment.node_id.to_string(), + status: assignment.status, + desired_policy_id: assignment.desired_policy_id.to_string(), + desired_zone_version: assignment.desired_zone_version, + actual_policy_id: assignment + .actual_policy_id + .map(|id| id.to_string()) + .unwrap_or_default(), + actual_zone_version: assignment.actual_zone_version.unwrap_or_default(), + last_reported_at: assignment.last_reported_at.map(to_ts), + error_json: assignment + .error_json + .map(|value| value.to_string()) + .unwrap_or_default(), + created_at: 
Some(to_ts(assignment.created_at)), + updated_at: Some(to_ts(assignment.updated_at)), + version: assignment.version, + caused_by_event_id: assignment.caused_by_event_id.to_string(), + } +} + +fn to_ts(ts: chrono::DateTime) -> prost_types::Timestamp { + prost_types::Timestamp { + seconds: ts.timestamp(), + nanos: ts.timestamp_subsec_nanos() as i32, + } +} diff --git a/syva-cp/src/rpc/mod.rs b/syva-cp/src/rpc/mod.rs index cc80484..3f7a502 100644 --- a/syva-cp/src/rpc/mod.rs +++ b/syva-cp/src/rpc/mod.rs @@ -8,17 +8,27 @@ use std::net::SocketAddr; use tokio::task::JoinHandle; use tonic::transport::Server; +pub mod assignment_service; pub mod node_service; pub mod team_service; pub mod zone_service; -pub async fn spawn(pool: PgPool, _bus: AssignmentBus, addr: SocketAddr) -> Result> { +pub async fn spawn(pool: PgPool, bus: AssignmentBus, addr: SocketAddr) -> Result> { + let assignment_svc = assignment_service::AssignmentServiceImpl { + pool: pool.clone(), + bus, + }; let node_svc = node_service::NodeServiceImpl { pool: pool.clone() }; let team_svc = team_service::TeamServiceImpl { pool: pool.clone() }; let zone_svc = zone_service::ZoneServiceImpl { pool: pool.clone() }; let handle = tokio::spawn(async move { let result = Server::builder() + .add_service( + syva_proto::syva_control::v1::assignment_service_server::AssignmentServiceServer::new( + assignment_svc, + ), + ) .add_service( syva_proto::syva_control::v1::node_service_server::NodeServiceServer::new( node_svc, From eff54d45a28b6542b8958e4fac00d40223858770 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Fri, 24 Apr 2026 01:27:33 +0200 Subject: [PATCH 13/16] test(syva-cp): structural tests for register_node, set_node_labels, decommission_node, heartbeat --- .../decommission_node_removes_assignments.rs | 152 +++++++++++++++ syva-cp/tests/heartbeat_exception_to_audit.rs | 89 +++++++++ .../register_node_writes_all_causal_rows.rs | 129 +++++++++++++ .../set_node_labels_recomputes_assignments.rs | 177 ++++++++++++++++++ 
4 files changed, 547 insertions(+) create mode 100644 syva-cp/tests/decommission_node_removes_assignments.rs create mode 100644 syva-cp/tests/heartbeat_exception_to_audit.rs create mode 100644 syva-cp/tests/register_node_writes_all_causal_rows.rs create mode 100644 syva-cp/tests/set_node_labels_recomputes_assignments.rs diff --git a/syva-cp/tests/decommission_node_removes_assignments.rs b/syva-cp/tests/decommission_node_removes_assignments.rs new file mode 100644 index 0000000..12fd5a5 --- /dev/null +++ b/syva-cp/tests/decommission_node_removes_assignments.rs @@ -0,0 +1,152 @@ +use serde_json::json; +use sqlx::postgres::PgPool; +use syva_cp::db::types::{Actor, PolicyInput}; +use syva_cp::error::CpError; +use syva_cp::write::node::{DecommissionNodeInput, RegisterNodeInput}; +use syva_cp::write::team::CreateTeamInput; +use syva_cp::write::zone::CreateZoneInput; +use syva_cp::write::TransactionalWriter; +use uuid::Uuid; + +fn actor() -> Actor { + Actor { + actor_type: "user".into(), + actor_id: "test".into(), + team_id: None, + subject_type: "user".into(), + subject_id: "test".into(), + } +} + +async fn seed_node_with_assignments(pool: &PgPool) -> (Uuid, i64) { + let writer = TransactionalWriter::new(pool); + let team = writer + .create_team( + CreateTeamInput { + name: "platform".into(), + display_name: None, + }, + &actor(), + ) + .await + .unwrap(); + + for zone_name in ["agents-a", "agents-b"] { + writer + .create_zone( + CreateZoneInput { + team_id: team.id, + name: zone_name.into(), + display_name: None, + policy: PolicyInput { + policy_json: json!({"allowed_zones": []}), + summary_json: None, + }, + selector_json: Some(json!({"all_nodes": true})), + metadata_json: None, + }, + &actor(), + ) + .await + .unwrap(); + } + + let node = writer + .register_node( + RegisterNodeInput { + node_name: "n01".into(), + fingerprint: Some("fp-01".into()), + cluster_id: None, + labels: Default::default(), + capabilities_json: json!({}), + proposed_id: Uuid::new_v4(), + }, + 
&actor(), + ) + .await + .unwrap(); + + (node.node.id, node.node.version) +} + +#[sqlx::test] +async fn decommission_node_removes_assignments(pool: PgPool) { + let (node_id, version) = seed_node_with_assignments(&pool).await; + let writer = TransactionalWriter::new(&pool); + + let node = writer + .decommission_node( + DecommissionNodeInput { + node_id, + if_version: version, + }, + &actor(), + ) + .await + .unwrap(); + + assert_eq!(node.status, "decommissioned"); + assert_eq!(node.version, version + 1); + + let removing_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM assignments WHERE node_id = $1 AND status = 'removing'", + ) + .bind(node_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(removing_count, 2); + + let version_count: i64 = sqlx::query_scalar( + r#"SELECT COUNT(*) + FROM assignment_versions av + INNER JOIN assignments a ON a.id = av.assignment_id + WHERE a.node_id = $1"#, + ) + .bind(node_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(version_count, 4); + + let audit = sqlx::query( + "SELECT result, control_plane_event_id FROM audit_log WHERE action = 'node.decommission' ORDER BY occurred_at DESC LIMIT 1", + ) + .fetch_one(&pool) + .await + .unwrap(); + let result: String = sqlx::Row::get(&audit, "result"); + let event_link: Option = sqlx::Row::get(&audit, "control_plane_event_id"); + assert_eq!(result, "success"); + assert_eq!(event_link, node.caused_by_event_id); +} + +#[sqlx::test] +async fn decommission_node_with_stale_version_returns_conflict(pool: PgPool) { + let (node_id, version) = seed_node_with_assignments(&pool).await; + let writer = TransactionalWriter::new(&pool); + + let err = writer + .decommission_node( + DecommissionNodeInput { + node_id, + if_version: version + 99, + }, + &actor(), + ) + .await + .unwrap_err(); + + assert!(matches!(err, CpError::VersionConflict { .. 
})); + + let audit = sqlx::query( + "SELECT result, control_plane_event_id FROM audit_log WHERE action = 'node.decommission' ORDER BY occurred_at DESC LIMIT 1", + ) + .fetch_one(&pool) + .await + .unwrap(); + let result: String = sqlx::Row::get(&audit, "result"); + let event_link: Option = sqlx::Row::get(&audit, "control_plane_event_id"); + assert_eq!(result, "denied"); + assert!(event_link.is_none()); +} diff --git a/syva-cp/tests/heartbeat_exception_to_audit.rs b/syva-cp/tests/heartbeat_exception_to_audit.rs new file mode 100644 index 0000000..7cd9be2 --- /dev/null +++ b/syva-cp/tests/heartbeat_exception_to_audit.rs @@ -0,0 +1,89 @@ +use serde_json::json; +use sqlx::postgres::PgPool; +use syva_cp::db::types::Actor; +use syva_cp::write::node::{HeartbeatInput, RegisterNodeInput}; +use syva_cp::write::TransactionalWriter; +use uuid::Uuid; + +fn actor() -> Actor { + Actor { + actor_type: "user".into(), + actor_id: "test".into(), + team_id: None, + subject_type: "user".into(), + subject_id: "test".into(), + } +} + +async fn seed_node(pool: &PgPool) -> Uuid { + let writer = TransactionalWriter::new(pool); + writer + .register_node( + RegisterNodeInput { + node_name: "n01".into(), + fingerprint: Some("fp-01".into()), + cluster_id: None, + labels: Default::default(), + capabilities_json: json!({}), + proposed_id: Uuid::new_v4(), + }, + &actor(), + ) + .await + .unwrap() + .node + .id +} + +#[sqlx::test] +async fn heartbeat_exception_to_audit(pool: PgPool) { + let node_id = seed_node(&pool).await; + let writer = TransactionalWriter::new(&pool); + + let before_events: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM control_plane_events WHERE resource_id = $1", + ) + .bind(node_id) + .fetch_one(&pool) + .await + .unwrap(); + + writer + .heartbeat_node( + HeartbeatInput { + node_id, + status_hint: Some("online".into()), + }, + &actor(), + ) + .await + .unwrap(); + + let after_events: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM control_plane_events WHERE resource_id = 
$1", + ) + .bind(node_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(after_events, before_events + 1); + + let audit_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM audit_log WHERE action = 'node.heartbeat' AND resource_id = $1", + ) + .bind(node_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(audit_count, 0); + + let last_seen = sqlx::query("SELECT last_seen_at, last_heartbeat_event_id FROM nodes WHERE id = $1") + .bind(node_id) + .fetch_one(&pool) + .await + .unwrap(); + let last_seen_at: Option> = sqlx::Row::get(&last_seen, "last_seen_at"); + let heartbeat_event_id: Option = sqlx::Row::get(&last_seen, "last_heartbeat_event_id"); + assert!(last_seen_at.is_some()); + assert!(heartbeat_event_id.is_some()); +} diff --git a/syva-cp/tests/register_node_writes_all_causal_rows.rs b/syva-cp/tests/register_node_writes_all_causal_rows.rs new file mode 100644 index 0000000..d7cc922 --- /dev/null +++ b/syva-cp/tests/register_node_writes_all_causal_rows.rs @@ -0,0 +1,129 @@ +use serde_json::json; +use sqlx::postgres::PgPool; +use syva_cp::db::types::{Actor, PolicyInput}; +use syva_cp::write::node::RegisterNodeInput; +use syva_cp::write::team::CreateTeamInput; +use syva_cp::write::zone::CreateZoneInput; +use syva_cp::write::TransactionalWriter; +use uuid::Uuid; + +fn actor() -> Actor { + Actor { + actor_type: "user".into(), + actor_id: "test".into(), + team_id: None, + subject_type: "user".into(), + subject_id: "test".into(), + } +} + +async fn seed_team_with_zone(pool: &PgPool) -> Uuid { + let writer = TransactionalWriter::new(pool); + let team = writer + .create_team( + CreateTeamInput { + name: "platform".into(), + display_name: None, + }, + &actor(), + ) + .await + .unwrap(); + + writer + .create_zone( + CreateZoneInput { + team_id: team.id, + name: "agents".into(), + display_name: None, + policy: PolicyInput { + policy_json: json!({"allowed_zones": []}), + summary_json: None, + }, + selector_json: Some(json!({"match_labels": {"tier": 
"prod"}})), + metadata_json: None, + }, + &actor(), + ) + .await + .unwrap(); + + team.id +} + +#[sqlx::test] +async fn register_node_writes_all_causal_rows(pool: PgPool) { + let _team_id = seed_team_with_zone(&pool).await; + let writer = TransactionalWriter::new(&pool); + let node_id = Uuid::new_v4(); + + let out = writer + .register_node( + RegisterNodeInput { + node_name: "n01".into(), + fingerprint: Some("fp-01".into()), + cluster_id: Some("cluster-a".into()), + labels: [("tier".to_string(), "prod".to_string())] + .into_iter() + .collect(), + capabilities_json: json!({"runtime": "linux"}), + proposed_id: node_id, + }, + &actor(), + ) + .await + .unwrap(); + + assert!(out.is_new); + assert_eq!(out.node.id, node_id); + assert_eq!(out.assignments_upserted, 1); + assert_eq!(out.assignments_removed, 0); + + let event_count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM control_plane_events WHERE resource_id = $1") + .bind(node_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(event_count, 1); + + let label_count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM node_labels WHERE node_id = $1") + .bind(node_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(label_count, 1); + + let assignment_count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM assignments WHERE node_id = $1") + .bind(node_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(assignment_count, 1); + + let assignment_version_count: i64 = sqlx::query_scalar( + r#"SELECT COUNT(*) + FROM assignment_versions av + INNER JOIN assignments a ON a.id = av.assignment_id + WHERE a.node_id = $1"#, + ) + .bind(node_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(assignment_version_count, 1); + + let audit = sqlx::query( + "SELECT result, control_plane_event_id FROM audit_log WHERE resource_id = $1 AND action = 'node.register'", + ) + .bind(node_id) + .fetch_one(&pool) + .await + .unwrap(); + let result: String = sqlx::Row::get(&audit, "result"); + let event_link: Option = 
sqlx::Row::get(&audit, "control_plane_event_id"); + assert_eq!(result, "success"); + assert_eq!(event_link, out.node.caused_by_event_id); +} diff --git a/syva-cp/tests/set_node_labels_recomputes_assignments.rs b/syva-cp/tests/set_node_labels_recomputes_assignments.rs new file mode 100644 index 0000000..b3abb65 --- /dev/null +++ b/syva-cp/tests/set_node_labels_recomputes_assignments.rs @@ -0,0 +1,177 @@ +use serde_json::json; +use sqlx::postgres::PgPool; +use syva_cp::db::types::{Actor, PolicyInput}; +use syva_cp::error::CpError; +use syva_cp::write::node::{RegisterNodeInput, SetNodeLabelsInput}; +use syva_cp::write::team::CreateTeamInput; +use syva_cp::write::zone::CreateZoneInput; +use syva_cp::write::TransactionalWriter; +use uuid::Uuid; + +fn actor() -> Actor { + Actor { + actor_type: "user".into(), + actor_id: "test".into(), + team_id: None, + subject_type: "user".into(), + subject_id: "test".into(), + } +} + +async fn seed_node_and_zone(pool: &PgPool) -> (Uuid, i64) { + let writer = TransactionalWriter::new(pool); + let team = writer + .create_team( + CreateTeamInput { + name: "platform".into(), + display_name: None, + }, + &actor(), + ) + .await + .unwrap(); + + writer + .create_zone( + CreateZoneInput { + team_id: team.id, + name: "agents".into(), + display_name: None, + policy: PolicyInput { + policy_json: json!({"allowed_zones": []}), + summary_json: None, + }, + selector_json: Some(json!({"match_labels": {"tier": "prod"}})), + metadata_json: None, + }, + &actor(), + ) + .await + .unwrap(); + + let node = writer + .register_node( + RegisterNodeInput { + node_name: "n01".into(), + fingerprint: Some("fp-01".into()), + cluster_id: None, + labels: [("tier".to_string(), "dev".to_string())] + .into_iter() + .collect(), + capabilities_json: json!({}), + proposed_id: Uuid::new_v4(), + }, + &actor(), + ) + .await + .unwrap(); + + (node.node.id, node.node.version) +} + +#[sqlx::test] +async fn set_node_labels_recomputes_assignments(pool: PgPool) { + let (node_id, 
version) = seed_node_and_zone(&pool).await; + let writer = TransactionalWriter::new(&pool); + + let out = writer + .set_node_labels( + SetNodeLabelsInput { + node_id, + if_version: version, + labels: [("tier".to_string(), "prod".to_string())] + .into_iter() + .collect(), + }, + &actor(), + ) + .await + .unwrap(); + + assert_eq!(out.assignments_upserted, 1); + assert_eq!(out.assignments_removed, 0); + assert_eq!(out.node.version, version + 1); + + let desired_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM assignments WHERE node_id = $1 AND status = 'desired'", + ) + .bind(node_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(desired_count, 1); + + let out = writer + .set_node_labels( + SetNodeLabelsInput { + node_id, + if_version: out.node.version, + labels: [("tier".to_string(), "dev".to_string())] + .into_iter() + .collect(), + }, + &actor(), + ) + .await + .unwrap(); + + assert_eq!(out.assignments_upserted, 0); + assert_eq!(out.assignments_removed, 1); + + let removing_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM assignments WHERE node_id = $1 AND status = 'removing'", + ) + .bind(node_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(removing_count, 1); + + let versions: i64 = sqlx::query_scalar( + r#"SELECT COUNT(*) + FROM assignment_versions av + INNER JOIN assignments a ON a.id = av.assignment_id + WHERE a.node_id = $1"#, + ) + .bind(node_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(versions, 2); +} + +#[sqlx::test] +async fn set_node_labels_with_stale_version_returns_conflict(pool: PgPool) { + let (node_id, version) = seed_node_and_zone(&pool).await; + let writer = TransactionalWriter::new(&pool); + + let err = match writer + .set_node_labels( + SetNodeLabelsInput { + node_id, + if_version: version + 99, + labels: [("tier".to_string(), "prod".to_string())] + .into_iter() + .collect(), + }, + &actor(), + ) + .await + { + Ok(_) => panic!("expected stale version conflict"), + Err(err) => err, + }; + + 
assert!(matches!(err, CpError::VersionConflict { .. })); + + let audit = sqlx::query( + "SELECT result, control_plane_event_id FROM audit_log WHERE action = 'node.set_labels' ORDER BY occurred_at DESC LIMIT 1", + ) + .fetch_one(&pool) + .await + .unwrap(); + let result: String = sqlx::Row::get(&audit, "result"); + let event_link: Option = sqlx::Row::get(&audit, "control_plane_event_id"); + assert_eq!(result, "denied"); + assert!(event_link.is_none()); +} From 0b68f6349dcd96cab4c1198ed4c9177ba11ecc2b Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Fri, 24 Apr 2026 01:30:08 +0200 Subject: [PATCH 14/16] test(syva-cp): end-to-end integration - zone writes recompute assignments correctly --- .../zone_write_recomputes_assignments.rs | 300 ++++++++++++++++++ 1 file changed, 300 insertions(+) create mode 100644 syva-cp/tests/zone_write_recomputes_assignments.rs diff --git a/syva-cp/tests/zone_write_recomputes_assignments.rs b/syva-cp/tests/zone_write_recomputes_assignments.rs new file mode 100644 index 0000000..4679741 --- /dev/null +++ b/syva-cp/tests/zone_write_recomputes_assignments.rs @@ -0,0 +1,300 @@ +use serde_json::json; +use sqlx::postgres::PgPool; +use syva_cp::db::types::{Actor, PolicyInput}; +use syva_cp::write::node::{DecommissionNodeInput, RegisterNodeInput}; +use syva_cp::write::team::CreateTeamInput; +use syva_cp::write::zone::{CreateZoneInput, DeleteZoneInput, UpdateZoneInput}; +use syva_cp::write::TransactionalWriter; +use uuid::Uuid; + +fn actor() -> Actor { + Actor { + actor_type: "user".into(), + actor_id: "test".into(), + team_id: None, + subject_type: "user".into(), + subject_id: "test".into(), + } +} + +struct SeededNode { + id: Uuid, + version: i64, + name: &'static str, +} + +async fn register_node( + pool: &PgPool, + name: &'static str, + labels: &[(&str, &str)], +) -> SeededNode { + let writer = TransactionalWriter::new(pool); + let node = writer + .register_node( + RegisterNodeInput { + node_name: name.into(), + fingerprint: 
Some(format!("fp-{name}")), + cluster_id: None, + labels: labels + .iter() + .map(|(key, value)| (key.to_string(), value.to_string())) + .collect(), + capabilities_json: json!({}), + proposed_id: Uuid::new_v4(), + }, + &actor(), + ) + .await + .unwrap(); + + SeededNode { + id: node.node.id, + version: node.node.version, + name, + } +} + +#[sqlx::test] +async fn zone_write_recomputes_assignments(pool: PgPool) { + let writer = TransactionalWriter::new(&pool); + let team = writer + .create_team( + CreateTeamInput { + name: "platform".into(), + display_name: None, + }, + &actor(), + ) + .await + .unwrap(); + + let n1 = register_node(&pool, "n1", &[("tier", "prod"), ("region", "eu")]).await; + let n2 = register_node(&pool, "n2", &[("tier", "prod"), ("region", "us")]).await; + let n3 = register_node(&pool, "n3", &[("tier", "dev"), ("region", "eu")]).await; + let n4 = register_node(&pool, "n4", &[("tier", "dev"), ("region", "us")]).await; + let n5 = register_node(&pool, "n5", &[("tier", "dev"), ("region", "eu")]).await; + + let zone1 = writer + .create_zone( + CreateZoneInput { + team_id: team.id, + name: "all-nodes".into(), + display_name: None, + policy: PolicyInput { + policy_json: json!({"allowed_zones": []}), + summary_json: None, + }, + selector_json: Some(json!({"all_nodes": true})), + metadata_json: None, + }, + &actor(), + ) + .await + .unwrap(); + + let zone1_assignment_count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM assignments WHERE zone_id = $1") + .bind(zone1.zone.id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(zone1_assignment_count, 5); + + let zone1 = writer + .update_zone( + UpdateZoneInput { + zone_id: zone1.zone.id, + if_version: zone1.zone.version, + policy: None, + selector_json: Some(json!({"match_labels": {"tier": "prod"}})), + metadata_json: None, + }, + &actor(), + ) + .await + .unwrap(); + + let zone1_desired_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM assignments WHERE zone_id = $1 AND status = 'desired'", + ) + 
.bind(zone1.zone.id) + .fetch_one(&pool) + .await + .unwrap(); + let zone1_removing_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM assignments WHERE zone_id = $1 AND status = 'removing'", + ) + .bind(zone1.zone.id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(zone1_desired_count, 2); + assert_eq!(zone1_removing_count, 3); + + let zone2 = writer + .create_zone( + CreateZoneInput { + team_id: team.id, + name: "named".into(), + display_name: None, + policy: PolicyInput { + policy_json: json!({"allowed_zones": ["db"]}), + summary_json: None, + }, + selector_json: Some(json!({"node_names": [n1.name, n3.name]})), + metadata_json: None, + }, + &actor(), + ) + .await + .unwrap(); + + let zone2_assignment_count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM assignments WHERE zone_id = $1") + .bind(zone2.zone.id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(zone2_assignment_count, 2); + + let zone2 = writer + .delete_zone( + DeleteZoneInput { + zone_id: zone2.zone.id, + if_version: zone2.zone.version, + drain: true, + }, + &actor(), + ) + .await + .unwrap(); + assert_eq!(zone2.status, "draining"); + + let zone2_removing_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM assignments WHERE zone_id = $1 AND status = 'removing'", + ) + .bind(zone2.id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(zone2_removing_count, 2); + + let zone1 = writer + .delete_zone( + DeleteZoneInput { + zone_id: zone1.zone.id, + if_version: zone1.zone.version, + drain: false, + }, + &actor(), + ) + .await + .unwrap(); + assert_eq!(zone1.status, "deleted"); + + let zone1_after_delete: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM assignments WHERE zone_id = $1 AND status = 'removing'", + ) + .bind(zone1.id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(zone1_after_delete, 5); + + let zone3 = writer + .create_zone( + CreateZoneInput { + team_id: team.id, + name: "survivors".into(), + display_name: None, + policy: PolicyInput { + policy_json: 
json!({"allowed_zones": ["cache"]}), + summary_json: None, + }, + selector_json: Some(json!({"all_nodes": true})), + metadata_json: None, + }, + &actor(), + ) + .await + .unwrap(); + + let zone3_assignment_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM assignments WHERE zone_id = $1 AND status = 'desired'", + ) + .bind(zone3.zone.id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(zone3_assignment_count, 5); + + let node = writer + .decommission_node( + DecommissionNodeInput { + node_id: n1.id, + if_version: n1.version, + }, + &actor(), + ) + .await + .unwrap(); + assert_eq!(node.status, "decommissioned"); + + let n1_active_assignments: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM assignments WHERE node_id = $1 AND status = 'desired'", + ) + .bind(n1.id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(n1_active_assignments, 0); + + let n1_removing_assignments: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM assignments WHERE node_id = $1 AND status = 'removing'", + ) + .bind(n1.id) + .fetch_one(&pool) + .await + .unwrap(); + assert!(n1_removing_assignments >= 3); + + let assignment_versions: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM assignment_versions") + .fetch_one(&pool) + .await + .unwrap(); + let assignments: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM assignments") + .fetch_one(&pool) + .await + .unwrap(); + assert!(assignment_versions >= assignments); + + let orphan_count: i64 = sqlx::query_scalar( + r#"SELECT COUNT(*) + FROM assignments a + LEFT JOIN zones z ON z.id = a.zone_id + LEFT JOIN nodes n ON n.id = a.node_id + WHERE z.id IS NULL OR n.id IS NULL"#, + ) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(orphan_count, 0); + + let event_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM control_plane_events") + .fetch_one(&pool) + .await + .unwrap(); + assert!(event_count >= 10); + + let surviving_zone3_assignments: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM assignments WHERE zone_id = $1 AND 
status = 'desired'", + ) + .bind(zone3.zone.id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(surviving_zone3_assignments, 4); + + let _ = (n2, n3, n4, n5); +} From d50c53949f202624a328a5e4cd813b4a8b58f426 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Fri, 24 Apr 2026 01:31:22 +0200 Subject: [PATCH 15/16] docs(syva-cp): E2E smoke test for node+assignment flow; document heartbeat audit exception --- AGENT.md | 16 ++++++++++++++++ syva-cp/README.md | 48 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/AGENT.md b/AGENT.md index 69b9700..aacc2d5 100644 --- a/AGENT.md +++ b/AGENT.md @@ -5,6 +5,22 @@ Read CLAUDE.md first. This file covers *how to work*, not *what exists*. --- +## Heartbeat Audit Exception + +Node heartbeats are the single control-plane operation exempt from ADR 0003 +Rule 8's "audit every mutation" discipline. + +- Heartbeats happen many times per minute per node. +- `last_seen_at` is telemetry, not policy. +- Auditing every heartbeat would drown the audit log in low-value noise. + +Heartbeats still write a `control_plane_events` row with +`event_type = 'node.heartbeat'`, preserving the causal spine. They do **not** +write to `audit_log`. This exception is specific to heartbeats and must not be +copied to any other operation. + +--- + ## Mental Model: How to Think About This Codebase Syvä is a kernel enforcement boundary. Every line of eBPF code runs in a diff --git a/syva-cp/README.md b/syva-cp/README.md index cb58a57..2be823e 100644 --- a/syva-cp/README.md +++ b/syva-cp/README.md @@ -139,3 +139,51 @@ brew install grpcurl curl -L https://github.com/fullstorydev/grpcurl/releases/download/v1.9.1/grpcurl_1.9.1_linux_x86_64.tar.gz \ | tar xz -C /tmp && sudo mv /tmp/grpcurl /usr/local/bin/ ``` + +## E2E Smoke Test — Nodes and Assignments (Session 3) + +```bash +# Assume syva-cp is already running locally from the setup above. + +# 1. 
Create a team +TEAM_ID=$(grpcurl -plaintext -d '{"name":"platform"}' \ + localhost:50051 syva.control.v1.TeamService/CreateTeam \ + | jq -r '.team.id') +echo "team: $TEAM_ID" + +# 2. Create a zone with a label-based selector +ZONE_ID=$(grpcurl -plaintext -d "{ + \"team_id\":\"$TEAM_ID\", + \"name\":\"agents\", + \"policy_json\":\"{\\\"allowed_zones\\\":[]}\", + \"selector_json\":\"{\\\"match_labels\\\":{\\\"tier\\\":\\\"prod\\\"}}\" +}" localhost:50051 syva.control.v1.ZoneService/CreateZone \ + | jq -r '.zone.id') +echo "zone: $ZONE_ID" + +# 3. Generate a node_id on the node side +NODE_ID=$(uuidgen) +echo "node: $NODE_ID" + +# 4. Register the node with a matching label +grpcurl -plaintext -d "{ + \"node_name\":\"n01\", + \"proposed_id\":\"$NODE_ID\", + \"labels\":{\"tier\":\"prod\"} +}" localhost:50051 syva.control.v1.NodeService/RegisterNode + +# 5. In another shell (first set NODE_ID there to the value echoed in step 3), subscribe to assignment updates +grpcurl -plaintext -d "{\"node_id\":\"$NODE_ID\"}" \ + localhost:50051 syva.control.v1.AssignmentService/SubscribeAssignments + +# Expected: one FULL_SNAPSHOT with one assignment + +# 6. 
Flip the labels so the node no longer matches +grpcurl -plaintext -d "{ + \"node_id\":\"$NODE_ID\", + \"if_version\":1, + \"labels\":{\"tier\":\"dev\"} +}" localhost:50051 syva.control.v1.NodeService/SetNodeLabels + +# Expected on the stream: a REMOVE for the zone +``` From e7564171040e624e3dc892e09dc2b8aff5277159 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Sat, 25 Apr 2026 00:40:58 +0200 Subject: [PATCH 16/16] fix(syva-cp): address node assignment review feedback --- syva-cp/src/bus.rs | 38 ++++++++++++++++---- syva-cp/src/db/types.rs | 20 +++++++++++ syva-cp/src/engine/assignment.rs | 32 ++++++++++++++++- syva-cp/src/read/node.rs | 52 +++++++++++++++++++++++---- syva-cp/src/rpc/assignment_service.rs | 10 ++++-- syva-cp/src/write/node.rs | 26 +++----------- syva-cp/src/write/zone.rs | 40 ++++++++++++++------- 7 files changed, 167 insertions(+), 51 deletions(-) diff --git a/syva-cp/src/bus.rs b/syva-cp/src/bus.rs index 377ff90..4e7aaa1 100644 --- a/syva-cp/src/bus.rs +++ b/syva-cp/src/bus.rs @@ -29,9 +29,18 @@ impl AssignmentBus { } async fn notify(&self, node_id: Uuid) { - let map = self.inner.read().await; - if let Some(sender) = map.get(&node_id) { - let _ = sender.send(()); + let mut map = self.inner.write().await; + let should_remove = match map.get(&node_id) { + Some(sender) if sender.receiver_count() == 0 => true, + Some(sender) => { + let _ = sender.send(()); + false + } + None => false, + }; + + if should_remove { + map.remove(&node_id); } } } @@ -43,10 +52,15 @@ impl Default for AssignmentBus { } pub async fn spawn_listener(pool: PgPool, bus: AssignmentBus) -> Result> { - let mut listener = PgListener::connect_with(&pool).await?; - listener.listen(CHANNEL).await?; - let handle = tokio::spawn(async move { + let mut listener = match connect_listener(&pool).await { + Ok(listener) => listener, + Err(err) => { + tracing::error!("bus: initial listener setup failed: {err}"); + return; + } + }; + loop { match listener.recv().await { Ok(notification) => { @@ 
-60,6 +74,12 @@ pub async fn spawn_listener(pool: PgPool, bus: AssignmentBus) -> Result { tracing::error!("bus: listener error: {err}"); tokio::time::sleep(std::time::Duration::from_millis(200)).await; + match connect_listener(&pool).await { + Ok(new_listener) => listener = new_listener, + Err(reconnect_err) => { + tracing::error!("bus: listener reconnect failed: {reconnect_err}"); + } + } } } } @@ -67,3 +87,9 @@ pub async fn spawn_listener(pool: PgPool, bus: AssignmentBus) -> Result Result { + let mut listener = PgListener::connect_with(pool).await?; + listener.listen(CHANNEL).await?; + Ok(listener) +} diff --git a/syva-cp/src/db/types.rs b/syva-cp/src/db/types.rs index c24c653..1775b4f 100644 --- a/syva-cp/src/db/types.rs +++ b/syva-cp/src/db/types.rs @@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Map as JsonMap; use serde_json::Value as JsonValue; use sqlx::FromRow; +use sqlx::Row; use std::collections::BTreeMap; use uuid::Uuid; @@ -133,6 +134,25 @@ pub struct Node { pub caused_by_event_id: Option, } +pub fn node_from_row(row: &sqlx::postgres::PgRow) -> Node { + Node { + id: row.get("id"), + node_name: row.get("node_name"), + cluster_id: row.get("cluster_id"), + status: row.get("status"), + fingerprint: row.get("fingerprint"), + last_seen_at: row.get("last_seen_at"), + last_heartbeat_event_id: row.get("last_heartbeat_event_id"), + current_token_expires_at: row.get("current_token_expires_at"), + capabilities_json: row.get("capabilities_json"), + metadata_json: row.get("metadata_json"), + created_at: row.get("created_at"), + updated_at: row.get("updated_at"), + version: row.get("version"), + caused_by_event_id: row.get("caused_by_event_id"), + } +} + #[derive(Debug, Clone, FromRow, Serialize, Deserialize)] pub struct Assignment { pub id: Uuid, diff --git a/syva-cp/src/engine/assignment.rs b/syva-cp/src/engine/assignment.rs index a147c8e..a1b3696 100644 --- a/syva-cp/src/engine/assignment.rs +++ b/syva-cp/src/engine/assignment.rs @@ -71,7 
+71,8 @@ pub fn diff_assignments( }) { None => true, Some(existing) => { - existing.desired_policy_id != desired_assignment.desired_policy_id + existing.status == "removing" + || existing.desired_policy_id != desired_assignment.desired_policy_id || existing.desired_zone_version != desired_assignment.desired_zone_version } @@ -117,6 +118,7 @@ pub struct ExistingAssignment { pub node_id: Uuid, pub desired_policy_id: Uuid, pub desired_zone_version: i64, + pub status: String, } #[cfg(test)] @@ -225,6 +227,7 @@ mod tests { node_id: node_one, desired_policy_id: old_policy, desired_zone_version: 1, + status: "applied".into(), }, ExistingAssignment { id: Uuid::new_v4(), @@ -232,6 +235,7 @@ mod tests { node_id: node_two, desired_policy_id: old_policy, desired_zone_version: 1, + status: "applied".into(), }, ]; @@ -254,4 +258,30 @@ mod tests { assert_eq!(upsert.len(), 2); assert_eq!(remove.len(), 1); } + + #[test] + fn diff_reactivates_removing_assignment() { + let zone_id = Uuid::new_v4(); + let node_id = Uuid::new_v4(); + let policy_id = Uuid::new_v4(); + + let current = vec![ExistingAssignment { + id: Uuid::new_v4(), + zone_id, + node_id, + desired_policy_id: policy_id, + desired_zone_version: 1, + status: "removing".into(), + }]; + let desired = vec![DesiredAssignment { + zone_id, + node_id, + desired_policy_id: policy_id, + desired_zone_version: 1, + }]; + + let (upsert, remove) = diff_assignments(¤t, &desired); + assert_eq!(upsert.len(), 1); + assert!(remove.is_empty()); + } } diff --git a/syva-cp/src/read/node.rs b/syva-cp/src/read/node.rs index 9a07efc..41feba7 100644 --- a/syva-cp/src/read/node.rs +++ b/syva-cp/src/read/node.rs @@ -1,4 +1,4 @@ -use crate::db::types::{Node, NodeLabels}; +use crate::db::types::{node_from_row, Node, NodeLabels}; use crate::error::CpError; use sqlx::postgres::PgPool; use sqlx::Row; @@ -15,7 +15,7 @@ pub async fn get_node(pool: &PgPool, node_id: Uuid) -> Result<(Node, NodeLabels) identifier: node_id.to_string(), })?; - let node = 
crate::write::node::node_from_row(&row); + let node = node_from_row(&row); let labels = load_labels(pool, node_id).await?; Ok((node, labels)) } @@ -33,7 +33,7 @@ pub async fn get_node_by_name( identifier: node_name.to_string(), })?; - let node = crate::write::node::node_from_row(&row); + let node = node_from_row(&row); let labels = load_labels(pool, node.id).await?; Ok((node, labels)) } @@ -60,10 +60,13 @@ pub async fn list_nodes( } }; - let mut out = Vec::with_capacity(rows.len()); - for row in rows { - let node = crate::write::node::node_from_row(&row); - let labels = load_labels(pool, node.id).await?; + let nodes: Vec = rows.into_iter().map(|row| node_from_row(&row)).collect(); + let node_ids: Vec = nodes.iter().map(|node| node.id).collect(); + let mut labels_by_node = load_labels_for_nodes(pool, &node_ids).await?; + + let mut out = Vec::with_capacity(nodes.len()); + for node in nodes { + let labels = labels_by_node.remove(&node.id).unwrap_or_default(); out.push((node, labels)); } @@ -86,3 +89,38 @@ pub async fn load_labels(pool: &PgPool, node_id: Uuid) -> Result>()) } + +async fn load_labels_for_nodes( + pool: &PgPool, + node_ids: &[Uuid], +) -> Result, CpError> { + let mut labels_by_node = node_ids + .iter() + .copied() + .map(|node_id| (node_id, BTreeMap::new())) + .collect::>(); + + if node_ids.is_empty() { + return Ok(labels_by_node); + } + + let rows = sqlx::query( + "SELECT node_id, key, value FROM node_labels WHERE node_id = ANY($1)", + ) + .bind(node_ids) + .fetch_all(pool) + .await?; + + for row in rows { + let node_id = row.get::("node_id"); + let key = row.get::("key"); + let value = row.get::("value"); + + labels_by_node + .entry(node_id) + .or_default() + .extend(std::iter::once((key, value))); + } + + Ok(labels_by_node) +} diff --git a/syva-cp/src/rpc/assignment_service.rs b/syva-cp/src/rpc/assignment_service.rs index a5ef330..a6e59c1 100644 --- a/syva-cp/src/rpc/assignment_service.rs +++ b/syva-cp/src/rpc/assignment_service.rs @@ -76,7 +76,13 @@ 
impl AssignmentService for AssignmentServiceImpl { let mut bus_rx = bus.subscribe(node_id).await; let mut server_revision = 2_i64; - while bus_rx.recv().await.is_ok() { + loop { + match bus_rx.recv().await { + Ok(_) => {} + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {} + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + let current = match build_assignments_for_node(&pool, node_id).await { Ok(assignments) => assignments, Err(err) => { @@ -255,7 +261,7 @@ async fn build_assignments_for_node( FROM assignments a INNER JOIN zones z ON z.id = a.zone_id AND z.deleted_at IS NULL INNER JOIN policies p ON p.id = a.desired_policy_id - WHERE a.node_id = $1 AND a.status NOT IN ('removed', 'failed')"#, + WHERE a.node_id = $1 AND a.status NOT IN ('removing', 'removed', 'failed')"#, ) .bind(node_id) .fetch_all(pool) diff --git a/syva-cp/src/write/node.rs b/syva-cp/src/write/node.rs index efd8c46..4581c96 100644 --- a/syva-cp/src/write/node.rs +++ b/syva-cp/src/write/node.rs @@ -1,4 +1,4 @@ -use crate::db::types::{Actor, Node, NodeLabels}; +use crate::db::types::{node_from_row, Actor, Node, NodeLabels}; use crate::engine::assignment::{ compute_node_assignments, diff_assignments, DesiredAssignment, ExistingAssignment, NodeForAssignment, ZoneForAssignment, @@ -750,25 +750,6 @@ fn validate_register_node(input: &RegisterNodeInput) -> Result<(), CpError> { Ok(()) } -pub(crate) fn node_from_row(row: &sqlx::postgres::PgRow) -> Node { - Node { - id: row.get("id"), - node_name: row.get("node_name"), - cluster_id: row.get("cluster_id"), - status: row.get("status"), - fingerprint: row.get("fingerprint"), - last_seen_at: row.get("last_seen_at"), - last_heartbeat_event_id: row.get("last_heartbeat_event_id"), - current_token_expires_at: row.get("current_token_expires_at"), - capabilities_json: row.get("capabilities_json"), - metadata_json: row.get("metadata_json"), - created_at: row.get("created_at"), - updated_at: row.get("updated_at"), - version: 
row.get("version"), - caused_by_event_id: row.get("caused_by_event_id"), - } -} - #[allow(dead_code)] pub(crate) fn node_advisory_lock_key(node_id: Uuid) -> i64 { let mut hash: u64 = 0xcbf29ce484222325; @@ -819,9 +800,9 @@ pub(crate) async fn recompute_node_assignments_in_tx( let desired = compute_node_assignments(&node_input, &zones); let current_rows = sqlx::query( - r#"SELECT id, zone_id, node_id, desired_policy_id, desired_zone_version + r#"SELECT id, zone_id, node_id, status, desired_policy_id, desired_zone_version FROM assignments - WHERE node_id = $1 AND status NOT IN ('removed', 'failed')"#, + WHERE node_id = $1 AND status NOT IN ('removed', 'failed', 'removing')"#, ) .bind(node.id) .fetch_all(&mut **tx) @@ -836,6 +817,7 @@ pub(crate) async fn recompute_node_assignments_in_tx( node_id: row.get("node_id"), desired_policy_id: row.get("desired_policy_id"), desired_zone_version: row.get("desired_zone_version"), + status: row.get("status"), }) .collect::>(); diff --git a/syva-cp/src/write/zone.rs b/syva-cp/src/write/zone.rs index edc76cf..6fe9b15 100644 --- a/syva-cp/src/write/zone.rs +++ b/syva-cp/src/write/zone.rs @@ -904,22 +904,35 @@ pub(crate) async fn recompute_zone_assignments_in_tx( .await .map_err(CpError::Database)?; - let mut nodes = Vec::with_capacity(node_rows.len()); - for node_row in node_rows { - let node_id: Uuid = node_row.get("id"); - let label_rows = sqlx::query( - "SELECT key, value FROM node_labels WHERE node_id = $1", + let node_ids: Vec = + node_rows.iter().map(|node_row| node_row.get("id")).collect(); + let label_rows = if node_ids.is_empty() { + Vec::new() + } else { + sqlx::query( + r#"SELECT node_id, key, value + FROM node_labels + WHERE node_id = ANY($1)"#, ) - .bind(node_id) + .bind(&node_ids) .fetch_all(&mut **tx) .await - .map_err(CpError::Database)?; + .map_err(CpError::Database)? 
+ }; - let mut labels: NodeLabels = BTreeMap::new(); - for label_row in label_rows { - labels.insert(label_row.get("key"), label_row.get("value")); - } + let mut labels_by_node_id: BTreeMap = BTreeMap::new(); + for label_row in label_rows { + let label_node_id: Uuid = label_row.get("node_id"); + labels_by_node_id + .entry(label_node_id) + .or_default() + .insert(label_row.get("key"), label_row.get("value")); + } + let mut nodes = Vec::with_capacity(node_rows.len()); + for node_row in node_rows { + let node_id: Uuid = node_row.get("id"); + let labels = labels_by_node_id.remove(&node_id).unwrap_or_default(); nodes.push(NodeForAssignment { node_id, node_name: node_row.get("node_name"), @@ -933,9 +946,9 @@ pub(crate) async fn recompute_zone_assignments_in_tx( }; let current_rows = sqlx::query( - r#"SELECT id, zone_id, node_id, desired_policy_id, desired_zone_version + r#"SELECT id, zone_id, node_id, status, desired_policy_id, desired_zone_version FROM assignments - WHERE zone_id = $1 AND status NOT IN ('removed', 'failed')"#, + WHERE zone_id = $1 AND status NOT IN ('removed', 'failed', 'removing')"#, ) .bind(zone_id) .fetch_all(&mut **tx) @@ -950,6 +963,7 @@ pub(crate) async fn recompute_zone_assignments_in_tx( node_id: row.get("node_id"), desired_policy_id: row.get("desired_policy_id"), desired_zone_version: row.get("desired_zone_version"), + status: row.get("status"), }) .collect::>();