From 1f5c87c60c021fbb771259ae74275621ae30389d Mon Sep 17 00:00:00 2001 From: "C. Scott Andreas" Date: Sat, 4 Apr 2026 17:34:06 -0700 Subject: [PATCH 1/2] Reduce allocations + copies due to rebuffering in LocalDataResponse row serialization --- src/java/org/apache/cassandra/db/ReadResponse.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 65e17a6920b9..52b41993e1f7 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -38,6 +38,8 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.ExpMovingAverage; +import org.apache.cassandra.utils.MovingAverage; import static org.apache.cassandra.db.RepairedDataInfo.NO_OP_REPAIRED_DATA_INFO; @@ -224,6 +226,9 @@ public boolean isDigestResponse() // built on the owning node responding to a query private static class LocalDataResponse extends DataResponse { + // Moving average of response sizes, used to set initial size of output buffer. + private static final MovingAverage estimatedResponseBytes = ExpMovingAverage.decayBy1000(); + private LocalDataResponse(UnfilteredPartitionIterator iter, ReadCommand command, RepairedDataInfo rdi) { super(build(iter, command.columnFilter()), @@ -239,9 +244,15 @@ private LocalDataResponse(UnfilteredPartitionIterator iter, ColumnFilter selecti private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection) { - try (DataOutputBuffer buffer = new DataOutputBuffer()) + // Size output buffer to 10% above the moving average to absorb minor variance and limit rebuffering. + double bufferSizeEstimate = Double.isNaN(estimatedResponseBytes.get()) ? 128 : estimatedResponseBytes.get(); + int initialBufferSize = (int) (bufferSizeEstimate * 1.1); + + try (DataOutputBuffer buffer = new DataOutputBuffer(initialBufferSize)) { UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, selection, buffer, MessagingService.current_version); + estimatedResponseBytes.update(buffer.position()); + return buffer.buffer(); } catch (IOException e) From df95df14e358e6d7d44420d882a96922530c9eb5 Mon Sep 17 00:00:00 2001 From: "C. Scott Andreas" Date: Sun, 5 Apr 2026 10:51:02 -0700 Subject: [PATCH 2/2] Eliminate duplicate call to estimatedResponseBytes.get(); return output buffer without duplicating. --- src/java/org/apache/cassandra/db/ReadResponse.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 52b41993e1f7..87268c6ec878 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -245,7 +245,8 @@ private LocalDataResponse(UnfilteredPartitionIterator iter, ColumnFilter selecti private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection) { // Size output buffer to 10% above the moving average to absorb minor variance and limit rebuffering. - double bufferSizeEstimate = Double.isNaN(estimatedResponseBytes.get()) ? 128 : estimatedResponseBytes.get(); + double estimatedResponseSize = estimatedResponseBytes.get(); + double bufferSizeEstimate = Double.isNaN(estimatedResponseSize) ? 128 : estimatedResponseSize; int initialBufferSize = (int) (bufferSizeEstimate * 1.1); try (DataOutputBuffer buffer = new DataOutputBuffer(initialBufferSize)) @@ -253,7 +254,7 @@ private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter s UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, selection, buffer, MessagingService.current_version); estimatedResponseBytes.update(buffer.position()); - return buffer.buffer(); + return buffer.buffer(false); } catch (IOException e) {