diff --git a/storage/aws/aws.go b/storage/aws/aws.go index 08f2ebac..933a92c1 100644 --- a/storage/aws/aws.go +++ b/storage/aws/aws.go @@ -305,7 +305,7 @@ func (a *Appender) integrateEntriesJob(ctx context.Context) { defer func() { opsHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(opNameKey.String("integrateEntries"))) }() - + ctx, cancel := context.WithTimeout(ctx, defaultIntegrationTimeout) defer cancel() // Note: ok because we're in a func passed to TraceErr here! @@ -341,12 +341,12 @@ func (a *Appender) publishCheckpointJob(ctx context.Context, pubInterval, republ ctx, cancel := context.WithTimeout(ctx, defaultPublicationTimeout) defer cancel() // Note: ok because we're in a func passed to TraceErr here! - publishedAt, err := a.sequencer.publishCheckpoint(ctx, pubInterval, republishInterval, a.updateCheckpoint) + nextPub, err := a.sequencer.publishCheckpoint(ctx, pubInterval, republishInterval, a.updateCheckpoint) if err != nil { return fmt.Errorf("publishCheckpoint failed: %v", err) } // Schedule a checkpoint update immediately, if an updated is due. - t.Reset(max(time.Millisecond, pubInterval-time.Since(publishedAt))) + t.Reset(max(time.Millisecond, time.Until(nextPub))) return nil }, trace.WithAttributes(otel.PeriodicKey.Bool(true))); err != nil { t.Reset(pubInterval) @@ -1269,11 +1269,11 @@ func (s *mySQLSequencer) nextIndex(ctx context.Context) (uint64, error) { // publishCheckpoint checks when the last checkpoint was published, and if it was more than minAge ago, calls the provided // function to publish a new one. // -// Returns the time at which the checkpoint is published, or was last published. +// Returns the time at which publishCheckpoint should be called again. // // This function uses PubCoord with an exclusive lock to guarantee that only one tessera instance can attempt to publish // a checkpoint at any given time. -func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive, minStaleRepub time.Duration, f func(context.Context, uint64, []byte) error) (publishedAt time.Time, errR error) { +func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive, minStaleRepub time.Duration, f func(context.Context, uint64, []byte) error) (nextPubAt time.Time, errR error) { start := time.Now() defer func() { // Detect any errors and update metrics accordingly. @@ -1294,16 +1294,25 @@ func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive, }() pRow := tx.QueryRowContext(ctx, "SELECT publishedAt, size FROM PubCoord WHERE id = ? FOR UPDATE", 0) - var pubAt int64 + var publishedAt int64 var lastSize sql.NullInt64 - if err := pRow.Scan(&pubAt, &lastSize); err != nil { + if err := pRow.Scan(&publishedAt, &lastSize); err != nil { return time.Time{}, fmt.Errorf("failed to parse PubCoord: %v", err) } - cpAge := time.Since(time.Unix(pubAt, 0)) + var pubAt time.Time + // Tessera used to store publishedAt in seconds, and now stores it in nanoseconds. + // This handles the transition. + const nanosecondThreshold = 1_310_324_790_000 + if publishedAt < nanosecondThreshold { + pubAt = time.Unix(publishedAt, 0) + } else { + pubAt = time.Unix(0, publishedAt) + } + cpAge := time.Since(pubAt) if cpAge < minStaleActive { slog.DebugContext(ctx, "publishCheckpoint: last checkpoint published too recently, not publishing new checkpoint", slog.Duration("age", cpAge), slog.Duration("minstaleactive", minStaleActive)) publishCount.Add(ctx, 1, metric.WithAttributes(errorTypeKey.String("skipped"))) - return time.Unix(pubAt, 0), nil + return pubAt.Add(minStaleActive), nil } row := tx.QueryRowContext(ctx, "SELECT seq, rootHash FROM IntCoord WHERE id = ?", 0) @@ -1328,7 +1337,11 @@ func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive, if !shouldPublish { slog.DebugContext(ctx, "publishCheckpoint: skipping publish because tree hasn't grown and previous checkpoint is too recent") publishCount.Add(ctx, 1, metric.WithAttributes(errorTypeKey.String("skipped_no_growth"))) - return time.Unix(pubAt, 0), nil + nextPubAt = start.Add(minStaleActive) + if minStaleRepub > 0 && nextPubAt.After(pubAt.Add(minStaleRepub)) { + nextPubAt = pubAt.Add(minStaleRepub) + } + return nextPubAt, nil } slog.DebugContext(ctx, "publishCheckpoint: updating checkpoint (replacing old checkpoint)", slog.Duration("age", cpAge)) @@ -1337,8 +1350,8 @@ func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive, return time.Time{}, err } - publishedAt = time.Now() - if _, err := tx.ExecContext(ctx, "UPDATE PubCoord SET publishedAt=?, size=? WHERE id=?", publishedAt.Unix(), currentSize, 0); err != nil { + pubAt = time.Now() + if _, err := tx.ExecContext(ctx, "UPDATE PubCoord SET publishedAt=?, size=? WHERE id=?", pubAt.UnixNano(), currentSize, 0); err != nil { return time.Time{}, err } if err := tx.Commit(); err != nil { @@ -1348,7 +1361,8 @@ func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive, publishCount.Add(ctx, 1) tx = nil - return publishedAt, nil + nextPubAt = pubAt.Add(minStaleActive) + return nextPubAt, nil } // garbageCollect will identify up to maxBundles unneeded partial entry bundles (and any unneeded partial tiles which sit above them in the tree) and diff --git a/storage/aws/aws_test.go b/storage/aws/aws_test.go index fd574121..bfa35032 100644 --- a/storage/aws/aws_test.go +++ b/storage/aws/aws_test.go @@ -414,6 +414,12 @@ func TestPublishTree(t *testing.T) { republishInterval: 2 * time.Second, attempts: []time.Duration{1500 * time.Millisecond, 2500 * time.Millisecond}, wantUpdates: 1, + }, { + name: "publish: no growth; republish: disabled", + publishInterval: 100 * time.Millisecond, + republishInterval: 0, + attempts: []time.Duration{100 * time.Millisecond, 100 * time.Millisecond}, + wantUpdates: 0, }, } { t.Run(test.name, func(t *testing.T) { @@ -439,6 +445,7 @@ func TestPublishTree(t *testing.T) { if err := storage.init(ctx); err != nil { t.Fatalf("storage.init: %v", err) } + pubAt := time.Now() // Good approximation of the checkpoint's future publishedAt. if _, err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.updateCheckpoint); err != nil { t.Fatalf("publishTree: %v", err) } @@ -449,15 +456,24 @@ func TestPublishTree(t *testing.T) { updatesSeen := 0 for _, d := range test.attempts { time.Sleep(d) - if _, err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.updateCheckpoint); err != nil { + nextPubAt, err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.updateCheckpoint) + notAfter := time.Now().Add(test.publishInterval) + if err != nil { t.Fatalf("publishTree: %v", err) } + if nextPubAt.Before(pubAt.Add(test.publishInterval)) { + t.Errorf("nextPubAt = %v, want larger than %v", nextPubAt, pubAt.Add(test.publishInterval)) + } + if nextPubAt.After(notAfter) { + t.Errorf("nextPubAt = %v, want smaller than %v", nextPubAt, notAfter) + } cpNew, err := m.getObject(ctx, layout.CheckpointPath) if err != nil { t.Fatalf("getObject: %v", err) } if !bytes.Equal(cpOld, cpNew) { updatesSeen++ + pubAt = time.Now() cpOld = cpNew } } diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index 7e74da7b..e0a28e7f 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -393,12 +393,12 @@ func (a *Appender) publishCheckpointJob(ctx context.Context, pubInterval, republ ctx, cancel := context.WithTimeout(ctx, defaultPublicationTimeout) defer cancel() // Note: ok because we're in a func passed to TraceErr here! - publishedAt, err := a.sequencer.publishCheckpoint(ctx, pubInterval, republishInterval, a.updateCheckpoint) + nextPub, err := a.sequencer.publishCheckpoint(ctx, pubInterval, republishInterval, a.updateCheckpoint) if err != nil { return fmt.Errorf("publishCheckpoint failed: %v", err) } // Schedule a checkpoint update immediately, if an updated is due. - t.Reset(max(time.Millisecond, pubInterval-time.Since(publishedAt))) + t.Reset(max(time.Millisecond, time.Until(nextPub))) return nil }, trace.WithAttributes(otel.PeriodicKey.Bool(true))); err != nil { t.Reset(pubInterval) @@ -1104,7 +1104,7 @@ func (s *spannerCoordinator) nextIndex(ctx context.Context) (uint64, error) { // publishCheckpoint checks when the last checkpoint was published, and if appropriate, calls the provided // function to publish a new one. // -// Returns the time at which the checkpoint is published, or was last published. +// Returns the time at which publishCheckpoint should be called again. // // A checkpoint will not be published if either: // - the currently published checkpoint was published less than minStaleActive ago @@ -1117,7 +1117,7 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActi return otel.Trace(ctx, "tessera.storage.gcp.publishCheckpoint", tracer, func(ctx context.Context, span trace.Span) (time.Time, error) { // outcomeAttr records the outcome of the checkpoint publication attempt for metrics and traces. var outcomeAttr attribute.KeyValue - var pubAt time.Time + var nextPubAt time.Time start := time.Now() currentSize, rootHash, err := s.currentTree(ctx) @@ -1136,6 +1136,7 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActi return fmt.Errorf("failed to read PubCoord: %w", err) } var lastSize spanner.NullInt64 + var pubAt time.Time if err := pRow.Columns(&pubAt, &lastSize); err != nil { return fmt.Errorf("failed to parse PubCoord: %v", err) } @@ -1150,6 +1151,7 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActi if cpAge < minStaleActive { slog.DebugContext(ctx, "publishCheckpoint: last checkpoint published too recently, not publishing new checkpoint", slog.Duration("cpAge", cpAge), slog.Duration("minStaleActive", minStaleActive)) outcomeAttr = outcomeTypeKey.String("skipped") + nextPubAt = pubAt.Add(minStaleActive) return nil } @@ -1166,6 +1168,10 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActi if !shouldPublish { slog.DebugContext(ctx, "publishCheckpoint: skipping publish because tree hasn't grown and previous checkpoint is too recent") outcomeAttr = outcomeTypeKey.String("skipped_no_growth") + nextPubAt = start.Add(minStaleActive) + if minStaleRepub > 0 && nextPubAt.After(pubAt.Add(minStaleRepub)) { + nextPubAt = pubAt.Add(minStaleRepub) + } return nil } @@ -1177,6 +1183,7 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActi } span.AddEvent("Updating PubCoord") pubAt = time.Now() + nextPubAt = pubAt.Add(minStaleActive) if err := txn.BufferWrite([]*spanner.Mutation{spanner.Update("PubCoord", []string{"id", "publishedAt", "size"}, []any{0, pubAt, int64(currentSize)})}); err != nil { return err } @@ -1190,7 +1197,7 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActi opsHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(opNameKey.String("publishCheckpoint"))) span.SetAttributes(outcomeAttr) publishCount.Add(ctx, 1, metric.WithAttributes(outcomeAttr)) - return pubAt, nil + return nextPubAt, nil }) } diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go index 2cc71dff..641eb3ea 100644 --- a/storage/gcp/gcp_test.go +++ b/storage/gcp/gcp_test.go @@ -491,6 +491,12 @@ func TestPublishTree(t *testing.T) { republishInterval: 2 * time.Second, attempts: []time.Duration{1500 * time.Millisecond, 2500 * time.Millisecond}, wantUpdates: 1, + }, { + name: "publish: no growth; republish: disabled", + publishInterval: 100 * time.Millisecond, + republishInterval: 0, + attempts: []time.Duration{100 * time.Millisecond, 100 * time.Millisecond}, + wantUpdates: 0, }, } { t.Run(test.name, func(t *testing.T) { @@ -518,6 +524,7 @@ func TestPublishTree(t *testing.T) { t.Fatalf("storage.init: %v", err) } + pubAt := time.Now() // Good approximation of the checkpoint's future publishedAt. if _, err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.updateCheckpoint); err != nil { t.Fatalf("publishTree: %v", err) } @@ -528,15 +535,24 @@ func TestPublishTree(t *testing.T) { updatesSeen := 0 for _, d := range test.attempts { time.Sleep(d) - if _, err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.updateCheckpoint); err != nil { + nextPubAt, err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.updateCheckpoint) + notAfter := time.Now().Add(test.publishInterval) + if err != nil { t.Fatalf("publishTree: %v", err) } + if nextPubAt.Before(pubAt.Add(test.publishInterval)) { + t.Errorf("nextPubAt = %v, want larger than %v", nextPubAt, pubAt.Add(test.publishInterval)) + } + if nextPubAt.After(notAfter) { + t.Errorf("nextPubAt = %v, want smaller than %v", nextPubAt, notAfter) + } cpNew, _, err := m.getObject(ctx, layout.CheckpointPath) if err != nil { t.Fatalf("getObject: %v", err) } if !bytes.Equal(cpOld, cpNew) { updatesSeen++ + pubAt = time.Now() cpOld = cpNew } } diff --git a/storage/posix/files.go b/storage/posix/files.go index e7117083..359e7eee 100644 --- a/storage/posix/files.go +++ b/storage/posix/files.go @@ -179,12 +179,12 @@ func (a *appender) publishCheckpointJob(ctx context.Context, pubInterval, republ if err := otel.TraceErr(ctx, "tessera.storage.posix.publishCheckpointJob", tracer, func(ctx context.Context, span trace.Span) error { ctx, cancel := context.WithTimeout(ctx, defaultPublicationTimeout) defer cancel() - publishedAt, err := a.publishCheckpoint(ctx, pubInterval, republishInterval) + nextPub, err := a.publishCheckpoint(ctx, pubInterval, republishInterval) if err != nil { return err } // Schedule a checkpoint update immediately, if an updated is due. - t.Reset(max(time.Millisecond, pubInterval-time.Since(publishedAt))) + t.Reset(max(time.Millisecond, time.Until(nextPub))) return nil }, trace.WithAttributes(otel.PeriodicKey.Bool(true))); err != nil { t.Reset(pubInterval) @@ -675,10 +675,10 @@ func (s *Storage) readTreeState(ctx context.Context) (uint64, []byte, error) { // minStaleness old, and, if so, creates and published a fresh checkpoint from the current // stored tree state. // -// Returns the time at which the checkpoint is published, or was last published. -func (a *appender) publishCheckpoint(ctx context.Context, minStalenessActive, minStalenessRepub time.Duration) (publishedAt time.Time, errR error) { +// Returns the time at which publishCheckpoint should be called again. +func (a *appender) publishCheckpoint(ctx context.Context, minStalenessActive, minStalenessRepub time.Duration) (nextPubAt time.Time, errR error) { return otel.Trace(ctx, "tessera.storage.posix.publishCheckpoint", tracer, func(ctx context.Context, span trace.Span) (time.Time, error) { - now := time.Now() + start := time.Now() defer func() { // Detect any errors and update metrics accordingly. // Non-error cases are explicitly handled in the body of the function below. @@ -701,6 +701,7 @@ func (a *appender) publishCheckpoint(ctx context.Context, minStalenessActive, mi var publishedAge time.Duration var publishedSize uint64 cpExists := true + var pubAt time.Time info, err := a.s.stat(layout.CheckpointPath) if errors.Is(err, os.ErrNotExist) { slog.DebugContext(ctx, "No checkpoint exists, publishing") @@ -708,12 +709,13 @@ func (a *appender) publishCheckpoint(ctx context.Context, minStalenessActive, mi } else if err != nil { return time.Time{}, fmt.Errorf("stat(%s): %v", layout.CheckpointPath, err) } else { - publishedAt = info.ModTime() + pubAt = info.ModTime() publishedAge = time.Since(info.ModTime()) if publishedAge < minStalenessActive { slog.DebugContext(ctx, "publishCheckpoint: skipping publish because previous checkpoint too fresh", slog.Duration("age", publishedAge), slog.Duration("minstalenessactive", minStalenessActive)) publishCount.Add(ctx, 1, metric.WithAttributes(errorTypeKey.String("skipped"))) - return publishedAt, nil + nextPubAt = pubAt.Add(minStalenessActive) + return nextPubAt, nil } publishedSize, err = a.publishedSize(ctx) if err != nil { @@ -731,7 +733,11 @@ func (a *appender) publishCheckpoint(ctx context.Context, minStalenessActive, mi if minStalenessRepub == 0 || publishedAge < minStalenessRepub { slog.DebugContext(ctx, "publishCheckpoint: skipping publish because tree hasn't grown and previous checkpoint is too recent") publishCount.Add(ctx, 1, metric.WithAttributes(errorTypeKey.String("skipped_no_growth"))) - return publishedAt, nil + nextPubAt = start.Add(minStalenessActive) + if minStalenessRepub > 0 && nextPubAt.After(pubAt.Add(minStalenessRepub)) { + nextPubAt = pubAt.Add(minStalenessRepub) + } + return nextPubAt, nil } } @@ -744,14 +750,14 @@ func (a *appender) publishCheckpoint(ctx context.Context, minStalenessActive, mi return time.Time{}, fmt.Errorf("createOverwrite(%s): %v", layout.CheckpointPath, err) } // This is not the actual ModTime of the file, but it's close enough. - publishedAt = time.Now() + pubAt = time.Now() slog.DebugContext(ctx, "Published latest checkpoint", slog.Uint64("size", size), slog.String("root", fmt.Sprintf("%x", root))) - posixOpsHistogram.Record(ctx, time.Since(now).Milliseconds(), metric.WithAttributes(opNameKey.String("publishCheckpoint"))) + posixOpsHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(opNameKey.String("publishCheckpoint"))) publishCount.Add(ctx, 1) - return publishedAt, nil + return pubAt.Add(minStalenessActive), nil }) } diff --git a/storage/posix/files_test.go b/storage/posix/files_test.go index efefb7d1..ed53d213 100644 --- a/storage/posix/files_test.go +++ b/storage/posix/files_test.go @@ -281,6 +281,13 @@ func TestPublishTree(t *testing.T) { growTree: true, attempts: []time.Duration{300 * time.Millisecond, 300 * time.Millisecond, 300 * time.Millisecond, 300 * time.Millisecond}, wantUpdates: 1, + }, { + name: "publish: no growth; republish: disabled", + publishInterval: 100 * time.Millisecond, + republishInterval: 0, + growTree: false, + attempts: []time.Duration{100 * time.Millisecond, 100 * time.Millisecond}, + wantUpdates: 0, }, { name: "republish: works ok", publishInterval: minCheckpointInterval, @@ -327,6 +334,7 @@ func TestPublishTree(t *testing.T) { s: s, entriesPath: opts.EntriesPath(), } + pubAt := time.Now() // Good approximation of the checkpoint's future modTime. appender, lr, err := s.newAppender(ctx, logStorage, opts) if err != nil { t.Fatalf("Appender: %v", err) @@ -357,15 +365,24 @@ func TestPublishTree(t *testing.T) { for _, d := range test.attempts { time.Sleep(d) - if _, err := appender.publishCheckpoint(ctx, test.publishInterval, test.republishInterval); err != nil { + nextPubAt, err := appender.publishCheckpoint(ctx, test.publishInterval, test.republishInterval) + notAfter := time.Now().Add(test.publishInterval) + if err != nil { t.Fatalf("publishTree: %v", err) } + if nextPubAt.Before(pubAt.Add(test.publishInterval)) { + t.Errorf("nextPubAt = %v, want larger than %v", nextPubAt, pubAt.Add(test.publishInterval)) + } + if nextPubAt.After(notAfter) { + t.Errorf("nextPubAt = %v, want smaller than %v", nextPubAt, notAfter) + } cpNew, err := lr.ReadCheckpoint(ctx) if err != nil { t.Fatalf("ReadCheckpoint: %v", err) } if !bytes.Equal(cpOld, cpNew) { updatesSeen++ + pubAt = time.Now() cpOld = cpNew } }