@@ -164,6 +164,7 @@ def initialize(uri,
164164
165165 @on = { event : -> ( _ ) { } , error : -> ( _ ) { } }
166166 @last_id = last_event_id
167+ @query_params_callback = nil
167168
168169 yield self if block_given?
169170
@@ -206,6 +207,36 @@ def on_error(&action)
206207 @on [ :error ] = action
207208 end
208209
210+ #
211+ # Specifies a block or Proc to generate query parameters dynamically. This will be called before
212+ # each connection attempt (both initial connection and reconnections), allowing you to update
213+ # query parameters based on the current client state.
214+ #
215+ # The block should return a Hash with string keys and string values, which will be merged with
216+ # any existing query parameters in the base URI. If the callback raises an exception, it will be
217+ # logged and the connection will proceed with the base URI's query parameters (or no query
218+ # parameters if none were present).
219+ #
220+ # This is useful for scenarios where query parameters need to reflect the current state of the
221+ # client, such as sending a "basis" parameter that represents what data the client already has.
222+ #
223+ # @example Using dynamic query parameters
224+ # client = SSE::Client.new(base_uri) do |c|
225+ # c.query_params do
226+ # {
227+ # "basis" => (selector.state if selector.defined?),
228+ # "filter" => filter_key
229+ # }.compact
230+ # end
231+ # c.on_event { |event| handle_event(event) }
232+ # end
233+ #
234+ # @yieldreturn [Hash<String, String>] a hash of query parameter names to values
235+ #
236+ def query_params ( &action )
237+ @query_params_callback = action
238+ end
239+
209240 #
210241 # Permanently shuts down the client and its connection. No further events will be dispatched. This
211242 # has no effect if called a second time.
@@ -289,8 +320,9 @@ def connect
289320 end
290321 cxn = nil
291322 begin
292- @logger . info { "Connecting to event stream at #{ @uri } " }
293- cxn = @http_client . request ( @method , @uri , build_opts )
323+ uri = build_uri_with_query_params
324+ @logger . info { "Connecting to event stream at #{ uri } " }
325+ cxn = @http_client . request ( @method , uri , build_opts )
294326 if cxn . status . code == 200
295327 content_type = cxn . content_type . mime_type
296328 if content_type && content_type . start_with? ( "text/event-stream" )
@@ -397,5 +429,27 @@ def build_opts
397429 { headers : build_headers , body : resolved_payload . to_s }
398430 end
399431 end
432+
433+ def build_uri_with_query_params
434+ uri = @uri . dup
435+
436+ if @query_params_callback
437+ begin
438+ dynamic_params = @query_params_callback . call
439+ if dynamic_params . is_a? ( Hash ) && !dynamic_params . empty?
440+ existing_params = uri . query ? URI . decode_www_form ( uri . query ) . to_h : { }
441+ merged_params = existing_params . merge ( dynamic_params )
442+ uri . query = URI . encode_www_form ( merged_params )
443+ elsif !dynamic_params . is_a? ( Hash )
444+ @logger . warn { "query_params callback returned non-Hash value: #{ dynamic_params . class } , ignoring" }
445+ end
446+ rescue StandardError => e
447+ @logger . warn { "query_params callback raised an exception: #{ e . inspect } , proceeding with base URI" }
448+ @logger . debug { "Exception trace: #{ e . backtrace } " }
449+ end
450+ end
451+
452+ uri
453+ end
400454 end
401455end
0 commit comments