diff --git a/cmd/servers/ateapi/ateapi.go b/cmd/servers/ateapi/ateapi.go index 88deae0..c8d9dae 100644 --- a/cmd/servers/ateapi/ateapi.go +++ b/cmd/servers/ateapi/ateapi.go @@ -276,7 +276,7 @@ func main() { ateFactory.WaitForCacheSync(stopCh) dialer := controlapi.NewAteletDialer(workerPodInformer.GetIndexer(), ateletPodInformer.GetIndexer()) - sm := controlapi.NewService(redisPersistence, actorTemplateLister, dialer) + sm := controlapi.NewService(redisPersistence, actorTemplateLister, dialer, clientset) sessionIdentitySrv := sessionidentity.New(*clientJWTIssuer, *clientJWTAudience, *sessionIDJWTPoolFile, *sessionIDCAPoolFile, *workerpoolCACerts) diff --git a/cmd/servers/ateapi/controlapi/functional_test.go b/cmd/servers/ateapi/controlapi/functional_test.go index 1b43f54..460d3ac 100644 --- a/cmd/servers/ateapi/controlapi/functional_test.go +++ b/cmd/servers/ateapi/controlapi/functional_test.go @@ -40,6 +40,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/testing/protocmp" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -145,13 +146,16 @@ type FakeAteletServer struct { Lock sync.Mutex - RunCalled bool + RunCalled bool + RunRequest *ateletpb.RunRequest - CheckpointCalled bool + CheckpointCalled bool + CheckpointRequest *ateletpb.CheckpointRequest - RestoreCalled bool - FailRestore error - RestoreDelay time.Duration + RestoreCalled bool + RestoreRequest *ateletpb.RestoreRequest + FailRestore error + RestoreDelay time.Duration } func (f *FakeAteletServer) Reset() { @@ -159,10 +163,13 @@ func (f *FakeAteletServer) Reset() { defer f.Lock.Unlock() f.RunCalled = false + f.RunRequest = nil f.CheckpointCalled = false + f.CheckpointRequest = nil f.RestoreCalled = false + f.RestoreRequest = nil f.FailRestore = nil f.RestoreDelay = 0 } @@ -172,6 +179,7 @@ func (f *FakeAteletServer) Run(ctx context.Context, req *ateletpb.RunRequest) (* defer f.Lock.Unlock() f.RunCalled = true + f.RunRequest = proto.Clone(req).(*ateletpb.RunRequest) return &ateletpb.RunResponse{}, nil } @@ -181,6 +189,7 @@ func (f *FakeAteletServer) Checkpoint(ctx context.Context, req *ateletpb.Checkpo defer f.Lock.Unlock() f.CheckpointCalled = true + f.CheckpointRequest = proto.Clone(req).(*ateletpb.CheckpointRequest) return &ateletpb.CheckpointResponse{}, nil } @@ -190,6 +199,7 @@ func (f *FakeAteletServer) Restore(ctx context.Context, req *ateletpb.RestoreReq defer f.Lock.Unlock() f.RestoreCalled = true + f.RestoreRequest = proto.Clone(req).(*ateletpb.RestoreRequest) if f.RestoreDelay > 0 { time.Sleep(f.RestoreDelay) } @@ -199,6 +209,16 @@ func (f *FakeAteletServer) Restore(ctx context.Context, req *ateletpb.RestoreReq return &ateletpb.RestoreResponse{}, nil } +func (f *FakeAteletServer) lastRestoreRequest() *ateletpb.RestoreRequest { + f.Lock.Lock() + defer f.Lock.Unlock() + + if f.RestoreRequest == nil { + return nil + } + return proto.Clone(f.RestoreRequest).(*ateletpb.RestoreRequest) +} + type testContext struct { mr *miniredis.Miniredis service *Service @@ -260,7 +280,7 @@ func setupTest(t *testing.T, ns string) *testContext { // 4. Initialize Service dialer := NewAteletDialer(workerInformer.GetIndexer(), ateletInformer.GetIndexer()) - service := NewService(persistence, actorTemplateLister, dialer) + service := NewService(persistence, actorTemplateLister, dialer, k8sClient) // 5. Start REAL gRPC Server for ATE API grpcServer := grpc.NewServer(grpc.UnaryInterceptor(StatusErrorInterceptor)) @@ -329,6 +349,17 @@ func namespaceForTest(baseName string) string { } func createTemplate(t *testing.T, tc *testContext, ns string) { + t.Helper() + createTemplateWithContainers(t, tc, ns, []atev1alpha1.Container{ + { + Name: "main", + Image: "main", + Command: []string{"/main"}, + }, + }) +} + +func createTemplateWithContainers(t *testing.T, tc *testContext, ns string, containers []atev1alpha1.Container) { t.Helper() actorTemplate := &atev1alpha1.ActorTemplate{ ObjectMeta: metav1.ObjectMeta{ @@ -343,13 +374,7 @@ func createTemplate(t *testing.T, tc *testContext, ns string) { }, }, PauseImage: "pause", - Containers: []atev1alpha1.Container{ - { - Name: "main", - Image: "main", - Command: []string{"/main"}, - }, - }, + Containers: containers, WorkerPoolRef: corev1.ObjectReference{ Namespace: ns, Name: "pool1", @@ -767,6 +792,106 @@ func TestResumeActor(t *testing.T) { } } +func TestResumeActorResolvesValueFromEnv(t *testing.T) { + ns := namespaceForTest("ns-resume-secret-env") + tc := setupTest(t, ns) + defer tc.cleanup() + + _, err := tc.k8sClient.CoreV1().ConfigMaps(ns).Create(context.Background(), &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "settings", + Namespace: ns, + }, + Data: map[string]string{ + "interval": "45", + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create ConfigMap: %v", err) + } + + _, err = tc.k8sClient.CoreV1().Secrets(ns).Create(context.Background(), &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "api-keys", + Namespace: ns, + }, + Data: map[string][]byte{ + "anthropic": []byte("sk-test"), + }, + }, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create secret: %v", err) + } + + createTemplateWithContainers(t, tc, ns, []atev1alpha1.Container{ + { + Name: "main", + Image: "main", + Command: []string{"/main"}, + Env: []corev1.EnvVar{ + { + Name: "LITERAL", + Value: "plain", + }, + { + Name: "INTERVAL_SECONDS", + ValueFrom: &corev1.EnvVarSource{ + ConfigMapKeyRef: &corev1.ConfigMapKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "settings"}, + Key: "interval", + }, + }, + }, + { + Name: "ANTHROPIC_API_KEY", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "api-keys"}, + Key: "anthropic", + }, + }, + }, + }, + }, + }) + createWorkerPod(t, tc, ns, "worker-1", "node1") + + _, err = tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + ActorTemplateNamespace: ns, + ActorTemplateName: "tmpl1", + ActorId: "id1", + }) + if err != nil { + t.Fatalf("CreateActor failed: %v", err) + } + _, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ + ActorId: "id1", + }) + if err != nil { + t.Fatalf("ResumeActor failed: %v", err) + } + + restoreReq := tc.fakeAtelet.lastRestoreRequest() + if restoreReq == nil { + t.Fatalf("expected Restore to be called") + } + if len(restoreReq.GetSpec().GetContainers()) != 1 { + t.Fatalf("expected one container in restore request, got %d", len(restoreReq.GetSpec().GetContainers())) + } + gotEnv := map[string]string{} + for _, env := range restoreReq.GetSpec().GetContainers()[0].GetEnv() { + gotEnv[env.GetName()] = env.GetValue() + } + wantEnv := map[string]string{ + "LITERAL": "plain", + "INTERVAL_SECONDS": "45", + "ANTHROPIC_API_KEY": "sk-test", + } + if diff := cmp.Diff(wantEnv, gotEnv); diff != "" { + t.Errorf("resolved env mismatch (-want +got):\n%s", diff) + } +} + // TestResumeActor_NoWorkers tests that resuming an actor fails when no free workers are available. // Workflow: // 1. Creates a mock ActorTemplate. diff --git a/cmd/servers/ateapi/controlapi/service.go b/cmd/servers/ateapi/controlapi/service.go index 9afd6ac..edd7fd1 100644 --- a/cmd/servers/ateapi/controlapi/service.go +++ b/cmd/servers/ateapi/controlapi/service.go @@ -26,6 +26,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "k8s.io/client-go/kubernetes" ) // Service implements ateapipb.Control @@ -40,12 +41,12 @@ type Service struct { var _ ateapipb.ControlServer = (*Service)(nil) // NewService creates a service. -func NewService(persistence store.Interface, actorTemplateLister listersv1alpha1.ActorTemplateLister, dialer *AteletDialer) *Service { +func NewService(persistence store.Interface, actorTemplateLister listersv1alpha1.ActorTemplateLister, dialer *AteletDialer, kubeClient kubernetes.Interface) *Service { s := &Service{ persistence: persistence, actorTemplateLister: actorTemplateLister, dialer: dialer, - actorWorkflow: NewActorWorkflow(persistence, dialer, actorTemplateLister), + actorWorkflow: NewActorWorkflow(persistence, dialer, actorTemplateLister, kubeClient), } return s diff --git a/cmd/servers/ateapi/controlapi/workflow.go b/cmd/servers/ateapi/controlapi/workflow.go index 76aba66..94920c7 100644 --- a/cmd/servers/ateapi/controlapi/workflow.go +++ b/cmd/servers/ateapi/controlapi/workflow.go @@ -29,6 +29,7 @@ import ( grpcCodes "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" ) // WorkflowStep represents a single, idempotent operation in a workflow graph. @@ -116,14 +117,16 @@ type ActorWorkflow struct { store store.Interface dialer *AteletDialer actorTemplateLister listersv1alpha1.ActorTemplateLister + kubeClient kubernetes.Interface } // NewActorWorkflow creates a new ActorWorkflow. -func NewActorWorkflow(store store.Interface, dialer *AteletDialer, actorTemplateLister listersv1alpha1.ActorTemplateLister) *ActorWorkflow { +func NewActorWorkflow(store store.Interface, dialer *AteletDialer, actorTemplateLister listersv1alpha1.ActorTemplateLister, kubeClient kubernetes.Interface) *ActorWorkflow { return &ActorWorkflow{ store: store, dialer: dialer, actorTemplateLister: actorTemplateLister, + kubeClient: kubeClient, } } @@ -146,7 +149,7 @@ func (w *ActorWorkflow) ResumeActor(ctx context.Context, id string, boot bool) ( steps := []WorkflowStep[*ResumeInput, *ResumeState]{ &LoadActorForResumeStep{store: w.store, actorTemplateLister: w.actorTemplateLister}, &AssignWorkerStep{store: w.store}, - &CallAteletRestoreStep{dialer: w.dialer}, + &CallAteletRestoreStep{dialer: w.dialer, kubeClient: w.kubeClient}, &FinalizeRunningStep{store: w.store}, } diff --git a/cmd/servers/ateapi/controlapi/workflow_resume.go b/cmd/servers/ateapi/controlapi/workflow_resume.go index 744422a..24f2305 100644 --- a/cmd/servers/ateapi/controlapi/workflow_resume.go +++ b/cmd/servers/ateapi/controlapi/workflow_resume.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" ) // ResumeInput holds the immutable parameters requested by the client. @@ -157,7 +158,8 @@ func (s *AssignWorkerStep) findFreeWorker(workers []*ateapipb.Worker, workerPool } type CallAteletRestoreStep struct { - dialer *AteletDialer + dialer *AteletDialer + kubeClient kubernetes.Interface } func (s *CallAteletRestoreStep) Name() string { return "CallAteletRestore" } @@ -171,23 +173,9 @@ func (s *CallAteletRestoreStep) Execute(ctx context.Context, input *ResumeInput, } client := ateletpb.NewAteomHerderClient(ateletConn) - workloadSpec := &ateletpb.WorkloadSpec{ - PauseImage: state.ActorTemplate.Spec.PauseImage, - } - for _, ctr := range state.ActorTemplate.Spec.Containers { - ateletCtr := &ateletpb.Container{ - Name: ctr.Name, - Image: ctr.Image, - Command: ctr.Command, - } - for _, env := range ctr.Env { - ateletEnv := &ateletpb.EnvEntry{ - Name: env.Name, - Value: env.Value, - } - ateletCtr.Env = append(ateletCtr.Env, ateletEnv) - } - workloadSpec.Containers = append(workloadSpec.Containers, ateletCtr) + workloadSpec, err := workloadSpecFromActorTemplate(ctx, s.kubeClient, state.ActorTemplate) + if err != nil { + return err } runscCfg := &ateletpb.RunscConfig{} diff --git a/cmd/servers/ateapi/controlapi/workload_spec.go b/cmd/servers/ateapi/controlapi/workload_spec.go new file mode 100644 index 0000000..84bdb94 --- /dev/null +++ b/cmd/servers/ateapi/controlapi/workload_spec.go @@ -0,0 +1,229 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controlapi + +import ( + "context" + "fmt" + + atev1alpha1 "github.com/agent-substrate/substrate/api/v1alpha1" + "github.com/agent-substrate/substrate/proto/ateletpb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +func workloadSpecFromActorTemplate(ctx context.Context, kubeClient kubernetes.Interface, actorTemplate *atev1alpha1.ActorTemplate) (*ateletpb.WorkloadSpec, error) { + workloadSpec := &ateletpb.WorkloadSpec{ + PauseImage: actorTemplate.Spec.PauseImage, + } + resolver := envResolver{ + kubeClient: kubeClient, + namespace: actorTemplate.Namespace, + configMaps: map[string]*corev1.ConfigMap{}, + secrets: map[string]*corev1.Secret{}, + } + + for _, ctr := range actorTemplate.Spec.Containers { + ateletCtr := &ateletpb.Container{ + Name: ctr.Name, + Image: ctr.Image, + Command: ctr.Command, + } + for _, env := range ctr.Env { + ateletEnv, err := resolver.resolve(ctx, ctr.Name, env) + if err != nil { + return nil, err + } + if ateletEnv != nil { + ateletCtr.Env = append(ateletCtr.Env, ateletEnv) + } + } + workloadSpec.Containers = append(workloadSpec.Containers, ateletCtr) + } + + return workloadSpec, nil +} + +type envResolver struct { + kubeClient kubernetes.Interface + namespace string + configMaps map[string]*corev1.ConfigMap + secrets map[string]*corev1.Secret +} + +func (r *envResolver) resolve(ctx context.Context, containerName string, env corev1.EnvVar) (*ateletpb.EnvEntry, error) { + envID := fmt.Sprintf("container %q env %q", containerName, env.Name) + if env.ValueFrom == nil { + return &ateletpb.EnvEntry{ + Name: env.Name, + Value: env.Value, + }, nil + } + + if env.Value != "" { + return nil, status.Errorf(codes.FailedPrecondition, "%s sets both value and valueFrom", envID) + } + + sources := valueFromSourceCount(env.ValueFrom) + if sources == 0 { + return nil, status.Errorf(codes.FailedPrecondition, "%s valueFrom does not set a source", envID) + } + if sources > 1 { + return nil, status.Errorf(codes.FailedPrecondition, "%s combines multiple valueFrom sources", envID) + } + + value, include, err := r.resolveValueFrom(ctx, envID, env.ValueFrom) + if err != nil { + return nil, err + } + if !include { + return nil, nil + } + + return &ateletpb.EnvEntry{ + Name: env.Name, + Value: value, + }, nil +} + +func (r *envResolver) resolveValueFrom(ctx context.Context, envID string, valueFrom *corev1.EnvVarSource) (string, bool, error) { + if ref := valueFrom.ConfigMapKeyRef; ref != nil { + return r.resolveConfigMapKeyRef(ctx, envID, ref) + } + if ref := valueFrom.SecretKeyRef; ref != nil { + return r.resolveSecretKeyRef(ctx, envID, ref) + } + return "", false, status.Errorf(codes.FailedPrecondition, "%s uses unsupported valueFrom source; only configMapKeyRef and secretKeyRef are supported", envID) +} + +func valueFromSourceCount(src *corev1.EnvVarSource) int { + count := 0 + if src.ConfigMapKeyRef != nil { + count++ + } + if src.SecretKeyRef != nil { + count++ + } + if src.FieldRef != nil { + count++ + } + if src.ResourceFieldRef != nil { + count++ + } + if src.FileKeyRef != nil { + count++ + } + return count +} + +func (r *envResolver) resolveConfigMapKeyRef(ctx context.Context, envID string, ref *corev1.ConfigMapKeySelector) (string, bool, error) { + if ref.Name == "" { + return "", false, status.Errorf(codes.FailedPrecondition, "%s configMapKeyRef.name is required", envID) + } + if ref.Key == "" { + return "", false, status.Errorf(codes.FailedPrecondition, "%s configMapKeyRef.key is required", envID) + } + if r.kubeClient == nil { + return "", false, status.Errorf(codes.FailedPrecondition, "%s cannot resolve configMapKeyRef because Kubernetes client is unavailable", envID) + } + + configMap, err := r.configMap(ctx, ref.Name) + if err != nil { + if apierrors.IsNotFound(err) { + if isOptional(ref.Optional) { + return "", false, nil + } + return "", false, status.Errorf(codes.FailedPrecondition, "%s references missing ConfigMap %s/%s", envID, r.namespace, ref.Name) + } + return "", false, fmt.Errorf("while resolving %s configMapKeyRef %s/%s: %w", envID, r.namespace, ref.Name, err) + } + + value, ok := configMap.Data[ref.Key] + if !ok { + if isOptional(ref.Optional) { + return "", false, nil + } + return "", false, status.Errorf(codes.FailedPrecondition, "%s references missing key %q in ConfigMap %s/%s", envID, ref.Key, r.namespace, ref.Name) + } + + return value, true, nil +} + +func (r *envResolver) resolveSecretKeyRef(ctx context.Context, envID string, ref *corev1.SecretKeySelector) (string, bool, error) { + if ref.Name == "" { + return "", false, status.Errorf(codes.FailedPrecondition, "%s secretKeyRef.name is required", envID) + } + if ref.Key == "" { + return "", false, status.Errorf(codes.FailedPrecondition, "%s secretKeyRef.key is required", envID) + } + if r.kubeClient == nil { + return "", false, status.Errorf(codes.FailedPrecondition, "%s cannot resolve secretKeyRef because Kubernetes client is unavailable", envID) + } + + secret, err := r.secret(ctx, ref.Name) + if err != nil { + if apierrors.IsNotFound(err) { + if isOptional(ref.Optional) { + return "", false, nil + } + return "", false, status.Errorf(codes.FailedPrecondition, "%s references missing secret %s/%s", envID, r.namespace, ref.Name) + } + return "", false, fmt.Errorf("while resolving %s secretKeyRef %s/%s: %w", envID, r.namespace, ref.Name, err) + } + + value, ok := secret.Data[ref.Key] + if !ok { + if isOptional(ref.Optional) { + return "", false, nil + } + return "", false, status.Errorf(codes.FailedPrecondition, "%s references missing key %q in secret %s/%s", envID, ref.Key, r.namespace, ref.Name) + } + + return string(value), true, nil +} + +func (r *envResolver) configMap(ctx context.Context, name string) (*corev1.ConfigMap, error) { + if configMap, ok := r.configMaps[name]; ok { + return configMap, nil + } + + configMap, err := r.kubeClient.CoreV1().ConfigMaps(r.namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + r.configMaps[name] = configMap + return configMap, nil +} + +func (r *envResolver) secret(ctx context.Context, name string) (*corev1.Secret, error) { + if secret, ok := r.secrets[name]; ok { + return secret, nil + } + + secret, err := r.kubeClient.CoreV1().Secrets(r.namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + r.secrets[name] = secret + return secret, nil +} + +func isOptional(optional *bool) bool { + return optional != nil && *optional +} diff --git a/cmd/servers/ateapi/controlapi/workload_spec_test.go b/cmd/servers/ateapi/controlapi/workload_spec_test.go new file mode 100644 index 0000000..c304b9c --- /dev/null +++ b/cmd/servers/ateapi/controlapi/workload_spec_test.go @@ -0,0 +1,291 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controlapi + +import ( + "context" + "testing" + + atev1alpha1 "github.com/agent-substrate/substrate/api/v1alpha1" + "github.com/agent-substrate/substrate/proto/ateletpb" + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/testing/protocmp" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +func TestWorkloadSpecFromActorTemplateResolvesValueFromEnv(t *testing.T) { + ctx := context.Background() + kubeClient := fake.NewSimpleClientset( + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "settings", + Namespace: "agent-ns", + }, + Data: map[string]string{ + "interval": "45", + }, + }, + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "api-keys", + Namespace: "agent-ns", + }, + Data: map[string][]byte{ + "anthropic": []byte("sk-test"), + }, + }, + ) + + got, err := workloadSpecFromActorTemplate(ctx, kubeClient, &atev1alpha1.ActorTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tmpl1", + Namespace: "agent-ns", + }, + Spec: atev1alpha1.ActorTemplateSpec{ + PauseImage: "pause", + Containers: []atev1alpha1.Container{ + { + Name: "main", + Image: "main", + Command: []string{"/main"}, + Env: []corev1.EnvVar{ + { + Name: "LITERAL", + Value: "plain", + }, + { + Name: "INTERVAL_SECONDS", + ValueFrom: &corev1.EnvVarSource{ + ConfigMapKeyRef: &corev1.ConfigMapKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "settings"}, + Key: "interval", + }, + }, + }, + { + Name: "ANTHROPIC_API_KEY", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "api-keys"}, + Key: "anthropic", + }, + }, + }, + }, + }, + }, + }, + }) + if err != nil { + t.Fatalf("workloadSpecFromActorTemplate failed: %v", err) + } + + want := &ateletpb.WorkloadSpec{ + PauseImage: "pause", + Containers: []*ateletpb.Container{ + { + Name: "main", + Image: "main", + Command: []string{"/main"}, + Env: []*ateletpb.EnvEntry{ + {Name: "LITERAL", Value: "plain"}, + {Name: "INTERVAL_SECONDS", Value: "45"}, + {Name: "ANTHROPIC_API_KEY", Value: "sk-test"}, + }, + }, + }, + } + if diff := cmp.Diff(want, got, protocmp.Transform()); diff != "" { + t.Errorf("WorkloadSpec mismatch (-want +got):\n%s", diff) + } +} + +func TestWorkloadSpecFromActorTemplateOptionalConfigMapKeyRefSkipsMissingKey(t *testing.T) { + optional := true + got, err := workloadSpecFromActorTemplate(context.Background(), fake.NewSimpleClientset(&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "settings", + Namespace: "agent-ns", + }, + Data: map[string]string{}, + }), &atev1alpha1.ActorTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tmpl1", + Namespace: "agent-ns", + }, + Spec: atev1alpha1.ActorTemplateSpec{ + Containers: []atev1alpha1.Container{ + { + Name: "main", + Image: "main", + Env: []corev1.EnvVar{ + { + Name: "OPTIONAL", + ValueFrom: &corev1.EnvVarSource{ + ConfigMapKeyRef: &corev1.ConfigMapKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "settings"}, + Key: "missing", + Optional: &optional, + }, + }, + }, + }, + }, + }, + }, + }) + if err != nil { + t.Fatalf("workloadSpecFromActorTemplate failed: %v", err) + } + if len(got.GetContainers()) != 1 { + t.Fatalf("expected one container, got %d", len(got.GetContainers())) + } + if len(got.GetContainers()[0].GetEnv()) != 0 { + t.Fatalf("expected optional missing env to be skipped, got %v", got.GetContainers()[0].GetEnv()) + } +} + +func TestWorkloadSpecFromActorTemplateConfigMapKeyRefMissingConfigMapFails(t *testing.T) { + _, err := workloadSpecFromActorTemplate(context.Background(), fake.NewSimpleClientset(), &atev1alpha1.ActorTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tmpl1", + Namespace: "agent-ns", + }, + Spec: atev1alpha1.ActorTemplateSpec{ + Containers: []atev1alpha1.Container{ + { + Name: "main", + Image: "main", + Env: []corev1.EnvVar{ + { + Name: "REQUIRED", + ValueFrom: &corev1.EnvVarSource{ + ConfigMapKeyRef: &corev1.ConfigMapKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "missing"}, + Key: "key", + }, + }, + }, + }, + }, + }, + }, + }) + if status.Code(err) != codes.FailedPrecondition { + t.Fatalf("expected FailedPrecondition, got %v: %v", status.Code(err), err) + } +} + +func TestWorkloadSpecFromActorTemplateOptionalSecretKeyRefSkipsMissingSecret(t *testing.T) { + optional := true + got, err := workloadSpecFromActorTemplate(context.Background(), fake.NewSimpleClientset(), &atev1alpha1.ActorTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tmpl1", + Namespace: "agent-ns", + }, + Spec: atev1alpha1.ActorTemplateSpec{ + Containers: []atev1alpha1.Container{ + { + Name: "main", + Image: "main", + Env: []corev1.EnvVar{ + { + Name: "OPTIONAL", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "missing"}, + Key: "key", + Optional: &optional, + }, + }, + }, + }, + }, + }, + }, + }) + if err != nil { + t.Fatalf("workloadSpecFromActorTemplate failed: %v", err) + } + if len(got.GetContainers()) != 1 { + t.Fatalf("expected one container, got %d", len(got.GetContainers())) + } + if len(got.GetContainers()[0].GetEnv()) != 0 { + t.Fatalf("expected optional missing env to be skipped, got %v", got.GetContainers()[0].GetEnv()) + } +} + +func TestWorkloadSpecFromActorTemplateSecretKeyRefMissingSecretFails(t *testing.T) { + _, err := workloadSpecFromActorTemplate(context.Background(), fake.NewSimpleClientset(), &atev1alpha1.ActorTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tmpl1", + Namespace: "agent-ns", + }, + Spec: atev1alpha1.ActorTemplateSpec{ + Containers: []atev1alpha1.Container{ + { + Name: "main", + Image: "main", + Env: []corev1.EnvVar{ + { + Name: "REQUIRED", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "missing"}, + Key: "key", + }, + }, + }, + }, + }, + }, + }, + }) + if status.Code(err) != codes.FailedPrecondition { + t.Fatalf("expected FailedPrecondition, got %v: %v", status.Code(err), err) + } +} + +func TestWorkloadSpecFromActorTemplateUnsupportedValueFromFails(t *testing.T) { + _, err := workloadSpecFromActorTemplate(context.Background(), fake.NewSimpleClientset(), &atev1alpha1.ActorTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Name: "tmpl1", + Namespace: "agent-ns", + }, + Spec: atev1alpha1.ActorTemplateSpec{ + Containers: []atev1alpha1.Container{ + { + Name: "main", + Image: "main", + Env: []corev1.EnvVar{ + { + Name: "FIELD", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}, + }, + }, + }, + }, + }, + }, + }) + if status.Code(err) != codes.FailedPrecondition { + t.Fatalf("expected FailedPrecondition, got %v: %v", status.Code(err), err) + } +} diff --git a/demos/claude-code-multiplex/README.md b/demos/claude-code-multiplex/README.md index 183a716..84d7d46 100644 --- a/demos/claude-code-multiplex/README.md +++ b/demos/claude-code-multiplex/README.md @@ -38,7 +38,7 @@ This guide assumes you know Kubernetes and the general shape of agent runtimes ( | Path | Purpose | |---|---| -| `manifests/claude-code-multiplex/claude-code-multiplex.yaml.tmpl` | Namespace, WorkerPool, ActorTemplates in a single envsubst template | +| `manifests/claude-code-multiplex/claude-code-multiplex.yaml.tmpl` | Namespace, Secret, WorkerPool, and ActorTemplates in a single envsubst template | | `hack/install-demo-claude-code-multiplex.sh` | Sourced by `install-ate.sh`; registers `--deploy-demo-claude-code-multiplex` and `--delete-demo-claude-code-multiplex` | | `demos/claude-code-multiplex/workload/` | The agent container image source (Dockerfile + entrypoint that wires Claude Code; built and pushed by the deploy step) | | `demos/claude-code-multiplex/ui/` | Static dashboard (`index.html` + `server.go`) that talks to the cluster | @@ -57,7 +57,7 @@ BUCKET_NAME=your-substrate-bucket \ ./hack/install-ate.sh --deploy-demo-claude-code-multiplex ``` -This creates the `claude-multiplex-demo` namespace, a 2-pod `WorkerPool`, and three `ActorTemplate` objects named `luna`, `mars`, `orion`. Under the hood, the deploy function builds the workload image with `docker buildx`, pushes it to `${KO_DOCKER_REPO}/claude-multiplex-demo-workload`, resolves the pushed sha256 digest, and substitutes the digest-pinned reference plus `ANTHROPIC_API_KEY` and `BUCKET_NAME` into the manifest template at apply time. +This creates the `claude-multiplex-demo` namespace, an `anthropic-api-key` Secret, a 2-pod `WorkerPool`, and three `ActorTemplate` objects named `agent-luna`, `agent-mars`, `agent-orion`. Under the hood, the deploy function builds the workload image with `docker buildx`, pushes it to `${KO_DOCKER_REPO}/claude-multiplex-demo-workload`, resolves the pushed sha256 digest, and substitutes the digest-pinned reference plus `ANTHROPIC_API_KEY` and `BUCKET_NAME` into the manifest template at apply time. The ActorTemplates consume the key through `valueFrom.secretKeyRef`. ### 2. Start the dashboard @@ -104,10 +104,9 @@ With three agents and two pods, the third agent stays suspended (state snapshott ## Upstream blockers worked around for this demo -This demo currently applies workarounds at runtime for three Substrate issues. Each will be addressed by a separate upstream fix PR; details + workarounds in the linked issue threads. +This demo currently applies workarounds at runtime for two Substrate issues. Each will be addressed by a separate upstream fix PR; details + workarounds in the linked issue threads. - **`#189`** — Atelet OCI bundle gaps (`Args`, `Secret`, symlinks). Bundled fix PR forthcoming. -- **`#197` Bug 2a** — `valueFrom.secretKeyRef` on `ActorTemplate` container env is not supported today. `ANTHROPIC_API_KEY` is passed as a plain `value:` env var (envsubst-substituted at apply time) until upstream support lands. - **`#197` Bug 3** — Atelet symlink resolution. Fix PR forthcoming. > [!NOTE] diff --git a/demos/claude-code-multiplex/workload/run.sh b/demos/claude-code-multiplex/workload/run.sh index b135086..f4d733c 100755 --- a/demos/claude-code-multiplex/workload/run.sh +++ b/demos/claude-code-multiplex/workload/run.sh @@ -20,7 +20,7 @@ # Env vars: # TASK — the prompt to pass to claude-code each tick # INTERVAL_SECONDS — sleep length between ticks (longer = more multiplex headroom) -# ANTHROPIC_API_KEY — required; supplied via Secret mount +# ANTHROPIC_API_KEY — required; supplied via Secret-backed env set -u diff --git a/docs/api-guide.md b/docs/api-guide.md index 5c93cf3..cdf4111 100644 --- a/docs/api-guide.md +++ b/docs/api-guide.md @@ -42,6 +42,8 @@ The `ActorTemplate` defines the code, environment, and state-management policies | `pauseImage` | `string` | **Required.** The image used for the sandbox root (e.g. `gcr.io/gke-release/pause`). | | `runsc` | `RunscConfig` | **Required.** Multi-platform configuration for fetching the gVisor binary. | +Container environment variables support literal `value` entries, `valueFrom.configMapKeyRef`, and `valueFrom.secretKeyRef`. References are resolved by `ate-api-server` from the `ActorTemplate` namespace when an actor is resumed; the default install grants ate-api `get` access for ConfigMaps and Secrets so manifests do not need custom per-namespace RBAC. Other Kubernetes `valueFrom` sources are not supported yet. Since environment variables are part of process state, recreate the template snapshot when rotating a Secret or ConfigMap that an actor reads at startup. + ### Workload Connectivity (Uniform DNS) Substrate has standardized on a **Uniform DNS Mesh**. You no longer need to define `SessionDiscovery` rules. Every actor created from a template is automatically reachable through the **Substrate Router** via its unique ID: diff --git a/manifests/ate-install/ate-api-server.yaml b/manifests/ate-install/ate-api-server.yaml index ec83363..23e3672 100644 --- a/manifests/ate-install/ate-api-server.yaml +++ b/manifests/ate-install/ate-api-server.yaml @@ -22,6 +22,11 @@ rules: - apiGroups: [""] resources: ["pods"] verbs: ["get", "watch", "list"] +# TODO: Decide whether env source resolution should keep using ate-api's +# broad read access or move to a more scoped delegation/admission model. +- apiGroups: [""] + resources: ["configmaps", "secrets"] + verbs: ["get"] - apiGroups: ["ate.dev"] resources: ["actortemplates"] verbs: ["get", "watch", "list"] diff --git a/manifests/claude-code-multiplex/claude-code-multiplex.yaml.tmpl b/manifests/claude-code-multiplex/claude-code-multiplex.yaml.tmpl index 1c2637b..a2e8eda 100644 --- a/manifests/claude-code-multiplex/claude-code-multiplex.yaml.tmpl +++ b/manifests/claude-code-multiplex/claude-code-multiplex.yaml.tmpl @@ -13,9 +13,8 @@ # limitations under the License. # Three ActorTemplates share a 2-pod WorkerPool, so substrate must suspend -# at least one actor at any moment. ANTHROPIC_API_KEY is passed as a plain -# env var per dberkov's PR #203 review — substrate does not currently -# support `valueFrom.secretKeyRef` on ActorTemplate container env. +# at least one actor at any moment. ANTHROPIC_API_KEY is stored in a Secret +# and referenced from ActorTemplate container env with valueFrom.secretKeyRef. # # WORKLOAD_IMAGE is the resolved sha256-digest reference for the # claude-multiplex-demo-workload image — built and pushed to @@ -29,6 +28,17 @@ metadata: --- +apiVersion: v1 +kind: Secret +metadata: + name: anthropic-api-key + namespace: claude-multiplex-demo +type: Opaque +stringData: + api-key: "${ANTHROPIC_API_KEY}" + +--- + # 2 worker replicas for 3 actors — the multiplex pressure that makes the # substrate suspend/resume behavior visible. apiVersion: ate.dev/v1alpha1 @@ -72,7 +82,10 @@ spec: - name: INTERVAL_SECONDS value: "45" - name: ANTHROPIC_API_KEY - value: "${ANTHROPIC_API_KEY}" + valueFrom: + secretKeyRef: + name: anthropic-api-key + key: api-key workerPoolRef: namespace: claude-multiplex-demo name: claude-workerpool @@ -106,7 +119,10 @@ spec: - name: INTERVAL_SECONDS value: "45" - name: ANTHROPIC_API_KEY - value: "${ANTHROPIC_API_KEY}" + valueFrom: + secretKeyRef: + name: anthropic-api-key + key: api-key workerPoolRef: namespace: claude-multiplex-demo name: claude-workerpool @@ -140,7 +156,10 @@ spec: - name: INTERVAL_SECONDS value: "45" - name: ANTHROPIC_API_KEY - value: "${ANTHROPIC_API_KEY}" + valueFrom: + secretKeyRef: + name: anthropic-api-key + key: api-key workerPoolRef: namespace: claude-multiplex-demo name: claude-workerpool