Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ coverage: node_modules
cargo llvm-cov report
npx @intentee/rust-coverage-check target/llvm-cov.json \
--workspace-root $(CURDIR) \
--gated trzcina=97
--gated trzcina=100

.PHONY: coverage-clean
coverage-clean:
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
# trzcina

Async service lifecycle orchestration for Rust. Run a set of long-lived async services concurrently, cancel siblings when one finishes, surface errors and panics through a typed outcome collection, and enforce an absolute shutdown deadline.
Async service lifecycle orchestration for Rust. Run a set of long-lived async services concurrently, cancel siblings when one finishes, surface errors and panics through a typed outcome collection, and enforce per-phase shutdown deadlines.

## Usage

```rust
use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use trzcina::{Service, ServiceManager};
use trzcina::{Service, ServiceManager, ServiceShutdownOptions};

struct EchoService;

Expand All @@ -29,9 +27,11 @@ async fn main() -> Result<()> {

let running = service_manager.start(CancellationToken::new());
running
.run_to_completion(Duration::from_secs(10))
.run_to_completion(ServiceShutdownOptions::default())
.await
.into_result()?;
Ok(())
}
```

`ServiceShutdownOptions` exposes two independently tunable deadlines that apply after the cancellation token fires: `cooperative_deadline` (how long services have to exit on their own) and `abort_deadline` (how long the tokio abort has to drain). Both default to 10 seconds.
2 changes: 2 additions & 0 deletions trzcina/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod service;
mod service_bundle;
mod service_manager;
mod service_shutdown_error;
mod service_shutdown_options;
mod service_shutdown_outcome;
mod service_shutdown_outcome_collection;
mod service_shutdown_outcome_with_service_name;
Expand All @@ -15,6 +16,7 @@ pub use crate::service::Service;
pub use crate::service_bundle::ServiceBundle;
pub use crate::service_manager::ServiceManager;
pub use crate::service_shutdown_error::ServiceShutdownError;
pub use crate::service_shutdown_options::ServiceShutdownOptions;
pub use crate::service_shutdown_outcome::ServiceShutdownOutcome;
pub use crate::service_shutdown_outcome_collection::ServiceShutdownOutcomeCollection;
pub use crate::service_shutdown_outcome_with_service_name::ServiceShutdownOutcomeWithServiceName;
10 changes: 7 additions & 3 deletions trzcina/src/running_service_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tokio_util::sync::CancellationToken;
use crate::registered_service::RegisteredService;
use crate::running_service::RunningService;
use crate::service::Service;
use crate::service_shutdown_options::ServiceShutdownOptions;
use crate::service_shutdown_outcome::ServiceShutdownOutcome;
use crate::service_shutdown_outcome_collection::ServiceShutdownOutcomeCollection;
use crate::service_shutdown_outcome_with_service_name::ServiceShutdownOutcomeWithServiceName;
Expand Down Expand Up @@ -104,12 +105,15 @@ impl RunningServiceCollection {

pub async fn run_to_completion(
mut self,
shutdown_deadline: Duration,
ServiceShutdownOptions {
cooperative_deadline,
abort_deadline,
}: ServiceShutdownOptions,
) -> ServiceShutdownOutcomeCollection {
self.wait_for_shutdown_signal().await;

if !self.drain_within_deadline(shutdown_deadline).await {
self.abort_and_drain(shutdown_deadline).await;
if !self.drain_within_deadline(cooperative_deadline).await {
self.abort_and_drain(abort_deadline).await;
}
Comment thread
mcharytoniuk marked this conversation as resolved.

let outcomes: Vec<ServiceShutdownOutcomeWithServiceName> =
Expand Down
14 changes: 7 additions & 7 deletions trzcina/src/service_shutdown_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ impl fmt::Display for ServiceShutdownError {

for ServiceShutdownOutcomeWithServiceName { name, outcome } in &self.failed_outcomes {
match outcome {
ServiceShutdownOutcome::Completed => {}
ServiceShutdownOutcome::Completed => Ok(()),
ServiceShutdownOutcome::Errored(service_error) => {
writeln!(f, " service {name:?} errored: {service_error:#}")?;
writeln!(f, " service {name:?} errored: {service_error:#}")
}
ServiceShutdownOutcome::Panicked(panic_message) => {
writeln!(f, " service {name:?} panicked: {panic_message}")?;
writeln!(f, " service {name:?} panicked: {panic_message}")
}
ServiceShutdownOutcome::AbortedByShutdownDeadline => {
writeln!(f, " service {name:?} aborted after shutdown deadline")?;
writeln!(f, " service {name:?} aborted after shutdown deadline")
}
ServiceShutdownOutcome::LeakedBeyondAbortDeadline => {
writeln!(f, " service {name:?} leaked beyond shutdown deadline")?;
ServiceShutdownOutcome::LeakedBeyondShutdownDeadline => {
writeln!(f, " service {name:?} leaked beyond shutdown deadline")
Comment thread
mcharytoniuk marked this conversation as resolved.
Comment thread
mcharytoniuk marked this conversation as resolved.
}
Comment thread
mcharytoniuk marked this conversation as resolved.
}
}?;
}

Ok(())
Expand Down
15 changes: 15 additions & 0 deletions trzcina/src/service_shutdown_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use std::time::Duration;

pub struct ServiceShutdownOptions {
pub cooperative_deadline: Duration,
pub abort_deadline: Duration,
}

impl Default for ServiceShutdownOptions {
fn default() -> Self {
Self {
cooperative_deadline: Duration::from_secs(10),
abort_deadline: Duration::from_secs(10),
}
}
}
2 changes: 1 addition & 1 deletion trzcina/src/service_shutdown_outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ pub enum ServiceShutdownOutcome {
Errored(Error),
Panicked(String),
AbortedByShutdownDeadline,
LeakedBeyondAbortDeadline,
LeakedBeyondShutdownDeadline,
}
Comment thread
mcharytoniuk marked this conversation as resolved.
2 changes: 1 addition & 1 deletion trzcina/src/service_shutdown_outcome_with_service_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ impl From<RunningService> for ServiceShutdownOutcomeWithServiceName {
let outcome = match running_service.outcome_receiver.try_recv() {
Ok(outcome) => outcome,
Err(TryRecvError::Closed) => ServiceShutdownOutcome::AbortedByShutdownDeadline,
Err(TryRecvError::Empty) => ServiceShutdownOutcome::LeakedBeyondAbortDeadline,
Err(TryRecvError::Empty) => ServiceShutdownOutcome::LeakedBeyondShutdownDeadline,
};

Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use trzcina::Service;
use trzcina::ServiceManager;
use trzcina::ServiceShutdownOutcome;
use tokio::sync::Notify;
use tokio::sync::oneshot;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use trzcina::Service;
use trzcina::ServiceManager;
use trzcina::ServiceShutdownOptions;
use trzcina::ServiceShutdownOutcome;

const PRODUCED_VALUE: u32 = 42;

Expand Down Expand Up @@ -74,7 +75,7 @@ async fn coordinates_via_shared_holder_between_two_services() {
let run_task = tokio::spawn(async move {
manager
.start(cancellation_token_for_run)
.run_to_completion(Duration::from_secs(1))
.run_to_completion(ServiceShutdownOptions::default())
.await
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use trzcina::Service;
use trzcina::ServiceBundle;
use trzcina::ServiceManager;
use tokio::sync::oneshot;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use trzcina::Service;
use trzcina::ServiceBundle;
use trzcina::ServiceManager;
use trzcina::ServiceShutdownOptions;

struct BundleAndService {
observation_tx: Option<oneshot::Sender<()>>,
Expand Down Expand Up @@ -58,7 +59,7 @@ async fn runs_all_services_returned_by_bundle() {
Duration::from_secs(5),
manager
.start(CancellationToken::new())
.run_to_completion(Duration::from_secs(1)),
.run_to_completion(ServiceShutdownOptions::default()),
)
.await
.unwrap()
Expand Down
7 changes: 4 additions & 3 deletions trzcina/tests/register_service_runs_registered_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use trzcina::Service;
use trzcina::ServiceManager;
use tokio::sync::oneshot;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use trzcina::Service;
use trzcina::ServiceManager;
use trzcina::ServiceShutdownOptions;

struct ObservableService {
observation_tx: Option<oneshot::Sender<()>>,
Expand Down Expand Up @@ -35,7 +36,7 @@ async fn runs_registered_service() {
Duration::from_secs(5),
manager
.start(CancellationToken::new())
.run_to_completion(Duration::from_secs(1)),
.run_to_completion(ServiceShutdownOptions::default()),
)
.await
.unwrap()
Expand Down
12 changes: 8 additions & 4 deletions trzcina/tests/run_aborts_hung_service_after_shutdown_deadline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use trzcina::Service;
use trzcina::ServiceManager;
use trzcina::ServiceShutdownOutcome;
use tokio::task::yield_now;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use trzcina::Service;
use trzcina::ServiceManager;
use trzcina::ServiceShutdownOptions;
use trzcina::ServiceShutdownOutcome;

struct ConfiguredService {
hang_ignoring_cancellation: bool,
Expand Down Expand Up @@ -39,7 +40,10 @@ async fn aborts_hung_service_after_shutdown_deadline() {
Duration::from_secs(5),
manager
.start(CancellationToken::new())
.run_to_completion(Duration::from_millis(50)),
.run_to_completion(ServiceShutdownOptions {
cooperative_deadline: Duration::from_millis(50),
abort_deadline: Duration::from_millis(50),
}),
)
.await
.unwrap();
Expand Down
12 changes: 8 additions & 4 deletions trzcina/tests/run_aborts_hung_services_on_external_cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use trzcina::Service;
use trzcina::ServiceManager;
use trzcina::ServiceShutdownOutcome;
use tokio::task::yield_now;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use trzcina::Service;
use trzcina::ServiceManager;
use trzcina::ServiceShutdownOptions;
use trzcina::ServiceShutdownOutcome;

struct CancellationIgnoringService;

Expand All @@ -32,7 +33,10 @@ async fn aborts_hung_services_on_external_cancel() {
let run_task = tokio::spawn(async move {
manager
.start(cancellation_token_for_run)
.run_to_completion(Duration::from_millis(50))
.run_to_completion(ServiceShutdownOptions {
cooperative_deadline: Duration::from_millis(50),
abort_deadline: Duration::from_millis(50),
})
.await
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use trzcina::Service;
use trzcina::ServiceManager;
use trzcina::ServiceShutdownOptions;
use trzcina::ServiceShutdownOutcome;

struct ThreadBlockingService {
block_duration: Duration,
}

#[async_trait]
impl Service for ThreadBlockingService {
async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> {
std::thread::sleep(self.block_duration);
Ok(())
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn applies_cooperative_and_abort_deadlines_independently() {
let cancellation_token = CancellationToken::new();
let cancellation_token_for_run = cancellation_token.clone();

let mut manager = ServiceManager::default();
manager.register_service(ThreadBlockingService {
block_duration: Duration::from_millis(500),
});

let run_task = tokio::spawn(async move {
manager
.start(cancellation_token_for_run)
.run_to_completion(ServiceShutdownOptions {
cooperative_deadline: Duration::from_millis(50),
abort_deadline: Duration::from_millis(2000),
})
.await
});

cancellation_token.cancel();

let report = timeout(Duration::from_secs(5), run_task)
.await
.expect("manager must return within outer bound")
.unwrap();

assert_eq!(report.outcomes().len(), 1);
assert!(matches!(
report.outcomes()[0].outcome,
ServiceShutdownOutcome::Completed,
));
assert!(report.into_result().is_ok());
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use trzcina::Service;
use trzcina::ServiceManager;
use trzcina::ServiceShutdownOutcome;
use tokio::sync::oneshot;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use trzcina::Service;
use trzcina::ServiceManager;
use trzcina::ServiceShutdownOptions;
use trzcina::ServiceShutdownOutcome;

struct AwaitingService {
observation_tx: Option<oneshot::Sender<()>>,
Expand Down Expand Up @@ -42,7 +43,7 @@ async fn cancels_all_services_when_external_token_cancelled() {
let run_task = tokio::spawn(async move {
manager
.start(cancellation_token_for_run)
.run_to_completion(Duration::from_secs(1))
.run_to_completion(ServiceShutdownOptions::default())
.await
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use trzcina::Service;
use trzcina::ServiceManager;
use trzcina::ServiceShutdownOutcome;
use tokio::sync::oneshot;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use trzcina::Service;
use trzcina::ServiceManager;
use trzcina::ServiceShutdownOptions;
use trzcina::ServiceShutdownOutcome;

struct ConfiguredService {
finish_immediately: bool,
Expand Down Expand Up @@ -50,7 +51,7 @@ async fn cancels_siblings_when_one_service_finishes_first() {
Duration::from_secs(5),
manager
.start(CancellationToken::new())
.run_to_completion(Duration::from_secs(1)),
.run_to_completion(ServiceShutdownOptions::default()),
)
.await
.unwrap();
Expand Down
Loading
Loading