Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,54 @@ end

## Middleware

Add custom middleware to the Rack stack:
### Invocation Middleware

Invocation middleware wraps handler execution at the SDK level. This is different from Rack middleware — it runs inside the Restate invocation lifecycle and has access to the handler and context.

```ruby
Restate.endpoint.define do
# Inbound: wraps handler execution
use MyInboundMiddleware.new

# Outbound: wraps service-to-service calls
use_outbound MyOutboundMiddleware.new

mount MyService
end
```

Write your own middleware by implementing `#call` and yielding:

```ruby
class TimingMiddleware
def call(handler, context)
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
result = yield
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
puts "#{handler.name} took #{elapsed.round(3)}s"
result
end
end
```

### Built-in: Deadlock Detection

The SDK ships with deadlock detection middleware that catches re-entrant VirtualObject calls that would otherwise block forever:

```ruby
Restate.endpoint.define do
use Restate::Middleware::DeadlockDetection::Inbound.new
use_outbound Restate::Middleware::DeadlockDetection::Outbound.new

mount MyVirtualObject
end
```

If an exclusive handler on VO key "x" calls another exclusive handler on the same VO key "x", the middleware raises a `DeadlockError` (409) immediately instead of hanging. See `examples/middleware/` for a complete example.

### Rack Middleware

For HTTP-level middleware, use the Rack middleware stack:

```ruby
middleware = Restate::MiddlewareStack.new
Expand Down
108 changes: 108 additions & 0 deletions examples/middleware/deadlock_detection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# frozen_string_literal: true

# Deadlock Detection Middleware Example
#
# This example demonstrates how the DeadlockDetection middleware catches
# re-entrant VirtualObject calls that would otherwise block forever.
#
# == The Problem
#
# Restate VirtualObjects serialize exclusive handler access per key. If an
# exclusive handler on key "x" calls another exclusive handler on the same
# VO key "x", the second call waits for the first to finish — which never
# happens because the first is waiting for the second. Deadlock.
#
# == The Solution
#
# The DeadlockDetection middleware tracks which VO keys are held by the
# current call chain and raises immediately when a call would deadlock,
# giving you a clear error instead of a silent hang.
#
# == Running
#
# falcon serve -c examples/middleware/deadlock_detection.rb
#
# Then trigger the deadlock:
#
# curl -X POST http://localhost:8080/Account/my-account/transfer \
# -H 'content-type: application/json' \
# -d '{"to_account": "my-account", "amount": 100}'
#
# Without the middleware, this call would hang forever.
# With it, you get an immediate 409 error explaining the deadlock.

require "restate"

class Account < Restate::VirtualObject
state :balance, Integer, default: 0

handler :deposit
def deposit(input)
amount = input["amount"]
self.balance += amount
{ balance: balance }
end

handler :withdraw
def withdraw(input)
amount = input["amount"]
raise Restate::TerminalError.new("Insufficient funds", 400) if balance < amount

self.balance -= amount
{ balance: balance }
end

# This handler demonstrates a potential deadlock. If `to_account` is the same
# as this object's key, the call to Account.call(to_account).deposit(...)
# would deadlock — we already hold the exclusive lock on this key.
#
# The DeadlockDetection middleware catches this and raises immediately.
handler :transfer
def transfer(input)
to_account = input["to_account"]
amount = input["amount"]

raise Restate::TerminalError.new("Insufficient funds", 400) if balance < amount

self.balance -= amount

# If to_account == key, this call targets the same VO key we're holding.
# Without deadlock detection, it hangs forever.
# With deadlock detection, it raises DeadlockError immediately.
Account.call(to_account).deposit(amount: amount)

{ balance: balance }
end

shared :get_balance
def get_balance
{ balance: balance }
end
end

# Configure with deadlock detection middleware
Restate.configure do |config|
config.bind = "http://localhost:4100"
config.ingress_url = ENV.fetch("RESTATE_INGRESS_URL", "http://localhost:8080")
config.admin_url = ENV.fetch("RESTATE_ADMIN_URL", "http://localhost:9070")
end

Restate.endpoint.define do
# Register both inbound and outbound deadlock detection
use Restate::Middleware::DeadlockDetection::Inbound.new
use_outbound Restate::Middleware::DeadlockDetection::Outbound.new

mount Account
end

# For Falcon: falcon serve -c examples/middleware/deadlock_detection.rb
if $0 == __FILE__
require "falcon/environment/server"

service "restate" do
include Falcon::Environment::Server
count 1
url { Restate.config.bind }
middleware { Falcon::Server.middleware(Restate.endpoint.to_rack_app, verbose:, cache:) }
end
end
4 changes: 4 additions & 0 deletions lib/restate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ def initialize(message, status_code = 500)
autoload :Client, "restate/client"
autoload :InvocationMiddleware, "restate/invocation_middleware"

module Middleware
autoload :DeadlockDetection, "restate/middleware/deadlock_detection"
end

@config = nil

class << self
Expand Down
55 changes: 54 additions & 1 deletion lib/restate/endpoint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def initialize(protocol: nil, identity_keys: nil)
@services = {}
@protocol = protocol
@identity_keys = identity_keys || []
@inbound_middleware = []
@outbound_middleware = []
end

# @return [Symbol, nil] The protocol mode
Expand All @@ -35,6 +37,12 @@ def initialize(protocol: nil, identity_keys: nil)
# @return [Array<String>] Identity verification keys
attr_reader :identity_keys

# @return [Array] Inbound invocation middleware
attr_reader :inbound_middleware

# @return [Array] Outbound invocation middleware
attr_reader :outbound_middleware

# Returns all registered services.
#
# @return [Array<Class>] Array of service classes
Expand All @@ -51,6 +59,30 @@ def handler?(service_name, handler_name)
!!(@services[service_name]&.handler?(handler_name))
end

# Adds inbound invocation middleware.
#
# Inbound middleware wraps handler execution. It receives the handler and
# context, and must yield to continue the chain.
#
# @param middleware [Object] A middleware instance responding to #call(handler, context)
# @return [self]
def use(middleware)
@inbound_middleware << middleware
self
end

# Adds outbound invocation middleware.
#
# Outbound middleware wraps service-to-service calls. It receives the target
# service, handler, and a mutable headers hash, and must yield to continue.
#
# @param middleware [Object] A middleware instance responding to #call(service, handler, headers)
# @return [self]
def use_outbound(middleware)
@outbound_middleware << middleware
self
end

# Adds a service to the endpoint.
#
# @param service [Class] Service class to add
Expand Down Expand Up @@ -82,6 +114,8 @@ def add(service, as: nil)
# end
def define(&)
@services.clear
@inbound_middleware.clear
@outbound_middleware.clear
Mapper.new(self).instance_exec(&)
self
end
Expand Down Expand Up @@ -147,7 +181,10 @@ def invoke(service_name, handler_name, connection)

Context.class_for(service.service_kind, handler.kind).new(connection).wrap do |context|
serialized_input = connection.request.body
handler.call(context, serialized_input)

InvocationMiddleware.invoke(@inbound_middleware, handler, context) do
handler.call(context, serialized_input)
end
end
end

Expand Down Expand Up @@ -184,6 +221,22 @@ def initialize(endpoint)
@endpoint = endpoint
end

# Adds inbound invocation middleware.
#
# @param middleware [Object] A middleware instance
# @return [void]
def use(middleware)
@endpoint.use(middleware)
end

# Adds outbound invocation middleware.
#
# @param middleware [Object] A middleware instance
# @return [void]
def use_outbound(middleware)
@endpoint.use_outbound(middleware)
end

# Mounts a service class.
#
# @param service_class [Class] Service class to mount
Expand Down
82 changes: 82 additions & 0 deletions lib/restate/invocation_middleware.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# frozen_string_literal: true

module Restate
# Middleware executed around each Restate handler invocation.
#
# Invocation middleware wraps the actual handler execution, giving you hooks
# to inspect or modify the invocation before and after the handler runs.
# This is distinct from Rack middleware, which wraps the HTTP layer.
#
# Middleware must implement +#call(handler, context)+ and yield to invoke the
# next middleware (or the handler itself). The return value of +yield+ is the
# handler's encoded output.
#
# @example Implementing a middleware
# class LoggingMiddleware
# def call(handler, context)
# puts "Before #{handler.name}"
# result = yield
# puts "After #{handler.name}"
# result
# end
# end
#
# @example Registering middleware on an endpoint
# Restate.endpoint.define do
# use LoggingMiddleware.new
# mount MyService
# end
#
# == Outbound middleware
#
# Outbound middleware wraps calls from one service to another. It receives
# the target service name, handler name, and a mutable headers hash. Yield
# to continue the call.
#
# @example Implementing outbound middleware
# class HeaderInjector
# def call(service, handler, headers)
# headers["x-custom"] = "value"
# yield
# end
# end
module InvocationMiddleware
# Builds a callable chain from an ordered list of middleware instances
# and a terminal block (the actual handler invocation).
#
# @param middlewares [Array] Middleware instances responding to #call
# @param handler [Restate::Handler] The handler being invoked
# @param context [Restate::Context] The invocation context
# @yield The terminal action (handler invocation)
# @return [Object] The result of the middleware chain
def self.invoke(middlewares, handler, context, &terminal)
if middlewares.empty?
terminal.call
else
chain = middlewares.reverse.reduce(terminal) do |next_step, mw|
proc { mw.call(handler, context) { next_step.call } }
end
chain.call
end
end

# Builds a callable chain for outbound (service-to-service) middleware.
#
# @param middlewares [Array] Middleware instances responding to #call
# @param service [String] Target service name
# @param handler [String] Target handler name
# @param headers [Hash] Mutable headers hash
# @yield The terminal action (the actual outbound call)
# @return [Object] The result of the middleware chain
def self.invoke_outbound(middlewares, service, handler, headers, &terminal)
if middlewares.empty?
terminal.call
else
chain = middlewares.reverse.reduce(terminal) do |next_step, mw|
proc { mw.call(service, handler, headers) { next_step.call } }
end
chain.call
end
end
end
end
Loading