Sync workflow orchestration: lifecycle, locks, pagination, content hashing, retry, batch processing. Zero dependencies, works everywhere (Cloudflare Workers, Node, Deno, Bun).
npm install @systemoperator/sync

withSyncRun wraps the full sync lifecycle: create run, acquire lock, execute, complete/fail, release lock, notify.
import { withSyncRun } from '@systemoperator/sync';
const result = await withSyncRun(
{
step, // Cloudflare WorkflowStep
tracker: mySyncTracker, // implements SyncRunTracker
config: { runType: 'sync_stripe', caller: 'stripe-sync' },
payload: { connectionId: 'conn-1', trigger: 'cron' },
lockStore: myLockStore, // optional, implements SyncLockStore
notifier: myNotifier, // optional, implements SyncNotifier
},
async (ctx) => {
// ctx has: runId, counts, checkTimeout(), payload, config
await paginateCursor(step, ctx, { ... });
},
);

paginateCursor and paginateOffset provide durable cursor and offset pagination with timeout checking.
import { paginateCursor, paginateOffset } from '@systemoperator/sync';
await paginateCursor(step, ctx, {
stepPrefix: 'charges',
fetchPage: async (cursor) => stripe.charges.list({ starting_after: cursor }),
processPage: async (items) => processCharges(items),
getCursor: (items) => items.at(-1)!.id,
});
await paginateOffset(step, ctx, {
stepPrefix: 'transactions',
limit: 100,
fetchPage: async (offset, limit) => mercury.listTransactions({ offset, limit }),
processPage: async (items) => processTransactions(items),
});

computeContentHash provides deterministic SHA-256 hashing with sorted keys; upsertWithHash implements the upsert-with-hash pattern.
import { computeContentHash, upsertWithHash, determineAction } from '@systemoperator/sync';
const hash = await computeContentHash(apiRecord);
const result = await upsertWithHash({
newHash: hash,
insert: () => db.insert(charges).values({ ...data, contentHash: hash }),
findExisting: () => db.select().from(charges).where(eq(charges.externalId, id)),
update: (existingId) => db.update(charges).set({ ...data, contentHash: hash }).where(eq(charges.id, existingId)),
});
// result: 'created' | 'updated' | 'unchanged'

withRetry provides durable retry with step.sleep() that survives worker restarts.
import { withRetry, RETRY_PRESETS, RetryableError } from '@systemoperator/sync';
const data = await withRetry(step, 'fetch-charges', async () => {
const res = await fetch(url);
if (!res.ok) throw new RetryableError(`HTTP ${res.status}`);
return res.json();
}, RETRY_PRESETS.standard);

processBatchWithTimeout provides timeout-aware batch processing with stats tracking.
import { processBatchWithTimeout } from '@systemoperator/sync';
const allDone = await processBatchWithTimeout(step, items, {
batchSize: 50,
checkTimeout: ctx.checkTimeout,
processItem: async (item) => processOne(item),
counts: ctx.counts,
});

The package never touches databases directly. Products inject implementations:
import type { SyncRunTracker, SyncLockStore, SyncNotifier } from '@systemoperator/sync';
const tracker: SyncRunTracker = {
async createAndStartRun(runType, trigger, input) { /* create run record, return ID */ },
async finishRun(runId, output) { /* mark run completed */ },
async failRun(runId, error, output) { /* mark run failed */ },
};
const lockStore: SyncLockStore = {
async acquireLock(resourceId, lockOwner, timeoutMs) { /* pessimistic lock */ },
async releaseLock(resourceId) { /* release lock */ },
};

Retry presets:

| preset | maxAttempts | initialDelay | maxDelay | backoff |
|---|---|---|---|---|
| fast | 3 | 1s | 5s | 2x |
| standard | 5 | 2s | 60s | 2x |
| rateLimit | 8 | 5s | 5min | 3x |
| dns | 4 | 10s | 2min | 2x |
| verification | 10 | 30s | 10min | 1.5x |
MIT