Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit bf2826c

Browse files
authored
refactor: refactor NodeStatusUpdater::process_nodes to run concurrently (#585)
* refactor NodeStatusUpdater::process_nodes to run concurrently, refactor trait objects to use enums instead * clippy * clippy * fmt
1 parent a282313 commit bf2826c

17 files changed

Lines changed: 691 additions & 558 deletions

File tree

crates/orchestrator/src/api/routes/storage.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,6 @@ mod tests {
325325
use std::sync::Arc;
326326

327327
use super::*;
328-
use crate::plugins::StatusUpdatePlugin;
329328
use crate::{
330329
api::tests::helper::{create_test_app_state, create_test_app_state_with_nodegroups},
331330
models::node::{NodeStatus, OrchestratorNode},
@@ -575,12 +574,14 @@ mod tests {
575574
None,
576575
None,
577576
));
578-
let _ = plugin.clone().register_observer().await;
579-
580-
let _ = plugin
581-
.handle_status_change(&node, &NodeStatus::Healthy)
577+
let _ = app_state
578+
.store_context
579+
.task_store
580+
.add_observer(plugin.clone())
582581
.await;
583582

583+
let _ = plugin.handle_status_change(&node).await;
584+
584585
let task = Task {
585586
id: Uuid::new_v4(),
586587
image: "test-image".to_string(),

crates/orchestrator/src/api/routes/task.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::api::server::AppState;
2+
use crate::plugins::node_groups::get_task_topologies;
23
use actix_web::{
34
web::{self, delete, get, post, Data},
45
HttpResponse, Scope,
@@ -64,8 +65,8 @@ async fn create_task(task: web::Json<TaskRequest>, app_state: Data<AppState>) ->
6465
}
6566
};
6667

67-
if let Some(group_plugin) = &app_state.node_groups_plugin {
68-
match group_plugin.get_task_topologies(&task) {
68+
if app_state.node_groups_plugin.is_some() {
69+
match get_task_topologies(&task) {
6970
Ok(topology) => {
7071
if topology.is_empty() {
7172
return HttpResponse::BadRequest().json(json!({"success": false, "error": "No topology found for task but grouping plugin is active."}));

crates/orchestrator/src/discovery/monitor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pub struct DiscoveryMonitor {
2626
heartbeats: Arc<LoopHeartbeats>,
2727
http_client: reqwest::Client,
2828
max_healthy_nodes_with_same_endpoint: u32,
29-
status_change_handlers: Vec<Box<dyn StatusUpdatePlugin>>,
29+
status_change_handlers: Vec<StatusUpdatePlugin>,
3030
}
3131

3232
impl DiscoveryMonitor {
@@ -39,7 +39,7 @@ impl DiscoveryMonitor {
3939
store_context: Arc<StoreContext>,
4040
heartbeats: Arc<LoopHeartbeats>,
4141
max_healthy_nodes_with_same_endpoint: u32,
42-
status_change_handlers: Vec<Box<dyn StatusUpdatePlugin>>,
42+
status_change_handlers: Vec<StatusUpdatePlugin>,
4343
) -> Self {
4444
Self {
4545
coordinator_wallet,

crates/orchestrator/src/events/mod.rs

Lines changed: 0 additions & 7 deletions
This file was deleted.

crates/orchestrator/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
mod api;
22
mod discovery;
3-
mod events;
43
mod metrics;
54
mod models;
65
mod node;

crates/orchestrator/src/main.rs

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,8 @@ async fn main() -> Result<()> {
154154
.unwrap();
155155

156156
let group_store_context = store_context.clone();
157-
let mut scheduler_plugins: Vec<Box<dyn SchedulerPlugin>> = Vec::new();
158-
let mut status_update_plugins: Vec<Box<dyn StatusUpdatePlugin>> = vec![];
157+
let mut scheduler_plugins: Vec<SchedulerPlugin> = Vec::new();
158+
let mut status_update_plugins: Vec<StatusUpdatePlugin> = vec![];
159159
let mut node_groups_plugin: Option<Arc<NodeGroupsPlugin>> = None;
160160
let mut webhook_plugins: Vec<WebhookPlugin> = vec![];
161161

@@ -167,7 +167,7 @@ async fn main() -> Result<()> {
167167
let plugin = WebhookPlugin::new(config);
168168
let plugin_clone = plugin.clone();
169169
webhook_plugins.push(plugin_clone);
170-
status_update_plugins.push(Box::new(plugin));
170+
status_update_plugins.push(plugin.into());
171171
info!("Plugin: Webhook plugin initialized");
172172
}
173173
}
@@ -201,26 +201,27 @@ async fn main() -> Result<()> {
201201
match serde_json::from_str::<Vec<NodeGroupConfiguration>>(&node_group_configs) {
202202
Ok(configs) if !configs.is_empty() => {
203203
let node_groups_heartbeats = heartbeats.clone();
204-
let group_plugin = NodeGroupsPlugin::new(
204+
205+
let group_plugin = Arc::new(NodeGroupsPlugin::new(
205206
configs,
206207
store.clone(),
207208
group_store_context.clone(),
208209
Some(node_groups_heartbeats.clone()),
209210
Some(webhook_plugins.clone()),
210-
);
211+
));
212+
213+
// Register the plugin as a task observer
214+
group_store_context
215+
.task_store
216+
.add_observer(group_plugin.clone())
217+
.await;
211218

212219
let status_group_plugin = group_plugin.clone();
213220
let group_plugin_for_server = group_plugin.clone();
214-
let group_plugin_arc = Arc::new(group_plugin_for_server);
215-
216-
// Register the plugin as a task observer
217-
if let Err(e) = group_plugin_arc.clone().register_observer().await {
218-
error!("Failed to register node groups plugin as observer: {e}");
219-
}
220221

221-
node_groups_plugin = Some(group_plugin_arc);
222-
scheduler_plugins.push(Box::new(group_plugin));
223-
status_update_plugins.push(Box::new(status_group_plugin));
222+
node_groups_plugin = Some(group_plugin_for_server);
223+
scheduler_plugins.push(group_plugin.into());
224+
status_update_plugins.push(status_group_plugin.into());
224225
info!("Plugin: Node group plugin initialized");
225226
}
226227
Ok(_) => {
@@ -263,16 +264,16 @@ async fn main() -> Result<()> {
263264
}
264265

265266
// Create status_update_plugins for discovery monitor
266-
let mut discovery_status_update_plugins: Vec<Box<dyn StatusUpdatePlugin>> = vec![];
267+
let mut discovery_status_update_plugins: Vec<StatusUpdatePlugin> = vec![];
267268

268269
// Add webhook plugins to discovery status update plugins
269270
for plugin in &webhook_plugins {
270-
discovery_status_update_plugins.push(Box::new(plugin.clone()));
271+
discovery_status_update_plugins.push(plugin.into());
271272
}
272273

273274
// Add node groups plugin if available
274275
if let Some(group_plugin) = node_groups_plugin.clone() {
275-
discovery_status_update_plugins.push(Box::new(group_plugin.as_ref().clone()));
276+
discovery_status_update_plugins.push(group_plugin.into());
276277
}
277278

278279
let discovery_store_context = store_context.clone();
@@ -316,16 +317,16 @@ async fn main() -> Result<()> {
316317
});
317318

318319
// Create status_update_plugins for status updater
319-
let mut status_updater_plugins: Vec<Box<dyn StatusUpdatePlugin>> = vec![];
320+
let mut status_updater_plugins: Vec<StatusUpdatePlugin> = vec![];
320321

321322
// Add webhook plugins to status updater plugins
322323
for plugin in &webhook_plugins {
323-
status_updater_plugins.push(Box::new(plugin.clone()));
324+
status_updater_plugins.push(plugin.into());
324325
}
325326

326327
// Add node groups plugin if available
327328
if let Some(group_plugin) = node_groups_plugin.clone() {
328-
status_updater_plugins.push(Box::new(group_plugin.as_ref().clone()));
329+
status_updater_plugins.push(group_plugin.into());
329330
}
330331

331332
let status_update_store_context = store_context.clone();
Lines changed: 99 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,103 @@
1-
mod traits;
2-
pub use traits::*;
1+
use crate::plugins::newest_task::NewestTaskPlugin;
2+
use alloy::primitives::Address;
3+
use anyhow::Result;
4+
use shared::models::task::Task;
5+
use std::sync::Arc;
36

4-
pub(crate) mod node_groups;
7+
use crate::{
8+
models::node::{NodeStatus, OrchestratorNode},
9+
plugins::node_groups::NodeGroupsPlugin,
10+
plugins::webhook::WebhookPlugin,
11+
};
512

613
pub(crate) mod newest_task;
7-
14+
pub(crate) mod node_groups;
815
pub(crate) mod webhook;
16+
17+
#[derive(Clone)]
18+
pub enum StatusUpdatePlugin {
19+
NodeGroupsPlugin(Arc<NodeGroupsPlugin>),
20+
WebhookPlugin(WebhookPlugin),
21+
}
22+
23+
impl StatusUpdatePlugin {
24+
pub(crate) async fn handle_status_change(
25+
&self,
26+
node: &OrchestratorNode,
27+
status: &NodeStatus,
28+
) -> Result<()> {
29+
match self {
30+
StatusUpdatePlugin::NodeGroupsPlugin(plugin) => plugin.handle_status_change(node).await,
31+
StatusUpdatePlugin::WebhookPlugin(plugin) => plugin.handle_status_change(node, status),
32+
}
33+
}
34+
}
35+
36+
impl From<Arc<NodeGroupsPlugin>> for StatusUpdatePlugin {
37+
fn from(plugin: Arc<NodeGroupsPlugin>) -> Self {
38+
StatusUpdatePlugin::NodeGroupsPlugin(plugin)
39+
}
40+
}
41+
42+
impl From<&Arc<NodeGroupsPlugin>> for StatusUpdatePlugin {
43+
fn from(plugin: &Arc<NodeGroupsPlugin>) -> Self {
44+
StatusUpdatePlugin::NodeGroupsPlugin(plugin.clone())
45+
}
46+
}
47+
48+
impl From<WebhookPlugin> for StatusUpdatePlugin {
49+
fn from(plugin: WebhookPlugin) -> Self {
50+
StatusUpdatePlugin::WebhookPlugin(plugin)
51+
}
52+
}
53+
54+
impl From<&WebhookPlugin> for StatusUpdatePlugin {
55+
fn from(plugin: &WebhookPlugin) -> Self {
56+
StatusUpdatePlugin::WebhookPlugin(plugin.clone())
57+
}
58+
}
59+
60+
#[derive(Clone)]
61+
pub enum SchedulerPlugin {
62+
NodeGroupsPlugin(Arc<NodeGroupsPlugin>),
63+
NewestTaskPlugin(NewestTaskPlugin),
64+
}
65+
66+
impl SchedulerPlugin {
67+
pub(crate) async fn filter_tasks(
68+
&self,
69+
tasks: &[Task],
70+
node_address: &Address,
71+
) -> Result<Vec<Task>> {
72+
match self {
73+
SchedulerPlugin::NodeGroupsPlugin(plugin) => {
74+
plugin.filter_tasks(tasks, node_address).await
75+
}
76+
SchedulerPlugin::NewestTaskPlugin(plugin) => plugin.filter_tasks(tasks),
77+
}
78+
}
79+
}
80+
81+
impl From<Arc<NodeGroupsPlugin>> for SchedulerPlugin {
82+
fn from(plugin: Arc<NodeGroupsPlugin>) -> Self {
83+
SchedulerPlugin::NodeGroupsPlugin(plugin)
84+
}
85+
}
86+
87+
impl From<&Arc<NodeGroupsPlugin>> for SchedulerPlugin {
88+
fn from(plugin: &Arc<NodeGroupsPlugin>) -> Self {
89+
SchedulerPlugin::NodeGroupsPlugin(plugin.clone())
90+
}
91+
}
92+
93+
impl From<NewestTaskPlugin> for SchedulerPlugin {
94+
fn from(plugin: NewestTaskPlugin) -> Self {
95+
SchedulerPlugin::NewestTaskPlugin(plugin)
96+
}
97+
}
98+
99+
impl From<&NewestTaskPlugin> for SchedulerPlugin {
100+
fn from(plugin: &NewestTaskPlugin) -> Self {
101+
SchedulerPlugin::NewestTaskPlugin(plugin.clone())
102+
}
103+
}

crates/orchestrator/src/plugins/newest_task/mod.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,11 @@
1-
use alloy::primitives::Address;
21
use anyhow::Result;
3-
use async_trait::async_trait;
42
use shared::models::task::Task;
53

6-
use super::{Plugin, SchedulerPlugin};
4+
#[derive(Clone)]
5+
pub struct NewestTaskPlugin;
76

8-
pub(crate) struct NewestTaskPlugin;
9-
10-
impl Plugin for NewestTaskPlugin {}
11-
12-
#[async_trait]
13-
impl SchedulerPlugin for NewestTaskPlugin {
14-
async fn filter_tasks(&self, tasks: &[Task], _node_address: &Address) -> Result<Vec<Task>> {
7+
impl NewestTaskPlugin {
8+
pub(crate) fn filter_tasks(&self, tasks: &[Task]) -> Result<Vec<Task>> {
159
if tasks.is_empty() {
1610
return Ok(vec![]);
1711
}
@@ -32,8 +26,8 @@ mod tests {
3226

3327
use super::*;
3428

35-
#[tokio::test]
36-
async fn test_filter_tasks() {
29+
#[test]
30+
fn test_filter_tasks() {
3731
let plugin = NewestTaskPlugin;
3832
let tasks = vec![
3933
Task {
@@ -54,7 +48,7 @@ mod tests {
5448
},
5549
];
5650

57-
let filtered_tasks = plugin.filter_tasks(&tasks, &Address::ZERO).await.unwrap();
51+
let filtered_tasks = plugin.filter_tasks(&tasks).unwrap();
5852
assert_eq!(filtered_tasks.len(), 1);
5953
assert_eq!(filtered_tasks[0].id, tasks[1].id);
6054
}

0 commit comments

Comments
 (0)