diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 65e17a6920b9..87268c6ec878 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -38,6 +38,8 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.ExpMovingAverage; +import org.apache.cassandra.utils.MovingAverage; import static org.apache.cassandra.db.RepairedDataInfo.NO_OP_REPAIRED_DATA_INFO; @@ -224,6 +226,9 @@ public boolean isDigestResponse() // built on the owning node responding to a query private static class LocalDataResponse extends DataResponse { + // Moving average of response sizes, used to set initial size of output buffer. + private static final MovingAverage estimatedResponseBytes = ExpMovingAverage.decayBy1000(); + private LocalDataResponse(UnfilteredPartitionIterator iter, ReadCommand command, RepairedDataInfo rdi) { super(build(iter, command.columnFilter()), @@ -239,10 +244,17 @@ private LocalDataResponse(UnfilteredPartitionIterator iter, ColumnFilter selecti private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection) { - try (DataOutputBuffer buffer = new DataOutputBuffer()) + // Size output buffer to 10% above the moving average to absorb minor variance and limit rebuffering. + double estimatedResponseSize = estimatedResponseBytes.get(); + double bufferSizeEstimate = Double.isNaN(estimatedResponseSize) ? 128 : estimatedResponseSize; + int initialBufferSize = (int) (bufferSizeEstimate * 1.1); + + try (DataOutputBuffer buffer = new DataOutputBuffer(initialBufferSize)) { UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, selection, buffer, MessagingService.current_version); - return buffer.buffer(); + estimatedResponseBytes.update(buffer.position()); + + return buffer.buffer(false); } catch (IOException e) {