Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,368 changes: 748 additions & 620 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions reflectapi-demo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ axum = "0.8.1"
http = "1.2.0"
tower-http = { version = "0.6.2", features = ["trace", "cors", "timeout"] }

futures-util = "0.3.28"
uuid = "1.7.0"
chrono = { version = "0.4.37", features = ["serde"] }
chrono-tz = { version = "0.10.4", features = ["serde"] }
Expand All @@ -36,6 +37,7 @@ serde = { version = "1.0.218", features = ["derive", "rc"] }
serde_json = "1.0.139"
indexmap = "2.2.6"
url = "2.5.0"
async-stream = "0.3.6"

[dev-dependencies]
anyhow = "1"
Expand Down
5 changes: 3 additions & 2 deletions reflectapi-demo/clients/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ workspace = true
reflectapi-demo-client-generated = { path = "generated" }

tokio = { version = "1.38.0", features = ["full"] }
reqwest = { version = "0.12.2" }
reqwest = { version = "0.12.2", features = ["stream"] }
futures-util = "0.3"

reflectapi = { workspace = true, features = ["rt"] }
reflectapi = { workspace = true, features = ["rt", "rt-sse"] }
24 changes: 24 additions & 0 deletions reflectapi-demo/clients/rust/generated/src/generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,30 @@ pub mod interface {
)
.await
}
/// Stream of change data capture events for pets
#[tracing::instrument(name = "/pets.cdc-events", skip(self, headers))]
pub async fn cdc_events(
&self,
input: reflectapi::Empty,
headers: super::types::myapi::proto::Headers,
) -> reflectapi::rt::StreamResponse<
super::types::myapi::model::output::Pet,
super::types::myapi::proto::UnauthorizedError,
C::Error,
>
where
C::Error: Send + 'static,
{
reflectapi::rt::__stream_request_impl(
&self.client,
self.base_url
.join("/pets.cdc-events")
.expect("checked base_url already and path is valid"),
input,
headers,
)
.await
}
}
}
pub mod types {
Expand Down
147 changes: 110 additions & 37 deletions reflectapi-demo/clients/rust/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,124 @@
use reflectapi_demo_client_generated::{DemoServerClient, Error, ProtocolErrorStage};
use futures_util::StreamExt;
use reflectapi_demo_client_generated::types::myapi::model::input::Pet;
use reflectapi_demo_client_generated::types::myapi::model::Kind;
use reflectapi_demo_client_generated::types::myapi::proto::Headers;
use reflectapi_demo_client_generated::DemoServerClient;

type Client = DemoServerClient<reqwest::Client>;

fn headers() -> Headers {
Headers {
authorization: "password".into(),
}
}

fn pet(name: &str, kind: Kind) -> Pet {
#[allow(deprecated)]
Pet {
name: name.into(),
kind,
age: None,
updated_at: Default::default(),
behaviors: vec![],
}
}

#[tokio::main]
async fn main() {
let client = DemoServerClient::try_new(
let client: Client = DemoServerClient::try_new(
reqwest::Client::new(),
"http://localhost:3000".parse().unwrap(),
)
.unwrap();

let result = client
.health
.check(reflectapi::Empty {}, reflectapi::Empty {})
.await;
// error handling demo:
match result {
Ok(_v) => {
// use structured application response data here
println!("Health check successful")
}
Err(e) => match e {
Error::Application(_v) => {
// use structured application error here
println!("Health check failed")
}
Error::Network(e) => {
println!("Network error: {e:?}")
}
Error::Protocol { info, stage } => match stage {
ProtocolErrorStage::SerializeRequestBody => {
eprint!("Failed to serialize request body: {info}")
}
ProtocolErrorStage::SerializeRequestHeaders => {
eprint!("Failed to serialize request headers: {info}")
}
ProtocolErrorStage::DeserializeResponseBody(body) => {
eprint!("Failed to deserialize response body: {info}: {body:#?}")
println!("streaming cdc events while mutating pets");
let mut stream = client
.pets
.cdc_events(reflectapi::Empty {}, headers())
.await
.expect("start stream");

let received = std::sync::Arc::new(tokio::sync::Mutex::new(Vec::<String>::new()));
let received_clone = received.clone();

let stream_handle = tokio::spawn(async move {
while let Some(item) = stream.next().await {
match item {
Ok(p) => {
println!("received event: {} {:?}", p.name, p.kind);
received_clone.lock().await.push(p.name);
}
ProtocolErrorStage::DeserializeResponseError(status, body) => {
eprint!("Failed to deserialize response error: {info} at {status:?}: {body:#?}")
Err(e) => {
eprintln!("stream error: {e:?}");
break;
}
},
Error::Server(status, body) => {
println!("Server error: {status} with body: {body:?}")
}
},
}
});

tokio::time::sleep(std::time::Duration::from_millis(100)).await;

println!("creating Whiskers");
client
.pets
.create(pet("Whiskers", Kind::Cat { lives: 9 }), headers())
.await
.expect("create Whiskers");

println!("creating Tweety");
client
.pets
.create(pet("Tweety", Kind::Bird), headers())
.await
.expect("create Tweety");

println!("removing Whiskers");
client
.pets
.remove(
reflectapi_demo_client_generated::types::myapi::proto::PetsRemoveRequest {
name: "Whiskers".into(),
},
headers(),
)
.await
.expect("remove Whiskers");

tokio::time::sleep(std::time::Duration::from_millis(500)).await;
stream_handle.abort();
let _ = stream_handle.await;

let received = received.lock().await;
let expected = vec!["Whiskers", "Tweety", "Whiskers"];
if *received == expected {
println!("stream test passed");
} else {
println!(
"stream test FAILED: expected {:?}, got {:?}",
expected, *received
);
}

println!("done")
// println!("{:#?}", result);
println!("removing remaining pets");
let remove = |name: &'static str| {
let client = &client;
async move {
let _ = client
.pets
.remove(
reflectapi_demo_client_generated::types::myapi::proto::PetsRemoveRequest {
name: name.into(),
},
headers(),
)
.await
.inspect_err(|err| eprintln!("failed to remove {name}: {err:?}"));
}
};

remove("Tweety").await;
remove("BadPet").await;
remove("GoodPet").await;

println!("done");
}
Loading
Loading