diff --git a/packages/core/src/shared-exports.ts b/packages/core/src/shared-exports.ts index dcc8c268f509..6edcb3224d81 100644 --- a/packages/core/src/shared-exports.ts +++ b/packages/core/src/shared-exports.ts @@ -173,6 +173,7 @@ export { createLangChainCallbackHandler, instrumentLangChainEmbeddings } from '. export { LANGCHAIN_INTEGRATION_NAME } from './tracing/langchain/constants'; export type { LangChainOptions, LangChainIntegration } from './tracing/langchain/types'; export { instrumentStateGraphCompile, instrumentCreateReactAgent, instrumentLangGraph } from './tracing/langgraph'; +export { mergeSentryCallback } from './tracing/langgraph/utils'; export { LANGGRAPH_INTEGRATION_NAME } from './tracing/langgraph/constants'; export type { LangGraphOptions, LangGraphIntegration, CompiledGraph } from './tracing/langgraph/types'; export type { OpenAiClient, OpenAiOptions, InstrumentedMethod } from './tracing/openai/types'; diff --git a/packages/core/src/tracing/langgraph/utils.ts b/packages/core/src/tracing/langgraph/utils.ts index 8770cbbd629b..0fadded3c82d 100644 --- a/packages/core/src/tracing/langgraph/utils.ts +++ b/packages/core/src/tracing/langgraph/utils.ts @@ -335,7 +335,41 @@ export function setResponseAttributes(span: Span, inputMessages: LangChainMessag } } -/** Merge `sentryHandler` into a langchain `callbacks` value (`BaseCallbackHandler[]` or `BaseCallbackManager`). */ +/** + * Detects a LangChain `CallbackManager` (or subclass) without depending on `instanceof`. + * `@langchain/core` is frequently bundled or deduped, so the imported constructor doesn't + * necessarily match the one at the user's call site. We walk the prototype chain looking + * for the class name, then confirm the shape — the constructor-name check rules out + * unrelated objects that happen to expose `addHandler`/`copy`. + */ +function isCallbackManager(value: unknown): value is { + addHandler: (handler: unknown, inherit?: boolean) => void; + copy: () => unknown; + handlers?: unknown[]; +} { + if (!value || typeof value !== 'object') { + return false; + } + + let proto: object | null = Object.getPrototypeOf(value); + while (proto) { + if ((proto as { constructor?: { name?: string } }).constructor?.name === 'CallbackManager') { + const candidate = value as { addHandler?: unknown; copy?: unknown }; + return typeof candidate.addHandler === 'function' && typeof candidate.copy === 'function'; + } + proto = Object.getPrototypeOf(proto); + } + return false; +} + +/** + * Merge `sentryHandler` into a langchain `callbacks` value (undefined, `BaseCallbackHandler[]`, or `BaseCallbackManager`). + * + * Wrapping a `CallbackManager` into `[manager, sentryHandler]` would make LangChain treat the whole manager + * as one opaque handler and drop its inheritable children — notably LangGraph's `StreamMessagesHandler`, + * which silently breaks per-token streaming. We register on a `.copy()` (so caller state stays clean across + * runs) and add ourselves as inheritable so `getChild()` propagates us into nested calls. + */ export function mergeSentryCallback(existing: unknown, sentryHandler: unknown): unknown { if (!existing) { return [sentryHandler]; @@ -348,12 +382,15 @@ export function mergeSentryCallback(existing: unknown, sentryHandler: unknown): return [...existing, sentryHandler]; } - const manager = existing as { addHandler?: (h: unknown) => void; handlers?: unknown[] }; - if (typeof manager.addHandler === 'function') { - const alreadyAdded = Array.isArray(manager.handlers) && manager.handlers.includes(sentryHandler); - if (!alreadyAdded) { - manager.addHandler(sentryHandler); + if (isCallbackManager(existing)) { + const copied = existing.copy() as { + addHandler: (handler: unknown, inherit?: boolean) => void; + handlers?: unknown[]; + }; + if (!copied.handlers?.includes(sentryHandler)) { + copied.addHandler(sentryHandler, true); } + return copied; } return existing; diff --git a/packages/core/test/lib/utils/langgraph-utils.test.ts b/packages/core/test/lib/utils/langgraph-utils.test.ts index 829317518622..f666a4d6693c 100644 --- a/packages/core/test/lib/utils/langgraph-utils.test.ts +++ b/packages/core/test/lib/utils/langgraph-utils.test.ts @@ -48,6 +48,34 @@ describe('extractAgentNameFromParams', () => { describe('mergeSentryCallback', () => { const sentryHandler = { _sentry: true }; + /** + * Minimal `CallbackManager` stand-in. Mirrors `@langchain/core`'s real + * semantics: `addHandler(_, inherit)` pushes to both `handlers` and + * `inheritableHandlers` when `inherit !== false`, and `copy()` returns + * a fresh manager carrying the same handlers — so we don't accidentally + * test against a degenerate shape that bypasses `addHandler`. + */ + function makeFakeCallbackManager(existingHandlers: unknown[] = [], existingInheritableHandlers?: unknown[]) { + // Use a class so `Object.getPrototypeOf(instance).constructor.name === 'CallbackManager'`, + // which is how the production detector identifies a real LangChain CallbackManager. + class CallbackManager { + public handlers: unknown[]; + public inheritableHandlers: unknown[]; + public addHandler = vi.fn((handler: unknown, inherit?: boolean) => { + this.handlers.push(handler); + if (inherit !== false) { + this.inheritableHandlers.push(handler); + } + }); + public copy = vi.fn(() => makeFakeCallbackManager(this.handlers, this.inheritableHandlers)); + constructor(initialHandlers: unknown[], initialInheritableHandlers: unknown[]) { + this.handlers = [...initialHandlers]; + this.inheritableHandlers = [...initialInheritableHandlers]; + } + } + return new CallbackManager(existingHandlers, existingInheritableHandlers ?? existingHandlers); + } + it('returns a fresh array when no existing callbacks are present', () => { expect(mergeSentryCallback(undefined, sentryHandler)).toStrictEqual([sentryHandler]); expect(mergeSentryCallback(null, sentryHandler)).toStrictEqual([sentryHandler]); @@ -65,19 +93,81 @@ describe('mergeSentryCallback', () => { expect(mergeSentryCallback(existing, sentryHandler)).toBe(existing); }); - it('calls addHandler on a CallbackManager-like object', () => { - const addHandler = vi.fn(); - const manager = { addHandler, handlers: [] as unknown[] }; - const result = mergeSentryCallback(manager, sentryHandler); - expect(result).toBe(manager); - expect(addHandler).toHaveBeenCalledWith(sentryHandler); - expect(addHandler).toHaveBeenCalledTimes(1); + it('preserves inheritable handlers when callbacks is a CallbackManager', () => { + // Reproduces the LangGraph `streamMode: ['messages']` setup: a + // CallbackManager carrying a StreamMessagesHandler is passed via + // options.callbacks. Wrapping it as `[manager, sentryHandler]` would + // drop the manager's inheritable children — instead we register + // Sentry on a copy and keep the existing handler chain intact. + const streamMessagesHandler = { + name: 'StreamMessagesHandler', + lc_prefer_streaming: true, + }; + const manager = makeFakeCallbackManager([streamMessagesHandler]); + const result = mergeSentryCallback(manager, sentryHandler) as { + handlers: unknown[]; + }; + expect(Array.isArray(result)).toBe(false); + expect(result.handlers).toEqual([streamMessagesHandler, sentryHandler]); + }); + + it('copies the manager and registers Sentry as an inheritable handler', () => { + // Two adjacent contracts: we operate on a copy (so repeat invocations + // don't accumulate handlers on the caller), and we pass `inherit=true` + // so LangChain's `getChild()` propagates Sentry into nested calls. + const manager = makeFakeCallbackManager([]); + const result = mergeSentryCallback(manager, sentryHandler) as { + addHandler: ReturnType; + inheritableHandlers: unknown[]; + }; + expect(manager.copy).toHaveBeenCalledTimes(1); + expect(manager.handlers).toEqual([]); + expect(result.addHandler).toHaveBeenCalledWith(sentryHandler, true); + expect(result.inheritableHandlers).toEqual([sentryHandler]); + }); + + it('does not double-register when the copied manager already contains the handler', () => { + const manager = makeFakeCallbackManager([sentryHandler]); + const result = mergeSentryCallback(manager, sentryHandler) as { + handlers: unknown[]; + addHandler: ReturnType; + }; + expect(result.handlers).toEqual([sentryHandler]); + expect(result.addHandler).not.toHaveBeenCalled(); + }); + + it('returns the value unchanged when it is neither an array nor a CallbackManager', () => { + const opaque = { name: 'NotAManager' }; + expect(mergeSentryCallback(opaque, sentryHandler)).toBe(opaque); + }); + + it('does not treat a coincidentally duck-typed object as a CallbackManager', () => { + // A plain object that happens to expose `addHandler`/`copy` shouldn't be + // mistaken for a real LangChain CallbackManager — the constructor-name + // check guards against false positives. + const lookalike = { addHandler: vi.fn(), copy: vi.fn(), handlers: [] }; + expect(mergeSentryCallback(lookalike, sentryHandler)).toBe(lookalike); + expect(lookalike.addHandler).not.toHaveBeenCalled(); + expect(lookalike.copy).not.toHaveBeenCalled(); }); - it('does not re-add when the manager already has the sentry handler', () => { - const addHandler = vi.fn(); - const manager = { addHandler, handlers: [sentryHandler] }; - mergeSentryCallback(manager, sentryHandler); - expect(addHandler).not.toHaveBeenCalled(); + it('recognizes subclasses of CallbackManager via the prototype walk', () => { + class CallbackManager { + public handlers: unknown[] = []; + public inheritableHandlers: unknown[] = []; + public addHandler = vi.fn((handler: unknown, inherit?: boolean) => { + this.handlers.push(handler); + if (inherit !== false) { + this.inheritableHandlers.push(handler); + } + }); + public copy = vi.fn(() => new CallbackManager()); + } + class CustomCallbackManager extends CallbackManager {} + const subclass = new CustomCallbackManager(); + const result = mergeSentryCallback(subclass, sentryHandler) as { + addHandler: ReturnType; + }; + expect(result.addHandler).toHaveBeenCalledWith(sentryHandler, true); }); }); diff --git a/packages/node/src/integrations/tracing/langchain/instrumentation.ts b/packages/node/src/integrations/tracing/langchain/instrumentation.ts index fb3e80b48583..8c4c594a1375 100644 --- a/packages/node/src/integrations/tracing/langchain/instrumentation.ts +++ b/packages/node/src/integrations/tracing/langchain/instrumentation.ts @@ -12,6 +12,7 @@ import { createLangChainCallbackHandler, GOOGLE_GENAI_INTEGRATION_NAME, instrumentLangChainEmbeddings, + mergeSentryCallback, OPENAI_INTEGRATION_NAME, SDK_VERSION, } from '@sentry/core'; @@ -27,34 +28,6 @@ interface PatchedLangChainExports { [key: string]: unknown; } -/** - * Augments a callback handler list with Sentry's handler if not already present - */ -function augmentCallbackHandlers(handlers: unknown, sentryHandler: unknown): unknown { - // Handle null/undefined - return array with just our handler - if (!handlers) { - return [sentryHandler]; - } - - // If handlers is already an array - if (Array.isArray(handlers)) { - // Check if our handler is already in the list - if (handlers.includes(sentryHandler)) { - return handlers; - } - // Add our handler to the list - return [...handlers, sentryHandler]; - } - - // If it's a single handler object, convert to array - if (typeof handlers === 'object') { - return [handlers, sentryHandler]; - } - - // Unknown type - return original - return handlers; -} - /** * Wraps Runnable methods (invoke, stream, batch) to inject Sentry callbacks at request time * Uses a Proxy to intercept method calls and augment the options.callbacks @@ -82,9 +55,7 @@ function wrapRunnableMethod( } // Inject our callback handler into options.callbacks (request time callbacks) - const existingCallbacks = options.callbacks; - const augmentedCallbacks = augmentCallbackHandlers(existingCallbacks, sentryHandler); - options.callbacks = augmentedCallbacks; + options.callbacks = mergeSentryCallback(options.callbacks, sentryHandler); // Call original method with augmented options return Reflect.apply(target, thisArg, args);