[Rust] Add stream builder API #155
0695a36 to 74b5211
    pub(crate) workspace_id: String,
    pub(crate) tls_config: Arc<dyn TlsConfig>,
nit: I saw Claude put this pub(crate) in my own code before. I don't really know what its purpose is, but I think it's not necessary.
It's necessary. pub(crate) means "visible within this crate but not to external consumers." These fields were previously private (only accessible within lib.rs), but StreamBuilder lives in a different module (builder/stream_builder.rs) and needs to read self.sdk.workspace_id, self.sdk.tls_config, etc. to construct OAuthHeadersProvider and ZerobusArrowStream. Without pub(crate), the builder can't access them. The same applies to get_or_create_channel_zerobus_client() and ZerobusStream::new_stream().
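The visibility rule described in this reply can be sketched in a single file. The types below are simplified, hypothetical stand-ins that only reuse the names from the discussion; they are not the SDK's real definitions, and the `secret` field and `describe()` method are invented for illustration.

```rust
// Hypothetical, simplified stand-ins for the SDK types discussed above.
mod sdk {
    pub struct ZerobusSdk {
        // Visible anywhere inside this crate, hidden from external consumers.
        pub(crate) workspace_id: String,
        // Private: only code inside `mod sdk` can read this field.
        #[allow(dead_code)]
        secret: String,
    }

    impl ZerobusSdk {
        pub fn new(workspace_id: &str, secret: &str) -> Self {
            Self {
                workspace_id: workspace_id.to_string(),
                secret: secret.to_string(),
            }
        }
    }
}

mod builder {
    use super::sdk::ZerobusSdk;

    pub struct StreamBuilder<'a> {
        pub(crate) sdk: &'a ZerobusSdk,
    }

    impl<'a> StreamBuilder<'a> {
        pub fn new(sdk: &'a ZerobusSdk) -> Self {
            Self { sdk }
        }

        pub fn describe(&self) -> String {
            // Compiles only because `workspace_id` is `pub(crate)`.
            // Reading `self.sdk.secret` here would be rejected (error E0616).
            format!("workspace={}", self.sdk.workspace_id)
        }
    }
}

fn main() {
    let sdk = sdk::ZerobusSdk::new("ws-123", "s3cr3t");
    let builder = builder::StreamBuilder::new(&sdk);
    println!("{}", builder.describe());
}
```

Making the field fully `pub` would also compile, but it would expose the field to downstream crates, which is what `pub(crate)` avoids.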
    pub fn stream_builder(
        &self,
        table_name: impl Into<String>,
    ) -> StreamBuilder<'_, NoFormat, NoAuth> {
        StreamBuilder::new(self, table_name)
    }
Is there any specific reason why the table name is a method parameter and not another builder method, .table()?
Eh, not really, except that I don't expect a stream to ever not need a table name. Still, it would probably be better to have it as a method as well.
Yeah, it's a central part of ingestion. Still, I would say it's worth verifying the contract with Vicky.
danilonajkov-db left a comment
Should we mark the old methods as deprecated?
    }

    /// Select compiled protobuf record format.
    pub fn compiled_proto(
Maybe just .proto()? (Just a personal preference; it sounds more straightforward to me.)
The idea here was to differentiate between compiled protobuf and dynamic protobuf. Dynamic protobuf is essentially used to construct a Descriptor in code, without needing a .proto file that has to be compiled manually before running the actual Rust program. That said, I don't have strong feelings about the naming. If you think proto, plus dynamic_proto if it ever gets added, makes more sense, I can change it.
    @@ -0,0 +1,665 @@
    //! Typestate builder for creating Zerobus ingestion streams.
Some validations from LLM that would be good to add:
- No runtime assertion that descriptor_proto.is_some() when building CompiledProto. If a future change lets descriptor_proto be cleared independently of the typestate, this silently constructs a broken stream. Today it's safe because the only writers to descriptor_proto are .json() (sets None) and .compiled_proto(desc) (sets Some(desc)), and each pins the typestate accordingly, but that invariant is a code convention, not a compiler guarantee. Compare to the Arrow path, which does .expect("Arrow format guarantees schema is set") (stream_builder.rs:919) for the analogous invariant.
- No check that self.auth.is_some() before resolve_headers_provider; it relies on the HasAuth typestate. The .expect("HasAuth guarantees auth is set") at stream_builder.rs:852 is the runtime fallback if the type system is ever bypassed.
It might also be worth adding an is_valid_config method inside ZerobusStream::new_stream() that checks all invariants, to make sure the customer can't configure something weirdly while the conversion from the old to the new stream creation API is happening.
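For readers unfamiliar with the pattern under review, here is a minimal typestate sketch. It reuses the marker names NoFormat, NoAuth, and HasAuth from the snippets above, but everything else (the `Json` marker, the `StreamConfig` type, the field layout) is an assumption made for illustration, not the PR's code. It also shows one way to turn the "convention, not compiler guarantee" invariant into a returned error instead of an `.expect()` panic.

```rust
use std::marker::PhantomData;

// Zero-sized marker types that encode builder progress in the type system.
pub struct NoFormat;
pub struct Json;
pub struct NoAuth;
pub struct HasAuth;

/// Hypothetical result type standing in for a real stream.
#[derive(Debug)]
pub struct StreamConfig {
    pub table_name: String,
    pub client_id: String,
}

pub struct StreamBuilder<F, A> {
    table_name: String,
    auth: Option<(String, String)>,
    _state: PhantomData<(F, A)>,
}

impl StreamBuilder<NoFormat, NoAuth> {
    pub fn new(table_name: impl Into<String>) -> Self {
        Self { table_name: table_name.into(), auth: None, _state: PhantomData }
    }
}

impl<F> StreamBuilder<F, NoAuth> {
    /// Moves the builder from `NoAuth` to `HasAuth`.
    pub fn oauth(
        self,
        client_id: impl Into<String>,
        client_secret: impl Into<String>,
    ) -> StreamBuilder<F, HasAuth> {
        StreamBuilder {
            table_name: self.table_name,
            auth: Some((client_id.into(), client_secret.into())),
            _state: PhantomData,
        }
    }
}

impl<A> StreamBuilder<NoFormat, A> {
    /// Moves the builder from `NoFormat` to `Json`.
    pub fn json(self) -> StreamBuilder<Json, A> {
        StreamBuilder { table_name: self.table_name, auth: self.auth, _state: PhantomData }
    }
}

// `build` exists only once both a format and auth have been selected.
impl StreamBuilder<Json, HasAuth> {
    pub fn build(self) -> Result<StreamConfig, String> {
        // The typestate says `auth` is set; this check reports an error
        // rather than panicking if that convention is ever broken.
        let (client_id, _secret) = self
            .auth
            .ok_or_else(|| "auth missing despite HasAuth typestate".to_string())?;
        Ok(StreamConfig { table_name: self.table_name, client_id })
    }
}

fn main() {
    let cfg = StreamBuilder::new("catalog.schema.orders")
        .json()
        .oauth("my-client-id", "my-client-secret")
        .build()
        .expect("valid configuration");
    println!("{:?}", cfg);
    // StreamBuilder::new("t").json().build(); // would not compile: no auth yet
}
```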
Hm, I think most of this comment is now outdated, since I removed the compile-time validations from the builder API. build() now just checks that all the fields are set and that there are no invalid combinations. validate() can also be called without calling build().
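A rough sketch of the runtime-validated shape described in this reply follows. The `Format`, `BuildError`, and `StreamConfig` types and the order in which fields are checked are assumptions for illustration only; the real builder opens a stream and is async.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Format {
    Json,
    CompiledProto,
}

/// Hypothetical error type; the real SDK's error type may differ.
#[derive(Debug, PartialEq)]
pub enum BuildError {
    MissingTable,
    MissingAuth,
    MissingFormat,
}

/// Hypothetical result type standing in for a real stream.
#[derive(Debug)]
pub struct StreamConfig {
    pub table: String,
    pub format: Format,
}

#[derive(Default)]
#[must_use]
pub struct StreamBuilder {
    table: Option<String>,
    auth: Option<(String, String)>,
    format: Option<Format>,
}

impl StreamBuilder {
    // Setters take and return `self`, so they chain in any order.
    pub fn table(mut self, name: impl Into<String>) -> Self {
        self.table = Some(name.into());
        self
    }

    pub fn oauth(mut self, id: impl Into<String>, secret: impl Into<String>) -> Self {
        self.auth = Some((id.into(), secret.into()));
        self
    }

    pub fn json(mut self) -> Self {
        self.format = Some(Format::Json);
        self
    }

    pub fn compiled_proto(mut self) -> Self {
        self.format = Some(Format::CompiledProto);
        self
    }

    /// Checks the configuration without consuming the builder.
    pub fn validate(&self) -> Result<(), BuildError> {
        if self.table.is_none() {
            return Err(BuildError::MissingTable);
        }
        if self.auth.is_none() {
            return Err(BuildError::MissingAuth);
        }
        if self.format.is_none() {
            return Err(BuildError::MissingFormat);
        }
        Ok(())
    }

    /// Validates first, then produces the configured value.
    pub fn build(self) -> Result<StreamConfig, BuildError> {
        self.validate()?;
        match (self.table, self.auth, self.format) {
            (Some(table), Some(_auth), Some(format)) => Ok(StreamConfig { table, format }),
            _ => unreachable!("validate() already rejected missing fields"),
        }
    }
}

fn main() {
    // Fail fast at startup without building anything.
    let incomplete = StreamBuilder::default().json();
    println!("{:?}", incomplete.validate());

    let cfg = StreamBuilder::default()
        .json()
        .oauth("my-client-id", "my-client-secret")
        .table("catalog.schema.orders")
        .build();
    println!("{:?}", cfg);
}
```

Compared with a typestate builder, this trades compile-time rejection of incomplete configurations for a single exported type and order-independent setters.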
020bf5f to c068b2a
    info!(
        table_name = %new_stream.table_name(),
        "Successfully recreated Arrow Flight stream"
    );
    for batch in batches {
        let _offset = new_stream.ingest_batch(batch).await?;
    }

    Ok(new_stream)
    Ok(new_stream)
    }
    Err(e) => {
        error!("Arrow Flight stream recreation failed: {}", e);
        Err(e)
    }
    }
    }
Should we wait for these records to get acked before we return the new stream, or not? I remember there was some discussion about it recently, maybe even a PR. IMO we don't have to: if ingestion fails, it will show up in flush or close anyway, and waiting slows down the method. But I'm fine with either approach.
The issue was that in the Scala SDK the recreate method was doing fire-and-forget for the records. Here the behavior is fine: we wait for the records to be enqueued, and waiting for acks is left to the client, which does so by calling flush.
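The distinction between "enqueued" and "acked" can be illustrated with a small synchronous mock. `MockStream` and `recreate` below are hypothetical and in-memory only; the SDK's real stream is async and talks to a server, so treat this as a sketch of the contract and not of the implementation.

```rust
use std::collections::VecDeque;

/// Hypothetical in-memory stand-in for a stream; not the SDK's type.
#[derive(Default)]
struct MockStream {
    next_offset: u64,
    inflight: VecDeque<Vec<u8>>, // enqueued, not yet acknowledged
    acked: usize,
}

impl MockStream {
    /// Returns once the record is enqueued; says nothing about acks.
    fn ingest(&mut self, record: Vec<u8>) -> u64 {
        let offset = self.next_offset;
        self.next_offset += 1;
        self.inflight.push_back(record);
        offset
    }

    /// Stands in for waiting until everything in flight is acknowledged.
    fn flush(&mut self) {
        self.acked += self.inflight.len();
        self.inflight.clear();
    }
}

/// Re-enqueues the failed stream's unacked records on a fresh stream.
fn recreate(failed: MockStream) -> MockStream {
    let mut new_stream = MockStream::default();
    for record in failed.inflight {
        let _offset = new_stream.ingest(record);
    }
    // Returned with records enqueued but still unacked; the caller
    // decides when to wait for acks by calling flush().
    new_stream
}

fn main() {
    let mut stream = MockStream::default();
    stream.ingest(b"a".to_vec());
    stream.ingest(b"b".to_vec());

    let mut stream = recreate(stream);
    println!("unacked after recreate: {}", stream.inflight.len());

    stream.flush();
    println!("acked after flush: {}", stream.acked);
}
```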
    - **High Throughput** - Configurable inflight record limits for optimal performance
    - **Batch Ingestion** - Ingest multiple records at once with all-or-nothing semantics for maximum throughput
    - **Flexible Serialization** - Support for both JSON (simple) and Protocol Buffers (type-safe) data formats
    - **Stream Builder** - Fluent builder API for configuring and creating ingestion streams
Is this really a feature?
Eh, not really. Removed.
    // Configure stream with callback
    let options = StreamConfigurationOptions {
        max_inflight_requests: 10000,
        ack_callback: Some(Arc::new(MyCallback)),
        ..Default::default()
    };

    let mut stream = sdk.create_stream(
        table_properties,
        client_id,
        client_secret,
        Some(options),
    ).await?;
    let mut stream = sdk
        .stream_builder().table("catalog.schema.orders")
        .oauth(client_id, client_secret)
        .json()
        .max_inflight_requests(10_000)
        .ack_callback(Arc::new(MyCallback))
        .build()
        .await?;

    for i in 0..1000 {
        let record = YourMessage { id: Some(i), /* ... */ };
        stream.ingest_record_offset(record.encode_to_vec()).await?;
        let record = serde_json::json!({"id": i, "name": format!("order-{}", i)});
        stream.ingest_record_offset(record.to_string()).await?;
        // Callback fires when this record is acknowledged
    }
Any special reason this example is changed to json?
By mistake, reverted.
    Err(_) => {
        // Stream failed, recreate with unacked records
        stream = sdk.recreate_stream(stream).await?;
        stream = sdk.recreate_stream(&stream).await?;
Was this a mistake in the README?
Code looks much cleaner 😃
Signed-off-by: teodordelibasic-db <teodor.delibasic@databricks.com>
c068b2a to 7d0278d
What changes are proposed in this pull request?
This PR adds a StreamBuilder API to the Rust SDK that replaces the existing create_stream* family of methods with a fluent builder.
All setters can be called in any order. The builder validates at build() time that table name, authentication, and format have been configured.
Motivation:
- create_stream API is a wide parameter list where adding or removing arguments is a breaking change.
Design decisions:
- StreamBuilder<'a> with runtime validation at build(). Same pattern as ZerobusSdkBuilder. Keeps the public API surface small (one exported type) and allows setters in any order.
- table_name is a setter (.table()), not a constructor parameter, consistent with all other configuration.
- build() for gRPC streams (JSON/Proto), build_arrow() for Arrow streams. These are separate methods because the return types differ (ZerobusStream vs ZerobusArrowStream).
- validate() lets users check configuration without opening a stream, useful for fail-fast during startup.
- #[must_use] on StreamBuilder warns if a builder is created but never built.
How is this tested?
- stream_builder.rs
- rust/tests/src/rust_tests.rs
Follow-up items
These are scoped out of this PR and tracked for subsequent work.
- Add NoOpHeadersProvider and .no_auth() builder method for local testing.
- Add public getter methods on ZerobusStream and ZerobusSdk for fields that are currently public; this prepares users for field privatization in version 2.0.0.
- Unify StreamConfigurationOptions and ArrowStreamConfigurationOptions shared fields into a common config struct. Currently each shared setter dual-writes to both config structs.
- Unify ZerobusStream and ZerobusArrowStream into a single stream type so build() can return one type regardless of format.
- Add idiomatic stream builder APIs to wrapper SDKs (Python, Java, Go, TypeScript). Each language should use its own idiomatic pattern (kwargs in Python, traditional builder in Java, functional options in Go, options object in TypeScript).
- Rust 2.0 breaking changes: privatize all public fields on ZerobusStream, ZerobusSdk, StreamConfigurationOptions, ArrowStreamConfigurationOptions, TableProperties (add getters). Remove deprecated create_stream* methods, ZerobusSdk::new(), use_tls, ingest_record/ingest_records (non-offset variants), and record_type from config.
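The build() / build_arrow() split and the #[must_use] attribute mentioned under the design decisions can be sketched as follows. `GrpcStream`, `ArrowStream`, the `Format` enum, and the error returned when the wrong terminal method is called are all assumptions for illustration; the description does not say how the real builder handles a format mismatch.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Format {
    Json,
    Arrow,
}

// Hypothetical stand-ins for the two distinct stream types.
#[derive(Debug, PartialEq)]
pub struct GrpcStream {
    pub table: String,
}

#[derive(Debug, PartialEq)]
pub struct ArrowStream {
    pub table: String,
}

// The compiler warns if a builder value is created and then dropped unused.
#[must_use = "a builder does nothing until build() or build_arrow() is called"]
pub struct StreamBuilder {
    table: String,
    format: Option<Format>,
}

impl StreamBuilder {
    pub fn new(table: impl Into<String>) -> Self {
        Self { table: table.into(), format: None }
    }

    pub fn json(mut self) -> Self {
        self.format = Some(Format::Json);
        self
    }

    pub fn arrow(mut self) -> Self {
        self.format = Some(Format::Arrow);
        self
    }

    /// Terminal method for gRPC-style formats. A single method cannot
    /// return two unrelated types, hence the separate `build_arrow`.
    pub fn build(self) -> Result<GrpcStream, String> {
        match self.format {
            Some(Format::Json) => Ok(GrpcStream { table: self.table }),
            Some(Format::Arrow) => Err("Arrow format requires build_arrow()".to_string()),
            None => Err("format not set".to_string()),
        }
    }

    /// Terminal method for the Arrow format.
    pub fn build_arrow(self) -> Result<ArrowStream, String> {
        match self.format {
            Some(Format::Arrow) => Ok(ArrowStream { table: self.table }),
            Some(Format::Json) => Err("JSON format requires build()".to_string()),
            None => Err("format not set".to_string()),
        }
    }
}

fn main() {
    println!("{:?}", StreamBuilder::new("catalog.schema.orders").json().build());
    println!("{:?}", StreamBuilder::new("catalog.schema.orders").arrow().build_arrow());
    println!("{:?}", StreamBuilder::new("catalog.schema.orders").arrow().build());
}
```

Unifying the two stream types, as listed in the follow-up items, would let a single build() cover both cases and remove the mismatch errors shown here.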