From 81f2df3af3238f1e33df46b8eaf0095b83e2871c Mon Sep 17 00:00:00 2001 From: Mateusz Charytoniuk Date: Fri, 22 May 2026 18:42:02 +0200 Subject: [PATCH] change Service::run to take self: Box --- Cargo.lock | 2 +- README.md | 2 +- trzcina/Cargo.toml | 2 +- trzcina/src/running_service_collection.rs | 6 +-- trzcina/src/service.rs | 2 +- ..._via_shared_holder_between_two_services.rs | 49 ++++++++++--------- ...le_runs_all_services_returned_by_bundle.rs | 27 +++++----- ...egister_service_runs_registered_service.rs | 12 ++--- ...ts_hung_service_after_shutdown_deadline.rs | 2 +- ...aborts_hung_services_on_external_cancel.rs | 2 +- ...ative_and_abort_deadlines_independently.rs | 2 +- ..._services_when_external_token_cancelled.rs | 12 ++--- ...iblings_when_one_service_finishes_first.rs | 35 ++++++------- ...when_all_services_finish_simultaneously.rs | 2 +- ...l_failures_when_multiple_services_error.rs | 35 ++++++------- .../run_records_non_string_panic_payload.rs | 2 +- ...ords_service_error_and_cancels_siblings.rs | 35 ++++++------- ...ords_service_panic_and_cancels_siblings.rs | 35 ++++++------- ...un_records_string_literal_panic_payload.rs | 2 +- .../tests/run_records_string_panic_payload.rs | 2 +- ...own_deadline_when_service_ignores_abort.rs | 2 +- ...rts_actix_style_shutdown_signal_pattern.rs | 12 ++--- .../supports_internal_retry_loop_pattern.rs | 2 +- ..._interval_ticker_reconciliation_pattern.rs | 2 +- ...ports_multi_channel_select_pump_pattern.rs | 2 +- ...utable_internal_state_across_iterations.rs | 2 +- ...pports_notify_driven_event_loop_pattern.rs | 2 +- 27 files changed, 134 insertions(+), 158 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6ff75e9..6fc92b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -269,7 +269,7 @@ dependencies = [ [[package]] name = "trzcina" -version = "0.2.1" +version = "0.3.0" dependencies = [ "anyhow", "async-trait", diff --git a/README.md b/README.md index 72f45ae..7f9ef40 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ struct EchoService; #[async_trait] impl Service for EchoService { - async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + async fn run(self: Box, cancellation_token: CancellationToken) -> Result<()> { cancellation_token.cancelled().await; Ok(()) } diff --git a/trzcina/Cargo.toml b/trzcina/Cargo.toml index 0c609a4..f5e6078 100644 --- a/trzcina/Cargo.toml +++ b/trzcina/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "trzcina" -version = "0.2.1" +version = "0.3.0" edition = "2024" license = "Apache-2.0" description = "Async service lifecycle orchestration with cooperative cancellation and shutdown deadlines." diff --git a/trzcina/src/running_service_collection.rs b/trzcina/src/running_service_collection.rs index ff0c754..5f07a35 100644 --- a/trzcina/src/running_service_collection.rs +++ b/trzcina/src/running_service_collection.rs @@ -31,16 +31,16 @@ fn panic_payload_to_string(panic_payload: Box) -> String { async fn run_service_with_sibling_cancellation_on_return( service_name: &'static str, - mut service: Box, + service: Box, cancellation_token: CancellationToken, ) -> ServiceShutdownOutcome { let _sibling_cancellation_guard = SiblingCancellationGuard::new(cancellation_token.clone()); - classify_service_outcome(service_name, &mut service, cancellation_token).await + classify_service_outcome(service_name, service, cancellation_token).await } async fn classify_service_outcome( service_name: &'static str, - service: &mut Box, + service: Box, cancellation_token: CancellationToken, ) -> ServiceShutdownOutcome { info!("Service {service_name:?} starting"); diff --git a/trzcina/src/service.rs b/trzcina/src/service.rs index 24af5be..be7d69a 100644 --- a/trzcina/src/service.rs +++ b/trzcina/src/service.rs @@ -7,5 +7,5 @@ pub trait Service: Send + 'static { fn name(&self) -> &'static str { std::any::type_name::() } - async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()>; + async fn run(self: Box, cancellation_token: CancellationToken) -> Result<()>; } diff --git a/trzcina/tests/coordinates_via_shared_holder_between_two_services.rs b/trzcina/tests/coordinates_via_shared_holder_between_two_services.rs index 9789ac1..934a155 100644 --- a/trzcina/tests/coordinates_via_shared_holder_between_two_services.rs +++ b/trzcina/tests/coordinates_via_shared_holder_between_two_services.rs @@ -15,33 +15,39 @@ use trzcina::ServiceShutdownOutcome; const PRODUCED_VALUE: u32 = 42; -struct CoordinatingService { - is_producer: bool, +struct ProducerService { notify: Arc, - observation_tx: Option>, shared_state: Arc>>, } #[async_trait] -impl Service for CoordinatingService { - async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { - if self.is_producer { - { - let mut guard = self.shared_state.lock().unwrap(); - *guard = Some(PRODUCED_VALUE); - } - self.notify.notify_one(); - cancellation_token.cancelled().await; - return Ok(()); +impl Service for ProducerService { + async fn run(self: Box, cancellation_token: CancellationToken) -> Result<()> { + { + let mut guard = self.shared_state.lock().unwrap(); + *guard = Some(PRODUCED_VALUE); } + self.notify.notify_one(); + cancellation_token.cancelled().await; + Ok(()) + } +} + +struct ConsumerService { + notify: Arc, + observation_tx: oneshot::Sender, + shared_state: Arc>>, +} + +#[async_trait] +impl Service for ConsumerService { + async fn run(self: Box, cancellation_token: CancellationToken) -> Result<()> { tokio::select! { () = cancellation_token.cancelled() => return Ok(()), () = self.notify.notified() => { let observed_value = *self.shared_state.lock().unwrap(); - if let Some(value) = observed_value - && let Some(observation_tx) = self.observation_tx.take() - { - observation_tx.send(value).unwrap(); + if let Some(value) = observed_value { + self.observation_tx.send(value).unwrap(); } } } @@ -59,16 +65,13 @@ async fn coordinates_via_shared_holder_between_two_services() { let cancellation_token_for_run = cancellation_token.clone(); let mut manager = ServiceManager::default(); - manager.register_service(CoordinatingService { - is_producer: true, + manager.register_service(ProducerService { notify: notify.clone(), - observation_tx: None, shared_state: shared_state.clone(), }); - manager.register_service(CoordinatingService { - is_producer: false, + manager.register_service(ConsumerService { notify: notify.clone(), - observation_tx: Some(observation_tx), + observation_tx, shared_state: shared_state.clone(), }); diff --git a/trzcina/tests/register_bundle_runs_all_services_returned_by_bundle.rs b/trzcina/tests/register_bundle_runs_all_services_returned_by_bundle.rs index 047f3a8..77e8a8e 100644 --- a/trzcina/tests/register_bundle_runs_all_services_returned_by_bundle.rs +++ b/trzcina/tests/register_bundle_runs_all_services_returned_by_bundle.rs @@ -10,32 +10,30 @@ use trzcina::ServiceBundle; use trzcina::ServiceManager; use trzcina::ServiceShutdownOptions; -struct BundleAndService { - observation_tx: Option>, - sibling_senders: Vec>, +struct ObservableService { + observation_tx: oneshot::Sender<()>, } #[async_trait] -impl Service for BundleAndService { - async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { - if let Some(observation_tx) = self.observation_tx.take() { - observation_tx.send(()).unwrap(); - } +impl Service for ObservableService { + async fn run(self: Box, _cancellation_token: CancellationToken) -> Result<()> { + self.observation_tx.send(()).unwrap(); Ok(()) } } +struct SiblingsBundle { + sibling_senders: Vec>, +} + #[async_trait] -impl ServiceBundle for BundleAndService { +impl ServiceBundle for SiblingsBundle { async fn services(self) -> Result>> { let services: Vec> = self .sibling_senders .into_iter() .map(|observation_tx| { - Box::new(BundleAndService { - observation_tx: Some(observation_tx), - sibling_senders: Vec::new(), - }) as Box + Box::new(ObservableService { observation_tx }) as Box }) .collect(); Ok(services) @@ -47,8 +45,7 @@ async fn runs_all_services_returned_by_bundle() { let (first_tx, mut first_rx) = oneshot::channel::<()>(); let (second_tx, mut second_rx) = oneshot::channel::<()>(); - let bundle = BundleAndService { - observation_tx: None, + let bundle = SiblingsBundle { sibling_senders: vec![first_tx, second_tx], }; diff --git a/trzcina/tests/register_service_runs_registered_service.rs b/trzcina/tests/register_service_runs_registered_service.rs index ac7bd13..49a3189 100644 --- a/trzcina/tests/register_service_runs_registered_service.rs +++ b/trzcina/tests/register_service_runs_registered_service.rs @@ -10,15 +10,13 @@ use trzcina::ServiceManager; use trzcina::ServiceShutdownOptions; struct ObservableService { - observation_tx: Option>, + observation_tx: oneshot::Sender<()>, } #[async_trait] impl Service for ObservableService { - async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { - if let Some(observation_tx) = self.observation_tx.take() { - observation_tx.send(()).unwrap(); - } + async fn run(self: Box, _cancellation_token: CancellationToken) -> Result<()> { + self.observation_tx.send(()).unwrap(); Ok(()) } } @@ -28,9 +26,7 @@ async fn runs_registered_service() { let (observation_tx, mut observation_rx) = oneshot::channel::<()>(); let mut manager = ServiceManager::default(); - manager.register_service(ObservableService { - observation_tx: Some(observation_tx), - }); + manager.register_service(ObservableService { observation_tx }); timeout( Duration::from_secs(5), diff --git a/trzcina/tests/run_aborts_hung_service_after_shutdown_deadline.rs b/trzcina/tests/run_aborts_hung_service_after_shutdown_deadline.rs index bd3d412..d80ab04 100644 --- a/trzcina/tests/run_aborts_hung_service_after_shutdown_deadline.rs +++ b/trzcina/tests/run_aborts_hung_service_after_shutdown_deadline.rs @@ -16,7 +16,7 @@ struct ConfiguredService { #[async_trait] impl Service for ConfiguredService { - async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + async fn run(self: Box, _cancellation_token: CancellationToken) -> Result<()> { if self.hang_ignoring_cancellation { loop { yield_now().await; diff --git a/trzcina/tests/run_aborts_hung_services_on_external_cancel.rs b/trzcina/tests/run_aborts_hung_services_on_external_cancel.rs index b99259a..88556a5 100644 --- a/trzcina/tests/run_aborts_hung_services_on_external_cancel.rs +++ b/trzcina/tests/run_aborts_hung_services_on_external_cancel.rs @@ -14,7 +14,7 @@ struct CancellationIgnoringService; #[async_trait] impl Service for CancellationIgnoringService { - async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + async fn run(self: Box, _cancellation_token: CancellationToken) -> Result<()> { loop { yield_now().await; } diff --git a/trzcina/tests/run_applies_cooperative_and_abort_deadlines_independently.rs b/trzcina/tests/run_applies_cooperative_and_abort_deadlines_independently.rs index 6166592..492b5a9 100644 --- a/trzcina/tests/run_applies_cooperative_and_abort_deadlines_independently.rs +++ b/trzcina/tests/run_applies_cooperative_and_abort_deadlines_independently.rs @@ -15,7 +15,7 @@ struct ThreadBlockingService { #[async_trait] impl Service for ThreadBlockingService { - async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + async fn run(self: Box, _cancellation_token: CancellationToken) -> Result<()> { std::thread::sleep(self.block_duration); Ok(()) } diff --git a/trzcina/tests/run_cancels_all_services_when_external_token_cancelled.rs b/trzcina/tests/run_cancels_all_services_when_external_token_cancelled.rs index 271d932..417b951 100644 --- a/trzcina/tests/run_cancels_all_services_when_external_token_cancelled.rs +++ b/trzcina/tests/run_cancels_all_services_when_external_token_cancelled.rs @@ -11,16 +11,14 @@ use trzcina::ServiceShutdownOptions; use trzcina::ServiceShutdownOutcome; struct AwaitingService { - observation_tx: Option>, + observation_tx: oneshot::Sender<()>, } #[async_trait] impl Service for AwaitingService { - async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + async fn run(self: Box, cancellation_token: CancellationToken) -> Result<()> { cancellation_token.cancelled().await; - if let Some(observation_tx) = self.observation_tx.take() { - observation_tx.send(()).unwrap(); - } + self.observation_tx.send(()).unwrap(); Ok(()) } } @@ -34,9 +32,7 @@ async fn cancels_all_services_when_external_token_cancelled() { for _ in 0..5 { let (observation_tx, observation_rx) = oneshot::channel::<()>(); - manager.register_service(AwaitingService { - observation_tx: Some(observation_tx), - }); + manager.register_service(AwaitingService { observation_tx }); observation_receivers.push(observation_rx); } diff --git a/trzcina/tests/run_cancels_siblings_when_one_service_finishes_first.rs b/trzcina/tests/run_cancels_siblings_when_one_service_finishes_first.rs index 8b5566d..d79b445 100644 --- a/trzcina/tests/run_cancels_siblings_when_one_service_finishes_first.rs +++ b/trzcina/tests/run_cancels_siblings_when_one_service_finishes_first.rs @@ -10,21 +10,24 @@ use trzcina::ServiceManager; use trzcina::ServiceShutdownOptions; use trzcina::ServiceShutdownOutcome; -struct ConfiguredService { - finish_immediately: bool, - observation_tx: Option>, +struct FinishImmediatelyService; + +#[async_trait] +impl Service for FinishImmediatelyService { + async fn run(self: Box, _cancellation_token: CancellationToken) -> Result<()> { + Ok(()) + } +} + +struct WaitingObserverService { + observation_tx: oneshot::Sender<()>, } #[async_trait] -impl Service for ConfiguredService { - async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { - if self.finish_immediately { - return Ok(()); - } +impl Service for WaitingObserverService { + async fn run(self: Box, cancellation_token: CancellationToken) -> Result<()> { cancellation_token.cancelled().await; - if let Some(observation_tx) = self.observation_tx.take() { - observation_tx.send(()).unwrap(); - } + self.observation_tx.send(()).unwrap(); Ok(()) } } @@ -32,18 +35,12 @@ impl Service for ConfiguredService { #[tokio::test] async fn cancels_siblings_when_one_service_finishes_first() { let mut manager = ServiceManager::default(); - manager.register_service(ConfiguredService { - finish_immediately: true, - observation_tx: None, - }); + manager.register_service(FinishImmediatelyService); let mut sibling_observation_receivers = Vec::new(); for _ in 0..4 { let (observation_tx, observation_rx) = oneshot::channel::<()>(); - manager.register_service(ConfiguredService { - finish_immediately: false, - observation_tx: Some(observation_tx), - }); + manager.register_service(WaitingObserverService { observation_tx }); sibling_observation_receivers.push(observation_rx); } diff --git a/trzcina/tests/run_completes_when_all_services_finish_simultaneously.rs b/trzcina/tests/run_completes_when_all_services_finish_simultaneously.rs index f8835fa..ef88152 100644 --- a/trzcina/tests/run_completes_when_all_services_finish_simultaneously.rs +++ b/trzcina/tests/run_completes_when_all_services_finish_simultaneously.rs @@ -13,7 +13,7 @@ struct InstantOkService; #[async_trait] impl Service for InstantOkService { - async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + async fn run(self: Box, _cancellation_token: CancellationToken) -> Result<()> { Ok(()) } } diff --git a/trzcina/tests/run_records_all_failures_when_multiple_services_error.rs b/trzcina/tests/run_records_all_failures_when_multiple_services_error.rs index fbbca18..963f5b1 100644 --- a/trzcina/tests/run_records_all_failures_when_multiple_services_error.rs +++ b/trzcina/tests/run_records_all_failures_when_multiple_services_error.rs @@ -11,21 +11,24 @@ use trzcina::ServiceManager; use trzcina::ServiceShutdownOptions; use trzcina::ServiceShutdownOutcome; -struct ConfiguredService { - return_err: bool, - observation_tx: Option>, +struct ErroringService; + +#[async_trait] +impl Service for ErroringService { + async fn run(self: Box, _cancellation_token: CancellationToken) -> Result<()> { + Err(anyhow!("erroring service deliberately failed")) + } +} + +struct WaitingObserverService { + observation_tx: oneshot::Sender<()>, } #[async_trait] -impl Service for ConfiguredService { - async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { - if self.return_err { - return Err(anyhow!("erroring service deliberately failed")); - } +impl Service for WaitingObserverService { + async fn run(self: Box, cancellation_token: CancellationToken) -> Result<()> { cancellation_token.cancelled().await; - if let Some(observation_tx) = self.observation_tx.take() { - observation_tx.send(()).unwrap(); - } + self.observation_tx.send(()).unwrap(); Ok(()) } } @@ -34,19 +37,13 @@ impl Service for ConfiguredService { async fn records_all_failures_when_multiple_services_error() { let mut manager = ServiceManager::default(); for _ in 0..3 { - manager.register_service(ConfiguredService { - return_err: true, - observation_tx: None, - }); + manager.register_service(ErroringService); } let mut sibling_observation_receivers = Vec::new(); for _ in 0..2 { let (observation_tx, observation_rx) = oneshot::channel::<()>(); - manager.register_service(ConfiguredService { - return_err: false, - observation_tx: Some(observation_tx), - }); + manager.register_service(WaitingObserverService { observation_tx }); sibling_observation_receivers.push(observation_rx); } diff --git a/trzcina/tests/run_records_non_string_panic_payload.rs b/trzcina/tests/run_records_non_string_panic_payload.rs index 1ef4d48..b878f99 100644 --- a/trzcina/tests/run_records_non_string_panic_payload.rs +++ b/trzcina/tests/run_records_non_string_panic_payload.rs @@ -14,7 +14,7 @@ struct NonStringPanickingService; #[async_trait] impl Service for NonStringPanickingService { - async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + async fn run(self: Box, _cancellation_token: CancellationToken) -> Result<()> { panic_any(42_u32); } } diff --git a/trzcina/tests/run_records_service_error_and_cancels_siblings.rs b/trzcina/tests/run_records_service_error_and_cancels_siblings.rs index f7bc042..410911a 100644 --- a/trzcina/tests/run_records_service_error_and_cancels_siblings.rs +++ b/trzcina/tests/run_records_service_error_and_cancels_siblings.rs @@ -11,21 +11,24 @@ use trzcina::ServiceManager; use trzcina::ServiceShutdownOptions; use trzcina::ServiceShutdownOutcome; -struct ConfiguredService { - return_err: bool, - observation_tx: Option>, +struct ErroringService; + +#[async_trait] +impl Service for ErroringService { + async fn run(self: Box, _cancellation_token: CancellationToken) -> Result<()> { + Err(anyhow!("erroring service deliberately failed")) + } +} + +struct WaitingObserverService { + observation_tx: oneshot::Sender<()>, } #[async_trait] -impl Service for ConfiguredService { - async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { - if self.return_err { - return Err(anyhow!("erroring service deliberately failed")); - } +impl Service for WaitingObserverService { + async fn run(self: Box, cancellation_token: CancellationToken) -> Result<()> { cancellation_token.cancelled().await; - if let Some(observation_tx) = self.observation_tx.take() { - observation_tx.send(()).unwrap(); - } + self.observation_tx.send(()).unwrap(); Ok(()) } } @@ -33,18 +36,12 @@ impl Service for ConfiguredService { #[tokio::test] async fn records_service_error_and_cancels_siblings() { let mut manager = ServiceManager::default(); - manager.register_service(ConfiguredService { - return_err: true, - observation_tx: None, - }); + manager.register_service(ErroringService); let mut sibling_observation_receivers = Vec::new(); for _ in 0..4 { let (observation_tx, observation_rx) = oneshot::channel::<()>(); - manager.register_service(ConfiguredService { - return_err: false, - observation_tx: Some(observation_tx), - }); + manager.register_service(WaitingObserverService { observation_tx }); sibling_observation_receivers.push(observation_rx); } diff --git a/trzcina/tests/run_records_service_panic_and_cancels_siblings.rs b/trzcina/tests/run_records_service_panic_and_cancels_siblings.rs index 2d8bfcb..3e2597c 100644 --- a/trzcina/tests/run_records_service_panic_and_cancels_siblings.rs +++ b/trzcina/tests/run_records_service_panic_and_cancels_siblings.rs @@ -12,21 +12,24 @@ use trzcina::ServiceShutdownOutcome; const PANIC_MARKER: &str = "deliberately panicking for cascade test"; -struct ConfiguredService { - should_panic: bool, - observation_tx: Option>, +struct PanickingService; + +#[async_trait] +impl Service for PanickingService { + async fn run(self: Box, _cancellation_token: CancellationToken) -> Result<()> { + panic!("{}", PANIC_MARKER); + } +} + +struct WaitingObserverService { + observation_tx: oneshot::Sender<()>, } #[async_trait] -impl Service for ConfiguredService { - async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { - if self.should_panic { - panic!("{}", PANIC_MARKER); - } +impl Service for WaitingObserverService { + async fn run(self: Box, cancellation_token: CancellationToken) -> Result<()> { cancellation_token.cancelled().await; - if let Some(observation_tx) = self.observation_tx.take() { - observation_tx.send(()).unwrap(); - } + self.observation_tx.send(()).unwrap(); Ok(()) } } @@ -34,18 +37,12 @@ impl Service for ConfiguredService { #[tokio::test] async fn records_service_panic_and_cancels_siblings() { let mut manager = ServiceManager::default(); - manager.register_service(ConfiguredService { - should_panic: true, - observation_tx: None, - }); + manager.register_service(PanickingService); let mut sibling_observation_receivers = Vec::new(); for _ in 0..4 { let (observation_tx, observation_rx) = oneshot::channel::<()>(); - manager.register_service(ConfiguredService { - should_panic: false, - observation_tx: Some(observation_tx), - }); + manager.register_service(WaitingObserverService { observation_tx }); sibling_observation_receivers.push(observation_rx); } diff --git a/trzcina/tests/run_records_string_literal_panic_payload.rs b/trzcina/tests/run_records_string_literal_panic_payload.rs index 71c4edb..0135e8e 100644 --- a/trzcina/tests/run_records_string_literal_panic_payload.rs +++ b/trzcina/tests/run_records_string_literal_panic_payload.rs @@ -15,7 +15,7 @@ struct LiteralPanickingService; #[async_trait] impl Service for LiteralPanickingService { - async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + async fn run(self: Box, _cancellation_token: CancellationToken) -> Result<()> { panic!("deliberately panicking with a string literal"); } } diff --git a/trzcina/tests/run_records_string_panic_payload.rs b/trzcina/tests/run_records_string_panic_payload.rs index 119287e..05d8f35 100644 --- a/trzcina/tests/run_records_string_panic_payload.rs +++ b/trzcina/tests/run_records_string_panic_payload.rs @@ -15,7 +15,7 @@ struct StringPanickingService { #[async_trait] impl Service for StringPanickingService { - async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + async fn run(self: Box, _cancellation_token: CancellationToken) -> Result<()> { panic!("dynamic message: {}", self.panic_payload); } } diff --git a/trzcina/tests/run_reports_leaked_beyond_shutdown_deadline_when_service_ignores_abort.rs b/trzcina/tests/run_reports_leaked_beyond_shutdown_deadline_when_service_ignores_abort.rs index 022a7c4..d83f473 100644 --- a/trzcina/tests/run_reports_leaked_beyond_shutdown_deadline_when_service_ignores_abort.rs +++ b/trzcina/tests/run_reports_leaked_beyond_shutdown_deadline_when_service_ignores_abort.rs @@ -15,7 +15,7 @@ struct ThreadBlockingService { #[async_trait] impl Service for ThreadBlockingService { - async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + async fn run(self: Box, _cancellation_token: CancellationToken) -> Result<()> { std::thread::sleep(self.block_duration); Ok(()) } diff --git a/trzcina/tests/supports_actix_style_shutdown_signal_pattern.rs b/trzcina/tests/supports_actix_style_shutdown_signal_pattern.rs index 4c74adf..cec67ed 100644 --- a/trzcina/tests/supports_actix_style_shutdown_signal_pattern.rs +++ b/trzcina/tests/supports_actix_style_shutdown_signal_pattern.rs @@ -11,15 +11,13 @@ use trzcina::ServiceShutdownOptions; use trzcina::ServiceShutdownOutcome; struct ActixStyleService { - started_tx: Option>, + started_tx: oneshot::Sender<()>, } #[async_trait] impl Service for ActixStyleService { - async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { - if let Some(started_tx) = self.started_tx.take() { - started_tx.send(()).unwrap(); - } + async fn run(self: Box, cancellation_token: CancellationToken) -> Result<()> { + self.started_tx.send(()).unwrap(); loop { if cancellation_token.is_cancelled() { break; @@ -37,9 +35,7 @@ async fn supports_actix_style_shutdown_signal_pattern() { let (started_tx, started_rx) = oneshot::channel::<()>(); let mut manager = ServiceManager::default(); - manager.register_service(ActixStyleService { - started_tx: Some(started_tx), - }); + manager.register_service(ActixStyleService { started_tx }); let run_task = tokio::spawn(async move { manager diff --git a/trzcina/tests/supports_internal_retry_loop_pattern.rs b/trzcina/tests/supports_internal_retry_loop_pattern.rs index fe8458a..370afe7 100644 --- a/trzcina/tests/supports_internal_retry_loop_pattern.rs +++ b/trzcina/tests/supports_internal_retry_loop_pattern.rs @@ -17,7 +17,7 @@ struct RetryLoopService { #[async_trait] impl Service for RetryLoopService { - async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + async fn run(mut self: Box, cancellation_token: CancellationToken) -> Result<()> { loop { if let Some(backoff_started_tx) = self.backoff_started_tx.take() { backoff_started_tx.send(()).unwrap(); diff --git a/trzcina/tests/supports_interval_ticker_reconciliation_pattern.rs b/trzcina/tests/supports_interval_ticker_reconciliation_pattern.rs index 9661ae1..aeaf220 100644 --- a/trzcina/tests/supports_interval_ticker_reconciliation_pattern.rs +++ b/trzcina/tests/supports_interval_ticker_reconciliation_pattern.rs @@ -21,7 +21,7 @@ struct ReconciliationService { #[async_trait] impl Service for ReconciliationService { - async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + async fn run(mut self: Box, cancellation_token: CancellationToken) -> Result<()> { let mut ticker = interval(Duration::from_millis(10)); loop { tokio::select! { diff --git a/trzcina/tests/supports_multi_channel_select_pump_pattern.rs b/trzcina/tests/supports_multi_channel_select_pump_pattern.rs index a29983e..bfe7cb7 100644 --- a/trzcina/tests/supports_multi_channel_select_pump_pattern.rs +++ b/trzcina/tests/supports_multi_channel_select_pump_pattern.rs @@ -20,7 +20,7 @@ struct MultiChannelPumpService { #[async_trait] impl Service for MultiChannelPumpService { - async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + async fn run(mut self: Box, cancellation_token: CancellationToken) -> Result<()> { loop { tokio::select! { () = cancellation_token.cancelled() => return Ok(()), diff --git a/trzcina/tests/supports_mutable_internal_state_across_iterations.rs b/trzcina/tests/supports_mutable_internal_state_across_iterations.rs index a4d784b..bcf2768 100644 --- a/trzcina/tests/supports_mutable_internal_state_across_iterations.rs +++ b/trzcina/tests/supports_mutable_internal_state_across_iterations.rs @@ -21,7 +21,7 @@ struct StatefulService { #[async_trait] impl Service for StatefulService { - async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + async fn run(mut self: Box, cancellation_token: CancellationToken) -> Result<()> { loop { self.iteration_count += 1; if let Some(work_observer) = self.work_observers.pop_front() { diff --git a/trzcina/tests/supports_notify_driven_event_loop_pattern.rs b/trzcina/tests/supports_notify_driven_event_loop_pattern.rs index a8a0e88..02403f3 100644 --- a/trzcina/tests/supports_notify_driven_event_loop_pattern.rs +++ b/trzcina/tests/supports_notify_driven_event_loop_pattern.rs @@ -20,7 +20,7 @@ struct NotifyDrivenService { #[async_trait] impl Service for NotifyDrivenService { - async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + async fn run(mut self: Box, cancellation_token: CancellationToken) -> Result<()> { loop { if let Some(work_observer) = self.work_observers.pop_front() { work_observer.send(()).unwrap();