Skip to content

Commit 41f345f

Browse files
ffromanishajmakh
authored andcommitted
pfp: recorder: implement maxSize option alongside maxCapacity
Allow to configure the NodeRecorder with maximum capacity to avoid extreme growth in recorded PFP statuses size. This is yet another sealing besides maxCapacity that the node recorder should not cross. The maxSize takes precesdence over maxCapacity, meaning as long as the current size of the nodeRecorder does not reach the maximum allowed, `statuses` slice length is maintained and is allowed to reach the maximum capacity. Once the new item pushed to the nodeRecorder is expected to make the recorder reach the max allowed size, `statuses` will witness reductions in older items to allow the new status to fit, under the constraint that node recorder will at least have one status no matter its size. So the boundary is important but the recorder goal should still be on top. Signed-off-by: Francesco Romani <fromani@redhat.com>
1 parent 3c46505 commit 41f345f

4 files changed

Lines changed: 160 additions & 15 deletions

File tree

pkg/pfpstatus/pfpstatus.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,16 @@ const (
4141
const (
4242
defaultMaxNodes = 5000
4343
defaultMaxSamplesPerNode = 10
44+
defaultMaxSizePerNode = 0 // no constraints
4445
defaultDumpPeriod = 10 * time.Second
4546
)
4647

4748
type StorageParams struct {
48-
Enabled bool
49-
Directory string
50-
Period time.Duration
51-
CoalesceLast bool
49+
Enabled bool
50+
Directory string
51+
Period time.Duration
52+
CoalesceLast bool
53+
MaxSizePerNode int
5254
}
5355

5456
type Params struct {
@@ -64,10 +66,11 @@ type environ struct {
6466
func DefaultParams() Params {
6567
return Params{
6668
Storage: StorageParams{
67-
Enabled: false,
68-
Directory: DefaultDumpDirectory,
69-
Period: 10 * time.Second,
70-
CoalesceLast: false,
69+
Enabled: false,
70+
Directory: DefaultDumpDirectory,
71+
Period: 10 * time.Second,
72+
CoalesceLast: false,
73+
MaxSizePerNode: defaultMaxSizePerNode,
7174
},
7275
}
7376
}
@@ -100,6 +103,7 @@ func Setup(logh logr.Logger, params Params) {
100103
rec, err := record.NewRecorder(
101104
record.WithMaxNodes(defaultMaxNodes),
102105
record.WithNodeCapacity(defaultMaxSamplesPerNode),
106+
record.WithMaxSizePerNode(defaultMaxSizePerNode),
103107
record.WithPFPCoalescing(params.Storage.CoalesceLast),
104108
)
105109
if err != nil {

pkg/pfpstatus/record/options.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ func WithPFPCoalescing(val bool) Option {
4444
}
4545
}
4646

47+
func WithMaxSizePerNode(maxSize int) Option {
48+
return func(rr *Recorder) {
49+
rr.maxSize = maxSize
50+
}
51+
}
52+
4753
type NodeOption func(*NodeRecorder)
4854

4955
func WithCapacity(capacity int) NodeOption {
@@ -63,3 +69,9 @@ func WithCoalescing(val bool) NodeOption {
6369
nr.coalesceLast = val
6470
}
6571
}
72+
73+
func WithMaxSize(maxSize int) NodeOption {
74+
return func(nr *NodeRecorder) {
75+
nr.maxSize = maxSize
76+
}
77+
}

pkg/pfpstatus/record/recorder.go

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package record
1818

1919
import (
2020
"errors"
21+
"fmt"
22+
"io"
2123
"time"
2224

2325
"github.com/k8stopologyawareschedwg/podfingerprint"
@@ -40,6 +42,13 @@ type RecordedStatus struct {
4042
podfingerprint.Status
4143
// RecordTime is a timestamp of when the RecordedStatus was added to the record
4244
RecordTime time.Time `json:"recordTime"`
45+
// statusSize is approximate size of the string representation of the status in bytes
46+
statusSize int `json:"-"`
47+
}
48+
49+
func (rs RecordedStatus) Size() int {
50+
byteCount, _ := fmt.Fprint(io.Discard, fmt.Sprintf("%+v\n%+v", rs.Pods, rs.RecordTime))
51+
return byteCount
4352
}
4453

4554
func (rs RecordedStatus) Equal(x RecordedStatus) bool {
@@ -55,6 +64,8 @@ type NodeRecorder struct {
5564
timestamper func() time.Time
5665
nodeName string // shortcut
5766
capacity int
67+
maxSize int
68+
size int
5869
statuses []RecordedStatus
5970
coalesceLast bool
6071
}
@@ -88,14 +99,27 @@ func (nr *NodeRecorder) dropOldest() {
8899
if nr.Len() < 1 {
89100
return
90101
}
102+
nr.size -= nr.statuses[0].Size()
91103
nr.statuses = nr.statuses[1:]
92104
}
93105

94-
func (nr *NodeRecorder) makeRoom() {
95-
if nr.Len() < nr.Cap() {
106+
func (nr *NodeRecorder) makeRoom(size int) {
107+
if nr.capacity > 1 && nr.Len() == nr.Cap() {
108+
nr.dropOldest()
109+
}
110+
if nr.maxSize == 0 {
111+
return
112+
}
113+
114+
if size >= nr.maxSize {
115+
nr.size = 0
116+
nr.statuses = []RecordedStatus{}
96117
return
97118
}
98-
nr.dropOldest()
119+
120+
for (nr.size + size) > nr.maxSize {
121+
nr.dropOldest()
122+
}
99123
}
100124

101125
// Push adds a new Status to the record, evicting the oldest Status if necessary.
@@ -119,15 +143,19 @@ func (nr *NodeRecorder) Push(st podfingerprint.Status) error {
119143
}
120144

121145
item := RecordedStatus{
122-
Status: st.Clone(),
146+
Status: st,
123147
RecordTime: ts,
124148
}
149+
item.statusSize = item.Size()
125150
if nr.capacity == 1 { // handle common special case, avoid any resize
126151
nr.statuses[0] = item
152+
nr.size = item.Size()
153+
// no maxSize constraint required
127154
return nil
128155
}
129-
nr.makeRoom()
156+
nr.makeRoom(item.statusSize)
130157
nr.statuses = append(nr.statuses, item)
158+
nr.size += item.Size()
131159
return nil
132160
}
133161

@@ -151,12 +179,18 @@ func (nr *NodeRecorder) IsCoalescing() bool {
151179
return nr.coalesceLast
152180
}
153181

182+
// MaxSize returns the maximum size allowed for nr
183+
func (nr *NodeRecorder) MaxSize() int {
184+
return nr.maxSize
185+
}
186+
154187
// Recorder stores all the recorded statuses, dividing them by node name.
155188
// There is a hard cap of how many nodes are managed, and how many Statuses are recorded per node.
156189
type Recorder struct {
157190
nodes map[string]*NodeRecorder
158191
nodeCapacity int
159192
maxNodes int
193+
maxSize int
160194
timestamper Timestamper
161195
coalesceLast bool
162196
}
@@ -186,7 +220,7 @@ func NewRecorder(opts ...Option) (*Recorder, error) {
186220
return &rec, nil
187221
}
188222

189-
// Cap returns the maximum nodes allowed in this Recorder
223+
// MaxNodes returns the maximum nodes allowed in this Recorder
190224
func (rr *Recorder) MaxNodes() int {
191225
return rr.maxNodes
192226
}
@@ -224,6 +258,11 @@ func (rr *Recorder) IsCoalescing() bool {
224258
return rr.coalesceLast
225259
}
226260

261+
// MaxSize returns the maximum size allowed per NodeRecorder in rr
262+
func (rr *Recorder) MaxSize() int {
263+
return rr.maxSize
264+
}
265+
227266
// Push adds a new Status to the record for its node, evicting the oldest Status
228267
// belonging to the same node if necessary.
229268
// Per-node records are created lazily as needed, up to the configured maximum.
@@ -243,7 +282,7 @@ func (rr *Recorder) Push(st podfingerprint.Status) error {
243282
}
244283

245284
if !ok {
246-
nr, err = NewNodeRecorder(st.NodeName, WithTimestamper(rr.timestamper), WithCapacity(rr.nodeCapacity), WithCoalescing(rr.coalesceLast))
285+
nr, err = NewNodeRecorder(st.NodeName, WithTimestamper(rr.timestamper), WithCapacity(rr.nodeCapacity), WithMaxSize(rr.maxSize), WithCoalescing(rr.coalesceLast))
247286
if err != nil {
248287
return err
249288
}

pkg/pfpstatus/record/recorder_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ package record
1919
import (
2020
"errors"
2121
"fmt"
22+
"io"
2223
"testing"
24+
"time"
2325

2426
"github.com/k8stopologyawareschedwg/podfingerprint"
2527
)
@@ -441,3 +443,91 @@ func TestRecorderWithCoalescing(t *testing.T) {
441443
t.Fatalf("unexpected status count")
442444
}
443445
}
446+
447+
func TestRecorderWithMaxSize(t *testing.T) {
448+
rr, err := NewRecorder(WithMaxNodes(8), WithNodeCapacity(5), WithMaxSizePerNode(1500))
449+
if err != nil {
450+
t.Fatalf("unexpected error: %v", err)
451+
}
452+
453+
if rr.MaxSize() != 1500 {
454+
t.Fatalf("unexpected max size: %d", rr.MaxSize())
455+
}
456+
}
457+
458+
func TestNodeRecorderWithMaxSize(t *testing.T) {
459+
rt := time.Now() // to manage accurate size checks
460+
rtf := func() time.Time {
461+
return rt
462+
}
463+
464+
st := podfingerprint.Status{
465+
FingerprintExpected: "pfp-exp-st1",
466+
FingerprintComputed: "pfp-comp-st1",
467+
NodeName: "node-0",
468+
}
469+
rs := RecordedStatus{
470+
Status: st,
471+
RecordTime: rt,
472+
}
473+
stSize := rs.Size()
474+
maxSize := 2*stSize + 1000
475+
476+
nr, err := NewNodeRecorder("node-0", WithCapacity(5), WithTimestamper(rtf), WithMaxSize(maxSize))
477+
if err != nil {
478+
t.Fatalf("unexpected error: %v", err)
479+
}
480+
if nr.MaxSize() != maxSize {
481+
t.Fatalf("unexpected max size: %d vs %d", nr.MaxSize(), maxSize)
482+
}
483+
484+
for i := 0; i < 2; i++ {
485+
if err := nr.Push(st); err != nil {
486+
t.Fatalf("unexpected error: %v", err)
487+
}
488+
}
489+
490+
stSize += rs.Size()
491+
if nr.Len() != 2 {
492+
t.Fatalf("unexpected length: %d", nr.Len())
493+
}
494+
if nr.size != stSize {
495+
t.Fatalf("unexpected size: %d vs %d", nr.size, stSize)
496+
}
497+
498+
var pods []podfingerprint.NamespacedName
499+
for i := 0; getSize(pods) < maxSize; i++ {
500+
pods = append(pods, podfingerprint.NamespacedName{
501+
Namespace: fmt.Sprintf("pod-name %d", i),
502+
Name: fmt.Sprintf("namespace-name %d", i),
503+
})
504+
}
505+
506+
rs2 := RecordedStatus{
507+
Status: podfingerprint.Status{
508+
FingerprintExpected: "pfp-exp-st2",
509+
FingerprintComputed: "pfp-comp-st2",
510+
Pods: pods,
511+
NodeName: st.NodeName,
512+
},
513+
RecordTime: rt,
514+
}
515+
516+
if err := nr.Push(rs2.Status); err != nil {
517+
t.Fatalf("unexpected error: %v", err)
518+
}
519+
520+
if nr.Len() != 1 {
521+
t.Fatalf("unexpected length: expected 1 found %d", nr.Len())
522+
}
523+
524+
if nr.size != rs2.Size() {
525+
t.Fatalf("unexpected size: expected %d found %d", rs2.Size(), nr.size)
526+
}
527+
}
528+
529+
func getSize(pods []podfingerprint.NamespacedName) int {
530+
byteCount, _ := fmt.Fprint(io.Discard, fmt.Sprintf("%+v\n", pods))
531+
return byteCount
532+
533+
}

0 commit comments

Comments
 (0)