From b2d4b797268a94c948a338659f88e1f6c65a1165 Mon Sep 17 00:00:00 2001 From: igalshilman Date: Thu, 21 May 2026 20:25:33 +0200 Subject: [PATCH 1/3] Introduce combinators --- Cargo.lock | 305 +++--------------- docs/INTERNALS.md | 37 ++- docs/USER_GUIDE.md | 40 ++- etc/run-integration-tests.sh | 4 +- examples/service_communication.rb | 26 +- ext/restate_internal/Cargo.toml | 2 +- ext/restate_internal/src/lib.rs | 144 ++++++++- lib/restate.rb | 28 ++ lib/restate/durable_future.rb | 54 +++- lib/restate/server/context.rb | 80 ++++- lib/restate/vm.rb | 10 + sig/restate.rbs | 14 + sorbet/rbi/shims/restate_internal.rbi | 1 + spec/harness_spec.rb | 67 +++- test-services/services/test_utils.rb | 10 + .../virtual_object_command_interpreter.rb | 72 ++++- 16 files changed, 589 insertions(+), 305 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 747c648..24bd58e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,12 +17,6 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" -[[package]] -name = "autocfg" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" - [[package]] name = "base64" version = "0.22.1" @@ -38,7 +32,7 @@ dependencies = [ "bitflags", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools", "proc-macro2", "quote", "regex", @@ -55,11 +49,11 @@ checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" [[package]] name = "block-buffer" -version = "0.11.0-pre.5" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ded684142010808eb980d9974ef794da2bcf97d13396143b1515e9f0fb4a10e" +checksum = "cdd35008169921d80bc60d3d0ab416eecb028c4cd653352907921d95084790be" dependencies = [ - "crypto-common", + "hybrid-array", ] [[package]] @@ -93,16 +87,6 @@ dependencies = [ "either", ] -[[package]] -name = "cc" -version = "1.2.56" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aebf35691d1bfb0ac386a69bac2fde4dd276fb618cf8bf4f5318fe285e821bb2" -dependencies = [ - "find-msvc-tools", - "shlex", -] - [[package]] name = "cexpr" version = "0.6.0" @@ -131,44 +115,33 @@ dependencies = [ [[package]] name = "const-oid" -version = "0.10.0-pre.2" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7e3352a27098ba6b09546e5f13b15165e6a88b5c2723afecb3ea9576b27e3ea" +checksum = "a6ef517f0926dd24a1582492c791b6a4818a4d94e789a334894aa15b0d12f55c" [[package]] name = "cpufeatures" -version = "0.2.17" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" dependencies = [ "libc", ] [[package]] name = "crypto-common" -version = "0.2.0-pre.5" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7aa2ec04f5120b830272a481e8d9d8ba4dda140d2cda59b0f1110d5eb93c38e" +checksum = "ce6e4c961d6cd6c9a86db418387425e8bdeaf05b3c8bc1411e6dca4c252f1453" dependencies = [ - "getrandom", "hybrid-array", - "rand_core", -] - -[[package]] -name = "deranged" -version = "0.5.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" -dependencies = [ - "powerfmt", ] [[package]] name = "digest" -version = "0.11.0-pre.8" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "065d93ead7c220b85d5b4be4795d8398eac4ff68b5ee63895de0a3c1fb6edf25" +checksum = "f1dd6dbb5841937940781866fa1281a1ff7bd3bf827091440879f9994983d5c2" dependencies = [ "block-buffer", "const-oid", @@ -181,12 +154,6 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" -[[package]] -name = "find-msvc-tools" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" - [[package]] name = "getrandom" version = "0.2.17" @@ -194,10 +161,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" dependencies = [ "cfg-if", - "js-sys", "libc", "wasi", - "wasm-bindgen", ] [[package]] @@ -214,9 +179,9 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hybrid-array" -version = "0.2.3" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2d35805454dc9f8662a98d6d61886ffe26bd465f5960e0e55345c70d5c0d2a9" +checksum = "9155a582abd142abc056962c29e3ce5ff2ad5469f4246b537ed42c5deba857da" dependencies = [ "typenum", ] @@ -230,15 +195,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" -dependencies = [ - "either", -] - [[package]] name = "itoa" version = "1.0.17" @@ -257,17 +213,17 @@ dependencies = [ [[package]] name = "jsonwebtoken" -version = "9.3.1" +version = "10.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" +checksum = "eba32bfb4ffdeaca3e34431072faf01745c9b26d25504aa7a6cf5684334fc4fc" dependencies = [ "base64", + "getrandom", "js-sys", - "pem", - "ring", "serde", "serde_json", - "simple_asn1", + "signature", + "zeroize", ] [[package]] @@ -358,41 +314,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", -] - -[[package]] -name = "num-bigint" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" -dependencies = [ - "num-integer", - "num-traits", -] - -[[package]] -name = "num-conv" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" - -[[package]] -name = "num-integer" -version = "0.1.46" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" -dependencies = [ - "num-traits", -] - -[[package]] -name = "num-traits" -version = "0.2.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" -dependencies = [ - "autocfg", + "windows-sys", ] [[package]] @@ -402,20 +324,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" [[package]] -name = "paste" -version = "1.0.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" - -[[package]] -name = "pem" -version = "3.0.6" +name = "pastey" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" -dependencies = [ - "base64", - "serde_core", -] +checksum = "2ee67f1008b1ba2321834326597b8e186293b049a023cdef258527550b9935b4" [[package]] name = "pin-project-lite" @@ -423,12 +335,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" -[[package]] -name = "powerfmt" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" - [[package]] name = "proc-macro2" version = "1.0.106" @@ -440,9 +346,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.13.5" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" dependencies = [ "bytes", "prost-derive", @@ -450,12 +356,12 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.13.5" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools", "proc-macro2", "quote", "syn", @@ -540,18 +446,16 @@ checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" [[package]] name = "restate-sdk-shared-core" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc31df792d23376df8d3a6d4264ec3ce658bf68f0ec5296da43de8377c9352aa" +version = "0.10.0" +source = "git+https://github.com/restatedev/sdk-shared-core.git?rev=f6f6e4830226161a441dd7f1063e255cdc4052c1#f6f6e4830226161a441dd7f1063e255cdc4052c1" dependencies = [ "base64", "bs58", "bytes", "bytes-utils", "jsonwebtoken", - "paste", + "pastey", "prost", - "ring", "serde", "sha2", "strum", @@ -569,20 +473,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "ring" -version = "0.17.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" -dependencies = [ - "cc", - "cfg-if", - "getrandom", - "libc", - "untrusted", - "windows-sys 0.52.0", -] - [[package]] name = "rustc-hash" version = "2.1.2" @@ -646,9 +536,9 @@ dependencies = [ [[package]] name = "sha2" -version = "0.11.0-pre.3" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f33549bf3064b62478926aa89cbfc7c109aab66ae8f0d5d2ef839e482cc30d6" +checksum = "446ba717509524cb3f22f17ecc096f10f4822d76ab5c0b9822c5f9c284e825f4" dependencies = [ "cfg-if", "cpufeatures", @@ -677,15 +567,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] -name = "simple_asn1" -version = "0.6.4" +name = "signature" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d585997b0ac10be3c5ee635f1bab02d512760d14b7c468801ac8a01d9ae5f1d" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" dependencies = [ - "num-bigint", - "num-traits", - "thiserror", - "time", + "rand_core", ] [[package]] @@ -755,37 +642,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "time" -version = "0.3.47" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" -dependencies = [ - "deranged", - "itoa", - "num-conv", - "powerfmt", - "serde_core", - "time-core", - "time-macros", -] - -[[package]] -name = "time-core" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" - -[[package]] -name = "time-macros" -version = "0.2.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" -dependencies = [ - "num-conv", - "time-core", -] - [[package]] name = "tinyvec" version = "1.10.0" @@ -864,9 +720,9 @@ dependencies = [ [[package]] name = "typenum" -version = "1.19.0" +version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" +checksum = "40ce102ab67701b8526c123c1bab5cbe42d7040ccfd0f64af1a385808d2f43de" [[package]] name = "unicode-ident" @@ -874,12 +730,6 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" -[[package]] -name = "untrusted" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" - [[package]] name = "valuable" version = "0.1.1" @@ -943,15 +793,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" -[[package]] -name = "windows-sys" -version = "0.52.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" -dependencies = [ - "windows-targets", -] - [[package]] name = "windows-sys" version = "0.61.2" @@ -962,68 +803,24 @@ dependencies = [ ] [[package]] -name = "windows-targets" -version = "0.52.6" +name = "zeroize" +version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "zeroize_derive", ] [[package]] -name = "windows_aarch64_gnullvm" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" - -[[package]] -name = "windows_aarch64_msvc" -version = "0.52.6" +name = "zeroize_derive" +version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" - -[[package]] -name = "windows_i686_gnu" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" - -[[package]] -name = "windows_i686_gnullvm" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" - -[[package]] -name = "windows_i686_msvc" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" - -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "zmij" diff --git a/docs/INTERNALS.md b/docs/INTERNALS.md index b7aa312..d415087 100644 --- a/docs/INTERNALS.md +++ b/docs/INTERNALS.md @@ -59,7 +59,7 @@ lib/ ├── vm.rb VMWrapper — Ruby bridge to native VM └── workflow.rb Workflow class + main/handler DSL + .call/.send! ext/restate_internal/ -├── Cargo.toml Depends on restate-sdk-shared-core 0.7.0, magnus 0.7 +├── Cargo.toml Depends on restate-sdk-shared-core (git rev, cooperative suspensions), magnus 0.8 └── src/lib.rs Rust ↔ Ruby bindings (~1095 lines) spec/ @@ -472,6 +472,41 @@ the same fiber chain (which is guaranteed by Async's cooperative scheduling). | `sys_write_output_failure(failure)` | — | Write final handler error | | `sys_end` | — | Finalize invocation | | `is_replaying` | bool | Check if replaying from journal | +| `do_await(future)` | progress | Tree-aware progress driver — see [Combinators](#combinators) | + +--- + +## Combinators + +`Restate.all` and `Restate.race` are layered on top of a tree-shaped +`UnresolvedFuture` that the shared-core consumes via `do_await`. The Ruby +encoding mirrors the native `WasmUnresolvedFuture`: + +``` +Integer handle → Single(handle) +[:first_completed, [child, ...]] → race / wait_any +[:all_completed, [child, ...]] → all_settled (future combinator) +[:first_succeeded_or_all_failed,[child, ...]] → any (future combinator) +[:all_succeeded_or_first_failed,[child, ...]] → all +[:unknown, [child, ...]] → escape hatch when shape isn't known +``` + +Children may be raw handles (leaves) or nested combinator pairs. The native +extension (`ext/restate_internal/src/lib.rs::parse_unresolved_future`) parses +the structure into `restate_sdk_shared_core::UnresolvedFuture` and hands it to +`CoreVM::do_await`. + +Why the tree matters: the shared-core uses the combinator shape to decide when +the SDK can suspend. For `all`, suspension waits until no pending child can +possibly complete; for `race`, suspension waits until none is in flight. The +legacy flat `do_progress(handles)` path still works — it wraps the handle list +into `FirstCompleted` internally — so non-combinator callers stay unchanged. + +The Ruby-side public API is in `Restate.all` / `Restate.race` +(`lib/restate.rb`), implemented in `Server::Context#all` / `#race` +(`lib/restate/server/context.rb`). Both route through +`Server::Context#wait_combined`, which shares the progress-loop body with the +flat `poll_or_cancel`. --- diff --git a/docs/USER_GUIDE.md b/docs/USER_GUIDE.md index 9601903..62d3234 100644 --- a/docs/USER_GUIDE.md +++ b/docs/USER_GUIDE.md @@ -371,9 +371,43 @@ futures = tasks.map { |t| Restate.service_call(Worker, :process, t) } results = futures.map(&:await) ``` -### Wait Any (Racing Futures) +### Combinators (`Restate.all`, `Restate.race`, `Restate.wait_any`) -Wait for the first future to complete out of several. +The SDK ships three ways to wait on multiple durable futures. All three are +*cooperative*: the shared-core sees the combinator shape and uses it to make +better suspension decisions during replay. + +**`Restate.all(*futures) -> Array`** — wait for every future to settle. +Returns the values in input order. Short-circuits on the first +`TerminalError` (any in-flight futures remain in the journal — they're +durable, so they don't need cancellation). Semantics match JS `Promise.all`. + +```ruby +results = Restate.all( + Restate.service_call(ServiceA, :work, arg1), + Restate.service_call(ServiceB, :work, arg2), + Restate.run('local-step') { compute() } +) +``` + +`Restate.all` accepts either splat futures or a single Array — both forms +behave identically. + +**`Restate.race(*futures) -> value`** — wait for the first future to +settle and return its value. Raises if the winning future failed. Useful +for timeouts: + +```ruby +result = Restate.race( + Restate.service_call(Worker, :process, task), + Restate.sleep(30) # 30-second deadline +) +``` + +**`Restate.wait_any(*futures) -> [completed, remaining]`** — lower-level +variant that just blocks until at least one future is ready and returns +the partition. Use this when you want to peek and continue rather than +commit to a single winner: ```ruby future_a = Restate.service_call(ServiceA, :slow, arg) @@ -1274,6 +1308,8 @@ Restate.resolve_promise(name, payload) Restate.reject_promise(name, message, code: 500) # Futures +Restate.all(*futures) -> Array +Restate.race(*futures) -> value Restate.wait_any(*futures) -> [completed, remaining] # Metadata diff --git a/etc/run-integration-tests.sh b/etc/run-integration-tests.sh index 8ee09e9..b23f153 100755 --- a/etc/run-integration-tests.sh +++ b/etc/run-integration-tests.sh @@ -11,7 +11,7 @@ set -euo pipefail # ./etc/run-integration-tests.sh --skip-build # run tests only (reuse existing image) REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)" -SDK_TEST_SUITE_VERSION="v1.0" +SDK_TEST_SUITE_VERSION="v2.1" JAR_URL="https://github.com/restatedev/e2e/releases/download/${SDK_TEST_SUITE_VERSION}/sdk-tests.jar" JAR_PATH="${REPO_ROOT}/tmp/sdk-tests.jar" SERVICE_IMAGE="restatedev/test-services-ruby" @@ -60,6 +60,6 @@ docker run --rm \ --exclusions-file=/opt/exclusions.yaml \ --service-container-env-file=/opt/service.env \ --report-dir=/opt/test-report \ - "${SERVICE_IMAGE}" + --service-container-image="${SERVICE_IMAGE}" echo "==> Done. Test report: ${REPORT_DIR}" diff --git a/examples/service_communication.rb b/examples/service_communication.rb index d65c3a3..e7c54ed 100644 --- a/examples/service_communication.rb +++ b/examples/service_communication.rb @@ -12,6 +12,8 @@ # - Service.send!.handler(arg) — fluent fire-and-forget (optionally delayed) # - Restate.service_call / Restate.service_send — explicit RPC (same thing, verbose) # - Fan-out/fan-in — launch concurrent calls, collect results +# - Restate.all — wait for every future, short-circuit on terminal failure +# - Restate.race — return the first future to settle (Promise.race semantics) # - Restate.wait_any — race multiple futures, handle first completer # - Restate.awakeable — pause until an external system calls back # @@ -48,15 +50,25 @@ class FanOut < Restate::Service { 'results' => results } end - # Race two calls and return the first result. + # Wait for every parallel call to finish. Returns an Array in input order, + # short-circuiting with a TerminalError if any one call fails. + handler def all(tasks) + futures = tasks.map { |task| Worker.call.process(task) } + Restate.all(*futures) + end + + # Race two calls and return the first to settle. handler def race(tasks) - futures = tasks.map do |task| - Worker.call.process(task) - end + futures = tasks.map { |task| Worker.call.process(task) } + Restate.race(*futures) + end - # wait_any returns [completed, remaining] - completed, _remaining = Restate.wait_any(*futures) - completed.first.await + # Race a call against a sleep — handy for hard deadlines. + handler def race_with_deadline(task) + Restate.race( + Worker.call.process(task), + Restate.sleep(30) # 30-second deadline (winner is `nil` if the timer fires) + ) end # Awakeable: pause until an external system resolves the callback. diff --git a/ext/restate_internal/Cargo.toml b/ext/restate_internal/Cargo.toml index 46348cb..fac1222 100644 --- a/ext/restate_internal/Cargo.toml +++ b/ext/restate_internal/Cargo.toml @@ -12,5 +12,5 @@ doc = false [dependencies] magnus = { version = "0.8", features = ["rb-sys"] } rb-sys = { version = "0.9", features = ["stable-api-compiled-fallback"] } -restate-sdk-shared-core = { version = "=0.7.0", features = ["request_identity", "sha2_random_seed"] } +restate-sdk-shared-core = { git = "https://github.com/restatedev/sdk-shared-core.git", rev = "f6f6e4830226161a441dd7f1063e255cdc4052c1", features = ["request_identity", "sha2_random_seed"] } tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } diff --git a/ext/restate_internal/src/lib.rs b/ext/restate_internal/src/lib.rs index 96d3397..4f494d9 100644 --- a/ext/restate_internal/src/lib.rs +++ b/ext/restate_internal/src/lib.rs @@ -1,14 +1,13 @@ use magnus::{ function, method, - prelude::*, value::ReprValue, - Error, ExceptionClass, Module, Object, RArray, RString, Ruby, Value, + Error, ExceptionClass, Module, Object, RArray, RString, Ruby, Symbol, TryConvert, Value, }; use restate_sdk_shared_core::{ - CallHandle, CoreVM, DoProgressResponse, Error as CoreError, Header, IdentityVerifier, Input, + AwaitResponse, CallHandle, CoreVM, Error as CoreError, Header, IdentityVerifier, Input, NonEmptyValue, NotificationHandle, ResponseHead, RetryPolicy, RunExitResult, - TakeOutputResult, Target, TerminalFailure, VMOptions, Value as CoreValue, VM, - CANCEL_NOTIFICATION_HANDLE, + TakeOutputResult, Target, TerminalFailure, UnresolvedFuture, VMOptions, Value as CoreValue, + VM, CANCEL_NOTIFICATION_HANDLE, }; use std::cell::RefCell; use std::fmt; @@ -438,30 +437,74 @@ impl RbVM { fn do_progress(&self, handles: RArray) -> Result { let ruby = Ruby::get().map_err(|_| Error::new(vm_error_class(), "Ruby not available"))?; let handle_vec: Vec = handles.to_vec()?; - let notification_handles: Vec = - handle_vec.into_iter().map(NotificationHandle::from).collect(); + let future = unresolved_future_from_handles(handle_vec); - let res = self.vm.borrow_mut().do_progress(notification_handles); + let res = self.vm.borrow_mut().do_await(future); match res { Err(e) if e.is_suspended_error() => Ok(ruby.into_value(RbSuspended)), Err(e) => Err(core_error_to_magnus(e)), - Ok(DoProgressResponse::AnyCompleted) => { + Ok(AwaitResponse::AnyCompleted) => { Ok(ruby.into_value(RbDoProgressAnyCompleted)) } - Ok(DoProgressResponse::ReadFromInput) => { - Ok(ruby.into_value(RbDoProgressReadFromInput)) + Ok(AwaitResponse::ExecuteRun(handle)) => Ok(ruby.into_value( + RbDoProgressExecuteRun { + handle: handle.into(), + }, + )), + Ok(AwaitResponse::CancelSignalReceived) => { + Ok(ruby.into_value(RbDoProgressCancelSignalReceived)) + } + Ok(AwaitResponse::WaitingExternalProgress { waiting_input, .. }) => { + // If we still expect more input, surface ReadFromInput so the Ruby side + // dequeues from the input queue (which carries body chunks, eof, and + // run-completion signals). When input is closed, only a pending run + // can wake us, hence DoWaitForPendingRun. + if waiting_input { + Ok(ruby.into_value(RbDoProgressReadFromInput)) + } else { + Ok(ruby.into_value(RbDoWaitForPendingRun)) + } + } + } + } + + // Cooperative-suspension variant of do_progress. Accepts a tree describing a + // combinator over notification handles; the shared-core uses the tree shape to + // decide when the whole combinator can make progress. + // + // The Ruby-side encoding is: + // Integer handle → Single(handle) + // [:first_completed, [child, ...]] → race / wait_any + // [:all_completed, [child, ...]] → all_settled + // [:first_succeeded_or_all_failed, [child, ...]] → any + // [:all_succeeded_or_first_failed, [child, ...]] → all + // [:unknown, [child, ...]] → unknown combinator + fn do_await(&self, future_value: Value) -> Result { + let ruby = Ruby::get().map_err(|_| Error::new(vm_error_class(), "Ruby not available"))?; + let future = parse_unresolved_future(future_value)?; + let res = self.vm.borrow_mut().do_await(future); + + match res { + Err(e) if e.is_suspended_error() => Ok(ruby.into_value(RbSuspended)), + Err(e) => Err(core_error_to_magnus(e)), + Ok(AwaitResponse::AnyCompleted) => { + Ok(ruby.into_value(RbDoProgressAnyCompleted)) } - Ok(DoProgressResponse::ExecuteRun(handle)) => Ok(ruby.into_value( + Ok(AwaitResponse::ExecuteRun(handle)) => Ok(ruby.into_value( RbDoProgressExecuteRun { handle: handle.into(), }, )), - Ok(DoProgressResponse::CancelSignalReceived) => { + Ok(AwaitResponse::CancelSignalReceived) => { Ok(ruby.into_value(RbDoProgressCancelSignalReceived)) } - Ok(DoProgressResponse::WaitingPendingRun) => { - Ok(ruby.into_value(RbDoWaitForPendingRun)) + Ok(AwaitResponse::WaitingExternalProgress { waiting_input, .. }) => { + if waiting_input { + Ok(ruby.into_value(RbDoProgressReadFromInput)) + } else { + Ok(ruby.into_value(RbDoWaitForPendingRun)) + } } } } @@ -590,9 +633,12 @@ impl RbVM { handler, key: key_opt, idempotency_key: idem_opt, + scope: None, + limit_key: None, headers: hdr_vec, }, bytes.into(), + None, Default::default(), ) .map(Into::into) @@ -639,6 +685,8 @@ impl RbVM { handler, key: key_opt, idempotency_key: idem_opt, + scope: None, + limit_key: None, headers: hdr_vec, }, bytes.into(), @@ -648,6 +696,7 @@ impl RbVM { .expect("Duration since unix epoch cannot fail") + Duration::from_millis(millis) }), + None, Default::default(), ) .map(|s| s.invocation_id_notification_handle.into()) @@ -926,6 +975,70 @@ Or remove the begin/rescue altogether if you don't need it. // ── Helpers ── +// Recursive Ruby → UnresolvedFuture parser. Accepts an Integer handle, or a +// [Symbol, Array[children]] pair encoding a combinator node. See the do_await +// docstring for the variant tags. +fn parse_unresolved_future(value: Value) -> Result { + let ruby = Ruby::get().map_err(|_| Error::new(vm_error_class(), "Ruby not available"))?; + + if let Ok(handle) = u32::try_convert(value) { + return Ok(UnresolvedFuture::Single(NotificationHandle::from(handle))); + } + + let pair = RArray::try_convert(value).map_err(|_| { + Error::new( + ruby.exception_arg_error(), + "UnresolvedFuture must be a Integer handle or [Symbol, Array] pair", + ) + })?; + if pair.len() != 2 { + return Err(Error::new( + ruby.exception_arg_error(), + "UnresolvedFuture combinator pair must be [Symbol, Array]", + )); + } + let tag: Symbol = pair.entry(0)?; + let children_arr: RArray = pair.entry(1)?; + let mut children = Vec::with_capacity(children_arr.len()); + for child in children_arr.into_iter() { + children.push(parse_unresolved_future(child)?); + } + let tag_name = tag.name().map_err(|_| { + Error::new(ruby.exception_arg_error(), "UnresolvedFuture tag is not a Symbol") + })?; + match tag_name.as_ref() { + "first_completed" => Ok(UnresolvedFuture::FirstCompleted(children)), + "all_completed" => Ok(UnresolvedFuture::AllCompleted(children)), + "first_succeeded_or_all_failed" => { + Ok(UnresolvedFuture::FirstSucceededOrAllFailed(children)) + } + "all_succeeded_or_first_failed" => { + Ok(UnresolvedFuture::AllSucceededOrFirstFailed(children)) + } + "unknown" => Ok(UnresolvedFuture::Unknown(children)), + other => Err(Error::new( + ruby.exception_arg_error(), + format!("Unknown UnresolvedFuture combinator: {other}"), + )), + } +} + +// Wraps a flat list of notification handles into the tree-shaped UnresolvedFuture +// the shared-core now expects. Mirrors the legacy "wait for any of these" semantics: +// a single handle becomes Single, multiple handles become FirstCompleted(Single, ...). +fn unresolved_future_from_handles(handles: Vec) -> UnresolvedFuture { + match handles.as_slice() { + [] => UnresolvedFuture::FirstCompleted(Vec::new()), + [h] => UnresolvedFuture::Single(NotificationHandle::from(*h)), + _ => UnresolvedFuture::FirstCompleted( + handles + .into_iter() + .map(|h| UnresolvedFuture::Single(NotificationHandle::from(h))) + .collect(), + ), + } +} + fn parse_headers_array(ary: RArray) -> Result, Error> { let mut result = Vec::new(); for item in ary.into_iter() { @@ -1094,6 +1207,7 @@ fn init(ruby: &Ruby) -> Result<(), Error> { vm_class.define_method("is_ready_to_execute", method!(RbVM::is_ready_to_execute, 0))?; vm_class.define_method("is_completed", method!(RbVM::is_completed, 1))?; vm_class.define_method("do_progress", method!(RbVM::do_progress, 1))?; + vm_class.define_method("do_await", method!(RbVM::do_await, 1))?; vm_class.define_method("take_notification", method!(RbVM::take_notification, 1))?; vm_class.define_method("sys_input", method!(RbVM::sys_input, 0))?; vm_class.define_method("sys_get_state", method!(RbVM::sys_get_state, 1))?; diff --git a/lib/restate.rb b/lib/restate.rb index 1d557bf..2057169 100644 --- a/lib/restate.rb +++ b/lib/restate.rb @@ -293,6 +293,34 @@ def wait_any(*futures) fetch_context!.wait_any(*futures) end + # Wait for every future to complete and return their values in input order. + # Short-circuits on the first +TerminalError+. Accepts either splat futures or + # a single Array. Semantics match JS +Promise.all+. + def all(*futures) + fetch_context!.all(*futures) + end + + # Wait for the first future to settle and return its value. Raises if the + # winning future failed. Accepts either splat futures or a single Array. + # Semantics match JS +Promise.race+. + def race(*futures) + fetch_context!.race(*futures) + end + + # Wait for the first successful future and return its value. Raises only if + # every future failed terminally. Accepts splat or single Array. + # Semantics match JS +Promise.any+. + def any(*futures) + fetch_context!.any(*futures) + end + + # Wait for every future to settle. Returns an Array of outcome descriptors + # (+{status: :fulfilled, value: ...}+ or +{status: :rejected, reason: ...}+), + # in input order. Semantics match JS +Promise.allSettled+. + def all_settled(*futures) + fetch_context!.all_settled(*futures) + end + # ── Request metadata ── # Returns metadata about the current invocation (id, headers, raw body). diff --git a/lib/restate/durable_future.rb b/lib/restate/durable_future.rb index 422235f..c841b0c 100644 --- a/lib/restate/durable_future.rb +++ b/lib/restate/durable_future.rb @@ -15,15 +15,16 @@ def initialize(ctx, handle, serde: nil) @value = nil end - # Block until the result is available and return it. Caches across calls. + # Block until the result is available and return it. Caches across calls, + # including failures — a second await on a failed future re-raises the + # same +TerminalError+ rather than re-fetching from the VM (the notification + # is single-shot). # # @return [Object] the deserialized result def await - unless @resolved - raw = @ctx.resolve_handle(@handle) - @value = @serde ? @serde.deserialize(raw) : raw - @resolved = true - end + resolve! unless @resolved + raise @error if @error + @value end @@ -33,6 +34,17 @@ def await def completed? @resolved || @ctx.completed?(@handle) end + + private + + def resolve! + raw = @ctx.resolve_handle(@handle) + @value = @serde ? @serde.deserialize(raw) : raw + rescue TerminalError => e + @error = e + ensure + @resolved = true + end end # A durable future for service/object/workflow calls. @@ -48,19 +60,31 @@ def initialize(ctx, result_handle, invocation_id_handle, output_serde:) end # Block until the result is available and return it. Deserializes via +output_serde+. + # Caches both successes and TerminalError failures across calls. def await - unless @resolved - raw = @ctx.resolve_handle(@handle) - @value = if raw.nil? || @output_serde.nil? - raw - else - @output_serde.deserialize(raw) - end - @resolved = true - end + resolve_call! unless @resolved + raise @error if @error + @value end + private + + def resolve_call! + raw = @ctx.resolve_handle(@handle) + @value = if raw.nil? || @output_serde.nil? + raw + else + @output_serde.deserialize(raw) + end + rescue TerminalError => e + @error = e + ensure + @resolved = true + end + + public + # Returns the invocation ID of the remote call. Lazily resolved. # # @return [String] the invocation ID diff --git a/lib/restate/server/context.rb b/lib/restate/server/context.rb index 164f1e7..a649314 100644 --- a/lib/restate/server/context.rb +++ b/lib/restate/server/context.rb @@ -169,6 +169,68 @@ def wait_any(*futures) futures.partition(&:completed?) end + # Wait for all futures to complete; return their values in input order. + # + # Semantics match JS +Promise.all+: short-circuit on the first +TerminalError+, + # leaving any still-pending futures in the journal. The shared-core uses the + # combinator shape to decide when to suspend, so this is more precise than a + # naive loop over individual awaits. + def all(*futures) + futures = futures.first if futures.length == 1 && futures.first.is_a?(Array) + return [] if futures.empty? + + wait_combined([:all_succeeded_or_first_failed, futures.map(&:handle)]) + futures.map(&:await) + end + + # Wait for the first future to settle; return its value. Raises +TerminalError+ + # if the winning future failed. Semantics match JS +Promise.race+. + def race(*futures) + futures = futures.first if futures.length == 1 && futures.first.is_a?(Array) + raise ArgumentError, 'race requires at least one future' if futures.empty? + + wait_combined([:first_completed, futures.map(&:handle)]) + futures.find(&:completed?).await + end + + # Wait for the first future to succeed; return its value. Raises a +TerminalError+ + # only if every future fails. Semantics match JS +Promise.any+. + def any(*futures) + futures = futures.first if futures.length == 1 && futures.first.is_a?(Array) + raise ArgumentError, 'any requires at least one future' if futures.empty? + + wait_combined([:first_succeeded_or_all_failed, futures.map(&:handle)]) + + errors = [] + futures.each do |f| + next unless f.completed? + + begin + return f.await + rescue TerminalError => e + errors << e + end + end + raise TerminalError.new("all futures failed: #{errors.map(&:message).join('; ')}", + status_code: 500) + end + + # Wait for every future to settle and return outcome descriptors, in input order. + # Each entry is +{ status: :fulfilled, value: ... }+ or + # +{ status: :rejected, reason: TerminalError }+. Semantics match JS +Promise.allSettled+. + def all_settled(*futures) + futures = futures.first if futures.length == 1 && futures.first.is_a?(Array) + return [] if futures.empty? + + wait_combined([:all_completed, futures.map(&:handle)]) + + futures.map do |f| + { status: :fulfilled, value: f.await } + rescue TerminalError => e + { status: :rejected, reason: e } + end + end + # ── Durable run (side effect) ── # Executes a durable side effect. The block runs at most once; its result is @@ -412,12 +474,26 @@ def poll_and_take(handle, &) end def poll_or_cancel(handles) + progress_loop { @vm.do_progress(handles) } + end + + # Drive progress over a combinator tree. Returns when the combinator + # logically completes (the shared-core decides based on the tree shape). + # +future_tree+ follows the encoding documented in lib/restate/vm.rb#do_await. + def wait_combined(future_tree) + progress_loop { @vm.do_await(future_tree) } + end + + # Shared progress-loop body for the flat (do_progress) and tree (do_await) + # entry points. The block makes one VM call per iteration and returns the + # response; this loop interprets it. + def progress_loop loop do flush_output - response = @vm.do_progress(handles) + response = yield if response.is_a?(Exception) - LOGGER.error("Exception in do_progress: #{response}") + LOGGER.error("Exception in progress loop: #{response}") raise InternalError end diff --git a/lib/restate/vm.rb b/lib/restate/vm.rb index 7d4618c..0d1af9d 100644 --- a/lib/restate/vm.rb +++ b/lib/restate/vm.rb @@ -97,6 +97,16 @@ def do_progress(handles) e end + # Tree-aware variant of do_progress. +future+ is either an Integer handle (Single) + # or a [tag_symbol, [children...]] pair (combinator). See lib/restate/combinator.rb + # and the Rust-side parse_unresolved_future for the supported tags. + def do_await(future) + result = @vm.do_await(future) + map_do_progress(result) + rescue Internal::VMError => e + e + end + def take_notification(handle) result = @vm.take_notification(handle) map_notification(result) diff --git a/sig/restate.rbs b/sig/restate.rbs index 66c2c89..84f658f 100644 --- a/sig/restate.rbs +++ b/sig/restate.rbs @@ -60,6 +60,14 @@ module Restate # ── Futures / Metadata / Control ── def self.wait_any: (*DurableFuture futures) -> [Array[DurableFuture], Array[DurableFuture]] + def self.all: (*DurableFuture futures) -> Array[untyped] + | (Array[DurableFuture] futures) -> Array[untyped] + def self.race: (*DurableFuture futures) -> untyped + | (Array[DurableFuture] futures) -> untyped + def self.any: (*DurableFuture futures) -> untyped + | (Array[DurableFuture] futures) -> untyped + def self.all_settled: (*DurableFuture futures) -> Array[Hash[Symbol, untyped]] + | (Array[DurableFuture] futures) -> Array[Hash[Symbol, untyped]] def self.request: () -> untyped def self.key: () -> String def self.cancel_invocation: (String invocation_id) -> void @@ -92,12 +100,18 @@ module Restate def initialize: (untyped ctx, Integer handle, ?serde: untyped) -> void def await: () -> untyped def completed?: () -> bool + + private + def resolve!: () -> void end class DurableCallFuture < DurableFuture def initialize: (untyped ctx, Integer result_handle, Integer invocation_id_handle, output_serde: untyped) -> void def invocation_id: () -> String def cancel: () -> void + + private + def resolve_call!: () -> void end class SendHandle diff --git a/sorbet/rbi/shims/restate_internal.rbi b/sorbet/rbi/shims/restate_internal.rbi index 8485580..e216e0f 100644 --- a/sorbet/rbi/shims/restate_internal.rbi +++ b/sorbet/rbi/shims/restate_internal.rbi @@ -14,6 +14,7 @@ module Restate def is_ready_to_execute?; end def sys_input; end def do_progress(handles); end + def do_await(future); end def take_output; end def take_notification(handle); end def is_completed(handle); end diff --git a/spec/harness_spec.rb b/spec/harness_spec.rb index b9ddd2b..325238c 100644 --- a/spec/harness_spec.rb +++ b/spec/harness_spec.rb @@ -171,6 +171,37 @@ class TestFluentOrchestrator < Restate::Service end end +# ── Combinator test services ────────────────────────────────── + +class TestCombinators < Restate::Service + handler def all_runs(_input) + futures = (1..3).map { |i| Restate.run("step-#{i}") { i * 10 } } + Restate.all(*futures) + end + + handler def all_empty(_input) + Restate.all + end + + handler def race_runs(_input) + fast = Restate.run('fast') { 'fast-result' } + slow = Restate.run('slow') { 'slow-result' } + Restate.race(fast, slow) + end + + handler def race_sleep_vs_value(_input) + quick = Restate.run('quick') { 'value' } + slow_timer = Restate.sleep(60) + Restate.race(quick, slow_timer) + end + + handler def all_short_circuits(_input) + ok = Restate.run('ok') { 'ok' } + bad = Restate.run('bad') { raise Restate::TerminalError.new('boom', status_code: 418) } + Restate.all(ok, bad) + end +end + # ── Signal test services ────────────────────────────────────── class TestSignal < Restate::Service @@ -231,7 +262,8 @@ def attach_invocation(base_url, invocation_id) TypedGreeter, MiddlewareTestService, TestDeclCounter, TestFluentWorker, TestFluentOrchestrator, OutboundTargetService, OutboundCallerService, - TestSignal + TestSignal, + TestCombinators ) do |endpoint| endpoint.use(TestHeaderMiddleware) endpoint.use_outbound(TestOutboundMiddleware) @@ -427,6 +459,39 @@ def attach_invocation(base_url, invocation_id) expect(result.body).to include("boom") end + # ── Combinators ── + + it "resolves all futures with Restate.all and returns values in input order" do + response = post_json(@harness.ingress_url, "/TestCombinators/all_runs", nil) + expect(response.code).to eq("200") + expect(JSON.parse(response.body)).to eq([10, 20, 30]) + end + + it "returns an empty array when Restate.all is called with no futures" do + response = post_json(@harness.ingress_url, "/TestCombinators/all_empty", nil) + expect(response.code).to eq("200") + expect(JSON.parse(response.body)).to eq([]) + end + + it "returns the first settled future with Restate.race" do + response = post_json(@harness.ingress_url, "/TestCombinators/race_runs", nil) + expect(response.code).to eq("200") + # Either run can win; both produce valid results. + expect(%w[fast-result slow-result]).to include(JSON.parse(response.body)) + end + + it "races a quick run against a long sleep and resolves with the run" do + response = post_json(@harness.ingress_url, "/TestCombinators/race_sleep_vs_value", nil) + expect(response.code).to eq("200") + expect(JSON.parse(response.body)).to eq('value') + end + + it "short-circuits Restate.all with the first TerminalError" do + response = post_json(@harness.ingress_url, "/TestCombinators/all_short_circuits", nil) + expect(response.code).to eq("418") + expect(response.body).to include('boom') + end + it "delivers two independently named signals" do inv_id = send_async(@harness.ingress_url, "/TestSignal/wait_for_two", nil) diff --git a/test-services/services/test_utils.rb b/test-services/services/test_utils.rb index 85f7d70..d2ad713 100644 --- a/test-services/services/test_utils.rb +++ b/test-services/services/test_utils.rb @@ -41,4 +41,14 @@ def rawEcho(input) # rubocop:disable Naming/MethodName futures.each(&:await) nil end + + handler def resolveSignal(req) # rubocop:disable Naming/MethodName + Restate.resolve_signal(req['invocationId'], req['signalName'], req['value']) + nil + end + + handler def rejectSignal(req) # rubocop:disable Naming/MethodName + Restate.reject_signal(req['invocationId'], req['signalName'], req['reason']) + nil + end end diff --git a/test-services/services/virtual_object_command_interpreter.rb b/test-services/services/virtual_object_command_interpreter.rb index 6009199..e494ed0 100644 --- a/test-services/services/virtual_object_command_interpreter.rb +++ b/test-services/services/virtual_object_command_interpreter.rb @@ -3,14 +3,21 @@ require 'json' require 'restate' +# Builds a DurableFuture for the given awaitable command type. The interpreter +# uses this to feed children of combinator commands. def create_future_for_command(cmd) # rubocop:disable Metrics/MethodLength case cmd['type'] when 'createAwakeable' awk_id, future = Restate.awakeable Restate.set("awk-#{cmd['awakeableKey']}", awk_id) [:awakeable, future] + when 'createSignal' + [:signal, Restate.signal(cmd['signalName'])] when 'sleep' [:sleep, Restate.sleep(cmd['timeoutMillis'] / 1000.0)] + when 'runReturns' + value = cmd['value'] + [:run, Restate.run('run returns value command') { value }] when 'runThrowTerminalException' reason = cmd['reason'] future = Restate.run('run should fail command') do @@ -21,14 +28,22 @@ def create_future_for_command(cmd) # rubocop:disable Metrics/MethodLength end def await_future_result(type, future) - # DurableFuture#await already deserializes via JsonSerde, so no extra JSON.parse needed. - # For sleep futures, the raw value is nil/empty — return a marker string. - return 'sleep' if type == :sleep - + # Always block until the future settles. DurableFuture#await deserializes via + # JsonSerde so no extra JSON.parse is needed. Sleep futures resolve to Void; + # surface them as the literal 'sleep' marker the test suite expects. future.await + type == :sleep ? 'sleep' : future.await +end + +# Helper used by awaitFirstSucceededOrAllFailed to identify the winning future. +# +.await+ on a future that already failed re-raises, so we swallow that here. +def safely_equal(future, value) + future.await == value +rescue Restate::TerminalError + false end -class VirtualObjectCommandInterpreter < Restate::VirtualObject +class VirtualObjectCommandInterpreter < Restate::VirtualObject # rubocop:disable Metrics/ClassLength shared def getResults # rubocop:disable Naming/MethodName Restate.get('results') || [] end @@ -123,6 +138,53 @@ class VirtualObjectCommandInterpreter < Restate::VirtualObject end end raise Restate::TerminalError, 'All commands failed' unless found + + when 'awaitFirstCompleted' + # JS Promise.race semantics — first to settle (success or failure). + # Uses the cooperative-suspension AllCompleted variant via Restate.race. + entries = cmd['commands'].map { |c| create_future_for_command(c) } + futures = entries.map(&:last) + Restate.wait_any(*futures) # ensure at least one is ready + winner = futures.find(&:completed?) + idx = futures.index(winner) + type, future = entries[idx] + result = await_future_result(type, future) + + when 'awaitFirstSucceededOrAllFailed' + # JS Promise.any semantics. The shared-core variant collects failures + # and returns when one succeeds or all have failed. The interpreter + # cares about the winning value; the sleep awaitable resolves to nil, + # so we substitute the 'sleep' marker when that's what won. + entries = cmd['commands'].map { |c| create_future_for_command(c) } + futures = entries.map(&:last) + winning_value = Restate.any(*futures) + winning_type = entries.zip(futures) + .find { |(_t, _f), fut| fut.completed? && safely_equal(fut, winning_value) } + &.first + &.first + result = winning_type == :sleep ? 'sleep' : winning_value + + when 'awaitAllCompleted' + # JS Promise.allSettled semantics — wait for every future and join the + # per-future outcome strings with '|'. Each entry is tagged 'ok:' or + # 'err:' so the assertions can distinguish successes from failures. + entries = cmd['commands'].map { |c| create_future_for_command(c) } + futures = entries.map(&:last) + Restate.all_settled(*futures) + parts = entries.map do |type, future| + "ok:#{await_future_result(type, future)}" + rescue Restate::TerminalError => e + "err:#{e.message}" + end + result = parts.join('|') + + when 'awaitAllSucceededOrFirstFailed' + # JS Promise.all semantics — short-circuit on first terminal failure; + # otherwise return all values joined with '|'. + entries = cmd['commands'].map { |c| create_future_for_command(c) } + futures = entries.map(&:last) + Restate.all(*futures) # raises on first failure + result = entries.map { |type, future| await_future_result(type, future).to_s }.join('|') end last_results = Restate.get('results') || [] From 3eca124e8ec8ae3482ffb73c398105ed4b87b3ab Mon Sep 17 00:00:00 2001 From: igalshilman Date: Fri, 22 May 2026 11:23:35 +0200 Subject: [PATCH 2/3] Make combinators lazy, advertise protocol v7 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Restate.all/race/any/all_settled now return a CombinedFuture instead of blocking. The future carries a tree shape (variant + children) that the shared-core uses for cooperative-suspension decisions and that composes naturally — e.g. Restate.race(Restate.all(a, b), c).await. - CombinedFuture caches both values and TerminalErrors. The all-succeeded variant scans settled children to surface short-circuit failures so it doesn't block awaiting children still in flight after one fails. - Moved Server::Context#wait_combined above the private boundary so CombinedFuture can drive it via @ctx. - Bumped maxProtocolVersion 6 → 7 (wire bits already pass through the native ext; v7 will activate once the runtime negotiates it). - Interpreter, harness specs, example, RBS and docs updated to .await. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/INTERNALS.md | 2 +- docs/USER_GUIDE.md | 48 +++++--- examples/service_communication.rb | 12 +- lib/restate/discovery.rb | 2 +- lib/restate/durable_future.rb | 116 ++++++++++++++++++ lib/restate/server/context.rb | 73 ++++------- sig/restate.rbs | 28 +++-- spec/harness_spec.rb | 41 ++++++- .../virtual_object_command_interpreter.rb | 16 ++- 9 files changed, 243 insertions(+), 95 deletions(-) diff --git a/docs/INTERNALS.md b/docs/INTERNALS.md index d415087..afa9992 100644 --- a/docs/INTERNALS.md +++ b/docs/INTERNALS.md @@ -285,7 +285,7 @@ Generates the JSON manifest returned at `GET /discover`. Maps internal types to - Service kinds: `service`→`SERVICE`, `object`→`VIRTUAL_OBJECT`, `workflow`→`WORKFLOW` - Handler kinds: `exclusive`→`EXCLUSIVE`, `shared`→`SHARED`, `workflow`→`WORKFLOW` - Protocol mode: `bidi`→`BIDI_STREAM`, `request_response`→`REQUEST_RESPONSE` -- Protocol versions: min=5, max=5 +- Protocol versions: min=5, max=7 ### Test Harness (`lib/restate/testing.rb`) diff --git a/docs/USER_GUIDE.md b/docs/USER_GUIDE.md index 62d3234..fd661f4 100644 --- a/docs/USER_GUIDE.md +++ b/docs/USER_GUIDE.md @@ -371,37 +371,43 @@ futures = tasks.map { |t| Restate.service_call(Worker, :process, t) } results = futures.map(&:await) ``` -### Combinators (`Restate.all`, `Restate.race`, `Restate.wait_any`) +### Combinators (`Restate.all`, `Restate.race`, `Restate.any`, `Restate.all_settled`) -The SDK ships three ways to wait on multiple durable futures. All three are -*cooperative*: the shared-core sees the combinator shape and uses it to make -better suspension decisions during replay. - -**`Restate.all(*futures) -> Array`** — wait for every future to settle. -Returns the values in input order. Short-circuits on the first -`TerminalError` (any in-flight futures remain in the journal — they're -durable, so they don't need cancellation). Semantics match JS `Promise.all`. +The combinator methods return lazy `CombinedFuture` objects — nothing blocks +until you call `.await`. Because they're lazy futures, they **compose**: you +can pass one combinator into another, and the shared-core sees the full tree +and uses the combinator shape to make better suspension decisions. ```ruby +# Single-level: wait for all three calls. results = Restate.all( Restate.service_call(ServiceA, :work, arg1), Restate.service_call(ServiceB, :work, arg2), Restate.run('local-step') { compute() } -) +).await + +# Compose: race an all-of group against a deadline. +group = Restate.all(future_a, future_b, future_c) +Restate.race(group, Restate.sleep(30)).await ``` -`Restate.all` accepts either splat futures or a single Array — both forms -behave identically. +All four accept either splat futures or a single `Array` and return a +`CombinedFuture`. Call `.await` to resolve. -**`Restate.race(*futures) -> value`** — wait for the first future to -settle and return its value. Raises if the winning future failed. Useful -for timeouts: +| Method | JS analog | Awaited result | +|-------------------------|-------------------------|-------------------------------------------------------------------| +| `Restate.all(*fs)` | `Promise.all` | Array of values in input order; raises on first `TerminalError` | +| `Restate.race(*fs)` | `Promise.race` | Value of the first future to settle (raises if it failed) | +| `Restate.any(*fs)` | `Promise.any` | Value of the first success; raises only if every future failed | +| `Restate.all_settled(*fs)` | `Promise.allSettled` | Array of `{status: :fulfilled, value:}` / `{status: :rejected, reason:}` | + +Common pattern — timeout via `race`: ```ruby result = Restate.race( Restate.service_call(Worker, :process, task), Restate.sleep(30) # 30-second deadline -) +).await ``` **`Restate.wait_any(*futures) -> [completed, remaining]`** — lower-level @@ -1307,10 +1313,12 @@ Restate.peek_promise(name) -> value | nil Restate.resolve_promise(name, payload) Restate.reject_promise(name, message, code: 500) -# Futures -Restate.all(*futures) -> Array -Restate.race(*futures) -> value -Restate.wait_any(*futures) -> [completed, remaining] +# Futures (combinators are lazy — call .await to resolve) +Restate.all(*futures) -> CombinedFuture (awaits to Array) +Restate.race(*futures) -> CombinedFuture (awaits to value) +Restate.any(*futures) -> CombinedFuture (awaits to first successful value) +Restate.all_settled(*futures) -> CombinedFuture (awaits to Array of outcome Hashes) +Restate.wait_any(*futures) -> [completed, remaining] # eager, not a future # Metadata Restate.request -> Request{id, headers, body} diff --git a/examples/service_communication.rb b/examples/service_communication.rb index e7c54ed..b4b4a39 100644 --- a/examples/service_communication.rb +++ b/examples/service_communication.rb @@ -54,13 +54,13 @@ class FanOut < Restate::Service # short-circuiting with a TerminalError if any one call fails. handler def all(tasks) futures = tasks.map { |task| Worker.call.process(task) } - Restate.all(*futures) + Restate.all(*futures).await end # Race two calls and return the first to settle. handler def race(tasks) futures = tasks.map { |task| Worker.call.process(task) } - Restate.race(*futures) + Restate.race(*futures).await end # Race a call against a sleep — handy for hard deadlines. @@ -68,7 +68,13 @@ class FanOut < Restate::Service Restate.race( Worker.call.process(task), Restate.sleep(30) # 30-second deadline (winner is `nil` if the timer fires) - ) + ).await + end + + # Compose combinators: race an all-of group against a deadline. + handler def composed(tasks) + group = Restate.all(*tasks.map { |t| Worker.call.process(t) }) + Restate.race(group, Restate.sleep(30)).await end # Awakeable: pause until an external system resolves the callback. diff --git a/lib/restate/discovery.rb b/lib/restate/discovery.rb index c136f72..d8db808 100644 --- a/lib/restate/discovery.rb +++ b/lib/restate/discovery.rb @@ -41,7 +41,7 @@ def compute_discovery(endpoint, discovered_as) compact( protocolMode: protocol_mode, minProtocolVersion: 5, - maxProtocolVersion: 6, + maxProtocolVersion: 7, services: services ) end diff --git a/lib/restate/durable_future.rb b/lib/restate/durable_future.rb index c841b0c..495f383 100644 --- a/lib/restate/durable_future.rb +++ b/lib/restate/durable_future.rb @@ -102,6 +102,122 @@ def cancel end end + # A lazy combinator over one or more child futures. The child set can mix + # +DurableFuture+ leaves (with raw handles) and nested +CombinedFuture+ nodes. + # The shared-core uses the tree shape to make suspension decisions; nothing + # blocks until +.await+ is called, so combinators are composable: + # + # Restate.race(Restate.all(a, b), c).await + # + # Supported variants (mirroring +UnresolvedFuture+ in restate-sdk-shared-core): + # :first_completed → JS Promise.race + # :all_succeeded_or_first_failed → JS Promise.all + # :first_succeeded_or_all_failed → JS Promise.any + # :all_completed → JS Promise.allSettled + class CombinedFuture + VALID_VARIANTS = %i[ + first_completed + all_succeeded_or_first_failed + first_succeeded_or_all_failed + all_completed + ].freeze + + def initialize(ctx, variant, children) + raise ArgumentError, "unknown combinator variant: #{variant}" unless VALID_VARIANTS.include?(variant) + + @ctx = ctx + @variant = variant + @children = children + @resolved = false + @value = nil + @error = nil + end + + # Recursive tree representation the native +do_await+ binding consumes. + # Leaves are integer handles; inner nodes are +[variant, [child...]]+ pairs. + def tree + [@variant, @children.map { |c| c.is_a?(CombinedFuture) ? c.tree : c.handle }] + end + + # Block until this combinator settles per its variant. Caches results + # (including failures) across calls. + def await + resolve_combined! unless @resolved + raise @error if @error + + @value + end + + # Non-blocking introspection. True iff calling +.await+ is guaranteed not to + # block. Conservative for the variants that allow early-completion on failure + # (+:all_succeeded_or_first_failed+, +:first_succeeded_or_all_failed+) — we + # report false until every child is settled, because checking failure status + # of a leaf would require consuming its notification. + def completed? + return true if @resolved + + case @variant + when :first_completed + @children.any?(&:completed?) + else + @children.all?(&:completed?) + end + end + + private + + def resolve_combined! + @ctx.wait_combined(tree) + @value = finalize_value + rescue TerminalError => e + @error = e + ensure + @resolved = true + end + + def finalize_value + case @variant + when :first_completed then finalize_first_completed + when :all_succeeded_or_first_failed then finalize_all_succeeded + when :all_completed then finalize_all_completed + when :first_succeeded_or_all_failed then finalize_first_succeeded + end + end + + def finalize_first_completed + @children.find(&:completed?).await + end + + def finalize_all_succeeded + # Surface any settled-and-failed child first (short-circuit). After this + # scan, if no failure was raised, every child must be settled-and-success + # per AllSucceededOrFirstFailed semantics. + @children.each { |c| c.await if c.completed? } + @children.map(&:await) + end + + def finalize_all_completed + @children.map do |c| + { status: :fulfilled, value: c.await } + rescue TerminalError => e + { status: :rejected, reason: e } + end + end + + def finalize_first_succeeded + errors = [] # : Array[TerminalError] + @children.each do |c| + next unless c.completed? + + return c.await + rescue TerminalError => e + errors << e + end + raise TerminalError.new("all futures failed: #{errors.map(&:message).join('; ')}", + status_code: 500) + end + end + # A handle for fire-and-forget send operations. # Returned by +ctx.service_send+, +ctx.object_send+, +ctx.workflow_send+. class SendHandle diff --git a/lib/restate/server/context.rb b/lib/restate/server/context.rb index a649314..ab1fc53 100644 --- a/lib/restate/server/context.rb +++ b/lib/restate/server/context.rb @@ -169,66 +169,42 @@ def wait_any(*futures) futures.partition(&:completed?) end - # Wait for all futures to complete; return their values in input order. + # Build a lazy combinator over the given futures and return it as a + # +CombinedFuture+. Nothing blocks until +.await+ is called, so the + # combinators compose: +Restate.race(Restate.all(a, b), c).await+. # - # Semantics match JS +Promise.all+: short-circuit on the first +TerminalError+, - # leaving any still-pending futures in the journal. The shared-core uses the - # combinator shape to decide when to suspend, so this is more precise than a - # naive loop over individual awaits. + # Semantics match JS +Promise.all+ — when awaited, returns the values in + # input order, short-circuiting on the first +TerminalError+. def all(*futures) futures = futures.first if futures.length == 1 && futures.first.is_a?(Array) - return [] if futures.empty? - - wait_combined([:all_succeeded_or_first_failed, futures.map(&:handle)]) - futures.map(&:await) + CombinedFuture.new(self, :all_succeeded_or_first_failed, futures) end - # Wait for the first future to settle; return its value. Raises +TerminalError+ - # if the winning future failed. Semantics match JS +Promise.race+. + # Lazy combinator. Awaiting returns the value of the first future to settle, + # or raises if it settled with a +TerminalError+. Matches JS +Promise.race+. def race(*futures) futures = futures.first if futures.length == 1 && futures.first.is_a?(Array) raise ArgumentError, 'race requires at least one future' if futures.empty? - wait_combined([:first_completed, futures.map(&:handle)]) - futures.find(&:completed?).await + CombinedFuture.new(self, :first_completed, futures) end - # Wait for the first future to succeed; return its value. Raises a +TerminalError+ - # only if every future fails. Semantics match JS +Promise.any+. + # Lazy combinator. Awaiting returns the value of the first successful future; + # raises only if every future fails terminally. Matches JS +Promise.any+. def any(*futures) futures = futures.first if futures.length == 1 && futures.first.is_a?(Array) raise ArgumentError, 'any requires at least one future' if futures.empty? - wait_combined([:first_succeeded_or_all_failed, futures.map(&:handle)]) - - errors = [] - futures.each do |f| - next unless f.completed? - - begin - return f.await - rescue TerminalError => e - errors << e - end - end - raise TerminalError.new("all futures failed: #{errors.map(&:message).join('; ')}", - status_code: 500) + CombinedFuture.new(self, :first_succeeded_or_all_failed, futures) end - # Wait for every future to settle and return outcome descriptors, in input order. - # Each entry is +{ status: :fulfilled, value: ... }+ or - # +{ status: :rejected, reason: TerminalError }+. Semantics match JS +Promise.allSettled+. + # Lazy combinator. Awaiting waits for every future to settle and returns + # an Array of +{ status: :fulfilled, value: ... }+ or + # +{ status: :rejected, reason: TerminalError }+ entries, in input order. + # Matches JS +Promise.allSettled+. def all_settled(*futures) futures = futures.first if futures.length == 1 && futures.first.is_a?(Array) - return [] if futures.empty? - - wait_combined([:all_completed, futures.map(&:handle)]) - - futures.map do |f| - { status: :fulfilled, value: f.await } - rescue TerminalError => e - { status: :rejected, reason: e } - end + CombinedFuture.new(self, :all_completed, futures) end # ── Durable run (side effect) ── @@ -463,6 +439,14 @@ def key @invocation.key end + # Drive progress over a combinator tree. Returns when the combinator + # logically completes (the shared-core decides based on the tree shape). + # +future_tree+ follows the encoding documented in lib/restate/vm.rb#do_await. + # Public so +CombinedFuture#await+ can drive it via +@ctx.wait_combined+. + def wait_combined(future_tree) + progress_loop { @vm.do_await(future_tree) } + end + private # ── Progress loop ── @@ -477,13 +461,6 @@ def poll_or_cancel(handles) progress_loop { @vm.do_progress(handles) } end - # Drive progress over a combinator tree. Returns when the combinator - # logically completes (the shared-core decides based on the tree shape). - # +future_tree+ follows the encoding documented in lib/restate/vm.rb#do_await. - def wait_combined(future_tree) - progress_loop { @vm.do_await(future_tree) } - end - # Shared progress-loop body for the flat (do_progress) and tree (do_await) # entry points. The block makes one VM call per iteration and returns the # response; this loop interprets it. diff --git a/sig/restate.rbs b/sig/restate.rbs index 84f658f..de820e8 100644 --- a/sig/restate.rbs +++ b/sig/restate.rbs @@ -60,14 +60,10 @@ module Restate # ── Futures / Metadata / Control ── def self.wait_any: (*DurableFuture futures) -> [Array[DurableFuture], Array[DurableFuture]] - def self.all: (*DurableFuture futures) -> Array[untyped] - | (Array[DurableFuture] futures) -> Array[untyped] - def self.race: (*DurableFuture futures) -> untyped - | (Array[DurableFuture] futures) -> untyped - def self.any: (*DurableFuture futures) -> untyped - | (Array[DurableFuture] futures) -> untyped - def self.all_settled: (*DurableFuture futures) -> Array[Hash[Symbol, untyped]] - | (Array[DurableFuture] futures) -> Array[Hash[Symbol, untyped]] + def self.all: (*untyped futures) -> CombinedFuture + def self.race: (*untyped futures) -> CombinedFuture + def self.any: (*untyped futures) -> CombinedFuture + def self.all_settled: (*untyped futures) -> CombinedFuture def self.request: () -> untyped def self.key: () -> String def self.cancel_invocation: (String invocation_id) -> void @@ -114,6 +110,22 @@ module Restate def resolve_call!: () -> void end + class CombinedFuture + VALID_VARIANTS: Array[Symbol] + def initialize: (untyped ctx, Symbol variant, Array[untyped] children) -> void + def tree: () -> untyped + def await: () -> untyped + def completed?: () -> bool + + private + def resolve_combined!: () -> void + def finalize_value: () -> untyped + def finalize_first_completed: () -> untyped + def finalize_all_succeeded: () -> Array[untyped] + def finalize_all_completed: () -> Array[Hash[Symbol, untyped]] + def finalize_first_succeeded: () -> untyped + end + class SendHandle def initialize: (untyped ctx, Integer invocation_id_handle) -> void def invocation_id: () -> String diff --git a/spec/harness_spec.rb b/spec/harness_spec.rb index 325238c..8eb3d5b 100644 --- a/spec/harness_spec.rb +++ b/spec/harness_spec.rb @@ -176,29 +176,45 @@ class TestFluentOrchestrator < Restate::Service class TestCombinators < Restate::Service handler def all_runs(_input) futures = (1..3).map { |i| Restate.run("step-#{i}") { i * 10 } } - Restate.all(*futures) + Restate.all(*futures).await end handler def all_empty(_input) - Restate.all + Restate.all.await end handler def race_runs(_input) fast = Restate.run('fast') { 'fast-result' } slow = Restate.run('slow') { 'slow-result' } - Restate.race(fast, slow) + Restate.race(fast, slow).await end handler def race_sleep_vs_value(_input) quick = Restate.run('quick') { 'value' } slow_timer = Restate.sleep(60) - Restate.race(quick, slow_timer) + Restate.race(quick, slow_timer).await end handler def all_short_circuits(_input) ok = Restate.run('ok') { 'ok' } bad = Restate.run('bad') { raise Restate::TerminalError.new('boom', status_code: 418) } - Restate.all(ok, bad) + Restate.all(ok, bad).await + end + + # Composes a race of an all-combinator and a sleep — exercises the tree being + # passed end-to-end through the shared-core's cooperative-suspension logic. + handler def race_of_all_vs_sleep(_input) + a = Restate.run('a') { 'A' } + b = Restate.run('b') { 'B' } + inner = Restate.all(a, b) # CombinedFuture + Restate.race(inner, Restate.sleep(60)).await + end + + # all-of-races: each inner race resolves quickly, outer all returns both. + handler def all_of_races(_input) + left = Restate.race(Restate.run('l1') { 'L1' }, Restate.run('l2') { 'L2' }) + right = Restate.race(Restate.run('r1') { 'R1' }, Restate.run('r2') { 'R2' }) + Restate.all(left, right).await end end @@ -492,6 +508,21 @@ def attach_invocation(base_url, invocation_id) expect(response.body).to include('boom') end + it "composes race(all, sleep) — the inner all wins" do + response = post_json(@harness.ingress_url, "/TestCombinators/race_of_all_vs_sleep", nil) + expect(response.code).to eq("200") + expect(JSON.parse(response.body)).to eq(%w[A B]) + end + + it "composes all(race, race) — both inner races resolve" do + response = post_json(@harness.ingress_url, "/TestCombinators/all_of_races", nil) + expect(response.code).to eq("200") + body = JSON.parse(response.body) + expect(body.length).to eq(2) + expect(%w[L1 L2]).to include(body[0]) + expect(%w[R1 R2]).to include(body[1]) + end + it "delivers two independently named signals" do inv_id = send_async(@harness.ingress_url, "/TestSignal/wait_for_two", nil) diff --git a/test-services/services/virtual_object_command_interpreter.rb b/test-services/services/virtual_object_command_interpreter.rb index e494ed0..f0ed1c0 100644 --- a/test-services/services/virtual_object_command_interpreter.rb +++ b/test-services/services/virtual_object_command_interpreter.rb @@ -141,23 +141,21 @@ class VirtualObjectCommandInterpreter < Restate::VirtualObject # rubocop:disable when 'awaitFirstCompleted' # JS Promise.race semantics — first to settle (success or failure). - # Uses the cooperative-suspension AllCompleted variant via Restate.race. entries = cmd['commands'].map { |c| create_future_for_command(c) } futures = entries.map(&:last) - Restate.wait_any(*futures) # ensure at least one is ready + Restate.race(*futures).await winner = futures.find(&:completed?) idx = futures.index(winner) type, future = entries[idx] result = await_future_result(type, future) when 'awaitFirstSucceededOrAllFailed' - # JS Promise.any semantics. The shared-core variant collects failures - # and returns when one succeeds or all have failed. The interpreter - # cares about the winning value; the sleep awaitable resolves to nil, - # so we substitute the 'sleep' marker when that's what won. + # JS Promise.any semantics. The interpreter cares about the winning + # value; the sleep awaitable resolves to Void, so we substitute the + # 'sleep' marker when that's what won. entries = cmd['commands'].map { |c| create_future_for_command(c) } futures = entries.map(&:last) - winning_value = Restate.any(*futures) + winning_value = Restate.any(*futures).await winning_type = entries.zip(futures) .find { |(_t, _f), fut| fut.completed? && safely_equal(fut, winning_value) } &.first @@ -170,7 +168,7 @@ class VirtualObjectCommandInterpreter < Restate::VirtualObject # rubocop:disable # 'err:' so the assertions can distinguish successes from failures. entries = cmd['commands'].map { |c| create_future_for_command(c) } futures = entries.map(&:last) - Restate.all_settled(*futures) + Restate.all_settled(*futures).await parts = entries.map do |type, future| "ok:#{await_future_result(type, future)}" rescue Restate::TerminalError => e @@ -183,7 +181,7 @@ class VirtualObjectCommandInterpreter < Restate::VirtualObject # rubocop:disable # otherwise return all values joined with '|'. entries = cmd['commands'].map { |c| create_future_for_command(c) } futures = entries.map(&:last) - Restate.all(*futures) # raises on first failure + Restate.all(*futures).await # raises on first failure result = entries.map { |type, future| await_future_result(type, future).to_s }.join('|') end From 85a5768b2ce24f8eb4aa0bacfc37da9c2ee349c2 Mon Sep 17 00:00:00 2001 From: igalshilman Date: Fri, 22 May 2026 11:41:37 +0200 Subject: [PATCH 3/3] Drop legacy do_progress; unify on do_await do_progress was a thin wrapper that built a FirstCompleted tree from a flat handle list and called the same shared-core entry point. Keep only do_await on both sides of the FFI; Server::Context#poll_or_cancel builds the tree itself before delegating to wait_combined. Single source of truth, ~57 fewer lines. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/INTERNALS.md | 17 ++++--- ext/restate_internal/src/lib.rs | 71 +++------------------------ lib/restate/server/context.rb | 12 +++-- lib/restate/vm.rb | 14 ++---- sorbet/rbi/shims/restate_internal.rbi | 1 - 5 files changed, 29 insertions(+), 86 deletions(-) diff --git a/docs/INTERNALS.md b/docs/INTERNALS.md index afa9992..cb24b9f 100644 --- a/docs/INTERNALS.md +++ b/docs/INTERNALS.md @@ -122,7 +122,7 @@ Thin Ruby wrapper that: 2. Delegates all `sys_*` calls. 3. Maps native result types to Ruby-side types (e.g., `Internal::Suspended` → `Restate::Suspended`, `Internal::Failure` → `Restate::Failure`). -4. Catches `Internal::VMError` from `do_progress`/`take_notification` and returns it as a value +4. Catches `Internal::VMError` from `do_await`/`take_notification` and returns it as a value (not raised), so `Server::Context` can handle it. **Key types defined here:** @@ -364,7 +364,7 @@ loop: flush_output() ──► drain vm.take_output → output_queue │ ▼ - vm.do_progress(handles) + vm.do_await(future_tree) │ ├── AnyCompleted → return (handle is done) ├── ReadFromInput → dequeue from input_queue @@ -415,7 +415,7 @@ vm.take_output → output_queue.enqueue(chunk) ··· output_queue.dequeue → y │ chunk = read_partial │ │ invoke_handler(...) │ │ input_queue << chunk │ │ poll_or_cancel: │ │ ensure: │ │ flush_output → out_q│ -│ input_queue << :eof │ │ do_progress │ +│ input_queue << :eof │ │ do_await │ │ │ │ input_q.dequeue │ │ │ │ drain remaining output │ │ │ │ output_queue << nil │ @@ -432,7 +432,7 @@ The VM is a synchronous state machine. It does not do I/O itself. The SDK drives 2. Check readiness → `is_ready_to_execute()` 3. Get invocation → `sys_input()` 4. Issue syscalls → `sys_get_state`, `sys_run`, `sys_call`, etc. (returns handles) -5. Drive progress → `do_progress(handles)` (tells you what to do next) +5. Drive progress → `do_await(future_tree)` (tells you what to do next) 6. Collect results → `take_notification(handle)` (gets completed values) 7. Drain output → `take_output()` (gets bytes to send back over HTTP/2) 8. Finish → `sys_write_output_success/failure` then `sys_end` @@ -472,7 +472,7 @@ the same fiber chain (which is guaranteed by Async's cooperative scheduling). | `sys_write_output_failure(failure)` | — | Write final handler error | | `sys_end` | — | Finalize invocation | | `is_replaying` | bool | Check if replaying from journal | -| `do_await(future)` | progress | Tree-aware progress driver — see [Combinators](#combinators) | +| `do_await(future)` | progress | Progress driver — accepts an `UnresolvedFuture` tree (or a single handle as the trivial leaf) | --- @@ -499,8 +499,9 @@ the structure into `restate_sdk_shared_core::UnresolvedFuture` and hands it to Why the tree matters: the shared-core uses the combinator shape to decide when the SDK can suspend. For `all`, suspension waits until no pending child can possibly complete; for `race`, suspension waits until none is in flight. The -legacy flat `do_progress(handles)` path still works — it wraps the handle list -into `FirstCompleted` internally — so non-combinator callers stay unchanged. +single-handle convenience path (`Server::Context#poll_or_cancel`) just wraps +its handle list into a `FirstCompleted` subtree and hands it to the same +`do_await` — there is no separate legacy entry point. The Ruby-side public API is in `Restate.all` / `Restate.race` (`lib/restate.rb`), implemented in `Server::Context#all` / `#race` @@ -544,7 +545,7 @@ When user code calls `Restate.run('name') { ... }`: - Enqueues `:run_completed` to `input_queue` to wake the progress loop 6. The progress loop continues and eventually `AnyCompleted` is returned for the handle -**During replay**, the VM already has the result from the journal. `do_progress` returns +**During replay**, the VM already has the result from the journal. `do_await` returns `AnyCompleted` directly — the action block is never executed. ### Background Runs (`background: true`) diff --git a/ext/restate_internal/src/lib.rs b/ext/restate_internal/src/lib.rs index 4f494d9..26aef11 100644 --- a/ext/restate_internal/src/lib.rs +++ b/ext/restate_internal/src/lib.rs @@ -434,52 +434,14 @@ impl RbVM { self.vm.borrow().is_completed(handle.into()) } - fn do_progress(&self, handles: RArray) -> Result { - let ruby = Ruby::get().map_err(|_| Error::new(vm_error_class(), "Ruby not available"))?; - let handle_vec: Vec = handles.to_vec()?; - let future = unresolved_future_from_handles(handle_vec); - - let res = self.vm.borrow_mut().do_await(future); - - match res { - Err(e) if e.is_suspended_error() => Ok(ruby.into_value(RbSuspended)), - Err(e) => Err(core_error_to_magnus(e)), - Ok(AwaitResponse::AnyCompleted) => { - Ok(ruby.into_value(RbDoProgressAnyCompleted)) - } - Ok(AwaitResponse::ExecuteRun(handle)) => Ok(ruby.into_value( - RbDoProgressExecuteRun { - handle: handle.into(), - }, - )), - Ok(AwaitResponse::CancelSignalReceived) => { - Ok(ruby.into_value(RbDoProgressCancelSignalReceived)) - } - Ok(AwaitResponse::WaitingExternalProgress { waiting_input, .. }) => { - // If we still expect more input, surface ReadFromInput so the Ruby side - // dequeues from the input queue (which carries body chunks, eof, and - // run-completion signals). When input is closed, only a pending run - // can wake us, hence DoWaitForPendingRun. - if waiting_input { - Ok(ruby.into_value(RbDoProgressReadFromInput)) - } else { - Ok(ruby.into_value(RbDoWaitForPendingRun)) - } - } - } - } - - // Cooperative-suspension variant of do_progress. Accepts a tree describing a - // combinator over notification handles; the shared-core uses the tree shape to - // decide when the whole combinator can make progress. - // - // The Ruby-side encoding is: - // Integer handle → Single(handle) - // [:first_completed, [child, ...]] → race / wait_any - // [:all_completed, [child, ...]] → all_settled - // [:first_succeeded_or_all_failed, [child, ...]] → any - // [:all_succeeded_or_first_failed, [child, ...]] → all - // [:unknown, [child, ...]] → unknown combinator + // Drive the VM forward against an UnresolvedFuture tree. The Ruby-side + // encoding is: + // Integer handle → Single(handle) + // [:first_completed, [child, ...]] → race / wait_any + // [:all_completed, [child, ...]] → all_settled + // [:first_succeeded_or_all_failed,[child, ...]] → any + // [:all_succeeded_or_first_failed,[child, ...]] → all + // [:unknown, [child, ...]] → unknown combinator fn do_await(&self, future_value: Value) -> Result { let ruby = Ruby::get().map_err(|_| Error::new(vm_error_class(), "Ruby not available"))?; let future = parse_unresolved_future(future_value)?; @@ -1023,22 +985,6 @@ fn parse_unresolved_future(value: Value) -> Result { } } -// Wraps a flat list of notification handles into the tree-shaped UnresolvedFuture -// the shared-core now expects. Mirrors the legacy "wait for any of these" semantics: -// a single handle becomes Single, multiple handles become FirstCompleted(Single, ...). -fn unresolved_future_from_handles(handles: Vec) -> UnresolvedFuture { - match handles.as_slice() { - [] => UnresolvedFuture::FirstCompleted(Vec::new()), - [h] => UnresolvedFuture::Single(NotificationHandle::from(*h)), - _ => UnresolvedFuture::FirstCompleted( - handles - .into_iter() - .map(|h| UnresolvedFuture::Single(NotificationHandle::from(h))) - .collect(), - ), - } -} - fn parse_headers_array(ary: RArray) -> Result, Error> { let mut result = Vec::new(); for item in ary.into_iter() { @@ -1206,7 +1152,6 @@ fn init(ruby: &Ruby) -> Result<(), Error> { vm_class.define_method("take_output", method!(RbVM::take_output, 0))?; vm_class.define_method("is_ready_to_execute", method!(RbVM::is_ready_to_execute, 0))?; vm_class.define_method("is_completed", method!(RbVM::is_completed, 1))?; - vm_class.define_method("do_progress", method!(RbVM::do_progress, 1))?; vm_class.define_method("do_await", method!(RbVM::do_await, 1))?; vm_class.define_method("take_notification", method!(RbVM::take_notification, 1))?; vm_class.define_method("sys_input", method!(RbVM::sys_input, 0))?; diff --git a/lib/restate/server/context.rb b/lib/restate/server/context.rb index ab1fc53..6e624eb 100644 --- a/lib/restate/server/context.rb +++ b/lib/restate/server/context.rb @@ -457,13 +457,17 @@ def poll_and_take(handle, &) must_take_notification(handle, &) end + # Wait for any of the given handles to complete. Wraps the flat handle + # list into a +FirstCompleted+ subtree (or a +Single+ leaf for one handle) + # and drives the VM via +do_await+ — same path as +wait_combined+. def poll_or_cancel(handles) - progress_loop { @vm.do_progress(handles) } + tree = handles.length == 1 ? handles.first : [:first_completed, handles] + wait_combined(tree) end - # Shared progress-loop body for the flat (do_progress) and tree (do_await) - # entry points. The block makes one VM call per iteration and returns the - # response; this loop interprets it. + # Shared progress-loop body for both the simple poll_or_cancel and the + # tree-aware wait_combined. The block makes one VM call per iteration and + # returns the response; this loop interprets it. def progress_loop loop do flush_output diff --git a/lib/restate/vm.rb b/lib/restate/vm.rb index 0d1af9d..9260676 100644 --- a/lib/restate/vm.rb +++ b/lib/restate/vm.rb @@ -90,16 +90,10 @@ def is_completed(handle) @vm.is_completed(handle) end - def do_progress(handles) - result = @vm.do_progress(handles) - map_do_progress(result) - rescue Internal::VMError => e - e - end - - # Tree-aware variant of do_progress. +future+ is either an Integer handle (Single) - # or a [tag_symbol, [children...]] pair (combinator). See lib/restate/combinator.rb - # and the Rust-side parse_unresolved_future for the supported tags. + # Drive the VM forward against an +UnresolvedFuture+ tree. +future+ is either + # an Integer handle (Single leaf) or a +[tag_symbol, [children...]]+ pair + # (combinator node). See the Rust-side +parse_unresolved_future+ for the + # supported tags. def do_await(future) result = @vm.do_await(future) map_do_progress(result) diff --git a/sorbet/rbi/shims/restate_internal.rbi b/sorbet/rbi/shims/restate_internal.rbi index e216e0f..d0f3748 100644 --- a/sorbet/rbi/shims/restate_internal.rbi +++ b/sorbet/rbi/shims/restate_internal.rbi @@ -13,7 +13,6 @@ module Restate def notify_input_closed; end def is_ready_to_execute?; end def sys_input; end - def do_progress(handles); end def do_await(future); end def take_output; end def take_notification(handle); end