Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
### Documentation

### Internal Changes
* Implement dynamic auth token stale period based on initial token lifetime. Increased up to 20 mins for standard OAuth with proportionally shorter periods for short-lived tokens. Manually setting the stale period using the CachedTokeSource builder reverts the behaviour to the legacy fixed stale duration.

### API Changes
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ private enum TokenState {
// Default duration before expiry to consider a token as 'stale'. This value is chosen to cover
// the maximum monthly downtime allowed by a 99.99% uptime SLA (~4.38 minutes).
private static final Duration DEFAULT_STALE_DURATION = Duration.ofMinutes(5);
// The maximum stale duration that can be achieved before expiry to consider a token as 'stale'
// when using the dynamic stale duration method. This value is chosen to cover the maximum
// monthly downtime allowed by a 99.99% uptime SLA (~4.38 minutes) while increasing the likelihood
// that the token is refreshed asynchronously if the auth server is down.
private static final Duration MAX_STALE_DURATION = Duration.ofMinutes(20);
// Default additional buffer before expiry to consider a token as expired.
// This is 40 seconds by default since Azure Databricks rejects tokens that are within 30 seconds
// of expiry.
Expand All @@ -42,8 +47,12 @@ private enum TokenState {
private final TokenSource tokenSource;
// Whether asynchronous refresh is enabled.
private boolean asyncDisabled = false;
// Duration before expiry to consider a token as 'stale'.
private final Duration staleDuration;
// The legacy duration before expiry to consider a token as 'stale'.
private final Duration staticStaleDuration;
// Whether to use the dynamic stale duration computation or defer to the legacy duration.
private final boolean useDynamicStaleDuration;
// The dynamically computed duration before expiry to consider a token as 'stale'.
private volatile Duration dynamicStaleDuration;
// Additional buffer before expiry to consider a token as expired.
private final Duration expiryBuffer;
// Clock supplier for current time.
Expand All @@ -59,10 +68,17 @@ private enum TokenState {
private CachedTokenSource(Builder builder) {
this.tokenSource = builder.tokenSource;
this.asyncDisabled = builder.asyncDisabled;
this.staleDuration = builder.staleDuration;
this.staticStaleDuration = builder.staleDuration;
this.useDynamicStaleDuration = builder.useDynamicStaleDuration;
this.expiryBuffer = builder.expiryBuffer;
this.clockSupplier = builder.clockSupplier;
this.token = builder.token;

if (this.useDynamicStaleDuration && this.token != null) {
this.dynamicStaleDuration = computeStaleDuration(this.token);
} else {
this.dynamicStaleDuration = Duration.ofMinutes(0);
}
}

/**
Expand All @@ -75,6 +91,7 @@ public static class Builder {
private final TokenSource tokenSource;
private boolean asyncDisabled = false;
private Duration staleDuration = DEFAULT_STALE_DURATION;
private boolean useDynamicStaleDuration = true;
private Duration expiryBuffer = DEFAULT_EXPIRY_BUFFER;
private ClockSupplier clockSupplier = new UtcClockSupplier();
private Token token;
Expand Down Expand Up @@ -130,6 +147,7 @@ public Builder setAsyncDisabled(boolean asyncDisabled) {
*/
public Builder setStaleDuration(Duration staleDuration) {
this.staleDuration = staleDuration;
this.useDynamicStaleDuration = false;
return this;
}

Expand Down Expand Up @@ -188,6 +206,21 @@ public Token getToken() {
return getTokenAsync();
}

private Duration computeStaleDuration(Token t) {
if (t.getExpiry() == null) {
return Duration.ZERO; // Tokens with no expiry are considered permanent.
}

Duration ttl = Duration.between(Instant.now(clockSupplier.getClock()), t.getExpiry());

if (ttl.compareTo(Duration.ZERO) <= 0) {
return Duration.ZERO;
}

Duration halfTtl = ttl.dividedBy(2);
return halfTtl.compareTo(MAX_STALE_DURATION) > 0 ? MAX_STALE_DURATION : halfTtl;
}

/**
* Determine the state of the current token (fresh, stale, or expired).
*
Expand All @@ -197,10 +230,15 @@ protected TokenState getTokenState(Token t) {
if (t == null) {
return TokenState.EXPIRED;
}
if (t.getExpiry() == null) {
return TokenState.FRESH; // Tokens with no expiry are considered permanent.
}

Duration lifeTime = Duration.between(Instant.now(clockSupplier.getClock()), t.getExpiry());
if (lifeTime.compareTo(expiryBuffer) <= 0) {
return TokenState.EXPIRED;
}
Duration staleDuration = useDynamicStaleDuration ? dynamicStaleDuration : staticStaleDuration;
if (lifeTime.compareTo(staleDuration) <= 0) {
return TokenState.STALE;
}
Expand Down Expand Up @@ -228,13 +266,22 @@ protected Token getTokenBlocking() {
return token;
}
lastRefreshSucceeded = false;
Token newToken;
try {
token = tokenSource.getToken();
newToken = tokenSource.getToken();
} catch (Exception e) {
logger.error("Failed to refresh token synchronously", e);
throw e;
}
lastRefreshSucceeded = true;

// Write dynamicStaleDuration before publishing the new token via the volatile write,
// so unsynchronized readers that see the new token are guaranteed to also see the
// updated dynamicStaleDuration.
if (useDynamicStaleDuration && newToken != null) {
dynamicStaleDuration = computeStaleDuration(newToken);
}
token = newToken;
return token;
}
}
Expand Down Expand Up @@ -279,6 +326,12 @@ private synchronized void triggerAsyncRefresh() {
// Attempt to refresh the token in the background.
Token newToken = tokenSource.getToken();
synchronized (this) {
// Write dynamicStaleDuration before publishing the new token via the volatile
// write, so unsynchronized readers that see the new token are guaranteed to also
// see the updated dynamicStaleDuration.
if (useDynamicStaleDuration && newToken != null) {
dynamicStaleDuration = computeStaleDuration(newToken);
}
token = newToken;
refreshInProgress = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,81 @@ public class CachedTokenSourceTest {
private static final String TOKEN_TYPE = "Bearer";
private static final String INITIAL_TOKEN = "initial-token";
private static final String REFRESH_TOKEN = "refreshed-token";

private static final long FRESH_MINUTES = 10;
private static final long STALE_MINUTES = 1;

// Token TTL for the stale scenario: 4 minutes.
// dynamicStaleDuration = min(4/2, 20) = 2 min.
// After advancing the clock by STALE_ADVANCE_MINUTES = 3, lifeTime = 1 min.
// 1 min ≤ 2 min (stale) and 1 min > 40s (not expired) → STALE.
private static final long STALE_MINUTES = 4;
private static final long STALE_ADVANCE_MINUTES = 3;

// Token TTL for the capped stale duration scenario: 60 minutes.
// dynamicStaleDuration = min(60/2, 20) = 20 min (MAX_STALE_DURATION cap).
// After advancing the clock by CAPPED_STALE_ADVANCE_MINUTES = 41, lifeTime = 19 min.
// 19 min ≤ 20 min (stale) and 19 min > 40s (not expired) → STALE.
private static final long CAPPED_STALE_MINUTES = 60;
private static final long CAPPED_STALE_ADVANCE_MINUTES = 41;

private static final long EXPIRED_MINUTES = -1;

private static Stream<Arguments> provideAsyncRefreshScenarios() {
return Stream.of(
Arguments.of("Fresh token, async enabled", FRESH_MINUTES, false, false, INITIAL_TOKEN),
Arguments.of("Stale token, async enabled", STALE_MINUTES, false, true, INITIAL_TOKEN),
Arguments.of("Expired token, async enabled", EXPIRED_MINUTES, false, true, REFRESH_TOKEN),
Arguments.of("Fresh token, async disabled", FRESH_MINUTES, true, false, INITIAL_TOKEN),
Arguments.of("Stale token, async disabled", STALE_MINUTES, true, false, INITIAL_TOKEN),
Arguments.of("Expired token, async disabled", EXPIRED_MINUTES, true, true, REFRESH_TOKEN));
Arguments.of("Fresh token, async enabled", FRESH_MINUTES, 0L, false, false, INITIAL_TOKEN),
Arguments.of(
"Stale token, async enabled",
STALE_MINUTES,
STALE_ADVANCE_MINUTES,
false,
true,
INITIAL_TOKEN),
Arguments.of(
"Expired token, async enabled", EXPIRED_MINUTES, 0L, false, true, REFRESH_TOKEN),
Arguments.of("Fresh token, async disabled", FRESH_MINUTES, 0L, true, false, INITIAL_TOKEN),
Arguments.of(
"Stale token, async disabled",
STALE_MINUTES,
STALE_ADVANCE_MINUTES,
true,
false,
INITIAL_TOKEN),
Arguments.of(
"Stale token, capped stale duration, async enabled",
CAPPED_STALE_MINUTES,
CAPPED_STALE_ADVANCE_MINUTES,
false,
true,
INITIAL_TOKEN),
Arguments.of(
"Expired token, async disabled", EXPIRED_MINUTES, 0L, true, true, REFRESH_TOKEN));
}

@ParameterizedTest(name = "{0}")
@MethodSource("provideAsyncRefreshScenarios")
void testAsyncRefreshParametrized(
String testName,
long minutesUntilExpiry,
long clockAdvanceMinutes,
boolean asyncDisabled,
boolean expectRefresh,
String expectedToken)
throws Exception {

TestClockSupplier clockSupplier = new TestClockSupplier(Instant.now());

Token initialToken =
new Token(
INITIAL_TOKEN,
TOKEN_TYPE,
null,
Instant.now().plus(Duration.ofMinutes(minutesUntilExpiry)));
Instant.now(clockSupplier.getClock()).plus(Duration.ofMinutes(minutesUntilExpiry)));
Token refreshedToken =
new Token(REFRESH_TOKEN, TOKEN_TYPE, null, Instant.now().plus(Duration.ofMinutes(10)));
new Token(
REFRESH_TOKEN,
TOKEN_TYPE,
null,
Instant.now(clockSupplier.getClock()).plus(Duration.ofMinutes(10)));
CountDownLatch refreshCalled = new CountDownLatch(1);

TokenSource tokenSource =
Expand All @@ -69,8 +112,12 @@ public Token getToken() {
new CachedTokenSource.Builder(tokenSource)
.setAsyncDisabled(asyncDisabled)
.setToken(initialToken)
.setClockSupplier(clockSupplier)
.build();

// Advance the clock to put the token in the expected state before calling getToken().
clockSupplier.advanceTime(Duration.ofMinutes(clockAdvanceMinutes));

Token token = source.getToken();

boolean refreshed = refreshCalled.await(1, TimeUnit.SECONDS);
Expand All @@ -90,13 +137,13 @@ void testAsyncRefreshFailureFallback() throws Exception {
// Create a mutable clock supplier that we can control
TestClockSupplier clockSupplier = new TestClockSupplier(Instant.now());

// Create a token that will be stale (2 minutes until expiry)
// Create a token with a TTL of 4 minutes that will be stale in 3 minutes.
Token staleToken =
new Token(
INITIAL_TOKEN,
TOKEN_TYPE,
null,
Instant.now(clockSupplier.getClock()).plus(Duration.ofMinutes(2)));
Instant.now(clockSupplier.getClock()).plus(Duration.ofMinutes(4)));

class TestSource implements TokenSource {
int refreshCallCount = 0;
Expand Down Expand Up @@ -132,6 +179,9 @@ public Token getToken() {
.setClockSupplier(clockSupplier)
.build();

// Advance clock to put the token in the stale window.
clockSupplier.advanceTime(Duration.ofMinutes(3));

// First call triggers async refresh, which fails
// Should return stale token immediately (async refresh)
Token token = source.getToken();
Expand Down
Loading