diff --git a/README.md b/README.md index 9cb7c46..2de2dff 100644 --- a/README.md +++ b/README.md @@ -253,6 +253,86 @@ docker run --rm -p 6443:6443 \ --anonymous-auth=true ``` +### kplane-native endpoints +The apiserver exposes a small set of kplane-native primitives in addition to +the standard Kubernetes API. They are intentionally minimal in V0 — capability, +not opinion — and are mounted *inside* the multicluster routing chain so they +share the same auth, audit, and panic-recovery filters as regular K8s requests. + +#### `GET /clusters/{cluster}/control-plane/snapshot` +Returns an aggregate read of every resource type for which the apiserver +currently holds a live `MultiClusterInformer` in the requested cluster. The +read is served from in-memory caches (no etcd round-trip), so it is fast +enough to use in tight RL/agent rollout loops. + +Query params: +- `resource=[,...]` — limit the snapshot to a subset of resources. +- `includeEmpty=true` — include resources with zero items (default: omitted). +- `warm=true` — force creation of `MultiClusterInformer`s for every + registered storage before snapshotting. Off by default to keep snapshots + cheap; turn on only when you control what gets watched. + +Response shape: +```json +{ + "cluster": "team-alpha", + "snapshotTime": "2026-05-21T18:05:00.000Z", + "liveResources": 12, + "resources": [ + {"group": "", "resource": "namespaces", "synced": true, "itemCount": 4, "items": [...]}, + {"group": "", "resource": "configmaps", "synced": true, "itemCount": 2, "items": [...]} + ] +} +``` + +#### `kind: Fleet` (`kplane.dev/v1`) +A `Fleet` declares a desired number of virtual control planes derived from a +template. The in-process `FleetController` installs the CRD into the root +control plane on startup, then watches `Fleet` objects there and primes +member VCPs via the same `OnClusterSelected` pipeline that organic traffic +triggers. + +```yaml +apiVersion: kplane.dev/v1 +kind: Fleet +metadata: + name: rl-rollout +spec: + replicas: 1000 + namePrefix: rl- # synthesizes rl-0000, rl-0001, ... + # names: ["custom-a", "custom-b"] # alternative: explicit member IDs +``` + +Status reports per-member readiness and an aggregate `readyReplicas`: +```yaml +status: + observedGeneration: 1 + readyReplicas: 998 + members: + - clusterID: rl-0000 + phase: Ready + lastTransitionTime: 2026-05-21T18:06:12Z + ... +``` + +V0 scope intentionally excludes scenario seeding (apply YAML into each member +on bootstrap), TTL-based destruction, and snapshot of CRD-defined types — all +follow-ups. See [`docs/snapshot-and-fleet.md`](docs/snapshot-and-fleet.md) for +design notes. + +#### OpenAPI +The canonical OpenAPI 3 document for the kplane-native surface lives at +[`api/openapi/kplane.v1.yaml`](api/openapi/kplane.v1.yaml) and is also served +live by the apiserver at two server-scoped endpoints: + +- `GET /openapi/kplane.yaml` +- `GET /openapi/kplane.json` + +These endpoints are not cluster-scoped (no `/clusters/{cid}/control-plane/` +prefix) and are intentionally reachable anonymously so SDK generators and CI +can fetch them without credentials. SDKs generate against this document — see +the [Python SDK](https://github.com/kplane-dev/sdk-python) for the first one. + ### Tests Smoke test brings up the server against etcd and probes `/readyz` and discovery: ```bash diff --git a/api/openapi/kplane.v1.yaml b/api/openapi/kplane.v1.yaml new file mode 100644 index 0000000..51f54f6 --- /dev/null +++ b/api/openapi/kplane.v1.yaml @@ -0,0 +1,424 @@ +openapi: 3.1.0 +info: + title: kplane API + version: 0.1.0 + summary: kplane-native primitives for virtual control planes + description: | + The kplane apiserver exposes two surfaces beyond the standard Kubernetes + REST API: + + 1. **`GET /clusters/{cluster}/control-plane/snapshot`** — an aggregate, + in-memory read of every resource type with a live informer for the + requested virtual control plane (VCP). Designed for RL/agent rollouts + where you need to score plane state in a single round-trip. + + 2. **`kind: Fleet` (`kplane.dev/v1`)** — a Kubernetes-style CRD that + declares N VCPs derived from a template. Reconciled by an in-process + controller that primes members via the same bootstrap pipeline organic + traffic triggers. + + Fleet REST endpoints follow standard Kubernetes API conventions and their + full OpenAPI surface is also published live at + `/openapi/v3/apis/kplane.dev/v1`. The Fleet schemas here are the + authoritative version for SDK code generation. + license: + name: Apache-2.0 + url: https://www.apache.org/licenses/LICENSE-2.0 + contact: + name: kplane + url: https://kplane.dev + +servers: + - url: "https://{host}" + description: Apiserver + variables: + host: + default: "127.0.0.1:6443" + +security: + - bearerAuth: [] + +tags: + - name: snapshot + description: Aggregate in-memory reads of a virtual control plane + - name: fleet + description: Declarative N-plane provisioning + +paths: + /clusters/{cluster}/control-plane/snapshot: + get: + operationId: getSnapshot + summary: Aggregate snapshot of a virtual control plane + description: | + Returns the contents of every live `MultiClusterInformer` for the + requested cluster, served from in-memory caches (no etcd read). Use + this to score an agent rollout or capture a trajectory step. + tags: [snapshot] + parameters: + - $ref: "#/components/parameters/Cluster" + - in: query + name: resource + description: Comma-separated plural resource names to include. Omit for all. + required: false + schema: + type: string + example: "pods,configmaps" + - in: query + name: includeEmpty + description: Include resources with zero items (default false). + required: false + schema: + type: boolean + default: false + - in: query + name: warm + description: | + Force creation of `MultiClusterInformer`s for every registered + storage before snapshotting. Off by default; enable only when you + understand that it starts new watches that persist after the + request completes. + required: false + schema: + type: boolean + default: false + responses: + "200": + description: Snapshot payload + content: + application/json: + schema: + $ref: "#/components/schemas/Snapshot" + headers: + X-Snapshot-Cluster: + schema: { type: string } + description: The cluster the snapshot was taken from. + X-Snapshot-Time: + schema: { type: string, format: date-time } + description: Wall-clock time the snapshot was assembled. + "400": + $ref: "#/components/responses/BadRequest" + "401": + $ref: "#/components/responses/Unauthorized" + "403": + $ref: "#/components/responses/Forbidden" + "500": + $ref: "#/components/responses/ServerError" + + /apis/kplane.dev/v1/fleets: + get: + operationId: listFleets + summary: List all Fleets in the root control plane + tags: [fleet] + responses: + "200": + description: FleetList + content: + application/json: + schema: + $ref: "#/components/schemas/FleetList" + post: + operationId: createFleet + summary: Create a Fleet + tags: [fleet] + requestBody: + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/Fleet" + responses: + "201": + description: Fleet created + content: + application/json: + schema: + $ref: "#/components/schemas/Fleet" + + /apis/kplane.dev/v1/fleets/{name}: + parameters: + - $ref: "#/components/parameters/FleetName" + get: + operationId: getFleet + summary: Get a Fleet by name + tags: [fleet] + responses: + "200": + description: Fleet + content: + application/json: + schema: + $ref: "#/components/schemas/Fleet" + "404": + $ref: "#/components/responses/NotFound" + patch: + operationId: patchFleet + summary: Patch a Fleet + tags: [fleet] + requestBody: + required: true + content: + application/merge-patch+json: + schema: { type: object } + application/strategic-merge-patch+json: + schema: { type: object } + responses: + "200": + description: Patched Fleet + content: + application/json: + schema: + $ref: "#/components/schemas/Fleet" + delete: + operationId: deleteFleet + summary: Delete a Fleet + tags: [fleet] + responses: + "200": + description: Deleted + + /apis/kplane.dev/v1/fleets/{name}/status: + parameters: + - $ref: "#/components/parameters/FleetName" + get: + operationId: getFleetStatus + summary: Get only the .status subresource + tags: [fleet] + responses: + "200": + description: Fleet with status + content: + application/json: + schema: + $ref: "#/components/schemas/Fleet" + patch: + operationId: patchFleetStatus + summary: Patch .status subresource + tags: [fleet] + requestBody: + required: true + content: + application/merge-patch+json: + schema: { type: object } + responses: + "200": + description: Patched Fleet + content: + application/json: + schema: + $ref: "#/components/schemas/Fleet" + +components: + securitySchemes: + bearerAuth: + type: http + scheme: bearer + description: Kubernetes bearer token authentication. + + parameters: + Cluster: + in: path + name: cluster + required: true + description: Virtual control plane (VCP) ID. + schema: + type: string + pattern: "^[A-Za-z0-9][A-Za-z0-9_.-]*$" + example: "team-alpha" + FleetName: + in: path + name: name + required: true + schema: + type: string + example: "rl-rollout" + + responses: + BadRequest: + description: Bad request + Unauthorized: + description: Unauthenticated + Forbidden: + description: Forbidden + NotFound: + description: Not found + ServerError: + description: Internal server error + + schemas: + # ---------------- Snapshot ---------------- + Snapshot: + type: object + required: [cluster, snapshotTime, liveResources, resources] + properties: + cluster: + type: string + description: The cluster (VCP ID) this snapshot was taken from. + snapshotTime: + type: string + format: date-time + description: Wall-clock time the snapshot was assembled. + liveResources: + type: integer + description: Count of GroupResources with a live MultiClusterInformer at snapshot time. + resources: + type: array + description: Per-resource sections. + items: + $ref: "#/components/schemas/SnapshotResource" + + SnapshotResource: + type: object + required: [group, resource, synced, itemCount, items] + properties: + group: + type: string + description: API group, empty string for core. + resource: + type: string + description: Plural resource name. + synced: + type: boolean + description: Whether the underlying MultiClusterInformer has completed its initial sync. + itemCount: + type: integer + items: + type: array + description: | + Resource objects, with `metadata.managedFields` stripped to keep + payloads tractable. Schema is whatever the underlying K8s + resource defines; clients should accept arbitrary object shapes + here and validate against per-resource schemas if needed. + items: + type: object + additionalProperties: true + + # ---------------- Fleet ---------------- + Fleet: + type: object + required: [apiVersion, kind, metadata, spec] + properties: + apiVersion: + type: string + enum: ["kplane.dev/v1"] + kind: + type: string + enum: ["Fleet"] + metadata: + $ref: "#/components/schemas/ObjectMeta" + spec: + $ref: "#/components/schemas/FleetSpec" + status: + $ref: "#/components/schemas/FleetStatus" + + FleetList: + type: object + required: [apiVersion, kind, items] + properties: + apiVersion: + type: string + enum: ["kplane.dev/v1"] + kind: + type: string + enum: ["FleetList"] + metadata: + $ref: "#/components/schemas/ListMeta" + items: + type: array + items: + $ref: "#/components/schemas/Fleet" + + FleetSpec: + type: object + required: [replicas] + properties: + replicas: + type: integer + format: int32 + minimum: 0 + description: Desired number of VCPs in this Fleet. + namePrefix: + type: string + description: | + Synthesized cluster IDs use this prefix plus a 4-digit index + (e.g. `rl-0000`). Defaults to `-`. Ignored when + `names` is non-empty. + names: + type: array + description: Explicit cluster IDs. Length must equal `replicas`. + items: + type: string + ttlSeconds: + type: integer + format: int64 + minimum: 0 + description: | + Reserved for future use. V0 does not garbage-collect VCPs when a + Fleet is deleted; this field is parsed and stored but not yet + enforced. + + FleetStatus: + type: object + properties: + observedGeneration: + type: integer + format: int64 + readyReplicas: + type: integer + format: int32 + members: + type: array + items: + $ref: "#/components/schemas/FleetMember" + + FleetMember: + type: object + required: [clusterID, phase] + properties: + clusterID: + type: string + phase: + type: string + enum: [Pending, Ready, Failed] + message: + type: string + lastTransitionTime: + type: string + format: date-time + + # ---------------- Common K8s metadata ---------------- + ObjectMeta: + type: object + properties: + name: + type: string + namespace: + type: string + generation: + type: integer + format: int64 + resourceVersion: + type: string + creationTimestamp: + type: string + format: date-time + uid: + type: string + labels: + type: object + additionalProperties: + type: string + annotations: + type: object + additionalProperties: + type: string + + ListMeta: + type: object + properties: + resourceVersion: + type: string + continue: + type: string + remainingItemCount: + type: integer + format: int64 diff --git a/api/openapi/spec.go b/api/openapi/spec.go new file mode 100644 index 0000000..7b8e207 --- /dev/null +++ b/api/openapi/spec.go @@ -0,0 +1,41 @@ +/* +Copyright 2026 The kplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 +*/ + +// Package openapi embeds and serves the canonical OpenAPI 3 document for +// kplane-native endpoints (snapshot + Fleet). SDKs and tooling generate +// against this single source of truth at api/openapi/kplane.v1.yaml. +package openapi + +import ( + _ "embed" + "sync" + + "sigs.k8s.io/yaml" +) + +//go:embed kplane.v1.yaml +var specYAML []byte + +var ( + specJSONOnce sync.Once + specJSON []byte + specJSONErr error +) + +// YAML returns the OpenAPI document as YAML. +func YAML() []byte { return specYAML } + +// JSON returns the OpenAPI document as JSON (lazily converted, cached). +func JSON() ([]byte, error) { + specJSONOnce.Do(func() { + specJSON, specJSONErr = yaml.YAMLToJSON(specYAML) + }) + return specJSON, specJSONErr +} diff --git a/api/openapi/spec_test.go b/api/openapi/spec_test.go new file mode 100644 index 0000000..86cc1d2 --- /dev/null +++ b/api/openapi/spec_test.go @@ -0,0 +1,73 @@ +/* +Copyright 2026 The kplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 +*/ + +package openapi + +import ( + "encoding/json" + "strings" + "testing" +) + +// TestSpecParses ensures the embedded YAML can be turned into JSON without +// errors. Catches any drift / syntax breakage at build time. +func TestSpecParses(t *testing.T) { + t.Parallel() + b, err := JSON() + if err != nil { + t.Fatalf("YAML→JSON conversion failed: %v", err) + } + if len(b) == 0 { + t.Fatalf("converted JSON is empty") + } + var doc map[string]any + if err := json.Unmarshal(b, &doc); err != nil { + t.Fatalf("JSON not parseable: %v", err) + } + if v, ok := doc["openapi"].(string); !ok || !strings.HasPrefix(v, "3.") { + t.Fatalf("missing/invalid openapi version field: %v", doc["openapi"]) + } +} + +// TestSpecContainsCoreSurface guards against accidental removal of the two +// V0 endpoints the SDK depends on. +func TestSpecContainsCoreSurface(t *testing.T) { + t.Parallel() + b, err := JSON() + if err != nil { + t.Fatalf("JSON: %v", err) + } + var doc struct { + Paths map[string]any `json:"paths"` + Components struct { + Schemas map[string]any `json:"schemas"` + } `json:"components"` + } + if err := json.Unmarshal(b, &doc); err != nil { + t.Fatalf("unmarshal: %v", err) + } + wantPaths := []string{ + "/clusters/{cluster}/control-plane/snapshot", + "/apis/kplane.dev/v1/fleets", + "/apis/kplane.dev/v1/fleets/{name}", + "/apis/kplane.dev/v1/fleets/{name}/status", + } + for _, p := range wantPaths { + if _, ok := doc.Paths[p]; !ok { + t.Errorf("missing path %q in spec", p) + } + } + wantSchemas := []string{"Snapshot", "SnapshotResource", "Fleet", "FleetList", "FleetSpec", "FleetStatus", "FleetMember"} + for _, s := range wantSchemas { + if _, ok := doc.Components.Schemas[s]; !ok { + t.Errorf("missing component schema %q in spec", s) + } + } +} diff --git a/cmd/apiserver/app/config.go b/cmd/apiserver/app/config.go index fdeb4c1..24aa929 100644 --- a/cmd/apiserver/app/config.go +++ b/cmd/apiserver/app/config.go @@ -148,8 +148,18 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { genericConfig.BuildHandlerChainFunc = func(h http.Handler, conf *server.Config) http.Handler { ex := mc.PathExtractor{PathPrefix: mcOpts.PathPrefix, ControlPlaneSegment: mcOpts.ControlPlaneSegment} base := withVersionOverride(server.DefaultBuildHandlerChain(h, conf)) + // kplane OpenAPI document is server-scoped (not per-VCP) and intentionally + // public so SDK generators and CI can fetch it anonymously. It is wrapped + // here outside the cluster router. dispatch := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { cid, _, _ := mc.FromContext(r.Context()) + // kplane-native endpoints (non-K8s-resource routes) live here so they + // see the cluster context but still run through the upstream auth/ + // audit/panic-recovery chain via wrapClusterCRDHandler. + if matchSnapshot(r) { + wrapClusterCRDHandler(newSnapshotHandler(informerRegistry), conf, cid, false).ServeHTTP(w, r) + return + } if cid != "" && cid != mcOpts.DefaultCluster && crdRuntimeMgr != nil { if group, version, ok := apisGroupVersionFromPath(r.URL.Path); ok { served, err := crdRuntimeMgr.ServesGroupVersion(cid, group, version, genericConfig.DrainedNotify()) @@ -173,7 +183,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { } base.ServeHTTP(w, r) }) - return mc.WithClusterRouting(dispatch, ex, mcOpts) + return withKplaneOpenAPI(mc.WithClusterRouting(dispatch, ex, mcOpts)) } authManager := mcauth.NewManager(wait.ContextForChannel(genericConfig.DrainedNotify()), mcauth.Options{ @@ -336,6 +346,27 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { } crdController.EnsureCluster(clusterID) } + + // FleetController watches Fleet objects in the root control plane and + // primes member VCPs via the same OnClusterSelected pipeline that real + // traffic triggers. It installs its CRD into root on startup. + fleetController, fleetErr := mcbootstrap.NewFleetController(mcbootstrap.FleetControllerOptions{ + RootCluster: mcOpts.DefaultCluster, + BaseLoopbackClientConfig: genericConfig.LoopbackClientConfig, + PathPrefix: mcOpts.PathPrefix, + ControlPlaneSegment: mcOpts.ControlPlaneSegment, + EnsureCluster: func(clusterID string) { + if fn := mcOpts.OnClusterSelected; fn != nil { + fn(clusterID) + } + }, + ClientForCluster: clientPool.KubeClientForCluster, + }) + if fleetErr != nil { + klog.Errorf("mc.fleet controller init failed: %v", fleetErr) + } else { + fleetController.Start(genericConfig.DrainedNotify()) + } serveClusterCRD := func(w http.ResponseWriter, r *http.Request, conf *server.Config, clusterID, caller string) bool { group, version, ok := apisGroupVersionFromPath(r.URL.Path) if !ok { @@ -374,7 +405,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { } base.ServeHTTP(w, r) }) - return mc.WithClusterRouting(dispatch, ex, mcOpts) + return withKplaneOpenAPI(mc.WithClusterRouting(dispatch, ex, mcOpts)) } // Install admission chain on apiextensions as well { @@ -410,7 +441,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { } base.ServeHTTP(w, r) }) - return mc.WithClusterRouting(dispatch, ex, mcOpts) + return withKplaneOpenAPI(mc.WithClusterRouting(dispatch, ex, mcOpts)) } // Install admission chain on aggregator { diff --git a/cmd/apiserver/app/openapi.go b/cmd/apiserver/app/openapi.go new file mode 100644 index 0000000..4028689 --- /dev/null +++ b/cmd/apiserver/app/openapi.go @@ -0,0 +1,76 @@ +/* +Copyright 2026 The kplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 +*/ + +package app + +import ( + "net/http" + + "github.com/kplane-dev/apiserver/api/openapi" +) + +// matchKplaneOpenAPI reports whether the request targets the kplane OpenAPI +// document. These endpoints live at the *server* root and are not scoped to +// any virtual control plane: +// +// /openapi/kplane.json +// /openapi/kplane.yaml +// +// PathExtractor only rewrites paths under /clusters/{cid}/control-plane/, so +// the unstripped form is what hits this matcher. +func matchKplaneOpenAPI(r *http.Request) (format string, ok bool) { + if r == nil { + return "", false + } + switch r.URL.Path { + case "/openapi/kplane.json": + return "json", true + case "/openapi/kplane.yaml": + return "yaml", true + } + return "", false +} + +// withKplaneOpenAPI wraps next so requests to /openapi/kplane.{json,yaml} +// short-circuit and serve the embedded spec. Anything else falls through. +// +// This wrapper is mounted on every BuildHandlerChainFunc in the apiserver +// (kube, apiextensions, aggregator) so the spec is reachable no matter which +// server ultimately handles the request. +func withKplaneOpenAPI(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if format, ok := matchKplaneOpenAPI(r); ok { + serveKplaneOpenAPI(w, format) + return + } + next.ServeHTTP(w, r) + }) +} + +// serveKplaneOpenAPI writes the embedded kplane OpenAPI document to w in the +// requested format. The document is the contract every SDK generates from. +func serveKplaneOpenAPI(w http.ResponseWriter, format string) { + switch format { + case "yaml": + w.Header().Set("Content-Type", "application/yaml") + _, _ = w.Write(openapi.YAML()) + return + case "json": + b, err := openapi.JSON() + if err != nil { + http.Error(w, "openapi: "+err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(b) + return + } + http.NotFound(w, nil) +} diff --git a/cmd/apiserver/app/snapshot.go b/cmd/apiserver/app/snapshot.go new file mode 100644 index 0000000..dd5cea4 --- /dev/null +++ b/cmd/apiserver/app/snapshot.go @@ -0,0 +1,182 @@ +/* +Copyright 2026 The kplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 +*/ + +package app + +import ( + "encoding/json" + "net/http" + "sort" + "strconv" + "strings" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/klog/v2" + + mc "github.com/kplane-dev/apiserver/pkg/multicluster" +) + +// snapshotPath is the request URL after PathExtractor has stripped +// /clusters/{cid}/control-plane/. Hitting that URL returns an aggregate +// read of the live informer cache for that cluster. +const snapshotPath = "/snapshot" + +// matchSnapshot reports whether the request targets the snapshot endpoint +// (post-PathExtractor URL is exactly /snapshot). +func matchSnapshot(r *http.Request) bool { + return r != nil && r.URL.Path == snapshotPath +} + +// newSnapshotHandler returns an http.Handler that serves /snapshot using the +// given InformerRegistry. Callers are expected to wrap this handler in the +// apiserver's standard auth/audit/panic-recovery filter chain (see +// wrapClusterCRDHandler) before mounting it on a route. +func newSnapshotHandler(registry *mc.InformerRegistry) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + serveSnapshot(w, r, registry) + }) +} + +// snapshotResource is the per-resource section of a snapshot response. +type snapshotResource struct { + Group string `json:"group"` + Resource string `json:"resource"` + Synced bool `json:"synced"` + ItemCount int `json:"itemCount"` + Items []runtime.Object `json:"items"` +} + +// snapshotResponse is the wire shape of GET .../snapshot. +type snapshotResponse struct { + Cluster string `json:"cluster"` + SnapshotTime metav1.Time `json:"snapshotTime"` + LiveResources int `json:"liveResources"` + Resources []snapshotResource `json:"resources"` +} + +// serveSnapshot writes a JSON aggregate snapshot of every resource type for +// which the apiserver currently holds a live MultiClusterInformer for the +// cluster in the request context. Returns true if the request was handled. +// +// Query params: +// - resource=[,...] limit to a subset of resources +// - includeEmpty=true include resources with zero items +// - warm=true force creation of MCIs for all +// registered storages (off by default) +func serveSnapshot(w http.ResponseWriter, r *http.Request, registry *mc.InformerRegistry) bool { + if registry == nil || r.URL.Path != snapshotPath { + return false + } + cid, _, _ := mc.FromContext(r.Context()) + if cid == "" { + http.Error(w, "snapshot requires a cluster: use /clusters/{cluster}/control-plane/snapshot", http.StatusBadRequest) + return true + } + + q := r.URL.Query() + includeEmpty, _ := strconv.ParseBool(q.Get("includeEmpty")) + warm, _ := strconv.ParseBool(q.Get("warm")) + filter := map[string]struct{}{} + if v := q.Get("resource"); v != "" { + for _, s := range strings.Split(v, ",") { + if s = strings.TrimSpace(s); s != "" { + filter[s] = struct{}{} + } + } + } + + var grs []schema.GroupResource + if warm { + grs = registry.ListRegistered() + } else { + grs = registry.ListLive() + } + sort.Slice(grs, func(i, j int) bool { + if grs[i].Group == grs[j].Group { + return grs[i].Resource < grs[j].Resource + } + return grs[i].Group < grs[j].Group + }) + + resp := snapshotResponse{ + Cluster: cid, + SnapshotTime: metav1.Now(), + LiveResources: len(grs), + } + + for _, gr := range grs { + if len(filter) > 0 { + if _, ok := filter[gr.Resource]; !ok { + continue + } + } + + var objs []runtime.Object + var synced bool + if warm { + got, err := registry.Get(gr) + if err != nil { + klog.Warningf("mc.snapshot warm get failed cluster=%s gr=%s err=%v", cid, gr, err) + continue + } + objs = got.List(cid) + synced = got.HasSynced() + } else { + peek, ok := registry.Peek(gr) + if !ok || peek == nil { + continue + } + objs = peek.List(cid) + synced = peek.HasSynced() + } + + if res := buildSnapshotResource(gr, objs, synced, includeEmpty); res != nil { + resp.Resources = append(resp.Resources, *res) + } + } + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Snapshot-Cluster", cid) + w.Header().Set("X-Snapshot-Time", time.Now().UTC().Format(time.RFC3339Nano)) + if err := json.NewEncoder(w).Encode(resp); err != nil { + klog.Errorf("mc.snapshot encode failed cluster=%s err=%v", cid, err) + } + return true +} + +// buildSnapshotResource turns a list of runtime.Object into a snapshotResource +// suitable for JSON encoding. It strips managedFields to keep payloads small +// and skips empty resources unless includeEmpty is true. +func buildSnapshotResource(gr schema.GroupResource, objs []runtime.Object, synced, includeEmpty bool) *snapshotResource { + if len(objs) == 0 && !includeEmpty { + return nil + } + items := make([]runtime.Object, 0, len(objs)) + for _, obj := range objs { + if obj == nil { + continue + } + if accessor, err := meta.Accessor(obj); err == nil && accessor != nil { + accessor.SetManagedFields(nil) + } + items = append(items, obj) + } + return &snapshotResource{ + Group: gr.Group, + Resource: gr.Resource, + Synced: synced, + ItemCount: len(items), + Items: items, + } +} diff --git a/docs/snapshot-and-fleet.md b/docs/snapshot-and-fleet.md new file mode 100644 index 0000000..22ad6fd --- /dev/null +++ b/docs/snapshot-and-fleet.md @@ -0,0 +1,127 @@ +# Snapshot + Fleet (V0) + +This document describes two kplane-native primitives added to the apiserver: + +1. `GET /clusters/{cluster}/control-plane/snapshot` — an aggregate, in-memory + read of all live informer state for a virtual control plane. +2. `kind: Fleet` (group `kplane.dev/v1`) — a declarative way to provision N + virtual control planes from a template. + +Together they let an external client (a Python SDK, an RL training loop, an +agent platform) treat a VCP as a cheap, scoreable, forkable unit of work. + +## Why these primitives + +Existing AI "sandboxes" are Linux-shaped: an agent gets a shell, a filesystem, +and maybe a Jupyter kernel. That works for code execution, but it falls down +the moment an agent needs to deploy a service, wire pods together, share +state between sub-agents, or be granted scoped infrastructure capabilities by +a human. + +Kubernetes is the universal infrastructure API. kplane is the only system +that can hand a *real* control plane to every agent or RL rollout at single- +digit-millisecond latency and single-digit-megabyte memory cost. Snapshot + +Fleet turn that capability into a usable substrate: + +- **Snapshot** is the read primitive. It returns a structured view of the + current state of a plane — every resource the apiserver knows about — in a + single round-trip, served from the shared informer cache (no etcd hit). + This is what makes scoring an agent rollout cheap. +- **Fleet** is the write primitive. It lets a caller say "I want 10,000 + planes derived from this template" and have the apiserver pick IDs, prime + them with the standard bootstrap pipeline (system namespaces, RBAC, + default service, CRD runtime), and report readiness. + +V0 is deliberately small. We expose *capabilities*, not opinions: no +scenario DSL, no scoring DSL, no opinionated trajectory format. Callers +build those on top in whatever shape suits their training stack. + +## Architecture + +``` + ┌────────────────────────────────────────┐ +client (kubectl, │ kplane apiserver (single Go process) │ + SDK, controller) │ │ + │ ┌──────────────────────────────────┐ │ + /clusters/X/ │ │ multicluster routing │ │ + control-plane/ ├──┤ PathExtractor → cid in ctx │ │ + {api|snapshot} │ └──────────────┬───────────────────┘ │ + │ │ │ + │ ┌──────────────▼───────────────────┐ │ + │ │ dispatch │ │ + │ │ matchSnapshot? → snapshot │ │ + │ │ CRD path? → CRD runtime │ │ + │ │ else → kube REST │ │ + │ └──────────────┬───────────────────┘ │ + │ │ │ + │ ┌──────────────▼───────────────────┐ │ + │ │ shared InformerRegistry │ │ + │ │ one MCI per resource │ │ + │ │ List(cid) is in-memory │ │ + │ └──────────────────────────────────┘ │ + │ │ + │ ┌──────────────────────────────────┐ │ + │ │ FleetController (in-process) │ │ + │ │ installs CRD into root │ │ + │ │ watches Fleet objects │ │ + │ │ calls OnClusterSelected(cid) │ │ + │ └──────────────────────────────────┘ │ + └────────────────────────────────────────┘ +``` + +### Snapshot + +- Mounted in the same dispatch layer as CRD routing, so it sees the cluster + context that `WithClusterRouting` populates and runs through the standard + auth/audit/panic-recovery chain via `wrapClusterCRDHandler`. +- Iterates either `InformerRegistry.ListLive()` (default) or + `InformerRegistry.ListRegistered()` (`?warm=true`) and calls + `mci.List(cid)` for each resource. Reads never hit etcd. +- Strips `managedFields` from each object to keep payloads tractable. +- Reports a `synced: bool` per resource so callers can tell apart "no + objects" from "informer hasn't synced yet." + +### Fleet + +- `Fleet` is a cluster-scoped CRD in the `kplane.dev` API group. The CRD is + installed into the root control plane on startup by `FleetController.Start`. +- The controller uses a `dynamic` client + `dynamicinformer` against the + root cluster loopback. Reconciles are driven by a `workqueue` plus a + periodic resync to refresh per-member readiness. +- `EnsureCluster` is wired by `cmd/apiserver/app/config.go` to invoke the + composed `mcOpts.OnClusterSelected` — exactly the same pipeline that + organic traffic to a new cluster ID would trigger. This means a Fleet + member is bootstrapped identically to a member created by a `kubectl` + request landing on that path. +- Readiness probing uses each cluster's loopback kube client to call + `/readyz` with a short timeout. The `phase` transitions are coarse for + V0; structured conditions are a follow-up. + +## Out of scope for V0 + +These are deliberate omissions, not oversights: + +| Concern | V0 behavior | Follow-up | +|---|---|---| +| Scenario seeding (apply manifests into each Fleet member on creation) | Members boot empty | `Fleet.spec.template.objects` or a sidecar Bootstrapper | +| TTL-based destruction of Fleet members | `ttlSeconds` is parsed but not enforced; members linger | `FleetGCController` + per-plane delete primitive | +| Snapshot of CRD-defined types | Only core types appear in `/snapshot` | Merge `CRDRuntimeManager` projection into snapshot | +| Scoring DSL | Callers write asserts against `snapshotResponse` in their language of choice | Optional `ScoringPolicy` CRD if a pattern emerges | +| Trajectory format | Callers diff snapshots themselves | Optional sdec-framed delta stream | + +The intent is to ship the primitive, get it in front of real RL/agent +workloads, and let observed needs (not design committee opinions) drive the +shape of V1. + +## Files touched + +| File | Purpose | +|---|---| +| `pkg/multicluster/informer_registry.go` | `ListRegistered`, `ListLive`, `Peek` | +| `cmd/apiserver/app/snapshot.go` | `/snapshot` handler | +| `cmd/apiserver/app/config.go` | Mount snapshot + start FleetController | +| `pkg/apis/kplane/v1/{doc,types,crd}.go` | Fleet API types and CRD manifest | +| `pkg/multicluster/bootstrap/fleet_controller.go` | Fleet reconciler | +| `pkg/multicluster/bootstrap/fleet_controller_test.go` | Unit tests for member-ID derivation | +| `test/smoke/snapshot_test.go` | End-to-end snapshot smoke | +| `test/smoke/fleet_test.go` | End-to-end Fleet smoke | diff --git a/pkg/apis/kplane/v1/crd.go b/pkg/apis/kplane/v1/crd.go new file mode 100644 index 0000000..7cfd724 --- /dev/null +++ b/pkg/apis/kplane/v1/crd.go @@ -0,0 +1,121 @@ +/* +Copyright 2026 The kplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 +*/ + +package v1 + +import ( + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// FleetCRDName is the metadata.name of the Fleet CRD. +const FleetCRDName = "fleets." + GroupName + +// FleetCRD returns the CustomResourceDefinition object that registers the +// Fleet resource in the root control plane. +// +// The CRD is intentionally permissive on schema fields (no maximums) so +// callers can experiment with large fleets in V0; the controller validates +// shape at reconcile time. +func FleetCRD() *apiextensionsv1.CustomResourceDefinition { + preserveUnknownFields := false + return &apiextensionsv1.CustomResourceDefinition{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apiextensions.k8s.io/v1", + Kind: "CustomResourceDefinition", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: FleetCRDName, + Labels: map[string]string{ + "app.kubernetes.io/managed-by": "kplane-apiserver", + "kplane.dev/native": "true", + }, + }, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: GroupName, + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Plural: "fleets", + Singular: "fleet", + Kind: "Fleet", + ListKind: "FleetList", + ShortNames: []string{"flt"}, + Categories: []string{"kplane"}, + }, + Scope: apiextensionsv1.ClusterScoped, + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ + { + Name: Version, + Served: true, + Storage: true, + Subresources: &apiextensionsv1.CustomResourceSubresources{ + Status: &apiextensionsv1.CustomResourceSubresourceStatus{}, + }, + AdditionalPrinterColumns: []apiextensionsv1.CustomResourceColumnDefinition{ + {Name: "Desired", Type: "integer", JSONPath: ".spec.replicas"}, + {Name: "Ready", Type: "integer", JSONPath: ".status.readyReplicas"}, + {Name: "Age", Type: "date", JSONPath: ".metadata.creationTimestamp"}, + }, + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "object", + XPreserveUnknownFields: &preserveUnknownFields, + Required: []string{"spec"}, + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + "spec": { + Type: "object", + Required: []string{"replicas"}, + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + "replicas": { + Type: "integer", + Minimum: ptrFloat64(0), + Format: "int32", + }, + "namePrefix": {Type: "string"}, + "names": { + Type: "array", + Items: &apiextensionsv1.JSONSchemaPropsOrArray{ + Schema: &apiextensionsv1.JSONSchemaProps{Type: "string"}, + }, + }, + "ttlSeconds": {Type: "integer", Format: "int64", Minimum: ptrFloat64(0)}, + }, + }, + "status": { + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + "observedGeneration": {Type: "integer", Format: "int64"}, + "readyReplicas": {Type: "integer", Format: "int32"}, + "members": { + Type: "array", + Items: &apiextensionsv1.JSONSchemaPropsOrArray{ + Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "object", + Required: []string{"clusterID", "phase"}, + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + "clusterID": {Type: "string"}, + "phase": {Type: "string"}, + "message": {Type: "string"}, + "lastTransitionTime": {Type: "string", Format: "date-time"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } +} + +func ptrFloat64(v float64) *float64 { return &v } diff --git a/pkg/apis/kplane/v1/doc.go b/pkg/apis/kplane/v1/doc.go new file mode 100644 index 0000000..6f3972f --- /dev/null +++ b/pkg/apis/kplane/v1/doc.go @@ -0,0 +1,16 @@ +/* +Copyright 2026 The kplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 +*/ + +// Package v1 contains kplane-native API types served by the kplane apiserver. +// +// These types are installed as CRDs in the root control plane on apiserver +// startup, and provide management-plane primitives — like Fleet — for +// orchestrating virtual control planes (VCPs). +package v1 diff --git a/pkg/apis/kplane/v1/types.go b/pkg/apis/kplane/v1/types.go new file mode 100644 index 0000000..65ae7c2 --- /dev/null +++ b/pkg/apis/kplane/v1/types.go @@ -0,0 +1,119 @@ +/* +Copyright 2026 The kplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 +*/ + +package v1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// GroupName is the kplane-native API group. Resources in this group are +// management-plane primitives; they live in the root control plane only. +const GroupName = "kplane.dev" + +// Version is the served version of the kplane API. +const Version = "v1" + +// SchemeGroupVersion is the group/version used to register kplane types. +var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: Version} + +// FleetGroupResource identifies the Fleet resource. +var FleetGroupResource = schema.GroupResource{Group: GroupName, Resource: "fleets"} + +// Fleet declares a desired number of virtual control planes derived from a +// template. The Fleet controller running inside the apiserver picks cluster +// IDs, primes them with the same bootstrap that organic traffic would +// trigger, and reports per-member readiness in Status. +type Fleet struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec FleetSpec `json:"spec,omitempty"` + Status FleetStatus `json:"status,omitempty"` +} + +// FleetSpec is the desired state of a Fleet. +// +// V0 semantics: a Fleet provisions empty VCPs (system namespaces, RBAC, +// default service). Scenario seeding, manifest application, and TTL-based +// destruction are not part of V0. +type FleetSpec struct { + // Replicas is the desired number of VCPs in this Fleet. + // Required. Must be >= 0. + Replicas int32 `json:"replicas"` + + // NamePrefix is prepended to the synthesized cluster IDs. If unset, + // "-" is used. Synthesized IDs are "" where + // index is a 4-digit zero-padded counter starting at 0000. + // Ignored when Names is non-empty. + NamePrefix string `json:"namePrefix,omitempty"` + + // Names overrides synthesized cluster IDs. When set, len(Names) must + // equal Replicas (validated by the controller). Each name must be a + // valid DNS label. + Names []string `json:"names,omitempty"` + + // TTLSeconds is reserved for future use. V0 does not garbage-collect + // VCPs when a Fleet is deleted; this field is parsed and stored but + // not yet enforced. + TTLSeconds *int64 `json:"ttlSeconds,omitempty"` +} + +// FleetMemberPhase is a coarse state for a single Fleet member. +type FleetMemberPhase string + +const ( + // FleetMemberPending means the bootstrap workers have not finished for + // this member yet. + FleetMemberPending FleetMemberPhase = "Pending" + // FleetMemberReady means the apiserver has finished priming the VCP and + // the cluster's /readyz responds 200. + FleetMemberReady FleetMemberPhase = "Ready" + // FleetMemberFailed indicates a non-recoverable bootstrap error for this + // member (current V0 surfaces this via the Message field; the controller + // will retry on the next resync). + FleetMemberFailed FleetMemberPhase = "Failed" +) + +// FleetMember is the observed state of a single VCP in a Fleet. +type FleetMember struct { + // ClusterID is the path segment under /clusters/{id}/control-plane/... + ClusterID string `json:"clusterID"` + // Phase is a coarse readiness state. + Phase FleetMemberPhase `json:"phase"` + // Message is a free-form description of the last reconcile outcome for + // this member. Useful for debugging V0; will be replaced by structured + // conditions in a future version. + Message string `json:"message,omitempty"` + // LastTransitionTime is when Phase last changed. + LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"` +} + +// FleetStatus is the observed state of a Fleet. +type FleetStatus struct { + // ObservedGeneration is the Fleet metadata.generation that the + // controller last reconciled. + ObservedGeneration int64 `json:"observedGeneration,omitempty"` + + // ReadyReplicas is the number of members currently in the Ready phase. + ReadyReplicas int32 `json:"readyReplicas,omitempty"` + + // Members is the per-VCP detail for every desired member. + Members []FleetMember `json:"members,omitempty"` +} + +// FleetList is the list type for Fleet. +type FleetList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + + Items []Fleet `json:"items"` +} diff --git a/pkg/multicluster/bootstrap/fleet_controller.go b/pkg/multicluster/bootstrap/fleet_controller.go new file mode 100644 index 0000000..1089ae0 --- /dev/null +++ b/pkg/multicluster/bootstrap/fleet_controller.go @@ -0,0 +1,545 @@ +/* +Copyright 2026 The kplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 +*/ + +package bootstrap + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "sync/atomic" + "time" + + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + kplanev1 "github.com/kplane-dev/apiserver/pkg/apis/kplane/v1" + mc "github.com/kplane-dev/apiserver/pkg/multicluster" +) + +const ( + fleetWorkerCount = 4 + fleetResyncPeriod = 30 * time.Second + fleetCRDPollEvery = 250 * time.Millisecond + fleetCRDPollTotal = 60 * time.Second +) + +// FleetControllerOptions configures a FleetController. +type FleetControllerOptions struct { + // RootCluster is the cluster ID of the root control plane where Fleet + // objects live. + RootCluster string + + // BaseLoopbackClientConfig is the apiserver's loopback client config. + // The controller derives root-scoped clients from it. + BaseLoopbackClientConfig *rest.Config + + // PathPrefix and ControlPlaneSegment configure cluster URL routing + // (default: "/clusters/" and "control-plane"). + PathPrefix string + ControlPlaneSegment string + + // EnsureCluster is the per-member bootstrap hook. The controller invokes + // it for every desired Fleet member, exactly like organic traffic would + // trigger via mcOpts.OnClusterSelected. + EnsureCluster func(clusterID string) + + // ClientForCluster returns a kube client scoped to a given VCP. + // Used by status reconciliation to probe per-cluster readiness. + ClientForCluster func(clusterID string) (kubernetes.Interface, error) + + // ResyncInterval controls how often each known Fleet is re-enqueued to + // refresh its readiness. Defaults to 30s. + ResyncInterval time.Duration +} + +// FleetController watches Fleet objects in the root control plane and +// reconciles them by priming N virtual control planes via EnsureCluster. +type FleetController struct { + opts FleetControllerOptions + + rootHost string + + apiext apiextensionsclientset.Interface + dyn dynamic.Interface + factory dynamicinformer.DynamicSharedInformerFactory + + informer cache.SharedIndexInformer + queue workqueue.TypedRateLimitingInterface[string] + + started atomic.Bool + + mu sync.Mutex + known map[string]struct{} + stopCh <-chan struct{} + resyncCh chan struct{} +} + +// NewFleetController constructs a controller. It does not start any workers; +// call Start to install the Fleet CRD and begin reconciling. +func NewFleetController(opts FleetControllerOptions) (*FleetController, error) { + if opts.BaseLoopbackClientConfig == nil { + return nil, fmt.Errorf("FleetController: BaseLoopbackClientConfig is required") + } + if opts.RootCluster == "" { + opts.RootCluster = mc.DefaultClusterName + } + if opts.ResyncInterval <= 0 { + opts.ResyncInterval = fleetResyncPeriod + } + + rootCfg := rest.CopyConfig(opts.BaseLoopbackClientConfig) + host, err := mc.ClusterHost(rootCfg.Host, mc.Options{ + PathPrefix: opts.PathPrefix, + ControlPlaneSegment: opts.ControlPlaneSegment, + }, opts.RootCluster) + if err != nil { + return nil, fmt.Errorf("FleetController: build root host: %w", err) + } + rootCfg.Host = host + + apiext, err := apiextensionsclientset.NewForConfig(rootCfg) + if err != nil { + return nil, fmt.Errorf("FleetController: apiextensions client: %w", err) + } + dyn, err := dynamic.NewForConfig(rootCfg) + if err != nil { + return nil, fmt.Errorf("FleetController: dynamic client: %w", err) + } + + c := &FleetController{ + opts: opts, + rootHost: rootCfg.Host, + apiext: apiext, + dyn: dyn, + known: map[string]struct{}{}, + queue: workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[string](), + workqueue.TypedRateLimitingQueueConfig[string]{Name: "mc_fleet_controller"}, + ), + } + return c, nil +} + +// Start installs the Fleet CRD into the root control plane (idempotently) and +// then begins reconciling Fleet objects. It returns immediately; work runs in +// background goroutines until stopCh is closed. +func (c *FleetController) Start(stopCh <-chan struct{}) { + if c == nil { + return + } + if !c.started.CompareAndSwap(false, true) { + return + } + c.mu.Lock() + c.stopCh = stopCh + c.mu.Unlock() + go c.runLifecycle(stopCh) +} + +func (c *FleetController) runLifecycle(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + + if err := c.installCRD(stopCh); err != nil { + klog.Errorf("mc.fleet CRD install failed: %v", err) + return + } + if err := c.waitForCRDEstablished(stopCh); err != nil { + klog.Errorf("mc.fleet CRD never became established: %v", err) + return + } + + c.factory = dynamicinformer.NewDynamicSharedInformerFactory(c.dyn, c.opts.ResyncInterval) + gvr := schema.GroupVersionResource{ + Group: kplanev1.GroupName, + Version: kplanev1.Version, + Resource: "fleets", + } + c.informer = c.factory.ForResource(gvr).Informer() + _, _ = c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.onFleetAdd, + UpdateFunc: c.onFleetUpdate, + DeleteFunc: c.onFleetDelete, + }) + + c.factory.Start(stopCh) + if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { + klog.Warningf("mc.fleet informer cache sync aborted") + return + } + + for i := 0; i < fleetWorkerCount; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + go c.runPeriodicResync(stopCh) + + <-stopCh + c.queue.ShutDown() +} + +func (c *FleetController) installCRD(stopCh <-chan struct{}) error { + desired := kplanev1.FleetCRD() + ctx, cancel := contextFromStop(stopCh, 30*time.Second) + defer cancel() + + existing, err := c.apiext.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, desired.Name, metav1.GetOptions{}) + switch { + case apierrors.IsNotFound(err): + _, createErr := c.apiext.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, desired, metav1.CreateOptions{}) + if createErr != nil && !apierrors.IsAlreadyExists(createErr) { + return fmt.Errorf("create CRD %s: %w", desired.Name, createErr) + } + klog.Infof("mc.fleet installed CRD %s", desired.Name) + return nil + case err != nil: + return fmt.Errorf("get CRD %s: %w", desired.Name, err) + } + + desired.ResourceVersion = existing.ResourceVersion + if _, err := c.apiext.ApiextensionsV1().CustomResourceDefinitions().Update(ctx, desired, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("update CRD %s: %w", desired.Name, err) + } + klog.Infof("mc.fleet updated CRD %s", desired.Name) + return nil +} + +func (c *FleetController) waitForCRDEstablished(stopCh <-chan struct{}) error { + deadline := time.Now().Add(fleetCRDPollTotal) + for { + select { + case <-stopCh: + return fmt.Errorf("stopped while waiting for CRD") + default: + } + ctx, cancel := contextFromStop(stopCh, 5*time.Second) + crd, err := c.apiext.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, kplanev1.FleetCRDName, metav1.GetOptions{}) + cancel() + if err == nil { + for _, cond := range crd.Status.Conditions { + if cond.Type == apiextensionsv1.Established && cond.Status == apiextensionsv1.ConditionTrue { + return nil + } + } + } + if time.Now().After(deadline) { + return fmt.Errorf("timed out after %s", fleetCRDPollTotal) + } + time.Sleep(fleetCRDPollEvery) + } +} + +func (c *FleetController) onFleetAdd(obj interface{}) { c.enqueue(obj) } +func (c *FleetController) onFleetUpdate(_, obj interface{}) { c.enqueue(obj) } +func (c *FleetController) onFleetDelete(obj interface{}) { + if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = d.Obj + } + u, ok := obj.(*unstructured.Unstructured) + if !ok { + return + } + name := u.GetName() + c.mu.Lock() + delete(c.known, name) + c.mu.Unlock() + klog.V(2).Infof("mc.fleet observed deletion fleet=%s (V0 does not GC member VCPs)", name) +} + +func (c *FleetController) enqueue(obj interface{}) { + u, ok := obj.(*unstructured.Unstructured) + if !ok { + return + } + name := u.GetName() + if name == "" { + return + } + c.mu.Lock() + c.known[name] = struct{}{} + c.mu.Unlock() + c.queue.Add(name) +} + +func (c *FleetController) runPeriodicResync(stopCh <-chan struct{}) { + ticker := time.NewTicker(c.opts.ResyncInterval) + defer ticker.Stop() + for { + select { + case <-stopCh: + return + case <-ticker.C: + c.mu.Lock() + for name := range c.known { + c.queue.Add(name) + } + c.mu.Unlock() + } + } +} + +func (c *FleetController) runWorker() { + for { + key, quit := c.queue.Get() + if quit { + return + } + func() { + defer c.queue.Done(key) + if err := c.reconcile(key); err != nil { + klog.Errorf("mc.fleet reconcile failed fleet=%s err=%v", key, err) + c.queue.AddRateLimited(key) + return + } + c.queue.Forget(key) + }() + } +} + +// reconcile is the per-Fleet worker. It computes desired members, primes them +// via EnsureCluster, probes readiness, and updates the Fleet status. +func (c *FleetController) reconcile(name string) error { + gvr := schema.GroupVersionResource{ + Group: kplanev1.GroupName, + Version: kplanev1.Version, + Resource: "fleets", + } + ctx, cancel := contextFromStop(c.stopChSnapshot(), 30*time.Second) + defer cancel() + + obj, err := c.dyn.Resource(gvr).Get(ctx, name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return nil + } + if err != nil { + return fmt.Errorf("get fleet: %w", err) + } + + fleet, err := decodeFleet(obj) + if err != nil { + return fmt.Errorf("decode fleet: %w", err) + } + + desired, derr := desiredMemberIDs(fleet) + if derr != nil { + return c.writeStatus(ctx, gvr, fleet, kplanev1.FleetStatus{ + ObservedGeneration: fleet.Generation, + Members: []kplanev1.FleetMember{{ + ClusterID: "", + Phase: kplanev1.FleetMemberFailed, + Message: derr.Error(), + LastTransitionTime: metav1.Now(), + }}, + }) + } + + // Trigger bootstrap for every desired member. Ensure is idempotent. + if c.opts.EnsureCluster != nil { + for _, cid := range desired { + c.opts.EnsureCluster(cid) + } + } + + // Probe readiness with bounded concurrency. + members := c.probeMembers(ctx, fleet, desired) + + ready := int32(0) + for _, m := range members { + if m.Phase == kplanev1.FleetMemberReady { + ready++ + } + } + + return c.writeStatus(ctx, gvr, fleet, kplanev1.FleetStatus{ + ObservedGeneration: fleet.Generation, + ReadyReplicas: ready, + Members: members, + }) +} + +func (c *FleetController) probeMembers(ctx context.Context, fleet *kplanev1.Fleet, desired []string) []kplanev1.FleetMember { + out := make([]kplanev1.FleetMember, len(desired)) + prevByID := map[string]kplanev1.FleetMember{} + for _, m := range fleet.Status.Members { + prevByID[m.ClusterID] = m + } + + type result struct { + idx int + m kplanev1.FleetMember + } + results := make(chan result, len(desired)) + sem := make(chan struct{}, 8) + var wg sync.WaitGroup + + for i, cid := range desired { + wg.Add(1) + go func(i int, cid string) { + defer wg.Done() + sem <- struct{}{} + defer func() { <-sem }() + + phase, msg := c.probeOne(ctx, cid) + m := kplanev1.FleetMember{ + ClusterID: cid, + Phase: phase, + Message: msg, + } + if prev, ok := prevByID[cid]; ok && prev.Phase == phase { + m.LastTransitionTime = prev.LastTransitionTime + } else { + m.LastTransitionTime = metav1.Now() + } + results <- result{idx: i, m: m} + }(i, cid) + } + wg.Wait() + close(results) + + for r := range results { + out[r.idx] = r.m + } + return out +} + +func (c *FleetController) probeOne(ctx context.Context, cid string) (kplanev1.FleetMemberPhase, string) { + if c.opts.ClientForCluster == nil { + return kplanev1.FleetMemberPending, "no client factory configured" + } + cs, err := c.opts.ClientForCluster(cid) + if err != nil { + return kplanev1.FleetMemberPending, fmt.Sprintf("client: %v", err) + } + probeCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + body, err := cs.Discovery().RESTClient().Get().AbsPath("/readyz").DoRaw(probeCtx) + if err != nil { + return kplanev1.FleetMemberPending, fmt.Sprintf("readyz: %v", err) + } + if string(body) == "ok" || len(body) == 0 { + return kplanev1.FleetMemberReady, "" + } + return kplanev1.FleetMemberReady, string(body) +} + +func (c *FleetController) writeStatus(ctx context.Context, gvr schema.GroupVersionResource, fleet *kplanev1.Fleet, status kplanev1.FleetStatus) error { + // Build an unstructured patch with only the status subresource fields. + statusMap, err := runtimeToMap(status) + if err != nil { + return fmt.Errorf("encode status: %w", err) + } + patch := map[string]interface{}{ + "status": statusMap, + } + patchBytes, err := json.Marshal(patch) + if err != nil { + return fmt.Errorf("marshal status patch: %w", err) + } + + _, err = c.dyn.Resource(gvr).Patch( + ctx, fleet.Name, types.MergePatchType, patchBytes, + metav1.PatchOptions{}, "status", + ) + if apierrors.IsNotFound(err) { + return nil + } + return err +} + +// stopChSnapshot returns the current stopCh under lock so the worker can use +// it to derive request contexts. +func (c *FleetController) stopChSnapshot() <-chan struct{} { + c.mu.Lock() + defer c.mu.Unlock() + return c.stopCh +} + +// desiredMemberIDs computes the list of cluster IDs for a Fleet. +func desiredMemberIDs(fleet *kplanev1.Fleet) ([]string, error) { + if fleet.Spec.Replicas < 0 { + return nil, fmt.Errorf("spec.replicas must be >= 0 (got %d)", fleet.Spec.Replicas) + } + if len(fleet.Spec.Names) > 0 { + if int32(len(fleet.Spec.Names)) != fleet.Spec.Replicas { + return nil, fmt.Errorf("spec.names length (%d) must equal spec.replicas (%d)", len(fleet.Spec.Names), fleet.Spec.Replicas) + } + out := make([]string, len(fleet.Spec.Names)) + copy(out, fleet.Spec.Names) + return out, nil + } + prefix := fleet.Spec.NamePrefix + if prefix == "" { + prefix = fleet.Name + "-" + } + out := make([]string, fleet.Spec.Replicas) + for i := int32(0); i < fleet.Spec.Replicas; i++ { + out[i] = fmt.Sprintf("%s%04d", prefix, i) + } + return out, nil +} + +// decodeFleet turns an unstructured Fleet object into our typed struct. +func decodeFleet(u *unstructured.Unstructured) (*kplanev1.Fleet, error) { + if u == nil { + return nil, fmt.Errorf("nil object") + } + b, err := u.MarshalJSON() + if err != nil { + return nil, err + } + var f kplanev1.Fleet + if err := json.Unmarshal(b, &f); err != nil { + return nil, err + } + return &f, nil +} + +// runtimeToMap converts any JSON-serializable struct into map[string]any. +func runtimeToMap(v interface{}) (map[string]interface{}, error) { + b, err := json.Marshal(v) + if err != nil { + return nil, err + } + out := map[string]interface{}{} + if err := json.Unmarshal(b, &out); err != nil { + return nil, err + } + return out, nil +} + +// contextFromStop derives a context that respects both stopCh and a timeout. +func contextFromStop(stopCh <-chan struct{}, timeout time.Duration) (context.Context, context.CancelFunc) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + if stopCh == nil { + return ctx, cancel + } + go func() { + select { + case <-stopCh: + cancel() + case <-ctx.Done(): + } + }() + return ctx, cancel +} diff --git a/pkg/multicluster/bootstrap/fleet_controller_test.go b/pkg/multicluster/bootstrap/fleet_controller_test.go new file mode 100644 index 0000000..a7dc22c --- /dev/null +++ b/pkg/multicluster/bootstrap/fleet_controller_test.go @@ -0,0 +1,107 @@ +/* +Copyright 2026 The kplane Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 +*/ + +package bootstrap + +import ( + "reflect" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + kplanev1 "github.com/kplane-dev/apiserver/pkg/apis/kplane/v1" +) + +func TestDesiredMemberIDs(t *testing.T) { + t.Parallel() + tests := []struct { + name string + fleet *kplanev1.Fleet + want []string + wantErr bool + }{ + { + name: "synthesized names with default prefix", + fleet: &kplanev1.Fleet{ + ObjectMeta: metav1.ObjectMeta{Name: "rl"}, + Spec: kplanev1.FleetSpec{Replicas: 3}, + }, + want: []string{"rl-0000", "rl-0001", "rl-0002"}, + }, + { + name: "synthesized names with custom prefix", + fleet: &kplanev1.Fleet{ + ObjectMeta: metav1.ObjectMeta{Name: "rl"}, + Spec: kplanev1.FleetSpec{Replicas: 2, NamePrefix: "tenant-"}, + }, + want: []string{"tenant-0000", "tenant-0001"}, + }, + { + name: "explicit names override prefix", + fleet: &kplanev1.Fleet{ + ObjectMeta: metav1.ObjectMeta{Name: "rl"}, + Spec: kplanev1.FleetSpec{ + Replicas: 2, + Names: []string{"alpha", "bravo"}, + }, + }, + want: []string{"alpha", "bravo"}, + }, + { + name: "explicit names with length mismatch is rejected", + fleet: &kplanev1.Fleet{ + ObjectMeta: metav1.ObjectMeta{Name: "rl"}, + Spec: kplanev1.FleetSpec{ + Replicas: 3, + Names: []string{"alpha", "bravo"}, + }, + }, + wantErr: true, + }, + { + name: "negative replicas is rejected", + fleet: &kplanev1.Fleet{ + ObjectMeta: metav1.ObjectMeta{Name: "rl"}, + Spec: kplanev1.FleetSpec{Replicas: -1}, + }, + wantErr: true, + }, + { + name: "zero replicas yields empty slice", + fleet: &kplanev1.Fleet{ + ObjectMeta: metav1.ObjectMeta{Name: "rl"}, + Spec: kplanev1.FleetSpec{Replicas: 0}, + }, + want: []string{}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + got, err := desiredMemberIDs(tc.fleet) + if tc.wantErr { + if err == nil { + t.Fatalf("expected error, got nil (out=%v)", got) + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(got) == 0 && len(tc.want) == 0 { + return + } + if !reflect.DeepEqual(got, tc.want) { + t.Fatalf("got %v, want %v", got, tc.want) + } + }) + } +} diff --git a/pkg/multicluster/informer_registry.go b/pkg/multicluster/informer_registry.go index f27f2d6..207d6d1 100644 --- a/pkg/multicluster/informer_registry.go +++ b/pkg/multicluster/informer_registry.go @@ -40,6 +40,43 @@ func (r *InformerRegistry) RegisterStorage(gr schema.GroupResource, cs *clustere klog.V(2).Infof("mc.informerRegistry registered storage for %s", gr) } +// ListRegistered returns the GroupResources for which storage has been +// registered. Storage registration happens lazily as the apiserver wires up +// each resource, so this set grows over the server lifetime. The returned +// slice is a copy and safe to retain. +func (r *InformerRegistry) ListRegistered() []schema.GroupResource { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]schema.GroupResource, 0, len(r.storages)) + for gr := range r.storages { + out = append(out, gr) + } + return out +} + +// ListLive returns the GroupResources for which a MultiClusterInformer has +// already been created (i.e. at least one consumer has read or watched the +// resource). Reading from these MCIs does not start additional watches. +func (r *InformerRegistry) ListLive() []schema.GroupResource { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]schema.GroupResource, 0, len(r.informers)) + for gr := range r.informers { + out = append(out, gr) + } + return out +} + +// Peek returns the MultiClusterInformer for a resource if one has already been +// created, without triggering cacher/MCI creation. Returns (nil, false) if no +// MCI exists yet for the resource. +func (r *InformerRegistry) Peek(gr schema.GroupResource) (*informer.MultiClusterInformer, bool) { + r.mu.Lock() + defer r.mu.Unlock() + mci, ok := r.informers[gr] + return mci, ok +} + // Get returns (or creates) the MultiClusterInformer for a resource. // Forces cacher creation via ensureStore() if needed. func (r *InformerRegistry) Get(gr schema.GroupResource) (*informer.MultiClusterInformer, error) { diff --git a/test/smoke/fleet_test.go b/test/smoke/fleet_test.go new file mode 100644 index 0000000..4589fdc --- /dev/null +++ b/test/smoke/fleet_test.go @@ -0,0 +1,97 @@ +package smoke + +import ( + "context" + "os" + "testing" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + + kplanev1 "github.com/kplane-dev/apiserver/pkg/apis/kplane/v1" +) + +var fleetGVR = schema.GroupVersionResource{ + Group: kplanev1.GroupName, + Version: kplanev1.Version, + Resource: "fleets", +} + +// TestFleetCreatesMemberVCPs creates a Fleet object in the root control plane +// and verifies that the FleetController primes the member VCPs and reports +// them ready in status. +func TestFleetCreatesMemberVCPs(t *testing.T) { + etcd := os.Getenv("ETCD_ENDPOINTS") + s := startAPIServer(t, etcd) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + dyn := dynamicClientForCluster(t, s, s.root) + + // Wait for the Fleet CRD to be registered by the controller. It runs + // asynchronously after apiserver startup. + if err := waitForFleetCRD(ctx, dyn); err != nil { + t.Fatalf("Fleet CRD never registered: %v\nlogs:\n%s", err, s.logs()) + } + + fleetName := "smoke-" + randSuffix(3) + obj := &unstructured.Unstructured{Object: map[string]interface{}{ + "apiVersion": kplanev1.GroupName + "/" + kplanev1.Version, + "kind": "Fleet", + "metadata": map[string]interface{}{"name": fleetName}, + "spec": map[string]interface{}{ + "replicas": int64(2), + "namePrefix": fleetName + "-", + }, + }} + + if _, err := dyn.Resource(fleetGVR).Create(ctx, obj, metav1.CreateOptions{}); err != nil { + t.Fatalf("create Fleet: %v", err) + } + t.Cleanup(func() { + _ = dyn.Resource(fleetGVR).Delete(context.Background(), fleetName, metav1.DeleteOptions{}) + }) + + deadline := time.Now().Add(90 * time.Second) + var lastReady int64 + for time.Now().Before(deadline) { + got, err := dyn.Resource(fleetGVR).Get(ctx, fleetName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + time.Sleep(500 * time.Millisecond) + continue + } + t.Fatalf("get Fleet: %v", err) + } + ready, _, _ := unstructured.NestedInt64(got.Object, "status", "readyReplicas") + lastReady = ready + if ready >= 2 { + members, _, _ := unstructured.NestedSlice(got.Object, "status", "members") + if len(members) == 2 { + return + } + } + time.Sleep(750 * time.Millisecond) + } + + t.Fatalf("Fleet %s did not reach readyReplicas=2 in 90s (last=%d)\nlogs:\n%s", + fleetName, lastReady, s.logs()) +} + +func waitForFleetCRD(ctx context.Context, dyn dynamic.Interface) error { + deadline := time.Now().Add(60 * time.Second) + for time.Now().Before(deadline) { + _, err := dyn.Resource(fleetGVR).List(ctx, metav1.ListOptions{Limit: 1}) + if err == nil { + return nil + } + time.Sleep(500 * time.Millisecond) + } + return context.DeadlineExceeded +} + diff --git a/test/smoke/snapshot_test.go b/test/smoke/snapshot_test.go new file mode 100644 index 0000000..01436e2 --- /dev/null +++ b/test/smoke/snapshot_test.go @@ -0,0 +1,145 @@ +package smoke + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// TestSnapshotEndpoint creates a few resources in a VCP and verifies that +// GET /clusters/{cid}/control-plane/snapshot returns them from the live +// informer cache. +func TestSnapshotEndpoint(t *testing.T) { + etcd := os.Getenv("ETCD_ENDPOINTS") + s := startAPIServer(t, etcd) + + const cid = "snap-vcp" + s.waitReady(t, cid) + + cs := kubeClientForCluster(t, s, cid) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + // Create a couple of objects so the cache has something to show. + if err := waitForNamespace(ctx, cs, "default"); err != nil { + t.Fatalf("default namespace never appeared: %v", err) + } + cmName := "snap-cm-" + randSuffix(4) + if _, err := cs.CoreV1().ConfigMaps("default").Create(ctx, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: cmName}, + Data: map[string]string{"hello": "kplane"}, + }, metav1.CreateOptions{}); err != nil { + t.Fatalf("create configmap: %v", err) + } + + // Listing once warms the MultiClusterInformer for configmaps so it + // shows up in the snapshot. The snapshot endpoint deliberately only + // reports live MCIs to avoid surprise watches. + if _, err := cs.CoreV1().ConfigMaps("default").List(ctx, metav1.ListOptions{}); err != nil { + t.Fatalf("list configmaps to warm cache: %v", err) + } + if _, err := cs.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}); err != nil { + t.Fatalf("list namespaces to warm cache: %v", err) + } + + // Give the informer a brief moment to absorb the writes. + time.Sleep(500 * time.Millisecond) + + snap := mustFetchSnapshot(t, s, cid, "") + if snap.Cluster != cid { + t.Fatalf("snapshot cluster mismatch: got %q want %q", snap.Cluster, cid) + } + if snap.LiveResources == 0 { + t.Fatalf("expected at least one live resource, got 0") + } + + found := false + for _, r := range snap.Resources { + if r.Resource == "configmaps" { + for _, item := range r.Items { + name, _ := nestedString(item, "metadata", "name") + if name == cmName { + found = true + break + } + } + } + } + if !found { + body, _ := json.MarshalIndent(snap, "", " ") + t.Fatalf("snapshot did not contain configmap %q\nresponse:\n%s", cmName, string(body)) + } + + // resource= filter narrows the response. + filtered := mustFetchSnapshot(t, s, cid, "?resource=configmaps") + for _, r := range filtered.Resources { + if r.Resource != "configmaps" { + t.Fatalf("filter=configmaps returned unexpected resource %q", r.Resource) + } + } +} + +type snapshotResp struct { + Cluster string `json:"cluster"` + SnapshotTime string `json:"snapshotTime"` + LiveResources int `json:"liveResources"` + Resources []struct { + Group string `json:"group"` + Resource string `json:"resource"` + Synced bool `json:"synced"` + ItemCount int `json:"itemCount"` + Items []map[string]interface{} `json:"items"` + } `json:"resources"` +} + +func mustFetchSnapshot(t *testing.T, s *testAPIServer, cid, query string) snapshotResp { + t.Helper() + url := fmt.Sprintf("%s/snapshot%s", s.clusterURL(cid), query) + client := &http.Client{ + Timeout: 15 * time.Second, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec + }, + } + req, _ := http.NewRequest(http.MethodGet, url, nil) + req.Header.Set("Authorization", "Bearer smoketoken") + resp, err := client.Do(req) + if err != nil { + t.Fatalf("fetch snapshot: %v", err) + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != 200 { + t.Fatalf("snapshot status=%d body=%s", resp.StatusCode, string(body)) + } + var out snapshotResp + if err := json.Unmarshal(body, &out); err != nil { + t.Fatalf("decode snapshot: %v\nbody=%s", err, string(body)) + } + return out +} + +func nestedString(m map[string]interface{}, path ...string) (string, bool) { + var cur interface{} = m + for _, p := range path { + mp, ok := cur.(map[string]interface{}) + if !ok { + return "", false + } + cur, ok = mp[p] + if !ok { + return "", false + } + } + s, ok := cur.(string) + return s, ok +}