From c747e7917dc235a28f35bf91f71ca0cbbc317555 Mon Sep 17 00:00:00 2001 From: Aditya Choudhari Date: Thu, 9 Apr 2026 15:47:27 -0700 Subject: [PATCH 1/3] feat: ability to match agents to specific resources --- .../pkg/selector/jobagents.go | 60 ++- .../controllers/deploymentplan/controller.go | 22 +- .../svc/controllers/jobdispatch/getters.go | 1 + .../jobdispatch/getters_postgres.go | 11 + .../svc/controllers/jobdispatch/reconcile.go | 12 +- .../controllers/jobdispatch/reconcile_test.go | 12 + .../controllers/jobeligibility/reconcile.go | 7 +- .../jobeligibility/reconcile_test.go | 379 ++++++++++++++++++ .../test/controllers/harness/mocks.go | 10 + 9 files changed, 501 insertions(+), 13 deletions(-) diff --git a/apps/workspace-engine/pkg/selector/jobagents.go b/apps/workspace-engine/pkg/selector/jobagents.go index 41cd86bb4..76d52a783 100644 --- a/apps/workspace-engine/pkg/selector/jobagents.go +++ b/apps/workspace-engine/pkg/selector/jobagents.go @@ -14,6 +14,11 @@ var jobAgentEnv, _ = celutil.NewEnvBuilder(). WithStandardExtensions(). BuildCached(12 * time.Hour) +var jobAgentWithResourceEnv, _ = celutil.NewEnvBuilder(). + WithMapVariables("jobAgent", "resource"). + WithStandardExtensions(). + BuildCached(12 * time.Hour) + func jobAgentToMap(a *oapi.JobAgent) map[string]any { m := make(map[string]any, 6) m["id"] = a.Id @@ -27,13 +32,61 @@ func jobAgentToMap(a *oapi.JobAgent) map[string]any { return m } +func resourceToMap(r *oapi.Resource) map[string]any { + m := make(map[string]any, 13) + m["id"] = r.Id + m["identifier"] = r.Identifier + m["name"] = r.Name + m["kind"] = r.Kind + m["version"] = r.Version + m["workspaceId"] = r.WorkspaceId + m["config"] = r.Config + m["metadata"] = r.Metadata + m["createdAt"] = r.CreatedAt + if r.ProviderId != nil { + m["providerId"] = *r.ProviderId + } + if r.UpdatedAt != nil { + m["updatedAt"] = *r.UpdatedAt + } + if r.DeletedAt != nil { + m["deletedAt"] = *r.DeletedAt + } + if r.LockedAt != nil { + m["lockedAt"] = *r.LockedAt + } + return m +} + // MatchJobAgents evaluates a CEL job agent selector against a list of job // agents and returns those that match. If the selector is empty or "false", // no agents match. func MatchJobAgents( - ctx context.Context, + _ context.Context, selector string, agents []oapi.JobAgent, +) ([]oapi.JobAgent, error) { + return matchJobAgents(jobAgentEnv, selector, agents, nil) +} + +// MatchJobAgentsWithResource evaluates a CEL job agent selector against a list +// of job agents with the resource available in the CEL context, and returns +// those that match. This allows selectors to reference resource properties, +// e.g. "jobAgent.config.server == resource.config.argocd.serverUrl". +func MatchJobAgentsWithResource( + _ context.Context, + selector string, + agents []oapi.JobAgent, + resource *oapi.Resource, +) ([]oapi.JobAgent, error) { + return matchJobAgents(jobAgentWithResourceEnv, selector, agents, resource) +} + +func matchJobAgents( + env *celutil.CompiledEnv, + selector string, + agents []oapi.JobAgent, + resource *oapi.Resource, ) ([]oapi.JobAgent, error) { if selector == "" || selector == "false" { return nil, nil @@ -43,7 +96,7 @@ func MatchJobAgents( return agents, nil } - prg, err := jobAgentEnv.Compile(selector) + prg, err := env.Compile(selector) if err != nil { return nil, fmt.Errorf("compile job agent selector: %w", err) } @@ -53,6 +106,9 @@ func MatchJobAgents( vars := map[string]any{ "jobAgent": jobAgentToMap(&agents[i]), } + if resource != nil { + vars["resource"] = resourceToMap(resource) + } ok, err := celutil.EvalBool(prg, vars) if err != nil { return nil, fmt.Errorf("eval job agent selector for agent %s: %w", agents[i].Id, err) diff --git a/apps/workspace-engine/svc/controllers/deploymentplan/controller.go b/apps/workspace-engine/svc/controllers/deploymentplan/controller.go index e315a39f2..56143eaca 100644 --- a/apps/workspace-engine/svc/controllers/deploymentplan/controller.go +++ b/apps/workspace-engine/svc/controllers/deploymentplan/controller.go @@ -79,12 +79,7 @@ func (c *Controller) Process(ctx context.Context, item reconcile.Item) (reconcil return reconcile.Result{}, fmt.Errorf("list job agents: %w", err) } - matchedAgents, err := selector.MatchJobAgents(ctx, deployment.JobAgentSelector, allAgents) - if err != nil { - return reconcile.Result{}, fmt.Errorf("match job agents: %w", err) - } - - if len(matchedAgents) == 0 { + if len(allAgents) == 0 { if err := c.setter.CompletePlan(ctx, planID); err != nil { return reconcile.Result{}, fmt.Errorf("mark plan completed: %w", err) } @@ -122,7 +117,7 @@ func (c *Controller) Process(ctx context.Context, item reconcile.Item) (reconcil ctx, plan, deployment, - matchedAgents, + allAgents, version, target, ); err != nil { @@ -161,6 +156,15 @@ func (c *Controller) processTarget( return fmt.Errorf("get resource %s: %w", target.ResourceID, err) } + matchedAgents, err := selector.MatchJobAgentsWithResource(ctx, deployment.JobAgentSelector, agents, resource) + if err != nil { + return fmt.Errorf("match job agents for resource %s: %w", target.ResourceID, err) + } + + if len(matchedAgents) == 0 { + return nil + } + scope := &variableresolver.Scope{ Resource: resource, Deployment: deployment, @@ -174,8 +178,8 @@ func (c *Controller) processTarget( return fmt.Errorf("resolve variables: %w", err) } - for i := range agents { - agent := &agents[i] + for i := range matchedAgents { + agent := &matchedAgents[i] mergedConfig := oapi.DeepMergeConfigs( agent.Config, deployment.JobAgentConfig, version.JobAgentConfig, diff --git a/apps/workspace-engine/svc/controllers/jobdispatch/getters.go b/apps/workspace-engine/svc/controllers/jobdispatch/getters.go index 5e0aa2bb2..a719c9b69 100644 --- a/apps/workspace-engine/svc/controllers/jobdispatch/getters.go +++ b/apps/workspace-engine/svc/controllers/jobdispatch/getters.go @@ -11,6 +11,7 @@ type Getter interface { GetJob(ctx context.Context, jobID uuid.UUID) (*oapi.Job, error) GetRelease(ctx context.Context, releaseID uuid.UUID) (*oapi.Release, error) GetDeployment(ctx context.Context, deploymentID uuid.UUID) (*oapi.Deployment, error) + GetResource(ctx context.Context, resourceID uuid.UUID) (*oapi.Resource, error) GetJobAgent(ctx context.Context, jobAgentID uuid.UUID) (*oapi.JobAgent, error) ListJobAgentsByWorkspaceID(ctx context.Context, workspaceID uuid.UUID) ([]oapi.JobAgent, error) GetVerificationPolicies( diff --git a/apps/workspace-engine/svc/controllers/jobdispatch/getters_postgres.go b/apps/workspace-engine/svc/controllers/jobdispatch/getters_postgres.go index d3e1607ee..3d5e4bb4f 100644 --- a/apps/workspace-engine/svc/controllers/jobdispatch/getters_postgres.go +++ b/apps/workspace-engine/svc/controllers/jobdispatch/getters_postgres.go @@ -59,6 +59,17 @@ func (p *PostgresGetter) GetDeployment( return db.ToOapiDeployment(row), nil } +func (p *PostgresGetter) GetResource( + ctx context.Context, + resourceID uuid.UUID, +) (*oapi.Resource, error) { + row, err := db.GetQueries(ctx).GetResourceByID(ctx, resourceID) + if err != nil { + return nil, err + } + return db.ToOapiResource(row), nil +} + func (p *PostgresGetter) ListJobAgentsByWorkspaceID( ctx context.Context, workspaceID uuid.UUID, diff --git a/apps/workspace-engine/svc/controllers/jobdispatch/reconcile.go b/apps/workspace-engine/svc/controllers/jobdispatch/reconcile.go index 07454067d..831c64d5f 100644 --- a/apps/workspace-engine/svc/controllers/jobdispatch/reconcile.go +++ b/apps/workspace-engine/svc/controllers/jobdispatch/reconcile.go @@ -69,12 +69,22 @@ func getJobAgents( return nil, fmt.Errorf("deployment job agent selector is empty") } + resourceID, err := uuid.Parse(release.ReleaseTarget.ResourceId) + if err != nil { + return nil, fmt.Errorf("parse resource id: %w", err) + } + + resource, err := getter.GetResource(ctx, resourceID) + if err != nil { + return nil, fmt.Errorf("get resource: %w", err) + } + allAgents, err := getter.ListJobAgentsByWorkspaceID(ctx, workspaceID) if err != nil { return nil, fmt.Errorf("list job agents: %w", err) } - matched, err := selector.MatchJobAgents(ctx, deployment.JobAgentSelector, allAgents) + matched, err := selector.MatchJobAgentsWithResource(ctx, deployment.JobAgentSelector, allAgents, resource) if err != nil { return nil, fmt.Errorf("match job agents: %w", err) } diff --git a/apps/workspace-engine/svc/controllers/jobdispatch/reconcile_test.go b/apps/workspace-engine/svc/controllers/jobdispatch/reconcile_test.go index 1e8d50b7f..1231014ed 100644 --- a/apps/workspace-engine/svc/controllers/jobdispatch/reconcile_test.go +++ b/apps/workspace-engine/svc/controllers/jobdispatch/reconcile_test.go @@ -21,6 +21,8 @@ type mockGetter struct { releaseErr error deployment *oapi.Deployment deploymentErr error + resource *oapi.Resource + resourceErr error jobAgents map[string]*oapi.JobAgent jobAgentErr error workspaceAgents []oapi.JobAgent @@ -40,6 +42,16 @@ func (m *mockGetter) GetDeployment(_ context.Context, _ uuid.UUID) (*oapi.Deploy return m.deployment, m.deploymentErr } +func (m *mockGetter) GetResource(_ context.Context, _ uuid.UUID) (*oapi.Resource, error) { + if m.resource != nil || m.resourceErr != nil { + return m.resource, m.resourceErr + } + return &oapi.Resource{ + Config: map[string]interface{}{}, + Metadata: map[string]string{}, + }, nil +} + func (m *mockGetter) GetJobAgent(_ context.Context, id uuid.UUID) (*oapi.JobAgent, error) { if m.jobAgentErr != nil { return nil, m.jobAgentErr diff --git a/apps/workspace-engine/svc/controllers/jobeligibility/reconcile.go b/apps/workspace-engine/svc/controllers/jobeligibility/reconcile.go index 91106e5db..8630d982f 100644 --- a/apps/workspace-engine/svc/controllers/jobeligibility/reconcile.go +++ b/apps/workspace-engine/svc/controllers/jobeligibility/reconcile.go @@ -210,13 +210,18 @@ func (r *reconciler) buildAndDispatchJob(ctx context.Context) error { return r.createFailureJob(ctx, oapi.JobStatusInvalidJobAgent, msg) } + resource, err := r.getter.GetResource(ctx, r.rt.ResourceID) + if err != nil { + return recordErr(span, "get resource", err) + } + allAgents, err := r.getter.ListJobAgentsByWorkspaceID(ctx, r.workspaceID) if err != nil { return recordErr(span, "list job agents", err) } span.SetAttributes(attribute.Int("workspace_agents.count", len(allAgents))) - matchedAgents, err := selector.MatchJobAgents(ctx, deployment.JobAgentSelector, allAgents) + matchedAgents, err := selector.MatchJobAgentsWithResource(ctx, deployment.JobAgentSelector, allAgents, resource) if err != nil { return recordErr(span, "match job agents", err) } diff --git a/apps/workspace-engine/svc/controllers/jobeligibility/reconcile_test.go b/apps/workspace-engine/svc/controllers/jobeligibility/reconcile_test.go index 01cdbe5a0..2cf301510 100644 --- a/apps/workspace-engine/svc/controllers/jobeligibility/reconcile_test.go +++ b/apps/workspace-engine/svc/controllers/jobeligibility/reconcile_test.go @@ -1751,3 +1751,382 @@ func TestReconcile_SelectorByName_MatchesSingle(t *testing.T) { require.Len(t, setter.createdJobs, 1) assert.Equal(t, prod.Id, setter.createdJobs[0].JobAgentId) } + +// --------------------------------------------------------------------------- +// 15. Resource-aware job agent selector +// --------------------------------------------------------------------------- + +func TestReconcile_ResourceAwareSelector_MatchesAgentByResourceConfig(t *testing.T) { + rt := testRT() + release := testRelease(rt) + + argoUS := &oapi.JobAgent{ + Id: uuid.New().String(), + Name: "argo-us", + Type: "argo-cd", + Config: oapi.JobAgentConfig{ + "server": "https://argo-us.example.com", + }, + } + argoEU := &oapi.JobAgent{ + Id: uuid.New().String(), + Name: "argo-eu", + Type: "argo-cd", + Config: oapi.JobAgentConfig{ + "server": "https://argo-eu.example.com", + }, + } + + deployment := testDeployment(rt) + deployment.JobAgentSelector = `jobAgent.config.server == resource.config.argocd.serverUrl` + + getter := &mockGetter{ + rtExists: true, + release: release, + jobs: []*oapi.Job{}, + policies: []*oapi.Policy{}, + deployment: deployment, + jobAgents: map[string]*oapi.JobAgent{argoUS.Id: argoUS, argoEU.Id: argoEU}, + workspaceAgents: []oapi.JobAgent{*argoUS, *argoEU}, + environment: &oapi.Environment{ + Id: rt.EnvironmentID.String(), + Name: "test-env", + Metadata: map[string]string{}, + }, + resource: &oapi.Resource{ + Id: rt.ResourceID.String(), + Name: "us-cluster", + Identifier: "us-cluster", + Kind: "kubernetes", + Metadata: map[string]string{}, + Config: map[string]any{ + "argocd": map[string]any{ + "serverUrl": "https://argo-us.example.com", + }, + }, + }, + } + setter := &mockSetter{} + + _, err := Reconcile(context.Background(), rt.WorkspaceID.String(), getter, setter, rt) + require.NoError(t, err) + require.Len(t, setter.createdJobs, 1) + assert.Equal(t, argoUS.Id, setter.createdJobs[0].JobAgentId) +} + +func TestReconcile_ResourceAwareSelector_NoMatch(t *testing.T) { + rt := testRT() + release := testRelease(rt) + + agent := &oapi.JobAgent{ + Id: uuid.New().String(), + Name: "argo-us", + Type: "argo-cd", + Config: oapi.JobAgentConfig{ + "server": "https://argo-us.example.com", + }, + } + + deployment := testDeployment(rt) + deployment.JobAgentSelector = `jobAgent.config.server == resource.config.argocd.serverUrl` + + getter := &mockGetter{ + rtExists: true, + release: release, + jobs: []*oapi.Job{}, + policies: []*oapi.Policy{}, + deployment: deployment, + jobAgents: map[string]*oapi.JobAgent{agent.Id: agent}, + workspaceAgents: []oapi.JobAgent{*agent}, + environment: &oapi.Environment{ + Id: rt.EnvironmentID.String(), + Name: "test-env", + Metadata: map[string]string{}, + }, + resource: &oapi.Resource{ + Id: rt.ResourceID.String(), + Name: "eu-cluster", + Identifier: "eu-cluster", + Kind: "kubernetes", + Metadata: map[string]string{}, + Config: map[string]any{ + "argocd": map[string]any{ + "serverUrl": "https://argo-eu.example.com", + }, + }, + }, + } + setter := &mockSetter{} + + _, err := Reconcile(context.Background(), rt.WorkspaceID.String(), getter, setter, rt) + require.NoError(t, err) + require.Len(t, setter.createdJobs, 1) + assert.Equal(t, oapi.JobStatusInvalidJobAgent, setter.createdJobs[0].Status) +} + +func TestReconcile_ResourceAwareSelector_MatchesByMetadata(t *testing.T) { + rt := testRT() + release := testRelease(rt) + + agentProd := &oapi.JobAgent{ + Id: uuid.New().String(), + Name: "prod-agent", + Type: "argo-cd", + Config: oapi.JobAgentConfig{ + "region": "us-east-1", + }, + } + agentStaging := &oapi.JobAgent{ + Id: uuid.New().String(), + Name: "staging-agent", + Type: "argo-cd", + Config: oapi.JobAgentConfig{ + "region": "eu-west-1", + }, + } + + deployment := testDeployment(rt) + deployment.JobAgentSelector = `jobAgent.config.region == resource.metadata.region` + + getter := &mockGetter{ + rtExists: true, + release: release, + jobs: []*oapi.Job{}, + policies: []*oapi.Policy{}, + deployment: deployment, + jobAgents: map[string]*oapi.JobAgent{ + agentProd.Id: agentProd, + agentStaging.Id: agentStaging, + }, + workspaceAgents: []oapi.JobAgent{*agentProd, *agentStaging}, + environment: &oapi.Environment{ + Id: rt.EnvironmentID.String(), + Name: "test-env", + Metadata: map[string]string{}, + }, + resource: &oapi.Resource{ + Id: rt.ResourceID.String(), + Name: "us-east-cluster", + Identifier: "us-east-cluster", + Kind: "kubernetes", + Metadata: map[string]string{"region": "us-east-1"}, + Config: map[string]any{}, + }, + } + setter := &mockSetter{} + + _, err := Reconcile(context.Background(), rt.WorkspaceID.String(), getter, setter, rt) + require.NoError(t, err) + require.Len(t, setter.createdJobs, 1) + assert.Equal(t, agentProd.Id, setter.createdJobs[0].JobAgentId) +} + +func TestReconcile_ResourceAwareSelector_MultipleAgentsMatch(t *testing.T) { + rt := testRT() + release := testRelease(rt) + + agent1 := &oapi.JobAgent{ + Id: uuid.New().String(), + Name: "argo-1", + Type: "argo-cd", + Config: oapi.JobAgentConfig{ + "cluster": "prod", + }, + } + agent2 := &oapi.JobAgent{ + Id: uuid.New().String(), + Name: "argo-2", + Type: "argo-cd", + Config: oapi.JobAgentConfig{ + "cluster": "prod", + }, + } + agent3 := &oapi.JobAgent{ + Id: uuid.New().String(), + Name: "argo-staging", + Type: "argo-cd", + Config: oapi.JobAgentConfig{ + "cluster": "staging", + }, + } + + deployment := testDeployment(rt) + deployment.JobAgentSelector = `jobAgent.config.cluster == resource.metadata.cluster` + + getter := &mockGetter{ + rtExists: true, + release: release, + jobs: []*oapi.Job{}, + policies: []*oapi.Policy{}, + deployment: deployment, + jobAgents: map[string]*oapi.JobAgent{ + agent1.Id: agent1, + agent2.Id: agent2, + agent3.Id: agent3, + }, + workspaceAgents: []oapi.JobAgent{*agent1, *agent2, *agent3}, + environment: &oapi.Environment{ + Id: rt.EnvironmentID.String(), + Name: "test-env", + Metadata: map[string]string{}, + }, + resource: &oapi.Resource{ + Id: rt.ResourceID.String(), + Name: "prod-cluster", + Identifier: "prod-cluster", + Kind: "kubernetes", + Metadata: map[string]string{"cluster": "prod"}, + Config: map[string]any{}, + }, + } + setter := &mockSetter{} + + _, err := Reconcile(context.Background(), rt.WorkspaceID.String(), getter, setter, rt) + require.NoError(t, err) + require.Len(t, setter.createdJobs, 2) + + agentIDs := map[string]bool{ + setter.createdJobs[0].JobAgentId: true, + setter.createdJobs[1].JobAgentId: true, + } + assert.True(t, agentIDs[agent1.Id], "agent1 should match") + assert.True(t, agentIDs[agent2.Id], "agent2 should match") + assert.False(t, agentIDs[agent3.Id], "staging agent should not match") +} + +func TestReconcile_ResourceAwareSelector_MixedWithJobAgentFields(t *testing.T) { + rt := testRT() + release := testRelease(rt) + + argoUS := &oapi.JobAgent{ + Id: uuid.New().String(), + Name: "argo-us", + Type: "argo-cd", + Config: oapi.JobAgentConfig{ + "server": "https://argo-us.example.com", + }, + } + githubUS := &oapi.JobAgent{ + Id: uuid.New().String(), + Name: "github-us", + Type: "github-app", + Config: oapi.JobAgentConfig{ + "server": "https://argo-us.example.com", + }, + } + + deployment := testDeployment(rt) + deployment.JobAgentSelector = `jobAgent.type == "argo-cd" && jobAgent.config.server == resource.config.argocd.serverUrl` + + getter := &mockGetter{ + rtExists: true, + release: release, + jobs: []*oapi.Job{}, + policies: []*oapi.Policy{}, + deployment: deployment, + jobAgents: map[string]*oapi.JobAgent{ + argoUS.Id: argoUS, + githubUS.Id: githubUS, + }, + workspaceAgents: []oapi.JobAgent{*argoUS, *githubUS}, + environment: &oapi.Environment{ + Id: rt.EnvironmentID.String(), + Name: "test-env", + Metadata: map[string]string{}, + }, + resource: &oapi.Resource{ + Id: rt.ResourceID.String(), + Name: "us-cluster", + Identifier: "us-cluster", + Kind: "kubernetes", + Metadata: map[string]string{}, + Config: map[string]any{ + "argocd": map[string]any{ + "serverUrl": "https://argo-us.example.com", + }, + }, + }, + } + setter := &mockSetter{} + + _, err := Reconcile(context.Background(), rt.WorkspaceID.String(), getter, setter, rt) + require.NoError(t, err) + require.Len(t, setter.createdJobs, 1) + assert.Equal(t, argoUS.Id, setter.createdJobs[0].JobAgentId) +} + +func TestReconcile_ResourceAwareSelector_MissingResourceConfigKey(t *testing.T) { + rt := testRT() + release := testRelease(rt) + + agent := &oapi.JobAgent{ + Id: uuid.New().String(), + Name: "argo-agent", + Type: "argo-cd", + Config: oapi.JobAgentConfig{ + "server": "https://argo.example.com", + }, + } + + deployment := testDeployment(rt) + deployment.JobAgentSelector = `jobAgent.config.server == resource.config.argocd.serverUrl` + + getter := &mockGetter{ + rtExists: true, + release: release, + jobs: []*oapi.Job{}, + policies: []*oapi.Policy{}, + deployment: deployment, + jobAgents: map[string]*oapi.JobAgent{agent.Id: agent}, + workspaceAgents: []oapi.JobAgent{*agent}, + environment: &oapi.Environment{ + Id: rt.EnvironmentID.String(), + Name: "test-env", + Metadata: map[string]string{}, + }, + resource: &oapi.Resource{ + Id: rt.ResourceID.String(), + Name: "cluster-no-argocd", + Identifier: "cluster-no-argocd", + Kind: "kubernetes", + Metadata: map[string]string{}, + Config: map[string]any{}, + }, + } + setter := &mockSetter{} + + _, err := Reconcile(context.Background(), rt.WorkspaceID.String(), getter, setter, rt) + require.NoError(t, err) + require.Len(t, setter.createdJobs, 1) + assert.Equal(t, oapi.JobStatusInvalidJobAgent, setter.createdJobs[0].Status) +} + +func TestReconcile_ResourceAwareSelector_GetResourceFails(t *testing.T) { + rt := testRT() + release := testRelease(rt) + + agent := testAgent() + deployment := testDeployment(rt) + deployment.JobAgentSelector = `jobAgent.config.server == resource.config.argocd.serverUrl` + + getter := &mockGetter{ + rtExists: true, + release: release, + jobs: []*oapi.Job{}, + policies: []*oapi.Policy{}, + deployment: deployment, + jobAgents: map[string]*oapi.JobAgent{agent.Id: agent}, + workspaceAgents: []oapi.JobAgent{*agent}, + environment: &oapi.Environment{ + Id: rt.EnvironmentID.String(), + Name: "test-env", + Metadata: map[string]string{}, + }, + resourceErr: fmt.Errorf("resource not found"), + } + setter := &mockSetter{} + + _, err := Reconcile(context.Background(), rt.WorkspaceID.String(), getter, setter, rt) + require.Error(t, err) + assert.Contains(t, err.Error(), "get resource") +} diff --git a/apps/workspace-engine/test/controllers/harness/mocks.go b/apps/workspace-engine/test/controllers/harness/mocks.go index da1e6da05..64ed1c8c4 100644 --- a/apps/workspace-engine/test/controllers/harness/mocks.go +++ b/apps/workspace-engine/test/controllers/harness/mocks.go @@ -615,6 +615,16 @@ func (g *JobDispatchGetter) GetDeployment( return g.Deployment, nil } +func (g *JobDispatchGetter) GetResource( + _ context.Context, + _ uuid.UUID, +) (*oapi.Resource, error) { + return &oapi.Resource{ + Config: map[string]interface{}{}, + Metadata: map[string]string{}, + }, nil +} + func (g *JobDispatchGetter) GetJobAgent( _ context.Context, jobAgentID uuid.UUID, From 8ba1088f732d2df34929a3cfa2631e58832cf93d Mon Sep 17 00:00:00 2001 From: Aditya Choudhari Date: Thu, 9 Apr 2026 16:05:48 -0700 Subject: [PATCH 2/3] cleanup --- .../pkg/selector/jobagents.go | 33 +++---------------- .../http/server/openapi/deployments/server.go | 29 +++++++++++++--- 2 files changed, 29 insertions(+), 33 deletions(-) diff --git a/apps/workspace-engine/pkg/selector/jobagents.go b/apps/workspace-engine/pkg/selector/jobagents.go index 76d52a783..f749db7b8 100644 --- a/apps/workspace-engine/pkg/selector/jobagents.go +++ b/apps/workspace-engine/pkg/selector/jobagents.go @@ -9,11 +9,6 @@ import ( "workspace-engine/pkg/oapi" ) -var jobAgentEnv, _ = celutil.NewEnvBuilder(). - WithMapVariable("jobAgent"). - WithStandardExtensions(). - BuildCached(12 * time.Hour) - var jobAgentWithResourceEnv, _ = celutil.NewEnvBuilder(). WithMapVariables("jobAgent", "resource"). WithStandardExtensions(). @@ -58,17 +53,6 @@ func resourceToMap(r *oapi.Resource) map[string]any { return m } -// MatchJobAgents evaluates a CEL job agent selector against a list of job -// agents and returns those that match. If the selector is empty or "false", -// no agents match. -func MatchJobAgents( - _ context.Context, - selector string, - agents []oapi.JobAgent, -) ([]oapi.JobAgent, error) { - return matchJobAgents(jobAgentEnv, selector, agents, nil) -} - // MatchJobAgentsWithResource evaluates a CEL job agent selector against a list // of job agents with the resource available in the CEL context, and returns // those that match. This allows selectors to reference resource properties, @@ -78,15 +62,6 @@ func MatchJobAgentsWithResource( selector string, agents []oapi.JobAgent, resource *oapi.Resource, -) ([]oapi.JobAgent, error) { - return matchJobAgents(jobAgentWithResourceEnv, selector, agents, resource) -} - -func matchJobAgents( - env *celutil.CompiledEnv, - selector string, - agents []oapi.JobAgent, - resource *oapi.Resource, ) ([]oapi.JobAgent, error) { if selector == "" || selector == "false" { return nil, nil @@ -96,18 +71,18 @@ func matchJobAgents( return agents, nil } - prg, err := env.Compile(selector) + prg, err := jobAgentWithResourceEnv.Compile(selector) if err != nil { return nil, fmt.Errorf("compile job agent selector: %w", err) } + resourceMap := resourceToMap(resource) + var matched []oapi.JobAgent for i := range agents { vars := map[string]any{ "jobAgent": jobAgentToMap(&agents[i]), - } - if resource != nil { - vars["resource"] = resourceToMap(resource) + "resource": resourceMap, } ok, err := celutil.EvalBool(prg, vars) if err != nil { diff --git a/apps/workspace-engine/svc/http/server/openapi/deployments/server.go b/apps/workspace-engine/svc/http/server/openapi/deployments/server.go index cea936154..36fa161d2 100644 --- a/apps/workspace-engine/svc/http/server/openapi/deployments/server.go +++ b/apps/workspace-engine/svc/http/server/openapi/deployments/server.go @@ -45,14 +45,35 @@ func (d *Deployments) GetJobAgentsForDeployment(c *gin.Context, deploymentId str oapiAgents[i] = *db.ToOapiJobAgent(row) } - matched, err := selector.MatchJobAgents(ctx, deployment.JobAgentSelector, oapiAgents) + releaseTargets, err := queries.GetReleaseTargetsForDeployment(ctx, deploymentUUID) if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to evaluate selector: " + err.Error()}) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get release targets"}) return } - if matched == nil { - matched = []oapi.JobAgent{} + agentSet := make(map[string]oapi.JobAgent) + + for _, rt := range releaseTargets { + resourceRow, err := queries.GetResourceByID(ctx, rt.ResourceID) + if err != nil { + continue + } + resource := db.ToOapiResource(resourceRow) + + agents, err := selector.MatchJobAgentsWithResource(ctx, deployment.JobAgentSelector, oapiAgents, resource) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to evaluate selector: " + err.Error()}) + return + } + + for _, agent := range agents { + agentSet[agent.Id] = agent + } + } + + matched := make([]oapi.JobAgent, 0, len(agentSet)) + for _, agent := range agentSet { + matched = append(matched, agent) } c.JSON(http.StatusOK, gin.H{"items": matched}) From 60e60afa67f2275f37ade81cf5e9b24c30da3eb2 Mon Sep 17 00:00:00 2001 From: Aditya Choudhari Date: Thu, 9 Apr 2026 16:06:05 -0700 Subject: [PATCH 3/3] lint --- .../svc/controllers/deploymentplan/controller.go | 7 ++++++- .../svc/controllers/jobdispatch/reconcile.go | 7 ++++++- .../svc/controllers/jobdispatch/reconcile_test.go | 2 +- .../svc/controllers/jobeligibility/reconcile.go | 7 ++++++- .../svc/http/server/openapi/deployments/server.go | 12 ++++++++++-- .../test/controllers/harness/mocks.go | 2 +- 6 files changed, 30 insertions(+), 7 deletions(-) diff --git a/apps/workspace-engine/svc/controllers/deploymentplan/controller.go b/apps/workspace-engine/svc/controllers/deploymentplan/controller.go index 56143eaca..e77ce53f2 100644 --- a/apps/workspace-engine/svc/controllers/deploymentplan/controller.go +++ b/apps/workspace-engine/svc/controllers/deploymentplan/controller.go @@ -156,7 +156,12 @@ func (c *Controller) processTarget( return fmt.Errorf("get resource %s: %w", target.ResourceID, err) } - matchedAgents, err := selector.MatchJobAgentsWithResource(ctx, deployment.JobAgentSelector, agents, resource) + matchedAgents, err := selector.MatchJobAgentsWithResource( + ctx, + deployment.JobAgentSelector, + agents, + resource, + ) if err != nil { return fmt.Errorf("match job agents for resource %s: %w", target.ResourceID, err) } diff --git a/apps/workspace-engine/svc/controllers/jobdispatch/reconcile.go b/apps/workspace-engine/svc/controllers/jobdispatch/reconcile.go index 831c64d5f..03b82aa1d 100644 --- a/apps/workspace-engine/svc/controllers/jobdispatch/reconcile.go +++ b/apps/workspace-engine/svc/controllers/jobdispatch/reconcile.go @@ -84,7 +84,12 @@ func getJobAgents( return nil, fmt.Errorf("list job agents: %w", err) } - matched, err := selector.MatchJobAgentsWithResource(ctx, deployment.JobAgentSelector, allAgents, resource) + matched, err := selector.MatchJobAgentsWithResource( + ctx, + deployment.JobAgentSelector, + allAgents, + resource, + ) if err != nil { return nil, fmt.Errorf("match job agents: %w", err) } diff --git a/apps/workspace-engine/svc/controllers/jobdispatch/reconcile_test.go b/apps/workspace-engine/svc/controllers/jobdispatch/reconcile_test.go index 1231014ed..ab60bdf38 100644 --- a/apps/workspace-engine/svc/controllers/jobdispatch/reconcile_test.go +++ b/apps/workspace-engine/svc/controllers/jobdispatch/reconcile_test.go @@ -47,7 +47,7 @@ func (m *mockGetter) GetResource(_ context.Context, _ uuid.UUID) (*oapi.Resource return m.resource, m.resourceErr } return &oapi.Resource{ - Config: map[string]interface{}{}, + Config: map[string]any{}, Metadata: map[string]string{}, }, nil } diff --git a/apps/workspace-engine/svc/controllers/jobeligibility/reconcile.go b/apps/workspace-engine/svc/controllers/jobeligibility/reconcile.go index 8630d982f..4ad29c2c5 100644 --- a/apps/workspace-engine/svc/controllers/jobeligibility/reconcile.go +++ b/apps/workspace-engine/svc/controllers/jobeligibility/reconcile.go @@ -221,7 +221,12 @@ func (r *reconciler) buildAndDispatchJob(ctx context.Context) error { } span.SetAttributes(attribute.Int("workspace_agents.count", len(allAgents))) - matchedAgents, err := selector.MatchJobAgentsWithResource(ctx, deployment.JobAgentSelector, allAgents, resource) + matchedAgents, err := selector.MatchJobAgentsWithResource( + ctx, + deployment.JobAgentSelector, + allAgents, + resource, + ) if err != nil { return recordErr(span, "match job agents", err) } diff --git a/apps/workspace-engine/svc/http/server/openapi/deployments/server.go b/apps/workspace-engine/svc/http/server/openapi/deployments/server.go index 36fa161d2..b3a8a39b4 100644 --- a/apps/workspace-engine/svc/http/server/openapi/deployments/server.go +++ b/apps/workspace-engine/svc/http/server/openapi/deployments/server.go @@ -60,9 +60,17 @@ func (d *Deployments) GetJobAgentsForDeployment(c *gin.Context, deploymentId str } resource := db.ToOapiResource(resourceRow) - agents, err := selector.MatchJobAgentsWithResource(ctx, deployment.JobAgentSelector, oapiAgents, resource) + agents, err := selector.MatchJobAgentsWithResource( + ctx, + deployment.JobAgentSelector, + oapiAgents, + resource, + ) if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to evaluate selector: " + err.Error()}) + c.JSON( + http.StatusBadRequest, + gin.H{"error": "Failed to evaluate selector: " + err.Error()}, + ) return } diff --git a/apps/workspace-engine/test/controllers/harness/mocks.go b/apps/workspace-engine/test/controllers/harness/mocks.go index 64ed1c8c4..7b3963586 100644 --- a/apps/workspace-engine/test/controllers/harness/mocks.go +++ b/apps/workspace-engine/test/controllers/harness/mocks.go @@ -620,7 +620,7 @@ func (g *JobDispatchGetter) GetResource( _ uuid.UUID, ) (*oapi.Resource, error) { return &oapi.Resource{ - Config: map[string]interface{}{}, + Config: map[string]any{}, Metadata: map[string]string{}, }, nil }