Service API provides a simple way to create microservices that leverage NATS for scalability, load management, and observability. The Service API allows your services to be discovered, queried for status and schema information without additional work.
The core of the Service API is a service. It encapsulates the application logic, collects statistics, and provides additional info.
You can create a service with:
client = NATS.connect
service = client.services.add(options)The options are:
:name- the kind of a service. Multiple services can have the same name. This name can only contain A-Z, a-z, 0-9, dash, and underscore.:version- a service version in the form of a SemVer string.:description(optional) - a human-readable description about a service.:metadata(optional) - a hash that holds additional information about a service.:queue(optional) - a queue group.
While multiple service instances can share the same name, each service has a unique id that is generated upon its creation:
client = NATS.connect
service = client.services.add(
name: "calc",
version: "1.0.0"
)
service.id
# ZrhOTJwPyGeHhM6K257pwlAn endpoint encapsulates the service logic and creates a subscription to handle the requests:
service.endpoints.add(name, options) do |message|
...
endname is an alphanumeric human-readable string that describes the endpoint. Multiple endpoints can have the same names.
Options can contain:
:subject(optional) - an optional NATS subject on which the endpoint will be registered. Defaults toname.:metadata(optional) - a hash containing additional information about an endpoint.:queue(optional) - an override for a service and group.
After creating an endpoint you can publish a request on its subject:
service.endpoints.add("hi") do |message|
message.respond("Hi!")
end
client.request("hi")
# Hi!You can also create multiple endpoints:
client = NATS.connect
service = client.services.add(
name: "calc",
version: "1.0.0"
)
service.endpoints.add("min") do |message|
min = JSON.parse(message.data).min
message.respond(min.to_json)
end
service.endpoints.add("max") do |message|
max = JSON.parse(message.data).max
message.respond(max.to_json)
end
min = client.request("min", [5, 100, -7, 34].to_json)
max = client.request("max", [5, 100, -7, 34].to_json)Endpoints can be aggregated using groups. A group represents a common subject prefix used by all endpoints associated with it.
group = service.groups(name)The name can be anything that is a valid subject prefix. You can pass a queue to a group that will be used for all its endpoints:
group = service.groups(name, queue: "queue")When you add an endpoint to a group, the endpoint is registered on the subject created by concatenating the group name and the endpoint subject:
group = service.groups("numbers")
group.endpoints.add("sum") do |message|
sum = JSON.parse(message.data).sum
message.respond(sum.to_json)
end
client.request("numbers.sum", [1, 2, 3, 4].to_json)You can build a subject hierarchy for your services by creating nested groups:
numbers = service.groups.add("numbers")
aggregation = numbers.groups.add("agg")
aggregation.endpoints.add("avg") do |message|
data = JSON.parse(message.data)
avg = data.sum / data.size.to_f
message.respond(avg.to_json)
end
client.request("numbers.agg.avg", [3, 4, 5, 7].to_json)Every time you create a service or add endpoints to an already existing service, a few subscriptions are created under the hood to handle the service's internal work. If your service finishes its job, you can stop it and drain all its subscriptions with:
service.stopThe service is automatically stopped whenever a NATS-related error occurs during service work.
You can use on_stop callback to handle the error and gracefully finish the service work:
service.on_stop do |error|
puts "Server stopped due to #{error.message}"
end
service.endpoints.add("error") do |message|
raise NATS::IO::ServerError
end
client.request("error")
# Server stopped due to NATS::IO::ServerErrorIf an error occurs in an endpoint, the service will communicate request errors
back to the client with the headers Nats-Service-Error and Nats-Service-Error-Code:
service.endpoints.add("divide") do |message|
dividend, divisor = JSON.parse(message.data)
message.respond((dividend / divisor).to_json)
end
client.request("divide", [5, 0].to_json)
# NATS::Msg(reply: "", data: "", header={"Nats-Service-Error"=>"divided by 0", "Nats-Service-Error-Code"=>"500"})You can also manually send an error message with respond_with_error method:
service.endpoints.add("divide") do |message|
dividend, divisor = JSON.parse(message.data)
if divisor.zero?
message.respond_with_error("It's impossible to divide by zero")
else
message.respond((dividend / divisor).to_json)
end
end
client.request("divide", [5, 0].to_json)
# NATS::Msg(reply: "", data: "", header={"Nats-Service-Error"=>"It's impossible to divide by zero", "Nats-Service-Error-Code"=>"500"})A service collects different stats during its work, which you can access via stats method:
{
name: string,
id: string,
version: string,
metadata: hash,
started: string # ISO Date string when the service started in UTC timezone
endpoints: [
{
name: string, # The name of the endpoint
subject: string, # The subject on which the endpoint is listening
queue_group: string, # Queue group to which this endpoint is assigned to
num_requests: number, # The number of requests received by the endpoint
num_errors: number, # Number of errors that the endpoint has raised
last_error: string, # If set, the last error triggered by the endpoint
data: object, # A field that can be customized with any data as returned by on_stats callback
processing_time: integer, # Total processing time for the service in nanoseconds
average_processing_time: integer, # Average processing time in nanoseconds
}
]
}You can define on_stats callback to add additional metrics to stats:
service.on_stats do |endpoint|
{ object_id: endpoint.object_id }
end
service.endpoints.add("sum") do |message|
...
end
service.stats
# {
# :name=>"calc",
# :id=>"ZrhOTJwPyGeHhM6K257pwl",
# :version=>"1.0.0",
# :metadata=>nil,
# :started=>"2025-01-01T15:47:14Z",
# :endpoints=>[
# {
# :name=>"sum",
# :subject=>"divide",
# :queue_group=>"q",
# :num_requests=>1,
# :processing_time=>34000,
# :average_processing_time=>34000,
# :num_errors=>0,
# :last_error=>"",
# :data=>{:object_id=>26980}
# }
# ]
# }Using the specified name and automatically generated id, the service automatically
creates subscriptions to handle discovery and monitoring requests. The subject for
discovery and requests is prefixed by $SRV with the following verbs:
PING- used for service discovery and RTT calculationINFOreturns service configuration details (used subjects, service metadata, etc.)STATS- service statistics
Each of those operations can be performed on three subjects:
$SRV.PING|STATS|INFO- pings and retrieves status for all services$SRV.PING|STATS|INFO.<name>- pings or retrieves status for all services having the specified name$SRV.PING|STATS|INFO.<name>.<id>- pings or retrieves status of a particular service instance
service = client.services.add(
name: "calc",
version: "1.0.0"
)
client.request("$SRV.PING.calc")
client.request("$SRV.INFO.calc")
client.request("$SRV.STATS.calc")For more examples, refer to examples/service_api directory.