feat: Implement Least Request Load Balancing Policy (gRFC A48)#2651
feat: Implement Least Request Load Balancing Policy (gRFC A48)#2651emil10001 wants to merge 3 commits into
Conversation
Implements the "all weights equal" Least Request Load Balancing policy in
gRPC-Rust, in compliance with gRFC A48. The Least Request policy improves tail
latencies in heterogeneous environments by tracking active request counts per
endpoint and directing new requests to the backend with the lowest load.
Detailed Changes:
1. Core Load Balancing Policy (`least_request.rs`):
- Defined `LeastRequestLoadBalancingConfig` to parse and validate the
`choiceCount` config parameter (default = 2, clamped from 2 to 10).
- Implemented `LeastRequestBuilder` registering policy name
`least_request_experimental`.
- Implemented `LeastRequestPolicy` managing endpoint-level connections
via `ChildManager` children delegating to `pick_first`.
- Maintained a persistent mapping of weak subchannel references to active
request counters (`subchannel_counters`) so that outstanding request
metrics survive picker updates and name re-resolutions.
- Implemented `LeastRequestPicker` utilizing a random sampling selection
algorithm over `choice_count` subchannels.
2. Active Request Cancellation Safety:
- Identified and resolved a request counter leak bug where async task
cancellations during `dyn_invoke.await` dropped the `Pick` closure
without calling it.
- Implemented a custom, defusable `ActiveRequestGuard` using an `AtomicBool`
inside `LeastRequestPicker::pick`. The guard guarantees that the active
request count is decremented upon drop if the picker's `on_complete`
callback is never invoked.
3. Channel & Service Config Integration:
- Registered the builder with the global LB registry in `Channel::new`
inside `channel.rs`.
- Added `CallbackRecvStream` wrapping the stream in the channel's
`Invoke` implementation to trigger `on_complete` callbacks when client
streams are completed or dropped.
- Added `LeastRequest` variant to `LbPolicyType` enum in `service_config.rs`.
- Mapped `LbPolicyType::LeastRequest` configuration inside
`ResolverChannelController::update` in `channel.rs`.
4. Test Additions & Verification:
- Added comprehensive unit tests in `least_request.rs` covering configuration
parsing/clamping/validation, least request selection, tie-breaking,
fewer subchannels than choice count, and cancellation drop-guard safety.
- Modified the `InMemoryResolver` in `inmemory/mod.rs` to dynamically set
the `LeastRequest` load-balancing policy based on target URI path prefixes.
- Wrote a robust E2E integration test `test_in_memory_least_request_load_balancing`
in `inmemory/mod.rs` verifying dynamic load balancing across multiple
in-memory backends concurrently.
|
/gemini review |
dfawley
left a comment
There was a problem hiding this comment.
Thanks for the PR! I haven't looked at the tests yet, but here's a first round of review comments.
| #[default] | ||
| PickFirst, | ||
| RoundRobin, | ||
| LeastRequest, |
There was a problem hiding this comment.
This is a pre-defined list of LB policies limited to just PF and RR. But I think we can do this for now if we put a TODO to remove. cc @nathanielford for when he implements support for the newer field that lets you specify any policy.
| fn build(&self, target: &Target, options: ResolverOptions) -> Box<dyn Resolver> { | ||
| let path = target.path().strip_prefix('/').unwrap_or(target.path()); | ||
| let ids: Vec<String> = path.split(',').map(|s| s.to_string()).collect(); | ||
| let (lb_policy, rest) = if let Some(stripped) = path.strip_prefix("leastrequest/") { |
There was a problem hiding this comment.
The LB policy should get set by specifying a default service config, not through the in memory resolver. We don't have that yet, though, so this is fine for now since this module is unexported and unused except in tests.
| fn parse_config( | ||
| &self, | ||
| config: &ParsedJsonLbConfig, | ||
| ) -> Result<Option<<Self::LbPolicy as LbPolicy>::LbConfig>, String> { |
There was a problem hiding this comment.
I believe you can just name the type here if you want to avoid all the colons and "as"es.. Result<LeastRequestLoadBalancingConfig, String>?
| let choice_count = parsed.choice_count.min(10); | ||
| Ok(Some(LeastRequestLoadBalancingConfig { choice_count })) |
There was a problem hiding this comment.
Optional: A style choice. This could also be parsed.choice_count = ...min(10) and then return Ok(Some(parsed)). (parsed would need to be declared as mut)
| pub(crate) struct LeastRequestPolicy { | ||
| child_manager: ChildManager<Endpoint>, | ||
| pick_first_builder: Arc<DynLbPolicyBuilder>, | ||
| choice_count: u32, |
There was a problem hiding this comment.
Consider storing an Option<Config> here instead of the flat config fields.
| PickResult::Pick(crate::client::load_balancing::Pick { | ||
| subchannel: selected.subchannel.clone(), | ||
| metadata: crate::metadata::MetadataMap::new(), |
| .state | ||
| .picker | ||
| .pick(&crate::core::RequestHeaders::default()) |
There was a problem hiding this comment.
We should not be making picks for fake RPCs. If we can't map via the child itself, then we'll need another way to retrieve its active subchannel.
| // Clean up stale counters | ||
| self.subchannel_counters | ||
| .retain(|weak, _| weak.upgrade().is_some()); |
There was a problem hiding this comment.
I think this would be better if it reused the known-active subchannels that we got above instead of keeping all the ones that happen to still upgrade.
| } | ||
| let aggregate_state = self.child_manager.aggregate_states(); | ||
|
|
||
| if aggregate_state == ConnectivityState::Ready { |
There was a problem hiding this comment.
Optional: maybe let picker = if ... and send the update commonly between the if/else.
| let picker = self | ||
| .child_manager | ||
| .children() | ||
| .find(|cs| cs.state.connectivity_state == aggregate_state) | ||
| .map(|cs| cs.state.picker.clone()) | ||
| .unwrap_or_else(|| { | ||
| Arc::new(crate::client::load_balancing::QueuingPicker) as Arc<dyn Picker> | ||
| }); |
There was a problem hiding this comment.
What picker is coming out of this? Is it just the first child whose state is the same as the aggregate state? We should round-robin over all of those children that match instead. You should ideally share the existing RoundRobin picker for this.
Implements the "all weights equal" Least Request Load Balancing policy in gRPC-Rust, in compliance with gRFC A48. The Least Request policy improves tail latencies in heterogeneous environments by tracking active request counts per endpoint and directing new requests to the backend with the lowest load.
Detailed Changes:
Core Load Balancing Policy (
least_request.rs):LeastRequestLoadBalancingConfigto parse and validate thechoiceCountconfig parameter (default = 2, clamped from 2 to 10).LeastRequestBuilderregistering policy nameleast_request_experimental.LeastRequestPolicymanaging endpoint-level connections viaChildManagerchildren delegating topick_first.subchannel_counters) so that outstanding request metrics survive picker updates and name re-resolutions.LeastRequestPickerutilizing a random sampling selection algorithm overchoice_countsubchannels.Active Request Cancellation Safety:
dyn_invoke.awaitdropped thePickclosure without calling it.ActiveRequestGuardusing anAtomicBoolinsideLeastRequestPicker::pick. The guard guarantees that the active request count is decremented upon drop if the picker'son_completecallback is never invoked.Channel & Service Config Integration:
Channel::newinsidechannel.rs.CallbackRecvStreamwrapping the stream in the channel'sInvokeimplementation to triggeron_completecallbacks when client streams are completed or dropped.LeastRequestvariant toLbPolicyTypeenum inservice_config.rs.LbPolicyType::LeastRequestconfiguration insideResolverChannelController::updateinchannel.rs.Test Additions & Verification:
least_request.rscovering configuration parsing/clamping/validation, least request selection, tie-breaking, fewer subchannels than choice count, and cancellation drop-guard safety.InMemoryResolverininmemory/mod.rsto dynamically set theLeastRequestload-balancing policy based on target URI path prefixes.test_in_memory_least_request_load_balancingininmemory/mod.rsverifying dynamic load balancing across multiple in-memory backends concurrently.Motivation
Solution