Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 94 additions & 29 deletions internal/pkg/object/command/ecs/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
type commandContext struct {
TaskDefinitionTemplate string `yaml:"task_definition_template,omitempty" json:"task_definition_template,omitempty"`
TaskCount int `yaml:"task_count,omitempty" json:"task_count,omitempty"`
TaskFamily *string `yaml:"task_family,omitempty" json:"task_family,omitempty"`
TaskRevision *int `yaml:"task_revision,omitempty" json:"task_revision,omitempty"`
UseLatestRevision bool `yaml:"use_latest_revision,omitempty" json:"use_latest_revision,omitempty"`
CPU int `yaml:"cpu,omitempty" json:"cpu,omitempty"`
Memory int `yaml:"memory,omitempty" json:"memory,omitempty"`
ContainerOverrides []types.ContainerOverride `yaml:"container_overrides,omitempty" json:"container_overrides,omitempty"`
Expand Down Expand Up @@ -83,6 +86,9 @@ type FailureReason string
// executionContext holds the final resolved configuration for job execution.
type executionContext struct {
TaskCount int `json:"task_count"`
TaskFamily *string `json:"task_family,omitempty"`
TaskRevision *int `json:"task_revision,omitempty"`
UseLatestRevision bool `json:"use_latest_revision,omitempty"`
CPU int `json:"cpu"`
Memory int `json:"memory"`
TaskDefinitionWrapper *taskDefinitionWrapper `json:"task_definition_wrapper"`
Expand Down Expand Up @@ -152,8 +158,8 @@ func (e *commandContext) Execute(ctx context.Context, r *plugin.Runtime, job *jo
return err
}

// register task definition
if err := execCtx.registerTaskDefinition(ctx); err != nil {
// select or register task definition
if err := execCtx.prepareTaskDefinition(ctx); err != nil {
return err
}

Expand Down Expand Up @@ -182,8 +188,39 @@ func (e *commandContext) Execute(ctx context.Context, r *plugin.Runtime, job *jo

}

// prepare and register task definition with ECS
func (execCtx *executionContext) registerTaskDefinition(ctx context.Context) error {
// select or register task definition
func (execCtx *executionContext) prepareTaskDefinition(ctx context.Context) error {

// Optionally override task family from config
if execCtx.TaskFamily != nil {
execCtx.TaskDefinitionWrapper.TaskDefinition.Family = aws.String(strings.TrimSpace(*execCtx.TaskFamily))
}

family := aws.ToString(execCtx.TaskDefinitionWrapper.TaskDefinition.Family)

// If configured to reuse an existing task definition, do that first.
if execCtx.UseLatestRevision {
taskDefListOutput, err := execCtx.ecsClient.ListTaskDefinitions(ctx, &ecs.ListTaskDefinitionsInput{
FamilyPrefix: aws.String(family),
Status: types.TaskDefinitionStatusActive,
Sort: types.SortOrderDesc,
MaxResults: aws.Int32(1),
})

if err != nil {
return err
}

if len(taskDefListOutput.TaskDefinitionArns) > 0 {
return execCtx.adoptTaskDefinition(ctx, taskDefListOutput.TaskDefinitionArns[0])
}
}

if execCtx.TaskRevision != nil {
return execCtx.adoptTaskDefinition(ctx, fmt.Sprintf("%s:%d", family, *execCtx.TaskRevision))
}

// if you are here, you need to register a new task definition
registerInput := &ecs.RegisterTaskDefinitionInput{
Family: aws.String(aws.ToString(execCtx.TaskDefinitionWrapper.TaskDefinition.Family)),
RequiresCompatibilities: []types.Compatibility{types.CompatibilityFargate},
Expand All @@ -206,6 +243,52 @@ func (execCtx *executionContext) registerTaskDefinition(ctx context.Context) err

}

func (execCtx *executionContext) adoptTaskDefinition(ctx context.Context, taskDefinitionArn string) error {

taskDefOutput, err := execCtx.ecsClient.DescribeTaskDefinition(ctx, &ecs.DescribeTaskDefinitionInput{
TaskDefinition: aws.String(taskDefinitionArn),
})
if err != nil || taskDefOutput.TaskDefinition == nil {
return fmt.Errorf("failed to describe task definition: %w", err)
}

execCtx.taskDefARN = taskDefOutput.TaskDefinition.TaskDefinitionArn
execCtx.TaskDefinitionWrapper = newTaskDefinitionWrapper(taskDefOutput.TaskDefinition)

return nil

}

func newTaskDefinitionWrapper(taskDef *types.TaskDefinition) *taskDefinitionWrapper {

// Pre-compute essential containers map
essentialContainers := make(map[string]bool)
for _, containerDef := range taskDef.ContainerDefinitions {
if containerDef.Essential != nil && *containerDef.Essential {
essentialContainers[aws.ToString(containerDef.Name)] = true
}
}

// Pre-compute containers with log configurations
var logGroups []containerLogInfo
for _, containerDef := range taskDef.ContainerDefinitions {
if containerDef.LogConfiguration != nil {
logGroups = append(logGroups, containerLogInfo{
containerName: aws.ToString(containerDef.Name),
logDriver: containerDef.LogConfiguration.LogDriver,
options: containerDef.LogConfiguration.Options,
})
}
}

return &taskDefinitionWrapper{
TaskDefinition: taskDef,
EssentialContainers: essentialContainers,
LogGroups: logGroups,
}

}

// startTasks launches all tasks and returns a map of task trackers
func (execCtx *executionContext) startTasks(ctx context.Context, jobID string) error {

Expand All @@ -224,6 +307,7 @@ func (execCtx *executionContext) startTasks(ctx context.Context, jobID string) e
}

return nil

}

// monitor tasks until completion, faliure, or timeout
Expand Down Expand Up @@ -424,6 +508,11 @@ func validateExecutionContext(ctx *executionContext) error {
return fmt.Errorf("memory (%d) needs to be greater than 0 and less than or equal to cluster max memory (%d)", ctx.Memory, ctx.ClusterConfig.MaxMemory)
}

// Task definition selection business logic.
if ctx.UseLatestRevision && ctx.TaskRevision != nil {
return fmt.Errorf("use_latest_revision and task_revision cannot both be set")
}

return nil

}
Expand Down Expand Up @@ -513,31 +602,7 @@ func loadTaskDefinitionTemplate(templatePath string) (*taskDefinitionWrapper, er
return nil, err
}

// Pre-compute essential containers map
essentialContainers := make(map[string]bool)
for _, containerDef := range taskDef.ContainerDefinitions {
if containerDef.Essential != nil && *containerDef.Essential {
essentialContainers[aws.ToString(containerDef.Name)] = true
}
}

// Pre-compute containers with log configurations
var logGroups []containerLogInfo
for _, containerDef := range taskDef.ContainerDefinitions {
if containerDef.LogConfiguration != nil {
logGroups = append(logGroups, containerLogInfo{
containerName: aws.ToString(containerDef.Name),
logDriver: containerDef.LogConfiguration.LogDriver,
options: containerDef.LogConfiguration.Options,
})
}
}

return &taskDefinitionWrapper{
TaskDefinition: &taskDef,
EssentialContainers: essentialContainers,
LogGroups: logGroups,
}, nil
return newTaskDefinitionWrapper(&taskDef), nil

}

Expand Down