Add QueueBundle.Remove() to stop and remove queues#1235
Add QueueBundle.Remove() to stop and remove queues#1235brandur merged 1 commit intoriverqueue:masterfrom
Conversation
|
Cool! Thx. A few small asks on this one:
Anything else you want to add @bgentry? |
While doing a little work on #1235, my LLM flagged that the way that we're accessing `producersByQueueName` today is already a little unsafe as it may be read in multiple places concurrently. There's a mutex to protect it somewhat in the `QueueBundle`, but it may still race between a change there a "notify producer" send. Here, add one additional `RWMutex` that makes sure to synchronize read access on the map.
While doing a little work on #1235, my LLM flagged that the way that we're accessing `producersByQueueName` today is already a little unsafe as it may be read in multiple places concurrently. There's a mutex to protect it somewhat in the `QueueBundle`, but it may still race between a change there a "notify producer" send. Here, add one additional `RWMutex` that makes sure to synchronize read access on the map.
|
And actually, while looking at this discovered a minor concurrency bug around producer map access. I patched that in #1236 — can you rebase, and make sure to lock that mutex in the |
|
@brandur thank your for the review. changes pushed. |
Adds dynamic queue removal at runtime. When removed, the queue's producer is stopped and jobs can no longer be worked on that queue. Queues can be re-added after removal. Signed-off-by: Tiago Silva <3629062+tigrato@users.noreply.github.com>
|
@tigrato Thanks! Going to pull this, but I'm going to make a few tweaks on top of it before release. I think the biggest one is that since |
|
@tigrato Actually, just before I do, would you mind signing our CLA over at https://github.com/riverqueue/rivercla? |
This builds on #1235 to bring in a few tweaks: * Removing a queue is a blocking operation because it needs to wait for the producer to finish up its jobs and shut down. It'd be better to provide a way for this not to block forever, so here we add a context parameter to `QueueBundle.Remove` similar to the one taken by `Client.Stop`. If the client becomes done before the producer resolves, `QueueBundle.Remove` falls through with the error. * Add a "stress" test case for `QueueBundle.Remove`. It's meant to detect a deadlock or other concurrency bug in case there is one and gives us a little more confidence that what we have here is right. * Renamed `addProducer` and `removeProducer` to `producerAdd` and `producerRemove` so they sort more nicely against each other. * Add changelogentry.
Adds dynamic queue removal at runtime. When removed, the queue's producer is stopped and jobs can no longer be worked on that queue. Queues can be re-added after removal.