From 6e2cedab57aa8fefed041f1061aa6a078659c59e Mon Sep 17 00:00:00 2001 From: Mangirdas Judeikis Date: Thu, 21 May 2026 08:33:10 +0300 Subject: [PATCH 1/7] wire in labels to pprof --- .../investigations/memory-leak-attribution.md | 273 ++++++++++++++++++ .../kubequota/kubequota_admission.go | 70 +++-- pkg/pproflabels/labels.go | 42 +++ .../kubequota/kubequota_controller.go | 140 ++++----- 4 files changed, 429 insertions(+), 96 deletions(-) create mode 100644 docs/content/developers/investigations/memory-leak-attribution.md create mode 100644 pkg/pproflabels/labels.go diff --git a/docs/content/developers/investigations/memory-leak-attribution.md b/docs/content/developers/investigations/memory-leak-attribution.md new file mode 100644 index 00000000000..b4bf613779b --- /dev/null +++ b/docs/content/developers/investigations/memory-leak-attribution.md @@ -0,0 +1,273 @@ +--- +description: > + Diagnosing memory and goroutine leaks in kcp, with pprof labels for + attribution to a specific controller and logical cluster. +--- + +# Memory and goroutine leak attribution + +This page documents how to diagnose memory and goroutine leaks in kcp, especially +ones that grow as logical clusters are created and deleted (see +[#4071](https://github.com/kcp-dev/kcp/issues/4071), [#3350](https://github.com/kcp-dev/kcp/issues/3350)). + +It is aimed at maintainers debugging an instance that is gradually retaining +memory, and at contributors writing per-cluster controllers who want their +goroutines to be attributable in profiles. + +## Background + +Per-cluster controllers in kcp spawn goroutines whose lifetime is bounded by a +single logical cluster. When a cluster is deleted, the controller's context is +cancelled, but several classes of goroutine outlive that cancellation if they +were not started with a context-aware termination path. The dominant case is +informer `processorListener` goroutines registered via `AddEventHandler` on +shared informers: the listener is owned by the (long-lived) shared informer, so +nothing stops it when the (short-lived) per-cluster controller goes away. + +Each leaked controller registration produces two goroutines (`pop` and `run`), +each with a buffered notification channel. Across thousands of cluster +create/delete cycles, retained memory grows without bound until the pod is +restarted. + +## Enabling pprof + +kcp inherits the upstream apiserver's `--profiling` flag, which is **on by +default**. There are two practical ways to query pprof. + +### Secure port (default, requires auth) + +Any running kcp serves pprof on the same secure port as the API: + +``` +https://:/debug/pprof/ +https://:/debug/pprof/goroutine?debug=2 +https://:/debug/pprof/heap +``` + +Use a kubeconfig that has access to the root admin endpoint, e.g. via +`kubectl proxy` or `kubectl get --raw /debug/pprof/goroutine?debug=2`. + +### Unauth'd unix socket (recommended for debugging) + +Start kcp with `--debug-socket-path`: + +``` +kcp start --debug-socket-path=/tmp/kcp-debug.sock ... +``` + +Then query the socket directly: + +``` +curl --unix-socket /tmp/kcp-debug.sock 'http://localhost/debug/pprof/goroutine?debug=2' > goroutines.txt +curl --unix-socket /tmp/kcp-debug.sock 'http://localhost/debug/pprof/heap' > heap.pprof +``` + +Quote the URL — in zsh, `?` is a glob and an unquoted URL with a query string +fails with `zsh: no matches found`. + +The unix socket has no authn/authz, so do not enable this in production. + +### In-cluster setups + +For Tilt/Kind/local-up setups where kcp runs in a pod, port-forward the secure +port and use `kubectl get --raw`: + +``` +kubectl get --raw '/debug/pprof/goroutine?debug=2' > goroutines.txt +``` + +### With Prometheus + pprof together + +`hack/run-with-prometheus.sh` brings up a local Prometheus on `localhost:9090` +and auto-configures it to scrape `localhost:6443/metrics` once kcp starts. +Combine it with `--debug-socket-path` to get both worlds — graph the leak in +Prometheus, drill into it with labeled pprof dumps: + +``` +./hack/run-with-prometheus.sh ./bin/kcp start --debug-socket-path=/tmp/kcp-debug.sock +``` + +Key series in Prometheus for leak hunting: + +- `go_goroutines` — total live goroutine count. Should be flat across + workspace churn; any upward slope is a goroutine leak. +- `go_memstats_heap_inuse_bytes` — committed heap. Climbs with retained + objects (cached specs, listener buffers, per-cluster maps). +- `go_memstats_alloc_bytes` — alloc rate proxy. +- `process_resident_memory_bytes` — RSS, including non-Go memory. + +Typical loop: + +``` +# Start Prometheus + kcp +./hack/run-with-prometheus.sh ./bin/kcp start --debug-socket-path=/tmp/kcp-debug.sock + +# In another shell, drive workspace churn +for i in {1..100}; do + kubectl apply -f workspace.yaml; sleep 3 + kubectl delete -f workspace.yaml; sleep 3 +done + +# Watch http://localhost:9090/graph?g0.expr=go_goroutines and confirm slope +# When the count is clearly elevated, grab a labeled dump: +curl --unix-socket /tmp/kcp-debug.sock 'http://localhost/debug/pprof/goroutine?debug=2' > after.txt +grep '# labels:' after.txt | sort | uniq -c | sort -rn +``` + +## Counting goroutines + +The pprof goroutine endpoint comes in two formats: + +- `debug=2` — one full stack trace per goroutine. Easy to read, large. +- `debug=1` — stacks aggregated by uniqueness, with a count prefix per stack. + Much smaller; better for "where are all my goroutines coming from". + +Total live goroutines, from a `debug=2` dump: + +``` +grep -c '^goroutine ' goroutines.txt +``` + +Total live goroutines, from a `debug=1` dump: + +``` +curl --unix-socket /tmp/kcp-debug.sock 'http://localhost/debug/pprof/goroutine?debug=1' \ + | awk '/^[0-9]+ @/ {sum+=$1} END {print sum}' +``` + +Top 20 stack signatures by goroutine count: + +``` +curl --unix-socket /tmp/kcp-debug.sock 'http://localhost/debug/pprof/goroutine?debug=1' \ + | awk '/^[0-9]+ @/ {n=$1; getline label; getline frame; print n"\t"frame}' \ + | sort -rn | head -20 +``` + +## Reading goroutine labels + +Per-cluster code paths are wrapped with [`pkg/pproflabels.Cluster`](https://github.com/kcp-dev/kcp/blob/main/pkg/pproflabels/labels.go), +which attaches `controller` and `logicalcluster` labels via `runtime/pprof.Do`. +Labels propagate to any goroutine spawned during the wrapped call, including +informer `processorListener` goroutines that get spawned when `AddEventHandler` +is invoked on an already-running shared informer. + +Important: pprof emits labels in the **`debug=1` (aggregated)** format, not +`debug=2`. A labeled stack in `goroutine?debug=1` looks like: + +``` +29 @ 0x102437fb8 0x1023cc274 0x1023cbe44 ... +# labels: {"controller":"kcp-kube-quota", "logicalcluster":"2j5lsf4ydtnftsi3"} +# github.com/kcp-dev/apimachinery/v2/third_party/informers.(*processorListener).run.func1+0x43 +# k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1+0x3f +... +``` + +`debug=2` outputs raw stacks one goroutine at a time and does not include +label metadata. Use it to inspect individual stacks, but use `debug=1` for +anything label-based. + +The leading count (`29 @ ...`) is the number of goroutines that share this +stack. The `# labels:` line tells you which controller and which logical +cluster they belong to. + +### Filtering by label + +Work from the `debug=1` dump — that's where labels live. + +To count goroutines per (controller, cluster) label: + +``` +awk '/^[0-9]+ @/ {n=$1; getline label} label ~ /^# labels:/ {c[label]+=n} END {for(k in c) printf "%6d %s\n", c[k], k}' goroutine.summary | sort -rn +``` + +To count goroutines per controller alone (collapse across clusters): + +``` +awk '/^[0-9]+ @/ {n=$1; getline label} label ~ /^# labels:/ { + match(label, /"controller":"[^"]+"/); c[substr(label,RSTART,RLENGTH)]+=n +} END {for(k in c) printf "%6d %s\n", c[k], k}' goroutine.summary | sort -rn +``` + +To find every leaked stack owned by a specific cluster: + +``` +awk '/^[0-9]+ @/ {block=$0; next} /^#/ {block=block"\n"$0; next} /^$/ {if (block ~ /"logicalcluster":"2j5lsf4ydtnftsi3"/) print block; block=""}' goroutine.summary +``` + +To diff before/after a known-clean baseline (the technique used in +[#3350](https://github.com/kcp-dev/kcp/issues/3350)): + +``` +# Baseline: kcp running, no churn +curl --unix-socket /tmp/kcp-debug.sock 'http://localhost/debug/pprof/goroutine?debug=2' > before.txt + +# Run churn: create/delete N workspaces in a loop +for i in {1..100}; do + kubectl apply -f workspace.yaml; sleep 3 + kubectl delete -f workspace.yaml; sleep 3 +done + +# After churn +curl --unix-socket /tmp/kcp-debug.sock 'http://localhost/debug/pprof/goroutine?debug=2' > after.txt + +# Compare counts per controller label +diff <(grep -oE '"controller":"[^"]+"' before.txt | sort | uniq -c) \ + <(grep -oE '"controller":"[^"]+"' after.txt | sort | uniq -c) +``` + +Any controller whose count grew by ~N (or a multiple thereof) is leaking one or +more handler registrations per cluster. + +## Heap profiles + +`/debug/pprof/heap` returns a sampled heap profile. To attribute retained heap +to (controller, cluster) the program must also have labels set on the goroutine +that allocated the memory — `pprof.Do` does this automatically for everything +allocated during the wrapped call. View with: + +``` +go tool pprof -http=:8080 heap.pprof +``` + +The flame graph's "labels" facet exposes the `controller` and `logicalcluster` +labels. + +## Wiring labels in new controllers + +When introducing a per-cluster controller (anything bounded by a single logical +cluster's lifecycle), wrap the goroutine spawn with `pproflabels.Cluster`: + +```go +import "github.com/kcp-dev/kcp/pkg/pproflabels" + +go pproflabels.Cluster(ctx, ControllerName, clusterName, func(ctx context.Context) { + // controller body: Run, Start, AddEventHandler, etc. + // any goroutines spawned in here inherit the labels. +}) +``` + +This is enough for both the controller's own work goroutines AND any informer +`processorListener` goroutines spawned by `AddEventHandler` while the shared +informer is already running. Both kinds of leaks become attributable. + +If the wrapped code might call `AddEventHandler` *before* the shared informer +is started, the listener goroutines will be spawned later by +`sharedProcessor.run` — outside the labeled scope — and will not inherit +labels. In practice kcp starts shared informers at bootstrap and per-cluster +controllers register handlers afterwards, so this case is rare. + +## Known leak sources + +Cross-reference for current and historical leak fixes: + +- [#3016](https://github.com/kcp-dev/kcp/issues/3016) — original memory/goroutine leak (closed) +- [#3350](https://github.com/kcp-dev/kcp/issues/3350) — workspace churn leaks ~61 goroutines per workspace; root cause is informer handler deregistration (closed; partial fix) +- [#3787](https://github.com/kcp-dev/kcp/issues/3787) — open epic: GC improvements (per-workspace footprint) +- [#4044](https://github.com/kcp-dev/kcp/pull/4044) — cluster-aware GC, merged 2026-04-21 +- [#4071](https://github.com/kcp-dev/kcp/issues/4071) — open: deleting logical clusters does not free memory + +## See also + +- [`pkg/pproflabels`](https://github.com/kcp-dev/kcp/blob/main/pkg/pproflabels/labels.go) — the labeling helper +- [`test/integration/workspace/leak_test.go`](https://github.com/kcp-dev/kcp/blob/main/test/integration/workspace/leak_test.go) — `TestWorkspaceDeletionLeak`, the goroutine leak smoke test +- [Go runtime/pprof](https://pkg.go.dev/runtime/pprof) — `pprof.Do` documentation diff --git a/pkg/admission/kubequota/kubequota_admission.go b/pkg/admission/kubequota/kubequota_admission.go index bab29232063..8dd1cd51c36 100644 --- a/pkg/admission/kubequota/kubequota_admission.go +++ b/pkg/admission/kubequota/kubequota_admission.go @@ -44,6 +44,7 @@ import ( corev1alpha1listers "github.com/kcp-dev/sdk/client/listers/core/v1alpha1" "github.com/kcp-dev/kcp/pkg/admission/initializers" + "github.com/kcp-dev/kcp/pkg/pproflabels" ) // PluginName is the name of this admission plugin. @@ -162,7 +163,7 @@ func (k *KubeResourceQuota) Validate(ctx context.Context, a admission.Attributes } // getOrCreateDelegate creates a resourcequota.QuotaAdmission plugin for clusterName. -func (k *KubeResourceQuota) getOrCreateDelegate(clusterName logicalcluster.Name) (*stoppableQuotaAdmission, error) { +func (k *KubeResourceQuota) getOrCreateDelegate(clusterName logicalcluster.Name) (retDelegate *stoppableQuotaAdmission, retErr error) { k.lock.RLock() delegate := k.delegates[clusterName] k.lock.RUnlock() @@ -181,41 +182,50 @@ func (k *KubeResourceQuota) getOrCreateDelegate(clusterName logicalcluster.Name) // Set up a context that is cancelable and that is bounded by k.serverDone ctx, cancel := context.WithCancel(context.Background()) - go func() { - // Wait for either the context or the server to be done. If it's the server, cancel the context. - select { - case <-ctx.Done(): - case <-k.serverDone: - cancel() - } - }() - const evaluatorWorkersPerWorkspace = 5 - quotaAdmission, err := resourcequota.NewResourceQuota(k.userSuppliedConfiguration, evaluatorWorkersPerWorkspace) - if err != nil { - cancel() - return nil, err - } + // Label the per-cluster init for the duration of this function so that the + // supervisor goroutine below, plus any processorListener goroutines spawned + // by ValidateInitialization() via AddEventHandler on the shared informer, + // inherit pprof labels and remain attributable if they leak (#4071, #3350). + pproflabels.Cluster(ctx, PluginName, clusterName, func(ctx context.Context) { + go func() { + // Wait for either the context or the server to be done. If it's the server, cancel the context. + select { + case <-ctx.Done(): + case <-k.serverDone: + cancel() + } + }() - delegate = &stoppableQuotaAdmission{ - QuotaAdmission: quotaAdmission, - stop: cancel, - } + const evaluatorWorkersPerWorkspace = 5 + quotaAdmission, err := resourcequota.NewResourceQuota(k.userSuppliedConfiguration, evaluatorWorkersPerWorkspace) + if err != nil { + cancel() + retErr = err + return + } - delegate.SetDrainedNotification(ctx.Done()) - delegate.SetResourceQuotaLister(k.scopingResourceQuotaInformer.Cluster(clusterName).Lister()) - delegate.SetResourceQuotaInformer(k.scopingResourceQuotaInformer.Cluster(clusterName).Informer()) - delegate.SetExternalKubeClientSet(k.kubeClusterClient.Cluster(clusterName.Path())) - delegate.SetQuotaConfiguration(k.quotaConfiguration) + delegate = &stoppableQuotaAdmission{ + QuotaAdmission: quotaAdmission, + stop: cancel, + } - if err := delegate.ValidateInitialization(); err != nil { - cancel() - return nil, err - } + delegate.SetDrainedNotification(ctx.Done()) + delegate.SetResourceQuotaLister(k.scopingResourceQuotaInformer.Cluster(clusterName).Lister()) + delegate.SetResourceQuotaInformer(k.scopingResourceQuotaInformer.Cluster(clusterName).Informer()) + delegate.SetExternalKubeClientSet(k.kubeClusterClient.Cluster(clusterName.Path())) + delegate.SetQuotaConfiguration(k.quotaConfiguration) - k.delegates[clusterName] = delegate + if err := delegate.ValidateInitialization(); err != nil { + cancel() + retErr = err + return + } - return delegate, nil + k.delegates[clusterName] = delegate + retDelegate = delegate + }) + return retDelegate, retErr } type stoppableQuotaAdmission struct { diff --git a/pkg/pproflabels/labels.go b/pkg/pproflabels/labels.go new file mode 100644 index 00000000000..877b43cd8e5 --- /dev/null +++ b/pkg/pproflabels/labels.go @@ -0,0 +1,42 @@ +/* +Copyright 2026 The kcp Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package pproflabels attaches pprof labels to goroutines so that profiles and +// goroutine dumps remain attributable to the controller and logical cluster +// that own them. Labels propagate to goroutines started during the labeled +// call, so wrapping the outermost per-cluster goroutine is enough. +package pproflabels + +import ( + "context" + "runtime/pprof" + + "github.com/kcp-dev/logicalcluster/v3" +) + +// Label keys. Kept stable so tooling (pprof filters, leak tests) can match on them. +const ( + LabelController = "controller" + LabelLogicalCluster = "logicalcluster" +) + +// Cluster runs fn with pprof labels identifying the controller and logical +// cluster. Wrap per-cluster goroutine starts with this so leaked goroutines +// stay attributable in /debug/pprof/goroutine?debug=2 dumps and in goleak +// output. See https://github.com/kcp-dev/kcp/issues/4071. +func Cluster(ctx context.Context, controller string, cluster logicalcluster.Name, fn func(context.Context)) { + pprof.Do(ctx, pprof.Labels(LabelController, controller, LabelLogicalCluster, cluster.String()), fn) +} diff --git a/pkg/reconciler/kubequota/kubequota_controller.go b/pkg/reconciler/kubequota/kubequota_controller.go index 74035be237c..5a0a39ae749 100644 --- a/pkg/reconciler/kubequota/kubequota_controller.go +++ b/pkg/reconciler/kubequota/kubequota_controller.go @@ -43,6 +43,7 @@ import ( "github.com/kcp-dev/kcp/pkg/informer" "github.com/kcp-dev/kcp/pkg/logging" + "github.com/kcp-dev/kcp/pkg/pproflabels" ) const ( @@ -252,78 +253,85 @@ func (c *Controller) process(ctx context.Context, key string) error { return nil } -func (c *Controller) startQuotaForLogicalCluster(ctx context.Context, clusterName logicalcluster.Name) error { - logger := klog.FromContext(ctx) - resourceQuotaControllerClient := c.kubeClusterClient.Cluster(clusterName.Path()) - - // TODO(ncdc): find a way to support the default configuration. For now, don't use it, because it is difficult - // to get support for the special evaluators for pods/services/pvcs. - // listerFuncForResource := generic.ListerFuncForResourceFunc(scopedInformerFactory.ForResource) - // quotaConfiguration := install.NewQuotaConfigurationForControllers(listerFuncForResource) - quotaConfiguration := generic.NewConfiguration(nil, install.DefaultIgnoredResources()) - - resourceQuotaControllerOptions := &resourcequota.ControllerOptions{ - QuotaClient: resourceQuotaControllerClient.CoreV1(), - ResourceQuotaInformer: c.resourceQuotaClusterInformer.ClusterWithContext(ctx, clusterName), - ResyncPeriod: controller.StaticResyncPeriodFunc(c.quotaRecalculationPeriod), - InformerFactory: c.scopingGenericSharedInformerFactory.ClusterWithContext(ctx, clusterName), - ReplenishmentResyncPeriod: func() time.Duration { - return c.fullResyncPeriod - }, - // TODO(sttts): this discovery function is wrong. It is some aggregation of all logical clusters, but has non-deterministic - // behaviour if logical clusters don't agree about REST mappings. - DiscoveryFunc: c.dynamicDiscoverySharedInformerFactory.ServerPreferredResources, - IgnoredResourcesFunc: quotaConfiguration.IgnoredResources, - InformersStarted: c.informersStarted, - Registry: generic.NewRegistry(quotaConfiguration.Evaluators()), - } - - resourceQuotaController, err := resourcequota.NewController(ctx, resourceQuotaControllerOptions) - if err != nil { - return err - } +func (c *Controller) startQuotaForLogicalCluster(ctx context.Context, clusterName logicalcluster.Name) (retErr error) { + // Label the current goroutine for the duration of this function. Goroutines + // spawned during this call - including the processorListener.run/pop + // goroutines registered inside resourcequota.NewController via the shared + // informer's AddEventHandler - inherit these labels for their lifetime, + // keeping leaked goroutines attributable in pprof dumps (see #4071, #3350). + pproflabels.Cluster(ctx, ControllerName, clusterName, func(ctx context.Context) { + logger := klog.FromContext(ctx) + resourceQuotaControllerClient := c.kubeClusterClient.Cluster(clusterName.Path()) + + // TODO(ncdc): find a way to support the default configuration. For now, don't use it, because it is difficult + // to get support for the special evaluators for pods/services/pvcs. + // listerFuncForResource := generic.ListerFuncForResourceFunc(scopedInformerFactory.ForResource) + // quotaConfiguration := install.NewQuotaConfigurationForControllers(listerFuncForResource) + quotaConfiguration := generic.NewConfiguration(nil, install.DefaultIgnoredResources()) + + resourceQuotaControllerOptions := &resourcequota.ControllerOptions{ + QuotaClient: resourceQuotaControllerClient.CoreV1(), + ResourceQuotaInformer: c.resourceQuotaClusterInformer.ClusterWithContext(ctx, clusterName), + ResyncPeriod: controller.StaticResyncPeriodFunc(c.quotaRecalculationPeriod), + InformerFactory: c.scopingGenericSharedInformerFactory.ClusterWithContext(ctx, clusterName), + ReplenishmentResyncPeriod: func() time.Duration { + return c.fullResyncPeriod + }, + // TODO(sttts): this discovery function is wrong. It is some aggregation of all logical clusters, but has non-deterministic + // behaviour if logical clusters don't agree about REST mappings. + DiscoveryFunc: c.dynamicDiscoverySharedInformerFactory.ServerPreferredResources, + IgnoredResourcesFunc: quotaConfiguration.IgnoredResources, + InformersStarted: c.informersStarted, + Registry: generic.NewRegistry(quotaConfiguration.Evaluators()), + } - // Here we diverge from what upstream does. Upstream starts a goroutine that retrieves discovery every 30 seconds, - // starting/stopping dynamic informers as needed based on the updated discovery data. We know that kcp contains - // the combination of built-in types plus CRDs. We use that information to drive what quota evaluates. + resourceQuotaController, err := resourcequota.NewController(ctx, resourceQuotaControllerOptions) + if err != nil { + retErr = err + return + } - quotaController := quotaController{ - clusterName: clusterName, - queue: workqueue.NewTypedRateLimitingQueueWithConfig( - workqueue.DefaultTypedControllerRateLimiter[string](), - workqueue.TypedRateLimitingQueueConfig[string]{ - Name: "quota", + // Here we diverge from what upstream does. Upstream starts a goroutine that retrieves discovery every 30 seconds, + // starting/stopping dynamic informers as needed based on the updated discovery data. We know that kcp contains + // the combination of built-in types plus CRDs. We use that information to drive what quota evaluates. + + quotaController := quotaController{ + clusterName: clusterName, + queue: workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[string](), + workqueue.TypedRateLimitingQueueConfig[string]{ + Name: "quota", + }, + ), + work: func(ctx context.Context) { + resourceQuotaController.UpdateMonitors(ctx, c.dynamicDiscoverySharedInformerFactory.ServerPreferredResources) }, - ), - work: func(ctx context.Context) { - resourceQuotaController.UpdateMonitors(ctx, c.dynamicDiscoverySharedInformerFactory.ServerPreferredResources) - }, - } - go quotaController.Start(ctx) - - apisChanged := c.dynamicDiscoverySharedInformerFactory.Subscribe("quota-" + clusterName.String()) - - go func() { - for { - select { - case <-ctx.Done(): - return - case <-apisChanged: - logger.V(4).Info("got API change notification") - quotaController.queue.Add("resync") // this queue only ever has one key in it, as long as it's constant we are OK - } } - }() - - // Do this in a goroutine to avoid holding up a worker in the event UpdateMonitors stalls for whatever reason - go func() { - // Make sure the monitors are synced at least once - resourceQuotaController.UpdateMonitors(ctx, c.dynamicDiscoverySharedInformerFactory.ServerPreferredResources) + go quotaController.Start(ctx) + + apisChanged := c.dynamicDiscoverySharedInformerFactory.Subscribe("quota-" + clusterName.String()) + + go func() { + for { + select { + case <-ctx.Done(): + return + case <-apisChanged: + logger.V(4).Info("got API change notification") + quotaController.queue.Add("resync") // this queue only ever has one key in it, as long as it's constant we are OK + } + } + }() - go resourceQuotaController.Run(ctx, c.workersPerLogicalCluster) - }() + // Do this in a goroutine to avoid holding up a worker in the event UpdateMonitors stalls for whatever reason + go func() { + // Make sure the monitors are synced at least once + resourceQuotaController.UpdateMonitors(ctx, c.dynamicDiscoverySharedInformerFactory.ServerPreferredResources) - return nil + go resourceQuotaController.Run(ctx, c.workersPerLogicalCluster) + }() + }) + return retErr } type quotaController struct { From a860dfd0978e9ff4e5e39bb8bfd132fddc8526f4 Mon Sep 17 00:00:00 2001 From: Mangirdas Judeikis Date: Thu, 21 May 2026 08:33:28 +0300 Subject: [PATCH 2/7] add workspace quickstart into cli for ease of testing --- .../kcp-dev/cli/pkg/quickstart/cmd/cmd.go | 4 + .../cli/pkg/quickstart/plugin/options.go | 28 +- .../pkg/quickstart/plugin/quickstart_test.go | 2 + .../pkg/quickstart/scenarios/api_provider.go | 11 + .../cli/pkg/quickstart/scenarios/scenario.go | 5 + .../quickstart/scenarios/scenarios_test.go | 4 + .../pkg/quickstart/scenarios/workspaces.go | 298 ++++++++++++++++++ .../quickstart/scenarios/workspaces_test.go | 187 +++++++++++ 8 files changed, 533 insertions(+), 6 deletions(-) create mode 100644 staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/workspaces.go create mode 100644 staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/workspaces_test.go diff --git a/staging/src/github.com/kcp-dev/cli/pkg/quickstart/cmd/cmd.go b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/cmd/cmd.go index c6f3d2197be..1dccb67399a 100644 --- a/staging/src/github.com/kcp-dev/cli/pkg/quickstart/cmd/cmd.go +++ b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/cmd/cmd.go @@ -41,6 +41,10 @@ func New(streams genericclioptions.IOStreams) *cobra.Command { api-provider Create a service provider workspace with a sample API (cowboys) and a consumer workspace with a binding to it. (default) + workspaces Create an organization workspace populated with a + randomly-shaped tree of child workspaces. Useful for + performance testing the workspace control plane. + Tune with --tree-depth, --tree-count, --tree-seed. Use --cleanup to tear down all resources created by a previous run. `), diff --git a/staging/src/github.com/kcp-dev/cli/pkg/quickstart/plugin/options.go b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/plugin/options.go index 2c994d364f9..06229d05c87 100644 --- a/staging/src/github.com/kcp-dev/cli/pkg/quickstart/plugin/options.go +++ b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/plugin/options.go @@ -23,7 +23,6 @@ import ( "github.com/spf13/cobra" - "k8s.io/apimachinery/pkg/util/validation" "k8s.io/cli-runtime/pkg/genericiooptions" "k8s.io/client-go/rest" @@ -45,6 +44,11 @@ type QuickstartOptions struct { WithSamples bool Timeout time.Duration + // workspaces scenario tuning + TreeDepth int + TreeCount int + TreeSeed int64 + scenario scenarios.Scenario enterWorkspace func(ctx context.Context, path string) error newUseWorkspaceOpts func(genericiooptions.IOStreams) *workspaceplugin.UseWorkspaceOptions @@ -58,6 +62,8 @@ func NewQuickstartOptions(streams genericiooptions.IOStreams) *QuickstartOptions Scenario: "api-provider", NamePrefix: "quickstart", Timeout: defaultTimeout, + TreeDepth: 3, + TreeCount: 50, newUseWorkspaceOpts: workspaceplugin.NewUseWorkspaceOptions, newKCPClusterClient: defaultKCPClusterClient, newKCPDynamicClient: defaultKCPDynamicClient, @@ -78,12 +84,15 @@ func (o *QuickstartOptions) defaultEnterWorkspace(ctx context.Context, path stri func (o *QuickstartOptions) BindFlags(cmd *cobra.Command) { o.Options.BindFlags(cmd) - cmd.Flags().StringVar(&o.Scenario, "scenario", o.Scenario, "Scenario to bootstrap (api-provider)") + cmd.Flags().StringVar(&o.Scenario, "scenario", o.Scenario, "Scenario to bootstrap (api-provider, workspaces)") cmd.Flags().StringVar(&o.NamePrefix, "name-prefix", o.NamePrefix, "Prefix for created workspace names") cmd.Flags().BoolVar(&o.Cleanup, "cleanup", o.Cleanup, "Delete all resources created by a previous quickstart run. Relies on kcp cascading deletion from the org workspace: APIResourceSchemas, APIExports, and APIBindings inside child workspaces are removed as part of the cascade, not individually") cmd.Flags().BoolVar(&o.Enter, "enter", o.Enter, "Switch kubeconfig to the consumer workspace when done") cmd.Flags().BoolVar(&o.WithSamples, "with-samples", o.WithSamples, "Apply sample resources (Cowboys) into the consumer workspace after setup") cmd.Flags().DurationVar(&o.Timeout, "timeout", o.Timeout, "Maximum time to wait for workspaces to become ready or finish terminating") + cmd.Flags().IntVar(&o.TreeDepth, "tree-depth", o.TreeDepth, "[workspaces scenario] Maximum depth of the random workspace tree below the org workspace") + cmd.Flags().IntVar(&o.TreeCount, "tree-count", o.TreeCount, "[workspaces scenario] Total number of child workspaces to create in the random tree") + cmd.Flags().Int64Var(&o.TreeSeed, "tree-seed", o.TreeSeed, "[workspaces scenario] Seed for the PRNG that shapes the random tree (0 = use wall clock)") } func (o *QuickstartOptions) Complete(args []string) error { @@ -100,6 +109,14 @@ func (o *QuickstartOptions) Complete(args []string) error { return fmt.Errorf("failed to get scenario: %w", err) } + if c, ok := s.(scenarios.WorkspacesConfigurable); ok { + c.SetWorkspacesConfig(scenarios.WorkspacesConfig{ + Depth: o.TreeDepth, + Count: o.TreeCount, + Seed: o.TreeSeed, + }) + } + o.scenario = s return nil @@ -114,10 +131,9 @@ func (o *QuickstartOptions) Validate() error { return fmt.Errorf("--name-prefix must not be empty") } - for _, suffix := range []string{scenarios.OrgSuffix, scenarios.ProviderSuffix, scenarios.ConsumerSuffix} { - name := o.NamePrefix + suffix - if errs := validation.IsDNS1123Label(name); len(errs) > 0 { - return fmt.Errorf("--name-prefix %q produces invalid workspace name %q: %v", o.NamePrefix, name, errs) + if o.scenario != nil { + if err := o.scenario.Validate(o.NamePrefix); err != nil { + return err } } diff --git a/staging/src/github.com/kcp-dev/cli/pkg/quickstart/plugin/quickstart_test.go b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/plugin/quickstart_test.go index ca4bed3dff1..ffb6d14307b 100644 --- a/staging/src/github.com/kcp-dev/cli/pkg/quickstart/plugin/quickstart_test.go +++ b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/plugin/quickstart_test.go @@ -315,6 +315,7 @@ func (c *cleanupScenario) EnterPath(_ map[string]string) string { func (c *cleanupScenario) PrintSummary(_ io.Writer, _ string, _ map[string]string) error { return nil } +func (c *cleanupScenario) Validate(_ string) error { return nil } type mockScenario struct{} @@ -339,6 +340,7 @@ func (m *mockScenario) EnterPath(state map[string]string) string { return state["consumer-path"] } func (m *mockScenario) PrintSummary(_ io.Writer, _ string, _ map[string]string) error { return nil } +func (m *mockScenario) Validate(_ string) error { return nil } type fakeClientConfig struct{} diff --git a/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/api_provider.go b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/api_provider.go index 3c04ab02dcc..ee8e4f7aff2 100644 --- a/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/api_provider.go +++ b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/api_provider.go @@ -29,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/util/wait" pluginhelpers "github.com/kcp-dev/cli/pkg/helpers" @@ -54,6 +55,16 @@ type apiProviderScenario struct{} func (s *apiProviderScenario) Name() string { return "api-provider" } +func (s *apiProviderScenario) Validate(prefix string) error { + for _, suffix := range []string{OrgSuffix, ProviderSuffix, ConsumerSuffix} { + name := prefix + suffix + if errs := validation.IsDNS1123Label(name); len(errs) > 0 { + return fmt.Errorf("--name-prefix %q produces invalid workspace name %q: %v", prefix, name, errs) + } + } + return nil +} + // EnterPath returns the absolute workspace path that --enter should switch to, // or "" if the scenario does not support it. func (s *apiProviderScenario) EnterPath(state map[string]string) string { diff --git a/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/scenario.go b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/scenario.go index 9ea27d91954..143d626c9e2 100644 --- a/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/scenario.go +++ b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/scenario.go @@ -57,6 +57,10 @@ type Scenario interface { Samples(prefix string) []Step EnterPath(state map[string]string) string PrintSummary(out io.Writer, prefix string, state map[string]string) error + + // Validate checks scenario-specific constraints, including that all + // workspace names derived from prefix are valid DNS-1123 labels. + Validate(prefix string) error } type ExecutionContext struct { @@ -72,6 +76,7 @@ type ExecutionContext struct { var registry = map[string]func() Scenario{ "api-provider": func() Scenario { return &apiProviderScenario{} }, + "workspaces": func() Scenario { return &workspacesScenario{} }, } func Get(name string) (Scenario, error) { diff --git a/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/scenarios_test.go b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/scenarios_test.go index 6f77d7fa9e6..a6fdfd2a708 100644 --- a/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/scenarios_test.go +++ b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/scenarios_test.go @@ -36,6 +36,10 @@ func TestScenariosGet(t *testing.T) { name: "valid scenario", scenario: "api-provider", }, + { + name: "workspaces scenario", + scenario: "workspaces", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/workspaces.go b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/workspaces.go new file mode 100644 index 00000000000..cd3e94ed677 --- /dev/null +++ b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/workspaces.go @@ -0,0 +1,298 @@ +/* +Copyright 2026 The kcp Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scenarios + +import ( + "context" + "fmt" + "io" + "math/rand" + "sync" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/validation" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/kcp-dev/logicalcluster/v3" + tenancyv1alpha1 "github.com/kcp-dev/sdk/apis/tenancy/v1alpha1" +) + +const ( + stateKeyTreeOrgPath = "tree-org-path" + stateKeyTreeCreated = "tree-created" + + treeProgressInterval = 25 +) + +// WorkspacesConfig configures the workspaces (performance) scenario. +type WorkspacesConfig struct { + // Depth is the maximum depth of the generated tree below the org workspace. + // depth=1 produces a flat fan-out under the org; depth=2 allows grandchildren, etc. + Depth int + // Count is the total number of child workspaces to create below the org. + Count int + // Seed seeds the PRNG used to shape the tree. Zero means "pick at runtime". + Seed int64 +} + +// WorkspacesConfigurable lets the plugin pass scenario-specific options +// without leaking implementation details through the Scenario interface. +type WorkspacesConfigurable interface { + SetWorkspacesConfig(cfg WorkspacesConfig) +} + +type workspacesScenario struct { + depth int + count int + seed int64 + + once sync.Once + tree []treeNode +} + +type treeNode struct { + name string + parentIdx int // -1 means the org workspace + depth int // 0 = org, 1..depth = children +} + +func (s *workspacesScenario) Name() string { return "workspaces" } + +func (s *workspacesScenario) SetWorkspacesConfig(cfg WorkspacesConfig) { + s.depth = cfg.Depth + s.count = cfg.Count + s.seed = cfg.Seed +} + +func (s *workspacesScenario) EnterPath(state map[string]string) string { + return state[stateKeyTreeOrgPath] +} + +func (s *workspacesScenario) Validate(prefix string) error { + name := prefix + OrgSuffix + if errs := validation.IsDNS1123Label(name); len(errs) > 0 { + return fmt.Errorf("--name-prefix %q produces invalid workspace name %q: %v", prefix, name, errs) + } + if s.count > 0 { + last := childName(prefix, s.count) + if errs := validation.IsDNS1123Label(last); len(errs) > 0 { + return fmt.Errorf("--name-prefix %q with --tree-count %d produces invalid workspace name %q: %v", prefix, s.count, last, errs) + } + } + if s.depth < 1 { + return fmt.Errorf("--tree-depth must be >= 1, got %d", s.depth) + } + if s.count < 0 { + return fmt.Errorf("--tree-count must be >= 0, got %d", s.count) + } + return nil +} + +func (s *workspacesScenario) Steps(prefix string) []Step { + orgName := prefix + OrgSuffix + + return []Step{ + { + Description: fmt.Sprintf("Creating organization workspace %q", orgName), + CleanupDescription: fmt.Sprintf("Deleting organization workspace %q (cascades %d child workspaces)", orgName, s.count), + Execute: func(ctx context.Context, execCtx ExecutionContext) error { + return createWorkspaceStep(ctx, execCtx, logicalcluster.NewPath("root"), orgName, + &tenancyv1alpha1.WorkspaceTypeReference{Name: "organization", Path: "root"}, + map[string]string{quickstartLabel: "true", quickstartPrefixLabel: prefix}, + stateKeyTreeOrgPath) + }, + Cleanup: func(ctx context.Context, execCtx ExecutionContext) error { + return deleteOrgAndWait(ctx, execCtx, orgName) + }, + }, + { + Description: fmt.Sprintf("Creating %d workspaces (max depth %d, seed %d)", s.count, s.depth, s.effectiveSeed()), + Execute: func(ctx context.Context, execCtx ExecutionContext) error { + return s.createTree(ctx, execCtx, prefix) + }, + }, + } +} + +func (s *workspacesScenario) Samples(_ string) []Step { return nil } + +func (s *workspacesScenario) PrintSummary(out io.Writer, prefix string, state map[string]string) error { + orgName := prefix + OrgSuffix + orgPath := state[stateKeyTreeOrgPath] + created := state[stateKeyTreeCreated] + if created == "" { + created = "0" + } + + _, err := fmt.Fprintf(out, ` +Quickstart complete! Here's what was created: + + Workspace hierarchy: + root + +-- %s (organization) + +-- %s child workspaces in a random tree (max depth %d, seed %d) + + Org path: %s + + Explore: + kubectl ws :%s + kubectl get workspaces + + Cleanup: + kubectl kcp quickstart --scenario workspaces --cleanup --name-prefix %s +`, + orgName, + created, + s.depth, + s.effectiveSeed(), + orgPath, + orgPath, + prefix, + ) + return err +} + +// effectiveSeed returns the seed actually used: either the explicit one, or a +// stable per-instance value derived once from the wall clock. +func (s *workspacesScenario) effectiveSeed() int64 { + s.once.Do(func() { + if s.seed == 0 { + s.seed = time.Now().UnixNano() + } + }) + return s.seed +} + +// generateTree builds the parent-child structure deterministically given the +// scenario config. It is cached so cleanup and run see the same shape. +func (s *workspacesScenario) generateTree(prefix string) []treeNode { + if s.tree != nil { + return s.tree + } + + rng := rand.New(rand.NewSource(s.effectiveSeed())) + + // index 0 is the org (depth 0). Children come after. + nodes := []treeNode{{name: prefix + OrgSuffix, parentIdx: -1, depth: 0}} + + for i := 1; i <= s.count; i++ { + eligible := make([]int, 0, len(nodes)) + for j, n := range nodes { + if n.depth < s.depth { + eligible = append(eligible, j) + } + } + parentIdx := eligible[rng.Intn(len(eligible))] + nodes = append(nodes, treeNode{ + name: childName(prefix, i), + parentIdx: parentIdx, + depth: nodes[parentIdx].depth + 1, + }) + } + + s.tree = nodes + return s.tree +} + +func (s *workspacesScenario) createTree(ctx context.Context, execCtx ExecutionContext, prefix string) error { + nodes := s.generateTree(prefix) + if len(nodes) <= 1 { + fmt.Fprintf(execCtx.Out, " No child workspaces to create (count=0)\n") + execCtx.State[stateKeyTreeCreated] = "0" + return nil + } + + // Paths indexed alongside nodes. nodePaths[0] is the org. + nodePaths := make([]logicalcluster.Path, len(nodes)) + nodePaths[0] = logicalcluster.NewPath(execCtx.State[stateKeyTreeOrgPath]) + + wsType := &tenancyv1alpha1.WorkspaceTypeReference{Name: "universal", Path: "root"} + labels := map[string]string{quickstartLabel: "true", quickstartPrefixLabel: prefix} + + start := time.Now() + created := 0 + + for i := 1; i < len(nodes); i++ { + n := nodes[i] + parentPath := nodePaths[n.parentIdx] + + childPath, _, err := createWorkspaceAndWait(ctx, execCtx.KCPClusterClient, parentPath, n.name, wsType, labels) + if err != nil { + return fmt.Errorf("creating workspace %d/%d (%q at depth %d under %s): %w", + i, len(nodes)-1, n.name, n.depth, parentPath, err) + } + nodePaths[i] = childPath + created++ + + if created%treeProgressInterval == 0 || i == len(nodes)-1 { + elapsed := time.Since(start) + rate := float64(created) / elapsed.Seconds() + fmt.Fprintf(execCtx.Out, " Progress: %d/%d workspaces created (%.1f ws/s, depth so far: %d)\n", + created, len(nodes)-1, rate, n.depth) + } + } + + execCtx.State[stateKeyTreeCreated] = fmt.Sprintf("%d", created) + return nil +} + +func childName(prefix string, i int) string { + return fmt.Sprintf("%s-ws-%d", prefix, i) +} + +// deleteOrgAndWait removes the org workspace and waits for the cascading +// deletion of all children to complete. Shared shape with api-provider's +// org cleanup but kept local to avoid coupling the two scenarios. +func deleteOrgAndWait(ctx context.Context, execCtx ExecutionContext, orgName string) error { + rootPath := logicalcluster.NewPath("root") + err := execCtx.KCPClusterClient.Cluster(rootPath).TenancyV1alpha1().Workspaces(). + Delete(ctx, orgName, metav1.DeleteOptions{}) + if apierrors.IsNotFound(err) { + return nil + } + if err != nil { + return err + } + + fmt.Fprintf(execCtx.Out, " Waiting for workspace %q and all children to finish terminating...\n", orgName) + lastLog := time.Now() + return wait.PollUntilContextCancel(ctx, pollIntervalCleanup, true, + func(ctx context.Context) (bool, error) { + ws, err := execCtx.KCPClusterClient.Cluster(rootPath).TenancyV1alpha1().Workspaces(). + Get(ctx, orgName, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return true, nil + } + if err != nil { + return false, err + } + + if time.Since(lastLog) >= logThrottleInterval { + if ws.DeletionTimestamp != nil { + fmt.Fprintf(execCtx.Out, " Still terminating (cascading deletion in progress)...\n") + } else { + fmt.Fprintf(execCtx.Out, " Deletion pending (phase: %s)...\n", ws.Status.Phase) + } + lastLog = time.Now() + } + + return false, nil + }, + ) +} diff --git a/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/workspaces_test.go b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/workspaces_test.go new file mode 100644 index 00000000000..3c2fa629a7f --- /dev/null +++ b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/workspaces_test.go @@ -0,0 +1,187 @@ +/* +Copyright 2026 The kcp Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scenarios + +import ( + "fmt" + "strings" + "testing" +) + +func TestWorkspacesValidate(t *testing.T) { + tests := []struct { + name string + prefix string + cfg WorkspacesConfig + wantErr string + }{ + { + name: "valid defaults", + prefix: "perf", + cfg: WorkspacesConfig{Depth: 3, Count: 50}, + }, + { + name: "depth zero rejected", + prefix: "perf", + cfg: WorkspacesConfig{Depth: 0, Count: 10}, + wantErr: "--tree-depth must be >= 1", + }, + { + name: "negative count rejected", + prefix: "perf", + cfg: WorkspacesConfig{Depth: 2, Count: -1}, + wantErr: "--tree-count must be >= 0", + }, + { + name: "count zero allowed", + prefix: "perf", + cfg: WorkspacesConfig{Depth: 2, Count: 0}, + }, + { + name: "uppercase prefix rejected", + prefix: "Perf", + cfg: WorkspacesConfig{Depth: 2, Count: 5}, + wantErr: "invalid workspace name", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &workspacesScenario{} + s.SetWorkspacesConfig(tt.cfg) + err := s.Validate(tt.prefix) + if tt.wantErr != "" { + if err == nil || !strings.Contains(err.Error(), tt.wantErr) { + t.Errorf("Validate() error = %v, want error containing %q", err, tt.wantErr) + } + return + } + if err != nil { + t.Errorf("Validate() unexpected error: %v", err) + } + }) + } +} + +func TestWorkspacesGenerateTree(t *testing.T) { + tests := []struct { + name string + depth int + count int + prefix string + }{ + {name: "small flat tree", depth: 1, count: 5, prefix: "p"}, + {name: "small deep tree", depth: 3, count: 20, prefix: "p"}, + {name: "single node tree", depth: 5, count: 1, prefix: "p"}, + {name: "empty tree", depth: 2, count: 0, prefix: "p"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &workspacesScenario{} + s.SetWorkspacesConfig(WorkspacesConfig{Depth: tt.depth, Count: tt.count, Seed: 42}) + + nodes := s.generateTree(tt.prefix) + + if len(nodes) != tt.count+1 { + t.Fatalf("len(nodes) = %d, want %d (org + %d children)", len(nodes), tt.count+1, tt.count) + } + + // First node is the org at depth 0. + if nodes[0].depth != 0 { + t.Errorf("nodes[0].depth = %d, want 0", nodes[0].depth) + } + if nodes[0].parentIdx != -1 { + t.Errorf("nodes[0].parentIdx = %d, want -1", nodes[0].parentIdx) + } + if nodes[0].name != tt.prefix+OrgSuffix { + t.Errorf("nodes[0].name = %q, want %q", nodes[0].name, tt.prefix+OrgSuffix) + } + + seen := map[string]bool{nodes[0].name: true} + for i := 1; i < len(nodes); i++ { + n := nodes[i] + if n.parentIdx < 0 || n.parentIdx >= i { + t.Errorf("node %d has parentIdx %d, must be in [0,%d)", i, n.parentIdx, i) + } + if got, want := n.depth, nodes[n.parentIdx].depth+1; got != want { + t.Errorf("node %d depth = %d, want %d", i, got, want) + } + if n.depth > tt.depth { + t.Errorf("node %d depth %d exceeds max %d", i, n.depth, tt.depth) + } + if n.depth < 1 { + t.Errorf("node %d depth %d must be >= 1 for child", i, n.depth) + } + if seen[n.name] { + t.Errorf("node %d has duplicate name %q", i, n.name) + } + seen[n.name] = true + } + }) + } +} + +func TestWorkspacesGenerateTreeDeterministic(t *testing.T) { + cfg := WorkspacesConfig{Depth: 3, Count: 50, Seed: 1234} + + s1 := &workspacesScenario{} + s1.SetWorkspacesConfig(cfg) + t1 := s1.generateTree("p") + + s2 := &workspacesScenario{} + s2.SetWorkspacesConfig(cfg) + t2 := s2.generateTree("p") + + if len(t1) != len(t2) { + t.Fatalf("tree lengths differ: %d vs %d", len(t1), len(t2)) + } + for i := range t1 { + if t1[i] != t2[i] { + t.Errorf("trees diverge at node %d: %+v vs %+v", i, t1[i], t2[i]) + } + } +} + +func TestWorkspacesGenerateTreeCached(t *testing.T) { + s := &workspacesScenario{} + s.SetWorkspacesConfig(WorkspacesConfig{Depth: 3, Count: 10, Seed: 0}) + + first := s.generateTree("p") + second := s.generateTree("p") + + if fmt.Sprintf("%p", first) != fmt.Sprintf("%p", second) { + t.Error("generateTree returned different slices on repeat call; expected cached result") + } +} + +func TestWorkspacesSteps(t *testing.T) { + s := &workspacesScenario{} + s.SetWorkspacesConfig(WorkspacesConfig{Depth: 2, Count: 5, Seed: 1}) + + steps := s.Steps("perf") + if len(steps) != 2 { + t.Fatalf("Steps() returned %d steps, want 2", len(steps)) + } + if steps[0].Cleanup == nil { + t.Error("Steps()[0] (org creation) must have a Cleanup func") + } + if steps[1].Cleanup != nil { + t.Error("Steps()[1] (tree creation) should not have a Cleanup func; org deletion cascades") + } + if !strings.Contains(steps[0].Description, "perf-org") { + t.Errorf("Steps()[0].Description = %q, want it to contain %q", steps[0].Description, "perf-org") + } +} From e2f3a23048bb60c5d560eb9e4b07e3c1fba3a235 Mon Sep 17 00:00:00 2001 From: Mangirdas Judeikis Date: Thu, 21 May 2026 09:49:26 +0300 Subject: [PATCH 3/7] Add client evictor to recover memory Signed-off-by: Mangirdas Judeikis On-behalf-of: SAP --- pkg/server/clientcache_evictor.go | 57 +++++++++++++++++++ pkg/server/server.go | 1 + .../apimachinery/pkg/client/constructor.go | 55 +++++++++++++++++- 3 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 pkg/server/clientcache_evictor.go diff --git a/pkg/server/clientcache_evictor.go b/pkg/server/clientcache_evictor.go new file mode 100644 index 00000000000..dde54afe22c --- /dev/null +++ b/pkg/server/clientcache_evictor.go @@ -0,0 +1,57 @@ +/* +Copyright 2026 The kcp Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + + apiclient "github.com/kcp-dev/apimachinery/v2/pkg/client" + "github.com/kcp-dev/logicalcluster/v3" + corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1" + corev1alpha1informers "github.com/kcp-dev/sdk/client/informers/externalversions/core/v1alpha1" +) + +// installClientCacheEvictor registers a LogicalCluster delete handler that +// notifies apiclient.EvictCluster, which fans out to every per-cluster client +// cache (kube, sdk, dynamic, metadata, ...) constructed via apiclient.NewCache. +// Without this, those caches grow monotonically and pin per-cluster REST +// clients, codec factories, JSON-decoded schemas, etc. for the lifetime of +// the process. See https://github.com/kcp-dev/kcp/issues/4071. +func installClientCacheEvictor(informer corev1alpha1informers.LogicalClusterClusterInformer) { + _, _ = informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj any) { + lc, ok := obj.(*corev1alpha1.LogicalCluster) + if !ok { + tombstone, tok := obj.(cache.DeletedFinalStateUnknown) + if !tok { + return + } + lc, ok = tombstone.Obj.(*corev1alpha1.LogicalCluster) + if !ok { + return + } + } + name := logicalcluster.From(lc) + if name == "" { + return + } + klog.V(4).InfoS("evicting per-cluster client caches", "logicalcluster", name) + apiclient.EvictCluster(name.Path()) + }, + }) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index e8173ec011f..720dab16db6 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -493,6 +493,7 @@ func (s *Server) Run(ctx context.Context) error { go s.CacheKcpSharedInformerFactory.Apis().V1alpha2().APIExports().Informer().Run(hookContext.Done()) go s.CacheKcpSharedInformerFactory.Cache().V1alpha1().CachedResources().Informer().Run(hookContext.Done()) go s.CacheKcpSharedInformerFactory.Cache().V1alpha1().CachedResourceEndpointSlices().Informer().Run(hookContext.Done()) + installClientCacheEvictor(s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters()) go s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters().Informer().Run(hookContext.Done()) go s.KcpSharedInformerFactory.Cache().V1alpha1().CachedResources().Informer().Run(hookContext.Done()) go s.KcpSharedInformerFactory.Cache().V1alpha1().CachedResourceEndpointSlices().Informer().Run(hookContext.Done()) diff --git a/staging/src/github.com/kcp-dev/apimachinery/pkg/client/constructor.go b/staging/src/github.com/kcp-dev/apimachinery/pkg/client/constructor.go index f41b3896a2f..7a6a1f92090 100644 --- a/staging/src/github.com/kcp-dev/apimachinery/pkg/client/constructor.go +++ b/staging/src/github.com/kcp-dev/apimachinery/pkg/client/constructor.go @@ -34,11 +34,55 @@ type Constructor[R any] struct { type Cache[R any] interface { ClusterOrDie(clusterPath logicalcluster.Path) R Cluster(clusterPath logicalcluster.Path) (R, error) + // Evict drops the cached client for clusterPath, if any. Used to release + // per-cluster client state (REST clients, codec factories, parsed + // schemas) when a logical cluster is deleted. Safe to call concurrently + // with Cluster / ClusterOrDie. No-op if the path is not cached. + Evict(clusterPath logicalcluster.Path) +} + +// Evictor is the non-generic subset of Cache used by EvictCluster to fan +// eviction out to every registered cache. Every cache returned by NewCache +// is auto-registered. Callers that wrap or substitute caches may register +// their own implementation via RegisterEvictor. +type Evictor interface { + Evict(clusterPath logicalcluster.Path) +} + +var ( + evictorsMu sync.RWMutex + evictors []Evictor +) + +// RegisterEvictor adds e to the set of caches notified by EvictCluster. +// NewCache calls this automatically. +func RegisterEvictor(e Evictor) { + evictorsMu.Lock() + defer evictorsMu.Unlock() + evictors = append(evictors, e) +} + +// EvictCluster notifies every registered cache that clusterPath has been +// deleted and its cached client (and everything that client transitively +// pins — REST client, codec factory, JSON decoder state, OpenAPI schemas) +// can be released. Wire this to a LogicalCluster delete handler to bound +// retained memory per workspace lifetime. See +// https://github.com/kcp-dev/kcp/issues/4071. +func EvictCluster(clusterPath logicalcluster.Path) { + evictorsMu.RLock() + snapshot := make([]Evictor, len(evictors)) + copy(snapshot, evictors) + evictorsMu.RUnlock() + for _, e := range snapshot { + e.Evict(clusterPath) + } } // NewCache creates a new client factory cache using the given constructor. +// The cache is auto-registered with the package-level EvictCluster fan-out +// so per-cluster entries can be released when a LogicalCluster is deleted. func NewCache[R any](cfg *rest.Config, client *http.Client, constructor *Constructor[R]) Cache[R] { - return &clientCache[R]{ + c := &clientCache[R]{ cfg: cfg, client: client, constructor: constructor, @@ -46,6 +90,8 @@ func NewCache[R any](cfg *rest.Config, client *http.Client, constructor *Constru RWMutex: &sync.RWMutex{}, clientsByClusterPath: map[logicalcluster.Path]R{}, } + RegisterEvictor(c) + return c } type clientCache[R any] struct { @@ -99,3 +145,10 @@ func (c *clientCache[R]) Cluster(clusterPath logicalcluster.Path) (R, error) { return instance, nil } + +// Evict drops the cached client for clusterPath, if any. +func (c *clientCache[R]) Evict(clusterPath logicalcluster.Path) { + c.Lock() + defer c.Unlock() + delete(c.clientsByClusterPath, clusterPath) +} From a4511da00359bf62d7534a336692e6087b676962 Mon Sep 17 00:00:00 2001 From: Mangirdas Judeikis Date: Thu, 21 May 2026 11:40:39 +0300 Subject: [PATCH 4/7] move to ctx logger --- pkg/server/clientcache_evictor.go | 7 +++++-- pkg/server/server.go | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/server/clientcache_evictor.go b/pkg/server/clientcache_evictor.go index dde54afe22c..cc42099dc1b 100644 --- a/pkg/server/clientcache_evictor.go +++ b/pkg/server/clientcache_evictor.go @@ -17,6 +17,8 @@ limitations under the License. package server import ( + "context" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" @@ -32,7 +34,8 @@ import ( // Without this, those caches grow monotonically and pin per-cluster REST // clients, codec factories, JSON-decoded schemas, etc. for the lifetime of // the process. See https://github.com/kcp-dev/kcp/issues/4071. -func installClientCacheEvictor(informer corev1alpha1informers.LogicalClusterClusterInformer) { +func installClientCacheEvictor(ctx context.Context, informer corev1alpha1informers.LogicalClusterClusterInformer) { + logger := klog.FromContext(ctx) _, _ = informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: func(obj any) { lc, ok := obj.(*corev1alpha1.LogicalCluster) @@ -50,7 +53,7 @@ func installClientCacheEvictor(informer corev1alpha1informers.LogicalClusterClus if name == "" { return } - klog.V(4).InfoS("evicting per-cluster client caches", "logicalcluster", name) + logger.V(4).Info("evicting per-cluster client caches", "logicalcluster", name) apiclient.EvictCluster(name.Path()) }, }) diff --git a/pkg/server/server.go b/pkg/server/server.go index 720dab16db6..520eab65336 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -493,7 +493,7 @@ func (s *Server) Run(ctx context.Context) error { go s.CacheKcpSharedInformerFactory.Apis().V1alpha2().APIExports().Informer().Run(hookContext.Done()) go s.CacheKcpSharedInformerFactory.Cache().V1alpha1().CachedResources().Informer().Run(hookContext.Done()) go s.CacheKcpSharedInformerFactory.Cache().V1alpha1().CachedResourceEndpointSlices().Informer().Run(hookContext.Done()) - installClientCacheEvictor(s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters()) + installClientCacheEvictor(ctx, s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters()) go s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters().Informer().Run(hookContext.Done()) go s.KcpSharedInformerFactory.Cache().V1alpha1().CachedResources().Informer().Run(hookContext.Done()) go s.KcpSharedInformerFactory.Cache().V1alpha1().CachedResourceEndpointSlices().Informer().Run(hookContext.Done()) From c7925ebe35cf90213fac1809f75beb0e0084261f Mon Sep 17 00:00:00 2001 From: Mangirdas Judeikis Date: Thu, 21 May 2026 11:47:53 +0300 Subject: [PATCH 5/7] update doc to use new quickstart --- .../investigations/memory-leak-attribution.md | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/docs/content/developers/investigations/memory-leak-attribution.md b/docs/content/developers/investigations/memory-leak-attribution.md index b4bf613779b..32e6385fd91 100644 --- a/docs/content/developers/investigations/memory-leak-attribution.md +++ b/docs/content/developers/investigations/memory-leak-attribution.md @@ -103,10 +103,7 @@ Typical loop: ./hack/run-with-prometheus.sh ./bin/kcp start --debug-socket-path=/tmp/kcp-debug.sock # In another shell, drive workspace churn -for i in {1..100}; do - kubectl apply -f workspace.yaml; sleep 3 - kubectl delete -f workspace.yaml; sleep 3 -done +kubectl-kcp quickstart --scenario workspaces --tree-depth 10 --tree-count 1000 # Watch http://localhost:9090/graph?g0.expr=go_goroutines and confirm slope # When the count is clearly elevated, grab a labeled dump: @@ -202,10 +199,7 @@ To diff before/after a known-clean baseline (the technique used in curl --unix-socket /tmp/kcp-debug.sock 'http://localhost/debug/pprof/goroutine?debug=2' > before.txt # Run churn: create/delete N workspaces in a loop -for i in {1..100}; do - kubectl apply -f workspace.yaml; sleep 3 - kubectl delete -f workspace.yaml; sleep 3 -done +kubectl-kcp quickstart --scenario workspaces --tree-depth 10 --tree-count 1000 # After churn curl --unix-socket /tmp/kcp-debug.sock 'http://localhost/debug/pprof/goroutine?debug=2' > after.txt From 285664994f2fd9ffb4fd0f8b782b69ddf0a3e621 Mon Sep 17 00:00:00 2001 From: Mangirdas Judeikis Date: Thu, 21 May 2026 13:03:10 +0300 Subject: [PATCH 6/7] address reviews --- .../developers/investigations/memory-leak-attribution.md | 7 ++++--- pkg/server/server.go | 2 +- .../cli/pkg/quickstart/scenarios/workspaces_test.go | 6 ++++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/docs/content/developers/investigations/memory-leak-attribution.md b/docs/content/developers/investigations/memory-leak-attribution.md index 32e6385fd91..2cd728058e9 100644 --- a/docs/content/developers/investigations/memory-leak-attribution.md +++ b/docs/content/developers/investigations/memory-leak-attribution.md @@ -195,14 +195,15 @@ To diff before/after a known-clean baseline (the technique used in [#3350](https://github.com/kcp-dev/kcp/issues/3350)): ``` -# Baseline: kcp running, no churn -curl --unix-socket /tmp/kcp-debug.sock 'http://localhost/debug/pprof/goroutine?debug=2' > before.txt +# Baseline: kcp running, no churn. Use debug=1 — labels only appear in the +# aggregated format. +curl --unix-socket /tmp/kcp-debug.sock 'http://localhost/debug/pprof/goroutine?debug=1' > before.txt # Run churn: create/delete N workspaces in a loop kubectl-kcp quickstart --scenario workspaces --tree-depth 10 --tree-count 1000 # After churn -curl --unix-socket /tmp/kcp-debug.sock 'http://localhost/debug/pprof/goroutine?debug=2' > after.txt +curl --unix-socket /tmp/kcp-debug.sock 'http://localhost/debug/pprof/goroutine?debug=1' > after.txt # Compare counts per controller label diff <(grep -oE '"controller":"[^"]+"' before.txt | sort | uniq -c) \ diff --git a/pkg/server/server.go b/pkg/server/server.go index 520eab65336..54864738096 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -493,7 +493,7 @@ func (s *Server) Run(ctx context.Context) error { go s.CacheKcpSharedInformerFactory.Apis().V1alpha2().APIExports().Informer().Run(hookContext.Done()) go s.CacheKcpSharedInformerFactory.Cache().V1alpha1().CachedResources().Informer().Run(hookContext.Done()) go s.CacheKcpSharedInformerFactory.Cache().V1alpha1().CachedResourceEndpointSlices().Informer().Run(hookContext.Done()) - installClientCacheEvictor(ctx, s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters()) + installClientCacheEvictor(hookCtx, s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters()) go s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters().Informer().Run(hookContext.Done()) go s.KcpSharedInformerFactory.Cache().V1alpha1().CachedResources().Informer().Run(hookContext.Done()) go s.KcpSharedInformerFactory.Cache().V1alpha1().CachedResourceEndpointSlices().Informer().Run(hookContext.Done()) diff --git a/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/workspaces_test.go b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/workspaces_test.go index 3c2fa629a7f..6aeeada22c0 100644 --- a/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/workspaces_test.go +++ b/staging/src/github.com/kcp-dev/cli/pkg/quickstart/scenarios/workspaces_test.go @@ -17,7 +17,6 @@ limitations under the License. package scenarios import ( - "fmt" "strings" "testing" ) @@ -162,7 +161,10 @@ func TestWorkspacesGenerateTreeCached(t *testing.T) { first := s.generateTree("p") second := s.generateTree("p") - if fmt.Sprintf("%p", first) != fmt.Sprintf("%p", second) { + if len(first) == 0 || len(second) == 0 { + t.Fatal("generateTree returned an empty slice; expected non-empty cached result") + } + if &first[0] != &second[0] { t.Error("generateTree returned different slices on repeat call; expected cached result") } } From 60fd0638fc9ceb4b357477e2a4b6b28e199472ee Mon Sep 17 00:00:00 2001 From: Mangirdas Judeikis Date: Fri, 22 May 2026 11:12:29 +0300 Subject: [PATCH 7/7] add generation counter --- .../apimachinery/pkg/client/constructor.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/staging/src/github.com/kcp-dev/apimachinery/pkg/client/constructor.go b/staging/src/github.com/kcp-dev/apimachinery/pkg/client/constructor.go index 7a6a1f92090..ab7424cc552 100644 --- a/staging/src/github.com/kcp-dev/apimachinery/pkg/client/constructor.go +++ b/staging/src/github.com/kcp-dev/apimachinery/pkg/client/constructor.go @@ -89,6 +89,7 @@ func NewCache[R any](cfg *rest.Config, client *http.Client, constructor *Constru RWMutex: &sync.RWMutex{}, clientsByClusterPath: map[logicalcluster.Path]R{}, + evictGen: map[logicalcluster.Path]uint64{}, } RegisterEvictor(c) return c @@ -101,6 +102,16 @@ type clientCache[R any] struct { *sync.RWMutex clientsByClusterPath map[logicalcluster.Path]R + // evictGen is bumped on every Evict(path). A Cluster() build whose captured + // generation no longer matches at write time means an eviction raced in + // mid-build; the freshly-built client is returned to the caller but not + // cached, so we don't resurrect state for a deleted cluster. + // + // Entries are never deleted, so the map grows with the lifetime set of + // evicted paths. Per entry: ~16B string header + ~16-32B path bytes + 8B + // uint64 + ~26B map-bucket overhead ≈ ~70B. 100k churned workspaces ≈ ~7MB, + // which is bounded and not worth GCing. + evictGen map[logicalcluster.Path]uint64 } // ClusterOrDie returns a new client scoped to the given logical cluster, or panics if there @@ -122,6 +133,7 @@ func (c *clientCache[R]) Cluster(clusterPath logicalcluster.Path) (R, error) { var exists bool c.RLock() cachedClient, exists = c.clientsByClusterPath[clusterPath] + genAtStart := c.evictGen[clusterPath] c.RUnlock() if exists { return cachedClient, nil @@ -140,6 +152,12 @@ func (c *clientCache[R]) Cluster(clusterPath logicalcluster.Path) (R, error) { if exists { return cachedClient, nil } + if c.evictGen[clusterPath] != genAtStart { + // An Evict raced with this build. Hand the freshly-built client to the + // caller so its in-flight request can complete, but do not cache it — + // the cluster has been signalled as gone. + return instance, nil + } c.clientsByClusterPath[clusterPath] = instance @@ -151,4 +169,5 @@ func (c *clientCache[R]) Evict(clusterPath logicalcluster.Path) { c.Lock() defer c.Unlock() delete(c.clientsByClusterPath, clusterPath) + c.evictGen[clusterPath]++ }