Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 54 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ await workflow.register();

**Step 2: Write a worker**

Workers are TypeScript functions decorated with `@worker` that poll Conductor for tasks and execute them.
Workers are TypeScript functions decorated with `@worker` that poll Conductor for tasks and execute them. The example below uses the legacy decorator style (standalone function). See [Workers](#workers) for the new TypeScript 5.0+ decorator style (class methods).

```typescript
import { worker } from "@io-orkes/conductor-javascript";
Expand Down Expand Up @@ -215,30 +215,77 @@ All of these are type-safe, composable, and registered to the server as JSON —

## Workers

Workers are TypeScript functions that execute Conductor tasks. Decorate any function with `@worker` to register it as a worker (auto-discovered by `TaskHandler`) and use it as a workflow task.
Workers are TypeScript functions that execute Conductor tasks. Decorate functions with `@worker` to register them as workers (auto-discovered by `TaskHandler`) and use them as workflow tasks.

The SDK supports **both** decorator styles:

### Option 1: New decorators (TypeScript 5.0+)

Use class methods with the new Stage 3 decorators. No `experimentalDecorators` needed — remove it from your `tsconfig.json`.

```typescript
import { worker, TaskHandler } from "@io-orkes/conductor-javascript";
import type { Task } from "@io-orkes/conductor-javascript";

class Workers {
@worker({ taskDefName: "greet", concurrency: 5, pollInterval: 100 })
async greet(task: Task) {
return {
status: "COMPLETED" as const,
outputData: { result: `Hello ${task.inputData?.name ?? "World"}` },
};
}

@worker({ taskDefName: "process_payment", domain: "payments" })
async processPayment(task: Task) {
const result = await paymentGateway.charge(task.inputData.customerId, task.inputData.amount);
return { status: "COMPLETED" as const, outputData: { transactionId: result.id } };
}
}

// Class definition triggers decorators — workers are registered
void new Workers();

const handler = new TaskHandler({ client, scanForDecorated: true });
await handler.startWorkers();
```

### Option 2: Legacy decorators (experimentalDecorators)

Use standalone functions. Add `"experimentalDecorators": true` to your `tsconfig.json`.

```typescript
import { worker, TaskHandler } from "@io-orkes/conductor-javascript";
import type { Task } from "@io-orkes/conductor-javascript";

@worker({ taskDefName: "greet", concurrency: 5, pollInterval: 100 })
async function greet(task: Task) {
return {
status: "COMPLETED",
outputData: { result: `Hello ${task.inputData.name}` },
status: "COMPLETED" as const,
outputData: { result: `Hello ${task.inputData?.name ?? "World"}` },
};
}

@worker({ taskDefName: "process_payment", domain: "payments" })
async function processPayment(task: Task) {
const result = await paymentGateway.charge(task.inputData.customerId, task.inputData.amount);
return { status: "COMPLETED", outputData: { transactionId: result.id } };
return { status: "COMPLETED" as const, outputData: { transactionId: result.id } };
}

// Auto-discover and start all decorated workers
const handler = new TaskHandler({ client, scanForDecorated: true });
await handler.startWorkers();
```

### tsconfig setup

// Graceful shutdown
| Decorator style | tsconfig.json |
|-----------------|---------------|
| **New** (TypeScript 5.0+) | Omit `experimentalDecorators` — use class methods |
| **Legacy** | `"experimentalDecorators": true` — use standalone functions |

**Graceful shutdown:**

```typescript
process.on("SIGTERM", async () => {
await handler.stopWorkers();
process.exit(0);
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

105 changes: 105 additions & 0 deletions src/sdk/worker/decorators/__tests__/worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,111 @@ describe("@worker decorator", () => {
});
});

describe("@worker decorator - New API (TypeScript 5.0+ Stage 3 decorators)", () => {
beforeEach(() => {
clearWorkerRegistry();
});

afterEach(() => {
clearWorkerRegistry();
});

test("should register when called with new decorator signature (value, context)", () => {
async function greetMethod(task: Task) {
return {
status: "COMPLETED" as const,
outputData: { result: `Hello ${(task.inputData as Record<string, string>)?.name ?? "World"}` },
};
}

// Simulate new decorator API: decorator(value, context) where context has kind
const decorator = worker({ taskDefName: "new_api_greet" });
decorator(greetMethod, { kind: "method", name: "greet" });

const workers = getRegisteredWorkers();
expect(workers).toHaveLength(1);
expect(workers[0].taskDefName).toBe("new_api_greet");
expect(workers[0].executeFunction).toBe(greetMethod);
});

test("should return replacement function for new API (replaces class method)", () => {
async function originalMethod(task: Task) {
return {
status: "COMPLETED" as const,
outputData: { value: (task.inputData as Record<string, number>).x + 1 },
};
}

const decorator = worker({ taskDefName: "new_api_replace" });
const replacement = decorator(originalMethod, { kind: "method", name: "compute" });

expect(typeof replacement).toBe("function");
expect(replacement).not.toBe(originalMethod);

// Replacement should execute the original when called normally
const result = (replacement as (task: Task) => Promise<{ status: string; outputData: unknown }>)(
{ inputData: { x: 10 } } as Task
);
return expect(result).resolves.toEqual({
status: "COMPLETED",
outputData: { value: 11 },
});
});

test("should support dual-mode (workflow builder) when using new API", () => {
async function processTask(_task: Task) {
return { status: "COMPLETED" as const, outputData: { done: true } };
}

const decorator = worker({ taskDefName: "new_api_dual" });
const replacement = decorator(processTask, { kind: "method", name: "process" }) as (
arg: { taskRefName: string; inputParameters?: Record<string, unknown> }
) => unknown;

const taskDef = replacement({
taskRefName: "step_1",
inputParameters: { key: "value" },
});

expect(taskDef).toMatchObject({
name: "new_api_dual",
taskReferenceName: "step_1",
inputParameters: { key: "value" },
});
});

test("should register with options when using new API", () => {
async function workerFn(_task: Task) {
return { status: "COMPLETED" as const, outputData: {} };
}

const decorator = worker({
taskDefName: "new_api_options",
concurrency: 5,
pollInterval: 300,
domain: "staging",
});
decorator(workerFn, { kind: "method", name: "workerFn" });

const registered = getRegisteredWorker("new_api_options", "staging");
expect(registered).toBeDefined();
expect(registered?.concurrency).toBe(5);
expect(registered?.pollInterval).toBe(300);
expect(registered?.domain).toBe("staging");
});

test("should throw if taskDefName missing with new API", () => {
async function fn(_task: Task) {
return { status: "COMPLETED" as const, outputData: {} };
}

const decorator = worker({} as { taskDefName: string });
expect(() => {
decorator(fn, { kind: "method", name: "fn" });
}).toThrow("requires 'taskDefName'");
});
});

describe("Worker Registry", () => {
beforeEach(() => {
clearWorkerRegistry();
Expand Down
88 changes: 75 additions & 13 deletions src/sdk/worker/decorators/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,28 +208,75 @@ export interface WorkerOptions {
* }
* ```
*/
/** Minimal context shape for Stage 3 method decorators (TypeScript 5.0+). */
interface MethodDecoratorContext {
kind: string;
name: string | symbol;
}

/**
* Type guard for Stage 3 (TypeScript 5.0+) decorator context.
* New decorators pass (value, context) where context has a `kind` property.
*/
function isNewDecoratorContext(arg: unknown): arg is MethodDecoratorContext {
return (
typeof arg === "object" &&
arg !== null &&
"kind" in arg &&
typeof (arg as { kind: string }).kind === "string"
);
}

type WorkerMethod = (
task: Task
) => Promise<Omit<TaskResult, "workflowInstanceId" | "taskId">>;

export function worker(options: WorkerOptions) {
return function (
target: unknown,
function decorator<T extends WorkerMethod>(
value: T,
context: MethodDecoratorContext
): T | undefined;
function decorator(
target: object,
propertyKey?: string,
descriptor?: PropertyDescriptor
) {
// Extract the function to register
const executeFunction = descriptor?.value || target;
): PropertyDescriptor | WorkerMethod | undefined;
function decorator<T extends WorkerMethod>(
target: T | object,
propertyKeyOrContext?: string | MethodDecoratorContext,
descriptor?: PropertyDescriptor
): T | PropertyDescriptor | undefined {
// Detect decorator API: new (Stage 3) vs legacy (experimentalDecorators)
let executeFunction: (task: Task) => Promise<Omit<TaskResult, "workflowInstanceId" | "taskId">>;
let isNewApi = false;

if (isNewDecoratorContext(propertyKeyOrContext)) {
// New decorator API: target is the method itself
executeFunction = target as (task: Task) => Promise<
Omit<TaskResult, "workflowInstanceId" | "taskId">
>;
isNewApi = true;
} else {
// Legacy API: descriptor?.value (method) or target (standalone function)
const fn = (descriptor?.value ?? target) as (
task: Task
) => Promise<Omit<TaskResult, "workflowInstanceId" | "taskId">>;
executeFunction = fn;
}

// Validate that we have a function
if (typeof executeFunction !== "function") {
throw new Error(
`@worker decorator can only be applied to functions. ` +
`Received: ${typeof executeFunction}`
`Received: ${typeof executeFunction}`
);
}

// Validate required options
if (!options.taskDefName) {
throw new Error(
`@worker decorator requires 'taskDefName' option. ` +
`Example: @worker({ taskDefName: "my_task" })`
`Example: @worker({ taskDefName: "my_task" })`
);
}

Expand All @@ -238,16 +285,20 @@ export function worker(options: WorkerOptions) {
let resolvedOutputSchema = options.outputSchema;

if (options.inputType) {
resolvedInputSchema = generateSchemaFromClass(options.inputType) as unknown as Record<string, unknown>;
resolvedInputSchema = generateSchemaFromClass(
options.inputType
) as unknown as Record<string, unknown>;
}
if (options.outputType) {
resolvedOutputSchema = generateSchemaFromClass(options.outputType) as unknown as Record<string, unknown>;
resolvedOutputSchema = generateSchemaFromClass(
options.outputType
) as unknown as Record<string, unknown>;
}

// Create registered worker metadata
const registeredWorker: RegisteredWorker = {
taskDefName: options.taskDefName,
executeFunction: executeFunction as (task: Task) => Promise<Omit<TaskResult, "workflowInstanceId" | "taskId">>,
executeFunction,
concurrency: options.concurrency,
pollInterval: options.pollInterval,
domain: options.domain,
Expand Down Expand Up @@ -285,7 +336,10 @@ export function worker(options: WorkerOptions) {
);
}
// Normal execution mode
return (executeFunction as (...args: unknown[]) => unknown).apply(this, args);
return (executeFunction as (...args: unknown[]) => unknown).apply(
this,
args
);
};

// Preserve original function name
Expand All @@ -294,10 +348,18 @@ export function worker(options: WorkerOptions) {
configurable: true,
});

if (isNewApi) {
// New decorator API: return replacement function
return dualModeFunction as unknown as T;
}

// Legacy API
if (descriptor) {
descriptor.value = dualModeFunction;
return descriptor;
}
return dualModeFunction;
};
return dualModeFunction as unknown as T;
}

return decorator;
}
Loading
Loading