Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo-minimal.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1977,6 +1977,15 @@ dependencies = [
"tracing",
]

[[package]]
name = "hyperloglogplus"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "621debdf94dcac33e50475fdd76d34d5ea9c0362a834b9db08c3024696c1fbe3"
dependencies = [
"serde",
]

[[package]]
name = "iana-time-zone"
version = "0.1.64"
Expand Down Expand Up @@ -2831,6 +2840,7 @@ dependencies = [
"hyper",
"hyper-rustls",
"hyper-util",
"hyperloglogplus",
"ipnet",
"maxminddb",
"mockito",
Expand Down
10 changes: 10 additions & 0 deletions Cargo-recent.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1977,6 +1977,15 @@ dependencies = [
"tracing",
]

[[package]]
name = "hyperloglogplus"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "621debdf94dcac33e50475fdd76d34d5ea9c0362a834b9db08c3024696c1fbe3"
dependencies = [
"serde",
]

[[package]]
name = "iana-time-zone"
version = "0.1.64"
Expand Down Expand Up @@ -2831,6 +2840,7 @@ dependencies = [
"hyper",
"hyper-rustls",
"hyper-util",
"hyperloglogplus",
"ipnet",
"maxminddb",
"mockito",
Expand Down
1 change: 1 addition & 0 deletions payjoin-mailroom/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ config = "0.15"
flate2 = { version = "1.1", optional = true }
futures = "0.3"
hex = { package = "hex-conservative", version = "0.1.1" }
hyperloglogplus = "0.4.1"
http = "1.3.1"
http-body-util = "0.1.3"
hyper = { version = "1.6.0", features = ["http1", "server"] }
Expand Down
4 changes: 4 additions & 0 deletions payjoin-mailroom/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ impl<D: Db> Db for MetricsDb<D> {
mailbox_id: &ShortId,
data: Vec<u8>,
) -> Result<Option<()>, Error<Self::OperationalError>> {
self.metrics.record_short_id(mailbox_id);
let result = self.inner.post_v2_payload(mailbox_id, data).await?;
if result.is_some() {
self.metrics.record_db_entry(PayjoinVersion::Two);
Expand All @@ -267,6 +268,7 @@ impl<D: Db> Db for MetricsDb<D> {
&self,
mailbox_id: &ShortId,
) -> Result<Arc<Vec<u8>>, Error<Self::OperationalError>> {
self.metrics.record_short_id(mailbox_id);
self.inner.wait_for_v2_payload(mailbox_id).await
}

Expand All @@ -275,6 +277,7 @@ impl<D: Db> Db for MetricsDb<D> {
mailbox_id: &ShortId,
data: Vec<u8>,
) -> Result<(), Error<Self::OperationalError>> {
self.metrics.record_short_id(mailbox_id);
self.inner.post_v1_response(mailbox_id, data).await
}

Expand All @@ -283,6 +286,7 @@ impl<D: Db> Db for MetricsDb<D> {
mailbox_id: &ShortId,
data: Vec<u8>,
) -> Result<Arc<Vec<u8>>, Error<Self::OperationalError>> {
self.metrics.record_short_id(mailbox_id);
let result = self.inner.post_v1_request_and_wait_for_response(mailbox_id, data).await?;
self.metrics.record_db_entry(PayjoinVersion::One);
Ok(result)
Expand Down
54 changes: 54 additions & 0 deletions payjoin-mailroom/src/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,4 +865,58 @@ mod tests {
other => panic!("expected U64 Sum, got {other:?}"),
}
}

#[tokio::test]
async fn post_mailbox_records_short_id_cardinality() {
    use opentelemetry_sdk::metrics::{
        InMemoryMetricExporter, PeriodicReader, SdkMeterProvider,
    };

    use crate::db::MetricsDb;
    use crate::metrics::{MetricsService, UNIQUE_SHORT_IDS};

    // In-memory exporter + manual flush lets us assert on recorded metrics
    // without a real collector.
    let exporter = InMemoryMetricExporter::default();
    let reader = PeriodicReader::builder(exporter.clone()).build();
    let provider = SdkMeterProvider::builder().with_reader(reader).build();
    let metrics = MetricsService::new(Some(provider.clone()));

    // Keep the TempDir guard (`dir`) alive for the whole test so the
    // directory is removed on drop; `dir.keep()` would disable cleanup and
    // leak a temp directory on every test run.
    let dir = tempfile::tempdir().expect("tempdir");
    let db = FilesDb::init(Duration::from_millis(100), dir.path().to_path_buf())
        .await
        .expect("db init");
    let db = MetricsDb::new(db, metrics);
    let ohttp: ohttp::Server =
        crate::key_config::gen_ohttp_server_config().expect("ohttp config").into();
    let svc = Service::new(db, ohttp, SentinelTag::new([0u8; 32]), None);

    // One POST to a valid mailbox should feed the short ID into the tracker.
    let id = valid_short_id_path();
    let res = svc
        .post_mailbox(&id, Body::from(b"payload".to_vec()))
        .await
        .expect("post_mailbox should succeed");
    assert_eq!(res.status(), StatusCode::OK);

    provider.force_flush().expect("flush failed");
    let finished = exporter.get_finished_metrics().expect("metrics");
    let gauge = finished
        .iter()
        .flat_map(|rm| rm.scope_metrics())
        .flat_map(|sm| sm.metrics())
        .find(|m| m.name() == UNIQUE_SHORT_IDS)
        .expect("missing unique_short_ids metric");

    use opentelemetry::KeyValue;
    use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};

    // The gauge reports one data point per interval; the hourly point must
    // exist and estimate at least one unique short ID.
    match gauge.data() {
        AggregatedMetrics::U64(MetricData::Gauge(g)) => {
            let points: Vec<_> = g.data_points().collect();
            assert!(!points.is_empty(), "expected at least one data point");
            let hourly = points.iter().find(|dp| {
                dp.attributes().any(|kv| kv == &KeyValue::new("interval", "hourly"))
            });
            assert!(hourly.is_some(), "expected hourly data point");
            assert!(hourly.unwrap().value() >= 1, "expected at least 1 unique short ID");
        }
        other => panic!("expected U64 Gauge, got {other:?}"),
    }
}
}
147 changes: 145 additions & 2 deletions payjoin-mailroom/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,116 @@
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};

use opentelemetry::metrics::{Counter, MeterProvider, UpDownCounter};
use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
use opentelemetry::metrics::{Counter, MeterProvider, ObservableGauge, UpDownCounter};
use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use payjoin::directory::ShortId;

pub(crate) const TOTAL_CONNECTIONS: &str = "total_connections";
pub(crate) const ACTIVE_CONNECTIONS: &str = "active_connections";
pub(crate) const HTTP_REQUESTS: &str = "http_request_total";
pub(crate) const DB_ENTRIES: &str = "db_entries_total";
pub(crate) const UNIQUE_SHORT_IDS: &str = "unique_short_ids";

const HLL_PRECISION: u8 = 14;
const HOURLY_RETENTION_HOURS: u64 = 168; // 7 days
const DAILY_RETENTION_DAYS: u64 = 90;

type HllSketch = HyperLogLogPlus<[u8; 8], RandomState>;

/// Builds an empty HLL++ sketch at `HLL_PRECISION`.
///
/// The sketch's hasher is keyed with fresh secret randomness via
/// `RandomState::new()`. The sketches currently live only in process memory,
/// but if raw sketches (not just the derived counts) are ever exported, the
/// keyed hashing becomes important for privacy: it prevents an observer who
/// holds candidate short IDs from confirming whether specific IDs were
/// inserted. Keep using a secretly-keyed hash — and never leak the key —
/// even if cardinality estimation moves to an external API in the future.
fn new_sketch() -> HllSketch {
HyperLogLogPlus::new(HLL_PRECISION, RandomState::new()).expect("precision 14 is always valid")
}

/// Estimates the number of unique ShortIds seen per time window.
/// Two tiers of HLL sketches:
/// - **Hourly** -- one sketch per hour, pruned after 7 days.
/// - **Daily** -- one sketch per day, pruned after 90 days.
struct HllSketches {
// Keyed by hour index (unix seconds / 3600); entries older than
// HOURLY_RETENTION_HOURS are pruned on insert.
hourly: HashMap<u64, HllSketch>,
// Keyed by day index (unix seconds / 86400); entries older than
// DAILY_RETENTION_DAYS are pruned on insert.
daily: HashMap<u64, HllSketch>,
}

impl HllSketches {
fn new() -> Self { Self { hourly: HashMap::new(), daily: HashMap::new() } }

fn add_id(&mut self, id: &ShortId) {
let secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system clock before UNIX epoch")
.as_secs();
let hour = secs / 3600;
let day = secs / 86400;

self.hourly.entry(hour).or_insert_with(new_sketch).insert(&id.0);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tl;dr - hashing &id before insertion here would eliminate a theoretical leak that may result here

since we don't know what hashing HyperLogLogPlus does internally, i think there's a chance that with access to the metrics and access to a list of candidate short IDs, an adversary may be able to confirm if a subset of the short IDs was included, if they are lucky (i.e. those are the extremal values selected).

to prevent this kind of leak the short IDs could be hashed (keyed hash, where the key is secret) before insertion into the HLL sketch, the resulting keys would be a 1:1 mapping (well, almost.... some hash functions are bijective for u64, but i don't think that's really valuable since the chance of collisions on non-duplicate entries is basically 0 for the cardinality we expect)

so basically, by hashing before insertions with some keyed hash (or even a keyed cryptographic hash even), we can ensure that even the complete HLL sketch if published will not leak information about specific short IDs that were used during the relevant time period.

the overhead of even a cryptographic hash here should be negligible, so perhaps the safest approach is to compute a salted SHA256 of the short ID or something like that, but i think a non-cryptographic keyed hash is sufficient since collision resistance against adversarial inputs subsumes not being able to reverse the hash at least if the salt isn't leaked...

if HyperLogLogPlus's hashing is sufficient, perhaps we can control it and ensure that it is adequately keyed, then pinning it as a dependency (to ensure there's no regression) is arguably sufficient, but i think it's easier to reason that the cost of hashing one more time is basically 0 and worth the peace of mind here

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hyperloglogplus uses the SipHash-1-3 hash (https://docs.rs/crate/hyperloglogplus/latest/source/src/hyperloglogplus.rs); it's a keyed hash function, so I'm thinking it's sufficient

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the sketches only reside in process memory it doesn't even need that, but a comment on the initialization that notes that if sketches are exported, not just counts, then it will become important for privacy to continue to key the hash function with secret randomness (i.e. keep using RandomState::new(), the stdlib hashing makes strong enough guarantees) and to not leak this key. this still applies if prometheus eventually supports cardinality estimation natively, all keys should be hashed with a secret key before being passed to whatever cardinality estimation API may be used in the future.

self.daily.entry(day).or_insert_with(new_sketch).insert(&id.0);

self.hourly.retain(|&k, _| hour.saturating_sub(HOURLY_RETENTION_HOURS) <= k);
self.daily.retain(|&k, _| day.saturating_sub(DAILY_RETENTION_DAYS) <= k);
}

/// Estimated number of unique short IDs seen during the current hour.
/// Returns 0 when no ID has been recorded this hour yet.
fn hourly_count(&mut self) -> u64 {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before UNIX epoch");
    let current_hour = now.as_secs() / 3600;
    match self.hourly.get_mut(&current_hour) {
        Some(sketch) => sketch.count().trunc() as u64,
        None => 0,
    }
}

/// Estimated number of unique short IDs seen during the current day.
/// Returns 0 when no ID has been recorded today yet.
fn daily_count(&mut self) -> u64 {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before UNIX epoch");
    let current_day = now.as_secs() / 86400;
    self.daily.get_mut(&current_day).map_or(0, |sketch| sketch.count().trunc() as u64)
}

/// Estimated number of unique short IDs over the trailing 7 days (today
/// plus the six preceding days), computed by merging the daily sketches
/// into a fresh union sketch.
fn weekly_count(&mut self) -> u64 {
    let today = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before UNIX epoch")
        .as_secs()
        / 86400;
    let mut union = new_sketch();
    for offset in 0..7 {
        // checked_sub avoids a debug-build underflow panic when the clock
        // is within 7 days of the UNIX epoch (mirrors the saturating_sub
        // guards used for retention pruning in add_id).
        let Some(day) = today.checked_sub(offset) else { break };
        if let Some(sketch) = self.daily.get(&day) {
            union.merge(sketch).expect("same precision");
        }
    }
    union.count().trunc() as u64
}
}

/// Cheaply cloneable handle to the shared cardinality sketches; all clones
/// observe and mutate the same underlying state.
#[derive(Clone)]
pub struct UniqueShortIdTracker {
// Mutex-guarded sketches shared across clones (e.g. the metrics service
// and the observable-gauge callback).
inner: Arc<Mutex<HllSketches>>,
}

impl UniqueShortIdTracker {
    /// Creates a tracker with empty sketches.
    pub fn new() -> Self { Self { inner: Arc::new(Mutex::new(HllSketches::new())) } }

    /// Folds `id` into the current hourly and daily sketches.
    pub fn add_id(&self, id: &ShortId) {
        self.inner.lock().expect("tracker lock poisoned").add_id(id);
    }

    /// Estimated unique short IDs seen during the current hour.
    pub fn hourly_count(&self) -> u64 {
        self.inner.lock().expect("tracker lock poisoned").hourly_count()
    }

    /// Estimated unique short IDs seen during the current day.
    pub fn daily_count(&self) -> u64 {
        self.inner.lock().expect("tracker lock poisoned").daily_count()
    }

    /// Estimated unique short IDs over the trailing 7 days.
    pub fn weekly_count(&self) -> u64 {
        self.inner.lock().expect("tracker lock poisoned").weekly_count()
    }
}

// `new()` takes no arguments, so provide `Default` as well
// (clippy::new_without_default).
impl Default for UniqueShortIdTracker {
    fn default() -> Self { Self::new() }
}

#[derive(Clone)]
pub struct MetricsService {
Expand All @@ -19,6 +122,8 @@ pub struct MetricsService {
active_connections: UpDownCounter<i64>,
/// Total v1/v2 mailbox entries written, labelled by `version`
db_entries_total: Counter<u64>,
tracker: UniqueShortIdTracker,
_unique_ids_gauge: Option<Arc<ObservableGauge<u64>>>,
}

#[repr(u8)]
Expand All @@ -36,6 +141,7 @@ impl fmt::Display for PayjoinVersion {

impl MetricsService {
pub fn new(provider: Option<SdkMeterProvider>) -> Self {
let has_reader = provider.is_some();
let provider = provider.unwrap_or_default();
let meter = provider.meter("payjoin-mailroom");

Expand All @@ -59,7 +165,42 @@ impl MetricsService {
.with_description("Total mailbox entries stored by protocol version")
.build();

Self { http_requests_total, total_connections, active_connections, db_entries_total }
let tracker = UniqueShortIdTracker::new();

let unique_ids_gauge = if has_reader {
let gauge_tracker = tracker.clone();
Some(Arc::new(
meter
.u64_observable_gauge(UNIQUE_SHORT_IDS)
.with_description("Estimated unique short IDs")
.with_callback(move |observer| {
observer.observe(
gauge_tracker.hourly_count(),
&[KeyValue::new("interval", "hourly")],
);
observer.observe(
gauge_tracker.daily_count(),
&[KeyValue::new("interval", "daily")],
);
observer.observe(
gauge_tracker.weekly_count(),
&[KeyValue::new("interval", "weekly")],
);
})
.build(),
))
} else {
None
};

Self {
http_requests_total,
total_connections,
active_connections,
db_entries_total,
tracker,
_unique_ids_gauge: unique_ids_gauge,
}
}

pub fn record_http_request(&self, endpoint: &str, method: &str, status_code: u16) {
Expand All @@ -83,4 +224,6 @@ impl MetricsService {
/// Increments the stored-mailbox-entry counter, labelled by protocol `version`.
pub fn record_db_entry(&self, version: PayjoinVersion) {
self.db_entries_total.add(1, &[KeyValue::new("version", version.to_string())]);
}

pub fn record_short_id(&self, id: &ShortId) { self.tracker.add_id(id); }
}
Loading