diff --git a/config/crds/core.kcp.io_logicalclusters.yaml b/config/crds/core.kcp.io_logicalclusters.yaml index 2819a4b735d..b5c59046aa9 100644 --- a/config/crds/core.kcp.io_logicalclusters.yaml +++ b/config/crds/core.kcp.io_logicalclusters.yaml @@ -233,6 +233,7 @@ spec: - Initializing - Ready - Unavailable + - Inactive - Terminating - Deleting type: string diff --git a/config/crds/tenancy.kcp.io_workspaces.yaml b/config/crds/tenancy.kcp.io_workspaces.yaml index 0b3040a782a..539b4962c42 100644 --- a/config/crds/tenancy.kcp.io_workspaces.yaml +++ b/config/crds/tenancy.kcp.io_workspaces.yaml @@ -298,6 +298,7 @@ spec: - Initializing - Ready - Unavailable + - Inactive - Terminating - Deleting type: string diff --git a/config/root-phase0/apiexport-tenancy.kcp.io.yaml b/config/root-phase0/apiexport-tenancy.kcp.io.yaml index b9c8d4c690b..2883934a592 100644 --- a/config/root-phase0/apiexport-tenancy.kcp.io.yaml +++ b/config/root-phase0/apiexport-tenancy.kcp.io.yaml @@ -13,7 +13,7 @@ spec: crd: {} - group: tenancy.kcp.io name: workspaces - schema: v260428-d075f2d48.workspaces.tenancy.kcp.io + schema: v260522-2bf05df66.workspaces.tenancy.kcp.io storage: crd: {} - group: tenancy.kcp.io diff --git a/config/root-phase0/apiresourceschema-logicalclusters.core.kcp.io.yaml b/config/root-phase0/apiresourceschema-logicalclusters.core.kcp.io.yaml index 5160d5caba1..c6e430f0954 100644 --- a/config/root-phase0/apiresourceschema-logicalclusters.core.kcp.io.yaml +++ b/config/root-phase0/apiresourceschema-logicalclusters.core.kcp.io.yaml @@ -1,7 +1,7 @@ apiVersion: apis.kcp.io/v1alpha1 kind: APIResourceSchema metadata: - name: v260428-d075f2d48.logicalclusters.core.kcp.io + name: v260522-2bf05df66.logicalclusters.core.kcp.io spec: group: core.kcp.io names: @@ -230,6 +230,7 @@ spec: - Initializing - Ready - Unavailable + - Inactive - Terminating - Deleting type: string diff --git a/config/root-phase0/apiresourceschema-workspaces.tenancy.kcp.io.yaml b/config/root-phase0/apiresourceschema-workspaces.tenancy.kcp.io.yaml index 32c9aed86b5..7f7f8fb78e0 100644 --- a/config/root-phase0/apiresourceschema-workspaces.tenancy.kcp.io.yaml +++ b/config/root-phase0/apiresourceschema-workspaces.tenancy.kcp.io.yaml @@ -1,7 +1,7 @@ apiVersion: apis.kcp.io/v1alpha1 kind: APIResourceSchema metadata: - name: v260428-d075f2d48.workspaces.tenancy.kcp.io + name: v260522-2bf05df66.workspaces.tenancy.kcp.io spec: group: tenancy.kcp.io names: @@ -295,6 +295,7 @@ spec: - Initializing - Ready - Unavailable + - Inactive - Terminating - Deleting type: string diff --git a/pkg/authorization/workspace_content_authorizer.go b/pkg/authorization/workspace_content_authorizer.go index 14bdcd85742..79b906c86b0 100644 --- a/pkg/authorization/workspace_content_authorizer.go +++ b/pkg/authorization/workspace_content_authorizer.go @@ -112,6 +112,11 @@ func (a *workspaceContentAuthorizer) Authorize(ctx context.Context, attr authori switch logicalCluster.Status.Phase { case corev1alpha1.LogicalClusterPhaseInitializing, corev1alpha1.LogicalClusterPhaseReady, + // Inactive: WithBlockInactiveLogicalClusters denies content access at the HTTP filter + // layer while permitting /openapi and the LogicalCluster resource itself, so the cluster + // can be reactivated. The authorizer must not also deny based on phase, otherwise the + // LC GET/UPDATE used to clear the inactive annotation is rejected. + corev1alpha1.LogicalClusterPhaseInactive, // Terminating: registered terminator controllers are running and need to clean up content. // Deleting: terminators are done; standard kube finalization (GC, namespace deletion, // finalizer removal) still needs content access until the LogicalCluster object is gone. diff --git a/pkg/contextmanager/contextmanager.go b/pkg/contextmanager/contextmanager.go new file mode 100644 index 00000000000..d2262c3968f --- /dev/null +++ b/pkg/contextmanager/contextmanager.go @@ -0,0 +1,69 @@ +/* +Copyright 2026 The kcp Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package contextmanager + +import ( + "context" + "errors" + "fmt" +) + +var errShutdown = errors.New("context manager shut down") + +// Manager tracks contexts derived from the root context. +type Manager[K fmt.Stringer] struct { + rc *rootCtx +} + +// New creates a new context manager. +func New[K fmt.Stringer](root context.Context) *Manager[K] { + return &Manager[K]{rc: newRootCtx(root)} +} + +// Context returns a new context that is derived from parent. +// The context will be cancelled if either the manager's root context or the respective key context is cancelled. +func (m *Manager[K]) Context(parent context.Context, key K) (context.Context, context.CancelFunc) { + keyCtx, _ := m.rc.context(key.String()) + + ctx, cancel := context.WithCancelCause(parent) + stop := context.AfterFunc(keyCtx, func() { + cancel(context.Cause(keyCtx)) + }) + + cleanup := func() { + stop() + cancel(nil) + } + + return ctx, cleanup +} + +// Cancel cancels the context for the given key with reason. +// If no context exists for the key a context will be created and cancelled. +func (m *Manager[K]) Cancel(key K, reason error) { + m.rc.cancel(key.String(), reason) +} + +// Delete removes the entry for the given key, cancelling its context with reason. +func (m *Manager[K]) Delete(key K, reason error) { + m.rc.delete(key.String(), reason) +} + +// Shutdown cancels the root context, which propagates to all contexts. +func (m *Manager[K]) Shutdown() { + m.rc.cancelAll(errShutdown) +} diff --git a/pkg/contextmanager/doc.go b/pkg/contextmanager/doc.go new file mode 100644 index 00000000000..84f18c773ae --- /dev/null +++ b/pkg/contextmanager/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2026 The kcp Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package contextmanager simulates multiple-parent contexts with sticky cancellation. +package contextmanager diff --git a/pkg/contextmanager/rootCtx.go b/pkg/contextmanager/rootCtx.go new file mode 100644 index 00000000000..c6fb7e5503f --- /dev/null +++ b/pkg/contextmanager/rootCtx.go @@ -0,0 +1,110 @@ +/* +Copyright 2026 The kcp Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package contextmanager + +import ( + "context" + "sync" + + "golang.org/x/sync/singleflight" +) + +// rootCtx abstracts the synchronisation of getting the shared keyed contexts. +type rootCtx struct { + root context.Context //nolint:containedctx + rootCancel context.CancelCauseFunc + // entries is the hot path quick lookup + entries sync.Map + // group is a singleflight group to synchronize slow path context + // creation + group singleflight.Group + // The mutex is to synchronize the slow path and context + // cancellation + lock sync.Mutex +} + +type entry struct { + ctx context.Context //nolint:containedctx + cancel context.CancelCauseFunc +} + +func newRootCtx(ctx context.Context) *rootCtx { + rc := new(rootCtx) + rc.root, rc.rootCancel = context.WithCancelCause(ctx) + return rc +} + +func (rc *rootCtx) context(key string) (context.Context, context.CancelCauseFunc) { + stored, loaded := rc.entries.Load(key) + if loaded { + return stored.(*entry).ctx, stored.(*entry).cancel + } + + // ignoring the error as the .Do only returns the error from the + // passed function + built, _, _ := rc.group.Do(key, func() (any, error) { + // locking to synchronize with the cancelFor + rc.lock.Lock() + defer rc.lock.Unlock() + + // Double check the stored entries + stored, loaded := rc.entries.Load(key) + if loaded { + return stored, nil + } + + // Create and store the entry + ctx, cancel := context.WithCancelCause(rc.root) + e := &entry{ctx: ctx, cancel: cancel} + rc.entries.Store(key, e) + return e, nil + }) + + return built.(*entry).ctx, built.(*entry).cancel +} + +func (rc *rootCtx) cancel(key string, reason error) { + rc.lock.Lock() + defer rc.lock.Unlock() + + if stored, loaded := rc.entries.Load(key); loaded { + stored.(*entry).cancel(reason) + return + } + + // store a pre-cancelled context so following calls don't get + // a fresh context. + ctx, cancel := context.WithCancelCause(rc.root) + cancel(reason) + e := &entry{ctx: ctx, cancel: cancel} + rc.entries.Store(key, e) +} + +func (rc *rootCtx) delete(key string, reason error) { + rc.lock.Lock() + defer rc.lock.Unlock() + + stored, loaded := rc.entries.LoadAndDelete(key) + if !loaded { + return + } + stored.(*entry).cancel(reason) +} + +func (rc *rootCtx) cancelAll(reason error) { + rc.rootCancel(reason) +} diff --git a/pkg/contextmanager/rootCtx_test.go b/pkg/contextmanager/rootCtx_test.go new file mode 100644 index 00000000000..1bebf0f213c --- /dev/null +++ b/pkg/contextmanager/rootCtx_test.go @@ -0,0 +1,207 @@ +/* +Copyright 2026 The kcp Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package contextmanager + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestRootCtx_ContextCreatesAndReturnsLive(t *testing.T) { + rc := newRootCtx(context.Background()) + + ctx, _ := rc.context("k") + if err := ctx.Err(); err != nil { + t.Fatalf("expected live context, got Err=%v", err) + } + + ctx2, _ := rc.context("k") + if ctx2 != ctx { + t.Fatalf("expected same context for same key, got different instances") + } +} + +func TestRootCtx_CancelIsSticky(t *testing.T) { + rc := newRootCtx(context.Background()) + + ctx1, _ := rc.context("k") + reason := errors.New("inactive") + rc.cancel("k", reason) + + if err := ctx1.Err(); err == nil { + t.Fatalf("expected first context to be cancelled, got nil") + } + if cause := context.Cause(ctx1); !errors.Is(cause, reason) { + t.Fatalf("expected cancellation cause %v, got %v", reason, cause) + } + + ctx2, _ := rc.context("k") + if ctx2 != ctx1 { + t.Fatalf("expected sticky cancelled context to be returned, got new instance") + } + if err := ctx2.Err(); err == nil { + t.Fatalf("expected sticky context to be cancelled") + } +} + +func TestRootCtx_CancelBeforeContextCreatesTombstone(t *testing.T) { + rc := newRootCtx(context.Background()) + + reason := errors.New("preemptive") + rc.cancel("k", reason) + + ctx, _ := rc.context("k") + if err := ctx.Err(); err == nil { + t.Fatalf("expected tombstoned context to be already cancelled") + } + if cause := context.Cause(ctx); !errors.Is(cause, reason) { + t.Fatalf("expected cancellation cause %v, got %v", reason, cause) + } +} + +func TestRootCtx_CancelIdempotent(t *testing.T) { + rc := newRootCtx(context.Background()) + + ctx, _ := rc.context("k") + rc.cancel("k", errors.New("first")) + firstCause := context.Cause(ctx) + + rc.cancel("k", errors.New("second")) + if got := context.Cause(ctx); got != firstCause { + t.Fatalf("cancel should be idempotent: cause changed from %v to %v", firstCause, got) + } +} + +func TestRootCtx_Delete(t *testing.T) { + rc := newRootCtx(context.Background()) + + ctx1, _ := rc.context("k") + reason := errors.New("migrated") + rc.delete("k", reason) + + if err := ctx1.Err(); err == nil { + t.Fatalf("delete should cancel the removed context") + } + if cause := context.Cause(ctx1); !errors.Is(cause, reason) { + t.Fatalf("expected cancellation cause %v, got %v", reason, cause) + } + + ctx2, _ := rc.context("k") + if ctx2 == ctx1 { + t.Fatalf("expected fresh context after delete, got the deleted one") + } + if err := ctx2.Err(); err != nil { + t.Fatalf("expected fresh live context after delete, got Err=%v", err) + } + + // delete on a missing key is a no-op. + rc.delete("missing", errors.New("nope")) +} + +func TestRootCtx_CancelAll(t *testing.T) { + rc := newRootCtx(context.Background()) + + ctxA, _ := rc.context("a") + ctxB, _ := rc.context("b") + + reason := errors.New("shutdown") + rc.cancelAll(reason) + + for _, c := range []context.Context{ctxA, ctxB} { + if err := c.Err(); err == nil { + t.Fatalf("expected derived context to be cancelled by cancelAll") + } + } +} + +func TestRootCtx_ConcurrentCreateDeduplicates(t *testing.T) { + rc := newRootCtx(context.Background()) + + const n = 64 + var wg sync.WaitGroup + results := make([]context.Context, n) + start := make(chan struct{}) + + for i := range n { + wg.Go(func() { + <-start + ctx, _ := rc.context("k") + results[i] = ctx //nolint:fatcontext // required for the text + }) + } + close(start) + wg.Wait() + + first := results[0] + for i, c := range results { + if c != first { + t.Fatalf("expected all goroutines to receive the same context, mismatch at index %d", i) + } + } +} + +func TestRootCtx_ConcurrentCancelAndContext(t *testing.T) { + rc := newRootCtx(context.Background()) + + // Pre-create the entry so the race is purely between cancel and + // concurrent reads, not against the slow-path create. + rc.context("k") + + const readers = 32 + var wg sync.WaitGroup + stop := make(chan struct{}) + var raced atomic.Bool + + for range readers { + wg.Go(func() { + for { + select { + case <-stop: + return + default: + } + rc.context("k") + } + }) + } + + // Let readers spin up. + time.Sleep(10 * time.Millisecond) + + rc.cancel("k", errors.New("inactive")) + + // After cancel returns, every subsequent context call must yield a + // cancelled context. Probe a bunch. + for range 1000 { + ctx, _ := rc.context("k") + if ctx.Err() == nil { + raced.Store(true) + break + } + } + + close(stop) + wg.Wait() + + if raced.Load() { + t.Fatalf("context returned a non-cancelled ctx after cancel(k)") + } +} diff --git a/pkg/reconciler/core/logicalcluster/logicalcluster_controller.go b/pkg/reconciler/core/logicalcluster/logicalcluster_controller.go index 9d4fc33b349..2c48c51cefd 100644 --- a/pkg/reconciler/core/logicalcluster/logicalcluster_controller.go +++ b/pkg/reconciler/core/logicalcluster/logicalcluster_controller.go @@ -30,12 +30,14 @@ import ( "k8s.io/klog/v2" kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" + "github.com/kcp-dev/logicalcluster/v3" corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1" kcpclientset "github.com/kcp-dev/sdk/client/clientset/versioned/cluster" corev1alpha1client "github.com/kcp-dev/sdk/client/clientset/versioned/typed/core/v1alpha1" corev1alpha1informers "github.com/kcp-dev/sdk/client/informers/externalversions/core/v1alpha1" corev1alpha1listers "github.com/kcp-dev/sdk/client/listers/core/v1alpha1" + "github.com/kcp-dev/kcp/pkg/contextmanager" "github.com/kcp-dev/kcp/pkg/logging" "github.com/kcp-dev/kcp/pkg/reconciler/committer" ) @@ -48,6 +50,7 @@ func NewController( shardExternalURL func() string, kcpClusterClient kcpclientset.ClusterInterface, logicalClusterInformer corev1alpha1informers.LogicalClusterClusterInformer, + clusterContextManager *contextmanager.Manager[logicalcluster.Path], ) (*Controller, error) { c := &Controller{ queue: workqueue.NewTypedRateLimitingQueueWithConfig( @@ -60,6 +63,7 @@ func NewController( kcpClusterClient: kcpClusterClient, logicalClusterIndexer: logicalClusterInformer.Informer().GetIndexer(), logicalClusterLister: logicalClusterInformer.Lister(), + clusterContextManager: clusterContextManager, commit: committer.NewCommitter[*corev1alpha1.LogicalCluster, corev1alpha1client.LogicalClusterInterface, *corev1alpha1.LogicalClusterSpec, *corev1alpha1.LogicalClusterStatus](kcpClusterClient.CoreV1alpha1().LogicalClusters()), } _, _ = logicalClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -85,6 +89,8 @@ type Controller struct { logicalClusterIndexer cache.Indexer logicalClusterLister corev1alpha1listers.LogicalClusterClusterLister + clusterContextManager *contextmanager.Manager[logicalcluster.Path] + // commit creates a patch and submits it, if needed. commit func(ctx context.Context, old, new *logicalClusterResource) error } diff --git a/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile.go b/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile.go index da2eecab148..ea8b396c6fb 100644 --- a/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile.go +++ b/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile.go @@ -41,7 +41,7 @@ func (c *Controller) reconcile(ctx context.Context, logicalCluster *corev1alpha1 reconcilers := []reconciler{ &metaDataReconciler{}, &terminatorReconciler{}, - &phaseReconciler{}, + &phaseReconciler{clusterContextManager: c.clusterContextManager}, &urlReconciler{shardExternalURL: c.shardExternalURL}, } diff --git a/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_phase.go b/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_phase.go index 5cc0fe4085a..272675f518f 100644 --- a/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_phase.go +++ b/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_phase.go @@ -18,14 +18,20 @@ package logicalcluster import ( "context" + "fmt" + "github.com/kcp-dev/logicalcluster/v3" corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1" tenancyv1alpha1 "github.com/kcp-dev/sdk/apis/tenancy/v1alpha1" conditionsv1alpha1 "github.com/kcp-dev/sdk/apis/third_party/conditions/apis/conditions/v1alpha1" "github.com/kcp-dev/sdk/apis/third_party/conditions/util/conditions" + + "github.com/kcp-dev/kcp/pkg/contextmanager" ) -type phaseReconciler struct{} +type phaseReconciler struct { + clusterContextManager *contextmanager.Manager[logicalcluster.Path] +} func (r *phaseReconciler) reconcile(ctx context.Context, workspace *corev1alpha1.LogicalCluster) (reconcileStatus, error) { if !workspace.DeletionTimestamp.IsZero() { @@ -42,6 +48,9 @@ func (r *phaseReconciler) reconcile(ctx context.Context, workspace *corev1alpha1 default: if workspace.Status.Phase != corev1alpha1.LogicalClusterPhaseDeleting { workspace.Status.Phase = corev1alpha1.LogicalClusterPhaseDeleting + // At this point access to the LC is no longer permitted, delete contexts for it. + lcPath := logicalcluster.From(workspace).Path() + r.clusterContextManager.Delete(lcPath, fmt.Errorf("logical cluster %s deleted", lcPath)) return reconcileStatusContinue, nil } } @@ -65,6 +74,25 @@ func (r *phaseReconciler) reconcile(ctx context.Context, workspace *corev1alpha1 workspace.Status.Phase = corev1alpha1.LogicalClusterPhaseReady conditions.MarkTrue(workspace, tenancyv1alpha1.WorkspaceInitialized) + case corev1alpha1.LogicalClusterPhaseReady: + if corev1alpha1.IsLogicalClusterInactive(workspace.Annotations) { + workspace.Status.Phase = corev1alpha1.LogicalClusterPhaseInactive + // Cancel active connections for this LC as well as wildcard + // connections, as they may watch objects in this LC. + lcPath := logicalcluster.From(workspace).Path() + reason := fmt.Errorf("logical cluster %s inactive", lcPath) + r.clusterContextManager.Cancel(lcPath, reason) + r.clusterContextManager.Cancel(logicalcluster.Wildcard, reason) + } + case corev1alpha1.LogicalClusterPhaseInactive: + if !corev1alpha1.IsLogicalClusterInactive(workspace.Annotations) { + workspace.Status.Phase = corev1alpha1.LogicalClusterPhaseReady + // Drop the cancelled entries so the next request creates fresh live contexts. + lcPath := logicalcluster.From(workspace).Path() + reason := fmt.Errorf("logical cluster %s reactivated", lcPath) + r.clusterContextManager.Delete(lcPath, reason) + r.clusterContextManager.Delete(logicalcluster.Wildcard, reason) + } } return reconcileStatusContinue, nil diff --git a/pkg/server/config.go b/pkg/server/config.go index 5fede5f11ae..e0b0ee9bb24 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -68,6 +68,7 @@ import ( "github.com/kcp-dev/kcp/pkg/authentication" "github.com/kcp-dev/kcp/pkg/authorization" bootstrappolicy "github.com/kcp-dev/kcp/pkg/authorization/bootstrap" + "github.com/kcp-dev/kcp/pkg/contextmanager" kcpfeatures "github.com/kcp-dev/kcp/pkg/features" "github.com/kcp-dev/kcp/pkg/indexers" "github.com/kcp-dev/kcp/pkg/informer" @@ -132,6 +133,7 @@ type ExtraConfig struct { // misc preHandlerChainMux *handlerChainMuxes quotaAdmissionStopCh chan struct{} + ClusterContextManager *contextmanager.Manager[logicalcluster.Path] openAPIv3Controller *openapiv3.Controller openAPIv3ServiceCache *openapiv3.ServiceCache @@ -507,6 +509,8 @@ func NewConfig(ctx context.Context, opts kcpserveroptions.CompletedOptions) (*Co ) } + c.ExtraConfig.ClusterContextManager = contextmanager.New[logicalcluster.Path](ctx) + // preHandlerChainMux is called before the actual handler chain. Note that BuildHandlerChainFunc below // is called multiple times, but only one of the handler chain will actually be used. Hence, we wrap it // to give handlers below one mux.Handle func to call. @@ -591,6 +595,7 @@ func NewConfig(ctx context.Context, opts kcpserveroptions.CompletedOptions) (*Co apiHandler = filters.WithWarningRecorder(apiHandler) apiHandler = kcpfilters.WithAuditEventClusterAnnotation(apiHandler, c.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters()) + apiHandler = kcpfilters.WithPerClusterContext(apiHandler, c.ClusterContextManager) apiHandler = kcpfilters.WithBlockInactiveLogicalClusters(apiHandler, c.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters()) // Add a mux before the chain, for other handlers with their own handler chain to hook in. For example, when diff --git a/pkg/server/controllers.go b/pkg/server/controllers.go index e6031babd40..fccfe5bcd73 100644 --- a/pkg/server/controllers.go +++ b/pkg/server/controllers.go @@ -724,6 +724,7 @@ func (s *Server) installLogicalCluster(ctx context.Context, config *rest.Config) s.CompletedConfig.ShardExternalURL, kcpClusterClient, s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), + s.CompletedConfig.ClusterContextManager, ) if err != nil { return err diff --git a/pkg/server/filters/inactivelogicalcluster.go b/pkg/server/filters/inactivelogicalcluster.go index b9f7ee7464a..50b9ec7798c 100644 --- a/pkg/server/filters/inactivelogicalcluster.go +++ b/pkg/server/filters/inactivelogicalcluster.go @@ -30,14 +30,12 @@ import ( corev1alpha1informers "github.com/kcp-dev/sdk/client/informers/externalversions/core/v1alpha1" ) -const ( - // InactiveAnnotation is the annotation denoting a logical cluster should be - // deemed unreachable. - InactiveAnnotation = "internal.kcp.io/inactive" -) - // WithBlockInactiveLogicalClusters ensures that any requests to logical // clusters marked inactive are rejected. +// +// The filter intentionally acts on the inactive annotation to deny +// access as early as possible, as opposed to the Inactive LC phase +// which is only set after the LC reconciler fires. func WithBlockInactiveLogicalClusters(handler http.Handler, kcpClusterClient corev1alpha1informers.LogicalClusterClusterInformer) http.HandlerFunc { allowedPathPrefixes := []string{ "/openapi", @@ -56,7 +54,7 @@ func WithBlockInactiveLogicalClusters(handler http.Handler, kcpClusterClient cor if cluster != nil && !cluster.Name.Empty() && !isException { logicalCluster, err := kcpClusterClient.Cluster(cluster.Name).Lister().Get(corev1alpha1.LogicalClusterName) if err == nil { - if ann, ok := logicalCluster.ObjectMeta.Annotations[InactiveAnnotation]; ok && ann == "true" { + if corev1alpha1.IsLogicalClusterInactive(logicalCluster.Annotations) { responsewriters.ErrorNegotiated( apierrors.NewForbidden(corev1alpha1.Resource("logicalclusters"), cluster.Name.String(), errors.New("logical cluster is marked inactive")), errorCodecs, schema.GroupVersion{}, w, req, diff --git a/pkg/server/filters/perclustercontext.go b/pkg/server/filters/perclustercontext.go new file mode 100644 index 00000000000..36e7e4e58fc --- /dev/null +++ b/pkg/server/filters/perclustercontext.go @@ -0,0 +1,92 @@ +/* +Copyright 2026 The kcp Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "net/http" + "strings" + + "k8s.io/apiserver/pkg/endpoints/request" + + "github.com/kcp-dev/logicalcluster/v3" + + "github.com/kcp-dev/kcp/pkg/contextmanager" +) + +// WithPerClusterContext injects a multiple-parent context for each request +// that is bound by the requests' context and the cluster-specific context. +// +// This handler must run only after the client-provided information has +// been normalized to a logical cluster id. +// +// This is used e.g. to cancel active connections when a logical cluster +// is being migrated. +func WithPerClusterContext(handler http.Handler, mgr *contextmanager.Manager[logicalcluster.Path]) http.HandlerFunc { + // TODO(ntnn): THis is the same list as for inactive logical clusters. + // Will look into deduplicating this when implementing lc migration. + // exemptPathPrefixes allows some paths to pass without adding a context. + exemptPathPrefixes := []string{ + // Kube clients expect the /openapi endpoint to be available to + // e.g. retrieve schemas. E.g. kubectl-edit fetches openapi + // specs to validate the edited resource. + "/openapi", + // logical cluster objects must still be editable to lifecylce the lc. + "/apis/core.kcp.io/v1alpha1/logicalclusters", + } + + return func(w http.ResponseWriter, req *http.Request) { + for _, prefix := range exemptPathPrefixes { + if strings.HasPrefix(req.URL.Path, prefix) { + handler.ServeHTTP(w, req) + return + } + } + + cluster := request.ClusterFrom(req.Context()) + + var clusterPath logicalcluster.Path + // Handling the differing cases in a switch to prevent crossing the logic. + switch { + case cluster == nil: + handler.ServeHTTP(w, req) + return + case cluster.Wildcard: + // Explicitly including wildcard requests. When a cluster is + // cancelled the wildcard connections must also be cancelled in + // case a wildcard watch targets objects in the affected logical + // cluster. + clusterPath = logicalcluster.Wildcard + case cluster.Name.Empty(): + // .Name.Empty must be checked after .Wildcard as .Name will + // be empty if .Wildcard is true. + handler.ServeHTTP(w, req) + return + case strings.HasPrefix(cluster.Name.String(), "system:"): + // The per-shard system workspaces do not need a context as + // they can't be migrated and shouldn't be put in inactive + // state. + handler.ServeHTTP(w, req) + return + default: + clusterPath = cluster.Name.Path() + } + + ctx, cleanup := mgr.Context(req.Context(), clusterPath) + defer cleanup() + handler.ServeHTTP(w, req.WithContext(ctx)) + } +} diff --git a/pkg/server/server.go b/pkg/server/server.go index e8173ec011f..1a9d04791a9 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -692,6 +692,12 @@ func (s *Server) Run(ctx context.Context) error { }); err != nil { return err } + if err := s.AddPreShutdownHook("kcp-cluster-context-manager", func() error { + s.ClusterContextManager.Shutdown() + return nil + }); err != nil { + return err + } if len(s.Options.Cache.Client.KubeconfigFile) == 0 { if err := s.installCacheServer(ctx); err != nil { return err diff --git a/staging/src/github.com/kcp-dev/sdk/apis/core/v1alpha1/logicalcluster_types.go b/staging/src/github.com/kcp-dev/sdk/apis/core/v1alpha1/logicalcluster_types.go index e7685ecba41..5a4e40c53fa 100644 --- a/staging/src/github.com/kcp-dev/sdk/apis/core/v1alpha1/logicalcluster_types.go +++ b/staging/src/github.com/kcp-dev/sdk/apis/core/v1alpha1/logicalcluster_types.go @@ -56,11 +56,25 @@ const ( // LogicalClusterFinalizerName attached to the owner of the LogicalCluster resource (usually a Workspace) so that we can control // deletion of LogicalCluster resources. LogicalClusterFinalizerName = "core.kcp.io/logicalcluster" + + // LogicalClusterInactiveAnnotationKey is the annotation denoting a logical + // cluster should be deemed unreachable. When set to "true" the + // active connections for the logical cluster will be cancelled and + // requests will be rejected. + // The phase of a logical cluster with this annotation is set to + // LogicalClusterPhaseInactive. + LogicalClusterInactiveAnnotationKey = "core.kcp.io/inactive" + + // LogicalClusterInactiveAnnotationKeyLegacy is the previous + // inactive annotation key and honoured for backwards compatibility. + // + // Deprecated: use LogicalClusterInactiveAnnotationKey. + LogicalClusterInactiveAnnotationKeyLegacy = "internal.kcp.io/inactive" ) // LogicalClusterPhaseType is the type of the current phase of the logical cluster. // -// +kubebuilder:validation:Enum=Scheduling;Initializing;Ready;Unavailable;Terminating;Deleting +// +kubebuilder:validation:Enum=Scheduling;Initializing;Ready;Unavailable;Inactive;Terminating;Deleting type LogicalClusterPhaseType string const ( @@ -73,6 +87,12 @@ const ( // This should be used when we really can't serve the logical cluster content and not some // temporary flakes, like readiness probe failing. LogicalClusterPhaseUnavailable LogicalClusterPhaseType = "Unavailable" + // LogicalClusterPhaseInactive phase indicates that the logical cluster has been + // intentionally taken offline (for example, during maintenance). + // This phase is driven by the LogicalClusterInactiveAnnotationKey annotation. + // This is distinct from Unavailable in so far that Inactive is an + // intentional admin decision, while Unavailable is caused by an error. + LogicalClusterPhaseInactive LogicalClusterPhaseType = "Inactive" // LogicalClusterPhaseTerminating phase is used to indicate that the logical cluster has a // DeletionTimestamp set and is waiting on terminator controllers to clean up workspace // content. The cluster is still served (the workspace content authorizer permits access) @@ -242,6 +262,15 @@ func (in *LogicalCluster) GetConditions() conditionsv1alpha1.Conditions { var _ conditions.Getter = &LogicalCluster{} var _ conditions.Setter = &LogicalCluster{} +// IsLogicalClusterInactive reports whether the given annotations mark a +// LogicalCluster as inactive. It accepts both the canonical +// LogicalClusterInactiveAnnotationKey and the deprecated +// LogicalClusterInactiveAnnotationKeyLegacy. +func IsLogicalClusterInactive(annotations map[string]string) bool { + return annotations[LogicalClusterInactiveAnnotationKey] == "true" || + annotations[LogicalClusterInactiveAnnotationKeyLegacy] == "true" +} + // LogicalClusterList is a list of LogicalCluster // // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/test/e2e/workspace/inactive_test.go b/test/e2e/workspace/inactive_test.go index 8ece5be0da5..bbab6ebaa16 100644 --- a/test/e2e/workspace/inactive_test.go +++ b/test/e2e/workspace/inactive_test.go @@ -24,42 +24,73 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes" + "github.com/kcp-dev/logicalcluster/v3" "github.com/kcp-dev/sdk/apis/core" + corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1" kcpclientset "github.com/kcp-dev/sdk/client/clientset/versioned/cluster" kcptesting "github.com/kcp-dev/sdk/testing" kcptestinghelpers "github.com/kcp-dev/sdk/testing/helpers" - "github.com/kcp-dev/kcp/pkg/server/filters" "github.com/kcp-dev/kcp/test/e2e/framework" ) -func TestInactiveLogicalCluster(t *testing.T) { - t.Parallel() - framework.Suite(t, "control-plane") - - server := kcptesting.SharedKcpServer(t) - cfg := server.BaseConfig(t) - orgPath, _ := kcptesting.NewWorkspaceFixture(t, server, core.RootCluster.Path(), kcptesting.WithType(core.RootCluster.Path(), "organization")) - - kcpClient, err := kcpclientset.NewForConfig(cfg) - require.NoError(t, err) - kubeClient, err := kcpkubernetesclientset.NewForConfig(cfg) - require.NoError(t, err) - +func setInactiveAnnotation(t *testing.T, kcpClient kcpclientset.ClusterInterface, orgPath logicalcluster.Path, value bool) { + t.Helper() kcptestinghelpers.Eventually(t, func() (bool, string) { lc, err := kcpClient.Cluster(orgPath).CoreV1alpha1().LogicalClusters().Get(t.Context(), "cluster", v1.GetOptions{}) if err != nil { return false, err.Error() } - lc.Annotations[filters.InactiveAnnotation] = "true" + if value { + lc.Annotations[corev1alpha1.LogicalClusterInactiveAnnotationKey] = "true" + } else { + delete(lc.Annotations, corev1alpha1.LogicalClusterInactiveAnnotationKey) + } _, err = kcpClient.Cluster(orgPath).CoreV1alpha1().LogicalClusters().Update(t.Context(), lc, v1.UpdateOptions{}) if err != nil { return false, err.Error() } return true, "" }, wait.ForeverTestTimeout, time.Millisecond*100) +} + +func drainAndExpectClose(t *testing.T, w watch.Interface) { + t.Helper() + for { + select { + case _, ok := <-w.ResultChan(): + if !ok { + return + } + case <-time.After(wait.ForeverTestTimeout): + t.Fatal("watch was not terminated after marking logical cluster inactive") + } + } +} + +// TestInactiveLogicalClusterBlocksRequests checks that requests made +// against a logical cluster marked as inactive are failing and succeed +// after the inactive annotation is no longer "true". +func TestInactiveLogicalClusterBlocksRequests(t *testing.T) { + t.Parallel() + framework.Suite(t, "control-plane") + + // The test requires a separate server as marking a cluster inactive + // breaks wildcard watches which may affect other tests. + server := kcptesting.PrivateKcpServer(t) + cfg := server.BaseConfig(t) + orgPath, _ := kcptesting.NewWorkspaceFixture(t, server, core.RootCluster.Path(), kcptesting.WithType(core.RootCluster.Path(), "organization")) + + kcpClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err) + kubeClient, err := kcpkubernetesclientset.NewForConfig(cfg) + require.NoError(t, err) + + t.Log("Mark logical cluster inactive") + setInactiveAnnotation(t, kcpClient, orgPath, true) t.Log("Verify that normal requests fail") kcptestinghelpers.Eventually(t, func() (bool, string) { @@ -71,25 +102,93 @@ func TestInactiveLogicalCluster(t *testing.T) { }, wait.ForeverTestTimeout, time.Millisecond*100) t.Log("Remove inactive annotation again") + setInactiveAnnotation(t, kcpClient, orgPath, false) + + t.Log("Verify that normal requests succeed again") kcptestinghelpers.Eventually(t, func() (bool, string) { - lc, err := kcpClient.Cluster(orgPath).CoreV1alpha1().LogicalClusters().Get(t.Context(), "cluster", v1.GetOptions{}) - if err != nil { - return false, err.Error() - } - delete(lc.Annotations, filters.InactiveAnnotation) - _, err = kcpClient.Cluster(orgPath).CoreV1alpha1().LogicalClusters().Update(t.Context(), lc, v1.UpdateOptions{}) + _, err := kubeClient.Cluster(orgPath).CoreV1().Namespaces().List(t.Context(), v1.ListOptions{}) if err != nil { return false, err.Error() } return true, "" }, wait.ForeverTestTimeout, time.Millisecond*100) +} - t.Log("Verify that normal requests succeed again") +// TestInactiveLogicalClusterTerminatesClusterScopedWatch verifies that +// a cluster-scoped watch is cancelled when the cluster is marked as +// inactive and can be reestablished after the annotation is no longer +// "true". +func TestInactiveLogicalClusterTerminatesClusterScopedWatch(t *testing.T) { + t.Parallel() + framework.Suite(t, "control-plane") + + // The test requires a separate server as marking a cluster inactive + // breaks wildcard watches which may affect other tests. + server := kcptesting.PrivateKcpServer(t) + cfg := server.BaseConfig(t) + orgPath, _ := kcptesting.NewWorkspaceFixture(t, server, core.RootCluster.Path(), kcptesting.WithType(core.RootCluster.Path(), "organization")) + + kcpClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err) + kubeClient, err := kcpkubernetesclientset.NewForConfig(cfg) + require.NoError(t, err) + + t.Log("Opening watch on ConfigMaps before marking inactive") + watcher, err := kubeClient.Cluster(orgPath).CoreV1().ConfigMaps("default").Watch(t.Context(), v1.ListOptions{}) + require.NoError(t, err) + t.Cleanup(watcher.Stop) + + t.Log("Mark logical cluster inactive") + setInactiveAnnotation(t, kcpClient, orgPath, true) + + t.Log("Verify that the open watch is terminated") + drainAndExpectClose(t, watcher) + + t.Log("Remove inactive annotation again") + setInactiveAnnotation(t, kcpClient, orgPath, false) + + t.Log("Verify that a new watch can be opened after re-activation") kcptestinghelpers.Eventually(t, func() (bool, string) { - _, err := kubeClient.Cluster(orgPath).CoreV1().Namespaces().List(t.Context(), v1.ListOptions{}) + newWatcher, err := kubeClient.Cluster(orgPath).CoreV1().ConfigMaps("default").Watch(t.Context(), v1.ListOptions{}) if err != nil { return false, err.Error() } + t.Cleanup(newWatcher.Stop) return true, "" }, wait.ForeverTestTimeout, time.Millisecond*100) } + +// TestInactiveLogicalClusterTerminatesWildcardWatch does the same check +// as TestInactiveLogicalClusterTerminatesClusterScopedWatch but for +// a wildcard watch. +func TestInactiveLogicalClusterTerminatesWildcardWatch(t *testing.T) { + t.Parallel() + framework.Suite(t, "control-plane") + + // The test requires a separate server as marking a cluster inactive + // breaks wildcard watches which may affect other tests. + server := kcptesting.PrivateKcpServer(t) + cfg := server.BaseConfig(t) + orgPath, _ := kcptesting.NewWorkspaceFixture(t, server, core.RootCluster.Path(), kcptesting.WithType(core.RootCluster.Path(), "organization")) + + kcpClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err) + + wildcardCfg := server.RootShardSystemMasterBaseConfig(t) + wildcardKubeClient, err := kcpkubernetesclientset.NewForConfig(wildcardCfg) + require.NoError(t, err) + + t.Log("Opening wildcard watch on ConfigMaps before marking inactive") + watcher, err := wildcardKubeClient.CoreV1().ConfigMaps().Watch(t.Context(), v1.ListOptions{}) + require.NoError(t, err) + t.Cleanup(watcher.Stop) + + t.Log("Mark logical cluster inactive") + setInactiveAnnotation(t, kcpClient, orgPath, true) + + t.Log("Verify that the open wildcard watch is terminated") + drainAndExpectClose(t, watcher) + + t.Log("Remove inactive annotation again to leave the shared server clean") + setInactiveAnnotation(t, kcpClient, orgPath, false) +}