Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ pub use async_activity_handle::{
ActivityHeartbeatResponse, ActivityIdentifier, AsyncActivityHandle,
};

pub use metrics::{LONG_REQUEST_LATENCY_HISTOGRAM_NAME, REQUEST_LATENCY_HISTOGRAM_NAME};
pub use metrics::{
LONG_REQUEST_LATENCY_HISTOGRAM_NAME, REQUEST_LATENCY_HISTOGRAM_NAME,
RPC_MESSAGE_SIZE_HISTOGRAM_NAME,
};
pub use options_structs::*;
pub use replaceable::SharedReplaceableClient;
pub use retry::RetryOptions;
Expand Down
110 changes: 105 additions & 5 deletions crates/client/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::{AttachMetricLabels, CallType, callback_based, dbg_panic};
use bytes::Bytes;
use futures_util::{
FutureExt, TryFutureExt,
future::{BoxFuture, Either},
};
use http_body_util::BodyExt;
use std::{
fmt,
task::{Context, Poll},
Expand All @@ -11,13 +13,16 @@ use std::{
use temporalio_common::telemetry::{
TaskQueueLabelStrategy,
metrics::{
Counter, CounterBase, HistogramDuration, HistogramDurationBase, MetricAttributable,
MetricAttributes, MetricKeyValue, MetricParameters, TemporalMeter,
Counter, CounterBase, Histogram, HistogramBase, HistogramDuration, HistogramDurationBase,
MESSAGE_DIRECTION_REQUEST, MESSAGE_DIRECTION_RESPONSE, MetricAttributable,
MetricAttributes, MetricKeyValue, MetricParameters, TemporalMeter, message_direction,
},
};
use tonic::{Code, body::Body, transport::Channel};
use tower::Service;

pub use temporalio_common::telemetry::metrics::RPC_MESSAGE_SIZE_HISTOGRAM_NAME;

/// The string name (which may be prefixed) for this metric
pub static REQUEST_LATENCY_HISTOGRAM_NAME: &str = "request_latency";
/// The string name (which may be prefixed) for this metric
Expand All @@ -40,6 +45,7 @@ struct Instruments {

svc_request_latency: HistogramDuration,
long_svc_request_latency: HistogramDuration,
rpc_message_size: Histogram,
}

impl MetricsContext {
Expand Down Expand Up @@ -75,6 +81,11 @@ impl MetricsContext {
unit: "duration".into(),
description: "Histogram of client long-poll request latencies".into(),
}),
rpc_message_size: tm.histogram(MetricParameters {
name: RPC_MESSAGE_SIZE_HISTOGRAM_NAME.into(),
unit: "By".into(),
description: "Histogram of client gRPC request and response body sizes".into(),
}),
};
Self {
poll_is_long: false,
Expand Down Expand Up @@ -109,8 +120,14 @@ impl MetricsContext {
.long_svc_request_latency
.with_attributes(self.meter.get_default_attributes())
})
.map(|v| {
.and_then(|v| {
self.instruments.long_svc_request_latency = v;
self.instruments
.rpc_message_size
.with_attributes(self.meter.get_default_attributes())
})
.map(|v| {
self.instruments.rpc_message_size = v;
})
.inspect_err(|e| {
dbg_panic!("Failed to extend client metrics attributes: {:?}", e);
Expand Down Expand Up @@ -166,6 +183,10 @@ impl MetricsContext {
self.instruments.svc_request_latency.records(dur);
}
}

pub(crate) fn rpc_message_size(&self, size_bytes: u64) {
self.instruments.rpc_message_size.records(size_bytes);
}
}

const KEY_NAMESPACE: &str = "namespace";
Expand Down Expand Up @@ -216,6 +237,44 @@ fn code_as_screaming_snake(code: &Code) -> &'static str {
}
}

struct BodySizeRecorder {
size_bytes: u64,
record: Box<dyn Fn(u64) + Send + Sync>,
}

impl BodySizeRecorder {
fn new(record: impl Fn(u64) + Send + Sync + 'static) -> Self {
Self {
size_bytes: 0,
record: Box::new(record),
}
}

fn add_frame(&mut self, frame: &Bytes) {
self.size_bytes += frame.len() as u64;
}
}

impl Drop for BodySizeRecorder {
fn drop(&mut self) {
(self.record)(self.size_bytes);
}
}

fn body_with_size_recorder(body: Body, recorder: BodySizeRecorder) -> Body {
let mut recorder = recorder;
Body::new(body.map_frame(move |frame| {
if let Some(data) = frame.data_ref() {
recorder.add_frame(data);
}
frame
}))
}

fn body_size_recorder(metrics: MetricsContext) -> BodySizeRecorder {
BodySizeRecorder::new(move |size_bytes| metrics.rpc_message_size(size_bytes))
}

/// Implements metrics functionality for gRPC (really, any http) calls
#[derive(Debug, Clone)]
pub(crate) struct GrpcMetricSvc {
Expand Down Expand Up @@ -294,6 +353,11 @@ impl Service<http::Request<Body>> for GrpcMetricSvc {
metrics
})
});
if let Some(metrics) = metrics.as_ref() {
let mut req_metrics = metrics.clone();
req_metrics.with_new_attrs([message_direction(MESSAGE_DIRECTION_REQUEST)]);
req = req.map(|body| body_with_size_recorder(body, body_size_recorder(req_metrics)));
}
let callfut = match &mut self.inner {
ChannelOrGrpcOverride::Channel(inner) => {
Either::Left(inner.call(req).map_err(Into::into))
Expand All @@ -306,7 +370,7 @@ impl Service<http::Request<Body>> for GrpcMetricSvc {
async move {
let started = Instant::now();
let res = callfut.await;
if let Some(metrics) = metrics {
if let Some(metrics) = metrics.as_ref() {
metrics.record_svc_req_latency(started.elapsed());
match res {
Ok(ref ok_res) => {
Expand Down Expand Up @@ -339,8 +403,44 @@ impl Service<http::Request<Body>> for GrpcMetricSvc {
}
}
}
res
match (res, metrics) {
(Ok(res), Some(metrics)) => {
let mut resp_metrics = metrics;
resp_metrics.with_new_attrs([message_direction(MESSAGE_DIRECTION_RESPONSE)]);
Ok(res.map(|body| {
body_with_size_recorder(body, body_size_recorder(resp_metrics))
}))
}
(res, _) => res,
}
}
.boxed()
}
}

#[cfg(test)]
mod tests {
use super::*;
use http_body_util::Full;
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};

#[tokio::test]
async fn body_size_recorder_counts_data_frames() {
let recorded = Arc::new(AtomicU64::new(0));
let recorded_clone = recorded.clone();
let body = Body::new(Full::new(Bytes::from_static(b"hello")));
let body = body_with_size_recorder(
body,
BodySizeRecorder::new(move |size_bytes| {
recorded_clone.store(size_bytes, Ordering::Relaxed);
}),
);

let _ = body.collect().await.unwrap();

assert_eq!(recorded.load(Ordering::Relaxed), 5);
}
}
52 changes: 46 additions & 6 deletions crates/common/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,23 @@ pub const ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME: &str =
"activity_schedule_to_start_latency";
/// The string name (which may be prefixed) for this metric
pub const ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME: &str = "activity_execution_latency";
/// The string name (which may be prefixed) for this metric
pub const WORKFLOW_PAYLOAD_SIZE_HISTOGRAM_NAME: &str = "workflow_payload_size";
/// The string name (which may be prefixed) for this metric
pub const ACTIVITY_PAYLOAD_SIZE_HISTOGRAM_NAME: &str = "activity_payload_size";
/// The string name (which may be prefixed) for this metric
pub const RPC_MESSAGE_SIZE_HISTOGRAM_NAME: &str = "rpc_message_size";

const KEY_MESSAGE_DIRECTION: &str = "message_direction";
/// Kept shared so all payload-size emitters use the same request direction label.
pub const MESSAGE_DIRECTION_REQUEST: &str = "request";
/// Kept shared so all payload-size emitters use the same response direction label.
pub const MESSAGE_DIRECTION_RESPONSE: &str = "response";

/// Keeps the payload-size label key consistent across client and core emitters.
pub fn message_direction(direction: &'static str) -> MetricKeyValue {
MetricKeyValue::new(KEY_MESSAGE_DIRECTION, direction)
}

/// Helps define buckets once in terms of millis, but also generates a seconds version
macro_rules! define_latency_buckets {
Expand All @@ -37,12 +54,7 @@ macro_rules! define_latency_buckets {
pub(super) static $sec_name: &[f64] = &[$( $bucket / 1000.0, )*];
)*

/// Returns the default histogram buckets that lang should use for a given metric name if
/// they have not been overridden by the user. If `use_seconds` is true, returns buckets
/// in terms of seconds rather than milliseconds.
///
/// The name must *not* be prefixed with `temporal_`
pub fn default_buckets_for(histo_name: &str, use_seconds: bool) -> &'static [f64] {
fn default_duration_buckets_for(histo_name: &str, use_seconds: bool) -> &'static [f64] {
match histo_name {
$(
$metric_name => { if use_seconds { &$sec_name } else { &$name } },
Expand Down Expand Up @@ -104,6 +116,34 @@ define_latency_buckets!(
)
);

pub(super) static PAYLOAD_SIZE_BUCKETS: &[f64] = &[
128.0,
512.0,
1_024.0,
4_096.0,
16_384.0,
65_536.0,
262_144.0,
1_048_576.0,
4_194_304.0,
16_777_216.0,
67_108_864.0,
];

/// Returns the default histogram buckets that lang should use for a given metric name if they have
/// not been overridden by the user. If `use_seconds` is true, duration metric buckets are returned
/// in terms of seconds rather than milliseconds.
///
/// The name must *not* be prefixed with `temporal_`.
pub fn default_buckets_for(histo_name: &str, use_seconds: bool) -> &'static [f64] {
match histo_name {
WORKFLOW_PAYLOAD_SIZE_HISTOGRAM_NAME
| ACTIVITY_PAYLOAD_SIZE_HISTOGRAM_NAME
| RPC_MESSAGE_SIZE_HISTOGRAM_NAME => PAYLOAD_SIZE_BUCKETS,
_ => default_duration_buckets_for(histo_name, use_seconds),
}
}

/// Implementors of this trait are expected to be defined in each language's bridge.
/// The implementor is responsible for the allocation/instantiation of new metric meters which
/// Core has requested.
Expand Down
21 changes: 16 additions & 5 deletions crates/common/src/telemetry/otel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ use super::{
HistogramBucketOverrides, MetricTemporality, OtelCollectorOptions, OtlpProtocol,
TELEM_SERVICE_NAME,
metrics::{
ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME, ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME,
CoreMeter, Counter, DEFAULT_MS_BUCKETS, DEFAULT_S_BUCKETS, Gauge, GaugeF64, Histogram,
HistogramBase, HistogramDuration, HistogramDurationBase, HistogramF64, HistogramF64Base,
MetricAttributable, MetricAttributes, MetricParameters, NewAttributes, UpDownCounter,
WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME, WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME,
ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME, ACTIVITY_PAYLOAD_SIZE_HISTOGRAM_NAME,
ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, CoreMeter, Counter, DEFAULT_MS_BUCKETS,
DEFAULT_S_BUCKETS, Gauge, GaugeF64, Histogram, HistogramBase, HistogramDuration,
HistogramDurationBase, HistogramF64, HistogramF64Base, MetricAttributable,
MetricAttributes, MetricParameters, NewAttributes, RPC_MESSAGE_SIZE_HISTOGRAM_NAME,
UpDownCounter, WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME, WORKFLOW_PAYLOAD_SIZE_HISTOGRAM_NAME,
WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME,
WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME,
WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME, default_buckets_for,
},
Expand Down Expand Up @@ -94,6 +96,15 @@ pub(super) fn augment_meter_provider_with_defaults(
ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME,
use_seconds,
));
mpb = mpb.with_view(histo_view(
WORKFLOW_PAYLOAD_SIZE_HISTOGRAM_NAME,
use_seconds,
));
mpb = mpb.with_view(histo_view(
ACTIVITY_PAYLOAD_SIZE_HISTOGRAM_NAME,
use_seconds,
));
mpb = mpb.with_view(histo_view(RPC_MESSAGE_SIZE_HISTOGRAM_NAME, use_seconds));
// Fallback default
mpb = mpb.with_view(move |ins: &Instrument| {
if ins.kind() == InstrumentKind::Histogram {
Expand Down
Loading