Skip to content

Commit 6e0d001

Browse files
refactor: job dispatcher only fetches single job agent (#968)
1 parent 33ebbed commit 6e0d001

5 files changed

Lines changed: 67 additions & 225 deletions

File tree

apps/workspace-engine/svc/controllers/jobdispatch/controller.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,6 @@ func (c *Controller) Process(ctx context.Context, item reconcile.Item) (reconcil
5252
return reconcile.Result{}, fmt.Errorf("parse job id: %w", err)
5353
}
5454

55-
workspaceID, err := uuid.Parse(item.WorkspaceID)
56-
if err != nil {
57-
return reconcile.Result{}, fmt.Errorf("parse workspace id: %w", err)
58-
}
59-
6055
job, err := c.getter.GetJob(ctx, jobID)
6156
if err != nil {
6257
span.RecordError(err)
@@ -66,7 +61,7 @@ func (c *Controller) Process(ctx context.Context, item reconcile.Item) (reconcil
6661

6762
span.SetAttributes(attribute.String("job", fmt.Sprintf("%+v", job)))
6863

69-
result, err := c.reconcileJob(ctx, workspaceID, jobID, job)
64+
result, err := c.reconcileJob(ctx, jobID, job)
7065
if err != nil {
7166
span.RecordError(err)
7267
span.SetStatus(codes.Error, err.Error())
@@ -95,7 +90,6 @@ func (c *Controller) Process(ctx context.Context, item reconcile.Item) (reconcil
9590

9691
func (c *Controller) reconcileJob(
9792
ctx context.Context,
98-
workspaceID uuid.UUID,
9993
jobID uuid.UUID,
10094
job *oapi.Job,
10195
) (*ReconcileResult, error) {
@@ -106,7 +100,7 @@ func (c *Controller) reconcileJob(
106100
if isWorkflowJob {
107101
return ReconcileWorkflowJob(ctx, c.dispatcher, job)
108102
}
109-
return Reconcile(ctx, c.getter, c.setter, c.verifier, c.dispatcher, workspaceID, job)
103+
return Reconcile(ctx, c.getter, c.setter, c.verifier, c.dispatcher, job)
110104
}
111105

112106
// NewController creates a Controller with the given dependencies.

apps/workspace-engine/svc/controllers/jobdispatch/getters.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,7 @@ import (
1010
type Getter interface {
1111
GetJob(ctx context.Context, jobID uuid.UUID) (*oapi.Job, error)
1212
GetRelease(ctx context.Context, releaseID uuid.UUID) (*oapi.Release, error)
13-
GetDeployment(ctx context.Context, deploymentID uuid.UUID) (*oapi.Deployment, error)
14-
GetResource(ctx context.Context, resourceID uuid.UUID) (*oapi.Resource, error)
1513
GetJobAgent(ctx context.Context, jobAgentID uuid.UUID) (*oapi.JobAgent, error)
16-
ListJobAgentsByWorkspaceID(ctx context.Context, workspaceID uuid.UUID) ([]oapi.JobAgent, error)
1714
GetVerificationPolicies(
1815
ctx context.Context,
1916
rt *ReleaseTarget,

apps/workspace-engine/svc/controllers/jobdispatch/getters_postgres.go

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -48,43 +48,6 @@ func (p *PostgresGetter) GetRelease(
4848
return release, nil
4949
}
5050

51-
func (p *PostgresGetter) GetDeployment(
52-
ctx context.Context,
53-
deploymentID uuid.UUID,
54-
) (*oapi.Deployment, error) {
55-
row, err := db.GetQueries(ctx).GetDeploymentByID(ctx, deploymentID)
56-
if err != nil {
57-
return nil, err
58-
}
59-
return db.ToOapiDeployment(row), nil
60-
}
61-
62-
func (p *PostgresGetter) GetResource(
63-
ctx context.Context,
64-
resourceID uuid.UUID,
65-
) (*oapi.Resource, error) {
66-
row, err := db.GetQueries(ctx).GetResourceByID(ctx, resourceID)
67-
if err != nil {
68-
return nil, err
69-
}
70-
return db.ToOapiResource(row), nil
71-
}
72-
73-
func (p *PostgresGetter) ListJobAgentsByWorkspaceID(
74-
ctx context.Context,
75-
workspaceID uuid.UUID,
76-
) ([]oapi.JobAgent, error) {
77-
rows, err := db.GetQueries(ctx).ListJobAgentsByWorkspaceID(ctx, workspaceID)
78-
if err != nil {
79-
return nil, err
80-
}
81-
agents := make([]oapi.JobAgent, len(rows))
82-
for i, row := range rows {
83-
agents[i] = *db.ToOapiJobAgent(row)
84-
}
85-
return agents, nil
86-
}
87-
8851
func (p *PostgresGetter) GetJobAgent(
8952
ctx context.Context,
9053
jobAgentID uuid.UUID,

apps/workspace-engine/svc/controllers/jobdispatch/reconcile.go

Lines changed: 16 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"go.opentelemetry.io/otel/codes"
1111
"go.opentelemetry.io/otel/trace"
1212
"workspace-engine/pkg/oapi"
13-
"workspace-engine/pkg/selector"
1413
"workspace-engine/svc/controllers/jobdispatch/verification"
1514
)
1615

@@ -35,110 +34,13 @@ func getRelease(ctx context.Context, getter Getter, job *oapi.Job) (*oapi.Releas
3534
return release, nil
3635
}
3736

38-
func getDeployment(
39-
ctx context.Context,
40-
getter Getter,
41-
release *oapi.Release,
42-
) (*oapi.Deployment, error) {
43-
ctx, span := tracer.Start(ctx, "jobdispatch.getDeployment")
44-
defer span.End()
45-
46-
deploymentID := uuid.MustParse(release.Version.DeploymentId)
47-
deployment, err := getter.GetDeployment(ctx, deploymentID)
48-
if err != nil {
49-
return nil, recordErr(span, "get deployment", err)
50-
}
51-
return deployment, nil
52-
}
53-
54-
func getJobAgents(
55-
ctx context.Context,
56-
getter Getter,
57-
workspaceID uuid.UUID,
58-
release *oapi.Release,
59-
) ([]oapi.JobAgent, error) {
60-
ctx, span := tracer.Start(ctx, "jobdispatch.getJobAgents")
61-
defer span.End()
62-
63-
deployment, err := getDeployment(ctx, getter, release)
64-
if err != nil {
65-
return nil, err
66-
}
67-
68-
if deployment.JobAgentSelector == "" {
69-
return nil, fmt.Errorf("deployment job agent selector is empty")
70-
}
71-
72-
resourceID, err := uuid.Parse(release.ReleaseTarget.ResourceId)
73-
if err != nil {
74-
return nil, fmt.Errorf("parse resource id: %w", err)
75-
}
76-
77-
resource, err := getter.GetResource(ctx, resourceID)
78-
if err != nil {
79-
return nil, fmt.Errorf("get resource: %w", err)
80-
}
81-
82-
allAgents, err := getter.ListJobAgentsByWorkspaceID(ctx, workspaceID)
83-
if err != nil {
84-
return nil, fmt.Errorf("list job agents: %w", err)
85-
}
86-
87-
matched, err := selector.MatchJobAgentsWithResource(
88-
ctx,
89-
deployment.JobAgentSelector,
90-
allAgents,
91-
resource,
92-
)
93-
if err != nil {
94-
return nil, fmt.Errorf("match job agents: %w", err)
95-
}
96-
97-
return matched, nil
98-
}
99-
100-
func getAgentSpecs(
101-
ctx context.Context,
102-
verifier AgentVerifier,
103-
getter Getter,
104-
workspaceID uuid.UUID,
105-
release *oapi.Release,
106-
) ([]oapi.VerificationMetricSpec, error) {
107-
ctx, span := tracer.Start(ctx, "jobdispatch.getAgentSpecs")
108-
defer span.End()
109-
110-
if verifier == nil {
111-
return nil, nil
112-
}
113-
114-
agents, err := getJobAgents(ctx, getter, workspaceID, release)
115-
if err != nil {
116-
return nil, err
117-
}
118-
119-
specs := make([]oapi.VerificationMetricSpec, 0)
120-
for _, agent := range agents {
121-
agentSpecs, err := verifier.AgentVerifications(agent.Type, agent.Config)
122-
if err != nil {
123-
return nil, recordErr(
124-
span,
125-
fmt.Sprintf("get agent verifications for agent %s", agent.Id),
126-
err,
127-
)
128-
}
129-
specs = append(specs, agentSpecs...)
130-
}
131-
return specs, nil
132-
}
133-
13437
// Reconcile dispatches a job and enqueues verifications for the job.
13538
func Reconcile(
13639
ctx context.Context,
13740
getter Getter,
13841
setter Setter,
13942
verifier AgentVerifier,
14043
dispatcher Dispatcher,
141-
workspaceID uuid.UUID,
14244
job *oapi.Job,
14345
) (*ReconcileResult, error) {
14446
ctx, span := tracer.Start(ctx, "jobdispatch.Reconcile")
@@ -149,29 +51,31 @@ func Reconcile(
14951
return nil, err
15052
}
15153

152-
agents, err := getJobAgents(ctx, getter, workspaceID, release)
153-
if err != nil {
154-
span.RecordError(err)
155-
span.SetStatus(codes.Error, err.Error())
156-
return nil, err
157-
}
158-
if len(agents) == 0 {
159-
span.AddEvent("no job agents matched selector for deployment")
160-
return &ReconcileResult{}, nil
161-
}
162-
16354
releaseTarget := &ReleaseTarget{
16455
DeploymentID: uuid.MustParse(release.ReleaseTarget.DeploymentId),
16556
EnvironmentID: uuid.MustParse(release.ReleaseTarget.EnvironmentId),
16657
ResourceID: uuid.MustParse(release.ReleaseTarget.ResourceId),
16758
}
16859

169-
policySpecs, err := getter.GetVerificationPolicies(ctx, releaseTarget)
60+
agentUUID, err := uuid.Parse(job.JobAgentId)
17061
if err != nil {
171-
return nil, err
62+
return nil, recordErr(span, "parse job agent id", err)
17263
}
17364

174-
agentSpecs, err := getAgentSpecs(ctx, verifier, getter, workspaceID, release)
65+
agent, err := getter.GetJobAgent(ctx, agentUUID)
66+
if err != nil {
67+
return nil, recordErr(span, "get job agent", err)
68+
}
69+
70+
var agentSpecs []oapi.VerificationMetricSpec
71+
if verifier != nil {
72+
agentSpecs, err = verifier.AgentVerifications(agent.Type, job.JobAgentConfig)
73+
if err != nil {
74+
return nil, err
75+
}
76+
}
77+
78+
policySpecs, err := getter.GetVerificationPolicies(ctx, releaseTarget)
17579
if err != nil {
17680
return nil, err
17781
}

0 commit comments

Comments
 (0)