From 14bab606ee933e2df9a385fae4932967760913ea Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Tue, 12 May 2026 16:09:31 +0200 Subject: [PATCH] Don't persist `events,incident_events` to DB --- internal/event/event.go | 68 +++++++++------------------------- internal/incident/db_types.go | 12 ------ internal/incident/incident.go | 29 +-------------- internal/incident/incidents.go | 30 ++------------- internal/incident/sync.go | 13 +------ internal/listener/listener.go | 2 + schema/mysql/schema.sql | 28 -------------- schema/mysql/upgrades/1.0.sql | 3 ++ schema/pgsql/schema.sql | 40 -------------------- schema/pgsql/upgrades/1.0.sql | 4 ++ 10 files changed, 32 insertions(+), 197 deletions(-) create mode 100644 schema/mysql/upgrades/1.0.sql create mode 100644 schema/pgsql/upgrades/1.0.sql diff --git a/internal/event/event.go b/internal/event/event.go index 1b517de7..90010b45 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -1,13 +1,11 @@ package event import ( - "context" "errors" "fmt" - "github.com/icinga/icinga-go-library/database" baseEv "github.com/icinga/icinga-go-library/notifications/event" "github.com/icinga/icinga-go-library/types" - "github.com/jmoiron/sqlx" + "go.uber.org/zap/zapcore" "net/url" "strings" "time" @@ -29,7 +27,6 @@ var ErrSuperfluousMuteUnmuteEvent = errors.New("ignoring superfluous (un)mute ev type Event struct { Time time.Time `json:"-"` SourceId int64 `json:"-"` - ID int64 `json:"-"` baseEv.Event `json:",inline"` } @@ -101,52 +98,21 @@ func (e *Event) SetMute(muted bool, reason string) { e.MuteReason = reason } -func (e *Event) String() string { - return fmt.Sprintf("[time=%s type=%q severity=%s]", e.Time, e.Type, e.Severity.String()) -} - -// Sync transforms this event to *event.EventRow and synchronises with the database. -func (e *Event) Sync(ctx context.Context, tx *sqlx.Tx, db *database.DB, objectId types.Binary) error { - if e.ID != 0 { +func (e *Event) String() string { return e.Name } + +// MarshalLogObject implements the [zapcore.ObjectMarshaler] interface to allow logging the event as a structured object. +func (e *Event) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("name", e.Name) + encoder.AddTime("time", e.Time) + encoder.AddInt64("source_id", e.SourceId) + encoder.AddString("type", e.Type.String()) + encoder.AddString("severity", e.Severity.String()) + encoder.AddString("username", e.Username) + _ = encoder.AddObject("tags", zapcore.ObjectMarshalerFunc(func(objectEncoder zapcore.ObjectEncoder) error { + for key, value := range e.Tags { + objectEncoder.AddString(key, value) + } return nil - } - - eventRow := NewEventRow(e, objectId) - eventID, err := database.InsertObtainID(ctx, tx, database.BuildInsertStmtWithout(db, eventRow, "id"), eventRow) - if err == nil { - e.ID = eventID - } - - return err -} - -// EventRow represents a single event database row and isn't an in-memory representation of an event. -type EventRow struct { - ID int64 `db:"id"` - Time types.UnixMilli `db:"time"` - ObjectID types.Binary `db:"object_id"` - Type types.String `db:"type"` - Severity baseEv.Severity `db:"severity"` - Username types.String `db:"username"` - Message types.String `db:"message"` - Mute types.Bool `db:"mute"` - MuteReason types.String `db:"mute_reason"` -} - -// TableName implements the contracts.TableNamer interface. -func (er *EventRow) TableName() string { - return "event" -} - -func NewEventRow(e *Event, objectId types.Binary) *EventRow { - return &EventRow{ - Time: types.UnixMilli(e.Time), - ObjectID: objectId, - Type: types.MakeString(e.Type.String(), types.TransformEmptyStringToNull), - Severity: e.Severity, - Username: types.MakeString(e.Username, types.TransformEmptyStringToNull), - Message: types.MakeString(e.Message, types.TransformEmptyStringToNull), - Mute: e.Mute, - MuteReason: types.MakeString(e.MuteReason, types.TransformEmptyStringToNull), - } + })) + return nil } diff --git a/internal/incident/db_types.go b/internal/incident/db_types.go index ab7749d9..131c09bd 100644 --- a/internal/incident/db_types.go +++ b/internal/incident/db_types.go @@ -9,17 +9,6 @@ import ( "github.com/jmoiron/sqlx" ) -// EventRow represents a single incident event database entry. -type EventRow struct { - IncidentID int64 `db:"incident_id"` - EventID int64 `db:"event_id"` -} - -// TableName implements the contracts.TableNamer interface. -func (e *EventRow) TableName() string { - return "incident_event" -} - // ContactRow represents a single incident contact database entry. type ContactRow struct { IncidentID int64 `db:"incident_id"` @@ -68,7 +57,6 @@ type HistoryRow struct { ID int64 `db:"id"` IncidentID int64 `db:"incident_id"` RuleEscalationID types.Int `db:"rule_escalation_id"` - EventID types.Int `db:"event_id"` recipient.Key `db:",inline"` RuleID types.Int `db:"rule_id"` Time types.UnixMilli `db:"time"` diff --git a/internal/incident/incident.go b/internal/incident/incident.go index d674ba9b..44912916 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -144,11 +144,6 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { } defer func() { _ = tx.Rollback() }() - if err = ev.Sync(ctx, tx, i.db, i.Object.ID); err != nil { - i.logger.Errorw("Failed to insert event and fetch its ID", zap.String("event", ev.String()), zap.Error(err)) - return err - } - isNew := i.StartedAt.Time().IsZero() if isNew { err = i.processIncidentOpenedEvent(ctx, tx, ev) @@ -159,11 +154,6 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { i.logger = i.logger.With(zap.String("incident", i.String())) } - if err = i.AddEvent(ctx, tx, ev); err != nil { - i.logger.Errorw("Cannot insert incident event to the database", zap.Error(err)) - return err - } - switch ev.Type { case baseEv.TypeState: if !isNew { @@ -261,15 +251,6 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { var notifications []*NotificationEntry ctx := context.Background() err = i.db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { - err := ev.Sync(ctx, tx, i.db, i.Object.ID) - if err != nil { - return err - } - - if err = i.AddEvent(ctx, tx, ev); err != nil { - return fmt.Errorf("cannot insert incident event to the database: %w", err) - } - if err = i.triggerEscalations(ctx, tx, ev, escalations); err != nil { return err } @@ -306,7 +287,6 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, hr := &HistoryRow{ IncidentID: i.Id, - EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull), Time: types.UnixMilli(time.Now()), Type: IncidentSeverityChanged, NewSeverity: newSeverity, @@ -327,7 +307,6 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, hr = &HistoryRow{ IncidentID: i.Id, - EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull), Time: i.RecoveredAt, Type: Closed, } @@ -365,7 +344,6 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, IncidentID: i.Id, Type: Opened, Time: types.UnixMilli(ev.Time), - EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull), NewSeverity: i.Severity, Message: types.MakeString(ev.Message, types.TransformEmptyStringToNull), } @@ -391,7 +369,6 @@ func (i *Incident) handleUnmute(ctx context.Context, tx *sqlx.Tx, ev *event.Even hr := &HistoryRow{ IncidentID: i.Id, - EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull), Time: types.UnixMilli(time.Now()), Type: Unmuted, // On the other hand, if an object is unmuted, its mute reason is already reset, and we can't access it anymore. @@ -413,7 +390,6 @@ func (i *Incident) handleMute(ctx context.Context, tx *sqlx.Tx, ev *event.Event) hr := &HistoryRow{ IncidentID: i.Id, - EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull), Time: types.UnixMilli(time.Now()), Type: Muted, // Since the object may have already been muted with previous events before this incident even @@ -463,7 +439,6 @@ func (i *Incident) applyMatchingRules(ctx context.Context, tx *sqlx.Tx, ev *even hr := &HistoryRow{ IncidentID: i.Id, Time: types.UnixMilli(time.Now()), - EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull), RuleID: types.MakeInt(r.ID, types.TransformZeroIntToNull), Type: RuleMatched, } @@ -573,7 +548,6 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even hr := &HistoryRow{ IncidentID: i.Id, Time: state.TriggeredAt, - EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull), RuleEscalationID: types.MakeInt(state.RuleEscalationID, types.TransformZeroIntToNull), RuleID: types.MakeInt(r.ID, types.TransformZeroIntToNull), Type: EscalationTriggered, @@ -587,7 +561,7 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even return err } - if err := i.AddRecipient(ctx, tx, escalation, ev.ID); err != nil { + if err := i.AddRecipient(ctx, tx, escalation); err != nil { return err } } @@ -688,7 +662,6 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, hr := &HistoryRow{ IncidentID: i.Id, Key: recipientKey, - EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull), Type: RecipientRoleChanged, Time: types.UnixMilli(time.Now()), NewRecipientRole: newRole, diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index f6ec183d..6bd17ea9 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -12,7 +12,6 @@ import ( "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/object" "github.com/icinga/icinga-notifications/internal/utils" - "github.com/jmoiron/sqlx" "github.com/pkg/errors" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -228,11 +227,6 @@ func ProcessEvent( runtimeConfig *config.RuntimeConfig, ev *event.Event, ) error { - var wasObjectMuted bool - if obj := object.GetFromCache(object.ID(ev.SourceId, ev.Tags)); obj != nil { - wasObjectMuted = obj.IsMuted() - } - obj, err := object.FromEvent(ctx, db, ev) if err != nil { return fmt.Errorf("cannot sync event object: %w", err) @@ -251,29 +245,13 @@ func ProcessEvent( } if currentIncident == nil { - switch { - case ev.Severity == baseEv.SeverityNone: - // We need to ignore superfluous mute and unmute events here, as would be the case with an existing - // incident, otherwise the event stream catch-up phase will generate useless events after each - // Icinga 2 reload and overwhelm the database with the very same mute/unmute events. - if wasObjectMuted && ev.Type == baseEv.TypeMute { - return event.ErrSuperfluousMuteUnmuteEvent - } else if !wasObjectMuted && ev.Type == baseEv.TypeUnmute { - return event.ErrSuperfluousMuteUnmuteEvent - } - - // There is no active incident, but the event appears to be relevant, so try to persist it in the DB. - err = db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { return ev.Sync(ctx, tx, db, obj.ID) }) - if err != nil { - return errors.New("cannot sync non-state event to the database") - } - - return nil - case ev.Severity != baseEv.SeverityOK: + if ev.Severity != baseEv.SeverityOK && ev.Severity != baseEv.SeverityNone { panic(fmt.Sprintf("cannot process event %v with a non-OK state %v without a known incident", ev, ev.Severity)) - default: + } + if ev.Severity == baseEv.SeverityOK { return fmt.Errorf("%w: ok state event from source %d", event.ErrSuperfluousStateChange, ev.SourceId) } + return nil } return currentIncident.ProcessEvent(ctx, ev) diff --git a/internal/incident/sync.go b/internal/incident/sync.go index 61bafd3d..9da252a0 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -54,18 +54,9 @@ func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, stat return err } -// AddEvent Inserts incident history record to the database and returns an error on db failure. -func (i *Incident) AddEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { - ie := &EventRow{IncidentID: i.Id, EventID: ev.ID} - stmt, _ := i.db.BuildInsertStmt(ie) - _, err := tx.NamedExecContext(ctx, stmt, ie) - - return err -} - // AddRecipient adds recipient from the given *rule.Escalation to this incident. // Syncs also all the recipients with the database and returns an error on db failure. -func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *rule.Escalation, eventId int64) error { +func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *rule.Escalation) error { newRole := RoleRecipient if i.HasManager() { newRole = RoleSubscriber @@ -90,7 +81,6 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru hr := &HistoryRow{ IncidentID: i.Id, - EventID: types.MakeInt(eventId, types.TransformZeroIntToNull), Key: cr.Key, Time: types.UnixMilli(time.Now()), Type: RecipientRoleChanged, @@ -148,7 +138,6 @@ func (i *Incident) generateNotifications( hr := &HistoryRow{ IncidentID: i.Id, Key: recipient.ToKey(contact), - EventID: types.MakeInt(ev.ID, types.TransformZeroIntToNull), Time: types.UnixMilli(time.Now()), Type: Notified, ChannelID: types.MakeInt(chID, types.TransformZeroIntToNull), diff --git a/internal/listener/listener.go b/internal/listener/listener.go index 794d8466..42828ca8 100644 --- a/internal/listener/listener.go +++ b/internal/listener/listener.go @@ -168,6 +168,8 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, r *http.Request) { } l.logger.Infow("Processing event", zap.String("event", ev.String())) + l.logger.Debugw("Event details", zap.Object("event", &ev)) + err := incident.ProcessEvent(context.Background(), l.db, l.logs, l.runtimeConfig, &ev) if errors.Is(err, event.ErrSuperfluousStateChange) || errors.Is(err, event.ErrSuperfluousMuteUnmuteEvent) { l.abort(w, http.StatusNotAcceptable, &ev, "%v", err) diff --git a/schema/mysql/schema.sql b/schema/mysql/schema.sql index 8960e81e..4469620d 100644 --- a/schema/mysql/schema.sql +++ b/schema/mysql/schema.sql @@ -264,23 +264,6 @@ CREATE TABLE object_extra_tag ( CONSTRAINT fk_object_extra_tag_object FOREIGN KEY (object_id) REFERENCES object(id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -CREATE TABLE event ( - id bigint NOT NULL AUTO_INCREMENT, - time bigint NOT NULL, - object_id binary(32) NOT NULL, - -- NOT NULL is enforced via CHECK not to default to 'acknowledgement-cleared' - type enum('acknowledgement-cleared', 'acknowledgement-set', 'custom', 'downtime-end', 'downtime-removed', 'downtime-start', 'flapping-end', 'flapping-start', 'incident-age', 'mute', 'state', 'unmute'), - severity enum('ok', 'debug', 'info', 'notice', 'warning', 'err', 'crit', 'alert', 'emerg'), - message mediumtext, - username text COLLATE utf8mb4_unicode_ci, - mute enum('n', 'y'), - mute_reason mediumtext, - - CONSTRAINT pk_event PRIMARY KEY (id), - CONSTRAINT ck_event_type_notnull CHECK (type IS NOT NULL), - CONSTRAINT fk_event_object FOREIGN KEY (object_id) REFERENCES object(id) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; - CREATE TABLE rule ( id bigint NOT NULL AUTO_INCREMENT, name text NOT NULL COLLATE utf8mb4_unicode_ci, @@ -358,15 +341,6 @@ CREATE TABLE incident ( CONSTRAINT fk_incident_object FOREIGN KEY (object_id) REFERENCES object(id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -CREATE TABLE incident_event ( - incident_id bigint NOT NULL, - event_id bigint NOT NULL, - - CONSTRAINT pk_incident_event PRIMARY KEY (incident_id, event_id), - CONSTRAINT fk_incident_event_incident FOREIGN KEY (incident_id) REFERENCES incident(id), - CONSTRAINT fk_incident_event_event FOREIGN KEY (event_id) REFERENCES event(id) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; - CREATE TABLE incident_contact ( incident_id bigint NOT NULL, contact_id bigint, @@ -409,7 +383,6 @@ CREATE TABLE incident_history ( id bigint NOT NULL AUTO_INCREMENT, incident_id bigint NOT NULL, rule_escalation_id bigint, - event_id bigint, contact_id bigint, contactgroup_id bigint, schedule_id bigint, @@ -432,7 +405,6 @@ CREATE TABLE incident_history ( CONSTRAINT fk_incident_history_incident_rule_escalation_state FOREIGN KEY (incident_id, rule_escalation_id) REFERENCES incident_rule_escalation_state(incident_id, rule_escalation_id), CONSTRAINT fk_incident_history_incident FOREIGN KEY (incident_id) REFERENCES incident(id), CONSTRAINT fk_incident_history_rule_escalation FOREIGN KEY (rule_escalation_id) REFERENCES rule_escalation(id), - CONSTRAINT fk_incident_history_event FOREIGN KEY (event_id) REFERENCES event(id), CONSTRAINT fk_incident_history_contact FOREIGN KEY (contact_id) REFERENCES contact(id), CONSTRAINT fk_incident_history_contactgroup FOREIGN KEY (contactgroup_id) REFERENCES contactgroup(id), CONSTRAINT fk_incident_history_schedule FOREIGN KEY (schedule_id) REFERENCES schedule(id), diff --git a/schema/mysql/upgrades/1.0.sql b/schema/mysql/upgrades/1.0.sql new file mode 100644 index 00000000..20a1201d --- /dev/null +++ b/schema/mysql/upgrades/1.0.sql @@ -0,0 +1,3 @@ +DROP TABLE incident_event; +ALTER TABLE incident_history DROP CONSTRAINT fk_incident_history_event; +DROP TABLE event; diff --git a/schema/pgsql/schema.sql b/schema/pgsql/schema.sql index 791b5702..d55d2277 100644 --- a/schema/pgsql/schema.sql +++ b/schema/pgsql/schema.sql @@ -297,37 +297,8 @@ CREATE TABLE object_extra_tag ( CONSTRAINT fk_object_extra_tag_object FOREIGN KEY (object_id) REFERENCES object(id) ); -CREATE TYPE event_type AS ENUM ( - 'acknowledgement-cleared', - 'acknowledgement-set', - 'custom', - 'downtime-end', - 'downtime-removed', - 'downtime-start', - 'flapping-end', - 'flapping-start', - 'incident-age', - 'mute', - 'state', - 'unmute' -); CREATE TYPE severity AS ENUM ('ok', 'debug', 'info', 'notice', 'warning', 'err', 'crit', 'alert', 'emerg'); -CREATE TABLE event ( - id bigserial, - time bigint NOT NULL, - object_id bytea NOT NULL, - type event_type NOT NULL, - severity severity, - message text, - username citext, - mute boolenum, - mute_reason text, - - CONSTRAINT pk_event PRIMARY KEY (id), - CONSTRAINT fk_event_object FOREIGN KEY (object_id) REFERENCES object(id) -); - CREATE TABLE rule ( id bigserial, name citext NOT NULL, @@ -403,15 +374,6 @@ CREATE TABLE incident ( CONSTRAINT fk_incident_object FOREIGN KEY (object_id) REFERENCES object(id) ); -CREATE TABLE incident_event ( - incident_id bigint NOT NULL, - event_id bigint NOT NULL, - - CONSTRAINT pk_incident_event PRIMARY KEY (incident_id, event_id), - CONSTRAINT fk_incident_event_incident FOREIGN KEY (incident_id) REFERENCES incident(id), - CONSTRAINT fk_incident_event_event FOREIGN KEY (event_id) REFERENCES event(id) -); - CREATE TYPE incident_contact_role AS ENUM ('recipient', 'subscriber', 'manager'); CREATE TABLE incident_contact ( @@ -456,7 +418,6 @@ CREATE TABLE incident_history ( id bigserial, incident_id bigint NOT NULL, rule_escalation_id bigint, - event_id bigint, contact_id bigint, contactgroup_id bigint, schedule_id bigint, @@ -476,7 +437,6 @@ CREATE TABLE incident_history ( CONSTRAINT fk_incident_history_incident_rule_escalation_state FOREIGN KEY (incident_id, rule_escalation_id) REFERENCES incident_rule_escalation_state(incident_id, rule_escalation_id), CONSTRAINT fk_incident_history_incident FOREIGN KEY (incident_id) REFERENCES incident(id), CONSTRAINT fk_incident_history_rule_escalation FOREIGN KEY (rule_escalation_id) REFERENCES rule_escalation(id), - CONSTRAINT fk_incident_history_event FOREIGN KEY (event_id) REFERENCES event(id), CONSTRAINT fk_incident_history_contact FOREIGN KEY (contact_id) REFERENCES contact(id), CONSTRAINT fk_incident_history_contactgroup FOREIGN KEY (contactgroup_id) REFERENCES contactgroup(id), CONSTRAINT fk_incident_history_schedule FOREIGN KEY (schedule_id) REFERENCES schedule(id), diff --git a/schema/pgsql/upgrades/1.0.sql b/schema/pgsql/upgrades/1.0.sql new file mode 100644 index 00000000..fe2227b2 --- /dev/null +++ b/schema/pgsql/upgrades/1.0.sql @@ -0,0 +1,4 @@ +DROP TABLE incident_event; +ALTER TABLE incident_history DROP CONSTRAINT fk_incident_history_event; +DROP TABLE event; +DROP TYPE IF EXISTS event_type;