Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
432 changes: 241 additions & 191 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ tokio-stream = { version = "0.1", features = ["sync"] }
tokio-util = "0.7.11"
libp2p = { version = "0.56", features = ["full", "secp256k1"] }
url = "2.5"
percent-encoding = "2.3"
aes = "0.8.4"
ctr = "0.9.2"
cipher = "0.4.4"
Expand Down
14 changes: 10 additions & 4 deletions crates/cli/src/commands/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@ pub enum ConsoleColor {
Disable,
}

/// Builds a console tracing configuration for CLI commands.
/// Builds a tracing configuration for CLI commands, optionally enabling Loki.
///
/// `loki` is `Some` when the caller wants events forwarded to a Loki endpoint
/// (e.g. via `--loki-addresses`), and `None` for commands that only need
/// console output.
// TODO: wire `log-output-path` (Charon's `LogOutputPath`) into the file layer.
pub fn build_console_tracing_config(
level: impl Into<String>,
color: &ConsoleColor,
loki: Option<pluto_tracing::LokiConfig>,
) -> pluto_tracing::TracingConfig {
let mut builder = pluto_tracing::TracingConfig::builder().with_default_console();

Expand All @@ -36,9 +42,9 @@ pub fn build_console_tracing_config(
ConsoleColor::Disable => builder.console_with_ansi(false),
};

// TODO: Handle loki config

// TODO: Handle log output path
if let Some(loki) = loki {
builder = builder.loki(loki);
}

builder.override_env_filter(level.into()).build()
}
Expand Down
3 changes: 2 additions & 1 deletion crates/cli/src/commands/dkg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ impl TryFrom<DkgArgs> for pluto_dkg::dkg::Config {
fn try_from(args: DkgArgs) -> Result<Self> {
validate_p2p_args(&args.p2p)?;

let tracing_config = build_console_tracing_config(args.log.level.clone(), &args.log.color);
let tracing_config =
build_console_tracing_config(args.log.level.clone(), &args.log.color, None);
let p2p_config = {
let mut relays = Vec::new();

Expand Down
84 changes: 81 additions & 3 deletions crates/cli/src/commands/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ use crate::{
};
use libp2p::multiaddr::Protocol;
use pluto_p2p::k1;
use std::path::PathBuf;
use std::{collections::HashMap, path::PathBuf, time::Duration};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

/// Grace period given to the Loki background task to flush buffered logs
/// once `BackgroundTaskController::shutdown` has been signalled.
const LOKI_FLUSH_TIMEOUT: Duration = Duration::from_secs(3);

/// Arguments for the relay command.
#[derive(clap::Args, Clone)]
pub struct RelayArgs {
Expand Down Expand Up @@ -60,7 +64,34 @@ impl TryInto<pluto_relay_server::config::Config> for RelayArgs {
}
};

let log_config = build_console_tracing_config(self.log.level.clone(), &self.log.color);
let loki_config = match self.loki.loki_addresses.as_slice() {
[] => None,
[loki_url, rest @ ..] => {
if !rest.is_empty() {
// Charon fans logs out to every entry in `loki-addresses`, but
// `pluto_tracing::TracingConfig` only supports a single Loki
// layer today. `tracing::warn!` would be a no-op here because
// no subscriber is installed yet (init happens later inside
// `commands::relay::run`), so write directly to stderr.
eprintln!(
"warning: {extra} additional --loki-addresses ignored; only the first is used",
extra = rest.len(),
);
}

let labels =
HashMap::from([("service".to_string(), self.loki.loki_service.clone())]);

Some(pluto_tracing::LokiConfig {
loki_url: loki_url.clone(),
Comment thread
varex83 marked this conversation as resolved.
labels,
extra_fields: HashMap::new(),
})
}
};

let log_config =
build_console_tracing_config(self.log.level.clone(), &self.log.color, loki_config);

let builder = pluto_relay_server::config::Config::builder()
.data_dir(self.data_dir.data_dir)
Expand Down Expand Up @@ -264,6 +295,53 @@ pub struct RelayLokiArgs {
pub async fn run(
config: pluto_relay_server::config::Config,
ct: CancellationToken,
) -> Result<(), CliError> {
let loki_shutdown = match pluto_tracing::init(&config.log_config) {
Ok(Some(loki)) => {
let controller = loki.controller;
let handle = tokio::spawn(loki.task);
Some((controller, handle))
}
Ok(None) => None,
// In tests, the global tracing subscriber is shared across runs in the
// same process, so reinitializing fails. In production this would mean
// the relay silently uses an unrelated subscriber and Loki forwarding
// is dropped — fail loudly instead.
#[cfg(test)]
Err(pluto_tracing::init::Error::Init(_)) => None,
Err(err) => return Err(err.into()),
};

// Run the relay in an inner scope so every early `?` / `return Err(..)` is
// captured into `result` and the Loki cleanup below always runs.
let result = serve_relay(&config, ct).await;

if let Err(err) = &result {
// Surface the shutdown reason through the subscriber so it reaches
// Loki before we close the worker; `main` only `eprintln!`s the
// returned error and that path bypasses the tracing subscriber.
error!(error = %err, "relay exited with error");
}

// Drain the Loki worker under a single budget so a hung Loki endpoint
// (e.g. `controller.shutdown` blocked on a full mpsc) cannot wedge
// process exit. After the budget elapses we hard-abort the worker.
if let Some((controller, handle)) = loki_shutdown {
let abort_handle = handle.abort_handle();
let _ = tokio::time::timeout(LOKI_FLUSH_TIMEOUT, async {
controller.shutdown().await;
let _ = handle.await;
})
.await;
abort_handle.abort();
}

result
}

async fn serve_relay(
config: &pluto_relay_server::config::Config,
ct: CancellationToken,
) -> Result<(), CliError> {
info!("{LICENSE}");
info!(config = ?config);
Expand Down Expand Up @@ -291,7 +369,7 @@ pub async fn run(
e => e,
}?;

pluto_relay_server::p2p::run_relay_p2p_node(&config, key, ct)
pluto_relay_server::p2p::run_relay_p2p_node(config, key, ct)
.await
.map(|_| ())
.map_err(Into::into)
Expand Down
1 change: 0 additions & 1 deletion crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ async fn run() -> std::result::Result<(), CliError> {
}
Commands::Relay(args) => {
let config: pluto_relay_server::config::Config = (*args).clone().try_into()?;
pluto_tracing::init(&config.log_config).expect("Failed to initialize tracing");
commands::relay::run(config, ct).await
}
Commands::Alpha(args) => match args.command {
Expand Down
7 changes: 2 additions & 5 deletions crates/peerinfo/examples/peerinfo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,8 @@ async fn main() -> anyhow::Result<()> {

// Initialize tracing with optional Loki support
let tracing_config = build_tracing_config(&args);
let loki_task = pluto_tracing::init(&tracing_config)?;

// Spawn Loki background task if configured
if let Some(task) = loki_task {
tokio::spawn(task);
if let Some(loki) = pluto_tracing::init(&tracing_config)? {
tokio::spawn(loki.task);
tracing::info!("Loki logging enabled");
}

Expand Down
2 changes: 2 additions & 0 deletions crates/tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ tracing-loki.workspace = true
thiserror.workspace = true
tokio.workspace = true
vise.workspace = true
base64.workspace = true
percent-encoding.workspace = true

[[example]]
name = "basic"
Expand Down
6 changes: 3 additions & 3 deletions crates/tracing/examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ async fn main() {
.override_env_filter("debug")
.build();

let background_task = init(&config)
let loki = init(&config)
.expect("Failed to initialize tracing")
.expect("Background task should be Some");
.expect("Loki background task should be Some");

tokio::spawn(background_task);
tokio::spawn(loki.task);

let bind_address = SocketAddr::from(([0, 0, 0, 0], 9464));

Expand Down
65 changes: 63 additions & 2 deletions crates/tracing/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, fmt};

/// Configuration for the tracing.
#[derive(Debug, Clone, Default)]
Expand All @@ -20,7 +20,7 @@ pub struct TracingConfig {
}

/// Configuration for the loki logging.
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct LokiConfig {
/// URL of the Loki instance.
pub loki_url: String,
Expand All @@ -32,6 +32,31 @@ pub struct LokiConfig {
pub extra_fields: HashMap<String, String>,
}

impl fmt::Debug for LokiConfig {
// Redacts basic-auth credentials embedded in `loki_url` so the value is
// safe to log via `info!(config = ?config)`.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LokiConfig")
.field("loki_url", &redact_url_userinfo(&self.loki_url))
.field("labels", &self.labels)
.field("extra_fields", &self.extra_fields)
.finish()
}
}

fn redact_url_userinfo(raw: &str) -> String {
let Ok(mut url) = tracing_loki::url::Url::parse(raw) else {
return raw.to_string();
};
if url.username().is_empty() && url.password().is_none() {
return url.into();
}
if url.set_username("").is_err() || url.set_password(None).is_err() {
return "<redacted: unable to strip credentials>".to_string();
}
url.into()
}

/// Configuration for the console logging.
#[derive(Debug, Clone)]
pub struct ConsoleConfig {
Expand Down Expand Up @@ -183,3 +208,39 @@ impl TracingConfig {
TracingConfigBuilder::new()
}
}

#[cfg(test)]
mod tests {
use super::*;

fn loki_with_url(url: &str) -> LokiConfig {
LokiConfig {
loki_url: url.to_string(),
labels: HashMap::new(),
extra_fields: HashMap::new(),
}
}

#[test]
fn debug_redacts_basic_auth_credentials() {
let cfg = loki_with_url("https://user:secret@loki.example.com/push");
let dbg = format!("{cfg:?}");
assert!(!dbg.contains("user"), "username leaked in Debug: {dbg}");
assert!(!dbg.contains("secret"), "password leaked in Debug: {dbg}");
assert!(dbg.contains("loki.example.com"));
}

#[test]
fn debug_preserves_url_without_credentials() {
let cfg = loki_with_url("https://loki.example.com/loki/api/v1/push");
let dbg = format!("{cfg:?}");
assert!(dbg.contains("loki.example.com/loki/api/v1/push"));
}

#[test]
fn debug_falls_back_on_unparseable_url() {
let cfg = loki_with_url("not a url");
let dbg = format!("{cfg:?}");
assert!(dbg.contains("not a url"));
}
}
Loading
Loading