From bd173e6de24f148beb7400dc5a3ae140c47dae54 Mon Sep 17 00:00:00 2001 From: jmaeagle99 <44687433+jmaeagle99@users.noreply.github.com> Date: Mon, 16 Mar 2026 18:26:53 -0700 Subject: [PATCH] Add external storage drivers to worker heartbeat --- .../api_upstream/openapi/openapiv2.json | 112 ++++++++++++++++- .../api_upstream/openapi/openapiv3.yaml | 118 ++++++++++++++++-- .../temporal/api/history/v1/message.proto | 18 +++ .../temporal/api/worker/v1/message.proto | 54 ++++++++ .../workflowservice/v1/request_response.proto | 13 +- .../include/temporal-sdk-core-c-bridge.h | 1 + crates/sdk-core-c-bridge/src/worker.rs | 14 +++ crates/sdk-core/src/worker/mod.rs | 13 +- .../integ_tests/worker_heartbeat_tests.rs | 44 +++++-- 9 files changed, 362 insertions(+), 25 deletions(-) diff --git a/crates/common/protos/api_upstream/openapi/openapiv2.json b/crates/common/protos/api_upstream/openapi/openapiv2.json index 2f21bedcc..79d7c0c7d 100644 --- a/crates/common/protos/api_upstream/openapi/openapiv2.json +++ b/crates/common/protos/api_upstream/openapi/openapiv2.json @@ -2735,7 +2735,7 @@ }, { "name": "query", - "description": "`query` in ListWorkers is used to filter workers based on worker status info.\nThe following worker status attributes are expected are supported as part of the query:\n* WorkerInstanceKey\n* WorkerIdentity\n* HostName\n* TaskQueue\n* DeploymentName\n* BuildId\n* SdkName\n* SdkVersion\n* StartTime\n* LastHeartbeatTime\n* Status\nCurrently metrics are not supported as a part of ListWorkers query.", + "description": "`query` in ListWorkers is used to filter workers based on worker attributes.\nSupported attributes:\n* WorkerInstanceKey\n* WorkerIdentity\n* HostName\n* TaskQueue\n* DeploymentName\n* BuildId\n* SdkName\n* SdkVersion\n* StartTime\n* Status", "in": "query", "required": false, "type": "string" @@ -7235,7 +7235,7 @@ }, { "name": "query", - "description": "`query` in ListWorkers is used to filter workers based on worker status info.\nThe following worker status attributes are expected are supported as part of the query:\n* WorkerInstanceKey\n* WorkerIdentity\n* HostName\n* TaskQueue\n* DeploymentName\n* BuildId\n* SdkName\n* SdkVersion\n* StartTime\n* LastHeartbeatTime\n* Status\nCurrently metrics are not supported as a part of ListWorkers query.", + "description": "`query` in ListWorkers is used to filter workers based on worker attributes.\nSupported attributes:\n* WorkerInstanceKey\n* WorkerIdentity\n* HostName\n* TaskQueue\n* DeploymentName\n* BuildId\n* SdkName\n* SdkVersion\n* StartTime\n* Status", "in": "query", "required": false, "type": "string" @@ -12040,6 +12040,15 @@ } } }, + "v1DeclinedTargetVersionUpgrade": { + "type": "object", + "properties": { + "deploymentVersion": { + "$ref": "#/definitions/v1WorkerDeploymentVersion" + } + }, + "description": "Wrapper for a target deployment version that the SDK declined to upgrade to.\nSee declined_target_version_upgrade on WorkflowExecutionStartedEventAttributes." + }, "v1DeleteNexusEndpointResponse": { "type": "object" }, @@ -13429,7 +13438,16 @@ "items": { "type": "object", "$ref": "#/definitions/v1WorkerInfo" - } + }, + "description": "Deprecated: Use workers instead. This field returns full WorkerInfo which\nincludes expensive runtime metrics. We will stop populating this field in the future." + }, + "workers": { + "type": "array", + "items": { + "type": "object", + "$ref": "#/definitions/v1WorkerListInfo" + }, + "description": "Limited worker information." }, "nextPageToken": { "type": "string", @@ -15864,6 +15882,15 @@ "v1StopBatchOperationResponse": { "type": "object" }, + "v1StorageDriverInfo": { + "type": "object", + "properties": { + "type": { + "type": "string", + "description": "The type of the driver, required." + } + } + }, "v1StructuredCalendarSpec": { "type": "object", "properties": { @@ -16889,6 +16916,14 @@ "$ref": "#/definitions/v1PluginInfo" }, "description": "Plugins currently in use by this SDK." + }, + "drivers": { + "type": "array", + "items": { + "type": "object", + "$ref": "#/definitions/v1StorageDriverInfo" + }, + "description": "Storage drivers in use by this SDK." } }, "description": "Worker info message, contains information about the worker and its current state.\nAll information is provided by the worker itself." @@ -16927,7 +16962,72 @@ "workerHeartbeat": { "$ref": "#/definitions/v1WorkerHeartbeat" } - } + }, + "description": "Detailed worker information." + }, + "v1WorkerListInfo": { + "type": "object", + "properties": { + "workerInstanceKey": { + "type": "string", + "description": "Worker identifier, should be unique for the namespace.\nIt is distinct from worker identity, which is not necessarily namespace-unique." + }, + "workerIdentity": { + "type": "string", + "description": "Worker identity, set by the client, may not be unique.\nUsually host_name+(user group name)+process_id, but can be overwritten by the user." + }, + "taskQueue": { + "type": "string", + "description": "Task queue this worker is polling for tasks." + }, + "deploymentVersion": { + "$ref": "#/definitions/v1WorkerDeploymentVersion" + }, + "sdkName": { + "type": "string" + }, + "sdkVersion": { + "type": "string" + }, + "status": { + "$ref": "#/definitions/v1WorkerStatus", + "description": "Worker status. Defined by SDK." + }, + "startTime": { + "type": "string", + "format": "date-time", + "title": "Worker start time.\nIt can be used to determine worker uptime. (current time - start time)" + }, + "hostName": { + "type": "string", + "description": "Worker host identifier." + }, + "workerGroupingKey": { + "type": "string", + "title": "Worker grouping identifier. A key to group workers that share the same client+namespace+process.\nThis will be used to build the worker command nexus task queue name:\n\"temporal-sys/worker-commands/{worker_grouping_key}\"" + }, + "processId": { + "type": "string", + "description": "Worker process identifier. This id only needs to be unique\nwithin one host (so using e.g. a unix pid would be appropriate)." + }, + "plugins": { + "type": "array", + "items": { + "type": "object", + "$ref": "#/definitions/v1PluginInfo" + }, + "description": "Plugins currently in use by this SDK." + }, + "drivers": { + "type": "array", + "items": { + "type": "object", + "$ref": "#/definitions/v1StorageDriverInfo" + }, + "description": "Storage drivers in use by this SDK." + } + }, + "description": "Limited worker information returned in the list response.\nWhen adding fields here, ensure that it is also added to WorkerInfo (as it carries the full worker information)." }, "v1WorkerPollerInfo": { "type": "object", @@ -17628,6 +17728,10 @@ "eagerExecutionAccepted": { "type": "boolean", "description": "A boolean indicating whether the SDK has asked to eagerly execute the first workflow task for this workflow and\neager execution was accepted by the server.\nOnly populated by server with version >= 1.29.0." + }, + "declinedTargetVersionUpgrade": { + "$ref": "#/definitions/v1DeclinedTargetVersionUpgrade", + "description": "During a previous run of this workflow, the server may have notified the SDK\nthat the Target Worker Deployment Version changed, but the SDK declined to\nupgrade (e.g., by continuing-as-new with PINNED behavior). This field records\nthe target version that was declined.\n\nThis is a wrapper message to distinguish \"never declined\" (nil wrapper) from\n\"declined an unversioned target\" (non-nil wrapper with nil deployment_version).\n\nUsed internally by the server during continue-as-new and retry.\nShould not be read or interpreted by SDKs." } }, "title": "Always the first event in workflow history" diff --git a/crates/common/protos/api_upstream/openapi/openapiv3.yaml b/crates/common/protos/api_upstream/openapi/openapiv3.yaml index c220f6773..24c527682 100644 --- a/crates/common/protos/api_upstream/openapi/openapiv3.yaml +++ b/crates/common/protos/api_upstream/openapi/openapiv3.yaml @@ -2461,8 +2461,8 @@ paths: - name: query in: query description: |- - `query` in ListWorkers is used to filter workers based on worker status info. - The following worker status attributes are expected are supported as part of the query: + `query` in ListWorkers is used to filter workers based on worker attributes. + Supported attributes: * WorkerInstanceKey * WorkerIdentity * HostName @@ -2472,9 +2472,7 @@ paths: * SdkName * SdkVersion * StartTime - * LastHeartbeatTime * Status - Currently metrics are not supported as a part of ListWorkers query. schema: type: string responses: @@ -6496,8 +6494,8 @@ paths: - name: query in: query description: |- - `query` in ListWorkers is used to filter workers based on worker status info. - The following worker status attributes are expected are supported as part of the query: + `query` in ListWorkers is used to filter workers based on worker attributes. + Supported attributes: * WorkerInstanceKey * WorkerIdentity * HostName @@ -6507,9 +6505,7 @@ paths: * SdkName * SdkVersion * StartTime - * LastHeartbeatTime * Status - Currently metrics are not supported as a part of ListWorkers query. schema: type: string responses: @@ -9230,6 +9226,14 @@ components: data: type: string format: bytes + DeclinedTargetVersionUpgrade: + type: object + properties: + deploymentVersion: + $ref: '#/components/schemas/WorkerDeploymentVersion' + description: |- + Wrapper for a target deployment version that the SDK declined to upgrade to. + See declined_target_version_upgrade on WorkflowExecutionStartedEventAttributes. DeleteNexusEndpointResponse: type: object properties: {} @@ -10550,6 +10554,14 @@ components: type: array items: $ref: '#/components/schemas/WorkerInfo' + description: |- + Deprecated: Use workers instead. This field returns full WorkerInfo which + includes expensive runtime metrics. We will stop populating this field in the future. + workers: + type: array + items: + $ref: '#/components/schemas/WorkerListInfo' + description: Limited worker information. nextPageToken: type: string description: Next page token @@ -14042,6 +14054,12 @@ components: StopBatchOperationResponse: type: object properties: {} + StorageDriverInfo: + type: object + properties: + type: + type: string + description: The type of the driver, required. StructuredCalendarSpec: type: object properties: @@ -15515,6 +15533,11 @@ components: items: $ref: '#/components/schemas/PluginInfo' description: Plugins currently in use by this SDK. + drivers: + type: array + items: + $ref: '#/components/schemas/StorageDriverInfo' + description: Storage drivers in use by this SDK. description: |- Worker info message, contains information about the worker and its current state. All information is provided by the worker itself. @@ -15555,6 +15578,71 @@ components: properties: workerHeartbeat: $ref: '#/components/schemas/WorkerHeartbeat' + description: Detailed worker information. + WorkerListInfo: + type: object + properties: + workerInstanceKey: + type: string + description: |- + Worker identifier, should be unique for the namespace. + It is distinct from worker identity, which is not necessarily namespace-unique. + workerIdentity: + type: string + description: |- + Worker identity, set by the client, may not be unique. + Usually host_name+(user group name)+process_id, but can be overwritten by the user. + taskQueue: + type: string + description: Task queue this worker is polling for tasks. + deploymentVersion: + $ref: '#/components/schemas/WorkerDeploymentVersion' + sdkName: + type: string + sdkVersion: + type: string + status: + enum: + - WORKER_STATUS_UNSPECIFIED + - WORKER_STATUS_RUNNING + - WORKER_STATUS_SHUTTING_DOWN + - WORKER_STATUS_SHUTDOWN + type: string + description: Worker status. Defined by SDK. + format: enum + startTime: + type: string + description: |- + Worker start time. + It can be used to determine worker uptime. (current time - start time) + format: date-time + hostName: + type: string + description: Worker host identifier. + workerGroupingKey: + type: string + description: |- + Worker grouping identifier. A key to group workers that share the same client+namespace+process. + This will be used to build the worker command nexus task queue name: + "temporal-sys/worker-commands/{worker_grouping_key}" + processId: + type: string + description: |- + Worker process identifier. This id only needs to be unique + within one host (so using e.g. a unix pid would be appropriate). + plugins: + type: array + items: + $ref: '#/components/schemas/PluginInfo' + description: Plugins currently in use by this SDK. + drivers: + type: array + items: + $ref: '#/components/schemas/StorageDriverInfo' + description: Storage drivers in use by this SDK. + description: |- + Limited worker information returned in the list response. + When adding fields here, ensure that it is also added to WorkerInfo (as it carries the full worker information). WorkerPollerInfo: type: object properties: @@ -16448,6 +16536,20 @@ components: A boolean indicating whether the SDK has asked to eagerly execute the first workflow task for this workflow and eager execution was accepted by the server. Only populated by server with version >= 1.29.0. + declinedTargetVersionUpgrade: + allOf: + - $ref: '#/components/schemas/DeclinedTargetVersionUpgrade' + description: |- + During a previous run of this workflow, the server may have notified the SDK + that the Target Worker Deployment Version changed, but the SDK declined to + upgrade (e.g., by continuing-as-new with PINNED behavior). This field records + the target version that was declined. + + This is a wrapper message to distinguish "never declined" (nil wrapper) from + "declined an unversioned target" (non-nil wrapper with nil deployment_version). + + Used internally by the server during continue-as-new and retry. + Should not be read or interpreted by SDKs. description: Always the first event in workflow history WorkflowExecutionTerminatedEventAttributes: type: object diff --git a/crates/common/protos/api_upstream/temporal/api/history/v1/message.proto b/crates/common/protos/api_upstream/temporal/api/history/v1/message.proto index 2fb99f7e9..21fb13c5e 100644 --- a/crates/common/protos/api_upstream/temporal/api/history/v1/message.proto +++ b/crates/common/protos/api_upstream/temporal/api/history/v1/message.proto @@ -184,6 +184,24 @@ message WorkflowExecutionStartedEventAttributes { // eager execution was accepted by the server. // Only populated by server with version >= 1.29.0. bool eager_execution_accepted = 38; + + // During a previous run of this workflow, the server may have notified the SDK + // that the Target Worker Deployment Version changed, but the SDK declined to + // upgrade (e.g., by continuing-as-new with PINNED behavior). This field records + // the target version that was declined. + // + // This is a wrapper message to distinguish "never declined" (nil wrapper) from + // "declined an unversioned target" (non-nil wrapper with nil deployment_version). + // + // Used internally by the server during continue-as-new and retry. + // Should not be read or interpreted by SDKs. + DeclinedTargetVersionUpgrade declined_target_version_upgrade = 40; +} + +// Wrapper for a target deployment version that the SDK declined to upgrade to. +// See declined_target_version_upgrade on WorkflowExecutionStartedEventAttributes. +message DeclinedTargetVersionUpgrade { + temporal.api.deployment.v1.WorkerDeploymentVersion deployment_version = 1; } message WorkflowExecutionCompletedEventAttributes { diff --git a/crates/common/protos/api_upstream/temporal/api/worker/v1/message.proto b/crates/common/protos/api_upstream/temporal/api/worker/v1/message.proto index 3df3aaa33..b65faeb29 100644 --- a/crates/common/protos/api_upstream/temporal/api/worker/v1/message.proto +++ b/crates/common/protos/api_upstream/temporal/api/worker/v1/message.proto @@ -127,15 +127,69 @@ message WorkerHeartbeat { // Plugins currently in use by this SDK. repeated PluginInfo plugins = 23; + + // Storage drivers in use by this SDK. + repeated StorageDriverInfo drivers = 24; } +// Detailed worker information. message WorkerInfo { WorkerHeartbeat worker_heartbeat = 1; } +// Limited worker information returned in the list response. +// When adding fields here, ensure that it is also added to WorkerInfo (as it carries the full worker information). +message WorkerListInfo { + // Worker identifier, should be unique for the namespace. + // It is distinct from worker identity, which is not necessarily namespace-unique. + string worker_instance_key = 1; + + // Worker identity, set by the client, may not be unique. + // Usually host_name+(user group name)+process_id, but can be overwritten by the user. + string worker_identity = 2; + + // Task queue this worker is polling for tasks. + string task_queue = 3; + + temporal.api.deployment.v1.WorkerDeploymentVersion deployment_version = 4; + + string sdk_name = 5; + string sdk_version = 6; + + // Worker status. Defined by SDK. + temporal.api.enums.v1.WorkerStatus status = 7; + + // Worker start time. + // It can be used to determine worker uptime. (current time - start time) + google.protobuf.Timestamp start_time = 8; + + // Worker host identifier. + string host_name = 9; + + // Worker grouping identifier. A key to group workers that share the same client+namespace+process. + // This will be used to build the worker command nexus task queue name: + // "temporal-sys/worker-commands/{worker_grouping_key}" + string worker_grouping_key = 10; + + // Worker process identifier. This id only needs to be unique + // within one host (so using e.g. a unix pid would be appropriate). + string process_id = 11; + + // Plugins currently in use by this SDK. + repeated PluginInfo plugins = 12; + + // Storage drivers in use by this SDK. + repeated StorageDriverInfo drivers = 13; +} + message PluginInfo { // The name of the plugin, required. string name = 1; // The version of the plugin, may be empty. string version = 2; } + +message StorageDriverInfo { + // The type of the driver, required. + string type = 1; +} diff --git a/crates/common/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto b/crates/common/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto index 4bf3272c6..651816e37 100644 --- a/crates/common/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto +++ b/crates/common/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto @@ -2560,8 +2560,8 @@ message ListWorkersRequest { int32 page_size = 2; bytes next_page_token = 3; - // `query` in ListWorkers is used to filter workers based on worker status info. - // The following worker status attributes are expected are supported as part of the query: + // `query` in ListWorkers is used to filter workers based on worker attributes. + // Supported attributes: //* WorkerInstanceKey //* WorkerIdentity //* HostName @@ -2571,14 +2571,17 @@ message ListWorkersRequest { //* SdkName //* SdkVersion //* StartTime - //* LastHeartbeatTime //* Status - // Currently metrics are not supported as a part of ListWorkers query. string query = 4; } message ListWorkersResponse { - repeated temporal.api.worker.v1.WorkerInfo workers_info = 1; + // Deprecated: Use workers instead. This field returns full WorkerInfo which + // includes expensive runtime metrics. We will stop populating this field in the future. + repeated temporal.api.worker.v1.WorkerInfo workers_info = 1 [deprecated = true]; + + // Limited worker information. + repeated temporal.api.worker.v1.WorkerListInfo workers = 3; // Next page token bytes next_page_token = 2; diff --git a/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h b/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h index 25c38f946..ed03dd47a 100644 --- a/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h +++ b/crates/sdk-core-c-bridge/include/temporal-sdk-core-c-bridge.h @@ -794,6 +794,7 @@ typedef struct TemporalCoreWorkerOptions { bool nondeterminism_as_workflow_fail; struct TemporalCoreByteArrayRefArray nondeterminism_as_workflow_fail_for_types; struct TemporalCoreByteArrayRefArray plugins; + struct TemporalCoreByteArrayRefArray storage_drivers; } TemporalCoreWorkerOptions; /** diff --git a/crates/sdk-core-c-bridge/src/worker.rs b/crates/sdk-core-c-bridge/src/worker.rs index d47f02ae5..7e84e3ccf 100644 --- a/crates/sdk-core-c-bridge/src/worker.rs +++ b/crates/sdk-core-c-bridge/src/worker.rs @@ -51,6 +51,7 @@ pub struct WorkerOptions { pub nondeterminism_as_workflow_fail: bool, pub nondeterminism_as_workflow_fail_for_types: ByteArrayRefArray, pub plugins: ByteArrayRefArray, + pub storage_drivers: ByteArrayRefArray, } #[repr(C)] @@ -1260,6 +1261,19 @@ impl TryFrom<&WorkerOptions> for temporalio_sdk_core::WorkerConfig { ) .collect::>(), ) + .storage_drivers( + opt.storage_drivers + .to_str_vec() + .into_iter() + .collect::>() + .into_iter() + .map(|r#type| { + temporalio_common::protos::temporal::api::worker::v1::StorageDriverInfo { + r#type: r#type.to_owned(), + } + }) + .collect::>(), + ) .build() .map_err(|err| anyhow::anyhow!(err)) } diff --git a/crates/sdk-core/src/worker/mod.rs b/crates/sdk-core/src/worker/mod.rs index 651eed8fd..9e70b5b47 100644 --- a/crates/sdk-core/src/worker/mod.rs +++ b/crates/sdk-core/src/worker/mod.rs @@ -13,7 +13,10 @@ use temporalio_common::{ ActivitySlotInfo, LocalActivitySlotInfo, NamespaceInfo, NexusSlotInfo, WorkflowSlotInfo, activity_result::ActivityExecutionResult, namespace_info, }, - temporal::api::{enums::v1::VersioningBehavior, worker::v1::PluginInfo}, + temporal::api::{ + enums::v1::VersioningBehavior, + worker::v1::{PluginInfo, StorageDriverInfo}, + }, }, telemetry::TelemetryInstance, worker::{WorkerDeploymentOptions, WorkerDeploymentVersion}, @@ -265,6 +268,10 @@ pub struct WorkerConfig { /// Skips the single worker+client+namespace+task_queue check #[builder(default = false)] pub skip_client_worker_set_check: bool, + + /// List of storage drivers used by lang. + #[builder(default)] + pub storage_drivers: HashSet, } impl WorkerConfig { @@ -1992,6 +1999,9 @@ impl WorkerHeartbeatManager { let mut plugins: Vec<_> = config.plugins.clone().into_iter().collect(); plugins.sort_by(|a, b| a.name.cmp(&b.name)); + let mut drivers: Vec<_> = config.storage_drivers.clone().into_iter().collect(); + drivers.sort_by(|a, b| a.r#type.cmp(&b.r#type)); + let mut worker_heartbeat = WorkerHeartbeat { worker_instance_key: worker_instance_key.to_string(), host_info: Some(WorkerHostInfo { @@ -2011,6 +2021,7 @@ impl WorkerHeartbeatManager { status: (*heartbeat_manager_metrics.status.read()) as i32, start_time, plugins, + drivers, // Some Metrics dependent fields are set below, and // some fields like sdk_name, sdk_version, and worker_identity, must be set by diff --git a/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs b/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs index 41a03c232..9f10f37a1 100644 --- a/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs +++ b/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs @@ -24,7 +24,7 @@ use temporalio_common::{ temporal::api::{ common::v1::RetryPolicy, enums::v1::WorkerStatus, - worker::v1::{PluginInfo, WorkerHeartbeat}, + worker::v1::{PluginInfo, StorageDriverInfo, WorkerHeartbeat}, workflowservice::v1::{DescribeWorkerRequest, ListWorkersRequest}, }, }, @@ -74,7 +74,7 @@ fn to_system_time(ts: Timestamp) -> SystemTime { async fn list_worker_heartbeats(client: &Client, query: impl Into) -> Vec { let mut raw_client = client.clone(); - WorkflowService::list_workers( + let response = WorkflowService::list_workers( &mut raw_client, ListWorkersRequest { namespace: client.namespace().to_owned(), @@ -86,11 +86,13 @@ async fn list_worker_heartbeats(client: &Client, query: impl Into) -> Ve ) .await .unwrap() - .into_inner() - .workers_info - .into_iter() - .filter_map(|info| info.worker_heartbeat) - .collect() + .into_inner(); + #[allow(deprecated)] + let workers_info = response.workers_info; + workers_info + .into_iter() + .filter_map(|info| info.worker_heartbeat) + .collect() } // Tests that rely on Prometheus running in a docker container need to start @@ -149,6 +151,16 @@ async fn docker_worker_heartbeat_basic(#[values("otel", "prom", "no_metrics")] b ] .into_iter() .collect(); + c.storage_drivers = vec![ + StorageDriverInfo { + r#type: "driver1".to_string(), + }, + StorageDriverInfo { + r#type: "driver2".to_string(), + }, + ] + .into_iter() + .collect(); }); let acts_started = Arc::new(Notify::new()); let acts_done = Arc::new(Notify::new()); @@ -229,6 +241,7 @@ async fn docker_worker_heartbeat_basic(#[values("otel", "prom", "no_metrics")] b .await .unwrap() .into_inner(); + #[allow(deprecated)] let worker_info = workers_list .workers_info .iter() @@ -271,6 +284,7 @@ async fn docker_worker_heartbeat_basic(#[values("otel", "prom", "no_metrics")] b .into_inner(); // Since list_workers finds all workers in the namespace, must find specific worker used in this // test + #[allow(deprecated)] let worker_info = workers_list .workers_info .iter() @@ -376,6 +390,7 @@ async fn docker_worker_heartbeat_tuner() { .into_inner(); // Since list_workers finds all workers in the namespace, must find specific worker used in this // test + #[allow(deprecated)] let worker_info = workers_list .workers_info .iter() @@ -573,6 +588,17 @@ fn after_shutdown_checks( } ] ); + assert_eq!( + heartbeat.drivers, + vec![ + StorageDriverInfo { + r#type: "driver1".to_string() + }, + StorageDriverInfo { + r#type: "driver2".to_string() + } + ] + ); } static HISTORY_WF1_ACTIVITY_STARTED: Notify = Notify::const_new(); @@ -945,6 +971,7 @@ async fn worker_heartbeat_failure_metrics() { .await .unwrap() .into_inner(); + #[allow(deprecated)] let worker_info = workers_list .workers_info .iter() @@ -990,6 +1017,7 @@ async fn worker_heartbeat_failure_metrics() { .await .unwrap() .into_inner(); + #[allow(deprecated)] let worker_info = workers_list .workers_info .iter() @@ -1102,6 +1130,7 @@ async fn worker_heartbeat_no_runtime_heartbeat() { .into_inner(); // Ensure worker has not ever heartbeated + #[allow(deprecated)] let heartbeat = workers_list.workers_info.iter().find(|worker_info| { if let Some(hb) = worker_info.worker_heartbeat.as_ref() { hb.worker_instance_key == worker_instance_key.to_string() @@ -1174,6 +1203,7 @@ async fn worker_heartbeat_skip_client_worker_set_check() { .into_inner(); // Ensure worker still heartbeats + #[allow(deprecated)] let heartbeat = workers_list.workers_info.iter().find(|worker_info| { if let Some(hb) = worker_info.worker_heartbeat.as_ref() { hb.worker_instance_key == worker_instance_key.to_string()