Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 27 additions & 13 deletions storage/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (a *Appender) integrateEntriesJob(ctx context.Context) {
defer func() {
opsHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(opNameKey.String("integrateEntries")))
}()

ctx, cancel := context.WithTimeout(ctx, defaultIntegrationTimeout)
defer cancel() // Note: ok because we're in a func passed to TraceErr here!

Expand Down Expand Up @@ -341,12 +341,12 @@ func (a *Appender) publishCheckpointJob(ctx context.Context, pubInterval, republ
ctx, cancel := context.WithTimeout(ctx, defaultPublicationTimeout)
defer cancel() // Note: ok because we're in a func passed to TraceErr here!

publishedAt, err := a.sequencer.publishCheckpoint(ctx, pubInterval, republishInterval, a.updateCheckpoint)
nextPub, err := a.sequencer.publishCheckpoint(ctx, pubInterval, republishInterval, a.updateCheckpoint)
if err != nil {
return fmt.Errorf("publishCheckpoint failed: %v", err)
}
// Schedule a checkpoint update immediately, if an update is due.
t.Reset(max(time.Millisecond, pubInterval-time.Since(publishedAt)))
t.Reset(max(time.Millisecond, time.Until(nextPub)))
return nil
}, trace.WithAttributes(otel.PeriodicKey.Bool(true))); err != nil {
t.Reset(pubInterval)
Expand Down Expand Up @@ -1269,11 +1269,11 @@ func (s *mySQLSequencer) nextIndex(ctx context.Context) (uint64, error) {
// publishCheckpoint checks when the last checkpoint was published, and if it was more than minAge ago, calls the provided
// function to publish a new one.
//
// Returns the time at which the checkpoint is published, or was last published.
// Returns the time at which publishCheckpoint should be called again.
//
// This function uses PubCoord with an exclusive lock to guarantee that only one tessera instance can attempt to publish
// a checkpoint at any given time.
func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive, minStaleRepub time.Duration, f func(context.Context, uint64, []byte) error) (publishedAt time.Time, errR error) {
func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive, minStaleRepub time.Duration, f func(context.Context, uint64, []byte) error) (nextPubAt time.Time, errR error) {
start := time.Now()
defer func() {
// Detect any errors and update metrics accordingly.
Expand All @@ -1294,16 +1294,25 @@ func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive,
}()

pRow := tx.QueryRowContext(ctx, "SELECT publishedAt, size FROM PubCoord WHERE id = ? FOR UPDATE", 0)
var pubAt int64
var publishedAt int64
var lastSize sql.NullInt64
if err := pRow.Scan(&pubAt, &lastSize); err != nil {
if err := pRow.Scan(&publishedAt, &lastSize); err != nil {
return time.Time{}, fmt.Errorf("failed to parse PubCoord: %v", err)
}
cpAge := time.Since(time.Unix(pubAt, 0))
var pubAt time.Time
// Tessera used to store publishedAt in seconds, and now stores it in nanoseconds.
// This handles the transition.
const nanosecondThreshold = 1_310_324_790_000
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hehe, nice :)

if publishedAt < nanosecondThreshold {
pubAt = time.Unix(publishedAt, 0)
} else {
pubAt = time.Unix(0, publishedAt)
}
cpAge := time.Since(pubAt)
if cpAge < minStaleActive {
slog.DebugContext(ctx, "publishCheckpoint: last checkpoint published too recently, not publishing new checkpoint", slog.Duration("age", cpAge), slog.Duration("minstaleactive", minStaleActive))
publishCount.Add(ctx, 1, metric.WithAttributes(errorTypeKey.String("skipped")))
return time.Unix(pubAt, 0), nil
return pubAt.Add(minStaleActive), nil
}

row := tx.QueryRowContext(ctx, "SELECT seq, rootHash FROM IntCoord WHERE id = ?", 0)
Expand All @@ -1328,7 +1337,11 @@ func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive,
if !shouldPublish {
slog.DebugContext(ctx, "publishCheckpoint: skipping publish because tree hasn't grown and previous checkpoint is too recent")
publishCount.Add(ctx, 1, metric.WithAttributes(errorTypeKey.String("skipped_no_growth")))
return time.Unix(pubAt, 0), nil
nextPubAt = start.Add(minStaleActive)
if minStaleRepub > 0 && nextPubAt.After(pubAt.Add(minStaleRepub)) {
nextPubAt = pubAt.Add(minStaleRepub)
}
return nextPubAt, nil
}

slog.DebugContext(ctx, "publishCheckpoint: updating checkpoint (replacing old checkpoint)", slog.Duration("age", cpAge))
Expand All @@ -1337,8 +1350,8 @@ func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive,
return time.Time{}, err
}

publishedAt = time.Now()
if _, err := tx.ExecContext(ctx, "UPDATE PubCoord SET publishedAt=?, size=? WHERE id=?", publishedAt.Unix(), currentSize, 0); err != nil {
pubAt = time.Now()
if _, err := tx.ExecContext(ctx, "UPDATE PubCoord SET publishedAt=?, size=? WHERE id=?", pubAt.UnixNano(), currentSize, 0); err != nil {
return time.Time{}, err
}
if err := tx.Commit(); err != nil {
Expand All @@ -1348,7 +1361,8 @@ func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive,
publishCount.Add(ctx, 1)
tx = nil

return publishedAt, nil
nextPubAt = pubAt.Add(minStaleActive)
return nextPubAt, nil
}

// garbageCollect will identify up to maxBundles unneeded partial entry bundles (and any unneeded partial tiles which sit above them in the tree) and
Expand Down
18 changes: 17 additions & 1 deletion storage/aws/aws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,12 @@ func TestPublishTree(t *testing.T) {
republishInterval: 2 * time.Second,
attempts: []time.Duration{1500 * time.Millisecond, 2500 * time.Millisecond},
wantUpdates: 1,
}, {
name: "publish: no growth; republish: disabled",
publishInterval: 100 * time.Millisecond,
republishInterval: 0,
attempts: []time.Duration{100 * time.Millisecond, 100 * time.Millisecond},
wantUpdates: 0,
},
} {
t.Run(test.name, func(t *testing.T) {
Expand All @@ -439,6 +445,7 @@ func TestPublishTree(t *testing.T) {
if err := storage.init(ctx); err != nil {
t.Fatalf("storage.init: %v", err)
}
pubAt := time.Now() // Good approximation of the checkpoint's future publishedAt.
if _, err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.updateCheckpoint); err != nil {
t.Fatalf("publishTree: %v", err)
}
Expand All @@ -449,15 +456,24 @@ func TestPublishTree(t *testing.T) {
updatesSeen := 0
for _, d := range test.attempts {
time.Sleep(d)
if _, err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.updateCheckpoint); err != nil {
nextPubAt, err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.updateCheckpoint)
notAfter := time.Now().Add(test.publishInterval)
if err != nil {
t.Fatalf("publishTree: %v", err)
}
if nextPubAt.Before(pubAt.Add(test.publishInterval)) {
t.Errorf("nextPubAt = %v, want larger than %v", nextPubAt, pubAt.Add(test.publishInterval))
}
if nextPubAt.After(notAfter) {
t.Errorf("nextPubAt = %v, want smaller than %v", nextPubAt, notAfter)
}
cpNew, err := m.getObject(ctx, layout.CheckpointPath)
if err != nil {
t.Fatalf("getObject: %v", err)
}
if !bytes.Equal(cpOld, cpNew) {
updatesSeen++
pubAt = time.Now()
cpOld = cpNew
}
}
Expand Down
17 changes: 12 additions & 5 deletions storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,12 +393,12 @@ func (a *Appender) publishCheckpointJob(ctx context.Context, pubInterval, republ
ctx, cancel := context.WithTimeout(ctx, defaultPublicationTimeout)
defer cancel() // Note: ok because we're in a func passed to TraceErr here!

publishedAt, err := a.sequencer.publishCheckpoint(ctx, pubInterval, republishInterval, a.updateCheckpoint)
nextPub, err := a.sequencer.publishCheckpoint(ctx, pubInterval, republishInterval, a.updateCheckpoint)
if err != nil {
return fmt.Errorf("publishCheckpoint failed: %v", err)
}
// Schedule a checkpoint update immediately, if an update is due.
t.Reset(max(time.Millisecond, pubInterval-time.Since(publishedAt)))
t.Reset(max(time.Millisecond, time.Until(nextPub)))
return nil
}, trace.WithAttributes(otel.PeriodicKey.Bool(true))); err != nil {
t.Reset(pubInterval)
Expand Down Expand Up @@ -1104,7 +1104,7 @@ func (s *spannerCoordinator) nextIndex(ctx context.Context) (uint64, error) {
// publishCheckpoint checks when the last checkpoint was published, and if appropriate, calls the provided
// function to publish a new one.
//
// Returns the time at which the checkpoint is published, or was last published.
// Returns the time at which publishCheckpoint should be called again.
//
// A checkpoint will not be published if either:
// - the currently published checkpoint was published less than minStaleActive ago
Expand All @@ -1117,7 +1117,7 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActi
return otel.Trace(ctx, "tessera.storage.gcp.publishCheckpoint", tracer, func(ctx context.Context, span trace.Span) (time.Time, error) {
// outcomeAttr records the outcome of the checkpoint publication attempt for metrics and traces.
var outcomeAttr attribute.KeyValue
var pubAt time.Time
var nextPubAt time.Time
start := time.Now()

currentSize, rootHash, err := s.currentTree(ctx)
Expand All @@ -1136,6 +1136,7 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActi
return fmt.Errorf("failed to read PubCoord: %w", err)
}
var lastSize spanner.NullInt64
var pubAt time.Time
if err := pRow.Columns(&pubAt, &lastSize); err != nil {
return fmt.Errorf("failed to parse PubCoord: %v", err)
}
Expand All @@ -1150,6 +1151,7 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActi
if cpAge < minStaleActive {
slog.DebugContext(ctx, "publishCheckpoint: last checkpoint published too recently, not publishing new checkpoint", slog.Duration("cpAge", cpAge), slog.Duration("minStaleActive", minStaleActive))
outcomeAttr = outcomeTypeKey.String("skipped")
nextPubAt = pubAt.Add(minStaleActive)
return nil
}

Expand All @@ -1166,6 +1168,10 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActi
if !shouldPublish {
slog.DebugContext(ctx, "publishCheckpoint: skipping publish because tree hasn't grown and previous checkpoint is too recent")
outcomeAttr = outcomeTypeKey.String("skipped_no_growth")
nextPubAt = start.Add(minStaleActive)
if minStaleRepub > 0 && nextPubAt.After(pubAt.Add(minStaleRepub)) {
nextPubAt = pubAt.Add(minStaleRepub)
}
return nil
}

Expand All @@ -1177,6 +1183,7 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActi
}
span.AddEvent("Updating PubCoord")
pubAt = time.Now()
nextPubAt = pubAt.Add(minStaleActive)
if err := txn.BufferWrite([]*spanner.Mutation{spanner.Update("PubCoord", []string{"id", "publishedAt", "size"}, []any{0, pubAt, int64(currentSize)})}); err != nil {
return err
}
Expand All @@ -1190,7 +1197,7 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActi
opsHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(opNameKey.String("publishCheckpoint")))
span.SetAttributes(outcomeAttr)
publishCount.Add(ctx, 1, metric.WithAttributes(outcomeAttr))
return pubAt, nil
return nextPubAt, nil
})
}

Expand Down
18 changes: 17 additions & 1 deletion storage/gcp/gcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,12 @@ func TestPublishTree(t *testing.T) {
republishInterval: 2 * time.Second,
attempts: []time.Duration{1500 * time.Millisecond, 2500 * time.Millisecond},
wantUpdates: 1,
}, {
name: "publish: no growth; republish: disabled",
publishInterval: 100 * time.Millisecond,
republishInterval: 0,
attempts: []time.Duration{100 * time.Millisecond, 100 * time.Millisecond},
wantUpdates: 0,
},
} {
t.Run(test.name, func(t *testing.T) {
Expand Down Expand Up @@ -518,6 +524,7 @@ func TestPublishTree(t *testing.T) {
t.Fatalf("storage.init: %v", err)
}

pubAt := time.Now() // Good approximation of the checkpoint's future publishedAt.
if _, err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.updateCheckpoint); err != nil {
t.Fatalf("publishTree: %v", err)
}
Expand All @@ -528,15 +535,24 @@ func TestPublishTree(t *testing.T) {
updatesSeen := 0
for _, d := range test.attempts {
time.Sleep(d)
if _, err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.updateCheckpoint); err != nil {
nextPubAt, err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.updateCheckpoint)
notAfter := time.Now().Add(test.publishInterval)
if err != nil {
t.Fatalf("publishTree: %v", err)
}
if nextPubAt.Before(pubAt.Add(test.publishInterval)) {
t.Errorf("nextPubAt = %v, want larger than %v", nextPubAt, pubAt.Add(test.publishInterval))
}
if nextPubAt.After(notAfter) {
t.Errorf("nextPubAt = %v, want smaller than %v", nextPubAt, notAfter)
}
cpNew, _, err := m.getObject(ctx, layout.CheckpointPath)
if err != nil {
t.Fatalf("getObject: %v", err)
}
if !bytes.Equal(cpOld, cpNew) {
updatesSeen++
pubAt = time.Now()
cpOld = cpNew
}
}
Expand Down
28 changes: 17 additions & 11 deletions storage/posix/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,12 @@ func (a *appender) publishCheckpointJob(ctx context.Context, pubInterval, republ
if err := otel.TraceErr(ctx, "tessera.storage.posix.publishCheckpointJob", tracer, func(ctx context.Context, span trace.Span) error {
ctx, cancel := context.WithTimeout(ctx, defaultPublicationTimeout)
defer cancel()
publishedAt, err := a.publishCheckpoint(ctx, pubInterval, republishInterval)
nextPub, err := a.publishCheckpoint(ctx, pubInterval, republishInterval)
if err != nil {
return err
}
// Schedule a checkpoint update immediately, if an update is due.
t.Reset(max(time.Millisecond, pubInterval-time.Since(publishedAt)))
t.Reset(max(time.Millisecond, time.Until(nextPub)))
return nil
}, trace.WithAttributes(otel.PeriodicKey.Bool(true))); err != nil {
t.Reset(pubInterval)
Expand Down Expand Up @@ -675,10 +675,10 @@ func (s *Storage) readTreeState(ctx context.Context) (uint64, []byte, error) {
// minStaleness old, and, if so, creates and publishes a fresh checkpoint from the current
// stored tree state.
//
// Returns the time at which the checkpoint is published, or was last published.
func (a *appender) publishCheckpoint(ctx context.Context, minStalenessActive, minStalenessRepub time.Duration) (publishedAt time.Time, errR error) {
// Returns the time at which publishCheckpoint should be called again.
func (a *appender) publishCheckpoint(ctx context.Context, minStalenessActive, minStalenessRepub time.Duration) (nextPubAt time.Time, errR error) {
return otel.Trace(ctx, "tessera.storage.posix.publishCheckpoint", tracer, func(ctx context.Context, span trace.Span) (time.Time, error) {
now := time.Now()
start := time.Now()
defer func() {
// Detect any errors and update metrics accordingly.
// Non-error cases are explicitly handled in the body of the function below.
Expand All @@ -701,19 +701,21 @@ func (a *appender) publishCheckpoint(ctx context.Context, minStalenessActive, mi
var publishedAge time.Duration
var publishedSize uint64
cpExists := true
var pubAt time.Time
info, err := a.s.stat(layout.CheckpointPath)
if errors.Is(err, os.ErrNotExist) {
slog.DebugContext(ctx, "No checkpoint exists, publishing")
cpExists = false
} else if err != nil {
return time.Time{}, fmt.Errorf("stat(%s): %v", layout.CheckpointPath, err)
} else {
publishedAt = info.ModTime()
pubAt = info.ModTime()
publishedAge = time.Since(info.ModTime())
if publishedAge < minStalenessActive {
slog.DebugContext(ctx, "publishCheckpoint: skipping publish because previous checkpoint too fresh", slog.Duration("age", publishedAge), slog.Duration("minstalenessactive", minStalenessActive))
publishCount.Add(ctx, 1, metric.WithAttributes(errorTypeKey.String("skipped")))
return publishedAt, nil
nextPubAt = pubAt.Add(minStalenessActive)
return nextPubAt, nil
}
publishedSize, err = a.publishedSize(ctx)
if err != nil {
Expand All @@ -731,7 +733,11 @@ func (a *appender) publishCheckpoint(ctx context.Context, minStalenessActive, mi
if minStalenessRepub == 0 || publishedAge < minStalenessRepub {
slog.DebugContext(ctx, "publishCheckpoint: skipping publish because tree hasn't grown and previous checkpoint is too recent")
publishCount.Add(ctx, 1, metric.WithAttributes(errorTypeKey.String("skipped_no_growth")))
return publishedAt, nil
nextPubAt = start.Add(minStalenessActive)
if minStalenessRepub > 0 && nextPubAt.After(pubAt.Add(minStalenessRepub)) {
nextPubAt = pubAt.Add(minStalenessRepub)
}
return nextPubAt, nil
}
}

Expand All @@ -744,14 +750,14 @@ func (a *appender) publishCheckpoint(ctx context.Context, minStalenessActive, mi
return time.Time{}, fmt.Errorf("createOverwrite(%s): %v", layout.CheckpointPath, err)
}
// This is not the actual ModTime of the file, but it's close enough.
publishedAt = time.Now()
pubAt = time.Now()

slog.DebugContext(ctx, "Published latest checkpoint", slog.Uint64("size", size), slog.String("root", fmt.Sprintf("%x", root)))

posixOpsHistogram.Record(ctx, time.Since(now).Milliseconds(), metric.WithAttributes(opNameKey.String("publishCheckpoint")))
posixOpsHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(opNameKey.String("publishCheckpoint")))
publishCount.Add(ctx, 1)

return publishedAt, nil
return pubAt.Add(minStalenessActive), nil
})
}

Expand Down
Loading
Loading