11from __future__ import annotations
22
3+ import contextlib
34import io
45import logging
6+ import os
57import tempfile
68from urllib .parse import quote
79
810import aiohttp
911from nio import AsyncClient
12+ from nio .crypto .attachments import decrypt_attachment
13+
14+ _ = io # kept for backwards compat; pydub no longer used
1015
1116logger = logging .getLogger (__name__ )
1217
@@ -33,6 +38,13 @@ def _pydub_format(mime: str) -> str:
3338 return _MIME_TO_PYDUB .get (mime .lower ().split (";" )[0 ].strip (), "ogg" )
3439
3540
def _input_suffix(mime: str) -> str:
    """Return the file-name suffix matching the audio format of *mime*.

    The MIME string is first resolved to a format name by ``_pydub_format``;
    that name is then looked up in a fixed format-to-suffix table. Format
    names that have no entry in the table yield ``".bin"``.
    """
    suffix_by_format = {
        "mp4": ".m4a",
        "webm": ".webm",
        "ogg": ".ogg",
        "mp3": ".mp3",
        "flac": ".flac",
        "wav": ".wav",
        "aac": ".aac",
    }
    format_name = _pydub_format(mime)
    if format_name in suffix_by_format:
        return suffix_by_format[format_name]
    # Unknown format name: fall back to a generic suffix.
    return ".bin"
46+
47+
3648def _parse_mxc (mxc_url : str ) -> tuple [str , str ] | None :
3749 if not mxc_url .startswith ("mxc://" ):
3850 return None
@@ -49,8 +61,15 @@ def __init__(self, matrix: AsyncClient, max_bytes: int) -> None:
4961 self ._matrix = matrix
5062 self ._max_bytes = max_bytes
5163
52- async def download_mxc (self , mxc_url : str ) -> bytes | None :
53- """Download mxc:// URL via authenticated v1 endpoint with legacy fallback."""
64+ async def download_mxc (
65+ self ,
66+ mxc_url : str ,
67+ encrypted_file : dict | None = None ,
68+ ) -> bytes | None :
69+ """Download mxc:// URL and optionally decrypt an E2EE attachment.
70+
71+ Pass the content["file"] dict from the Matrix event for encrypted rooms.
72+ """
5473 parsed = _parse_mxc (mxc_url )
5574 if parsed is None :
5675 logger .warning ("Invalid mxc URL: %s" , mxc_url )
@@ -71,17 +90,54 @@ async def download_mxc(self, mxc_url: str) -> bytes | None:
7190 async with aiohttp .ClientSession () as session :
7291 for url in urls :
7392 try :
74- async with session .get (url , headers = headers ) as resp :
93+ async with session .get (url , headers = headers , allow_redirects = True ) as resp :
7594 if resp .status != 200 :
95+ logger .info ("Download endpoint %s returned %d" , url , resp .status )
7696 continue
7797 content_length = resp .content_length
7898 if content_length and content_length > self ._max_bytes :
7999 logger .warning ("Audio too large: %d bytes" , content_length )
80100 return None
81- data = await resp .content .read (self ._max_bytes + 1 )
101+ # Read full body, comparing against declared Content-Length to
102+ # catch truncated responses from Synapse's media replication.
103+ data = await resp .read ()
82104 if len (data ) > self ._max_bytes :
83105 logger .warning ("Audio exceeds max bytes after download" )
84106 return None
107+ if not data :
108+ logger .warning ("Empty body from %s" , url )
109+ continue
110+ if content_length and len (data ) < content_length :
111+ logger .warning (
112+ "Truncated response from %s: got %d / %d bytes" ,
113+ url , len (data ), content_length ,
114+ )
115+ continue
116+ content_type = resp .headers .get ("Content-Type" , "?" )
117+ logger .info (
118+ "Downloaded %d bytes from %s (Content-Type=%s, declared=%s)" ,
119+ len (data ), url , content_type , content_length ,
120+ )
121+ # Suspiciously small response that isn't audio — likely a JSON error
122+ if len (data ) < 1024 and not content_type .startswith ("audio/" ):
123+ preview = data [:512 ].decode ("utf-8" , errors = "replace" )
124+ logger .warning ("Non-audio short response body: %s" , preview )
125+ continue
126+ # Validate OGG signature when MIME claims ogg — Synapse sometimes
127+ # serves stub responses on cache miss
128+ if content_type .startswith ("audio/ogg" ) and not data .startswith (b"OggS" ):
129+ preview = data [:64 ].hex ()
130+ logger .warning (
131+ "Response claims ogg but missing OggS magic from %s: %s" ,
132+ url , preview ,
133+ )
134+ continue
135+ if encrypted_file :
136+ try :
137+ data = decrypt_attachment (data , encrypted_file )
138+ except Exception :
139+ logger .exception ("Failed to decrypt attachment mxc=%s" , mxc_url )
140+ return None
85141 return data
86142 except aiohttp .ClientError :
87143 logger .exception ("Download failed for %s" , url )
@@ -97,15 +153,37 @@ def convert_to_wav(self, audio_bytes: bytes, mime: str) -> str | None:
97153 Caller must delete the file (e.g. via os.unlink in a finally block).
98154 Blocking — must be called via run_in_executor.
99155 """
100- from pydub import AudioSegment
156+ import subprocess
157+
158+ # Write to disk first — ffmpeg's pipe input is unreliable with some
159+ # ogg/opus payloads on ffmpeg 7.x, while file input is rock-solid.
160+ with tempfile .NamedTemporaryFile (suffix = _input_suffix (mime ), delete = False ) as src :
161+ src .write (audio_bytes )
162+ src_path = src .name
163+
164+ dst_path = tempfile .NamedTemporaryFile (suffix = ".wav" , delete = False ).name
101165
102- fmt = _pydub_format (mime )
103166 try :
104- seg = AudioSegment .from_file (io .BytesIO (audio_bytes ), format = fmt )
105- seg = seg .set_frame_rate (16000 ).set_channels (1 ).set_sample_width (2 )
106- with tempfile .NamedTemporaryFile (suffix = ".wav" , delete = False ) as tmp :
107- seg .export (tmp .name , format = "wav" )
108- return tmp .name
109- except Exception :
110- logger .exception ("Audio conversion failed (mime=%s)" , mime )
111- return None
167+ result = subprocess .run (
168+ [
169+ "ffmpeg" , "-y" , "-hide_banner" , "-loglevel" , "error" ,
170+ "-i" , src_path ,
171+ "-ac" , "1" , "-ar" , "16000" , "-sample_fmt" , "s16" ,
172+ dst_path ,
173+ ],
174+ capture_output = True ,
175+ check = False ,
176+ )
177+ if result .returncode != 0 :
178+ logger .error (
179+ "ffmpeg failed (mime=%s, rc=%d): %s" ,
180+ mime , result .returncode ,
181+ result .stderr .decode ("utf-8" , errors = "replace" ).strip (),
182+ )
183+ with contextlib .suppress (OSError ):
184+ os .unlink (dst_path )
185+ return None
186+ return dst_path
187+ finally :
188+ with contextlib .suppress (OSError ):
189+ os .unlink (src_path )
0 commit comments