From 54da8f0326cec2dda15af95f80e3edb1a6bbf578 Mon Sep 17 00:00:00 2001 From: wlggraham Date: Fri, 27 Feb 2026 15:39:13 -0700 Subject: [PATCH] reuse existing task definitions --- internal/pkg/object/command/ecs/ecs.go | 123 +++++++++++++++++++------ 1 file changed, 94 insertions(+), 29 deletions(-) diff --git a/internal/pkg/object/command/ecs/ecs.go b/internal/pkg/object/command/ecs/ecs.go index eec206a..30eaa8e 100644 --- a/internal/pkg/object/command/ecs/ecs.go +++ b/internal/pkg/object/command/ecs/ecs.go @@ -27,6 +27,9 @@ import ( type commandContext struct { TaskDefinitionTemplate string `yaml:"task_definition_template,omitempty" json:"task_definition_template,omitempty"` TaskCount int `yaml:"task_count,omitempty" json:"task_count,omitempty"` + TaskFamily *string `yaml:"task_family,omitempty" json:"task_family,omitempty"` + TaskRevision *int `yaml:"task_revision,omitempty" json:"task_revision,omitempty"` + UseLatestRevision bool `yaml:"use_latest_revision,omitempty" json:"use_latest_revision,omitempty"` CPU int `yaml:"cpu,omitempty" json:"cpu,omitempty"` Memory int `yaml:"memory,omitempty" json:"memory,omitempty"` ContainerOverrides []types.ContainerOverride `yaml:"container_overrides,omitempty" json:"container_overrides,omitempty"` @@ -83,6 +86,9 @@ type FailureReason string // executionContext holds the final resolved configuration for job execution. type executionContext struct { TaskCount int `json:"task_count"` + TaskFamily *string `json:"task_family,omitempty"` + TaskRevision *int `json:"task_revision,omitempty"` + UseLatestRevision bool `json:"use_latest_revision,omitempty"` CPU int `json:"cpu"` Memory int `json:"memory"` TaskDefinitionWrapper *taskDefinitionWrapper `json:"task_definition_wrapper"` @@ -152,8 +158,8 @@ func (e *commandContext) Execute(ctx context.Context, r *plugin.Runtime, job *jo return err } - // register task definition - if err := execCtx.registerTaskDefinition(ctx); err != nil { + // select or register task definition + if err := execCtx.prepareTaskDefinition(ctx); err != nil { return err } @@ -182,8 +188,39 @@ func (e *commandContext) Execute(ctx context.Context, r *plugin.Runtime, job *jo } -// prepare and register task definition with ECS -func (execCtx *executionContext) registerTaskDefinition(ctx context.Context) error { +// select or register task definition +func (execCtx *executionContext) prepareTaskDefinition(ctx context.Context) error { + + // Optionally override task family from config + if execCtx.TaskFamily != nil { + execCtx.TaskDefinitionWrapper.TaskDefinition.Family = aws.String(strings.TrimSpace(*execCtx.TaskFamily)) + } + + family := aws.ToString(execCtx.TaskDefinitionWrapper.TaskDefinition.Family) + + // If configured to reuse an existing task definition, do that first. + if execCtx.UseLatestRevision { + taskDefListOutput, err := execCtx.ecsClient.ListTaskDefinitions(ctx, &ecs.ListTaskDefinitionsInput{ + FamilyPrefix: aws.String(family), + Status: types.TaskDefinitionStatusActive, + Sort: types.SortOrderDesc, + MaxResults: aws.Int32(1), + }) + + if err != nil { + return err + } + + if len(taskDefListOutput.TaskDefinitionArns) > 0 { + return execCtx.adoptTaskDefinition(ctx, taskDefListOutput.TaskDefinitionArns[0]) + } + } + + if execCtx.TaskRevision != nil { + return execCtx.adoptTaskDefinition(ctx, fmt.Sprintf("%s:%d", family, *execCtx.TaskRevision)) + } + + // if you are here, you need to register a new task definition registerInput := &ecs.RegisterTaskDefinitionInput{ Family: aws.String(aws.ToString(execCtx.TaskDefinitionWrapper.TaskDefinition.Family)), RequiresCompatibilities: []types.Compatibility{types.CompatibilityFargate}, @@ -206,6 +243,52 @@ func (execCtx *executionContext) registerTaskDefinition(ctx context.Context) err } +func (execCtx *executionContext) adoptTaskDefinition(ctx context.Context, taskDefinitionArn string) error { + + taskDefOutput, err := execCtx.ecsClient.DescribeTaskDefinition(ctx, &ecs.DescribeTaskDefinitionInput{ + TaskDefinition: aws.String(taskDefinitionArn), + }) + if err != nil || taskDefOutput.TaskDefinition == nil { + return fmt.Errorf("failed to describe task definition: %w", err) + } + + execCtx.taskDefARN = taskDefOutput.TaskDefinition.TaskDefinitionArn + execCtx.TaskDefinitionWrapper = newTaskDefinitionWrapper(taskDefOutput.TaskDefinition) + + return nil + +} + +func newTaskDefinitionWrapper(taskDef *types.TaskDefinition) *taskDefinitionWrapper { + + // Pre-compute essential containers map + essentialContainers := make(map[string]bool) + for _, containerDef := range taskDef.ContainerDefinitions { + if containerDef.Essential != nil && *containerDef.Essential { + essentialContainers[aws.ToString(containerDef.Name)] = true + } + } + + // Pre-compute containers with log configurations + var logGroups []containerLogInfo + for _, containerDef := range taskDef.ContainerDefinitions { + if containerDef.LogConfiguration != nil { + logGroups = append(logGroups, containerLogInfo{ + containerName: aws.ToString(containerDef.Name), + logDriver: containerDef.LogConfiguration.LogDriver, + options: containerDef.LogConfiguration.Options, + }) + } + } + + return &taskDefinitionWrapper{ + TaskDefinition: taskDef, + EssentialContainers: essentialContainers, + LogGroups: logGroups, + } + +} + // startTasks launches all tasks and returns a map of task trackers func (execCtx *executionContext) startTasks(ctx context.Context, jobID string) error { @@ -224,6 +307,7 @@ func (execCtx *executionContext) startTasks(ctx context.Context, jobID string) e } return nil + } // monitor tasks until completion, faliure, or timeout @@ -424,6 +508,11 @@ func validateExecutionContext(ctx *executionContext) error { return fmt.Errorf("memory (%d) needs to be greater than 0 and less than or equal to cluster max memory (%d)", ctx.Memory, ctx.ClusterConfig.MaxMemory) } + // Task definition selection business logic. + if ctx.UseLatestRevision && ctx.TaskRevision != nil { + return fmt.Errorf("use_latest_revision and task_revision cannot both be set") + } + return nil } @@ -513,31 +602,7 @@ func loadTaskDefinitionTemplate(templatePath string) (*taskDefinitionWrapper, er return nil, err } - // Pre-compute essential containers map - essentialContainers := make(map[string]bool) - for _, containerDef := range taskDef.ContainerDefinitions { - if containerDef.Essential != nil && *containerDef.Essential { - essentialContainers[aws.ToString(containerDef.Name)] = true - } - } - - // Pre-compute containers with log configurations - var logGroups []containerLogInfo - for _, containerDef := range taskDef.ContainerDefinitions { - if containerDef.LogConfiguration != nil { - logGroups = append(logGroups, containerLogInfo{ - containerName: aws.ToString(containerDef.Name), - logDriver: containerDef.LogConfiguration.LogDriver, - options: containerDef.LogConfiguration.Options, - }) - } - } - - return &taskDefinitionWrapper{ - TaskDefinition: &taskDef, - EssentialContainers: essentialContainers, - LogGroups: logGroups, - }, nil + return newTaskDefinitionWrapper(&taskDef), nil }