From a517a25108f9fab29bc40dc2aef5817dc805c707 Mon Sep 17 00:00:00 2001 From: Kyle Kincer Date: Sat, 4 Apr 2026 06:25:22 -0400 Subject: [PATCH] feat: configurable stream expiration with activity-based timeout Two bugs/limitations fixed: 1. The cleanup cron checked `_creationTime` instead of last activity, so actively-streaming sessions were killed after 20 minutes from creation regardless of ongoing chunk activity. 2. The 20-minute expiration was hardcoded with no way to configure or disable it, making the component unsuitable for long-running streams (e.g. AI agent sessions that can run for hours). Changes: - Track `lastActivityTime` on each stream, updated on every `addChunk` - Cleanup now uses `lastActivityTime` (falling back to `_creationTime` for streams created before this field existed) - New `configure` mutation accepts `expirationMs: number | null` to override the default 20-minute timeout or disable expiration entirely - `PersistentTextStreaming` client class accepts typed `PersistentTextStreamingOptions` with `expirationMs` and auto-configures on first `createStream` call - Fully backward-compatible: unconfigured deployments retain the existing 20-minute default Made-with: Cursor --- src/client/index.ts | 33 +++++++++++++++++++++++- src/component/lib.ts | 57 ++++++++++++++++++++++++++++++++--------- src/component/schema.ts | 4 +++ 3 files changed, 81 insertions(+), 13 deletions(-) diff --git a/src/client/index.ts b/src/client/index.ts index 2140955..fdbc682 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -25,6 +25,15 @@ export type StreamWriter> = ( chunkAppender: ChunkAppender, ) => Promise; +export interface PersistentTextStreamingOptions { + /** + * How long (in ms) a stream may be idle before the cleanup cron marks + * it as timed-out. Set to `null` to disable expiration entirely. + * When omitted the component defaults to 20 minutes. + */ + expirationMs?: number | null; +} + // TODO -- make more flexible. # of bytes, etc? const hasDelimeter = (text: string) => { return text.includes(".") || text.includes("!") || text.includes("?"); @@ -32,11 +41,30 @@ const hasDelimeter = (text: string) => { // TODO -- some sort of wrapper with easy ergonomics for working with LLMs? export class PersistentTextStreaming { + private _configured = false; + constructor( public component: UseApi, - public options?: object, + public options?: PersistentTextStreamingOptions, ) {} + /** + * Persist the expiration configuration into the component's database. + * This only needs to be called once (the setting is durable), but is + * safe to call repeatedly — it upserts a singleton config document. + * + * If `options.expirationMs` was provided to the constructor this is + * called automatically on the first `createStream` invocation. + */ + async configure(ctx: RunMutationCtx): Promise { + const expirationMs = this.options?.expirationMs; + if (expirationMs === undefined) return; + await ctx.runMutation(this.component.lib.configure, { + expirationMs, + }); + this._configured = true; + } + /** * Create a new stream. This will return a stream ID that can be used * in an HTTP action to stream data back out to the client while also @@ -56,6 +84,9 @@ export class PersistentTextStreaming { */ async createStream(ctx: RunMutationCtx): Promise { + if (!this._configured && this.options?.expirationMs !== undefined) { + await this.configure(ctx); + } const id = await ctx.runMutation(this.component.lib.createStream); return id as StreamId; } diff --git a/src/component/lib.ts b/src/component/lib.ts index cc01906..6530dc4 100644 --- a/src/component/lib.ts +++ b/src/component/lib.ts @@ -27,22 +27,22 @@ export const addChunk = mutation({ if (!stream) { throw new Error("Stream not found"); } + const patch: Record = { + lastActivityTime: Date.now(), + }; if (stream.status === "pending") { - await ctx.db.patch(args.streamId, { - status: "streaming", - }); + patch.status = "streaming"; } else if (stream.status !== "streaming") { throw new Error("Stream is not streaming; did it timeout?"); } + if (args.final) { + patch.status = "done"; + } + await ctx.db.patch(args.streamId, patch); await ctx.db.insert("chunks", { streamId: args.streamId, text: args.text, }); - if (args.final) { - await ctx.db.patch(args.streamId, { - status: "done", - }); - } }, }); @@ -119,14 +119,45 @@ export const getStreamText = query({ }, }); -const EXPIRATION_TIME = 20 * 60 * 1000; // 20 minutes in milliseconds +// Configure stream expiration behavior. +// expirationMs: number of milliseconds of inactivity before a stream is +// timed out. Set to null to disable expiration entirely. +export const configure = mutation({ + args: { + expirationMs: v.union(v.number(), v.null()), + }, + handler: async (ctx, args) => { + const existing = await ctx.db.query("streamConfig").first(); + if (existing) { + await ctx.db.patch(existing._id, { + expirationMs: args.expirationMs, + }); + } else { + await ctx.db.insert("streamConfig", { + expirationMs: args.expirationMs, + }); + } + }, +}); + +const DEFAULT_EXPIRATION_TIME = 20 * 60 * 1000; // 20 minutes in milliseconds const BATCH_SIZE = 100; -// If the last chunk of a stream was added more than 20 minutes ago, -// set the stream to timeout. The action feeding it has to be dead. +// Clean up streams that have been inactive longer than the configured +// expiration. Uses lastActivityTime (falling back to _creationTime for +// streams created before this field existed). Skipped entirely when +// expiration is configured as null. export const cleanupExpiredStreams = internalMutation({ args: {}, handler: async (ctx) => { + const config = await ctx.db.query("streamConfig").first(); + const expirationMs = config?.expirationMs; + + if (expirationMs === null) return; + + const effectiveExpiration = + typeof expirationMs === "number" ? expirationMs : DEFAULT_EXPIRATION_TIME; + const now = Date.now(); const pendingStreams = await ctx.db .query("streams") @@ -138,7 +169,9 @@ export const cleanupExpiredStreams = internalMutation({ .take(BATCH_SIZE); for (const stream of [...pendingStreams, ...streamingStreams]) { - if (now - stream._creationTime > EXPIRATION_TIME) { + const lastActive = + (stream as any).lastActivityTime ?? stream._creationTime; + if (now - lastActive > effectiveExpiration) { console.log("Cleaning up expired stream", stream._id); await ctx.db.patch(stream._id, { status: "timeout", diff --git a/src/component/schema.ts b/src/component/schema.ts index d7435fc..2d2cc8f 100644 --- a/src/component/schema.ts +++ b/src/component/schema.ts @@ -13,9 +13,13 @@ export type StreamStatus = Infer; export default defineSchema({ streams: defineTable({ status: streamStatusValidator, + lastActivityTime: v.optional(v.number()), }).index("byStatus", ["status"]), chunks: defineTable({ streamId: v.id("streams"), text: v.string(), }).index("byStream", ["streamId"]), + streamConfig: defineTable({ + expirationMs: v.optional(v.union(v.number(), v.null())), + }), });