11import { redisTest } from "@internal/testcontainers" ;
2- import { describe , expect , it , vi } from "vitest" ;
2+ import { describe , expect , it } from "vitest" ;
33import { Logger } from "@trigger.dev/core/logger" ;
44import { MollifierBuffer } from "./buffer.js" ;
55import { MollifierDrainer } from "./drainer.js" ;
@@ -49,7 +49,16 @@ describe("MollifierDrainer.runOnce", () => {
4949 ...noopOptions ,
5050 } ) ;
5151
52- const handler = vi . fn ( async ( ) => { } ) ;
52+ const handlerCalls : Array < { runId : string ; envId : string ; orgId : string ; payload : unknown } > =
53+ [ ] ;
54+ const handler = async ( input : {
55+ runId : string ;
56+ envId : string ;
57+ orgId : string ;
58+ payload : unknown ;
59+ } ) => {
60+ handlerCalls . push ( input ) ;
61+ } ;
5362 const drainer = new MollifierDrainer ( {
5463 buffer,
5564 handler,
@@ -70,15 +79,13 @@ describe("MollifierDrainer.runOnce", () => {
7079 const result = await drainer . runOnce ( ) ;
7180 expect ( result . drained ) . toBe ( 1 ) ;
7281 expect ( result . failed ) . toBe ( 0 ) ;
73- expect ( handler ) . toHaveBeenCalledTimes ( 1 ) ;
74- expect ( handler ) . toHaveBeenCalledWith (
75- expect . objectContaining ( {
76- runId : "run_1" ,
77- envId : "env_a" ,
78- orgId : "org_1" ,
79- payload : { foo : 1 } ,
80- } ) ,
81- ) ;
82+ expect ( handlerCalls ) . toHaveLength ( 1 ) ;
83+ expect ( handlerCalls [ 0 ] ) . toMatchObject ( {
84+ runId : "run_1" ,
85+ envId : "env_a" ,
86+ orgId : "org_1" ,
87+ payload : { foo : 1 } ,
88+ } ) ;
8289
8390 const entry = await buffer . getEntry ( "run_1" ) ;
8491 expect ( entry ) . toBeNull ( ) ;
@@ -97,7 +104,10 @@ describe("MollifierDrainer.runOnce", () => {
97104 ...noopOptions ,
98105 } ) ;
99106
100- const handler = vi . fn ( async ( ) => { } ) ;
107+ let handlerCalls = 0 ;
108+ const handler = async ( ) => {
109+ handlerCalls ++ ;
110+ } ;
101111 const drainer = new MollifierDrainer ( {
102112 buffer,
103113 handler,
@@ -111,7 +121,7 @@ describe("MollifierDrainer.runOnce", () => {
111121 const result = await drainer . runOnce ( ) ;
112122 expect ( result . drained ) . toBe ( 0 ) ;
113123 expect ( result . failed ) . toBe ( 0 ) ;
114- expect ( handler ) . not . toHaveBeenCalled ( ) ;
124+ expect ( handlerCalls ) . toBe ( 0 ) ;
115125 } finally {
116126 await buffer . close ( ) ;
117127 }
@@ -130,10 +140,10 @@ describe("MollifierDrainer error handling", () => {
130140 } ) ;
131141
132142 let calls = 0 ;
133- const handler = vi . fn ( async ( ) => {
143+ const handler = async ( ) => {
134144 calls ++ ;
135145 throw new Error ( "transient" ) ;
136- } ) ;
146+ } ;
137147
138148 const drainer = new MollifierDrainer ( {
139149 buffer,
@@ -176,9 +186,9 @@ describe("MollifierDrainer error handling", () => {
176186 ...noopOptions ,
177187 } ) ;
178188
179- const handler = vi . fn ( async ( ) => {
189+ const handler = async ( ) => {
180190 throw new Error ( "validation failure" ) ;
181- } ) ;
191+ } ;
182192
183193 const drainer = new MollifierDrainer ( {
184194 buffer,
@@ -216,9 +226,9 @@ describe("MollifierDrainer error handling", () => {
216226 } ) ;
217227
218228 const handled : string [ ] = [ ] ;
219- const handler = vi . fn ( async ( input : { runId : string } ) => {
229+ const handler = async ( input : { runId : string } ) => {
220230 handled . push ( input . runId ) ;
221- } ) ;
231+ } ;
222232
223233 const drainer = new MollifierDrainer ( {
224234 buffer,
@@ -1077,9 +1087,9 @@ describe("MollifierDrainer.start/stop", () => {
10771087 } ) ;
10781088
10791089 const handled : string [ ] = [ ] ;
1080- const handler = vi . fn ( async ( input : { runId : string } ) => {
1090+ const handler = async ( input : { runId : string } ) => {
10811091 handled . push ( input . runId ) ;
1082- } ) ;
1092+ } ;
10831093
10841094 const drainer = new MollifierDrainer ( {
10851095 buffer,
@@ -1121,10 +1131,10 @@ describe("MollifierDrainer.start/stop", () => {
11211131 } ) ;
11221132
11231133 let handlerStarted = false ;
1124- const handler = vi . fn ( async ( ) => {
1134+ const handler = async ( ) => {
11251135 handlerStarted = true ;
11261136 await new Promise < void > ( ( ) => { } ) ;
1127- } ) ;
1137+ } ;
11281138
11291139 const drainer = new MollifierDrainer ( {
11301140 buffer,
@@ -1177,15 +1187,17 @@ describe("MollifierDrainer concurrency cap", () => {
11771187 const envCount = 12 ;
11781188 let inflight = 0 ;
11791189 let peak = 0 ;
1180- const handler = vi . fn ( async ( ) => {
1190+ let handlerCalls = 0 ;
1191+ const handler = async ( ) => {
1192+ handlerCalls ++ ;
11811193 inflight ++ ;
11821194 if ( inflight > peak ) peak = inflight ;
11831195 // Sleep long enough that handlers definitely overlap if scheduling
11841196 // allowed it — the assertion is meaningful only if multiple handlers
11851197 // would be running simultaneously without the cap.
11861198 await new Promise ( ( r ) => setTimeout ( r , 75 ) ) ;
11871199 inflight -- ;
1188- } ) ;
1200+ } ;
11891201
11901202 const drainer = new MollifierDrainer ( {
11911203 buffer,
@@ -1213,7 +1225,7 @@ describe("MollifierDrainer concurrency cap", () => {
12131225
12141226 const result = await drainer . runOnce ( ) ;
12151227 expect ( result . drained ) . toBe ( envCount ) ;
1216- expect ( handler ) . toHaveBeenCalledTimes ( envCount ) ;
1228+ expect ( handlerCalls ) . toBe ( envCount ) ;
12171229 expect ( peak ) . toBeGreaterThan ( 1 ) ; // concurrency is real, not serialised
12181230 expect ( peak ) . toBeLessThanOrEqual ( concurrency ) ;
12191231 } finally {
0 commit comments