Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions pkg/pfpstatus/pfpstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@ const (
// Defaults used when wiring up the status recorder and its storage.
const (
// defaultMaxNodes is the value Setup passes to record.WithMaxNodes.
defaultMaxNodes = 5000
// defaultMaxSamplesPerNode is the value Setup passes to record.WithNodeCapacity.
defaultMaxSamplesPerNode = 10
// defaultMaxSizePerNode is the value Setup passes to record.WithMaxSizePerNode
// and the default DefaultParams assigns to StorageParams.MaxSizePerNode.
defaultMaxSizePerNode = 0 // no constraints
// defaultDumpPeriod is presumably the default interval between storage dumps.
// NOTE(review): DefaultParams spells out 10 * time.Second literally instead of
// referencing this constant — confirm the two are meant to stay in sync.
defaultDumpPeriod = 10 * time.Second
)

type StorageParams struct {
Enabled bool
Directory string
Period time.Duration
Enabled bool
Directory string
Period time.Duration
CoalesceLast bool
MaxSizePerNode int
}

type Params struct {
Expand All @@ -63,9 +66,11 @@ type environ struct {
func DefaultParams() Params {
return Params{
Storage: StorageParams{
Enabled: false,
Directory: DefaultDumpDirectory,
Period: 10 * time.Second,
Enabled: false,
Directory: DefaultDumpDirectory,
Period: 10 * time.Second,
CoalesceLast: false,
MaxSizePerNode: defaultMaxSizePerNode,
},
}
}
Expand Down Expand Up @@ -95,7 +100,12 @@ func Setup(logh logr.Logger, params Params) {

logh.Info("Setup in progress", "params", fmt.Sprintf("%+#v", params))

rec, err := record.NewRecorder(record.WithMaxNodes(defaultMaxNodes), record.WithNodeCapacity(defaultMaxSamplesPerNode))
rec, err := record.NewRecorder(
record.WithMaxNodes(defaultMaxNodes),
record.WithNodeCapacity(defaultMaxSamplesPerNode),
record.WithMaxSizePerNode(defaultMaxSizePerNode),
record.WithPFPCoalescing(params.Storage.CoalesceLast),
)
if err != nil {
logh.Error(err, "cannot create a status recorder")
return
Expand Down
77 changes: 77 additions & 0 deletions pkg/pfpstatus/record/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2025 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package record

import "time"

// Option configures a Recorder; NewRecorder accepts any number of them.
type Option func(*Recorder)

// WithNodeCapacity returns an Option that stores nodeCapacity in the Recorder.
// The Recorder hands this value to WithCapacity for each NodeRecorder it creates.
func WithNodeCapacity(nodeCapacity int) Option {
	return func(rr *Recorder) { rr.nodeCapacity = nodeCapacity }
}

// WithNodeTimestamper returns an Option that stores tsr as the Recorder's
// timestamper. The Recorder hands it to WithTimestamper for each NodeRecorder
// it creates.
func WithNodeTimestamper(tsr func() time.Time) Option {
	return func(rr *Recorder) { rr.timestamper = tsr }
}

// WithMaxNodes returns an Option that stores maxNodes in the Recorder;
// Recorder.MaxNodes reports the stored value.
func WithMaxNodes(maxNodes int) Option {
	return func(rr *Recorder) { rr.maxNodes = maxNodes }
}

// WithPFPCoalescing returns an Option that turns fingerprint coalescing on or
// off for the Recorder. The Recorder hands the flag to WithCoalescing for each
// NodeRecorder it creates.
func WithPFPCoalescing(val bool) Option {
	return func(rec *Recorder) { rec.coalesceLast = val }
}

// WithMaxSizePerNode returns an Option that stores maxSize in the Recorder.
// The Recorder hands this value to WithMaxSize for each NodeRecorder it creates.
func WithMaxSizePerNode(maxSize int) Option {
	return func(rec *Recorder) { rec.maxSize = maxSize }
}

// NodeOption configures a NodeRecorder; NewNodeRecorder is called with them.
type NodeOption func(*NodeRecorder)

// WithCapacity returns a NodeOption that stores capacity in the NodeRecorder;
// NodeRecorder.Cap reports the stored value.
func WithCapacity(capacity int) NodeOption {
	return func(node *NodeRecorder) { node.capacity = capacity }
}

// WithTimestamper returns a NodeOption that stores tsr as the NodeRecorder's
// timestamper. Push calls it to obtain the RecordTime of each stored status.
func WithTimestamper(tsr func() time.Time) NodeOption {
	return func(node *NodeRecorder) { node.timestamper = tsr }
}

// WithCoalescing returns a NodeOption that turns fingerprint coalescing on or
// off for the NodeRecorder; NodeRecorder.IsCoalescing reports the flag.
func WithCoalescing(val bool) NodeOption {
	return func(node *NodeRecorder) { node.coalesceLast = val }
}

// WithMaxSize returns a NodeOption that stores maxSize in the NodeRecorder;
// NodeRecorder.MaxSize reports the stored value. A value of zero makes
// makeRoom skip the size check.
func WithMaxSize(maxSize int) NodeOption {
	return func(node *NodeRecorder) { node.maxSize = maxSize }
}
119 changes: 72 additions & 47 deletions pkg/pfpstatus/record/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package record

import (
"errors"
"fmt"
"io"
"time"
Comment thread
shajmakh marked this conversation as resolved.

"github.com/k8stopologyawareschedwg/podfingerprint"
Expand All @@ -40,6 +42,13 @@ type RecordedStatus struct {
podfingerprint.Status
// RecordTime is a timestamp of when the RecordedStatus was added to the record
RecordTime time.Time `json:"recordTime"`
// statusSize is approximate size of the string representation of the status in bytes
statusSize int `json:"-"`
}

func (rs RecordedStatus) Size() int {
byteCount, _ := fmt.Fprint(io.Discard, fmt.Sprintf("%+v\n%+v", rs.Pods, rs.RecordTime))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove this

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean that we should replace it with calculating that ourselves like you originally did? I thought go fmt provides bytesize for free so why not to use it if we are dumping the output to io.Discard

return byteCount
}

func (rs RecordedStatus) Equal(x RecordedStatus) bool {
Expand All @@ -52,24 +61,13 @@ type Timestamper func() time.Time
// NodeRecorder stores all the recorded statuses for a given node name.
// Statuses belonging to different nodes won't be accepted.
type NodeRecorder struct {
timestamper func() time.Time
nodeName string // shortcut
capacity int
statuses []RecordedStatus
}

type NodeOption func(*NodeRecorder)

func WithCapacity(capacity int) NodeOption {
return func(nr *NodeRecorder) {
nr.capacity = capacity
}
}

func WithTimestamper(tsr func() time.Time) NodeOption {
return func(nr *NodeRecorder) {
nr.timestamper = tsr
}
timestamper func() time.Time
nodeName string // shortcut
capacity int
maxSize int
size int
statuses []RecordedStatus
coalesceLast bool
}

// NewNodeRecorder creates a new recorder for the given node with the given capacity.
Expand Down Expand Up @@ -101,14 +99,27 @@ func (nr *NodeRecorder) dropOldest() {
if nr.Len() < 1 {
return
}
nr.size -= nr.statuses[0].Size()
nr.statuses = nr.statuses[1:]
}

func (nr *NodeRecorder) makeRoom() {
if nr.Len() < nr.Cap() {
func (nr *NodeRecorder) makeRoom(size int) {
if nr.capacity > 1 && nr.Len() == nr.Cap() {
nr.dropOldest()
}
if nr.maxSize == 0 {
return
}
nr.dropOldest()

if size >= nr.maxSize {
nr.size = 0
nr.statuses = []RecordedStatus{}
return
}

for (nr.size + size) > nr.maxSize {
nr.dropOldest()
}
}

// Push adds a new Status to the record, evicting the oldest Status if necessary.
Expand All @@ -123,16 +134,28 @@ func (nr *NodeRecorder) Push(st podfingerprint.Status) error {
return ErrMismatchingNode
}
ts := nr.timestamper()

if nr.IsCoalescing() && nr.Len() != 0 {
lastItem := nr.statuses[nr.Len()-1]
if lastItem.FingerprintComputed == st.FingerprintComputed && lastItem.FingerprintExpected == st.FingerprintExpected { // pod list should not change
return nil
}
}

item := RecordedStatus{
Status: st.Clone(),
Status: st,
RecordTime: ts,
}
item.statusSize = item.Size()
if nr.capacity == 1 { // handle common special case, avoid any resize
nr.statuses[0] = item
nr.size = item.Size()
// no maxSize constraint required
return nil
}
nr.makeRoom()
nr.makeRoom(item.statusSize)
nr.statuses = append(nr.statuses, item)
nr.size += item.Size()
return nil
}

Expand All @@ -146,38 +169,30 @@ func (nr *NodeRecorder) Cap() int {
return nr.capacity
}

// Content() returns a shallow copy of all the recorded statuses.
// Content returns a shallow copy of all the recorded statuses.
// The returned slice has its own backing array, so a later Push (which can
// overwrite an element in place) does not change what the caller holds.
// Elements are copied by value and still share any data they reference.
// A nil record yields a nil result.
func (nr *NodeRecorder) Content() []RecordedStatus {
	if nr.statuses == nil {
		return nil
	}
	snapshot := make([]RecordedStatus, len(nr.statuses))
	copy(snapshot, nr.statuses)
	return snapshot
}

// IsCoalescing reports whether coalescing is enabled for this node recorder.
// When it is, Push skips a status whose computed and expected fingerprints
// both match those of the most recently recorded status.
func (nr *NodeRecorder) IsCoalescing() bool { return nr.coalesceLast }

// MaxSize reports the configured upper bound on the summed Size of the
// statuses held by nr. Zero means the size check is skipped.
func (nr *NodeRecorder) MaxSize() int { return nr.maxSize }

// Recorder stores all the recorded statuses, dividing them by node name.
// There is a hard cap of how many nodes are managed, and how many Statuses are recorded per node.
type Recorder struct {
nodes map[string]*NodeRecorder
nodeCapacity int
maxNodes int
maxSize int
timestamper Timestamper
}

type Option func(*Recorder)

func WithNodeCapacity(nodeCapacity int) Option {
return func(rec *Recorder) {
rec.nodeCapacity = nodeCapacity
}
}

func WithNodeTimestamper(tsr func() time.Time) Option {
return func(rec *Recorder) {
rec.timestamper = tsr
}
}

func WithMaxNodes(maxNodes int) Option {
return func(rec *Recorder) {
rec.maxNodes = maxNodes
}
coalesceLast bool
}

// NewRecorder creates a new recorder up to the given node count, each with the given capacity.
Expand Down Expand Up @@ -205,7 +220,7 @@ func NewRecorder(opts ...Option) (*Recorder, error) {
return &rec, nil
}

// Cap returns the maximum nodes allowed in this Recorder
// MaxNodes reports the configured maximum number of nodes for this Recorder.
func (rr *Recorder) MaxNodes() int { return rr.maxNodes }
Expand Down Expand Up @@ -238,6 +253,16 @@ func (rr *Recorder) Len() int {
return tot
}

// IsCoalescing reports whether the recorder was configured with coalescing.
// The flag is handed to every NodeRecorder this Recorder creates, which then
// skips pushes that repeat the fingerprints of the last recorded status.
func (rr *Recorder) IsCoalescing() bool { return rr.coalesceLast }

// MaxSize reports the per-node size limit configured on rr. The value is
// handed to every NodeRecorder this Recorder creates.
func (rr *Recorder) MaxSize() int { return rr.maxSize }

// Push adds a new Status to the record for its node, evicting the oldest Status
// belonging to the same node if necessary.
// Per-node records are created lazily as needed, up to the configured maximum.
Expand All @@ -257,7 +282,7 @@ func (rr *Recorder) Push(st podfingerprint.Status) error {
}

if !ok {
nr, err = NewNodeRecorder(st.NodeName, WithTimestamper(rr.timestamper), WithCapacity(rr.nodeCapacity))
nr, err = NewNodeRecorder(st.NodeName, WithTimestamper(rr.timestamper), WithCapacity(rr.nodeCapacity), WithMaxSize(rr.maxSize), WithCoalescing(rr.coalesceLast))
if err != nil {
return err
}
Expand All @@ -266,7 +291,7 @@ func (rr *Recorder) Push(st podfingerprint.Status) error {
return nr.Push(st)
}

// Content() returns a shallow copy of all the recorded statuses, by node name.
// Content returns a shallow copy of all the recorded statuses, by node name.
func (rr *Recorder) Content() map[string][]RecordedStatus {
ret := make(map[string][]RecordedStatus, len(rr.nodes))
for nodeName, nr := range rr.nodes {
Expand Down
Loading