Skip to content

Commit 14bd5f0

Browse files
committed
Updated ILP policy
1 parent 24a7bcc commit 14bd5f0

9 files changed

Lines changed: 56 additions & 41 deletions

File tree

internal/api/api.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,8 @@ func GetServerStatus(c echo.Context) error {
240240
response := registration.StatusInformation{
241241
AvailableWarmContainers: node.WarmStatus(),
242242
TotalMemory: node.LocalResources.TotalMemory(),
243-
UsedMemory: node.LocalResources.UsedMemory(),
243+
AvailableMemory: node.LocalResources.AvailableMemory(),
244+
FreeMemory: node.LocalResources.FreeMemory(),
244245
TotalCPU: node.LocalResources.TotalCPUs(),
245246
UsedCPU: node.LocalResources.UsedCPUs(),
246247
Coordinates: *registration.VivaldiClient.GetCoordinate(),

internal/config/keys.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ const WORKFLOW_OFFLOADING_POLICY_REGION_COST = "workflow.offloading.policy.regio
107107
const WORKFLOW_OFFLOADING_POLICY_ILP_OBJ_WEIGHT_VIOLATIONS = "workflow.offloading.policy.ilp.obj.violations"
108108
const WORKFLOW_OFFLOADING_POLICY_ILP_OBJ_WEIGHT_DATA_TRANSFERS = "workflow.offloading.policy.ilp.obj.data"
109109
const WORKFLOW_OFFLOADING_POLICY_ILP_OBJ_WEIGHT_COST = "workflow.offloading.policy.ilp.obj.cost"
110+
const WORKFLOW_OFFLOADING_POLICY_ILP_OBJ_WEIGHT_RECLAIMED_MEMORY = "workflow.offloading.policy.ilp.obj.reclaimed"
110111

111112
// Estimated bandwidth between the current node and the data store
112113
const WORKFLOW_OFFLOADING_POLICY_NODE_TO_DATA_STORE_BANDWIDTH = "workflow.offloading.policy.node2datastore.bandwidth"

internal/container/container.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func sendPostRequestWithRetries(url string, body *bytes.Buffer) (*http.Response,
168168
}
169169
}
170170

171-
return nil, time.Duration(totalWaitMillis * int(time.Millisecond)), err
171+
return nil, time.Duration(totalWaitMillis * int(time.Millisecond)), fmt.Errorf("exceeded max number of retries: %v", err)
172172
}
173173

174174
func minInt(a, b int) int {

internal/lb/consthash_policy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,5 +133,5 @@ func (c *constHashBalancer) OnStatusUpdate(registration *registration.NodeRegist
133133
c.mutex.Lock()
134134
defer c.mutex.Unlock()
135135

136-
c.availMemory[registration.Key] = status.TotalMemory - status.UsedMemory
136+
c.availMemory[registration.Key] = status.AvailableMemory
137137
}

internal/registration/types.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ type NodeRegistration struct {
2020
type StatusInformation struct {
2121
AvailableWarmContainers map[string]int // <k, v> = <function name, warm container number>
2222
TotalMemory int64
23-
UsedMemory int64
23+
AvailableMemory int64
24+
FreeMemory int64
2425
TotalCPU float64
2526
UsedCPU float64
2627
Coordinates vivaldi.Coordinate

internal/registration/udp.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ func getCurrentStatusInformation() (status []byte, err error) {
7070
AvailableWarmContainers: node.WarmStatus(),
7171
TotalMemory: node.LocalResources.TotalMemory(),
7272
TotalCPU: node.LocalResources.TotalCPUs(),
73-
UsedMemory: node.LocalResources.UsedMemory(),
73+
AvailableMemory: node.LocalResources.AvailableMemory(),
74+
FreeMemory: node.LocalResources.FreeMemory(),
7475
UsedCPU: node.LocalResources.UsedCPUs(),
7576
Coordinates: *VivaldiClient.GetCoordinate(),
7677
}

internal/workflow/ilp_offloading_policy.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/serverledge-faas/serverledge/internal/config"
88
"log"
99
"net/http"
10+
"time"
1011
)
1112

1213
type IlpOffloadingPolicy struct{}
@@ -38,6 +39,8 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
3839

3940
params := prepareParameters(r, p)
4041

42+
t0 := time.Now()
43+
4144
// Serialize to JSON
4245
jsonData, err := json.Marshal(params)
4346
if err != nil {
@@ -67,6 +70,9 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
6770
return OffloadingDecision{Offload: false}, fmt.Errorf("scheduling failed with status code %d", statusCode)
6871
}
6972

73+
solverTime := time.Since(t0)
74+
log.Printf("solver time for %s: %v\n", r.W.Name, solverTime.Seconds())
75+
7076
// Read and print response
7177
var placement taskPlacement
7278
err = json.NewDecoder(resp.Body).Decode(&placement)

internal/workflow/remote_offloading_policy.go

Lines changed: 38 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,20 @@ import (
1818
)
1919

2020
type remotePolicyParams struct {
21-
CloudNodes []string `json:"cloud_nodes"` // Set of Cloud nodes
22-
EdgeNodes []string `json:"edge_nodes"` // Set of Edge nodes
23-
NodeMemory map[string]float64 `json:"node_memory"` // Memory per node
24-
DSLatency map[string]float64 `json:"ds_latency"` // Latency per node
25-
DSBandwidth map[string]float64 `json:"ds_bandwidth"` // Bandwidth per node
26-
NodeLatency map[string]float64 `json:"node_latency"` // map[json.dumps((src_node, dst_node))] = latency
27-
HandlingNode string `json:"handling_node"`
28-
T []string `json:"T"` // Set of tasks
29-
Adj map[string][]string `json:"adj"` // Task adjacency list
30-
TaskMemory map[string]float64 `json:"task_memory"` // Memory per task
31-
Deadline float64 `json:"deadline"` // Global deadline
32-
OutputSize map[string]float64 `json:"output_size"` // Output size per task
33-
InputSize float64 `json:"input_size"` // Input data size
21+
CloudNodes []string `json:"cloud_nodes"` // Set of Cloud nodes
22+
EdgeNodes []string `json:"edge_nodes"` // Set of Edge nodes
23+
NodeAvailableMemory map[string]float64 `json:"node_available_memory"` // Available memory per node
24+
NodeFreeMemory map[string]float64 `json:"node_free_memory"` // Free memory per node
25+
DSLatency map[string]float64 `json:"ds_latency"` // Latency per node
26+
DSBandwidth map[string]float64 `json:"ds_bandwidth"` // Bandwidth per node
27+
NodeLatency map[string]float64 `json:"node_latency"` // map[json.dumps((src_node, dst_node))] = latency
28+
HandlingNode string `json:"handling_node"`
29+
T []string `json:"T"` // Set of tasks
30+
Adj map[string][]string `json:"adj"` // Task adjacency list
31+
TaskMemory map[string]float64 `json:"task_memory"` // Memory per task
32+
Deadline float64 `json:"deadline"` // Global deadline
33+
OutputSize map[string]float64 `json:"output_size"` // Output size per task
34+
InputSize float64 `json:"input_size"` // Input data size
3435

3536
ObjWeights []float64 `json:"obj_weights"` // Objective terms weights
3637
Cost map[string]float64 `json:"cost"` // Computation cost
@@ -45,19 +46,20 @@ type taskPlacement map[TaskId]string
4546

4647
func initParams() remotePolicyParams {
4748
return remotePolicyParams{
48-
Adj: make(map[string][]string),
49-
ExecTime: make(map[string]float64),
50-
InitTime: make(map[string]float64),
51-
OutputSize: make(map[string]float64),
52-
NodeMemory: make(map[string]float64),
53-
Cost: make(map[string]float64),
54-
TaskMemory: make(map[string]float64),
55-
NodeLabels: make(map[string][]string),
56-
TaskLabels: make(map[string][]string),
57-
DSBandwidth: make(map[string]float64),
58-
DSLatency: make(map[string]float64),
59-
NodeLatency: make(map[string]float64),
60-
ObjWeights: []float64{0.33, 0.33, 0.33},
49+
Adj: make(map[string][]string),
50+
ExecTime: make(map[string]float64),
51+
InitTime: make(map[string]float64),
52+
OutputSize: make(map[string]float64),
53+
NodeAvailableMemory: make(map[string]float64),
54+
NodeFreeMemory: make(map[string]float64),
55+
Cost: make(map[string]float64),
56+
TaskMemory: make(map[string]float64),
57+
NodeLabels: make(map[string][]string),
58+
TaskLabels: make(map[string][]string),
59+
DSBandwidth: make(map[string]float64),
60+
DSLatency: make(map[string]float64),
61+
NodeLatency: make(map[string]float64),
62+
ObjWeights: []float64{0.33, 0.33, 0.33},
6163
}
6264
}
6365

@@ -180,12 +182,14 @@ func prepareParameters(r *Request, p *Progress) *remotePolicyParams {
180182
params.EdgeNodes = []string{LOCAL}
181183
params.Deadline = r.QoS.MaxRespT - time.Now().Sub(r.Arrival).Seconds()
182184
params.HandlingNode = LOCAL
183-
params.NodeMemory[LOCAL] = (float64)(node.LocalResources.AvailableMemory())
185+
params.NodeAvailableMemory[LOCAL] = (float64)(node.LocalResources.AvailableMemory())
186+
params.NodeFreeMemory[LOCAL] = (float64)(node.LocalResources.FreeMemory())
184187

185-
wViolations := config.GetFloat(config.WORKFLOW_OFFLOADING_POLICY_ILP_OBJ_WEIGHT_VIOLATIONS, 0.33)
186-
wDataTransfers := config.GetFloat(config.WORKFLOW_OFFLOADING_POLICY_ILP_OBJ_WEIGHT_DATA_TRANSFERS, 0.33)
187-
wCost := config.GetFloat(config.WORKFLOW_OFFLOADING_POLICY_ILP_OBJ_WEIGHT_COST, 0.33)
188-
params.ObjWeights = []float64{wViolations, wDataTransfers, wCost}
188+
wViolations := config.GetFloat(config.WORKFLOW_OFFLOADING_POLICY_ILP_OBJ_WEIGHT_VIOLATIONS, 0.3)
189+
wDataTransfers := config.GetFloat(config.WORKFLOW_OFFLOADING_POLICY_ILP_OBJ_WEIGHT_DATA_TRANSFERS, 0.3)
190+
wCost := config.GetFloat(config.WORKFLOW_OFFLOADING_POLICY_ILP_OBJ_WEIGHT_COST, 0.3)
191+
wReclaimedMemory := config.GetFloat(config.WORKFLOW_OFFLOADING_POLICY_ILP_OBJ_WEIGHT_RECLAIMED_MEMORY, 0.1)
192+
params.ObjWeights = []float64{wViolations, wDataTransfers, wCost, wReclaimedMemory}
189193

190194
regionCost := config.GetStringMapFloat64(config.WORKFLOW_OFFLOADING_POLICY_REGION_COST)
191195
// TODO: ToLower() is needed because viper (used to parse configuration files) is not case sensitive
@@ -201,9 +205,10 @@ func prepareParameters(r *Request, p *Progress) *remotePolicyParams {
201205
nearbyServers := registration.GetFullNeighborInfo()
202206
if nearbyServers != nil {
203207
for k, v := range nearbyServers {
204-
if (v.TotalMemory-v.UsedMemory) > 0 && (v.TotalCPU-v.UsedCPU) > 0 {
208+
if v.AvailableMemory > 0 && (v.TotalCPU-v.UsedCPU) > 0 {
205209
params.EdgeNodes = append(params.EdgeNodes, k)
206-
params.NodeMemory[k] = float64(v.TotalMemory - v.UsedMemory)
210+
params.NodeAvailableMemory[k] = float64(v.AvailableMemory)
211+
params.NodeFreeMemory[k] = float64(v.FreeMemory)
207212

208213
// Cost (assuming that Edge nodes are all in the same area)
209214
params.Cost[k] = localCost

internal/workflow/threshold_policy.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,10 @@ func (policy *ThresholdBasedPolicy) Evaluate(r *Request, p *Progress) (Offloadin
102102
if nearbyServers != nil {
103103
for k, v := range nearbyServers {
104104
// TODO: apply a threshold here ?
105-
if (v.TotalMemory - v.UsedMemory) >= offloadedMemory { // TODO: should look at free memory (ignoring warm containers)
106-
if offloadingTarget == "" || (v.TotalMemory-v.UsedMemory) > offloadingTargetMem {
105+
if (v.AvailableMemory) >= offloadedMemory { // TODO: should look at free memory for ranking (ignoring warm containers)
106+
if offloadingTarget == "" || v.AvailableMemory > offloadingTargetMem {
107107
offloadingTarget = k
108-
offloadingTargetMem = v.TotalMemory - v.UsedMemory
108+
offloadingTargetMem = v.AvailableMemory
109109
}
110110
} else {
111111
log.Printf("Not enough memory to offload to %v", k)

0 commit comments

Comments
 (0)