Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/__tests__/_helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,31 @@ mocked.info =
socket.write(`$${json.length}\r\n${json}\r\n`);
};

mocked.bid =
(bid: string) =>
({ data, socket }: ServerControl) => {
const string = typeof data === "string" ? data : "";
if (string.startsWith("NEW")) {
socket.write(`$${bid.length}\r\n${bid}\r\n`);
} else if (
string.startsWith("COMMIT") ||
string.startsWith("OPEN")
) {
socket.write("+OK\r\n");
} else if (string.startsWith("STATUS")) {
const status = JSON.stringify({
bid,
total: 0,
pending: 0,
failed: 0,
created_at: new Date().toISOString(),
complete_st: "",
success_st: "",
});
socket.write(`$${status.length}\r\n${status}\r\n`);
}
};

export const sleep = (ms: number, value?: unknown): Promise<unknown> => {
return new Promise((resolve) => setTimeout(() => resolve(value), ms));
};
Expand Down
174 changes: 174 additions & 0 deletions src/__tests__/batch.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import test from "ava";
import { Client } from "../client";
import { Batch } from "../batch";
import { mocked } from "./_helper";

test("Batch: constructor sets defaults", (t) => {
const client = new Client();
const batch = new Batch(client, {
description: "test batch",
});
t.is(batch.description, "test batch");
t.is(batch.bid, "");
t.is(batch.success, undefined);
t.is(batch.complete, undefined);
});

test("Batch: toJSON serializes options", (t) => {
const client = new Client();
const successJob = client.job("OnSuccess");
const completeJob = client.job("OnComplete");
const batch = new Batch(client, {
description: "test",
success: successJob,
complete: completeJob,
});
const json = batch.toJSON();
t.is(json.description, "test");
t.truthy(json.success);
t.truthy(json.complete);
});

test("Batch: push throws if not opened", async (t) => {
const client = new Client();
const batch = new Batch(client);
const job = client.job("TestJob");
await t.throwsAsync(() => batch.push(job), {
message: /must be opened/,
});
});

test("Batch: push throws if already committed", async (t) => {
await mocked(async (server, port) => {
const bid = "b-test123";
server.on("BATCH", mocked.bid(bid));
server.on("PUSH", mocked.ok());

const client = new Client({ port });
const batch = client.batch({ description: "test" });
await batch.jobs(async () => {
await batch.push(client.job("TestJob"));
});
await t.throwsAsync(() => batch.push(client.job("TestJob")), {
message: /already been committed/,
});
});
});

test("Batch: commit throws if already committed", async (t) => {
await mocked(async (server, port) => {
const bid = "b-test456";
server.on("BATCH", mocked.bid(bid));

const client = new Client({ port });
const batch = client.batch({ description: "test" });
await batch.jobs(async () => {});
await t.throwsAsync(() => batch.commit(), {
message: /already been committed/,
});
});
});

test("Batch: jobs() throws if called twice", async (t) => {
await mocked(async (server, port) => {
const bid = "b-double";
server.on("BATCH", mocked.bid(bid));

const client = new Client({ port });
const batch = client.batch({ description: "test" });
await batch.jobs(async () => {});
await t.throwsAsync(() => batch.jobs(async () => {}), {
message: /must be new/,
});
});
});

test("Batch: push sets custom.bid on job", async (t) => {
t.plan(1);
await mocked(async (server, port) => {
const bid = "b-push-test";
server.on("BATCH", mocked.bid(bid));
server.on("PUSH", ({ data, socket }) => {
t.is(data.custom.bid, bid);
socket.write("+OK\r\n");
});

const client = new Client({ port });
const batch = client.batch();
await batch.jobs(async () => {
await batch.push(client.job("TestJob"));
});
});
});

test("Batch: jobs() sends BATCH NEW then BATCH COMMIT", async (t) => {
const commands: string[] = [];
await mocked(async (server, port) => {
const bid = "b-lifecycle";
server.on("BATCH", ({ data, socket }) => {
const str = typeof data === "string" ? data : "";
if (str.startsWith("NEW")) {
commands.push("NEW");
socket.write(`$${bid.length}\r\n${bid}\r\n`);
} else if (str.startsWith("COMMIT")) {
commands.push("COMMIT");
socket.write("+OK\r\n");
}
});
server.on("PUSH", ({ socket }) => {
commands.push("PUSH");
socket.write("+OK\r\n");
});

const client = new Client({ port });
const batch = client.batch({ description: "lifecycle test" });
await batch.jobs(async () => {
await batch.push(client.job("TestJob"));
});

t.deepEqual(commands, ["NEW", "PUSH", "COMMIT"]);
t.is(batch.bid, bid);
});
});

test("client.batchOpen sends BATCH OPEN and returns usable batch", async (t) => {
t.plan(1);
await mocked(async (server, port) => {
const bid = "b-reopen";
server.on("BATCH", mocked.bid(bid));
server.on("PUSH", ({ data, socket }) => {
t.is(data.custom.bid, bid);
socket.write("+OK\r\n");
});

const client = new Client({ port });
const batch = await client.batchOpen(bid);
await batch.push(client.job("MoreWork"));
await batch.commit();
});
});

test("client.batchStatus returns parsed status", async (t) => {
await mocked(async (server, port) => {
const bid = "b-status";
server.on("BATCH", ({ data, socket }) => {
const status = JSON.stringify({
bid,
total: 5,
pending: 2,
failed: 1,
created_at: "2026-03-24T00:00:00Z",
complete_st: "1",
success_st: "",
});
socket.write(`$${status.length}\r\n${status}\r\n`);
});

const client = new Client({ port });
const status = await client.batchStatus(bid);
t.is(status.bid, bid);
t.is(status.total, 5);
t.is(status.pending, 2);
t.is(status.failed, 1);
});
});
91 changes: 91 additions & 0 deletions src/batch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { default as makeDebug } from "debug";
import { Client } from "./client";
import { Job } from "./job";

const debug = makeDebug("faktory-worker:batch");

export type BatchOptions = {
description?: string;
success?: Job;
complete?: Job;
};

export type BatchStatus = {
bid: string;
total: number;
pending: number;
failed: number;
created_at: string;
description?: string;
complete_st: string;
success_st: string;
};

export class Batch {
bid: string;
description?: string;
success?: Job;
complete?: Job;
private client: Client;
private committed: boolean;
private isNew: boolean;

constructor(client: Client, options: BatchOptions = {}) {
this.client = client;
this.bid = "";
this.committed = false;
this.isNew = true;
this.description = options.description;
this.success = options.success;
this.complete = options.complete;
}

toJSON(): Record<string, unknown> {
const payload: Record<string, unknown> = {};
if (this.description != null) payload.description = this.description;
if (this.success) payload.success = this.success.toJSON();
if (this.complete) payload.complete = this.complete.toJSON();
return payload;
}

async jobs(fn: () => Promise<void>): Promise<void> {
if (!this.isNew) {
throw new Error("batch must be new to call jobs()");
}
this.bid = await this.client.batchNew(this);
this.isNew = false;
debug("created batch %s", this.bid);
try {
await fn();
} finally {
await this.commit();
}
}

async push(job: Job): Promise<string> {
if (!this.bid) {
throw new Error("batch must be opened before it can be used");
}
if (this.committed) {
throw new Error("batch has already been committed, must reopen");
}
job.custom = { ...(job.custom || {}), bid: this.bid };
return this.client.push(job);
}

async commit(): Promise<void> {
if (this.committed) {
throw new Error("batch has already been committed, must reopen");
}
await this.client.batchCommit(this.bid);
this.committed = true;
debug("committed batch %s", this.bid);
}

static reopened(client: Client, bid: string): Batch {
const batch = new Batch(client);
batch.bid = bid;
batch.isNew = false;
return batch;
}
}
24 changes: 24 additions & 0 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { ConnectionFactory } from "./connection-factory";
import { Job, JobPayload, JobType, PartialJobPayload } from "./job";
import { DEAD, Mutation, RETRIES, SCHEDULED } from "./mutation";
import { encode, hash, toJobPayloadWithDefaults } from "./utils";
import { Batch, BatchOptions, BatchStatus } from "./batch";

const debug = makeDebug("faktory-worker:client");
const heartDebug = makeDebug("faktory-worker:client:heart");
Expand Down Expand Up @@ -356,4 +357,27 @@ For the best performance, consider pushing ~1000 jobs at a time to the server.
mutation.target = DEAD;
return mutation;
}

batch(options?: BatchOptions): Batch {
return new Batch(this, options);
}

async batchNew(batch: Batch): Promise<string> {
return this.send(["BATCH", "NEW", encode(batch.toJSON())]);
}

async batchOpen(bid: string): Promise<Batch> {
await this.sendWithAssert(["BATCH", "OPEN", bid], "OK");
return Batch.reopened(this, bid);
}

async batchCommit(bid: string): Promise<string> {
return this.sendWithAssert(["BATCH", "COMMIT", bid], "OK");
}

async batchStatus(bid: string): Promise<BatchStatus> {
return JSON.parse(
await this.send(["BATCH", "STATUS", bid])
);
}
}
6 changes: 5 additions & 1 deletion src/faktory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Client, ClientOptions } from "./client";
import { Worker, WorkerOptions } from "./worker";
import { Job, JobType, JobFunction, Registry } from "./job";
import { Mutation } from "./mutation";
import { Batch, BatchOptions, BatchStatus } from "./batch";

const debug = makeDebug("faktory-worker");

Expand All @@ -16,6 +17,7 @@ export {
} from "./job";
export { ContextProvider } from "./job";
export { Middleware, Context as MiddlewareContext } from "./middleware";
export { BatchOptions, BatchStatus } from "./batch";

export interface FaktoryControl {
registry: Registry;
Expand All @@ -29,6 +31,7 @@ export interface FaktoryControl {
Client: typeof Client;
Job: typeof Job;
Mutation: typeof Mutation;
Batch: typeof Batch;
create: FaktoryControlCreator;
}

Expand Down Expand Up @@ -59,6 +62,7 @@ export function create(): FaktoryControl {
Client,
Job,
Mutation,
Batch,
create,
/**
* Returns the registry for the faktory singleton
Expand Down Expand Up @@ -178,7 +182,7 @@ export function create(): FaktoryControl {
};
}

export { Worker, WorkerOptions, Client, ClientOptions, Job, Mutation };
export { Worker, WorkerOptions, Client, ClientOptions, Job, Mutation, Batch };
const singleton = create();
// exclusively for the typedescript declaration file
export default singleton;
Expand Down