Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/dependency-submission.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,9 @@ jobs:
with:
distribution: 'temurin'
java-version: 17
- name: Install Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
target: wasm32-unknown-unknown
- name: Generate and submit dependency graph
uses: gradle/actions/dependency-submission@v4
5 changes: 5 additions & 0 deletions .github/workflows/docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ jobs:
- name: Setup Gradle
uses: gradle/actions/setup-gradle@v4

- name: Install Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
target: wasm32-unknown-unknown

- name: Log into GitHub container registry
uses: docker/login-action@v2
with:
Expand Down
8 changes: 7 additions & 1 deletion .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ jobs:
if: ${{ inputs.serviceImage == '' }}
uses: gradle/actions/setup-gradle@v4

- name: Install Rust toolchain
if: ${{ inputs.serviceImage == '' }}
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
target: wasm32-unknown-unknown

- name: Build restatedev/test-services-java image
if: ${{ inputs.serviceImage == '' }}
run: ./gradlew -Djib.console=plain :test-services:jibDockerBuild
Expand All @@ -135,7 +141,7 @@ jobs:

- name: Run test tool
continue-on-error: ${{ inputs.continueOnError == 'true' }}
uses: restatedev/e2e/sdk-tests@v1.0
uses: restatedev/e2e/sdk-tests@v2.1
with:
envVars: ${{ inputs.envVars }}
testArtifactOutput: ${{ inputs.testArtifactOutput != '' && inputs.testArtifactOutput || 'sdk-java-integration-test-report' }}
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/release-docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ jobs:
java-version: '21'
- name: Setup Gradle
uses: gradle/actions/setup-gradle@v4
- name: Install Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
target: wasm32-unknown-unknown

- name: Build Javadocs
run: gradle :sdk-aggregated-javadocs:javadoc
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ jobs:
- name: Setup Gradle
uses: gradle/actions/setup-gradle@v4

- name: Install Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
target: wasm32-unknown-unknown

# Retrieve the version of the SDK
- name: Install dasel
run: curl -sSLf "$(curl -sSLf https://api.github.com/repos/tomwright/dasel/releases/latest | grep browser_download_url | grep linux_amd64 | grep -v .gz | cut -d\" -f 4)" -L -o dasel && chmod +x dasel && mv ./dasel /usr/local/bin/dasel
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ jobs:
- name: Setup Gradle
uses: gradle/actions/setup-gradle@v4

- name: Install Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
target: wasm32-unknown-unknown

- name: Pull Restate docker image
run: docker pull ghcr.io/restatedev/restate:main

Expand Down Expand Up @@ -54,6 +59,10 @@ jobs:
java-version: '21'
- name: Setup Gradle
uses: gradle/actions/setup-gradle@v4
- name: Install Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
target: wasm32-unknown-unknown

- name: Build Javadocs
run: gradle :sdk-aggregated-javadocs:javadoc
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ build
kls_database.db
.kotlin

.restate
.restate
/sdk-core/src/main/rust/target/
40 changes: 26 additions & 14 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,6 @@
[libraries.opentelemetry-kotlin.version]
ref = 'opentelemetry'

[libraries.protobuf-java]
module = 'com.google.protobuf:protobuf-java'

[libraries.protobuf-java.version]
ref = 'protobuf'

[libraries.protobuf-kotlin]
module = 'com.google.protobuf:protobuf-kotlin'

[libraries.protobuf-kotlin.version]
ref = 'protobuf'

[libraries.schema-kenerator-core]
module = 'io.github.smiley4:schema-kenerator-core'

Expand Down Expand Up @@ -213,15 +201,39 @@
[libraries.victools-jsonschema-module-jackson.version]
ref = 'victools-json-schema'

[libraries.chicory-runtime]
module = 'com.dylibso.chicory:runtime'

[libraries.chicory-runtime.version]
ref = 'chicory'

[libraries.chicory-annotations]
module = 'com.dylibso.chicory:annotations'

[libraries.chicory-annotations.version]
ref = 'chicory'

[libraries.chicory-annotations-processor]
module = 'com.dylibso.chicory:annotations-processor'

[libraries.chicory-annotations-processor.version]
ref = 'chicory'

[libraries.jackson-cbor]
module = 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor'

[libraries.jackson-cbor.version]
ref = 'jackson'

[plugins]
wasm2class = 'at.released.wasm2class.plugin:0.5.0'
aggregate-javadoc = 'io.freefair.aggregate-javadoc:8.14'
dependency-license-report = 'com.github.jk1.dependency-license-report:2.9'
dokka = 'org.jetbrains.dokka:1.9.20'
jib = 'com.google.cloud.tools.jib:3.4.5'
jsonschema2pojo = 'org.jsonschema2pojo:1.2.2'
nexus-publish = 'io.github.gradle-nexus.publish-plugin:1.3.0'
openapi-generator = 'org.openapi.generator:7.17.0'
protobuf = 'com.google.protobuf:0.9.4'
shadow = 'com.gradleup.shadow:9.0.0-beta8'
spotless = 'com.diffplug.spotless:7.2.1'
spring-dependency-management = 'io.spring.dependency-management:1.1.6'
Expand All @@ -233,14 +245,14 @@
ref = 'ksp'

[versions]
chicory = '1.7.5'
jackson = '2.19.4'
junit = '5.14.1'
kotlinx-coroutines = '1.10.2'
kotlinx-serialization = '1.9.0'
ksp = '2.2.10-2.0.2'
log4j = '2.24.3'
opentelemetry = '1.58.0'
protobuf = '4.29.3'
restate = '2.8.0-SNAPSHOT'
schema-kenerator = '2.1.2'
spring-boot = '3.4.13'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ internal constructor(
)
.await()

object : BaseInvocationHandle<Res>(handlerContext, responseSerde) {
object : BaseInvocationHandle<Res>(this, responseSerde) {
override suspend fun invocationId(): String = invocationIdAsyncResult.poll().await()
}
}
Expand All @@ -134,7 +134,7 @@ internal constructor(
responseTypeTag: TypeTag<Res>,
): InvocationHandle<Res> =
resolveSerde<Res>(responseTypeTag).let { responseSerde ->
object : BaseInvocationHandle<Res>(handlerContext, responseSerde) {
object : BaseInvocationHandle<Res>(this, responseSerde) {
override suspend fun invocationId(): String = invocationId
}
}
Expand Down Expand Up @@ -193,6 +193,14 @@ internal constructor(
return AwakeableHandleImpl(this, id)
}

override suspend fun <T : Any> signal(name: String, typeTag: TypeTag<T>): DurableFuture<T> {
checkNotInsideRun()
val serde: Serde<T> = resolveSerde(typeTag)
return SingleDurableFutureImpl(handlerContext.signal(name).await()).simpleMap {
serde.deserialize(it)
}
}

override fun random(): RestateRandom {
return this.random
}
Expand Down
72 changes: 72 additions & 0 deletions sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,21 @@ sealed interface Context {
*/
fun awakeableHandle(id: String): AwakeableHandle

/**
* Create a [DurableFuture] waiting on a named signal targeting the current invocation.
*
* Signals are identified by `(invocationId, name)`. The resolution can arrive before or after the
* handler starts waiting on the signal — there's no need to pre-register.
*
* Another invocation can resolve or reject the signal using [signalHandle].
*
* @param name the signal name.
* @param typeTag the response type tag to use for deserializing the signal result.
* @return a [DurableFuture] that resolves to the signal value (or rejects with a
* [dev.restate.sdk.common.TerminalException]).
*/
suspend fun <T : Any> signal(name: String, typeTag: TypeTag<T>): DurableFuture<T>

/**
* Create a [RestateRandom] instance inherently predictable, seeded on the
* [dev.restate.sdk.common.InvocationId], which is not secret.
Expand Down Expand Up @@ -336,6 +351,15 @@ suspend inline fun <reified T : Any> Context.awakeable(): Awakeable<T> {
return this.awakeable(typeTag<T>())
}

/**
* Create a [DurableFuture] waiting on a named signal targeting the current invocation.
*
* @see Context.signal
*/
suspend inline fun <reified T : Any> Context.signal(name: String): DurableFuture<T> {
return this.signal(name, typeTag<T>())
}

/**
* This interface can be used only within shared handlers of virtual objects. It extends [Context]
* adding access to the virtual object instance key-value state storage.
Expand Down Expand Up @@ -629,6 +653,14 @@ sealed interface InvocationHandle<Res : Any?> {

/** @return the output of this invocation, if present. */
suspend fun output(): Output<Res>

/**
* Get a [SignalHandle] for resolving or rejecting a named signal on this invocation. The
* receiving handler can await on the signal using [Context.signal].
*
* @param name the signal name.
*/
suspend fun signal(name: String): SignalHandle
}

/**
Expand Down Expand Up @@ -677,6 +709,35 @@ suspend inline fun <reified T : Any> AwakeableHandle.resolve(payload: T) {
return this.resolve(typeTag<T>(), payload)
}

/**
* Handle to resolve or reject a named signal on a target invocation.
*
* Unlike awakeables, signals are identified by `(invocationId, name)` and do not need to be
* pre-registered: the resolution can arrive before or after the handler starts waiting.
*/
sealed interface SignalHandle {
/**
* Resolve the signal with the given value.
*
* @param typeTag used to serialize the result payload.
* @param payload the result payload.
*/
suspend fun <T : Any> resolve(typeTag: TypeTag<T>, payload: T)

/**
* Reject the signal with the given reason. The handler awaiting the signal will receive a
* terminal error with [reason] as the message.
*
* @param reason the rejection reason.
*/
suspend fun reject(reason: String)
}

/** Resolve the signal with the given value. */
suspend inline fun <reified T : Any> SignalHandle.resolve(payload: T) {
return this.resolve(typeTag<T>(), payload)
}

/**
* A [DurablePromise] is a durable, distributed version of a Kotlin's Deferred, or more commonly of
* a future/promise. Restate keeps track of the [DurablePromise] across restarts/failures.
Expand Down Expand Up @@ -965,6 +1026,17 @@ suspend fun awakeableHandle(id: String): AwakeableHandle {
return context().awakeableHandle(id)
}

/**
* Create a [DurableFuture] waiting on a named signal targeting the current invocation.
*
* @throws IllegalStateException if called outside of a Restate handler
* @see Context.signal
*/
@org.jetbrains.annotations.ApiStatus.Experimental
suspend inline fun <reified T : Any> signal(name: String): DurableFuture<T> {
return context().signal(name, typeTag<T>())
}

/**
* Get an [InvocationHandle] for an already existing invocation.
*
Expand Down
28 changes: 27 additions & 1 deletion sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/futures.kt
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,12 @@ internal constructor(

internal abstract class BaseInvocationHandle<Res>
internal constructor(
private val handlerContext: HandlerContext,
private val contextImpl: ContextImpl,
private val responseSerde: Serde<Res>,
) : InvocationHandle<Res> {
private val handlerContext: HandlerContext
get() = contextImpl.handlerContext

override suspend fun cancel() {
checkNotInsideRun()
val ignored = handlerContext.cancelInvocation(invocationId()).await()
Expand All @@ -214,6 +217,11 @@ internal constructor(
.simpleMap { it.map { responseSerde.deserialize(it) } }
.await()
}

override suspend fun signal(name: String): SignalHandle {
val resolvedId = invocationId()
return SignalHandleImpl(contextImpl, resolvedId, name)
}
}

internal class AwakeableImpl<T : Any?>
Expand All @@ -237,6 +245,24 @@ internal class AwakeableHandleImpl(val contextImpl: ContextImpl, val id: String)
}
}

internal class SignalHandleImpl(
val contextImpl: ContextImpl,
val invocationId: String,
val name: String,
) : SignalHandle {
override suspend fun <T : Any> resolve(typeTag: TypeTag<T>, payload: T) {
checkNotInsideRun()
contextImpl.handlerContext
.resolveSignal(invocationId, name, contextImpl.resolveAndSerialize(typeTag, payload))
.await()
}

override suspend fun reject(reason: String) {
checkNotInsideRun()
contextImpl.handlerContext.rejectSignal(invocationId, name, TerminalException(reason)).await()
}
}

internal class SelectClauseImpl<T>(override val durableFuture: DurableFuture<T>) : SelectClause<T>

@PublishedApi
Expand Down
Loading
Loading