Skip to content

Commit ccb2b87

Browse files
committed
[lb] consistent hashing-based policy
1 parent c25b99a commit ccb2b87

5 files changed

Lines changed: 254 additions & 29 deletions

File tree

internal/config/keys.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,5 @@ const WORKFLOW_THRESHOLD_BASED_POLICY_THRESHOLD = "workflow.offloading.policy.th
111111
// Max number of tasks offloaded at once in the threshold-based offloading policy
112112
const WORKFLOW_THRESHOLD_BASED_POLICY_MAX_OFFLOADED = "workflow.offloading.policy.threshold.offloaded.max"
113113

114-
const LOAD_BALANCER_POLICY = "load.balancer.policy"
114+
const LOAD_BALANCER_POLICY = "loadbalancer.policy"
115+
const LOAD_BALANCER_TARGET_UPDATE_INTERVAL = "loadbalancer.targets.interval"

internal/lb/common.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package lb
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
"github.com/serverledge-faas/serverledge/internal/registration"
7+
"io"
8+
"log"
9+
"net/http"
10+
"net/url"
11+
)
12+
13+
func getTargetStatus(targetUrl string) (registration.StatusInformation, error) {
14+
path, _ := url.JoinPath(targetUrl, "/status")
15+
resp, err := http.Get(path)
16+
if err != nil {
17+
log.Fatalf("Invocation to get status failed: %v", err)
18+
return registration.StatusInformation{}, err
19+
}
20+
defer resp.Body.Close()
21+
22+
// Read the response body
23+
body, err := io.ReadAll(resp.Body)
24+
if err != nil {
25+
log.Fatalf("Error reading response body: %v", err)
26+
return registration.StatusInformation{}, err
27+
}
28+
29+
// Check the status code
30+
if resp.StatusCode == http.StatusOK {
31+
var statusInfo registration.StatusInformation
32+
if err := json.Unmarshal(body, &statusInfo); err != nil {
33+
log.Fatalf("Error decoding JSON: %v", err)
34+
return registration.StatusInformation{}, errors.New("could not get status information")
35+
}
36+
return statusInfo, nil
37+
}
38+
39+
return registration.StatusInformation{}, errors.New("could not get status information")
40+
}

internal/lb/consthash_policy.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package lb
2+
3+
import (
4+
"crypto/sha256"
5+
"errors"
6+
"github.com/serverledge-faas/serverledge/internal/function"
7+
"github.com/serverledge-faas/serverledge/internal/registration"
8+
"log"
9+
"sort"
10+
"sync"
11+
)
12+
13+
type constHashBalancer struct {
14+
ring *Ring
15+
availMemory map[string]int64
16+
mutex sync.RWMutex
17+
}
18+
19+
func (c *constHashBalancer) OnRequestComplete(funcName string, nodeKey string) {
20+
c.mutex.Lock()
21+
defer c.mutex.Unlock()
22+
f, found := function.GetFunction(funcName)
23+
if !found {
24+
return
25+
}
26+
c.availMemory[nodeKey] += f.MemoryMB
27+
}
28+
29+
type (
	// ringElem is one entry of the hash ring.
	ringElem struct {
		key  uint64 // position of the entry on the ring
		node string // key of the node placed at that position
	}

	// Ring is a structure that maintains nodes in sorted order
	Ring struct {
		nodes []ringElem // entries, kept sorted by ascending key
	}
)
38+
39+
// hash maps key to a position on the ring: the first 8 bytes of the SHA-256
// digest of key, read as a big-endian unsigned 64-bit integer.
func hash(key string) uint64 {
	digest := sha256.Sum256([]byte(key))

	var pos uint64
	for i := 0; i < 8; i++ {
		pos = pos<<8 | uint64(digest[i])
	}
	return pos
}
50+
51+
// Adds a node to the ring
52+
func (r *Ring) addNode(node *registration.NodeRegistration) {
53+
key := hash(node.APIUrl())
54+
r.nodes = append(r.nodes, ringElem{key: key, node: node.Key})
55+
sort.Slice(r.nodes, func(i, j int) bool {
56+
return r.nodes[i].key < r.nodes[j].key
57+
})
58+
}
59+
60+
func newConstHashBalancer() *constHashBalancer {
61+
ring := &Ring{}
62+
ring.nodes = make([]ringElem, 0)
63+
64+
for _, node := range currentTargets {
65+
ring.addNode(&node)
66+
}
67+
68+
c := &constHashBalancer{ring: ring}
69+
c.availMemory = make(map[string]int64)
70+
return c
71+
}
72+
73+
func (c *constHashBalancer) Route(funcName string) (string, error) {
74+
f, ok := function.GetFunction(funcName)
75+
if !ok {
76+
return "", errors.New("function not found")
77+
}
78+
key := hash(funcName)
79+
80+
c.mutex.Lock()
81+
defer c.mutex.Unlock()
82+
83+
// Find the index where the key should be inserted
84+
startIndex := sort.Search(len(c.ring.nodes), func(i int) bool {
85+
return c.ring.nodes[i].key > key
86+
})
87+
88+
if startIndex >= len(c.ring.nodes) {
89+
startIndex = 0
90+
}
91+
92+
// Check nodes starting from the startIndex and wrap around if necessary
93+
for _, entry := range append(c.ring.nodes[startIndex:], c.ring.nodes[:startIndex]...) {
94+
nodeKey := entry.node
95+
availMemory, ok := c.availMemory[nodeKey]
96+
if ok && availMemory >= f.MemoryMB {
97+
//log.Printf("Using %s for function %s", nodeKey, funcName)
98+
c.availMemory[nodeKey] -= f.MemoryMB // TODO: inaccurate, if a warm container is available
99+
return nodeKey, nil
100+
} else if !ok {
101+
log.Printf("WARNING: no info about available memory for node %s", nodeKey)
102+
}
103+
}
104+
105+
return "", errors.New("no available target")
106+
}
107+
func (c *constHashBalancer) OnNodeArrival(registration *registration.NodeRegistration) {
108+
c.mutex.Lock()
109+
defer c.mutex.Unlock()
110+
111+
c.ring.addNode(registration)
112+
}
113+
114+
func (c *constHashBalancer) OnNodeDeletion(registration *registration.NodeRegistration) {
115+
c.mutex.Lock()
116+
defer c.mutex.Unlock()
117+
118+
toDelete := -1
119+
for i, n := range c.ring.nodes {
120+
if n.node == registration.Key {
121+
toDelete = i
122+
break
123+
}
124+
}
125+
126+
if toDelete != -1 {
127+
c.ring.nodes = append(c.ring.nodes[:toDelete], c.ring.nodes[toDelete+1:]...)
128+
delete(c.availMemory, registration.Key)
129+
}
130+
}
131+
132+
func (c *constHashBalancer) OnStatusUpdate(registration *registration.NodeRegistration, status registration.StatusInformation) {
133+
c.mutex.Lock()
134+
defer c.mutex.Unlock()
135+
136+
c.availMemory[registration.Key] = status.TotalMemory - status.UsedMemory
137+
}

internal/lb/lb.go

Lines changed: 57 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ var targetsMutex sync.RWMutex
2323
var currentTargets map[string]registration.NodeRegistration
2424

2525
func newPolicy() policy {
26-
policyName := config.GetString(config.LOAD_BALANCER_POLICY, "round-robin")
26+
policyName := config.GetString(config.LOAD_BALANCER_POLICY, "const-hash")
2727
if policyName == "random" {
2828
return &randomPolicy{}
29+
} else if policyName == "const-hash" {
30+
return newConstHashBalancer()
2931
} else {
3032
panic("unknown policy: " + policyName)
3133
}
@@ -35,10 +37,19 @@ func handleInvoke(c echo.Context) error {
3537

3638
funcName := c.Param("fun")
3739
// Select backend
38-
targetURL, err := lbPolicy.Route(funcName)
40+
targetKey, err := lbPolicy.Route(funcName)
41+
if err != nil {
42+
return c.JSON(http.StatusTooManyRequests, map[string]string{"error": err.Error()})
43+
}
44+
targetsMutex.RLock()
45+
targetNode, ok := currentTargets[targetKey]
46+
if !ok {
47+
return c.JSON(http.StatusTooManyRequests, map[string]string{"error": "missing target"})
48+
}
49+
targetsMutex.RUnlock()
3950

4051
// Create a new HTTP request to forward to the selected backend
41-
newURL, _ := url.JoinPath(targetURL, c.Request().RequestURI)
52+
newURL, _ := url.JoinPath(targetNode.APIUrl(), c.Request().RequestURI)
4253
req, err := http.NewRequest(c.Request().Method, newURL, c.Request().Body)
4354
if err != nil {
4455
return err
@@ -55,7 +66,7 @@ func handleInvoke(c echo.Context) error {
5566

5667
if resp.StatusCode == http.StatusOK {
5768
// function has been actually executed
58-
// TODO: update info?
69+
lbPolicy.OnRequestComplete(funcName, targetKey)
5970
}
6071

6172
res := c.Response()
@@ -126,7 +137,7 @@ func StartReverseProxy(e *echo.Echo, region string) {
126137
log.Printf("Initializing with %d targets.\n", len(currentTargets))
127138
lbPolicy = newPolicy()
128139

129-
go updateTargets(region)
140+
updateTargets(region)
130141

131142
tr := &http.Transport{
132143
MaxIdleConns: 2500,
@@ -151,30 +162,49 @@ func StartReverseProxy(e *echo.Echo, region string) {
151162
}
152163

153164
func updateTargets(region string) {
154-
for {
155-
time.Sleep(10 * time.Second) // TODO: configure
156-
157-
newTargets, err := registration.GetNodesInArea(region, false, 0)
158-
if err != nil || newTargets == nil {
159-
log.Printf("Cannot update targets: %v\n", err)
160-
}
161165

162-
targetsMutex.Lock()
163-
for k, _ := range currentTargets {
164-
if _, ok := newTargets[k]; !ok {
165-
// this target is not present any more
166-
log.Printf("Removing target: %s\n", k)
166+
interval := config.GetInt(config.LOAD_BALANCER_TARGET_UPDATE_INTERVAL, 30)
167+
ticker := time.NewTicker(time.Duration(interval) * time.Second)
168+
169+
go func() {
170+
for {
171+
select {
172+
case <-ticker.C:
173+
// retrieve new targets
174+
newTargets, err := registration.GetNodesInArea(region, false, 0)
175+
if err != nil || newTargets == nil {
176+
log.Printf("Cannot update targets: %v\n", err)
177+
}
178+
179+
targetsMutex.Lock()
180+
for k, v := range currentTargets {
181+
if _, ok := newTargets[k]; !ok {
182+
// this target is not present any more
183+
log.Printf("Removing target: %s\n", k)
184+
lbPolicy.OnNodeDeletion(&v)
185+
}
186+
}
187+
188+
for k, v := range newTargets {
189+
if _, ok := currentTargets[k]; !ok {
190+
// this target was not present
191+
log.Printf("Adding new target: %s\n", k)
192+
lbPolicy.OnNodeArrival(&v)
193+
}
194+
}
195+
196+
currentTargets = newTargets
197+
targetsMutex.Unlock()
198+
199+
// query status of each target
200+
for _, v := range currentTargets {
201+
status, err := getTargetStatus(v.APIUrl())
202+
if err == nil {
203+
lbPolicy.OnStatusUpdate(&v, status)
204+
}
205+
}
167206
}
168207
}
208+
}()
169209

170-
for k, _ := range newTargets {
171-
if _, ok := currentTargets[k]; !ok {
172-
// this target was not present
173-
log.Printf("Adding new target: %s\n", k)
174-
}
175-
}
176-
177-
currentTargets = newTargets
178-
targetsMutex.Unlock()
179-
}
180210
}

internal/lb/policy.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,25 @@ package lb
22

33
import (
44
"errors"
5+
"github.com/serverledge-faas/serverledge/internal/registration"
56
"math/rand"
67
)
78

89
type policy interface {
910
Route(funcName string) (string, error)
11+
OnNodeArrival(registration *registration.NodeRegistration)
12+
OnNodeDeletion(registration *registration.NodeRegistration)
13+
OnStatusUpdate(registration *registration.NodeRegistration, status registration.StatusInformation)
14+
OnRequestComplete(funcName string, nodeKey string)
1015
}
1116

1217
type randomPolicy struct{}
1318

19+
func (r *randomPolicy) OnRequestComplete(funcName string, node string) {
20+
//TODO implement me
21+
panic("implement me")
22+
}
23+
1424
func (r *randomPolicy) Route(funcName string) (string, error) {
1525
targetsMutex.RLock()
1626
defer targetsMutex.RUnlock()
@@ -20,10 +30,17 @@ func (r *randomPolicy) Route(funcName string) (string, error) {
2030

2131
for _, value := range currentTargets {
2232
if i == skip {
23-
return value.APIUrl(), nil
33+
return value.Key, nil
2434
}
2535
i++
2636
}
2737

2838
return "", errors.New("no targets")
2939
}
40+
41+
// OnNodeArrival is a no-op: the random policy keeps no per-node state.
func (r *randomPolicy) OnNodeArrival(registration *registration.NodeRegistration) {
	// nothing to do
}
42+
43+
// OnNodeDeletion is a no-op: the random policy keeps no per-node state.
func (r *randomPolicy) OnNodeDeletion(registration *registration.NodeRegistration) {
	// nothing to do
}
44+
45+
func (r *randomPolicy) OnStatusUpdate(registration *registration.NodeRegistration, status registration.StatusInformation) {
46+
}

0 commit comments

Comments
 (0)