From 658046dcc54585e6ee69e32c6c7205f30c466253 Mon Sep 17 00:00:00 2001 From: Banchenko Aleksey Date: Mon, 13 Apr 2026 15:47:14 +0300 Subject: [PATCH 1/3] MSP-7279 add delete queues --- go.mod | 2 +- go.sum | 4 ++-- grmqx/client.go | 36 ++++++++++++++++++++++++++++++++++++ 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index d05170f..05315fe 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/txix-open/bellows v1.2.0 github.com/txix-open/bgjob v1.6.0 github.com/txix-open/etp/v4 v4.0.1 - github.com/txix-open/grmq v1.10.0 + github.com/txix-open/grmq v1.11.0 github.com/txix-open/jsonschema v1.3.0 github.com/txix-open/validator/v10 v10.0.0-20250506161033-f8ce404fffdb github.com/xeipuuv/gojsonschema v1.2.0 diff --git a/go.sum b/go.sum index 7e6a1f1..7ee8ce4 100644 --- a/go.sum +++ b/go.sum @@ -170,8 +170,8 @@ github.com/txix-open/bgjob v1.6.0 h1:Vwj9cAsIhMrHPKRZVxfg6mHqE9LI/atDCq1aeihH3+I github.com/txix-open/bgjob v1.6.0/go.mod h1:aKbwpVclWmq82s7fchazp099yWjEpVKFkxT85i4hARM= github.com/txix-open/etp/v4 v4.0.1 h1:5VSYgGsvnMz0tvUHsjVWFETI0qWBTCF1O3ARCHaCIgA= github.com/txix-open/etp/v4 v4.0.1/go.mod h1:FIu7TUDdwcgmtmF6tFTV1bGA2fRY/iVaPQdsGzvqYc8= -github.com/txix-open/grmq v1.10.0 h1:1bXuF+7TxtUvoh+Wn6SOCYpVACFbWDuDmz/+tKwW9Ms= -github.com/txix-open/grmq v1.10.0/go.mod h1:EyFOP17KrwEFOUjR+PH8iH3LCtiN9TSQ8t0ysyk62po= +github.com/txix-open/grmq v1.11.0 h1:MJX10Sbspv21tKKYv9AqnSF1yMBrTa1UcwSDdoi19K0= +github.com/txix-open/grmq v1.11.0/go.mod h1:6m9CCTQao/HAFalu5rFOZGJ5KLEP6WzzIwj80Ml3KGE= github.com/txix-open/jsonschema v1.3.0 h1:XGuOTg22G7PdqfKypjajM20/i5FyGfFYsJKmheebKi4= github.com/txix-open/jsonschema v1.3.0/go.mod h1:l8YDZ1nvJrw6uxWowSVOxCV/ebiMJyapffW87ZEqH00= github.com/txix-open/validator/v10 v10.0.0-20250506161033-f8ce404fffdb h1:UJgT4u/QMv5QHKOQeJ7igShHa36c2/vIRqJiRLdDlf0= diff --git a/grmqx/client.go b/grmqx/client.go index 7766273..d98934b 100644 --- a/grmqx/client.go +++ b/grmqx/client.go @@ -66,6 +66,42 @@ func (c *Client) Close() { } } +// QueuesDelete removes queues on the broker by name using underlying grmq.UnsafeConnection. +func (c *Client) QueuesDelete(ctx context.Context, queueNames ...string) error { + if len(queueNames) == 0 { + return nil + } + c.lock.Lock() + defer c.lock.Unlock() + if c.cli == nil { + return errors.New("client is not initialized") + } + conn := c.cli.UnsafeConnection() + if conn == nil { + return errors.New("rabbit mq is not connected") + } + ch, err := conn.Channel() + if err != nil { + return errors.WithMessage(err, "open channel") + } + defer ch.Close() + + for _, name := range queueNames { + if name == "" { + continue + } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if _, err := ch.QueueDelete(name, false, false, false); err != nil { + c.logger.Warn(ctx, "failed delete queue ", log.String("name", name)) + } + } + return nil +} + func (c *Client) upgrade(ctx context.Context, config Config, justServe bool) error { c.lock.Lock() defer c.lock.Unlock() From bf7b20eb7e590065a7cf0b5b2b2aa29e78674895 Mon Sep 17 00:00:00 2001 From: Banchenko Aleksey Date: Mon, 13 Apr 2026 15:52:50 +0300 Subject: [PATCH 2/3] MSP-7279 add test delete --- grmqx/grmqx_test.go | 51 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/grmqx/grmqx_test.go b/grmqx/grmqx_test.go index c8c064c..892dc62 100644 --- a/grmqx/grmqx_test.go +++ b/grmqx/grmqx_test.go @@ -173,3 +173,54 @@ func TestRecovery(t *testing.T) { require.EqualValues(1, cli.QueueLength("test.DLQ")) } + +func TestQueuesDelete(t *testing.T) { + t.Parallel() + test, require := test.New(t) + + const queueName = "queue_to_delete" + consumerCfg := grmqx.Consumer{ + Queue: queueName, + } + + testCli := grmqt.New(test) + config := grmqx.NewConfig( + testCli.ConnectionConfig().Url(), + grmqx.WithDeclarations(grmqx.TopologyFromConsumers(consumerCfg)), + ) + err := testCli.GrmqxCli.Upgrade(t.Context(), config) + require.NoError(err) + + require.True(queueExists(t, testCli.ConnectionConfig().Url(), queueName)) + + err = testCli.GrmqxCli.QueuesDelete(t.Context(), queueName) + require.NoError(err) + + require.False(queueExists(t, testCli.ConnectionConfig().Url(), queueName)) +} + +func TestQueuesDeleteUninitializedClient(t *testing.T) { + t.Parallel() + test, require := test.New(t) + + cli := grmqx.New(test.Logger()) + err := cli.QueuesDelete(t.Context(), "some-queue") + require.EqualError(err, "client is not initialized") +} + +func queueExists(t *testing.T, url string, queue string) bool { + conn, err := amqp091.Dial(url) + if err != nil { + t.Fatalf("dial rabbit mq: %v", err) + } + defer conn.Close() + + ch, err := conn.Channel() + if err != nil { + t.Fatalf("open channel: %v", err) + } + defer ch.Close() + + _, err = ch.QueueInspect(queue) + return err == nil +} From 9a4434b156928402c9e29057030446b8e122aeff Mon Sep 17 00:00:00 2001 From: Banchenko Aleksey Date: Mon, 13 Apr 2026 15:54:31 +0300 Subject: [PATCH 3/3] MSP-7279 bump version to 1.66.4 --- .version | 2 +- CHANGELOG.md | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.version b/.version index 3cb930a..26c8251 100644 --- a/.version +++ b/.version @@ -1 +1 @@ -1.66.3 +1.66.4 diff --git a/CHANGELOG.md b/CHANGELOG.md index f368538..0f41960 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +## v1.66.4 +* Добавлен функционал удаления очередей для rmq клиента ## v1.66.3 * Добавлена опция `LogCombined` объединяющая логи `request` и `response` для `hhtpclix.LogConfigToContext` * Добавлена middleware `LogWithOptions` в пакет `grpc/client` принимающая опции `WithLogBody`, `WithLogResponseBody`, `WithLogRequestBody`, `WithCombinedLog`