RDKEMW-17528: Fix memory leak from missing session/event cleanup on WebSocket disconnect.#946
kvfasil wants to merge 4 commits into
…ebSocket disconnect

* Sessions were registered by connection_id but cleanup used session_id, causing remove_session() to be a no-op.
* Event listeners and empty map entries accumulated indefinitely across disconnects.

Changes:
- app_events.rs: Add cleanup_by_connection_id(), prune empty entries in remove_session()
- firebolt_gateway.rs: Clean up by both session_id and connection_id on UnregisterSession
- thunder_event_processor.rs: Add cleanup_by_app_id() for extension-side event map cleanup
- firebolt_ws.rs: Elevate disconnect log from debug! to info! with full context
…ventProcessor

- Subscription entries in request_map and extension_request_map were never removed on disconnect.
- ThunderEventProcessor.cleanup_by_app_id() existed but was never called.
- Added DeviceEvent::Cleanup to wire ThunderEventProcessor cleanup across the gateway-extension boundary.
- Added cleanup_request_maps_for_app() to prune broker maps on disconnect.
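The key mismatch described in the first commit message can be illustrated with a small sketch. `SessionStore` and its methods are hypothetical stand-ins, not the gateway's real types; the only point is that removing by an identifier that was never used as the key silently does nothing, while trying both identifiers on disconnect frees the entry.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the gateway's session store (not the real type).
#[derive(Default)]
struct SessionStore {
    /// Registration key -> app_id.
    sessions: HashMap<String, String>,
}

impl SessionStore {
    fn add_session(&mut self, key: &str, app_id: &str) {
        self.sessions.insert(key.to_owned(), app_id.to_owned());
    }

    /// Returns true only if an entry was actually removed.
    fn remove_session(&mut self, key: &str) -> bool {
        self.sessions.remove(key).is_some()
    }

    /// Disconnect cleanup that tries both identifiers, as the PR describes.
    fn cleanup_on_disconnect(&mut self, session_id: &str, connection_id: &str) -> bool {
        let by_session = self.remove_session(session_id);
        let by_connection = self.remove_session(connection_id);
        by_session || by_connection
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn removal_by_the_wrong_key_is_a_no_op() {
        let mut store = SessionStore::default();
        store.add_session("conn-1", "app-a");

        // Registered by connection id, removed by session id: nothing happens.
        assert!(!store.remove_session("sess-1"));
        assert_eq!(store.sessions.len(), 1);

        // Cleaning up by both identifiers frees the entry.
        assert!(store.cleanup_on_disconnect("sess-1", "conn-1"));
        assert!(store.sessions.is_empty());
    }
}
```

Returning a `bool` from the removal is a choice made for this sketch so that a no-op removal is observable in a test; the signatures in the PR may differ.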
Pull request overview
This PR addresses memory leaks caused by missing cleanup when a Firebolt WebSocket disconnects, ensuring event listeners, broker subscriptions, and related per-session state are removed on disconnect.
Changes:
- Add a `DeviceEvent::Cleanup` request path and a Thunder-side `cleanup_by_app_id` to remove device event subscriptions on disconnect.
- Add core-side cleanup for app event listeners (by `session_id` and by `connection_id`) and prune empty listener maps.
- Extend broker cleanup to remove subscription/event entries from `request_map` and `extension_request_map` during app/session cleanup.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| device/thunder_ripple_sdk/src/processors/thunder_events.rs | Handles new DeviceEvent::Cleanup requests and triggers Thunder event subscription cleanup. |
| device/thunder_ripple_sdk/src/events/thunder_event_processor.rs | Ignores synthetic Cleanup as a Thunder event; adds cleanup_by_app_id for listener/map cleanup. |
| core/sdk/src/api/device/device_events.rs | Introduces DeviceEvent::Cleanup, string parsing, and contract mapping for cleanup requests. |
| core/main/src/service/apps/app_events.rs | Prunes empty listener maps and adds cleanup_by_connection_id for disconnect cleanup. |
| core/main/src/firebolt/firebolt_ws.rs | Improves disconnect logging before session unregister. |
| core/main/src/firebolt/firebolt_gateway.rs | Performs broader disconnect cleanup (AppEvents, broker cleanup, session removal) and sends cleanup request to Thunder. |
| core/main/src/broker/endpoint_broker.rs | Adds cleanup of broker request tracking maps on app/session cleanup to prevent map growth. |
Comments suppressed due to low confidence (1)
core/main/src/service/apps/app_events.rs:479
`cleanup_by_connection_id` is new disconnect cleanup logic, but this file’s test module doesn’t exercise it (and it also doesn’t validate the new removal of empty context/event map entries in `remove_session`). Adding a unit test that registers listeners with `call_ctx.cid` set and verifies they’re removed (and that empty maps are pruned) would help ensure the leak fix stays effective.

    /// Cleanup all event listeners matching a given connection_id.
    /// This ensures cleanup works even when session_id != connection_id.
    pub fn cleanup_by_connection_id(state: &PlatformState, connection_id: &str) {
        let mut listeners = state.app_events_state.listeners.write().unwrap();
        let all_events = listeners.keys().cloned().collect::<Vec<String>>();
        for event_name in all_events {
            if let Some(ctx_map) = listeners.get_mut(&event_name) {
                let all_contexts = ctx_map.keys().cloned().collect::<Vec<Option<String>>>();
                for context in all_contexts {
                    if let Some(event_listener) = ctx_map.get_mut(&context) {
                        event_listener.retain(|l| l.call_ctx.cid.as_deref() != Some(connection_id));
                    }
                    if ctx_map.get(&context).map_or(false, |v| v.is_empty()) {
                        ctx_map.remove(&context);
                    }
                }
            }
            if listeners.get(&event_name).map_or(false, |m| m.is_empty()) {
                listeners.remove(&event_name);
            }
        }
    }
    }
    #[cfg(test)]

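A test along the lines the suppressed comment asks for might look like the sketch below. It does not use `PlatformState`; `Listener` and `ListenerMap` are simplified stand-ins that only keep the nesting (event name, then event context, then listeners) and the connection id. The cleanup here uses `HashMap::retain` instead of collecting keys first, which is a substitution made for brevity and is assumed to behave the same way.

```rust
use std::collections::HashMap;

/// Simplified stand-in for an event listener; only the connection id is kept.
#[derive(Debug, Clone, PartialEq)]
struct Listener {
    cid: Option<String>,
}

/// Event name -> event context -> listeners, mirroring the nesting above.
type ListenerMap = HashMap<String, HashMap<Option<String>, Vec<Listener>>>;

/// Drops every listener registered for `connection_id`, then prunes empty
/// context entries and empty event entries.
fn cleanup_by_connection_id(listeners: &mut ListenerMap, connection_id: &str) {
    for ctx_map in listeners.values_mut() {
        for event_listeners in ctx_map.values_mut() {
            event_listeners.retain(|l| l.cid.as_deref() != Some(connection_id));
        }
        ctx_map.retain(|_, v| !v.is_empty());
    }
    listeners.retain(|_, m| !m.is_empty());
}

#[cfg(test)]
mod tests {
    use super::*;

    fn listener(cid: &str) -> Listener {
        Listener { cid: Some(cid.to_string()) }
    }

    #[test]
    fn removes_listeners_and_prunes_empty_maps() {
        let mut listeners: ListenerMap = HashMap::new();
        listeners
            .entry("device.onHdcpChanged".to_string())
            .or_default()
            .entry(None)
            .or_default()
            .push(listener("conn-1"));
        listeners
            .entry("device.onAudioChanged".to_string())
            .or_default()
            .entry(None)
            .or_default()
            .extend([listener("conn-1"), listener("conn-2")]);

        cleanup_by_connection_id(&mut listeners, "conn-1");

        // The event that only conn-1 listened to is pruned entirely.
        assert!(!listeners.contains_key("device.onHdcpChanged"));
        // The shared event keeps the other connection's listener.
        let remaining = &listeners["device.onAudioChanged"][&None::<String>];
        assert_eq!(remaining, &vec![listener("conn-2")]);
    }
}
```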

    // Method to cleanup all subscription on App termination
    pub async fn cleanup_for_app(&self, app_id: &str) {
        let cleaners = { self.cleaner_list.read().unwrap().clone() };

        for cleaner in cleaners {
            /*
            for now, just eat the error - the return type was mainly added to prepate for future refactoring/testability
            */
            let _ = cleaner.cleanup_session(app_id).await;
        }

        // Clean up subscription entries from request_map and extension_request_map
        // that belong to this app. These are never removed on disconnect otherwise.
        self.cleanup_request_maps_for_app(app_id);
    }

    /// Remove subscription/event entries from request_map and extension_request_map
    /// for the given app_id (which may be a session_id or connection_id).
    /// Without this, subscription entries in request_map (guarded by is_subscription())
    /// and event entries in extension_request_map persist forever.
    fn cleanup_request_maps_for_app(&self, app_id: &str) {
        let removed_ids: Vec<u64> = {
            let mut request_map = self.request_map.write().unwrap();
            let ids_to_remove: Vec<u64> = request_map
                .iter()
                .filter(|(_, req)| {
                    req.rpc.ctx.app_id == app_id
                        || req.rpc.ctx.session_id == app_id
                        || req.rpc.ctx.cid.as_deref() == Some(app_id)
                })
                .map(|(id, _)| *id)
                .collect();
            for id in &ids_to_remove {
                request_map.remove(id);
            }
            ids_to_remove
        };

        if !removed_ids.is_empty() {
            let mut extn_map = self.extension_request_map.write().unwrap();
            for id in &removed_ids {
                extn_map.remove(id);
            }
            info!(
                "cleanup_request_maps_for_app: removed {} request_map and extension_request_map entries for {}",
                removed_ids.len(),
                app_id
            );
        }

cleanup_request_maps_for_app introduces important disconnect cleanup for request_map/extension_request_map, but there’s no test coverage ensuring entries for a given app/session/cid are actually removed. Since this module already has unit tests, consider adding one that inserts representative BrokerRequest/ExtnMessage entries into both maps and asserts they’re cleaned up by cleanup_for_app().
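One possible shape for such a test is sketched below against simplified stand-ins. `Ctx` is a hypothetical, trimmed-down call context, and the two maps hold placeholder values instead of `BrokerRequest`/`ExtnMessage`, so this only checks the matching rule (app_id, session_id, or cid) and that both maps are pruned by the same ids.

```rust
use std::collections::HashMap;

/// Hypothetical, trimmed-down call context (stand-in for the real ctx type).
#[derive(Debug, Clone)]
struct Ctx {
    app_id: String,
    session_id: String,
    cid: Option<String>,
}

/// Same matching rule as the hunk above: app_id, session_id, or cid.
fn matches_app(ctx: &Ctx, id: &str) -> bool {
    ctx.app_id == id || ctx.session_id == id || ctx.cid.as_deref() == Some(id)
}

/// Removes matching entries from `request_map`, removes the same ids from
/// `extension_request_map`, and returns the removed ids in ascending order.
fn cleanup_request_maps(
    request_map: &mut HashMap<u64, Ctx>,
    extension_request_map: &mut HashMap<u64, String>,
    id: &str,
) -> Vec<u64> {
    let mut removed: Vec<u64> = request_map
        .iter()
        .filter(|(_, ctx)| matches_app(ctx, id))
        .map(|(key, _)| *key)
        .collect();
    removed.sort_unstable();
    for key in &removed {
        request_map.remove(key);
        extension_request_map.remove(key);
    }
    removed
}

#[cfg(test)]
mod tests {
    use super::*;

    fn ctx(app_id: &str, session_id: &str, cid: Option<&str>) -> Ctx {
        Ctx {
            app_id: app_id.to_string(),
            session_id: session_id.to_string(),
            cid: cid.map(str::to_string),
        }
    }

    #[test]
    fn removes_entries_matching_any_identifier() {
        let mut request_map = HashMap::from([
            (1, ctx("app-a", "sess-1", Some("conn-1"))),
            (2, ctx("app-b", "sess-2", Some("conn-2"))),
            (3, ctx("app-c", "conn-1", None)),
        ]);
        let mut extension_request_map = HashMap::from([
            (1, "extn-1".to_string()),
            (2, "extn-2".to_string()),
        ]);

        let removed = cleanup_request_maps(&mut request_map, &mut extension_request_map, "conn-1");

        assert_eq!(removed, vec![1, 3]);
        assert_eq!(request_map.len(), 1);
        assert!(request_map.contains_key(&2));
        assert!(!extension_request_map.contains_key(&1));
        assert!(extension_request_map.contains_key(&2));
    }
}
```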

    let cleanup_request = DeviceEventRequest {
        event: DeviceEvent::Cleanup,
        subscribe: false,
        callback_type: DeviceEventCallback::FireboltAppEvent(cid.clone()),
    };

The ThunderEventProcessor stores listener IDs as the Firebolt app_id (see DeviceEventRequest subscriptions in device_rpc.rs, which use DeviceEventCallback::FireboltAppEvent(ctx.app_id)). Here the cleanup request uses cid as the ID, so cleanup_by_app_id(&id) on the device side will not match and the leak likely persists. Consider deriving the app_id from session_state using cid before clearing it (or pass app_id through UnregisterSession) and use that app_id in the cleanup request’s callback_type.
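The ordering issue raised here can be sketched with hypothetical stand-ins: a session map keyed by connection id and a device-side listener map keyed by app_id. `Session`, `take_app_id`, and this `cleanup_by_app_id` are illustrative names and shapes only, not the actual types in the gateway or the Thunder SDK.

```rust
use std::collections::HashMap;

/// Hypothetical, trimmed-down session record stored under the connection id.
struct Session {
    app_id: String,
}

/// Removes the session for `cid` and returns its app_id, so the caller can
/// still address app-keyed state after the session itself is gone.
fn take_app_id(sessions: &mut HashMap<String, Session>, cid: &str) -> Option<String> {
    sessions.remove(cid).map(|session| session.app_id)
}

/// Stand-in for device-side cleanup: drops `id` from every event's listener
/// list, prunes empty events, and returns how many registrations were removed.
fn cleanup_by_app_id(listeners: &mut HashMap<String, Vec<String>>, id: &str) -> usize {
    let mut removed = 0;
    for ids in listeners.values_mut() {
        let before = ids.len();
        ids.retain(|listener_id| listener_id.as_str() != id);
        removed += before - ids.len();
    }
    listeners.retain(|_, ids| !ids.is_empty());
    removed
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn cleanup_needs_the_app_id_not_the_cid() {
        let mut sessions = HashMap::new();
        sessions.insert("conn-1".to_string(), Session { app_id: "app-a".to_string() });
        let mut listeners = HashMap::new();
        listeners.insert("device.onHdcpChanged".to_string(), vec!["app-a".to_string()]);

        // Cleaning up with the connection id matches nothing.
        assert_eq!(cleanup_by_app_id(&mut listeners, "conn-1"), 0);

        // Resolving the app_id first makes the cleanup effective.
        let app_id = take_app_id(&mut sessions, "conn-1").expect("session should exist");
        assert_eq!(cleanup_by_app_id(&mut listeners, &app_id), 1);
        assert!(listeners.is_empty());
    }
}
```

Resolving the app_id in the same step that removes the session avoids a window in which the session is already gone but the id is still needed. Whether that fits the real `UnregisterSession` flow depends on code not shown in this review.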

    @@ -48,6 +51,7 @@ impl FromStr for DeviceEvent {
             match s {
                 "device.onHdcpChanged" => Ok(Self::InputChanged),
                 "device.onAudioChanged" => Ok(Self::AudioChanged),
                 "device.cleanup" => Ok(Self::Cleanup),
                 _ => Err(()),
             }

New DeviceEvent::Cleanup parsing ("device.cleanup") was added, but the unit tests only cover device.onHdcpChanged and an invalid event. Please add test cases for the new Cleanup variant (at least FromStr for "device.cleanup", and ideally DeviceEventRequest::get_contract() for Cleanup) to prevent regressions.
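A minimal sketch of the requested `FromStr` coverage follows. The enum is a trimmed copy containing only the three variants visible in the hunk above, so the real `DeviceEvent` is assumed to have more; `DeviceEventRequest::get_contract()` is left out because its return type is not shown in this review.

```rust
use std::str::FromStr;

/// Trimmed copy of the enum with only the variants visible in the hunk above.
#[derive(Debug, PartialEq)]
enum DeviceEvent {
    InputChanged,
    AudioChanged,
    Cleanup,
}

impl FromStr for DeviceEvent {
    type Err = ();

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "device.onHdcpChanged" => Ok(Self::InputChanged),
            "device.onAudioChanged" => Ok(Self::AudioChanged),
            "device.cleanup" => Ok(Self::Cleanup),
            _ => Err(()),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_cleanup() {
        assert_eq!(DeviceEvent::from_str("device.cleanup"), Ok(DeviceEvent::Cleanup));
        assert_eq!("device.cleanup".parse::<DeviceEvent>(), Ok(DeviceEvent::Cleanup));
    }

    #[test]
    fn existing_events_still_parse() {
        assert_eq!(
            DeviceEvent::from_str("device.onAudioChanged"),
            Ok(DeviceEvent::AudioChanged)
        );
    }

    #[test]
    fn rejects_unknown_events() {
        assert!(DeviceEvent::from_str("device.unknown").is_err());
    }
}
```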

        }
    }
    /// Send a request through the broker and wait for response with a oneshot channel and custom timeout
    pub async fn send_with_response_timeout(

                    .add_session(session_id, session);
            }
            UnregisterSession { session_id, cid } => {
                info!(

Minimum allowed line rate is |
What
What does this PR add or remove?
Why
Why are these changes needed?
How
How do these changes achieve the goal?
Test
How has this been tested? How can a reviewer test it?
Checklist