diff --git a/.rubocop.yml b/.rubocop.yml index baa9c10..5a4ab7c 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -37,8 +37,8 @@ Naming/MethodParameterName: # Complex files need relaxed metrics Metrics/ClassLength: Exclude: - - "lib/restate/server.rb" - - "lib/restate/server_context.rb" + - "lib/restate/server/handler.rb" + - "lib/restate/server/context.rb" - "lib/restate/testing.rb" - "lib/restate/vm.rb" @@ -47,8 +47,8 @@ Metrics/MethodLength: - "examples/**/*" - "lib/restate/discovery.rb" - "lib/restate/handler.rb" - - "lib/restate/server.rb" - - "lib/restate/server_context.rb" + - "lib/restate/server/handler.rb" + - "lib/restate/server/context.rb" - "lib/restate/service.rb" - "lib/restate/testing.rb" - "lib/restate/virtual_object.rb" @@ -57,39 +57,39 @@ Metrics/MethodLength: Metrics/AbcSize: Exclude: - - "lib/restate/server.rb" - - "lib/restate/server_context.rb" + - "lib/restate/server/handler.rb" + - "lib/restate/server/context.rb" - "lib/restate/vm.rb" Metrics/CyclomaticComplexity: Exclude: - - "lib/restate/server.rb" - - "lib/restate/server_context.rb" + - "lib/restate/server/handler.rb" + - "lib/restate/server/context.rb" - "lib/restate/vm.rb" Metrics/PerceivedComplexity: Exclude: - - "lib/restate/server.rb" - - "lib/restate/server_context.rb" + - "lib/restate/server/handler.rb" + - "lib/restate/server/context.rb" - "lib/restate/vm.rb" Metrics/BlockLength: Exclude: - - "lib/restate/server.rb" - - "lib/restate/server_context.rb" + - "lib/restate/server/handler.rb" + - "lib/restate/server/context.rb" Metrics/ParameterLists: Exclude: - "lib/restate.rb" - "lib/restate/context.rb" - - "lib/restate/server_context.rb" + - "lib/restate/server/context.rb" - "lib/restate/virtual_object.rb" - "lib/restate/vm.rb" # The default branch handles the empty accept header case Lint/DuplicateBranch: Exclude: - - "lib/restate/server.rb" + - "lib/restate/server/handler.rb" # These marker classes are intentionally empty (used as type tags) Lint/EmptyClass: diff --git a/docs/INTERNALS.md b/docs/INTERNALS.md index 89eee44..b7aa312 100644 --- a/docs/INTERNALS.md +++ b/docs/INTERNALS.md @@ -15,10 +15,10 @@ the codebase. Falcon (HTTP/2 server, fiber-based via Async) │ Rack 3 interface ▼ - Restate::Server ← lib/restate/server.rb + Restate::Server::Handler ← lib/restate/server/handler.rb │ routes requests, manages streaming I/O ▼ - Restate::ServerContext ← lib/restate/server_context.rb + Restate::Server::Context ← lib/restate/server/context.rb │ progress loop, context API for handlers ▼ Restate::VMWrapper ← lib/restate/vm.rb @@ -123,7 +123,7 @@ Thin Ruby wrapper that: 3. Maps native result types to Ruby-side types (e.g., `Internal::Suspended` → `Restate::Suspended`, `Internal::Failure` → `Restate::Failure`). 4. Catches `Internal::VMError` from `do_progress`/`take_notification` and returns it as a value - (not raised), so `ServerContext` can handle it. + (not raised), so `Server::Context` can handle it. **Key types defined here:** - `Invocation` Struct: `{invocation_id, random_seed, headers, input_buffer, key}` @@ -153,7 +153,7 @@ forced protocol setting. This is the most complex part. See [Invocation Execution Flow](#invocation-execution-flow) below. -### ServerContext (`lib/restate/server_context.rb`) +### Server::Context (`lib/restate/server/context.rb`) The context object that backs the `Restate.*` top-level API. Implements: @@ -277,7 +277,7 @@ Useful for long-running handlers that need to flush work or perform cleanup befo | `DisconnectedError` | HTTP connection lost | No (control flow) | `SuspendedError` and `InternalError` are dangerous because bare `rescue => e` in user handlers -will catch them. `ServerContext#enter` walks the exception cause chain to detect this. +will catch them. `Server::Context#enter` walks the exception cause chain to detect this. ### Discovery (`lib/restate/discovery.rb`) @@ -338,7 +338,7 @@ After the VM is ready: 1. `vm.sys_input` returns the `Invocation` (id, headers, input buffer, key). 2. A background **input reader** Async task continues reading remaining HTTP body into `input_queue`. -3. A `ServerContext` is created with the VM, handler, invocation, output callback, and input queue. +3. A `Server::Context` is created with the VM, handler, invocation, output callback, and input queue. ### Phase 3: Handler Execution (Async task) @@ -717,8 +717,8 @@ so naive checks like `break unless output` create infinite loops. **Always use**: `break if output.nil? || output.empty?` This applies in two places: -- `ServerContext#flush_output` (progress loop output drain) -- `Server#process_invocation` (final output drain after handler completes) +- `Server::Context#flush_output` (progress loop output drain) +- `Server::Handler#process_invocation` (final output drain after handler completes) ### 3. Async::Queue, Not Thread::Queue @@ -746,7 +746,7 @@ passing them to `notify_input`, `sys_set_state`, `sys_write_output_success`, etc ### 6. Error Handling in Handlers `SuspendedError` and `InternalError` are internal control flow exceptions. User handlers that -use bare `rescue => e` will accidentally catch these. The `ServerContext#enter` method walks the +use bare `rescue => e` will accidentally catch these. The `Server::Context#enter` method walks the exception cause chain to detect wrapped internal exceptions. ### 7. `Internal::Failure.new` Requires 3 Arguments diff --git a/lib/restate.rb b/lib/restate.rb index 0e02ed4..dfdff12 100644 --- a/lib/restate.rb +++ b/lib/restate.rb @@ -11,7 +11,6 @@ require_relative 'restate/service' require_relative 'restate/virtual_object' require_relative 'restate/workflow' -require_relative 'restate/server_context' require_relative 'restate/durable_future' require_relative 'restate/discovery' require_relative 'restate/endpoint' diff --git a/lib/restate/endpoint.rb b/lib/restate/endpoint.rb index 02d2dd9..8361de3 100644 --- a/lib/restate/endpoint.rb +++ b/lib/restate/endpoint.rb @@ -133,8 +133,8 @@ def use_outbound(klass, *args, **kwargs) # Build and return the Rack-compatible application. def app - require_relative 'server' - Server.new(self) + require_relative 'server/handler' + Server::Handler.new(self) end private diff --git a/lib/restate/server.rb b/lib/restate/server.rb index 844cf7e..e2b9375 100644 --- a/lib/restate/server.rb +++ b/lib/restate/server.rb @@ -1,266 +1,9 @@ -# typed: ignore # frozen_string_literal: true -require 'async' -require 'async/queue' -require 'logger' - -module Restate - # Rack-compatible application that handles Restate protocol requests. - # Designed to work with Falcon for HTTP/2 bidirectional streaming. - # - # Routes: - # GET /discover → service manifest - # GET /health → health check - # POST /invoke/:service/:handler → handler invocation - class Server - SDK_VERSION = Internal::SDK_VERSION - X_RESTATE_SERVER = "restate-sdk-ruby/#{SDK_VERSION}".freeze - - LOGGER = Logger.new($stdout, progname: 'Restate::Server') - - def initialize(endpoint) - @endpoint = endpoint - @identity_verifier = Internal::IdentityVerifier.new(endpoint.identity_keys) - end - - # Rack interface - def call(env) - path = env['PATH_INFO'] || '/' - parsed = parse_path(path) - - case parsed[:type] - when :health - health_response - when :discover - handle_discover(env) - when :invocation - handle_invocation(env, parsed[:service], parsed[:handler]) - else - not_found_response - end - rescue StandardError => e - LOGGER.error("Exception in Restate server: #{e.inspect}") - LOGGER.error(e.backtrace&.join("\n")) if e.backtrace - error_response(500, 'Internal server error') - end - - private - - def parse_path(path) - segments = path.split('/').reject(&:empty?) - - # Check for /invoke/:service/:handler - if segments.length >= 3 - invoke_idx = segments.rindex('invoke') - if invoke_idx && segments.length > invoke_idx + 2 - return { - type: :invocation, - service: segments[invoke_idx + 1], - handler: segments[invoke_idx + 2] - } - end - end - - case segments.last - when 'health' - { type: :health } - when 'discover' - { type: :discover } - else - { type: :unknown } - end - end - - def health_response - [200, { 'content-type' => 'application/json', 'x-restate-server' => X_RESTATE_SERVER }, ['{"status":"ok"}']] - end - - def not_found_response - [404, { 'x-restate-server' => X_RESTATE_SERVER }, ['']] - end - - def error_response(status, message) - [status, { 'content-type' => 'text/plain', 'x-restate-server' => X_RESTATE_SERVER }, [message]] - end - - def handle_discover(env) - # Detect HTTP version for protocol mode - http_version = env['HTTP_VERSION'] || env['SERVER_PROTOCOL'] || 'HTTP/1.1' - discovered_as = http_version.include?('2') ? 'bidi' : 'request_response' - - # Negotiate discovery protocol version from Accept header - accept = env['HTTP_ACCEPT'] || '' - version = negotiate_version(accept) - return error_response(415, "Unsupported discovery version: #{accept}") unless version - - begin - json = Discovery.compute_discovery_json(@endpoint, version, discovered_as) - content_type = "application/vnd.restate.endpointmanifest.v#{version}+json" - [ - 200, - { - 'content-type' => content_type, - 'x-restate-server' => X_RESTATE_SERVER - }, - [json] - ] - rescue StandardError => e - error_response(500, "Error computing discovery: #{e.message}") - end - end - - def negotiate_version(accept) - if accept.include?('application/vnd.restate.endpointmanifest.v4+json') - 4 - elsif accept.include?('application/vnd.restate.endpointmanifest.v3+json') - 3 - elsif accept.include?('application/vnd.restate.endpointmanifest.v2+json') - 2 - elsif accept.empty? - 2 - end - end - - def handle_invocation(env, service_name, handler_name) - # Verify identity - request_headers = extract_headers(env) - path = env['PATH_INFO'] || '/' - begin - @identity_verifier.verify(request_headers, path) - rescue Internal::IdentityVerificationError - return [401, { 'x-restate-server' => X_RESTATE_SERVER }, ['']] - end - - # Find the service and handler - service = @endpoint.services[service_name] - return not_found_response unless service - - handler = service.handlers[handler_name] - return not_found_response unless handler - - # Process the invocation with streaming - process_invocation(env, handler, request_headers) - end - - def process_invocation(env, handler, request_headers) - vm = VMWrapper.new(request_headers) - status, response_headers = vm.get_response_head - - # Streaming response body — output chunks are sent to Restate as they're - # produced. This is critical for BidiStream mode where the VM needs output - # acknowledged before it can make further progress. - output_queue = Async::Queue.new - send_output = ->(chunk) { output_queue.enqueue(chunk) } - - # Input queue bridges the HTTP body reader and the handler's progress loop. - input_queue = Async::Queue.new - - # Read request body chunks and feed to VM until ready to execute, - # then continue feeding remaining chunks via the input queue. - rack_input = env['rack.input'] - ready = false - if rack_input - # Feed chunks until the VM has enough to start execution - while (chunk = rack_input.read_partial(16_384)) - vm.notify_input(chunk.b) unless chunk.empty? - if vm.is_ready_to_execute - ready = true - break - end - end - vm.notify_input_closed unless ready - end - - invocation = vm.sys_input - - # Spawn a background task to continue reading remaining input - if ready - Async do - while (chunk = rack_input.read_partial(16_384)) - input_queue.enqueue(chunk.b) unless chunk.empty? - end - input_queue.enqueue(:eof) - rescue StandardError => e - LOGGER.error("Input reader error: #{e.inspect}") - input_queue.enqueue(:disconnected) - end - end - - context = ServerContext.new( - vm: vm, - handler: handler, - invocation: invocation, - send_output: send_output, - input_queue: input_queue, - middleware: @endpoint.middleware, - outbound_middleware: @endpoint.outbound_middleware - ) - - # Spawn the handler as an async task so the response body can stream - # output concurrently. - Async do - begin - context.enter - rescue DisconnectedError - # Client disconnected - rescue StandardError => e - LOGGER.error("Exception in handler: #{e.inspect}") - ensure - # Signal that the attempt is finished — wakes any waiters on - # ctx.request.attempt_finished_event and cancels pending background pool jobs. - context.on_attempt_finished - end - - # Drain remaining output from VM - loop do - chunk = vm.take_output - break if chunk.nil? || chunk.empty? - - output_queue.enqueue(chunk) - end - - # Signal end of output - output_queue.enqueue(nil) - end - - body = StreamingBody.new(output_queue) - - merged_headers = response_headers.to_h { |pair| [pair[0], pair[1]] } - merged_headers['x-restate-server'] = X_RESTATE_SERVER - - [status, merged_headers, body] - end - - # Rack 3 streaming body that yields chunks from an Async::Queue. - # Terminates when nil is dequeued. - class StreamingBody - def initialize(queue) - @queue = queue - end - - def each - loop do - chunk = @queue.dequeue - break if chunk.nil? - - yield chunk - end - end - end - - def extract_headers(env) - headers = [] - env.each do |key, value| - next unless key.start_with?('HTTP_') - - header_name = key.byteslice(5..).tr('_', '-').downcase! - headers << [header_name, value] - end - # Also include content-type and content-length if present - headers << ['content-type', env['CONTENT_TYPE']] if env['CONTENT_TYPE'] - headers << ['content-length', env['CONTENT_LENGTH']] if env['CONTENT_LENGTH'] - headers - end - end -end +# Convenience entry point: `require "restate/server"` loads the core SDK +# plus the server module (Rack app + async execution context). +# +# Use this when you want both core and server loaded eagerly, e.g.: +# gem "restate-sdk", require: "restate/server" +require_relative '../restate' +require_relative 'server/handler' diff --git a/lib/restate/server/context.rb b/lib/restate/server/context.rb new file mode 100644 index 0000000..b3b18e1 --- /dev/null +++ b/lib/restate/server/context.rb @@ -0,0 +1,680 @@ +# typed: false +# frozen_string_literal: true + +require 'async' +require 'async/queue' +require 'logger' + +module Restate + module Server + # The core execution context for a Restate handler invocation. + # Implements the progress loop and all context API methods (state, run, sleep, call, send). + # + # Concurrency model: + # - The handler runs inside a Fiber managed by Falcon/Async. + # - `run` blocks spawn child Async tasks. + # - When the progress loop needs input, it dequeues from @input_queue, yielding the Fiber. + # - The HTTP input reader (a separate Async task) feeds chunks into @input_queue. + # - Output chunks are written directly to the streaming response body. + class Context + include WorkflowContext + include WorkflowSharedContext + + LOGGER = Logger.new($stdout, progname: 'Restate::Server::Context') + + attr_reader :vm, :invocation + + def initialize(vm:, handler:, invocation:, send_output:, input_queue:, middleware: [], + outbound_middleware: []) + @vm = vm + @handler = handler + @invocation = invocation + @send_output = send_output + @input_queue = input_queue + @run_coros_to_execute = {} + @attempt_finished_event = AttemptFinishedEvent.new + @middleware = middleware + @outbound_middleware = outbound_middleware + end + + # ── Main entry point ── + + # Runs the handler to completion, writing the output (or failure) to the journal. + def enter + Thread.current[:restate_context] = self + Thread.current[:restate_service_kind] = @handler.service_tag.kind + Thread.current[:restate_handler_kind] = @handler.kind + in_buffer = @invocation.input_buffer + out_buffer = Restate.invoke_handler(handler: @handler, ctx: self, in_buffer: in_buffer, + middleware: @middleware) + @vm.sys_write_output_success(out_buffer) + @vm.sys_end + rescue TerminalError => e + failure = Failure.new(code: e.status_code, message: e.message) + @vm.sys_write_output_failure(failure) + @vm.sys_end + rescue SuspendedError, InternalError + # These are expected internal control flow exceptions; do nothing. + rescue DisconnectedError + raise + rescue StandardError => e + # Walk the cause chain for TerminalError or internal exceptions + cause = e + handled = false + while cause + if cause.is_a?(TerminalError) + f = Failure.new(code: cause.status_code, message: cause.message) + @vm.sys_write_output_failure(f) + @vm.sys_end + handled = true + break + elsif cause.is_a?(SuspendedError) || cause.is_a?(InternalError) + handled = true + break + end + cause = cause.cause + end + unless handled + @vm.notify_error(e.inspect, e.backtrace&.join("\n")) + raise + end + ensure + @run_coros_to_execute.clear + Thread.current[:restate_context] = nil + Thread.current[:restate_service_kind] = nil + Thread.current[:restate_handler_kind] = nil + end + + # Called by the server when the attempt ends (handler completed, disconnected, + # or transient error). Signals the attempt_finished_event so that user code + # and background pool jobs can clean up. + def on_attempt_finished + @attempt_finished_event.set! + end + + # ── State operations ── + + # Durably retrieves a state entry by name. Returns nil if unset. + def get(name, serde: JsonSerde) + get_async(name, serde: serde).await + end + + # Returns a DurableFuture for a state entry. Resolves to nil if unset. + def get_async(name, serde: JsonSerde) + handle = @vm.sys_get_state(name) + DurableFuture.new(self, handle, serde: serde) + end + + # Durably sets a state entry. The value is serialized via +serde+. + def set(name, value, serde: JsonSerde) + @vm.sys_set_state(name, serde.serialize(value)) + end + + # Durably removes a single state entry by name. + def clear(name) + @vm.sys_clear_state(name) + end + + # Durably removes all state entries for this virtual object or workflow. + def clear_all + @vm.sys_clear_all_state + end + + # Returns the list of all state entry names for this virtual object or workflow. + def state_keys + state_keys_async.await + end + + # Returns a DurableFuture for the list of all state entry names. + def state_keys_async + handle = @vm.sys_get_state_keys + DurableFuture.new(self, handle) + end + + # ── Sleep ── + + # Returns a durable future that completes after the given duration. + # The timer survives handler restarts. + def sleep(seconds) + millis = (seconds * 1000).to_i + handle = @vm.sys_sleep(millis) + DurableFuture.new(self, handle) + end + + # Block until a previously created handle completes. Returns the value. + def resolve_handle(handle) + poll_and_take(handle) + end + + # Wait until any of the given handles completes. Does not take notifications. + def wait_any_handle(handles) + poll_or_cancel(handles) unless handles.any? { |h| @vm.is_completed(h) } + end + + # Check if a handle is completed (non-blocking). + def completed?(handle) + @vm.is_completed(handle) + end + + # Take a completed handle's notification, returning the value. + # Raises TerminalError if the handle resolved to a failure. + def take_completed(handle) + must_take_notification(handle) + end + + # Wait until any of the given futures completes. Returns [completed, remaining]. + def wait_any(*futures) + handles = futures.map(&:handle) + wait_any_handle(handles) + futures.partition(&:completed?) + end + + # ── Durable run (side effect) ── + + # Executes a durable side effect. The block runs at most once; its result is + # journaled and replayed on retries. Returns a DurableFuture for the result. + # + # Pass +background: true+ to run the block in a real OS Thread, keeping the + # fiber event loop responsive for other concurrent handlers. Use this for + # CPU-intensive work. + def run(name, serde: JsonSerde, retry_policy: nil, background: false, &action) + handle = @vm.sys_run(name) + + @run_coros_to_execute[handle] = + if background + -> { execute_run_threaded(handle, action, serde, retry_policy) } + else + -> { execute_run(handle, action, serde, retry_policy) } + end + + DurableFuture.new(self, handle, serde: serde) + end + + # Convenience shortcut for +run(...).await+ — executes the durable side effect + # and returns the result directly. + # + # Accepts all the same options as +run+, including +background: true+. + def run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action) + run(name, serde: serde, retry_policy: retry_policy, background: background, &action).await + end + + # ── Service calls ── + + # Durably calls a handler on a Restate service and returns a future for its result. + def service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil, + input_serde: NOT_SET, output_serde: NOT_SET) + svc_name, handler_name, handler_meta = resolve_call_target(service, handler) + in_serde = resolve_serde(input_serde, handler_meta, :input_serde) + out_serde = resolve_serde(output_serde, handler_meta, :output_serde) + parameter = in_serde.serialize(arg) + with_outbound_middleware(svc_name, handler_name, headers, handler_meta: handler_meta) do |hdrs| + call_handle = @vm.sys_call( + service: svc_name, handler: handler_name, parameter: parameter, + key: key, idempotency_key: idempotency_key, headers: hdrs + ) + DurableCallFuture.new(self, call_handle.result_handle, call_handle.invocation_id_handle, + output_serde: out_serde) + end + end + + # Sends a one-way invocation to a Restate service handler (fire-and-forget). + def service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil, + input_serde: NOT_SET) + svc_name, handler_name, handler_meta = resolve_call_target(service, handler) + in_serde = resolve_serde(input_serde, handler_meta, :input_serde) + parameter = in_serde.serialize(arg) + delay_ms = delay ? (delay * 1000).to_i : nil + with_outbound_middleware(svc_name, handler_name, headers, handler_meta: handler_meta) do |hdrs| + invocation_id_handle = @vm.sys_send( + service: svc_name, handler: handler_name, parameter: parameter, + key: key, delay: delay_ms, idempotency_key: idempotency_key, headers: hdrs + ) + SendHandle.new(self, invocation_id_handle) + end + end + + # Durably calls a handler on a Restate virtual object, keyed by +key+. + def object_call(service, handler, key, arg, idempotency_key: nil, headers: nil, + input_serde: NOT_SET, output_serde: NOT_SET) + svc_name, handler_name, handler_meta = resolve_call_target(service, handler) + in_serde = resolve_serde(input_serde, handler_meta, :input_serde) + out_serde = resolve_serde(output_serde, handler_meta, :output_serde) + parameter = in_serde.serialize(arg) + with_outbound_middleware(svc_name, handler_name, headers, handler_meta: handler_meta) do |hdrs| + call_handle = @vm.sys_call( + service: svc_name, handler: handler_name, parameter: parameter, + key: key, idempotency_key: idempotency_key, headers: hdrs + ) + DurableCallFuture.new(self, call_handle.result_handle, call_handle.invocation_id_handle, + output_serde: out_serde) + end + end + + # Sends a one-way invocation to a Restate virtual object handler (fire-and-forget). + def object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, + input_serde: NOT_SET) + svc_name, handler_name, handler_meta = resolve_call_target(service, handler) + in_serde = resolve_serde(input_serde, handler_meta, :input_serde) + parameter = in_serde.serialize(arg) + delay_ms = delay ? (delay * 1000).to_i : nil + with_outbound_middleware(svc_name, handler_name, headers, handler_meta: handler_meta) do |hdrs| + invocation_id_handle = @vm.sys_send( + service: svc_name, handler: handler_name, parameter: parameter, + key: key, delay: delay_ms, idempotency_key: idempotency_key, headers: hdrs + ) + SendHandle.new(self, invocation_id_handle) + end + end + + # Durably calls a handler on a Restate workflow, keyed by +key+. + def workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil, + input_serde: NOT_SET, output_serde: NOT_SET) + object_call(service, handler, key, arg, idempotency_key: idempotency_key, headers: headers, + input_serde: input_serde, output_serde: output_serde) # rubocop:disable Layout/HashAlignment + end + + # Sends a one-way invocation to a Restate workflow handler (fire-and-forget). + def workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, + input_serde: NOT_SET) + object_send(service, handler, key, arg, delay: delay, idempotency_key: idempotency_key, headers: headers, + input_serde: input_serde) # rubocop:disable Layout/HashAlignment + end + + # ── Awakeables ── + + # Creates an awakeable and returns [awakeable_id, DurableFuture]. + def awakeable(serde: JsonSerde) + id, handle = @vm.sys_awakeable + [id, DurableFuture.new(self, handle, serde: serde)] + end + + # Resolves an awakeable with a success value. + def resolve_awakeable(awakeable_id, payload, serde: JsonSerde) + @vm.sys_complete_awakeable_success(awakeable_id, serde.serialize(payload)) + end + + # Rejects an awakeable with a terminal failure. + def reject_awakeable(awakeable_id, message, code: 500) + failure = Failure.new(code: code, message: message) + @vm.sys_complete_awakeable_failure(awakeable_id, failure) + end + + # ── Signals ── + + # Wait for a named signal addressed to this invocation. Returns a DurableFuture. + def signal(name, serde: JsonSerde) + handle = @vm.sys_signal(name) + DurableFuture.new(self, handle, serde: serde) + end + + # Send a success value to a named signal on another invocation. + def resolve_signal(invocation_id, name, payload, serde: JsonSerde) + @vm.sys_complete_signal_success(invocation_id, name, serde.serialize(payload)) + end + + # Send a terminal failure to a named signal on another invocation. + def reject_signal(invocation_id, name, message, code: 500) + failure = Failure.new(code: code, message: message) + @vm.sys_complete_signal_failure(invocation_id, name, failure) + end + + # ── Promises (Workflow API) ── + + # Gets a durable promise value, blocking until resolved. + def promise(name, serde: JsonSerde) + handle = @vm.sys_get_promise(name) + poll_and_take(handle) do |raw| + raw.nil? ? nil : serde.deserialize(raw) + end + end + + # Peeks at a durable promise value without blocking. Returns nil if not yet resolved. + def peek_promise(name, serde: JsonSerde) + handle = @vm.sys_peek_promise(name) + poll_and_take(handle) do |raw| + raw.nil? ? nil : serde.deserialize(raw) + end + end + + # Resolves a durable promise with a success value. + def resolve_promise(name, payload, serde: JsonSerde) + handle = @vm.sys_complete_promise_success(name, serde.serialize(payload)) + poll_and_take(handle) + nil + end + + # Rejects a durable promise with a terminal failure. + def reject_promise(name, message, code: 500) + failure = Failure.new(code: code, message: message) + handle = @vm.sys_complete_promise_failure(name, failure) + poll_and_take(handle) + nil + end + + # ── Cancel invocation ── + + # Requests cancellation of another invocation by its id. + def cancel_invocation(invocation_id) + @vm.sys_cancel_invocation(invocation_id) + end + + # ── Generic calls (raw bytes, no serde) ── + + # Durably calls a handler using raw bytes (no serialization). Useful for proxying. + def generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil) + with_outbound_middleware(service, handler, headers) do |hdrs| + call_handle = @vm.sys_call( + service: service, handler: handler, parameter: arg, + key: key, idempotency_key: idempotency_key, headers: hdrs + ) + DurableCallFuture.new(self, call_handle.result_handle, call_handle.invocation_id_handle, + output_serde: nil) + end + end + + # Sends a one-way invocation using raw bytes (no serialization). Useful for proxying. + def generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil) + delay_ms = delay ? (delay * 1000).to_i : nil + with_outbound_middleware(service, handler, headers) do |hdrs| + invocation_id_handle = @vm.sys_send( + service: service, handler: handler, parameter: arg, + key: key, delay: delay_ms, idempotency_key: idempotency_key, headers: hdrs + ) + SendHandle.new(self, invocation_id_handle) + end + end + + # ── Request metadata ── + + # Returns metadata about the current invocation (id, headers, raw body). + def request + @request ||= Request.new( + id: @invocation.invocation_id, + headers: @invocation.headers.to_h, + body: @invocation.input_buffer, + attempt_finished_event: @attempt_finished_event + ) + end + + # Returns the key for this virtual object or workflow invocation. + def key + @invocation.key + end + + private + + # ── Progress loop ── + + # Polls until the given handle(s) complete, then takes the notification. + def poll_and_take(handle, &) + poll_or_cancel([handle]) unless @vm.is_completed(handle) + must_take_notification(handle, &) + end + + def poll_or_cancel(handles) + loop do + flush_output + response = @vm.do_progress(handles) + + if response.is_a?(Exception) + LOGGER.error("Exception in do_progress: #{response}") + raise InternalError + end + + case response + when Suspended + raise SuspendedError + when DoProgressAnyCompleted + return + when DoProgressCancelSignalReceived + raise TerminalError.new('cancelled', status_code: 409) + when DoProgressExecuteRun + fn = @run_coros_to_execute.delete(response.handle) + raise "Missing run coroutine for handle #{response.handle}" unless fn + + # Spawn child task for the run action + Async do + fn.call + ensure + @input_queue.enqueue(:run_completed) + end + when DoWaitPendingRun, DoProgressReadFromInput + # Wait for input from the HTTP body reader or a run completion signal + event = @input_queue.dequeue + + case event + when :run_completed + next + when :eof + @vm.notify_input_closed + when :disconnected + raise DisconnectedError + when String + @vm.notify_input(event) + end + end + end + end + + def must_take_notification(handle, &block) + result = @vm.take_notification(handle) + + if result.is_a?(Exception) + flush_output + LOGGER.error("Exception in take_notification: #{result}") + raise InternalError + end + + case result + when Suspended + flush_output + raise SuspendedError + when NotReady + raise "Unexpected NotReady for handle #{handle}" + when Failure + raise TerminalError.new(result.message, status_code: result.code) + else + block ? yield(result) : result + end + end + + def flush_output + loop do + output = @vm.take_output + break if output.nil? || output.empty? + + @send_output.call(output) + end + end + + # ── Outbound middleware ── + + # Runs outbound middleware chain (Sidekiq client middleware pattern). + # Each middleware gets +call(service, handler, headers)+ and must +yield+ + # to continue the chain. The block at the end performs the actual VM call. + # + # The optional +handler_meta+ (a Handler struct from resolve_call_target) + # is exposed via Thread.current[:restate_outbound_handler_meta] so that + # middleware can inspect the target handler's kind without changing the + # middleware interface. + def with_outbound_middleware(service, handler, headers, handler_meta: nil, &action) + return action.call(headers) if @outbound_middleware.empty? + + h = headers || {} + previous_meta = Thread.current[:restate_outbound_handler_meta] + Thread.current[:restate_outbound_handler_meta] = handler_meta + chain = ->(hdrs) { action.call(hdrs) } + @outbound_middleware.reverse_each do |mw| + prev = chain + chain = ->(hdrs) { mw.call(service, handler, hdrs) { prev.call(hdrs) } } + end + chain.call(h) + ensure + Thread.current[:restate_outbound_handler_meta] = previous_meta + end + + # ── Call target resolution ── + + # Resolves a service+handler pair from class/symbol or string/string. + # Returns [service_name, handler_name, handler_metadata_or_nil]. + def resolve_call_target(service, handler) + handler_name = handler.is_a?(Symbol) ? handler.name : handler.to_s + if service.is_a?(Class) && service.respond_to?(:service_name) + svc_name = service.service_name + handler_meta = service.respond_to?(:handlers) ? service.handlers[handler_name] : nil + [svc_name, handler_name, handler_meta] + else + [service.to_s, handler_name, nil] + end + end + + # Resolves a serde value: if the caller passed NOT_SET, fall back to handler metadata, then JsonSerde. + def resolve_serde(caller_serde, handler_meta, field) + return caller_serde unless caller_serde.equal?(NOT_SET) + return JsonSerde unless handler_meta + + io = handler_meta.handler_io + case field + when :input_serde then io.input_serde + when :output_serde then io.output_serde + else JsonSerde + end + end + + # ── Run execution ── + + def execute_run(handle, action, serde, retry_policy) + propose_run_result(handle, action, serde, retry_policy) + end + + # Like execute_run, but offloads the action to a real OS Thread. + # The fiber yields (via IO.pipe) while the thread runs, keeping the event loop responsive. + def execute_run_threaded(handle, action, serde, retry_policy) + propose_run_result(handle, -> { offload_to_thread(action) }, serde, retry_policy) + end + + # Runs the action and proposes the result (success/failure/transient) to the VM. + def propose_run_result(handle, action, serde, retry_policy) + start = Process.clock_gettime(Process::CLOCK_MONOTONIC) + begin + result = action.call + buffer = serde.serialize(result) + @vm.propose_run_completion_success(handle, buffer) + rescue TerminalError => e + failure = Failure.new(code: e.status_code, message: e.message) + @vm.propose_run_completion_failure(handle, failure) + rescue SuspendedError, InternalError + raise + rescue StandardError => e + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start + attempt_duration_ms = (elapsed * 1000).to_i + failure = Failure.new( + code: 500, + message: e.inspect, + stacktrace: e.backtrace&.join("\n") + ) + config = RunRetryConfig.new( + initial_interval: retry_policy&.initial_interval, + max_attempts: retry_policy&.max_attempts, + max_duration: retry_policy&.max_duration, + max_interval: retry_policy&.max_interval, + interval_factor: retry_policy&.interval_factor + ) + @vm.propose_run_completion_transient( + handle, + failure: failure, + attempt_duration_ms: attempt_duration_ms, + config: config + ) + end + end + + # Run a block in an OS thread from a shared pool, yielding the current fiber + # until it completes. Uses IO.pipe to yield the fiber to the Async event loop + # while the thread does work. + # + # The action is wrapped with a cancellation flag so that if the invocation + # finishes (e.g., suspended, terminal error) before the pool picks up the job, + # the action is skipped. + # + # Note: With Async 2.x and Ruby 3.1+, the Fiber Scheduler already intercepts + # most blocking I/O (Net::HTTP, TCPSocket, etc.) and yields the fiber + # automatically. +background: true+ is only needed for CPU-heavy native + # extensions that release the GVL (e.g., image processing, crypto). + def offload_to_thread(action) + read_io, write_io = IO.pipe + result = nil + error = nil + event = @attempt_finished_event + + begin + BackgroundPool.submit do + if event.set? + # Attempt already finished before pool picked up the job — skip. + next + end + + result = action.call + rescue Exception => e # rubocop:disable Lint/RescueException + error = e + ensure + write_io.close unless write_io.closed? + end + + # Yields the fiber in Async context; resumes when the worker closes write_io. + read_io.read(1) + read_io.close + + raise error if error + + result + ensure + read_io.close unless read_io.closed? + write_io.close unless write_io.closed? + end + end + + # A simple fixed-size thread pool for background: true runs. + # Avoids creating a new Thread per call (~1ms + ~1MB stack each). + # Workers are daemon threads that do not prevent process exit. + module BackgroundPool + @queue = Queue.new + @workers = [] + @mutex = Mutex.new + @size = 0 + + POOL_SIZE = Integer(ENV.fetch('RESTATE_BACKGROUND_POOL_SIZE', 8)) + + module_function + + # Submit a block to be executed by a pool worker. + def submit(&block) + ensure_started + @queue.push(block) + end + + def ensure_started + return if @size >= POOL_SIZE + + @mutex.synchronize do + while @size < POOL_SIZE + @size += 1 + worker = Thread.new do + Kernel.loop do + job = @queue.pop + break if job == :shutdown + + job.call + end + end + worker.name = "restate-bg-#{@size}" + # Daemon thread: does not prevent the process from exiting. + worker.report_on_exception = false + @workers << worker + end + end + end + end + end + end +end diff --git a/lib/restate/server/handler.rb b/lib/restate/server/handler.rb new file mode 100644 index 0000000..4e108a4 --- /dev/null +++ b/lib/restate/server/handler.rb @@ -0,0 +1,269 @@ +# typed: ignore +# frozen_string_literal: true + +require 'async' +require 'async/queue' +require 'logger' +require_relative 'context' + +module Restate + module Server + # Rack-compatible application that handles Restate protocol requests. + # Designed to work with Falcon for HTTP/2 bidirectional streaming. + # + # Routes: + # GET /discover → service manifest + # GET /health → health check + # POST /invoke/:service/:handler → handler invocation + class Handler + SDK_VERSION = Internal::SDK_VERSION + X_RESTATE_SERVER = "restate-sdk-ruby/#{SDK_VERSION}".freeze + + LOGGER = Logger.new($stdout, progname: 'Restate::Server::Handler') + + def initialize(endpoint) + @endpoint = endpoint + @identity_verifier = Internal::IdentityVerifier.new(endpoint.identity_keys) + end + + # Rack interface + def call(env) + path = env['PATH_INFO'] || '/' + parsed = parse_path(path) + + case parsed[:type] + when :health + health_response + when :discover + handle_discover(env) + when :invocation + handle_invocation(env, parsed[:service], parsed[:handler]) + else + not_found_response + end + rescue StandardError => e + LOGGER.error("Exception in Restate server: #{e.inspect}") + LOGGER.error(e.backtrace&.join("\n")) if e.backtrace + error_response(500, 'Internal server error') + end + + private + + def parse_path(path) + segments = path.split('/').reject(&:empty?) + + # Check for /invoke/:service/:handler + if segments.length >= 3 + invoke_idx = segments.rindex('invoke') + if invoke_idx && segments.length > invoke_idx + 2 + return { + type: :invocation, + service: segments[invoke_idx + 1], + handler: segments[invoke_idx + 2] + } + end + end + + case segments.last + when 'health' + { type: :health } + when 'discover' + { type: :discover } + else + { type: :unknown } + end + end + + def health_response + [200, { 'content-type' => 'application/json', 'x-restate-server' => X_RESTATE_SERVER }, ['{"status":"ok"}']] + end + + def not_found_response + [404, { 'x-restate-server' => X_RESTATE_SERVER }, ['']] + end + + def error_response(status, message) + [status, { 'content-type' => 'text/plain', 'x-restate-server' => X_RESTATE_SERVER }, [message]] + end + + def handle_discover(env) + # Detect HTTP version for protocol mode + http_version = env['HTTP_VERSION'] || env['SERVER_PROTOCOL'] || 'HTTP/1.1' + discovered_as = http_version.include?('2') ? 'bidi' : 'request_response' + + # Negotiate discovery protocol version from Accept header + accept = env['HTTP_ACCEPT'] || '' + version = negotiate_version(accept) + return error_response(415, "Unsupported discovery version: #{accept}") unless version + + begin + json = Discovery.compute_discovery_json(@endpoint, version, discovered_as) + content_type = "application/vnd.restate.endpointmanifest.v#{version}+json" + [ + 200, + { + 'content-type' => content_type, + 'x-restate-server' => X_RESTATE_SERVER + }, + [json] + ] + rescue StandardError => e + error_response(500, "Error computing discovery: #{e.message}") + end + end + + def negotiate_version(accept) + if accept.include?('application/vnd.restate.endpointmanifest.v4+json') + 4 + elsif accept.include?('application/vnd.restate.endpointmanifest.v3+json') + 3 + elsif accept.include?('application/vnd.restate.endpointmanifest.v2+json') + 2 + elsif accept.empty? + 2 + end + end + + def handle_invocation(env, service_name, handler_name) + # Verify identity + request_headers = extract_headers(env) + path = env['PATH_INFO'] || '/' + begin + @identity_verifier.verify(request_headers, path) + rescue Internal::IdentityVerificationError + return [401, { 'x-restate-server' => X_RESTATE_SERVER }, ['']] + end + + # Find the service and handler + service = @endpoint.services[service_name] + return not_found_response unless service + + handler = service.handlers[handler_name] + return not_found_response unless handler + + # Process the invocation with streaming + process_invocation(env, handler, request_headers) + end + + def process_invocation(env, handler, request_headers) + vm = VMWrapper.new(request_headers) + status, response_headers = vm.get_response_head + + # Streaming response body — output chunks are sent to Restate as they're + # produced. This is critical for BidiStream mode where the VM needs output + # acknowledged before it can make further progress. + output_queue = Async::Queue.new + send_output = ->(chunk) { output_queue.enqueue(chunk) } + + # Input queue bridges the HTTP body reader and the handler's progress loop. + input_queue = Async::Queue.new + + # Read request body chunks and feed to VM until ready to execute, + # then continue feeding remaining chunks via the input queue. + rack_input = env['rack.input'] + ready = false + if rack_input + # Feed chunks until the VM has enough to start execution + while (chunk = rack_input.read_partial(16_384)) + vm.notify_input(chunk.b) unless chunk.empty? + if vm.is_ready_to_execute + ready = true + break + end + end + vm.notify_input_closed unless ready + end + + invocation = vm.sys_input + + # Spawn a background task to continue reading remaining input + if ready + Async do + while (chunk = rack_input.read_partial(16_384)) + input_queue.enqueue(chunk.b) unless chunk.empty? + end + input_queue.enqueue(:eof) + rescue StandardError => e + LOGGER.error("Input reader error: #{e.inspect}") + input_queue.enqueue(:disconnected) + end + end + + context = Context.new( + vm: vm, + handler: handler, + invocation: invocation, + send_output: send_output, + input_queue: input_queue, + middleware: @endpoint.middleware, + outbound_middleware: @endpoint.outbound_middleware + ) + + # Spawn the handler as an async task so the response body can stream + # output concurrently. + Async do + begin + context.enter + rescue DisconnectedError + # Client disconnected + rescue StandardError => e + LOGGER.error("Exception in handler: #{e.inspect}") + ensure + # Signal that the attempt is finished — wakes any waiters on + # ctx.request.attempt_finished_event and cancels pending background pool jobs. + context.on_attempt_finished + end + + # Drain remaining output from VM + loop do + chunk = vm.take_output + break if chunk.nil? || chunk.empty? + + output_queue.enqueue(chunk) + end + + # Signal end of output + output_queue.enqueue(nil) + end + + body = StreamingBody.new(output_queue) + + merged_headers = response_headers.to_h { |pair| [pair[0], pair[1]] } + merged_headers['x-restate-server'] = X_RESTATE_SERVER + + [status, merged_headers, body] + end + + # Rack 3 streaming body that yields chunks from an Async::Queue. + # Terminates when nil is dequeued. + class StreamingBody + def initialize(queue) + @queue = queue + end + + def each + loop do + chunk = @queue.dequeue + break if chunk.nil? + + yield chunk + end + end + end + + def extract_headers(env) + headers = [] + env.each do |key, value| + next unless key.start_with?('HTTP_') + + header_name = key.byteslice(5..).tr('_', '-').downcase! + headers << [header_name, value] + end + # Also include content-type and content-length if present + headers << ['content-type', env['CONTENT_TYPE']] if env['CONTENT_TYPE'] + headers << ['content-length', env['CONTENT_LENGTH']] if env['CONTENT_LENGTH'] + headers + end + end + end +end diff --git a/lib/restate/server_context.rb b/lib/restate/server_context.rb deleted file mode 100644 index 9653360..0000000 --- a/lib/restate/server_context.rb +++ /dev/null @@ -1,678 +0,0 @@ -# typed: false -# frozen_string_literal: true - -require 'async' -require 'async/queue' -require 'logger' - -module Restate - # The core execution context for a Restate handler invocation. - # Implements the progress loop and all context API methods (state, run, sleep, call, send). - # - # Concurrency model: - # - The handler runs inside a Fiber managed by Falcon/Async. - # - `run` blocks spawn child Async tasks. - # - When the progress loop needs input, it dequeues from @input_queue, yielding the Fiber. - # - The HTTP input reader (a separate Async task) feeds chunks into @input_queue. - # - Output chunks are written directly to the streaming response body. - class ServerContext - include WorkflowContext - include WorkflowSharedContext - - LOGGER = Logger.new($stdout, progname: 'Restate::ServerContext') - - attr_reader :vm, :invocation - - def initialize(vm:, handler:, invocation:, send_output:, input_queue:, middleware: [], - outbound_middleware: []) - @vm = vm - @handler = handler - @invocation = invocation - @send_output = send_output - @input_queue = input_queue - @run_coros_to_execute = {} - @attempt_finished_event = AttemptFinishedEvent.new - @middleware = middleware - @outbound_middleware = outbound_middleware - end - - # ── Main entry point ── - - # Runs the handler to completion, writing the output (or failure) to the journal. - def enter - Thread.current[:restate_context] = self - Thread.current[:restate_service_kind] = @handler.service_tag.kind - Thread.current[:restate_handler_kind] = @handler.kind - in_buffer = @invocation.input_buffer - out_buffer = Restate.invoke_handler(handler: @handler, ctx: self, in_buffer: in_buffer, - middleware: @middleware) - @vm.sys_write_output_success(out_buffer) - @vm.sys_end - rescue TerminalError => e - failure = Failure.new(code: e.status_code, message: e.message) - @vm.sys_write_output_failure(failure) - @vm.sys_end - rescue SuspendedError, InternalError - # These are expected internal control flow exceptions; do nothing. - rescue DisconnectedError - raise - rescue StandardError => e - # Walk the cause chain for TerminalError or internal exceptions - cause = e - handled = false - while cause - if cause.is_a?(TerminalError) - f = Failure.new(code: cause.status_code, message: cause.message) - @vm.sys_write_output_failure(f) - @vm.sys_end - handled = true - break - elsif cause.is_a?(SuspendedError) || cause.is_a?(InternalError) - handled = true - break - end - cause = cause.cause - end - unless handled - @vm.notify_error(e.inspect, e.backtrace&.join("\n")) - raise - end - ensure - @run_coros_to_execute.clear - Thread.current[:restate_context] = nil - Thread.current[:restate_service_kind] = nil - Thread.current[:restate_handler_kind] = nil - end - - # Called by the server when the attempt ends (handler completed, disconnected, - # or transient error). Signals the attempt_finished_event so that user code - # and background pool jobs can clean up. - def on_attempt_finished - @attempt_finished_event.set! - end - - # ── State operations ── - - # Durably retrieves a state entry by name. Returns nil if unset. - def get(name, serde: JsonSerde) - get_async(name, serde: serde).await - end - - # Returns a DurableFuture for a state entry. Resolves to nil if unset. - def get_async(name, serde: JsonSerde) - handle = @vm.sys_get_state(name) - DurableFuture.new(self, handle, serde: serde) - end - - # Durably sets a state entry. The value is serialized via +serde+. - def set(name, value, serde: JsonSerde) - @vm.sys_set_state(name, serde.serialize(value)) - end - - # Durably removes a single state entry by name. - def clear(name) - @vm.sys_clear_state(name) - end - - # Durably removes all state entries for this virtual object or workflow. - def clear_all - @vm.sys_clear_all_state - end - - # Returns the list of all state entry names for this virtual object or workflow. - def state_keys - state_keys_async.await - end - - # Returns a DurableFuture for the list of all state entry names. - def state_keys_async - handle = @vm.sys_get_state_keys - DurableFuture.new(self, handle) - end - - # ── Sleep ── - - # Returns a durable future that completes after the given duration. - # The timer survives handler restarts. - def sleep(seconds) - millis = (seconds * 1000).to_i - handle = @vm.sys_sleep(millis) - DurableFuture.new(self, handle) - end - - # Block until a previously created handle completes. Returns the value. - def resolve_handle(handle) - poll_and_take(handle) - end - - # Wait until any of the given handles completes. Does not take notifications. - def wait_any_handle(handles) - poll_or_cancel(handles) unless handles.any? { |h| @vm.is_completed(h) } - end - - # Check if a handle is completed (non-blocking). - def completed?(handle) - @vm.is_completed(handle) - end - - # Take a completed handle's notification, returning the value. - # Raises TerminalError if the handle resolved to a failure. - def take_completed(handle) - must_take_notification(handle) - end - - # Wait until any of the given futures completes. Returns [completed, remaining]. - def wait_any(*futures) - handles = futures.map(&:handle) - wait_any_handle(handles) - futures.partition(&:completed?) - end - - # ── Durable run (side effect) ── - - # Executes a durable side effect. The block runs at most once; its result is - # journaled and replayed on retries. Returns a DurableFuture for the result. - # - # Pass +background: true+ to run the block in a real OS Thread, keeping the - # fiber event loop responsive for other concurrent handlers. Use this for - # CPU-intensive work. - def run(name, serde: JsonSerde, retry_policy: nil, background: false, &action) - handle = @vm.sys_run(name) - - @run_coros_to_execute[handle] = - if background - -> { execute_run_threaded(handle, action, serde, retry_policy) } - else - -> { execute_run(handle, action, serde, retry_policy) } - end - - DurableFuture.new(self, handle, serde: serde) - end - - # Convenience shortcut for +run(...).await+ — executes the durable side effect - # and returns the result directly. - # - # Accepts all the same options as +run+, including +background: true+. - def run_sync(name, serde: JsonSerde, retry_policy: nil, background: false, &action) - run(name, serde: serde, retry_policy: retry_policy, background: background, &action).await - end - - # ── Service calls ── - - # Durably calls a handler on a Restate service and returns a future for its result. - def service_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil, - input_serde: NOT_SET, output_serde: NOT_SET) - svc_name, handler_name, handler_meta = resolve_call_target(service, handler) - in_serde = resolve_serde(input_serde, handler_meta, :input_serde) - out_serde = resolve_serde(output_serde, handler_meta, :output_serde) - parameter = in_serde.serialize(arg) - with_outbound_middleware(svc_name, handler_name, headers, handler_meta: handler_meta) do |hdrs| - call_handle = @vm.sys_call( - service: svc_name, handler: handler_name, parameter: parameter, - key: key, idempotency_key: idempotency_key, headers: hdrs - ) - DurableCallFuture.new(self, call_handle.result_handle, call_handle.invocation_id_handle, - output_serde: out_serde) - end - end - - # Sends a one-way invocation to a Restate service handler (fire-and-forget). - def service_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil, - input_serde: NOT_SET) - svc_name, handler_name, handler_meta = resolve_call_target(service, handler) - in_serde = resolve_serde(input_serde, handler_meta, :input_serde) - parameter = in_serde.serialize(arg) - delay_ms = delay ? (delay * 1000).to_i : nil - with_outbound_middleware(svc_name, handler_name, headers, handler_meta: handler_meta) do |hdrs| - invocation_id_handle = @vm.sys_send( - service: svc_name, handler: handler_name, parameter: parameter, - key: key, delay: delay_ms, idempotency_key: idempotency_key, headers: hdrs - ) - SendHandle.new(self, invocation_id_handle) - end - end - - # Durably calls a handler on a Restate virtual object, keyed by +key+. - def object_call(service, handler, key, arg, idempotency_key: nil, headers: nil, - input_serde: NOT_SET, output_serde: NOT_SET) - svc_name, handler_name, handler_meta = resolve_call_target(service, handler) - in_serde = resolve_serde(input_serde, handler_meta, :input_serde) - out_serde = resolve_serde(output_serde, handler_meta, :output_serde) - parameter = in_serde.serialize(arg) - with_outbound_middleware(svc_name, handler_name, headers, handler_meta: handler_meta) do |hdrs| - call_handle = @vm.sys_call( - service: svc_name, handler: handler_name, parameter: parameter, - key: key, idempotency_key: idempotency_key, headers: hdrs - ) - DurableCallFuture.new(self, call_handle.result_handle, call_handle.invocation_id_handle, - output_serde: out_serde) - end - end - - # Sends a one-way invocation to a Restate virtual object handler (fire-and-forget). - def object_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, - input_serde: NOT_SET) - svc_name, handler_name, handler_meta = resolve_call_target(service, handler) - in_serde = resolve_serde(input_serde, handler_meta, :input_serde) - parameter = in_serde.serialize(arg) - delay_ms = delay ? (delay * 1000).to_i : nil - with_outbound_middleware(svc_name, handler_name, headers, handler_meta: handler_meta) do |hdrs| - invocation_id_handle = @vm.sys_send( - service: svc_name, handler: handler_name, parameter: parameter, - key: key, delay: delay_ms, idempotency_key: idempotency_key, headers: hdrs - ) - SendHandle.new(self, invocation_id_handle) - end - end - - # Durably calls a handler on a Restate workflow, keyed by +key+. - def workflow_call(service, handler, key, arg, idempotency_key: nil, headers: nil, - input_serde: NOT_SET, output_serde: NOT_SET) - object_call(service, handler, key, arg, idempotency_key: idempotency_key, headers: headers, - input_serde: input_serde, output_serde: output_serde) # rubocop:disable Layout/HashAlignment - end - - # Sends a one-way invocation to a Restate workflow handler (fire-and-forget). - def workflow_send(service, handler, key, arg, delay: nil, idempotency_key: nil, headers: nil, - input_serde: NOT_SET) - object_send(service, handler, key, arg, delay: delay, idempotency_key: idempotency_key, headers: headers, - input_serde: input_serde) # rubocop:disable Layout/HashAlignment - end - - # ── Awakeables ── - - # Creates an awakeable and returns [awakeable_id, DurableFuture]. - def awakeable(serde: JsonSerde) - id, handle = @vm.sys_awakeable - [id, DurableFuture.new(self, handle, serde: serde)] - end - - # Resolves an awakeable with a success value. - def resolve_awakeable(awakeable_id, payload, serde: JsonSerde) - @vm.sys_complete_awakeable_success(awakeable_id, serde.serialize(payload)) - end - - # Rejects an awakeable with a terminal failure. - def reject_awakeable(awakeable_id, message, code: 500) - failure = Failure.new(code: code, message: message) - @vm.sys_complete_awakeable_failure(awakeable_id, failure) - end - - # ── Signals ── - - # Wait for a named signal addressed to this invocation. Returns a DurableFuture. - def signal(name, serde: JsonSerde) - handle = @vm.sys_signal(name) - DurableFuture.new(self, handle, serde: serde) - end - - # Send a success value to a named signal on another invocation. - def resolve_signal(invocation_id, name, payload, serde: JsonSerde) - @vm.sys_complete_signal_success(invocation_id, name, serde.serialize(payload)) - end - - # Send a terminal failure to a named signal on another invocation. - def reject_signal(invocation_id, name, message, code: 500) - failure = Failure.new(code: code, message: message) - @vm.sys_complete_signal_failure(invocation_id, name, failure) - end - - # ── Promises (Workflow API) ── - - # Gets a durable promise value, blocking until resolved. - def promise(name, serde: JsonSerde) - handle = @vm.sys_get_promise(name) - poll_and_take(handle) do |raw| - raw.nil? ? nil : serde.deserialize(raw) - end - end - - # Peeks at a durable promise value without blocking. Returns nil if not yet resolved. - def peek_promise(name, serde: JsonSerde) - handle = @vm.sys_peek_promise(name) - poll_and_take(handle) do |raw| - raw.nil? ? nil : serde.deserialize(raw) - end - end - - # Resolves a durable promise with a success value. - def resolve_promise(name, payload, serde: JsonSerde) - handle = @vm.sys_complete_promise_success(name, serde.serialize(payload)) - poll_and_take(handle) - nil - end - - # Rejects a durable promise with a terminal failure. - def reject_promise(name, message, code: 500) - failure = Failure.new(code: code, message: message) - handle = @vm.sys_complete_promise_failure(name, failure) - poll_and_take(handle) - nil - end - - # ── Cancel invocation ── - - # Requests cancellation of another invocation by its id. - def cancel_invocation(invocation_id) - @vm.sys_cancel_invocation(invocation_id) - end - - # ── Generic calls (raw bytes, no serde) ── - - # Durably calls a handler using raw bytes (no serialization). Useful for proxying. - def generic_call(service, handler, arg, key: nil, idempotency_key: nil, headers: nil) - with_outbound_middleware(service, handler, headers) do |hdrs| - call_handle = @vm.sys_call( - service: service, handler: handler, parameter: arg, - key: key, idempotency_key: idempotency_key, headers: hdrs - ) - DurableCallFuture.new(self, call_handle.result_handle, call_handle.invocation_id_handle, - output_serde: nil) - end - end - - # Sends a one-way invocation using raw bytes (no serialization). Useful for proxying. - def generic_send(service, handler, arg, key: nil, delay: nil, idempotency_key: nil, headers: nil) - delay_ms = delay ? (delay * 1000).to_i : nil - with_outbound_middleware(service, handler, headers) do |hdrs| - invocation_id_handle = @vm.sys_send( - service: service, handler: handler, parameter: arg, - key: key, delay: delay_ms, idempotency_key: idempotency_key, headers: hdrs - ) - SendHandle.new(self, invocation_id_handle) - end - end - - # ── Request metadata ── - - # Returns metadata about the current invocation (id, headers, raw body). - def request - @request ||= Request.new( - id: @invocation.invocation_id, - headers: @invocation.headers.to_h, - body: @invocation.input_buffer, - attempt_finished_event: @attempt_finished_event - ) - end - - # Returns the key for this virtual object or workflow invocation. - def key - @invocation.key - end - - private - - # ── Progress loop ── - - # Polls until the given handle(s) complete, then takes the notification. - def poll_and_take(handle, &) - poll_or_cancel([handle]) unless @vm.is_completed(handle) - must_take_notification(handle, &) - end - - def poll_or_cancel(handles) - loop do - flush_output - response = @vm.do_progress(handles) - - if response.is_a?(Exception) - LOGGER.error("Exception in do_progress: #{response}") - raise InternalError - end - - case response - when Suspended - raise SuspendedError - when DoProgressAnyCompleted - return - when DoProgressCancelSignalReceived - raise TerminalError.new('cancelled', status_code: 409) - when DoProgressExecuteRun - fn = @run_coros_to_execute.delete(response.handle) - raise "Missing run coroutine for handle #{response.handle}" unless fn - - # Spawn child task for the run action - Async do - fn.call - ensure - @input_queue.enqueue(:run_completed) - end - when DoWaitPendingRun, DoProgressReadFromInput - # Wait for input from the HTTP body reader or a run completion signal - event = @input_queue.dequeue - - case event - when :run_completed - next - when :eof - @vm.notify_input_closed - when :disconnected - raise DisconnectedError - when String - @vm.notify_input(event) - end - end - end - end - - def must_take_notification(handle, &block) - result = @vm.take_notification(handle) - - if result.is_a?(Exception) - flush_output - LOGGER.error("Exception in take_notification: #{result}") - raise InternalError - end - - case result - when Suspended - flush_output - raise SuspendedError - when NotReady - raise "Unexpected NotReady for handle #{handle}" - when Failure - raise TerminalError.new(result.message, status_code: result.code) - else - block ? yield(result) : result - end - end - - def flush_output - loop do - output = @vm.take_output - break if output.nil? || output.empty? - - @send_output.call(output) - end - end - - # ── Outbound middleware ── - - # Runs outbound middleware chain (Sidekiq client middleware pattern). - # Each middleware gets +call(service, handler, headers)+ and must +yield+ - # to continue the chain. The block at the end performs the actual VM call. - # - # The optional +handler_meta+ (a Handler struct from resolve_call_target) - # is exposed via Thread.current[:restate_outbound_handler_meta] so that - # middleware can inspect the target handler's kind without changing the - # middleware interface. - def with_outbound_middleware(service, handler, headers, handler_meta: nil, &action) - return action.call(headers) if @outbound_middleware.empty? - - h = headers || {} - previous_meta = Thread.current[:restate_outbound_handler_meta] - Thread.current[:restate_outbound_handler_meta] = handler_meta - chain = ->(hdrs) { action.call(hdrs) } - @outbound_middleware.reverse_each do |mw| - prev = chain - chain = ->(hdrs) { mw.call(service, handler, hdrs) { prev.call(hdrs) } } - end - chain.call(h) - ensure - Thread.current[:restate_outbound_handler_meta] = previous_meta - end - - # ── Call target resolution ── - - # Resolves a service+handler pair from class/symbol or string/string. - # Returns [service_name, handler_name, handler_metadata_or_nil]. - def resolve_call_target(service, handler) - handler_name = handler.is_a?(Symbol) ? handler.name : handler.to_s - if service.is_a?(Class) && service.respond_to?(:service_name) - svc_name = service.service_name - handler_meta = service.respond_to?(:handlers) ? service.handlers[handler_name] : nil - [svc_name, handler_name, handler_meta] - else - [service.to_s, handler_name, nil] - end - end - - # Resolves a serde value: if the caller passed NOT_SET, fall back to handler metadata, then JsonSerde. - def resolve_serde(caller_serde, handler_meta, field) - return caller_serde unless caller_serde.equal?(NOT_SET) - return JsonSerde unless handler_meta - - io = handler_meta.handler_io - case field - when :input_serde then io.input_serde - when :output_serde then io.output_serde - else JsonSerde - end - end - - # ── Run execution ── - - def execute_run(handle, action, serde, retry_policy) - propose_run_result(handle, action, serde, retry_policy) - end - - # Like execute_run, but offloads the action to a real OS Thread. - # The fiber yields (via IO.pipe) while the thread runs, keeping the event loop responsive. - def execute_run_threaded(handle, action, serde, retry_policy) - propose_run_result(handle, -> { offload_to_thread(action) }, serde, retry_policy) - end - - # Runs the action and proposes the result (success/failure/transient) to the VM. - def propose_run_result(handle, action, serde, retry_policy) - start = Process.clock_gettime(Process::CLOCK_MONOTONIC) - begin - result = action.call - buffer = serde.serialize(result) - @vm.propose_run_completion_success(handle, buffer) - rescue TerminalError => e - failure = Failure.new(code: e.status_code, message: e.message) - @vm.propose_run_completion_failure(handle, failure) - rescue SuspendedError, InternalError - raise - rescue StandardError => e - elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start - attempt_duration_ms = (elapsed * 1000).to_i - failure = Failure.new( - code: 500, - message: e.inspect, - stacktrace: e.backtrace&.join("\n") - ) - config = RunRetryConfig.new( - initial_interval: retry_policy&.initial_interval, - max_attempts: retry_policy&.max_attempts, - max_duration: retry_policy&.max_duration, - max_interval: retry_policy&.max_interval, - interval_factor: retry_policy&.interval_factor - ) - @vm.propose_run_completion_transient( - handle, - failure: failure, - attempt_duration_ms: attempt_duration_ms, - config: config - ) - end - end - - # Run a block in an OS thread from a shared pool, yielding the current fiber - # until it completes. Uses IO.pipe to yield the fiber to the Async event loop - # while the thread does work. - # - # The action is wrapped with a cancellation flag so that if the invocation - # finishes (e.g., suspended, terminal error) before the pool picks up the job, - # the action is skipped. - # - # Note: With Async 2.x and Ruby 3.1+, the Fiber Scheduler already intercepts - # most blocking I/O (Net::HTTP, TCPSocket, etc.) and yields the fiber - # automatically. +background: true+ is only needed for CPU-heavy native - # extensions that release the GVL (e.g., image processing, crypto). - def offload_to_thread(action) - read_io, write_io = IO.pipe - result = nil - error = nil - event = @attempt_finished_event - - begin - BackgroundPool.submit do - if event.set? - # Attempt already finished before pool picked up the job — skip. - next - end - - result = action.call - rescue Exception => e # rubocop:disable Lint/RescueException - error = e - ensure - write_io.close unless write_io.closed? - end - - # Yields the fiber in Async context; resumes when the worker closes write_io. - read_io.read(1) - read_io.close - - raise error if error - - result - ensure - read_io.close unless read_io.closed? - write_io.close unless write_io.closed? - end - end - - # A simple fixed-size thread pool for background: true runs. - # Avoids creating a new Thread per call (~1ms + ~1MB stack each). - # Workers are daemon threads that do not prevent process exit. - module BackgroundPool - @queue = Queue.new - @workers = [] - @mutex = Mutex.new - @size = 0 - - POOL_SIZE = Integer(ENV.fetch('RESTATE_BACKGROUND_POOL_SIZE', 8)) - - module_function - - # Submit a block to be executed by a pool worker. - def submit(&block) - ensure_started - @queue.push(block) - end - - def ensure_started - return if @size >= POOL_SIZE - - @mutex.synchronize do - while @size < POOL_SIZE - @size += 1 - worker = Thread.new do - Kernel.loop do - job = @queue.pop - break if job == :shutdown - - job.call - end - end - worker.name = "restate-bg-#{@size}" - # Daemon thread: does not prevent the process from exiting. - worker.report_on_exception = false - @workers << worker - end - end - end - end - end -end diff --git a/sig/restate.rbs b/sig/restate.rbs index 21ffc24..365de85 100644 --- a/sig/restate.rbs +++ b/sig/restate.rbs @@ -185,8 +185,14 @@ module Restate # ── Internal ── - class Server - def initialize: (untyped endpoint) -> void + module Server + class Handler + def initialize: (untyped endpoint) -> void + def call: (Hash[String, untyped] env) -> [Integer, Hash[String, String], untyped] + end + + class Context + end end module JsonSerde diff --git a/sorbet/rbi/shims/restate_internal.rbi b/sorbet/rbi/shims/restate_internal.rbi index c5f3f42..8485580 100644 --- a/sorbet/rbi/shims/restate_internal.rbi +++ b/sorbet/rbi/shims/restate_internal.rbi @@ -71,8 +71,12 @@ module Restate class NotReady; end class DoWaitPendingRun; end class RunRetryConfig; end - class Server - def initialize(endpoint); end + module Server + class Handler + def initialize(endpoint); end + end + + class Context; end end # Server-level types diff --git a/spec/server_context_outbound_middleware_spec.rb b/spec/server_context_outbound_middleware_spec.rb index 5c3f7b0..7867ef0 100644 --- a/spec/server_context_outbound_middleware_spec.rb +++ b/spec/server_context_outbound_middleware_spec.rb @@ -16,7 +16,7 @@ def run(service, handler, headers, handler_meta: nil, &action) private - # Extracted verbatim from ServerContext to test in isolation. + # Extracted verbatim from Server::Context to test in isolation. def with_outbound_middleware(service, handler, headers, handler_meta: nil, &action) return action.call(headers) if @outbound_middleware.empty? @@ -50,7 +50,7 @@ def call(_service, _handler, headers) end end -RSpec.describe 'ServerContext#with_outbound_middleware handler_meta plumbing' do +RSpec.describe 'Server::Context#with_outbound_middleware handler_meta plumbing' do def make_handler(kind:, service_name: 'TestVO') Restate::Handler.new( service_tag: Restate::ServiceTag.new(kind: 'object', name: service_name),