|
| 1 | +import { |
| 2 | + FishjamAgent, |
| 3 | + FishjamConfig, |
| 4 | + FishjamWSNotifier, |
| 5 | + FishjamClient, |
| 6 | + PeerConnected, |
| 7 | + PeerDisconnected, |
| 8 | + PeerId, |
| 9 | + RoomId, |
| 10 | + TrackAdded, |
| 11 | + TrackRemoved, |
| 12 | + TrackId, |
| 13 | + IncomingTrackData, |
| 14 | + IncomingTrackImage, |
| 15 | +} from '@fishjam-cloud/js-server-sdk'; |
| 16 | +import GeminiIntegration from '@fishjam-cloud/js-server-sdk/gemini'; |
| 17 | +import { GoogleGenAI, LiveServerMessage, Modality, Session } from '@google/genai'; |
| 18 | +import { MULTIMODAL_MODEL, CAPTURE_INTERVAL_MS } from '../const'; |
| 19 | + |
/**
 * Bookkeeping kept for the Fishjam agent that serves a room.
 */
interface AgentState {
  /** Agent handle used to receive track data/images and to send data back. */
  agent: FishjamAgent;
  /** Id of the agent track that outgoing data is sent on. */
  outputTrackId: TrackId;
}
| 24 | + |
/**
 * Connects Fishjam rooms to the Gemini Live API.
 *
 * For every room with at least one non-agent peer the service keeps one
 * Fishjam agent (created lazily on the first peer connection) and, for every
 * such peer, one Gemini Live session. Audio received by the agent is forwarded
 * to the session of the peer it came from, periodically captured images are
 * forwarded to the sessions of the peers in the same room, and audio returned
 * by Gemini is sent back through the agent's output track.
 */
export class MultimodalService {
  /** Open Gemini Live sessions, keyed by the peer they belong to. */
  peerSessions: Map<PeerId, Session> = new Map();
  /** Agent handle and output track per room. */
  agents: Map<RoomId, AgentState> = new Map();
  /** Track ids that image captures are requested for, per room. */
  videoTracks: Map<RoomId, Set<TrackId>> = new Map();
  /** Running image-capture timers, per room. */
  captureIntervals: Map<RoomId, ReturnType<typeof setInterval>> = new Map();
  ai: GoogleGenAI;
  fishjamConfig: FishjamConfig;
  fishjamClient: FishjamClient;

  /** Peer id of the agent created for each room; used to recognise the agent in notifications. */
  private agentPeerIds: Map<RoomId, PeerId> = new Map();
  /** Room each tracked non-agent peer connected to; used to scope images to a room. */
  private peerRooms: Map<PeerId, RoomId> = new Map();

  /**
   * @param fishjamConfig Configuration passed to the Fishjam client and websocket notifier.
   * @param geminiKey API key used to create the Gemini client.
   */
  constructor(fishjamConfig: FishjamConfig, geminiKey: string) {
    this.ai = GeminiIntegration.createClient({ apiKey: geminiKey });
    this.fishjamConfig = fishjamConfig;
    this.fishjamClient = new FishjamClient(fishjamConfig);
    this.initFishjam();
  }

  /** Opens the Fishjam notification websocket and wires its events to the handlers below. */
  private initFishjam() {
    const notifier = new FishjamWSNotifier(
      this.fishjamConfig,
      (error) => console.error('Fishjam websocket error: %O', error),
      (code, reason) => console.log(`Fishjam websocket closed. code: ${code}, reason: ${reason}`)
    );

    // The async handlers perform network calls; without a catch here a failure
    // would surface as an unhandled promise rejection.
    const logFailure = (event: string) => (error: unknown) =>
      console.error(`Error while handling ${event}: %O`, error);

    notifier.on('peerConnected', (msg) => {
      this.handlePeerConnected(msg).catch(logFailure('peerConnected'));
    });
    notifier.on('peerDisconnected', (msg) => {
      this.handlePeerDisconnected(msg).catch(logFailure('peerDisconnected'));
    });
    notifier.on('trackAdded', (msg) => this.handleTrackAdded(msg));
    notifier.on('trackRemoved', (msg) => this.handleTrackRemoved(msg));
  }

  /**
   * Ensures the room has an agent and opens a Gemini Live session for the peer.
   *
   * @param message Peer-connected notification.
   */
  async handlePeerConnected(message: PeerConnected) {
    // NOTE(review): 2 is presumably the SDK's "agent" peer type — confirm against the enum.
    if (message.peerType === 2) return;

    console.log('Peer connected: %O', message);

    const { peerId, roomId } = message;

    // Never open a Gemini session for the agent this service created itself.
    if (this.agentPeerIds.get(roomId) === peerId) return;

    // Registered before any await so a disconnect arriving in the meantime can be detected below.
    this.peerRooms.set(peerId, roomId);

    if (!this.agents.has(roomId)) {
      const {
        peer: { id: newAgentId },
        agent,
      } = await this.fishjamClient.createAgent(
        roomId,
        { output: GeminiIntegration.geminiInputAudioSettings },
        {
          onClose: (code, reason) => console.log(`Fishjam agent websocket closed. code: ${code}, reason: ${reason}`),
          onError: (error) => console.error('Fishjam agent websocket error: %O', error),
        }
      );

      const outputTrack = agent.createTrack(GeminiIntegration.geminiOutputAudioSettings);

      this.agents.set(roomId, { agent, outputTrackId: outputTrack.id });
      this.agentPeerIds.set(roomId, newAgentId);
      this.videoTracks.set(roomId, new Set());

      agent.on('trackData', (msg) => this.handleTrackData(msg));
      agent.on('trackImage', (msg) => this.handleTrackImage(roomId, msg));

      this.startImageCapture(roomId);

      console.log(`Agent ${newAgentId} created`);
    }

    const session = await this.ai.live.connect({
      model: MULTIMODAL_MODEL,
      config: {
        responseModalities: [Modality.AUDIO],
      },
      callbacks: {
        onopen: () => console.log(`Connected to Gemini Live API for peer ${peerId}.`),
        onerror: (error) => console.error(`Gemini error for peer ${peerId}: %O`, error),
        onclose: (e) =>
          console.log(`Connection to Gemini Live API for peer ${peerId} closed. code: ${e.code}, reason: ${e.reason}`),
        onmessage: (msg) => this.handleGeminiMessage(roomId, peerId, msg),
      },
    });

    // The peer disconnected while the session was being opened: do not keep the session around.
    if (this.peerRooms.get(peerId) !== roomId) {
      session.close();
      return;
    }

    // Close a session left over from an earlier connection of the same peer before replacing it.
    this.peerSessions.get(peerId)?.close();
    this.peerSessions.set(peerId, session);
  }

  /**
   * Routes a disconnect notification to the agent or the regular-peer handler.
   *
   * @param message Peer-disconnected notification.
   */
  async handlePeerDisconnected(message: PeerDisconnected) {
    // The agent is recognised by the peer id recorded when it was created.
    if (this.agentPeerIds.get(message.roomId) === message.peerId) {
      return this.handleAgentDisconnected(message);
    }

    await this.handleWebrtcPeerDisconnected(message);
  }

  /**
   * Drops all per-room state once the room's agent is gone.
   *
   * @param message Peer-disconnected notification of the agent.
   */
  handleAgentDisconnected(message: PeerDisconnected) {
    console.log(`Agent ${message.peerId} disconnected`);

    this.stopImageCapture(message.roomId);
    this.agents.delete(message.roomId);
    this.agentPeerIds.delete(message.roomId);
    this.videoTracks.delete(message.roomId);
  }

  /**
   * Closes the peer's Gemini session and removes the room's agent when no
   * other connected peer is left.
   *
   * @param message Peer-disconnected notification of a non-agent peer.
   */
  async handleWebrtcPeerDisconnected(message: PeerDisconnected) {
    console.log('Peer disconnected: %O', message);

    const { peerId, roomId } = message;
    this.peerSessions.get(peerId)?.close();
    this.peerSessions.delete(peerId);
    this.peerRooms.delete(peerId);

    const agentPeerId = this.agentPeerIds.get(roomId);
    if (agentPeerId === undefined) return;

    const room = await this.fishjamClient.getRoom(roomId);
    // Only the known agent is ever deleted, and only when nobody else is still connected.
    const someoneElseConnected = room.peers.some((peer) => peer.status === 'connected' && peer.id !== agentPeerId);
    if (someoneElseConnected) return;

    console.log('Last peer left room, removing agent');
    this.stopImageCapture(roomId);
    await this.fishjamClient.deletePeer(roomId, agentPeerId);
  }

  /**
   * Registers a newly added track for periodic image capture.
   *
   * @param message Track-added notification.
   */
  handleTrackAdded(message: TrackAdded) {
    // NOTE(review): 1 is presumably the SDK's "video" track type — confirm against the enum.
    if (!message.track || message.track.type !== 1) return;

    const trackId = message.track.id as TrackId;
    const tracks = this.videoTracks.get(message.roomId);
    if (tracks) {
      tracks.add(trackId);
      console.log(`Video track ${trackId} added in room ${message.roomId}`);
    }
  }

  /**
   * Stops image capture for a removed track.
   *
   * @param message Track-removed notification.
   */
  handleTrackRemoved(message: TrackRemoved) {
    if (!message.track) return;

    const trackId = message.track.id as TrackId;
    const tracks = this.videoTracks.get(message.roomId);
    if (tracks) {
      tracks.delete(trackId);
      console.log(`Video track ${trackId} removed from room ${message.roomId}`);
    }
  }

  /**
   * Forwards data received by the agent to the Gemini session of the peer it
   * came from. Data for peers without a session is dropped.
   *
   * @param message Track data received by the agent.
   */
  handleTrackData(message: IncomingTrackData) {
    const { data, peerId } = message;
    const session = this.peerSessions.get(peerId);

    session?.sendRealtimeInput({
      audio: {
        // Buffer is used (as in handleTrackImage) because Uint8Array.prototype.toBase64
        // is only available on very recent runtimes.
        data: Buffer.from(data).toString('base64'),
        mimeType: GeminiIntegration.inputMimeType,
      },
    });
  }

  /**
   * Forwards a captured image to the Gemini sessions of the peers in the room
   * the image was captured in.
   *
   * @param roomId Room whose agent delivered the image.
   * @param message Captured image and its content type.
   */
  handleTrackImage(roomId: RoomId, message: IncomingTrackImage) {
    const { contentType, data } = message;
    // Encoded once; the same payload is sent to every matching session.
    const encoded = Buffer.from(data).toString('base64');

    for (const [peerId, session] of this.peerSessions) {
      // Sessions of peers in other rooms must not receive this room's images.
      if (this.peerRooms.get(peerId) !== roomId) continue;

      session.sendRealtimeInput({
        media: {
          data: encoded,
          mimeType: contentType,
        },
      });
    }
  }

  /**
   * Sends inline data returned by Gemini to the room through the agent's
   * output track and logs input transcriptions when present.
   *
   * @param roomId Room of the peer the session belongs to.
   * @param peerId Peer the session belongs to.
   * @param msg Message received from the Gemini Live API.
   */
  handleGeminiMessage(roomId: RoomId, peerId: PeerId, msg: LiveServerMessage) {
    const agentState = this.agents.get(roomId);
    if (!agentState) return;

    // A model turn may carry several parts; forward each one that has inline data.
    for (const part of msg.serverContent?.modelTurn?.parts ?? []) {
      const encoded = part.inlineData?.data;
      if (!encoded) continue;

      const buffer = Buffer.from(encoded, 'base64');
      agentState.agent.sendData(agentState.outputTrackId, new Uint8Array(buffer));
    }

    // NOTE(review): the connect() config above does not request input transcription,
    // so this field may never be populated — confirm whether that is intended.
    const transcription = msg.serverContent?.inputTranscription?.text;
    if (transcription) console.log(`Peer ${peerId} said: "${transcription}".`);
  }

  /**
   * Starts requesting an image from every registered track of the room every
   * CAPTURE_INTERVAL_MS milliseconds.
   *
   * @param roomId Room to capture images for.
   */
  private startImageCapture(roomId: RoomId) {
    // Make sure an earlier timer for the same room is not left running.
    this.stopImageCapture(roomId);

    const interval = setInterval(() => {
      const agentState = this.agents.get(roomId);
      const tracks = this.videoTracks.get(roomId);

      if (!agentState || !tracks || tracks.size === 0) return;

      for (const trackId of tracks) {
        console.log('Sending image capture request for track', trackId);
        agentState.agent.captureImage(trackId);
      }
    }, CAPTURE_INTERVAL_MS);

    this.captureIntervals.set(roomId, interval);
  }

  /**
   * Stops the image-capture timer of the room, if one is running.
   *
   * @param roomId Room to stop capturing images for.
   */
  private stopImageCapture(roomId: RoomId) {
    const interval = this.captureIntervals.get(roomId);
    if (interval) {
      clearInterval(interval);
      this.captureIntervals.delete(roomId);
    }
  }
}
0 commit comments