From a03a7467f1181f03caf551c41343cc1d627d833f Mon Sep 17 00:00:00 2001 From: m0rt3nlund Date: Mon, 14 Oct 2024 10:18:13 +0200 Subject: [PATCH 1/3] Added unsubscribe function that matches on metadata This allows multiple subscriptions to the same topic and later the ability to remove a specific subscription instead of "all" subscriptions for the topic --- lib/phoenix/pubsub.ex | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lib/phoenix/pubsub.ex b/lib/phoenix/pubsub.ex index 6ae950da..44ae7b0b 100644 --- a/lib/phoenix/pubsub.ex +++ b/lib/phoenix/pubsub.ex @@ -132,6 +132,17 @@ defmodule Phoenix.PubSub do Registry.unregister(pubsub, topic) end + @doc """ + Unsubscribes the caller from the PubSub adapter's topic taking the metadata into consideration + Unlike `unsubscribe/2` this function match on the metadata provided as option when subscribed + + This is usually needed if you have multiple subscriptions for the same topic with different metadata + """ + @spec unsubscribe(t, topic, metadata) :: :ok + def unsubscribe(pubsub, topic, metadata) when is_atom(pubsub) and is_binary(topic) do + Registry.unregister_match(pubsub, topic, metadata) + end + @doc """ Broadcasts message on given topic across the whole cluster. From ab6e358e0362a6f2218705eeb6823a29ffea7a53 Mon Sep 17 00:00:00 2001 From: Dave Lucia Date: Fri, 30 Jan 2026 08:59:29 -0500 Subject: [PATCH 2/3] add tests and more docs --- lib/phoenix/pubsub.ex | 19 +++++++++++++++---- test/shared/pubsub_test.exs | 27 +++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/lib/phoenix/pubsub.ex b/lib/phoenix/pubsub.ex index 44ae7b0b..828c523f 100644 --- a/lib/phoenix/pubsub.ex +++ b/lib/phoenix/pubsub.ex @@ -133,12 +133,23 @@ defmodule Phoenix.PubSub do end @doc """ - Unsubscribes the caller from the PubSub adapter's topic taking the metadata into consideration - Unlike `unsubscribe/2` this function match on the metadata provided as option when subscribed + Unsubscribes the caller from the PubSub adapter's topic taking the metadata into consideration. + + Unlike `unsubscribe/2`, this function matches on the metadata provided as an option when subscribed. + This is useful when you have multiple subscriptions for the same topic with different metadata. + + ## Example + + iex> PubSub.subscribe(:my_pubsub, "users:123", metadata: :fast) + :ok + iex> PubSub.subscribe(:my_pubsub, "users:123", metadata: :slow) + :ok + iex> PubSub.unsubscribe(:my_pubsub, "users:123", :fast) + :ok + # Only the :fast subscription is removed, :slow remains active - This is usually needed if you have multiple subscriptions for the same topic with different metadata """ - @spec unsubscribe(t, topic, metadata) :: :ok + @spec unsubscribe(t, topic, term) :: :ok def unsubscribe(pubsub, topic, metadata) when is_atom(pubsub) and is_binary(topic) do Registry.unregister_match(pubsub, topic, metadata) end diff --git a/test/shared/pubsub_test.exs b/test/shared/pubsub_test.exs index 265cf2de..f26b03a5 100644 --- a/test/shared/pubsub_test.exs +++ b/test/shared/pubsub_test.exs @@ -70,6 +70,33 @@ defmodule Phoenix.PubSubTest do assert subscribers(config, config.topic) |> length == 0 end + @tag pool_size: size + test "pool #{size}: subscribe and unsubscribe with metadata", config do + pid = spawn_pid() + pid2 = spawn_pid() + assert subscribers(config, config.topic) |> length == 0 + + # Subscribe with different metadata variants + assert rpc(pid, fn -> PubSub.subscribe(config.pubsub, config.topic, metadata: :custom) end) + assert rpc(pid, fn -> PubSub.subscribe(config.pubsub, config.topic, metadata: :other) end) + assert rpc(pid2, fn -> PubSub.subscribe(config.pubsub, config.topic) end) + + # Verify all subscriptions exist + assert length(subscribers(config, config.topic)) == 3 + assert {pid, :custom} in subscribers(config, config.topic) + assert {pid, :other} in subscribers(config, config.topic) + assert {pid2, nil} in subscribers(config, config.topic) + + # Unsubscribe only the :custom metadata subscription + assert rpc(pid, fn -> PubSub.unsubscribe(config.pubsub, config.topic, :custom) end) + + # Verify only :custom was removed, others remain + assert length(subscribers(config, config.topic)) == 2 + assert {pid, :other} in subscribers(config, config.topic) + assert {pid2, nil} in subscribers(config, config.topic) + refute {pid, :custom} in subscribers(config, config.topic) + end + @tag pool_size: size test "pool #{size}: broadcast/3 and broadcast!/3 publishes message to each subscriber", config do From 108cac276451300f15bf34dd81f5f43d0c3d37c0 Mon Sep 17 00:00:00 2001 From: Dave Lucia Date: Thu, 5 Feb 2026 16:00:57 -0500 Subject: [PATCH 3/3] unsubscribe/3 -> unsubscribe_match/3 --- lib/phoenix/pubsub.ex | 14 +++++++------- test/shared/pubsub_test.exs | 12 +++++++++--- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/lib/phoenix/pubsub.ex b/lib/phoenix/pubsub.ex index 828c523f..14bf580f 100644 --- a/lib/phoenix/pubsub.ex +++ b/lib/phoenix/pubsub.ex @@ -140,17 +140,17 @@ defmodule Phoenix.PubSub do ## Example - iex> PubSub.subscribe(:my_pubsub, "users:123", metadata: :fast) + iex> PubSub.subscribe_match(:my_pubsub, "users:123", metadata: :fast) :ok - iex> PubSub.subscribe(:my_pubsub, "users:123", metadata: :slow) + iex> PubSub.subscribe_match(:my_pubsub, "users:123", metadata: :slow) :ok - iex> PubSub.unsubscribe(:my_pubsub, "users:123", :fast) + iex> PubSub.unsubscribe_match(:my_pubsub, "users:123", :fast) :ok # Only the :fast subscription is removed, :slow remains active """ - @spec unsubscribe(t, topic, term) :: :ok - def unsubscribe(pubsub, topic, metadata) when is_atom(pubsub) and is_binary(topic) do + @spec unsubscribe_match(t, topic, term) :: :ok + def unsubscribe_match(pubsub, topic, metadata) when is_atom(pubsub) and is_binary(topic) do Registry.unregister_match(pubsub, topic, metadata) end @@ -184,7 +184,7 @@ defmodule Phoenix.PubSub do The default dispatcher will broadcast the message to all subscribers except for the process that initiated the broadcast. - + A custom dispatcher may also be given as a fifth, optional argument. See the "Custom dispatching" section in the module documentation. """ @@ -224,7 +224,7 @@ defmodule Phoenix.PubSub do The default dispatcher will broadcast the message to all subscribers except for the process that initiated the broadcast. - + A custom dispatcher may also be given as a fifth, optional argument. See the "Custom dispatching" section in the module documentation. """ diff --git a/test/shared/pubsub_test.exs b/test/shared/pubsub_test.exs index f26b03a5..5007b044 100644 --- a/test/shared/pubsub_test.exs +++ b/test/shared/pubsub_test.exs @@ -77,8 +77,14 @@ defmodule Phoenix.PubSubTest do assert subscribers(config, config.topic) |> length == 0 # Subscribe with different metadata variants - assert rpc(pid, fn -> PubSub.subscribe(config.pubsub, config.topic, metadata: :custom) end) - assert rpc(pid, fn -> PubSub.subscribe(config.pubsub, config.topic, metadata: :other) end) + assert rpc(pid, fn -> + PubSub.subscribe(config.pubsub, config.topic, metadata: :custom) + end) + + assert rpc(pid, fn -> + PubSub.subscribe(config.pubsub, config.topic, metadata: :other) + end) + assert rpc(pid2, fn -> PubSub.subscribe(config.pubsub, config.topic) end) # Verify all subscriptions exist @@ -88,7 +94,7 @@ defmodule Phoenix.PubSubTest do assert {pid2, nil} in subscribers(config, config.topic) # Unsubscribe only the :custom metadata subscription - assert rpc(pid, fn -> PubSub.unsubscribe(config.pubsub, config.topic, :custom) end) + assert rpc(pid, fn -> PubSub.unsubscribe_match(config.pubsub, config.topic, :custom) end) # Verify only :custom was removed, others remain assert length(subscribers(config, config.topic)) == 2