From db54006114f9efcdfaae9e6db66c2d1f23908d26 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 22 May 2026 23:49:09 +0100 Subject: [PATCH 1/3] Use ArcSwap for aggregate fn registry Signed-off-by: Robert Kruszewski --- vortex-array/src/aggregate_fn/accumulator.rs | 14 +++----- .../src/aggregate_fn/accumulator_grouped.rs | 14 ++++---- vortex-array/src/aggregate_fn/proto.rs | 2 +- vortex-array/src/aggregate_fn/session.rs | 36 +++++++++++-------- 4 files changed, 34 insertions(+), 32 deletions(-) diff --git a/vortex-array/src/aggregate_fn/accumulator.rs b/vortex-array/src/aggregate_fn/accumulator.rs index 762d5e00fb0..669858d9479 100644 --- a/vortex-array/src/aggregate_fn/accumulator.rs +++ b/vortex-array/src/aggregate_fn/accumulator.rs @@ -142,7 +142,7 @@ impl DynAccumulator for Accumulator { } let session = ctx.session().clone(); - let kernels = &session.aggregate_fns().kernels; + let kernels = &session.aggregate_fns().kernels.load(); // 1. Kernel registry first: a registered `(encoding, aggregate_fn)` kernel is strictly // more specific than the vtable's `try_accumulate` short-circuit. Checking the @@ -150,13 +150,11 @@ impl DynAccumulator for Accumulator { // `Combined::try_accumulate` always returns true, so a later kernel check would be // unreachable. { - let kernels_r = kernels.read(); let batch_id = batch.encoding_id(); - let kernel = kernels_r + let kernel = kernels .get(&(batch_id, Some(self.aggregate_fn.id()))) - .or_else(|| kernels_r.get(&(batch_id, None))) + .or_else(|| kernels.get(&(batch_id, None))) .copied(); - drop(kernels_r); if let Some(kernel) = kernel && let Some(result) = kernel.aggregate(&self.aggregate_fn, batch, ctx)? { @@ -187,13 +185,11 @@ impl DynAccumulator for Accumulator { break; } - let kernels_r = kernels.read(); let batch_id = batch.encoding_id(); - let kernel = kernels_r + let kernel = kernels .get(&(batch_id, Some(self.aggregate_fn.id()))) - .or_else(|| kernels_r.get(&(batch_id, None))) + .or_else(|| kernels.get(&(batch_id, None))) .copied(); - drop(kernels_r); if let Some(kernel) = kernel && let Some(result) = kernel.aggregate(&self.aggregate_fn, &batch, ctx)? { diff --git a/vortex-array/src/aggregate_fn/accumulator_grouped.rs b/vortex-array/src/aggregate_fn/accumulator_grouped.rs index a751a7c5749..9f28f317fdc 100644 --- a/vortex-array/src/aggregate_fn/accumulator_grouped.rs +++ b/vortex-array/src/aggregate_fn/accumulator_grouped.rs @@ -163,17 +163,16 @@ impl GroupedAccumulator { let mut elements = groups.elements().clone(); let groups_validity = groups.validity()?; let session = ctx.session().clone(); - let kernels = &session.aggregate_fns().grouped_kernels; + let kernels = &session.aggregate_fns().grouped_kernels.load(); for _ in 0..max_iterations() { if elements.is::() { break; } - let kernels_r = kernels.read(); - if let Some(result) = kernels_r + if let Some(result) = kernels .get(&(elements.encoding_id(), Some(self.aggregate_fn.id()))) - .or_else(|| kernels_r.get(&(elements.encoding_id(), None))) + .or_else(|| kernels.get(&(elements.encoding_id(), None))) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe let groups = unsafe { @@ -255,17 +254,16 @@ impl GroupedAccumulator { let mut elements = groups.elements().clone(); let groups_validity = groups.validity()?; let session = ctx.session().clone(); - let kernels = &session.aggregate_fns().grouped_kernels; + let kernels = &session.aggregate_fns().grouped_kernels.load(); for _ in 0..64 { if elements.is::() { break; } - let kernels_r = kernels.read(); - if let Some(result) = kernels_r + if let Some(result) = kernels .get(&(elements.encoding_id(), Some(self.aggregate_fn.id()))) - .or_else(|| kernels_r.get(&(elements.encoding_id(), None))) + .or_else(|| kernels.get(&(elements.encoding_id(), None))) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe let groups = unsafe { diff --git a/vortex-array/src/aggregate_fn/proto.rs b/vortex-array/src/aggregate_fn/proto.rs index 9bcfab5818c..bff198b3955 100644 --- a/vortex-array/src/aggregate_fn/proto.rs +++ b/vortex-array/src/aggregate_fn/proto.rs @@ -36,7 +36,7 @@ impl AggregateFnRef { /// Note: the serialization format is not stable and may change between versions. pub fn from_proto(proto: &pb::AggregateFn, session: &VortexSession) -> VortexResult { let agg_fn_id: AggregateFnId = AggregateFnId::new(proto.id.as_str()); - let agg_fn = if let Some(plugin) = session.aggregate_fns().registry().find(&agg_fn_id) { + let agg_fn = if let Some(plugin) = session.aggregate_fns().find_plugin(&agg_fn_id) { plugin.deserialize(proto.metadata(), session)? } else if session.allows_unknown() { new_foreign_aggregate_fn(agg_fn_id, proto.metadata().to_vec()) diff --git a/vortex-array/src/aggregate_fn/session.rs b/vortex-array/src/aggregate_fn/session.rs index d89f9da069d..1d3400bb68d 100644 --- a/vortex-array/src/aggregate_fn/session.rs +++ b/vortex-array/src/aggregate_fn/session.rs @@ -4,7 +4,7 @@ use std::any::Any; use std::sync::Arc; -use parking_lot::RwLock; +use arc_swap::ArcSwap; use vortex_session::Ref; use vortex_session::SessionExt; use vortex_session::SessionVar; @@ -49,10 +49,10 @@ pub type AggregateFnRegistry = Registry; /// Session state for aggregate function vtables. #[derive(Debug)] pub struct AggregateFnSession { - registry: AggregateFnRegistry, + registry: ArcSwap>, - pub(super) kernels: RwLock>, - pub(super) grouped_kernels: RwLock>, + pub(super) kernels: ArcSwap>, + pub(super) grouped_kernels: ArcSwap>, } impl SessionVar for AggregateFnSession { @@ -70,9 +70,9 @@ type KernelKey = (ArrayId, Option); impl Default for AggregateFnSession { fn default() -> Self { let this = Self { - registry: AggregateFnRegistry::default(), - kernels: RwLock::new(HashMap::default()), - grouped_kernels: RwLock::new(HashMap::default()), + registry: ArcSwap::from_pointee(HashMap::default()), + kernels: ArcSwap::from_pointee(HashMap::default()), + grouped_kernels: ArcSwap::from_pointee(HashMap::default()), }; // Register the built-in aggregate functions @@ -107,15 +107,20 @@ impl Default for AggregateFnSession { impl AggregateFnSession { /// Returns the aggregate function registry. - pub fn registry(&self) -> &AggregateFnRegistry { - &self.registry + pub fn find_plugin(&self, id: &AggregateFnId) -> Option { + self.registry.load().get(id).cloned() } /// Register an aggregate function vtable in the session, replacing any existing vtable with /// the same ID. pub fn register(&self, vtable: V) { - self.registry - .register(vtable.id(), Arc::new(vtable) as AggregateFnPluginRef); + let id = vtable.id(); + let pluginref = Arc::new(vtable) as AggregateFnPluginRef; + self.registry.rcu(move |registry| { + let mut existing = registry.as_ref().clone(); + existing.insert(id, Arc::clone(&pluginref)); + existing + }); } /// Register an aggregate function kernel for a specific aggregate function and array type. @@ -125,9 +130,12 @@ impl AggregateFnSession { agg_fn_id: Option>, kernel: &'static dyn DynAggregateKernel, ) { - self.kernels - .write() - .insert((array_id.into(), agg_fn_id.map(|id| id.into())), kernel); + let id = (array_id.into(), agg_fn_id.map(|id| id.into())); + self.kernels.rcu(move |registry| { + let mut existing = registry.as_ref().clone(); + existing.insert(id, kernel); + existing + }); } } From 7d1b61866d52b25851b4b0ed20efd610526271eb Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Wed, 27 May 2026 11:25:16 +0100 Subject: [PATCH 2/3] more Signed-off-by: Robert Kruszewski --- vortex-array/src/aggregate_fn/accumulator.rs | 16 +++----- .../src/aggregate_fn/accumulator_grouped.rs | 14 +++---- vortex-array/src/aggregate_fn/session.rs | 38 +++++++++++++++---- 3 files changed, 42 insertions(+), 26 deletions(-) diff --git a/vortex-array/src/aggregate_fn/accumulator.rs b/vortex-array/src/aggregate_fn/accumulator.rs index 669858d9479..cf5e7f6c219 100644 --- a/vortex-array/src/aggregate_fn/accumulator.rs +++ b/vortex-array/src/aggregate_fn/accumulator.rs @@ -142,7 +142,7 @@ impl DynAccumulator for Accumulator { } let session = ctx.session().clone(); - let kernels = &session.aggregate_fns().kernels.load(); + let aggregate_fns = session.aggregate_fns(); // 1. Kernel registry first: a registered `(encoding, aggregate_fn)` kernel is strictly // more specific than the vtable's `try_accumulate` short-circuit. Checking the @@ -150,11 +150,8 @@ impl DynAccumulator for Accumulator { // `Combined::try_accumulate` always returns true, so a later kernel check would be // unreachable. { - let batch_id = batch.encoding_id(); - let kernel = kernels - .get(&(batch_id, Some(self.aggregate_fn.id()))) - .or_else(|| kernels.get(&(batch_id, None))) - .copied(); + let kernel = + aggregate_fns.find_aggregate_kernel(batch.encoding_id(), self.aggregate_fn.id()); if let Some(kernel) = kernel && let Some(result) = kernel.aggregate(&self.aggregate_fn, batch, ctx)? { @@ -185,11 +182,8 @@ impl DynAccumulator for Accumulator { break; } - let batch_id = batch.encoding_id(); - let kernel = kernels - .get(&(batch_id, Some(self.aggregate_fn.id()))) - .or_else(|| kernels.get(&(batch_id, None))) - .copied(); + let kernel = + aggregate_fns.find_aggregate_kernel(batch.encoding_id(), self.aggregate_fn.id()); if let Some(kernel) = kernel && let Some(result) = kernel.aggregate(&self.aggregate_fn, &batch, ctx)? { diff --git a/vortex-array/src/aggregate_fn/accumulator_grouped.rs b/vortex-array/src/aggregate_fn/accumulator_grouped.rs index 9f28f317fdc..3fb53229b4e 100644 --- a/vortex-array/src/aggregate_fn/accumulator_grouped.rs +++ b/vortex-array/src/aggregate_fn/accumulator_grouped.rs @@ -163,16 +163,15 @@ impl GroupedAccumulator { let mut elements = groups.elements().clone(); let groups_validity = groups.validity()?; let session = ctx.session().clone(); - let kernels = &session.aggregate_fns().grouped_kernels.load(); + let aggregate_fns = session.aggregate_fns(); for _ in 0..max_iterations() { if elements.is::() { break; } - if let Some(result) = kernels - .get(&(elements.encoding_id(), Some(self.aggregate_fn.id()))) - .or_else(|| kernels.get(&(elements.encoding_id(), None))) + if let Some(result) = aggregate_fns + .find_groupped_kernel(elements.encoding_id(), self.aggregate_fn.id()) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe let groups = unsafe { @@ -254,16 +253,15 @@ impl GroupedAccumulator { let mut elements = groups.elements().clone(); let groups_validity = groups.validity()?; let session = ctx.session().clone(); - let kernels = &session.aggregate_fns().grouped_kernels.load(); + let aggregate_fns = session.aggregate_fns(); for _ in 0..64 { if elements.is::() { break; } - if let Some(result) = kernels - .get(&(elements.encoding_id(), Some(self.aggregate_fn.id()))) - .or_else(|| kernels.get(&(elements.encoding_id(), None))) + if let Some(result) = aggregate_fns + .find_groupped_kernel(elements.encoding_id(), self.aggregate_fn.id()) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe let groups = unsafe { diff --git a/vortex-array/src/aggregate_fn/session.rs b/vortex-array/src/aggregate_fn/session.rs index 1d3400bb68d..90dbb48729f 100644 --- a/vortex-array/src/aggregate_fn/session.rs +++ b/vortex-array/src/aggregate_fn/session.rs @@ -8,7 +8,6 @@ use arc_swap::ArcSwap; use vortex_session::Ref; use vortex_session::SessionExt; use vortex_session::SessionVar; -use vortex_session::registry::Registry; use vortex_utils::aliases::hash_map::HashMap; use crate::aggregate_fn::AggregateFnId; @@ -43,16 +42,13 @@ use crate::arrays::dict::compute::is_constant::DictIsConstantKernel; use crate::arrays::dict::compute::is_sorted::DictIsSortedKernel; use crate::arrays::dict::compute::min_max::DictMinMaxKernel; -/// Registry of aggregate function vtables. -pub type AggregateFnRegistry = Registry; - /// Session state for aggregate function vtables. #[derive(Debug)] pub struct AggregateFnSession { registry: ArcSwap>, - pub(super) kernels: ArcSwap>, - pub(super) grouped_kernels: ArcSwap>, + kernels: ArcSwap>, + grouped_kernels: ArcSwap>, } impl SessionVar for AggregateFnSession { @@ -106,7 +102,7 @@ impl Default for AggregateFnSession { } impl AggregateFnSession { - /// Returns the aggregate function registry. + /// Find plugin in the registry for the given id pub fn find_plugin(&self, id: &AggregateFnId) -> Option { self.registry.load().get(id).cloned() } @@ -123,6 +119,20 @@ impl AggregateFnSession { }); } + pub fn find_aggregate_kernel( + &self, + array_id: impl Into, + agg_fn_id: impl Into, + ) -> Option<&'static dyn DynAggregateKernel> { + let loaded = self.kernels.load(); + let id = array_id.into(); + let fn_id = agg_fn_id.into(); + loaded + .get(&(id, Some(fn_id))) + .or_else(|| loaded.get(&(id, None))) + .copied() + } + /// Register an aggregate function kernel for a specific aggregate function and array type. pub fn register_aggregate_kernel( &self, @@ -137,6 +147,20 @@ impl AggregateFnSession { existing }); } + + pub fn find_groupped_kernel( + &self, + array_id: impl Into, + agg_fn_id: impl Into, + ) -> Option<&'static dyn DynGroupedAggregateKernel> { + let loaded = self.grouped_kernels.load(); + let id = array_id.into(); + let fn_id = agg_fn_id.into(); + loaded + .get(&(id, Some(fn_id))) + .or_else(|| loaded.get(&(id, None))) + .copied() + } } /// Extension trait for accessing aggregate function session data. From a472bd2c27710d262f0103284c25ef9efd4a81cf Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Wed, 27 May 2026 11:26:40 +0100 Subject: [PATCH 3/3] rename Signed-off-by: Robert Kruszewski --- vortex-array/src/aggregate_fn/accumulator_grouped.rs | 4 ++-- vortex-array/src/aggregate_fn/session.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/vortex-array/src/aggregate_fn/accumulator_grouped.rs b/vortex-array/src/aggregate_fn/accumulator_grouped.rs index 3fb53229b4e..be940d8c474 100644 --- a/vortex-array/src/aggregate_fn/accumulator_grouped.rs +++ b/vortex-array/src/aggregate_fn/accumulator_grouped.rs @@ -171,7 +171,7 @@ impl GroupedAccumulator { } if let Some(result) = aggregate_fns - .find_groupped_kernel(elements.encoding_id(), self.aggregate_fn.id()) + .find_grouped_kernel(elements.encoding_id(), self.aggregate_fn.id()) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe let groups = unsafe { @@ -261,7 +261,7 @@ impl GroupedAccumulator { } if let Some(result) = aggregate_fns - .find_groupped_kernel(elements.encoding_id(), self.aggregate_fn.id()) + .find_grouped_kernel(elements.encoding_id(), self.aggregate_fn.id()) .and_then(|kernel| { // SAFETY: we assume that elements execution is safe let groups = unsafe { diff --git a/vortex-array/src/aggregate_fn/session.rs b/vortex-array/src/aggregate_fn/session.rs index 90dbb48729f..c145ad4a2d9 100644 --- a/vortex-array/src/aggregate_fn/session.rs +++ b/vortex-array/src/aggregate_fn/session.rs @@ -148,7 +148,7 @@ impl AggregateFnSession { }); } - pub fn find_groupped_kernel( + pub fn find_grouped_kernel( &self, array_id: impl Into, agg_fn_id: impl Into,