diff --git a/Cargo.lock b/Cargo.lock index 093cfc1..9a38ed6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -269,10 +269,48 @@ dependencies = [ [[package]] name = "trzcina" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "async-trait", + "tokio", + "tokio-util", + "trzcina-local-service", + "trzcina-sendable-service", + "trzcina-service", +] + +[[package]] +name = "trzcina-local-service" +version = "0.2.0" +dependencies = [ + "anyhow", + "async-trait", + "futures-util", + "log", + "tokio", + "tokio-util", + "trzcina-service", +] + +[[package]] +name = "trzcina-sendable-service" +version = "0.2.0" +dependencies = [ + "anyhow", + "async-trait", + "futures-util", + "log", + "tokio", + "tokio-util", + "trzcina-service", +] + +[[package]] +name = "trzcina-service" +version = "0.2.0" +dependencies = [ + "anyhow", "futures-util", "log", "tokio", diff --git a/Cargo.toml b/Cargo.toml index e5516eb..2336a77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,11 @@ [workspace] resolver = "3" -members = ["trzcina"] +members = [ + "trzcina", + "trzcina-local-service", + "trzcina-sendable-service", + "trzcina-service", +] [workspace.dependencies] anyhow = "1.0" diff --git a/Makefile b/Makefile index 29bfcd6..bc6d8fb 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -COVERAGE_PACKAGES := -p trzcina +COVERAGE_PACKAGES := --workspace RUST_LOG ?= debug # ----------------------------------------------------------------------------- @@ -34,7 +34,9 @@ coverage: node_modules cargo llvm-cov report npx @intentee/rust-coverage-check target/llvm-cov.json \ --workspace-root $(CURDIR) \ - --gated trzcina=97 + --gated trzcina-local-service=100 \ + --gated trzcina-sendable-service=100 \ + --gated trzcina-service=100 .PHONY: coverage-clean coverage-clean: diff --git a/README.md b/README.md index a424374..2d658a9 100644 --- a/README.md +++ b/README.md @@ -2,15 +2,19 @@ Async service lifecycle orchestration for Rust. Run a set of long-lived async services concurrently, cancel siblings when one finishes, surface errors and panics through a typed outcome collection, and enforce an absolute shutdown deadline. +Cancellation is cooperative: every service must `.await` on the `CancellationToken` passed to `run` (typically inside a `tokio::select!`) and return when it fires. Services that ignore the token are aborted when the shutdown deadline expires. + +If your service spawns child tasks via `tokio::spawn`, clone the cancellation token into them — trzcina only bounds the service's own task, not detached children. (Inside a `LocalService`, `tokio::task::spawn_local` is scoped to trzcina's `LocalSet` and is cancelled automatically; `tokio::spawn` still escapes.) + ## Usage -```rust -use std::time::Duration; +A service that simply waits for shutdown: +```rust use anyhow::Result; use async_trait::async_trait; use tokio_util::sync::CancellationToken; -use trzcina::{Service, ServiceManager}; +use trzcina::{Manager, RunToCompletionOptions, RunningCollection, Service, ServiceManager}; struct EchoService; @@ -27,11 +31,40 @@ async fn main() -> Result<()> { let mut service_manager = ServiceManager::default(); service_manager.register_service(EchoService); - let running = service_manager.start(CancellationToken::new()); - running - .run_to_completion(Duration::from_secs(10)) + service_manager + .start(CancellationToken::new()) + .run_to_completion(RunToCompletionOptions::default()) .await - .into_result()?; - Ok(()) + .into_result() + .map_err(Into::into) +} +``` + +A service that does periodic work and yields to cancellation on every tick: + +```rust +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio_util::sync::CancellationToken; +use trzcina::Service; + +struct TickerService; + +#[async_trait] +impl Service for TickerService { + async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + let mut ticker = tokio::time::interval(Duration::from_secs(1)); + loop { + tokio::select! { + biased; + () = cancellation_token.cancelled() => return Ok(()), + _ = ticker.tick() => { + // do periodic work here + } + } + } + } } ``` diff --git a/trzcina-local-service/Cargo.toml b/trzcina-local-service/Cargo.toml new file mode 100644 index 0000000..9adfb3a --- /dev/null +++ b/trzcina-local-service/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "trzcina-local-service" +version = "0.2.0" +edition = "2024" +license = "Apache-2.0" +description = "Local (!Send) services for trzcina: cooperative async lifecycle on a tokio LocalSet." +repository = "https://github.com/intentee/trzcina" +homepage = "https://github.com/intentee/trzcina" +readme = "../README.md" +authors = ["Intentee"] +keywords = ["async", "service", "lifecycle", "cancellation", "tokio"] +categories = ["asynchronous", "concurrency"] + +[dependencies] +anyhow = { workspace = true } +async-trait = { workspace = true } +futures-util = { workspace = true } +log = { workspace = true } +tokio = { workspace = true } +tokio-util = { workspace = true } +trzcina-service = { version = "0.2.0", path = "../trzcina-service" } + +[lints] +workspace = true diff --git a/trzcina-local-service/src/lib.rs b/trzcina-local-service/src/lib.rs new file mode 100644 index 0000000..73219d7 --- /dev/null +++ b/trzcina-local-service/src/lib.rs @@ -0,0 +1,11 @@ +mod local_registered_service; +mod local_running_service_collection; +mod local_service; +mod local_service_bundle; +mod local_service_manager; + +pub use crate::local_registered_service::LocalRegisteredService; +pub use crate::local_running_service_collection::LocalRunningServiceCollection; +pub use crate::local_service::LocalService; +pub use crate::local_service_bundle::LocalServiceBundle; +pub use crate::local_service_manager::LocalServiceManager; diff --git a/trzcina-local-service/src/local_registered_service.rs b/trzcina-local-service/src/local_registered_service.rs new file mode 100644 index 0000000..e62691a --- /dev/null +++ b/trzcina-local-service/src/local_registered_service.rs @@ -0,0 +1,6 @@ +use crate::local_service::LocalService; + +pub struct LocalRegisteredService { + pub name: &'static str, + pub service: Box, +} diff --git a/trzcina-local-service/src/local_running_service_collection.rs b/trzcina-local-service/src/local_running_service_collection.rs new file mode 100644 index 0000000..899219d --- /dev/null +++ b/trzcina-local-service/src/local_running_service_collection.rs @@ -0,0 +1,95 @@ +use tokio::sync::oneshot; +use tokio::task::JoinSet; +use tokio::task::LocalSet; +use tokio_util::sync::CancellationToken; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::RunningService; +use trzcina_service::ServiceShutdownOutcome; +use trzcina_service::ServiceShutdownOutcomeCollection; +use trzcina_service::ServiceShutdownOutcomeWithServiceName; +use trzcina_service::SiblingCancellationGuard; +use trzcina_service::classify_future_outcome; +use trzcina_service::drain_to_completion; + +use crate::local_registered_service::LocalRegisteredService; + +pub struct LocalRunningServiceCollection { + cancellation_token: CancellationToken, + local_set: LocalSet, + running_services: Vec, + task_set: JoinSet<()>, +} + +impl LocalRunningServiceCollection { + #[must_use] + pub fn start( + registered: Vec, + cancellation_token: CancellationToken, + ) -> Self { + let mut running_services: Vec = Vec::with_capacity(registered.len()); + let mut task_set: JoinSet<()> = JoinSet::new(); + let local_set = LocalSet::new(); + let internal_cancellation_token = cancellation_token.child_token(); + + for LocalRegisteredService { name, service } in registered { + let (outcome_sender, outcome_receiver) = oneshot::channel::(); + let service_cancellation_token = internal_cancellation_token.clone(); + + task_set.spawn_local_on( + async move { + let _sibling_cancellation_guard = + SiblingCancellationGuard::new(service_cancellation_token.clone()); + let mut service = service; + let outcome = + classify_future_outcome(name, service.run(service_cancellation_token)) + .await; + let _ = outcome_sender.send(outcome); + }, + &local_set, + ); + + running_services.push(RunningService::new(name, outcome_receiver)); + } + + Self { + cancellation_token: internal_cancellation_token, + local_set, + running_services, + task_set, + } + } +} + +impl RunningCollection for LocalRunningServiceCollection { + async fn run_to_completion( + self, + options: RunToCompletionOptions, + ) -> ServiceShutdownOutcomeCollection { + let Self { + cancellation_token, + local_set, + running_services, + mut task_set, + } = self; + + let has_running_services = !running_services.is_empty(); + + local_set + .run_until(async { + drain_to_completion( + &mut task_set, + &cancellation_token, + has_running_services, + options.shutdown_deadline, + ) + .await; + }) + .await; + + let outcomes: Vec = + running_services.into_iter().map(Into::into).collect(); + + ServiceShutdownOutcomeCollection::new(outcomes) + } +} diff --git a/trzcina-local-service/src/local_service.rs b/trzcina-local-service/src/local_service.rs new file mode 100644 index 0000000..6f65087 --- /dev/null +++ b/trzcina-local-service/src/local_service.rs @@ -0,0 +1,12 @@ +use anyhow::Result; +use async_trait::async_trait; +use tokio_util::sync::CancellationToken; + +#[async_trait(?Send)] +pub trait LocalService: 'static { + fn name(&self) -> &'static str { + std::any::type_name::() + } + + async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()>; +} diff --git a/trzcina-local-service/src/local_service_bundle.rs b/trzcina-local-service/src/local_service_bundle.rs new file mode 100644 index 0000000..24132e7 --- /dev/null +++ b/trzcina-local-service/src/local_service_bundle.rs @@ -0,0 +1,9 @@ +use anyhow::Result; +use async_trait::async_trait; + +use crate::local_service::LocalService; + +#[async_trait(?Send)] +pub trait LocalServiceBundle { + async fn services(self) -> Result>>; +} diff --git a/trzcina-local-service/src/local_service_manager.rs b/trzcina-local-service/src/local_service_manager.rs new file mode 100644 index 0000000..fedc90f --- /dev/null +++ b/trzcina-local-service/src/local_service_manager.rs @@ -0,0 +1,43 @@ +use anyhow::Result; +use tokio_util::sync::CancellationToken; +use trzcina_service::Manager; + +use crate::local_registered_service::LocalRegisteredService; +use crate::local_running_service_collection::LocalRunningServiceCollection; +use crate::local_service::LocalService; +use crate::local_service_bundle::LocalServiceBundle; + +#[derive(Default)] +pub struct LocalServiceManager { + services: Vec, +} + +impl LocalServiceManager { + pub async fn register_bundle( + &mut self, + bundle: TLocalServiceBundle, + ) -> Result<()> { + for service in bundle.services().await? { + let name = service.name(); + self.services.push(LocalRegisteredService { name, service }); + } + + Ok(()) + } + + pub fn register_service(&mut self, service: impl LocalService) { + let name = service.name(); + self.services.push(LocalRegisteredService { + name, + service: Box::new(service), + }); + } +} + +impl Manager for LocalServiceManager { + type Running = LocalRunningServiceCollection; + + fn start(self, cancellation_token: CancellationToken) -> LocalRunningServiceCollection { + LocalRunningServiceCollection::start(self.services, cancellation_token) + } +} diff --git a/trzcina-local-service/tests/local_coordinates_via_shared_holder_between_two_services.rs b/trzcina-local-service/tests/local_coordinates_via_shared_holder_between_two_services.rs new file mode 100644 index 0000000..ab38589 --- /dev/null +++ b/trzcina-local-service/tests/local_coordinates_via_shared_holder_between_two_services.rs @@ -0,0 +1,99 @@ +use std::cell::RefCell; +use std::rc::Rc; +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::sync::Notify; +use tokio::sync::oneshot; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +const PRODUCED_VALUE: u32 = 42; + +struct CoordinatingService { + is_producer: bool, + notify: Rc, + observation_tx: Option>, + shared_state: Rc>>, +} + +#[async_trait(?Send)] +impl LocalService for CoordinatingService { + async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + if self.is_producer { + *self.shared_state.borrow_mut() = Some(PRODUCED_VALUE); + self.notify.notify_one(); + cancellation_token.cancelled().await; + return Ok(()); + } + tokio::select! { + () = cancellation_token.cancelled() => return Ok(()), + () = self.notify.notified() => { + let observed_value = *self.shared_state.borrow(); + if let Some(value) = observed_value + && let Some(observation_tx) = self.observation_tx.take() + { + observation_tx.send(value).unwrap(); + } + } + } + cancellation_token.cancelled().await; + Ok(()) + } +} + +#[tokio::test] +async fn local_coordinates_via_shared_holder_between_two_services() { + let shared_state: Rc>> = Rc::new(RefCell::new(None)); + let notify = Rc::new(Notify::new()); + let (observation_tx, observation_rx) = oneshot::channel::(); + let cancellation_token = CancellationToken::new(); + let cancellation_token_for_run = cancellation_token.clone(); + + let mut manager = LocalServiceManager::default(); + manager.register_service(CoordinatingService { + is_producer: true, + notify: notify.clone(), + observation_tx: None, + shared_state: shared_state.clone(), + }); + manager.register_service(CoordinatingService { + is_producer: false, + notify: notify.clone(), + observation_tx: Some(observation_tx), + shared_state: shared_state.clone(), + }); + + let run_future = + manager + .start(cancellation_token_for_run) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }); + let trigger_future = async move { + let observed = observation_rx.await.unwrap(); + assert_eq!(observed, PRODUCED_VALUE); + cancellation_token.cancel(); + }; + + let (report, ()) = timeout(Duration::from_secs(5), async { + tokio::join!(run_future, trigger_future) + }) + .await + .unwrap(); + + assert_eq!(report.outcomes().len(), 2); + for named_outcome in report.outcomes() { + assert!(matches!( + named_outcome.outcome, + ServiceShutdownOutcome::Completed + )); + } +} diff --git a/trzcina-local-service/tests/local_register_bundle_propagates_error_from_bundle.rs b/trzcina-local-service/tests/local_register_bundle_propagates_error_from_bundle.rs new file mode 100644 index 0000000..05464dc --- /dev/null +++ b/trzcina-local-service/tests/local_register_bundle_propagates_error_from_bundle.rs @@ -0,0 +1,21 @@ +use anyhow::Result; +use anyhow::anyhow; +use async_trait::async_trait; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceBundle; +use trzcina_local_service::LocalServiceManager; + +struct ErringBundle; + +#[async_trait(?Send)] +impl LocalServiceBundle for ErringBundle { + async fn services(self) -> Result>> { + Err(anyhow!("test bundle deliberately failed")) + } +} + +#[tokio::test] +async fn local_propagates_error_from_bundle() { + let mut manager = LocalServiceManager::default(); + manager.register_bundle(ErringBundle).await.unwrap_err(); +} diff --git a/trzcina-local-service/tests/local_register_bundle_runs_all_services_returned_by_bundle.rs b/trzcina-local-service/tests/local_register_bundle_runs_all_services_returned_by_bundle.rs new file mode 100644 index 0000000..1eb23a1 --- /dev/null +++ b/trzcina-local-service/tests/local_register_bundle_runs_all_services_returned_by_bundle.rs @@ -0,0 +1,75 @@ +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::sync::oneshot; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceBundle; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; + +struct BundleAndService { + observation_tx: Option>, + sibling_senders: Vec>, +} + +#[async_trait(?Send)] +impl LocalService for BundleAndService { + async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + if let Some(observation_tx) = self.observation_tx.take() { + observation_tx.send(()).unwrap(); + } + Ok(()) + } +} + +#[async_trait(?Send)] +impl LocalServiceBundle for BundleAndService { + async fn services(self) -> Result>> { + let services: Vec> = self + .sibling_senders + .into_iter() + .map(|observation_tx| { + Box::new(BundleAndService { + observation_tx: Some(observation_tx), + sibling_senders: Vec::new(), + }) as Box + }) + .collect(); + Ok(services) + } +} + +#[tokio::test] +async fn local_runs_all_services_returned_by_bundle() { + let (first_tx, mut first_rx) = oneshot::channel::<()>(); + let (second_tx, mut second_rx) = oneshot::channel::<()>(); + + let bundle = BundleAndService { + observation_tx: None, + sibling_senders: vec![first_tx, second_tx], + }; + + let mut manager = LocalServiceManager::default(); + manager.register_bundle(bundle).await.unwrap(); + + timeout( + Duration::from_secs(5), + manager + .start(CancellationToken::new()) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), + ) + .await + .unwrap() + .into_result() + .unwrap(); + + first_rx.try_recv().unwrap(); + second_rx.try_recv().unwrap(); +} diff --git a/trzcina-local-service/tests/local_register_service_runs_registered_service.rs b/trzcina-local-service/tests/local_register_service_runs_registered_service.rs new file mode 100644 index 0000000..7a26bce --- /dev/null +++ b/trzcina-local-service/tests/local_register_service_runs_registered_service.rs @@ -0,0 +1,51 @@ +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::sync::oneshot; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; + +struct ObservableService { + observation_tx: Option>, +} + +#[async_trait(?Send)] +impl LocalService for ObservableService { + async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + if let Some(observation_tx) = self.observation_tx.take() { + observation_tx.send(()).unwrap(); + } + Ok(()) + } +} + +#[tokio::test] +async fn local_runs_registered_service() { + let (observation_tx, mut observation_rx) = oneshot::channel::<()>(); + + let mut manager = LocalServiceManager::default(); + manager.register_service(ObservableService { + observation_tx: Some(observation_tx), + }); + + timeout( + Duration::from_secs(5), + manager + .start(CancellationToken::new()) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), + ) + .await + .unwrap() + .into_result() + .unwrap(); + + observation_rx.try_recv().unwrap(); +} diff --git a/trzcina-local-service/tests/local_run_aborts_hung_service_after_shutdown_deadline.rs b/trzcina-local-service/tests/local_run_aborts_hung_service_after_shutdown_deadline.rs new file mode 100644 index 0000000..339cfd1 --- /dev/null +++ b/trzcina-local-service/tests/local_run_aborts_hung_service_after_shutdown_deadline.rs @@ -0,0 +1,62 @@ +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::task::yield_now; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +struct ConfiguredService { + hang_ignoring_cancellation: bool, +} + +#[async_trait(?Send)] +impl LocalService for ConfiguredService { + async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + if self.hang_ignoring_cancellation { + loop { + yield_now().await; + } + } + Ok(()) + } +} + +#[tokio::test] +async fn local_aborts_hung_service_after_shutdown_deadline() { + let mut manager = LocalServiceManager::default(); + manager.register_service(ConfiguredService { + hang_ignoring_cancellation: false, + }); + manager.register_service(ConfiguredService { + hang_ignoring_cancellation: true, + }); + + let report = timeout( + Duration::from_secs(5), + manager + .start(CancellationToken::new()) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_millis(50), + }), + ) + .await + .unwrap(); + + assert_eq!(report.outcomes().len(), 2); + assert!(matches!( + report.outcomes()[0].outcome, + ServiceShutdownOutcome::Completed + )); + assert!(matches!( + report.outcomes()[1].outcome, + ServiceShutdownOutcome::AbortedByShutdownDeadline + )); + assert!(report.into_result().is_err()); +} diff --git a/trzcina-local-service/tests/local_run_aborts_hung_services_on_external_cancel.rs b/trzcina-local-service/tests/local_run_aborts_hung_services_on_external_cancel.rs new file mode 100644 index 0000000..af40e81 --- /dev/null +++ b/trzcina-local-service/tests/local_run_aborts_hung_services_on_external_cancel.rs @@ -0,0 +1,59 @@ +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::task::yield_now; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +struct CancellationIgnoringService; + +#[async_trait(?Send)] +impl LocalService for CancellationIgnoringService { + async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + loop { + yield_now().await; + } + } +} + +#[tokio::test] +async fn local_aborts_hung_services_on_external_cancel() { + let cancellation_token = CancellationToken::new(); + let cancellation_token_for_run = cancellation_token.clone(); + + let mut manager = LocalServiceManager::default(); + manager.register_service(CancellationIgnoringService); + manager.register_service(CancellationIgnoringService); + + let run_future = + manager + .start(cancellation_token_for_run) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_millis(50), + }); + let trigger_future = async move { + cancellation_token.cancel(); + }; + + let (report, ()) = timeout(Duration::from_secs(5), async { + tokio::join!(run_future, trigger_future) + }) + .await + .expect("manager must return within outer timeout when token is externally cancelled"); + + assert_eq!(report.outcomes().len(), 2); + for named_outcome in report.outcomes() { + assert!(matches!( + named_outcome.outcome, + ServiceShutdownOutcome::AbortedByShutdownDeadline, + )); + } + assert!(report.into_result().is_err()); +} diff --git a/trzcina-local-service/tests/local_run_cancels_all_services_when_external_token_cancelled.rs b/trzcina-local-service/tests/local_run_cancels_all_services_when_external_token_cancelled.rs new file mode 100644 index 0000000..aab9a31 --- /dev/null +++ b/trzcina-local-service/tests/local_run_cancels_all_services_when_external_token_cancelled.rs @@ -0,0 +1,71 @@ +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::sync::oneshot; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +struct AwaitingService { + observation_tx: Option>, +} + +#[async_trait(?Send)] +impl LocalService for AwaitingService { + async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + cancellation_token.cancelled().await; + if let Some(observation_tx) = self.observation_tx.take() { + observation_tx.send(()).unwrap(); + } + Ok(()) + } +} + +#[tokio::test] +async fn local_cancels_all_services_when_external_token_cancelled() { + let cancellation_token = CancellationToken::new(); + let cancellation_token_for_run = cancellation_token.clone(); + let mut manager = LocalServiceManager::default(); + let mut observation_receivers = Vec::new(); + + for _ in 0..5 { + let (observation_tx, observation_rx) = oneshot::channel::<()>(); + manager.register_service(AwaitingService { + observation_tx: Some(observation_tx), + }); + observation_receivers.push(observation_rx); + } + + let run_future = + manager + .start(cancellation_token_for_run) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }); + let trigger_future = async move { + cancellation_token.cancel(); + }; + + let (report, ()) = timeout(Duration::from_secs(5), async { + tokio::join!(run_future, trigger_future) + }) + .await + .unwrap(); + + assert_eq!(report.outcomes().len(), 5); + for named_outcome in report.outcomes() { + assert!(matches!( + named_outcome.outcome, + ServiceShutdownOutcome::Completed + )); + } + for mut observation_rx in observation_receivers { + observation_rx.try_recv().unwrap(); + } +} diff --git a/trzcina-local-service/tests/local_run_cancels_siblings_when_one_service_finishes_first.rs b/trzcina-local-service/tests/local_run_cancels_siblings_when_one_service_finishes_first.rs new file mode 100644 index 0000000..bcd1e59 --- /dev/null +++ b/trzcina-local-service/tests/local_run_cancels_siblings_when_one_service_finishes_first.rs @@ -0,0 +1,73 @@ +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::sync::oneshot; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +struct ConfiguredService { + finish_immediately: bool, + observation_tx: Option>, +} + +#[async_trait(?Send)] +impl LocalService for ConfiguredService { + async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + if self.finish_immediately { + return Ok(()); + } + cancellation_token.cancelled().await; + if let Some(observation_tx) = self.observation_tx.take() { + observation_tx.send(()).unwrap(); + } + Ok(()) + } +} + +#[tokio::test] +async fn local_cancels_siblings_when_one_service_finishes_first() { + let mut manager = LocalServiceManager::default(); + manager.register_service(ConfiguredService { + finish_immediately: true, + observation_tx: None, + }); + + let mut sibling_observation_receivers = Vec::new(); + for _ in 0..4 { + let (observation_tx, observation_rx) = oneshot::channel::<()>(); + manager.register_service(ConfiguredService { + finish_immediately: false, + observation_tx: Some(observation_tx), + }); + sibling_observation_receivers.push(observation_rx); + } + + let report = timeout( + Duration::from_secs(5), + manager + .start(CancellationToken::new()) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), + ) + .await + .unwrap(); + + assert_eq!(report.outcomes().len(), 5); + for named_outcome in report.outcomes() { + assert!(matches!( + named_outcome.outcome, + ServiceShutdownOutcome::Completed + )); + } + for mut observation_rx in sibling_observation_receivers { + observation_rx.try_recv().unwrap(); + } +} diff --git a/trzcina-local-service/tests/local_run_completes_immediately_when_no_services_registered.rs b/trzcina-local-service/tests/local_run_completes_immediately_when_no_services_registered.rs new file mode 100644 index 0000000..22d80a0 --- /dev/null +++ b/trzcina-local-service/tests/local_run_completes_immediately_when_no_services_registered.rs @@ -0,0 +1,25 @@ +use std::time::Duration; + +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; + +#[tokio::test] +async fn local_completes_immediately_when_no_services_registered() { + let manager = LocalServiceManager::default(); + timeout( + Duration::from_secs(5), + manager + .start(CancellationToken::new()) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), + ) + .await + .unwrap() + .into_result() + .unwrap(); +} diff --git a/trzcina-local-service/tests/local_run_completes_when_all_services_finish_simultaneously.rs b/trzcina-local-service/tests/local_run_completes_when_all_services_finish_simultaneously.rs new file mode 100644 index 0000000..e24b786 --- /dev/null +++ b/trzcina-local-service/tests/local_run_completes_when_all_services_finish_simultaneously.rs @@ -0,0 +1,48 @@ +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +struct InstantOkService; + +#[async_trait(?Send)] +impl LocalService for InstantOkService { + async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + Ok(()) + } +} + +#[tokio::test] +async fn local_completes_when_all_services_finish_simultaneously() { + let mut manager = LocalServiceManager::default(); + for _ in 0..5 { + manager.register_service(InstantOkService); + } + + let report = timeout( + Duration::from_secs(5), + manager + .start(CancellationToken::new()) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), + ) + .await + .unwrap(); + + assert_eq!(report.outcomes().len(), 5); + for named_outcome in report.outcomes() { + assert!(matches!( + named_outcome.outcome, + ServiceShutdownOutcome::Completed + )); + } +} diff --git a/trzcina-local-service/tests/local_run_leaves_external_token_uncancelled_after_error_exit.rs b/trzcina-local-service/tests/local_run_leaves_external_token_uncancelled_after_error_exit.rs new file mode 100644 index 0000000..d9257eb --- /dev/null +++ b/trzcina-local-service/tests/local_run_leaves_external_token_uncancelled_after_error_exit.rs @@ -0,0 +1,37 @@ +use std::time::Duration; + +use anyhow::Result; +use anyhow::anyhow; +use async_trait::async_trait; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; + +struct ImmediatelyErroringLocalService; + +#[async_trait(?Send)] +impl LocalService for ImmediatelyErroringLocalService { + async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + Err(anyhow!("service failed")) + } +} + +#[tokio::test] +async fn local_leaves_external_token_uncancelled_after_error_exit() { + let external_token = CancellationToken::new(); + let mut manager = LocalServiceManager::default(); + manager.register_service(ImmediatelyErroringLocalService); + + let _ = manager + .start(external_token.clone()) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }) + .await + .into_result(); + + assert!(!external_token.is_cancelled()); +} diff --git a/trzcina-local-service/tests/local_run_leaves_external_token_uncancelled_after_normal_exit.rs b/trzcina-local-service/tests/local_run_leaves_external_token_uncancelled_after_normal_exit.rs new file mode 100644 index 0000000..d5389e1 --- /dev/null +++ b/trzcina-local-service/tests/local_run_leaves_external_token_uncancelled_after_normal_exit.rs @@ -0,0 +1,37 @@ +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; + +struct ImmediatelyExitingLocalService; + +#[async_trait(?Send)] +impl LocalService for ImmediatelyExitingLocalService { + async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + Ok(()) + } +} + +#[tokio::test] +async fn local_leaves_external_token_uncancelled_after_normal_exit() { + let external_token = CancellationToken::new(); + let mut manager = LocalServiceManager::default(); + manager.register_service(ImmediatelyExitingLocalService); + + manager + .start(external_token.clone()) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }) + .await + .into_result() + .unwrap(); + + assert!(!external_token.is_cancelled()); +} diff --git a/trzcina-local-service/tests/local_run_records_all_failures_when_multiple_services_error.rs b/trzcina-local-service/tests/local_run_records_all_failures_when_multiple_services_error.rs new file mode 100644 index 0000000..9282f9e --- /dev/null +++ b/trzcina-local-service/tests/local_run_records_all_failures_when_multiple_services_error.rs @@ -0,0 +1,83 @@ +use std::time::Duration; + +use anyhow::Result; +use anyhow::anyhow; +use async_trait::async_trait; +use tokio::sync::oneshot; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +struct ConfiguredService { + return_err: bool, + observation_tx: Option>, +} + +#[async_trait(?Send)] +impl LocalService for ConfiguredService { + async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + if self.return_err { + return Err(anyhow!("erroring service deliberately failed")); + } + cancellation_token.cancelled().await; + if let Some(observation_tx) = self.observation_tx.take() { + observation_tx.send(()).unwrap(); + } + Ok(()) + } +} + +#[tokio::test] +async fn local_records_all_failures_when_multiple_services_error() { + let mut manager = LocalServiceManager::default(); + for _ in 0..3 { + manager.register_service(ConfiguredService { + return_err: true, + observation_tx: None, + }); + } + + let mut sibling_observation_receivers = Vec::new(); + for _ in 0..2 { + let (observation_tx, observation_rx) = oneshot::channel::<()>(); + manager.register_service(ConfiguredService { + return_err: false, + observation_tx: Some(observation_tx), + }); + sibling_observation_receivers.push(observation_rx); + } + + let report = timeout( + Duration::from_secs(5), + manager + .start(CancellationToken::new()) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), + ) + .await + .unwrap(); + + assert_eq!(report.outcomes().len(), 5); + let errored_count = report + .outcomes() + .iter() + .filter(|named_outcome| matches!(named_outcome.outcome, ServiceShutdownOutcome::Errored(_))) + .count(); + let completed_count = report + .outcomes() + .iter() + .filter(|named_outcome| matches!(named_outcome.outcome, ServiceShutdownOutcome::Completed)) + .count(); + assert_eq!(errored_count, 3); + assert_eq!(completed_count, 2); + for mut observation_rx in sibling_observation_receivers { + observation_rx.try_recv().unwrap(); + } + assert!(report.into_result().is_err()); +} diff --git a/trzcina-local-service/tests/local_run_records_non_string_panic_payload.rs b/trzcina-local-service/tests/local_run_records_non_string_panic_payload.rs new file mode 100644 index 0000000..dab750a --- /dev/null +++ b/trzcina-local-service/tests/local_run_records_non_string_panic_payload.rs @@ -0,0 +1,45 @@ +use std::panic::panic_any; +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +struct NonStringPanickingService; + +#[async_trait(?Send)] +impl LocalService for NonStringPanickingService { + async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + panic_any(42_u32); + } +} + +#[tokio::test] +async fn local_records_non_string_panic_payload_as_generic_message() { + let mut manager = LocalServiceManager::default(); + manager.register_service(NonStringPanickingService); + + let report = timeout( + Duration::from_secs(5), + manager + .start(CancellationToken::new()) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), + ) + .await + .unwrap(); + + assert_eq!(report.outcomes().len(), 1); + assert!(matches!( + report.outcomes()[0].outcome, + ServiceShutdownOutcome::Panicked(_) + )); +} diff --git a/trzcina-local-service/tests/local_run_records_service_error_and_cancels_siblings.rs b/trzcina-local-service/tests/local_run_records_service_error_and_cancels_siblings.rs new file mode 100644 index 0000000..fb3489c --- /dev/null +++ b/trzcina-local-service/tests/local_run_records_service_error_and_cancels_siblings.rs @@ -0,0 +1,79 @@ +use std::time::Duration; + +use anyhow::Result; +use anyhow::anyhow; +use async_trait::async_trait; +use tokio::sync::oneshot; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +struct ConfiguredService { + return_err: bool, + observation_tx: Option>, +} + +#[async_trait(?Send)] +impl LocalService for ConfiguredService { + async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + if self.return_err { + return Err(anyhow!("erroring service deliberately failed")); + } + cancellation_token.cancelled().await; + if let Some(observation_tx) = self.observation_tx.take() { + observation_tx.send(()).unwrap(); + } + Ok(()) + } +} + +#[tokio::test] +async fn local_records_service_error_and_cancels_siblings() { + let mut manager = LocalServiceManager::default(); + manager.register_service(ConfiguredService { + return_err: true, + observation_tx: None, + }); + + let mut sibling_observation_receivers = Vec::new(); + for _ in 0..4 { + let (observation_tx, observation_rx) = oneshot::channel::<()>(); + manager.register_service(ConfiguredService { + return_err: false, + observation_tx: Some(observation_tx), + }); + sibling_observation_receivers.push(observation_rx); + } + + let report = timeout( + Duration::from_secs(5), + manager + .start(CancellationToken::new()) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), + ) + .await + .unwrap(); + + assert_eq!(report.outcomes().len(), 5); + assert!(matches!( + report.outcomes()[0].outcome, + ServiceShutdownOutcome::Errored(_) + )); + for sibling_outcome in &report.outcomes()[1..] { + assert!(matches!( + sibling_outcome.outcome, + ServiceShutdownOutcome::Completed + )); + } + for mut observation_rx in sibling_observation_receivers { + observation_rx.try_recv().unwrap(); + } + assert!(report.into_result().is_err()); +} diff --git a/trzcina-local-service/tests/local_run_records_service_panic_and_cancels_siblings.rs b/trzcina-local-service/tests/local_run_records_service_panic_and_cancels_siblings.rs new file mode 100644 index 0000000..670e1dd --- /dev/null +++ b/trzcina-local-service/tests/local_run_records_service_panic_and_cancels_siblings.rs @@ -0,0 +1,82 @@ +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::sync::oneshot; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +const PANIC_MARKER: &str = "deliberately panicking for cascade test"; + +struct ConfiguredService { + should_panic: bool, + observation_tx: Option>, +} + +#[async_trait(?Send)] +impl LocalService for ConfiguredService { + async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + if self.should_panic { + panic!("{}", PANIC_MARKER); + } + cancellation_token.cancelled().await; + if let Some(observation_tx) = self.observation_tx.take() { + observation_tx.send(()).unwrap(); + } + Ok(()) + } +} + +#[tokio::test] +async fn local_records_service_panic_and_cancels_siblings() { + let mut manager = LocalServiceManager::default(); + manager.register_service(ConfiguredService { + should_panic: true, + observation_tx: None, + }); + + let mut sibling_observation_receivers = Vec::new(); + for _ in 0..4 { + let (observation_tx, observation_rx) = oneshot::channel::<()>(); + manager.register_service(ConfiguredService { + should_panic: false, + observation_tx: Some(observation_tx), + }); + sibling_observation_receivers.push(observation_rx); + } + + let report = timeout( + Duration::from_secs(5), + manager + .start(CancellationToken::new()) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), + ) + .await + .unwrap(); + + assert_eq!(report.outcomes().len(), 5); + match &report.outcomes()[0].outcome { + ServiceShutdownOutcome::Panicked(panic_message) => { + assert!(panic_message.contains(PANIC_MARKER)); + } + other_outcome => panic!("expected ServiceShutdownOutcome::Panicked, got {other_outcome:?}"), + } + for sibling_outcome in &report.outcomes()[1..] { + assert!(matches!( + sibling_outcome.outcome, + ServiceShutdownOutcome::Completed + )); + } + for mut observation_rx in sibling_observation_receivers { + observation_rx.try_recv().unwrap(); + } + assert!(report.into_result().is_err()); +} diff --git a/trzcina-local-service/tests/local_run_records_string_literal_panic_payload.rs b/trzcina-local-service/tests/local_run_records_string_literal_panic_payload.rs new file mode 100644 index 0000000..34e84f3 --- /dev/null +++ b/trzcina-local-service/tests/local_run_records_string_literal_panic_payload.rs @@ -0,0 +1,48 @@ +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +const PANIC_LITERAL: &str = "deliberately panicking with a string literal"; + +struct LiteralPanickingService; + +#[async_trait(?Send)] +impl LocalService for LiteralPanickingService { + async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + panic!("deliberately panicking with a string literal"); + } +} + +#[tokio::test] +async fn local_records_string_literal_panic_payload() { + let mut manager = LocalServiceManager::default(); + manager.register_service(LiteralPanickingService); + + let report = timeout( + Duration::from_secs(5), + manager + .start(CancellationToken::new()) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), + ) + .await + .unwrap(); + + assert_eq!(report.outcomes().len(), 1); + match &report.outcomes()[0].outcome { + ServiceShutdownOutcome::Panicked(panic_message) => { + assert!(panic_message.contains(PANIC_LITERAL)); + } + other_outcome => panic!("expected ServiceShutdownOutcome::Panicked, got {other_outcome:?}"), + } +} diff --git a/trzcina-local-service/tests/local_runs_non_send_service.rs b/trzcina-local-service/tests/local_runs_non_send_service.rs new file mode 100644 index 0000000..28ab3f5 --- /dev/null +++ b/trzcina-local-service/tests/local_runs_non_send_service.rs @@ -0,0 +1,65 @@ +use std::cell::RefCell; +use std::rc::Rc; +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::sync::oneshot; +use tokio::task::yield_now; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +struct NonSendCounterService { + counter: Rc>, + observation_tx: Option>, +} + +#[async_trait(?Send)] +impl LocalService for NonSendCounterService { + async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + *self.counter.borrow_mut() += 1; + yield_now().await; + *self.counter.borrow_mut() += 1; + if let Some(observation_tx) = self.observation_tx.take() { + observation_tx.send(*self.counter.borrow()).unwrap(); + } + Ok(()) + } +} + +#[tokio::test] +async fn local_runs_non_send_service() { + let counter: Rc> = Rc::new(RefCell::new(0)); + let (observation_tx, mut observation_rx) = oneshot::channel::(); + + let mut manager = LocalServiceManager::default(); + manager.register_service(NonSendCounterService { + counter: counter.clone(), + observation_tx: Some(observation_tx), + }); + + let report = timeout( + Duration::from_secs(5), + manager + .start(CancellationToken::new()) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), + ) + .await + .unwrap(); + + assert_eq!(report.outcomes().len(), 1); + assert!(matches!( + report.outcomes()[0].outcome, + ServiceShutdownOutcome::Completed + )); + assert_eq!(observation_rx.try_recv().unwrap(), 2); + assert_eq!(*counter.borrow(), 2); +} diff --git a/trzcina-local-service/tests/local_runs_tokio_spawn_local_inside_service.rs b/trzcina-local-service/tests/local_runs_tokio_spawn_local_inside_service.rs new file mode 100644 index 0000000..b2137e2 --- /dev/null +++ b/trzcina-local-service/tests/local_runs_tokio_spawn_local_inside_service.rs @@ -0,0 +1,59 @@ +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::sync::oneshot; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +const CHILD_TASK_RESULT: u32 = 42; + +struct SpawnLocalUsingService { + observation_tx: Option>, +} + +#[async_trait(?Send)] +impl LocalService for SpawnLocalUsingService { + async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + let child_join_handle = tokio::task::spawn_local(async { CHILD_TASK_RESULT }); + let observed = child_join_handle.await?; + if let Some(observation_tx) = self.observation_tx.take() { + observation_tx.send(observed).unwrap(); + } + Ok(()) + } +} + +#[tokio::test] +async fn local_runs_tokio_spawn_local_inside_service() { + let (observation_tx, mut observation_rx) = oneshot::channel::(); + + let mut manager = LocalServiceManager::default(); + manager.register_service(SpawnLocalUsingService { + observation_tx: Some(observation_tx), + }); + + let report = timeout( + Duration::from_secs(5), + manager + .start(CancellationToken::new()) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), + ) + .await + .unwrap(); + + assert_eq!(report.outcomes().len(), 1); + assert!(matches!( + report.outcomes()[0].outcome, + ServiceShutdownOutcome::Completed + )); + assert_eq!(observation_rx.try_recv().unwrap(), CHILD_TASK_RESULT); +} diff --git a/trzcina-local-service/tests/local_supports_actix_style_shutdown_signal_pattern.rs b/trzcina-local-service/tests/local_supports_actix_style_shutdown_signal_pattern.rs new file mode 100644 index 0000000..ffb8f22 --- /dev/null +++ b/trzcina-local-service/tests/local_supports_actix_style_shutdown_signal_pattern.rs @@ -0,0 +1,68 @@ +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::sync::oneshot; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +struct ActixStyleService { + started_tx: Option>, +} + +#[async_trait(?Send)] +impl LocalService for ActixStyleService { + async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + if let Some(started_tx) = self.started_tx.take() { + started_tx.send(()).unwrap(); + } + loop { + if cancellation_token.is_cancelled() { + break; + } + cancellation_token.cancelled().await; + } + Ok(()) + } +} + +#[tokio::test] +async fn local_supports_actix_style_shutdown_signal_pattern() { + let cancellation_token = CancellationToken::new(); + let cancellation_token_for_run = cancellation_token.clone(); + let (started_tx, started_rx) = oneshot::channel::<()>(); + + let mut manager = LocalServiceManager::default(); + manager.register_service(ActixStyleService { + started_tx: Some(started_tx), + }); + + let run_future = + manager + .start(cancellation_token_for_run) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }); + let trigger_future = async move { + started_rx.await.unwrap(); + cancellation_token.cancel(); + }; + + let (report, ()) = timeout(Duration::from_secs(5), async { + tokio::join!(run_future, trigger_future) + }) + .await + .unwrap(); + + assert_eq!(report.outcomes().len(), 1); + assert!(matches!( + report.outcomes()[0].outcome, + ServiceShutdownOutcome::Completed + )); +} diff --git a/trzcina-local-service/tests/local_supports_internal_retry_loop_pattern.rs b/trzcina-local-service/tests/local_supports_internal_retry_loop_pattern.rs new file mode 100644 index 0000000..a85e914 --- /dev/null +++ b/trzcina-local-service/tests/local_supports_internal_retry_loop_pattern.rs @@ -0,0 +1,68 @@ +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::sync::oneshot; +use tokio::time::sleep; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +struct RetryLoopService { + backoff_started_tx: Option>, +} + +#[async_trait(?Send)] +impl LocalService for RetryLoopService { + async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + loop { + if let Some(backoff_started_tx) = self.backoff_started_tx.take() { + backoff_started_tx.send(()).unwrap(); + } + tokio::select! { + () = cancellation_token.cancelled() => return Ok(()), + () = sleep(Duration::from_secs(10)) => continue, + } + } + } +} + +#[tokio::test] +async fn local_supports_internal_retry_loop_pattern() { + let (backoff_started_tx, backoff_started_rx) = oneshot::channel::<()>(); + let cancellation_token = CancellationToken::new(); + let cancellation_token_for_run = cancellation_token.clone(); + + let mut manager = LocalServiceManager::default(); + manager.register_service(RetryLoopService { + backoff_started_tx: Some(backoff_started_tx), + }); + + let run_future = + manager + .start(cancellation_token_for_run) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }); + let trigger_future = async move { + backoff_started_rx.await.unwrap(); + cancellation_token.cancel(); + }; + + let (report, ()) = timeout(Duration::from_secs(5), async { + tokio::join!(run_future, trigger_future) + }) + .await + .unwrap(); + + assert_eq!(report.outcomes().len(), 1); + assert!(matches!( + report.outcomes()[0].outcome, + ServiceShutdownOutcome::Completed + )); +} diff --git a/trzcina-local-service/tests/local_supports_interval_ticker_reconciliation_pattern.rs b/trzcina-local-service/tests/local_supports_interval_ticker_reconciliation_pattern.rs new file mode 100644 index 0000000..ba01f4b --- /dev/null +++ b/trzcina-local-service/tests/local_supports_interval_ticker_reconciliation_pattern.rs @@ -0,0 +1,83 @@ +use std::cell::Cell; +use std::rc::Rc; +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::sync::oneshot; +use tokio::time::interval; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +struct ReconciliationService { + first_tick_tx: Option>, + tick_counter: Rc>, +} + +#[async_trait(?Send)] +impl LocalService for ReconciliationService { + async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + let mut ticker = interval(Duration::from_millis(10)); + loop { + tokio::select! { + () = cancellation_token.cancelled() => return Ok(()), + _ = ticker.tick() => { + let previous = self.tick_counter.get(); + self.tick_counter.set(previous + 1); + if previous == 0 + && let Some(first_tick_tx) = self.first_tick_tx.take() + { + first_tick_tx.send(()).unwrap(); + } + } + } + } + } +} + +#[tokio::test] +async fn local_supports_interval_ticker_reconciliation_pattern() { + let tick_counter: Rc> = Rc::new(Cell::new(0)); + let cancellation_token = CancellationToken::new(); + let cancellation_token_for_run = cancellation_token.clone(); + let (first_tick_tx, first_tick_rx) = oneshot::channel::<()>(); + + let mut manager = LocalServiceManager::default(); + manager.register_service(ReconciliationService { + first_tick_tx: Some(first_tick_tx), + tick_counter: tick_counter.clone(), + }); + + let run_future = + manager + .start(cancellation_token_for_run) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }); + let trigger_future = async move { + first_tick_rx.await.unwrap(); + cancellation_token.cancel(); + }; + + let (report, ()) = timeout(Duration::from_secs(5), async { + tokio::join!(run_future, trigger_future) + }) + .await + .unwrap(); + + assert!( + tick_counter.get() > 0, + "ticker must have fired at least once" + ); + assert_eq!(report.outcomes().len(), 1); + assert!(matches!( + report.outcomes()[0].outcome, + ServiceShutdownOutcome::Completed + )); +} diff --git a/trzcina-local-service/tests/local_supports_multi_channel_select_pump_pattern.rs b/trzcina-local-service/tests/local_supports_multi_channel_select_pump_pattern.rs new file mode 100644 index 0000000..3d84c58 --- /dev/null +++ b/trzcina-local-service/tests/local_supports_multi_channel_select_pump_pattern.rs @@ -0,0 +1,88 @@ +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::sync::mpsc; +use tokio::sync::oneshot; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +struct MultiChannelPumpService { + primary_observed_tx: Option>, + primary_rx: mpsc::Receiver<()>, + secondary_observed_tx: Option>, + secondary_rx: mpsc::Receiver<()>, +} + +#[async_trait(?Send)] +impl LocalService for MultiChannelPumpService { + async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + loop { + tokio::select! { + () = cancellation_token.cancelled() => return Ok(()), + Some(()) = self.primary_rx.recv() => { + if let Some(primary_observed_tx) = self.primary_observed_tx.take() { + primary_observed_tx.send(()).unwrap(); + } + } + Some(()) = self.secondary_rx.recv() => { + if let Some(secondary_observed_tx) = self.secondary_observed_tx.take() { + secondary_observed_tx.send(()).unwrap(); + } + } + } + } + } +} + +#[tokio::test] +async fn local_supports_multi_channel_select_pump_pattern() { + let (primary_tx, primary_rx) = mpsc::channel::<()>(1); + let (secondary_tx, secondary_rx) = mpsc::channel::<()>(1); + let (primary_observed_tx, primary_observed_rx) = oneshot::channel::<()>(); + let (secondary_observed_tx, secondary_observed_rx) = oneshot::channel::<()>(); + let cancellation_token = CancellationToken::new(); + let cancellation_token_for_run = cancellation_token.clone(); + + let mut manager = LocalServiceManager::default(); + manager.register_service(MultiChannelPumpService { + primary_observed_tx: Some(primary_observed_tx), + primary_rx, + secondary_observed_tx: Some(secondary_observed_tx), + secondary_rx, + }); + + let run_future = + manager + .start(cancellation_token_for_run) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }); + let trigger_future = async move { + primary_tx.send(()).await.unwrap(); + secondary_tx.send(()).await.unwrap(); + + primary_observed_rx.await.unwrap(); + secondary_observed_rx.await.unwrap(); + + cancellation_token.cancel(); + }; + + let (report, ()) = timeout(Duration::from_secs(5), async { + tokio::join!(run_future, trigger_future) + }) + .await + .unwrap(); + + assert_eq!(report.outcomes().len(), 1); + assert!(matches!( + report.outcomes()[0].outcome, + ServiceShutdownOutcome::Completed + )); +} diff --git a/trzcina-local-service/tests/local_supports_mutable_internal_state_across_iterations.rs b/trzcina-local-service/tests/local_supports_mutable_internal_state_across_iterations.rs new file mode 100644 index 0000000..666d313 --- /dev/null +++ b/trzcina-local-service/tests/local_supports_mutable_internal_state_across_iterations.rs @@ -0,0 +1,88 @@ +use std::collections::VecDeque; +use std::rc::Rc; +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::sync::Notify; +use tokio::sync::oneshot; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +struct StatefulService { + iteration_count: usize, + notify: Rc, + work_observers: VecDeque>, +} + +#[async_trait(?Send)] +impl LocalService for StatefulService { + async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + loop { + self.iteration_count += 1; + if let Some(work_observer) = self.work_observers.pop_front() { + work_observer.send(self.iteration_count).unwrap(); + } + tokio::select! { + () = cancellation_token.cancelled() => return Ok(()), + () = self.notify.notified() => continue, + } + } + } +} + +#[tokio::test] +async fn local_supports_mutable_internal_state_across_iterations() { + let notify = Rc::new(Notify::new()); + let (first_work_tx, first_work_rx) = oneshot::channel::(); + let (second_work_tx, second_work_rx) = oneshot::channel::(); + let (third_work_tx, third_work_rx) = oneshot::channel::(); + let cancellation_token = CancellationToken::new(); + let cancellation_token_for_run = cancellation_token.clone(); + + let mut manager = LocalServiceManager::default(); + manager.register_service(StatefulService { + iteration_count: 0, + notify: notify.clone(), + work_observers: VecDeque::from(vec![first_work_tx, second_work_tx, third_work_tx]), + }); + + let run_future = + manager + .start(cancellation_token_for_run) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }); + let trigger_future = async move { + let first_count = first_work_rx.await.unwrap(); + assert_eq!(first_count, 1); + + notify.notify_one(); + let second_count = second_work_rx.await.unwrap(); + assert_eq!(second_count, 2); + + notify.notify_one(); + let third_count = third_work_rx.await.unwrap(); + assert_eq!(third_count, 3); + + cancellation_token.cancel(); + }; + + let (report, ()) = timeout(Duration::from_secs(5), async { + tokio::join!(run_future, trigger_future) + }) + .await + .unwrap(); + + assert_eq!(report.outcomes().len(), 1); + assert!(matches!( + report.outcomes()[0].outcome, + ServiceShutdownOutcome::Completed + )); +} diff --git a/trzcina-local-service/tests/local_supports_notify_driven_event_loop_pattern.rs b/trzcina-local-service/tests/local_supports_notify_driven_event_loop_pattern.rs new file mode 100644 index 0000000..d637c11 --- /dev/null +++ b/trzcina-local-service/tests/local_supports_notify_driven_event_loop_pattern.rs @@ -0,0 +1,79 @@ +use std::collections::VecDeque; +use std::rc::Rc; +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio::sync::Notify; +use tokio::sync::oneshot; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; +use trzcina_local_service::LocalService; +use trzcina_local_service::LocalServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; + +struct NotifyDrivenService { + notify: Rc, + work_observers: VecDeque>, +} + +#[async_trait(?Send)] +impl LocalService for NotifyDrivenService { + async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> { + loop { + if let Some(work_observer) = self.work_observers.pop_front() { + work_observer.send(()).unwrap(); + } + tokio::select! { + () = cancellation_token.cancelled() => return Ok(()), + () = self.notify.notified() => continue, + } + } + } +} + +#[tokio::test] +async fn local_supports_notify_driven_event_loop_pattern() { + let notify = Rc::new(Notify::new()); + let (first_work_tx, first_work_rx) = oneshot::channel::<()>(); + let (second_work_tx, second_work_rx) = oneshot::channel::<()>(); + let (third_work_tx, third_work_rx) = oneshot::channel::<()>(); + let cancellation_token = CancellationToken::new(); + let cancellation_token_for_run = cancellation_token.clone(); + + let mut manager = LocalServiceManager::default(); + manager.register_service(NotifyDrivenService { + notify: notify.clone(), + work_observers: VecDeque::from(vec![first_work_tx, second_work_tx, third_work_tx]), + }); + + let run_future = + manager + .start(cancellation_token_for_run) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }); + let trigger_future = async move { + first_work_rx.await.unwrap(); + notify.notify_one(); + second_work_rx.await.unwrap(); + notify.notify_one(); + third_work_rx.await.unwrap(); + cancellation_token.cancel(); + }; + + let (report, ()) = timeout(Duration::from_secs(5), async { + tokio::join!(run_future, trigger_future) + }) + .await + .unwrap(); + + assert_eq!(report.outcomes().len(), 1); + assert!(matches!( + report.outcomes()[0].outcome, + ServiceShutdownOutcome::Completed + )); +} diff --git a/trzcina-sendable-service/Cargo.toml b/trzcina-sendable-service/Cargo.toml new file mode 100644 index 0000000..87d7679 --- /dev/null +++ b/trzcina-sendable-service/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "trzcina-sendable-service" +version = "0.2.0" +edition = "2024" +license = "Apache-2.0" +description = "Sendable (Send + 'static) services for trzcina: cooperative async lifecycle on a multi-thread runtime." +repository = "https://github.com/intentee/trzcina" +homepage = "https://github.com/intentee/trzcina" +readme = "../README.md" +authors = ["Intentee"] +keywords = ["async", "service", "lifecycle", "cancellation", "tokio"] +categories = ["asynchronous", "concurrency"] + +[dependencies] +anyhow = { workspace = true } +async-trait = { workspace = true } +futures-util = { workspace = true } +log = { workspace = true } +tokio = { workspace = true } +tokio-util = { workspace = true } +trzcina-service = { version = "0.2.0", path = "../trzcina-service" } + +[lints] +workspace = true diff --git a/trzcina-sendable-service/src/lib.rs b/trzcina-sendable-service/src/lib.rs new file mode 100644 index 0000000..37d0209 --- /dev/null +++ b/trzcina-sendable-service/src/lib.rs @@ -0,0 +1,11 @@ +mod registered_service; +mod running_service_collection; +mod service; +mod service_bundle; +mod service_manager; + +pub use crate::registered_service::RegisteredService; +pub use crate::running_service_collection::RunningServiceCollection; +pub use crate::service::Service; +pub use crate::service_bundle::ServiceBundle; +pub use crate::service_manager::ServiceManager; diff --git a/trzcina/src/registered_service.rs b/trzcina-sendable-service/src/registered_service.rs similarity index 100% rename from trzcina/src/registered_service.rs rename to trzcina-sendable-service/src/registered_service.rs diff --git a/trzcina-sendable-service/src/running_service_collection.rs b/trzcina-sendable-service/src/running_service_collection.rs new file mode 100644 index 0000000..c26df12 --- /dev/null +++ b/trzcina-sendable-service/src/running_service_collection.rs @@ -0,0 +1,82 @@ +use tokio::sync::oneshot; +use tokio::task::JoinSet; +use tokio_util::sync::CancellationToken; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::RunningService; +use trzcina_service::ServiceShutdownOutcome; +use trzcina_service::ServiceShutdownOutcomeCollection; +use trzcina_service::ServiceShutdownOutcomeWithServiceName; +use trzcina_service::SiblingCancellationGuard; +use trzcina_service::classify_future_outcome; +use trzcina_service::drain_to_completion; + +use crate::registered_service::RegisteredService; + +pub struct RunningServiceCollection { + cancellation_token: CancellationToken, + running_services: Vec, + task_set: JoinSet<()>, +} + +impl RunningServiceCollection { + #[must_use] + pub fn start( + registered: Vec, + cancellation_token: CancellationToken, + ) -> Self { + let mut running_services: Vec = Vec::with_capacity(registered.len()); + let mut task_set: JoinSet<()> = JoinSet::new(); + let internal_cancellation_token = cancellation_token.child_token(); + + for RegisteredService { name, service } in registered { + let (outcome_sender, outcome_receiver) = oneshot::channel::(); + let service_cancellation_token = internal_cancellation_token.clone(); + + task_set.spawn(async move { + let _sibling_cancellation_guard = + SiblingCancellationGuard::new(service_cancellation_token.clone()); + let mut service = service; + let outcome = + classify_future_outcome(name, service.run(service_cancellation_token)).await; + let _ = outcome_sender.send(outcome); + }); + + running_services.push(RunningService::new(name, outcome_receiver)); + } + + Self { + cancellation_token: internal_cancellation_token, + running_services, + task_set, + } + } +} + +impl RunningCollection for RunningServiceCollection { + async fn run_to_completion( + self, + options: RunToCompletionOptions, + ) -> ServiceShutdownOutcomeCollection { + let Self { + cancellation_token, + running_services, + mut task_set, + } = self; + + let has_running_services = !running_services.is_empty(); + + drain_to_completion( + &mut task_set, + &cancellation_token, + has_running_services, + options.shutdown_deadline, + ) + .await; + + let outcomes: Vec = + running_services.into_iter().map(Into::into).collect(); + + ServiceShutdownOutcomeCollection::new(outcomes) + } +} diff --git a/trzcina/src/service.rs b/trzcina-sendable-service/src/service.rs similarity index 99% rename from trzcina/src/service.rs rename to trzcina-sendable-service/src/service.rs index 24af5be..ab7a698 100644 --- a/trzcina/src/service.rs +++ b/trzcina-sendable-service/src/service.rs @@ -7,5 +7,6 @@ pub trait Service: Send + 'static { fn name(&self) -> &'static str { std::any::type_name::() } + async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()>; } diff --git a/trzcina/src/service_bundle.rs b/trzcina-sendable-service/src/service_bundle.rs similarity index 84% rename from trzcina/src/service_bundle.rs rename to trzcina-sendable-service/src/service_bundle.rs index 6747aa8..0503a32 100644 --- a/trzcina/src/service_bundle.rs +++ b/trzcina-sendable-service/src/service_bundle.rs @@ -1,7 +1,7 @@ use anyhow::Result; use async_trait::async_trait; -use crate::Service; +use crate::service::Service; #[async_trait] pub trait ServiceBundle { diff --git a/trzcina/src/service_manager.rs b/trzcina-sendable-service/src/service_manager.rs similarity index 79% rename from trzcina/src/service_manager.rs rename to trzcina-sendable-service/src/service_manager.rs index 079bd81..ea90dd6 100644 --- a/trzcina/src/service_manager.rs +++ b/trzcina-sendable-service/src/service_manager.rs @@ -1,10 +1,11 @@ use anyhow::Result; use tokio_util::sync::CancellationToken; +use trzcina_service::Manager; -use crate::ServiceBundle; use crate::registered_service::RegisteredService; use crate::running_service_collection::RunningServiceCollection; use crate::service::Service; +use crate::service_bundle::ServiceBundle; #[derive(Default)] pub struct ServiceManager { @@ -31,9 +32,12 @@ impl ServiceManager { service: Box::new(service), }); } +} + +impl Manager for ServiceManager { + type Running = RunningServiceCollection; - #[must_use] - pub fn start(self, cancellation_token: CancellationToken) -> RunningServiceCollection { + fn start(self, cancellation_token: CancellationToken) -> RunningServiceCollection { RunningServiceCollection::start(self.services, cancellation_token) } } diff --git a/trzcina/tests/coordinates_via_shared_holder_between_two_services.rs b/trzcina-sendable-service/tests/coordinates_via_shared_holder_between_two_services.rs similarity index 88% rename from trzcina/tests/coordinates_via_shared_holder_between_two_services.rs rename to trzcina-sendable-service/tests/coordinates_via_shared_holder_between_two_services.rs index d7b0265..2ba3bbb 100644 --- a/trzcina/tests/coordinates_via_shared_holder_between_two_services.rs +++ b/trzcina-sendable-service/tests/coordinates_via_shared_holder_between_two_services.rs @@ -4,13 +4,16 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::sync::Notify; use tokio::sync::oneshot; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; const PRODUCED_VALUE: u32 = 42; @@ -74,7 +77,9 @@ async fn coordinates_via_shared_holder_between_two_services() { let run_task = tokio::spawn(async move { manager .start(cancellation_token_for_run) - .run_to_completion(Duration::from_secs(1)) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }) .await }); diff --git a/trzcina/tests/register_bundle_propagates_error_from_bundle.rs b/trzcina-sendable-service/tests/register_bundle_propagates_error_from_bundle.rs similarity index 77% rename from trzcina/tests/register_bundle_propagates_error_from_bundle.rs rename to trzcina-sendable-service/tests/register_bundle_propagates_error_from_bundle.rs index a5b6e87..a667513 100644 --- a/trzcina/tests/register_bundle_propagates_error_from_bundle.rs +++ b/trzcina-sendable-service/tests/register_bundle_propagates_error_from_bundle.rs @@ -1,9 +1,9 @@ use anyhow::Result; use anyhow::anyhow; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceBundle; -use trzcina::ServiceManager; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceBundle; +use trzcina_sendable_service::ServiceManager; struct ErringBundle; diff --git a/trzcina/tests/register_bundle_runs_all_services_returned_by_bundle.rs b/trzcina-sendable-service/tests/register_bundle_runs_all_services_returned_by_bundle.rs similarity index 82% rename from trzcina/tests/register_bundle_runs_all_services_returned_by_bundle.rs rename to trzcina-sendable-service/tests/register_bundle_runs_all_services_returned_by_bundle.rs index 45a3f6d..3834e69 100644 --- a/trzcina/tests/register_bundle_runs_all_services_returned_by_bundle.rs +++ b/trzcina-sendable-service/tests/register_bundle_runs_all_services_returned_by_bundle.rs @@ -2,12 +2,15 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceBundle; -use trzcina::ServiceManager; use tokio::sync::oneshot; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceBundle; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; struct BundleAndService { observation_tx: Option>, @@ -58,7 +61,9 @@ async fn runs_all_services_returned_by_bundle() { Duration::from_secs(5), manager .start(CancellationToken::new()) - .run_to_completion(Duration::from_secs(1)), + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), ) .await .unwrap() diff --git a/trzcina/tests/register_service_runs_registered_service.rs b/trzcina-sendable-service/tests/register_service_runs_registered_service.rs similarity index 75% rename from trzcina/tests/register_service_runs_registered_service.rs rename to trzcina-sendable-service/tests/register_service_runs_registered_service.rs index 688b127..538d627 100644 --- a/trzcina/tests/register_service_runs_registered_service.rs +++ b/trzcina-sendable-service/tests/register_service_runs_registered_service.rs @@ -2,11 +2,14 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; use tokio::sync::oneshot; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; struct ObservableService { observation_tx: Option>, @@ -35,7 +38,9 @@ async fn runs_registered_service() { Duration::from_secs(5), manager .start(CancellationToken::new()) - .run_to_completion(Duration::from_secs(1)), + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), ) .await .unwrap() diff --git a/trzcina/tests/run_aborts_hung_service_after_shutdown_deadline.rs b/trzcina-sendable-service/tests/run_aborts_hung_service_after_shutdown_deadline.rs similarity index 77% rename from trzcina/tests/run_aborts_hung_service_after_shutdown_deadline.rs rename to trzcina-sendable-service/tests/run_aborts_hung_service_after_shutdown_deadline.rs index 619b9d0..78ff536 100644 --- a/trzcina/tests/run_aborts_hung_service_after_shutdown_deadline.rs +++ b/trzcina-sendable-service/tests/run_aborts_hung_service_after_shutdown_deadline.rs @@ -2,12 +2,15 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::task::yield_now; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; struct ConfiguredService { hang_ignoring_cancellation: bool, @@ -39,7 +42,9 @@ async fn aborts_hung_service_after_shutdown_deadline() { Duration::from_secs(5), manager .start(CancellationToken::new()) - .run_to_completion(Duration::from_millis(50)), + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_millis(50), + }), ) .await .unwrap(); diff --git a/trzcina/tests/run_aborts_hung_services_on_external_cancel.rs b/trzcina-sendable-service/tests/run_aborts_hung_services_on_external_cancel.rs similarity index 78% rename from trzcina/tests/run_aborts_hung_services_on_external_cancel.rs rename to trzcina-sendable-service/tests/run_aborts_hung_services_on_external_cancel.rs index ff27bb5..d25990b 100644 --- a/trzcina/tests/run_aborts_hung_services_on_external_cancel.rs +++ b/trzcina-sendable-service/tests/run_aborts_hung_services_on_external_cancel.rs @@ -2,12 +2,15 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::task::yield_now; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; struct CancellationIgnoringService; @@ -32,7 +35,9 @@ async fn aborts_hung_services_on_external_cancel() { let run_task = tokio::spawn(async move { manager .start(cancellation_token_for_run) - .run_to_completion(Duration::from_millis(50)) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_millis(50), + }) .await }); diff --git a/trzcina/tests/run_cancels_all_services_when_external_token_cancelled.rs b/trzcina-sendable-service/tests/run_cancels_all_services_when_external_token_cancelled.rs similarity index 82% rename from trzcina/tests/run_cancels_all_services_when_external_token_cancelled.rs rename to trzcina-sendable-service/tests/run_cancels_all_services_when_external_token_cancelled.rs index 2917849..7e5758c 100644 --- a/trzcina/tests/run_cancels_all_services_when_external_token_cancelled.rs +++ b/trzcina-sendable-service/tests/run_cancels_all_services_when_external_token_cancelled.rs @@ -2,12 +2,15 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::sync::oneshot; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; struct AwaitingService { observation_tx: Option>, @@ -42,7 +45,9 @@ async fn cancels_all_services_when_external_token_cancelled() { let run_task = tokio::spawn(async move { manager .start(cancellation_token_for_run) - .run_to_completion(Duration::from_secs(1)) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }) .await }); diff --git a/trzcina/tests/run_cancels_siblings_when_one_service_finishes_first.rs b/trzcina-sendable-service/tests/run_cancels_siblings_when_one_service_finishes_first.rs similarity index 82% rename from trzcina/tests/run_cancels_siblings_when_one_service_finishes_first.rs rename to trzcina-sendable-service/tests/run_cancels_siblings_when_one_service_finishes_first.rs index b4405bd..dfcc3d5 100644 --- a/trzcina/tests/run_cancels_siblings_when_one_service_finishes_first.rs +++ b/trzcina-sendable-service/tests/run_cancels_siblings_when_one_service_finishes_first.rs @@ -2,12 +2,15 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::sync::oneshot; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; struct ConfiguredService { finish_immediately: bool, @@ -50,7 +53,9 @@ async fn cancels_siblings_when_one_service_finishes_first() { Duration::from_secs(5), manager .start(CancellationToken::new()) - .run_to_completion(Duration::from_secs(1)), + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), ) .await .unwrap(); diff --git a/trzcina/tests/run_completes_immediately_when_no_services_registered.rs b/trzcina-sendable-service/tests/run_completes_immediately_when_no_services_registered.rs similarity index 57% rename from trzcina/tests/run_completes_immediately_when_no_services_registered.rs rename to trzcina-sendable-service/tests/run_completes_immediately_when_no_services_registered.rs index 45c0aea..5007e84 100644 --- a/trzcina/tests/run_completes_immediately_when_no_services_registered.rs +++ b/trzcina-sendable-service/tests/run_completes_immediately_when_no_services_registered.rs @@ -1,8 +1,11 @@ use std::time::Duration; -use trzcina::ServiceManager; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; #[tokio::test] async fn completes_immediately_when_no_services_registered() { @@ -11,7 +14,9 @@ async fn completes_immediately_when_no_services_registered() { Duration::from_secs(5), manager .start(CancellationToken::new()) - .run_to_completion(Duration::from_secs(1)), + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), ) .await .unwrap() diff --git a/trzcina/tests/run_completes_when_all_services_finish_simultaneously.rs b/trzcina-sendable-service/tests/run_completes_when_all_services_finish_simultaneously.rs similarity index 70% rename from trzcina/tests/run_completes_when_all_services_finish_simultaneously.rs rename to trzcina-sendable-service/tests/run_completes_when_all_services_finish_simultaneously.rs index 7039f04..bb13cdd 100644 --- a/trzcina/tests/run_completes_when_all_services_finish_simultaneously.rs +++ b/trzcina-sendable-service/tests/run_completes_when_all_services_finish_simultaneously.rs @@ -2,11 +2,14 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; struct InstantOkService; @@ -28,7 +31,9 @@ async fn completes_when_all_services_finish_simultaneously() { Duration::from_secs(5), manager .start(CancellationToken::new()) - .run_to_completion(Duration::from_secs(1)), + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), ) .await .unwrap(); diff --git a/trzcina-sendable-service/tests/run_leaves_external_token_uncancelled_after_error_exit.rs b/trzcina-sendable-service/tests/run_leaves_external_token_uncancelled_after_error_exit.rs new file mode 100644 index 0000000..3731b92 --- /dev/null +++ b/trzcina-sendable-service/tests/run_leaves_external_token_uncancelled_after_error_exit.rs @@ -0,0 +1,37 @@ +use std::time::Duration; + +use anyhow::Result; +use anyhow::anyhow; +use async_trait::async_trait; +use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; + +struct ImmediatelyErroringService; + +#[async_trait] +impl Service for ImmediatelyErroringService { + async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + Err(anyhow!("service failed")) + } +} + +#[tokio::test] +async fn leaves_external_token_uncancelled_after_error_exit() { + let external_token = CancellationToken::new(); + let mut manager = ServiceManager::default(); + manager.register_service(ImmediatelyErroringService); + + let _ = manager + .start(external_token.clone()) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }) + .await + .into_result(); + + assert!(!external_token.is_cancelled()); +} diff --git a/trzcina-sendable-service/tests/run_leaves_external_token_uncancelled_after_normal_exit.rs b/trzcina-sendable-service/tests/run_leaves_external_token_uncancelled_after_normal_exit.rs new file mode 100644 index 0000000..eddaf67 --- /dev/null +++ b/trzcina-sendable-service/tests/run_leaves_external_token_uncancelled_after_normal_exit.rs @@ -0,0 +1,37 @@ +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; + +struct ImmediatelyExitingService; + +#[async_trait] +impl Service for ImmediatelyExitingService { + async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> { + Ok(()) + } +} + +#[tokio::test] +async fn leaves_external_token_uncancelled_after_normal_exit() { + let external_token = CancellationToken::new(); + let mut manager = ServiceManager::default(); + manager.register_service(ImmediatelyExitingService); + + manager + .start(external_token.clone()) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }) + .await + .into_result() + .unwrap(); + + assert!(!external_token.is_cancelled()); +} diff --git a/trzcina/tests/run_records_all_failures_when_multiple_services_error.rs b/trzcina-sendable-service/tests/run_records_all_failures_when_multiple_services_error.rs similarity index 85% rename from trzcina/tests/run_records_all_failures_when_multiple_services_error.rs rename to trzcina-sendable-service/tests/run_records_all_failures_when_multiple_services_error.rs index c966b92..c4202d1 100644 --- a/trzcina/tests/run_records_all_failures_when_multiple_services_error.rs +++ b/trzcina-sendable-service/tests/run_records_all_failures_when_multiple_services_error.rs @@ -3,12 +3,15 @@ use std::time::Duration; use anyhow::Result; use anyhow::anyhow; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::sync::oneshot; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; struct ConfiguredService { return_err: bool, @@ -53,7 +56,9 @@ async fn records_all_failures_when_multiple_services_error() { Duration::from_secs(5), manager .start(CancellationToken::new()) - .run_to_completion(Duration::from_secs(1)), + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), ) .await .unwrap(); diff --git a/trzcina/tests/run_records_non_string_panic_payload.rs b/trzcina-sendable-service/tests/run_records_non_string_panic_payload.rs similarity index 70% rename from trzcina/tests/run_records_non_string_panic_payload.rs rename to trzcina-sendable-service/tests/run_records_non_string_panic_payload.rs index 3f3c77b..35703a6 100644 --- a/trzcina/tests/run_records_non_string_panic_payload.rs +++ b/trzcina-sendable-service/tests/run_records_non_string_panic_payload.rs @@ -3,11 +3,14 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; struct NonStringPanickingService; @@ -27,7 +30,9 @@ async fn records_non_string_panic_payload_as_generic_message() { Duration::from_secs(5), manager .start(CancellationToken::new()) - .run_to_completion(Duration::from_secs(1)), + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), ) .await .unwrap(); diff --git a/trzcina/tests/run_records_service_error_and_cancels_siblings.rs b/trzcina-sendable-service/tests/run_records_service_error_and_cancels_siblings.rs similarity index 83% rename from trzcina/tests/run_records_service_error_and_cancels_siblings.rs rename to trzcina-sendable-service/tests/run_records_service_error_and_cancels_siblings.rs index 18f82d5..2b9922d 100644 --- a/trzcina/tests/run_records_service_error_and_cancels_siblings.rs +++ b/trzcina-sendable-service/tests/run_records_service_error_and_cancels_siblings.rs @@ -3,12 +3,15 @@ use std::time::Duration; use anyhow::Result; use anyhow::anyhow; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::sync::oneshot; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; struct ConfiguredService { return_err: bool, @@ -51,7 +54,9 @@ async fn records_service_error_and_cancels_siblings() { Duration::from_secs(5), manager .start(CancellationToken::new()) - .run_to_completion(Duration::from_secs(1)), + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), ) .await .unwrap(); diff --git a/trzcina/tests/run_records_service_panic_and_cancels_siblings.rs b/trzcina-sendable-service/tests/run_records_service_panic_and_cancels_siblings.rs similarity index 85% rename from trzcina/tests/run_records_service_panic_and_cancels_siblings.rs rename to trzcina-sendable-service/tests/run_records_service_panic_and_cancels_siblings.rs index 4893792..2a9316d 100644 --- a/trzcina/tests/run_records_service_panic_and_cancels_siblings.rs +++ b/trzcina-sendable-service/tests/run_records_service_panic_and_cancels_siblings.rs @@ -2,12 +2,15 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::sync::oneshot; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; const PANIC_MARKER: &str = "deliberately panicking for cascade test"; @@ -52,7 +55,9 @@ async fn records_service_panic_and_cancels_siblings() { Duration::from_secs(5), manager .start(CancellationToken::new()) - .run_to_completion(Duration::from_secs(1)), + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), ) .await .unwrap(); diff --git a/trzcina/tests/run_records_string_literal_panic_payload.rs b/trzcina-sendable-service/tests/run_records_string_literal_panic_payload.rs similarity index 74% rename from trzcina/tests/run_records_string_literal_panic_payload.rs rename to trzcina-sendable-service/tests/run_records_string_literal_panic_payload.rs index 56ce31a..06bedeb 100644 --- a/trzcina/tests/run_records_string_literal_panic_payload.rs +++ b/trzcina-sendable-service/tests/run_records_string_literal_panic_payload.rs @@ -2,11 +2,14 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; const PANIC_LITERAL: &str = "deliberately panicking with a string literal"; @@ -28,7 +31,9 @@ async fn records_string_literal_panic_payload() { Duration::from_secs(5), manager .start(CancellationToken::new()) - .run_to_completion(Duration::from_secs(1)), + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }), ) .await .unwrap(); diff --git a/trzcina/tests/run_reports_leaked_beyond_abort_deadline_when_service_ignores_abort.rs b/trzcina-sendable-service/tests/run_reports_leaked_beyond_abort_deadline_when_service_ignores_abort.rs similarity index 78% rename from trzcina/tests/run_reports_leaked_beyond_abort_deadline_when_service_ignores_abort.rs rename to trzcina-sendable-service/tests/run_reports_leaked_beyond_abort_deadline_when_service_ignores_abort.rs index 296a8fc..ff61062 100644 --- a/trzcina/tests/run_reports_leaked_beyond_abort_deadline_when_service_ignores_abort.rs +++ b/trzcina-sendable-service/tests/run_reports_leaked_beyond_abort_deadline_when_service_ignores_abort.rs @@ -2,11 +2,14 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; struct ThreadBlockingService { block_duration: Duration, @@ -33,7 +36,9 @@ async fn reports_leaked_beyond_abort_deadline_when_service_ignores_abort() { let run_task = tokio::spawn(async move { manager .start(cancellation_token_for_run) - .run_to_completion(Duration::from_millis(50)) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_millis(50), + }) .await }); diff --git a/trzcina/tests/supports_actix_style_shutdown_signal_pattern.rs b/trzcina-sendable-service/tests/supports_actix_style_shutdown_signal_pattern.rs similarity index 80% rename from trzcina/tests/supports_actix_style_shutdown_signal_pattern.rs rename to trzcina-sendable-service/tests/supports_actix_style_shutdown_signal_pattern.rs index 388a545..37f3dbe 100644 --- a/trzcina/tests/supports_actix_style_shutdown_signal_pattern.rs +++ b/trzcina-sendable-service/tests/supports_actix_style_shutdown_signal_pattern.rs @@ -2,12 +2,15 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::sync::oneshot; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; struct ActixStyleService { started_tx: Option>, @@ -43,7 +46,9 @@ async fn supports_actix_style_shutdown_signal_pattern() { let run_task = tokio::spawn(async move { manager .start(cancellation_token_for_run) - .run_to_completion(Duration::from_secs(1)) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }) .await }); diff --git a/trzcina/tests/supports_internal_retry_loop_pattern.rs b/trzcina-sendable-service/tests/supports_internal_retry_loop_pattern.rs similarity index 81% rename from trzcina/tests/supports_internal_retry_loop_pattern.rs rename to trzcina-sendable-service/tests/supports_internal_retry_loop_pattern.rs index d5ebbce..2c5e46a 100644 --- a/trzcina/tests/supports_internal_retry_loop_pattern.rs +++ b/trzcina-sendable-service/tests/supports_internal_retry_loop_pattern.rs @@ -2,13 +2,16 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::sync::oneshot; use tokio::time::sleep; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; struct RetryLoopService { backoff_started_tx: Option>, @@ -43,7 +46,9 @@ async fn supports_internal_retry_loop_pattern() { let run_task = tokio::spawn(async move { manager .start(cancellation_token_for_run) - .run_to_completion(Duration::from_secs(1)) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }) .await }); diff --git a/trzcina/tests/supports_interval_ticker_reconciliation_pattern.rs b/trzcina-sendable-service/tests/supports_interval_ticker_reconciliation_pattern.rs similarity index 85% rename from trzcina/tests/supports_interval_ticker_reconciliation_pattern.rs rename to trzcina-sendable-service/tests/supports_interval_ticker_reconciliation_pattern.rs index cc18af0..7da277a 100644 --- a/trzcina/tests/supports_interval_ticker_reconciliation_pattern.rs +++ b/trzcina-sendable-service/tests/supports_interval_ticker_reconciliation_pattern.rs @@ -5,13 +5,16 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::sync::oneshot; use tokio::time::interval; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; struct ReconciliationService { first_tick_tx: Option>, @@ -54,7 +57,9 @@ async fn supports_interval_ticker_reconciliation_pattern() { let run_task = tokio::spawn(async move { manager .start(cancellation_token_for_run) - .run_to_completion(Duration::from_secs(1)) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }) .await }); diff --git a/trzcina/tests/supports_multi_channel_select_pump_pattern.rs b/trzcina-sendable-service/tests/supports_multi_channel_select_pump_pattern.rs similarity index 87% rename from trzcina/tests/supports_multi_channel_select_pump_pattern.rs rename to trzcina-sendable-service/tests/supports_multi_channel_select_pump_pattern.rs index 13150e5..cfd4f90 100644 --- a/trzcina/tests/supports_multi_channel_select_pump_pattern.rs +++ b/trzcina-sendable-service/tests/supports_multi_channel_select_pump_pattern.rs @@ -2,13 +2,16 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; struct MultiChannelPumpService { primary_observed_tx: Option>, @@ -58,7 +61,9 @@ async fn supports_multi_channel_select_pump_pattern() { let run_task = tokio::spawn(async move { manager .start(cancellation_token_for_run) - .run_to_completion(Duration::from_secs(1)) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }) .await }); diff --git a/trzcina/tests/supports_mutable_internal_state_across_iterations.rs b/trzcina-sendable-service/tests/supports_mutable_internal_state_across_iterations.rs similarity index 86% rename from trzcina/tests/supports_mutable_internal_state_across_iterations.rs rename to trzcina-sendable-service/tests/supports_mutable_internal_state_across_iterations.rs index bdc87ac..9c6b214 100644 --- a/trzcina/tests/supports_mutable_internal_state_across_iterations.rs +++ b/trzcina-sendable-service/tests/supports_mutable_internal_state_across_iterations.rs @@ -4,13 +4,16 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::sync::Notify; use tokio::sync::oneshot; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; struct StatefulService { iteration_count: usize, @@ -53,7 +56,9 @@ async fn supports_mutable_internal_state_across_iterations() { let run_task = tokio::spawn(async move { manager .start(cancellation_token_for_run) - .run_to_completion(Duration::from_secs(1)) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }) .await }); diff --git a/trzcina/tests/supports_notify_driven_event_loop_pattern.rs b/trzcina-sendable-service/tests/supports_notify_driven_event_loop_pattern.rs similarity index 84% rename from trzcina/tests/supports_notify_driven_event_loop_pattern.rs rename to trzcina-sendable-service/tests/supports_notify_driven_event_loop_pattern.rs index b9a4f8a..4437081 100644 --- a/trzcina/tests/supports_notify_driven_event_loop_pattern.rs +++ b/trzcina-sendable-service/tests/supports_notify_driven_event_loop_pattern.rs @@ -4,13 +4,16 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use trzcina::Service; -use trzcina::ServiceManager; -use trzcina::ServiceShutdownOutcome; use tokio::sync::Notify; use tokio::sync::oneshot; use tokio::time::timeout; use tokio_util::sync::CancellationToken; +use trzcina_sendable_service::Service; +use trzcina_sendable_service::ServiceManager; +use trzcina_service::Manager; +use trzcina_service::RunToCompletionOptions; +use trzcina_service::RunningCollection; +use trzcina_service::ServiceShutdownOutcome; struct NotifyDrivenService { notify: Arc, @@ -50,7 +53,9 @@ async fn supports_notify_driven_event_loop_pattern() { let run_task = tokio::spawn(async move { manager .start(cancellation_token_for_run) - .run_to_completion(Duration::from_secs(1)) + .run_to_completion(RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }) .await }); diff --git a/trzcina-service/Cargo.toml b/trzcina-service/Cargo.toml new file mode 100644 index 0000000..415c0a9 --- /dev/null +++ b/trzcina-service/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "trzcina-service" +version = "0.2.0" +edition = "2024" +license = "Apache-2.0" +description = "Shared primitives for trzcina: service lifecycle outcomes, errors, and shutdown machinery." +repository = "https://github.com/intentee/trzcina" +homepage = "https://github.com/intentee/trzcina" +readme = "../README.md" +authors = ["Intentee"] +keywords = ["async", "service", "lifecycle", "cancellation", "tokio"] +categories = ["asynchronous", "concurrency"] + +[dependencies] +anyhow = { workspace = true } +futures-util = { workspace = true } +log = { workspace = true } +tokio = { workspace = true } +tokio-util = { workspace = true } + +[lints] +workspace = true diff --git a/trzcina-service/src/lib.rs b/trzcina-service/src/lib.rs new file mode 100644 index 0000000..e5ede9f --- /dev/null +++ b/trzcina-service/src/lib.rs @@ -0,0 +1,23 @@ +mod manager; +mod run_to_completion_options; +mod running_collection; +mod running_service; +mod service_outcome_classifier; +mod service_shutdown_error; +mod service_shutdown_outcome; +mod service_shutdown_outcome_collection; +mod service_shutdown_outcome_with_service_name; +mod service_task_drainer; +mod sibling_cancellation_guard; + +pub use crate::manager::Manager; +pub use crate::run_to_completion_options::RunToCompletionOptions; +pub use crate::running_collection::RunningCollection; +pub use crate::running_service::RunningService; +pub use crate::service_outcome_classifier::classify_future_outcome; +pub use crate::service_shutdown_error::ServiceShutdownError; +pub use crate::service_shutdown_outcome::ServiceShutdownOutcome; +pub use crate::service_shutdown_outcome_collection::ServiceShutdownOutcomeCollection; +pub use crate::service_shutdown_outcome_with_service_name::ServiceShutdownOutcomeWithServiceName; +pub use crate::service_task_drainer::drain_to_completion; +pub use crate::sibling_cancellation_guard::SiblingCancellationGuard; diff --git a/trzcina-service/src/manager.rs b/trzcina-service/src/manager.rs new file mode 100644 index 0000000..a1c43d3 --- /dev/null +++ b/trzcina-service/src/manager.rs @@ -0,0 +1,9 @@ +use tokio_util::sync::CancellationToken; + +use crate::running_collection::RunningCollection; + +pub trait Manager: Default { + type Running: RunningCollection; + + fn start(self, cancellation_token: CancellationToken) -> Self::Running; +} diff --git a/trzcina-service/src/run_to_completion_options.rs b/trzcina-service/src/run_to_completion_options.rs new file mode 100644 index 0000000..8897514 --- /dev/null +++ b/trzcina-service/src/run_to_completion_options.rs @@ -0,0 +1,13 @@ +use std::time::Duration; + +pub struct RunToCompletionOptions { + pub shutdown_deadline: Duration, +} + +impl Default for RunToCompletionOptions { + fn default() -> Self { + Self { + shutdown_deadline: Duration::from_secs(10), + } + } +} diff --git a/trzcina-service/src/running_collection.rs b/trzcina-service/src/running_collection.rs new file mode 100644 index 0000000..161fc4f --- /dev/null +++ b/trzcina-service/src/running_collection.rs @@ -0,0 +1,11 @@ +use std::future::Future; + +use crate::run_to_completion_options::RunToCompletionOptions; +use crate::service_shutdown_outcome_collection::ServiceShutdownOutcomeCollection; + +pub trait RunningCollection { + fn run_to_completion( + self, + options: RunToCompletionOptions, + ) -> impl Future; +} diff --git a/trzcina/src/running_service.rs b/trzcina-service/src/running_service.rs similarity index 100% rename from trzcina/src/running_service.rs rename to trzcina-service/src/running_service.rs diff --git a/trzcina-service/src/service_outcome_classifier.rs b/trzcina-service/src/service_outcome_classifier.rs new file mode 100644 index 0000000..2c913ce --- /dev/null +++ b/trzcina-service/src/service_outcome_classifier.rs @@ -0,0 +1,46 @@ +use std::any::Any; +use std::future::Future; +use std::panic::AssertUnwindSafe; + +use futures_util::FutureExt; +use log::error; +use log::info; + +use crate::service_shutdown_outcome::ServiceShutdownOutcome; + +fn panic_payload_to_string(panic_payload: Box) -> String { + if let Some(static_str_message) = panic_payload.downcast_ref::<&'static str>() { + return (*static_str_message).to_owned(); + } + if let Ok(boxed_message) = panic_payload.downcast::() { + return *boxed_message; + } + "non-string panic payload".to_owned() +} + +pub async fn classify_future_outcome( + service_name: &'static str, + run_future: TServiceFuture, +) -> ServiceShutdownOutcome +where + TServiceFuture: Future>, +{ + info!("Service {service_name:?} starting"); + let panic_caught_outcome = AssertUnwindSafe(run_future).catch_unwind().await; + + match panic_caught_outcome { + Ok(Ok(())) => { + info!("Service {service_name:?} stopped"); + ServiceShutdownOutcome::Completed + } + Ok(Err(service_error)) => { + error!("Service {service_name:?} error: {service_error:#?}"); + ServiceShutdownOutcome::Errored(service_error) + } + Err(panic_payload) => { + let panic_message = panic_payload_to_string(panic_payload); + error!("Service {service_name:?} panicked: {panic_message}"); + ServiceShutdownOutcome::Panicked(panic_message) + } + } +} diff --git a/trzcina-service/src/service_shutdown_error.rs b/trzcina-service/src/service_shutdown_error.rs new file mode 100644 index 0000000..ffc8c18 --- /dev/null +++ b/trzcina-service/src/service_shutdown_error.rs @@ -0,0 +1,55 @@ +use std::error::Error; +use std::fmt; + +use crate::service_shutdown_outcome::ServiceShutdownOutcome; +use crate::service_shutdown_outcome_with_service_name::ServiceShutdownOutcomeWithServiceName; + +fn build_failure_line(entry: &ServiceShutdownOutcomeWithServiceName) -> Option { + let ServiceShutdownOutcomeWithServiceName { name, outcome } = entry; + match outcome { + ServiceShutdownOutcome::Completed => None, + ServiceShutdownOutcome::Errored(service_error) => { + Some(format!(" service {name:?} errored: {service_error:#}\n")) + } + ServiceShutdownOutcome::Panicked(panic_message) => { + Some(format!(" service {name:?} panicked: {panic_message}\n")) + } + ServiceShutdownOutcome::AbortedByShutdownDeadline => Some(format!( + " service {name:?} aborted after shutdown deadline\n" + )), + ServiceShutdownOutcome::LeakedBeyondAbortDeadline => Some(format!( + " service {name:?} leaked beyond shutdown deadline\n" + )), + } +} + +#[derive(Debug)] +pub struct ServiceShutdownError { + failed_outcomes: Vec, +} + +impl ServiceShutdownError { + #[must_use] + pub fn new(failed_outcomes: Vec) -> Self { + Self { failed_outcomes } + } + + #[must_use] + pub fn failed_outcomes(&self) -> &[ServiceShutdownOutcomeWithServiceName] { + &self.failed_outcomes + } +} + +impl fmt::Display for ServiceShutdownError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("service shutdown failed:\n")?; + for entry in &self.failed_outcomes { + if let Some(line) = build_failure_line(entry) { + f.write_str(&line)?; + } + } + Ok(()) + } +} + +impl Error for ServiceShutdownError {} diff --git a/trzcina/src/service_shutdown_outcome.rs b/trzcina-service/src/service_shutdown_outcome.rs similarity index 100% rename from trzcina/src/service_shutdown_outcome.rs rename to trzcina-service/src/service_shutdown_outcome.rs diff --git a/trzcina/src/service_shutdown_outcome_collection.rs b/trzcina-service/src/service_shutdown_outcome_collection.rs similarity index 100% rename from trzcina/src/service_shutdown_outcome_collection.rs rename to trzcina-service/src/service_shutdown_outcome_collection.rs diff --git a/trzcina/src/service_shutdown_outcome_with_service_name.rs b/trzcina-service/src/service_shutdown_outcome_with_service_name.rs similarity index 100% rename from trzcina/src/service_shutdown_outcome_with_service_name.rs rename to trzcina-service/src/service_shutdown_outcome_with_service_name.rs diff --git a/trzcina-service/src/service_task_drainer.rs b/trzcina-service/src/service_task_drainer.rs new file mode 100644 index 0000000..2da8c87 --- /dev/null +++ b/trzcina-service/src/service_task_drainer.rs @@ -0,0 +1,56 @@ +use std::time::Duration; + +use log::error; +use log::info; +use tokio::task::JoinSet; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; + +async fn wait_for_shutdown_signal( + cancellation_token: &CancellationToken, + has_running_services: bool, +) { + if !has_running_services { + return; + } + + cancellation_token.cancelled().await; + info!("Service is shutting down"); +} + +async fn drain_within_deadline(task_set: &mut JoinSet<()>, deadline: Duration) -> bool { + timeout(deadline, async { + while task_set.join_next().await.is_some() {} + }) + .await + .is_ok() +} + +async fn abort_and_drain(task_set: &mut JoinSet<()>, abort_deadline: Duration) { + error!("Shutdown deadline exceeded; aborting remaining services"); + task_set.abort_all(); + + let abort_drain_result = timeout(abort_deadline, async { + while task_set.join_next().await.is_some() {} + }) + .await; + + if abort_drain_result.is_err() { + error!( + "Abort drain exceeded {abort_deadline:?}; one or more services ignored the abort signal and are leaked beyond the manager's lifetime", + ); + } +} + +pub async fn drain_to_completion( + task_set: &mut JoinSet<()>, + cancellation_token: &CancellationToken, + has_running_services: bool, + shutdown_deadline: Duration, +) { + wait_for_shutdown_signal(cancellation_token, has_running_services).await; + + if !drain_within_deadline(task_set, shutdown_deadline).await { + abort_and_drain(task_set, shutdown_deadline).await; + } +} diff --git a/trzcina/src/sibling_cancellation_guard.rs b/trzcina-service/src/sibling_cancellation_guard.rs similarity index 100% rename from trzcina/src/sibling_cancellation_guard.rs rename to trzcina-service/src/sibling_cancellation_guard.rs diff --git a/trzcina-service/tests/display_propagates_writer_errors.rs b/trzcina-service/tests/display_propagates_writer_errors.rs new file mode 100644 index 0000000..edcdfe7 --- /dev/null +++ b/trzcina-service/tests/display_propagates_writer_errors.rs @@ -0,0 +1,52 @@ +use std::fmt; +use std::fmt::Write; + +use anyhow::anyhow; +use trzcina_service::ServiceShutdownError; +use trzcina_service::ServiceShutdownOutcome; +use trzcina_service::ServiceShutdownOutcomeWithServiceName; + +struct WriterFailingAfter { + remaining_successful_calls: usize, +} + +impl WriterFailingAfter { + fn new(successful_calls_before_failure: usize) -> Self { + Self { + remaining_successful_calls: successful_calls_before_failure, + } + } +} + +impl Write for WriterFailingAfter { + fn write_str(&mut self, _payload: &str) -> fmt::Result { + if self.remaining_successful_calls == 0 { + return Err(fmt::Error); + } + self.remaining_successful_calls -= 1; + Ok(()) + } +} + +fn build_error_with_one_errored_service() -> ServiceShutdownError { + ServiceShutdownError::new(vec![ServiceShutdownOutcomeWithServiceName { + name: "errored_service", + outcome: ServiceShutdownOutcome::Errored(anyhow!("service failed")), + }]) +} + +#[test] +fn display_propagates_header_write_failure() { + let error = build_error_with_one_errored_service(); + let mut writer = WriterFailingAfter::new(0); + let format_result = write!(writer, "{error}"); + assert!(format_result.is_err()); +} + +#[test] +fn display_propagates_body_write_failure() { + let error = build_error_with_one_errored_service(); + let mut writer = WriterFailingAfter::new(1); + let format_result = write!(writer, "{error}"); + assert!(format_result.is_err()); +} diff --git a/trzcina-service/tests/run_to_completion_options_default_uses_ten_second_deadline.rs b/trzcina-service/tests/run_to_completion_options_default_uses_ten_second_deadline.rs new file mode 100644 index 0000000..a27b157 --- /dev/null +++ b/trzcina-service/tests/run_to_completion_options_default_uses_ten_second_deadline.rs @@ -0,0 +1,9 @@ +use std::time::Duration; + +use trzcina_service::RunToCompletionOptions; + +#[test] +fn default_uses_ten_second_deadline() { + let options = RunToCompletionOptions::default(); + assert_eq!(options.shutdown_deadline, Duration::from_secs(10)); +} diff --git a/trzcina/tests/service_shutdown_error_display_formats_all_failure_variants.rs b/trzcina-service/tests/service_shutdown_error_display_formats_all_failure_variants.rs similarity index 88% rename from trzcina/tests/service_shutdown_error_display_formats_all_failure_variants.rs rename to trzcina-service/tests/service_shutdown_error_display_formats_all_failure_variants.rs index 008012c..9cd08a1 100644 --- a/trzcina/tests/service_shutdown_error_display_formats_all_failure_variants.rs +++ b/trzcina-service/tests/service_shutdown_error_display_formats_all_failure_variants.rs @@ -1,7 +1,7 @@ use anyhow::anyhow; -use trzcina::ServiceShutdownError; -use trzcina::ServiceShutdownOutcome; -use trzcina::ServiceShutdownOutcomeWithServiceName; +use trzcina_service::ServiceShutdownError; +use trzcina_service::ServiceShutdownOutcome; +use trzcina_service::ServiceShutdownOutcomeWithServiceName; #[test] fn display_formats_all_failure_variants() { diff --git a/trzcina/Cargo.toml b/trzcina/Cargo.toml index 22b0965..d7cd71d 100644 --- a/trzcina/Cargo.toml +++ b/trzcina/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "trzcina" -version = "0.1.0" +version = "0.2.0" edition = "2024" license = "Apache-2.0" description = "Async service lifecycle orchestration with cooperative cancellation and shutdown deadlines." @@ -12,10 +12,13 @@ keywords = ["async", "service", "lifecycle", "cancellation", "tokio"] categories = ["asynchronous", "concurrency"] [dependencies] +trzcina-local-service = { version = "0.2.0", path = "../trzcina-local-service" } +trzcina-sendable-service = { version = "0.2.0", path = "../trzcina-sendable-service" } +trzcina-service = { version = "0.2.0", path = "../trzcina-service" } + +[dev-dependencies] anyhow = { workspace = true } async-trait = { workspace = true } -futures-util = { workspace = true } -log = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } diff --git a/trzcina/src/lib.rs b/trzcina/src/lib.rs index 5f0f6f2..005921f 100644 --- a/trzcina/src/lib.rs +++ b/trzcina/src/lib.rs @@ -1,20 +1,17 @@ -mod registered_service; -mod running_service; -mod running_service_collection; -mod service; -mod service_bundle; -mod service_manager; -mod service_shutdown_error; -mod service_shutdown_outcome; -mod service_shutdown_outcome_collection; -mod service_shutdown_outcome_with_service_name; -mod sibling_cancellation_guard; - -pub use crate::running_service_collection::RunningServiceCollection; -pub use crate::service::Service; -pub use crate::service_bundle::ServiceBundle; -pub use crate::service_manager::ServiceManager; -pub use crate::service_shutdown_error::ServiceShutdownError; -pub use crate::service_shutdown_outcome::ServiceShutdownOutcome; -pub use crate::service_shutdown_outcome_collection::ServiceShutdownOutcomeCollection; -pub use crate::service_shutdown_outcome_with_service_name::ServiceShutdownOutcomeWithServiceName; +pub use trzcina_local_service::LocalRegisteredService; +pub use trzcina_local_service::LocalRunningServiceCollection; +pub use trzcina_local_service::LocalService; +pub use trzcina_local_service::LocalServiceBundle; +pub use trzcina_local_service::LocalServiceManager; +pub use trzcina_sendable_service::RegisteredService; +pub use trzcina_sendable_service::RunningServiceCollection; +pub use trzcina_sendable_service::Service; +pub use trzcina_sendable_service::ServiceBundle; +pub use trzcina_sendable_service::ServiceManager; +pub use trzcina_service::Manager; +pub use trzcina_service::RunToCompletionOptions; +pub use trzcina_service::RunningCollection; +pub use trzcina_service::ServiceShutdownError; +pub use trzcina_service::ServiceShutdownOutcome; +pub use trzcina_service::ServiceShutdownOutcomeCollection; +pub use trzcina_service::ServiceShutdownOutcomeWithServiceName; diff --git a/trzcina/src/running_service_collection.rs b/trzcina/src/running_service_collection.rs deleted file mode 100644 index 6a3f915..0000000 --- a/trzcina/src/running_service_collection.rs +++ /dev/null @@ -1,153 +0,0 @@ -use std::any::Any; -use std::panic::AssertUnwindSafe; -use std::time::Duration; - -use futures_util::FutureExt; -use log::error; -use log::info; -use tokio::sync::oneshot; -use tokio::task::JoinSet; -use tokio::time::timeout; -use tokio_util::sync::CancellationToken; - -use crate::registered_service::RegisteredService; -use crate::running_service::RunningService; -use crate::service::Service; -use crate::service_shutdown_outcome::ServiceShutdownOutcome; -use crate::service_shutdown_outcome_collection::ServiceShutdownOutcomeCollection; -use crate::service_shutdown_outcome_with_service_name::ServiceShutdownOutcomeWithServiceName; -use crate::sibling_cancellation_guard::SiblingCancellationGuard; - -fn panic_payload_to_string(panic_payload: Box) -> String { - if let Some(static_str_message) = panic_payload.downcast_ref::<&'static str>() { - return (*static_str_message).to_owned(); - } - if let Ok(boxed_message) = panic_payload.downcast::() { - return *boxed_message; - } - "non-string panic payload".to_owned() -} - -async fn run_service_with_sibling_cancellation_on_return( - service_name: &'static str, - mut service: Box, - cancellation_token: CancellationToken, -) -> ServiceShutdownOutcome { - let _sibling_cancellation_guard = SiblingCancellationGuard::new(cancellation_token.clone()); - classify_service_outcome(service_name, &mut service, cancellation_token).await -} - -async fn classify_service_outcome( - service_name: &'static str, - service: &mut Box, - cancellation_token: CancellationToken, -) -> ServiceShutdownOutcome { - info!("Service {service_name:?} starting"); - let panic_caught_outcome = AssertUnwindSafe(service.run(cancellation_token)) - .catch_unwind() - .await; - - match panic_caught_outcome { - Ok(Ok(())) => { - info!("Service {service_name:?} stopped"); - ServiceShutdownOutcome::Completed - } - Ok(Err(service_error)) => { - error!("Service {service_name:?} error: {service_error:#?}"); - ServiceShutdownOutcome::Errored(service_error) - } - Err(panic_payload) => { - let panic_message = panic_payload_to_string(panic_payload); - error!("Service {service_name:?} panicked: {panic_message}"); - ServiceShutdownOutcome::Panicked(panic_message) - } - } -} - -pub struct RunningServiceCollection { - cancellation_token: CancellationToken, - running_services: Vec, - task_set: JoinSet<()>, -} - -impl RunningServiceCollection { - pub(crate) fn start( - registered: Vec, - cancellation_token: CancellationToken, - ) -> Self { - let mut running_services: Vec = Vec::with_capacity(registered.len()); - let mut task_set: JoinSet<()> = JoinSet::new(); - - for RegisteredService { name, service } in registered { - let (outcome_sender, outcome_receiver) = oneshot::channel::(); - let service_cancellation_token = cancellation_token.clone(); - - task_set.spawn(async move { - let outcome = run_service_with_sibling_cancellation_on_return( - name, - service, - service_cancellation_token, - ) - .await; - let _ = outcome_sender.send(outcome); - }); - - running_services.push(RunningService::new(name, outcome_receiver)); - } - - Self { - cancellation_token, - running_services, - task_set, - } - } - - pub async fn run_to_completion( - mut self, - shutdown_deadline: Duration, - ) -> ServiceShutdownOutcomeCollection { - self.wait_for_shutdown_signal().await; - - if !self.drain_within_deadline(shutdown_deadline).await { - self.abort_and_drain(shutdown_deadline).await; - } - - let outcomes: Vec = - self.running_services.into_iter().map(Into::into).collect(); - - ServiceShutdownOutcomeCollection::new(outcomes) - } - - async fn wait_for_shutdown_signal(&self) { - if self.running_services.is_empty() { - return; - } - - self.cancellation_token.cancelled().await; - info!("Service is shutting down"); - } - - async fn drain_within_deadline(&mut self, deadline: Duration) -> bool { - timeout(deadline, async { - while self.task_set.join_next().await.is_some() {} - }) - .await - .is_ok() - } - - async fn abort_and_drain(&mut self, abort_deadline: Duration) { - error!("Shutdown deadline exceeded; aborting remaining services"); - self.task_set.abort_all(); - - let abort_drain_result = timeout(abort_deadline, async { - while self.task_set.join_next().await.is_some() {} - }) - .await; - - if abort_drain_result.is_err() { - error!( - "Abort drain exceeded {abort_deadline:?}; one or more services ignored the abort signal and are leaked beyond the manager's lifetime", - ); - } - } -} diff --git a/trzcina/src/service_shutdown_error.rs b/trzcina/src/service_shutdown_error.rs deleted file mode 100644 index fedc81a..0000000 --- a/trzcina/src/service_shutdown_error.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::error::Error; -use std::fmt; - -use crate::service_shutdown_outcome::ServiceShutdownOutcome; -use crate::service_shutdown_outcome_with_service_name::ServiceShutdownOutcomeWithServiceName; - -#[derive(Debug)] -pub struct ServiceShutdownError { - failed_outcomes: Vec, -} - -impl ServiceShutdownError { - #[must_use] - pub fn new(failed_outcomes: Vec) -> Self { - Self { failed_outcomes } - } - - #[must_use] - pub fn failed_outcomes(&self) -> &[ServiceShutdownOutcomeWithServiceName] { - &self.failed_outcomes - } -} - -impl fmt::Display for ServiceShutdownError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - writeln!(f, "service shutdown failed:")?; - - for ServiceShutdownOutcomeWithServiceName { name, outcome } in &self.failed_outcomes { - match outcome { - ServiceShutdownOutcome::Completed => {} - ServiceShutdownOutcome::Errored(service_error) => { - writeln!(f, " service {name:?} errored: {service_error:#}")?; - } - ServiceShutdownOutcome::Panicked(panic_message) => { - writeln!(f, " service {name:?} panicked: {panic_message}")?; - } - ServiceShutdownOutcome::AbortedByShutdownDeadline => { - writeln!(f, " service {name:?} aborted after shutdown deadline")?; - } - ServiceShutdownOutcome::LeakedBeyondAbortDeadline => { - writeln!(f, " service {name:?} leaked beyond shutdown deadline")?; - } - } - } - - Ok(()) - } -} - -impl Error for ServiceShutdownError {} diff --git a/trzcina/tests/managers_implement_common_trait.rs b/trzcina/tests/managers_implement_common_trait.rs new file mode 100644 index 0000000..4fe6813 --- /dev/null +++ b/trzcina/tests/managers_implement_common_trait.rs @@ -0,0 +1,29 @@ +use std::time::Duration; + +use tokio_util::sync::CancellationToken; +use trzcina::LocalServiceManager; +use trzcina::Manager; +use trzcina::RunToCompletionOptions; +use trzcina::RunningCollection; +use trzcina::ServiceManager; + +async fn drive(manager: TManager) { + let running = Manager::start(manager, CancellationToken::new()); + let _ = RunningCollection::run_to_completion( + running, + RunToCompletionOptions { + shutdown_deadline: Duration::from_secs(1), + }, + ) + .await; +} + +#[tokio::test] +async fn sendable_service_manager_implements_manager() { + drive(ServiceManager::default()).await; +} + +#[tokio::test] +async fn local_service_manager_implements_manager() { + drive(LocalServiceManager::default()).await; +}