Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ out.html
specs/*.md
generate-openapi
.todos/
ginkgo*.json
108 changes: 108 additions & 0 deletions changegroup/closer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package changegroup

import (
"encoding/json"
"time"

"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
"github.com/flanksource/duty/types"
)

// CloseStaleGroups closes open change_groups whose last_member_at is older
// than the effective close window for the group's rule. Called periodically
// by StartCloser.
//
// For TemporaryPermissionGroup specifically, this also computes
// DurationSeconds from started_at/ended_at at close time.
func (e *Engine) CloseStaleGroups(ctx context.Context, now time.Time) (int, error) {
// Build the rule → CloseAfter lookup snapshot.
e.mu.RLock()
closeAfterByRule := make(map[string]time.Duration, len(e.rules))
for _, r := range e.rules {
ca := r.CloseAfter.Std()
if ca == 0 {
// 0 means "never time out" — skip timeout-close for this rule.
continue
}
closeAfterByRule[r.Name] = ca
}
e.mu.RUnlock()

if len(closeAfterByRule) == 0 {
return 0, nil
}

var candidates []models.ChangeGroup
if err := ctx.DB().
Where("status = ?", models.ChangeGroupStatusOpen).
Find(&candidates).Error; err != nil {
return 0, err
}

closed := 0
for i := range candidates {
g := &candidates[i]
if g.RuleName == nil {
continue
}
window, ok := closeAfterByRule[*g.RuleName]
if !ok {
continue
}
if now.Sub(g.LastMemberAt) < window {
continue
}
if err := finalizeClose(ctx, g, g.LastMemberAt); err != nil {
return closed, err
}
closed++
}
return closed, nil
}

// finalizeClose writes the terminal state for a group: status=closed,
// ended_at set, and — for TemporaryPermissionGroup — DurationSeconds computed.
func finalizeClose(ctx context.Context, g *models.ChangeGroup, endedAt time.Time) error {
updates := map[string]any{
"status": models.ChangeGroupStatusClosed,
"ended_at": endedAt,
"updated_at": time.Now().UTC(),
}

stored, err := g.TypedDetails()
if err == nil && stored != nil {
if tp, ok := stored.(types.TemporaryPermissionGroup); ok {
dur := int64(endedAt.Sub(g.StartedAt).Seconds())
tp.DurationSeconds = &dur
raw, err := json.Marshal(tp)
if err == nil {
updates["details"] = types.JSON(raw)
}
}
}

return ctx.DB().Model(&models.ChangeGroup{}).
Where("id = ?", g.ID).
Updates(updates).Error
}

// StartCloser runs CloseStaleGroups on a ticker until ctx is cancelled.
// Intended to be spawned once per process by the job scheduler.
func (e *Engine) StartCloser(ctx context.Context, interval time.Duration) {
if interval <= 0 {
interval = 30 * time.Second
}
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case now := <-ticker.C:
_, _ = e.CloseStaleGroups(ctx, now)
}
}
}()
}
175 changes: 175 additions & 0 deletions changegroup/engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package changegroup

import (
"sort"
"sync"

"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
)

// Engine evaluates GroupingRules against incoming config_changes and keeps
// change_groups in sync. It is safe for concurrent use once rules are loaded.
type Engine struct {
mu sync.RWMutex
rules []*GroupingRule
evaluator Evaluator
}

// New returns an Engine pre-loaded with the given rules and evaluator.
// The caller is responsible for calling Validate on each rule before passing
// it in — New re-validates and returns an error if any rule fails.
func New(evaluator Evaluator, rules []GroupingRule) (*Engine, error) {
e := &Engine{evaluator: evaluator}
if err := e.SetRules(rules); err != nil {
return nil, err
}
return e, nil
}

// SetRules replaces the engine's rule set atomically. All rules are
// (re-)validated; if any rule fails, the previous rule set is preserved and
// the error is returned.
func (e *Engine) SetRules(rules []GroupingRule) error {
if e.evaluator == nil {
// Allow rule loading with no evaluator only if every rule has empty
// CEL expressions (currently never true). Require evaluator.
return ErrMissingEvaluator
}

compiled := make([]*GroupingRule, 0, len(rules))
for i := range rules {
r := rules[i]
if err := r.Validate(e.evaluator); err != nil {
return err
}
compiled = append(compiled, &r)
}
sort.SliceStable(compiled, func(i, j int) bool {
return compiled[i].Priority > compiled[j].Priority
})

e.mu.Lock()
e.rules = compiled
e.mu.Unlock()
return nil
}

// Rules returns a snapshot of the currently loaded rules for inspection/tests.
func (e *Engine) Rules() []*GroupingRule {
e.mu.RLock()
defer e.mu.RUnlock()
out := make([]*GroupingRule, len(e.rules))
copy(out, e.rules)
return out
}

// Evaluate runs the rule engine against a single already-persisted
// config_changes row. Matching rules will create or update a change_group and
// set change.GroupID. If the change already has a GroupID, Evaluate is a no-op
// (producers are trusted).
func (e *Engine) Evaluate(ctx context.Context, change *models.ConfigChange) error {
if change == nil {
return nil
}
if change.GroupID != nil {
return nil // explicit path — respect producer assignment
}

e.mu.RLock()
rules := e.rules
e.mu.RUnlock()

for _, rule := range rules {
matched, err := e.tryRule(ctx, rule, change)
if err != nil {
return err
}
if matched {
return nil
}
}
return nil
}

// tryRule attempts to apply a single rule to the change. Returns (true, nil)
// on a successful attach, (false, nil) on a non-match.
func (e *Engine) tryRule(ctx context.Context, rule *GroupingRule, change *models.ConfigChange) (bool, error) {
if !rule.Matches(change.ChangeType) {
return false, nil
}

env := buildSingleChangeEnv(change)

if rule.filterProgram != nil {
ok, err := e.evaluator.EvalBool(rule.filterProgram, env)
if err != nil {
return false, &EvalError{Rule: rule.Name, Field: "filter", Err: err}
}
if !ok {
return false, nil
}
}

rawKey, err := e.evaluator.EvalString(rule.keyProgram, env)
if err != nil {
return false, &EvalError{Rule: rule.Name, Field: "key", Err: err}
}
if rawKey == "" {
return false, nil
}
correlationKey := hashKey(rule.Name, rawKey)

if err := e.upsertAndAttach(ctx, rule, correlationKey, change); err != nil {
return false, err
}
return true, nil
}

// buildSingleChangeEnv creates an Env whose Changes contains only the
// triggering change (plus the flat shortcuts). Used on the first call; the
// upsert path rebuilds the env with all persisted members before re-running
// Details / Summary.
func buildSingleChangeEnv(change *models.ConfigChange) Env {
m := changeAsMap(change)
return Env{
Change: m,
Changes: []map[string]any{m},
Flat: m,
}
}

// changeAsMap projects a ConfigChange into the CEL binding shape.
func changeAsMap(c *models.ConfigChange) map[string]any {
var groupID any
if c.GroupID != nil {
groupID = c.GroupID.String()
}
return map[string]any{
"id": c.ID,
"external_id": c.ExternalID,
"external_change_id": derefString(c.ExternalChangeID),
"config_id": c.ConfigID,
"change_type": c.ChangeType,
"severity": string(c.Severity),
"source": c.Source,
"summary": c.Summary,
"patches": c.Patches,
"diff": c.Diff,
"fingerprint": c.Fingerprint,
"details": c.Details,
"created_at": c.CreatedAt,
"created_by": c.CreatedBy,
"external_created_by": derefString(c.ExternalCreatedBy),
"count": c.Count,
"group_id": groupID,
}
}

func derefString(s *string) string {
if s == nil {
return ""
}
return *s
}

41 changes: 41 additions & 0 deletions changegroup/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package changegroup

import (
"errors"
"fmt"
)

var (
ErrEmptyRuleName = errors.New("changegroup: rule name is required")
ErrMissingDetails = errors.New("changegroup: rule details expression is required")
ErrMissingKey = errors.New("changegroup: rule key expression is required")
ErrUnknownPseudo = errors.New("changegroup: unknown pseudo change type")
ErrMissingEvaluator = errors.New("changegroup: evaluator must be set before rules are loaded")
)

// EvalError wraps an evaluator runtime failure with the originating rule and field.
type EvalError struct {
Rule string
Field string
Err error
}

func (e *EvalError) Error() string {
return fmt.Sprintf("changegroup: eval rule %q field %q: %v", e.Rule, e.Field, e.Err)
}

func (e *EvalError) Unwrap() error { return e.Err }

// CompileError wraps an evaluator compile failure with the originating rule
// and field so operators can pinpoint their YAML.
type CompileError struct {
Rule string
Field string
Err error
}

func (e *CompileError) Error() string {
return fmt.Sprintf("changegroup: compile rule %q field %q: %v", e.Rule, e.Field, e.Err)
}

func (e *CompileError) Unwrap() error { return e.Err }
Loading
Loading