Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions cmd/sunlight/sunlight.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,11 @@ type LogConfig struct {
Cache string

// PoolSize is the maximum number of chains pending in the sequencing pool.
// Since the pool is sequenced every second, it works as a qps limit. If the
// pool is full, add-chain requests will be rejected with a 503. Zero means
// no limit.
// Since the pool is sequenced every Period, it works as a qps limit. If the
// pool is full, lower-priority entries will be evicted and replaced if
// possible, and otherwise add-chain requests will be rejected with a 503.
// Lower-priority entries are precertificates with NotBefore more than 48h
// in the past, or certificates with an SCT extension. Zero means no limit.
PoolSize int

// S3Region is the AWS region for the S3 bucket.
Expand Down
47 changes: 40 additions & 7 deletions internal/ctlog/ctlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ func computeCacheHash(Certificate []byte, IsPrecert bool, IssuerKeyHash [32]byte
type pool struct {
pendingLeaves []*PendingLogEntry
byHash map[cacheHash]waitEntryFunc
lowPriority map[int]func() // pendingLeaves idx => cancel func

// done is closed when the pool has been sequenced and
// the results below are ready.
Expand All @@ -605,18 +606,23 @@ type waitEntryFunc func(ctx context.Context) (*sunlight.LogEntry, error)

// newPool returns an empty pool ready to accept leaves: the done channel is
// open, and both lookup maps are allocated so they can be written to without
// a nil-map panic.
func newPool() *pool {
	return &pool{
		// Each field is initialized exactly once; repeating a field name in a
		// struct literal is a compile error.
		done:        make(chan struct{}),
		byHash:      make(map[cacheHash]waitEntryFunc),
		lowPriority: make(map[int]func()),
	}
}

// Sentinel errors returned by the wait functions that addLeafToPool hands out
// when a leaf will not be sequenced.
var (
	// errPoolFull is returned when the pool has reached PoolSize and the new
	// leaf could not take the place of a low-priority one.
	errPoolFull = fmtErrorf("rate limited")

	// errEvicted is returned to the submitter of a low-priority leaf whose
	// slot in the pool was given to another leaf.
	errEvicted = fmtErrorf("evicted to make way for higher priority leaves")
)

// addLeafToPool adds leaf to the current pool, unless it is found in a
// deduplication cache. It returns a function that will wait until the pool is
// sequenced and return the sequenced leaf, as well as the source of the
// sequenced leaf (pool or cache if deduplicated, sequencer otherwise).
func (l *Log) addLeafToPool(ctx context.Context, leaf *PendingLogEntry) (f waitEntryFunc, source string) {
//
// Low priority entries might get evicted to make space for high priority ones,
// in which case the waitEntryFunc of the evicted entry will immediately return.
func (l *Log) addLeafToPool(ctx context.Context, leaf *PendingLogEntry, lowPriority bool) (f waitEntryFunc, source string) {
// We could marginally more efficiently do uploadIssuer after checking the
// caches, but it's simpler for the block below to be under a single
// poolMu lock, and uploadIssuer goes to the network so we don't want to
Expand Down Expand Up @@ -656,16 +662,43 @@ func (l *Log) addLeafToPool(ctx context.Context, leaf *PendingLogEntry) (f waitE
}
n := len(p.pendingLeaves)
if l.c.PoolSize > 0 && n >= l.c.PoolSize {
return func(ctx context.Context) (*sunlight.LogEntry, error) {
return nil, errPoolFull
}, "ratelimit"
if lowPriority || len(p.lowPriority) == 0 {
return func(ctx context.Context) (*sunlight.LogEntry, error) {
return nil, errPoolFull
}, "ratelimit"
}
for nn, cancel := range p.lowPriority {
cancel()
delete(p.lowPriority, nn)
n = nn
p.pendingLeaves[n] = leaf
break
Comment thread
FiloSottile marked this conversation as resolved.
}
} else {
p.pendingLeaves = append(p.pendingLeaves, leaf)
}
var cancelChan chan struct{}
if lowPriority {
cancelChan = make(chan struct{})
p.lowPriority[n] = func() {
close(cancelChan)
}
}
p.pendingLeaves = append(p.pendingLeaves, leaf)
f = func(ctx context.Context) (*sunlight.LogEntry, error) {
select {
case <-ctx.Done():
return nil, fmtErrorf("context canceled while waiting for sequencing: %w", ctx.Err())
case <-cancelChan:
return nil, errEvicted
case <-p.done:
if err := ctx.Err(); err != nil {
return nil, fmtErrorf("context canceled while waiting for sequencing: %w", err)
}
select {
case <-cancelChan:
return nil, errEvicted
default:
}
if p.err != nil {
return nil, p.err
}
Expand Down
111 changes: 111 additions & 0 deletions internal/ctlog/ctlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,117 @@ func TestSequenceUploadPaths(t *testing.T) {
}
}

// TestRatelimit checks how a pool limited to PoolSize = 10 treats regular and
// low-priority submissions once it is full: low-priority entries are rejected
// with ErrPoolFull, regular entries either take the slot of a low-priority
// entry (whose waiter then gets ErrEvicted) or are rejected when there is
// nothing left to evict.
func TestRatelimit(t *testing.T) {
	tl := NewEmptyTestLog(t)
	tl.Config.PoolSize = 10

	// pendingEvictions collects the wait functions of low-priority entries
	// that the test expects to be evicted; checkEvictions drains it.
	var pendingEvictions []ctlog.WaitEntryFunc
	// addCertificateExpectEvictionWithSeed submits a low-priority entry built
	// from a PRNG seeded with seed (so the same seed yields the same entry),
	// remembers its wait function for checkEvictions, and returns the source
	// reported by the pool.
	addCertificateExpectEvictionWithSeed := func(seed int64) string {
		r := mathrand.New(mathrand.NewSource(seed))
		e := &ctlog.PendingLogEntry{}
		e.Certificate = make([]byte, r.Intn(4)+8)
		r.Read(e.Certificate)
		e.Issuers = chains[r.Intn(len(chains))]
		f, source := tl.Log.AddLeafToPoolWithLowPriority(e)
		pendingEvictions = append(pendingEvictions, f)
		return source
	}
	// addCertificateExpectEviction is the same with a fresh random seed.
	addCertificateExpectEviction := func() {
		addCertificateExpectEvictionWithSeed(mathrand.Int63())
	}
	// checkEvictions asserts that every recorded wait function returns
	// ErrEvicted, then resets the list for the next phase.
	checkEvictions := func() {
		for _, f := range pendingEvictions {
			_, err := f(t.Context())
			if err != ctlog.ErrEvicted {
				t.Errorf("got error %v, expected ErrEvicted", err)
			}
		}
		pendingEvictions = nil
	}

	// addLowPriorityExpectRatelimit submits a random low-priority entry and
	// asserts it is turned away: source "ratelimit" and ErrPoolFull from the
	// wait function.
	addLowPriorityExpectRatelimit := func() {
		r := mathrand.New(mathrand.NewSource(mathrand.Int63()))
		e := &ctlog.PendingLogEntry{}
		e.Certificate = make([]byte, r.Intn(4)+8)
		r.Read(e.Certificate)
		e.Issuers = chains[r.Intn(len(chains))]
		f, source := tl.Log.AddLeafToPoolWithLowPriority(e)
		if source != "ratelimit" {
			t.Errorf("got source %q, expected \"ratelimit\"", source)
		}
		if _, err := f(t.Context()); err != ctlog.ErrPoolFull {
			t.Errorf("got error %v, expected ErrPoolFull", err)
		}
	}

	// When the pool is full of high-priority entries, new entries are rejected.
	for range 10 {
		addCertificate(t, tl)
	}
	for range 10 {
		addCertificateExpectFailure(t, tl)
	}
	for range 10 {
		addLowPriorityExpectRatelimit()
	}
	fatalIfErr(t, tl.Log.Sequence())
	tl.CheckLog(10)

	// When there are low-priority entries, high-priority entries cause them to
	// be evicted.
	for range 5 {
		addCertificate(t, tl)
	}
	for range 3 {
		addCertificateExpectEviction()
	}
	for range 2 {
		addCertificate(t, tl)
	}
	// The pool now holds 10 entries, so another low-priority one is refused.
	addLowPriorityExpectRatelimit()
	// Each of these takes the slot of one of the 3 low-priority entries.
	for range 3 {
		addCertificate(t, tl)
	}
	// Nothing low-priority is left to evict, so this one is rejected.
	addCertificateExpectFailure(t, tl)
	// Evictions unblock before the sequencing.
	checkEvictions()
	fatalIfErr(t, tl.Log.Sequence())
	tl.CheckLog(20)

	// If we were to wait to call the waitFunc until after sequencing, the
	// evictions would still know they were evicted.
	for range 5 {
		addCertificateExpectEviction()
	}
	for range 10 {
		addCertificate(t, tl)
	}
	fatalIfErr(t, tl.Log.Sequence())
	tl.CheckLog(30)
	checkEvictions()

	// If a low-priority entry is deduplicated (resubmitted to the same pool)
	// and then evicted, both callers get ErrEvicted.
	for range 5 {
		addCertificate(t, tl)
	}
	seed := mathrand.Int63()
	if source := addCertificateExpectEvictionWithSeed(seed); source != "sequencer" {
		t.Errorf("got source %q, expected \"sequencer\"", source)
	}
	if source := addCertificateExpectEvictionWithSeed(seed); source != "pool" {
		t.Errorf("got source %q, expected \"pool\"", source)
	}
	for range 4 {
		addCertificate(t, tl)
	}
	// Evict the low-priority entry by filling the pool with high-priority ones.
	addCertificate(t, tl)
	checkEvictions()
	fatalIfErr(t, tl.Log.Sequence())
	tl.CheckLog(40)
}

func TestDuplicates(t *testing.T) {
t.Run("Certificates", func(t *testing.T) {
testDuplicates(t, addCertificateWithSeed)
Expand Down
13 changes: 11 additions & 2 deletions internal/ctlog/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,17 @@ import (
"filippo.io/sunlight"
)

func (l *Log) AddLeafToPool(e *PendingLogEntry) (waitEntryFunc, string) {
return l.addLeafToPool(context.Background(), e)
// Exported views of the package's unexported sentinel errors, so tests in the
// external test package can compare against them.
var (
	ErrEvicted  = errEvicted
	ErrPoolFull = errPoolFull
)

// WaitEntryFunc is an alias that lets external tests name the wait function
// type returned by the pool helpers below.
type WaitEntryFunc = waitEntryFunc

// AddLeafToPool exposes addLeafToPool to tests, submitting e as a regular
// (not low-priority) entry under a background context.
func (l *Log) AddLeafToPool(e *PendingLogEntry) (WaitEntryFunc, string) {
	const lowPriority = false
	ctx := context.Background()
	return l.addLeafToPool(ctx, e, lowPriority)
}

// AddLeafToPoolWithLowPriority exposes addLeafToPool to tests, submitting e as
// a low-priority entry under a background context.
func (l *Log) AddLeafToPoolWithLowPriority(e *PendingLogEntry) (WaitEntryFunc, string) {
	const lowPriority = true
	ctx := context.Background()
	return l.addLeafToPool(ctx, e, lowPriority)
}

func (l *Log) Sequence() error {
Expand Down
26 changes: 22 additions & 4 deletions internal/ctlog/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (l *Log) addPreChain(rw http.ResponseWriter, r *http.Request) {

func (l *Log) addChainOrPreChain(ctx context.Context, reqBody io.ReadCloser, checkType func(*PendingLogEntry) error) (response []byte, code int, err error) {
labels := prometheus.Labels{"error": "", "issuer": "", "root": "", "reused": "",
"precert": "", "preissuer": "", "chain_len": "", "source": ""}
"precert": "", "preissuer": "", "chain_len": "", "low_priority": "", "source": ""}
defer func() {
if err != nil {
labels["error"] = errorCategory(err)
Expand Down Expand Up @@ -147,9 +147,11 @@ func (l *Log) addChainOrPreChain(ctx context.Context, reqBody io.ReadCloser, che
if err != nil {
return nil, http.StatusBadRequest, fmtErrorf("invalid chain: %w", err)
}
lowPriority := lowPriority(chain[0])
labels["chain_len"] = fmt.Sprintf("%d", len(chain))
labels["root"] = x509util.NameToString(chain[len(chain)-1].Subject)
labels["issuer"] = x509util.NameToString(chain[0].Issuer)
labels["low_priority"] = fmt.Sprintf("%v", lowPriority)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is printing a function value into a metric, should be converted to boolean 1/0 or a reason code (AlreadyHasSCT)

Copy link
Copy Markdown
Contributor

@mcpherrinm mcpherrinm Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is adding a label to the labels set with the value of the boolean lowPriority, which will get you a separate time series.

The resulting metric looks like:

sunlight_addchain_requests_total{chain_len="3",error="",issuer="C=US, O=(STAGING) Let's Encrypt, CN=(STAGING) Mysterious Mulberry E8",log="navigli2026h1",low_priority="false",precert="true",preissuer="",reused="false",root="C=US, O=(STAGING) Internet Security Research Group, CN=(STAGING) Pretend Pear X1",source="sequencer"} 124136

Note the low_priority="false" label on the metric.

So this code is correct, though perhaps it's worth considering using a different variable name than the function name it will shadow

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I somehow thought that was a lookup in the pool map rather than recomputation of the low priority criteria


e := &PendingLogEntry{Certificate: chain[0].Raw}
for _, issuer := range chain[1:] {
Expand Down Expand Up @@ -195,14 +197,17 @@ func (l *Log) addChainOrPreChain(ctx context.Context, reqBody io.ReadCloser, che
return nil, http.StatusBadRequest, err
}

waitLeaf, source := l.addLeafToPool(ctx, e)
waitLeaf, source := l.addLeafToPool(ctx, e, lowPriority)
labels["source"] = source
waitTimer := prometheus.NewTimer(l.m.AddChainWait)
seq, err := waitLeaf(ctx)
if source == "sequencer" {
if source == "sequencer" && err != errEvicted {
waitTimer.ObserveDuration()
}
if err == errPoolFull {
if err == errEvicted {
labels["source"] = "evicted"
}
if err == errPoolFull || err == errEvicted {
return nil, http.StatusServiceUnavailable, err
} else if errors.As(err, new(SunsetLogError)) {
return nil, http.StatusGone, err
Expand Down Expand Up @@ -236,6 +241,19 @@ func (l *Log) addChainOrPreChain(ctx context.Context, reqBody io.ReadCloser, che
return rsp, http.StatusOK, nil
}

// lowPriority reports whether the leaf certificate c should be submitted to
// the pool as a low-priority entry. That is the case for a precertificate
// whose NotBefore is at least 48 hours in the past, and for a non-precertificate
// that carries at least one embedded SCT.
func lowPriority(c *x509.Certificate) bool {
	// The error from IsPrecertificate is deliberately dropped: only the
	// boolean classification is used here.
	isPrecert, _ := ctfe.IsPrecertificate(c)
	if !isPrecert {
		// If a certificate has SCTs, it's already been logged. It'd be better
		// to verify the signatures, but this check is meant for when we are
		// under load and need to prioritize.
		return len(c.SCTList.SCTList) != 0
	}
	// The BRs allow at most 48 hours of backdating. A precertificate older
	// than that can't turn into a valid certificate anymore, so it must be
	// cross-posted.
	age := time.Since(c.NotBefore)
	return age >= 48*time.Hour
}

func (l *Log) getRoots(rw http.ResponseWriter, r *http.Request) {
roots := l.rootPool().RawCertificates()
var res struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/ctlog/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func initMetrics() metrics {
Name: "addchain_requests_total",
Help: "Number of add-[pre-]chain requests, by chain characteristics and errors if any.",
},
[]string{"error", "issuer", "root", "precert", "preissuer", "chain_len", "source", "reused"},
[]string{"error", "issuer", "root", "precert", "preissuer", "chain_len", "low_priority", "source", "reused"},
),
AddChainWait: prometheus.NewSummary(
prometheus.SummaryOpts{
Expand Down
Loading