Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 39 additions & 8 deletions apps/workspace-engine/pkg/selector/jobagents.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"workspace-engine/pkg/oapi"
)

var jobAgentEnv, _ = celutil.NewEnvBuilder().
WithMapVariable("jobAgent").
var jobAgentWithResourceEnv, _ = celutil.NewEnvBuilder().
WithMapVariables("jobAgent", "resource").
WithStandardExtensions().
BuildCached(12 * time.Hour)

Expand All @@ -27,13 +27,41 @@ func jobAgentToMap(a *oapi.JobAgent) map[string]any {
return m
}

// MatchJobAgents evaluates a CEL job agent selector against a list of job
// agents and returns those that match. If the selector is empty or "false",
// no agents match.
func MatchJobAgents(
ctx context.Context,
func resourceToMap(r *oapi.Resource) map[string]any {
m := make(map[string]any, 13)
m["id"] = r.Id
m["identifier"] = r.Identifier
m["name"] = r.Name
m["kind"] = r.Kind
m["version"] = r.Version
m["workspaceId"] = r.WorkspaceId
m["config"] = r.Config
m["metadata"] = r.Metadata
m["createdAt"] = r.CreatedAt
if r.ProviderId != nil {
m["providerId"] = *r.ProviderId
}
if r.UpdatedAt != nil {
m["updatedAt"] = *r.UpdatedAt
}
if r.DeletedAt != nil {
m["deletedAt"] = *r.DeletedAt
}
if r.LockedAt != nil {
m["lockedAt"] = *r.LockedAt
}
return m
}

// MatchJobAgentsWithResource evaluates a CEL job agent selector against a list
// of job agents with the resource available in the CEL context, and returns
// those that match. This allows selectors to reference resource properties,
// e.g. "jobAgent.config.server == resource.config.argocd.serverUrl".
func MatchJobAgentsWithResource(
_ context.Context,
selector string,
agents []oapi.JobAgent,
resource *oapi.Resource,
) ([]oapi.JobAgent, error) {
if selector == "" || selector == "false" {
return nil, nil
Expand All @@ -43,15 +71,18 @@ func MatchJobAgents(
return agents, nil
}

prg, err := jobAgentEnv.Compile(selector)
prg, err := jobAgentWithResourceEnv.Compile(selector)
if err != nil {
return nil, fmt.Errorf("compile job agent selector: %w", err)
}

resourceMap := resourceToMap(resource)

var matched []oapi.JobAgent
for i := range agents {
vars := map[string]any{
"jobAgent": jobAgentToMap(&agents[i]),
"resource": resourceMap,
}
Comment on lines 83 to +86
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resourceToMap(resource) is recomputed for every agent during selector evaluation, allocating and populating the same map repeatedly. Precompute the mapped resource once (outside the agent loop) and reuse it in vars to reduce allocations and CPU when evaluating selectors against many agents.

Copilot uses AI. Check for mistakes.
ok, err := celutil.EvalBool(prg, vars)
if err != nil {
Expand Down
27 changes: 18 additions & 9 deletions apps/workspace-engine/svc/controllers/deploymentplan/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,7 @@ func (c *Controller) Process(ctx context.Context, item reconcile.Item) (reconcil
return reconcile.Result{}, fmt.Errorf("list job agents: %w", err)
}

matchedAgents, err := selector.MatchJobAgents(ctx, deployment.JobAgentSelector, allAgents)
if err != nil {
return reconcile.Result{}, fmt.Errorf("match job agents: %w", err)
}

if len(matchedAgents) == 0 {
if len(allAgents) == 0 {
if err := c.setter.CompletePlan(ctx, planID); err != nil {
return reconcile.Result{}, fmt.Errorf("mark plan completed: %w", err)
}
Expand Down Expand Up @@ -122,7 +117,7 @@ func (c *Controller) Process(ctx context.Context, item reconcile.Item) (reconcil
ctx,
plan,
deployment,
matchedAgents,
allAgents,
version,
target,
); err != nil {
Expand Down Expand Up @@ -161,6 +156,20 @@ func (c *Controller) processTarget(
return fmt.Errorf("get resource %s: %w", target.ResourceID, err)
}

matchedAgents, err := selector.MatchJobAgentsWithResource(
ctx,
deployment.JobAgentSelector,
agents,
resource,
)
if err != nil {
return fmt.Errorf("match job agents for resource %s: %w", target.ResourceID, err)
}

if len(matchedAgents) == 0 {
return nil
}

scope := &variableresolver.Scope{
Resource: resource,
Deployment: deployment,
Expand All @@ -174,8 +183,8 @@ func (c *Controller) processTarget(
return fmt.Errorf("resolve variables: %w", err)
}

for i := range agents {
agent := &agents[i]
for i := range matchedAgents {
agent := &matchedAgents[i]

mergedConfig := oapi.DeepMergeConfigs(
agent.Config, deployment.JobAgentConfig, version.JobAgentConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Getter interface {
GetJob(ctx context.Context, jobID uuid.UUID) (*oapi.Job, error)
GetRelease(ctx context.Context, releaseID uuid.UUID) (*oapi.Release, error)
GetDeployment(ctx context.Context, deploymentID uuid.UUID) (*oapi.Deployment, error)
GetResource(ctx context.Context, resourceID uuid.UUID) (*oapi.Resource, error)
GetJobAgent(ctx context.Context, jobAgentID uuid.UUID) (*oapi.JobAgent, error)
ListJobAgentsByWorkspaceID(ctx context.Context, workspaceID uuid.UUID) ([]oapi.JobAgent, error)
GetVerificationPolicies(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ func (p *PostgresGetter) GetDeployment(
return db.ToOapiDeployment(row), nil
}

func (p *PostgresGetter) GetResource(
ctx context.Context,
resourceID uuid.UUID,
) (*oapi.Resource, error) {
row, err := db.GetQueries(ctx).GetResourceByID(ctx, resourceID)
if err != nil {
return nil, err
}
return db.ToOapiResource(row), nil
}

func (p *PostgresGetter) ListJobAgentsByWorkspaceID(
ctx context.Context,
workspaceID uuid.UUID,
Expand Down
17 changes: 16 additions & 1 deletion apps/workspace-engine/svc/controllers/jobdispatch/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,27 @@ func getJobAgents(
return nil, fmt.Errorf("deployment job agent selector is empty")
}

resourceID, err := uuid.Parse(release.ReleaseTarget.ResourceId)
if err != nil {
return nil, fmt.Errorf("parse resource id: %w", err)
}

resource, err := getter.GetResource(ctx, resourceID)
if err != nil {
return nil, fmt.Errorf("get resource: %w", err)
}

allAgents, err := getter.ListJobAgentsByWorkspaceID(ctx, workspaceID)
if err != nil {
return nil, fmt.Errorf("list job agents: %w", err)
}

matched, err := selector.MatchJobAgents(ctx, deployment.JobAgentSelector, allAgents)
matched, err := selector.MatchJobAgentsWithResource(
ctx,
deployment.JobAgentSelector,
allAgents,
resource,
)
if err != nil {
return nil, fmt.Errorf("match job agents: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type mockGetter struct {
releaseErr error
deployment *oapi.Deployment
deploymentErr error
resource *oapi.Resource
resourceErr error
jobAgents map[string]*oapi.JobAgent
jobAgentErr error
workspaceAgents []oapi.JobAgent
Expand All @@ -40,6 +42,16 @@ func (m *mockGetter) GetDeployment(_ context.Context, _ uuid.UUID) (*oapi.Deploy
return m.deployment, m.deploymentErr
}

func (m *mockGetter) GetResource(_ context.Context, _ uuid.UUID) (*oapi.Resource, error) {
if m.resource != nil || m.resourceErr != nil {
return m.resource, m.resourceErr
}
return &oapi.Resource{
Config: map[string]any{},
Metadata: map[string]string{},
}, nil
}

func (m *mockGetter) GetJobAgent(_ context.Context, id uuid.UUID) (*oapi.JobAgent, error) {
if m.jobAgentErr != nil {
return nil, m.jobAgentErr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,23 @@ func (r *reconciler) buildAndDispatchJob(ctx context.Context) error {
return r.createFailureJob(ctx, oapi.JobStatusInvalidJobAgent, msg)
}

resource, err := r.getter.GetResource(ctx, r.rt.ResourceID)
if err != nil {
return recordErr(span, "get resource", err)
}

allAgents, err := r.getter.ListJobAgentsByWorkspaceID(ctx, r.workspaceID)
if err != nil {
return recordErr(span, "list job agents", err)
}
span.SetAttributes(attribute.Int("workspace_agents.count", len(allAgents)))

matchedAgents, err := selector.MatchJobAgents(ctx, deployment.JobAgentSelector, allAgents)
matchedAgents, err := selector.MatchJobAgentsWithResource(
ctx,
deployment.JobAgentSelector,
allAgents,
resource,
)
if err != nil {
return recordErr(span, "match job agents", err)
}
Expand Down
Loading
Loading