diff --git a/config/manager_config.go b/config/manager_config.go index 9144353..53a7e8e 100644 --- a/config/manager_config.go +++ b/config/manager_config.go @@ -79,6 +79,7 @@ type AccountConfig struct { type K8SConfig struct { KubeConfigPath string `mapstructure:"kube_config_path"` IsInCluster bool `mapstructure:"in_cluster"` + CRDNamespace string `mapstructure:"crd_namespace"` } var managerCfg *ManageConfig diff --git a/deployment/crds/schedulingintent-crd.yaml b/deployment/crds/schedulingintent-crd.yaml new file mode 100644 index 0000000..072812f --- /dev/null +++ b/deployment/crds/schedulingintent-crd.yaml @@ -0,0 +1,73 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: schedulingintents.gthulhu.io +spec: + group: gthulhu.io + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + strategyID: + type: string + podID: + type: string + podName: + type: string + nodeID: + type: string + k8sNamespace: + type: string + commandRegex: + type: string + priority: + type: integer + executionTime: + type: integer + format: int64 + podLabels: + type: object + additionalProperties: + type: string + state: + type: integer + creatorID: + type: string + updaterID: + type: string + createdTime: + type: integer + format: int64 + updatedTime: + type: integer + format: int64 + additionalPrinterColumns: + - name: Strategy + type: string + jsonPath: .spec.strategyID + - name: Pod + type: string + jsonPath: .spec.podName + - name: Node + type: string + jsonPath: .spec.nodeID + - name: State + type: integer + jsonPath: .spec.state + - name: Age + type: date + jsonPath: .metadata.creationTimestamp + scope: Namespaced + names: + plural: schedulingintents + singular: schedulingintent + kind: SchedulingIntent + shortNames: + - si diff --git a/deployment/crds/schedulingstrategy-crd.yaml b/deployment/crds/schedulingstrategy-crd.yaml new file mode 100644 index 0000000..5889a1f --- /dev/null +++ b/deployment/crds/schedulingstrategy-crd.yaml @@ -0,0 +1,66 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: schedulingstrategies.gthulhu.io +spec: + group: gthulhu.io + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + strategyNamespace: + type: string + labelSelectors: + type: array + items: + type: object + properties: + key: + type: string + value: + type: string + k8sNamespaces: + type: array + items: + type: string + commandRegex: + type: string + priority: + type: integer + executionTime: + type: integer + format: int64 + creatorID: + type: string + updaterID: + type: string + createdTime: + type: integer + format: int64 + updatedTime: + type: integer + format: int64 + additionalPrinterColumns: + - name: Priority + type: integer + jsonPath: .spec.priority + - name: Creator + type: string + jsonPath: .spec.creatorID + - name: Age + type: date + jsonPath: .metadata.creationTimestamp + scope: Namespaced + names: + plural: schedulingstrategies + singular: schedulingstrategy + kind: SchedulingStrategy + shortNames: + - ss diff --git a/deployment/rbac/manager-crd-rbac.yaml b/deployment/rbac/manager-crd-rbac.yaml new file mode 100644 index 0000000..1974e80 --- /dev/null +++ b/deployment/rbac/manager-crd-rbac.yaml @@ -0,0 +1,35 @@ +# ClusterRole granting the API Server (manager mode) full access to +# SchedulingStrategy and SchedulingIntent custom resources. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: gthulhu-manager-crd +rules: + - apiGroups: ["gthulhu.io"] + resources: ["schedulingstrategies", "schedulingintents"] + verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] +--- +# Bind the ClusterRole to the manager ServiceAccount so that only the +# manager pods can manipulate scheduling CRs. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: gthulhu-manager-crd-binding +subjects: + - kind: ServiceAccount + name: manager + namespace: gthulhu-api-local +roleRef: + kind: ClusterRole + name: gthulhu-manager-crd + apiGroup: rbac.authorization.k8s.io +--- +# Read-only ClusterRole for monitoring / dashboards. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: gthulhu-crd-viewer +rules: + - apiGroups: ["gthulhu.io"] + resources: ["schedulingstrategies", "schedulingintents"] + verbs: ["get", "list", "watch"] diff --git a/manager/app/module.go b/manager/app/module.go index 454ca74..de4c16e 100644 --- a/manager/app/module.go +++ b/manager/app/module.go @@ -10,6 +10,10 @@ import ( "github.com/Gthulhu/api/manager/service" "github.com/Gthulhu/api/pkg/container" "go.uber.org/fx" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + dynamicfake "k8s.io/client-go/dynamic/fake" ) // ConfigModule creates an Fx module that provides configuration structs @@ -48,6 +52,12 @@ func AdapterModule() (fx.Option, error) { InCluster: k8sConfig.IsInCluster, }) }), + fx.Provide(func(k8sConfig config.K8SConfig) (dynamic.Interface, error) { + return k8sadapter.NewDynamicClient(k8sadapter.Options{ + KubeConfigPath: k8sConfig.KubeConfigPath, + InCluster: k8sConfig.IsInCluster, + }) + }), fx.Provide(client.NewDecisionMakerClient), ), nil } @@ -100,6 +110,18 @@ func TestRepoModule(cfg config.ManageConfig, containerBuilder *container.Contain } return fx.Options( configModule, + fx.Provide(NewFakeDynamicClient), fx.Provide(repository.NewRepository), ), nil } + +// NewFakeDynamicClient returns a fake Kubernetes dynamic client for testing. +func NewFakeDynamicClient() dynamic.Interface { + scheme := runtime.NewScheme() + return dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, + map[schema.GroupVersionResource]string{ + {Group: "gthulhu.io", Version: "v1alpha1", Resource: "schedulingstrategies"}: "SchedulingStrategyList", + {Group: "gthulhu.io", Version: "v1alpha1", Resource: "schedulingintents"}: "SchedulingIntentList", + }, + ) +} diff --git a/manager/domain/interface.go b/manager/domain/interface.go index af8250c..9b401bc 100644 --- a/manager/domain/interface.go +++ b/manager/domain/interface.go @@ -67,7 +67,7 @@ type Repository interface { BatchUpdateIntentsState(ctx context.Context, intentIDs []bson.ObjectID, newState IntentState) error QueryStrategies(ctx context.Context, opt *QueryStrategyOptions) error QueryIntents(ctx context.Context, opt *QueryIntentOptions) error - UpdateStrategy(ctx context.Context, strategyID bson.ObjectID, update bson.M) error + UpdateStrategy(ctx context.Context, strategy *ScheduleStrategy) error DeleteStrategy(ctx context.Context, strategyID bson.ObjectID) error DeleteIntents(ctx context.Context, intentIDs []bson.ObjectID) error DeleteIntentsByStrategyID(ctx context.Context, strategyID bson.ObjectID) error diff --git a/manager/domain/mock_domain.go b/manager/domain/mock_domain.go index 258cf2a..d8a00eb 100644 --- a/manager/domain/mock_domain.go +++ b/manager/domain/mock_domain.go @@ -443,69 +443,6 @@ func (_c *MockRepository_DeleteIntentsByStrategyID_Call) RunAndReturn(run func(c return _c } -// UpdateStrategy provides a mock function for the type MockRepository -func (_mock *MockRepository) UpdateStrategy(ctx context.Context, strategyID bson.ObjectID, update bson.M) error { - ret := _mock.Called(ctx, strategyID, update) - - if len(ret) == 0 { - panic("no return value specified for UpdateStrategy") - } - - var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context, bson.ObjectID, bson.M) error); ok { - r0 = returnFunc(ctx, strategyID, update) - } else { - r0 = ret.Error(0) - } - return r0 -} - -// MockRepository_UpdateStrategy_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateStrategy' -type MockRepository_UpdateStrategy_Call struct { - *mock.Call -} - -// UpdateStrategy is a helper method to define mock.On call -// - ctx context.Context -// - strategyID bson.ObjectID -// - update bson.M -func (_e *MockRepository_Expecter) UpdateStrategy(ctx interface{}, strategyID interface{}, update interface{}) *MockRepository_UpdateStrategy_Call { - return &MockRepository_UpdateStrategy_Call{Call: _e.mock.On("UpdateStrategy", ctx, strategyID, update)} -} - -func (_c *MockRepository_UpdateStrategy_Call) Run(run func(ctx context.Context, strategyID bson.ObjectID, update bson.M)) *MockRepository_UpdateStrategy_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - var arg1 bson.ObjectID - if args[1] != nil { - arg1 = args[1].(bson.ObjectID) - } - var arg2 bson.M - if args[2] != nil { - arg2 = args[2].(bson.M) - } - run( - arg0, - arg1, - arg2, - ) - }) - return _c -} - -func (_c *MockRepository_UpdateStrategy_Call) Return(err error) *MockRepository_UpdateStrategy_Call { - _c.Call.Return(err) - return _c -} - -func (_c *MockRepository_UpdateStrategy_Call) RunAndReturn(run func(ctx context.Context, strategyID bson.ObjectID, update bson.M) error) *MockRepository_UpdateStrategy_Call { - _c.Call.Return(run) - return _c -} - // DeleteStrategy provides a mock function for the type MockRepository func (_mock *MockRepository) DeleteStrategy(ctx context.Context, strategyID bson.ObjectID) error { ret := _mock.Called(ctx, strategyID) @@ -1139,6 +1076,63 @@ func (_c *MockRepository_UpdateRole_Call) RunAndReturn(run func(ctx context.Cont return _c } +// UpdateStrategy provides a mock function for the type MockRepository +func (_mock *MockRepository) UpdateStrategy(ctx context.Context, strategy *ScheduleStrategy) error { + ret := _mock.Called(ctx, strategy) + + if len(ret) == 0 { + panic("no return value specified for UpdateStrategy") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, *ScheduleStrategy) error); ok { + r0 = returnFunc(ctx, strategy) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockRepository_UpdateStrategy_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateStrategy' +type MockRepository_UpdateStrategy_Call struct { + *mock.Call +} + +// UpdateStrategy is a helper method to define mock.On call +// - ctx context.Context +// - strategy *ScheduleStrategy +func (_e *MockRepository_Expecter) UpdateStrategy(ctx interface{}, strategy interface{}) *MockRepository_UpdateStrategy_Call { + return &MockRepository_UpdateStrategy_Call{Call: _e.mock.On("UpdateStrategy", ctx, strategy)} +} + +func (_c *MockRepository_UpdateStrategy_Call) Run(run func(ctx context.Context, strategy *ScheduleStrategy)) *MockRepository_UpdateStrategy_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 *ScheduleStrategy + if args[1] != nil { + arg1 = args[1].(*ScheduleStrategy) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockRepository_UpdateStrategy_Call) Return(err error) *MockRepository_UpdateStrategy_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockRepository_UpdateStrategy_Call) RunAndReturn(run func(ctx context.Context, strategy *ScheduleStrategy) error) *MockRepository_UpdateStrategy_Call { + _c.Call.Return(run) + return _c +} + // UpdateUser provides a mock function for the type MockRepository func (_mock *MockRepository) UpdateUser(ctx context.Context, user *User) error { ret := _mock.Called(ctx, user) @@ -1550,75 +1544,6 @@ func (_c *MockService_CreateScheduleStrategy_Call) RunAndReturn(run func(ctx con return _c } -// UpdateScheduleStrategy provides a mock function for the type MockService -func (_mock *MockService) UpdateScheduleStrategy(ctx context.Context, operator *Claims, strategyID string, strategy *ScheduleStrategy) error { - ret := _mock.Called(ctx, operator, strategyID, strategy) - - if len(ret) == 0 { - panic("no return value specified for UpdateScheduleStrategy") - } - - var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context, *Claims, string, *ScheduleStrategy) error); ok { - r0 = returnFunc(ctx, operator, strategyID, strategy) - } else { - r0 = ret.Error(0) - } - return r0 -} - -// MockService_UpdateScheduleStrategy_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateScheduleStrategy' -type MockService_UpdateScheduleStrategy_Call struct { - *mock.Call -} - -// UpdateScheduleStrategy is a helper method to define mock.On call -// - ctx context.Context -// - operator *Claims -// - strategyID string -// - strategy *ScheduleStrategy -func (_e *MockService_Expecter) UpdateScheduleStrategy(ctx interface{}, operator interface{}, strategyID interface{}, strategy interface{}) *MockService_UpdateScheduleStrategy_Call { - return &MockService_UpdateScheduleStrategy_Call{Call: _e.mock.On("UpdateScheduleStrategy", ctx, operator, strategyID, strategy)} -} - -func (_c *MockService_UpdateScheduleStrategy_Call) Run(run func(ctx context.Context, operator *Claims, strategyID string, strategy *ScheduleStrategy)) *MockService_UpdateScheduleStrategy_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - var arg1 *Claims - if args[1] != nil { - arg1 = args[1].(*Claims) - } - var arg2 string - if args[2] != nil { - arg2 = args[2].(string) - } - var arg3 *ScheduleStrategy - if args[3] != nil { - arg3 = args[3].(*ScheduleStrategy) - } - run( - arg0, - arg1, - arg2, - arg3, - ) - }) - return _c -} - -func (_c *MockService_UpdateScheduleStrategy_Call) Return(err error) *MockService_UpdateScheduleStrategy_Call { - _c.Call.Return(err) - return _c -} - -func (_c *MockService_UpdateScheduleStrategy_Call) RunAndReturn(run func(ctx context.Context, operator *Claims, strategyID string, strategy *ScheduleStrategy) error) *MockService_UpdateScheduleStrategy_Call { - _c.Call.Return(run) - return _c -} - // DeleteRole provides a mock function for the type MockService func (_mock *MockService) DeleteRole(ctx context.Context, operator *Claims, roleID string) error { ret := _mock.Called(ctx, operator, roleID) @@ -2484,6 +2409,75 @@ func (_c *MockService_UpdateRole_Call) RunAndReturn(run func(ctx context.Context return _c } +// UpdateScheduleStrategy provides a mock function for the type MockService +func (_mock *MockService) UpdateScheduleStrategy(ctx context.Context, operator *Claims, strategyID string, strategy *ScheduleStrategy) error { + ret := _mock.Called(ctx, operator, strategyID, strategy) + + if len(ret) == 0 { + panic("no return value specified for UpdateScheduleStrategy") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, *Claims, string, *ScheduleStrategy) error); ok { + r0 = returnFunc(ctx, operator, strategyID, strategy) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockService_UpdateScheduleStrategy_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateScheduleStrategy' +type MockService_UpdateScheduleStrategy_Call struct { + *mock.Call +} + +// UpdateScheduleStrategy is a helper method to define mock.On call +// - ctx context.Context +// - operator *Claims +// - strategyID string +// - strategy *ScheduleStrategy +func (_e *MockService_Expecter) UpdateScheduleStrategy(ctx interface{}, operator interface{}, strategyID interface{}, strategy interface{}) *MockService_UpdateScheduleStrategy_Call { + return &MockService_UpdateScheduleStrategy_Call{Call: _e.mock.On("UpdateScheduleStrategy", ctx, operator, strategyID, strategy)} +} + +func (_c *MockService_UpdateScheduleStrategy_Call) Run(run func(ctx context.Context, operator *Claims, strategyID string, strategy *ScheduleStrategy)) *MockService_UpdateScheduleStrategy_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 *Claims + if args[1] != nil { + arg1 = args[1].(*Claims) + } + var arg2 string + if args[2] != nil { + arg2 = args[2].(string) + } + var arg3 *ScheduleStrategy + if args[3] != nil { + arg3 = args[3].(*ScheduleStrategy) + } + run( + arg0, + arg1, + arg2, + arg3, + ) + }) + return _c +} + +func (_c *MockService_UpdateScheduleStrategy_Call) Return(err error) *MockService_UpdateScheduleStrategy_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockService_UpdateScheduleStrategy_Call) RunAndReturn(run func(ctx context.Context, operator *Claims, strategyID string, strategy *ScheduleStrategy) error) *MockService_UpdateScheduleStrategy_Call { + _c.Call.Return(run) + return _c +} + // UpdateUserPermissions provides a mock function for the type MockService func (_mock *MockService) UpdateUserPermissions(ctx context.Context, operator *Claims, id string, opt UpdateUserPermissionsOptions) error { ret := _mock.Called(ctx, operator, id, opt) diff --git a/manager/k8s_adapter/dynamic_client.go b/manager/k8s_adapter/dynamic_client.go new file mode 100644 index 0000000..6f96769 --- /dev/null +++ b/manager/k8s_adapter/dynamic_client.go @@ -0,0 +1,20 @@ +package k8sadapter + +import ( + "time" + + "k8s.io/client-go/dynamic" +) + +// NewDynamicClient creates a Kubernetes dynamic client using the same +// configuration strategy as the pod informer (in-cluster or kubeconfig file). +func NewDynamicClient(opt Options) (dynamic.Interface, error) { + cfg, err := buildConfig(opt) + if err != nil { + return nil, err + } + cfg.Timeout = 10 * time.Second + cfg.QPS = 20 + cfg.Burst = 50 + return dynamic.NewForConfig(cfg) +} diff --git a/manager/repository/cr_strategy_repo.go b/manager/repository/cr_strategy_repo.go new file mode 100644 index 0000000..eaa49fe --- /dev/null +++ b/manager/repository/cr_strategy_repo.go @@ -0,0 +1,658 @@ +package repository + +import ( + "context" + "errors" + "fmt" + "strconv" + "strings" + "time" + + "github.com/Gthulhu/api/manager/domain" + "go.mongodb.org/mongo-driver/v2/bson" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +var ( + strategyGVR = schema.GroupVersionResource{ + Group: "gthulhu.io", + Version: "v1alpha1", + Resource: "schedulingstrategies", + } + intentGVR = schema.GroupVersionResource{ + Group: "gthulhu.io", + Version: "v1alpha1", + Resource: "schedulingintents", + } +) + +const ( + labelCreatorID = "gthulhu.io/creator-id" + labelStrategyID = "gthulhu.io/strategy-id" + labelState = "gthulhu.io/state" +) + +// --------------------------------------------------------------------------- +// Strategy CRUD +// --------------------------------------------------------------------------- + +func (r *repo) InsertStrategyAndIntents(ctx context.Context, strategy *domain.ScheduleStrategy, intents []*domain.ScheduleIntent) error { + if strategy == nil { + return errors.New("nil strategy") + } + if intents == nil { + return errors.New("nil intents") + } + now := time.Now().UnixMilli() + if strategy.ID.IsZero() { + strategy.ID = bson.NewObjectID() + } + if strategy.CreatedTime == 0 { + strategy.CreatedTime = now + } + strategy.UpdatedTime = now + + obj := domainStrategyToUnstructured(strategy, r.crNamespace) + created, err := r.k8sDynamic.Resource(strategyGVR).Namespace(r.crNamespace).Create(ctx, obj, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("create strategy CR: %w", err) + } + createdIntentNames := make([]string, 0, len(intents)) + // Assign the ID back from the created object name (may differ on retry). + if id, e := bson.ObjectIDFromHex(created.GetName()); e == nil { + strategy.ID = id + } + + for _, intent := range intents { + if intent.ID.IsZero() { + intent.ID = bson.NewObjectID() + } + intent.StrategyID = strategy.ID + if intent.CreatedTime == 0 { + intent.CreatedTime = now + } + if intent.UpdatedTime == 0 { + intent.UpdatedTime = now + } + intentObj := domainIntentToUnstructured(intent, r.crNamespace) + if _, err := r.k8sDynamic.Resource(intentGVR).Namespace(r.crNamespace).Create(ctx, intentObj, metav1.CreateOptions{}); err != nil { + rollbackErrs := make([]string, 0, len(createdIntentNames)+1) + for _, createdIntentName := range createdIntentNames { + delErr := r.k8sDynamic.Resource(intentGVR).Namespace(r.crNamespace).Delete(ctx, createdIntentName, metav1.DeleteOptions{}) + if delErr != nil && !k8serrors.IsNotFound(delErr) { + rollbackErrs = append(rollbackErrs, fmt.Sprintf("delete intent CR %s: %v", createdIntentName, delErr)) + } + } + delStrategyErr := r.k8sDynamic.Resource(strategyGVR).Namespace(r.crNamespace).Delete(ctx, strategy.ID.Hex(), metav1.DeleteOptions{}) + if delStrategyErr != nil && !k8serrors.IsNotFound(delStrategyErr) { + rollbackErrs = append(rollbackErrs, fmt.Sprintf("delete strategy CR %s: %v", strategy.ID.Hex(), delStrategyErr)) + } + + if len(rollbackErrs) > 0 { + return fmt.Errorf("create intent CR: %w; rollback errors: %s", err, strings.Join(rollbackErrs, "; ")) + } + return fmt.Errorf("create intent CR: %w", err) + } + createdIntentNames = append(createdIntentNames, intent.ID.Hex()) + } + return nil +} + +func (r *repo) InsertIntents(ctx context.Context, intents []*domain.ScheduleIntent) error { + if len(intents) == 0 { + return nil + } + now := time.Now().UnixMilli() + for _, intent := range intents { + if intent.ID.IsZero() { + intent.ID = bson.NewObjectID() + } + if intent.CreatedTime == 0 { + intent.CreatedTime = now + } + if intent.UpdatedTime == 0 { + intent.UpdatedTime = now + } + obj := domainIntentToUnstructured(intent, r.crNamespace) + if _, err := r.k8sDynamic.Resource(intentGVR).Namespace(r.crNamespace).Create(ctx, obj, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("create intent CR: %w", err) + } + } + return nil +} + +func (r *repo) BatchUpdateIntentsState(ctx context.Context, intentIDs []bson.ObjectID, newState domain.IntentState) error { + now := time.Now().UnixMilli() + for _, id := range intentIDs { + name := id.Hex() + obj, err := r.k8sDynamic.Resource(intentGVR).Namespace(r.crNamespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + if k8serrors.IsNotFound(err) { + continue + } + return fmt.Errorf("get intent CR %s: %w", name, err) + } + spec, found, err := unstructured.NestedMap(obj.Object, "spec") + if err != nil { + return fmt.Errorf("read spec for intent CR %s: %w", name, err) + } + if !found { + return fmt.Errorf("spec not found for intent CR %s", name) + } + spec["state"] = int64(newState) + spec["updatedTime"] = now + if err := unstructured.SetNestedField(obj.Object, spec, "spec"); err != nil { + return err + } + // Update the state label for efficient filtering. + labels := obj.GetLabels() + if labels == nil { + labels = map[string]string{} + } + labels[labelState] = strconv.Itoa(int(newState)) + obj.SetLabels(labels) + + if _, err := r.k8sDynamic.Resource(intentGVR).Namespace(r.crNamespace).Update(ctx, obj, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("update intent CR %s: %w", name, err) + } + } + return nil +} + +func (r *repo) QueryStrategies(ctx context.Context, opt *domain.QueryStrategyOptions) error { + if opt == nil { + return errors.New("nil query options") + } + + // If specific IDs are requested, fetch each by name. + if len(opt.IDs) > 0 { + for _, id := range opt.IDs { + obj, err := r.k8sDynamic.Resource(strategyGVR).Namespace(r.crNamespace).Get(ctx, id.Hex(), metav1.GetOptions{}) + if err != nil { + if k8serrors.IsNotFound(err) { + continue + } + return err + } + s, err := unstructuredToDomainStrategy(obj) + if err != nil { + return err + } + if matchesStrategyFilter(s, opt) { + opt.Result = append(opt.Result, s) + } + } + return nil + } + + // Build label selector for list queries. + sel := buildLabelSelector(opt.CreatorIDs, labelCreatorID) + list, err := r.k8sDynamic.Resource(strategyGVR).Namespace(r.crNamespace).List(ctx, metav1.ListOptions{LabelSelector: sel}) + if err != nil { + return err + } + for i := range list.Items { + s, err := unstructuredToDomainStrategy(&list.Items[i]) + if err != nil { + return err + } + if matchesStrategyFilter(s, opt) { + opt.Result = append(opt.Result, s) + } + } + return nil +} + +func (r *repo) QueryIntents(ctx context.Context, opt *domain.QueryIntentOptions) error { + if opt == nil { + return errors.New("nil query options") + } + + // Fetch by specific IDs when provided. + if len(opt.IDs) > 0 { + for _, id := range opt.IDs { + obj, err := r.k8sDynamic.Resource(intentGVR).Namespace(r.crNamespace).Get(ctx, id.Hex(), metav1.GetOptions{}) + if err != nil { + if k8serrors.IsNotFound(err) { + continue + } + return err + } + intent, err := unstructuredToDomainIntent(obj) + if err != nil { + return err + } + if matchesIntentFilter(intent, opt) { + opt.Result = append(opt.Result, intent) + } + } + return nil + } + + // Build label selector for common filters. + selParts := []string{} + if s := buildLabelSelector(opt.CreatorIDs, labelCreatorID); s != "" { + selParts = append(selParts, s) + } + if s := buildLabelSelector(opt.StrategyIDs, labelStrategyID); s != "" { + selParts = append(selParts, s) + } + if s := buildStateLabelSelector(opt.States); s != "" { + selParts = append(selParts, s) + } + sel := strings.Join(selParts, ",") + + list, err := r.k8sDynamic.Resource(intentGVR).Namespace(r.crNamespace).List(ctx, metav1.ListOptions{LabelSelector: sel}) + if err != nil { + return err + } + for i := range list.Items { + intent, err := unstructuredToDomainIntent(&list.Items[i]) + if err != nil { + return err + } + if matchesIntentFilter(intent, opt) { + opt.Result = append(opt.Result, intent) + } + } + return nil +} + +func (r *repo) UpdateStrategy(ctx context.Context, strategy *domain.ScheduleStrategy) error { + if strategy == nil { + return errors.New("nil strategy") + } + obj := domainStrategyToUnstructured(strategy, r.crNamespace) + + // Preserve the existing resourceVersion for optimistic concurrency. + existing, err := r.k8sDynamic.Resource(strategyGVR).Namespace(r.crNamespace).Get(ctx, strategy.ID.Hex(), metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("get strategy CR for update: %w", err) + } + obj.SetResourceVersion(existing.GetResourceVersion()) + + _, err = r.k8sDynamic.Resource(strategyGVR).Namespace(r.crNamespace).Update(ctx, obj, metav1.UpdateOptions{}) + return err +} + +func (r *repo) DeleteStrategy(ctx context.Context, strategyID bson.ObjectID) error { + err := r.k8sDynamic.Resource(strategyGVR).Namespace(r.crNamespace).Delete(ctx, strategyID.Hex(), metav1.DeleteOptions{}) + if k8serrors.IsNotFound(err) { + return nil + } + return err +} + +func (r *repo) DeleteIntents(ctx context.Context, intentIDs []bson.ObjectID) error { + if len(intentIDs) == 0 { + return nil + } + for _, id := range intentIDs { + err := r.k8sDynamic.Resource(intentGVR).Namespace(r.crNamespace).Delete(ctx, id.Hex(), metav1.DeleteOptions{}) + if err != nil && !k8serrors.IsNotFound(err) { + return fmt.Errorf("delete intent CR %s: %w", id.Hex(), err) + } + } + return nil +} + +func (r *repo) DeleteIntentsByStrategyID(ctx context.Context, strategyID bson.ObjectID) error { + sel := labelStrategyID + "=" + strategyID.Hex() + list, err := r.k8sDynamic.Resource(intentGVR).Namespace(r.crNamespace).List(ctx, metav1.ListOptions{LabelSelector: sel}) + if err != nil { + return err + } + for _, item := range list.Items { + if err := r.k8sDynamic.Resource(intentGVR).Namespace(r.crNamespace).Delete(ctx, item.GetName(), metav1.DeleteOptions{}); err != nil && !k8serrors.IsNotFound(err) { + return fmt.Errorf("delete intent CR %s: %w", item.GetName(), err) + } + } + return nil +} + +// --------------------------------------------------------------------------- +// Conversion helpers +// --------------------------------------------------------------------------- + +func domainStrategyToUnstructured(s *domain.ScheduleStrategy, namespace string) *unstructured.Unstructured { + labelSelectors := make([]interface{}, len(s.LabelSelectors)) + for i, ls := range s.LabelSelectors { + labelSelectors[i] = map[string]interface{}{ + "key": ls.Key, + "value": ls.Value, + } + } + k8sNS := make([]interface{}, len(s.K8sNamespace)) + for i, ns := range s.K8sNamespace { + k8sNS[i] = ns + } + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "gthulhu.io/v1alpha1", + "kind": "SchedulingStrategy", + "metadata": map[string]interface{}{ + "name": s.ID.Hex(), + "namespace": namespace, + "labels": map[string]interface{}{ + labelCreatorID: s.CreatorID.Hex(), + }, + }, + "spec": map[string]interface{}{ + "strategyNamespace": s.StrategyNamespace, + "labelSelectors": labelSelectors, + "k8sNamespaces": k8sNS, + "commandRegex": s.CommandRegex, + "priority": int64(s.Priority), + "executionTime": s.ExecutionTime, + "creatorID": s.CreatorID.Hex(), + "updaterID": s.UpdaterID.Hex(), + "createdTime": s.CreatedTime, + "updatedTime": s.UpdatedTime, + }, + }, + } +} + +func unstructuredToDomainStrategy(obj *unstructured.Unstructured) (*domain.ScheduleStrategy, error) { + spec, found, err := unstructured.NestedMap(obj.Object, "spec") + if err != nil || !found { + return nil, fmt.Errorf("spec not found in strategy CR %s", obj.GetName()) + } + + id, err := bson.ObjectIDFromHex(obj.GetName()) + if err != nil { + return nil, fmt.Errorf("invalid strategy CR name %s: %w", obj.GetName(), err) + } + + strategy := &domain.ScheduleStrategy{ + BaseEntity: domain.BaseEntity{ + ID: id, + CreatedTime: getInt64(spec, "createdTime"), + UpdatedTime: getInt64(spec, "updatedTime"), + }, + StrategyNamespace: getStr(spec, "strategyNamespace"), + CommandRegex: getStr(spec, "commandRegex"), + Priority: int(getInt64(spec, "priority")), + ExecutionTime: getInt64(spec, "executionTime"), + } + + creatorID, err := parseObjectIDField(spec, "creatorID") + if err != nil { + return nil, fmt.Errorf("invalid creatorID in strategy CR %s: %w", obj.GetName(), err) + } + strategy.CreatorID = creatorID + + updaterID, err := parseObjectIDField(spec, "updaterID") + if err != nil { + return nil, fmt.Errorf("invalid updaterID in strategy CR %s: %w", obj.GetName(), err) + } + strategy.UpdaterID = updaterID + + if raw, ok := spec["labelSelectors"]; ok { + if arr, ok := raw.([]interface{}); ok { + for _, item := range arr { + m, ok := item.(map[string]interface{}) + if !ok { + continue + } + strategy.LabelSelectors = append(strategy.LabelSelectors, domain.LabelSelector{ + Key: getStr(m, "key"), + Value: getStr(m, "value"), + }) + } + } + } + if raw, ok := spec["k8sNamespaces"]; ok { + if arr, ok := raw.([]interface{}); ok { + for _, item := range arr { + if s, ok := item.(string); ok { + strategy.K8sNamespace = append(strategy.K8sNamespace, s) + } + } + } + } + return strategy, nil +} + +func domainIntentToUnstructured(intent *domain.ScheduleIntent, namespace string) *unstructured.Unstructured { + podLabels := map[string]interface{}{} + for k, v := range intent.PodLabels { + podLabels[k] = v + } + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "gthulhu.io/v1alpha1", + "kind": "SchedulingIntent", + "metadata": map[string]interface{}{ + "name": intent.ID.Hex(), + "namespace": namespace, + "labels": map[string]interface{}{ + labelCreatorID: intent.CreatorID.Hex(), + labelStrategyID: intent.StrategyID.Hex(), + labelState: strconv.Itoa(int(intent.State)), + }, + }, + "spec": map[string]interface{}{ + "strategyID": intent.StrategyID.Hex(), + "podID": intent.PodID, + "podName": intent.PodName, + "nodeID": intent.NodeID, + "k8sNamespace": intent.K8sNamespace, + "commandRegex": intent.CommandRegex, + "priority": int64(intent.Priority), + "executionTime": intent.ExecutionTime, + "podLabels": podLabels, + "state": int64(intent.State), + "creatorID": intent.CreatorID.Hex(), + "updaterID": intent.UpdaterID.Hex(), + "createdTime": intent.CreatedTime, + "updatedTime": intent.UpdatedTime, + }, + }, + } +} + +func unstructuredToDomainIntent(obj *unstructured.Unstructured) (*domain.ScheduleIntent, error) { + spec, found, err := unstructured.NestedMap(obj.Object, "spec") + if err != nil || !found { + return nil, fmt.Errorf("spec not found in intent CR %s", obj.GetName()) + } + + id, err := bson.ObjectIDFromHex(obj.GetName()) + if err != nil { + return nil, fmt.Errorf("invalid intent CR name %s: %w", obj.GetName(), err) + } + + intent := &domain.ScheduleIntent{ + BaseEntity: domain.BaseEntity{ + ID: id, + CreatedTime: getInt64(spec, "createdTime"), + UpdatedTime: getInt64(spec, "updatedTime"), + }, + PodID: getStr(spec, "podID"), + PodName: getStr(spec, "podName"), + NodeID: getStr(spec, "nodeID"), + K8sNamespace: getStr(spec, "k8sNamespace"), + CommandRegex: getStr(spec, "commandRegex"), + Priority: int(getInt64(spec, "priority")), + ExecutionTime: getInt64(spec, "executionTime"), + State: domain.IntentState(getInt64(spec, "state")), + } + + creatorID, err := parseObjectIDField(spec, "creatorID") + if err != nil { + return nil, fmt.Errorf("invalid creatorID in intent CR %s: %w", obj.GetName(), err) + } + intent.CreatorID = creatorID + + updaterID, err := parseObjectIDField(spec, "updaterID") + if err != nil { + return nil, fmt.Errorf("invalid updaterID in intent CR %s: %w", obj.GetName(), err) + } + intent.UpdaterID = updaterID + + strategyID, err := parseObjectIDField(spec, "strategyID") + if err != nil { + return nil, fmt.Errorf("invalid strategyID in intent CR %s: %w", obj.GetName(), err) + } + intent.StrategyID = strategyID + + if raw, ok := spec["podLabels"]; ok { + if m, ok := raw.(map[string]interface{}); ok { + intent.PodLabels = make(map[string]string, len(m)) + for k, v := range m { + if s, ok := v.(string); ok { + intent.PodLabels[k] = s + } + } + } + } + return intent, nil +} + +// --------------------------------------------------------------------------- +// Filter helpers +// --------------------------------------------------------------------------- + +func matchesStrategyFilter(s *domain.ScheduleStrategy, opt *domain.QueryStrategyOptions) bool { + if len(opt.CreatorIDs) > 0 && !containsOID(opt.CreatorIDs, s.CreatorID) { + return false + } + if len(opt.K8SNamespaces) > 0 { + if !sliceOverlap(opt.K8SNamespaces, s.K8sNamespace) { + return false + } + } + return true +} + +func matchesIntentFilter(intent *domain.ScheduleIntent, opt *domain.QueryIntentOptions) bool { + if len(opt.CreatorIDs) > 0 && !containsOID(opt.CreatorIDs, intent.CreatorID) { + return false + } + if len(opt.StrategyIDs) > 0 && !containsOID(opt.StrategyIDs, intent.StrategyID) { + return false + } + if len(opt.K8SNamespaces) > 0 && !containsStr(opt.K8SNamespaces, intent.K8sNamespace) { + return false + } + if len(opt.States) > 0 && !containsState(opt.States, intent.State) { + return false + } + if len(opt.PodIDs) > 0 && !containsStr(opt.PodIDs, intent.PodID) { + return false + } + return true +} + +// --------------------------------------------------------------------------- +// Utility helpers +// --------------------------------------------------------------------------- + +func buildLabelSelector(ids []bson.ObjectID, label string) string { + if len(ids) == 0 { + return "" + } + if len(ids) == 1 { + return label + "=" + ids[0].Hex() + } + vals := make([]string, len(ids)) + for i, id := range ids { + vals[i] = id.Hex() + } + return label + " in (" + strings.Join(vals, ",") + ")" +} + +func containsOID(ids []bson.ObjectID, target bson.ObjectID) bool { + for _, id := range ids { + if id == target { + return true + } + } + return false +} + +func containsStr(haystack []string, needle string) bool { + for _, s := range haystack { + if s == needle { + return true + } + } + return false +} + +func containsState(haystack []domain.IntentState, needle domain.IntentState) bool { + for _, s := range haystack { + if s == needle { + return true + } + } + return false +} + +func sliceOverlap(a, b []string) bool { + set := make(map[string]struct{}, len(b)) + for _, v := range b { + set[v] = struct{}{} + } + for _, v := range a { + if _, ok := set[v]; ok { + return true + } + } + return false +} + +func buildStateLabelSelector(states []domain.IntentState) string { + if len(states) == 0 { + return "" + } + if len(states) == 1 { + return labelState + "=" + strconv.Itoa(int(states[0])) + } + vals := make([]string, len(states)) + for i, state := range states { + vals[i] = strconv.Itoa(int(state)) + } + return labelState + " in (" + strings.Join(vals, ",") + ")" +} + +func parseObjectIDField(m map[string]interface{}, key string) (bson.ObjectID, error) { + v, ok := m[key] + if !ok { + return bson.ObjectID{}, fmt.Errorf("missing field %s", key) + } + value, ok := v.(string) + if !ok { + return bson.ObjectID{}, fmt.Errorf("field %s is not a string", key) + } + if value == "" { + return bson.ObjectID{}, fmt.Errorf("field %s is empty", key) + } + id, err := bson.ObjectIDFromHex(value) + if err != nil { + return bson.ObjectID{}, err + } + return id, nil +} + +func getStr(m map[string]interface{}, key string) string { + v, _ := m[key].(string) + return v +} + +func getInt64(m map[string]interface{}, key string) int64 { + switch v := m[key].(type) { + case int64: + return v + case float64: + return int64(v) + case int: + return int64(v) + default: + return 0 + } +} diff --git a/manager/repository/cr_strategy_repo_test.go b/manager/repository/cr_strategy_repo_test.go new file mode 100644 index 0000000..2110d87 --- /dev/null +++ b/manager/repository/cr_strategy_repo_test.go @@ -0,0 +1,283 @@ +package repository + +import ( + "context" + "testing" + + "github.com/Gthulhu/api/manager/domain" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.mongodb.org/mongo-driver/v2/bson" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + dynamicfake "k8s.io/client-go/dynamic/fake" +) + +func newTestCRRepo() *repo { + scheme := runtime.NewScheme() + fakeClient := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, + map[schema.GroupVersionResource]string{ + strategyGVR: "SchedulingStrategyList", + intentGVR: "SchedulingIntentList", + }, + ) + return &repo{ + k8sDynamic: fakeClient, + crNamespace: "test-ns", + } +} + +func TestCRInsertStrategyAndIntents(t *testing.T) { + r := newTestCRRepo() + ctx := context.Background() + + creatorID := bson.NewObjectID() + strategy := &domain.ScheduleStrategy{ + BaseEntity: domain.BaseEntity{ + CreatorID: creatorID, + UpdaterID: creatorID, + }, + StrategyNamespace: "prod", + LabelSelectors: []domain.LabelSelector{ + {Key: "app", Value: "nginx"}, + }, + K8sNamespace: []string{"default"}, + CommandRegex: "nginx", + Priority: 10, + ExecutionTime: 5000, + } + intents := []*domain.ScheduleIntent{ + { + BaseEntity: domain.BaseEntity{CreatorID: creatorID, UpdaterID: creatorID}, + PodID: "pod-uid-1", + PodName: "nginx-abc", + NodeID: "node-1", + K8sNamespace: "default", + CommandRegex: "nginx", + Priority: 10, + ExecutionTime: 5000, + PodLabels: map[string]string{"app": "nginx"}, + State: domain.IntentStateInitialized, + }, + } + + err := r.InsertStrategyAndIntents(ctx, strategy, intents) + require.NoError(t, err) + assert.False(t, strategy.ID.IsZero(), "strategy ID should be set") + assert.False(t, intents[0].ID.IsZero(), "intent ID should be set") + assert.Equal(t, strategy.ID, intents[0].StrategyID, "intent should reference strategy") + + // Query strategy back + opt := &domain.QueryStrategyOptions{IDs: []bson.ObjectID{strategy.ID}} + err = r.QueryStrategies(ctx, opt) + require.NoError(t, err) + require.Len(t, opt.Result, 1) + assert.Equal(t, strategy.ID, opt.Result[0].ID) + assert.Equal(t, "prod", opt.Result[0].StrategyNamespace) + assert.Equal(t, 10, opt.Result[0].Priority) + assert.Equal(t, int64(5000), opt.Result[0].ExecutionTime) + assert.Equal(t, "nginx", opt.Result[0].CommandRegex) + require.Len(t, opt.Result[0].LabelSelectors, 1) + assert.Equal(t, "app", opt.Result[0].LabelSelectors[0].Key) + assert.Equal(t, "nginx", opt.Result[0].LabelSelectors[0].Value) + require.Len(t, opt.Result[0].K8sNamespace, 1) + assert.Equal(t, "default", opt.Result[0].K8sNamespace[0]) + + // Query intent back + intentOpt := &domain.QueryIntentOptions{IDs: []bson.ObjectID{intents[0].ID}} + err = r.QueryIntents(ctx, intentOpt) + require.NoError(t, err) + require.Len(t, intentOpt.Result, 1) + assert.Equal(t, intents[0].ID, intentOpt.Result[0].ID) + assert.Equal(t, "pod-uid-1", intentOpt.Result[0].PodID) + assert.Equal(t, "nginx-abc", intentOpt.Result[0].PodName) + assert.Equal(t, "node-1", intentOpt.Result[0].NodeID) + assert.Equal(t, domain.IntentStateInitialized, intentOpt.Result[0].State) +} + +func TestCRQueryStrategiesByCreator(t *testing.T) { + r := newTestCRRepo() + ctx := context.Background() + + creator1 := bson.NewObjectID() + creator2 := bson.NewObjectID() + + s1 := &domain.ScheduleStrategy{ + BaseEntity: domain.BaseEntity{CreatorID: creator1, UpdaterID: creator1}, + Priority: 1, + } + s2 := &domain.ScheduleStrategy{ + BaseEntity: domain.BaseEntity{CreatorID: creator2, UpdaterID: creator2}, + Priority: 2, + } + require.NoError(t, r.InsertStrategyAndIntents(ctx, s1, []*domain.ScheduleIntent{})) + require.NoError(t, r.InsertStrategyAndIntents(ctx, s2, []*domain.ScheduleIntent{})) + + // Query by creator1 + opt := &domain.QueryStrategyOptions{CreatorIDs: []bson.ObjectID{creator1}} + err := r.QueryStrategies(ctx, opt) + require.NoError(t, err) + require.Len(t, opt.Result, 1) + assert.Equal(t, s1.ID, opt.Result[0].ID) +} + +func TestCRDeleteStrategy(t *testing.T) { + r := newTestCRRepo() + ctx := context.Background() + + creatorID := bson.NewObjectID() + s := &domain.ScheduleStrategy{ + BaseEntity: domain.BaseEntity{CreatorID: creatorID, UpdaterID: creatorID}, + } + require.NoError(t, r.InsertStrategyAndIntents(ctx, s, []*domain.ScheduleIntent{})) + + err := r.DeleteStrategy(ctx, s.ID) + require.NoError(t, err) + + opt := &domain.QueryStrategyOptions{IDs: []bson.ObjectID{s.ID}} + err = r.QueryStrategies(ctx, opt) + require.NoError(t, err) + assert.Empty(t, opt.Result) +} + +func TestCRDeleteIntentsByStrategyID(t *testing.T) { + r := newTestCRRepo() + ctx := context.Background() + + creatorID := bson.NewObjectID() + strategy := &domain.ScheduleStrategy{ + BaseEntity: domain.BaseEntity{CreatorID: creatorID, UpdaterID: creatorID}, + } + intents := []*domain.ScheduleIntent{ + {BaseEntity: domain.BaseEntity{CreatorID: creatorID, UpdaterID: creatorID}, PodID: "p1", NodeID: "n1", State: domain.IntentStateInitialized}, + {BaseEntity: domain.BaseEntity{CreatorID: creatorID, UpdaterID: creatorID}, PodID: "p2", NodeID: "n1", State: domain.IntentStateInitialized}, + } + require.NoError(t, r.InsertStrategyAndIntents(ctx, strategy, intents)) + + err := r.DeleteIntentsByStrategyID(ctx, strategy.ID) + require.NoError(t, err) + + opt := &domain.QueryIntentOptions{StrategyIDs: []bson.ObjectID{strategy.ID}} + err = r.QueryIntents(ctx, opt) + require.NoError(t, err) + assert.Empty(t, opt.Result) +} + +func TestCRBatchUpdateIntentsState(t *testing.T) { + r := newTestCRRepo() + ctx := context.Background() + + creatorID := bson.NewObjectID() + strategy := &domain.ScheduleStrategy{ + BaseEntity: domain.BaseEntity{CreatorID: creatorID, UpdaterID: creatorID}, + } + intents := []*domain.ScheduleIntent{ + {BaseEntity: domain.BaseEntity{CreatorID: creatorID, UpdaterID: creatorID}, PodID: "p1", NodeID: "n1", State: domain.IntentStateInitialized}, + } + require.NoError(t, r.InsertStrategyAndIntents(ctx, strategy, intents)) + + err := r.BatchUpdateIntentsState(ctx, []bson.ObjectID{intents[0].ID}, domain.IntentStateSent) + require.NoError(t, err) + + opt := &domain.QueryIntentOptions{IDs: []bson.ObjectID{intents[0].ID}} + err = r.QueryIntents(ctx, opt) + require.NoError(t, err) + require.Len(t, opt.Result, 1) + assert.Equal(t, domain.IntentStateSent, opt.Result[0].State) +} + +func TestCRUpdateStrategy(t *testing.T) { + r := newTestCRRepo() + ctx := context.Background() + + creatorID := bson.NewObjectID() + strategy := &domain.ScheduleStrategy{ + BaseEntity: domain.BaseEntity{CreatorID: creatorID, UpdaterID: creatorID}, + StrategyNamespace: "old", + Priority: 1, + } + require.NoError(t, r.InsertStrategyAndIntents(ctx, strategy, []*domain.ScheduleIntent{})) + + // Update the strategy + strategy.StrategyNamespace = "new" + strategy.Priority = 99 + err := r.UpdateStrategy(ctx, strategy) + require.NoError(t, err) + + // Verify update + opt := &domain.QueryStrategyOptions{IDs: []bson.ObjectID{strategy.ID}} + err = r.QueryStrategies(ctx, opt) + require.NoError(t, err) + require.Len(t, opt.Result, 1) + assert.Equal(t, "new", opt.Result[0].StrategyNamespace) + assert.Equal(t, 99, opt.Result[0].Priority) +} + +func TestCRInsertAndDeleteIntents(t *testing.T) { + r := newTestCRRepo() + ctx := context.Background() + + creatorID := bson.NewObjectID() + strategyID := bson.NewObjectID() + intents := []*domain.ScheduleIntent{ + {BaseEntity: domain.BaseEntity{CreatorID: creatorID, UpdaterID: creatorID}, StrategyID: strategyID, PodID: "p1", NodeID: "n1", State: domain.IntentStateInitialized}, + {BaseEntity: domain.BaseEntity{CreatorID: creatorID, UpdaterID: creatorID}, StrategyID: strategyID, PodID: "p2", NodeID: "n1", State: domain.IntentStateInitialized}, + } + require.NoError(t, r.InsertIntents(ctx, intents)) + + // Delete one intent + err := r.DeleteIntents(ctx, []bson.ObjectID{intents[0].ID}) + require.NoError(t, err) + + // Verify only one remains + opt := &domain.QueryIntentOptions{StrategyIDs: []bson.ObjectID{strategyID}} + err = r.QueryIntents(ctx, opt) + require.NoError(t, err) + require.Len(t, opt.Result, 1) + assert.Equal(t, intents[1].ID, opt.Result[0].ID) +} + +func TestCRQueryIntentsByCreator(t *testing.T) { + r := newTestCRRepo() + ctx := context.Background() + + creator1 := bson.NewObjectID() + creator2 := bson.NewObjectID() + strategyID := bson.NewObjectID() + + intents1 := []*domain.ScheduleIntent{ + {BaseEntity: domain.BaseEntity{CreatorID: creator1, UpdaterID: creator1}, StrategyID: strategyID, PodID: "p1", NodeID: "n1", State: domain.IntentStateInitialized}, + } + intents2 := []*domain.ScheduleIntent{ + {BaseEntity: domain.BaseEntity{CreatorID: creator2, UpdaterID: creator2}, StrategyID: strategyID, PodID: "p2", NodeID: "n1", State: domain.IntentStateInitialized}, + } + require.NoError(t, r.InsertIntents(ctx, intents1)) + require.NoError(t, r.InsertIntents(ctx, intents2)) + + // Query by creator1 + opt := &domain.QueryIntentOptions{CreatorIDs: []bson.ObjectID{creator1}} + err := r.QueryIntents(ctx, opt) + require.NoError(t, err) + require.Len(t, opt.Result, 1) + assert.Equal(t, "p1", opt.Result[0].PodID) +} + +func TestCRQueryIntentsByStates(t *testing.T) { + r := newTestCRRepo() + ctx := context.Background() + + creatorID := bson.NewObjectID() + strategyID := bson.NewObjectID() + intents := []*domain.ScheduleIntent{ + {BaseEntity: domain.BaseEntity{CreatorID: creatorID, UpdaterID: creatorID}, StrategyID: strategyID, PodID: "p1", NodeID: "n1", State: domain.IntentStateInitialized}, + {BaseEntity: domain.BaseEntity{CreatorID: creatorID, UpdaterID: creatorID}, StrategyID: strategyID, PodID: "p2", NodeID: "n1", State: domain.IntentStateSent}, + {BaseEntity: domain.BaseEntity{CreatorID: creatorID, UpdaterID: creatorID}, StrategyID: strategyID, PodID: "p3", NodeID: "n1", State: domain.IntentStateUnknown}, + } + require.NoError(t, r.InsertIntents(ctx, intents)) + + opt := &domain.QueryIntentOptions{States: []domain.IntentState{domain.IntentStateSent, domain.IntentStateUnknown}} + err := r.QueryIntents(ctx, opt) + require.NoError(t, err) + require.Len(t, opt.Result, 2) + assert.ElementsMatch(t, []string{"p2", "p3"}, []string{opt.Result[0].PodID, opt.Result[1].PodID}) +} diff --git a/manager/repository/repo.go b/manager/repository/repo.go index a4f6cae..ca82799 100644 --- a/manager/repository/repo.go +++ b/manager/repository/repo.go @@ -12,11 +12,14 @@ import ( "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" "go.uber.org/fx" + "k8s.io/client-go/dynamic" ) type Params struct { fx.In - MongoConfig config.MongoDBConfig + MongoConfig config.MongoDBConfig + K8SConfig config.K8SConfig + DynamicClient dynamic.Interface } func NewRepository(params Params) (domain.Repository, error) { @@ -51,23 +54,30 @@ func NewRepository(params Params) (domain.Repository, error) { dbName = "manager" } + crNamespace := params.K8SConfig.CRDNamespace + if crNamespace == "" { + crNamespace = "gthulhu-system" + } + return &repo{ - client: client, - db: client.Database(dbName), + client: client, + db: client.Database(dbName), + k8sDynamic: params.DynamicClient, + crNamespace: crNamespace, }, nil } type repo struct { - client *mongo.Client - db *mongo.Database + client *mongo.Client + db *mongo.Database + k8sDynamic dynamic.Interface + crNamespace string } const ( - userCollection = "users" - roleCollection = "roles" - permissionCollection = "permissions" - auditLogCollection = "audit_logs" - defaultTimestampField = "timestamp" - scheduleStrategyCollection = "schedule_strategies" - scheduleIntentCollection = "schedule_intents" + userCollection = "users" + roleCollection = "roles" + permissionCollection = "permissions" + auditLogCollection = "audit_logs" + defaultTimestampField = "timestamp" ) diff --git a/manager/repository/strategy_repo.go b/manager/repository/strategy_repo.go deleted file mode 100644 index d0fea9b..0000000 --- a/manager/repository/strategy_repo.go +++ /dev/null @@ -1,180 +0,0 @@ -package repository - -import ( - "context" - "errors" - "time" - - "github.com/Gthulhu/api/manager/domain" - "go.mongodb.org/mongo-driver/v2/bson" -) - -func (r *repo) InsertStrategyAndIntents(ctx context.Context, strategy *domain.ScheduleStrategy, intents []*domain.ScheduleIntent) error { - if strategy == nil { - return errors.New("nil strategy") - } - if intents == nil { - return errors.New("nil intents") - } - now := time.Now().UnixMilli() - if strategy.CreatedTime == 0 { - strategy.CreatedTime = now - } - strategy.UpdatedTime = now - res, err := r.db.Collection(scheduleStrategyCollection).InsertOne(ctx, strategy) - if err != nil { - return err - } - if oid, ok := res.InsertedID.(bson.ObjectID); ok { - strategy.ID = oid - } - - for _, intent := range intents { - if intent.ID.IsZero() { - intent.ID = bson.NewObjectID() - } - intent.StrategyID = strategy.ID - if intent.CreatedTime == 0 { - intent.CreatedTime = now - } - if intent.UpdatedTime == 0 { - intent.UpdatedTime = now - } - } - _, err = r.db.Collection(scheduleIntentCollection).InsertMany(ctx, intents) - if err != nil { - return err - } - return nil -} - -func (r *repo) InsertIntents(ctx context.Context, intents []*domain.ScheduleIntent) error { - if len(intents) == 0 { - return nil - } - now := time.Now().UnixMilli() - for _, intent := range intents { - if intent.ID.IsZero() { - intent.ID = bson.NewObjectID() - } - if intent.CreatedTime == 0 { - intent.CreatedTime = now - } - if intent.UpdatedTime == 0 { - intent.UpdatedTime = now - } - } - _, err := r.db.Collection(scheduleIntentCollection).InsertMany(ctx, intents) - return err -} - -func (r *repo) BatchUpdateIntentsState(ctx context.Context, intentIDs []bson.ObjectID, newState domain.IntentState) error { - update := bson.M{ - "$set": bson.M{ - "state": newState, - "updateTime": time.Now().UnixMilli(), - }, - } - _, err := r.db.Collection(scheduleIntentCollection).UpdateMany(ctx, bson.M{ - "_id": bson.M{"$in": intentIDs}, - }, update) - if err != nil { - return err - } - return nil -} - -func (r *repo) QueryStrategies(ctx context.Context, opt *domain.QueryStrategyOptions) error { - if opt == nil { - return errors.New("nil query options") - } - filter := bson.M{} - if len(opt.IDs) > 0 { - filter["_id"] = bson.M{"$in": opt.IDs} - } - if len(opt.K8SNamespaces) > 0 { - filter["k8sNamespace"] = bson.M{"$in": opt.K8SNamespaces} - } - if len(opt.CreatorIDs) > 0 { - filter["creatorID"] = bson.M{"$in": opt.CreatorIDs} - } - cursor, err := r.db.Collection(scheduleStrategyCollection).Find(ctx, filter) - if err != nil { - return err - } - defer cursor.Close(ctx) - - for cursor.Next(ctx) { - var strategy domain.ScheduleStrategy - if err := cursor.Decode(&strategy); err != nil { - return err - } - opt.Result = append(opt.Result, &strategy) - } - return cursor.Err() -} - -func (r *repo) QueryIntents(ctx context.Context, opt *domain.QueryIntentOptions) error { - if opt == nil { - return errors.New("nil query options") - } - filter := bson.M{} - if len(opt.IDs) > 0 { - filter["_id"] = bson.M{"$in": opt.IDs} - } - if len(opt.K8SNamespaces) > 0 { - filter["k8sNamespace"] = bson.M{"$in": opt.K8SNamespaces} - } - if len(opt.StrategyIDs) > 0 { - filter["strategyID"] = bson.M{"$in": opt.StrategyIDs} - } - if len(opt.PodIDs) > 0 { - filter["podID"] = bson.M{"$in": opt.PodIDs} - } - if len(opt.States) > 0 { - filter["state"] = bson.M{"$in": opt.States} - } - if len(opt.CreatorIDs) > 0 { - filter["creatorID"] = bson.M{"$in": opt.CreatorIDs} - } - cursor, err := r.db.Collection(scheduleIntentCollection).Find(ctx, filter) - if err != nil { - return err - } - defer cursor.Close(ctx) - - for cursor.Next(ctx) { - var intent domain.ScheduleIntent - if err := cursor.Decode(&intent); err != nil { - return err - } - opt.Result = append(opt.Result, &intent) - } - return cursor.Err() -} - -func (r *repo) DeleteStrategy(ctx context.Context, strategyID bson.ObjectID) error { - _, err := r.db.Collection(scheduleStrategyCollection).DeleteOne(ctx, bson.M{"_id": strategyID}) - return err -} - -func (r *repo) UpdateStrategy(ctx context.Context, strategyID bson.ObjectID, update bson.M) error { - if update == nil { - return errors.New("nil update") - } - _, err := r.db.Collection(scheduleStrategyCollection).UpdateOne(ctx, bson.M{"_id": strategyID}, update) - return err -} - -func (r *repo) DeleteIntents(ctx context.Context, intentIDs []bson.ObjectID) error { - if len(intentIDs) == 0 { - return nil - } - _, err := r.db.Collection(scheduleIntentCollection).DeleteMany(ctx, bson.M{"_id": bson.M{"$in": intentIDs}}) - return err -} - -func (r *repo) DeleteIntentsByStrategyID(ctx context.Context, strategyID bson.ObjectID) error { - _, err := r.db.Collection(scheduleIntentCollection).DeleteMany(ctx, bson.M{"strategyID": strategyID}) - return err -} diff --git a/manager/service/strategy_svc.go b/manager/service/strategy_svc.go index 9c2498e..3d512e9 100644 --- a/manager/service/strategy_svc.go +++ b/manager/service/strategy_svc.go @@ -155,19 +155,13 @@ func (svc *Service) UpdateScheduleStrategy(ctx context.Context, operator *domain // Update strategy document now := time.Now().UnixMilli() - update := bson.M{ - "$set": bson.M{ - "strategyNamespace": strategy.StrategyNamespace, - "labelSelectors": strategy.LabelSelectors, - "k8sNamespace": strategy.K8sNamespace, - "commandRegex": strategy.CommandRegex, - "priority": strategy.Priority, - "executionTime": strategy.ExecutionTime, - "updaterID": operatorID, - "updatedTime": now, - }, - } - if err := svc.Repo.UpdateStrategy(ctx, strategyObjID, update); err != nil { + strategy.ID = strategyObjID + strategy.CreatedTime = currentStrategy.CreatedTime + strategy.CreatorID = currentStrategy.CreatorID + strategy.UpdaterID = operatorID + strategy.UpdatedTime = now + + if err := svc.Repo.UpdateStrategy(ctx, strategy); err != nil { return fmt.Errorf("update strategy: %w", err) } @@ -176,12 +170,6 @@ func (svc *Service) UpdateScheduleStrategy(ctx context.Context, operator *domain return fmt.Errorf("delete intents by strategy ID: %w", err) } - strategy.ID = strategyObjID - strategy.CreatedTime = currentStrategy.CreatedTime - strategy.CreatorID = currentStrategy.CreatorID - strategy.UpdaterID = operatorID - strategy.UpdatedTime = now - intents := make([]*domain.ScheduleIntent, 0, len(pods)) nodeIDsMap := make(map[string]struct{}) nodeIDs := make([]string, 0)