diff --git a/Cargo.toml b/Cargo.toml index 9f3843119..d07fbeb00 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ prost = "0.14" prost-types = { version = "0.7", package = "prost-wkt-types" } pbjson = "0.9" pbjson-build = "0.9" +serde_json = "1.0" [workspace.lints.rust] unreachable_pub = "warn" diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml index 7b538a67d..147f4b604 100644 --- a/crates/client/Cargo.toml +++ b/crates/client/Cargo.toml @@ -42,6 +42,7 @@ tracing = "0.1" url = "2.5" uuid = { version = "1.18", features = ["v4"] } rand = "0.10" +serde_json = { workspace = true } [dependencies.temporalio-common] path = "../common" diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index 26bc766ef..75d6dbfde 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -72,7 +72,7 @@ use temporalio_common::{ HasWorkflowDefinition, data_converters::{DataConverter, SerializationContextData}, protos::{ - coresdk::IntoPayloadsExt, + coresdk::{AsJsonPayloadExt, IntoPayloadsExt}, grpc::health::v1::health_client::HealthClient, proto_ts_to_system_time, temporal::api::{ @@ -81,6 +81,7 @@ use temporalio_common::{ enums::v1::{TaskQueueKind, WorkflowExecutionStatus}, errordetails::v1::WorkflowExecutionAlreadyStartedFailure, operatorservice::v1::operator_service_client::OperatorServiceClient, + sdk::v1::UserMetadata, taskqueue::v1::TaskQueue, testservice::v1::test_service_client::TestServiceClient, workflow::v1 as workflow, @@ -1030,6 +1031,22 @@ where let workflow_id = options.workflow_id.clone(); let task_queue_name = options.task_queue.clone(); + let user_metadata = if options.static_summary.is_some() || options.static_details.is_some() + { + Some(UserMetadata { + summary: options.static_summary.map(|s| { + s.as_json_payload() + .expect("String-to-JSON payload serialization is infallible") + }), + details: options.static_details.map(|s| { + s.as_json_payload() + .expect("String-to-JSON payload serialization is infallible") + }), + }) + } else { + None + }; + let run_id = if let Some(start_signal) = options.start_signal { // Use signal-with-start when a start_signal is provided let res = WorkflowService::signal_with_start_workflow_execution( @@ -1060,6 +1077,7 @@ where search_attributes: options.search_attributes.map(|d| d.into()), cron_schedule: options.cron_schedule.unwrap_or_default(), header: options.header.or(start_signal.header), + user_metadata, ..Default::default() } .into_request(), @@ -1100,6 +1118,7 @@ where completion_callbacks: options.completion_callbacks, priority: Some(options.priority.into()), header: options.header, + user_metadata, ..Default::default() } .into_request(), diff --git a/crates/client/src/options_structs.rs b/crates/client/src/options_structs.rs index c58c77987..6153522db 100644 --- a/crates/client/src/options_structs.rs +++ b/crates/client/src/options_structs.rs @@ -238,6 +238,12 @@ pub struct WorkflowStartOptions { /// Headers to include with the start request. pub header: Option
, + + /// Single-line static summary for the workflow, shown in the Temporal UI. + pub static_summary: Option, + + /// Multi-line static details for the workflow, shown in the Temporal UI. + pub static_details: Option, } /// A signal to send atomically when starting a workflow. diff --git a/crates/client/src/workflow_handle.rs b/crates/client/src/workflow_handle.rs index 661269c87..441ccc30b 100644 --- a/crates/client/src/workflow_handle.rs +++ b/crates/client/src/workflow_handle.rs @@ -73,6 +73,34 @@ impl WorkflowExecutionDescription { fn new(raw_description: DescribeWorkflowExecutionResponse) -> Self { Self { raw_description } } + + /// The static summary set when the workflow was started, if any. + // TOOD: Use DataConverter to avoid direct dependency on serde_json + pub fn static_summary(&self) -> Option { + let payload = self + .raw_description + .execution_config + .as_ref()? + .user_metadata + .as_ref()? + .summary + .as_ref()?; + serde_json::from_slice(&payload.data).ok() + } + + /// The static details set when the workflow was started, if any. + // TOOD: Use DataConverter to avoid direct dependency on serde_json + pub fn static_details(&self) -> Option { + let payload = self + .raw_description + .execution_config + .as_ref()? + .user_metadata + .as_ref()? + .details + .as_ref()?; + serde_json::from_slice(&payload.data).ok() + } } // TODO [rust-sdk-branch]: Could implment stream a-la ListWorkflowsStream diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index f88c5c8ab..7091a2070 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -75,7 +75,7 @@ prost-types = { workspace = true } rand = { version = "0.10", optional = true } ringbuf = { version = "0.4", optional = true } serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" +serde_json = { workspace = true } thiserror = { workspace = true } tokio = { version = "1.47", features = [], optional = true } toml = { version = "1.0", optional = true } diff --git a/crates/common/build.rs b/crates/common/build.rs index 45aed80ae..39248237e 100644 --- a/crates/common/build.rs +++ b/crates/common/build.rs @@ -158,6 +158,7 @@ fn main() -> Result<(), Box> { }, &[ "./protos/local/temporal/sdk/core/core_interface.proto", + "./protos/api_upstream/temporal/api/sdk/v1/workflow_metadata.proto", "./protos/api_upstream/temporal/api/workflowservice/v1/service.proto", "./protos/api_upstream/temporal/api/operatorservice/v1/service.proto", "./protos/api_upstream/temporal/api/errordetails/v1/message.proto", diff --git a/crates/sdk-core-c-bridge/Cargo.toml b/crates/sdk-core-c-bridge/Cargo.toml index 5b50d6231..5105b0a19 100644 --- a/crates/sdk-core-c-bridge/Cargo.toml +++ b/crates/sdk-core-c-bridge/Cargo.toml @@ -29,7 +29,7 @@ prost = { workspace = true } rand = "0.10" rand_pcg = "0.10" serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" +serde_json = { workspace = true } tokio = "1.47" tokio-stream = "0.1" tokio-util = "0.7" diff --git a/crates/sdk-core/Cargo.toml b/crates/sdk-core/Cargo.toml index 1f49a5854..09116e58a 100644 --- a/crates/sdk-core/Cargo.toml +++ b/crates/sdk-core/Cargo.toml @@ -78,7 +78,7 @@ reqwest = { version = "0.13", features = [ "rustls", ], default-features = false, optional = true } serde = "1.0" -serde_json = "1.0" +serde_json = { workspace = true } siphasher = "1.0" slotmap = "1.0" sysinfo = { version = "0.38", default-features = false, features = ["system"] } diff --git a/crates/sdk-core/src/test_help/integ_helpers.rs b/crates/sdk-core/src/test_help/integ_helpers.rs index 829615a45..773244b96 100644 --- a/crates/sdk-core/src/test_help/integ_helpers.rs +++ b/crates/sdk-core/src/test_help/integ_helpers.rs @@ -4,7 +4,10 @@ pub use crate::{ internal_flags::CoreInternalFlags, - worker::{LEGACY_QUERY_ID, client::mocks::mock_worker_client}, + worker::{ + LEGACY_QUERY_ID, + client::{LegacyQueryResult, mocks::mock_worker_client}, + }, }; use crate::{ @@ -16,7 +19,7 @@ use crate::{ sticky_q_name_for_worker, worker::{ TaskPollers, WorkerTelemetry, - client::{LegacyQueryResult, MockWorkerClient, WorkerClient, WorkflowTaskCompletion}, + client::{MockWorkerClient, WorkerClient, WorkflowTaskCompletion}, worker_config_builder, }, }; diff --git a/crates/sdk-core/src/worker/client.rs b/crates/sdk-core/src/worker/client.rs index 206da074e..0fce87351 100644 --- a/crates/sdk-core/src/worker/client.rs +++ b/crates/sdk-core/src/worker/client.rs @@ -44,8 +44,11 @@ use uuid::Uuid; type Result = std::result::Result; +/// The result of a legacy query sent via `respond_legacy_query`. pub enum LegacyQueryResult { + /// The query handler returned a result successfully. Succeeded(QueryResult), + /// The query handler failed. Failed(workflow_completion::Failure), } diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/activities.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/activities.rs index 41c2b92a3..45ac5222c 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/activities.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/activities.rs @@ -21,7 +21,8 @@ use temporalio_common::{ protos::{ DEFAULT_ACTIVITY_TYPE, DEFAULT_WORKFLOW_TYPE, TestHistoryBuilder, canned_histories, coresdk::{ - ActivityHeartbeat, ActivityTaskCompletion, IntoCompletion, IntoPayloadsExt, + ActivityHeartbeat, ActivityTaskCompletion, AsJsonPayloadExt, IntoCompletion, + IntoPayloadsExt, activity_result::{ self, ActivityExecutionResult, ActivityResolution, activity_resolution as act_res, }, @@ -1286,7 +1287,7 @@ async fn pass_activity_summary_to_metadata() { let wf_id = mock_cfg.hists[0].wf_id.clone(); let wf_type = DEFAULT_WORKFLOW_TYPE; let expected_user_metadata = Some(UserMetadata { - summary: Some(b"activity summary".into()), + summary: Some("activity summary".as_json_payload().unwrap()), details: None, }); mock_cfg.completion_asserts_from_expectations(|mut asserts| { diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/child_workflows.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/child_workflows.rs index b167d8e3e..984d83110 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/child_workflows.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/child_workflows.rs @@ -640,8 +640,8 @@ async fn pass_child_workflow_summary_to_metadata() { let t = canned_histories::single_child_workflow(wf_id); let mut mock_cfg = MockPollCfg::from_hist_builder(t); let expected_user_metadata = Some(UserMetadata { - summary: Some(b"child summary".into()), - details: Some(b"child details".into()), + summary: Some("child summary".as_json_payload().unwrap()), + details: Some("child details".as_json_payload().unwrap()), }); mock_cfg.completion_asserts_from_expectations(|mut asserts| { asserts diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/client_interactions.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/client_interactions.rs index caaceb86e..3004db806 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/client_interactions.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/client_interactions.rs @@ -1,7 +1,8 @@ use crate::common::CoreWfStarter; use temporalio_client::{ - UntypedQuery, UntypedSignal, UntypedUpdate, WorkflowExecuteUpdateOptions, WorkflowQueryOptions, - WorkflowSignalOptions, WorkflowStartOptions, + UntypedQuery, UntypedSignal, UntypedUpdate, WorkflowDescribeOptions, + WorkflowExecuteUpdateOptions, WorkflowQueryOptions, WorkflowSignalOptions, + WorkflowStartOptions, }; use temporalio_common::{ data_converters::{PayloadConverter, RawValue}, @@ -620,3 +621,52 @@ async fn test_typed_signal_query_update() { let result = handle.get_result(Default::default()).await.unwrap(); assert_eq!(result.counter, 100); } + +#[workflow] +#[derive(Default)] +struct ImmediatelyCompletingWf; + +#[workflow_methods] +impl ImmediatelyCompletingWf { + #[run] + async fn run(_ctx: &mut WorkflowContext) -> WorkflowResult<()> { + Ok(()) + } +} + +/// Verify that `static_summary` and `static_details` set in `WorkflowStartOptions` are +/// stored on the server and visible via `DescribeWorkflowExecution`. +#[tokio::test] +async fn static_summary_and_details_visible_after_start() { + let wf_name = "static_summary_and_details_visible_after_start"; + let mut starter = CoreWfStarter::new(wf_name); + starter.sdk_config.task_types = WorkerTaskTypes::workflow_only(); + let mut worker = starter.worker().await; + worker.register_workflow::(); + + let task_queue = starter.get_task_queue().to_owned(); + let handle = worker + .submit_workflow( + ImmediatelyCompletingWf::run, + (), + WorkflowStartOptions::new(task_queue, wf_name) + .static_summary("my static summary") + .static_details("my static details") + .build(), + ) + .await + .unwrap(); + + worker.run_until_done().await.unwrap(); + + let description = handle + .describe(WorkflowDescribeOptions::default()) + .await + .unwrap(); + + let summary = description.static_summary().expect("summary present"); + assert_eq!(summary, "my static summary",); + + let details = description.static_details().expect("details present"); + assert_eq!(details, "my static details",); +} diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/queries.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/queries.rs index 0ad99e3e4..a2ea8c081 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/queries.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/queries.rs @@ -17,7 +17,7 @@ use temporalio_common::protos::{ use temporalio_macros::{workflow, workflow_methods}; use temporalio_sdk::{SyncWorkflowContext, WorkflowContext, WorkflowContextView, WorkflowResult}; use temporalio_sdk_core::test_help::{ - MockPollCfg, ResponseType, hist_to_poll_resp, mock_worker_client, + LegacyQueryResult, MockPollCfg, ResponseType, hist_to_poll_resp, mock_worker_client, }; /// A workflow that returns Pending on first poll and Ready on second poll. @@ -412,3 +412,172 @@ async fn query_returns_workflow_context_view_info() { worker.register_workflow::(); worker.run().await.unwrap(); } + +/// Workflow that sets current_details and then blocks indefinitely. +/// The pending await keeps the workflow alive when the metadata query arrives. +#[workflow] +#[derive(Default)] +struct CurrentDetailsWf; + +#[workflow_methods] +impl CurrentDetailsWf { + #[run(name = DEFAULT_WORKFLOW_TYPE)] + async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { + ctx.set_current_details("details from workflow"); + ctx.wait_condition(|_| false).await; + Ok(()) + } +} + +/// Verify that the query returns a proto-JSON-encoded `WorkflowMetadata` +/// whose `current_details` field reflects the value set by `set_current_details`. +#[tokio::test] +async fn workflow_metadata_query_returns_current_details() { + let wfid = "workflow_metadata_query_test"; + + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + + let tasks = [ + // First task: workflow starts, sets current_details, and blocks on pending. + hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::ToTaskNum(1)), + // Second task: legacy query for __temporal_workflow_metadata while workflow is blocked. + { + let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::ToTaskNum(1)); + pr.query = Some(WorkflowQuery { + query_type: "__temporal_workflow_metadata".to_string(), + query_args: None, + header: None, + }); + pr.history = Some(Default::default()); + pr + }, + ]; + + let mut mock_cfg = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_worker_client()); + mock_cfg.num_expected_legacy_query_resps = 1; + + mock_cfg.completion_asserts_from_expectations(|mut asserts| { + asserts.then(|wft| { + // First activation: workflow runs, sets current_details, and blocks. + // No commands should be emitted. + assert!( + wft.commands.is_empty(), + "Expected no commands on first activation, got: {:?}", + wft.commands + ); + }); + }); + + // The legacy query response goes through respond_legacy_query, not complete_workflow_activation. + // Use expect_legacy_query_matcher to assert on the actual payload sent to the server. + mock_cfg.expect_legacy_query_matcher = Box::new(|_, result| { + let LegacyQueryResult::Succeeded(qr) = result else { + panic!("Expected Succeeded legacy query result"); + }; + let Some(query_result::Variant::Succeeded(success)) = &qr.variant else { + panic!("Expected Succeeded query result variant"); + }; + let payload = success + .response + .as_ref() + .expect("Expected a response payload in __temporal_workflow_metadata response"); + + assert_eq!( + payload.metadata.get("encoding").map(|v| v.as_slice()), + Some(b"json/plain".as_slice()), + "Expected json/plain encoding" + ); + + // The data is proto-JSON: {"currentDetails":"..."} + let json: serde_json::Value = + serde_json::from_slice(&payload.data).expect("Response data should be valid JSON"); + assert_eq!( + json["currentDetails"].as_str(), + Some("details from workflow"), + "current_details should match what the workflow set" + ); + + true + }); + + let mut worker = build_fake_sdk(mock_cfg); + worker.register_workflow::(); + worker.run().await.unwrap(); +} + +/// Workflow that blocks indefinitely without ever setting current_details. +#[workflow] +#[derive(Default)] +struct NoCurrentDetailsWf; + +#[workflow_methods] +impl NoCurrentDetailsWf { + #[run(name = DEFAULT_WORKFLOW_TYPE)] + async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { + ctx.wait_condition(|_| false).await; + Ok(()) + } +} + +/// Verify that the query returns `{}` when `set_current_details` was never +/// called, matching proto3 JSON behavior where default (empty) fields are omitted. +#[tokio::test] +async fn workflow_metadata_query_empty_details() { + let wfid = "workflow_metadata_query_empty_test"; + + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + + let tasks = [ + hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::ToTaskNum(1)), + { + let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::ToTaskNum(1)); + pr.query = Some(WorkflowQuery { + query_type: "__temporal_workflow_metadata".to_string(), + query_args: None, + header: None, + }); + pr.history = Some(Default::default()); + pr + }, + ]; + + let mut mock_cfg = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_worker_client()); + mock_cfg.num_expected_legacy_query_resps = 1; + + mock_cfg.expect_legacy_query_matcher = Box::new(|_, result| { + let LegacyQueryResult::Succeeded(qr) = result else { + panic!("Expected Succeeded legacy query result"); + }; + let Some(query_result::Variant::Succeeded(success)) = &qr.variant else { + panic!("Expected Succeeded query result variant"); + }; + let payload = success + .response + .as_ref() + .expect("Expected a response payload"); + + assert_eq!( + payload.metadata.get("encoding").map(|v| v.as_slice()), + Some(b"json/plain".as_slice()), + "Expected json/plain encoding" + ); + + // With no current_details set the field is omitted per proto3 JSON rules. + assert_eq!( + &payload.data, + b"{}", + "Expected {{}} when current_details is empty, got: {}", + String::from_utf8_lossy(&payload.data) + ); + + true + }); + + let mut worker = build_fake_sdk(mock_cfg); + worker.register_workflow::(); + worker.run().await.unwrap(); +} diff --git a/crates/sdk/src/workflow_context.rs b/crates/sdk/src/workflow_context.rs index 0e6142a8b..ec4f912b7 100644 --- a/crates/sdk/src/workflow_context.rs +++ b/crates/sdk/src/workflow_context.rs @@ -402,6 +402,11 @@ impl BaseWorkflowContext { self.inner.state_mutated.set(true); } + /// Return the current value of current_details. + pub(crate) fn current_details(&self) -> String { + self.inner.shared.borrow().current_details.clone() + } + /// Cancel any cancellable operation by ID fn cancel(&self, cancellable_id: CancellableID) { self.send(RustWfCmd::Cancel(cancellable_id)); @@ -872,6 +877,14 @@ impl SyncWorkflowContext { )) } + /// Set the current details string for this workflow execution. + /// + /// The value is surfaced to the Temporal server UI in real time via the + /// the workflow metadata query. + pub fn set_current_details(&self, details: impl Into) { + self.base.inner.shared.borrow_mut().current_details = details.into(); + } + /// Force a workflow task failure (EX: in order to retry on non-sticky queue) pub fn force_task_fail(&self, with: anyhow::Error) { self.base.send(with.into()); @@ -1096,6 +1109,13 @@ impl WorkflowContext { self.sync.upsert_memo(attr_iter) } + /// Set the current details string for this workflow execution. + /// + /// See [`SyncWorkflowContext::set_current_details`]. + pub fn set_current_details(&self, details: impl Into) { + self.sync.set_current_details(details) + } + /// Force a workflow task failure (EX: in order to retry on non-sticky queue) pub fn force_task_fail(&self, with: anyhow::Error) { self.sync.force_task_fail(with) @@ -1228,6 +1248,8 @@ pub(crate) struct WorkflowContextSharedData { pub(crate) current_deployment_version: Option, pub(crate) search_attributes: SearchAttributes, pub(crate) random_seed: u64, + /// Current details string, surfaced via the workflow metadata query. + pub(crate) current_details: String, } /// A Future that can be cancelled. diff --git a/crates/sdk/src/workflow_context/options.rs b/crates/sdk/src/workflow_context/options.rs index cf9f8df49..678f55cd4 100644 --- a/crates/sdk/src/workflow_context/options.rs +++ b/crates/sdk/src/workflow_context/options.rs @@ -110,10 +110,16 @@ impl ActivityOptions { } .into(), ), - user_metadata: self.summary.map(|s| UserMetadata { - summary: Some(s.into()), - details: None, - }), + user_metadata: self + .summary + .map(|s| { + s.as_json_payload() + .expect("String-to-JSON payload serialization is infallible") + }) + .map(|summary| UserMetadata { + summary: Some(summary), + details: None, + }), } } } @@ -202,7 +208,11 @@ impl LocalActivityOptions { ), user_metadata: self .summary - .and_then(|summary| summary.as_json_payload().ok()) + .map(|summary| { + summary + .as_json_payload() + .expect("String-to-JSON payload serialization is infallible") + }) .map(|summary| UserMetadata { summary: Some(summary), details: None, @@ -253,8 +263,14 @@ impl ChildWorkflowOptions { ) -> WorkflowCommand { let user_metadata = if self.static_summary.is_some() || self.static_details.is_some() { Some(UserMetadata { - summary: self.static_summary.map(Into::into), - details: self.static_details.map(Into::into), + summary: self.static_summary.map(|s| { + s.as_json_payload() + .expect("String-to-JSON payload serialization is infallible") + }), + details: self.static_details.map(|s| { + s.as_json_payload() + .expect("String-to-JSON payload serialization is infallible") + }), }) } else { None diff --git a/crates/sdk/src/workflow_future.rs b/crates/sdk/src/workflow_future.rs index 5cb3bcae0..57cbe45cf 100644 --- a/crates/sdk/src/workflow_future.rs +++ b/crates/sdk/src/workflow_future.rs @@ -16,7 +16,9 @@ use std::{ task::{Context, Poll}, }; use temporalio_common::{ - data_converters::PayloadConverter, + data_converters::{ + GenericPayloadConverter, PayloadConverter, SerializationContext, SerializationContextData, + }, protos::{ coresdk::{ workflow_activation::{ @@ -263,15 +265,40 @@ impl WorkflowFuture { converter: &self.payload_converter, }; - let dispatch_result = match panic::catch_unwind(AssertUnwindSafe(|| { - self.execution.dispatch_query(&query_type, data) - })) { - Ok(r) => r, - Err(e) => Some(Err(anyhow!( - "Panic in query handler: {}", - panic_formatter(e) - ) - .into())), + let dispatch_result = if query_type == "__temporal_workflow_metadata" { + // Mirror the proto JSON shape of temporal.api.sdk.v1.WorkflowMetadata. + // TODO [rust-sdk-branch]: support normal JSON and proto JSON serialization, and this will no longer be necessary. + #[derive(serde::Serialize)] + struct WorkflowMetadataJson { + #[serde( + rename = "currentDetails", + skip_serializing_if = "String::is_empty" + )] + current_details: String, + } + let converter = PayloadConverter::default(); + let ctx = SerializationContext { + data: &SerializationContextData::Workflow, + converter: &converter, + }; + let payload = converter.to_payload( + &ctx, + &WorkflowMetadataJson { + current_details: self.base_ctx.current_details(), + }, + ); + Some(Ok(payload?)) + } else { + match panic::catch_unwind(AssertUnwindSafe(|| { + self.execution.dispatch_query(&query_type, data) + })) { + Ok(r) => r, + Err(e) => Some(Err(anyhow!( + "Panic in query handler: {}", + panic_formatter(e) + ) + .into())), + } }; let response = match dispatch_result {