FCE-2750 Add support for agent image capture #67
Merged
Changes from 4 commits
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| [submodule "protos"] | ||
| path = protos | ||
| url = https://github.com/fishjam-cloud/protos.git | ||
| branch = "agent-capture-image" | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| # Multimodal Demo | ||
|
|
||
| A Fishjam SDK example that uses the Gemini Live API for multimodal real-time interaction. Users can speak to ask questions about captured video frames, and Gemini responds via voice. | ||
|
|
||
| ## How It Works | ||
|
|
||
| 1. A peer connects with both audio and video tracks | ||
| 2. The agent periodically captures images from video tracks | ||
| 3. When you speak ("What do you see?", "Describe this"), your audio and the captured images are sent to Gemini | ||
| 4. Gemini analyzes the visual content and responds with voice | ||
|
|
||
| ## Setup | ||
|
|
||
| 1. Set environment variables: | ||
|
|
||
| ```bash | ||
| export FISHJAM_ID="your-fishjam-id" | ||
| export FISHJAM_MANAGEMENT_TOKEN="your-token" | ||
| export GOOGLE_API_KEY="your-gemini-api-key" | ||
| ``` | ||
|
|
||
| 2. Optionally configure the image capture interval (default: 5 seconds): | ||
|
|
||
| ```bash | ||
| export IMAGE_CAPTURE_INTERVAL="3.0" | ||
| ``` | ||
|
|
||
| ## Running | ||
|
|
||
| ```bash | ||
| cd examples/multimodal | ||
| uv run uvicorn main:app | ||
| ``` | ||
|
|
||
| ## Usage | ||
|
|
||
| 1. The server will start on `http://localhost:8000` | ||
| 2. GET `/` returns a peer token for connecting a browser client | ||
| 3. Connect a peer with video and audio tracks | ||
| 4. Speak questions about what your camera sees | ||
| 5. Gemini will respond with voice analysis of the captured frames |
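The usage steps above say that `GET /` returns a peer token. A minimal client-side sketch of that request, using only the standard library, might look like the following. The function name `fetch_peer_token` is hypothetical and not part of this PR; the sketch assumes the server returns the token as a JSON-encoded string, which is how FastAPI serializes a plain `str` return value by default.

```python
import json
import urllib.request


def fetch_peer_token(url: str = "http://localhost:8000/") -> str:
    """Fetch a peer token from the demo server (hypothetical helper).

    Assumes the response body is a JSON-encoded string.
    """
    with urllib.request.urlopen(url, timeout=5) as response:
        return json.loads(response.read().decode("utf-8"))


if __name__ == "__main__":
    # Requires the demo server to be running locally.
    print(fetch_peer_token())
```

The token would then be handed to a browser or SDK client that publishes audio and video tracks.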
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| from contextlib import asynccontextmanager | ||
| from typing import Annotated | ||
|
|
||
| from fastapi import Depends, FastAPI | ||
| from multimodal.notifier import make_notifier | ||
| from multimodal.room import RoomService, fishjam | ||
| from multimodal.worker import async_worker | ||
|
|
||
| _room_service: RoomService | None = None | ||
|
|
||
|
|
||
| def get_room_service(): | ||
| if not _room_service: | ||
| raise RuntimeError("Application skipped lifespan events!") | ||
| return _room_service | ||
|
|
||
|
|
||
| @asynccontextmanager | ||
| async def lifespan(_app: FastAPI): | ||
| async with async_worker() as worker: | ||
| global _room_service | ||
| _room_service = RoomService(worker) | ||
| notifier = make_notifier(_room_service) | ||
| worker.run_in_background(notifier.connect()) | ||
|
|
||
| yield | ||
|
|
||
|
|
||
| app = FastAPI(lifespan=lifespan) | ||
|
|
||
|
|
||
| @app.get("/") | ||
| def get_peer(room_service: Annotated[RoomService, Depends(get_room_service)]): | ||
| _peer, token = fishjam.create_peer(room_service.get_room().id) | ||
| return token |
Empty file.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,136 @@ | ||
| import asyncio | ||
|
|
||
| from fishjam.agent import Agent, AgentSession, IncomingTrackData, IncomingTrackImage | ||
| from fishjam.integrations.gemini import GeminiIntegration | ||
|
|
||
| from .config import IMAGE_CAPTURE_INTERVAL | ||
| from .session import MultimodalSession | ||
| from .worker import BackgroundWorker | ||
|
|
||
|
|
||
| class MultimodalAgent: | ||
| def __init__(self, room_id: str, agent: Agent, worker: BackgroundWorker): | ||
| self._room_id = room_id | ||
| self._agent = agent | ||
| self._worker = worker | ||
| self._task: asyncio.Task[None] | None = None | ||
| self._capture_task: asyncio.Task[None] | None = None | ||
|
|
||
| # Session management per peer | ||
| self._sessions: dict[str, MultimodalSession] = {} | ||
|
|
||
| # Track caching: peer_id -> set of video track_ids | ||
| self._peer_tracks: dict[str, set[str]] = {} | ||
| # Reverse lookup: track_id -> peer_id | ||
| self._track_to_peer: dict[str, str] = {} | ||
|
|
||
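| # Agent session reference for capturing images and sending audio | ||
| self._agent_session: AgentSession | None = None | ||
| # Output track is created in _start(); initialize it so early audio callbacks do not raise AttributeError | ||
| self._output_track = None | ||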
|
|
||
| async def _start(self): | ||
| async with self._agent.connect() as session: | ||
| self._agent_session = session | ||
|
|
||
| # Create output track for Gemini audio responses | ||
|
|
||
| self._output_track = await session.add_track( | ||
| GeminiIntegration.GEMINI_OUTPUT_AUDIO_SETTINGS | ||
| ) | ||
| print(f"Agent connected to room {self._room_id}") | ||
|
|
||
| async for message in session.receive(): | ||
| match message: | ||
| case IncomingTrackImage(track_id=track_id, data=data): | ||
| peer_id = self._track_to_peer.get(track_id) | ||
| if peer_id and peer_id in self._sessions: | ||
| self._sessions[peer_id].send_image(data) | ||
| print(f"Sent image from {track_id} to {peer_id}") | ||
|
|
||
| case IncomingTrackData(peer_id=peer_id, data=data): | ||
| if peer_id in self._sessions: | ||
| self._sessions[peer_id].send_audio(data) | ||
|
|
||
| self._agent_session = None | ||
| print(f"Agent disconnected from room {self._room_id}") | ||
|
|
||
| async def _periodic_capture(self): | ||
| while True: | ||
| await asyncio.sleep(IMAGE_CAPTURE_INTERVAL) | ||
|
|
||
| if not self._agent_session: | ||
| continue | ||
|
|
||
| # Capture images from all known video tracks (iterate over a snapshot, since tracks may change during the awaits) | ||
| for track_id in list(self._track_to_peer): | ||
| try: | ||
| await self._agent_session.capture_image(track_id) | ||
| print(f"Requested image capture for track {track_id}") | ||
| except Exception as e: | ||
| print(f"Error capturing image from track {track_id}: {e}") | ||
|
|
||
| def _handle_audio_response(self, peer_id: str, audio: bytes): | ||
| if self._output_track: | ||
| self._worker.run_in_background(self._output_track.send_chunk(audio)) | ||
|
|
||
| def on_peer_enter(self, peer_id: str): | ||
| if peer_id in self._sessions: | ||
| return | ||
|
|
||
| # Initialize track cache for this peer | ||
| self._peer_tracks[peer_id] = set() | ||
|
|
||
| # Start agent connection if this is the first peer | ||
| if len(self._sessions) == 0: | ||
| self._task = self._worker.run_in_background(self._start()) | ||
| capture_coro = self._periodic_capture() | ||
| self._capture_task = self._worker.run_in_background(capture_coro) | ||
|
|
||
| # Create multimodal session for this peer | ||
| def on_audio(audio: bytes, pid: str = peer_id): | ||
| self._handle_audio_response(pid, audio) | ||
|
|
||
| session = MultimodalSession(on_audio) | ||
| self._sessions[peer_id] = session | ||
| self._worker.run_in_background(session.start(peer_id)) | ||
|
|
||
| def on_peer_leave(self, peer_id: str): | ||
| if peer_id not in self._sessions: | ||
| return | ||
|
|
||
| # Clean up track cache | ||
| if peer_id in self._peer_tracks: | ||
| for track_id in self._peer_tracks[peer_id]: | ||
| self._track_to_peer.pop(track_id, None) | ||
| del self._peer_tracks[peer_id] | ||
|
|
||
| # End the session | ||
| self._sessions.pop(peer_id).end() | ||
|
|
||
| # Stop agent if no more sessions | ||
| if len(self._sessions) == 0: | ||
| if self._task is not None: | ||
| self._task.cancel() | ||
| self._task = None | ||
| if self._capture_task is not None: | ||
| self._capture_task.cancel() | ||
| self._capture_task = None | ||
|
|
||
| def on_track_added(self, peer_id: str, track_id: str, is_video: bool): | ||
| if not is_video: | ||
| return | ||
|
|
||
| if peer_id not in self._peer_tracks: | ||
| self._peer_tracks[peer_id] = set() | ||
|
|
||
| self._peer_tracks[peer_id].add(track_id) | ||
| self._track_to_peer[track_id] = peer_id | ||
| print(f"Added video track {track_id} for peer {peer_id}") | ||
|
|
||
| def on_track_removed(self, peer_id: str, track_id: str, is_video: bool): | ||
| if not is_video: | ||
| return | ||
|
|
||
| if peer_id in self._peer_tracks: | ||
| self._peer_tracks[peer_id].discard(track_id) | ||
| self._track_to_peer.pop(track_id, None) | ||
| print(f"Removed video track {track_id} for peer {peer_id}") |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| import os | ||
|
|
||
| from google.genai.types import LiveConnectConfigDict, Modality | ||
|
|
||
| FISHJAM_ID = os.getenv("FISHJAM_ID", "") | ||
| FISHJAM_TOKEN = os.environ["FISHJAM_MANAGEMENT_TOKEN"] | ||
|
|
||
| MULTIMODAL_MODEL = "gemini-2.5-flash-native-audio-preview-12-2025" | ||
|
|
||
| MULTIMODAL_CONFIG: LiveConnectConfigDict = { | ||
| "response_modalities": [Modality.AUDIO], | ||
| "thinking_config": { | ||
| "include_thoughts": False, | ||
| }, | ||
| } | ||
|
|
||
| # Interval in seconds between image captures | ||
| IMAGE_CAPTURE_INTERVAL = float(os.getenv("IMAGE_CAPTURE_INTERVAL", "5.0")) |
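As written, `float(os.getenv(...))` raises `ValueError` at import time when the variable holds a malformed value, and it accepts zero or negative intervals. A more defensive reader could look like the sketch below. The helper name `read_capture_interval` and the fall-back-to-default policy are assumptions for illustration, not behavior from the PR.

```python
import math
import os
from collections.abc import Mapping

DEFAULT_INTERVAL = 5.0  # seconds, matching the default in the diff


def read_capture_interval(
    env: Mapping[str, str] = os.environ,
    default: float = DEFAULT_INTERVAL,
) -> float:
    """Parse IMAGE_CAPTURE_INTERVAL, falling back to the default when unusable."""
    raw = env.get("IMAGE_CAPTURE_INTERVAL")
    if raw is None:
        return default
    try:
        value = float(raw)
    except ValueError:
        return default
    # Reject NaN, infinity, zero, and negative values.
    if not math.isfinite(value) or value <= 0:
        return default
    return value
```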
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| from fishjam import FishjamNotifier | ||
| from fishjam.events import ( | ||
| ServerMessagePeerConnected, | ||
| ServerMessagePeerDisconnected, | ||
| ServerMessagePeerType, | ||
| ServerMessageTrackAdded, | ||
| ServerMessageTrackRemoved, | ||
| TrackType, | ||
| ) | ||
| from fishjam.events.allowed_notifications import AllowedNotification | ||
|
|
||
| from .config import FISHJAM_ID, FISHJAM_TOKEN | ||
| from .room import RoomService | ||
|
|
||
|
|
||
| def make_notifier(room_service: RoomService): | ||
| notifier = FishjamNotifier( | ||
| FISHJAM_ID, | ||
| FISHJAM_TOKEN, | ||
| ) | ||
|
|
||
| @notifier.on_server_notification | ||
| def _(notification: AllowedNotification): | ||
| match notification: | ||
| case ServerMessagePeerConnected( | ||
| peer_id=peer_id, | ||
| room_id=room_id, | ||
| peer_type=ServerMessagePeerType.PEER_TYPE_WEBRTC, | ||
| ): | ||
| handle_peer_connected(peer_id, room_id) | ||
|
|
||
| case ServerMessagePeerDisconnected( | ||
| peer_id=peer_id, | ||
| room_id=room_id, | ||
| peer_type=ServerMessagePeerType.PEER_TYPE_WEBRTC, | ||
| ): | ||
| handle_peer_disconnected(peer_id, room_id) | ||
|
|
||
| case ServerMessageTrackAdded( | ||
| peer_id=peer_id, | ||
| room_id=room_id, | ||
| track=track, | ||
| ): | ||
| handle_track_added(peer_id, room_id, track) | ||
|
|
||
| case ServerMessageTrackRemoved( | ||
| peer_id=peer_id, | ||
| room_id=room_id, | ||
| track=track, | ||
| ): | ||
| handle_track_removed(peer_id, room_id, track) | ||
|
|
||
| def handle_peer_connected(peer_id: str, room_id: str): | ||
| if room_id == room_service.room.id: | ||
| room_service.agent.on_peer_enter(peer_id) | ||
|
|
||
| def handle_peer_disconnected(peer_id: str, room_id: str): | ||
| if room_id == room_service.room.id: | ||
| room_service.agent.on_peer_leave(peer_id) | ||
|
|
||
| def handle_track_added(peer_id: str, room_id: str, track): | ||
| if room_id == room_service.room.id: | ||
| is_video = track.type == TrackType.TRACK_TYPE_VIDEO | ||
| room_service.agent.on_track_added(peer_id, track.id, is_video) | ||
|
|
||
| def handle_track_removed(peer_id: str, room_id: str, track): | ||
| if room_id == room_service.room.id: | ||
| is_video = track.type == TrackType.TRACK_TYPE_VIDEO | ||
| room_service.agent.on_track_removed(peer_id, track.id, is_video) | ||
|
|
||
| return notifier |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| from fishjam import AgentOptions, FishjamClient, Room | ||
| from fishjam.errors import NotFoundError | ||
| from fishjam.integrations.gemini import GeminiIntegration | ||
|
|
||
| from .agent import MultimodalAgent | ||
| from .config import FISHJAM_ID, FISHJAM_TOKEN | ||
| from .worker import BackgroundWorker | ||
|
|
||
| print("Fishjam ID", FISHJAM_ID) | ||
|
|
||
| fishjam = FishjamClient(FISHJAM_ID, FISHJAM_TOKEN) | ||
|
|
||
|
|
||
| class RoomService: | ||
| def __init__(self, worker: BackgroundWorker): | ||
| self._worker = worker | ||
| self._create_room() | ||
|
|
||
| def get_room(self) -> Room: | ||
| try: | ||
| self.room = fishjam.get_room(self.room.id) | ||
| except NotFoundError: | ||
| self._create_room() | ||
| return self.room | ||
|
|
||
| def _create_room(self): | ||
| self.room = fishjam.create_room() | ||
| self._create_agent() | ||
|
|
||
| def _create_agent(self): | ||
| self.agent = MultimodalAgent( | ||
| self.room.id, | ||
| fishjam.create_agent( | ||
| self.room.id, | ||
| AgentOptions(output=GeminiIntegration.GEMINI_INPUT_AUDIO_SETTINGS), | ||
| ), | ||
| self._worker, | ||
| ) | ||
|
|
||
| def get_agent(self): | ||
| return self.agent | ||
|
|
||
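`RoomService.get_room` follows a refresh-or-recreate pattern: it re-fetches the cached room and creates a new one if the server no longer knows it. The sketch below shows that control flow against an in-memory stand-in. `FakeClient`, `NotFound`, and `RoomHolder` are hypothetical and only mimic the shape of the calls in the diff; they are not the Fishjam client API.

```python
class NotFound(Exception):
    """Stand-in for the SDK's not-found error."""


class FakeClient:
    """In-memory substitute for a room management client."""

    def __init__(self) -> None:
        self._rooms: dict[str, dict] = {}
        self._counter = 0

    def create_room(self) -> dict:
        self._counter += 1
        room = {"id": f"room-{self._counter}"}
        self._rooms[room["id"]] = room
        return room

    def get_room(self, room_id: str) -> dict:
        try:
            return self._rooms[room_id]
        except KeyError:
            raise NotFound(room_id) from None

    def delete_room(self, room_id: str) -> None:
        self._rooms.pop(room_id, None)


class RoomHolder:
    def __init__(self, client: FakeClient) -> None:
        self._client = client
        self.room = client.create_room()

    def get_room(self) -> dict:
        try:
            # Refresh the cached room; this fails if it was deleted remotely.
            self.room = self._client.get_room(self.room["id"])
        except NotFound:
            self.room = self._client.create_room()
        return self.room
```

In the PR, recreating the room also recreates the agent, since an agent is bound to a specific room id.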