-
Notifications
You must be signed in to change notification settings - Fork 46
feat(scheduler): optimize worker scheduling to O(1) using Valkey/Redi… #13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -164,6 +164,13 @@ func (s *Persistence) CreateWorker(ctx context.Context, worker *ateapipb.Worker) | |
| return store.ErrAlreadyExists | ||
| } | ||
|
|
||
| // Add to the idle set. | ||
| setKey := fmt.Sprintf("pool:%s:%s:idle_workers", worker.GetWorkerNamespace(), worker.GetWorkerPool()) | ||
| err = s.rdb.SAdd(ctx, setKey, worker.GetWorkerPod()).Err() | ||
| if err != nil { | ||
| return fmt.Errorf("while registering worker in idle set: %w", err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
|
|
@@ -192,6 +199,7 @@ func (s *Persistence) GetWorker(ctx context.Context, namespace, pool, pod string | |
|
|
||
| func (s *Persistence) UpdateWorker(ctx context.Context, worker *ateapipb.Worker, expectedVersion int64) error { | ||
| dbKey := workerDBKey(worker.GetWorkerNamespace(), worker.GetWorkerPool(), worker.GetWorkerPod()) | ||
| var shouldAddToIdle bool | ||
|
|
||
| // Clone because we will update the version field, and we don't want to | ||
| // stomp the caller's copy. | ||
|
|
@@ -228,6 +236,10 @@ func (s *Persistence) UpdateWorker(ctx context.Context, worker *ateapipb.Worker, | |
| return fmt.Errorf("ip is immutable") | ||
| } | ||
|
|
||
| if currentWorker.GetActorId() != "" && dbWorker.GetActorId() == "" { | ||
| shouldAddToIdle = true | ||
| } | ||
|
|
||
| newVal, err := protojson.Marshal(dbWorker) | ||
| if err != nil { | ||
| return fmt.Errorf("in protojson.Marshal: %w", err) | ||
|
|
@@ -246,15 +258,32 @@ func (s *Persistence) UpdateWorker(ctx context.Context, worker *ateapipb.Worker, | |
| return fmt.Errorf("while executing update worker transaction: %w", err) | ||
| } | ||
|
|
||
| // Run SAdd sequentially outside the transaction to avoid cluster slot restrictions. | ||
| if shouldAddToIdle { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this needs to be a three phase process.
Otherwise, a crash between steps 1 and 2 will permanently leak the worker from consideration. With the three-phase approach, we can have an additional background thread sweeping workers that are "free, but not yet returned to the free set" back to the free set.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we do have a re-sync process? Will need to double check. But I will make this change. Thanks Taahir. |
||
| setKey := fmt.Sprintf("pool:%s:%s:idle_workers", worker.GetWorkerNamespace(), worker.GetWorkerPool()) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One of our design goals is to let all ActorTemplates share the same WorkerPool to maximize efficiency. In that case, this will mean that we are sending all restore traffic through a single hash bucket (and thus, one single valkey node). We are going to need to solve this problem, and I'm having difficulty seeing how it could be solved within redis. However, this is heaps better than the current strategy of "read every worker all the time".
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 100% I have a much more advanced change that handles locality but because it is much more ossifying I felt this was a decent intermediate step while the datamodel and DB choices were finalized. |
||
| err = s.rdb.SAdd(ctx, setKey, worker.GetWorkerPod()).Err() | ||
| if err != nil { | ||
| return fmt.Errorf("while returning worker to idle set: %w", err) | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (s *Persistence) DeleteWorker(ctx context.Context, namespace, pool, pod string) error { | ||
| dbKey := workerDBKey(namespace, pool, pod) | ||
| setKey := fmt.Sprintf("pool:%s:%s:idle_workers", namespace, pool) | ||
|
|
||
| err := s.rdb.Del(ctx, dbKey).Err() | ||
| if err != nil { | ||
| return fmt.Errorf("while deleting worker key %q: %w", dbKey, err) | ||
| } | ||
|
|
||
| err = s.rdb.SRem(ctx, setKey, pod).Err() | ||
| if err != nil { | ||
| return fmt.Errorf("while removing worker from idle set: %w", err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
|
|
@@ -452,3 +481,50 @@ func (s *Persistence) ReleaseLock(ctx context.Context, key string, value string) | |
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (s *Persistence) ClaimIdleWorker(ctx context.Context, namespace, pool string, actorID string, actorNamespace string, actorTemplate string) (*ateapipb.Worker, error) { | ||
| setKey := fmt.Sprintf("pool:%s:%s:idle_workers", namespace, pool) | ||
|
|
||
| for { | ||
| // Pop a random idle worker name. | ||
| podName, err := s.rdb.SPop(ctx, setKey).Result() | ||
| if err != nil { | ||
| if errors.Is(err, redis.Nil) { | ||
| return nil, store.ErrNotFound | ||
| } | ||
| return nil, fmt.Errorf("while popping idle worker from set: %w", err) | ||
| } | ||
|
|
||
| worker, err := s.GetWorker(ctx, namespace, pool, podName) | ||
| if err != nil { | ||
| // If the worker was deleted, skip and pop the next one. | ||
| if errors.Is(err, store.ErrNotFound) { | ||
| continue | ||
| } | ||
| _ = s.rdb.SAdd(ctx, setKey, podName).Err() | ||
| return nil, fmt.Errorf("while loading popped worker metadata: %w", err) | ||
| } | ||
|
|
||
| if worker.GetActorId() != "" { | ||
| // Skip busy workers. | ||
| continue | ||
| } | ||
|
|
||
| worker.ActorId = actorID | ||
| worker.ActorNamespace = actorNamespace | ||
| worker.ActorTemplate = actorTemplate | ||
|
|
||
| err = s.UpdateWorker(ctx, worker, worker.Version) | ||
| if err != nil { | ||
| if errors.Is(err, store.ErrPersistenceRetry) { | ||
| // Return to the idle set and retry on locking conflict. | ||
| _ = s.rdb.SAdd(ctx, setKey, podName).Err() | ||
| continue | ||
| } | ||
| _ = s.rdb.SAdd(ctx, setKey, podName).Err() | ||
| return nil, fmt.Errorf("while claiming popped worker: %w", err) | ||
| } | ||
|
|
||
| return worker, nil | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will never be non-empty during resume (except maybe on retry of a failed resume?)