Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
[workspace]
resolver = "3"
members = ["trzcina"]
members = [
"trzcina",
"trzcina-local-service",
"trzcina-sendable-service",
"trzcina-service",
]

[workspace.dependencies]
anyhow = "1.0"
Expand Down
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
COVERAGE_PACKAGES := -p trzcina
COVERAGE_PACKAGES := --workspace
RUST_LOG ?= debug

# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -34,7 +34,9 @@ coverage: node_modules
cargo llvm-cov report
npx @intentee/rust-coverage-check target/llvm-cov.json \
--workspace-root $(CURDIR) \
--gated trzcina=97
--gated trzcina-local-service=100 \
--gated trzcina-sendable-service=100 \
--gated trzcina-service=100

.PHONY: coverage-clean
coverage-clean:
Expand Down
49 changes: 41 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@

Async service lifecycle orchestration for Rust. Run a set of long-lived async services concurrently, cancel siblings when one finishes, surface errors and panics through a typed outcome collection, and enforce an absolute shutdown deadline.

Cancellation is cooperative: every service must `.await` on the `CancellationToken` passed to `run` (typically inside a `tokio::select!`) and return when it fires. Services that ignore the token are aborted when the shutdown deadline expires.

If your service spawns child tasks via `tokio::spawn`, clone the cancellation token into them — trzcina only bounds the service's own task, not detached children. (Inside a `LocalService`, `tokio::task::spawn_local` is scoped to trzcina's `LocalSet` and is cancelled automatically; `tokio::spawn` still escapes.)
Comment thread
mcharytoniuk marked this conversation as resolved.

## Usage

```rust
use std::time::Duration;
A service that simply waits for shutdown:

```rust
use anyhow::Result;
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use trzcina::{Service, ServiceManager};
use trzcina::{Manager, RunToCompletionOptions, RunningCollection, Service, ServiceManager};

struct EchoService;

Expand All @@ -27,11 +31,40 @@ async fn main() -> Result<()> {
let mut service_manager = ServiceManager::default();
service_manager.register_service(EchoService);

let running = service_manager.start(CancellationToken::new());
running
.run_to_completion(Duration::from_secs(10))
service_manager
.start(CancellationToken::new())
.run_to_completion(RunToCompletionOptions::default())
.await
.into_result()?;
Ok(())
.into_result()
.map_err(Into::into)
}
```

A service that does periodic work and yields to cancellation on every tick:

```rust
use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use trzcina::Service;

struct TickerService;

#[async_trait]
impl Service for TickerService {
async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> {
let mut ticker = tokio::time::interval(Duration::from_secs(1));
loop {
tokio::select! {
biased;
() = cancellation_token.cancelled() => return Ok(()),
_ = ticker.tick() => {
// do periodic work here
}
}
}
}
}
```
24 changes: 24 additions & 0 deletions trzcina-local-service/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "trzcina-local-service"
version = "0.2.0"
edition = "2024"
license = "Apache-2.0"
description = "Local (!Send) services for trzcina: cooperative async lifecycle on a tokio LocalSet."
repository = "https://github.com/intentee/trzcina"
homepage = "https://github.com/intentee/trzcina"
readme = "../README.md"
authors = ["Intentee"]
keywords = ["async", "service", "lifecycle", "cancellation", "tokio"]
categories = ["asynchronous", "concurrency"]

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
futures-util = { workspace = true }
log = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
trzcina-service = { version = "0.2.0", path = "../trzcina-service" }

[lints]
workspace = true
11 changes: 11 additions & 0 deletions trzcina-local-service/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
mod local_registered_service;
mod local_running_service_collection;
mod local_service;
mod local_service_bundle;
mod local_service_manager;

pub use crate::local_registered_service::LocalRegisteredService;
pub use crate::local_running_service_collection::LocalRunningServiceCollection;
pub use crate::local_service::LocalService;
pub use crate::local_service_bundle::LocalServiceBundle;
pub use crate::local_service_manager::LocalServiceManager;
6 changes: 6 additions & 0 deletions trzcina-local-service/src/local_registered_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use crate::local_service::LocalService;

pub struct LocalRegisteredService {
pub name: &'static str,
pub service: Box<dyn LocalService>,
}
95 changes: 95 additions & 0 deletions trzcina-local-service/src/local_running_service_collection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use tokio::sync::oneshot;
use tokio::task::JoinSet;
use tokio::task::LocalSet;
use tokio_util::sync::CancellationToken;
use trzcina_service::RunToCompletionOptions;
use trzcina_service::RunningCollection;
use trzcina_service::RunningService;
use trzcina_service::ServiceShutdownOutcome;
use trzcina_service::ServiceShutdownOutcomeCollection;
use trzcina_service::ServiceShutdownOutcomeWithServiceName;
use trzcina_service::SiblingCancellationGuard;
use trzcina_service::classify_future_outcome;
use trzcina_service::drain_to_completion;

use crate::local_registered_service::LocalRegisteredService;

pub struct LocalRunningServiceCollection {
cancellation_token: CancellationToken,
local_set: LocalSet,
running_services: Vec<RunningService>,
task_set: JoinSet<()>,
}

impl LocalRunningServiceCollection {
#[must_use]
pub fn start(
Comment thread
mcharytoniuk marked this conversation as resolved.
registered: Vec<LocalRegisteredService>,
cancellation_token: CancellationToken,
) -> Self {
let mut running_services: Vec<RunningService> = Vec::with_capacity(registered.len());
let mut task_set: JoinSet<()> = JoinSet::new();
let local_set = LocalSet::new();
let internal_cancellation_token = cancellation_token.child_token();

for LocalRegisteredService { name, service } in registered {
let (outcome_sender, outcome_receiver) = oneshot::channel::<ServiceShutdownOutcome>();
let service_cancellation_token = internal_cancellation_token.clone();

task_set.spawn_local_on(
async move {
let _sibling_cancellation_guard =
SiblingCancellationGuard::new(service_cancellation_token.clone());
let mut service = service;
let outcome =
classify_future_outcome(name, service.run(service_cancellation_token))
.await;
let _ = outcome_sender.send(outcome);
},
&local_set,
);

running_services.push(RunningService::new(name, outcome_receiver));
}

Self {
cancellation_token: internal_cancellation_token,
local_set,
running_services,
task_set,
}
}
}

impl RunningCollection for LocalRunningServiceCollection {
async fn run_to_completion(
self,
options: RunToCompletionOptions,
) -> ServiceShutdownOutcomeCollection {
let Self {
cancellation_token,
local_set,
running_services,
mut task_set,
} = self;

let has_running_services = !running_services.is_empty();

local_set
.run_until(async {
drain_to_completion(
&mut task_set,
&cancellation_token,
has_running_services,
options.shutdown_deadline,
)
.await;
})
.await;

let outcomes: Vec<ServiceShutdownOutcomeWithServiceName> =
running_services.into_iter().map(Into::into).collect();

ServiceShutdownOutcomeCollection::new(outcomes)
}
}
12 changes: 12 additions & 0 deletions trzcina-local-service/src/local_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use anyhow::Result;
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;

#[async_trait(?Send)]
pub trait LocalService: 'static {
fn name(&self) -> &'static str {
std::any::type_name::<Self>()
}

async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()>;
}
9 changes: 9 additions & 0 deletions trzcina-local-service/src/local_service_bundle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use anyhow::Result;
use async_trait::async_trait;

use crate::local_service::LocalService;

#[async_trait(?Send)]
pub trait LocalServiceBundle {
async fn services(self) -> Result<Vec<Box<dyn LocalService>>>;
}
43 changes: 43 additions & 0 deletions trzcina-local-service/src/local_service_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use anyhow::Result;
use tokio_util::sync::CancellationToken;
use trzcina_service::Manager;

use crate::local_registered_service::LocalRegisteredService;
use crate::local_running_service_collection::LocalRunningServiceCollection;
use crate::local_service::LocalService;
use crate::local_service_bundle::LocalServiceBundle;

#[derive(Default)]
pub struct LocalServiceManager {
services: Vec<LocalRegisteredService>,
}

impl LocalServiceManager {
pub async fn register_bundle<TLocalServiceBundle: LocalServiceBundle>(
&mut self,
bundle: TLocalServiceBundle,
) -> Result<()> {
for service in bundle.services().await? {
let name = service.name();
self.services.push(LocalRegisteredService { name, service });
}

Ok(())
}

pub fn register_service(&mut self, service: impl LocalService) {
let name = service.name();
self.services.push(LocalRegisteredService {
name,
service: Box::new(service),
});
}
}

impl Manager for LocalServiceManager {
type Running = LocalRunningServiceCollection;

fn start(self, cancellation_token: CancellationToken) -> LocalRunningServiceCollection {
LocalRunningServiceCollection::start(self.services, cancellation_token)
}
Comment on lines +37 to +42
}
Loading
Loading