diff --git a/bridge/CMakeLists.txt b/bridge/CMakeLists.txt index 393c5612..7c3fda09 100644 --- a/bridge/CMakeLists.txt +++ b/bridge/CMakeLists.txt @@ -11,6 +11,8 @@ add_library(livekit_bridge SHARED src/bridge_video_track.cpp src/bridge_room_delegate.cpp src/bridge_room_delegate.h + src/rpc_manager.cpp + src/rpc_manager.h ) if(WIN32) diff --git a/bridge/README.md b/bridge/README.md index 76b4f2b6..776b196c 100644 --- a/bridge/README.md +++ b/bridge/README.md @@ -44,7 +44,22 @@ bridge.setOnVideoFrameCallback("remote-peer", livekit::TrackSource::SOURCE_CAMER // Called on a background reader thread }); -// 5. Cleanup is automatic (RAII), or explicit: +// 5. RPC (Remote Procedure Call) +bridge.registerRpcMethod("greet", + [](const livekit::RpcInvocationData& data) -> std::optional { + return "Hello, " + data.caller_identity + "!"; + }); + +std::string response = bridge.performRpc("remote-peer", "greet", ""); + +bridge.unregisterRpcMethod("greet"); + +// Controller side: send commands to the publisher +controller_bridge.requestTrackMute("robot-1", "mic"); // mute audio track "mic" +controller_bridge.requestTrackUnmute("robot-1", "mic"); // unmute it +controller_bridge.requestTrackRelease("robot-1", "cam"); // unpublish video track "cam" + +// 7. Cleanup is automatic (RAII), or explicit: mic.reset(); // unpublishes the audio track cam.reset(); // unpublishes the video track bridge.disconnect(); @@ -138,6 +153,12 @@ bridge.connect(url, token, options); | `setOnVideoFrameCallback(identity, source, callback)` | Register a callback for video frames from a specific remote participant + track source. | | `clearOnAudioFrameCallback(identity, source)` | Clear the audio callback for a specific remote participant + track source. Stops and joins the reader thread if active. | | `clearOnVideoFrameCallback(identity, source)` | Clear the video callback for a specific remote participant + track source. Stops and joins the reader thread if active. | +| `performRpc(destination_identity, method, payload, response_timeout?)` | Blocking RPC call to a remote participant. Returns the response payload. Throws `livekit::RpcError` on failure. | +| `registerRpcMethod(method_name, handler)` | Register a handler for incoming RPC invocations. The handler returns an optional response payload or throws `livekit::RpcError`. | +| `unregisterRpcMethod(method_name)` | Unregister a previously registered RPC handler. | +| `requestTrackMute(identity, track_name)` | Ask a remote participant to mute a track by name. Throws `livekit::RpcError` on failure. | +| `requestTrackUnmute(identity, track_name)` | Ask a remote participant to unmute a track by name. Throws `livekit::RpcError` on failure. | +| `requestTrackRelease(identity, track_name)` | Ask a remote participant to release (unpublish) a track by name. Throws `livekit::RpcError` on failure. | ### `BridgeAudioTrack` @@ -240,7 +261,7 @@ The bridge is designed for simplicity and currently only supports limited audio - We dont support all events defined in the RoomDelegate interface. - E2EE configuration -- RPC / data channels / data tracks +- data tracks - Simulcast tuning - Video format selection (RGBA is the default; no format option yet) - Custom `RoomOptions` or `TrackPublishOptions` diff --git a/bridge/include/livekit_bridge/livekit_bridge.h b/bridge/include/livekit_bridge/livekit_bridge.h index df5f3e34..e4df5f20 100644 --- a/bridge/include/livekit_bridge/livekit_bridge.h +++ b/bridge/include/livekit_bridge/livekit_bridge.h @@ -22,7 +22,9 @@ #include "livekit_bridge/bridge_audio_track.h" #include "livekit_bridge/bridge_video_track.h" +#include "livekit/local_participant.h" #include "livekit/room.h" +#include "livekit/rpc_error.h" #include #include @@ -46,6 +48,7 @@ enum class TrackSource; namespace livekit_bridge { class BridgeRoomDelegate; +class RpcManager; namespace test { class CallbackKeyTest; @@ -264,6 +267,90 @@ class LiveKitBridge { void clearOnVideoFrameCallback(const std::string &participant_identity, livekit::TrackSource source); + // --------------------------------------------------------------- + // RPC (Remote Procedure Call) + // --------------------------------------------------------------- + + /** + * Initiate a blocking RPC call to a remote participant. + * + * Sends a request to the participant identified by + * @p destination_identity and blocks until a response is received + * or the call times out. + * + * @param destination_identity Identity of the remote participant. + * @param method Name of the RPC method to invoke. + * @param payload Request payload string. + * @param response_timeout Optional timeout in seconds. If not set, + * the server default (15 s) is used. + * @return The response payload returned by the remote handler. nullptr if the + * RPC call fails, or the bridge is not connected. + */ + std::optional + performRpc(const std::string &destination_identity, const std::string &method, + const std::string &payload, + const std::optional &response_timeout = std::nullopt); + + /** + * Register a handler for incoming RPC method invocations. + * + * When a remote participant calls the given @p method_name on this + * participant, the bridge invokes @p handler. The handler may return + * an optional response payload or throw a @c livekit::RpcError to + * signal failure to the caller. + * + * If a handler is already registered for @p method_name, it is + * silently replaced. + * + * @param method_name Name of the RPC method to handle. + * @param handler Callback invoked on each incoming invocation. + * @return true if the RPC method was registered successfully. + */ + bool registerRpcMethod(const std::string &method_name, + livekit::LocalParticipant::RpcHandler handler); + + /** + * Unregister a previously registered RPC method handler. + * + * After this call, invocations for @p method_name result in an + * "unsupported method" error being returned to the remote caller. + * If no handler is registered for this name, the call is a no-op. + * + * @param method_name Name of the RPC method to unregister. + * @return true if the RPC method was unregistered successfully. + */ + bool unregisterRpcMethod(const std::string &method_name); + + // --------------------------------------------------------------- + // Remote Track Control (via RPC) + // --------------------------------------------------------------- + + /** + * Request a remote participant to mute a published track. + * + * The remote participant must be a LiveKitBridge instance (which + * automatically registers the built-in track-control RPC handler). + * + * @param destination_identity Identity of the remote participant. + * @param track_name Name of the track to mute. + * @return true if the track was muted successfully. + */ + bool requestTrackMute(const std::string &destination_identity, + const std::string &track_name); + + /** + * Request a remote participant to unmute a published track. + * + * The remote participant must be a LiveKitBridge instance (which + * automatically registers the built-in track-control RPC handler). + * + * @param destination_identity Identity of the remote participant. + * @param track_name Name of the track to unmute. + * @return true if the track was unmuted successfully. + */ + bool requestTrackUnmute(const std::string &destination_identity, + const std::string &track_name); + private: friend class BridgeRoomDelegate; friend class test::CallbackKeyTest; @@ -314,6 +401,13 @@ class LiveKitBridge { const std::shared_ptr &track, VideoFrameCallback cb); + /// Execute a track action (mute/unmute/release) by track name. + /// Used as the TrackActionFn callback for RpcManager. + /// Throws livekit::RpcError if the track is not found. + /// @pre Caller does NOT hold mutex_ (acquires it internally). + void executeTrackAction(const std::string &action, + const std::string &track_name); + mutable std::mutex mutex_; bool connected_; bool connecting_; // guards against concurrent connect() calls @@ -323,6 +417,7 @@ class LiveKitBridge { std::unique_ptr room_; std::unique_ptr delegate_; + std::unique_ptr rpc_manager_; /// Registered callbacks (may be registered before tracks are subscribed). std::unordered_map @@ -341,7 +436,6 @@ class LiveKitBridge { std::vector> published_audio_tracks_; /// @copydoc published_audio_tracks_ std::vector> published_video_tracks_; - }; } // namespace livekit_bridge diff --git a/bridge/include/livekit_bridge/rpc_constants.h b/bridge/include/livekit_bridge/rpc_constants.h new file mode 100644 index 00000000..8ebff2ef --- /dev/null +++ b/bridge/include/livekit_bridge/rpc_constants.h @@ -0,0 +1,62 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// @file rpc_constants.h +/// @brief Constants for built-in bridge RPC methods. + +#pragma once + +#include + +namespace livekit_bridge { +namespace rpc { + +/// Built-in RPC method name used by remote track control. +/// Allows remote participants to mute, unmute, or release tracks +/// published by this bridge. Must be called after connect(). +/// Audio/video tracks support mute, unmute, and release. Data tracks +/// only support release (they have no mute/unmute); a mute request on +/// a data track is treated as release, and unmute returns an error. +namespace track_control { + +/// RPC method name registered by the bridge for remote track control. +constexpr const char *kMethod = "lk.bridge.track-control"; + +/// Payload action strings. +constexpr const char *kActionMute = "mute"; +constexpr const char *kActionUnmute = "unmute"; + +/// Delimiter between action and track name in the payload (e.g. "mute:cam"). +constexpr char kDelimiter = ':'; + +/// Response payload returned on success. +constexpr const char *kResponseOk = "ok"; + +/// Build a track-control RPC payload: ":". +inline std::string formatPayload(const char *action, + const std::string &track_name) { + std::string payload; + payload.reserve(std::char_traits::length(action) + 1 + + track_name.size()); + payload += action; + payload += kDelimiter; + payload += track_name; + return payload; +} + +} // namespace track_control +} // namespace rpc +} // namespace livekit_bridge diff --git a/bridge/src/livekit_bridge.cpp b/bridge/src/livekit_bridge.cpp index cfce522b..5b6a0a43 100644 --- a/bridge/src/livekit_bridge.cpp +++ b/bridge/src/livekit_bridge.cpp @@ -19,6 +19,8 @@ #include "livekit_bridge/livekit_bridge.h" #include "bridge_room_delegate.h" +#include "livekit_bridge/rpc_constants.h" +#include "rpc_manager.h" #include "livekit/audio_frame.h" #include "livekit/audio_source.h" @@ -60,7 +62,11 @@ LiveKitBridge::CallbackKeyHash::operator()(const CallbackKey &k) const { // --------------------------------------------------------------- LiveKitBridge::LiveKitBridge() - : connected_(false), connecting_(false), sdk_initialized_(false) {} + : connected_(false), connecting_(false), sdk_initialized_(false), + rpc_manager_(std::make_unique( + [this](const std::string &action, const std::string &track_name) { + executeTrackAction(action, track_name); + })) {} LiveKitBridge::~LiveKitBridge() { disconnect(); } @@ -113,17 +119,29 @@ bool LiveKitBridge::connect(const std::string &url, const std::string &token, auto delegate = std::make_unique(*this); assert(delegate != nullptr); room->setDelegate(delegate.get()); + livekit::LocalParticipant *lp = nullptr; { std::lock_guard lock(mutex_); room_ = std::move(room); delegate_ = std::move(delegate); connected_ = true; connecting_ = false; + + lp = room_->localParticipant(); + assert(lp != nullptr); } + + rpc_manager_->enable(lp); return true; } void LiveKitBridge::disconnect() { + // Disable the RPC manager before tearing down the room. This unregisters + // built-in handlers while the LocalParticipant is still alive. + if (rpc_manager_->isEnabled()) { + rpc_manager_->disable(); + } + // Collect threads to join outside the lock to avoid deadlock. std::vector threads_to_join; bool should_shutdown_sdk = false; @@ -208,10 +226,7 @@ LiveKitBridge::createAudioTrack(const std::string &name, int sample_rate, int num_channels, livekit::TrackSource source) { std::lock_guard lock(mutex_); - if (!connected_ || !room_) { - throw std::runtime_error( - "LiveKitBridge::createAudioTrack: not connected to a room"); - } + assert(connected_ && room_); // 1. Create audio source (real-time mode, queue_size_ms=0) auto audio_source = @@ -225,12 +240,15 @@ LiveKitBridge::createAudioTrack(const std::string &name, int sample_rate, livekit::TrackPublishOptions opts; opts.source = source; - auto publication = room_->localParticipant()->publishTrack(track, opts); + auto lp = room_->localParticipant(); + assert(lp != nullptr); + + auto publication = lp->publishTrack(track, opts); // 4. Wrap in handle and retain a reference auto bridge_track = std::shared_ptr(new BridgeAudioTrack( name, sample_rate, num_channels, std::move(audio_source), - std::move(track), std::move(publication), room_->localParticipant())); + std::move(track), std::move(publication), lp)); published_audio_tracks_.emplace_back(bridge_track); return bridge_track; } @@ -240,10 +258,7 @@ LiveKitBridge::createVideoTrack(const std::string &name, int width, int height, livekit::TrackSource source) { std::lock_guard lock(mutex_); - if (!connected_ || !room_) { - throw std::runtime_error( - "LiveKitBridge::createVideoTrack: not connected to a room"); - } + assert(connected_ && room_); // 1. Create video source auto video_source = std::make_shared(width, height); @@ -256,12 +271,15 @@ LiveKitBridge::createVideoTrack(const std::string &name, int width, int height, livekit::TrackPublishOptions opts; opts.source = source; - auto publication = room_->localParticipant()->publishTrack(track, opts); + auto lp = room_->localParticipant(); + assert(lp != nullptr); + + auto publication = lp->publishTrack(track, opts); // 4. Wrap in handle and retain a reference - auto bridge_track = std::shared_ptr(new BridgeVideoTrack( - name, width, height, std::move(video_source), std::move(track), - std::move(publication), room_->localParticipant())); + auto bridge_track = std::shared_ptr( + new BridgeVideoTrack(name, width, height, std::move(video_source), + std::move(track), std::move(publication), lp)); published_video_tracks_.emplace_back(bridge_track); return bridge_track; } @@ -327,6 +345,152 @@ void LiveKitBridge::clearOnVideoFrameCallback( } } +// --------------------------------------------------------------- +// RPC (delegates to RpcManager) +// --------------------------------------------------------------- + +std::optional +LiveKitBridge::performRpc(const std::string &destination_identity, + const std::string &method, const std::string &payload, + const std::optional &response_timeout) { + + if (!isConnected()) { + return std::nullopt; + } + + try { + return rpc_manager_->performRpc(destination_identity, method, payload, + response_timeout); + } catch (const std::exception &e) { + std::cerr << "[LiveKitBridge] Exception: " << e.what() << "\n"; + return std::nullopt; + } catch (const std::runtime_error &e) { + std::cerr << "[LiveKitBridge] Runtime error: " << e.what() << "\n"; + return std::nullopt; + } catch (const livekit::RpcError &e) { + std::cerr << "[LiveKitBridge] RPC error: " << e.what() << "\n"; + return std::nullopt; + } +} + +bool LiveKitBridge::registerRpcMethod( + const std::string &method_name, + livekit::LocalParticipant::RpcHandler handler) { + + if (!isConnected()) { + return false; + } + try { + rpc_manager_->registerRpcMethod(method_name, std::move(handler)); + return true; + } catch (const std::exception &e) { + std::cerr << "[LiveKitBridge] Exception: " << e.what() << "\n"; + return false; + } catch (const std::runtime_error &e) { + std::cerr << "[LiveKitBridge] Runtime error: " << e.what() << "\n"; + return false; + } catch (const livekit::RpcError &e) { + std::cerr << "[LiveKitBridge] RPC error: " << e.what() << "\n"; + return false; + } +} + +bool LiveKitBridge::unregisterRpcMethod(const std::string &method_name) { + if (!isConnected()) { + return false; + } + try { + rpc_manager_->unregisterRpcMethod(method_name); + return true; + } catch (const std::exception &e) { + std::cerr << "[LiveKitBridge] Exception: " << e.what() << "\n"; + return false; + } catch (const std::runtime_error &e) { + std::cerr << "[LiveKitBridge] Runtime error: " << e.what() << "\n"; + return false; + } catch (const livekit::RpcError &e) { + std::cerr << "[LiveKitBridge] RPC error: " << e.what() << "\n"; + return false; + } +} + +bool LiveKitBridge::requestTrackMute(const std::string &destination_identity, + const std::string &track_name) { + if (!isConnected()) { + return false; + } + try { + rpc_manager_->requestTrackMute(destination_identity, track_name); + return true; + } catch (const std::exception &e) { + std::cerr << "[LiveKitBridge] Exception: " << e.what() << "\n"; + return false; + } catch (const std::runtime_error &e) { + std::cerr << "[LiveKitBridge] Runtime error: " << e.what() << "\n"; + return false; + } catch (const livekit::RpcError &e) { + std::cerr << "[LiveKitBridge] RPC error: " << e.what() << "\n"; + return false; + } +} + +bool LiveKitBridge::requestTrackUnmute(const std::string &destination_identity, + const std::string &track_name) { + if (!isConnected()) { + return false; + } + try { + rpc_manager_->requestTrackUnmute(destination_identity, track_name); + return true; + } catch (const std::exception &e) { + std::cerr << "[LiveKitBridge] Exception: " << e.what() << "\n"; + return false; + } catch (const std::runtime_error &e) { + std::cerr << "[LiveKitBridge] Runtime error: " << e.what() << "\n"; + return false; + } catch (const livekit::RpcError &e) { + std::cerr << "[LiveKitBridge] RPC error: " << e.what() << "\n"; + return false; + } +} + +// --------------------------------------------------------------- +// Track action callback for RpcManager +// --------------------------------------------------------------- + +void LiveKitBridge::executeTrackAction(const std::string &action, + const std::string &track_name) { + namespace tc = rpc::track_control; + std::lock_guard lock(mutex_); + + assert(action == tc::kActionMute || action == tc::kActionUnmute); + + for (auto &track : published_audio_tracks_) { + if (track->name() == track_name && !track->isReleased()) { + if (action == tc::kActionMute) { + track->mute(); + } else { + track->unmute(); + } + return; + } + } + + for (auto &track : published_video_tracks_) { + if (track->name() == track_name && !track->isReleased()) { + if (action == tc::kActionMute) { + track->mute(); + } else { + track->unmute(); + } + return; + } + } + + throw livekit::RpcError(livekit::RpcError::ErrorCode::APPLICATION_ERROR, + "track not found: " + track_name); +} + // --------------------------------------------------------------- // Internal: track subscribe / unsubscribe from delegate // --------------------------------------------------------------- diff --git a/bridge/src/rpc_manager.cpp b/bridge/src/rpc_manager.cpp new file mode 100644 index 00000000..6f792ea2 --- /dev/null +++ b/bridge/src/rpc_manager.cpp @@ -0,0 +1,140 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// @file rpc_manager.cpp +/// @brief Implementation of RpcManager. + +#include "rpc_manager.h" +#include "livekit_bridge/rpc_constants.h" + +#include "livekit/local_participant.h" +#include "livekit/rpc_error.h" + +#include +#include + +namespace livekit_bridge { + +RpcManager::RpcManager(TrackActionFn track_action_fn) + : track_action_fn_(std::move(track_action_fn)) {} + +void RpcManager::enable(livekit::LocalParticipant *lp) { + assert(lp != nullptr); + lp_ = lp; + enableBuiltInHandlers(); +} + +void RpcManager::disable() { + if (lp_) { + disableBuiltInHandlers(); + } + lp_ = nullptr; +} + +// --------------------------------------------------------------- +// Generic RPC +// --------------------------------------------------------------- + +std::string +RpcManager::performRpc(const std::string &destination_identity, + const std::string &method, const std::string &payload, + const std::optional &response_timeout) { + assert(lp_ != nullptr); + return lp_->performRpc(destination_identity, method, payload, + response_timeout); +} + +// --------------------------------------------------------------- +// User-registered handlers +// --------------------------------------------------------------- + +void RpcManager::registerRpcMethod( + const std::string &method_name, + livekit::LocalParticipant::RpcHandler handler) { + assert(lp_ != nullptr); + lp_->registerRpcMethod(method_name, std::move(handler)); +} + +void RpcManager::unregisterRpcMethod(const std::string &method_name) { + assert(lp_ != nullptr); + lp_->unregisterRpcMethod(method_name); +} + +// --------------------------------------------------------------- +// Built-in outgoing convenience (track control) +// --------------------------------------------------------------- + +void RpcManager::requestTrackMute(const std::string &destination_identity, + const std::string &track_name) { + namespace tc = rpc::track_control; + performRpc(destination_identity, tc::kMethod, + tc::formatPayload(tc::kActionMute, track_name), std::nullopt); +} + +void RpcManager::requestTrackUnmute(const std::string &destination_identity, + const std::string &track_name) { + namespace tc = rpc::track_control; + performRpc(destination_identity, tc::kMethod, + tc::formatPayload(tc::kActionUnmute, track_name), std::nullopt); +} + +// --------------------------------------------------------------- +// Built-in handler registration +// --------------------------------------------------------------- + +void RpcManager::enableBuiltInHandlers() { + assert(lp_ != nullptr); + lp_->registerRpcMethod(rpc::track_control::kMethod, + [this](const livekit::RpcInvocationData &data) + -> std::optional { + return handleTrackControlRpc(data); + }); +} + +void RpcManager::disableBuiltInHandlers() { + assert(lp_ != nullptr); + lp_->unregisterRpcMethod(rpc::track_control::kMethod); +} + +// --------------------------------------------------------------- +// Built-in handler: track control +// --------------------------------------------------------------- + +std::optional +RpcManager::handleTrackControlRpc(const livekit::RpcInvocationData &data) { + namespace tc = rpc::track_control; + + std::cout << "[RpcManager] Handling track control RPC: " << data.payload + << "\n"; + auto delim = data.payload.find(tc::kDelimiter); + if (delim == std::string::npos || delim == 0) { + throw livekit::RpcError( + livekit::RpcError::ErrorCode::APPLICATION_ERROR, + "invalid payload format, expected \":\""); + } + std::string action = data.payload.substr(0, delim); + std::string track_name = data.payload.substr(delim + 1); + + if (action != tc::kActionMute && action != tc::kActionUnmute) { + throw livekit::RpcError(livekit::RpcError::ErrorCode::APPLICATION_ERROR, + "unknown action: " + action); + } + + track_action_fn_(action, track_name); + return tc::kResponseOk; +} + +} // namespace livekit_bridge diff --git a/bridge/src/rpc_manager.h b/bridge/src/rpc_manager.h new file mode 100644 index 00000000..a43b5c67 --- /dev/null +++ b/bridge/src/rpc_manager.h @@ -0,0 +1,144 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// @file rpc_manager.h +/// @brief Internal RPC manager that owns all RPC concerns for the bridge. + +#pragma once + +#include "livekit/local_participant.h" + +#include +#include +#include +#include + +namespace livekit { +struct RpcInvocationData; +} // namespace livekit + +namespace livekit_bridge { + +namespace test { +class RpcManagerTest; +} // namespace test + +/** + * Owns all RPC concerns for the LiveKitBridge: built-in handler registration + * and dispatch, user-registered custom handlers, and outgoing RPC calls. + * + * The manager is bound to a LocalParticipant via enable() and unbound via + * disable(). All public methods require the manager to be enabled (i.e., + * enable() has been called and disable() has not). + * + * Built-in handlers (e.g., track-control) are automatically registered on + * enable() and unregistered on disable(). User-registered handlers are + * forwarded directly to the underlying LocalParticipant. + * + * Not part of the public API; lives in bridge/src/. + */ +class RpcManager { +public: + /// Callback the bridge provides to execute a track action + /// (mute/unmute/release). Throws livekit::RpcError if the track is not found + /// or the action is invalid. + using TrackActionFn = std::function; + + explicit RpcManager(TrackActionFn track_action_fn); + + /// Bind to a LocalParticipant and register all built-in RPC handlers. + /// @pre @p lp must be non-null and remain valid until disable() is called. + void enable(livekit::LocalParticipant *lp); + + /// Unregister built-in handlers and unbind from the LocalParticipant. + void disable(); + + /// Whether the manager is currently bound to a LocalParticipant. + bool isEnabled() const { return lp_ != nullptr; } + + // -- Generic RPC -- + + /// @brief Perform an RPC call to a remote participant. + /// @param destination_identity Identity of the destination participant. + /// @param method Name of the RPC method to invoke. + /// @param payload Request payload to send to the remote + /// handler. + /// @param response_timeout Optional timeout in seconds for receiving + /// a response. If not set, the server default + /// timeout (15 seconds) is used. + /// @return The response payload returned by the remote handler. + /// @throws if the LocalParticipant performRpc fails. + std::string performRpc(const std::string &destination_identity, + const std::string &method, const std::string &payload, + const std::optional &response_timeout); + + // -- User-registered handlers -- + /// @brief Register a handler for an incoming RPC method. + /// @param method_name Name of the RPC method to handle. + /// @param handler Callback to execute when an invocation is received. + /// The handler may return an optional response payload + /// or throw an RpcError to signal failure. + /// @throws if the LocalParticipant registerRpcMethod fails. + void registerRpcMethod(const std::string &method_name, + livekit::LocalParticipant::RpcHandler handler); + + /// @brief Unregister a handler for an incoming RPC method. + /// @param method_name Name of the RPC method to unregister. + /// @throws if the LocalParticipant unregisterRpcMethod fails. + void unregisterRpcMethod(const std::string &method_name); + + // -- Built-in outgoing convenience (track control) -- + + /// @brief Request a remote participant to mute a published track. + /// @param destination_identity Identity of the remote participant. + /// @param track_name Name of the track to mute. + /// @throws if the LocalParticipant requestTrackMute fails. + void requestTrackMute(const std::string &destination_identity, + const std::string &track_name); + /// @brief Request a remote participant to unmute a published track. + /// @param destination_identity Identity of the remote participant. + /// @param track_name Name of the track to unmute. + /// @throws if the LocalParticipant requestTrackUnmute fails. + void requestTrackUnmute(const std::string &destination_identity, + const std::string &track_name); + +private: + friend class test::RpcManagerTest; + + /// @brief Enable built-in handlers. + /// @throws if the LocalParticipant registerRpcMethod fails. + void enableBuiltInHandlers(); + + /// @brief Disable built-in handlers. + /// @throws if the LocalParticipant unregisterRpcMethod fails. + void disableBuiltInHandlers(); + + /// @brief Handle a track control RPC. + /// @param data The RPC invocation data. + /// @return The response payload returned by the remote handler. + /// @throws if the RPC is invalid or the track is not found. + std::optional + handleTrackControlRpc(const livekit::RpcInvocationData &data); + + /// Callback to execute a track action RPC + TrackActionFn track_action_fn_; + + /// The LocalParticipant bound to the manager. + livekit::LocalParticipant *lp_ = nullptr; +}; + +} // namespace livekit_bridge diff --git a/bridge/tests/CMakeLists.txt b/bridge/tests/CMakeLists.txt index 258681e2..c42274b8 100644 --- a/bridge/tests/CMakeLists.txt +++ b/bridge/tests/CMakeLists.txt @@ -37,6 +37,11 @@ if(BRIDGE_TEST_SOURCES) ${BRIDGE_TEST_SOURCES} ) + target_include_directories(livekit_bridge_tests + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR}/../src + ) + target_link_libraries(livekit_bridge_tests PRIVATE livekit_bridge diff --git a/bridge/tests/integration/test_bridge_rpc_roundtrip.cpp b/bridge/tests/integration/test_bridge_rpc_roundtrip.cpp new file mode 100644 index 00000000..96a9d1a9 --- /dev/null +++ b/bridge/tests/integration/test_bridge_rpc_roundtrip.cpp @@ -0,0 +1,305 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../common/bridge_test_common.h" +#include + +namespace livekit_bridge { +namespace test { + +class BridgeRpcRoundtripTest : public BridgeTestBase {}; + +// --------------------------------------------------------------------------- +// Test 1: Basic RPC round-trip through the bridge. +// +// Receiver registers an "echo" handler, caller performs an RPC call, and the +// response is verified. +// --------------------------------------------------------------------------- +TEST_F(BridgeRpcRoundtripTest, BasicRpcRoundTrip) { + skipIfNotConfigured(); + + std::cout << "\n=== Bridge RPC Round-Trip Test ===" << std::endl; + + LiveKitBridge caller; + LiveKitBridge receiver; + + ASSERT_TRUE(connectPair(caller, receiver)); + + const std::string receiver_identity = "rpc-receiver"; + + std::atomic rpc_calls_received{0}; + receiver.registerRpcMethod( + "echo", + [&rpc_calls_received](const livekit::RpcInvocationData &data) + -> std::optional { + rpc_calls_received++; + size_t checksum = 0; + for (char c : data.payload) { + checksum += static_cast(c); + } + return "echo:" + std::to_string(data.payload.size()) + ":" + + std::to_string(checksum); + }); + + std::cout << "RPC handler registered, performing call..." << std::endl; + + std::string test_payload = "hello from bridge"; + std::string response = + caller.performRpc(receiver_identity, "echo", test_payload, 10.0); + + size_t expected_checksum = 0; + for (char c : test_payload) { + expected_checksum += static_cast(c); + } + std::string expected_response = + "echo:" + std::to_string(test_payload.size()) + ":" + + std::to_string(expected_checksum); + + std::cout << "Response: " << response << std::endl; + std::cout << "Expected: " << expected_response << std::endl; + + EXPECT_EQ(response, expected_response); + EXPECT_EQ(rpc_calls_received.load(), 1); + + receiver.unregisterRpcMethod("echo"); +} + +// --------------------------------------------------------------------------- +// Test 2: RPC error propagation. +// +// The handler throws an RpcError with a custom code and message. The caller +// should catch the same error code, message, and data. +// --------------------------------------------------------------------------- +TEST_F(BridgeRpcRoundtripTest, RpcErrorPropagation) { + skipIfNotConfigured(); + + std::cout << "\n=== Bridge RPC Error Propagation Test ===" << std::endl; + + LiveKitBridge caller; + LiveKitBridge receiver; + + ASSERT_TRUE(connectPair(caller, receiver)); + + const std::string receiver_identity = "rpc-receiver"; + + receiver.registerRpcMethod( + "fail-method", + [](const livekit::RpcInvocationData &) -> std::optional { + throw livekit::RpcError(livekit::RpcError::ErrorCode::APPLICATION_ERROR, + "intentional failure", "extra-data"); + }); + + std::cout << "Calling method that throws RpcError..." << std::endl; + + try { + caller.performRpc(receiver_identity, "fail-method", "", 10.0); + FAIL() << "Expected RpcError to be thrown"; + } catch (const livekit::RpcError &e) { + std::cout << "Caught RpcError: code=" << e.code() << " message=\"" + << e.message() << "\"" + << " data=\"" << e.data() << "\"" << std::endl; + + EXPECT_EQ(static_cast(e.code()), + livekit::RpcError::ErrorCode::APPLICATION_ERROR); + EXPECT_EQ(e.message(), "intentional failure"); + EXPECT_EQ(e.data(), "extra-data"); + } + + receiver.unregisterRpcMethod("fail-method"); +} + +// --------------------------------------------------------------------------- +// Test 3: Calling an unregistered method returns UNSUPPORTED_METHOD. +// --------------------------------------------------------------------------- +TEST_F(BridgeRpcRoundtripTest, UnregisteredMethod) { + skipIfNotConfigured(); + + std::cout << "\n=== Bridge RPC Unsupported Method Test ===" << std::endl; + + LiveKitBridge caller; + LiveKitBridge receiver; + + ASSERT_TRUE(connectPair(caller, receiver)); + + const std::string receiver_identity = "rpc-receiver"; + + std::cout << "Calling nonexistent method..." << std::endl; + + try { + caller.performRpc(receiver_identity, "nonexistent-method", "", 5.0); + FAIL() << "Expected RpcError for unsupported method"; + } catch (const livekit::RpcError &e) { + std::cout << "Caught RpcError: code=" << e.code() << " message=\"" + << e.message() << "\"" << std::endl; + + EXPECT_EQ(static_cast(e.code()), + livekit::RpcError::ErrorCode::UNSUPPORTED_METHOD); + } +} + +// =========================================================================== +// Remote Track Control Tests +// =========================================================================== + +class BridgeRemoteTrackControlTest : public BridgeTestBase {}; + +// --------------------------------------------------------------------------- +// Test 4: Remote mute of an audio track. +// +// Publisher creates an audio track, enables remote track control. Controller +// requests mute, then unmute. +// --------------------------------------------------------------------------- +TEST_F(BridgeRemoteTrackControlTest, RemoteMuteAudioTrack) { + skipIfNotConfigured(); + + std::cout << "\n=== Bridge Remote Mute Audio Track Test ===" << std::endl; + + LiveKitBridge publisher; + LiveKitBridge controller; + + ASSERT_TRUE(connectPair(controller, publisher)); + + const std::string publisher_identity = "rpc-receiver"; + + auto audio_track = publisher.createAudioTrack( + "mic", 48000, 1, livekit::TrackSource::SOURCE_MICROPHONE); + ASSERT_NE(audio_track, nullptr); + + std::this_thread::sleep_for(2s); + + std::cout << "Requesting mute..." << std::endl; + EXPECT_NO_THROW(controller.requestTrackMute(publisher_identity, "mic")); + + std::vector silence(480, 0); + bool pushed_while_muted = audio_track->pushFrame(silence, 480); + std::cout << "pushFrame while muted: " << pushed_while_muted << std::endl; + + std::cout << "Requesting unmute..." << std::endl; + EXPECT_NO_THROW(controller.requestTrackUnmute(publisher_identity, "mic")); + + bool pushed_after_unmute = audio_track->pushFrame(silence, 480); + EXPECT_TRUE(pushed_after_unmute); + std::cout << "pushFrame after unmute: " << pushed_after_unmute << std::endl; + + audio_track->release(); +} + +// --------------------------------------------------------------------------- +// Test 5: Remote mute of a video track. +// --------------------------------------------------------------------------- +TEST_F(BridgeRemoteTrackControlTest, RemoteMuteVideoTrack) { + skipIfNotConfigured(); + + std::cout << "\n=== Bridge Remote Mute Video Track Test ===" << std::endl; + + LiveKitBridge publisher; + LiveKitBridge controller; + + ASSERT_TRUE(connectPair(controller, publisher)); + + const std::string publisher_identity = "rpc-receiver"; + + auto video_track = publisher.createVideoTrack( + "cam", 320, 240, livekit::TrackSource::SOURCE_CAMERA); + ASSERT_NE(video_track, nullptr); + + std::this_thread::sleep_for(2s); + + std::cout << "Requesting mute on video track..." << std::endl; + EXPECT_NO_THROW(controller.requestTrackMute(publisher_identity, "cam")); + + std::cout << "Requesting unmute on video track..." << std::endl; + EXPECT_NO_THROW(controller.requestTrackUnmute(publisher_identity, "cam")); + + std::vector frame(320 * 240 * 4, 128); + bool pushed_after_unmute = video_track->pushFrame(frame); + EXPECT_TRUE(pushed_after_unmute); + std::cout << "pushFrame after unmute: " << pushed_after_unmute << std::endl; + + video_track->release(); +} + +// --------------------------------------------------------------------------- +// Test 6: Remote release of a data track. +// +// Data tracks have no mute/unmute, only release. +// --------------------------------------------------------------------------- +TEST_F(BridgeRemoteTrackControlTest, RemoteReleaseDataTrack) { + skipIfNotConfigured(); + + std::cout << "\n=== Bridge Remote Release Data Track Test ===" << std::endl; + + LiveKitBridge publisher; + LiveKitBridge controller; + + ASSERT_TRUE(connectPair(controller, publisher)); + + const std::string publisher_identity = "rpc-receiver"; + + auto data_track = publisher.createDataTrack("sensor-data"); + ASSERT_NE(data_track, nullptr); + ASSERT_TRUE(data_track->isPublished()); + + std::this_thread::sleep_for(2s); + + std::cout << "Requesting release on data track..." << std::endl; + EXPECT_NO_THROW( + controller.requestTrackRelease(publisher_identity, "sensor-data")); + + std::this_thread::sleep_for(500ms); + + EXPECT_TRUE(data_track->isReleased()); + + std::vector payload{0x01, 0x02}; + bool pushed = data_track->pushFrame(payload); + EXPECT_FALSE(pushed) << "pushFrame should fail after remote release"; +} + +// --------------------------------------------------------------------------- +// Test 7: Remote mute on a nonexistent track returns an error. +// --------------------------------------------------------------------------- +TEST_F(BridgeRemoteTrackControlTest, RemoteMuteNonexistentTrack) { + skipIfNotConfigured(); + + std::cout << "\n=== Bridge Remote Mute Nonexistent Track Test ===" + << std::endl; + + LiveKitBridge publisher; + LiveKitBridge controller; + + ASSERT_TRUE(connectPair(controller, publisher)); + + const std::string publisher_identity = "rpc-receiver"; + + std::this_thread::sleep_for(2s); + + std::cout << "Requesting mute on nonexistent track..." << std::endl; + try { + controller.requestTrackMute(publisher_identity, "no-such-track"); + FAIL() << "Expected RpcError for nonexistent track"; + } catch (const livekit::RpcError &e) { + std::cout << "Caught RpcError: code=" << e.code() << " message=\"" + << e.message() << "\"" << std::endl; + + EXPECT_EQ(static_cast(e.code()), + livekit::RpcError::ErrorCode::APPLICATION_ERROR); + EXPECT_NE(e.message().find("track not found"), std::string::npos) + << "Error message should mention 'track not found'"; + } +} + +} // namespace test +} // namespace livekit_bridge diff --git a/bridge/tests/test_rpc_manager.cpp b/bridge/tests/test_rpc_manager.cpp new file mode 100644 index 00000000..5666c58c --- /dev/null +++ b/bridge/tests/test_rpc_manager.cpp @@ -0,0 +1,274 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// @file test_rpc_manager.cpp +/// @brief Unit tests for RpcManager. + +#include + +#include "livekit_bridge/rpc_constants.h" +#include "rpc_manager.h" + +#include "livekit/local_participant.h" +#include "livekit/rpc_error.h" + +#include +#include + +namespace livekit_bridge { +namespace test { + +// Records (action, track_name) pairs passed to the TrackActionFn callback. +struct TrackActionRecord { + std::string action; + std::string track_name; +}; + +class RpcManagerTest : public ::testing::Test { +protected: + std::vector recorded_actions_; + + std::unique_ptr makeManager() { + return std::make_unique( + [this](const std::string &action, const std::string &track_name) { + recorded_actions_.push_back({action, track_name}); + }); + } + + std::unique_ptr makeThrowingManager() { + return std::make_unique([](const std::string &, + const std::string &track_name) { + throw livekit::RpcError(livekit::RpcError::ErrorCode::APPLICATION_ERROR, + "track not found: " + track_name); + }); + } + + // Helper: call the private handleTrackControlRpc with a given payload. + std::optional + callHandler(RpcManager &mgr, const std::string &payload, + const std::string &caller = "test-caller") { + livekit::RpcInvocationData data; + data.request_id = "test-request-id"; + data.caller_identity = caller; + data.payload = payload; + data.response_timeout_sec = 10.0; + return mgr.handleTrackControlRpc(data); + } +}; + +// ============================================================================ +// Construction & lifecycle +// ============================================================================ + +TEST_F(RpcManagerTest, InitiallyDisabled) { + auto mgr = makeManager(); + EXPECT_FALSE(mgr->isEnabled()); +} + +TEST_F(RpcManagerTest, DisableOnAlreadyDisabledIsNoOp) { + auto mgr = makeManager(); + EXPECT_NO_THROW(mgr->disable()); + EXPECT_FALSE(mgr->isEnabled()); +} + +TEST_F(RpcManagerTest, DisableMultipleTimesIsIdempotent) { + auto mgr = makeManager(); + EXPECT_NO_THROW({ + mgr->disable(); + mgr->disable(); + mgr->disable(); + }); +} + +TEST_F(RpcManagerTest, DestructorWithoutEnableIsSafe) { + EXPECT_NO_THROW({ auto mgr = makeManager(); }); +} + +// ============================================================================ +// handleTrackControlRpc — payload parsing +// ============================================================================ + +TEST_F(RpcManagerTest, ValidMutePayload) { + auto mgr = makeManager(); + auto result = callHandler(*mgr, "mute:my-track"); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), rpc::track_control::kResponseOk); + + ASSERT_EQ(recorded_actions_.size(), 1u); + EXPECT_EQ(recorded_actions_[0].action, "mute"); + EXPECT_EQ(recorded_actions_[0].track_name, "my-track"); +} + +TEST_F(RpcManagerTest, ValidUnmutePayload) { + auto mgr = makeManager(); + auto result = callHandler(*mgr, "unmute:cam"); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value(), rpc::track_control::kResponseOk); + + ASSERT_EQ(recorded_actions_.size(), 1u); + EXPECT_EQ(recorded_actions_[0].action, "unmute"); + EXPECT_EQ(recorded_actions_[0].track_name, "cam"); +} + +TEST_F(RpcManagerTest, TrackNameWithColons) { + auto mgr = makeManager(); + auto result = callHandler(*mgr, "mute:track:with:colons"); + + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(recorded_actions_.size(), 1u); + EXPECT_EQ(recorded_actions_[0].action, "mute"); + EXPECT_EQ(recorded_actions_[0].track_name, "track:with:colons"); +} + +TEST_F(RpcManagerTest, TrackNameWithSpaces) { + auto mgr = makeManager(); + auto result = callHandler(*mgr, "unmute:my track name"); + + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(recorded_actions_.size(), 1u); + EXPECT_EQ(recorded_actions_[0].action, "unmute"); + EXPECT_EQ(recorded_actions_[0].track_name, "my track name"); +} + +// ============================================================================ +// handleTrackControlRpc — invalid payloads +// ============================================================================ + +TEST_F(RpcManagerTest, EmptyPayloadThrows) { + auto mgr = makeManager(); + EXPECT_THROW(callHandler(*mgr, ""), livekit::RpcError); + EXPECT_TRUE(recorded_actions_.empty()); +} + +TEST_F(RpcManagerTest, NoDelimiterThrows) { + auto mgr = makeManager(); + EXPECT_THROW(callHandler(*mgr, "mutetrack"), livekit::RpcError); + EXPECT_TRUE(recorded_actions_.empty()); +} + +TEST_F(RpcManagerTest, LeadingDelimiterThrows) { + auto mgr = makeManager(); + EXPECT_THROW(callHandler(*mgr, ":track"), livekit::RpcError); + EXPECT_TRUE(recorded_actions_.empty()); +} + +TEST_F(RpcManagerTest, UnknownActionThrows) { + auto mgr = makeManager(); + EXPECT_THROW(callHandler(*mgr, "pause:cam"), livekit::RpcError); + EXPECT_TRUE(recorded_actions_.empty()); +} + +TEST_F(RpcManagerTest, CaseSensitiveAction) { + auto mgr = makeManager(); + EXPECT_THROW(callHandler(*mgr, "MUTE:cam"), livekit::RpcError); + EXPECT_THROW(callHandler(*mgr, "Mute:cam"), livekit::RpcError); + EXPECT_TRUE(recorded_actions_.empty()); +} + +// ============================================================================ +// handleTrackControlRpc — TrackActionFn propagation +// ============================================================================ + +TEST_F(RpcManagerTest, TrackActionFnExceptionPropagates) { + auto mgr = makeThrowingManager(); + + try { + callHandler(*mgr, "mute:nonexistent"); + FAIL() << "Expected RpcError to propagate from TrackActionFn"; + } catch (const livekit::RpcError &e) { + EXPECT_EQ(e.code(), static_cast( + livekit::RpcError::ErrorCode::APPLICATION_ERROR)); + EXPECT_NE(std::string(e.message()).find("nonexistent"), std::string::npos) + << "Error message should contain the track name"; + } +} + +TEST_F(RpcManagerTest, MultipleCallsAccumulate) { + auto mgr = makeManager(); + + callHandler(*mgr, "mute:audio"); + callHandler(*mgr, "unmute:audio"); + callHandler(*mgr, "mute:video"); + + ASSERT_EQ(recorded_actions_.size(), 3u); + EXPECT_EQ(recorded_actions_[0].action, "mute"); + EXPECT_EQ(recorded_actions_[0].track_name, "audio"); + EXPECT_EQ(recorded_actions_[1].action, "unmute"); + EXPECT_EQ(recorded_actions_[1].track_name, "audio"); + EXPECT_EQ(recorded_actions_[2].action, "mute"); + EXPECT_EQ(recorded_actions_[2].track_name, "video"); +} + +// ============================================================================ +// handleTrackControlRpc — caller identity forwarded +// ============================================================================ + +TEST_F(RpcManagerTest, CallerIdentityPassedThrough) { + auto mgr = makeManager(); + livekit::RpcInvocationData data; + data.request_id = "req-1"; + data.caller_identity = "remote-robot"; + data.payload = "mute:mic"; + data.response_timeout_sec = 5.0; + + auto result = mgr->handleTrackControlRpc(data); + + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(recorded_actions_.size(), 1u); + EXPECT_EQ(recorded_actions_[0].action, "mute"); + EXPECT_EQ(recorded_actions_[0].track_name, "mic"); +} + +// ============================================================================ +// rpc_constants — formatPayload +// ============================================================================ + +TEST_F(RpcManagerTest, FormatPayloadMute) { + namespace tc = rpc::track_control; + std::string payload = tc::formatPayload(tc::kActionMute, "cam"); + EXPECT_EQ(payload, "mute:cam"); +} + +TEST_F(RpcManagerTest, FormatPayloadUnmute) { + namespace tc = rpc::track_control; + std::string payload = tc::formatPayload(tc::kActionUnmute, "mic"); + EXPECT_EQ(payload, "unmute:mic"); +} + +TEST_F(RpcManagerTest, FormatPayloadEmptyTrackName) { + namespace tc = rpc::track_control; + std::string payload = tc::formatPayload(tc::kActionMute, ""); + EXPECT_EQ(payload, "mute:"); +} + +TEST_F(RpcManagerTest, FormatPayloadRoundTrip) { + namespace tc = rpc::track_control; + std::string track_name = "some-track-123"; + std::string payload = tc::formatPayload(tc::kActionMute, track_name); + + auto mgr = makeManager(); + auto result = callHandler(*mgr, payload); + + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(recorded_actions_.size(), 1u); + EXPECT_EQ(recorded_actions_[0].action, tc::kActionMute); + EXPECT_EQ(recorded_actions_[0].track_name, track_name); +} + +} // namespace test +} // namespace livekit_bridge diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 5494a4b5..0ace322f 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -199,9 +199,45 @@ target_include_directories(BridgeHuman PRIVATE ) target_link_libraries(BridgeHuman PRIVATE livekit_bridge SDL3::SDL3) -# Copy SDL3 shared library to bridge_human_robot example output directories +# --- bridge_rpc examples (headless custom RPC caller + receiver) --- + +add_executable(BridgeRpcCaller + bridge_rpc/custom_caller.cpp +) +target_link_libraries(BridgeRpcCaller PRIVATE livekit_bridge) + +add_executable(BridgeRpcReceiver + bridge_rpc/custom_receiver.cpp +) +target_link_libraries(BridgeRpcReceiver PRIVATE livekit_bridge) + +# --- bridge_mute_unmute examples (caller uses SDL3 for A/V playback; receiver is headless) --- + +add_executable(BridgeMuteCaller + bridge_mute_unmute/caller.cpp + ${EXAMPLES_COMMON_DIR}/sdl_media.cpp + ${EXAMPLES_COMMON_DIR}/sdl_media.h +) +target_include_directories(BridgeMuteCaller PRIVATE + ${EXAMPLES_PRIVATE_INCLUDE_DIRS} + ${EXAMPLES_COMMON_DIR} +) +target_link_libraries(BridgeMuteCaller PRIVATE livekit_bridge SDL3::SDL3) + +add_executable(BridgeMuteReceiver + bridge_mute_unmute/receiver.cpp + ${EXAMPLES_COMMON_DIR}/sdl_media.cpp + ${EXAMPLES_COMMON_DIR}/sdl_media.h +) +target_include_directories(BridgeMuteReceiver PRIVATE + ${EXAMPLES_PRIVATE_INCLUDE_DIRS} + ${EXAMPLES_COMMON_DIR} +) +target_link_libraries(BridgeMuteReceiver PRIVATE livekit_bridge SDL3::SDL3) + +# Copy SDL3 shared library to bridge example output directories if(UNIX AND NOT APPLE) - foreach(_target BridgeRobot BridgeHuman) + foreach(_target BridgeRobot BridgeHuman BridgeMuteCaller BridgeMuteReceiver) add_custom_command(TARGET ${_target} POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy_if_different "$" @@ -214,7 +250,7 @@ if(UNIX AND NOT APPLE) ) endforeach() else() - foreach(_target BridgeRobot BridgeHuman) + foreach(_target BridgeRobot BridgeHuman BridgeMuteCaller BridgeMuteReceiver) add_custom_command(TARGET ${_target} POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy_if_different "$" @@ -236,7 +272,7 @@ if(WIN32) ) # Copy DLLs to each example's output directory - foreach(EXAMPLE SimpleRoom SimpleRpc SimpleRobot SimpleHuman SimpleDataStream BridgeRobot BridgeHuman) + foreach(EXAMPLE SimpleRoom SimpleRpc SimpleRobot SimpleHuman SimpleDataStream BridgeRobot BridgeHuman BridgeMuteCaller BridgeMuteReceiver BridgeRpcCaller BridgeRpcReceiver) foreach(DLL ${REQUIRED_DLLS}) add_custom_command(TARGET ${EXAMPLE} POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy_if_different @@ -248,8 +284,8 @@ if(WIN32) endforeach() endforeach() - # bridge_human_robot examples also need livekit_bridge.dll - foreach(EXAMPLE BridgeRobot BridgeHuman) + # Bridge examples also need livekit_bridge.dll + foreach(EXAMPLE BridgeRobot BridgeHuman BridgeMuteCaller BridgeMuteReceiver BridgeRpcCaller BridgeRpcReceiver) add_custom_command(TARGET ${EXAMPLE} POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy_if_different "$" @@ -271,7 +307,7 @@ if(UNIX) endif() # Copy shared library to each example's output directory - foreach(EXAMPLE SimpleRoom SimpleRpc SimpleRobot SimpleHuman SimpleDataStream BridgeRobot BridgeHuman) + foreach(EXAMPLE SimpleRoom SimpleRpc SimpleRobot SimpleHuman SimpleDataStream BridgeRobot BridgeHuman BridgeMuteCaller BridgeMuteReceiver BridgeRpcCaller BridgeRpcReceiver) add_custom_command(TARGET ${EXAMPLE} POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy_if_different "${LIVEKIT_LIB_DIR}/${FFI_SHARED_LIB}" @@ -281,8 +317,8 @@ if(UNIX) ) endforeach() - # bridge_human_robot examples also need livekit_bridge shared library - foreach(EXAMPLE BridgeRobot BridgeHuman) + # Bridge examples also need livekit_bridge shared library + foreach(EXAMPLE BridgeRobot BridgeHuman BridgeMuteCaller BridgeMuteReceiver BridgeRpcCaller BridgeRpcReceiver) add_custom_command(TARGET ${EXAMPLE} POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy_if_different "$" diff --git a/examples/bridge_mute_unmute/README.md b/examples/bridge_mute_unmute/README.md new file mode 100644 index 00000000..8590297e --- /dev/null +++ b/examples/bridge_mute_unmute/README.md @@ -0,0 +1,94 @@ +# Bridge Mute/Unmute Example + +Demonstrates remote track control using the `LiveKitBridge` built-in +track-control RPC. A **receiver** publishes audio and video tracks, and a +**caller** subscribes to them and toggles mute/unmute every few seconds. + +## How it works + +| Executable | Role | +|-----------------------|------| +| **BridgeMuteReceiver** | Publishes an audio track (`"mic"`) and a video track (`"cam"`) using SDL3 hardware capture when available, falling back to silence and solid-color frames otherwise. The bridge automatically registers a built-in `lk.bridge.track-control` RPC handler on connect. | +| **BridgeMuteCaller** | Subscribes to the receiver's mic and cam tracks, renders them via SDL3 (speaker + window), and periodically calls `requestTrackMute` / `requestTrackUnmute` to toggle both tracks. | + +When the caller mutes a track, the receiver's `LocalAudioTrack::mute()` or +`LocalVideoTrack::mute()` is invoked via RPC, which signals the LiveKit +server to stop forwarding that track's media. The caller's audio goes +silent and the video freezes on the last received frame. On unmute, media +delivery resumes. + +## Running + +Generate two tokens for the same room with different identities: + +```bash +lk token create --join --room my-room --identity receiver --valid-for 24h +lk token create --join --room my-room --identity caller --valid-for 24h +``` + +Start the receiver first, then the caller: + +```bash +# Terminal 1 +LIVEKIT_URL=wss://... LIVEKIT_TOKEN= ./build-release/bin/BridgeMuteReceiver + +# Terminal 2 +LIVEKIT_URL=wss://... LIVEKIT_TOKEN= ./build-release/bin/BridgeMuteCaller +``` + +The caller also accepts an optional third argument for the receiver's +identity (defaults to `"receiver"`): + +```bash +./build-release/bin/BridgeMuteCaller wss://... my-receiver +``` + +## Sample output + +### Receiver + +``` +./build-release/bin/BridgeMuteReceiver +[receiver] Connecting to wss://sderosasandbox-15g80zq7.livekit.cloud ... +[receiver] Connected. +cs.state() is 1 connection_state_ is 1 +[receiver] Published audio track "mic" and video track "cam". +[receiver] Waiting for remote mute/unmute commands... +[receiver] Using SDL microphone. +[receiver] Using SDL camera. +[receiver] Press Ctrl-C to stop. +[RpcManager] Handling track control RPC: mute:mic +[RpcManager] Handling track control RPC: mute:cam +[RpcManager] Handling track control RPC: unmute:mic +[RpcManager] Handling track control RPC: unmute:cam +``` + +### Caller + +``` +./build-release/bin/BridgeMuteCaller +[caller] Connecting to wss://sderosasandbox-15g80zq7.livekit.cloud ... +cs.state() is 1 connection_state_ is 1 +[caller] Connected. +[caller] Target receiver identity: "receiver" +[caller] Subscribed to receiver's mic + cam. +[caller] Rendering receiver feed. Toggling mute every 5s. Close window or Ctrl-C to stop. +[caller] Speaker opened: 48000 Hz, 1 ch. + +[caller] --- Cycle 1: MUTE --- +[caller] mic: muted OK +[caller] cam: muted OK + +[caller] --- Cycle 2: UNMUTE --- +[caller] mic: unmuted OK +[caller] cam: unmuted OK +``` + +## Notes + +- The receiver uses SDL3 for microphone and camera capture. On macOS you + may need to grant camera/microphone permissions. +- If no hardware is detected, the receiver falls back to sending silence + (audio) and alternating solid-color frames (video). +- The caller opens an SDL3 window to render the received video and plays + audio through the default speaker. diff --git a/examples/bridge_mute_unmute/caller.cpp b/examples/bridge_mute_unmute/caller.cpp new file mode 100644 index 00000000..0c9b30e2 --- /dev/null +++ b/examples/bridge_mute_unmute/caller.cpp @@ -0,0 +1,337 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Caller (controller) for the bridge mute/unmute example. + * + * Connects to the same room as the receiver, subscribes to the receiver's + * "mic" and "cam" tracks, and renders them via SDL3 (speaker + window). + * Every 5 seconds the caller toggles mute/unmute on both tracks via RPC, + * so you can see and hear the tracks go silent and come back. + * + * Usage: + * BridgeMuteCaller [receiver-identity] + * LIVEKIT_URL=... LIVEKIT_TOKEN=... BridgeMuteCaller [receiver-identity] + * + * The token must grant a different identity (e.g. "caller"). Generate with: + * lk token create --api-key --api-secret \ + * --join --room my-room --identity caller --valid-for 24h + */ + +#include "livekit/audio_frame.h" +#include "livekit/rpc_error.h" +#include "livekit/track.h" +#include "livekit/video_frame.h" +#include "livekit_bridge/livekit_bridge.h" +#include "sdl_media.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static std::atomic g_running{true}; +static void handleSignal(int) { g_running.store(false); } + +struct LatestVideoFrame { + std::mutex mutex; + std::vector data; + int width = 0; + int height = 0; + bool dirty = false; +}; + +static LatestVideoFrame g_latest_video; + +static void storeFrame(const livekit::VideoFrame &frame) { + const std::uint8_t *src = frame.data(); + const std::size_t size = frame.dataSize(); + if (!src || size == 0) + return; + + std::lock_guard lock(g_latest_video.mutex); + g_latest_video.data.assign(src, src + size); + g_latest_video.width = frame.width(); + g_latest_video.height = frame.height(); + g_latest_video.dirty = true; +} + +int main(int argc, char *argv[]) { + std::string url, token; + std::string receiver_identity = "receiver"; + + std::vector positional; + for (int i = 1; i < argc; ++i) { + positional.push_back(argv[i]); + } + + if (positional.size() >= 2) { + url = positional[0]; + token = positional[1]; + if (positional.size() >= 3) + receiver_identity = positional[2]; + } else { + const char *e = std::getenv("LIVEKIT_URL"); + if (e) + url = e; + e = std::getenv("LIVEKIT_TOKEN"); + if (e) + token = e; + if (!positional.empty()) + receiver_identity = positional[0]; + } + if (url.empty() || token.empty()) { + std::cerr + << "Usage: BridgeMuteCaller [receiver-identity]\n" + << " or: LIVEKIT_URL=... LIVEKIT_TOKEN=... BridgeMuteCaller " + "[receiver-identity]\n" + << "Default receiver-identity: \"receiver\"\n"; + return 1; + } + + std::signal(SIGINT, handleSignal); + + // ----- Initialize SDL3 ----- + if (!SDL_Init(SDL_INIT_VIDEO | SDL_INIT_AUDIO)) { + std::cerr << "[caller] SDL_Init failed: " << SDL_GetError() << "\n"; + return 1; + } + + constexpr int kWindowWidth = 640; + constexpr int kWindowHeight = 480; + + SDL_Window *window = SDL_CreateWindow("Caller - Receiver Feed", kWindowWidth, + kWindowHeight, 0); + if (!window) { + std::cerr << "[caller] SDL_CreateWindow failed: " << SDL_GetError() << "\n"; + SDL_Quit(); + return 1; + } + + SDL_Renderer *renderer = SDL_CreateRenderer(window, nullptr); + if (!renderer) { + std::cerr << "[caller] SDL_CreateRenderer failed: " << SDL_GetError() + << "\n"; + SDL_DestroyWindow(window); + SDL_Quit(); + return 1; + } + + SDL_Texture *texture = nullptr; + int tex_width = 0; + int tex_height = 0; + + std::unique_ptr speaker; + std::mutex speaker_mutex; + + // ----- Connect to LiveKit ----- + livekit_bridge::LiveKitBridge bridge; + std::cout << "[caller] Connecting to " << url << " ...\n"; + + livekit::RoomOptions options; + options.auto_subscribe = true; + + if (!bridge.connect(url, token, options)) { + std::cerr << "[caller] Failed to connect.\n"; + SDL_DestroyRenderer(renderer); + SDL_DestroyWindow(window); + SDL_Quit(); + return 1; + } + std::cout << "[caller] Connected.\n"; + std::cout << "[caller] Target receiver identity: \"" << receiver_identity + << "\"\n"; + + // ----- Subscribe to receiver's audio ----- + bridge.setOnAudioFrameCallback( + receiver_identity, livekit::TrackSource::SOURCE_MICROPHONE, + [&speaker, &speaker_mutex](const livekit::AudioFrame &frame) { + const auto &samples = frame.data(); + if (samples.empty()) + return; + + std::lock_guard lock(speaker_mutex); + if (!speaker) { + speaker = std::make_unique(frame.sample_rate(), + frame.num_channels()); + if (!speaker->init()) { + std::cerr << "[caller] Failed to init SDL speaker.\n"; + speaker.reset(); + return; + } + std::cout << "[caller] Speaker opened: " << frame.sample_rate() + << " Hz, " << frame.num_channels() << " ch.\n"; + } + speaker->enqueue(samples.data(), frame.samples_per_channel()); + }); + + // ----- Subscribe to receiver's video ----- + bridge.setOnVideoFrameCallback( + receiver_identity, livekit::TrackSource::SOURCE_CAMERA, + [](const livekit::VideoFrame &frame, std::int64_t /*timestamp_us*/) { + storeFrame(frame); + }); + + std::cout << "[caller] Subscribed to receiver's mic + cam.\n"; + + // ----- Mute/unmute toggle thread ----- + std::atomic muted{false}; + std::atomic cycle{0}; + + std::atomic toggle_running{true}; + std::thread toggle_thread([&]() { + // Let the receiver connect and publish before we start toggling + for (int i = 0; i < 30 && toggle_running.load(); ++i) + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + while (toggle_running.load()) { + bool currently_muted = muted.load(); + const char *action = currently_muted ? "UNMUTE" : "MUTE"; + int c = cycle.fetch_add(1) + 1; + std::cout << "\n[caller] --- Cycle " << c << ": " << action << " ---\n"; + + // Toggle audio track "mic" + try { + if (currently_muted) { + bridge.requestTrackUnmute(receiver_identity, "mic"); + std::cout << "[caller] mic: unmuted OK\n"; + } else { + bridge.requestTrackMute(receiver_identity, "mic"); + std::cout << "[caller] mic: muted OK\n"; + } + } catch (const livekit::RpcError &e) { + std::cerr << "[caller] mic: RPC error (code=" << e.code() << " msg=\"" + << e.message() << "\")\n"; + } catch (const std::exception &e) { + std::cerr << "[caller] mic: error: " << e.what() << "\n"; + } + + // Toggle video track "cam" + try { + if (currently_muted) { + bridge.requestTrackUnmute(receiver_identity, "cam"); + std::cout << "[caller] cam: unmuted OK\n"; + } else { + bridge.requestTrackMute(receiver_identity, "cam"); + std::cout << "[caller] cam: muted OK\n"; + } + } catch (const livekit::RpcError &e) { + std::cerr << "[caller] cam: RPC error (code=" << e.code() << " msg=\"" + << e.message() << "\")\n"; + } catch (const std::exception &e) { + std::cerr << "[caller] cam: error: " << e.what() << "\n"; + } + + muted.store(!currently_muted); + + // Wait ~100 seconds, checking for shutdown every 100ms + for (int i = 0; i < 100 && toggle_running.load(); ++i) + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + }); + + // ----- Main loop: render video + pump SDL events ----- + std::cout << "[caller] Rendering receiver feed. Toggling mute every 5s. " + "Close window or Ctrl-C to stop.\n"; + + std::vector local_pixels; + + while (g_running.load()) { + SDL_Event ev; + while (SDL_PollEvent(&ev)) { + if (ev.type == SDL_EVENT_QUIT) { + g_running.store(false); + } + } + + int fw = 0, fh = 0; + bool have_frame = false; + { + std::lock_guard lock(g_latest_video.mutex); + if (g_latest_video.dirty && g_latest_video.width > 0 && + g_latest_video.height > 0) { + fw = g_latest_video.width; + fh = g_latest_video.height; + local_pixels.swap(g_latest_video.data); + g_latest_video.dirty = false; + have_frame = true; + } + } + + if (have_frame) { + if (fw != tex_width || fh != tex_height) { + if (texture) + SDL_DestroyTexture(texture); + texture = SDL_CreateTexture(renderer, SDL_PIXELFORMAT_RGBA32, + SDL_TEXTUREACCESS_STREAMING, fw, fh); + tex_width = fw; + tex_height = fh; + } + + if (texture) { + void *pixels = nullptr; + int pitch = 0; + if (SDL_LockTexture(texture, nullptr, &pixels, &pitch)) { + const int srcPitch = fw * 4; + for (int y = 0; y < fh; ++y) { + std::memcpy(static_cast(pixels) + y * pitch, + local_pixels.data() + y * srcPitch, srcPitch); + } + SDL_UnlockTexture(texture); + } + } + } + + SDL_SetRenderDrawColor(renderer, 0, 0, 0, 255); + SDL_RenderClear(renderer); + if (texture) { + SDL_RenderTexture(renderer, texture, nullptr, nullptr); + } + SDL_RenderPresent(renderer); + + SDL_Delay(16); + } + + // ----- Cleanup ----- + std::cout << "\n[caller] Shutting down...\n"; + toggle_running.store(false); + if (toggle_thread.joinable()) + toggle_thread.join(); + + bridge.disconnect(); + + { + std::lock_guard lock(speaker_mutex); + speaker.reset(); + } + + if (texture) + SDL_DestroyTexture(texture); + SDL_DestroyRenderer(renderer); + SDL_DestroyWindow(window); + SDL_Quit(); + + std::cout << "[caller] Done.\n"; + return 0; +} diff --git a/examples/bridge_mute_unmute/receiver.cpp b/examples/bridge_mute_unmute/receiver.cpp new file mode 100644 index 00000000..1abafbc9 --- /dev/null +++ b/examples/bridge_mute_unmute/receiver.cpp @@ -0,0 +1,266 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Receiver (publisher) for the bridge mute/unmute example. + * + * Publishes an audio track ("mic") and a video track ("cam"), then enables + * remote track control so that a remote caller can mute/unmute them via RPC. + * + * By default, captures from the real microphone and webcam using SDL3. If + * no hardware is available, falls back to silence (audio) and solid-color + * frames (video). + * + * Usage: + * BridgeMuteReceiver + * LIVEKIT_URL=... LIVEKIT_TOKEN=... BridgeMuteReceiver + * + * The token must grant identity "receiver". Generate one with: + * lk token create --api-key --api-secret \ + * --join --room my-room --identity receiver --valid-for 24h + */ + +#include "livekit/track.h" +#include "livekit_bridge/livekit_bridge.h" +#include "sdl_media.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static std::atomic g_running{true}; +static void handleSignal(int) { g_running.store(false); } + +int main(int argc, char *argv[]) { + std::string url, token; + if (argc >= 3) { + url = argv[1]; + token = argv[2]; + } else { + const char *e = std::getenv("LIVEKIT_URL"); + if (e) + url = e; + e = std::getenv("LIVEKIT_TOKEN"); + if (e) + token = e; + } + if (url.empty() || token.empty()) { + std::cerr + << "Usage: BridgeMuteReceiver \n" + << " or: LIVEKIT_URL=... LIVEKIT_TOKEN=... BridgeMuteReceiver\n"; + return 1; + } + + std::signal(SIGINT, handleSignal); + + // ----- Initialize SDL3 ----- + if (!SDL_Init(SDL_INIT_VIDEO | SDL_INIT_AUDIO | SDL_INIT_CAMERA)) { + std::cerr << "[receiver] SDL_Init failed: " << SDL_GetError() << "\n"; + return 1; + } + + // ----- Connect to LiveKit ----- + livekit_bridge::LiveKitBridge bridge; + std::cout << "[receiver] Connecting to " << url << " ...\n"; + + livekit::RoomOptions options; + options.auto_subscribe = true; + + if (!bridge.connect(url, token, options)) { + std::cerr << "[receiver] Failed to connect.\n"; + SDL_Quit(); + return 1; + } + std::cout << "[receiver] Connected.\n"; + + constexpr int kSampleRate = 48000; + constexpr int kChannels = 1; + constexpr int kWidth = 1280; + constexpr int kHeight = 720; + + auto mic = bridge.createAudioTrack("mic", kSampleRate, kChannels, + livekit::TrackSource::SOURCE_MICROPHONE); + auto cam = bridge.createVideoTrack("cam", kWidth, kHeight, + livekit::TrackSource::SOURCE_CAMERA); + + std::cout << "[receiver] Published audio track \"mic\" and video track " + "\"cam\".\n"; + std::cout << "[receiver] Waiting for remote mute/unmute commands...\n"; + + // ----- SDL Mic capture ----- + bool mic_using_sdl = false; + std::unique_ptr sdl_mic; + std::atomic mic_running{true}; + std::thread mic_thread; + + { + int recCount = 0; + SDL_AudioDeviceID *recDevs = SDL_GetAudioRecordingDevices(&recCount); + bool has_mic = recDevs && recCount > 0; + if (recDevs) + SDL_free(recDevs); + + if (has_mic) { + sdl_mic = std::make_unique( + kSampleRate, kChannels, kSampleRate / 100, + [&mic](const int16_t *samples, int num_samples_per_channel, + int /*sample_rate*/, int /*num_channels*/) { + mic->pushFrame(samples, num_samples_per_channel); + }); + + if (sdl_mic->init()) { + mic_using_sdl = true; + std::cout << "[receiver] Using SDL microphone.\n"; + mic_thread = std::thread([&]() { + while (mic_running.load()) { + sdl_mic->pump(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + }); + } else { + std::cerr << "[receiver] SDL mic init failed.\n"; + sdl_mic.reset(); + } + } + + if (!mic_using_sdl) { + std::cout << "[receiver] No microphone found; sending silence.\n"; + mic_thread = std::thread([&]() { + const int kSamplesPerFrame = kSampleRate / 100; + std::vector silence(kSamplesPerFrame * kChannels, 0); + auto next = std::chrono::steady_clock::now(); + while (mic_running.load()) { + mic->pushFrame(silence, kSamplesPerFrame); + next += std::chrono::milliseconds(10); + std::this_thread::sleep_until(next); + } + }); + } + } + + // ----- SDL Camera capture ----- + bool cam_using_sdl = false; + std::unique_ptr sdl_cam; + std::atomic cam_running{true}; + std::thread cam_thread; + + { + int camCount = 0; + SDL_CameraID *cams = SDL_GetCameras(&camCount); + bool has_cam = cams && camCount > 0; + if (cams) + SDL_free(cams); + + if (has_cam) { + sdl_cam = std::make_unique( + kWidth, kHeight, 30, SDL_PIXELFORMAT_RGBA32, + [&cam](const uint8_t *pixels, int pitch, int width, int height, + SDL_PixelFormat /*fmt*/, Uint64 timestampNS) { + const int dstPitch = width * 4; + std::vector buf(dstPitch * height); + for (int y = 0; y < height; ++y) { + std::memcpy(buf.data() + y * dstPitch, pixels + y * pitch, + dstPitch); + } + cam->pushFrame(buf.data(), buf.size(), + static_cast(timestampNS / 1000)); + }); + + if (sdl_cam->init()) { + cam_using_sdl = true; + std::cout << "[receiver] Using SDL camera.\n"; + cam_thread = std::thread([&]() { + while (cam_running.load()) { + sdl_cam->pump(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + }); + } else { + std::cerr << "[receiver] SDL camera init failed.\n"; + sdl_cam.reset(); + } + } + + if (!cam_using_sdl) { + std::cout << "[receiver] No camera found; sending solid-color frames.\n"; + cam_thread = std::thread([&]() { + std::vector frame(kWidth * kHeight * 4); + std::int64_t ts = 0; + int frame_num = 0; + + while (cam_running.load()) { + bool blue = (frame_num / 30) % 2 == 0; + for (int i = 0; i < kWidth * kHeight; ++i) { + frame[i * 4 + 0] = 0; + frame[i * 4 + 1] = + blue ? static_cast(0) : static_cast(180); + frame[i * 4 + 2] = + blue ? static_cast(200) : static_cast(0); + frame[i * 4 + 3] = 255; + } + + cam->pushFrame(frame, ts); + + ++frame_num; + ts += 33333; + std::this_thread::sleep_for(std::chrono::milliseconds(33)); + } + }); + } + } + + // ----- Main loop: pump SDL events (needed for camera approval on macOS) + // ----- + std::cout << "[receiver] Press Ctrl-C to stop.\n"; + while (g_running.load()) { + SDL_Event ev; + while (SDL_PollEvent(&ev)) { + if (ev.type == SDL_EVENT_QUIT) { + g_running.store(false); + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + // ----- Cleanup ----- + std::cout << "[receiver] Shutting down...\n"; + mic_running.store(false); + cam_running.store(false); + if (mic_thread.joinable()) + mic_thread.join(); + if (cam_thread.joinable()) + cam_thread.join(); + sdl_mic.reset(); + sdl_cam.reset(); + + mic.reset(); + cam.reset(); + bridge.disconnect(); + + SDL_Quit(); + std::cout << "[receiver] Done.\n"; + return 0; +} diff --git a/examples/bridge_rpc/README.md b/examples/bridge_rpc/README.md new file mode 100644 index 00000000..8969619e --- /dev/null +++ b/examples/bridge_rpc/README.md @@ -0,0 +1,105 @@ +# Bridge RPC Example + +A minimal example of custom user-registered RPC methods using the +`LiveKitBridge` high-level API. + +Two headless executables — **BridgeRpcReceiver** and **BridgeRpcCaller** — +connect to the same LiveKit room. The receiver registers a `"print"` RPC +method that logs the caller's message and sleeps for a variable duration +before responding. The caller sends a numbered message every ~1 second and +prints the round-trip time. + +## Sleep schedule + +The receiver picks a sleep duration based on the call number: + +| Call number | Sleep | +|---------------|---------| +| `%10 == 0` | 20 s | +| `%5 == 0` | 10 s | +| otherwise | 1 s | + +Because the default LiveKit RPC timeout is 15 seconds, the caller sets a +30-second timeout so the 20-second sleeps can complete. The 10-second and +20-second cases demonstrate how long-running handlers affect the caller's +blocking `performRpc` call. + +## Running + +Generate two tokens for the same room with different identities: + +```bash +lk token create --join --room my-room --identity receiver --valid-for 24h +lk token create --join --room my-room --identity caller --valid-for 24h +``` + +Start the receiver first, then the caller: + +```bash +# Terminal 1 +LIVEKIT_URL=wss://... LIVEKIT_TOKEN= ./build-release/bin/BridgeRpcReceiver + +# Terminal 2 +LIVEKIT_URL=wss://... LIVEKIT_TOKEN= ./build-release/bin/BridgeRpcCaller +``` + +## Sample output + +### Receiver + +``` +[receiver] Connecting to wss://example.livekit.cloud ... +[receiver] Connected. +[receiver] Registered RPC method "print". +[receiver] call %10==0 -> 20s sleep +[receiver] call %5==0 -> 10s sleep +[receiver] otherwise -> 1s sleep +[receiver] Waiting for calls... +[receiver] Call #1 from caller: "Hello from caller #1" (sleeping 1s) +[receiver] Call #1 done. +[receiver] Call #2 from caller: "Hello from caller #2" (sleeping 1s) +[receiver] Call #2 done. +[receiver] Call #3 from caller: "Hello from caller #3" (sleeping 1s) +[receiver] Call #3 done. +[receiver] Call #4 from caller: "Hello from caller #4" (sleeping 1s) +[receiver] Call #4 done. +[receiver] Call #5 from caller: "Hello from caller #5" (sleeping 10s) +[receiver] Call #5 done. +[receiver] Call #6 from caller: "Hello from caller #6" (sleeping 1s) +[receiver] Call #6 done. +[receiver] Call #7 from caller: "Hello from caller #7" (sleeping 1s) +[receiver] Call #7 done. +[receiver] Call #8 from caller: "Hello from caller #8" (sleeping 1s) +[receiver] Call #8 done. +[receiver] Call #9 from caller: "Hello from caller #9" (sleeping 1s) +[receiver] Call #9 done. +[receiver] Call #10 from caller: "Hello from caller #10" (sleeping 20s) +[receiver] Call #10 done. +``` + +### Caller + +``` +[caller] Connecting to wss://example.livekit.cloud ... +[caller] Connected. +[caller] #1 Sending: "Hello from caller #1" ... +[caller] #1 Response: "ok (slept 1s)" (1159ms) +[caller] #2 Sending: "Hello from caller #2" ... +[caller] #2 Response: "ok (slept 1s)" (1174ms) +[caller] #3 Sending: "Hello from caller #3" ... +[caller] #3 Response: "ok (slept 1s)" (1152ms) +[caller] #4 Sending: "Hello from caller #4" ... +[caller] #4 Response: "ok (slept 1s)" (1135ms) +[caller] #5 Sending: "Hello from caller #5" ... +[caller] #5 Response: "ok (slept 10s)" (10139ms) +[caller] #6 Sending: "Hello from caller #6" ... +[caller] #6 Response: "ok (slept 1s)" (1138ms) +[caller] #7 Sending: "Hello from caller #7" ... +[caller] #7 Response: "ok (slept 1s)" (1143ms) +[caller] #8 Sending: "Hello from caller #8" ... +[caller] #8 Response: "ok (slept 1s)" (1115ms) +[caller] #9 Sending: "Hello from caller #9" ... +[caller] #9 Response: "ok (slept 1s)" (1123ms) +[caller] #10 Sending: "Hello from caller #10" ... +[caller] #10 Response: "ok (slept 20s)" (20119ms) +``` diff --git a/examples/bridge_rpc/custom_caller.cpp b/examples/bridge_rpc/custom_caller.cpp new file mode 100644 index 00000000..4ff5d355 --- /dev/null +++ b/examples/bridge_rpc/custom_caller.cpp @@ -0,0 +1,122 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Caller for the bridge_rpc example. + * + * Connects to a LiveKit room as "caller" and sends a string to the + * receiver's custom "print" RPC method every second. The receiver + * sleeps for 1s, 10s, or 20s depending on the call number, so some + * calls will take noticeably longer to return. + * + * Usage: + * BridgeRpcCaller + * LIVEKIT_URL=... LIVEKIT_TOKEN=... BridgeRpcCaller + * + * Generate a token with: + * lk token create --join --room --identity caller --valid-for 24h + */ + +#include "livekit/rpc_error.h" +#include "livekit_bridge/livekit_bridge.h" + +#include +#include +#include +#include +#include +#include +#include + +static std::atomic g_running{true}; +static void handleSignal(int) { g_running.store(false); } + +int main(int argc, char *argv[]) { + std::string url, token; + if (argc >= 3) { + url = argv[1]; + token = argv[2]; + } else { + const char *e = std::getenv("LIVEKIT_URL"); + if (e) + url = e; + e = std::getenv("LIVEKIT_TOKEN"); + if (e) + token = e; + } + if (url.empty() || token.empty()) { + std::cerr << "Usage: BridgeRpcCaller \n" + << " or: LIVEKIT_URL=... LIVEKIT_TOKEN=... BridgeRpcCaller\n"; + return 1; + } + + std::signal(SIGINT, handleSignal); + + livekit_bridge::LiveKitBridge bridge; + std::cout << "[caller] Connecting to " << url << " ...\n"; + + livekit::RoomOptions options; + if (!bridge.connect(url, token, options)) { + std::cerr << "[caller] Failed to connect.\n"; + return 1; + } + std::cout << "[caller] Connected.\n"; + + // Give the receiver a moment to join and register its handler. + for (int i = 0; i < 30 && g_running.load(); ++i) + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + int count = 0; + while (g_running.load()) { + ++count; + std::string message = "Hello from caller #" + std::to_string(count); + + std::cout << "[caller] #" << count << " Sending: \"" << message + << "\" ...\n"; + + auto t0 = std::chrono::steady_clock::now(); + try { + auto response = + bridge.performRpc("receiver", "print", message, std::nullopt); + auto elapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - t0) + .count(); + if (response.has_value()) { + std::cout << "[caller] #" << count << " Response: \"" + << response.value() << "\" (" << elapsed << "ms)\n"; + } else { + std::cout << "[caller] #" << count << " No response (" << elapsed + << "ms)\n"; + } + } catch (const livekit::RpcError &e) { + auto elapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - t0) + .count(); + std::cerr << "[caller] #" << count << " RPC error (code=" << e.code() + << " msg=\"" << e.message() << "\") (" << elapsed << "ms)\n"; + } catch (const std::exception &e) { + std::cerr << "[caller] #" << count << " Error: " << e.what() << "\n"; + } + + for (int i = 0; i < 10 && g_running.load(); ++i) + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + std::cout << "[caller] Shutting down...\n"; + bridge.disconnect(); + std::cout << "[caller] Done.\n"; + return 0; +} diff --git a/examples/bridge_rpc/custom_receiver.cpp b/examples/bridge_rpc/custom_receiver.cpp new file mode 100644 index 00000000..a98cbd3b --- /dev/null +++ b/examples/bridge_rpc/custom_receiver.cpp @@ -0,0 +1,113 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Receiver for the bridge_rpc example. + * + * Connects to a LiveKit room as "receiver", registers a custom RPC method + * called "print", and prints whatever string the caller sends. + * + * Usage: + * BridgeRpcReceiver + * LIVEKIT_URL=... LIVEKIT_TOKEN=... BridgeRpcReceiver + * + * Generate a token with: + * lk token create --join --room --identity receiver --valid-for 24h + */ + +#include "livekit_bridge/livekit_bridge.h" + +#include +#include +#include +#include +#include +#include +#include + +static std::atomic g_running{true}; +static void handleSignal(int) { g_running.store(false); } + +int main(int argc, char *argv[]) { + std::string url, token; + if (argc >= 3) { + url = argv[1]; + token = argv[2]; + } else { + const char *e = std::getenv("LIVEKIT_URL"); + if (e) + url = e; + e = std::getenv("LIVEKIT_TOKEN"); + if (e) + token = e; + } + if (url.empty() || token.empty()) { + std::cerr << "Usage: BridgeRpcReceiver \n" + << " or: LIVEKIT_URL=... LIVEKIT_TOKEN=... BridgeRpcReceiver\n"; + return 1; + } + + std::signal(SIGINT, handleSignal); + + livekit_bridge::LiveKitBridge bridge; + std::cout << "[receiver] Connecting to " << url << " ...\n"; + + livekit::RoomOptions options; + if (!bridge.connect(url, token, options)) { + std::cerr << "[receiver] Failed to connect.\n"; + return 1; + } + std::cout << "[receiver] Connected.\n"; + + std::atomic call_count{0}; + + bridge.registerRpcMethod( + "print", + [&call_count](const livekit::RpcInvocationData &data) + -> std::optional { + int n = call_count.fetch_add(1) + 1; + + int sleep_sec = 1; + if (n % 10 == 0) + sleep_sec = 20; + else if (n % 5 == 0) + sleep_sec = 10; + + std::cout << "[receiver] Call #" << n << " from " + << data.caller_identity << ": \"" << data.payload + << "\" (sleeping " << sleep_sec << "s)\n"; + + std::this_thread::sleep_for(std::chrono::seconds(sleep_sec)); + + std::cout << "[receiver] Call #" << n << " done.\n"; + return "ok (slept " + std::to_string(sleep_sec) + "s)"; + }); + + std::cout << "[receiver] Registered RPC method \"print\".\n" + << "[receiver] call %10==0 -> 20s sleep\n" + << "[receiver] call %5==0 -> 10s sleep\n" + << "[receiver] otherwise -> 1s sleep\n" + << "[receiver] Waiting for calls...\n"; + + while (g_running.load()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + std::cout << "[receiver] Shutting down...\n"; + bridge.disconnect(); + std::cout << "[receiver] Done.\n"; + return 0; +} diff --git a/include/livekit/local_participant.h b/include/livekit/local_participant.h index a624238e..3d0954c6 100644 --- a/include/livekit/local_participant.h +++ b/include/livekit/local_participant.h @@ -152,9 +152,10 @@ class LocalParticipant : public Participant { * @throws std::runtime_error If the underlying FFI handle is invalid or * the FFI call fails unexpectedly. */ - std::string performRpc(const std::string &destination_identity, - const std::string &method, const std::string &payload, - std::optional response_timeout = std::nullopt); + std::string + performRpc(const std::string &destination_identity, const std::string &method, + const std::string &payload, + const std::optional &response_timeout = std::nullopt); /** * Register a handler for an incoming RPC method. diff --git a/include/livekit/track.h b/include/livekit/track.h index 2487d396..850e359b 100644 --- a/include/livekit/track.h +++ b/include/livekit/track.h @@ -88,7 +88,7 @@ class Track { std::optional mime_type() const noexcept { return mime_type_; } // Handle access - bool has_handle() const noexcept { return !handle_.valid(); } + bool has_handle() const noexcept { return handle_.valid(); } uintptr_t ffi_handle_id() const noexcept { return handle_.get(); } // Async get stats diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 20f26333..b52d1464 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -317,7 +317,7 @@ FfiClient::connectAsync(const std::string &url, const std::string &token, auto *opts = connect->mutable_options(); opts->set_auto_subscribe(options.auto_subscribe); opts->set_dynacast(options.dynacast); - opts->set_single_peer_connection(options.single_peer_connection); + // --- E2EE / encryption (optional) --- if (options.encryption.has_value()) { const E2EEOptions &e2ee = *options.encryption; diff --git a/src/local_participant.cpp b/src/local_participant.cpp index aa1b869f..c908cc9e 100644 --- a/src/local_participant.cpp +++ b/src/local_participant.cpp @@ -225,7 +225,7 @@ void LocalParticipant::unpublishTrack(const std::string &track_sid) { std::string LocalParticipant::performRpc( const std::string &destination_identity, const std::string &method, - const std::string &payload, std::optional response_timeout) { + const std::string &payload, const std::optional &response_timeout) { auto handle_id = ffiHandleId(); if (handle_id == 0) { throw std::runtime_error(