Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
305 changes: 51 additions & 254 deletions Cargo.lock

Large diffs are not rendered by default.

50 changes: 43 additions & 7 deletions docs/INTERNALS.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ lib/
├── vm.rb VMWrapper — Ruby bridge to native VM
└── workflow.rb Workflow class + main/handler DSL + .call/.send!
ext/restate_internal/
├── Cargo.toml Depends on restate-sdk-shared-core 0.7.0, magnus 0.7
├── Cargo.toml Depends on restate-sdk-shared-core (git rev, cooperative suspensions), magnus 0.8
└── src/lib.rs Rust ↔ Ruby bindings (~1095 lines)

spec/
Expand Down Expand Up @@ -122,7 +122,7 @@ Thin Ruby wrapper that:
2. Delegates all `sys_*` calls.
3. Maps native result types to Ruby-side types (e.g., `Internal::Suspended` → `Restate::Suspended`,
`Internal::Failure` → `Restate::Failure`).
4. Catches `Internal::VMError` from `do_progress`/`take_notification` and returns it as a value
4. Catches `Internal::VMError` from `do_await`/`take_notification` and returns it as a value
(not raised), so `Server::Context` can handle it.

**Key types defined here:**
Expand Down Expand Up @@ -285,7 +285,7 @@ Generates the JSON manifest returned at `GET /discover`. Maps internal types to
- Service kinds: `service`→`SERVICE`, `object`→`VIRTUAL_OBJECT`, `workflow`→`WORKFLOW`
- Handler kinds: `exclusive`→`EXCLUSIVE`, `shared`→`SHARED`, `workflow`→`WORKFLOW`
- Protocol mode: `bidi`→`BIDI_STREAM`, `request_response`→`REQUEST_RESPONSE`
- Protocol versions: min=5, max=5
- Protocol versions: min=5, max=7

### Test Harness (`lib/restate/testing.rb`)

Expand Down Expand Up @@ -364,7 +364,7 @@ loop:
flush_output() ──► drain vm.take_output → output_queue
vm.do_progress(handles)
vm.do_await(future_tree)
├── AnyCompleted → return (handle is done)
├── ReadFromInput → dequeue from input_queue
Expand Down Expand Up @@ -415,7 +415,7 @@ vm.take_output → output_queue.enqueue(chunk) ··· output_queue.dequeue → y
│ chunk = read_partial │ │ invoke_handler(...) │
│ input_queue << chunk │ │ poll_or_cancel: │
│ ensure: │ │ flush_output → out_q│
│ input_queue << :eof │ │ do_progress
│ input_queue << :eof │ │ do_await
│ │ │ input_q.dequeue │
│ │ │ drain remaining output │
│ │ │ output_queue << nil │
Expand All @@ -432,7 +432,7 @@ The VM is a synchronous state machine. It does not do I/O itself. The SDK drives
2. Check readiness → `is_ready_to_execute()`
3. Get invocation → `sys_input()`
4. Issue syscalls → `sys_get_state`, `sys_run`, `sys_call`, etc. (returns handles)
5. Drive progress → `do_progress(handles)` (tells you what to do next)
5. Drive progress → `do_await(future_tree)` (tells you what to do next)
6. Collect results → `take_notification(handle)` (gets completed values)
7. Drain output → `take_output()` (gets bytes to send back over HTTP/2)
8. Finish → `sys_write_output_success/failure` then `sys_end`
Expand Down Expand Up @@ -472,6 +472,42 @@ the same fiber chain (which is guaranteed by Async's cooperative scheduling).
| `sys_write_output_failure(failure)` | — | Write final handler error |
| `sys_end` | — | Finalize invocation |
| `is_replaying` | bool | Check if replaying from journal |
| `do_await(future)` | progress | Progress driver — accepts an `UnresolvedFuture` tree (or a single handle as the trivial leaf) |

---

## Combinators

`Restate.all` and `Restate.race` are layered on top of a tree-shaped
`UnresolvedFuture` that the shared-core consumes via `do_await`. The Ruby
encoding mirrors the native `WasmUnresolvedFuture`:

```
Integer handle → Single(handle)
[:first_completed, [child, ...]] → race / wait_any
[:all_completed, [child, ...]] → all_settled (future combinator)
[:first_succeeded_or_all_failed,[child, ...]] → any (future combinator)
[:all_succeeded_or_first_failed,[child, ...]] → all
[:unknown, [child, ...]] → escape hatch when shape isn't known
```

Children may be raw handles (leaves) or nested combinator pairs. The native
extension (`ext/restate_internal/src/lib.rs::parse_unresolved_future`) parses
the structure into `restate_sdk_shared_core::UnresolvedFuture` and hands it to
`CoreVM::do_await`.

Why the tree matters: the shared-core uses the combinator shape to decide when
the SDK can suspend. For `all`, suspension waits until no pending child can
possibly complete; for `race`, suspension waits until none is in flight. The
single-handle convenience path (`Server::Context#poll_or_cancel`) just wraps
its handle list into a `FirstCompleted` subtree and hands it to the same
`do_await` — there is no separate legacy entry point.

The Ruby-side public API is in `Restate.all` / `Restate.race`
(`lib/restate.rb`), implemented in `Server::Context#all` / `#race`
(`lib/restate/server/context.rb`). Both route through
`Server::Context#wait_combined`, which shares the progress-loop body with the
flat `poll_or_cancel`.

---

Expand Down Expand Up @@ -509,7 +545,7 @@ When user code calls `Restate.run('name') { ... }`:
- Enqueues `:run_completed` to `input_queue` to wake the progress loop
6. The progress loop continues and eventually `AnyCompleted` is returned for the handle

**During replay**, the VM already has the result from the journal. `do_progress` returns
**During replay**, the VM already has the result from the journal. `do_await` returns
`AnyCompleted` directly — the action block is never executed.

### Background Runs (`background: true`)
Expand Down
52 changes: 48 additions & 4 deletions docs/USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,49 @@ futures = tasks.map { |t| Restate.service_call(Worker, :process, t) }
results = futures.map(&:await)
```

### Wait Any (Racing Futures)
### Combinators (`Restate.all`, `Restate.race`, `Restate.any`, `Restate.all_settled`)

Wait for the first future to complete out of several.
The combinator methods return lazy `CombinedFuture` objects — nothing blocks
until you call `.await`. Because they're lazy futures, they **compose**: you
can pass one combinator into another, and the shared-core sees the full tree
and uses the combinator shape to make better suspension decisions.

```ruby
# Single-level: wait for all three calls.
results = Restate.all(
Restate.service_call(ServiceA, :work, arg1),
Restate.service_call(ServiceB, :work, arg2),
Restate.run('local-step') { compute() }
).await

# Compose: race an all-of group against a deadline.
group = Restate.all(future_a, future_b, future_c)
Restate.race(group, Restate.sleep(30)).await
```

All four accept either splat futures or a single `Array` and return a
`CombinedFuture`. Call `.await` to resolve.

| Method | JS analog | Awaited result |
|-------------------------|-------------------------|-------------------------------------------------------------------|
| `Restate.all(*fs)` | `Promise.all` | Array of values in input order; raises on first `TerminalError` |
| `Restate.race(*fs)` | `Promise.race` | Value of the first future to settle (raises if it failed) |
| `Restate.any(*fs)` | `Promise.any` | Value of the first success; raises only if every future failed |
| `Restate.all_settled(*fs)` | `Promise.allSettled` | Array of `{status: :fulfilled, value:}` / `{status: :rejected, reason:}` |

Common pattern — timeout via `race`:

```ruby
result = Restate.race(
Restate.service_call(Worker, :process, task),
Restate.sleep(30) # 30-second deadline
).await
```

**`Restate.wait_any(*futures) -> [completed, remaining]`** — lower-level
variant that just blocks until at least one future is ready and returns
the partition. Use this when you want to peek and continue rather than
commit to a single winner:

```ruby
future_a = Restate.service_call(ServiceA, :slow, arg)
Expand Down Expand Up @@ -1273,8 +1313,12 @@ Restate.peek_promise(name) -> value | nil
Restate.resolve_promise(name, payload)
Restate.reject_promise(name, message, code: 500)

# Futures
Restate.wait_any(*futures) -> [completed, remaining]
# Futures (combinators are lazy — call .await to resolve)
Restate.all(*futures) -> CombinedFuture (awaits to Array)
Restate.race(*futures) -> CombinedFuture (awaits to value)
Restate.any(*futures) -> CombinedFuture (awaits to first successful value)
Restate.all_settled(*futures) -> CombinedFuture (awaits to Array of outcome Hashes)
Restate.wait_any(*futures) -> [completed, remaining] # eager, not a future

# Metadata
Restate.request -> Request{id, headers, body}
Expand Down
4 changes: 2 additions & 2 deletions etc/run-integration-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ set -euo pipefail
# ./etc/run-integration-tests.sh --skip-build # run tests only (reuse existing image)

REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)"
SDK_TEST_SUITE_VERSION="v1.0"
SDK_TEST_SUITE_VERSION="v2.1"
JAR_URL="https://github.com/restatedev/e2e/releases/download/${SDK_TEST_SUITE_VERSION}/sdk-tests.jar"
JAR_PATH="${REPO_ROOT}/tmp/sdk-tests.jar"
SERVICE_IMAGE="restatedev/test-services-ruby"
Expand Down Expand Up @@ -60,6 +60,6 @@ docker run --rm \
--exclusions-file=/opt/exclusions.yaml \
--service-container-env-file=/opt/service.env \
--report-dir=/opt/test-report \
"${SERVICE_IMAGE}"
--service-container-image="${SERVICE_IMAGE}"

echo "==> Done. Test report: ${REPORT_DIR}"
32 changes: 25 additions & 7 deletions examples/service_communication.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# - Service.send!.handler(arg) — fluent fire-and-forget (optionally delayed)
# - Restate.service_call / Restate.service_send — explicit RPC (same thing, verbose)
# - Fan-out/fan-in — launch concurrent calls, collect results
# - Restate.all — wait for every future, short-circuit on terminal failure
# - Restate.race — return the first future to settle (Promise.race semantics)
# - Restate.wait_any — race multiple futures, handle first completer
# - Restate.awakeable — pause until an external system calls back
#
Expand Down Expand Up @@ -48,15 +50,31 @@ class FanOut < Restate::Service
{ 'results' => results }
end

# Race two calls and return the first result.
# Wait for every parallel call to finish. Returns an Array in input order,
# short-circuiting with a TerminalError if any one call fails.
handler def all(tasks)
futures = tasks.map { |task| Worker.call.process(task) }
Restate.all(*futures).await
end

# Race two calls and return the first to settle.
handler def race(tasks)
futures = tasks.map do |task|
Worker.call.process(task)
end
futures = tasks.map { |task| Worker.call.process(task) }
Restate.race(*futures).await
end

# Race a call against a sleep — handy for hard deadlines.
handler def race_with_deadline(task)
Restate.race(
Worker.call.process(task),
Restate.sleep(30) # 30-second deadline (winner is `nil` if the timer fires)
).await
end

# wait_any returns [completed, remaining]
completed, _remaining = Restate.wait_any(*futures)
completed.first.await
# Compose combinators: race an all-of group against a deadline.
handler def composed(tasks)
group = Restate.all(*tasks.map { |t| Worker.call.process(t) })
Restate.race(group, Restate.sleep(30)).await
end

# Awakeable: pause until an external system resolves the callback.
Expand Down
2 changes: 1 addition & 1 deletion ext/restate_internal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ doc = false
[dependencies]
magnus = { version = "0.8", features = ["rb-sys"] }
rb-sys = { version = "0.9", features = ["stable-api-compiled-fallback"] }
restate-sdk-shared-core = { version = "=0.7.0", features = ["request_identity", "sha2_random_seed"] }
restate-sdk-shared-core = { git = "https://github.com/restatedev/sdk-shared-core.git", rev = "f6f6e4830226161a441dd7f1063e255cdc4052c1", features = ["request_identity", "sha2_random_seed"] }
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
Loading
Loading