Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/external/cinder/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ type ExternalSchedulerRequest struct {
Weights map[string]float64 `json:"weights"`
// The name of the pipeline to execute.
Pipeline string `json:"pipeline"`
// Options configure the pipeline behavior for this scheduling call.
Options lib.Options `json:"options,omitempty"`
}

func (r ExternalSchedulerRequest) GetOptions() lib.Options { return r.Options }
func (r ExternalSchedulerRequest) GetHosts() []string {
hosts := make([]string, len(r.Hosts))
for i, host := range r.Hosts {
Expand Down
3 changes: 3 additions & 0 deletions api/external/ironcore/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
type MachinePipelineRequest struct {
// The available machine pools.
Pools []ironcorev1alpha1.MachinePool `json:"pools"`
// Options configure the pipeline behavior for this scheduling call.
Options lib.Options `json:"options,omitempty"`
}

func (r MachinePipelineRequest) GetOptions() lib.Options { return r.Options }
func (r MachinePipelineRequest) GetHosts() []string {
hosts := make([]string, len(r.Pools))
for i, host := range r.Pools {
Expand Down
3 changes: 3 additions & 0 deletions api/external/manila/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ type ExternalSchedulerRequest struct {
Weights map[string]float64 `json:"weights"`
// The name of the pipeline to execute.
Pipeline string `json:"pipeline"`
// Options configure the pipeline behavior for this scheduling call.
Options lib.Options `json:"options,omitempty"`
}

func (r ExternalSchedulerRequest) GetOptions() lib.Options { return r.Options }
func (r ExternalSchedulerRequest) GetHosts() []string {
hosts := make([]string, len(r.Hosts))
for i, host := range r.Hosts {
Expand Down
6 changes: 6 additions & 0 deletions api/external/nova/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,14 @@ type ExternalSchedulerRequest struct {

// The name of the pipeline to execute.
Pipeline string `json:"pipeline"`

// Options configure the pipeline behavior for this scheduling call.
// Set by the caller (CR controller, failover controller, Nova).
// Nova does not set these; Cortex fills in config-derived defaults server-side.
Options lib.Options `json:"options,omitempty"`
}

func (r ExternalSchedulerRequest) GetOptions() lib.Options { return r.Options }
Comment on lines +41 to +47
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find where Nova's ExternalSchedulerRequest is deserialized and where Options is overwritten on the server side.
rg -nP -C5 'ExternalSchedulerRequest' --type=go
rg -nP -C5 'request\.Options\s*=' --type=go
rg -nP -C5 'buildOptions' --type=go

Repository: cobaltcore-dev/cortex

Length of output: 50378


🏁 Script executed:

#!/bin/bash
# Find HTTP handlers that accept ExternalSchedulerRequest (POST endpoints from Nova)
rg -nP 'func.*ExternalSchedulerRequest' --type=go | grep -v test | head -20

# Search for code that might override or ignore incoming Options
rg -nP '\.Options\s*=' --type=go | grep -v test | head -20

# Look for "buildOptions" or similar factory functions
rg -nP '(buildOptions|NewOptions|defaultOptions)' --type=go -i | grep -v test | head -20

Repository: cobaltcore-dev/cortex

Length of output: 4457


🏁 Script executed:

#!/bin/bash
# Find the HTTP handler that receives requests FROM Nova (not sends to Nova)
# Look for POST handlers or API routes
rg -nP '(HandleFunc|POST|POST)' internal/scheduling --type=go | grep -i 'schedule\|request' | head -20

# Look at filter_weigher_pipeline_controller to see how it processes incoming requests
fd 'filter_weigher_pipeline_controller' --type=go -x cat -n {} | head -100

# Search for where the ExternalSchedulerRequest JSON is decoded
rg -nP 'Decode.*ExternalSchedulerRequest|json\.Unmarshal.*ExternalSchedulerRequest' --type=go -B3 -A3 | head -50

Repository: cobaltcore-dev/cortex

Length of output: 2785


🏁 Script executed:

#!/bin/bash
# Read the external_scheduler_api.go file to see how incoming requests are handled
cat -n internal/scheduling/nova/external_scheduler_api.go

Repository: cobaltcore-dev/cortex

Length of output: 11670


🏁 Script executed:

#!/bin/bash
# Search for where request.Options or r.Options is used in the pipeline
rg -nP 'request\.Options|\.GetOptions\(\)' --type=go -C3 | head -60

# Look for any code that builds or resets Options after the request is received
rg -nP 'buildOptions|setOptions|resetOptions' --type=go -i -C2

Repository: cobaltcore-dev/cortex

Length of output: 5767


🏁 Script executed:

#!/bin/bash
# Check the Options struct definition to see all fields
fd -e go 'Options' api/external/nova --type=f -x cat -n {} | head -100

# Also check what the full Options struct looks like
rg -nP 'type Options struct' --type=go -A20

Repository: cobaltcore-dev/cortex

Length of output: 1990


Clarify Options handling contract between Nova and Cortex, or reset untrusted caller-supplied Options.

The field comment states that "Nova does not set these; Cortex fills in config-derived defaults server-side," but the code only partially enforces this: in filter_weigher_pipeline_controller.go (lines 177–180), only RecordHistory is conditionally set from pipeline config. Other Options fields (AssumeEmptyHosts, IgnoredReservationTypes, MaxCandidates, etc.) are passed through directly from the incoming request without validation or reset.

If this API is restricted to trusted Nova calls only, the comment should clarify that. If external/untrusted callers can reach this endpoint, the incoming Options should be reset or validated to prevent caller-supplied flags like AssumeEmptyHosts: true or MaxCandidates: 1 from bypassing capacity checks or placement logic.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@api/external/nova/messages.go` around lines 41 - 47, The Options field is
currently annotated as "Nova does not set these" but incoming Options are passed
through unchanged; update the contract or enforce server-side defaults: either
clarify the comment on
ExternalSchedulerRequest.Options/ExternalSchedulerRequest.GetOptions to state
this endpoint is trust-only (Nova only), or—preferable—reset/validate
caller-supplied options in the request handling code (e.g., in the
FilterWeigherPipelineController pipeline code where RecordHistory is set) by
replacing or sanitizing Options before use (clear or set safe defaults for
AssumeEmptyHosts, IgnoredReservationTypes, MaxCandidates, etc., or whitelist
only allowed fields), and ensure the pipeline uses the sanitizedOptions instance
everywhere downstream.

func (r ExternalSchedulerRequest) GetHosts() []string {
hosts := make([]string, len(r.Hosts))
for i, host := range r.Hosts {
Expand Down
3 changes: 3 additions & 0 deletions api/external/pods/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ type PodPipelineRequest struct {
Nodes []corev1.Node `json:"nodes"`
// The pod to be scheduled.
Pod corev1.Pod `json:"pod"`
// Options configure the pipeline behavior for this scheduling call.
Options lib.Options `json:"options,omitempty"`
}

func (r PodPipelineRequest) GetOptions() lib.Options { return r.Options }
func (r PodPipelineRequest) GetHosts() []string {
hosts := make([]string, len(r.Nodes))
for i, host := range r.Nodes {
Expand Down
16 changes: 16 additions & 0 deletions internal/scheduling/lib/filter_weigher_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

type FilterWeigherPipeline[RequestType FilterWeigherPipelineRequest] interface {
// Run the scheduling pipeline with the given request.
// Call-time options are read from request.GetOptions().
Run(request RequestType) (v1alpha1.DecisionResult, error)
}

Expand Down Expand Up @@ -263,6 +264,10 @@ func (s *filterWeigherPipeline[RequestType]) sortHostsByWeights(weights map[stri

// Evaluate the pipeline and return a list of hosts in order of preference.
func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1.DecisionResult, error) {
opts := request.GetOptions()
if err := opts.Validate(); err != nil {
return v1alpha1.DecisionResult{}, err
}
slogArgs := request.GetTraceLogArgs()
slogArgsAny := make([]any, 0, len(slogArgs))
for _, arg := range slogArgs {
Expand Down Expand Up @@ -297,6 +302,17 @@ func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1.
hosts := p.sortHostsByWeights(outWeights)
traceLog.Info("scheduler: sorted hosts", "hosts", hosts)

if opts.MaxCandidates > 0 && len(hosts) > opts.MaxCandidates {
traceLog.Info("scheduler: trimming candidate list", "maxCandidates", opts.MaxCandidates, "before", len(hosts))
hosts = hosts[:opts.MaxCandidates]
// Drop trimmed hosts from outWeights so AggregatedOutWeights stays consistent.
for host := range outWeights {
if !slices.Contains(hosts, host) {
delete(outWeights, host)
}
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide logging here so we see what's going on.

// Collect some metrics about the pipeline execution.
go p.monitor.observePipelineResult(request, hosts)

Expand Down
2 changes: 2 additions & 0 deletions internal/scheduling/lib/filter_weigher_pipeline_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ type FilterWeigherPipelineRequest interface {
// Get logging args to be used in the step's trace log.
// Usually, this will be the request context including the request ID.
GetTraceLogArgs() []slog.Attr
// Get the call-time options for this pipeline run.
GetOptions() Options
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ type mockFilterWeigherPipelineRequest struct {
Hosts []string
Weights map[string]float64
Pipeline string
Options Options
}

func (m mockFilterWeigherPipelineRequest) GetWeightKeys() []string { return m.WeightKeys }
func (m mockFilterWeigherPipelineRequest) GetTraceLogArgs() []slog.Attr { return m.TraceLogArgs }
func (m mockFilterWeigherPipelineRequest) GetHosts() []string { return m.Hosts }
func (m mockFilterWeigherPipelineRequest) GetWeights() map[string]float64 { return m.Weights }
func (m mockFilterWeigherPipelineRequest) GetPipeline() string { return m.Pipeline }
func (m mockFilterWeigherPipelineRequest) GetOptions() Options { return m.Options }

func (m mockFilterWeigherPipelineRequest) Filter(hosts map[string]float64) FilterWeigherPipelineRequest {
filteredHosts := make([]string, 0, len(hosts))
Expand Down
2 changes: 2 additions & 0 deletions internal/scheduling/lib/filter_weigher_pipeline_step.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type FilterWeigherPipelineStep[RequestType FilterWeigherPipelineRequest] interfa
//
// A traceLog is provided that contains the global request id and should
// be used to log the step's execution.
//
// Per-call options are available via request.GetOptions().
Run(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error)
}

Expand Down
58 changes: 58 additions & 0 deletions internal/scheduling/lib/filter_weigher_pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,3 +372,61 @@ func TestFilterWeigherPipelineMonitor_SubPipeline(t *testing.T) {
t.Error("original monitor should not be modified")
}
}

func TestPipeline_MaxCandidates(t *testing.T) {
// Pipeline that passes all 4 hosts with descending weights.
pipeline := &filterWeigherPipeline[mockFilterWeigherPipelineRequest]{
filters: map[string]Filter[mockFilterWeigherPipelineRequest]{},
filtersOrder: []string{},
weighersOrder: []string{},
weighers: map[string]Weigher[mockFilterWeigherPipelineRequest]{},
}
request := mockFilterWeigherPipelineRequest{
Hosts: []string{"host1", "host2", "host3", "host4"},
Weights: map[string]float64{"host1": 4.0, "host2": 3.0, "host3": 2.0, "host4": 1.0},
}

tests := []struct {
name string
maxCandidates int
wantLen int
wantFirst string
}{
{"no limit", 0, 4, "host1"},
{"limit to 2", 2, 2, "host1"},
{"limit to 1", 1, 1, "host1"},
{"limit larger than hosts", 10, 4, "host1"},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req := request
req.Options = Options{MaxCandidates: tt.maxCandidates}
result, err := pipeline.Run(req)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if len(result.OrderedHosts) != tt.wantLen {
t.Errorf("expected %d hosts, got %d: %v", tt.wantLen, len(result.OrderedHosts), result.OrderedHosts)
}
if len(result.OrderedHosts) > 0 && result.OrderedHosts[0] != tt.wantFirst {
t.Errorf("expected first host %s, got %s", tt.wantFirst, result.OrderedHosts[0])
}
if tt.maxCandidates > 0 && len(result.OrderedHosts) <= tt.maxCandidates {
// AggregatedOutWeights must only contain returned hosts.
for host := range result.AggregatedOutWeights {
found := false
for _, h := range result.OrderedHosts {
if h == host {
found = true
break
}
}
if !found {
t.Errorf("AggregatedOutWeights contains trimmed host %s", host)
}
}
}
})
}
}
49 changes: 49 additions & 0 deletions internal/scheduling/lib/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright SAP SE
// SPDX-License-Identifier: Apache-2.0

package lib

import (
"errors"

"github.com/cobaltcore-dev/cortex/api/v1alpha1"
)

// Options configure the behavior of a single pipeline run at call time.
// These are distinct from per-step YAML options (FilterWeigherPipelineStepOpts),
// which are static and set when the pipeline is initialized.
//
// Consumed by steps: ReadOnly, LockReservations, AssumeEmptyHosts, IgnoredReservationTypes.
// Consumed by the controller after pipeline.Run(): RecordHistory, CreateInflight.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two code comment lines are likely to be obsolete once the controller or step logic changes. We should consider removing them.

type Options struct {
// ReadOnly means the pipeline run does not modify shared scheduling state (reservations,
// history, inflight records). Concurrent read-only runs are safe under a shared read lock.
// Note: the controller may still write the Decision status after Run() regardless of this flag.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems inconsistent. Shouldn't we draw the line here? Read-only requests create or modify NO resources and are purely to calculate host candidates for constraints.

ReadOnly bool
// LockReservations prevents reservation unlocking, e.g. in the capacity filter.
// Set when finding hosts for new reservations (failover, CR) to see true available capacity.
LockReservations bool
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be more generic such as Kind which is a typed enum? In this case, the capacity filter would just check for req.GetOptions().Kind == KindFailoverReservation to control which logic is executed. We could also add a kind KindCapacityScan for limes etc. -- this is nicely extensible and well-defined. In this case, the ReadOnly, AssumeEmptyHosts, and CreateInFlight flags could also be removed.

// AssumeEmptyHosts treats all hosts as having no running VMs.
AssumeEmptyHosts bool
// IgnoredReservationTypes lists reservation types the capacity filter skips entirely.
IgnoredReservationTypes []v1alpha1.ReservationType
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make this a substruct such as type ReservationOptions struct?

// MaxCandidates limits the number of hosts returned after weighing. 0 means no limit.
MaxCandidates int

// RecordHistory records the placement decision in placement history.
// Replaces pipeline.Spec.CreateHistory once pipelines consolidate.
RecordHistory bool
// CreateInflight creates pessimistic blocking reservations for all returned candidates.
CreateInflight bool
}

// Validate checks for mutually exclusive or inconsistent option combinations.
func (o Options) Validate() error {
if o.ReadOnly && o.RecordHistory {
return errors.New("ReadOnly and RecordHistory are mutually exclusive: read-only runs must not write scheduling history")
}
if o.ReadOnly && o.CreateInflight {
return errors.New("ReadOnly and CreateInflight are mutually exclusive: read-only runs must not create inflight reservations")
}
Comment on lines +42 to +47
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Use lowercase error messages to satisfy linting

At Line [41] and Line [44], error strings start with uppercase words. This can fail lint checks in this repo.

🔧 Suggested patch
-		return errors.New("ReadOnly and RecordHistory are mutually exclusive: read-only runs must not mutate state")
+		return errors.New("readonly and record history are mutually exclusive: read-only runs must not mutate state")
 	}
 	if o.ReadOnly && o.CreateInflight {
-		return errors.New("ReadOnly and CreateInflight are mutually exclusive: read-only runs must not mutate state")
+		return errors.New("readonly and create inflight are mutually exclusive: read-only runs must not mutate state")
 	}

As per coding guidelines: "Error messages should always be lowercase to conform to linting rules".

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if o.ReadOnly && o.RecordHistory {
return errors.New("ReadOnly and RecordHistory are mutually exclusive: read-only runs must not mutate state")
}
if o.ReadOnly && o.CreateInflight {
return errors.New("ReadOnly and CreateInflight are mutually exclusive: read-only runs must not mutate state")
}
if o.ReadOnly && o.RecordHistory {
return errors.New("readonly and record history are mutually exclusive: read-only runs must not mutate state")
}
if o.ReadOnly && o.CreateInflight {
return errors.New("readonly and create inflight are mutually exclusive: read-only runs must not mutate state")
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/scheduling/lib/options.go` around lines 40 - 45, The validation
currently returns error messages that start with uppercase letters; update the
error strings returned when o.ReadOnly && o.RecordHistory and when o.ReadOnly &&
o.CreateInflight to start with lowercase (e.g. change "ReadOnly and
RecordHistory are mutually exclusive: ..." to "readOnly and recordHistory are
mutually exclusive: ..." or similar lowercase wording) in the function that
checks o.ReadOnly, o.RecordHistory and o.CreateInflight so they satisfy the lint
rule; keep the same descriptive text but make the first character lowercase for
both error.New(...) calls.

return nil
}
34 changes: 34 additions & 0 deletions internal/scheduling/lib/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright SAP SE
// SPDX-License-Identifier: Apache-2.0

package lib

import "testing"

func TestOptions_Validate(t *testing.T) {
tests := []struct {
name string
opts Options
wantErr bool
}{
{"zero value is valid", Options{}, false},
{"write run with history", Options{RecordHistory: true}, false},
{"write run with inflight", Options{CreateInflight: true}, false},
{"read-only run, no side effects", Options{ReadOnly: true}, false},
{"ReadOnly + RecordHistory is invalid", Options{ReadOnly: true, RecordHistory: true}, true},
{"ReadOnly + CreateInflight is invalid", Options{ReadOnly: true, CreateInflight: true}, true},
{"ReadOnly + both invalid", Options{ReadOnly: true, RecordHistory: true, CreateInflight: true}, true},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.opts.Validate()
if tt.wantErr && err == nil {
t.Error("expected error, got nil")
}
if !tt.wantErr && err != nil {
t.Errorf("expected no error, got %v", err)
}
})
}
}
59 changes: 43 additions & 16 deletions internal/scheduling/nova/filter_weigher_pipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -38,8 +37,9 @@ type FilterWeigherPipelineController struct {
// Toolbox shared between all pipeline controllers.
lib.BasePipelineController[lib.FilterWeigherPipeline[api.ExternalSchedulerRequest]]

// Mutex to only allow one process at a time
processMu sync.Mutex
// Mutex to only allow one process at a time.
// Read-only runs (opts.ReadOnly == true) acquire a read lock; write runs acquire the full lock.
processMu sync.RWMutex
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Monitor to pass down to all pipelines.
Monitor lib.FilterWeigherPipelineMonitor
Expand All @@ -54,13 +54,23 @@ func (c *FilterWeigherPipelineController) PipelineType() v1alpha1.PipelineType {

// Callback executed when kubernetes asks to reconcile a decision resource.
func (c *FilterWeigherPipelineController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
c.processMu.Lock()
defer c.processMu.Unlock()

// Peek at the decision before acquiring the lock so we can choose the right lock type.
// Read-only runs can proceed concurrently; write runs need the exclusive lock.
decision := &v1alpha1.Decision{}
if err := c.Get(ctx, req.NamespacedName, decision); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
if c.peekReadOnly(decision) {
c.processMu.RLock()
defer c.processMu.RUnlock()
} else {
c.processMu.Lock()
defer c.processMu.Unlock()
// Re-fetch after acquiring the exclusive lock to see consistent state.
if err := c.Get(ctx, req.NamespacedName, decision); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
}
old := decision.DeepCopy()
if err := c.process(ctx, decision); err != nil {
return ctrl.Result{}, err
Expand All @@ -74,13 +84,14 @@ func (c *FilterWeigherPipelineController) Reconcile(ctx context.Context, req ctr

// Process the decision from the API. Should create and return the updated decision.
func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context.Context, decision *v1alpha1.Decision) error {
c.processMu.Lock()
defer c.processMu.Unlock()

pipelineConf, ok := c.PipelineConfigs[decision.Spec.PipelineRef.Name]
if !ok {
return fmt.Errorf("pipeline %s not configured", decision.Spec.PipelineRef.Name)
if c.peekReadOnly(decision) {
c.processMu.RLock()
defer c.processMu.RUnlock()
} else {
c.processMu.Lock()
defer c.processMu.Unlock()
}

err := c.process(ctx, decision)
if err != nil {
meta.SetStatusCondition(&decision.Status.Conditions, metav1.Condition{
Expand All @@ -97,9 +108,6 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context.
Message: "pipeline run succeeded",
})
}
if pipelineConf.Spec.CreateHistory {
c.upsertHistory(ctx, decision, err)
}
return err
}

Expand Down Expand Up @@ -166,7 +174,14 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision
log.Info("gathered all placement candidates", "numHosts", len(request.Hosts))
}

// Fill RecordHistory from config if the caller didn't set it.
if !request.Options.RecordHistory {
request.Options.RecordHistory = pipelineConf.Spec.CreateHistory
}
result, err := pipeline.Run(request)
if request.Options.RecordHistory {
c.upsertHistory(ctx, decision, err)
}
if err != nil {
log.Error(err, "failed to run pipeline")
return err
Expand All @@ -182,7 +197,19 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision
return nil
}

// The base controller will delegate the pipeline creation down to this method.
// peekReadOnly determines whether a decision should use a read lock instead of
// the exclusive write lock. Defaults to false (exclusive) on any parse error.
func (c *FilterWeigherPipelineController) peekReadOnly(decision *v1alpha1.Decision) bool {
if decision.Spec.NovaRaw == nil {
return false
}
var request api.ExternalSchedulerRequest
if err := json.Unmarshal(decision.Spec.NovaRaw.Raw, &request); err != nil {
return false
}
return request.Options.ReadOnly
}

func (c *FilterWeigherPipelineController) InitPipeline(
ctx context.Context,
p v1alpha1.Pipeline,
Expand Down
Loading