Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions eventstore-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,14 @@ pub fn options(input: TokenStream) -> TokenStream {
}

impl #name {
/// Performs the command with the given credentials.
pub fn authenticated(mut self, credentials: crate::types::Credentials) -> Self {
self.common_operation_options.credentials = Some(credentials);
/// Performs the command with the given authentication.
///
/// Accepts `Credentials` or `Authentication` (the latter for Bearer tokens).
pub fn authenticated<A>(mut self, authentication: A) -> Self
where
A: Into<crate::types::Authentication>,
{
self.common_operation_options.authentication = Some(authentication.into());
self
}

Expand Down
2 changes: 1 addition & 1 deletion kurrentdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
authors = ["Yorick Laupa <yo.eight@gmail.com>"]
edition = "2024"
name = "kurrentdb"
version = "1.0.0"
version = "1.1.0"

# Uncomment if you want to update messages.rs code-gen.
# We disabled codegen.rs because it requires having `protoc` installed on your machine
Expand Down
1 change: 1 addition & 0 deletions kurrentdb/src/event_store/generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use chrono::{DateTime, Utc};
use std::ops::Add;
use std::time::{Duration, SystemTime};

#[allow(dead_code)]
pub mod common;
pub mod google_rpc;
pub mod gossip;
Expand Down
12 changes: 6 additions & 6 deletions kurrentdb/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -949,10 +949,10 @@ impl NodeConnection {

loop {
if let Some(request) = request.take() {
if self.id != request.correlation {
if let Some(handle) = self.handle.clone() {
return Ok(handle);
}
if self.id != request.correlation
&& let Some(handle) = self.handle.clone()
{
return Ok(handle);
}

failed_endpoint = self.handle.take().map(|h| h.endpoint);
Expand Down Expand Up @@ -1439,15 +1439,15 @@ fn determine_best_node(

let member_opt = members.min_by(|a, b| {
if let NodePreference::Random = preference {
if rng.next_u32() % 2 == 0 {
if rng.next_u32().is_multiple_of(2) {
return Ordering::Greater;
}

return Ordering::Less;
}

if preference.match_preference(&a.state) && preference.match_preference(&b.state) {
if rng.next_u32() % 2 == 0 {
if rng.next_u32().is_multiple_of(2) {
return Ordering::Less;
} else {
return Ordering::Greater;
Expand Down
108 changes: 100 additions & 8 deletions kurrentdb/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
use tracing::error;
pub mod persistent_subscriptions;

pub(crate) fn resolve_authentication(
options: &crate::options::CommonOperationOptions,
settings: &crate::ClientSettings,
) -> Option<crate::Authentication> {
options.authentication.clone().or_else(|| {
settings
.default_authenticated_user()
.as_ref()
.map(|c| crate::Authentication::Basic(c.clone()))
})
}

pub fn http_configure_auth(
builder: reqwest::RequestBuilder,
creds_opt: Option<&crate::Credentials>,
auth_opt: Option<&crate::Authentication>,
) -> reqwest::RequestBuilder {
if let Some(creds) = creds_opt {
builder.basic_auth(
unsafe { std::str::from_utf8_unchecked(creds.login.as_ref()) },
unsafe { Some(std::str::from_utf8_unchecked(creds.password.as_ref())) },
)
} else {
builder
match auth_opt {
Some(crate::Authentication::Basic(creds)) => builder.basic_auth(
String::from_utf8_lossy(creds.login.as_ref()),
Some(String::from_utf8_lossy(creds.password.as_ref())),
),
Some(crate::Authentication::Bearer(token)) => {
builder.bearer_auth(String::from_utf8_lossy(token.as_ref()))
}
Comment thread
qodo-code-review[bot] marked this conversation as resolved.
None => builder,
}
}

Expand Down Expand Up @@ -66,3 +80,81 @@ pub async fn http_execute_request(
}
}
}

#[cfg(test)]
mod auth_tests {
use super::*;
use crate::options::CommonOperationOptions;
use crate::{Authentication, ClientSettings, Credentials};

fn settings_from(connection_string: &str) -> ClientSettings {
connection_string
.parse::<ClientSettings>()
.expect("valid connection string")
}

fn authorization_header(builder: reqwest::RequestBuilder) -> Option<String> {
let request = builder.build().expect("buildable request");
request
.headers()
.get(reqwest::header::AUTHORIZATION)
.map(|v| v.to_str().expect("ASCII header").to_owned())
}

fn fresh_builder() -> reqwest::RequestBuilder {
reqwest::Client::new().get("http://localhost/")
}

#[test]
fn http_configure_auth_with_basic_sets_basic_authorization_header() {
let auth = Authentication::basic("admin", "changeit");
let header = authorization_header(http_configure_auth(fresh_builder(), Some(&auth)))
.expect("authorization header present");
assert_eq!(header, "Basic YWRtaW46Y2hhbmdlaXQ=");
}

#[test]
fn http_configure_auth_with_bearer_sets_bearer_authorization_header() {
let auth = Authentication::bearer("abc.def.ghi");
let header = authorization_header(http_configure_auth(fresh_builder(), Some(&auth)))
.expect("authorization header present");
assert_eq!(header, "Bearer abc.def.ghi");
}

#[test]
fn http_configure_auth_with_none_leaves_authorization_unset() {
assert!(authorization_header(http_configure_auth(fresh_builder(), None)).is_none());
}

#[test]
fn resolve_authentication_prefers_per_call_over_default_user() {
let settings = settings_from("esdb://admin:changeit@localhost:2113?tls=false");
let common = CommonOperationOptions {
authentication: Some(Authentication::bearer("call-token")),
..Default::default()
};

let resolved = resolve_authentication(&common, &settings).expect("present");
assert_eq!(resolved, Authentication::bearer("call-token"));
}

#[test]
fn resolve_authentication_falls_back_to_default_user_as_basic() {
let settings = settings_from("esdb://admin:changeit@localhost:2113?tls=false");
let common = CommonOperationOptions::default();

let resolved = resolve_authentication(&common, &settings).expect("present");
assert_eq!(
resolved,
Authentication::Basic(Credentials::new("admin", "changeit"))
);
}

#[test]
fn resolve_authentication_returns_none_when_neither_configured() {
let settings = settings_from("esdb://localhost:2113?tls=false");
let common = CommonOperationOptions::default();

assert!(resolve_authentication(&common, &settings).is_none());
}
}
50 changes: 10 additions & 40 deletions kurrentdb/src/http/persistent_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,8 @@ pub(crate) async fn replay_parked_messages(
builder = builder.query(&[("stopAt", stop_at.to_string().as_str())])
}

builder = super::http_configure_auth(
builder,
options
.common_operation_options
.credentials
.as_ref()
.or_else(|| settings.default_authenticated_user().as_ref()),
);
let auth = super::resolve_authentication(&options.common_operation_options, settings);
builder = super::http_configure_auth(builder, auth.as_ref());

super::http_execute_request(builder).await?;

Expand Down Expand Up @@ -132,14 +126,8 @@ pub(crate) async fn list_all_persistent_subscriptions(
.get(format!("{}/subscriptions", handle.url()))
.header("content-type", "application/json");

builder = super::http_configure_auth(
builder,
options
.common_operation_options
.credentials
.as_ref()
.or_else(|| settings.default_authenticated_user().as_ref()),
);
let auth = super::resolve_authentication(&options.common_operation_options, settings);
builder = super::http_configure_auth(builder, auth.as_ref());

let resp = super::http_execute_request(builder).await?;

Expand Down Expand Up @@ -179,14 +167,8 @@ where
))
.header("content-type", "application/json");

builder = super::http_configure_auth(
builder,
options
.common_operation_options
.credentials
.as_ref()
.or_else(|| settings.default_authenticated_user().as_ref()),
);
let auth = super::resolve_authentication(&options.common_operation_options, settings);
builder = super::http_configure_auth(builder, auth.as_ref());

let resp = super::http_execute_request(builder).await?;

Expand Down Expand Up @@ -228,14 +210,8 @@ where
))
.header("content-type", "application/json");

builder = super::http_configure_auth(
builder,
options
.common_operation_options
.credentials
.as_ref()
.or_else(|| settings.default_authenticated_user().as_ref()),
);
let auth = super::resolve_authentication(&options.common_operation_options, settings);
builder = super::http_configure_auth(builder, auth.as_ref());

let resp = super::http_execute_request(builder).await?;

Expand All @@ -258,14 +234,8 @@ pub(crate) async fn restart_persistent_subscription_subsystem(
.header("content-type", "application/json")
.header("content-length", "0");

builder = super::http_configure_auth(
builder,
options
.common_operation_options
.credentials
.as_ref()
.or_else(|| settings.default_authenticated_user().as_ref()),
);
let auth = super::resolve_authentication(&options.common_operation_options, settings);
builder = super::http_configure_auth(builder, auth.as_ref());

super::http_execute_request(builder).await?;

Expand Down
7 changes: 6 additions & 1 deletion kurrentdb/src/operations/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,14 @@ pub(crate) async fn http_read(
.danger_accept_invalid_certs(!setts.tls_verify_cert)
.build()?;

let default_auth = setts
.default_user_name
.as_ref()
.map(|c| crate::Authentication::Basic(c.clone()));

let resp = http_configure_auth(
client.get(format!("{}/gossip", handle.url())),
setts.default_user_name.as_ref(),
default_auth.as_ref(),
)
.send()
.await?;
Expand Down
4 changes: 2 additions & 2 deletions kurrentdb/src/options/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Duration;

use crate::Credentials;
use crate::Authentication;

pub mod append_to_stream;
pub mod batch_append;
Expand All @@ -21,7 +21,7 @@ pub(crate) trait Options {

#[derive(Clone, Default)]
pub(crate) struct CommonOperationOptions {
pub(crate) credentials: Option<Credentials>,
pub(crate) authentication: Option<Authentication>,
pub(crate) requires_leader: bool,
pub(crate) deadline: Option<Duration>,
}
Expand Down
Loading
Loading