From ab145304b93f352d204d80a9d52193da349f829b Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Fri, 15 May 2026 08:38:46 +0200 Subject: [PATCH 01/18] Add pkg/contextmanager Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- pkg/contextmanager/contextmanager.go | 84 ++++++++++++++++++++++++++++ pkg/contextmanager/doc.go | 18 ++++++ 2 files changed, 102 insertions(+) create mode 100644 pkg/contextmanager/contextmanager.go create mode 100644 pkg/contextmanager/doc.go diff --git a/pkg/contextmanager/contextmanager.go b/pkg/contextmanager/contextmanager.go new file mode 100644 index 00000000000..338338c804a --- /dev/null +++ b/pkg/contextmanager/contextmanager.go @@ -0,0 +1,84 @@ +/* +Copyright 2026 The kcp Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package contextmanager + +import ( + "context" + "fmt" + "sync" +) + +// Manager tracks contexts derived from the root context. +type Manager[K comparable] struct { + root context.Context //nolint:containedctx + cancelRoot context.CancelCauseFunc + entries sync.Map // K → *entry +} + +type entry struct { + ctx context.Context //nolint:containedctx + cancel context.CancelCauseFunc +} + +// New creates a new context manager. +func New[K comparable](root context.Context) *Manager[K] { + ctx, cancel := context.WithCancelCause(root) + return &Manager[K]{root: ctx, cancelRoot: cancel} +} + +// ContextFor returns a new context that is derived from parent. +// The context will be cancelled if either the manager's root context or the respective key context is cancelled. +func (m *Manager[K]) ContextFor(parent context.Context, key K) (context.Context, context.CancelFunc) { + keyCtx := m.getContext(key) + + ctx, cancel := context.WithCancelCause(parent) + stop := context.AfterFunc(keyCtx, func() { + cancel(fmt.Errorf("%v cancelled", key)) + }) + + cleanup := func() { + stop() + cancel(nil) + } + + return ctx, cleanup +} + +func (m *Manager[K]) getContext(key K) context.Context { + ctx, cancel := context.WithCancelCause(m.root) + e := &entry{ctx: ctx, cancel: cancel} + + if actual, loaded := m.entries.LoadOrStore(key, e); loaded { + cancel(nil) + return actual.(*entry).ctx + } + return ctx +} + +// Cancel cancels the context for the given key. +func (m *Manager[K]) Cancel(key K) { + v, loaded := m.entries.LoadAndDelete(key) + if !loaded { + return + } + v.(*entry).cancel(fmt.Errorf("%v cancelled", key)) +} + +// CancelAll cancels the root context, which propagates to all contexts created by .ContextFor. +func (m *Manager[K]) CancelAll() { + m.cancelRoot(fmt.Errorf("context manager shut down")) +} diff --git a/pkg/contextmanager/doc.go b/pkg/contextmanager/doc.go new file mode 100644 index 00000000000..7e3c62c7ce5 --- /dev/null +++ b/pkg/contextmanager/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2026 The kcp Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package contextmanager simulates contexts having multiple parents. +package contextmanager From c08faff37a0c76b612e5c892147ea728cbbe7d1b Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Fri, 15 May 2026 08:47:05 +0200 Subject: [PATCH 02/18] Add WithPerClusterContext handler Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- pkg/server/filters/perclustercontext.go | 62 +++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 pkg/server/filters/perclustercontext.go diff --git a/pkg/server/filters/perclustercontext.go b/pkg/server/filters/perclustercontext.go new file mode 100644 index 00000000000..feb6a428d22 --- /dev/null +++ b/pkg/server/filters/perclustercontext.go @@ -0,0 +1,62 @@ +/* +Copyright 2026 The kcp Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "net/http" + "strings" + + "k8s.io/apiserver/pkg/endpoints/request" + + "github.com/kcp-dev/logicalcluster/v3" + + "github.com/kcp-dev/kcp/pkg/contextmanager" +) + +// WithPerClusterContext injects a multiple-parent context for each request +// that is bound by the requests' context and the cluster-specific context. +// +// This handler must run only after the client-provided information has +// been normalized to a logical cluster id. +// +// This is used e.g. to cancel active connections when a logical cluster +// is being migrated. +func WithPerClusterContext(handler http.Handler, mgr *contextmanager.Manager[logicalcluster.Path]) http.HandlerFunc { + return func(w http.ResponseWriter, req *http.Request) { + cluster := request.ClusterFrom(req.Context()) + // Explicitly including wildcard requests. When a cluster is + // cancelled the wildcard connections must also be cancelled in + // case a wildcard watch targets objects in the affected logical + // cluster. + if cluster == nil || cluster.Name.Empty() || strings.HasPrefix(cluster.Name.String(), "system:") { + handler.ServeHTTP(w, req) + return + } + + var clusterPath logicalcluster.Path + switch { + case cluster.Wildcard: + clusterPath = logicalcluster.Wildcard + default: + clusterPath = cluster.Name.Path() + } + + ctx, cleanup := mgr.ContextFor(req.Context(), clusterPath) + defer cleanup() + handler.ServeHTTP(w, req.WithContext(ctx)) + } +} From 5cefb9e89b412f62c281dc9dc5cb0a9b5691e09d Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Fri, 15 May 2026 08:59:04 +0200 Subject: [PATCH 03/18] Wire contextmanager into shard Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- pkg/server/config.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/server/config.go b/pkg/server/config.go index 5fede5f11ae..e0b0ee9bb24 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -68,6 +68,7 @@ import ( "github.com/kcp-dev/kcp/pkg/authentication" "github.com/kcp-dev/kcp/pkg/authorization" bootstrappolicy "github.com/kcp-dev/kcp/pkg/authorization/bootstrap" + "github.com/kcp-dev/kcp/pkg/contextmanager" kcpfeatures "github.com/kcp-dev/kcp/pkg/features" "github.com/kcp-dev/kcp/pkg/indexers" "github.com/kcp-dev/kcp/pkg/informer" @@ -132,6 +133,7 @@ type ExtraConfig struct { // misc preHandlerChainMux *handlerChainMuxes quotaAdmissionStopCh chan struct{} + ClusterContextManager *contextmanager.Manager[logicalcluster.Path] openAPIv3Controller *openapiv3.Controller openAPIv3ServiceCache *openapiv3.ServiceCache @@ -507,6 +509,8 @@ func NewConfig(ctx context.Context, opts kcpserveroptions.CompletedOptions) (*Co ) } + c.ExtraConfig.ClusterContextManager = contextmanager.New[logicalcluster.Path](ctx) + // preHandlerChainMux is called before the actual handler chain. Note that BuildHandlerChainFunc below // is called multiple times, but only one of the handler chain will actually be used. Hence, we wrap it // to give handlers below one mux.Handle func to call. @@ -591,6 +595,7 @@ func NewConfig(ctx context.Context, opts kcpserveroptions.CompletedOptions) (*Co apiHandler = filters.WithWarningRecorder(apiHandler) apiHandler = kcpfilters.WithAuditEventClusterAnnotation(apiHandler, c.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters()) + apiHandler = kcpfilters.WithPerClusterContext(apiHandler, c.ClusterContextManager) apiHandler = kcpfilters.WithBlockInactiveLogicalClusters(apiHandler, c.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters()) // Add a mux before the chain, for other handlers with their own handler chain to hook in. For example, when From ca38e68cb558172d309f908cbd3b58bb81544309 Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Thu, 21 May 2026 09:38:02 +0200 Subject: [PATCH 04/18] Cancel connections when a logical cluster is marked inactive Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- .../logicalcluster_controller.go | 6 +++ .../logicalcluster_reconcile.go | 1 + .../logicalcluster_reconcile_inactive.go | 43 +++++++++++++++++++ pkg/server/controllers.go | 1 + 4 files changed, 51 insertions(+) create mode 100644 pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_inactive.go diff --git a/pkg/reconciler/core/logicalcluster/logicalcluster_controller.go b/pkg/reconciler/core/logicalcluster/logicalcluster_controller.go index 9d4fc33b349..2c48c51cefd 100644 --- a/pkg/reconciler/core/logicalcluster/logicalcluster_controller.go +++ b/pkg/reconciler/core/logicalcluster/logicalcluster_controller.go @@ -30,12 +30,14 @@ import ( "k8s.io/klog/v2" kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" + "github.com/kcp-dev/logicalcluster/v3" corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1" kcpclientset "github.com/kcp-dev/sdk/client/clientset/versioned/cluster" corev1alpha1client "github.com/kcp-dev/sdk/client/clientset/versioned/typed/core/v1alpha1" corev1alpha1informers "github.com/kcp-dev/sdk/client/informers/externalversions/core/v1alpha1" corev1alpha1listers "github.com/kcp-dev/sdk/client/listers/core/v1alpha1" + "github.com/kcp-dev/kcp/pkg/contextmanager" "github.com/kcp-dev/kcp/pkg/logging" "github.com/kcp-dev/kcp/pkg/reconciler/committer" ) @@ -48,6 +50,7 @@ func NewController( shardExternalURL func() string, kcpClusterClient kcpclientset.ClusterInterface, logicalClusterInformer corev1alpha1informers.LogicalClusterClusterInformer, + clusterContextManager *contextmanager.Manager[logicalcluster.Path], ) (*Controller, error) { c := &Controller{ queue: workqueue.NewTypedRateLimitingQueueWithConfig( @@ -60,6 +63,7 @@ func NewController( kcpClusterClient: kcpClusterClient, logicalClusterIndexer: logicalClusterInformer.Informer().GetIndexer(), logicalClusterLister: logicalClusterInformer.Lister(), + clusterContextManager: clusterContextManager, commit: committer.NewCommitter[*corev1alpha1.LogicalCluster, corev1alpha1client.LogicalClusterInterface, *corev1alpha1.LogicalClusterSpec, *corev1alpha1.LogicalClusterStatus](kcpClusterClient.CoreV1alpha1().LogicalClusters()), } _, _ = logicalClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -85,6 +89,8 @@ type Controller struct { logicalClusterIndexer cache.Indexer logicalClusterLister corev1alpha1listers.LogicalClusterClusterLister + clusterContextManager *contextmanager.Manager[logicalcluster.Path] + // commit creates a patch and submits it, if needed. commit func(ctx context.Context, old, new *logicalClusterResource) error } diff --git a/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile.go b/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile.go index da2eecab148..9946e77664f 100644 --- a/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile.go +++ b/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile.go @@ -43,6 +43,7 @@ func (c *Controller) reconcile(ctx context.Context, logicalCluster *corev1alpha1 &terminatorReconciler{}, &phaseReconciler{}, &urlReconciler{shardExternalURL: c.shardExternalURL}, + &inactiveReconciler{clusterContextManager: c.clusterContextManager}, } var errs []error diff --git a/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_inactive.go b/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_inactive.go new file mode 100644 index 00000000000..37c2dfc7408 --- /dev/null +++ b/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_inactive.go @@ -0,0 +1,43 @@ +/* +Copyright 2026 The kcp Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logicalcluster + +import ( + "context" + + "github.com/kcp-dev/logicalcluster/v3" + corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1" + + "github.com/kcp-dev/kcp/pkg/contextmanager" + "github.com/kcp-dev/kcp/pkg/server/filters" +) + +type inactiveReconciler struct { + clusterContextManager *contextmanager.Manager[logicalcluster.Path] +} + +func (r *inactiveReconciler) reconcile(ctx context.Context, logicalCluster *corev1alpha1.LogicalCluster) (reconcileStatus, error) { + if r.clusterContextManager == nil { + return reconcileStatusContinue, nil + } + if logicalCluster.Annotations[filters.InactiveAnnotation] == "true" { + // Cancel connections for this cluster and wildcard connections. + r.clusterContextManager.Cancel(logicalcluster.From(logicalCluster).Path()) + r.clusterContextManager.Cancel(logicalcluster.Wildcard) + } + return reconcileStatusContinue, nil +} diff --git a/pkg/server/controllers.go b/pkg/server/controllers.go index e6031babd40..fccfe5bcd73 100644 --- a/pkg/server/controllers.go +++ b/pkg/server/controllers.go @@ -724,6 +724,7 @@ func (s *Server) installLogicalCluster(ctx context.Context, config *rest.Config) s.CompletedConfig.ShardExternalURL, kcpClusterClient, s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), + s.CompletedConfig.ClusterContextManager, ) if err != nil { return err From 316cd8a186d10096c6b9ff19d38589ea15ca8998 Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Thu, 21 May 2026 09:43:52 +0200 Subject: [PATCH 05/18] Update inactive logical cluster test to verify cancelling client connections Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- test/e2e/workspace/inactive_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/test/e2e/workspace/inactive_test.go b/test/e2e/workspace/inactive_test.go index 8ece5be0da5..14cc45613fa 100644 --- a/test/e2e/workspace/inactive_test.go +++ b/test/e2e/workspace/inactive_test.go @@ -48,6 +48,11 @@ func TestInactiveLogicalCluster(t *testing.T) { kubeClient, err := kcpkubernetesclientset.NewForConfig(cfg) require.NoError(t, err) + t.Log("Opening watch on ConfigMaps before marking inactive") + watcher, err := kubeClient.Cluster(orgPath).CoreV1().ConfigMaps("default").Watch(t.Context(), v1.ListOptions{}) + require.NoError(t, err) + t.Cleanup(watcher.Stop) + kcptestinghelpers.Eventually(t, func() (bool, string) { lc, err := kcpClient.Cluster(orgPath).CoreV1alpha1().LogicalClusters().Get(t.Context(), "cluster", v1.GetOptions{}) if err != nil { @@ -70,6 +75,19 @@ func TestInactiveLogicalCluster(t *testing.T) { return true, "" }, wait.ForeverTestTimeout, time.Millisecond*100) + t.Log("Verify that the open watch is terminated") +drain: + for { + select { + case _, ok := <-watcher.ResultChan(): + if !ok { + break drain + } + case <-time.After(wait.ForeverTestTimeout): + t.Fatal("watch was not terminated after marking logical cluster inactive") + } + } + t.Log("Remove inactive annotation again") kcptestinghelpers.Eventually(t, func() (bool, string) { lc, err := kcpClient.Cluster(orgPath).CoreV1alpha1().LogicalClusters().Get(t.Context(), "cluster", v1.GetOptions{}) @@ -92,4 +110,9 @@ func TestInactiveLogicalCluster(t *testing.T) { } return true, "" }, wait.ForeverTestTimeout, time.Millisecond*100) + + t.Log("Verify that a new watch can be opened after re-activation") + newWatcher, err := kubeClient.Cluster(orgPath).CoreV1().ConfigMaps("default").Watch(t.Context(), v1.ListOptions{}) + require.NoError(t, err) + t.Cleanup(newWatcher.Stop) } From 975b2b529808405d406b737fddb60eca4c7ff012 Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Thu, 21 May 2026 11:11:55 +0200 Subject: [PATCH 06/18] Add LogicalClusterPhaseInactive Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- pkg/authorization/workspace_content_authorizer.go | 5 +++++ .../sdk/apis/core/v1alpha1/logicalcluster_types.go | 8 +++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/authorization/workspace_content_authorizer.go b/pkg/authorization/workspace_content_authorizer.go index 14bdcd85742..79b906c86b0 100644 --- a/pkg/authorization/workspace_content_authorizer.go +++ b/pkg/authorization/workspace_content_authorizer.go @@ -112,6 +112,11 @@ func (a *workspaceContentAuthorizer) Authorize(ctx context.Context, attr authori switch logicalCluster.Status.Phase { case corev1alpha1.LogicalClusterPhaseInitializing, corev1alpha1.LogicalClusterPhaseReady, + // Inactive: WithBlockInactiveLogicalClusters denies content access at the HTTP filter + // layer while permitting /openapi and the LogicalCluster resource itself, so the cluster + // can be reactivated. The authorizer must not also deny based on phase, otherwise the + // LC GET/UPDATE used to clear the inactive annotation is rejected. + corev1alpha1.LogicalClusterPhaseInactive, // Terminating: registered terminator controllers are running and need to clean up content. // Deleting: terminators are done; standard kube finalization (GC, namespace deletion, // finalizer removal) still needs content access until the LogicalCluster object is gone. diff --git a/staging/src/github.com/kcp-dev/sdk/apis/core/v1alpha1/logicalcluster_types.go b/staging/src/github.com/kcp-dev/sdk/apis/core/v1alpha1/logicalcluster_types.go index e7685ecba41..44e27fabca0 100644 --- a/staging/src/github.com/kcp-dev/sdk/apis/core/v1alpha1/logicalcluster_types.go +++ b/staging/src/github.com/kcp-dev/sdk/apis/core/v1alpha1/logicalcluster_types.go @@ -60,7 +60,7 @@ const ( // LogicalClusterPhaseType is the type of the current phase of the logical cluster. // -// +kubebuilder:validation:Enum=Scheduling;Initializing;Ready;Unavailable;Terminating;Deleting +// +kubebuilder:validation:Enum=Scheduling;Initializing;Ready;Unavailable;Inactive;Terminating;Deleting type LogicalClusterPhaseType string const ( @@ -73,6 +73,12 @@ const ( // This should be used when we really can't serve the logical cluster content and not some // temporary flakes, like readiness probe failing. LogicalClusterPhaseUnavailable LogicalClusterPhaseType = "Unavailable" + // LogicalClusterPhaseInactive phase indicates that the logical cluster has been + // intentionally taken offline (for example, during maintenance). + // This phase is driven by the internal.kcp.io/inactive annotation. + // This is distinct from Unavailable in so far that Inactive is an + // intentional admin decision, while Unavailable is caused by an error. + LogicalClusterPhaseInactive LogicalClusterPhaseType = "Inactive" // LogicalClusterPhaseTerminating phase is used to indicate that the logical cluster has a // DeletionTimestamp set and is waiting on terminator controllers to clean up workspace // content. The cluster is still served (the workspace content authorizer permits access) From 9530809f695bbd1910c000315a2005e853d37df7 Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Thu, 21 May 2026 11:13:47 +0200 Subject: [PATCH 07/18] Add comment on the inactive logical cluster annotation Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- pkg/server/filters/inactivelogicalcluster.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/server/filters/inactivelogicalcluster.go b/pkg/server/filters/inactivelogicalcluster.go index b9f7ee7464a..e19b5e38306 100644 --- a/pkg/server/filters/inactivelogicalcluster.go +++ b/pkg/server/filters/inactivelogicalcluster.go @@ -38,6 +38,10 @@ const ( // WithBlockInactiveLogicalClusters ensures that any requests to logical // clusters marked inactive are rejected. +// +// The filter intentionally acts on the InactiveAnnotation to deny +// access as early as possible, as opposed to the Inactive LC phase +// which is only set after the LC reconciler fires. func WithBlockInactiveLogicalClusters(handler http.Handler, kcpClusterClient corev1alpha1informers.LogicalClusterClusterInformer) http.HandlerFunc { allowedPathPrefixes := []string{ "/openapi", From 1b31b8c20890a4193cf4c9afbd13d53bfd1cbd5e Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Thu, 21 May 2026 11:14:12 +0200 Subject: [PATCH 08/18] Add contextmanager.Manager.Has Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- pkg/contextmanager/contextmanager.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/contextmanager/contextmanager.go b/pkg/contextmanager/contextmanager.go index 338338c804a..dd16f5b2317 100644 --- a/pkg/contextmanager/contextmanager.go +++ b/pkg/contextmanager/contextmanager.go @@ -69,6 +69,11 @@ func (m *Manager[K]) getContext(key K) context.Context { return ctx } +func (m *Manager[K]) Has(key K) bool { + _, ok := m.entries.Load(key) + return ok +} + // Cancel cancels the context for the given key. func (m *Manager[K]) Cancel(key K) { v, loaded := m.entries.LoadAndDelete(key) From 156e0b75b6032387a584a6221dd3c5af4051b4f6 Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Thu, 21 May 2026 11:15:40 +0200 Subject: [PATCH 09/18] Resolve inactiveReconciler into phaseReconciler Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- .../logicalcluster_reconcile.go | 3 +- .../logicalcluster_reconcile_inactive.go | 43 ------------------- .../logicalcluster_reconcile_phase.go | 23 +++++++++- 3 files changed, 23 insertions(+), 46 deletions(-) delete mode 100644 pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_inactive.go diff --git a/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile.go b/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile.go index 9946e77664f..ea8b396c6fb 100644 --- a/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile.go +++ b/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile.go @@ -41,9 +41,8 @@ func (c *Controller) reconcile(ctx context.Context, logicalCluster *corev1alpha1 reconcilers := []reconciler{ &metaDataReconciler{}, &terminatorReconciler{}, - &phaseReconciler{}, + &phaseReconciler{clusterContextManager: c.clusterContextManager}, &urlReconciler{shardExternalURL: c.shardExternalURL}, - &inactiveReconciler{clusterContextManager: c.clusterContextManager}, } var errs []error diff --git a/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_inactive.go b/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_inactive.go deleted file mode 100644 index 37c2dfc7408..00000000000 --- a/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_inactive.go +++ /dev/null @@ -1,43 +0,0 @@ -/* -Copyright 2026 The kcp Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package logicalcluster - -import ( - "context" - - "github.com/kcp-dev/logicalcluster/v3" - corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1" - - "github.com/kcp-dev/kcp/pkg/contextmanager" - "github.com/kcp-dev/kcp/pkg/server/filters" -) - -type inactiveReconciler struct { - clusterContextManager *contextmanager.Manager[logicalcluster.Path] -} - -func (r *inactiveReconciler) reconcile(ctx context.Context, logicalCluster *corev1alpha1.LogicalCluster) (reconcileStatus, error) { - if r.clusterContextManager == nil { - return reconcileStatusContinue, nil - } - if logicalCluster.Annotations[filters.InactiveAnnotation] == "true" { - // Cancel connections for this cluster and wildcard connections. - r.clusterContextManager.Cancel(logicalcluster.From(logicalCluster).Path()) - r.clusterContextManager.Cancel(logicalcluster.Wildcard) - } - return reconcileStatusContinue, nil -} diff --git a/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_phase.go b/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_phase.go index 5cc0fe4085a..68b62c5436c 100644 --- a/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_phase.go +++ b/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_phase.go @@ -19,13 +19,19 @@ package logicalcluster import ( "context" + "github.com/kcp-dev/logicalcluster/v3" corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1" tenancyv1alpha1 "github.com/kcp-dev/sdk/apis/tenancy/v1alpha1" conditionsv1alpha1 "github.com/kcp-dev/sdk/apis/third_party/conditions/apis/conditions/v1alpha1" "github.com/kcp-dev/sdk/apis/third_party/conditions/util/conditions" + + "github.com/kcp-dev/kcp/pkg/contextmanager" + "github.com/kcp-dev/kcp/pkg/server/filters" ) -type phaseReconciler struct{} +type phaseReconciler struct { + clusterContextManager *contextmanager.Manager[logicalcluster.Path] +} func (r *phaseReconciler) reconcile(ctx context.Context, workspace *corev1alpha1.LogicalCluster) (reconcileStatus, error) { if !workspace.DeletionTimestamp.IsZero() { @@ -42,6 +48,8 @@ func (r *phaseReconciler) reconcile(ctx context.Context, workspace *corev1alpha1 default: if workspace.Status.Phase != corev1alpha1.LogicalClusterPhaseDeleting { workspace.Status.Phase = corev1alpha1.LogicalClusterPhaseDeleting + // At this point access to the LC is no longer permitted, cancel contexts. + r.clusterContextManager.Cancel(logicalcluster.From(workspace).Path()) return reconcileStatusContinue, nil } } @@ -65,6 +73,19 @@ func (r *phaseReconciler) reconcile(ctx context.Context, workspace *corev1alpha1 workspace.Status.Phase = corev1alpha1.LogicalClusterPhaseReady conditions.MarkTrue(workspace, tenancyv1alpha1.WorkspaceInitialized) + case corev1alpha1.LogicalClusterPhaseReady: + if workspace.Annotations[filters.InactiveAnnotation] == "true" { + workspace.Status.Phase = corev1alpha1.LogicalClusterPhaseInactive + // Cancel active connections for this LC as well as wildcard + // connections, as they may watch objects in this LC. + lcPath := logicalcluster.From(workspace).Path() + r.clusterContextManager.Cancel(lcPath) + r.clusterContextManager.Cancel(logicalcluster.Wildcard) + } + case corev1alpha1.LogicalClusterPhaseInactive: + if workspace.Annotations[filters.InactiveAnnotation] != "true" { + workspace.Status.Phase = corev1alpha1.LogicalClusterPhaseReady + } } return reconcileStatusContinue, nil From de6d6019a844dda2a458ff4f273943bb5474b9bc Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Thu, 21 May 2026 11:44:21 +0200 Subject: [PATCH 10/18] Wire closing cluster contexts into server shutdown Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- pkg/server/server.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/server/server.go b/pkg/server/server.go index e8173ec011f..0390c7d5d4f 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -692,6 +692,12 @@ func (s *Server) Run(ctx context.Context) error { }); err != nil { return err } + if err := s.AddPreShutdownHook("kcp-cluster-context-manager", func() error { + s.ClusterContextManager.CancelAll() + return nil + }); err != nil { + return err + } if len(s.Options.Cache.Client.KubeconfigFile) == 0 { if err := s.installCacheServer(ctx); err != nil { return err From 55d3822ff944a0d7b2ac171f17df9ecfd8d648ca Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Thu, 21 May 2026 11:57:17 +0200 Subject: [PATCH 11/18] Add a fast path for getting a context for a key Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- pkg/contextmanager/contextmanager.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pkg/contextmanager/contextmanager.go b/pkg/contextmanager/contextmanager.go index dd16f5b2317..a8a51591cfc 100644 --- a/pkg/contextmanager/contextmanager.go +++ b/pkg/contextmanager/contextmanager.go @@ -59,14 +59,22 @@ func (m *Manager[K]) ContextFor(parent context.Context, key K) (context.Context, } func (m *Manager[K]) getContext(key K) context.Context { + // Fast path - a context exists for the key + if stored, loaded := m.entries.Load(key); loaded { + return stored.(*entry).ctx + } + + // Slow path - a context does not exist ctx, cancel := context.WithCancelCause(m.root) e := &entry{ctx: ctx, cancel: cancel} - if actual, loaded := m.entries.LoadOrStore(key, e); loaded { + stored, loaded := m.entries.LoadOrStore(key, e) + if loaded { + // If loaded is true a value was already stored, cancel the + // intermitteent context and return the stored value cancel(nil) - return actual.(*entry).ctx } - return ctx + return stored.(*entry).ctx } func (m *Manager[K]) Has(key K) bool { From 64018720b10264950890f05faa9375436edc7e5a Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Fri, 22 May 2026 10:59:35 +0200 Subject: [PATCH 12/18] Refactor contextmanager Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- pkg/contextmanager/contextmanager.go | 66 ++---- pkg/contextmanager/doc.go | 2 +- pkg/contextmanager/rootCtx.go | 110 ++++++++++ pkg/contextmanager/rootCtx_test.go | 207 ++++++++++++++++++ .../logicalcluster_reconcile_phase.go | 16 +- pkg/server/server.go | 2 +- 6 files changed, 350 insertions(+), 53 deletions(-) create mode 100644 pkg/contextmanager/rootCtx.go create mode 100644 pkg/contextmanager/rootCtx_test.go diff --git a/pkg/contextmanager/contextmanager.go b/pkg/contextmanager/contextmanager.go index a8a51591cfc..51db93dcd11 100644 --- a/pkg/contextmanager/contextmanager.go +++ b/pkg/contextmanager/contextmanager.go @@ -18,36 +18,30 @@ package contextmanager import ( "context" + "errors" "fmt" - "sync" ) +var errShutdown = errors.New("context manager shut down") + // Manager tracks contexts derived from the root context. type Manager[K comparable] struct { - root context.Context //nolint:containedctx - cancelRoot context.CancelCauseFunc - entries sync.Map // K → *entry -} - -type entry struct { - ctx context.Context //nolint:containedctx - cancel context.CancelCauseFunc + rc *rootCtx } // New creates a new context manager. func New[K comparable](root context.Context) *Manager[K] { - ctx, cancel := context.WithCancelCause(root) - return &Manager[K]{root: ctx, cancelRoot: cancel} + return &Manager[K]{rc: newRootCtx(root)} } -// ContextFor returns a new context that is derived from parent. +// Context returns a new context that is derived from parent. // The context will be cancelled if either the manager's root context or the respective key context is cancelled. -func (m *Manager[K]) ContextFor(parent context.Context, key K) (context.Context, context.CancelFunc) { - keyCtx := m.getContext(key) +func (m *Manager[K]) Context(parent context.Context, key K) (context.Context, context.CancelFunc) { + keyCtx, _ := m.rc.context(fmt.Sprint(key)) ctx, cancel := context.WithCancelCause(parent) stop := context.AfterFunc(keyCtx, func() { - cancel(fmt.Errorf("%v cancelled", key)) + cancel(context.Cause(keyCtx)) }) cleanup := func() { @@ -58,40 +52,18 @@ func (m *Manager[K]) ContextFor(parent context.Context, key K) (context.Context, return ctx, cleanup } -func (m *Manager[K]) getContext(key K) context.Context { - // Fast path - a context exists for the key - if stored, loaded := m.entries.Load(key); loaded { - return stored.(*entry).ctx - } - - // Slow path - a context does not exist - ctx, cancel := context.WithCancelCause(m.root) - e := &entry{ctx: ctx, cancel: cancel} - - stored, loaded := m.entries.LoadOrStore(key, e) - if loaded { - // If loaded is true a value was already stored, cancel the - // intermitteent context and return the stored value - cancel(nil) - } - return stored.(*entry).ctx +// Cancel cancels the context for the given key with reason. +// If no context exists for the key a context will be created and cancelled. +func (m *Manager[K]) Cancel(key K, reason error) { + m.rc.cancel(fmt.Sprint(key), reason) } -func (m *Manager[K]) Has(key K) bool { - _, ok := m.entries.Load(key) - return ok -} - -// Cancel cancels the context for the given key. -func (m *Manager[K]) Cancel(key K) { - v, loaded := m.entries.LoadAndDelete(key) - if !loaded { - return - } - v.(*entry).cancel(fmt.Errorf("%v cancelled", key)) +// Delete removes the entry for the given key, cancelling its context with reason. +func (m *Manager[K]) Delete(key K, reason error) { + m.rc.delete(fmt.Sprint(key), reason) } -// CancelAll cancels the root context, which propagates to all contexts created by .ContextFor. -func (m *Manager[K]) CancelAll() { - m.cancelRoot(fmt.Errorf("context manager shut down")) +// Shutdown cancels the root context, which propagates to all contexts. +func (m *Manager[K]) Shutdown() { + m.rc.cancelAll(errShutdown) } diff --git a/pkg/contextmanager/doc.go b/pkg/contextmanager/doc.go index 7e3c62c7ce5..84f18c773ae 100644 --- a/pkg/contextmanager/doc.go +++ b/pkg/contextmanager/doc.go @@ -14,5 +14,5 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package contextmanager simulates contexts having multiple parents. +// Package contextmanager simulates multiple-parent contexts with sticky cancellation. package contextmanager diff --git a/pkg/contextmanager/rootCtx.go b/pkg/contextmanager/rootCtx.go new file mode 100644 index 00000000000..c6fb7e5503f --- /dev/null +++ b/pkg/contextmanager/rootCtx.go @@ -0,0 +1,110 @@ +/* +Copyright 2026 The kcp Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package contextmanager + +import ( + "context" + "sync" + + "golang.org/x/sync/singleflight" +) + +// rootCtx abstracts the synchronisation of getting the shared keyed contexts. +type rootCtx struct { + root context.Context //nolint:containedctx + rootCancel context.CancelCauseFunc + // entries is the hot path quick lookup + entries sync.Map + // group is a singleflight group to synchronize slow path context + // creation + group singleflight.Group + // The mutex is to synchronize the slow path and context + // cancellation + lock sync.Mutex +} + +type entry struct { + ctx context.Context //nolint:containedctx + cancel context.CancelCauseFunc +} + +func newRootCtx(ctx context.Context) *rootCtx { + rc := new(rootCtx) + rc.root, rc.rootCancel = context.WithCancelCause(ctx) + return rc +} + +func (rc *rootCtx) context(key string) (context.Context, context.CancelCauseFunc) { + stored, loaded := rc.entries.Load(key) + if loaded { + return stored.(*entry).ctx, stored.(*entry).cancel + } + + // ignoring the error as the .Do only returns the error from the + // passed function + built, _, _ := rc.group.Do(key, func() (any, error) { + // locking to synchronize with the cancelFor + rc.lock.Lock() + defer rc.lock.Unlock() + + // Double check the stored entries + stored, loaded := rc.entries.Load(key) + if loaded { + return stored, nil + } + + // Create and store the entry + ctx, cancel := context.WithCancelCause(rc.root) + e := &entry{ctx: ctx, cancel: cancel} + rc.entries.Store(key, e) + return e, nil + }) + + return built.(*entry).ctx, built.(*entry).cancel +} + +func (rc *rootCtx) cancel(key string, reason error) { + rc.lock.Lock() + defer rc.lock.Unlock() + + if stored, loaded := rc.entries.Load(key); loaded { + stored.(*entry).cancel(reason) + return + } + + // store a pre-cancelled context so following calls don't get + // a fresh context. + ctx, cancel := context.WithCancelCause(rc.root) + cancel(reason) + e := &entry{ctx: ctx, cancel: cancel} + rc.entries.Store(key, e) +} + +func (rc *rootCtx) delete(key string, reason error) { + rc.lock.Lock() + defer rc.lock.Unlock() + + stored, loaded := rc.entries.LoadAndDelete(key) + if !loaded { + return + } + stored.(*entry).cancel(reason) +} + +func (rc *rootCtx) cancelAll(reason error) { + rc.rootCancel(reason) +} diff --git a/pkg/contextmanager/rootCtx_test.go b/pkg/contextmanager/rootCtx_test.go new file mode 100644 index 00000000000..1bebf0f213c --- /dev/null +++ b/pkg/contextmanager/rootCtx_test.go @@ -0,0 +1,207 @@ +/* +Copyright 2026 The kcp Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package contextmanager + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestRootCtx_ContextCreatesAndReturnsLive(t *testing.T) { + rc := newRootCtx(context.Background()) + + ctx, _ := rc.context("k") + if err := ctx.Err(); err != nil { + t.Fatalf("expected live context, got Err=%v", err) + } + + ctx2, _ := rc.context("k") + if ctx2 != ctx { + t.Fatalf("expected same context for same key, got different instances") + } +} + +func TestRootCtx_CancelIsSticky(t *testing.T) { + rc := newRootCtx(context.Background()) + + ctx1, _ := rc.context("k") + reason := errors.New("inactive") + rc.cancel("k", reason) + + if err := ctx1.Err(); err == nil { + t.Fatalf("expected first context to be cancelled, got nil") + } + if cause := context.Cause(ctx1); !errors.Is(cause, reason) { + t.Fatalf("expected cancellation cause %v, got %v", reason, cause) + } + + ctx2, _ := rc.context("k") + if ctx2 != ctx1 { + t.Fatalf("expected sticky cancelled context to be returned, got new instance") + } + if err := ctx2.Err(); err == nil { + t.Fatalf("expected sticky context to be cancelled") + } +} + +func TestRootCtx_CancelBeforeContextCreatesTombstone(t *testing.T) { + rc := newRootCtx(context.Background()) + + reason := errors.New("preemptive") + rc.cancel("k", reason) + + ctx, _ := rc.context("k") + if err := ctx.Err(); err == nil { + t.Fatalf("expected tombstoned context to be already cancelled") + } + if cause := context.Cause(ctx); !errors.Is(cause, reason) { + t.Fatalf("expected cancellation cause %v, got %v", reason, cause) + } +} + +func TestRootCtx_CancelIdempotent(t *testing.T) { + rc := newRootCtx(context.Background()) + + ctx, _ := rc.context("k") + rc.cancel("k", errors.New("first")) + firstCause := context.Cause(ctx) + + rc.cancel("k", errors.New("second")) + if got := context.Cause(ctx); got != firstCause { + t.Fatalf("cancel should be idempotent: cause changed from %v to %v", firstCause, got) + } +} + +func TestRootCtx_Delete(t *testing.T) { + rc := newRootCtx(context.Background()) + + ctx1, _ := rc.context("k") + reason := errors.New("migrated") + rc.delete("k", reason) + + if err := ctx1.Err(); err == nil { + t.Fatalf("delete should cancel the removed context") + } + if cause := context.Cause(ctx1); !errors.Is(cause, reason) { + t.Fatalf("expected cancellation cause %v, got %v", reason, cause) + } + + ctx2, _ := rc.context("k") + if ctx2 == ctx1 { + t.Fatalf("expected fresh context after delete, got the deleted one") + } + if err := ctx2.Err(); err != nil { + t.Fatalf("expected fresh live context after delete, got Err=%v", err) + } + + // delete on a missing key is a no-op. + rc.delete("missing", errors.New("nope")) +} + +func TestRootCtx_CancelAll(t *testing.T) { + rc := newRootCtx(context.Background()) + + ctxA, _ := rc.context("a") + ctxB, _ := rc.context("b") + + reason := errors.New("shutdown") + rc.cancelAll(reason) + + for _, c := range []context.Context{ctxA, ctxB} { + if err := c.Err(); err == nil { + t.Fatalf("expected derived context to be cancelled by cancelAll") + } + } +} + +func TestRootCtx_ConcurrentCreateDeduplicates(t *testing.T) { + rc := newRootCtx(context.Background()) + + const n = 64 + var wg sync.WaitGroup + results := make([]context.Context, n) + start := make(chan struct{}) + + for i := range n { + wg.Go(func() { + <-start + ctx, _ := rc.context("k") + results[i] = ctx //nolint:fatcontext // required for the text + }) + } + close(start) + wg.Wait() + + first := results[0] + for i, c := range results { + if c != first { + t.Fatalf("expected all goroutines to receive the same context, mismatch at index %d", i) + } + } +} + +func TestRootCtx_ConcurrentCancelAndContext(t *testing.T) { + rc := newRootCtx(context.Background()) + + // Pre-create the entry so the race is purely between cancel and + // concurrent reads, not against the slow-path create. + rc.context("k") + + const readers = 32 + var wg sync.WaitGroup + stop := make(chan struct{}) + var raced atomic.Bool + + for range readers { + wg.Go(func() { + for { + select { + case <-stop: + return + default: + } + rc.context("k") + } + }) + } + + // Let readers spin up. + time.Sleep(10 * time.Millisecond) + + rc.cancel("k", errors.New("inactive")) + + // After cancel returns, every subsequent context call must yield a + // cancelled context. Probe a bunch. + for range 1000 { + ctx, _ := rc.context("k") + if ctx.Err() == nil { + raced.Store(true) + break + } + } + + close(stop) + wg.Wait() + + if raced.Load() { + t.Fatalf("context returned a non-cancelled ctx after cancel(k)") + } +} diff --git a/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_phase.go b/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_phase.go index 68b62c5436c..a2de0b76f0a 100644 --- a/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_phase.go +++ b/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_phase.go @@ -18,6 +18,7 @@ package logicalcluster import ( "context" + "fmt" "github.com/kcp-dev/logicalcluster/v3" corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1" @@ -48,8 +49,9 @@ func (r *phaseReconciler) reconcile(ctx context.Context, workspace *corev1alpha1 default: if workspace.Status.Phase != corev1alpha1.LogicalClusterPhaseDeleting { workspace.Status.Phase = corev1alpha1.LogicalClusterPhaseDeleting - // At this point access to the LC is no longer permitted, cancel contexts. - r.clusterContextManager.Cancel(logicalcluster.From(workspace).Path()) + // At this point access to the LC is no longer permitted, delete contexts for it. + lcPath := logicalcluster.From(workspace).Path() + r.clusterContextManager.Delete(lcPath, fmt.Errorf("logical cluster %s deleted", lcPath)) return reconcileStatusContinue, nil } } @@ -79,12 +81,18 @@ func (r *phaseReconciler) reconcile(ctx context.Context, workspace *corev1alpha1 // Cancel active connections for this LC as well as wildcard // connections, as they may watch objects in this LC. lcPath := logicalcluster.From(workspace).Path() - r.clusterContextManager.Cancel(lcPath) - r.clusterContextManager.Cancel(logicalcluster.Wildcard) + reason := fmt.Errorf("logical cluster %s inactive", lcPath) + r.clusterContextManager.Cancel(lcPath, reason) + r.clusterContextManager.Cancel(logicalcluster.Wildcard, reason) } case corev1alpha1.LogicalClusterPhaseInactive: if workspace.Annotations[filters.InactiveAnnotation] != "true" { workspace.Status.Phase = corev1alpha1.LogicalClusterPhaseReady + // Drop the cancelled entries so the next request creates fresh live contexts. + lcPath := logicalcluster.From(workspace).Path() + reason := fmt.Errorf("logical cluster %s reactivated", lcPath) + r.clusterContextManager.Delete(lcPath, reason) + r.clusterContextManager.Delete(logicalcluster.Wildcard, reason) } } diff --git a/pkg/server/server.go b/pkg/server/server.go index 0390c7d5d4f..1a9d04791a9 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -693,7 +693,7 @@ func (s *Server) Run(ctx context.Context) error { return err } if err := s.AddPreShutdownHook("kcp-cluster-context-manager", func() error { - s.ClusterContextManager.CancelAll() + s.ClusterContextManager.Shutdown() return nil }); err != nil { return err From c26a86cac1f138716775630ce66631a3a77fb8a1 Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Fri, 22 May 2026 11:04:39 +0200 Subject: [PATCH 13/18] Terser and more explicit connection context wrapping Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- pkg/server/filters/perclustercontext.go | 29 +++++++++++++++++-------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/pkg/server/filters/perclustercontext.go b/pkg/server/filters/perclustercontext.go index feb6a428d22..0cce717cebc 100644 --- a/pkg/server/filters/perclustercontext.go +++ b/pkg/server/filters/perclustercontext.go @@ -38,24 +38,35 @@ import ( func WithPerClusterContext(handler http.Handler, mgr *contextmanager.Manager[logicalcluster.Path]) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { cluster := request.ClusterFrom(req.Context()) - // Explicitly including wildcard requests. When a cluster is - // cancelled the wildcard connections must also be cancelled in - // case a wildcard watch targets objects in the affected logical - // cluster. - if cluster == nil || cluster.Name.Empty() || strings.HasPrefix(cluster.Name.String(), "system:") { - handler.ServeHTTP(w, req) - return - } var clusterPath logicalcluster.Path + // Handling the differing cases in a switch to prevent crossing the logic. switch { + case cluster == nil: + handler.ServeHTTP(w, req) + return case cluster.Wildcard: + // Explicitly including wildcard requests. When a cluster is + // cancelled the wildcard connections must also be cancelled in + // case a wildcard watch targets objects in the affected logical + // cluster. clusterPath = logicalcluster.Wildcard + case cluster.Name.Empty(): + // .Name.Empty must be checked after .Wildcard as .Name will + // be empty if .Wildcard is true. + handler.ServeHTTP(w, req) + return + case strings.HasPrefix(cluster.Name.String(), "system:"): + // The per-shard system workspaces do not need a context as + // they can't be migrated and shouldn't be put in inactive + // state. + handler.ServeHTTP(w, req) + return default: clusterPath = cluster.Name.Path() } - ctx, cleanup := mgr.ContextFor(req.Context(), clusterPath) + ctx, cleanup := mgr.Context(req.Context(), clusterPath) defer cleanup() handler.ServeHTTP(w, req.WithContext(ctx)) } From 6aa6b84d8494f3303c0ae78b4dc5062dff0fff91 Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Fri, 22 May 2026 11:36:15 +0200 Subject: [PATCH 14/18] Add corev1alpha1.LogicalClusterInactiveAnnotationKey Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- .../logicalcluster_reconcile_phase.go | 5 ++-- pkg/server/filters/inactivelogicalcluster.go | 10 ++------ .../core/v1alpha1/logicalcluster_types.go | 25 ++++++++++++++++++- test/e2e/workspace/inactive_test.go | 6 ++--- 4 files changed, 31 insertions(+), 15 deletions(-) diff --git a/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_phase.go b/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_phase.go index a2de0b76f0a..272675f518f 100644 --- a/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_phase.go +++ b/pkg/reconciler/core/logicalcluster/logicalcluster_reconcile_phase.go @@ -27,7 +27,6 @@ import ( "github.com/kcp-dev/sdk/apis/third_party/conditions/util/conditions" "github.com/kcp-dev/kcp/pkg/contextmanager" - "github.com/kcp-dev/kcp/pkg/server/filters" ) type phaseReconciler struct { @@ -76,7 +75,7 @@ func (r *phaseReconciler) reconcile(ctx context.Context, workspace *corev1alpha1 workspace.Status.Phase = corev1alpha1.LogicalClusterPhaseReady conditions.MarkTrue(workspace, tenancyv1alpha1.WorkspaceInitialized) case corev1alpha1.LogicalClusterPhaseReady: - if workspace.Annotations[filters.InactiveAnnotation] == "true" { + if corev1alpha1.IsLogicalClusterInactive(workspace.Annotations) { workspace.Status.Phase = corev1alpha1.LogicalClusterPhaseInactive // Cancel active connections for this LC as well as wildcard // connections, as they may watch objects in this LC. @@ -86,7 +85,7 @@ func (r *phaseReconciler) reconcile(ctx context.Context, workspace *corev1alpha1 r.clusterContextManager.Cancel(logicalcluster.Wildcard, reason) } case corev1alpha1.LogicalClusterPhaseInactive: - if workspace.Annotations[filters.InactiveAnnotation] != "true" { + if !corev1alpha1.IsLogicalClusterInactive(workspace.Annotations) { workspace.Status.Phase = corev1alpha1.LogicalClusterPhaseReady // Drop the cancelled entries so the next request creates fresh live contexts. lcPath := logicalcluster.From(workspace).Path() diff --git a/pkg/server/filters/inactivelogicalcluster.go b/pkg/server/filters/inactivelogicalcluster.go index e19b5e38306..50b9ec7798c 100644 --- a/pkg/server/filters/inactivelogicalcluster.go +++ b/pkg/server/filters/inactivelogicalcluster.go @@ -30,16 +30,10 @@ import ( corev1alpha1informers "github.com/kcp-dev/sdk/client/informers/externalversions/core/v1alpha1" ) -const ( - // InactiveAnnotation is the annotation denoting a logical cluster should be - // deemed unreachable. - InactiveAnnotation = "internal.kcp.io/inactive" -) - // WithBlockInactiveLogicalClusters ensures that any requests to logical // clusters marked inactive are rejected. // -// The filter intentionally acts on the InactiveAnnotation to deny +// The filter intentionally acts on the inactive annotation to deny // access as early as possible, as opposed to the Inactive LC phase // which is only set after the LC reconciler fires. func WithBlockInactiveLogicalClusters(handler http.Handler, kcpClusterClient corev1alpha1informers.LogicalClusterClusterInformer) http.HandlerFunc { @@ -60,7 +54,7 @@ func WithBlockInactiveLogicalClusters(handler http.Handler, kcpClusterClient cor if cluster != nil && !cluster.Name.Empty() && !isException { logicalCluster, err := kcpClusterClient.Cluster(cluster.Name).Lister().Get(corev1alpha1.LogicalClusterName) if err == nil { - if ann, ok := logicalCluster.ObjectMeta.Annotations[InactiveAnnotation]; ok && ann == "true" { + if corev1alpha1.IsLogicalClusterInactive(logicalCluster.Annotations) { responsewriters.ErrorNegotiated( apierrors.NewForbidden(corev1alpha1.Resource("logicalclusters"), cluster.Name.String(), errors.New("logical cluster is marked inactive")), errorCodecs, schema.GroupVersion{}, w, req, diff --git a/staging/src/github.com/kcp-dev/sdk/apis/core/v1alpha1/logicalcluster_types.go b/staging/src/github.com/kcp-dev/sdk/apis/core/v1alpha1/logicalcluster_types.go index 44e27fabca0..5a4e40c53fa 100644 --- a/staging/src/github.com/kcp-dev/sdk/apis/core/v1alpha1/logicalcluster_types.go +++ b/staging/src/github.com/kcp-dev/sdk/apis/core/v1alpha1/logicalcluster_types.go @@ -56,6 +56,20 @@ const ( // LogicalClusterFinalizerName attached to the owner of the LogicalCluster resource (usually a Workspace) so that we can control // deletion of LogicalCluster resources. LogicalClusterFinalizerName = "core.kcp.io/logicalcluster" + + // LogicalClusterInactiveAnnotationKey is the annotation denoting a logical + // cluster should be deemed unreachable. When set to "true" the + // active connections for the logical cluster will be cancelled and + // requests will be rejected. + // The phase of a logical cluster with this annotation is set to + // LogicalClusterPhaseInactive. + LogicalClusterInactiveAnnotationKey = "core.kcp.io/inactive" + + // LogicalClusterInactiveAnnotationKeyLegacy is the previous + // inactive annotation key and honoured for backwards compatibility. + // + // Deprecated: use LogicalClusterInactiveAnnotationKey. + LogicalClusterInactiveAnnotationKeyLegacy = "internal.kcp.io/inactive" ) // LogicalClusterPhaseType is the type of the current phase of the logical cluster. @@ -75,7 +89,7 @@ const ( LogicalClusterPhaseUnavailable LogicalClusterPhaseType = "Unavailable" // LogicalClusterPhaseInactive phase indicates that the logical cluster has been // intentionally taken offline (for example, during maintenance). - // This phase is driven by the internal.kcp.io/inactive annotation. + // This phase is driven by the LogicalClusterInactiveAnnotationKey annotation. // This is distinct from Unavailable in so far that Inactive is an // intentional admin decision, while Unavailable is caused by an error. LogicalClusterPhaseInactive LogicalClusterPhaseType = "Inactive" @@ -248,6 +262,15 @@ func (in *LogicalCluster) GetConditions() conditionsv1alpha1.Conditions { var _ conditions.Getter = &LogicalCluster{} var _ conditions.Setter = &LogicalCluster{} +// IsLogicalClusterInactive reports whether the given annotations mark a +// LogicalCluster as inactive. It accepts both the canonical +// LogicalClusterInactiveAnnotationKey and the deprecated +// LogicalClusterInactiveAnnotationKeyLegacy. +func IsLogicalClusterInactive(annotations map[string]string) bool { + return annotations[LogicalClusterInactiveAnnotationKey] == "true" || + annotations[LogicalClusterInactiveAnnotationKeyLegacy] == "true" +} + // LogicalClusterList is a list of LogicalCluster // // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/test/e2e/workspace/inactive_test.go b/test/e2e/workspace/inactive_test.go index 14cc45613fa..89e212e6b10 100644 --- a/test/e2e/workspace/inactive_test.go +++ b/test/e2e/workspace/inactive_test.go @@ -27,11 +27,11 @@ import ( kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes" "github.com/kcp-dev/sdk/apis/core" + corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1" kcpclientset "github.com/kcp-dev/sdk/client/clientset/versioned/cluster" kcptesting "github.com/kcp-dev/sdk/testing" kcptestinghelpers "github.com/kcp-dev/sdk/testing/helpers" - "github.com/kcp-dev/kcp/pkg/server/filters" "github.com/kcp-dev/kcp/test/e2e/framework" ) @@ -58,7 +58,7 @@ func TestInactiveLogicalCluster(t *testing.T) { if err != nil { return false, err.Error() } - lc.Annotations[filters.InactiveAnnotation] = "true" + lc.Annotations[corev1alpha1.LogicalClusterInactiveAnnotationKey] = "true" _, err = kcpClient.Cluster(orgPath).CoreV1alpha1().LogicalClusters().Update(t.Context(), lc, v1.UpdateOptions{}) if err != nil { return false, err.Error() @@ -94,7 +94,7 @@ drain: if err != nil { return false, err.Error() } - delete(lc.Annotations, filters.InactiveAnnotation) + delete(lc.Annotations, corev1alpha1.LogicalClusterInactiveAnnotationKey) _, err = kcpClient.Cluster(orgPath).CoreV1alpha1().LogicalClusters().Update(t.Context(), lc, v1.UpdateOptions{}) if err != nil { return false, err.Error() From dd86d760a1a92a94a43060a845cc93768d4932a7 Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Fri, 22 May 2026 11:44:39 +0200 Subject: [PATCH 15/18] codegen Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- config/crds/core.kcp.io_logicalclusters.yaml | 1 + config/crds/tenancy.kcp.io_workspaces.yaml | 1 + config/root-phase0/apiexport-tenancy.kcp.io.yaml | 2 +- .../apiresourceschema-logicalclusters.core.kcp.io.yaml | 3 ++- .../apiresourceschema-workspaces.tenancy.kcp.io.yaml | 3 ++- 5 files changed, 7 insertions(+), 3 deletions(-) diff --git a/config/crds/core.kcp.io_logicalclusters.yaml b/config/crds/core.kcp.io_logicalclusters.yaml index 2819a4b735d..b5c59046aa9 100644 --- a/config/crds/core.kcp.io_logicalclusters.yaml +++ b/config/crds/core.kcp.io_logicalclusters.yaml @@ -233,6 +233,7 @@ spec: - Initializing - Ready - Unavailable + - Inactive - Terminating - Deleting type: string diff --git a/config/crds/tenancy.kcp.io_workspaces.yaml b/config/crds/tenancy.kcp.io_workspaces.yaml index 0b3040a782a..539b4962c42 100644 --- a/config/crds/tenancy.kcp.io_workspaces.yaml +++ b/config/crds/tenancy.kcp.io_workspaces.yaml @@ -298,6 +298,7 @@ spec: - Initializing - Ready - Unavailable + - Inactive - Terminating - Deleting type: string diff --git a/config/root-phase0/apiexport-tenancy.kcp.io.yaml b/config/root-phase0/apiexport-tenancy.kcp.io.yaml index b9c8d4c690b..2883934a592 100644 --- a/config/root-phase0/apiexport-tenancy.kcp.io.yaml +++ b/config/root-phase0/apiexport-tenancy.kcp.io.yaml @@ -13,7 +13,7 @@ spec: crd: {} - group: tenancy.kcp.io name: workspaces - schema: v260428-d075f2d48.workspaces.tenancy.kcp.io + schema: v260522-2bf05df66.workspaces.tenancy.kcp.io storage: crd: {} - group: tenancy.kcp.io diff --git a/config/root-phase0/apiresourceschema-logicalclusters.core.kcp.io.yaml b/config/root-phase0/apiresourceschema-logicalclusters.core.kcp.io.yaml index 5160d5caba1..c6e430f0954 100644 --- a/config/root-phase0/apiresourceschema-logicalclusters.core.kcp.io.yaml +++ b/config/root-phase0/apiresourceschema-logicalclusters.core.kcp.io.yaml @@ -1,7 +1,7 @@ apiVersion: apis.kcp.io/v1alpha1 kind: APIResourceSchema metadata: - name: v260428-d075f2d48.logicalclusters.core.kcp.io + name: v260522-2bf05df66.logicalclusters.core.kcp.io spec: group: core.kcp.io names: @@ -230,6 +230,7 @@ spec: - Initializing - Ready - Unavailable + - Inactive - Terminating - Deleting type: string diff --git a/config/root-phase0/apiresourceschema-workspaces.tenancy.kcp.io.yaml b/config/root-phase0/apiresourceschema-workspaces.tenancy.kcp.io.yaml index 32c9aed86b5..7f7f8fb78e0 100644 --- a/config/root-phase0/apiresourceschema-workspaces.tenancy.kcp.io.yaml +++ b/config/root-phase0/apiresourceschema-workspaces.tenancy.kcp.io.yaml @@ -1,7 +1,7 @@ apiVersion: apis.kcp.io/v1alpha1 kind: APIResourceSchema metadata: - name: v260428-d075f2d48.workspaces.tenancy.kcp.io + name: v260522-2bf05df66.workspaces.tenancy.kcp.io spec: group: tenancy.kcp.io names: @@ -295,6 +295,7 @@ spec: - Initializing - Ready - Unavailable + - Inactive - Terminating - Deleting type: string From 7da1d2b33e394e6df6f926ffdaa520fb7ac1e307 Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Fri, 22 May 2026 12:16:20 +0200 Subject: [PATCH 16/18] Split TestInactiveLogicalCluster into three distinct tests for direct requests, scoped watch and wildcard watch Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- test/e2e/workspace/inactive_test.go | 162 ++++++++++++++++++++-------- 1 file changed, 119 insertions(+), 43 deletions(-) diff --git a/test/e2e/workspace/inactive_test.go b/test/e2e/workspace/inactive_test.go index 89e212e6b10..bbab6ebaa16 100644 --- a/test/e2e/workspace/inactive_test.go +++ b/test/e2e/workspace/inactive_test.go @@ -24,8 +24,10 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes" + "github.com/kcp-dev/logicalcluster/v3" "github.com/kcp-dev/sdk/apis/core" corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1" kcpclientset "github.com/kcp-dev/sdk/client/clientset/versioned/cluster" @@ -35,73 +37,73 @@ import ( "github.com/kcp-dev/kcp/test/e2e/framework" ) -func TestInactiveLogicalCluster(t *testing.T) { - t.Parallel() - framework.Suite(t, "control-plane") - - server := kcptesting.SharedKcpServer(t) - cfg := server.BaseConfig(t) - orgPath, _ := kcptesting.NewWorkspaceFixture(t, server, core.RootCluster.Path(), kcptesting.WithType(core.RootCluster.Path(), "organization")) - - kcpClient, err := kcpclientset.NewForConfig(cfg) - require.NoError(t, err) - kubeClient, err := kcpkubernetesclientset.NewForConfig(cfg) - require.NoError(t, err) - - t.Log("Opening watch on ConfigMaps before marking inactive") - watcher, err := kubeClient.Cluster(orgPath).CoreV1().ConfigMaps("default").Watch(t.Context(), v1.ListOptions{}) - require.NoError(t, err) - t.Cleanup(watcher.Stop) - +func setInactiveAnnotation(t *testing.T, kcpClient kcpclientset.ClusterInterface, orgPath logicalcluster.Path, value bool) { + t.Helper() kcptestinghelpers.Eventually(t, func() (bool, string) { lc, err := kcpClient.Cluster(orgPath).CoreV1alpha1().LogicalClusters().Get(t.Context(), "cluster", v1.GetOptions{}) if err != nil { return false, err.Error() } - lc.Annotations[corev1alpha1.LogicalClusterInactiveAnnotationKey] = "true" + if value { + lc.Annotations[corev1alpha1.LogicalClusterInactiveAnnotationKey] = "true" + } else { + delete(lc.Annotations, corev1alpha1.LogicalClusterInactiveAnnotationKey) + } _, err = kcpClient.Cluster(orgPath).CoreV1alpha1().LogicalClusters().Update(t.Context(), lc, v1.UpdateOptions{}) if err != nil { return false, err.Error() } return true, "" }, wait.ForeverTestTimeout, time.Millisecond*100) +} - t.Log("Verify that normal requests fail") - kcptestinghelpers.Eventually(t, func() (bool, string) { - _, err := kubeClient.Cluster(orgPath).CoreV1().Namespaces().List(t.Context(), v1.ListOptions{}) - if err == nil { - return false, "expected error when accessing an inactive logical cluster" - } - return true, "" - }, wait.ForeverTestTimeout, time.Millisecond*100) - - t.Log("Verify that the open watch is terminated") -drain: +func drainAndExpectClose(t *testing.T, w watch.Interface) { + t.Helper() for { select { - case _, ok := <-watcher.ResultChan(): + case _, ok := <-w.ResultChan(): if !ok { - break drain + return } case <-time.After(wait.ForeverTestTimeout): t.Fatal("watch was not terminated after marking logical cluster inactive") } } +} - t.Log("Remove inactive annotation again") +// TestInactiveLogicalClusterBlocksRequests checks that requests made +// against a logical cluster marked as inactive are failing and succeed +// after the inactive annotation is no longer "true". +func TestInactiveLogicalClusterBlocksRequests(t *testing.T) { + t.Parallel() + framework.Suite(t, "control-plane") + + // The test requires a separate server as marking a cluster inactive + // breaks wildcard watches which may affect other tests. + server := kcptesting.PrivateKcpServer(t) + cfg := server.BaseConfig(t) + orgPath, _ := kcptesting.NewWorkspaceFixture(t, server, core.RootCluster.Path(), kcptesting.WithType(core.RootCluster.Path(), "organization")) + + kcpClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err) + kubeClient, err := kcpkubernetesclientset.NewForConfig(cfg) + require.NoError(t, err) + + t.Log("Mark logical cluster inactive") + setInactiveAnnotation(t, kcpClient, orgPath, true) + + t.Log("Verify that normal requests fail") kcptestinghelpers.Eventually(t, func() (bool, string) { - lc, err := kcpClient.Cluster(orgPath).CoreV1alpha1().LogicalClusters().Get(t.Context(), "cluster", v1.GetOptions{}) - if err != nil { - return false, err.Error() - } - delete(lc.Annotations, corev1alpha1.LogicalClusterInactiveAnnotationKey) - _, err = kcpClient.Cluster(orgPath).CoreV1alpha1().LogicalClusters().Update(t.Context(), lc, v1.UpdateOptions{}) - if err != nil { - return false, err.Error() + _, err := kubeClient.Cluster(orgPath).CoreV1().Namespaces().List(t.Context(), v1.ListOptions{}) + if err == nil { + return false, "expected error when accessing an inactive logical cluster" } return true, "" }, wait.ForeverTestTimeout, time.Millisecond*100) + t.Log("Remove inactive annotation again") + setInactiveAnnotation(t, kcpClient, orgPath, false) + t.Log("Verify that normal requests succeed again") kcptestinghelpers.Eventually(t, func() (bool, string) { _, err := kubeClient.Cluster(orgPath).CoreV1().Namespaces().List(t.Context(), v1.ListOptions{}) @@ -110,9 +112,83 @@ drain: } return true, "" }, wait.ForeverTestTimeout, time.Millisecond*100) +} + +// TestInactiveLogicalClusterTerminatesClusterScopedWatch verifies that +// a cluster-scoped watch is cancelled when the cluster is marked as +// inactive and can be reestablished after the annotation is no longer +// "true". +func TestInactiveLogicalClusterTerminatesClusterScopedWatch(t *testing.T) { + t.Parallel() + framework.Suite(t, "control-plane") + + // The test requires a separate server as marking a cluster inactive + // breaks wildcard watches which may affect other tests. + server := kcptesting.PrivateKcpServer(t) + cfg := server.BaseConfig(t) + orgPath, _ := kcptesting.NewWorkspaceFixture(t, server, core.RootCluster.Path(), kcptesting.WithType(core.RootCluster.Path(), "organization")) + + kcpClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err) + kubeClient, err := kcpkubernetesclientset.NewForConfig(cfg) + require.NoError(t, err) + + t.Log("Opening watch on ConfigMaps before marking inactive") + watcher, err := kubeClient.Cluster(orgPath).CoreV1().ConfigMaps("default").Watch(t.Context(), v1.ListOptions{}) + require.NoError(t, err) + t.Cleanup(watcher.Stop) + + t.Log("Mark logical cluster inactive") + setInactiveAnnotation(t, kcpClient, orgPath, true) + + t.Log("Verify that the open watch is terminated") + drainAndExpectClose(t, watcher) + + t.Log("Remove inactive annotation again") + setInactiveAnnotation(t, kcpClient, orgPath, false) t.Log("Verify that a new watch can be opened after re-activation") - newWatcher, err := kubeClient.Cluster(orgPath).CoreV1().ConfigMaps("default").Watch(t.Context(), v1.ListOptions{}) + kcptestinghelpers.Eventually(t, func() (bool, string) { + newWatcher, err := kubeClient.Cluster(orgPath).CoreV1().ConfigMaps("default").Watch(t.Context(), v1.ListOptions{}) + if err != nil { + return false, err.Error() + } + t.Cleanup(newWatcher.Stop) + return true, "" + }, wait.ForeverTestTimeout, time.Millisecond*100) +} + +// TestInactiveLogicalClusterTerminatesWildcardWatch does the same check +// as TestInactiveLogicalClusterTerminatesClusterScopedWatch but for +// a wildcard watch. +func TestInactiveLogicalClusterTerminatesWildcardWatch(t *testing.T) { + t.Parallel() + framework.Suite(t, "control-plane") + + // The test requires a separate server as marking a cluster inactive + // breaks wildcard watches which may affect other tests. + server := kcptesting.PrivateKcpServer(t) + cfg := server.BaseConfig(t) + orgPath, _ := kcptesting.NewWorkspaceFixture(t, server, core.RootCluster.Path(), kcptesting.WithType(core.RootCluster.Path(), "organization")) + + kcpClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err) + + wildcardCfg := server.RootShardSystemMasterBaseConfig(t) + wildcardKubeClient, err := kcpkubernetesclientset.NewForConfig(wildcardCfg) require.NoError(t, err) - t.Cleanup(newWatcher.Stop) + + t.Log("Opening wildcard watch on ConfigMaps before marking inactive") + watcher, err := wildcardKubeClient.CoreV1().ConfigMaps().Watch(t.Context(), v1.ListOptions{}) + require.NoError(t, err) + t.Cleanup(watcher.Stop) + + t.Log("Mark logical cluster inactive") + setInactiveAnnotation(t, kcpClient, orgPath, true) + + t.Log("Verify that the open wildcard watch is terminated") + drainAndExpectClose(t, watcher) + + t.Log("Remove inactive annotation again to leave the shared server clean") + setInactiveAnnotation(t, kcpClient, orgPath, false) } From 54f9d887abd98abe262fe9de6eebc5cfe51349cf Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Fri, 22 May 2026 12:25:16 +0200 Subject: [PATCH 17/18] Make contextmanager.Manager generic over fmt.Stringer instead of comparable Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- pkg/contextmanager/contextmanager.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/contextmanager/contextmanager.go b/pkg/contextmanager/contextmanager.go index 51db93dcd11..d2262c3968f 100644 --- a/pkg/contextmanager/contextmanager.go +++ b/pkg/contextmanager/contextmanager.go @@ -25,19 +25,19 @@ import ( var errShutdown = errors.New("context manager shut down") // Manager tracks contexts derived from the root context. -type Manager[K comparable] struct { +type Manager[K fmt.Stringer] struct { rc *rootCtx } // New creates a new context manager. -func New[K comparable](root context.Context) *Manager[K] { +func New[K fmt.Stringer](root context.Context) *Manager[K] { return &Manager[K]{rc: newRootCtx(root)} } // Context returns a new context that is derived from parent. // The context will be cancelled if either the manager's root context or the respective key context is cancelled. func (m *Manager[K]) Context(parent context.Context, key K) (context.Context, context.CancelFunc) { - keyCtx, _ := m.rc.context(fmt.Sprint(key)) + keyCtx, _ := m.rc.context(key.String()) ctx, cancel := context.WithCancelCause(parent) stop := context.AfterFunc(keyCtx, func() { @@ -55,12 +55,12 @@ func (m *Manager[K]) Context(parent context.Context, key K) (context.Context, co // Cancel cancels the context for the given key with reason. // If no context exists for the key a context will be created and cancelled. func (m *Manager[K]) Cancel(key K, reason error) { - m.rc.cancel(fmt.Sprint(key), reason) + m.rc.cancel(key.String(), reason) } // Delete removes the entry for the given key, cancelling its context with reason. func (m *Manager[K]) Delete(key K, reason error) { - m.rc.delete(fmt.Sprint(key), reason) + m.rc.delete(key.String(), reason) } // Shutdown cancels the root context, which propagates to all contexts. From 4000587fd53e76b80e63ea648fadd4e39d058f48 Mon Sep 17 00:00:00 2001 From: "Nelo-T. Wallus" Date: Fri, 22 May 2026 13:53:21 +0200 Subject: [PATCH 18/18] Add the same path exemptions as for inactive logical clusters Signed-off-by: Nelo-T. Wallus Signed-off-by: Nelo-T. Wallus --- pkg/server/filters/perclustercontext.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/pkg/server/filters/perclustercontext.go b/pkg/server/filters/perclustercontext.go index 0cce717cebc..36e7e4e58fc 100644 --- a/pkg/server/filters/perclustercontext.go +++ b/pkg/server/filters/perclustercontext.go @@ -36,7 +36,26 @@ import ( // This is used e.g. to cancel active connections when a logical cluster // is being migrated. func WithPerClusterContext(handler http.Handler, mgr *contextmanager.Manager[logicalcluster.Path]) http.HandlerFunc { + // TODO(ntnn): THis is the same list as for inactive logical clusters. + // Will look into deduplicating this when implementing lc migration. + // exemptPathPrefixes allows some paths to pass without adding a context. + exemptPathPrefixes := []string{ + // Kube clients expect the /openapi endpoint to be available to + // e.g. retrieve schemas. E.g. kubectl-edit fetches openapi + // specs to validate the edited resource. + "/openapi", + // logical cluster objects must still be editable to lifecylce the lc. + "/apis/core.kcp.io/v1alpha1/logicalclusters", + } + return func(w http.ResponseWriter, req *http.Request) { + for _, prefix := range exemptPathPrefixes { + if strings.HasPrefix(req.URL.Path, prefix) { + handler.ServeHTTP(w, req) + return + } + } + cluster := request.ClusterFrom(req.Context()) var clusterPath logicalcluster.Path