From 89633749dbe5dc0904f3bf088f35ff2c975465a8 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 9 Apr 2025 19:42:38 +0900 Subject: [PATCH 01/10] fix: add isLastRebootTimeAfter method Signed-off-by: hlts2 --- pkg/watcher/watcher.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 41b3ec6..99e5764 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "strconv" + "sync" "time" "github.com/civo/civogo" @@ -38,6 +39,7 @@ type watcher struct { apiURL string nodeDesiredGPUCount int rebootTimeWindowMinutes time.Duration + lastRebootTime sync.Map nodeSelector *metav1.LabelSelector } @@ -162,6 +164,10 @@ func (w *watcher) run(ctx context.Context) error { slog.Info("Skipping reboot because Ready/NotReady status was updated recently", "node", node.GetName()) continue } + if w.isLastRebootTimeAfter(node.GetName(), thresholdTime) { + slog.Info("Skipping reboot because Reboot operation was executed recently", "node", node.GetName()) + continue + } if err := w.rebootNode(node.GetName()); err != nil { slog.Error("Failed to reboot Node", "node", node.GetName(), "error", err) return fmt.Errorf("failed to reboot node: %w", err) @@ -193,6 +199,22 @@ func isReadyOrNotReadyStatusChangedAfter(node *corev1.Node, thresholdTime time.T return lastChangedTime.After(thresholdTime) } +// isLastRebootTimeAfter checks if the last reboot time for the specified node +// is after the given threshold time. In case of delays in reboot, the +// LastTransitionTime of node might not be updated, so it compares the latest reboot +// time to prevent sending reboot commands multiple times. 
+func (w *watcher) isLastRebootTimeAfter(nodeName string, thresholdTime time.Time) bool { + v, ok := w.lastRebootTime.Load(nodeName) + if ok { + return false + } + lastRebootTime, ok := v.(time.Time) + if !ok { + return false + } + return lastRebootTime.After(thresholdTime) +} + func isNodeReady(node *corev1.Node) bool { for _, cond := range node.Status.Conditions { if cond.Type == corev1.NodeReady { @@ -241,5 +263,6 @@ func (w *watcher) rebootNode(name string) error { return fmt.Errorf("failed to reboot instance, clusterID: %s, instanceID: %s: %w", w.clusterID, instance.ID, err) } slog.Info("Instance is rebooting", "instanceID", instance.ID, "node", name) + w.lastRebootTime.Store(name, time.Now()) return nil } From 9324efe6edba425b944b08683ee9db3af7b781a6 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 9 Apr 2025 19:45:09 +0900 Subject: [PATCH 02/10] feat: add NOTE comment Signed-off-by: hlts2 --- pkg/watcher/watcher.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 99e5764..a0d7fb1 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -203,6 +203,7 @@ func isReadyOrNotReadyStatusChangedAfter(node *corev1.Node, thresholdTime time.T // is after the given threshold time. In case of delays in reboot, the // LastTransitionTime of node might not be updated, so it compares the latest reboot // time to prevent sending reboot commands multiple times. +// NOTE: This is only effective when running with a single node-agent. If you want to run multiple instances, additional logic modifications will be required. 
func (w *watcher) isLastRebootTimeAfter(nodeName string, thresholdTime time.Time) bool { v, ok := w.lastRebootTime.Load(nodeName) if ok { From 4e405a763784a4fdef19d668ecf6163125945db4 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 9 Apr 2025 19:52:23 +0900 Subject: [PATCH 03/10] feat: add comment and log Signed-off-by: hlts2 --- pkg/watcher/watcher.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index a0d7fb1..24f7191 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -39,7 +39,9 @@ type watcher struct { apiURL string nodeDesiredGPUCount int rebootTimeWindowMinutes time.Duration - lastRebootTime sync.Map + + // NOTE: This is only effective when running with a single node-agent. If we want to run multiple instances, additional logic modifications will be required. + lastRebootTimes sync.Map nodeSelector *metav1.LabelSelector } @@ -203,16 +205,25 @@ func isReadyOrNotReadyStatusChangedAfter(node *corev1.Node, thresholdTime time.T // is after the given threshold time. In case of delays in reboot, the // LastTransitionTime of node might not be updated, so it compares the latest reboot // time to prevent sending reboot commands multiple times. -// NOTE: This is only effective when running with a single node-agent. If you want to run multiple instances, additional logic modifications will be required. +// NOTE: This is only effective when running with a single node-agent. If we want to run multiple instances, additional logic modifications will be required. 
func (w *watcher) isLastRebootTimeAfter(nodeName string, thresholdTime time.Time) bool { - v, ok := w.lastRebootTime.Load(nodeName) + v, ok := w.lastRebootTimes.Load(nodeName) if ok { + slog.Info("LastRebootTime not found", "node", nodeName) return false } lastRebootTime, ok := v.(time.Time) if !ok { + slog.Info("LastRebootTime is invalid, so it will be removed from the records", "node", nodeName, "value", v) + w.lastRebootTimes.Delete(nodeName) return false } + + slog.Info("Checking if Ready/NotReady status has changed recently", + "node", nodeName, + "lastRebootTime", lastRebootTime.String(), + "thresholdTime", thresholdTime.String()) + return lastRebootTime.After(thresholdTime) } @@ -264,6 +275,6 @@ func (w *watcher) rebootNode(name string) error { return fmt.Errorf("failed to reboot instance, clusterID: %s, instanceID: %s: %w", w.clusterID, instance.ID, err) } slog.Info("Instance is rebooting", "instanceID", instance.ID, "node", name) - w.lastRebootTime.Store(name, time.Now()) + w.lastRebootTimes.Store(name, time.Now()) return nil } From cd71a4d4a6babcc7baec938b12f1e6d504258b26 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 9 Apr 2025 20:02:09 +0900 Subject: [PATCH 04/10] fix: bugfix condition check Signed-off-by: hlts2 --- pkg/watcher/watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 24f7191..3cbfcba 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -208,7 +208,7 @@ func isReadyOrNotReadyStatusChangedAfter(node *corev1.Node, thresholdTime time.T // NOTE: This is only effective when running with a single node-agent. If we want to run multiple instances, additional logic modifications will be required. 
func (w *watcher) isLastRebootTimeAfter(nodeName string, thresholdTime time.Time) bool { v, ok := w.lastRebootTimes.Load(nodeName) - if ok { + if !ok { slog.Info("LastRebootTime not found", "node", nodeName) return false } From 1fd59a74216c27cbd80d3768d339fb2d3f616426 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 9 Apr 2025 20:03:44 +0900 Subject: [PATCH 05/10] fix: log comment Signed-off-by: hlts2 --- pkg/watcher/watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 3cbfcba..a62e96f 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -219,7 +219,7 @@ func (w *watcher) isLastRebootTimeAfter(nodeName string, thresholdTime time.Time return false } - slog.Info("Checking if Ready/NotReady status has changed recently", + slog.Info("Checking if LastRebootTime has changed recently", "node", nodeName, "lastRebootTime", lastRebootTime.String(), "thresholdTime", thresholdTime.String()) From 354a2fc22bee7ac664739eb33476169a88527287 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 9 Apr 2025 20:13:59 +0900 Subject: [PATCH 06/10] fix: variable name Signed-off-by: hlts2 --- pkg/watcher/watcher.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index a62e96f..2dbe0e6 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -41,7 +41,7 @@ type watcher struct { rebootTimeWindowMinutes time.Duration // NOTE: This is only effective when running with a single node-agent. If we want to run multiple instances, additional logic modifications will be required. 
- lastRebootTimes sync.Map + lastRebootCmdTimes sync.Map nodeSelector *metav1.LabelSelector } @@ -166,8 +166,8 @@ func (w *watcher) run(ctx context.Context) error { slog.Info("Skipping reboot because Ready/NotReady status was updated recently", "node", node.GetName()) continue } - if w.isLastRebootTimeAfter(node.GetName(), thresholdTime) { - slog.Info("Skipping reboot because Reboot operation was executed recently", "node", node.GetName()) + if w.isLastRebootCommandTimeAfter(node.GetName(), thresholdTime) { + slog.Info("Skipping reboot because Reboot command was executed recently", "node", node.GetName()) continue } if err := w.rebootNode(node.GetName()); err != nil { @@ -201,30 +201,30 @@ func isReadyOrNotReadyStatusChangedAfter(node *corev1.Node, thresholdTime time.T return lastChangedTime.After(thresholdTime) } -// isLastRebootTimeAfter checks if the last reboot time for the specified node +// isLastRebootCommandTimeAfter checks if the last reboot command time for the specified node // is after the given threshold time. In case of delays in reboot, the // LastTransitionTime of node might not be updated, so it compares the latest reboot -// time to prevent sending reboot commands multiple times. +// command time to prevent sending reboot commands multiple times. // NOTE: This is only effective when running with a single node-agent. If we want to run multiple instances, additional logic modifications will be required. 
-func (w *watcher) isLastRebootTimeAfter(nodeName string, thresholdTime time.Time) bool { - v, ok := w.lastRebootTimes.Load(nodeName) +func (w *watcher) isLastRebootCommandTimeAfter(nodeName string, thresholdTime time.Time) bool { + v, ok := w.lastRebootCmdTimes.Load(nodeName) if !ok { - slog.Info("LastRebootTime not found", "node", nodeName) + slog.Info("LastRebootCommandTime not found", "node", nodeName) return false } - lastRebootTime, ok := v.(time.Time) + lastRebootCmdTime, ok := v.(time.Time) if !ok { - slog.Info("LastRebootTime is invalid, so it will be removed from the records", "node", nodeName, "value", v) - w.lastRebootTimes.Delete(nodeName) + slog.Info("LastRebootCommandTime is invalid, so it will be removed from the records", "node", nodeName, "value", v) + w.lastRebootCmdTimes.Delete(nodeName) return false } - slog.Info("Checking if LastRebootTime has changed recently", + slog.Info("Checking if LastRebootCommandTime has changed recently", "node", nodeName, - "lastRebootTime", lastRebootTime.String(), + "lastRebootCommandTime", lastRebootCmdTime.String(), "thresholdTime", thresholdTime.String()) - return lastRebootTime.After(thresholdTime) + return lastRebootCmdTime.After(thresholdTime) } func isNodeReady(node *corev1.Node) bool { @@ -275,6 +275,6 @@ func (w *watcher) rebootNode(name string) error { return fmt.Errorf("failed to reboot instance, clusterID: %s, instanceID: %s: %w", w.clusterID, instance.ID, err) } slog.Info("Instance is rebooting", "instanceID", instance.ID, "node", name) - w.lastRebootTimes.Store(name, time.Now()) + w.lastRebootCmdTimes.Store(name, time.Now()) return nil } From 66e20dc75934ca9c1fcf06611cde9c3e68a407dc Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 9 Apr 2025 20:58:54 +0900 Subject: [PATCH 07/10] feat: add test case Signed-off-by: hlts2 --- pkg/watcher/watcher_test.go | 82 +++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/pkg/watcher/watcher_test.go b/pkg/watcher/watcher_test.go index 
147c259..f9c3fc2 100644 --- a/pkg/watcher/watcher_test.go +++ b/pkg/watcher/watcher_test.go @@ -600,6 +600,88 @@ func TestIsReadyOrNotReadyStatusChangedAfter(t *testing.T) { } } +func TestIsLastRebootCommandTimeAfter(t *testing.T) { + type test struct { + name string + nodeName string + opts []Option + thresholdTime time.Time + beforeFunc func(*watcher) + want bool + } + + tests := []test{ + { + name: "Return true when last reboot command time is after threshold", + opts: []Option{ + WithKubernetesClient(fake.NewSimpleClientset()), + WithCivoClient(&FakeClient{}), + }, + nodeName: "node-01", + thresholdTime: time.Now().Add(-time.Hour), + beforeFunc: func(w *watcher) { + w.lastRebootCmdTimes.Store("node-01", time.Now()) + }, + want: true, + }, + { + name: "Return false when last reboot command time is before threshold", + opts: []Option{ + WithKubernetesClient(fake.NewSimpleClientset()), + WithCivoClient(&FakeClient{}), + }, + nodeName: "node-01", + thresholdTime: time.Now().Add(-time.Hour), + beforeFunc: func(w *watcher) { + w.lastRebootCmdTimes.Store("node-01", time.Now().Add(-2*time.Hour)) + }, + want: false, + }, + { + name: "Return false when last reboot command time not found", + opts: []Option{ + WithKubernetesClient(fake.NewSimpleClientset()), + WithCivoClient(&FakeClient{}), + }, + nodeName: "node-01", + thresholdTime: time.Now().Add(-time.Hour), + want: false, + }, + { + name: "Return false when type of last reboot command time is invalid", + opts: []Option{ + WithKubernetesClient(fake.NewSimpleClientset()), + WithCivoClient(&FakeClient{}), + }, + nodeName: "node-01", + thresholdTime: time.Now().Add(-time.Hour), + beforeFunc: func(w *watcher) { + w.lastRebootCmdTimes.Store("node-01", "invalid-type") + }, + want: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + w, err := NewWatcher(t.Context(), + testApiURL, testApiKey, testRegion, testClusterID, testNodePoolID, test.opts...) 
+ if err != nil { + t.Fatal(err) + } + + obj := w.(*watcher) + if test.beforeFunc != nil { + test.beforeFunc(obj) + } + got := obj.isLastRebootCommandTimeAfter(test.nodeName, test.thresholdTime) + if got != test.want { + t.Errorf("got = %v, want %v", got, test.want) + } + }) + } +} + func TestIsNodeReady(t *testing.T) { type test struct { name string From 7a81d45f43bd6a717b27729eaa60f04ab6b80f4e Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 9 Apr 2025 21:08:28 +0900 Subject: [PATCH 08/10] feat: add test case for isLastRebootCommandTimeAfter method Signed-off-by: hlts2 --- pkg/watcher/watcher_test.go | 44 +++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/pkg/watcher/watcher_test.go b/pkg/watcher/watcher_test.go index f9c3fc2..c69d17c 100644 --- a/pkg/watcher/watcher_test.go +++ b/pkg/watcher/watcher_test.go @@ -369,6 +369,50 @@ func TestRun(t *testing.T) { t.Helper() client := w.client.(*fake.Clientset) + w.lastRebootCmdTimes.Store("node-01", time.Now()) + + nodes := &corev1.NodeList{ + Items: []corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-01", + Labels: map[string]string{ + nodePoolLabelKey: testNodePoolID, + }, + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionFalse, + }, + }, + Allocatable: corev1.ResourceList{ + gpuResourceName: resource.MustParse("8"), + }, + }, + }, + }, + } + client.Fake.PrependReactor("list", "nodes", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nodes, nil + }) + }, + }, + { + name: "Returns nil and skips reboot when GPU count matches desired but node is not ready, and LastRebootCmdTime is more recent than thresholdTime", + args: args{ + opts: []Option{ + WithKubernetesClient(fake.NewSimpleClientset()), + WithCivoClient(&FakeClient{}), + WithDesiredGPUCount(testNodeDesiredGPUCount), + }, + nodePoolID: testNodePoolID, + }, + beforeFunc: func(w *watcher) { + 
t.Helper() + client := w.client.(*fake.Clientset) + nodes := &corev1.NodeList{ Items: []corev1.Node{ { From e34999add8fe36d3eca7872cf39f6ca6111b76d6 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 10 Apr 2025 16:26:52 +0900 Subject: [PATCH 09/10] fix: add comment based on feedback Signed-off-by: hlts2 --- pkg/watcher/watcher.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 2dbe0e6..a1b1e47 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -161,6 +161,13 @@ func (w *watcher) run(ctx context.Context) error { for _, node := range nodes.Items { if !isNodeDesiredGPU(&node, w.nodeDesiredGPUCount) || !isNodeReady(&node) { + + // LTT: LastTransitionTime of node. + // LRCT: LastRebootCmdTimes + // - LTT > 60 , LRCT < 60 dont reboot + // - LTT < 60 , LRCT < 60 dont reboot + // - LTT < 60 , LRCT > 60 dont reboot + // - LTT > 60, LRCT >. 60 reboot slog.Info("Node is not ready, attempting to reboot", "node", node.GetName()) if isReadyOrNotReadyStatusChangedAfter(&node, thresholdTime) { slog.Info("Skipping reboot because Ready/NotReady status was updated recently", "node", node.GetName()) From 44c44cd475e7dc6205969a8403a3f5e44309c21b Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 10 Apr 2025 16:30:26 +0900 Subject: [PATCH 10/10] fix: fix comment Signed-off-by: hlts2 --- pkg/watcher/watcher.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index a1b1e47..5f28054 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -162,8 +162,9 @@ func (w *watcher) run(ctx context.Context) error { for _, node := range nodes.Items { if !isNodeDesiredGPU(&node, w.nodeDesiredGPUCount) || !isNodeReady(&node) { - // LTT: LastTransitionTime of node. + // LTT: LastTransitionTime of node. 
// LRCT: LastRebootCmdTimes + // 60: Threshold time (example) // - LTT > 60 , LRCT < 60 dont reboot // - LTT < 60 , LRCT < 60 dont reboot // - LTT < 60 , LRCT > 60 dont reboot