Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 68 additions & 71 deletions src/integration-tests/TaskRunner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
} from "../sdk";
import { cleanupWorkflowsAndTasks } from "./utils/cleanup";
import { waitForWorkflowStatus } from "./utils/waitForWorkflowStatus";
import { describeForOrkesV5 } from "./utils/customJestDescribe";

describe("TaskRunner", () => {
const clientPromise = orkesConductorClient();
Expand All @@ -27,87 +26,85 @@ describe("TaskRunner", () => {
);
});

describeForOrkesV5("worker example (requires update-v2)", () => {
test("worker example ", async () => {
const client = await clientPromise;
const executor = new WorkflowExecutor(client);
const taskName = `jsSdkTest-task-manager-int-test-${Date.now()}`;
const workflowName = `jsSdkTest-task-manager-int-test-wf-${Date.now()}`;
test("worker example ", async () => {
const client = await clientPromise;
const executor = new WorkflowExecutor(client);
const taskName = `jsSdkTest-task-manager-int-test-${Date.now()}`;
const workflowName = `jsSdkTest-task-manager-int-test-wf-${Date.now()}`;

const taskRunner = new TaskRunner({
client: client,
worker: {
taskDefName: taskName,
execute: async () => {
return {
outputData: {
hello: "From your worker",
},
status: "COMPLETED",
};
},
},
options: {
pollInterval: 1000,
domain: undefined,
concurrency: 2,
workerID: "",
const taskRunner = new TaskRunner({
client: client,
worker: {
taskDefName: taskName,
execute: async () => {
return {
outputData: {
hello: "From your worker",
},
status: "COMPLETED",
};
},
});
taskRunner.startPolling();
},
options: {
pollInterval: 1000,
domain: undefined,
concurrency: 2,
workerID: "",
},
});
taskRunner.startPolling();

expect(taskRunner.isPolling).toEqual(true);
expect(taskRunner.isPolling).toEqual(true);

await executor.registerWorkflow(true, {
await executor.registerWorkflow(true, {
name: workflowName,
version: 1,
ownerEmail: "developers@orkes.io",
tasks: [simpleTask(taskName, taskName, {})],
inputParameters: [],
outputParameters: {},
timeoutSeconds: 0,
});
workflowsToCleanup.push({ name: workflowName, version: 1 });

const { workflowId: executionId } = await executor.executeWorkflow(
{
name: workflowName,
version: 1,
ownerEmail: "developers@orkes.io",
tasks: [simpleTask(taskName, taskName, {})],
inputParameters: [],
outputParameters: {},
timeoutSeconds: 0,
});
workflowsToCleanup.push({ name: workflowName, version: 1 });

const { workflowId: executionId } = await executor.executeWorkflow(
{
name: workflowName,
version: 1,
},
workflowName,
1,
`${workflowName}-id`
);
expect(executionId).toBeDefined();
},
workflowName,
1,
`${workflowName}-id`
);
expect(executionId).toBeDefined();

taskRunner.updateOptions({ concurrency: 1, pollInterval: 100 });
taskRunner.updateOptions({ concurrency: 1, pollInterval: 100 });

expect(executionId).toBeDefined();
if (!executionId) {
throw new Error("Execution ID is undefined");
}
expect(executionId).toBeDefined();
if (!executionId) {
throw new Error("Execution ID is undefined");
}

const workflowStatus = await waitForWorkflowStatus(
executor,
executionId,
"COMPLETED"
);
const workflowStatus = await waitForWorkflowStatus(
executor,
executionId,
"COMPLETED"
);

const [firstTask] = workflowStatus.tasks || [];
expect(firstTask?.taskType).toEqual(taskName);
expect(workflowStatus.status).toEqual("COMPLETED");
const [firstTask] = workflowStatus.tasks || [];
expect(firstTask?.taskType).toEqual(taskName);
expect(workflowStatus.status).toEqual("COMPLETED");

await taskRunner.stopPolling();
await taskRunner.stopPolling();

expect(taskRunner.isPolling).toEqual(false);
const taskDetails = await executor.getTask(firstTask?.taskId || "");
expect(taskDetails?.status).toEqual("COMPLETED");
expect(taskRunner.isPolling).toEqual(false);
const taskDetails = await executor.getTask(firstTask?.taskId || "");
expect(taskDetails?.status).toEqual("COMPLETED");

const metadataClient = new OrkesClients(client).getMetadataClient();
await cleanupWorkflowsAndTasks(metadataClient, {
workflows: [{ name: workflowName, version: 1 }],
tasks: [taskName],
});
}, 120000);
});
const metadataClient = new OrkesClients(client).getMetadataClient();
await cleanupWorkflowsAndTasks(metadataClient, {
workflows: [{ name: workflowName, version: 1 }],
tasks: [taskName],
});
}, 120000);
});
53 changes: 53 additions & 0 deletions src/integration-tests/WorkerRegistration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
MetadataClient,
NonRetryableException,
TaskHandler,
TaskRunner,
WorkflowExecutor,
clearWorkerRegistry,
getRegisteredWorkers,
Expand Down Expand Up @@ -668,4 +669,56 @@ describe("SDK Worker Registration", () => {

expect(getRegisteredWorkers().length).toBe(0);
});

describeForOrkesV5("update-v2 endpoint verification", () => {
test("SDK detects and uses /api/tasks/update-v2 on v5 server", async () => {
const client = await clientPromise;
const taskName = `sdk_test_update_v2_verify_${Date.now()}`;
const workflowName = `sdk_test_update_v2_verify_wf_${Date.now()}`;

// Reset the static probe so this test measures a live response from the server,
// regardless of what earlier tests in this shard may have set.
(TaskRunner as unknown as { updateV2Available: boolean | null }).updateV2Available = null;

const taskRunner = new TaskRunner({
client,
worker: {
taskDefName: taskName,
execute: async () => ({ outputData: {}, status: "COMPLETED" }),
},
options: { pollInterval: 100, concurrency: 1, workerID: "" },
});
taskRunner.startPolling();

await executor.registerWorkflow(true, {
name: workflowName,
version: 1,
ownerEmail: "developers@orkes.io",
tasks: [simpleTask(taskName, taskName, {})],
inputParameters: [],
outputParameters: {},
timeoutSeconds: 0,
});
workflowsToCleanup.push({ name: workflowName, version: 1 });

const executionId = await executor.startWorkflow({
name: workflowName,
input: {},
version: 1,
});

if (!executionId) throw new Error("Execution ID is undefined");

await waitForWorkflowStatus(executor, executionId, "COMPLETED");
taskRunner.stopPolling();

// The workflow only reaches COMPLETED if a task update was accepted.
// This assertion verifies the SDK used /api/tasks/update-v2 (not the v4
// legacy /api/tasks fallback) — if update-v2 broke silently and the SDK
// fell back, this would be false.
expect(
(TaskRunner as unknown as { updateV2Available: boolean | null }).updateV2Available
).toBe(true);
}, 60000);
});
});
125 changes: 61 additions & 64 deletions src/integration-tests/readme.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
} from "../sdk";
import { TaskType } from "../open-api";
import { waitForWorkflowStatus } from "./utils/waitForWorkflowStatus";
import { describeForOrkesV5 } from "./utils/customJestDescribe";

describe("TaskManager", () => {
const clientPromise = orkesConductorClient();
Expand All @@ -27,71 +26,69 @@ describe("TaskManager", () => {
);
});

describeForOrkesV5("worker example (requires update-v2)", () => {
test("worker example ", async () => {
const client = await clientPromise;
const executor = new WorkflowExecutor(client);
const workflowName = `jsSdkTest-my_first_js_wf-${Date.now()}`;
const taskName = `jsSdkTest-taskmanager-test-${Date.now()}`;

const taskRunner = new TaskRunner({
client: client,
worker: {
taskDefName: taskName,
execute: async () => {
return {
outputData: {
hello: "From your worker",
},
status: "COMPLETED",
};
},
},
options: {
pollInterval: 10,
domain: undefined,
concurrency: 1,
workerID: "",
test("worker example ", async () => {
const client = await clientPromise;
const executor = new WorkflowExecutor(client);
const workflowName = `jsSdkTest-my_first_js_wf-${Date.now()}`;
const taskName = `jsSdkTest-taskmanager-test-${Date.now()}`;

const taskRunner = new TaskRunner({
client: client,
worker: {
taskDefName: taskName,
execute: async () => {
return {
outputData: {
hello: "From your worker",
},
status: "COMPLETED",
};
},
});
taskRunner.startPolling();
},
options: {
pollInterval: 10,
domain: undefined,
concurrency: 1,
workerID: "",
},
});
taskRunner.startPolling();

await executor.registerWorkflow(true, {
name: workflowName,
version: 1,
ownerEmail: "developers@orkes.io",
tasks: [simpleTask(taskName, taskName, {})],
inputParameters: [],
outputParameters: {},
timeoutSeconds: 0,
});
workflowsToCleanup.push({ name: workflowName, version: 1 });

const executionId = await executor.startWorkflow({
name: workflowName,
input: {},
version: 1,
});

if (!executionId) {
throw new Error("Execution ID is undefined");
}

const workflowStatus = await waitForWorkflowStatus(
executor,
executionId,
"COMPLETED"
);

const [firstTask] = workflowStatus.tasks || [];
expect(firstTask?.taskType).toEqual(taskName);
expect(workflowStatus.status).toEqual("COMPLETED");

taskRunner.stopPolling();
const taskDetails = await executor.getTask(firstTask?.taskId || "");
expect(taskDetails?.status).toEqual("COMPLETED");
}, 120000);
});
await executor.registerWorkflow(true, {
name: workflowName,
version: 1,
ownerEmail: "developers@orkes.io",
tasks: [simpleTask(taskName, taskName, {})],
inputParameters: [],
outputParameters: {},
timeoutSeconds: 0,
});
workflowsToCleanup.push({ name: workflowName, version: 1 });

const executionId = await executor.startWorkflow({
name: workflowName,
input: {},
version: 1,
});

if (!executionId) {
throw new Error("Execution ID is undefined");
}

const workflowStatus = await waitForWorkflowStatus(
executor,
executionId,
"COMPLETED"
);

const [firstTask] = workflowStatus.tasks || [];
expect(firstTask?.taskType).toEqual(taskName);
expect(workflowStatus.status).toEqual("COMPLETED");

taskRunner.stopPolling();
const taskDetails = await executor.getTask(firstTask?.taskId || "");
expect(taskDetails?.status).toEqual("COMPLETED");
}, 120000);

test("update task example ", async () => {
const client = await clientPromise;
Expand Down
Loading
Loading