diff --git a/Cargo.lock b/Cargo.lock index e10e688c..724be68c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7373,9 +7373,9 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "trzcina" -version = "0.2.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55cd35208b88a2f0f7eb890af3034ba5676ec11cd9a61ef73248b400ae452fea" +checksum = "c58d685f2fd0c04ea9c0dcff193161de5aebf1eba5e1267807e5ae2a8562cd93" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 348c77c1..6f73ced9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,7 @@ tokio-test = "0.4.4" tokio-tungstenite = "0.28" tokio-util = "0.7" thiserror = "2" -trzcina = "=0.2.1" +trzcina = "=0.3.0" url = { version = "2.5", features = ["serde"] } paddler = { version = "4.0.0", path = "paddler" } paddler_bootstrap = { version = "4.0.0", path = "paddler_bootstrap" } diff --git a/paddler/src/agent/llamacpp_arbiter_service.rs b/paddler/src/agent/llamacpp_arbiter_service.rs index 16096d78..c867d966 100644 --- a/paddler/src/agent/llamacpp_arbiter_service.rs +++ b/paddler/src/agent/llamacpp_arbiter_service.rs @@ -27,101 +27,126 @@ use crate::agent_applicable_state::AgentApplicableState; use crate::agent_applicable_state_holder::AgentApplicableStateHolder; use crate::slot_aggregated_status_manager::SlotAggregatedStatusManager; -pub struct LlamaCppArbiterService { - pub agent_applicable_state: Option, - pub agent_applicable_state_holder: Arc, - pub agent_name: Option, - pub continue_from_conversation_history_request_rx: - mpsc::UnboundedReceiver, - pub continue_from_raw_prompt_request_rx: mpsc::UnboundedReceiver, - pub desired_slots_total: i32, - pub generate_embedding_batch_request_rx: mpsc::UnboundedReceiver, - pub continuous_batch_arbiter_handle: Option, - pub model_metadata_holder: Arc, - pub slot_aggregated_status_manager: Arc, -} - -impl LlamaCppArbiterService { - async fn apply_state(&mut self, shutdown: &CancellationToken) -> Result<()> { - self.wait_for_in_flight_requests_to_finish(shutdown).await?; - self.tear_down_arbiter()?; - - if let Some(applicable_state) = self.agent_applicable_state.clone() { - self.slot_aggregated_status_manager.reset(); - - match ContinuousBatchArbiter::build_from_applicable_state( - applicable_state, - self.agent_name.clone(), - self.desired_slots_total, - self.model_metadata_holder.clone(), - self.slot_aggregated_status_manager.clone(), - ) { - ContinuousBatchArbiterBuildOutcome::ReadyToSpawn(arbiter) => { - self.continuous_batch_arbiter_handle = Some(arbiter.spawn().await?); - info!("Reconciled state change applied successfully"); - } - ContinuousBatchArbiterBuildOutcome::NoModelConfigured => { - warn!( - "No model configured in applicable state; skipping llama.cpp initialization" - ); - } +async fn apply_state( + shutdown: &CancellationToken, + agent_applicable_state: Option<&AgentApplicableState>, + agent_name: Option<&str>, + desired_slots_total: i32, + model_metadata_holder: &Arc, + slot_aggregated_status_manager: &Arc, + continuous_batch_arbiter_handle: &mut Option, +) -> Result<()> { + wait_for_in_flight_requests_to_finish( + shutdown, + continuous_batch_arbiter_handle.as_ref(), + slot_aggregated_status_manager, + ) + .await?; + shutdown_arbiter_handle(continuous_batch_arbiter_handle).await?; + + if let Some(applicable_state) = agent_applicable_state.cloned() { + slot_aggregated_status_manager.reset(); + + match ContinuousBatchArbiter::build_from_applicable_state( + applicable_state, + agent_name.map(str::to_owned), + desired_slots_total, + model_metadata_holder.clone(), + slot_aggregated_status_manager.clone(), + ) { + ContinuousBatchArbiterBuildOutcome::ReadyToSpawn(arbiter) => { + *continuous_batch_arbiter_handle = Some(arbiter.spawn().await?); + info!("Reconciled state change applied successfully"); + } + ContinuousBatchArbiterBuildOutcome::NoModelConfigured => { + warn!( + "No model configured in applicable state; skipping llama.cpp initialization" + ); } } - - self.slot_aggregated_status_manager - .slot_aggregated_status - .set_state_application_status(AgentStateApplicationStatus::Applied); - - Ok(()) } - async fn wait_for_in_flight_requests_to_finish( - &self, - shutdown: &CancellationToken, - ) -> Result<()> { - if self.continuous_batch_arbiter_handle.is_some() { - drain_in_flight_requests(&self.slot_aggregated_status_manager, shutdown).await?; - } + slot_aggregated_status_manager + .slot_aggregated_status + .set_state_application_status(AgentStateApplicationStatus::Applied); - Ok(()) - } + Ok(()) +} - fn tear_down_arbiter(&mut self) -> Result<()> { - if let Some(arbiter_handle) = self.continuous_batch_arbiter_handle.take() { - arbiter_handle - .shutdown() - .context("Unable to stop arbiter controller")?; +fn forward_command( + continuous_batch_arbiter_handle: Option<&ContinuousBatchArbiterHandle>, + command: ContinuousBatchSchedulerCommand, +) { + if let Some(arbiter_handle) = continuous_batch_arbiter_handle { + if let Err(err) = arbiter_handle.command_tx.send(command) { + error!("Failed to forward command to scheduler: {err}"); } - - Ok(()) + } else { + error!("ContinuousBatchArbiterHandle is not initialized"); } +} - fn forward_command(&self, command: ContinuousBatchSchedulerCommand) { - if let Some(arbiter_handle) = &self.continuous_batch_arbiter_handle { - if let Err(err) = arbiter_handle.command_tx.send(command) { - error!("Failed to forward command to scheduler: {err}"); - } - } else { - error!("ContinuousBatchArbiterHandle is not initialized"); - } +async fn shutdown_arbiter_handle( + continuous_batch_arbiter_handle: &mut Option, +) -> Result<()> { + let Some(handle) = continuous_batch_arbiter_handle.take() else { + return Ok(()); + }; + + tokio::task::spawn_blocking(move || handle.shutdown()) + .await + .context("Arbiter shutdown task panicked")? + .context("Arbiter shutdown returned an error") +} + +async fn try_to_apply_state( + shutdown: &CancellationToken, + agent_applicable_state: Option<&AgentApplicableState>, + agent_name: Option<&str>, + desired_slots_total: i32, + model_metadata_holder: &Arc, + slot_aggregated_status_manager: &Arc, + continuous_batch_arbiter_handle: &mut Option, +) { + if let Err(err) = apply_state( + shutdown, + agent_applicable_state, + agent_name, + desired_slots_total, + model_metadata_holder, + slot_aggregated_status_manager, + continuous_batch_arbiter_handle, + ) + .await + { + error!("Failed to apply reconciled state change: {err}"); } +} - async fn try_to_apply_state(&mut self, shutdown: &CancellationToken) { - if let Err(err) = self.apply_state(shutdown).await { - error!("Failed to apply reconciled state change: {err}"); - } +async fn wait_for_in_flight_requests_to_finish( + shutdown: &CancellationToken, + continuous_batch_arbiter_handle: Option<&ContinuousBatchArbiterHandle>, + slot_aggregated_status_manager: &Arc, +) -> Result<()> { + if continuous_batch_arbiter_handle.is_some() { + drain_in_flight_requests(slot_aggregated_status_manager, shutdown).await?; } - async fn shutdown_arbiter_handle(&mut self) -> Result<()> { - let Some(handle) = self.continuous_batch_arbiter_handle.take() else { - return Ok(()); - }; + Ok(()) +} - tokio::task::spawn_blocking(move || handle.shutdown()) - .await - .context("Arbiter shutdown task panicked")? - .context("Arbiter shutdown returned an error") - } +pub struct LlamaCppArbiterService { + pub agent_applicable_state: Option, + pub agent_applicable_state_holder: Arc, + pub agent_name: Option, + pub continue_from_conversation_history_request_rx: + mpsc::UnboundedReceiver, + pub continue_from_raw_prompt_request_rx: mpsc::UnboundedReceiver, + pub desired_slots_total: i32, + pub generate_embedding_batch_request_rx: mpsc::UnboundedReceiver, + pub continuous_batch_arbiter_handle: Option, + pub model_metadata_holder: Arc, + pub slot_aggregated_status_manager: Arc, } #[async_trait] @@ -130,8 +155,21 @@ impl Service for LlamaCppArbiterService { "agent::llamacpp_arbiter_service" } - async fn run(&mut self, shutdown: CancellationToken) -> Result<()> { - let mut reconciled_state = self.agent_applicable_state_holder.subscribe(); + async fn run(self: Box, shutdown: CancellationToken) -> Result<()> { + let Self { + mut agent_applicable_state, + agent_applicable_state_holder, + agent_name, + mut continue_from_conversation_history_request_rx, + mut continue_from_raw_prompt_request_rx, + desired_slots_total, + mut generate_embedding_batch_request_rx, + mut continuous_batch_arbiter_handle, + model_metadata_holder, + slot_aggregated_status_manager, + } = *self; + + let mut reconciled_state = agent_applicable_state_holder.subscribe(); let mut ticker = interval(Duration::from_secs(1)); ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -141,10 +179,10 @@ impl Service for LlamaCppArbiterService { biased; () = shutdown.cancelled() => break Ok(()), _ = ticker.tick() => { - let current_status = self.slot_aggregated_status_manager.slot_aggregated_status.get_state_application_status()?; + let current_status = slot_aggregated_status_manager.slot_aggregated_status.get_state_application_status()?; if current_status.should_try_to_apply() { - self.slot_aggregated_status_manager + slot_aggregated_status_manager .slot_aggregated_status .set_state_application_status( if matches!(current_status, AgentStateApplicationStatus::AttemptedAndRetrying) { @@ -154,36 +192,55 @@ impl Service for LlamaCppArbiterService { } ); - self.try_to_apply_state(&shutdown).await; + try_to_apply_state( + &shutdown, + agent_applicable_state.as_ref(), + agent_name.as_deref(), + desired_slots_total, + &model_metadata_holder, + &slot_aggregated_status_manager, + &mut continuous_batch_arbiter_handle, + ).await; } } _ = reconciled_state.changed() => { - self.agent_applicable_state.clone_from(&reconciled_state.borrow_and_update()); - self.slot_aggregated_status_manager + agent_applicable_state.clone_from(&reconciled_state.borrow_and_update()); + slot_aggregated_status_manager .slot_aggregated_status .set_state_application_status(AgentStateApplicationStatus::Fresh); - self.try_to_apply_state(&shutdown).await; + try_to_apply_state( + &shutdown, + agent_applicable_state.as_ref(), + agent_name.as_deref(), + desired_slots_total, + &model_metadata_holder, + &slot_aggregated_status_manager, + &mut continuous_batch_arbiter_handle, + ).await; } - Some(request) = self.continue_from_conversation_history_request_rx.recv() => { - self.forward_command( + Some(request) = continue_from_conversation_history_request_rx.recv() => { + forward_command( + continuous_batch_arbiter_handle.as_ref(), ContinuousBatchSchedulerCommand::ContinueFromConversationHistory(request), ); } - Some(request) = self.continue_from_raw_prompt_request_rx.recv() => { - self.forward_command( + Some(request) = continue_from_raw_prompt_request_rx.recv() => { + forward_command( + continuous_batch_arbiter_handle.as_ref(), ContinuousBatchSchedulerCommand::ContinueFromRawPrompt(request), ); } - Some(request) = self.generate_embedding_batch_request_rx.recv() => { - self.forward_command( + Some(request) = generate_embedding_batch_request_rx.recv() => { + forward_command( + continuous_batch_arbiter_handle.as_ref(), ContinuousBatchSchedulerCommand::GenerateEmbeddingBatch(request), ); } } }; - if let Err(err) = self.shutdown_arbiter_handle().await { + if let Err(err) = shutdown_arbiter_handle(&mut continuous_batch_arbiter_handle).await { error!("Failed to shut down arbiter cleanly: {err:#}"); } @@ -211,7 +268,7 @@ mod tests { let (generate_embedding_batch_request_tx, generate_embedding_batch_request_rx) = mpsc::unbounded_channel(); - let mut service = LlamaCppArbiterService { + let service = LlamaCppArbiterService { agent_applicable_state: None, agent_applicable_state_holder: Arc::new(AgentApplicableStateHolder::default()), agent_name: None, @@ -227,7 +284,8 @@ mod tests { let shutdown = CancellationToken::new(); let task_token = shutdown.clone(); - let mut join_handle = tokio::spawn(async move { service.run(task_token).await }); + let mut join_handle = + tokio::spawn(async move { Box::new(service).run(task_token).await }); drop(continue_from_conversation_history_request_tx); drop(continue_from_raw_prompt_request_tx); diff --git a/paddler/src/agent/management_socket_client_service.rs b/paddler/src/agent/management_socket_client_service.rs index ebe16a5b..771f5836 100644 --- a/paddler/src/agent/management_socket_client_service.rs +++ b/paddler/src/agent/management_socket_client_service.rs @@ -497,7 +497,7 @@ impl Service for ManagementSocketClientService { "agent::management_socket_client_service" } - async fn run(&mut self, shutdown: CancellationToken) -> Result<()> { + async fn run(self: Box, shutdown: CancellationToken) -> Result<()> { let mut ticker = interval(Duration::from_secs(1)); ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); diff --git a/paddler/src/agent/reconciliation_service.rs b/paddler/src/agent/reconciliation_service.rs index 0e134900..c4de3101 100644 --- a/paddler/src/agent/reconciliation_service.rs +++ b/paddler/src/agent/reconciliation_service.rs @@ -16,6 +16,51 @@ use crate::agent_issue_fix::AgentIssueFix; use crate::converts_to_applicable_state::ConvertsToApplicableState as _; use crate::slot_aggregated_status::SlotAggregatedStatus; +async fn convert_to_applicable_state( + agent_desired_state: Option<&AgentDesiredState>, + slot_aggregated_status: &Arc, + agent_applicable_state_holder: &AgentApplicableStateHolder, + is_converted_to_applicable_state: &mut bool, +) -> Result<()> { + let applicable_state = match agent_desired_state { + None => None, + Some(agent_desired_state) => Some( + agent_desired_state + .to_applicable_state(slot_aggregated_status.clone()) + .await?, + ), + }; + + slot_aggregated_status.set_uses_chat_template_override( + applicable_state + .as_ref() + .is_some_and(|applicable_state| applicable_state.chat_template_override.is_some()), + ); + slot_aggregated_status.register_fix(&AgentIssueFix::ModelStateIsReconciled); + agent_applicable_state_holder.set_agent_applicable_state(applicable_state)?; + *is_converted_to_applicable_state = true; + + Ok(()) +} + +async fn try_convert_to_applicable_state( + agent_desired_state: Option<&AgentDesiredState>, + slot_aggregated_status: &Arc, + agent_applicable_state_holder: &AgentApplicableStateHolder, + is_converted_to_applicable_state: &mut bool, +) { + if let Err(err) = convert_to_applicable_state( + agent_desired_state, + slot_aggregated_status, + agent_applicable_state_holder, + is_converted_to_applicable_state, + ) + .await + { + error!("Failed to convert to applicable state: {err}"); + } +} + pub struct ReconciliationService { pub agent_applicable_state_holder: Arc, pub agent_desired_state: Option, @@ -24,43 +69,21 @@ pub struct ReconciliationService { pub slot_aggregated_status: Arc, } -impl ReconciliationService { - pub async fn convert_to_applicable_state(&mut self) -> Result<()> { - let applicable_state = match &self.agent_desired_state { - None => None, - Some(agent_desired_state) => Some( - agent_desired_state - .to_applicable_state(self.slot_aggregated_status.clone()) - .await?, - ), - }; - - self.is_converted_to_applicable_state = true; - self.slot_aggregated_status.set_uses_chat_template_override( - applicable_state - .as_ref() - .is_some_and(|applicable_state| applicable_state.chat_template_override.is_some()), - ); - self.slot_aggregated_status - .register_fix(&AgentIssueFix::ModelStateIsReconciled); - self.agent_applicable_state_holder - .set_agent_applicable_state(applicable_state) - } - - pub async fn try_convert_to_applicable_state(&mut self) { - if let Err(err) = self.convert_to_applicable_state().await { - error!("Failed to convert to applicable state: {err}"); - } - } -} - #[async_trait] impl Service for ReconciliationService { fn name(&self) -> &'static str { "agent::reconciliation_service" } - async fn run(&mut self, shutdown: CancellationToken) -> Result<()> { + async fn run(self: Box, shutdown: CancellationToken) -> Result<()> { + let Self { + agent_applicable_state_holder, + mut agent_desired_state, + mut agent_desired_state_rx, + mut is_converted_to_applicable_state, + slot_aggregated_status, + } = *self; + let mut ticker = interval(Duration::from_secs(1)); ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -69,20 +92,54 @@ impl Service for ReconciliationService { tokio::select! { () = shutdown.cancelled() => break Ok(()), _ = ticker.tick() => { - if !self.is_converted_to_applicable_state { - self.try_convert_to_applicable_state().await; + if !is_converted_to_applicable_state { + try_convert_to_applicable_state( + agent_desired_state.as_ref(), + &slot_aggregated_status, + &agent_applicable_state_holder, + &mut is_converted_to_applicable_state, + ).await; } }, - next_agent_desired_state = self.agent_desired_state_rx.recv() => { - self.is_converted_to_applicable_state = false; - self.agent_desired_state = if let Some(agent_desired_state) = next_agent_desired_state { Some(agent_desired_state) } else { + next_agent_desired_state = agent_desired_state_rx.recv() => { + is_converted_to_applicable_state = false; + agent_desired_state = if let Some(next) = next_agent_desired_state { + Some(next) + } else { error!("Agent desired state channel closed, stopping reconciliation service."); - break Ok(()) }; - self.try_convert_to_applicable_state().await; + try_convert_to_applicable_state( + agent_desired_state.as_ref(), + &slot_aggregated_status, + &agent_applicable_state_holder, + &mut is_converted_to_applicable_state, + ).await; } } } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn flag_stays_false_when_set_agent_applicable_state_fails() { + let holder = AgentApplicableStateHolder::default(); + let slot_aggregated_status = Arc::new(SlotAggregatedStatus::new(1)); + let mut is_converted_to_applicable_state = false; + + let result = convert_to_applicable_state( + None, + &slot_aggregated_status, + &holder, + &mut is_converted_to_applicable_state, + ) + .await; + + assert!(result.is_err()); + assert!(!is_converted_to_applicable_state); + } +} diff --git a/paddler/src/balancer/compatibility/openai_service/mod.rs b/paddler/src/balancer/compatibility/openai_service/mod.rs index b39d0041..55b8fd9f 100644 --- a/paddler/src/balancer/compatibility/openai_service/mod.rs +++ b/paddler/src/balancer/compatibility/openai_service/mod.rs @@ -33,7 +33,7 @@ impl Service for OpenAIService { "balancer::compatibility::openai_service" } - async fn run(&mut self, shutdown: CancellationToken) -> Result<()> { + async fn run(self: Box, shutdown: CancellationToken) -> Result<()> { let cors_allowed_hosts = self .inference_service_configuration .cors_allowed_hosts diff --git a/paddler/src/balancer/inference_service/mod.rs b/paddler/src/balancer/inference_service/mod.rs index e22b535a..3deaccf4 100644 --- a/paddler/src/balancer/inference_service/mod.rs +++ b/paddler/src/balancer/inference_service/mod.rs @@ -39,7 +39,7 @@ impl Service for InferenceService { "balancer::inference_service" } - async fn run(&mut self, shutdown: CancellationToken) -> Result<()> { + async fn run(self: Box, shutdown: CancellationToken) -> Result<()> { #[cfg_attr(not(feature = "web_admin_panel"), expect(unused_mut))] let mut cors_allowed_hosts = self.configuration.cors_allowed_hosts.clone(); diff --git a/paddler/src/balancer/management_service/mod.rs b/paddler/src/balancer/management_service/mod.rs index 72131e0f..211211af 100644 --- a/paddler/src/balancer/management_service/mod.rs +++ b/paddler/src/balancer/management_service/mod.rs @@ -50,7 +50,7 @@ impl Service for ManagementService { "balancer::management_service" } - async fn run(&mut self, shutdown: CancellationToken) -> Result<()> { + async fn run(self: Box, shutdown: CancellationToken) -> Result<()> { #[cfg_attr(not(feature = "web_admin_panel"), expect(unused_mut))] let mut cors_allowed_hosts = self.configuration.cors_allowed_hosts.clone(); diff --git a/paddler/src/balancer/reconciliation_service.rs b/paddler/src/balancer/reconciliation_service.rs index 928ab0f7..21aec788 100644 --- a/paddler/src/balancer/reconciliation_service.rs +++ b/paddler/src/balancer/reconciliation_service.rs @@ -16,34 +16,49 @@ use crate::balancer_applicable_state_holder::BalancerApplicableStateHolder; use crate::converts_to_applicable_state::ConvertsToApplicableState as _; use crate::sets_desired_state::SetsDesiredState as _; -pub struct ReconciliationService { - pub agent_controller_pool: Arc, - pub balancer_applicable_state_holder: Arc, - pub balancer_desired_state: BalancerDesiredState, - pub balancer_desired_state_rx: broadcast::Receiver, - pub is_converted_to_applicable_state: bool, -} +async fn convert_to_applicable_state( + balancer_desired_state: &BalancerDesiredState, + agent_controller_pool: &AgentControllerPool, + balancer_applicable_state_holder: &BalancerApplicableStateHolder, + is_converted_to_applicable_state: &mut bool, +) -> Result<()> { + let balancer_applicable_state = balancer_desired_state.to_applicable_state(()).await?; -impl ReconciliationService { - pub async fn convert_to_applicable_state(&mut self) -> Result<()> { - let balancer_applicable_state = self.balancer_desired_state.to_applicable_state(()).await?; + agent_controller_pool + .set_desired_state(balancer_applicable_state.agent_desired_state.clone()) + .await?; + balancer_applicable_state_holder + .set_balancer_applicable_state(Some(balancer_applicable_state)); - self.agent_controller_pool - .set_desired_state(balancer_applicable_state.agent_desired_state.clone()) - .await?; - self.balancer_applicable_state_holder - .set_balancer_applicable_state(Some(balancer_applicable_state)); + *is_converted_to_applicable_state = true; - self.is_converted_to_applicable_state = true; + Ok(()) +} - Ok(()) +async fn try_convert_to_applicable_state( + balancer_desired_state: &BalancerDesiredState, + agent_controller_pool: &AgentControllerPool, + balancer_applicable_state_holder: &BalancerApplicableStateHolder, + is_converted_to_applicable_state: &mut bool, +) { + if let Err(err) = convert_to_applicable_state( + balancer_desired_state, + agent_controller_pool, + balancer_applicable_state_holder, + is_converted_to_applicable_state, + ) + .await + { + error!("Failed to convert to applicable state: {err}"); } +} - pub async fn try_convert_to_applicable_state(&mut self) { - if let Err(err) = self.convert_to_applicable_state().await { - error!("Failed to convert to applicable state: {err}"); - } - } +pub struct ReconciliationService { + pub agent_controller_pool: Arc, + pub balancer_applicable_state_holder: Arc, + pub balancer_desired_state: BalancerDesiredState, + pub balancer_desired_state_rx: broadcast::Receiver, + pub is_converted_to_applicable_state: bool, } #[async_trait] @@ -52,7 +67,15 @@ impl Service for ReconciliationService { "balancer::reconciliation_service" } - async fn run(&mut self, shutdown: CancellationToken) -> Result<()> { + async fn run(self: Box, shutdown: CancellationToken) -> Result<()> { + let Self { + agent_controller_pool, + balancer_applicable_state_holder, + mut balancer_desired_state, + mut balancer_desired_state_rx, + mut is_converted_to_applicable_state, + } = *self; + let mut ticker = interval(Duration::from_secs(1)); ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -61,14 +84,24 @@ impl Service for ReconciliationService { tokio::select! { () = shutdown.cancelled() => break Ok(()), _ = ticker.tick() => { - if !self.is_converted_to_applicable_state { - self.try_convert_to_applicable_state().await; + if !is_converted_to_applicable_state { + try_convert_to_applicable_state( + &balancer_desired_state, + &agent_controller_pool, + &balancer_applicable_state_holder, + &mut is_converted_to_applicable_state, + ).await; } }, - balancer_desired_state = self.balancer_desired_state_rx.recv() => { - self.is_converted_to_applicable_state = false; - self.balancer_desired_state = balancer_desired_state?; - self.try_convert_to_applicable_state().await; + received_balancer_desired_state = balancer_desired_state_rx.recv() => { + is_converted_to_applicable_state = false; + balancer_desired_state = received_balancer_desired_state?; + try_convert_to_applicable_state( + &balancer_desired_state, + &agent_controller_pool, + &balancer_applicable_state_holder, + &mut is_converted_to_applicable_state, + ).await; } } } diff --git a/paddler/src/balancer/statsd_service/mod.rs b/paddler/src/balancer/statsd_service/mod.rs index 509d734f..f0e3715d 100644 --- a/paddler/src/balancer/statsd_service/mod.rs +++ b/paddler/src/balancer/statsd_service/mod.rs @@ -49,7 +49,7 @@ impl Service for StatsdService { "balancer::statsd_service" } - async fn run(&mut self, shutdown: CancellationToken) -> Result<()> { + async fn run(self: Box, shutdown: CancellationToken) -> Result<()> { let statsd_sink_socket = UdpSocket::bind("0.0.0.0:0")?; let statsd_sink = UdpMetricSink::from(self.configuration.statsd_addr, statsd_sink_socket)?; diff --git a/paddler/src/balancer/web_admin_panel_service/mod.rs b/paddler/src/balancer/web_admin_panel_service/mod.rs index fff4829d..0701e072 100644 --- a/paddler/src/balancer/web_admin_panel_service/mod.rs +++ b/paddler/src/balancer/web_admin_panel_service/mod.rs @@ -26,7 +26,7 @@ impl Service for WebAdminPanelService { "balancer::web_admin_panel_service" } - async fn run(&mut self, shutdown: CancellationToken) -> Result<()> { + async fn run(self: Box, shutdown: CancellationToken) -> Result<()> { let app_data: Data = Data::new(AppData { template_data: self.configuration.template_data.clone(), });