Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 103 additions & 81 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
MongoErrorLabel,
MongoExpiredSessionError,
MongoInvalidArgumentError,
MongoOperationTimeoutError,
MongoRuntimeError,
MongoServerError,
MongoTransactionError,
Expand Down Expand Up @@ -725,7 +726,7 @@ export class ClientSession
timeoutMS?: number;
}
): Promise<T> {
const MAX_TIMEOUT = 120000;
const MAX_TIMEOUT = 120_000;

const timeoutMS = options?.timeoutMS ?? this.timeoutMS ?? null;
this.timeoutContext =
Expand All @@ -737,10 +738,24 @@ export class ClientSession
})
: null;

// 1. Record the current monotonic time, which will be used to enforce the 120-second timeout before later retry attempts.
const startTime = this.timeoutContext?.csotEnabled() // This is strictly to appease TS. We must narrow the context to a CSOT context before accessing `.start`.
? this.timeoutContext.start
: processTimeMS();
// 1. Define the following:
// 1.1 Record the current monotonic time, which will be used to enforce the 120-second / CSOT timeout before later retry attempts.
// 1.2 Set `transactionAttempt` to `0`.
// 1.3 Set `TIMEOUT_MS` to be `timeoutMS` if given, otherwise MAX_TIMEOUT (120-seconds).
//
// The spec describes timeout checks as "elapsed time < TIMEOUT_MS" (where elapsed = now - start).
// We precompute `deadline = start + TIMEOUT_MS` so each check becomes simply `now < deadline`.
//
// Timeout Error propagation mechanism
// When the TIMEOUT_MS (calculated in step 1.3) is reached we MUST report a timeout error wrapping the previously
// encountered error. If timeoutMS is set, then timeout error is a special type which is defined in CSOT
// specification, If timeoutMS is not set, then propagate it as timeout error if the language allows to expose the
// previously encountered error as a cause of a timeout error (see makeTimeoutError below in pseudo-code). If
// timeout error is thrown then it SHOULD copy all error label(s) from the previously encountered retriable error.
const csotEnabled = !!this.timeoutContext?.csotEnabled();
const deadline = this.timeoutContext?.csotEnabled()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: simplify this a bit by introducing a timeout variable

const remainingTimeMS = this.timeoutContext?.csotEnabled() ? this.timeoutContext.remainingTimeMS : MAX_TIMEOUT';
const deadline = processTimeMS() + remainingTimeMS;

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see real difference here between 2 ternary conditions essentially:

const deadline = this.timeoutContext?.csotEnabled()
  ? processTimeMS() + this.timeoutContext.remainingTimeMS
  : processTimeMS() + MAX_TIMEOUT;

and

const remainingTimeMS = this.timeoutContext?.csotEnabled()
  ? this.timeoutContext.remainingTimeMS
  : MAX_TIMEOUT;
const deadline = processTimeMS() + remainingTimeMS;

I don't have strong opinion here as well, do you think the 2nd is simpler or easier to to read?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introducing remainingTimeMS clarifies that that's what is changing wrt CSOT ("depending on CSOT, we'll change how much time remains in this operation"). And we also end up calling processTimeMS() only once.

? processTimeMS() + this.timeoutContext.remainingTimeMS
: processTimeMS() + MAX_TIMEOUT;

let committed = false;
let result: T;
Expand All @@ -749,23 +764,23 @@ export class ClientSession

try {
retryTransaction: for (
// 2. Set `transactionAttempt` to `0`.
// 1.2 Set `transactionAttempt` to `0`.
let transactionAttempt = 0, isRetry = false;
!committed;
++transactionAttempt, isRetry = transactionAttempt > 0
) {
// 2. If `transactionAttempt` > 0:
if (isRetry) {
// 2.i If elapsed time + `backoffMS` > `TIMEOUT_MS`, then raise the previously encountered error. If the elapsed time of
// `withTransaction` is less than TIMEOUT_MS, calculate the backoffMS to be
// `jitter * min(BACKOFF_INITIAL * 1.5 ** (transactionAttempt - 1), BACKOFF_MAX)`. sleep for `backoffMS`.
// 2.i.i jitter is a random float between \[0, 1)
// 2.i.ii `transactionAttempt` is the variable defined in step 1.
// 2.i.iii `BACKOFF_INITIAL` is 5ms
// 2.i.iv `BACKOFF_MAX` is 500ms
// 2.1 If elapsed time + backoffMS > TIMEOUT_MS, then propagate the previously encountered
// error (see propagation section above). If the elapsed time of withTransaction is less
// than TIMEOUT_MS, calculate the backoffMS to be
// jitter * min(BACKOFF_INITIAL * 1.5 ** (transactionAttempt - 1), BACKOFF_MAX).
// sleep for backoffMS.
const BACKOFF_INITIAL_MS = 5;
const BACKOFF_MAX_MS = 500;
const BACKOFF_GROWTH = 1.5;
// 2.1.1 Jitter is a random float between [0, 1), optionally including 1, depending on what is most natural
// for the given driver language.
const jitter = Math.random();
const backoffMS =
jitter *
Expand All @@ -774,50 +789,50 @@ export class ClientSession
BACKOFF_MAX_MS
);

const willExceedTransactionDeadline =
(this.timeoutContext?.csotEnabled() &&
backoffMS > this.timeoutContext.remainingTimeMS) ||
processTimeMS() + backoffMS > startTime + MAX_TIMEOUT;

if (willExceedTransactionDeadline) {
throw (
if (processTimeMS() + backoffMS >= deadline) {
throw makeTimeoutError(
lastError ??
new MongoRuntimeError(
`Transaction retry did not record an error: should never occur. Please file a bug.`
)
new MongoRuntimeError(
`Transaction retry did not record an error: should never occur. Please file a bug.`
),
csotEnabled
);
}

await setTimeout(backoffMS);
}

// 3. Invoke startTransaction on the session
// 4. If `startTransaction` reported an error, propagate that error to the caller of `withTransaction` and return immediately.
// 3. Invoke startTransaction on the session and increment transactionAttempt. If TransactionOptions were
// specified in the call to withTransaction, those MUST be used for startTransaction. Note that
// ClientSession.defaultTransactionOptions will be used in the absence of any explicit TransactionOptions.
// 4. If startTransaction reported an error, propagate that error to the caller and return immediately.
this.startTransaction(options); // may throw on error

try {
// 5. Invoke the callback.
// 6. Control returns to withTransaction. (continued below)
// 5. Invoke the callback. Drivers MUST ensure that the ClientSession can be accessed within the callback
// (e.g. pass ClientSession as the first parameter, rely on lexical scoping). Drivers MAY pass additional
// parameters as needed (e.g. user data solicited by withTransaction).
const promise = fn(this);
if (!isPromiseLike(promise)) {
throw new MongoInvalidArgumentError(
'Function provided to `withTransaction` must return a Promise'
);
}

// 6. Control returns to withTransaction. Determine the current state of the ClientSession and whether the
// callback reported an error (e.g. thrown exception, error output parameter).
result = await promise;

// 6. (cont.) Determine the current state of the ClientSession (continued below)
// 8. If the ClientSession is in the "no transaction", "transaction aborted", or "transaction committed"
// state, assume the callback intentionally aborted or committed the transaction and return immediately.
// Drivers MAY allow the callback to return a value to be propagated as the return value of withTransaction.
if (
this.transaction.state === TxnState.NO_TRANSACTION ||
this.transaction.state === TxnState.TRANSACTION_COMMITTED ||
this.transaction.state === TxnState.TRANSACTION_ABORTED
) {
// 8. If the ClientSession is in the "no transaction", "transaction aborted", or "transaction committed" state,
// assume the callback intentionally aborted or committed the transaction and return immediately.
return result;
}
// 5. (cont.) and whether the callback reported an error
// 7. If the callback reported an error:
} catch (fnError) {
if (!(fnError instanceof MongoError) || fnError instanceof MongoInvalidArgumentError) {
Expand All @@ -827,83 +842,71 @@ export class ClientSession
throw fnError;
}

lastError = fnError;

// 7.1 If the ClientSession is in the "starting transaction" or "transaction in progress"
// state, invoke abortTransaction on the session.
if (
this.transaction.state === TxnState.STARTING_TRANSACTION ||
this.transaction.state === TxnState.TRANSACTION_IN_PROGRESS
) {
// 7.i If the ClientSession is in the "starting transaction" or "transaction in progress" state,
// invoke abortTransaction on the session
await this.abortTransaction();
}

if (
fnError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) &&
(this.timeoutContext?.csotEnabled() || processTimeMS() - startTime < MAX_TIMEOUT)
) {
// 7.ii If the callback's error includes a "TransientTransactionError" label and the elapsed time of `withTransaction`
// is less than 120 seconds, jump back to step two.
lastError = fnError;
// 7.2 If the callback's error includes a "TransientTransactionError" label, jump back to step two.
if (fnError.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
if (processTimeMS() >= deadline) {
throw makeTimeoutError(lastError, csotEnabled);
}
continue retryTransaction;
}

// 7.iii If the callback's error includes a "UnknownTransactionCommitResult" label, the callback must have manually committed a transaction,
// propagate the callback's error to the caller of withTransaction and return immediately.
// The 7.iii check is redundant with 6.iv, so we don't write code for it
// 7.iv Otherwise, propagate the callback's error to the caller of withTransaction and return immediately.
// 7.3 If the callback's error includes a "UnknownTransactionCommitResult" label, the callback
// must have manually committed a transaction, propagate the error and return immediately.
// (This check is redundant with step 8, so we don't write code for it.)
// 7.4 Otherwise, propagate the callback's error (see Note 1) and return immediately.
throw fnError;
}

// 9. Invoke commitTransaction on the session.
// We will rely on ClientSession.commitTransaction() to apply a majority write concern
// if commitTransaction is being retried (see: DRIVERS-601).
retryCommit: while (!committed) {
try {
/*
* We will rely on ClientSession.commitTransaction() to
* apply a majority write concern if commitTransaction is
* being retried (see: DRIVERS-601)
*/
// 9. Invoke commitTransaction on the session.
await this.commitTransaction();
committed = true;
// 10. If commitTransaction reported an error:
} catch (commitError) {
// If CSOT is enabled, we repeatedly retry until timeoutMS expires. This is enforced by providing a
// timeoutContext to each async API, which know how to cancel themselves (i.e., the next retry will
// abort the withTransaction call).
// If CSOT is not enabled, do we still have time remaining or have we timed out?
const hasTimedOut =
!this.timeoutContext?.csotEnabled() && processTimeMS() - startTime >= MAX_TIMEOUT;

if (!hasTimedOut) {
/*
* Note: a maxTimeMS error will have the MaxTimeMSExpired
* code (50) and can be reported as a top-level error or
* inside writeConcernError, ex.
* { ok:0, code: 50, codeName: 'MaxTimeMSExpired' }
* { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } }
*/
if (
!isMaxTimeMSExpiredError(commitError) &&
commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult)
) {
// 10.i If the `commitTransaction` error includes a "UnknownTransactionCommitResult" label and the error is not
// MaxTimeMSExpired and the elapsed time of `withTransaction` is less than 120 seconds, jump back to step eight.
continue retryCommit;
// 10. If commitTransaction reported an error:
lastError = commitError;

// 10.1 If the commitTransaction error includes a UnknownTransactionCommitResult label and the error is not MaxTimeMSExpired
if (
!isMaxTimeMSExpiredError(commitError) &&
commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult)
) {
// 10.1.1 If the elapsed time of withTransaction exceeded TIMEOUT_MS, propagate the commitTransaction error to the caller
// of withTransaction and return immediately (see propagation section above)
if (processTimeMS() >= deadline) {
throw makeTimeoutError(commitError, csotEnabled);
}
// 10.1.2 If the elapsed time of withTransaction is less than TIMEOUT_MS, jump back to step nine. We will trust
// commitTransaction to apply a majority write concern on retry attempts (see: Majority write concern is used
// when retrying commitTransaction).
continue retryCommit;
}

if (commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
// 10.ii If the commitTransaction error includes a "TransientTransactionError" label
// and the elapsed time of withTransaction is less than 120 seconds, jump back to step two.
lastError = commitError;

continue retryTransaction;
}
// 10.2 If the commitTransaction error includes a "TransientTransactionError" label, jump back to step two.
if (commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
continue retryTransaction;
}

// 10.iii Otherwise, propagate the commitTransaction error to the caller of withTransaction and return immediately.
// 10.3 Otherwise, propagate the commitTransaction error to the caller of withTransaction and return immediately.
throw commitError;
}
}
}

// 11. The transaction was committed successfully. Return immediately.
// @ts-expect-error Result is always defined if we reach here, the for-loop above convinces TS it is not.
return result;
} finally {
Expand All @@ -912,6 +915,25 @@ export class ClientSession
}
}

function makeTimeoutError(cause: Error, csotEnabled: boolean): Error {
// Async APIs know how to cancel themselves and might return CSOT error
if (cause instanceof MongoOperationTimeoutError) {
return cause;
}
if (csotEnabled) {
const timeoutError = new MongoOperationTimeoutError('Timed out during withTransaction', {
cause
});
if (cause instanceof MongoError) {
for (const label of cause.errorLabels) {
timeoutError.addErrorLabel(label);
}
}
return timeoutError;
}
return cause;
}

const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([
'CannotSatisfyWriteConcern',
'UnknownReplWriteConcern',
Expand Down
Loading
Loading