Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions internal/controller/httpproxy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ const (
// +kubebuilder:rbac:groups=gateway.envoyproxy.io,resources=httproutefilters,verbs=get;list;watch;create;update;patch;delete
// HTTPProxy controller reads cert-manager Certificate resources in the downstream cluster for status; ensure downstream role has cert-manager.io/certificates get;list;watch.

func (r *HTTPProxyReconciler) Reconcile(ctx context.Context, req mcreconcile.Request) (_ ctrl.Result, err error) {
func (r *HTTPProxyReconciler) Reconcile(ctx context.Context, req mcreconcile.Request) (result ctrl.Result, err error) {
logger := log.FromContext(ctx, "cluster", req.ClusterName)
ctx = log.IntoContext(ctx, logger)

Expand Down Expand Up @@ -168,9 +168,18 @@ func (r *HTTPProxyReconciler) Reconcile(ctx context.Context, req mcreconcile.Req
if !equality.Semantic.DeepEqual(httpProxy.Status, httpProxyCopy.Status) {
httpProxy.Status = httpProxyCopy.Status
if statusErr := cl.GetClient().Status().Update(ctx, &httpProxy); statusErr != nil {
err = errors.Join(err, fmt.Errorf("failed updating httpproxy status: %w", statusErr))
if apierrors.IsConflict(statusErr) {
// Optimistic-locking conflict on the status write: requeue
// quickly rather than entering the exponential-backoff queue.
// This prevents multi-minute stalls when concurrent reconciles
// race to update the same HTTPProxy status on creation.
result.RequeueAfter = retryAfterConflict
} else {
err = errors.Join(err, fmt.Errorf("failed updating httpproxy status: %w", statusErr))
}
} else {
logger.Info("httpproxy status updated")
}
logger.Info("httpproxy status updated")
}
}()

Expand All @@ -184,7 +193,7 @@ func (r *HTTPProxyReconciler) Reconcile(ctx context.Context, req mcreconcile.Req

gateway := desiredResources.gateway.DeepCopy()

result, err := controllerutil.CreateOrUpdate(ctx, cl.GetClient(), gateway, func() error {
opResult, err := controllerutil.CreateOrUpdate(ctx, cl.GetClient(), gateway, func() error {
if hasControllerConflict(gateway, &httpProxy) {
// return already exists error - a gateway exists with the name we want to
// use, but it's owned by a different resource.
Expand Down Expand Up @@ -220,10 +229,13 @@ func (r *HTTPProxyReconciler) Reconcile(ctx context.Context, req mcreconcile.Req
programmedCondition.Message = fmt.Sprintf("Underlying Gateway with the name %q already exists and is owned by a different resource.", gateway.Name)
return ctrl.Result{}, nil
}
if apierrors.IsConflict(err) {
return ctrl.Result{RequeueAfter: retryAfterConflict}, nil
}
return ctrl.Result{}, fmt.Errorf("failed updating gateway resource: %w", err)
}

logger.Info("processed gateway", "name", gateway.Name, "result", result)
logger.Info("processed gateway", "name", gateway.Name, "result", opResult)

// Maintain an HTTPRoute for all rules in the HTTPProxy

Expand All @@ -234,7 +246,7 @@ func (r *HTTPProxyReconciler) Reconcile(ctx context.Context, req mcreconcile.Req
} else {
for _, desiredFilter := range desiredResources.httpRouteFilters {
httpRouteFilter := desiredFilter.DeepCopy()
result, err := controllerutil.CreateOrUpdate(ctx, cl.GetClient(), httpRouteFilter, func() error {
opResult, err := controllerutil.CreateOrUpdate(ctx, cl.GetClient(), httpRouteFilter, func() error {
if err := controllerutil.SetControllerReference(&httpProxy, httpRouteFilter, cl.GetScheme()); err != nil {
return fmt.Errorf("failed to set controller on HTTPRouteFilter: %w", err)
}
Expand All @@ -244,13 +256,13 @@ func (r *HTTPProxyReconciler) Reconcile(ctx context.Context, req mcreconcile.Req
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed updating httproutefilter resource: %w", err)
}
logger.Info("processed httproutefilter", "name", httpRouteFilter.Name, "result", result)
logger.Info("processed httproutefilter", "name", httpRouteFilter.Name, "result", opResult)
}
}

httpRoute := desiredResources.httpRoute.DeepCopy()

result, err = controllerutil.CreateOrUpdate(ctx, cl.GetClient(), httpRoute, func() error {
opResult, err = controllerutil.CreateOrUpdate(ctx, cl.GetClient(), httpRoute, func() error {
if hasControllerConflict(httpRoute, &httpProxy) {
// return already exists error - an httproute exists with the name we want to
// use, but it's owned by a different resource.
Expand All @@ -272,15 +284,18 @@ func (r *HTTPProxyReconciler) Reconcile(ctx context.Context, req mcreconcile.Req
programmedCondition.Message = fmt.Sprintf("Underlying HTTPRoute with the name %q already exists and is owned by a different resource.", httpRoute.Name)
return ctrl.Result{}, nil
}
if apierrors.IsConflict(err) {
return ctrl.Result{RequeueAfter: retryAfterConflict}, nil
}
return ctrl.Result{}, fmt.Errorf("failed updating httproute resource: %w", err)
}

logger.Info("processed httproute", "name", httpRoute.Name, "result", result)
logger.Info("processed httproute", "name", httpRoute.Name, "result", opResult)

for _, desiredEndpointSlice := range desiredResources.endpointSlices {
endpointSlice := desiredEndpointSlice.DeepCopy()

result, err := controllerutil.CreateOrUpdate(ctx, cl.GetClient(), endpointSlice, func() error {
opResult, err := controllerutil.CreateOrUpdate(ctx, cl.GetClient(), endpointSlice, func() error {
if hasControllerConflict(endpointSlice, &httpProxy) {
// return already exists error - an endpointslice exists with the name we want to
// use, but it's owned by a different resource.
Expand Down Expand Up @@ -317,11 +332,13 @@ func (r *HTTPProxyReconciler) Reconcile(ctx context.Context, req mcreconcile.Req
programmedCondition.Message = fmt.Sprintf("Underlying EndpointSlice with the name %q already exists and is owned by a different resource.", endpointSlice.Name)
return ctrl.Result{}, nil
}

if apierrors.IsConflict(err) {
return ctrl.Result{RequeueAfter: retryAfterConflict}, nil
}
return ctrl.Result{}, fmt.Errorf("failed to create or update endpointslice: %w", err)
}

logger.Info("processed endpointslice", "result", result, "name", desiredEndpointSlice.Name)
logger.Info("processed endpointslice", "result", opResult, "name", desiredEndpointSlice.Name)
}

patchPolicy, hasConnectorBackends, err := r.reconcileConnectorEnvoyPatchPolicy(
Expand Down
153 changes: 153 additions & 0 deletions internal/controller/httpproxy_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1842,6 +1842,159 @@ func (c *fakeCluster) GetScheme() *runtime.Scheme {
return c.cl.Scheme()
}

// TestHTTPProxyReconcileConflictRequeue verifies that optimistic-locking 409
// Conflict errors on child-resource writes and on the HTTPProxy status update
// produce a short RequeueAfter instead of entering the exponential-backoff
// queue. Without this, concurrent reconciles during tunnel creation silenced
// the controller for 3-4 minutes.
func TestHTTPProxyReconcileConflictRequeue(t *testing.T) {
t.Parallel()

logger := zap.New(zap.UseFlagOptions(&zap.Options{Development: true}))
ctx := log.IntoContext(context.Background(), logger)

testScheme := runtime.NewScheme()
require.NoError(t, scheme.AddToScheme(testScheme))
require.NoError(t, gatewayv1.Install(testScheme))
require.NoError(t, envoygatewayv1alpha1.AddToScheme(testScheme))
require.NoError(t, discoveryv1.AddToScheme(testScheme))
require.NoError(t, networkingv1alpha.AddToScheme(testScheme))
require.NoError(t, networkingv1alpha1.AddToScheme(testScheme))

testConfig := config.NetworkServicesOperator{
HTTPProxy: config.HTTPProxyConfig{GatewayClassName: "test-gateway-class"},
Gateway: config.GatewayConfig{
ControllerName: gatewayv1.GatewayController("test-gateway-class"),
DownstreamGatewayClassName: "test-downstream-gateway-class",
TargetDomain: "example.com",
},
}

for _, tc := range []struct {
name string
extraObjects func(httpProxy *networkingv1alpha.HTTPProxy) []client.Object
injectFunc func(interceptor.Funcs) interceptor.Funcs
}{
{
name: "conflict on gateway update requeuees quickly",
// Pre-seed a gateway with no spec so the reconcile mutates it,
// triggering an Update (which the interceptor turns into a 409).
extraObjects: func(p *networkingv1alpha.HTTPProxy) []client.Object {
return []client.Object{&gatewayv1.Gateway{ObjectMeta: metav1.ObjectMeta{
Namespace: p.Namespace,
Name: p.Name,
UID: uuid.NewUUID(),
CreationTimestamp: metav1.Now(),
}}}
},
injectFunc: func(f interceptor.Funcs) interceptor.Funcs {
f.Update = func(ctx context.Context, c client.WithWatch, obj client.Object, opts ...client.UpdateOption) error {
if _, ok := obj.(*gatewayv1.Gateway); ok {
return apierrors.NewConflict(gatewayv1.Resource("gateways"), obj.GetName(), fmt.Errorf("injected"))
}
return c.Update(ctx, obj, opts...)
}
return f
},
},
{
name: "conflict on httproute update requeuees quickly",
// Pre-seed gateway (so that path succeeds) and httproute with no
// spec so the reconcile mutates the route, triggering an Update.
extraObjects: func(p *networkingv1alpha.HTTPProxy) []client.Object {
return []client.Object{
&gatewayv1.Gateway{ObjectMeta: metav1.ObjectMeta{
Namespace: p.Namespace,
Name: p.Name,
UID: uuid.NewUUID(),
CreationTimestamp: metav1.Now(),
}},
&gatewayv1.HTTPRoute{ObjectMeta: metav1.ObjectMeta{
Namespace: p.Namespace,
Name: p.Name,
UID: uuid.NewUUID(),
CreationTimestamp: metav1.Now(),
}},
}
},
injectFunc: func(f interceptor.Funcs) interceptor.Funcs {
f.Update = func(ctx context.Context, c client.WithWatch, obj client.Object, opts ...client.UpdateOption) error {
if _, ok := obj.(*gatewayv1.HTTPRoute); ok {
return apierrors.NewConflict(gatewayv1.Resource("httproutes"), obj.GetName(), fmt.Errorf("injected"))
}
return c.Update(ctx, obj, opts...)
}
return f
},
},
{
name: "conflict on status update requeuees quickly",
injectFunc: func(f interceptor.Funcs) interceptor.Funcs {
f.SubResourceUpdate = func(ctx context.Context, c client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error {
if subResourceName == "status" {
if _, ok := obj.(*networkingv1alpha.HTTPProxy); ok {
return apierrors.NewConflict(gatewayv1.Resource("httpproxies"), obj.GetName(), fmt.Errorf("injected"))
}
}
return c.SubResource(subResourceName).Update(ctx, obj, opts...)
}
return f
},
},
} {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

httpProxy := newHTTPProxy()
// Pre-set the finalizer so the reconcile doesn't bail out early
// to add it before we can test child-resource conflict handling.
httpProxy.Finalizers = append(httpProxy.Finalizers, httpProxyFinalizer)

ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: httpProxy.Namespace}}
ns.SetUID("test-ns-uid")

initialObjects := []client.Object{httpProxy, ns}
if tc.extraObjects != nil {
initialObjects = append(initialObjects, tc.extraObjects(httpProxy)...)
}

interceptorFuncs := tc.injectFunc(interceptor.Funcs{
Create: func(ctx context.Context, c client.WithWatch, obj client.Object, opts ...client.CreateOption) error {
obj.SetUID(uuid.NewUUID())
obj.SetCreationTimestamp(metav1.Now())
return c.Create(ctx, obj, opts...)
},
})

fakeClient := fake.NewClientBuilder().
WithScheme(testScheme).
WithObjects(initialObjects...).
WithStatusSubresource(&networkingv1alpha.HTTPProxy{}, &gatewayv1.Gateway{}).
WithInterceptorFuncs(interceptorFuncs).
Build()

mgr := &fakeMockManager{cl: fakeClient}
reconciler := &HTTPProxyReconciler{
mgr: mgr,
Config: testConfig,
DownstreamCluster: &fakeCluster{cl: fake.NewClientBuilder().WithScheme(testScheme).Build()},
}

req := mcreconcile.Request{
Request: reconcile.Request{NamespacedName: client.ObjectKeyFromObject(httpProxy)},
ClusterName: "test-cluster",
}

// The finalizer is already present; this reconcile goes straight to
// child-resource management and should hit the injected 409.
result, err := reconciler.Reconcile(ctx, req)
assert.NoError(t, err, "conflict should not be returned as an error")
assert.Equal(t, retryAfterConflict, result.RequeueAfter,
"conflict should produce a short RequeueAfter, not exponential backoff")
})
}
}

func TestBuildAvailabilityStatuses(t *testing.T) {
t.Parallel()

Expand Down
Loading