Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/servers/ateapi/ateapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func main() {
ateFactory.WaitForCacheSync(stopCh)

dialer := controlapi.NewAteletDialer(workerPodInformer.GetIndexer(), ateletPodInformer.GetIndexer())
sm := controlapi.NewService(redisPersistence, actorTemplateLister, dialer)
sm := controlapi.NewService(redisPersistence, actorTemplateLister, dialer, clientset)

sessionIdentitySrv := sessionidentity.New(*clientJWTIssuer, *clientJWTAudience, *sessionIDJWTPoolFile, *sessionIDCAPoolFile, *workerpoolCACerts)

Expand Down
151 changes: 138 additions & 13 deletions cmd/servers/ateapi/controlapi/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/testing/protocmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -145,24 +146,30 @@ type FakeAteletServer struct {

Lock sync.Mutex

RunCalled bool
RunCalled bool
RunRequest *ateletpb.RunRequest

CheckpointCalled bool
CheckpointCalled bool
CheckpointRequest *ateletpb.CheckpointRequest

RestoreCalled bool
FailRestore error
RestoreDelay time.Duration
RestoreCalled bool
RestoreRequest *ateletpb.RestoreRequest
FailRestore error
RestoreDelay time.Duration
}

func (f *FakeAteletServer) Reset() {
f.Lock.Lock()
defer f.Lock.Unlock()

f.RunCalled = false
f.RunRequest = nil

f.CheckpointCalled = false
f.CheckpointRequest = nil

f.RestoreCalled = false
f.RestoreRequest = nil
f.FailRestore = nil
f.RestoreDelay = 0
}
Expand All @@ -172,6 +179,7 @@ func (f *FakeAteletServer) Run(ctx context.Context, req *ateletpb.RunRequest) (*
defer f.Lock.Unlock()

f.RunCalled = true
f.RunRequest = proto.Clone(req).(*ateletpb.RunRequest)

return &ateletpb.RunResponse{}, nil
}
Expand All @@ -181,6 +189,7 @@ func (f *FakeAteletServer) Checkpoint(ctx context.Context, req *ateletpb.Checkpo
defer f.Lock.Unlock()

f.CheckpointCalled = true
f.CheckpointRequest = proto.Clone(req).(*ateletpb.CheckpointRequest)

return &ateletpb.CheckpointResponse{}, nil
}
Expand All @@ -190,6 +199,7 @@ func (f *FakeAteletServer) Restore(ctx context.Context, req *ateletpb.RestoreReq
defer f.Lock.Unlock()

f.RestoreCalled = true
f.RestoreRequest = proto.Clone(req).(*ateletpb.RestoreRequest)
if f.RestoreDelay > 0 {
time.Sleep(f.RestoreDelay)
}
Expand All @@ -199,6 +209,16 @@ func (f *FakeAteletServer) Restore(ctx context.Context, req *ateletpb.RestoreReq
return &ateletpb.RestoreResponse{}, nil
}

func (f *FakeAteletServer) lastRestoreRequest() *ateletpb.RestoreRequest {
f.Lock.Lock()
defer f.Lock.Unlock()

if f.RestoreRequest == nil {
return nil
}
return proto.Clone(f.RestoreRequest).(*ateletpb.RestoreRequest)
}

type testContext struct {
mr *miniredis.Miniredis
service *Service
Expand Down Expand Up @@ -260,7 +280,7 @@ func setupTest(t *testing.T, ns string) *testContext {

// 4. Initialize Service
dialer := NewAteletDialer(workerInformer.GetIndexer(), ateletInformer.GetIndexer())
service := NewService(persistence, actorTemplateLister, dialer)
service := NewService(persistence, actorTemplateLister, dialer, k8sClient)

// 5. Start REAL gRPC Server for ATE API
grpcServer := grpc.NewServer(grpc.UnaryInterceptor(StatusErrorInterceptor))
Expand Down Expand Up @@ -329,6 +349,17 @@ func namespaceForTest(baseName string) string {
}

func createTemplate(t *testing.T, tc *testContext, ns string) {
t.Helper()
createTemplateWithContainers(t, tc, ns, []atev1alpha1.Container{
{
Name: "main",
Image: "main",
Command: []string{"/main"},
},
})
}

func createTemplateWithContainers(t *testing.T, tc *testContext, ns string, containers []atev1alpha1.Container) {
t.Helper()
actorTemplate := &atev1alpha1.ActorTemplate{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -343,13 +374,7 @@ func createTemplate(t *testing.T, tc *testContext, ns string) {
},
},
PauseImage: "pause",
Containers: []atev1alpha1.Container{
{
Name: "main",
Image: "main",
Command: []string{"/main"},
},
},
Containers: containers,
WorkerPoolRef: corev1.ObjectReference{
Namespace: ns,
Name: "pool1",
Expand Down Expand Up @@ -767,6 +792,106 @@ func TestResumeActor(t *testing.T) {
}
}

func TestResumeActorResolvesValueFromEnv(t *testing.T) {
ns := namespaceForTest("ns-resume-secret-env")
tc := setupTest(t, ns)
defer tc.cleanup()

_, err := tc.k8sClient.CoreV1().ConfigMaps(ns).Create(context.Background(), &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "settings",
Namespace: ns,
},
Data: map[string]string{
"interval": "45",
},
}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("failed to create ConfigMap: %v", err)
}

_, err = tc.k8sClient.CoreV1().Secrets(ns).Create(context.Background(), &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "api-keys",
Namespace: ns,
},
Data: map[string][]byte{
"anthropic": []byte("sk-test"),
},
}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("failed to create secret: %v", err)
}

createTemplateWithContainers(t, tc, ns, []atev1alpha1.Container{
{
Name: "main",
Image: "main",
Command: []string{"/main"},
Env: []corev1.EnvVar{
{
Name: "LITERAL",
Value: "plain",
},
{
Name: "INTERVAL_SECONDS",
ValueFrom: &corev1.EnvVarSource{
ConfigMapKeyRef: &corev1.ConfigMapKeySelector{
LocalObjectReference: corev1.LocalObjectReference{Name: "settings"},
Key: "interval",
},
},
},
{
Name: "ANTHROPIC_API_KEY",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{Name: "api-keys"},
Key: "anthropic",
},
},
},
},
},
})
createWorkerPod(t, tc, ns, "worker-1", "node1")

_, err = tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{
ActorTemplateNamespace: ns,
ActorTemplateName: "tmpl1",
ActorId: "id1",
})
if err != nil {
t.Fatalf("CreateActor failed: %v", err)
}
_, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{
ActorId: "id1",
})
if err != nil {
t.Fatalf("ResumeActor failed: %v", err)
}

restoreReq := tc.fakeAtelet.lastRestoreRequest()
if restoreReq == nil {
t.Fatalf("expected Restore to be called")
}
if len(restoreReq.GetSpec().GetContainers()) != 1 {
t.Fatalf("expected one container in restore request, got %d", len(restoreReq.GetSpec().GetContainers()))
}
gotEnv := map[string]string{}
for _, env := range restoreReq.GetSpec().GetContainers()[0].GetEnv() {
gotEnv[env.GetName()] = env.GetValue()
}
wantEnv := map[string]string{
"LITERAL": "plain",
"INTERVAL_SECONDS": "45",
"ANTHROPIC_API_KEY": "sk-test",
}
if diff := cmp.Diff(wantEnv, gotEnv); diff != "" {
t.Errorf("resolved env mismatch (-want +got):\n%s", diff)
}
}

// TestResumeActor_NoWorkers tests that resuming an actor fails when no free workers are available.
// Workflow:
// 1. Creates a mock ActorTemplate.
Expand Down
5 changes: 3 additions & 2 deletions cmd/servers/ateapi/controlapi/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/client-go/kubernetes"
)

// Service implements ateapipb.Control
Expand All @@ -40,12 +41,12 @@ type Service struct {
var _ ateapipb.ControlServer = (*Service)(nil)

// NewService creates a service.
func NewService(persistence store.Interface, actorTemplateLister listersv1alpha1.ActorTemplateLister, dialer *AteletDialer) *Service {
func NewService(persistence store.Interface, actorTemplateLister listersv1alpha1.ActorTemplateLister, dialer *AteletDialer, kubeClient kubernetes.Interface) *Service {
s := &Service{
persistence: persistence,
actorTemplateLister: actorTemplateLister,
dialer: dialer,
actorWorkflow: NewActorWorkflow(persistence, dialer, actorTemplateLister),
actorWorkflow: NewActorWorkflow(persistence, dialer, actorTemplateLister, kubeClient),
}

return s
Expand Down
7 changes: 5 additions & 2 deletions cmd/servers/ateapi/controlapi/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
grpcCodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
)

// WorkflowStep represents a single, idempotent operation in a workflow graph.
Expand Down Expand Up @@ -116,14 +117,16 @@ type ActorWorkflow struct {
store store.Interface
dialer *AteletDialer
actorTemplateLister listersv1alpha1.ActorTemplateLister
kubeClient kubernetes.Interface
}

// NewActorWorkflow creates a new ActorWorkflow.
func NewActorWorkflow(store store.Interface, dialer *AteletDialer, actorTemplateLister listersv1alpha1.ActorTemplateLister) *ActorWorkflow {
func NewActorWorkflow(store store.Interface, dialer *AteletDialer, actorTemplateLister listersv1alpha1.ActorTemplateLister, kubeClient kubernetes.Interface) *ActorWorkflow {
return &ActorWorkflow{
store: store,
dialer: dialer,
actorTemplateLister: actorTemplateLister,
kubeClient: kubeClient,
}
}

Expand All @@ -146,7 +149,7 @@ func (w *ActorWorkflow) ResumeActor(ctx context.Context, id string, boot bool) (
steps := []WorkflowStep[*ResumeInput, *ResumeState]{
&LoadActorForResumeStep{store: w.store, actorTemplateLister: w.actorTemplateLister},
&AssignWorkerStep{store: w.store},
&CallAteletRestoreStep{dialer: w.dialer},
&CallAteletRestoreStep{dialer: w.dialer, kubeClient: w.kubeClient},
&FinalizeRunningStep{store: w.store},
}

Expand Down
24 changes: 6 additions & 18 deletions cmd/servers/ateapi/controlapi/workflow_resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
)

// ResumeInput holds the immutable parameters requested by the client.
Expand Down Expand Up @@ -157,7 +158,8 @@ func (s *AssignWorkerStep) findFreeWorker(workers []*ateapipb.Worker, workerPool
}

type CallAteletRestoreStep struct {
dialer *AteletDialer
dialer *AteletDialer
kubeClient kubernetes.Interface
}

func (s *CallAteletRestoreStep) Name() string { return "CallAteletRestore" }
Expand All @@ -171,23 +173,9 @@ func (s *CallAteletRestoreStep) Execute(ctx context.Context, input *ResumeInput,
}
client := ateletpb.NewAteomHerderClient(ateletConn)

workloadSpec := &ateletpb.WorkloadSpec{
PauseImage: state.ActorTemplate.Spec.PauseImage,
}
for _, ctr := range state.ActorTemplate.Spec.Containers {
ateletCtr := &ateletpb.Container{
Name: ctr.Name,
Image: ctr.Image,
Command: ctr.Command,
}
for _, env := range ctr.Env {
ateletEnv := &ateletpb.EnvEntry{
Name: env.Name,
Value: env.Value,
}
ateletCtr.Env = append(ateletCtr.Env, ateletEnv)
}
workloadSpec.Containers = append(workloadSpec.Containers, ateletCtr)
workloadSpec, err := workloadSpecFromActorTemplate(ctx, s.kubeClient, state.ActorTemplate)
if err != nil {
return err
}

runscCfg := &ateletpb.RunscConfig{}
Expand Down
Loading
Loading