Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 53 additions & 29 deletions pkg/fleet/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Daemon struct {
revisionsEnabled bool
mu sync.RWMutex
configs map[string]installerConfig // keyed by config ID; replaced on each RC update
experimentTarget types.NamespacedName // DDA targeted by the current experiment; set on startExperiment
taskMu sync.Mutex // serializes UPDATER_TASK execution
}

// NewDaemon creates a new Fleet Daemon. When revisionsEnabled is false, experiment
Expand All @@ -73,19 +73,7 @@ func (d *Daemon) Start(ctx context.Context) error {
return d.handleConfigs(ctx, configs)
}))
d.rcClient.Subscribe(state.ProductUpdaterTask, handleUpdaterTaskUpdate(ctx, func(req remoteAPIRequest) error {
d.setTaskState(req.Package, req.ID, pbgo.TaskState_RUNNING, nil)
err := d.handleRemoteAPIRequest(ctx, req)
if err != nil {
var stateErr *stateDoesntMatchError
if errors.As(err, &stateErr) {
d.setTaskState(req.Package, req.ID, pbgo.TaskState_INVALID_STATE, err)
} else {
d.setTaskState(req.Package, req.ID, pbgo.TaskState_ERROR, err)
}
} else {
d.setTaskState(req.Package, req.ID, pbgo.TaskState_DONE, nil)
}
return err
return d.handleTask(ctx, req)
}))

<-ctx.Done()
Expand All @@ -99,6 +87,26 @@ func (d *Daemon) NeedLeaderElection() bool {
return true
}

// handleTask serializes UPDATER_TASK execution so at most one task runs at a time,
// matching the datadog-agent's single-writer model and preventing races in setTaskState.
func (d *Daemon) handleTask(ctx context.Context, req remoteAPIRequest) error {
d.taskMu.Lock()
defer d.taskMu.Unlock()
d.setTaskState(req.Package, req.ID, pbgo.TaskState_RUNNING, nil)
err := d.handleRemoteAPIRequest(ctx, req)
if err != nil {
var stateErr *stateDoesntMatchError
if errors.As(err, &stateErr) {
d.setTaskState(req.Package, req.ID, pbgo.TaskState_INVALID_STATE, err)
} else {
d.setTaskState(req.Package, req.ID, pbgo.TaskState_ERROR, err)
}
} else {
d.setTaskState(req.Package, req.ID, pbgo.TaskState_DONE, nil)
}
return err
}

// handleConfigs replaces the stored installer configs with the latest RC update.
// Configs are indexed by their ID so they can be retrieved by task handlers.
func (d *Daemon) handleConfigs(ctx context.Context, configs map[string]installerConfig) error {
Expand Down Expand Up @@ -172,13 +180,12 @@ func (d *Daemon) handleRemoteAPIRequest(ctx context.Context, req remoteAPIReques

// resolveOperation looks up the installer config for the request, validates its single
// DatadogAgent operation, and fills in the canonical GVK. Returns the operation ready for use.
// The target namespaced name is carried by the UPDATER_TASK itself (not the INSTALLER_CONFIG).
func (d *Daemon) resolveOperation(req remoteAPIRequest, signal string) (fleetManagementOperation, error) {
// get params version from req
id := req.Params.Version
if id == "" {
return fleetManagementOperation{}, fmt.Errorf("%s: version is required", signal)
}
// match to d.configs[params.version] to get config
cfg, err := d.getConfig(id)
if err != nil {
return fleetManagementOperation{}, fmt.Errorf("%s: %w", signal, err)
Expand All @@ -204,16 +211,17 @@ func (d *Daemon) resolveOperation(req remoteAPIRequest, signal string) (fleetMan
func (d *Daemon) startDatadogAgentExperiment(ctx context.Context, req remoteAPIRequest) error {
logger := ctrl.LoggerFrom(ctx).WithValues("id", req.ID)
logger.V(1).Info("Starting DatadogAgent experiment", "config", req.Params.Version)
nsn, err := parseTaskNSN(req, "start DatadogAgent experiment")
if err != nil {
return err
}
op, err := d.resolveOperation(req, "start DatadogAgent experiment")
if err != nil {
logger.Error(err, "Failed to resolve operation")
return err
}

// Store the target DDA for promote/stop signals (which don't carry a config).
d.experimentTarget = op.NamespacedName

logger = logger.WithValues("namespace", op.NamespacedName.Namespace, "name", op.NamespacedName.Name)
logger = logger.WithValues("namespace", nsn.Namespace, "name", nsn.Name)
ctx = ctrl.LoggerInto(ctx, logger)

// Check the operation
Expand All @@ -223,8 +231,8 @@ func (d *Daemon) startDatadogAgentExperiment(ctx context.Context, req remoteAPIR

// Fetch current DDA to check signal preconditions.
dda := &v2alpha1.DatadogAgent{}
if err := d.client.Get(ctx, op.NamespacedName, dda); err != nil {
return fmt.Errorf("start DatadogAgent experiment: failed to get DatadogAgent %s: %w", op.NamespacedName, err)
if err := d.client.Get(ctx, nsn, dda); err != nil {
return fmt.Errorf("start DatadogAgent experiment: failed to get DatadogAgent %s: %w", nsn, err)
}

if err := canStart(getExperimentPhase(dda)); err != nil {
Expand Down Expand Up @@ -253,7 +261,7 @@ func (d *Daemon) startDatadogAgentExperiment(ctx context.Context, req remoteAPIR
// Update status: phase=running, record experiment ID.
// Re-fetch inside the retry to get the latest ResourceVersion on conflict.
if err := retryWithBackoff(ctx, func() error {
if err := d.client.Get(ctx, op.NamespacedName, dda); err != nil {
if err := d.client.Get(ctx, nsn, dda); err != nil {
return err
}
dda.Status.Experiment = &v2alpha1.ExperimentStatus{
Expand All @@ -274,9 +282,9 @@ func (d *Daemon) startDatadogAgentExperiment(ctx context.Context, req remoteAPIR
}

func (d *Daemon) stopDatadogAgentExperiment(ctx context.Context, req remoteAPIRequest) error {
nsn := d.experimentTarget
if nsn.Name == "" {
return fmt.Errorf("stop DatadogAgent experiment: no experiment target set")
nsn, err := parseTaskNSN(req, "stop DatadogAgent experiment")
if err != nil {
return err
}

ctx = ctrl.LoggerInto(ctx, ctrl.LoggerFrom(ctx).WithValues("id", req.ID, "namespace", nsn.Namespace, "name", nsn.Name))
Expand All @@ -291,6 +299,10 @@ func (d *Daemon) stopDatadogAgentExperiment(ctx context.Context, req remoteAPIRe
if isNoOp, err := canStop(ctx, getExperimentPhase(dda)); err != nil {
return fmt.Errorf("stop DatadogAgent experiment: %w", err)
} else if isNoOp {
// The spec rollback already happened (e.g. timeout), but the experiment
// config version still needs to be cleared so the backend can issue new tasks.
stable, _ := d.getPackageConfigVersions(req.Package)
d.setPackageConfigVersions(req.Package, stable, "")
return nil
}

Expand All @@ -317,9 +329,9 @@ func (d *Daemon) stopDatadogAgentExperiment(ctx context.Context, req remoteAPIRe
}

func (d *Daemon) promoteDatadogAgentExperiment(ctx context.Context, req remoteAPIRequest) error {
nsn := d.experimentTarget
if nsn.Name == "" {
return fmt.Errorf("promote DatadogAgent experiment: no experiment target set")
nsn, err := parseTaskNSN(req, "promote DatadogAgent experiment")
if err != nil {
return err
}

ctx = ctrl.LoggerInto(ctx, ctrl.LoggerFrom(ctx).WithValues("id", req.ID, "namespace", nsn.Namespace, "name", nsn.Name))
Expand All @@ -340,6 +352,18 @@ func (d *Daemon) promoteDatadogAgentExperiment(ctx context.Context, req remoteAP
if isNoOp, err := canPromote(ctx, getExperimentPhase(dda)); err != nil {
return fmt.Errorf("promote DatadogAgent experiment: %w", err)
} else if isNoOp {
stable, exp := d.getPackageConfigVersions(req.Package)
if getExperimentPhase(dda) == v2alpha1.ExperimentPhasePromoted {
// The experiment was already promoted (e.g. daemon crashed between the
// status update and setPackageConfigVersions). Finish the promotion by
// moving the experiment version to stable so the backend gets the correct
// baseline for future expected_state checks.
d.setPackageConfigVersions(req.Package, exp, "")
} else {
// The experiment was rolled back (stopped/rollback/timeout). Just clear
// the experiment version so the backend can issue new tasks.
d.setPackageConfigVersions(req.Package, stable, "")
}
return nil
}

Expand Down
Loading
Loading