@@ -177,19 +177,17 @@ describe("RunQueue Concurrency Sweeper", () => {
177177 ) ;
178178
179179 // When the sweeper acks a run whose messageKey value is still sitting on the worker
180- // queue list (e.g. fast-path enqueued, never BLPOP'd), it deletes the message body but
181- // leaves a stale tombstone on the list. The next BLPOP returns that tombstone, GET
182- // returns nil, and the dequeue path logs "Failed to dequeue message from worker queue".
180+ // queue list (e.g. fast-path enqueued, never BLPOP'd), it must remove the entry from
181+ // the list as well as deleting the message body. Otherwise the list keeps a stale
182+ // tombstone — the next BLPOP returns the messageKey, GET returns nil, and the dequeue
183+ // path logs "Failed to dequeue message from worker queue".
183184 redisTest (
184- "should not produce 'Failed to dequeue message from worker queue' after sweeper acks a fast-path-enqueued run" ,
185+ "should clear the worker queue list when sweeper acks a fast-path-enqueued run" ,
185186 async ( { redisContainer } ) => {
186187 let enableConcurrencySweeper = false ;
187- const logger = new Logger ( "RunQueue" , "debug" ) ;
188- const errorSpy = vi . spyOn ( logger , "error" ) ;
189188
190189 const queue = new RunQueue ( {
191190 ...testOptions ,
192- logger,
193191 logLevel : "debug" ,
194192 queueSelectionStrategy : new FairQueueSelectionStrategy ( {
195193 redis : {
@@ -242,31 +240,26 @@ describe("RunQueue Concurrency Sweeper", () => {
242240 expect ( await queue . readMessage ( messageDev . orgId , messageDev . runId ) ) . toBeDefined ( ) ;
243241
244242 // Sweeper now considers the run completed (test callback returns it), so
245- // processMarkedRun acks with removeFromWorkerQueue: false .
243+ // processMarkedRun acks with removeFromWorkerQueue: true .
246244 enableConcurrencySweeper = true ;
247245 await setTimeout ( 5_000 ) ;
248246
249- // Sweeper has run: operational concurrency released, message body deleted.
247+ // Sweeper has run: operational concurrency released, message body deleted, AND
248+ // the messageKey value has been LREM'd from the worker queue list. Without the
249+ // LREM, the list would still contain the messageKey, and the next BLPOP would
250+ // pop the tombstone and emit "Failed to dequeue message from worker queue".
250251 expect ( await queue . operationalCurrentConcurrencyOfEnvironment ( authenticatedEnvDev ) ) . toBe ( 0 ) ;
251252 expect ( await queue . readMessage ( messageDev . orgId , messageDev . runId ) ) . toBeUndefined ( ) ;
253+ expect ( await queue . peekAllOnWorkerQueue ( authenticatedEnvDev . id ) ) . toEqual ( [ ] ) ;
252254
253- // Trigger a blocking dequeue with a short timeout — production uses blockingPop:true,
254- // which is the only path that emits this error log.
255+ // A subsequent blocking dequeue finds nothing — no real message and no tombstone.
255256 const dequeued = await queue . dequeueMessageFromWorkerQueue (
256257 "test_consumer" ,
257258 authenticatedEnvDev . id ,
258259 { blockingPop : true , blockingPopTimeoutSeconds : 2 }
259260 ) ;
260261 expect ( dequeued ) . toBeUndefined ( ) ;
261-
262- // BUG: the dequeue path logs the exact Sentry-visible error. Match the message and
263- // the structured fields one-to-one with what TRIGGER-CLOUD-VC reports.
264- const failedDequeueErrors = errorSpy . mock . calls . filter (
265- ( [ msg ] ) => msg === "Failed to dequeue message from worker queue"
266- ) ;
267- expect ( failedDequeueErrors ) . toHaveLength ( 0 ) ;
268262 } finally {
269- errorSpy . mockRestore ( ) ;
270263 await queue . quit ( ) ;
271264 }
272265 }
0 commit comments