diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index cb48fea42fca89..c7f090f91c438c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -793,8 +793,15 @@ && hasActiveBlockDataToUpload()) { // there is */ private synchronized void smallWriteOptimizedflushInternal(boolean isClose) throws IOException { // writeCurrentBufferToService will increment numOfAppendsToServerSinceLastFlush - uploadBlockAsync(getBlockManager().getActiveBlock(), - true, isClose); + try { + uploadBlockAsync(getBlockManager().getActiveBlock(), + true, isClose); + } finally { + if (getBlockManager().hasActiveBlock()) { + // The block has been consumed by upload; new writes need a new block. + getBlockManager().clearActiveBlock(); + } + } waitForAppendsToComplete(); shrinkWriteOperationQueue(); maybeThrowLastError(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java index ec080369e19373..0f5bdca1d583a1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java @@ -59,6 +59,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_MODE; public final class TestAbfsOutputStream { @@ -81,6 +82,23 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext( TracingContext tracingContext, ExecutorService executorService) throws IOException, IllegalAccessException { + return populateAbfsOutputStreamContext(writeBufferSize, isFlushEnabled, + disableOutputStreamFlush, isAppendBlob, isExpectHeaderEnabled, + clientHandler, path, tracingContext, executorService, false); + } + + private AbfsOutputStreamContext populateAbfsOutputStreamContext( + int writeBufferSize, + boolean isFlushEnabled, + boolean disableOutputStreamFlush, + boolean isAppendBlob, + boolean isExpectHeaderEnabled, + AbfsClientHandler clientHandler, + String path, + TracingContext tracingContext, + ExecutorService executorService, + boolean enableSmallWriteOptimization) throws IOException, + IllegalAccessException { AbfsConfiguration abfsConf = new AbfsConfiguration(new Configuration(), accountName1); String blockFactoryName = @@ -95,6 +113,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext( .withWriteBufferSize(writeBufferSize) .enableExpectHeader(isExpectHeaderEnabled) .enableFlush(isFlushEnabled) + .enableSmallWriteOptimization(enableSmallWriteOptimization) .disableOutputStreamFlush(disableOutputStreamFlush) .withStreamStatistics(new AbfsOutputStreamStatisticsImpl()) .withAppendBlob(isAppendBlob) @@ -108,6 +127,48 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext( .build(); } + @Test + public void verifySmallWriteOptimizedHFlushFollowedByClose() throws Exception { + AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd"))); + AbfsClientHandler clientHandler = mock(AbfsClientHandler.class); + AbfsDfsClient client = mock(AbfsDfsClient.class); + AbfsRestOperation op = mock(AbfsRestOperation.class); + final Configuration conf = new Configuration(); + conf.set(accountKey1, accountValue1); + AbfsConfiguration abfsConf = new AbfsConfiguration(conf, accountName1); + AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); + when(client.getAbfsPerfTracker()).thenReturn(tracker); + when(client.getAbfsConfiguration()).thenReturn(abfsConf); + when(client.getAbfsCounters()).thenReturn(abfsCounters); + when(client.append(anyString(), any(byte[].class), + any(AppendRequestParameters.class), any(), any(), + any(TracingContext.class))).thenReturn(op); + when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), + isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op); + when(clientHandler.getClient(any())).thenReturn(client); + when(clientHandler.getDfsClient()).thenReturn(client); + + AbfsOutputStream out = Mockito.spy(new AbfsOutputStream( + populateAbfsOutputStreamContext( + BUFFER_SIZE, true, false, false, true, clientHandler, PATH, + new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", + FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), null), + createExecutorService(abfsConf), true))); + when(out.getClient()).thenReturn(client); + when(out.getMd5()).thenReturn(null); + + out.write(new byte[WRITE_SIZE]); + out.hflush(); + out.close(); + + AppendRequestParameters reqParameters = new AppendRequestParameters( + 0, 0, WRITE_SIZE, FLUSH_MODE, false, null, true, null); + verify(client, times(1)).append(eq(PATH), any(byte[].class), + refEq(reqParameters), any(), any(), any(TracingContext.class)); + verify(client, times(1)).append(eq(PATH), any(byte[].class), any(), + any(), any(), any(TracingContext.class)); + } + /** * The test verifies OutputStream shortwrite case(2000bytes write followed by flush, hflush, hsync) is making correct HTTP calls to the server */