diff --git a/lib/phoenix/pubsub.ex b/lib/phoenix/pubsub.ex index 6ae950da..14bf580f 100644 --- a/lib/phoenix/pubsub.ex +++ b/lib/phoenix/pubsub.ex @@ -132,6 +132,28 @@ defmodule Phoenix.PubSub do Registry.unregister(pubsub, topic) end + @doc """ + Unsubscribes the caller from the PubSub adapter's topic taking the metadata into consideration. + + Unlike `unsubscribe/2`, this function matches on the metadata provided as an option when subscribed. + This is useful when you have multiple subscriptions for the same topic with different metadata. + + ## Example + + iex> PubSub.subscribe_match(:my_pubsub, "users:123", metadata: :fast) + :ok + iex> PubSub.subscribe_match(:my_pubsub, "users:123", metadata: :slow) + :ok + iex> PubSub.unsubscribe_match(:my_pubsub, "users:123", :fast) + :ok + # Only the :fast subscription is removed, :slow remains active + + """ + @spec unsubscribe_match(t, topic, term) :: :ok + def unsubscribe_match(pubsub, topic, metadata) when is_atom(pubsub) and is_binary(topic) do + Registry.unregister_match(pubsub, topic, metadata) + end + @doc """ Broadcasts message on given topic across the whole cluster. @@ -162,7 +184,7 @@ defmodule Phoenix.PubSub do The default dispatcher will broadcast the message to all subscribers except for the process that initiated the broadcast. - + A custom dispatcher may also be given as a fifth, optional argument. See the "Custom dispatching" section in the module documentation. """ @@ -202,7 +224,7 @@ defmodule Phoenix.PubSub do The default dispatcher will broadcast the message to all subscribers except for the process that initiated the broadcast. - + A custom dispatcher may also be given as a fifth, optional argument. See the "Custom dispatching" section in the module documentation. """ diff --git a/test/shared/pubsub_test.exs b/test/shared/pubsub_test.exs index 265cf2de..5007b044 100644 --- a/test/shared/pubsub_test.exs +++ b/test/shared/pubsub_test.exs @@ -70,6 +70,39 @@ defmodule Phoenix.PubSubTest do assert subscribers(config, config.topic) |> length == 0 end + @tag pool_size: size + test "pool #{size}: subscribe and unsubscribe with metadata", config do + pid = spawn_pid() + pid2 = spawn_pid() + assert subscribers(config, config.topic) |> length == 0 + + # Subscribe with different metadata variants + assert rpc(pid, fn -> + PubSub.subscribe(config.pubsub, config.topic, metadata: :custom) + end) + + assert rpc(pid, fn -> + PubSub.subscribe(config.pubsub, config.topic, metadata: :other) + end) + + assert rpc(pid2, fn -> PubSub.subscribe(config.pubsub, config.topic) end) + + # Verify all subscriptions exist + assert length(subscribers(config, config.topic)) == 3 + assert {pid, :custom} in subscribers(config, config.topic) + assert {pid, :other} in subscribers(config, config.topic) + assert {pid2, nil} in subscribers(config, config.topic) + + # Unsubscribe only the :custom metadata subscription + assert rpc(pid, fn -> PubSub.unsubscribe_match(config.pubsub, config.topic, :custom) end) + + # Verify only :custom was removed, others remain + assert length(subscribers(config, config.topic)) == 2 + assert {pid, :other} in subscribers(config, config.topic) + assert {pid2, nil} in subscribers(config, config.topic) + refute {pid, :custom} in subscribers(config, config.topic) + end + @tag pool_size: size test "pool #{size}: broadcast/3 and broadcast!/3 publishes message to each subscriber", config do