-
Notifications
You must be signed in to change notification settings - Fork 14
pfp: recorder: add maxSize and coalescing options
#99
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,77 @@ | ||
| /* | ||
| * Copyright 2025 Red Hat, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package record | ||
|
|
||
| import "time" | ||
|
|
||
| type Option func(*Recorder) | ||
|
|
||
| func WithNodeCapacity(nodeCapacity int) Option { | ||
| return func(rec *Recorder) { | ||
| rec.nodeCapacity = nodeCapacity | ||
| } | ||
| } | ||
|
|
||
| func WithNodeTimestamper(tsr func() time.Time) Option { | ||
| return func(rec *Recorder) { | ||
| rec.timestamper = tsr | ||
| } | ||
| } | ||
|
|
||
| func WithMaxNodes(maxNodes int) Option { | ||
| return func(rec *Recorder) { | ||
| rec.maxNodes = maxNodes | ||
| } | ||
| } | ||
|
|
||
| func WithPFPCoalescing(val bool) Option { | ||
| return func(rr *Recorder) { | ||
| rr.coalesceLast = val | ||
| } | ||
| } | ||
|
|
||
| func WithMaxSizePerNode(maxSize int) Option { | ||
| return func(rr *Recorder) { | ||
| rr.maxSize = maxSize | ||
| } | ||
| } | ||
|
|
||
| type NodeOption func(*NodeRecorder) | ||
|
|
||
| func WithCapacity(capacity int) NodeOption { | ||
| return func(nr *NodeRecorder) { | ||
| nr.capacity = capacity | ||
| } | ||
| } | ||
|
|
||
| func WithTimestamper(tsr func() time.Time) NodeOption { | ||
| return func(nr *NodeRecorder) { | ||
| nr.timestamper = tsr | ||
| } | ||
| } | ||
|
|
||
| func WithCoalescing(val bool) NodeOption { | ||
| return func(nr *NodeRecorder) { | ||
| nr.coalesceLast = val | ||
| } | ||
| } | ||
|
|
||
| func WithMaxSize(maxSize int) NodeOption { | ||
| return func(nr *NodeRecorder) { | ||
| nr.maxSize = maxSize | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,8 @@ package record | |
|
|
||
| import ( | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "time" | ||
|
|
||
| "github.com/k8stopologyawareschedwg/podfingerprint" | ||
|
|
@@ -40,6 +42,13 @@ type RecordedStatus struct { | |
| podfingerprint.Status | ||
| // RecordTime is a timestamp of when the RecordedStatus was added to the record | ||
| RecordTime time.Time `json:"recordTime"` | ||
| // statusSize is approximate size of the string representation of the status in bytes | ||
| statusSize int `json:"-"` | ||
| } | ||
|
|
||
| func (rs RecordedStatus) Size() int { | ||
| byteCount, _ := fmt.Fprint(io.Discard, fmt.Sprintf("%+v\n%+v", rs.Pods, rs.RecordTime)) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please remove this
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do you mean that we should replace it with calculating that ourselves like you originally did? I thought go fmt provides bytesize for free so why not to use it if we are dumping the output to |
||
| return byteCount | ||
| } | ||
|
|
||
| func (rs RecordedStatus) Equal(x RecordedStatus) bool { | ||
|
|
@@ -52,24 +61,13 @@ type Timestamper func() time.Time | |
| // NodeRecorder stores all the recorded statuses for a given node name. | ||
| // Statuses belonging to different nodes won't be accepted. | ||
| type NodeRecorder struct { | ||
| timestamper func() time.Time | ||
| nodeName string // shortcut | ||
| capacity int | ||
| statuses []RecordedStatus | ||
| } | ||
|
|
||
| type NodeOption func(*NodeRecorder) | ||
|
|
||
| func WithCapacity(capacity int) NodeOption { | ||
| return func(nr *NodeRecorder) { | ||
| nr.capacity = capacity | ||
| } | ||
| } | ||
|
|
||
| func WithTimestamper(tsr func() time.Time) NodeOption { | ||
| return func(nr *NodeRecorder) { | ||
| nr.timestamper = tsr | ||
| } | ||
| timestamper func() time.Time | ||
| nodeName string // shortcut | ||
| capacity int | ||
| maxSize int | ||
| size int | ||
| statuses []RecordedStatus | ||
| coalesceLast bool | ||
| } | ||
|
|
||
| // NewNodeRecorder creates a new recorder for the given node with the given capacity. | ||
|
|
@@ -101,14 +99,27 @@ func (nr *NodeRecorder) dropOldest() { | |
| if nr.Len() < 1 { | ||
| return | ||
| } | ||
| nr.size -= nr.statuses[0].Size() | ||
| nr.statuses = nr.statuses[1:] | ||
| } | ||
|
|
||
| func (nr *NodeRecorder) makeRoom() { | ||
| if nr.Len() < nr.Cap() { | ||
| func (nr *NodeRecorder) makeRoom(size int) { | ||
| if nr.capacity > 1 && nr.Len() == nr.Cap() { | ||
| nr.dropOldest() | ||
| } | ||
| if nr.maxSize == 0 { | ||
| return | ||
| } | ||
| nr.dropOldest() | ||
|
|
||
| if size >= nr.maxSize { | ||
| nr.size = 0 | ||
| nr.statuses = []RecordedStatus{} | ||
| return | ||
| } | ||
|
|
||
| for (nr.size + size) > nr.maxSize { | ||
| nr.dropOldest() | ||
| } | ||
| } | ||
|
|
||
| // Push adds a new Status to the record, evicting the oldest Status if necessary. | ||
|
|
@@ -123,16 +134,28 @@ func (nr *NodeRecorder) Push(st podfingerprint.Status) error { | |
| return ErrMismatchingNode | ||
| } | ||
| ts := nr.timestamper() | ||
|
|
||
| if nr.IsCoalescing() && nr.Len() != 0 { | ||
| lastItem := nr.statuses[nr.Len()-1] | ||
| if lastItem.FingerprintComputed == st.FingerprintComputed && lastItem.FingerprintExpected == st.FingerprintExpected { // pod list should not change | ||
| return nil | ||
| } | ||
| } | ||
|
|
||
| item := RecordedStatus{ | ||
| Status: st.Clone(), | ||
| Status: st, | ||
| RecordTime: ts, | ||
| } | ||
| item.statusSize = item.Size() | ||
| if nr.capacity == 1 { // handle common special case, avoid any resize | ||
| nr.statuses[0] = item | ||
| nr.size = item.Size() | ||
| // no maxSize constraint required | ||
| return nil | ||
| } | ||
| nr.makeRoom() | ||
| nr.makeRoom(item.statusSize) | ||
| nr.statuses = append(nr.statuses, item) | ||
| nr.size += item.Size() | ||
| return nil | ||
| } | ||
|
|
||
|
|
@@ -146,38 +169,30 @@ func (nr *NodeRecorder) Cap() int { | |
| return nr.capacity | ||
| } | ||
|
|
||
| // Content() returns a shallow copy of all the recorded statuses. | ||
| // Content returns a shallow copy of all the recorded statuses. | ||
| func (nr *NodeRecorder) Content() []RecordedStatus { | ||
| return nr.statuses | ||
| } | ||
|
|
||
| // IsCoalescing returns true if the node recorder is configured to push statuses only if they are unique in PFPs | ||
| func (nr *NodeRecorder) IsCoalescing() bool { | ||
| return nr.coalesceLast | ||
| } | ||
|
|
||
| // MaxSize returns the maximum size allowed for nr | ||
| func (nr *NodeRecorder) MaxSize() int { | ||
| return nr.maxSize | ||
| } | ||
|
|
||
| // Recorder stores all the recorded statuses, dividing them by node name. | ||
| // There is a hard cap of how many nodes are managed, and how many Statuses are recorded per node. | ||
| type Recorder struct { | ||
| nodes map[string]*NodeRecorder | ||
| nodeCapacity int | ||
| maxNodes int | ||
| maxSize int | ||
| timestamper Timestamper | ||
| } | ||
|
|
||
| type Option func(*Recorder) | ||
|
|
||
| func WithNodeCapacity(nodeCapacity int) Option { | ||
| return func(rec *Recorder) { | ||
| rec.nodeCapacity = nodeCapacity | ||
| } | ||
| } | ||
|
|
||
| func WithNodeTimestamper(tsr func() time.Time) Option { | ||
| return func(rec *Recorder) { | ||
| rec.timestamper = tsr | ||
| } | ||
| } | ||
|
|
||
| func WithMaxNodes(maxNodes int) Option { | ||
| return func(rec *Recorder) { | ||
| rec.maxNodes = maxNodes | ||
| } | ||
| coalesceLast bool | ||
| } | ||
|
|
||
| // NewRecorder creates a new recorder up to the given node count, each with the given capacity. | ||
|
|
@@ -205,7 +220,7 @@ func NewRecorder(opts ...Option) (*Recorder, error) { | |
| return &rec, nil | ||
| } | ||
|
|
||
| // Cap returns the maximum nodes allowed in this Recorder | ||
| // MaxNodes returns the maximum nodes allowed in this Recorder | ||
| func (rr *Recorder) MaxNodes() int { | ||
| return rr.maxNodes | ||
| } | ||
|
|
@@ -238,6 +253,16 @@ func (rr *Recorder) Len() int { | |
| return tot | ||
| } | ||
|
|
||
| // IsCoalescing returns true if the recorder is configured to push statuses only if they are unique in PFPs | ||
| func (rr *Recorder) IsCoalescing() bool { | ||
| return rr.coalesceLast | ||
| } | ||
|
|
||
| // MaxSize returns the maximum size allowed per NodeRecorder in rr | ||
| func (rr *Recorder) MaxSize() int { | ||
| return rr.maxSize | ||
| } | ||
|
|
||
| // Push adds a new Status to the record for its node, evicting the oldest Status | ||
| // belonging to the same node if necessary. | ||
| // Per-node records are created lazily as needed, up to the configured maximum. | ||
|
|
@@ -257,7 +282,7 @@ func (rr *Recorder) Push(st podfingerprint.Status) error { | |
| } | ||
|
|
||
| if !ok { | ||
| nr, err = NewNodeRecorder(st.NodeName, WithTimestamper(rr.timestamper), WithCapacity(rr.nodeCapacity)) | ||
| nr, err = NewNodeRecorder(st.NodeName, WithTimestamper(rr.timestamper), WithCapacity(rr.nodeCapacity), WithMaxSize(rr.maxSize), WithCoalescing(rr.coalesceLast)) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
@@ -266,7 +291,7 @@ func (rr *Recorder) Push(st podfingerprint.Status) error { | |
| return nr.Push(st) | ||
| } | ||
|
|
||
| // Content() returns a shallow copy of all the recorded statuses, by node name. | ||
| // Content returns a shallow copy of all the recorded statuses, by node name. | ||
| func (rr *Recorder) Content() map[string][]RecordedStatus { | ||
| ret := make(map[string][]RecordedStatus, len(rr.nodes)) | ||
| for nodeName, nr := range rr.nodes { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.