Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -793,8 +793,15 @@ && hasActiveBlockDataToUpload()) { // there is
*/
private synchronized void smallWriteOptimizedflushInternal(boolean isClose) throws IOException {
// writeCurrentBufferToService will increment numOfAppendsToServerSinceLastFlush
uploadBlockAsync(getBlockManager().getActiveBlock(),
true, isClose);
try {
uploadBlockAsync(getBlockManager().getActiveBlock(),
true, isClose);
} finally {
if (getBlockManager().hasActiveBlock()) {
// The block has been consumed by upload; new writes need a new block.
getBlockManager().clearActiveBlock();
}
}
waitForAppendsToComplete();
shrinkWriteOperationQueue();
maybeThrowLastError();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_MODE;

public final class TestAbfsOutputStream {

Expand All @@ -81,6 +82,23 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
TracingContext tracingContext,
ExecutorService executorService) throws IOException,
IllegalAccessException {
return populateAbfsOutputStreamContext(writeBufferSize, isFlushEnabled,
disableOutputStreamFlush, isAppendBlob, isExpectHeaderEnabled,
clientHandler, path, tracingContext, executorService, false);
}

private AbfsOutputStreamContext populateAbfsOutputStreamContext(
int writeBufferSize,
boolean isFlushEnabled,
boolean disableOutputStreamFlush,
boolean isAppendBlob,
boolean isExpectHeaderEnabled,
AbfsClientHandler clientHandler,
String path,
TracingContext tracingContext,
ExecutorService executorService,
boolean enableSmallWriteOptimization) throws IOException,
IllegalAccessException {
AbfsConfiguration abfsConf = new AbfsConfiguration(new Configuration(),
accountName1);
String blockFactoryName =
Expand All @@ -95,6 +113,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.withWriteBufferSize(writeBufferSize)
.enableExpectHeader(isExpectHeaderEnabled)
.enableFlush(isFlushEnabled)
.enableSmallWriteOptimization(enableSmallWriteOptimization)
.disableOutputStreamFlush(disableOutputStreamFlush)
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
.withAppendBlob(isAppendBlob)
Expand All @@ -108,6 +127,48 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.build();
}

@Test
public void verifySmallWriteOptimizedHFlushFollowedByClose() throws Exception {
AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
AbfsDfsClient client = mock(AbfsDfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
final Configuration conf = new Configuration();
conf.set(accountKey1, accountValue1);
AbfsConfiguration abfsConf = new AbfsConfiguration(conf, accountName1);
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.getAbfsConfiguration()).thenReturn(abfsConf);
when(client.getAbfsCounters()).thenReturn(abfsCounters);
when(client.append(anyString(), any(byte[].class),
any(AppendRequestParameters.class), any(), any(),
any(TracingContext.class))).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(),
isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op);
when(clientHandler.getClient(any())).thenReturn(client);
when(clientHandler.getDfsClient()).thenReturn(client);

AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(
populateAbfsOutputStreamContext(
BUFFER_SIZE, true, false, false, true, clientHandler, PATH,
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), null),
createExecutorService(abfsConf), true)));
when(out.getClient()).thenReturn(client);
when(out.getMd5()).thenReturn(null);

out.write(new byte[WRITE_SIZE]);
out.hflush();
out.close();

AppendRequestParameters reqParameters = new AppendRequestParameters(
0, 0, WRITE_SIZE, FLUSH_MODE, false, null, true, null);
verify(client, times(1)).append(eq(PATH), any(byte[].class),
refEq(reqParameters), any(), any(), any(TracingContext.class));
verify(client, times(1)).append(eq(PATH), any(byte[].class), any(),
any(), any(), any(TracingContext.class));
}

/**
* The test verifies OutputStream shortwrite case(2000bytes write followed by flush, hflush, hsync) is making correct HTTP calls to the server
*/
Expand Down
Loading