Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,11 @@ public RouteSegmentedConnPool(
this.timeouts = new ScheduledThreadPoolExecutor(1, tf);
this.timeouts.setRemoveOnCancelPolicy(true);

// Asynchronous disposer for slow GRACEFUL closes.
// Asynchronous disposer for slow GRACEFUL closes. Capped at 32 so very
// many-core boxes do not over-provision; threads time out when idle.
final int cores = Math.max(2, Runtime.getRuntime().availableProcessors());
final int nThreads = Math.min(8, cores);
final int qsize = 1024;
final int nThreads = Math.min(32, Math.max(8, cores * 2));
final int qsize = 4096;

final ThreadFactory df = r -> {
final Thread t = new Thread(r, "seg-pool-disposer");
Expand All @@ -184,10 +185,11 @@ public RouteSegmentedConnPool(
};
this.disposer = new ThreadPoolExecutor(
nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
30_000L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(qsize),
df,
new ThreadPoolExecutor.AbortPolicy());
this.disposer.allowCoreThreadTimeOut(true);
}

final class Segment {
Expand Down Expand Up @@ -293,19 +295,21 @@ public Future<PoolEntry<R, C>> lease(
// Late hit after enqueuing
final PoolEntry<R, C> late = pollAvailable(seg, state);
if (late != null) {
if (seg.waiters.remove(w)) {
if (w.complete(late)) {
cancelTimeout(w);
fireOnLease(route);
if (callback != null) {
callback.completed(late);
}
w.complete(late);
dequeueIfDrained(seg);
return w;
} else {
boolean handedOff = false;
for (Waiter other; (other = seg.waiters.pollFirst()) != null; ) {
if (!other.cancelled && compatible(other.state, late.getState())) {
if (other.cancelled || other.isDone()) {
continue;
}
if (compatible(other.state, late.getState())) {
cancelTimeout(other);
handedOff = other.complete(late);
if (handedOff) {
Expand Down Expand Up @@ -361,8 +365,10 @@ public void release(final PoolEntry<R, C> entry, final boolean reusable) {
if (stillValid) {
if (!handOffToCompatibleWaiter(entry, seg)) {
offerAvailable(seg, entry);
enqueueIfNeeded(route, seg);
triggerDrainIfMany();
if (!seg.waiters.isEmpty()) {
enqueueIfNeeded(route, seg);
triggerDrainIfMany();
}
}
} else {
discardAndDecr(entry, CloseMode.GRACEFUL);
Expand Down Expand Up @@ -612,7 +618,7 @@ private boolean compatible(final Object needed, final Object have) {
}

private boolean handOffToCompatibleWaiter(final PoolEntry<R, C> entry, final Segment seg) {
final Deque<Waiter> skipped = new ArrayDeque<>();
Deque<Waiter> skipped = null;
boolean handedOff = false;

for (; ; ) {
Expand All @@ -632,12 +638,17 @@ private boolean handOffToCompatibleWaiter(final PoolEntry<R, C> entry, final Seg
break;
}
} else {
if (skipped == null) {
skipped = new ArrayDeque<>();
}
skipped.addLast(w);
}
}

while (!skipped.isEmpty()) {
seg.waiters.addFirst(skipped.pollLast());
if (skipped != null) {
while (!skipped.isEmpty()) {
seg.waiters.addFirst(skipped.pollLast());
}
}
return handedOff;
}
Expand Down
Loading