diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java index 3c1eef933a38..4f23f86e92da 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java @@ -347,6 +347,15 @@ private void setAndClearChannel() { close(oldChannel); } + /** + * Force-closes the current cached channel so that the next subscriber receives a fresh one. + * This is used for connection-level recovery when the current connection is stale + * but the processor has not detected it (e.g., heartbeats echoed by intermediate infrastructure). + */ + public void forceCloseChannel() { + setAndClearChannel(); + } + /** * Checks the current state of the channel for this channel and returns true if the channel is null or if this * processor is disposed. diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java index d772cfdf0698..20aa2f90c7aa 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java @@ -48,6 +48,10 @@ public final class ReactorConnectionCache implement // any dependent type; instead, the dependent type must acquire Connection only through the cache route, // i.e., by subscribing to 'createOrGetCachedConnection' via 'get()' getter. private volatile T currentConnection; + // Holds the ID of the connection that forceCloseConnection() asked to force-invalidate. 
+ // Only the connection whose getId() matches this value will be invalidated by cacheInvalidateIf; + // a freshly created connection with a different ID is never accidentally invalidated. + private final AtomicReference forceInvalidateConnectionId = new AtomicReference<>(null); private final State state = new State(); /** @@ -113,12 +117,23 @@ public ReactorConnectionCache(Supplier connectionSupplier, String fullyQualif } }).cacheInvalidateIf(c -> { if (c.isDisposed()) { + // Connection disposed for any reason. Clean up the force-invalidate marker if it + // was targeting this connection so it is not accidentally consumed by a future + // connection that happens to have the same ID. + forceInvalidateConnectionId.compareAndSet(c.getId(), null); withConnectionId(logger, c.getId()).log("The connection is closed, requesting a new connection."); return true; - } else { - // Emit cached connection. - return false; } + final String targetId = forceInvalidateConnectionId.get(); + if (targetId != null + && targetId.equals(c.getId()) + && forceInvalidateConnectionId.compareAndSet(targetId, null)) { + // forceCloseConnection() asked to invalidate exactly this connection. + withConnectionId(logger, c.getId()).log("Forcing connection close, requesting a new connection."); + return true; + } + // No forced invalidation targeted this connection — emit it from cache. + return false; }); } @@ -172,6 +187,37 @@ public boolean isCurrentConnectionClosed() { return (currentConnection != null && currentConnection.isDisposed()) || terminated; } + /** + * Closes the current cached connection (if any) so that the next {@link #get()} call creates + * a fresh connection. 
This is used for connection-level recovery when the current connection + * is in a stale state that the cache's normal error detection (via endpoint state signals) + * has not detected — for example, when intermediate infrastructure (load balancers, NAT gateways) + * is echoing AMQP heartbeats on behalf of a dead connection. + * + *

<p>This is modeled after the Go SDK's {@code Namespace.Recover()} which explicitly closes + * the old connection and increments the connection revision.</p>

+ * + *

<p>This method is safe to call concurrently. If the connection is already closed or being + * closed, this is a no-op.</p>

+ */ + public void forceCloseConnection() { + final T connection = currentConnection; + if (connection != null && !connection.isDisposed()) { + withConnectionId(logger, connection.getId()) + .log("Force-closing connection for recovery. Next get() will create a fresh connection."); + // Set forceInvalidate before starting async close so that cacheInvalidateIf immediately + // invalidates this connection on the next get() call, without blocking the caller + // while the AMQP close handshake completes. ReactorConnection.dispose() calls + // closeAsync().block(), which is illegal on a non-blocking Reactor thread. + forceInvalidateConnectionId.set(connection.getId()); + connection.closeAsync() + .subscribe(null, + error -> logger.atVerbose() + .addKeyValue(CONNECTION_ID_KEY, connection.getId()) + .log("Error during async connection force-close.", error)); + } + } + /** * Terminate so that consumers will no longer be able to request connection. If there is a current (cached) * connection then it will be closed. diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java new file mode 100644 index 000000000000..df3dc4c1d651 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java @@ -0,0 +1,168 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation; + +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpException; + +import java.util.Locale; +import java.util.concurrent.TimeoutException; + +/** + * Classifies errors into recovery tiers, determining what resources should be closed + * between retry attempts. This follows the tiered recovery pattern used by the Go, .NET, + * Python, and JS Azure SDKs. + * + *
<ul>
+ * <li>{@link #NONE} — Retry on the same link (server-busy, timeouts).</li>
+ * <li>{@link #LINK} — Close the send/receive link; next retry creates a fresh link on the same connection.</li>
+ * <li>{@link #CONNECTION} — Close the entire connection; next retry creates a fresh connection and link.</li>
+ * <li>{@link #FATAL} — Do not retry (unauthorized, not-found, message too large).</li>
+ * </ul>
+ */ +public enum RecoveryKind { + /** + * No recovery needed — retry on the same link and connection. + * Applies to: server-busy, timeouts, resource-limit-exceeded. + */ + NONE, + + /** + * Close the link (and its session) before retrying. The next retry creates a fresh link + * on the same connection. + * Applies to: link:detach-forced, link:stolen, transient AMQP errors on the link. + */ + LINK, + + /** + * Close the entire connection before retrying. The next retry creates a fresh connection, + * session, and link. + * Applies to: connection:forced, connection:framing-error, proton:io, internal-error. + */ + CONNECTION, + + /** + * Do not retry — the error is permanent. + * Applies to: unauthorized-access, not-found, message-size-exceeded. + */ + FATAL; + + /** + * Classifies the given error into a {@link RecoveryKind} that determines what resources + * should be invalidated between retry attempts. + * + * @param error The error to classify. + * @return The recovery kind for the given error. + */ + public static RecoveryKind classify(Throwable error) { + if (error == null) { + return NONE; + } + + // Timeouts — retry on same link, the link may still be healthy. + if (error instanceof TimeoutException) { + return NONE; + } + + if (error instanceof AmqpException) { + final AmqpException amqpError = (AmqpException) error; + final AmqpErrorCondition condition = amqpError.getErrorCondition(); + + if (condition != null) { + switch (condition) { + // Connection-level errors — close the entire connection. + case CONNECTION_FORCED: + case CONNECTION_FRAMING_ERROR: + case CONNECTION_REDIRECT: + case PROTON_IO: + case INTERNAL_ERROR: + return CONNECTION; + + // Link-level errors — close the link, keep the connection. + case LINK_DETACH_FORCED: + case LINK_STOLEN: + case LINK_REDIRECT: + case PARTITION_NOT_OWNED_ERROR: + case TRANSFER_LIMIT_EXCEEDED: + // operation-cancelled can signal "AMQP layer unexpectedly aborted or disconnected" + // (e.g. 
ReceiverUnsettledDeliveries remote Released outcome), requiring link recovery. + case OPERATION_CANCELLED: + return LINK; + + // Fatal errors — do not retry. + case NOT_FOUND: + case UNAUTHORIZED_ACCESS: + case LINK_PAYLOAD_SIZE_EXCEEDED: + case NOT_ALLOWED: + case NOT_IMPLEMENTED: + case ENTITY_DISABLED_ERROR: + case ENTITY_ALREADY_EXISTS: + case PUBLISHER_REVOKED_ERROR: + case ARGUMENT_ERROR: + case ARGUMENT_OUT_OF_RANGE_ERROR: + case ILLEGAL_STATE: + case MESSAGE_LOCK_LOST: + case STORE_LOCK_LOST_ERROR: + return FATAL; + + // Server-busy, timeouts, and resource-limit errors — retry on same link. + // RESOURCE_LIMIT_EXCEEDED is treated as transient here because ReactorSender + // groups it alongside SERVER_BUSY and TIMEOUT in its send-error retry logic. + case SERVER_BUSY_ERROR: + case TIMEOUT_ERROR: + case RESOURCE_LIMIT_EXCEEDED: + return NONE; + + // Session/lock errors — link-level recovery. + // Session lock loss means the session link is invalid and + // a fresh link must be acquired for a new session. + case SESSION_LOCK_LOST: + case SESSION_CANNOT_BE_LOCKED: + case SESSION_NOT_FOUND: + case MESSAGE_NOT_FOUND: + return LINK; + + default: + break; + } + } + + // Transient AMQP errors without a specific condition — link recovery. + if (amqpError.isTransient()) { + return LINK; + } + + // Non-transient AMQP errors without a recognized condition — fatal. + return FATAL; + } + + // RequestResponseChannelClosedException — link-level (parent connection disposing). + if (error instanceof RequestResponseChannelClosedException) { + return LINK; + } + + // IllegalStateException thrown by a disposed ReactorSender (e.g., "Cannot publish + // message when disposed." or "Cannot publish data batch when disposed."). This is + // a link-staleness signal: the link was closed (possibly by a concurrent recovery + // path) before the in-flight send could complete. LINK recovery creates a fresh + // link on the next retry. 
+ // Match both "Cannot publish" and "disposed" to avoid misclassifying unrelated + // disposal signals (e.g., "Connection is disposed. Cannot get management instance."). + if (error instanceof IllegalStateException) { + final String msg = error.getMessage(); + if (msg != null) { + final String normalizedMsg = msg.toLowerCase(Locale.ROOT); + if (normalizedMsg.contains("cannot publish") && normalizedMsg.contains("disposed")) { + return LINK; + } + } + } + + // Unknown non-AMQP errors — treat as fatal (don't retry application or SDK bugs). + // The Go SDK defaults to CONNECTION for unknown errors, but those are AMQP-layer + // errors (io.EOF, net.Error). Java's non-AMQP exceptions (e.g., AzureException, + // RuntimeException) should fail fast rather than trigger connection recovery. + return FATAL; + } +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java index c22c8ac928b3..d71ea5b064a6 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java @@ -16,7 +16,10 @@ import java.time.Duration; import java.util.Locale; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; /** * Helper class to help with retry policies. @@ -106,6 +109,44 @@ public static Mono withRetry(Mono source, AmqpRetryOptions retryOption return withRetry(source, retryOptions, timeoutMessage, false); } + /** + * Applies the retry policy with tiered recovery between attempts. Before each retry, + * the error is classified via {@link RecoveryKind#classify(Throwable)} and the recovery + * callback is invoked so the caller can close the appropriate resources (link or connection). + * + *

<p>This matches the tiered recovery pattern used by the Go, .NET, Python, and JS SDKs.</p>

+ * + * @param Type of value in the {@link Mono}. + * @param source The publisher to apply the retry policy to. + * @param retryOptions A {@link AmqpRetryOptions}. + * @param errorMessage Text added to error logs. + * @param recoveryAction Called between retry attempts with the classified {@link RecoveryKind}. + * The caller should close the link (for {@link RecoveryKind#LINK}) or connection + * (for {@link RecoveryKind#CONNECTION}) so the next retry creates fresh resources. + * + * @return A publisher that returns the results of the {@link Mono} if any of the retry attempts + * are successful. Otherwise, propagates the last error. + */ + public static Mono withRetryAndRecovery(Mono source, AmqpRetryOptions retryOptions, String errorMessage, + Consumer recoveryAction) { + return withRetryAndRecovery(source, retryOptions, errorMessage, false, recoveryAction); + } + + /** + * Like {@link #withRetryAndRecovery(Mono, AmqpRetryOptions, String, Consumer)} but with an option to allow + * long-running operations that should not be subject to the per-attempt timeout. + * + * @param allowsLongOperation If true, the source Mono will not be wrapped with a per-attempt timeout. 
+ */ + public static Mono withRetryAndRecovery(Mono source, AmqpRetryOptions retryOptions, String errorMessage, + boolean allowsLongOperation, Consumer recoveryAction) { + if (!allowsLongOperation) { + source = source.timeout(retryOptions.getTryTimeout()); + } + return source.retryWhen(createRetryWithRecovery(retryOptions, recoveryAction)) + .doOnError(error -> LOGGER.error(errorMessage, error)); + } + static Retry createRetry(AmqpRetryOptions options) { final Duration delay = options.getDelay().plus(SERVER_BUSY_WAIT_TIME); final RetryBackoffSpec retrySpec; @@ -129,4 +170,78 @@ static Retry createRetry(AmqpRetryOptions options) { .filter(error -> error instanceof TimeoutException || (error instanceof AmqpException && ((AmqpException) error).isTransient())); } + + /** + * Creates a Reactor {@link Retry} spec that performs tiered recovery between retry attempts. + * Before each retry, the error is classified and the recovery callback is invoked. + * + *

<p>Includes a quick-retry optimization matching the Go SDK: on the first LINK or CONNECTION + * error, the retry fires immediately (no backoff) since the error may come from a previously + * stale link and recovery has just created a fresh one.</p>

+ */ + static Retry createRetryWithRecovery(AmqpRetryOptions options, Consumer recoveryAction) { + final int maxRetries = options.getMaxRetries(); + final Duration baseDelay = options.getDelay().plus(SERVER_BUSY_WAIT_TIME); + final Duration maxDelay = options.getMaxDelay(); + final boolean isFixed = options.getMode() == com.azure.core.amqp.AmqpRetryMode.FIXED; + final AtomicBoolean didQuickRetry = new AtomicBoolean(false); + + return Retry.from(retrySignals -> retrySignals.flatMap(signal -> { + final Throwable failure = signal.failure(); + final long attempt = signal.totalRetriesInARow(); + final RecoveryKind kind = RecoveryKind.classify(failure); + + // FATAL errors — do not retry. + if (kind == RecoveryKind.FATAL) { + return Mono.error(failure); + } + + // Check retry budget. + if (attempt >= maxRetries) { + return Mono.error(failure); + } + + // Perform recovery before retry. + if (kind != RecoveryKind.NONE && recoveryAction != null) { + try { + recoveryAction.accept(kind); + } catch (Exception e) { + LOGGER.atWarning().addKeyValue("recoveryKind", kind).log("Recovery action failed.", e); + } + } + + // Quick retry: on the FIRST LINK/CONNECTION error, retry immediately (no backoff). + // Uses didQuickRetry flag to prevent repeated immediate retries under persistent + // errors — similar to the Go SDK's didQuickRetry pattern. Unlike Go's ResetAttempts(), + // the attempt counter is not reset here; subsequent retries continue with standard + // exponential backoff from the current attempt count. + // The kind check must come first: short-circuit evaluation prevents consuming the + // flag on NONE/FATAL failures where no quick-retry should be issued. + if ((kind == RecoveryKind.LINK || kind == RecoveryKind.CONNECTION) && !didQuickRetry.getAndSet(true)) { + LOGGER.atInfo().log("Quick retry after {} recovery (first occurrence).", kind); + return Mono.just(attempt); + } + + // Standard backoff delay. 
+ final Duration delay; + if (isFixed) { + // Cap baseDelay to maxDelay so FIXED mode respects retryOptions.getMaxDelay(). + delay = baseDelay.compareTo(maxDelay) > 0 ? maxDelay : baseDelay; + } else { + final long multiplier = 1L << Math.min(attempt, 30); + final long baseMillis = baseDelay.toMillis(); + // Guard against overflow: if baseMillis * multiplier would exceed Long.MAX_VALUE, + // saturate to maxDelay (the clamp below would cap it there anyway). + final long millis + = baseMillis > Long.MAX_VALUE / multiplier ? maxDelay.toMillis() : baseMillis * multiplier; + delay = Duration.ofMillis(Math.min(millis, maxDelay.toMillis())); + } + final double jitter = 1.0 + (ThreadLocalRandom.current().nextDouble() * 2 - 1) * JITTER_FACTOR; + // Clamp the final jittered delay to maxDelay so retryOptions are consistently respected. + final Duration jitteredDelay + = Duration.ofMillis(Math.min((long) (delay.toMillis() * jitter), maxDelay.toMillis())); + + return Mono.delay(jitteredDelay).thenReturn(attempt); + })); + } } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java new file mode 100644 index 000000000000..d123a2c8963c --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java @@ -0,0 +1,215 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation; + +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpException; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests for {@link RecoveryKind#classify(Throwable)}. 
+ */ +class RecoveryKindTest { + + @Test + void nullErrorReturnsNone() { + assertEquals(RecoveryKind.NONE, RecoveryKind.classify(null)); + } + + @Test + void timeoutExceptionReturnsNone() { + assertEquals(RecoveryKind.NONE, RecoveryKind.classify(new TimeoutException("timed out"))); + } + + @Test + void serverBusyReturnsNone() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.SERVER_BUSY_ERROR, "server busy", null); + assertEquals(RecoveryKind.NONE, RecoveryKind.classify(error)); + } + + @Test + void timeoutErrorConditionReturnsNone() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, "timeout", null); + assertEquals(RecoveryKind.NONE, RecoveryKind.classify(error)); + } + + @Test + void linkDetachForcedReturnsLink() { + final AmqpException error + = new AmqpException(true, AmqpErrorCondition.LINK_DETACH_FORCED, "detach forced", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void linkStolenReturnsLink() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.LINK_STOLEN, "link stolen", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void transientAmqpErrorWithoutConditionReturnsLink() { + final AmqpException error = new AmqpException(true, "transient error", null, null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void connectionForcedReturnsConnection() { + final AmqpException error + = new AmqpException(true, AmqpErrorCondition.CONNECTION_FORCED, "connection forced", null); + assertEquals(RecoveryKind.CONNECTION, RecoveryKind.classify(error)); + } + + @Test + void connectionFramingErrorReturnsConnection() { + final AmqpException error + = new AmqpException(true, AmqpErrorCondition.CONNECTION_FRAMING_ERROR, "framing error", null); + assertEquals(RecoveryKind.CONNECTION, RecoveryKind.classify(error)); + } + + @Test + void internalErrorReturnsConnection() { + final 
AmqpException error = new AmqpException(true, AmqpErrorCondition.INTERNAL_ERROR, "internal error", null); + assertEquals(RecoveryKind.CONNECTION, RecoveryKind.classify(error)); + } + + @Test + void protonIoReturnsConnection() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.PROTON_IO, "io error", null); + assertEquals(RecoveryKind.CONNECTION, RecoveryKind.classify(error)); + } + + @Test + void connectionRedirectReturnsConnection() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.CONNECTION_REDIRECT, "redirect", null); + assertEquals(RecoveryKind.CONNECTION, RecoveryKind.classify(error)); + } + + @Test + void linkRedirectReturnsLink() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.LINK_REDIRECT, "redirect", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void transferLimitExceededReturnsLink() { + final AmqpException error + = new AmqpException(true, AmqpErrorCondition.TRANSFER_LIMIT_EXCEEDED, "transfer limit", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void argumentErrorReturnsFatal() { + final AmqpException error = new AmqpException(false, AmqpErrorCondition.ARGUMENT_ERROR, "bad argument", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void notFoundReturnsFatal() { + final AmqpException error = new AmqpException(false, AmqpErrorCondition.NOT_FOUND, "not found", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void unauthorizedAccessReturnsFatal() { + final AmqpException error + = new AmqpException(false, AmqpErrorCondition.UNAUTHORIZED_ACCESS, "unauthorized", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void payloadSizeExceededReturnsFatal() { + final AmqpException error + = new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, "too large", null); + 
assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void notAllowedReturnsFatal() { + final AmqpException error = new AmqpException(false, AmqpErrorCondition.NOT_ALLOWED, "not allowed", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void nonTransientAmqpErrorReturnsFatal() { + final AmqpException error = new AmqpException(false, "permanent error", null, null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void requestResponseChannelClosedReturnsLink() { + final RequestResponseChannelClosedException error = new RequestResponseChannelClosedException("channel closed"); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void unknownExceptionReturnsFatal() { + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(new RuntimeException("unknown"))); + } + + @Test + void sessionLockLostReturnsLink() { + final AmqpException error + = new AmqpException(true, AmqpErrorCondition.SESSION_LOCK_LOST, "session lock lost", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void messageLockLostReturnsFatal() { + final AmqpException error + = new AmqpException(false, AmqpErrorCondition.MESSAGE_LOCK_LOST, "message lock lost", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void storeLockLostReturnsFatal() { + final AmqpException error + = new AmqpException(false, AmqpErrorCondition.STORE_LOCK_LOST_ERROR, "store lock lost", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void operationCancelledReturnsLink() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.OPERATION_CANCELLED, "cancelled", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void resourceLimitExceededReturnsNone() { + final AmqpException error + = new AmqpException(true, 
AmqpErrorCondition.RESOURCE_LIMIT_EXCEEDED, "resource limit", null); + assertEquals(RecoveryKind.NONE, RecoveryKind.classify(error)); + } + + @Test + void illegalStateExceptionDisposedMessageReturnsLink() { + // Matches ReactorSender.send() message: "connectionId[%s] linkName[%s] Cannot publish message when disposed." + assertEquals(RecoveryKind.LINK, RecoveryKind.classify( + new IllegalStateException("connectionId[abc] linkName[xyz] Cannot publish message when disposed."))); + } + + @Test + void illegalStateExceptionDisposedDataBatchReturnsLink() { + // Matches ReactorSender.send(List) message: "connectionId[%s] linkName[%s] Cannot publish data batch when disposed." + assertEquals(RecoveryKind.LINK, RecoveryKind.classify( + new IllegalStateException("connectionId[abc] linkName[xyz] Cannot publish data batch when disposed."))); + } + + @Test + void illegalStateExceptionUnrelatedToDisposedReturnsFatal() { + // Non-disposed IllegalStateException must remain FATAL (genuine application or SDK bug). + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(new IllegalStateException("some unexpected state"))); + } + + @Test + void illegalStateExceptionConnectionDisposedReturnsFatal() { + // "Connection is disposed. Cannot get management instance." contains "disposed" but NOT + // "Cannot publish" — must remain FATAL to avoid misclassifying connection-level disposal. + assertEquals(RecoveryKind.FATAL, RecoveryKind + .classify(new IllegalStateException("Connection is disposed. 
Cannot get management instance."))); + } +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java index b25d920b31f6..fd48b0ecfddc 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java @@ -9,6 +9,7 @@ import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.ExponentialAmqpRetryPolicy; import com.azure.core.amqp.FixedAmqpRetryPolicy; +import com.azure.core.amqp.exception.AmqpErrorCondition; import com.azure.core.amqp.exception.AmqpErrorContext; import com.azure.core.amqp.exception.AmqpException; import org.junit.jupiter.api.Assertions; @@ -24,9 +25,12 @@ import reactor.util.retry.RetryBackoffSpec; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Stream; @@ -203,4 +207,151 @@ void retryFilter(Throwable throwable, boolean expected) { // Assert assertEquals(expected, actual); } + + // ---- createRetryWithRecovery tests ---- + + /** + * FATAL errors must not be retried and must not invoke the recovery callback. 
+ */ + @Test + void createRetryWithRecoveryFatalErrorTerminatesImmediately() { + // Arrange + final AmqpRetryOptions options = new AmqpRetryOptions().setMaxRetries(3).setDelay(Duration.ofMillis(100)); + final AtomicInteger recoveryCount = new AtomicInteger(); + final Retry retry = RetryUtil.createRetryWithRecovery(options, kind -> recoveryCount.incrementAndGet()); + final AmqpException fatalError = new AmqpException(false, AmqpErrorCondition.NOT_FOUND, "not found", null); + + // Act & Assert + StepVerifier.create(Mono.error(fatalError).retryWhen(retry)) + .expectErrorSatisfies(e -> assertEquals(fatalError, e)) + .verify(Duration.ofSeconds(5)); + + assertEquals(0, recoveryCount.get(), "Recovery callback must not be called for FATAL errors"); + } + + /** + * LINK errors must invoke the recovery callback and retry. The first occurrence uses + * the quick-retry path (no backoff delay). + */ + @Test + void createRetryWithRecoveryLinkErrorInvokesRecoveryAndRetries() { + // Arrange + final AmqpRetryOptions options = new AmqpRetryOptions().setMaxRetries(3).setDelay(Duration.ofMillis(100)); + final List recoveries = new ArrayList<>(); + final Retry retry = RetryUtil.createRetryWithRecovery(options, recoveries::add); + final AtomicInteger attempt = new AtomicInteger(); + final Mono source = Mono.defer(() -> { + if (attempt.getAndIncrement() < 2) { + return Mono.error(new AmqpException(true, AmqpErrorCondition.LINK_DETACH_FORCED, "detach", null)); + } + return Mono.just(42); + }); + + // Act & Assert — use virtual time because the base retry delay unconditionally includes + // SERVER_BUSY_WAIT_TIME (4 s), regardless of error type — the cumulative + // wait would exceed real-time test limits. 
+ StepVerifier.withVirtualTime(() -> source.retryWhen(retry)) + .expectSubscription() + .thenAwait(Duration.ofMinutes(1)) + .expectNext(42) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + assertEquals(2, recoveries.size(), "Recovery callback called once per LINK error"); + assertTrue(recoveries.stream().allMatch(k -> k == RecoveryKind.LINK)); + } + + /** + * CONNECTION errors must invoke the recovery callback with CONNECTION kind. + */ + @Test + void createRetryWithRecoveryConnectionErrorInvokesRecovery() { + // Arrange + final AmqpRetryOptions options = new AmqpRetryOptions().setMaxRetries(2).setDelay(Duration.ofMillis(100)); + final AtomicReference capturedKind = new AtomicReference<>(); + final Retry retry = RetryUtil.createRetryWithRecovery(options, capturedKind::set); + final AtomicInteger attempt = new AtomicInteger(); + final Mono source = Mono.defer(() -> { + if (attempt.getAndIncrement() == 0) { + return Mono.error(new AmqpException(true, AmqpErrorCondition.CONNECTION_FORCED, "forced", null)); + } + return Mono.just(1); + }); + + // Act & Assert + StepVerifier.create(source.retryWhen(retry)).expectNext(1).expectComplete().verify(Duration.ofSeconds(5)); + + assertEquals(RecoveryKind.CONNECTION, capturedKind.get(), + "Recovery callback must receive CONNECTION kind for connection errors"); + } + + /** + * After the retry budget is exhausted the error must propagate without further retries. 
+ */ + @Test + void createRetryWithRecoveryExhaustedRetriesTerminateWithError() { + // Arrange + final int maxRetries = 2; + final AmqpRetryOptions options + = new AmqpRetryOptions().setMaxRetries(maxRetries).setDelay(Duration.ofMillis(10)); + final AtomicInteger recoveryCount = new AtomicInteger(); + final Retry retry = RetryUtil.createRetryWithRecovery(options, kind -> recoveryCount.incrementAndGet()); + final AmqpException transientError + = new AmqpException(true, AmqpErrorCondition.LINK_DETACH_FORCED, "detach", null); + + // Act & Assert — use virtual time because the base retry delay unconditionally includes + // SERVER_BUSY_WAIT_TIME (4 s), regardless of error type — the cumulative + // wait would exceed real-time test limits. + StepVerifier.withVirtualTime(() -> Mono.error(transientError).retryWhen(retry)) + .expectSubscription() + .thenAwait(Duration.ofMinutes(1)) + .expectError(AmqpException.class) + .verify(Duration.ofSeconds(5)); + + // Recovery called on each retry attempt (not the final one which terminates) + assertEquals(maxRetries, recoveryCount.get(), "Recovery callback called once per non-terminal retry"); + } + + /** + * A NONE-kind failure (server-busy) before the first LINK failure must not consume the + * quick-retry flag. The first LINK failure should still trigger the quick-retry optimization + * (no backoff delay). Prior to the T13 fix, {@code didQuickRetry.getAndSet(true)} was evaluated + * unconditionally; this test verifies the kind check comes first. 
+ */ + @Test + void createRetryWithRecoveryNoneFailureBeforeLinkPreservesQuickRetry() { + // Arrange + final AmqpRetryOptions options = new AmqpRetryOptions().setMaxRetries(3).setDelay(Duration.ofMillis(100)); + final List recoveries = new ArrayList<>(); + final Retry retry = RetryUtil.createRetryWithRecovery(options, recoveries::add); + final AtomicInteger attempt = new AtomicInteger(); + final Mono source = Mono.defer(() -> { + switch (attempt.getAndIncrement()) { + case 0: + // NONE kind — server-busy; should not consume the quick-retry flag. + return Mono.error(new AmqpException(true, AmqpErrorCondition.SERVER_BUSY_ERROR, "busy", null)); + + case 1: + // LINK kind — first occurrence; flag must still be available → quick-retry fires. + return Mono.error(new AmqpException(true, AmqpErrorCondition.LINK_DETACH_FORCED, "detach", null)); + + default: + return Mono.just(99); + } + }); + + // Act & Assert — virtual time because the base delay logic unconditionally adds SERVER_BUSY_WAIT_TIME + // (4 s) to every retry; the first LINK error uses the quick-retry path (no delay). + StepVerifier.withVirtualTime(() -> source.retryWhen(retry)) + .expectSubscription() + .thenAwait(Duration.ofMinutes(1)) + .expectNext(99) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + // NONE failures do not invoke recovery; only LINK does. 
+ assertEquals(1, recoveries.size(), "Only the LINK failure should invoke recovery"); + assertEquals(RecoveryKind.LINK, recoveries.get(0), + "Recovery callback must be called with LINK kind for the detach error"); + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml index 47390eae5860..d55656f00bbc 100644 --- a/sdk/servicebus/azure-messaging-servicebus/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -69,7 +69,7 @@ com.azure azure-core-amqp - 2.11.3 + 2.12.0-beta.1 com.azure diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ConnectionCacheWrapper.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ConnectionCacheWrapper.java index 1d4aa44b4c42..885ec94e606d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ConnectionCacheWrapper.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ConnectionCacheWrapper.java @@ -51,4 +51,17 @@ AmqpRetryOptions getRetryOptions() { boolean isChannelClosed() { return isV2 ? cache.isCurrentConnectionClosed() : processor.isChannelClosed(); } + + /** + * Force-closes the current cached connection so the next get() creates a fresh one. + * Used for connection-level recovery when the connection is stale but the cache + * has not detected it via endpoint state signals. 
+ */ + void forceCloseConnection() { + if (isV2) { + cache.forceCloseConnection(); + } else { + processor.forceCloseChannel(); + } + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 0b3154ec33ef..21d0753618d2 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -11,6 +11,7 @@ import com.azure.core.amqp.implementation.MessageFlux; import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.implementation.RequestResponseChannelClosedException; +import com.azure.core.amqp.implementation.RecoveryKind; import com.azure.core.amqp.implementation.RetryUtil; import com.azure.core.amqp.implementation.StringUtil; import com.azure.core.amqp.implementation.handler.DeliveryNotOnLinkException; @@ -1734,17 +1735,36 @@ private ServiceBusAsyncConsumer getOrCreateConsumer() { // [2]. When we try to create a new session (to host the new link) but on a connection being disposed, // the retry can eventually receive a new connection and then proceed with creating session and link. // - final Mono retryableReceiveLinkMono - = RetryUtil.withRetry(receiveLinkMono.onErrorMap(RequestResponseChannelClosedException.class, e -> { - // When the current connection is being disposed, the V1 ConnectionProcessor or V2 ReactorConnectionCache - // can produce a new connection if downstream request. In this context, treat - // RequestResponseChannelClosedException error from the following two sources as retry-able so that - // retry can obtain a new connection - - // 1. 
error from the RequestResponseChannel scoped to the current connection being disposed, - // 2. error from the V2 RequestResponseChannelCache scoped to the current connection being disposed. - // + final Mono retryableReceiveLinkMono = RetryUtil + .withRetryAndRecovery(receiveLinkMono.onErrorMap(RequestResponseChannelClosedException.class, e -> { return new AmqpException(true, e.getMessage(), e, null); - }), connectionCacheWrapper.getRetryOptions(), "Failed to create receive link " + linkName, true); + }), connectionCacheWrapper.getRetryOptions(), "Failed to create receive link " + linkName, true, + recoveryKind -> { + if (recoveryKind == RecoveryKind.LINK || recoveryKind == RecoveryKind.CONNECTION) { + LOGGER.atWarning() + .addKeyValue(LINK_NAME_KEY, linkName) + .addKeyValue("recoveryKind", recoveryKind) + .log("Receive link creation failed, performing {} recovery.", recoveryKind); + + // For LINK errors during link creation, the session hosting the link may be stale. + // Ask the connection to remove it so the next retry creates a fresh session + link. + // The entityPath is the session name used by createReceiveLink(). + // Note: the error handler fires only if obtaining the connection fails, not if removeSession fails + // (removeSession returns a boolean and never propagates an error into the reactive stream). 
+ connectionProcessor.subscribe(connection -> { + final boolean removed = connection.removeSession(entityPath); + LOGGER.atVerbose() + .addKeyValue(LINK_NAME_KEY, linkName) + .addKeyValue("sessionRemoved", removed) + .log("Attempted stale session removal during {} recovery.", recoveryKind); + }, error -> LOGGER.atWarning() + .addKeyValue(LINK_NAME_KEY, linkName) + .log("Error obtaining connection during {} recovery.", recoveryKind, error)); + } + if (recoveryKind == RecoveryKind.CONNECTION) { + connectionCacheWrapper.forceCloseConnection(); + } + }); // A Flux that produces a new AmqpReceiveLink each time it receives a request from the below // 'AmqpReceiveLinkProcessor'. Obviously, the processor requests a link when there is a downstream subscriber. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index 58db3b129e65..9dd65e05d85e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -10,7 +10,9 @@ import com.azure.core.amqp.implementation.AmqpSendLink; import com.azure.core.amqp.implementation.ErrorContextProvider; import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.amqp.implementation.RecoveryKind; import com.azure.core.amqp.implementation.RequestResponseChannelClosedException; +import com.azure.core.amqp.implementation.RetryUtil; import com.azure.core.annotation.ServiceClient; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; @@ -234,6 +236,7 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable { private final MessagingEntityType entityType; private final Runnable 
onClientClose; private final String entityName; + private final ConnectionCacheWrapper connectionCacheWrapper; private final Mono connectionProcessor; private final String fullyQualifiedNamespace; private final String viaEntityName; @@ -254,6 +257,7 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable { this.retryOptions = Objects.requireNonNull(retryOptions, "'retryOptions' cannot be null."); this.entityName = Objects.requireNonNull(entityName, "'entityPath' cannot be null."); Objects.requireNonNull(connectionCacheWrapper, "'connectionCacheWrapper' cannot be null."); + this.connectionCacheWrapper = connectionCacheWrapper; this.connectionProcessor = connectionCacheWrapper.getConnection(); this.fullyQualifiedNamespace = connectionCacheWrapper.getFullyQualifiedNamespace(); this.instrumentation = Objects.requireNonNull(instrumentation, "'instrumentation' cannot be null."); @@ -810,16 +814,20 @@ private Mono scheduleMessageInternal(ServiceBusMessage message, OffsetDate return monoError(logger, new NullPointerException("'scheduledEnqueueTime' cannot be null.")); } - return tracer.traceScheduleMono("ServiceBus.scheduleMessage", - getSendLinkWithRetry("schedule-message").flatMap(link -> link.getLinkSize().flatMap(size -> { - final int maxSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES; - return connectionProcessor.flatMap(connection -> connection.getManagementNode(entityName, entityType)) - .flatMap( - managementNode -> managementNode - .schedule(Arrays.asList(message), scheduledEnqueueTime, maxSize, link.getLinkName(), - transactionContext) - .next()); - })), message, message.getContext()).onErrorMap(this::mapError); + return tracer + .traceScheduleMono("ServiceBus.scheduleMessage", getSendLinkWithRetry("schedule-message").flatMap(link -> { + return link.getLinkSize().flatMap(size -> { + final int maxSize = size > 0 ? 
size : MAX_MESSAGE_LENGTH_BYTES; + return connectionProcessor + .flatMap(connection -> connection.getManagementNode(entityName, entityType)) + .flatMap( + managementNode -> managementNode + .schedule(Arrays.asList(message), scheduledEnqueueTime, maxSize, link.getLinkName(), + transactionContext) + .next()); + }); + }), message, message.getContext()) + .onErrorMap(this::mapError); } /** @@ -859,7 +867,9 @@ private Mono sendBatchInternal(ServiceBusMessageBatch batch, messages.add(message); }); + final AtomicReference operationLink = new AtomicReference<>(); final Mono sendMessage = getSendLink("send-batch").flatMap(link -> { + operationLink.set(link); if (transactionContext != null && transactionContext.getTransactionId() != null) { final TransactionalState deliveryState = new TransactionalState(); deliveryState.setTxnId(Binary.create(transactionContext.getTransactionId())); @@ -871,8 +881,11 @@ private Mono sendBatchInternal(ServiceBusMessageBatch batch, } }); - final String message = "Sending messages timed out. message-count:" + batch.getCount() + entityId(); - final Mono withRetry = withRetry(sendMessage, retryOptions, message).onErrorMap(this::mapError); + final String timeoutMessage = "Sending messages timed out. message-count:" + batch.getCount() + entityId(); + final Mono withRetry + = RetryUtil.withRetryAndRecovery(sendMessage, retryOptions, timeoutMessage, recoveryKind -> { + performRecovery(recoveryKind, "sendBatch", operationLink); + }).onErrorMap(this::mapError); return instrumentation.instrumentSendBatch("ServiceBus.send", withRetry, batch.getMessages()); } @@ -883,8 +896,17 @@ private Mono sendFluxInternal(Flux messages, new IllegalStateException(String.format(INVALID_OPERATION_DISPOSED_SENDER, "sendMessage"))); } - final Mono> batchList - = getSendLinkWithRetry("send-batches").flatMap(link -> link.getLinkSize().flatMap(size -> { + // Apply retry+recovery only to link acquisition. 
Keeping messages.collect() outside the + // retry boundary avoids re-subscribing the user-provided Flux on each retry attempt, + // which could duplicate side-effects or re-consume a hot publisher. + final AtomicReference operationLink = new AtomicReference<>(); + final Mono linkWithRecovery + = RetryUtil.withRetryAndRecovery(getSendLink("send-batches").doOnNext(operationLink::set), retryOptions, + "Failed to acquire send link for batch collection." + entityId(), + recoveryKind -> performRecovery(recoveryKind, "sendFlux-link", operationLink)); + + final Mono> batchListMono + = linkWithRecovery.flatMap(link -> link.getLinkSize().flatMap(size -> { final int batchSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES; final CreateMessageBatchOptions batchOptions = new CreateMessageBatchOptions().setMaximumSizeInBytes(batchSize); @@ -892,10 +914,12 @@ private Mono sendFluxInternal(Flux messages, new AmqpMessageCollector(isV2, batchOptions, 1, link::getErrorContext, tracer, messageSerializer)); })); - return batchList.flatMap(list -> Flux.fromIterable(list) + final Mono sendOperation = batchListMono.flatMap(list -> Flux.fromIterable(list) .flatMap(batch -> sendBatchInternal(batch, transactionContext)) .then() - .doOnError(error -> logger.error("Error sending batch.", error))).onErrorMap(this::mapError); + .doOnError(error -> logger.error("Error sending batch.", error))); + + return sendOperation.onErrorMap(this::mapError); } private Mono getSendLink(String callSite) { @@ -926,7 +950,10 @@ private Mono getSendLink(String callSite) { } private Mono getSendLinkWithRetry(String callSite) { - return withRetry(getSendLink(callSite), retryOptions, String.format(retryGetLinkErrorMessageFormat, callSite)); + return RetryUtil.withRetryAndRecovery(getSendLink(callSite), retryOptions, + String.format(retryGetLinkErrorMessageFormat, callSite), recoveryKind -> { + performRecovery(recoveryKind, "getSendLink-" + callSite, null); + }); } private Throwable mapError(Throwable throwable) { @@ 
-936,6 +963,51 @@ private Throwable mapError(Throwable throwable) { return throwable; } + /** + * Performs tiered recovery by disposing stale resources based on the classified error. + * For LINK recovery, disposes the send link so the next retry creates a fresh one. + * For CONNECTION recovery, disposes the link and the connection so the connection + * processor creates everything fresh. + * + *

<p>This matches the Go SDK's RecoverIfNeeded() and the .NET SDK's + * FaultTolerantAmqpObject pattern.</p>

+ */ + private void performRecovery(RecoveryKind recoveryKind, String callSite, AtomicReference linkRef) { + if (recoveryKind == RecoveryKind.NONE || recoveryKind == RecoveryKind.FATAL) { + return; + } + + logger.atWarning() + .addKeyValue(ENTITY_PATH_KEY, entityName) + .addKeyValue("recoveryKind", recoveryKind) + .addKeyValue("callSite", callSite) + .log("Performing {} recovery before retry.", recoveryKind); + + // Start async close of the operation-scoped send link so the next retry creates a fresh one. + // Using a per-operation AtomicReference (not a class-level field) prevents concurrent send + // operations from accidentally closing each other's links. + // Use closeAsync() rather than dispose() to avoid blocking the Reactor thread; ReactorSender + // dispose() calls closeAsync().block(tryTimeout), which is illegal on a non-blocking scheduler. + final AmqpSendLink link = linkRef != null ? linkRef.getAndSet(null) : null; + if (link != null) { + link.closeAsync() + .subscribe(null, + error -> logger.atVerbose() + .addKeyValue(ENTITY_PATH_KEY, entityName) + .log("Error closing stale send link during recovery.", error)); + } + linkName.set(null); + + // For CONNECTION errors, explicitly force-close the cached connection so the + // next get() on the connection processor creates a fresh one. This handles the + // stale-connection scenario where heartbeats are echoed by intermediate + // infrastructure and the cache has not yet detected the failure. + // Matches Go SDK's Namespace.Recover(). + if (recoveryKind == RecoveryKind.CONNECTION) { + connectionCacheWrapper.forceCloseConnection(); + } + } + private String entityId() { return " " + ENTITY_PATH_KEY + ":" + entityName + (viaEntityName != null ? 
" " + VIA_ENTITY_NAME_KEY + ":" + viaEntityName : "") + " "; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java index ee9a0a9e0399..d3a715e9976b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java @@ -9,6 +9,7 @@ import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.implementation.StringUtil; import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2; +import com.azure.core.amqp.implementation.RecoveryKind; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.implementation.MessagingEntityType; import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode; @@ -137,12 +138,25 @@ private Mono acquireIntern(String sessionId) { return acquireSession(sessionId).timeout(tryTimeout) .retryWhen(Retry.from(signals -> signals.flatMap(signal -> { final Throwable t = signal.failure(); + final RecoveryKind kind = RecoveryKind.classify(t); + if (kind == RecoveryKind.CONNECTION) { + logger.atWarning() + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .log("Connection-level error acquiring session, forcing connection recovery.", t); + connectionCacheWrapper.forceCloseConnection(); + return Mono.delay(Duration.ZERO); + } if (isTimeoutError(t)) { logger.atVerbose() .addKeyValue(ENTITY_PATH_KEY, entityPath) .addKeyValue("attempt", signal.totalRetriesInARow()) .log("Timeout while acquiring session '{}'.", sessionName(sessionId), t); - // retry session acquire using Schedulers.parallel() and free the QPid thread. 
+ return Mono.delay(Duration.ZERO); + } + if (kind == RecoveryKind.LINK) { + logger.atWarning() + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .log("Link-level error acquiring session, retrying.", t); return Mono.delay(Duration.ZERO); } return publishError(sessionId, t, true); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java index 3a6343c98710..59971f46fc80 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java @@ -8,6 +8,7 @@ import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.exception.SessionErrorContext; import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.amqp.implementation.RecoveryKind; import com.azure.core.amqp.implementation.StringUtil; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.implementation.DispositionStatus; @@ -281,24 +282,32 @@ Mono getActiveLink() { .timeout(operationTimeout) .then(Mono.just(link)))).retryWhen(Retry.from(retrySignals -> retrySignals.flatMap(signal -> { final Throwable failure = signal.failure(); + final RecoveryKind kind = RecoveryKind.classify(failure); LOGGER.atInfo() .addKeyValue(ENTITY_PATH_KEY, entityPath) .addKeyValue("attempt", signal.totalRetriesInARow()) + .addKeyValue("recoveryKind", kind) .log("Error occurred while getting unnamed session.", failure); if (isDisposed.get()) { return Mono.error( new AmqpException(false, "SessionManager is already disposed.", failure, getErrorContext())); - } else if (failure instanceof TimeoutException) { + } + + if (kind == RecoveryKind.CONNECTION) { + LOGGER.atWarning() + 
.addKeyValue(ENTITY_PATH_KEY, entityPath) + .log("Connection-level error in session manager, forcing connection recovery.", failure); + connectionCacheWrapper.forceCloseConnection(); + } + + if (failure instanceof TimeoutException) { return Mono.delay(Duration.ZERO); } else if (failure instanceof AmqpException && ((AmqpException) failure).getErrorCondition() == AmqpErrorCondition.TIMEOUT_ERROR) { - // The link closed remotely with 'Detach {errorCondition:com.microsoft:timeout}' frame because - // the broker waited for N seconds (60 sec hard limit today) but there was no free or new session. - // - // Given N seconds elapsed since the last session acquire attempt, request for a session on - // the 'parallel' Scheduler and free the 'QPid' thread for other IO. - // + return Mono.delay(Duration.ZERO); + } else if (kind == RecoveryKind.LINK || kind == RecoveryKind.CONNECTION) { + // Link or connection-level error — retry to acquire a fresh link (or connection). return Mono.delay(Duration.ZERO); } else { final long id = System.nanoTime(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java index 20ca897316e9..204eb1db52f7 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java @@ -6,6 +6,8 @@ import com.azure.core.amqp.AmqpEndpointState; import com.azure.core.amqp.AmqpRetryMode; import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.AmqpTransaction; import com.azure.core.amqp.AmqpTransportType; import 
com.azure.core.amqp.FixedAmqpRetryPolicy; @@ -668,7 +670,8 @@ void failedSendMessageReportsMetrics(boolean isV2) { when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); - when(sendLink.send(any(Message.class))).thenThrow(new RuntimeException("foo")); + when(sendLink.send(any(Message.class))) + .thenReturn(Mono.error(new AmqpException(false, AmqpErrorCondition.NOT_FOUND, "entity not found", null))); // Act StepVerifier.create(sender.sendMessage(new ServiceBusMessage(TEST_CONTENTS))) diff --git a/sdk/servicebus/ci.yml b/sdk/servicebus/ci.yml index 58823bab7a69..49564ad549aa 100644 --- a/sdk/servicebus/ci.yml +++ b/sdk/servicebus/ci.yml @@ -13,6 +13,7 @@ trigger: - sdk/servicebus/azure-messaging-servicebus-stress/ - sdk/servicebus/azure-messaging-servicebus-track2-perf/ - sdk/servicebus/build/ + - sdk/core/azure-core-amqp/ exclude: - sdk/servicebus/pom.xml - sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -33,6 +34,7 @@ pr: - sdk/servicebus/azure-messaging-servicebus-stress/ - sdk/servicebus/azure-messaging-servicebus-track2-perf/ - sdk/servicebus/build/ + - sdk/core/azure-core-amqp/ exclude: - sdk/servicebus/pom.xml - sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -59,3 +61,6 @@ extends: # required by the above perf libraries - name: perf-test-core groupId: com.azure + - name: azure-core-amqp + groupId: com.azure + # Build azure-core-amqp from source (needed for RecoveryKind, tiered retry)