Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ struct EchoService;

#[async_trait]
impl Service for EchoService {
async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> {
async fn run(self: Box<Self>, cancellation_token: CancellationToken) -> Result<()> {
cancellation_token.cancelled().await;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion trzcina/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "trzcina"
version = "0.2.1"
version = "0.3.0"
edition = "2024"
license = "Apache-2.0"
description = "Async service lifecycle orchestration with cooperative cancellation and shutdown deadlines."
Expand Down
6 changes: 3 additions & 3 deletions trzcina/src/running_service_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ fn panic_payload_to_string(panic_payload: Box<dyn Any + Send>) -> String {

async fn run_service_with_sibling_cancellation_on_return(
service_name: &'static str,
mut service: Box<dyn Service>,
service: Box<dyn Service>,
cancellation_token: CancellationToken,
) -> ServiceShutdownOutcome {
let _sibling_cancellation_guard = SiblingCancellationGuard::new(cancellation_token.clone());
classify_service_outcome(service_name, &mut service, cancellation_token).await
classify_service_outcome(service_name, service, cancellation_token).await
}

async fn classify_service_outcome(
service_name: &'static str,
service: &mut Box<dyn Service>,
service: Box<dyn Service>,
cancellation_token: CancellationToken,
) -> ServiceShutdownOutcome {
info!("Service {service_name:?} starting");
Expand Down
2 changes: 1 addition & 1 deletion trzcina/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ pub trait Service: Send + 'static {
fn name(&self) -> &'static str {
std::any::type_name::<Self>()
}
async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()>;
async fn run(self: Box<Self>, cancellation_token: CancellationToken) -> Result<()>;
}
49 changes: 26 additions & 23 deletions trzcina/tests/coordinates_via_shared_holder_between_two_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,39 @@ use trzcina::ServiceShutdownOutcome;

const PRODUCED_VALUE: u32 = 42;

struct CoordinatingService {
is_producer: bool,
struct ProducerService {
notify: Arc<Notify>,
observation_tx: Option<oneshot::Sender<u32>>,
shared_state: Arc<Mutex<Option<u32>>>,
}

#[async_trait]
impl Service for CoordinatingService {
async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> {
if self.is_producer {
{
let mut guard = self.shared_state.lock().unwrap();
*guard = Some(PRODUCED_VALUE);
}
self.notify.notify_one();
cancellation_token.cancelled().await;
return Ok(());
impl Service for ProducerService {
async fn run(self: Box<Self>, cancellation_token: CancellationToken) -> Result<()> {
{
let mut guard = self.shared_state.lock().unwrap();
*guard = Some(PRODUCED_VALUE);
}
self.notify.notify_one();
cancellation_token.cancelled().await;
Ok(())
}
}

struct ConsumerService {
notify: Arc<Notify>,
observation_tx: oneshot::Sender<u32>,
shared_state: Arc<Mutex<Option<u32>>>,
}

#[async_trait]
impl Service for ConsumerService {
async fn run(self: Box<Self>, cancellation_token: CancellationToken) -> Result<()> {
tokio::select! {
() = cancellation_token.cancelled() => return Ok(()),
() = self.notify.notified() => {
let observed_value = *self.shared_state.lock().unwrap();
if let Some(value) = observed_value
&& let Some(observation_tx) = self.observation_tx.take()
{
observation_tx.send(value).unwrap();
if let Some(value) = observed_value {
self.observation_tx.send(value).unwrap();
}
Comment thread
mcharytoniuk marked this conversation as resolved.
}
}
Expand All @@ -59,16 +65,13 @@ async fn coordinates_via_shared_holder_between_two_services() {
let cancellation_token_for_run = cancellation_token.clone();

let mut manager = ServiceManager::default();
manager.register_service(CoordinatingService {
is_producer: true,
manager.register_service(ProducerService {
notify: notify.clone(),
observation_tx: None,
shared_state: shared_state.clone(),
});
manager.register_service(CoordinatingService {
is_producer: false,
manager.register_service(ConsumerService {
notify: notify.clone(),
observation_tx: Some(observation_tx),
observation_tx,
shared_state: shared_state.clone(),
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,30 @@ use trzcina::ServiceBundle;
use trzcina::ServiceManager;
use trzcina::ServiceShutdownOptions;

struct BundleAndService {
observation_tx: Option<oneshot::Sender<()>>,
sibling_senders: Vec<oneshot::Sender<()>>,
struct ObservableService {
observation_tx: oneshot::Sender<()>,
}

#[async_trait]
impl Service for BundleAndService {
async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> {
if let Some(observation_tx) = self.observation_tx.take() {
observation_tx.send(()).unwrap();
}
impl Service for ObservableService {
async fn run(self: Box<Self>, _cancellation_token: CancellationToken) -> Result<()> {
self.observation_tx.send(()).unwrap();
Comment thread
mcharytoniuk marked this conversation as resolved.
Ok(())
}
}

struct SiblingsBundle {
sibling_senders: Vec<oneshot::Sender<()>>,
}

#[async_trait]
impl ServiceBundle for BundleAndService {
impl ServiceBundle for SiblingsBundle {
async fn services(self) -> Result<Vec<Box<dyn Service>>> {
let services: Vec<Box<dyn Service>> = self
.sibling_senders
.into_iter()
.map(|observation_tx| {
Box::new(BundleAndService {
observation_tx: Some(observation_tx),
sibling_senders: Vec::new(),
}) as Box<dyn Service>
Box::new(ObservableService { observation_tx }) as Box<dyn Service>
})
.collect();
Ok(services)
Expand All @@ -47,8 +45,7 @@ async fn runs_all_services_returned_by_bundle() {
let (first_tx, mut first_rx) = oneshot::channel::<()>();
let (second_tx, mut second_rx) = oneshot::channel::<()>();

let bundle = BundleAndService {
observation_tx: None,
let bundle = SiblingsBundle {
sibling_senders: vec![first_tx, second_tx],
};

Expand Down
12 changes: 4 additions & 8 deletions trzcina/tests/register_service_runs_registered_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@ use trzcina::ServiceManager;
use trzcina::ServiceShutdownOptions;

struct ObservableService {
observation_tx: Option<oneshot::Sender<()>>,
observation_tx: oneshot::Sender<()>,
}

#[async_trait]
impl Service for ObservableService {
async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> {
if let Some(observation_tx) = self.observation_tx.take() {
observation_tx.send(()).unwrap();
}
async fn run(self: Box<Self>, _cancellation_token: CancellationToken) -> Result<()> {
self.observation_tx.send(()).unwrap();
Comment thread
mcharytoniuk marked this conversation as resolved.
Ok(())
}
}
Expand All @@ -28,9 +26,7 @@ async fn runs_registered_service() {
let (observation_tx, mut observation_rx) = oneshot::channel::<()>();

let mut manager = ServiceManager::default();
manager.register_service(ObservableService {
observation_tx: Some(observation_tx),
});
manager.register_service(ObservableService { observation_tx });

timeout(
Duration::from_secs(5),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ struct ConfiguredService {

#[async_trait]
impl Service for ConfiguredService {
async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> {
async fn run(self: Box<Self>, _cancellation_token: CancellationToken) -> Result<()> {
if self.hang_ignoring_cancellation {
loop {
yield_now().await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ struct CancellationIgnoringService;

#[async_trait]
impl Service for CancellationIgnoringService {
async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> {
async fn run(self: Box<Self>, _cancellation_token: CancellationToken) -> Result<()> {
loop {
yield_now().await;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct ThreadBlockingService {

#[async_trait]
impl Service for ThreadBlockingService {
async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> {
async fn run(self: Box<Self>, _cancellation_token: CancellationToken) -> Result<()> {
std::thread::sleep(self.block_duration);
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@ use trzcina::ServiceShutdownOptions;
use trzcina::ServiceShutdownOutcome;

struct AwaitingService {
observation_tx: Option<oneshot::Sender<()>>,
observation_tx: oneshot::Sender<()>,
}

#[async_trait]
impl Service for AwaitingService {
async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> {
async fn run(self: Box<Self>, cancellation_token: CancellationToken) -> Result<()> {
cancellation_token.cancelled().await;
if let Some(observation_tx) = self.observation_tx.take() {
observation_tx.send(()).unwrap();
}
self.observation_tx.send(()).unwrap();
Comment thread
mcharytoniuk marked this conversation as resolved.
Ok(())
}
}
Expand All @@ -34,9 +32,7 @@ async fn cancels_all_services_when_external_token_cancelled() {

for _ in 0..5 {
let (observation_tx, observation_rx) = oneshot::channel::<()>();
manager.register_service(AwaitingService {
observation_tx: Some(observation_tx),
});
manager.register_service(AwaitingService { observation_tx });
observation_receivers.push(observation_rx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,37 @@ use trzcina::ServiceManager;
use trzcina::ServiceShutdownOptions;
use trzcina::ServiceShutdownOutcome;

struct ConfiguredService {
finish_immediately: bool,
observation_tx: Option<oneshot::Sender<()>>,
struct FinishImmediatelyService;

#[async_trait]
impl Service for FinishImmediatelyService {
async fn run(self: Box<Self>, _cancellation_token: CancellationToken) -> Result<()> {
Ok(())
}
}

struct WaitingObserverService {
observation_tx: oneshot::Sender<()>,
}

#[async_trait]
impl Service for ConfiguredService {
async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> {
if self.finish_immediately {
return Ok(());
}
impl Service for WaitingObserverService {
async fn run(self: Box<Self>, cancellation_token: CancellationToken) -> Result<()> {
cancellation_token.cancelled().await;
if let Some(observation_tx) = self.observation_tx.take() {
observation_tx.send(()).unwrap();
}
self.observation_tx.send(()).unwrap();
Comment thread
mcharytoniuk marked this conversation as resolved.
Ok(())
}
}

#[tokio::test]
async fn cancels_siblings_when_one_service_finishes_first() {
let mut manager = ServiceManager::default();
manager.register_service(ConfiguredService {
finish_immediately: true,
observation_tx: None,
});
manager.register_service(FinishImmediatelyService);

let mut sibling_observation_receivers = Vec::new();
for _ in 0..4 {
let (observation_tx, observation_rx) = oneshot::channel::<()>();
manager.register_service(ConfiguredService {
finish_immediately: false,
observation_tx: Some(observation_tx),
});
manager.register_service(WaitingObserverService { observation_tx });
sibling_observation_receivers.push(observation_rx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ struct InstantOkService;

#[async_trait]
impl Service for InstantOkService {
async fn run(&mut self, _cancellation_token: CancellationToken) -> Result<()> {
async fn run(self: Box<Self>, _cancellation_token: CancellationToken) -> Result<()> {
Ok(())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,24 @@ use trzcina::ServiceManager;
use trzcina::ServiceShutdownOptions;
use trzcina::ServiceShutdownOutcome;

struct ConfiguredService {
return_err: bool,
observation_tx: Option<oneshot::Sender<()>>,
struct ErroringService;

#[async_trait]
impl Service for ErroringService {
async fn run(self: Box<Self>, _cancellation_token: CancellationToken) -> Result<()> {
Err(anyhow!("erroring service deliberately failed"))
}
}

struct WaitingObserverService {
observation_tx: oneshot::Sender<()>,
}

#[async_trait]
impl Service for ConfiguredService {
async fn run(&mut self, cancellation_token: CancellationToken) -> Result<()> {
if self.return_err {
return Err(anyhow!("erroring service deliberately failed"));
}
impl Service for WaitingObserverService {
async fn run(self: Box<Self>, cancellation_token: CancellationToken) -> Result<()> {
cancellation_token.cancelled().await;
if let Some(observation_tx) = self.observation_tx.take() {
observation_tx.send(()).unwrap();
}
self.observation_tx.send(()).unwrap();
Comment thread
mcharytoniuk marked this conversation as resolved.
Ok(())
}
}
Expand All @@ -34,19 +37,13 @@ impl Service for ConfiguredService {
async fn records_all_failures_when_multiple_services_error() {
let mut manager = ServiceManager::default();
for _ in 0..3 {
manager.register_service(ConfiguredService {
return_err: true,
observation_tx: None,
});
manager.register_service(ErroringService);
}

let mut sibling_observation_receivers = Vec::new();
for _ in 0..2 {
let (observation_tx, observation_rx) = oneshot::channel::<()>();
manager.register_service(ConfiguredService {
return_err: false,
observation_tx: Some(observation_tx),
});
manager.register_service(WaitingObserverService { observation_tx });
sibling_observation_receivers.push(observation_rx);
}

Expand Down
Loading
Loading