Skip to content
6 changes: 3 additions & 3 deletions crates/sdk-core/src/core_tests/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ async fn replay_with_empty_first_task() {
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);
let core = mock_worker(mock);

// In this task imagine we are waiting on the first update being sent, hence no commands come
// out, and on replay the first activation should only be init.
// Initialization and updates should remain in separate activations even if the first WFT
// is empty, to maintain consistency between incremental processing and replay.
let task = core.poll_workflow_activation().await.unwrap();
assert_matches!(
task.jobs.as_slice(),
Expand All @@ -59,7 +59,7 @@ async fn replay_with_empty_first_task() {
task.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::DoUpdate(_)),
},]
}]
);
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
Expand Down
21 changes: 14 additions & 7 deletions crates/sdk-core/src/worker/workflow/history_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,6 @@ fn find_end_index_of_next_wft_seq(
}
let mut last_index = 0;
let mut saw_command_or_started = false;
let mut saw_command = false;
let mut wft_started_event_id_to_index = vec![];
for (ix, e) in events.iter().enumerate() {
last_index = ix;
Expand All @@ -709,7 +708,6 @@ fn find_end_index_of_next_wft_seq(
}

if e.is_command_event() {
saw_command = true;
saw_command_or_started = true;
}
if e.event_type() == EventType::WorkflowExecutionStarted {
Expand Down Expand Up @@ -739,12 +737,17 @@ fn find_end_index_of_next_wft_seq(
continue;
} else if next_event_type == EventType::WorkflowTaskCompleted {
if let Some(next_next_event) = events.get(ix + 2) {
if !saw_command
if !saw_command_or_started
&& next_next_event.event_type() == EventType::WorkflowTaskScheduled
{
// If we've never seen an interesting event and the next two events are
// a completion followed immediately again by scheduled, then this is a
// WFT heartbeat and also doesn't conclude the sequence.
// Keep this consistent with other skipped WFTs: do not allow a
// heartbeat task to become a synthetic boundary for update sequencing,
// since whether that heartbeat appears in a given history window can
// vary between incremental processing and replay-after-cache-miss.
wft_started_event_id_to_index.pop();
continue;
} else {
// If we see an update accepted command after WFT completed, we want to
Expand Down Expand Up @@ -928,7 +931,9 @@ mod tests {

let mut update = t.as_history_update();
let seq = next_check_peek(&mut update, 0);
assert_eq!(seq.len(), 6);
assert_eq!(seq.len(), 3);
let seq = next_check_peek(&mut update, 3);
assert_eq!(seq.len(), 3);
let seq = next_check_peek(&mut update, 6);
assert_eq!(seq.len(), 4);
let seq = next_check_peek(&mut update, 10);
Expand Down Expand Up @@ -1657,11 +1662,13 @@ mod tests {

let mut update = t.as_history_update();
let seq = next_check_peek(&mut update, 0);
// unlike the case with a wft failure, here the first task should not extend through to
// the update, because here the first empty WFT happened with _just_ the workflow init,
// not also with the update.
// The first task remains a boundary.
assert_eq!(seq.len(), 3);
let seq = next_check_peek(&mut update, 3);
// The second heartbeat-like task IS NOT collapsed here because it is part of the
// sequencing window for the update.
assert_eq!(seq.len(), 3);
let seq = next_check_peek(&mut update, 6);
assert_eq!(seq.len(), 7);
}
}
106 changes: 79 additions & 27 deletions crates/sdk-core/src/worker/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub(crate) use history_update::HistoryUpdate;
use crate::{
MetricsContext, WorkerConfig,
abstractions::{
MeteredPermitDealer, TrackedOwnedMeteredSemPermit, UsedMeteredSemPermit, dbg_panic,
MeteredPermitDealer, TrackedOwnedMeteredSemPermit, UsedMeteredSemPermit,
take_cell::TakeCell,
},
internal_flags::InternalFlags,
Expand Down Expand Up @@ -233,21 +233,18 @@ impl Workflows {
while let Some(output) = stream.next().await {
match output {
Ok(o) => {
for fetchreq in o.fetch_histories {
fetch_tx
.send(fetchreq)
.expect("Fetch channel must not be dropped");
}
for act in o.activations {
activation_tx
.send(Ok(act))
.expect("Activation processor channel not dropped");
if !forward_stream_output(&fetch_tx, &activation_tx, o) {
return;
}
}
Err(e) => {
let _ = activation_tx.send(Err(e)).inspect_err(|e| {
error!(activation=?e.0, "Activation processor channel dropped");
});
if let Err(e) = activation_tx.send(Err(e)) {
error!(
activation=?e.0,
"Activation processor channel dropped, stopping workflow processing"
);
return;
}
}
}
}
Expand Down Expand Up @@ -543,10 +540,7 @@ impl Workflows {
// Empty complete which is likely an evict reply, we can just ignore.
return Ok(());
}
panic!(
"A non-empty completion was not processed. Workflow processing may have \
terminated unexpectedly. This is a bug."
);
return Err(CompleteWfError::WorkflowNotEnabled);
}

let completion_outcome = if let Ok(c) = rx.await {
Expand All @@ -557,13 +551,11 @@ impl Workflows {
// Empty complete which is likely an evict reply, we can just ignore as above.
return Ok(());
} else {
dbg_panic!("Send half of activation complete response channel went missing");
self.request_eviction(
return Err(CompleteWfError::MalformedWorkflowCompletion {
reason: "Send half of activation complete response channel went missing"
.to_string(),
run_id,
"Send half of activation complete response channel went missing",
EvictionReason::Fatal,
);
return Ok(());
});
};
let replaying = completion_outcome.replaying;

Expand Down Expand Up @@ -785,9 +777,10 @@ impl Workflows {
at_handle.add_tasks(with_permits);
}
} else if !eager_acts.is_empty() {
panic!(
error!(
"Requested eager activity execution but this worker has no activity task \
manager! This is an internal bug, Core should not have asked for tasks."
manager! This is an internal bug, Core should not have asked for tasks. \
Ignoring the tasks."
)
}
}
Expand Down Expand Up @@ -871,6 +864,29 @@ impl Workflows {
}
}

fn forward_stream_output(
fetch_tx: &UnboundedSender<HistoryFetchReq>,
activation_tx: &UnboundedSender<Result<ActivationOrAuto, PollError>>,
output: WFStreamOutput,
) -> bool {
for fetchreq in output.fetch_histories {
if let Err(e) = fetch_tx.send(fetchreq) {
warn!(fetch=?e.0, "Fetch channel dropped, stopping workflow processing");
return false;
}
}
for act in output.activations {
if let Err(e) = activation_tx.send(Ok(act)) {
debug!(
activation=?e.0,
"Activation processor channel dropped, stopping workflow processing"
);
return false;
}
}
true
}

/// Returned when a cache miss happens and we need to fetch history from the beginning to
/// replay a run
#[derive(Debug, derive_more::Display)]
Expand Down Expand Up @@ -1678,7 +1694,7 @@ fn prepare_to_ship_activation(wfa: &mut WorkflowActivation) {
)
});
if any_job_is_query && !all_jobs_are_query {
dbg_panic!(
error!(
"About to issue an activation that contains query jobs with non-query jobs: {:?}",
&wfa
);
Expand Down Expand Up @@ -1733,6 +1749,7 @@ mod tests {
use super::*;
use itertools::Itertools;
use temporalio_common::protos::coresdk::workflow_activation::SignalWorkflow;
use tokio::sync::mpsc::unbounded_channel;

#[test]
fn jobs_sort() {
Expand Down Expand Up @@ -1803,7 +1820,6 @@ mod tests {
}

#[test]
#[should_panic]
fn queries_cannot_go_with_other_jobs() {
let mut act = WorkflowActivation {
jobs: vec![
Expand All @@ -1822,4 +1838,40 @@ mod tests {
};
prepare_to_ship_activation(&mut act);
}

#[test]
fn forwarding_output_stops_when_activation_channel_dropped() {
let (fetch_tx, _fetch_rx) = unbounded_channel();
let (activation_tx, activation_rx) = unbounded_channel();
drop(activation_rx);

let output = WFStreamOutput {
activations: vec![ActivationOrAuto::Autocomplete {
run_id: "run-id".to_string(),
}]
.into_iter()
.collect(),
fetch_histories: Default::default(),
};

assert!(!forward_stream_output(&fetch_tx, &activation_tx, output));
}

#[test]
fn forwarding_output_succeeds_with_live_channels() {
let (fetch_tx, _fetch_rx) = unbounded_channel();
let (activation_tx, mut activation_rx) = unbounded_channel();

let output = WFStreamOutput {
activations: vec![ActivationOrAuto::Autocomplete {
run_id: "run-id".to_string(),
}]
.into_iter()
.collect(),
fetch_histories: Default::default(),
};

assert!(forward_stream_output(&fetch_tx, &activation_tx, output));
assert!(activation_rx.try_recv().is_ok());
}
}
19 changes: 11 additions & 8 deletions crates/sdk-core/src/worker/workflow/workflow_stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::{
MetricsContext,
abstractions::dbg_panic,
worker::workflow::{
managed_run::RunUpdateAct,
run_cache::RunCache,
Expand Down Expand Up @@ -260,7 +259,7 @@ impl WFStream {
let rh = if let Some(rh) = self.runs.get_mut(complete.run_id()) {
rh
} else {
dbg_panic!("Run missing during completion {:?}", complete);
warn!("Run missing during completion {:?}", complete);
return vec![];
};
let mut acts: Vec<_> = match complete {
Expand Down Expand Up @@ -329,11 +328,11 @@ impl WFStream {
if let Some(WFTWithPaginator { wft, .. }) = &wft_from_complete {
debug!(run_id=%wft.execution.run_id, "New WFT from completion");
if &wft.execution.run_id != run_id {
dbg_panic!(
"Server returned a WFT on completion for a different run ({}) than the \
one being completed ({}). This is a server bug.",
wft.execution.run_id,
run_id
warn!(
server_wft_run_id = %wft.execution.run_id,
completed_run_id = %run_id,
"Server returned a WFT on completion for a different run than the one being \
completed. This is a server bug. Ignoring the WFT."
);
}
}
Expand Down Expand Up @@ -385,9 +384,13 @@ impl WFStream {
// If there happened to be more than one buffered WFT for this run, move them into the
// now-instantiated run's buffer.
for wft in maybe_wfts {
let rid = wft.work.execution.run_id.clone();
let should_be_nothing = self.instantiate_or_update(wft);
if should_be_nothing.is_some() {
dbg_panic!("Extra buffered run should not have produced an activation");
warn!(
run_id = %rid,
"Extra buffered run should not have produced an activation"
);
}
}
}
Expand Down