Skip to content

Commit 01748a5

Browse files
authored
fix: Allow non blocking all the way down to the instance level (#27)
* fix: Allows non-blocking setup_worker method * fix: Ensure we do not block in non-blocking mode
1 parent e7ef335 commit 01748a5

1 file changed

Lines changed: 34 additions & 13 deletions

File tree

lib/leopard/nats_api_server.rb

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def use(klass, *args, &block)
7575
def run(nats_url:, service_opts:, instances: 1, blocking: true)
7676
logger.info 'Booting NATS API server...'
7777
workers = Concurrent::Array.new
78-
pool = spawn_instances(nats_url, service_opts, instances, workers)
78+
pool = spawn_instances(nats_url, service_opts, instances, workers, blocking)
7979
logger.info 'Setting up signal trap...'
8080
trap_signals(workers, pool)
8181
return pool unless blocking
@@ -90,16 +90,16 @@ def run(nats_url:, service_opts:, instances: 1, blocking: true)
9090
# @param url [String] The URL of the NATS server.
9191
# @param opts [Hash] Options for the NATS service.
9292
# @param count [Integer] The number of instances to spawn.
93+
# @param workers [Array] The array to store worker instances.
94+
# @param blocking [Boolean] If false, does not block current thread after starting the server.
9395
#
9496
# @return [Concurrent::FixedThreadPool] The thread pool managing the worker threads.
95-
def spawn_instances(url, opts, count, workers)
97+
def spawn_instances(url, opts, count, workers, blocking)
9698
pool = Concurrent::FixedThreadPool.new(count)
9799
@instance_args = opts.delete(:instance_args) || nil
98100
logger.info "Building #{count} workers with options: #{opts.inspect}, instance_args: #{@instance_args}"
99101
count.times do
100-
eps = endpoints.dup
101-
gps = groups.dup
102-
pool.post { build_worker(url, opts, eps, gps, workers) }
102+
pool.post { build_worker(url, opts, workers, blocking) }
103103
end
104104
pool
105105
end
@@ -108,15 +108,16 @@ def spawn_instances(url, opts, count, workers)
108108
#
109109
# @param url [String] The URL of the NATS server.
110110
# @param opts [Hash] Options for the NATS service.
111-
# @param eps [Array<Hash>] The list of endpoints to add.
112-
# @param gps [Hash] The groups to add.
113111
# @param workers [Array] The array to store worker instances.
112+
# @param blocking [Boolean] If true, blocks the current thread until the worker is set up.
114113
#
115114
# @return [void]
116-
def build_worker(url, opts, eps, gps, workers)
115+
def build_worker(url, opts, workers, blocking)
117116
worker = @instance_args ? new(*@instance_args) : new
118117
workers << worker
119-
worker.setup_worker(url, opts, eps, gps)
118+
return worker.setup_worker!(nats_url: url, service_opts: opts) if blocking
119+
120+
worker.setup_worker(nats_url: url, service_opts: opts)
120121
end
121122

122123
# Shuts down the NATS API server gracefully.
@@ -174,20 +175,28 @@ def logger = self.class.logger
174175

175176
# Sets up a worker thread for the NATS API server.
176177
# This method connects to the NATS server, adds the service, groups, and endpoints,
177-
# and keeps the worker thread alive.
178178
#
179179
# @param url [String] The URL of the NATS server.
180180
# @param opts [Hash] Options for the NATS service.
181181
# @param eps [Array<Hash>] The list of endpoints to add.
182182
# @param gps [Hash] The groups to add.
183183
#
184184
# @return [void]
185-
def setup_worker(url, opts, eps, gps)
185+
def setup_worker(nats_url: 'nats://localhost:4222', service_opts: {})
186186
@thread = Thread.current
187-
@client = NATS.connect url
188-
@service = @client.services.add(**opts)
187+
@client = NATS.connect nats_url
188+
@service = @client.services.add(build_service_opts(service_opts:))
189+
gps = self.class.groups.dup
190+
eps = self.class.endpoints.dup
189191
group_map = add_groups(gps)
190192
add_endpoints eps, group_map
193+
end
194+
195+
# Sets up a worker thread for the NATS API server and blocks the current thread.
196+
#
197+
# @see #setup_worker
198+
def setup_worker!(nats_url: 'nats://localhost:4222', service_opts: {})
199+
setup_worker(nats_url:, service_opts:)
191200
sleep
192201
end
193202

@@ -202,6 +211,18 @@ def stop
202211

203212
private
204213

214+
# Builds the service options for the NATS service.
215+
#
216+
# @param service_opts [Hash] Options for the NATS service.
217+
#
218+
# @return [Hash] The complete service options including name and version.
219+
def build_service_opts(service_opts:)
220+
{
221+
name: self.class.name.split('::').join('.'),
222+
version: '0.1.0',
223+
}.merge(service_opts)
224+
end
225+
205226
# Adds groups to the NATS service.
206227
#
207228
# @param gps [Hash] The groups to add, where keys are group names and values are group definitions.

0 commit comments

Comments
 (0)