Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ public void flush() {
public boolean finishWrite(long length) {
lock.lock();
try {
State currentState = state.getState();
if (currentState == State.TERMINAL_SUCCESS || currentState == State.TERMINAL_ERROR) {
return true;
}
// if we're already finalizing, ack rather than enqueueing again
if (state.isFinalizing() && state.getTotalSentBytes() == length) {
return true;
Expand Down Expand Up @@ -192,6 +196,11 @@ public boolean closeStream(long length) {
lock.lock();
try {

State currentState = state.getState();
if (currentState == State.TERMINAL_SUCCESS || currentState == State.TERMINAL_ERROR) {
return true;
}

boolean offer = state.finalFlush(length);
if (offer) {
internalSend();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1356,6 +1356,11 @@ boolean isFinalizing() {
return false;
}

@Override
State getState() {
return State.RUNNING;
}

@Override
long getTotalSentBytes() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,12 +380,9 @@ static class DirectWriteService extends StorageImplBase {
private static final Logger LOGGER = LoggerFactory.getLogger(DirectWriteService.class);
private final BiConsumer<StreamObserver<WriteObjectResponse>, List<WriteObjectRequest>> c;

private ImmutableList.Builder<WriteObjectRequest> requests;

DirectWriteService(
BiConsumer<StreamObserver<WriteObjectResponse>, List<WriteObjectRequest>> c) {
this.c = c;
this.requests = new ImmutableList.Builder<>();
}

DirectWriteService(ImmutableMap<List<WriteObjectRequest>, WriteObjectResponse> writes) {
Expand Down Expand Up @@ -420,6 +417,9 @@ private static void logUnexpectedRequest(
@Override
public StreamObserver<WriteObjectRequest> writeObject(StreamObserver<WriteObjectResponse> obs) {
return new Adapter() {
private final ImmutableList.Builder<WriteObjectRequest> requests =
new ImmutableList.Builder<>();

@Override
public void onNext(WriteObjectRequest value) {
requests.add(value);
Expand All @@ -432,7 +432,6 @@ public void onError(Throwable t) {}
public void onCompleted() {
ImmutableList<WriteObjectRequest> build = requests.build();
c.accept(obs, build);
requests = new ImmutableList.Builder<>();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public void start() {
dockerImage,
"gunicorn",
"--bind=0.0.0.0:9000",
"--worker-class=sync",
"--worker-class=gevent",
"--threads=10",
"--access-logfile=-",
"--keep-alive=0",
Expand Down Expand Up @@ -442,8 +442,8 @@ public String toString() {
}

static final class Builder {
private static final String DEFAULT_BASE_URI = "http://localhost:9000";
private static final String DEFAULT_GRPC_BASE_URI = "http://localhost:9005";
private static final String DEFAULT_BASE_URI = "http://127.0.0.1:9000";
private static final String DEFAULT_GRPC_BASE_URI = "http://127.0.0.1:9005";
private static final String DEFAULT_IMAGE_NAME;
private static final String DEFAULT_IMAGE_TAG;

Expand Down
Loading