// diff --git a/apps/workspace-engine/pkg/jobagents/terraformcloud/tfe_plan.go (new file)

// Package terraformcloud contains the Terraform Cloud (TFC/TFE) job agent.
package terraformcloud

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"time"

	"workspace-engine/pkg/jobagents/types"
	"workspace-engine/pkg/oapi"
)

// planTimeout bounds how long a speculative run may stay unfinished across
// polls before the planner gives up and completes with a timeout message.
const planTimeout = 5 * time.Minute

// Compile-time assertion that *TFCPlanner satisfies types.Plannable.
var _ types.Plannable = (*TFCPlanner)(nil)

// WorkspaceSetup handles workspace provisioning for a plan.
type WorkspaceSetup interface {
	// Setup provisions the TFC workspace for the dispatch and returns its ID.
	Setup(ctx context.Context, dispatchCtx *oapi.DispatchContext) (workspaceID string, err error)
}

// SpeculativeRunner creates and reads speculative (plan-only) runs.
type SpeculativeRunner interface {
	// CreateSpeculativeRun starts a plan-only run on the given workspace and
	// returns the new run's ID.
	CreateSpeculativeRun(
		ctx context.Context,
		cfg *tfeConfig,
		workspaceID string,
	) (runID string, err error)
	// ReadRunStatus reads the current state of a run.
	ReadRunStatus(ctx context.Context, cfg *tfeConfig, runID string) (*RunStatus, error)
	// ReadPlanJSON fetches the JSON output of a completed plan.
	ReadPlanJSON(ctx context.Context, cfg *tfeConfig, planID string) ([]byte, error)
}

// RunStatus is the information read back from a TFC run.
+type RunStatus struct { + Status string + PlanID string + ResourceAdditions int + ResourceChanges int + ResourceDestructions int + IsFinished bool + IsErrored bool +} + +type TFCPlanner struct { + workspace WorkspaceSetup + runner SpeculativeRunner +} + +func NewTFCPlanner(workspace WorkspaceSetup, runner SpeculativeRunner) *TFCPlanner { + return &TFCPlanner{workspace: workspace, runner: runner} +} + +func (p *TFCPlanner) Type() string { + return "tfe" +} + +type tfePlanState struct { + RunID string `json:"runId"` + PollCount int `json:"pollCount"` + FirstPolled *time.Time `json:"firstPolled,omitempty"` +} + +func (p *TFCPlanner) Plan( + ctx context.Context, + dispatchCtx *oapi.DispatchContext, + state json.RawMessage, +) (*types.PlanResult, error) { + cfg, err := parseJobAgentConfig(dispatchCtx.JobAgentConfig) + if err != nil { + return nil, err + } + + var s tfePlanState + if state != nil { + if err := json.Unmarshal(state, &s); err != nil { + return nil, fmt.Errorf("unmarshal plan state: %w", err) + } + } + + if s.RunID == "" { + workspaceID, err := p.workspace.Setup(ctx, dispatchCtx) + if err != nil { + return nil, fmt.Errorf("setup workspace: %w", err) + } + return p.createRun(ctx, cfg, workspaceID) + } + + return p.pollRun(ctx, cfg, s) +} + +func (p *TFCPlanner) createRun( + ctx context.Context, + cfg *tfeConfig, + workspaceID string, +) (*types.PlanResult, error) { + runID, err := p.runner.CreateSpeculativeRun(ctx, cfg, workspaceID) + if err != nil { + return nil, fmt.Errorf("create speculative run: %w", err) + } + + now := time.Now() + s := tfePlanState{ + RunID: runID, + PollCount: 0, + FirstPolled: &now, + } + + stateJSON, err := json.Marshal(s) + if err != nil { + return nil, fmt.Errorf("marshal plan state: %w", err) + } + + return &types.PlanResult{ + State: stateJSON, + Message: fmt.Sprintf("Speculative run %s created, waiting for plan", runID), + }, nil +} + +func (p *TFCPlanner) pollRun( + ctx context.Context, + cfg *tfeConfig, + s tfePlanState, +) 
(*types.PlanResult, error) { + status, err := p.runner.ReadRunStatus(ctx, cfg, s.RunID) + if err != nil { + return nil, fmt.Errorf("read run %s: %w", s.RunID, err) + } + + s.PollCount++ + + if status.IsFinished { + return p.completePlan(ctx, cfg, status) + } + + if status.IsErrored { + now := time.Now() + return &types.PlanResult{ + CompletedAt: &now, + Message: fmt.Sprintf("Run %s ended with status: %s", s.RunID, status.Status), + }, nil + } + + if s.FirstPolled != nil && time.Since(*s.FirstPolled) > planTimeout { + now := time.Now() + return &types.PlanResult{ + CompletedAt: &now, + Message: fmt.Sprintf( + "Run %s timed out after %d polls (%s elapsed), last status: %s", + s.RunID, s.PollCount, time.Since(*s.FirstPolled).Round(time.Second), status.Status, + ), + }, nil + } + + stateJSON, err := json.Marshal(s) + if err != nil { + return nil, fmt.Errorf("marshal plan state: %w", err) + } + + return &types.PlanResult{ + State: stateJSON, + Message: fmt.Sprintf( + "Waiting for plan (poll %d, status: %s)", + s.PollCount, status.Status, + ), + }, nil +} + +func (p *TFCPlanner) completePlan( + ctx context.Context, + cfg *tfeConfig, + status *RunStatus, +) (*types.PlanResult, error) { + planJSON, err := p.runner.ReadPlanJSON(ctx, cfg, status.PlanID) + if err != nil { + return nil, fmt.Errorf("read plan JSON: %w", err) + } + + hasChanges := status.ResourceAdditions+status.ResourceChanges+status.ResourceDestructions > 0 + hash := sha256.Sum256(planJSON) + + now := time.Now() + return &types.PlanResult{ + CompletedAt: &now, + HasChanges: hasChanges, + ContentHash: hex.EncodeToString(hash[:]), + Current: "", + Proposed: string(planJSON), + Message: fmt.Sprintf( + "+%d ~%d -%d resources", + status.ResourceAdditions, status.ResourceChanges, status.ResourceDestructions, + ), + }, nil +} diff --git a/apps/workspace-engine/pkg/jobagents/terraformcloud/tfe_plan_client.go b/apps/workspace-engine/pkg/jobagents/terraformcloud/tfe_plan_client.go new file mode 100644 index 
000000000..8a309155b --- /dev/null +++ b/apps/workspace-engine/pkg/jobagents/terraformcloud/tfe_plan_client.go @@ -0,0 +1,128 @@ +package terraformcloud + +import ( + "context" + "fmt" + + "github.com/hashicorp/go-tfe" + "workspace-engine/pkg/oapi" +) + +// GoWorkspaceSetup is the production implementation of WorkspaceSetup. +type GoWorkspaceSetup struct{} + +// Setup provisions the TFC workspace (upsert + variable sync) and returns its ID. +func (g *GoWorkspaceSetup) Setup( + ctx context.Context, + dispatchCtx *oapi.DispatchContext, +) (string, error) { + cfg, err := parseJobAgentConfig(dispatchCtx.JobAgentConfig) + if err != nil { + return "", err + } + + client, err := getClient(cfg.address, cfg.token) + if err != nil { + return "", fmt.Errorf("create tfe client: %w", err) + } + + workspace, err := templateWorkspace(dispatchCtx, cfg.template) + if err != nil { + return "", fmt.Errorf("template workspace: %w", err) + } + + targetWorkspace, err := upsertWorkspace(ctx, client, cfg.organization, workspace) + if err != nil { + return "", fmt.Errorf("upsert workspace: %w", err) + } + + if len(workspace.Variables) > 0 { + if err := syncVariables(ctx, client, targetWorkspace.ID, workspace.Variables); err != nil { + return "", fmt.Errorf("sync variables: %w", err) + } + } + + return targetWorkspace.ID, nil +} + +// GoSpeculativeRunner is the production implementation of SpeculativeRunner. +type GoSpeculativeRunner struct{} + +// CreateSpeculativeRun creates a plan-only run on the given workspace and returns the run ID. 
+func (g *GoSpeculativeRunner) CreateSpeculativeRun( + ctx context.Context, + cfg *tfeConfig, + workspaceID string, +) (string, error) { + client, err := getClient(cfg.address, cfg.token) + if err != nil { + return "", fmt.Errorf("create tfe client: %w", err) + } + + planOnly := true + message := "Speculative plan by ctrlplane" + run, err := client.Runs.Create(ctx, tfe.RunCreateOptions{ + Workspace: &tfe.Workspace{ID: workspaceID}, + PlanOnly: &planOnly, + Message: &message, + }) + if err != nil { + return "", fmt.Errorf("create speculative run: %w", err) + } + return run.ID, nil +} + +// ReadRunStatus reads the current status of a TFC run and maps it to a RunStatus. +func (g *GoSpeculativeRunner) ReadRunStatus( + ctx context.Context, + cfg *tfeConfig, + runID string, +) (*RunStatus, error) { + client, err := getClient(cfg.address, cfg.token) + if err != nil { + return nil, fmt.Errorf("create tfe client: %w", err) + } + + run, err := client.Runs.Read(ctx, runID) + if err != nil { + return nil, fmt.Errorf("read run: %w", err) + } + + status := &RunStatus{ + Status: string(run.Status), + } + + if run.Plan != nil { + status.PlanID = run.Plan.ID + status.ResourceAdditions = run.Plan.ResourceAdditions + status.ResourceChanges = run.Plan.ResourceChanges + status.ResourceDestructions = run.Plan.ResourceDestructions + } + + switch run.Status { + case tfe.RunPlannedAndFinished: + status.IsFinished = true + case tfe.RunErrored, tfe.RunCanceled, tfe.RunDiscarded, tfe.RunPolicySoftFailed: + status.IsErrored = true + } + + return status, nil +} + +// ReadPlanJSON fetches the JSON output of a completed plan. 
func (g *GoSpeculativeRunner) ReadPlanJSON(
	ctx context.Context,
	cfg *tfeConfig,
	planID string,
) ([]byte, error) {
	client, err := getClient(cfg.address, cfg.token)
	if err != nil {
		return nil, fmt.Errorf("create tfe client: %w", err)
	}

	data, err := client.Plans.ReadJSONOutput(ctx, planID)
	if err != nil {
		return nil, fmt.Errorf("read plan JSON output: %w", err)
	}
	return data, nil
}

// diff --git a/apps/workspace-engine/pkg/jobagents/terraformcloud/tfe_plan_test.go (new file)

package terraformcloud

import (
	"context"
	"encoding/json"
	"fmt"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"workspace-engine/pkg/oapi"
)

// --- mocks ---

// mockWorkspaceSetup is a canned WorkspaceSetup: Setup always returns the
// configured workspaceID/err pair.
type mockWorkspaceSetup struct {
	workspaceID string
	err         error
}

func (m *mockWorkspaceSetup) Setup(_ context.Context, _ *oapi.DispatchContext) (string, error) {
	return m.workspaceID, m.err
}

// mockSpeculativeRunner is a canned SpeculativeRunner: each method returns
// its corresponding configured value/error pair, ignoring all arguments.
type mockSpeculativeRunner struct {
	createRunID string
	createErr   error

	readStatus *RunStatus
	readErr    error

	planJSON []byte
	jsonErr  error
}

func (m *mockSpeculativeRunner) CreateSpeculativeRun(
	_ context.Context,
	_ *tfeConfig,
	_ string,
) (string, error) {
	return m.createRunID, m.createErr
}

func (m *mockSpeculativeRunner) ReadRunStatus(
	_ context.Context,
	_ *tfeConfig,
	_ string,
) (*RunStatus, error) {
	return m.readStatus, m.readErr
}

func (m *mockSpeculativeRunner) ReadPlanJSON(
	_ context.Context,
	_ *tfeConfig,
	_ string,
) ([]byte, error) {
	return m.planJSON, m.jsonErr
}

// --- helpers ---

// validPlanConfig returns a job agent config that passes parseJobAgentConfig.
func validPlanConfig() oapi.JobAgentConfig {
	return oapi.JobAgentConfig{
		"address":      "https://app.terraform.io",
		"token":        "test-token",
		"organization": "my-org",
		"template":     "name: test-ws",
		"webhookUrl":   "https://example.com/webhook",
	}
}

// planDispatchCtx returns a dispatch context carrying validPlanConfig.
func planDispatchCtx() *oapi.DispatchContext {
	return &oapi.DispatchContext{
		JobAgentConfig: validPlanConfig(),
	}
}

// --- tests ---

func TestTFCPlanner_Type(t *testing.T) {
	p := NewTFCPlanner(&mockWorkspaceSetup{}, &mockSpeculativeRunner{})
	assert.Equal(t, "tfe", p.Type())
}

// An empty job agent config must make Plan fail during config parsing.
func TestPlan_BadConfig(t *testing.T) {
	p := NewTFCPlanner(&mockWorkspaceSetup{}, &mockSpeculativeRunner{})
	dctx := &oapi.DispatchContext{
		JobAgentConfig: oapi.JobAgentConfig{},
	}
	_, err := p.Plan(context.Background(), dctx, nil)
	require.Error(t, err)
}

// A workspace Setup error is surfaced wrapped with "setup workspace".
func TestPlan_WorkspaceSetupFailure(t *testing.T) {
	ws := &mockWorkspaceSetup{err: fmt.Errorf("workspace upsert failed")}
	p := NewTFCPlanner(ws, &mockSpeculativeRunner{})

	_, err := p.Plan(context.Background(), planDispatchCtx(), nil)
	require.Error(t, err)
	assert.Contains(t, err.Error(), "setup workspace")
}

// A run-creation error is surfaced wrapped with "create speculative run".
func TestPlan_CreateRunFailure(t *testing.T) {
	ws := &mockWorkspaceSetup{workspaceID: "ws-123"}
	runner := &mockSpeculativeRunner{createErr: fmt.Errorf("tfc unavailable")}
	p := NewTFCPlanner(ws, runner)

	_, err := p.Plan(context.Background(), planDispatchCtx(), nil)
	require.Error(t, err)
	assert.Contains(t, err.Error(), "create speculative run")
}

// First call with no state creates the run and returns an incomplete result
// whose state carries the run ID, a zero poll count, and a start timestamp.
func TestPlan_CreateRun_ReturnsIncomplete(t *testing.T) {
	ws := &mockWorkspaceSetup{workspaceID: "ws-123"}
	runner := &mockSpeculativeRunner{createRunID: "run-abc123"}
	p := NewTFCPlanner(ws, runner)

	result, err := p.Plan(context.Background(), planDispatchCtx(), nil)
	require.NoError(t, err)
	assert.Nil(t, result.CompletedAt)
	assert.NotEmpty(t, result.State)
	assert.Contains(t, result.Message, "run-abc123")

	var s tfePlanState
	require.NoError(t, json.Unmarshal(result.State, &s))
	assert.Equal(t, "run-abc123", s.RunID)
	assert.Equal(t, 0, s.PollCount)
	assert.NotNil(t, s.FirstPolled)
}

// A still-running run requeues with an incremented poll count.
func TestPlan_PollStillRunning_Requeues(t *testing.T) {
	runner := &mockSpeculativeRunner{
		readStatus: &RunStatus{Status: "planning"},
	}
	p := NewTFCPlanner(&mockWorkspaceSetup{}, runner)

	now := time.Now()
	state, _ := json.Marshal(tfePlanState{
		RunID:       "run-abc123",
		PollCount:   1,
		FirstPolled: &now,
	})

	result, err := p.Plan(context.Background(), planDispatchCtx(), state)
	require.NoError(t, err)
	assert.Nil(t, result.CompletedAt)
	assert.NotEmpty(t, result.State)
	assert.Contains(t, result.Message, "Waiting for plan")

	var s tfePlanState
	require.NoError(t, json.Unmarshal(result.State, &s))
	assert.Equal(t, 2, s.PollCount)
}

// An errored run completes (non-error return) with no changes reported.
func TestPlan_RunErrored_Completes(t *testing.T) {
	runner := &mockSpeculativeRunner{
		readStatus: &RunStatus{Status: "errored", IsErrored: true},
	}
	p := NewTFCPlanner(&mockWorkspaceSetup{}, runner)

	now := time.Now()
	state, _ := json.Marshal(tfePlanState{
		RunID:       "run-abc123",
		PollCount:   3,
		FirstPolled: &now,
	})

	result, err := p.Plan(context.Background(), planDispatchCtx(), state)
	require.NoError(t, err)
	require.NotNil(t, result.CompletedAt)
	assert.Contains(t, result.Message, "errored")
	assert.False(t, result.HasChanges)
}

// A canceled run completes the same way an errored one does.
func TestPlan_RunCanceled_Completes(t *testing.T) {
	runner := &mockSpeculativeRunner{
		readStatus: &RunStatus{Status: "canceled", IsErrored: true},
	}
	p := NewTFCPlanner(&mockWorkspaceSetup{}, runner)

	now := time.Now()
	state, _ := json.Marshal(tfePlanState{
		RunID:       "run-abc123",
		PollCount:   1,
		FirstPolled: &now,
	})

	result, err := p.Plan(context.Background(), planDispatchCtx(), state)
	require.NoError(t, err)
	require.NotNil(t, result.CompletedAt)
	assert.Contains(t, result.Message, "canceled")
}

// A run whose FirstPolled is older than planTimeout completes with a
// "timed out" message even though it is still planning.
func TestPlan_Timeout_Completes(t *testing.T) {
	runner := &mockSpeculativeRunner{
		readStatus: &RunStatus{Status: "planning"},
	}
	p := NewTFCPlanner(&mockWorkspaceSetup{}, runner)

	expired := time.Now().Add(-planTimeout - time.Minute)
	state, _ := json.Marshal(tfePlanState{
		RunID:       "run-abc123",
		PollCount:   50,
		FirstPolled: &expired,
	})

	result, err := p.Plan(context.Background(), planDispatchCtx(), state)
	require.NoError(t, err)
	require.NotNil(t, result.CompletedAt)
	assert.Contains(t, result.Message, "timed out")
}

// A finished run with zero resource deltas completes with HasChanges=false
// but still carries the plan JSON and a content hash.
func TestPlan_PlannedAndFinished_NoChanges(t *testing.T) {
	planJSON := []byte(`{"resource_changes":[]}`)
	runner := &mockSpeculativeRunner{
		readStatus: &RunStatus{
			Status:               "planned_and_finished",
			IsFinished:           true,
			PlanID:               "plan-123",
			ResourceAdditions:    0,
			ResourceChanges:      0,
			ResourceDestructions: 0,
		},
		planJSON: planJSON,
	}
	p := NewTFCPlanner(&mockWorkspaceSetup{}, runner)

	now := time.Now()
	state, _ := json.Marshal(tfePlanState{
		RunID:       "run-abc123",
		PollCount:   2,
		FirstPolled: &now,
	})

	result, err := p.Plan(context.Background(), planDispatchCtx(), state)
	require.NoError(t, err)
	require.NotNil(t, result.CompletedAt)
	assert.False(t, result.HasChanges)
	assert.NotEmpty(t, result.ContentHash)
	assert.JSONEq(t, string(planJSON), result.Proposed)
	assert.Contains(t, result.Message, "+0 ~0 -0")
}

// A finished run with non-zero deltas completes with HasChanges=true and a
// summary message reflecting the counts.
func TestPlan_PlannedAndFinished_WithChanges(t *testing.T) {
	planJSON := []byte(`{"resource_changes":[{"type":"aws_instance"}]}`)
	runner := &mockSpeculativeRunner{
		readStatus: &RunStatus{
			Status:               "planned_and_finished",
			IsFinished:           true,
			PlanID:               "plan-456",
			ResourceAdditions:    2,
			ResourceChanges:      1,
			ResourceDestructions: 0,
		},
		planJSON: planJSON,
	}
	p := NewTFCPlanner(&mockWorkspaceSetup{}, runner)

	now := time.Now()
	state, _ := json.Marshal(tfePlanState{
		RunID:       "run-abc123",
		PollCount:   3,
		FirstPolled: &now,
	})

	result, err := p.Plan(context.Background(), planDispatchCtx(), state)
	require.NoError(t, err)
	require.NotNil(t, result.CompletedAt)
	assert.True(t, result.HasChanges)
	assert.NotEmpty(t, result.ContentHash)
	assert.Contains(t, result.Message, "+2 ~1 -0")
}

// Failure to fetch the finished plan's JSON surfaces as an error wrapped
// with "read plan JSON".
func TestPlan_ReadPlanJSONFailure(t *testing.T) {
	runner := &mockSpeculativeRunner{
		readStatus: &RunStatus{
			Status:     "planned_and_finished",
			IsFinished: true,
			PlanID:     "plan-789",
		},
		jsonErr: fmt.Errorf("plan output not available"),
	}
	p := NewTFCPlanner(&mockWorkspaceSetup{}, runner)

	now := time.Now()
	state, _ := json.Marshal(tfePlanState{
		RunID:       "run-abc123",
		PollCount:   1,
		FirstPolled: &now,
	})

	_, err := p.Plan(context.Background(), planDispatchCtx(), state)
	require.Error(t, err)
	assert.Contains(t, err.Error(), "read plan JSON")
}

// Failure to read the run status surfaces as an error wrapped with "read run".
func TestPlan_ReadRunFailure(t *testing.T) {
	runner := &mockSpeculativeRunner{
		readErr: fmt.Errorf("connection refused"),
	}
	p := NewTFCPlanner(&mockWorkspaceSetup{}, runner)

	now := time.Now()
	state, _ := json.Marshal(tfePlanState{
		RunID:       "run-abc123",
		PollCount:   1,
		FirstPolled: &now,
	})

	_, err := p.Plan(context.Background(), planDispatchCtx(), state)
	require.Error(t, err)
	assert.Contains(t, err.Error(), "read run")
}

// Running the same finished plan twice must produce the same ContentHash
// (the hash is a pure function of the plan JSON).
func TestPlan_ContentHashDeterministic(t *testing.T) {
	planJSON := []byte(`{"resource_changes":[{"type":"aws_s3_bucket"}]}`)
	runner := &mockSpeculativeRunner{
		readStatus: &RunStatus{
			Status:               "planned_and_finished",
			IsFinished:           true,
			PlanID:               "plan-det",
			ResourceAdditions:    1,
			ResourceChanges:      0,
			ResourceDestructions: 0,
		},
		planJSON: planJSON,
	}
	p := NewTFCPlanner(&mockWorkspaceSetup{}, runner)

	now := time.Now()
	state, _ := json.Marshal(tfePlanState{
		RunID:       "run-abc123",
		PollCount:   1,
		FirstPolled: &now,
	})

	r1, err := p.Plan(context.Background(), planDispatchCtx(), state)
	require.NoError(t, err)

	r2, err := p.Plan(context.Background(), planDispatchCtx(), state)
	require.NoError(t, err)

	assert.Equal(t, r1.ContentHash, r2.ContentHash)
}

// diff --git a/apps/workspace-engine/svc/controllers/deploymentplanresult/getters_postgres.go (modified; see hunks below)
index 8938c44a1..99c8ef110 100644 --- a/apps/workspace-engine/svc/controllers/deploymentplanresult/getters_postgres.go +++ b/apps/workspace-engine/svc/controllers/deploymentplanresult/getters_postgres.go @@ -7,6 +7,7 @@ import ( "workspace-engine/pkg/db" "workspace-engine/pkg/jobagents" "workspace-engine/pkg/jobagents/argo" + "workspace-engine/pkg/jobagents/terraformcloud" "workspace-engine/pkg/jobagents/testrunner" ) @@ -29,5 +30,11 @@ func newRegistry() *jobagents.Registry { ), ) registry.Register(testrunner.New(nil)) + registry.Register( + terraformcloud.NewTFCPlanner( + &terraformcloud.GoWorkspaceSetup{}, + &terraformcloud.GoSpeculativeRunner{}, + ), + ) return registry }