Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 55 additions & 16 deletions docs/operations/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -5316,25 +5316,51 @@ kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[s
<tr>
<td>

flush-latency-avg
</td>
flush-latency-avg (deprecated)
</td>
<td>

The average flush execution time in ns.
</td>
The average flush execution time in ns. Deprecated: use commit-latency-avg instead.
</td>
<td>

kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
</td> </tr>
<tr>
</td> </tr>
<tr>
<td>

flush-latency-max
</td>
flush-latency-max (deprecated)
</td>
<td>

The maximum flush execution time in ns.
</td>
The maximum flush execution time in ns. Deprecated: use commit-latency-max instead.
</td>
<td>

kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
</td> </tr>
<tr>
<td>

commit-latency-avg
</td>
<td>

The average commit execution time in ns.
</td>
<td>

kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
</td> </tr>
<tr>
<td>

commit-latency-max
</td>
<td>

The maximum commit execution time in ns.
</td>
<td>

kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
Expand Down Expand Up @@ -5472,17 +5498,30 @@ kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[s
<tr>
<td>

flush-rate
</td>
flush-rate (deprecated)
</td>
<td>

The average flush rate for this store.
</td>
The average flush rate for this store. Deprecated: use commit-rate instead.
</td>
<td>

kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
</td> </tr>
<tr>
</td> </tr>
<tr>
<td>

commit-rate
</td>
<td>

The average commit rate for this store.
</td>
<td>

kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
</td> </tr>
<tr>
<td>

restore-rate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public class MeteredKeyValueStore<K, V>
protected Sensor allSensor;
protected Sensor rangeSensor;
protected Sensor prefixScanSensor;
private Sensor flushSensor;
private Sensor commitSensor;
private Sensor e2eLatencySensor;
protected Sensor iteratorDurationSensor;
protected InternalProcessorContext<?, ?> internalContext;
Expand Down Expand Up @@ -143,6 +143,7 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo
super.init(stateStoreContext, root);
}

@SuppressWarnings("deprecation")
private void registerMetrics() {
putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
putIfAbsentSensor = StateStoreMetrics.putIfAbsentSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
Expand All @@ -151,7 +152,10 @@ private void registerMetrics() {
allSensor = StateStoreMetrics.allSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
rangeSensor = StateStoreMetrics.rangeSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
prefixScanSensor = StateStoreMetrics.prefixScanSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
// flush metrics ar deprecated per KIP-1035 and will be removed in the next major release.
// Here we just register the sensor without recording
StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
commitSensor = StateStoreMetrics.commitSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
deleteSensor = StateStoreMetrics.deleteSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
Expand Down Expand Up @@ -416,7 +420,7 @@ public KeyValueIterator<K, V> reverseAll() {

@Override
public void commit(final Map<TopicPartition, Long> changelogOffsets) {
maybeMeasureLatency(() -> super.commit(changelogOffsets), time, flushSensor);
maybeMeasureLatency(() -> super.commit(changelogOffsets), time, commitSensor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class MeteredSessionStore<K, V>
protected StreamsMetricsImpl streamsMetrics;
protected Sensor putSensor;
protected Sensor fetchSensor;
protected Sensor flushSensor;
protected Sensor commitSensor;
protected Sensor removeSensor;
protected Sensor e2eLatencySensor;
protected Sensor iteratorDurationSensor;
Expand Down Expand Up @@ -119,10 +119,14 @@ public void init(final StateStoreContext stateStoreContext,
super.init(stateStoreContext, root);
}

@SuppressWarnings("deprecation")
private void registerMetrics() {
putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
// flushSensor is deprecated per KIP-1035 and will be removed in the next major release.
// Here we just register the sensor without recording
StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
commitSensor = StateStoreMetrics.commitSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
removeSensor = StateStoreMetrics.removeSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
Expand Down Expand Up @@ -430,7 +434,7 @@ public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K keyFrom,

@Override
public void commit(final Map<TopicPartition, Long> changelogOffsets) {
maybeMeasureLatency(() -> super.commit(changelogOffsets), time, flushSensor);
maybeMeasureLatency(() -> super.commit(changelogOffsets), time, commitSensor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class MeteredWindowStore<K, V>
protected StreamsMetricsImpl streamsMetrics;
protected Sensor putSensor;
protected Sensor fetchSensor;
private Sensor flushSensor;
private Sensor commitSensor;
private Sensor e2eLatencySensor;
protected Sensor iteratorDurationSensor;
protected InternalProcessorContext<?, ?> internalContext;
Expand Down Expand Up @@ -141,10 +141,14 @@ protected Serde<V> prepareValueSerde(final Serde<V> valueSerde, final SerdeGette
return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
}

@SuppressWarnings("deprecation")
private void registerMetrics() {
putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
// flushSensor is deprecated per KIP-1035 and will be removed in the next major release.
// Here we just register the sensor without recording
StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
commitSensor = StateStoreMetrics.commitSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics);
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
Expand Down Expand Up @@ -384,7 +388,7 @@ public KeyValueIterator<Windowed<K>, V> backwardAll() {

@Override
public void commit(final Map<TopicPartition, Long> changelogOffsets) {
maybeMeasureLatency(() -> super.commit(changelogOffsets), time, flushSensor);
maybeMeasureLatency(() -> super.commit(changelogOffsets), time, commitSensor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ private StateStoreMetrics() {}
private static final String FLUSH_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + FLUSH_DESCRIPTION;
private static final String FLUSH_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + FLUSH_DESCRIPTION;

private static final String COMMIT = "commit";
private static final String COMMIT_DESCRIPTION = "calls to commit";
private static final String COMMIT_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + COMMIT_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
private static final String COMMIT_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION_PREFIX + COMMIT_DESCRIPTION;
private static final String COMMIT_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION_PREFIX + COMMIT_DESCRIPTION;

private static final String DELETE = "delete";
private static final String DELETE_DESCRIPTION = "calls to delete";
private static final String DELETE_RATE_DESCRIPTION =
Expand Down Expand Up @@ -309,6 +316,10 @@ public static Sensor prefixScanSensor(final String taskId,
return sensor;
}

/**
* @deprecated since 4.3. Use {@link #commitSensor(String, String, String, StreamsMetricsImpl)} instead.
*/
@Deprecated
public static Sensor flushSensor(final String taskId,
final String storeType,
final String storeName,
Expand All @@ -326,6 +337,23 @@ public static Sensor flushSensor(final String taskId,
);
}

public static Sensor commitSensor(final String taskId,
final String storeType,
final String storeName,
final StreamsMetricsImpl streamsMetrics) {
return throughputAndLatencySensor(
taskId,
storeType,
storeName,
COMMIT,
COMMIT_RATE_DESCRIPTION,
COMMIT_AVG_LATENCY_DESCRIPTION,
COMMIT_MAX_LATENCY_DESCRIPTION,
RecordingLevel.DEBUG,
streamsMetrics
);
}

public static Sensor deleteSensor(final String taskId,
final String storeType,
final String storeName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,8 @@ public void shouldFlushInnerWhenCommitTimeRecords() {

metered.commit(Map.of());

final KafkaMetric metric = metric("flush-rate");
assertTrue((Double) metric.metricValue() > 0);
final KafkaMetric commitMetric = metric("commit-rate");
assertTrue((Double) commitMetric.metricValue() > 0);
}

private interface CachedKeyValueStore extends KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,8 @@ public void shouldCommitInnerWhenCommitTimeRecords() {

metered.commit(Map.of());

final KafkaMetric metric = metric("flush-rate");
assertTrue((Double) metric.metricValue() > 0);
final KafkaMetric commitMetric = metric("commit-rate");
assertTrue((Double) commitMetric.metricValue() > 0);
}

private interface CachedKeyValueStore extends KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ public void shouldCommitInnerWhenCommitTimeRecords() {

metered.commit(Map.of());

final KafkaMetric metric = metric("flush-rate");
assertTrue((Double) metric.metricValue() > 0);
final KafkaMetric commitMetric = metric("commit-rate");
assertTrue((Double) commitMetric.metricValue() > 0);
}

private interface CachedKeyValueStore extends KeyValueStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void shouldDelegateAndRecordMetricsOnCommit() {
store.commit(Map.of());

verify(inner).commit(Map.of());
assertThat((Double) getMetric("flush-rate").metricValue(), greaterThan(0.0));
assertThat((Double) getMetric("commit-rate").metricValue(), greaterThan(0.0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,8 @@ public void shouldRecordCommitLatency() {
store.init(context, store);
store.commit(Map.of());

// it suffices to verify one flush metric since all flush metrics are recorded by the same sensor
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("flush-rate");
assertTrue((Double) metric.metricValue() > 0);
final KafkaMetric commitMetric = metric("commit-rate");
assertTrue((Double) commitMetric.metricValue() > 0);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,23 @@ public void shouldGetFlushSensor() {
);
}

@Test
public void shouldGetCommitSensor() {
final String metricName = "commit";
final String descriptionOfRate = "The average number of calls to commit per second";
final String descriptionOfAvg = "The average latency of calls to commit";
final String descriptionOfMax = "The maximum latency of calls to commit";
setupStreamsMetrics(metricName);

getAndVerifySensor(
() -> StateStoreMetrics.commitSensor(TASK_ID, STORE_TYPE, STORE_NAME, streamsMetrics),
metricName,
descriptionOfAvg,
descriptionOfMax,
descriptionOfRate
);
}

@Test
public void shouldGetRemoveSensor() {
final String metricName = "remove";
Expand Down
Loading