
[Rust] Add stream builder API #155

Merged
teodordelibasic-db merged 3 commits into main from stream-builder-api on Apr 29, 2026

Conversation

Contributor

@teodordelibasic-db teodordelibasic-db commented Mar 23, 2026

What changes are proposed in this pull request?

This PR adds a StreamBuilder API to the Rust SDK that replaces the existing create_stream* family of methods with a fluent builder.

All setters can be called in any order. The builder validates at build() time that table name, authentication, and format have been configured.

let stream = sdk
    .stream_builder()
    .table("catalog.schema.table")
    .oauth("client-id", "client-secret")
    .json()
    .max_inflight_requests(500_000)
    .build()
    .await?;

Motivation:

  • The current create_stream API is a wide parameter list where adding or removing arguments is a breaking change.
  • The builder gives us a stable public surface where individual setters can be added/removed/deprecated independently.
  • This is the first step toward providing similar builder APIs in the wrapper SDKs (Python, Java, Go, TypeScript), each idiomatic for their language.

Design decisions:

  • No typestate, no generics — simple StreamBuilder<'a> with runtime validation at build(). Same pattern as ZerobusSdkBuilder. Keeps the public API surface small (one exported type) and allows setters in any order.
  • table_name is a setter (.table()) not a constructor parameter, consistent with all other configuration.
  • build() for gRPC streams (JSON/Proto), build_arrow() for Arrow streams — separate methods because the return types differ (ZerobusStream vs ZerobusArrowStream).
  • validate() lets users check configuration without opening a stream, useful for fail-fast during startup.
  • #[must_use] on StreamBuilder warns if a builder is created but never built.
  • Deprecated methods are kept functional for backward compatibility and for wrapper SDKs that still depend on them.
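A minimal, self-contained sketch of the pattern these decisions describe. All names here are illustrative: toy `Format`/`Stream` types stand in for the real SDK types, only a few setters are shown, and `build()` is synchronous for brevity.

```rust
// Toy sketch of the runtime-validated builder pattern (not the actual SDK).
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum Format { Json, Proto }

#[must_use = "a builder does nothing until build() is called"]
#[derive(Default)]
struct StreamBuilder {
    table: Option<String>,
    client_id: Option<String>,
    client_secret: Option<String>,
    format: Option<Format>,
}

struct Stream { table: String }

impl StreamBuilder {
    fn table(mut self, name: impl Into<String>) -> Self {
        self.table = Some(name.into());
        self
    }
    fn oauth(mut self, id: impl Into<String>, secret: impl Into<String>) -> Self {
        self.client_id = Some(id.into());
        self.client_secret = Some(secret.into());
        self
    }
    fn json(mut self) -> Self {
        self.format = Some(Format::Json);
        self
    }
    /// Check configuration without opening a stream (fail-fast at startup).
    fn validate(&self) -> Result<(), String> {
        if self.table.is_none() { return Err("table name not set".into()); }
        if self.client_id.is_none() { return Err("authentication not set".into()); }
        if self.format.is_none() { return Err("format not set".into()); }
        Ok(())
    }
    /// All checks happen here at runtime, so setters can run in any order.
    fn build(self) -> Result<Stream, String> {
        self.validate()?;
        Ok(Stream { table: self.table.unwrap() })
    }
}

fn main() {
    // Setters in any order; validation only at build().
    let stream = StreamBuilder::default()
        .json()
        .oauth("client-id", "client-secret")
        .table("catalog.schema.table")
        .build()
        .unwrap();
    println!("stream for {}", stream.table);

    // A missing format is caught without opening a stream.
    let err = StreamBuilder::default()
        .table("t")
        .oauth("id", "secret")
        .validate()
        .unwrap_err();
    println!("invalid config: {}", err);
}
```

The trade-off versus a typestate builder is that misconfiguration surfaces as a runtime error instead of a compile error, in exchange for one exported type and order-independent setters.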

How is this tested?

  1. 12 unit tests in stream_builder.rs.
  2. 4 integration tests in rust/tests/src/rust_tests.rs.
  3. CI.

Follow-up items

These are scoped out of this PR and tracked for subsequent work.

  1. Add NoOpHeadersProvider and .no_auth() builder method for local testing.

  2. Add public getter methods on ZerobusStream and ZerobusSdk for fields that are currently public; this prepares users for field privatization in version 2.0.0.

  3. Unify StreamConfigurationOptions and ArrowStreamConfigurationOptions shared fields into a common config struct. Currently each shared setter dual-writes to both config structs.

  4. Unify ZerobusStream and ZerobusArrowStream into a single stream type so build() can return one type regardless of format.

  5. Add idiomatic stream builder APIs to wrapper SDKs (Python, Java, Go, TypeScript). Each language should use its own idiomatic pattern (kwargs in Python, traditional builder in Java, functional options in Go, options object in TypeScript).

  6. Rust 2.0 breaking changes: privatize all public fields on ZerobusStream, ZerobusSdk, StreamConfigurationOptions, ArrowStreamConfigurationOptions, TableProperties (add getters). Remove deprecated create_stream* methods, ZerobusSdk::new(), use_tls, ingest_record/ingest_records (non-offset variants), and record_type from config.

Comment thread rust/sdk/src/lib.rs
Comment on lines +283 to +284
pub(crate) workspace_id: String,
pub(crate) tls_config: Arc<dyn TlsConfig>,
Contributor

@elenagaljak-db elenagaljak-db Apr 15, 2026

nit: I saw Claude put this pub(crate) before in my own code; I don't really know what its purpose is, but I think it's not necessary

Contributor Author

It's necessary. pub(crate) means "visible within this crate but not to external consumers." These fields were previously private (only accessible within lib.rs), but StreamBuilder lives in a different module (builder/stream_builder.rs) and needs to read self.sdk.workspace_id, self.sdk.tls_config, etc. to construct OAuthHeadersProvider and ZerobusArrowStream. Without pub(crate), the builder can't access them. The same applies to get_or_create_channel_zerobus_client() and ZerobusStream::new_stream().
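For illustration, a compressed two-module sketch of the visibility rules in play. The module layout and field names are simplified stand-ins for the real lib.rs / builder/stream_builder.rs split, not the SDK's actual code.

```rust
// Sketch of pub(crate) visibility (simplified module layout, not the real SDK).
mod sdk {
    pub struct ZerobusSdk {
        // Private: readable only inside this `sdk` module.
        _endpoint: String,
        // pub(crate): readable anywhere in this crate, but invisible to
        // external consumers of the crate.
        pub(crate) workspace_id: String,
    }

    impl ZerobusSdk {
        pub fn new(workspace_id: &str) -> Self {
            Self { _endpoint: String::new(), workspace_id: workspace_id.into() }
        }
    }
}

mod builder {
    use crate::sdk::ZerobusSdk;

    // A sibling module in the same crate can read the pub(crate) field.
    pub fn workspace_of(sdk: &ZerobusSdk) -> &str {
        &sdk.workspace_id
    }
    // By contrast, `sdk._endpoint` would not compile here: that field is
    // private to the `sdk` module.
}

fn main() {
    let sdk = sdk::ZerobusSdk::new("ws-123");
    println!("{}", builder::workspace_of(&sdk));
}
```

So plain `pub` would leak the fields into the crate's public API, while no modifier at all keeps them unreachable from the builder module; `pub(crate)` is the middle ground.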

Comment thread rust/sdk/src/lib.rs Outdated
Comment on lines +337 to +342
pub fn stream_builder(
&self,
table_name: impl Into<String>,
) -> StreamBuilder<'_, NoFormat, NoAuth> {
StreamBuilder::new(self, table_name)
}
Contributor

Is there any specific reason why the table name is a method parameter and not another builder method .table() ?

Contributor Author

Eh, not really, except that I don't expect a stream to ever not need a table name; but it's probably better to have it as a method as well.

Contributor

yeah, it's a central part of ingestion. But I would still say it's worth verifying the contract with Vicky

Contributor

@danilonajkov-db danilonajkov-db left a comment

Should we mark the old methods as deprecated?

Comment thread rust/sdk/src/builder/stream_builder.rs Outdated
}

/// Select compiled protobuf record format.
pub fn compiled_proto(
Contributor

maybe just .proto() (just personal preference, sounds more straightforward to me)

Contributor Author

The idea here was to differentiate between compiled protobuf and dynamic protobuf. Dynamic protobuf is essentially used to construct a Descriptor in code, without having to have a .proto file and manually compile it before running the actual Rust program. Taking this into consideration, I don't have strong feelings about the naming; if you think proto (and dynamic_proto, if it ever gets added) makes more sense, I can change it.

Comment thread rust/sdk/src/builder/stream_builder.rs Outdated
@@ -0,0 +1,665 @@
//! Typestate builder for creating Zerobus ingestion streams.
Contributor

Some validations from an LLM that would be good to add:

  - No runtime assertion that descriptor_proto.is_some() when building CompiledProto. If a future change lets descriptor_proto be cleared independently of the typestate, this silently constructs a broken stream. Today it's safe because the only writers to descriptor_proto are .json() (sets None) and .compiled_proto(desc) (sets Some(desc)), and each pins the typestate accordingly — but that invariant is a code convention, not a compiler guarantee. Compare to the Arrow path, which does .expect("Arrow format guarantees schema is set") (stream_builder.rs:919) for the analogous invariant.

  - No check that self.auth.is_some() before resolve_headers_provider — relies on the HasAuth typestate. The .expect("HasAuth guarantees auth is set") at stream_builder.rs:852 is the runtime fallback if the type system is ever bypassed.

Also might be worth adding a method is_valid_config inside ZerobusStream::new_stream() that checks all invariants, to make sure the customer can't weirdly configure something while this conversion from the old -> new stream creation API is happening.

Contributor Author

Hm, I think most of this comment is now outdated since I removed compile-time validations from the builder API. It now just checks during build() that all the fields are set and that there aren't any invalid combinations. Also, validate() is available to be called without calling build().

@teodordelibasic-db teodordelibasic-db force-pushed the stream-builder-api branch 4 times, most recently from 020bf5f to c068b2a on April 27, 2026 16:24
Comment thread rust/sdk/src/lib.rs
Comment on lines 910 to 922

info!(
table_name = %new_stream.table_name(),
"Successfully recreated Arrow Flight stream"
);
for batch in batches {
let _offset = new_stream.ingest_batch(batch).await?;
}

Ok(new_stream)
}
Err(e) => {
error!("Arrow Flight stream recreation failed: {}", e);
Err(e)
}
}
}
Contributor

Should we wait for these records to get acked before we return the new stream, or no? I remember there was some discussion recently about it, maybe even a PR. Imo we don't have to: if ingestion fails it will show up in flush or close anyway, and waiting slows down the method. But I'm fine with both approaches.

Contributor Author

The issue was that in the Scala SDK the recreate method was doing fire-and-forget for the records. Here the behavior is fine: we wait for the records to be enqueued; waiting for acks is on the client via calling flush.

Comment thread rust/README.md Outdated
- **High Throughput** - Configurable inflight record limits for optimal performance
- **Batch Ingestion** - Ingest multiple records at once with all-or-nothing semantics for maximum throughput
- **Flexible Serialization** - Support for both JSON (simple) and Protocol Buffers (type-safe) data formats
- **Stream Builder** - Fluent builder API for configuring and creating ingestion streams
Contributor

Is this really a feature?

Contributor Author

Eh not really, removed.

Comment thread rust/README.md
Comment on lines 589 to 604
// Configure stream with callback
let options = StreamConfigurationOptions {
max_inflight_requests: 10000,
ack_callback: Some(Arc::new(MyCallback)),
..Default::default()
};

let mut stream = sdk.create_stream(
table_properties,
client_id,
client_secret,
Some(options),
).await?;
let mut stream = sdk
.stream_builder().table("catalog.schema.orders")
.oauth(client_id, client_secret)
.json()
.max_inflight_requests(10_000)
.ack_callback(Arc::new(MyCallback))
.build()
.await?;

for i in 0..1000 {
let record = YourMessage { id: Some(i), /* ... */ };
stream.ingest_record_offset(record.encode_to_vec()).await?;
let record = serde_json::json!({"id": i, "name": format!("order-{}", i)});
stream.ingest_record_offset(record.to_string()).await?;
// Callback fires when this record is acknowledged
}

Contributor

Any special reason this example is changed to json?

Contributor Author

By mistake, reverted.

Comment thread rust/README.md
Err(_) => {
// Stream failed, recreate with unacked records
stream = sdk.recreate_stream(stream).await?;
stream = sdk.recreate_stream(&stream).await?;
Contributor

Was this a mistake in the README?

Contributor Author

Yup.

@elenagaljak-db
Contributor

Code looks much cleaner 😃

Signed-off-by: teodordelibasic-db <teodor.delibasic@databricks.com>
elenagaljak-db previously approved these changes Apr 29, 2026
Contributor

@elenagaljak-db elenagaljak-db left a comment

Great work

@teodordelibasic-db teodordelibasic-db added this pull request to the merge queue Apr 29, 2026
Merged via the queue into main with commit 53be0c4 Apr 29, 2026
34 checks passed
@teodordelibasic-db teodordelibasic-db deleted the stream-builder-api branch April 29, 2026 13:30