Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ jobs:
run: pnpm tsx scripts/publish-to-verdaccio.ts --registry-dir ${{ steps.tmp-dir.outputs.dir }}/npm-registry

- name: Install Temporal CLI
uses: temporalio/setup-temporal@1059a504f87e7fa2f385e3fa40d1aa7e62f1c6ca # v0
uses: temporalio/setup-temporal@8bde337644eaaa6644b8b527d9a8406c2207de5b # v0
with:
version: ${{ env.TESTS_CLI_VERSION }}

- name: Run Temporal CLI
working-directory: ${{ runner.temp }}
Expand All @@ -201,6 +203,8 @@ jobs:
--dynamic-config-value history.enableRequestIdRefLinks=true \
--dynamic-config-value frontend.activityAPIsEnabled=true \
--dynamic-config-value activity.enableStandalone=true \
--dynamic-config-value activity.startDelayEnabled=true \
--dynamic-config-value 'activity.longPollTimeout="5000ms"' \
--dynamic-config-value history.enableChasm=true \
--dynamic-config-value history.enableTransitionHistory=true &

Expand Down
10 changes: 9 additions & 1 deletion packages/client/src/activity-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
decompileRetryPolicy,
} from '@temporalio/common';
import type { Duration } from '@temporalio/common/lib/time';
import { msOptionalToTs, optionalTsToDate, optionalTsToMs } from '@temporalio/common/lib/time';
import { msOptionalToTs, msToNumber, optionalTsToDate, optionalTsToMs } from '@temporalio/common/lib/time';
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
import {
decodeTypedSearchAttributes,
Expand Down Expand Up @@ -309,6 +309,7 @@ export class ActivityClient extends AsyncCompletionClient implements TypedActivi
header: { fields: input.headers },
userMetadata: await encodeUserMetadata(this.dataConverter, input.options.summary, undefined),
priority: input.options.priority ? compilePriority(input.options.priority) : undefined,
startDelay: msOptionalToTs(input.options.startDelay),
};
}

Expand Down Expand Up @@ -546,6 +547,10 @@ export interface ActivityOptions {
* Priority to use when starting this activity.
*/
priority?: Priority;
/**
* Time to wait before dispatching the first activity task. This delay is not applied to retry attempts.
*/
startDelay?: Duration;
/**
* Specifies behavior if there's a *closed* activity with the same ID.
*/
Expand All @@ -571,6 +576,9 @@ function validateActivityOptions(options: ActivityOptions): void {
if (!options.scheduleToCloseTimeout && !options.startToCloseTimeout) {
throw new TypeError('Either scheduleToCloseTimeout or startToCloseTimeout is required');
}
if (options.startDelay !== undefined && msToNumber(options.startDelay) < 0) {
throw new TypeError('startDelay must be non-negative');
}
}

function buildActivityExecutionInfoCommonPart(
Expand Down
34 changes: 30 additions & 4 deletions packages/test/src/test-standalone-activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import anyTest from 'ava';
import * as rxjs from 'rxjs';
import type { ActivityHandle, TypedActivityClient, ActivityOptions } from '@temporalio/client';
import {
ActivityExecutionStatus,
ActivityExecutionAlreadyStartedError,
ActivityExecutionFailedError,
ServiceError,
Expand All @@ -16,7 +17,7 @@ import type { TestWorkflowEnvironment } from './helpers';
import { RUN_INTEGRATION_TESTS, waitUntil, Worker } from './helpers';
import { echo, throwAnError } from './activities';
import { heartbeatCancellationDetailsActivity } from './activities/heartbeat-cancellation-details';
import { createLocalTestEnvironment } from './helpers-integration';
import { createTestWorkflowEnvironment } from './helpers-integration';

// Use a reduced server long-poll expiration timeout, in order to confirm that client
// polling/retry strategies result in the expected behavior
Expand Down Expand Up @@ -78,9 +79,14 @@ async function waitForValue<T>(subject: rxjs.Subject<T>, value: T) {

if (RUN_INTEGRATION_TESTS) {
test.before(async (t) => {
const env = await createLocalTestEnvironment({
const env = await createTestWorkflowEnvironment({
server: {
extraArgs: ['--dynamic-config-value', `activity.longPollTimeout="${LONG_POLL_TIMEOUT_MS}ms"`],
extraArgs: [
'--dynamic-config-value',
`activity.longPollTimeout="${LONG_POLL_TIMEOUT_MS}ms"`,
'--dynamic-config-value',
'activity.startDelayEnabled=true',
],
},
});

Expand Down Expand Up @@ -118,8 +124,8 @@ if (RUN_INTEGRATION_TESTS) {

test.after.always(async (t) => {
t.context.worker.shutdown();
await t.context.env.teardown();
await t.context.runPromise;
await t.context.env.teardown();
});

test('Get activity result - success', async (t) => {
Expand Down Expand Up @@ -175,6 +181,26 @@ if (RUN_INTEGRATION_TESTS) {
t.is(err?.cause?.message, 'failure');
});

test('Start activity with start delay', async (t) => {
const client = t.context.env.client.activity;
const activityId = randomUUID();
const startDelayMs = 2000;
const handle = await client.start('echo', {
...defaultOptions,
id: activityId,
args: ['hello'],
startDelay: startDelayMs,
});

t.is(await handle.result(), 'hello');

const description = await handle.describe();
t.is(description.status, ActivityExecutionStatus.COMPLETED);
t.truthy(description.scheduleTime);
t.truthy(description.lastStartedTime);
t.true(description.lastStartedTime!.getTime() - description.scheduleTime!.getTime() >= startDelayMs - 500);
});

test('Describe activity from start handle', async (t) => {
const client = t.context.env.client.activity;
const activityId = randomUUID();
Expand Down
Loading