diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 35c24ed3d..03db6842a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -187,7 +187,9 @@ jobs: run: pnpm tsx scripts/publish-to-verdaccio.ts --registry-dir ${{ steps.tmp-dir.outputs.dir }}/npm-registry - name: Install Temporal CLI - uses: temporalio/setup-temporal@1059a504f87e7fa2f385e3fa40d1aa7e62f1c6ca # v0 + uses: temporalio/setup-temporal@8bde337644eaaa6644b8b527d9a8406c2207de5b # v0 + with: + version: ${{ env.TESTS_CLI_VERSION }} - name: Run Temporal CLI working-directory: ${{ runner.temp }} @@ -201,6 +203,8 @@ jobs: --dynamic-config-value history.enableRequestIdRefLinks=true \ --dynamic-config-value frontend.activityAPIsEnabled=true \ --dynamic-config-value activity.enableStandalone=true \ + --dynamic-config-value activity.startDelayEnabled=true \ + --dynamic-config-value 'activity.longPollTimeout="5000ms"' \ --dynamic-config-value history.enableChasm=true \ --dynamic-config-value history.enableTransitionHistory=true & diff --git a/packages/client/src/activity-client.ts b/packages/client/src/activity-client.ts index 89709f169..7bc83b26e 100644 --- a/packages/client/src/activity-client.ts +++ b/packages/client/src/activity-client.ts @@ -17,7 +17,7 @@ import { decompileRetryPolicy, } from '@temporalio/common'; import type { Duration } from '@temporalio/common/lib/time'; -import { msOptionalToTs, optionalTsToDate, optionalTsToMs } from '@temporalio/common/lib/time'; +import { msOptionalToTs, msToNumber, optionalTsToDate, optionalTsToMs } from '@temporalio/common/lib/time'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { decodeTypedSearchAttributes, @@ -309,6 +309,7 @@ export class ActivityClient extends AsyncCompletionClient implements TypedActivi header: { fields: input.headers }, userMetadata: await encodeUserMetadata(this.dataConverter, input.options.summary, undefined), priority: input.options.priority ? compilePriority(input.options.priority) : undefined, + startDelay: msOptionalToTs(input.options.startDelay), }; } @@ -546,6 +547,10 @@ export interface ActivityOptions { * Priority to use when starting this activity. */ priority?: Priority; + /** + * Time to wait before dispatching the first activity task. This delay is not applied to retry attempts. + */ + startDelay?: Duration; /** * Specifies behavior if there's a *closed* activity with the same ID. */ @@ -571,6 +576,9 @@ function validateActivityOptions(options: ActivityOptions): void { if (!options.scheduleToCloseTimeout && !options.startToCloseTimeout) { throw new TypeError('Either scheduleToCloseTimeout or startToCloseTimeout is required'); } + if (options.startDelay !== undefined && msToNumber(options.startDelay) < 0) { + throw new TypeError('startDelay must be non-negative'); + } } function buildActivityExecutionInfoCommonPart( diff --git a/packages/test/src/test-standalone-activities.ts b/packages/test/src/test-standalone-activities.ts index 39397bb11..04ddb0769 100644 --- a/packages/test/src/test-standalone-activities.ts +++ b/packages/test/src/test-standalone-activities.ts @@ -4,6 +4,7 @@ import anyTest from 'ava'; import * as rxjs from 'rxjs'; import type { ActivityHandle, TypedActivityClient, ActivityOptions } from '@temporalio/client'; import { + ActivityExecutionStatus, ActivityExecutionAlreadyStartedError, ActivityExecutionFailedError, ServiceError, @@ -16,7 +17,7 @@ import type { TestWorkflowEnvironment } from './helpers'; import { RUN_INTEGRATION_TESTS, waitUntil, Worker } from './helpers'; import { echo, throwAnError } from './activities'; import { heartbeatCancellationDetailsActivity } from './activities/heartbeat-cancellation-details'; -import { createLocalTestEnvironment } from './helpers-integration'; +import { createTestWorkflowEnvironment } from './helpers-integration'; // Use a reduced server long-poll expiration timeout, in order to confirm that client // polling/retry strategies result in the expected behavior @@ -78,9 +79,14 @@ async function waitForValue(subject: rxjs.Subject, value: T) { if (RUN_INTEGRATION_TESTS) { test.before(async (t) => { - const env = await createLocalTestEnvironment({ + const env = await createTestWorkflowEnvironment({ server: { - extraArgs: ['--dynamic-config-value', `activity.longPollTimeout="${LONG_POLL_TIMEOUT_MS}ms"`], + extraArgs: [ + '--dynamic-config-value', + `activity.longPollTimeout="${LONG_POLL_TIMEOUT_MS}ms"`, + '--dynamic-config-value', + 'activity.startDelayEnabled=true', + ], }, }); @@ -118,8 +124,8 @@ if (RUN_INTEGRATION_TESTS) { test.after.always(async (t) => { t.context.worker.shutdown(); - await t.context.env.teardown(); await t.context.runPromise; + await t.context.env.teardown(); }); test('Get activity result - success', async (t) => { @@ -175,6 +181,26 @@ if (RUN_INTEGRATION_TESTS) { t.is(err?.cause?.message, 'failure'); }); + test('Start activity with start delay', async (t) => { + const client = t.context.env.client.activity; + const activityId = randomUUID(); + const startDelayMs = 2000; + const handle = await client.start('echo', { + ...defaultOptions, + id: activityId, + args: ['hello'], + startDelay: startDelayMs, + }); + + t.is(await handle.result(), 'hello'); + + const description = await handle.describe(); + t.is(description.status, ActivityExecutionStatus.COMPLETED); + t.truthy(description.scheduleTime); + t.truthy(description.lastStartedTime); + t.true(description.lastStartedTime!.getTime() - description.scheduleTime!.getTime() >= startDelayMs - 500); + }); + test('Describe activity from start handle', async (t) => { const client = t.context.env.client.activity; const activityId = randomUUID();