diff --git a/Notes-Audio.docx b/Notes-Audio.docx new file mode 100644 index 00000000..d2c20514 Binary files /dev/null and b/Notes-Audio.docx differ diff --git a/samples/cs/GettingStarted/cross-platform/LiveAudioTranscriptionExample/LiveAudioTranscriptionExample.csproj b/samples/cs/GettingStarted/cross-platform/LiveAudioTranscriptionExample/LiveAudioTranscriptionExample.csproj new file mode 100644 index 00000000..ad6086f5 --- /dev/null +++ b/samples/cs/GettingStarted/cross-platform/LiveAudioTranscriptionExample/LiveAudioTranscriptionExample.csproj @@ -0,0 +1,32 @@ + + + + Exe + net9.0 + enable + enable + + + + $(NETCoreSdkRuntimeIdentifier) + + + + + + + + + + + + + + + + + + + + + diff --git a/samples/cs/GettingStarted/src/LiveAudioTranscriptionExample/Program.cs b/samples/cs/GettingStarted/src/LiveAudioTranscriptionExample/Program.cs new file mode 100644 index 00000000..d6e812e3 --- /dev/null +++ b/samples/cs/GettingStarted/src/LiveAudioTranscriptionExample/Program.cs @@ -0,0 +1,105 @@ +// Live Audio Transcription — Foundry Local SDK Example +// +// Demonstrates real-time microphone-to-text using: +// SDK (FoundryLocalManager) → Core (NativeAOT DLL) → onnxruntime-genai (StreamingProcessor) + +using Microsoft.AI.Foundry.Local; +using NAudio.Wave; + +Console.WriteLine("==========================================================="); +Console.WriteLine(" Foundry Local -- Live Audio Transcription Demo"); +Console.WriteLine("==========================================================="); +Console.WriteLine(); + +var config = new Configuration +{ + AppName = "foundry_local_samples", + LogLevel = Microsoft.AI.Foundry.Local.LogLevel.Information +}; + +await FoundryLocalManager.CreateAsync(config, Utils.GetAppLogger()); +var mgr = FoundryLocalManager.Instance; + +await Utils.RunWithSpinner("Registering execution providers", mgr.EnsureEpsDownloadedAsync()); + +var catalog = await mgr.GetCatalogAsync(); + +var model = await catalog.GetModelAsync("nemotron") ?? throw new Exception("Model \"nemotron\" not found in catalog"); + +await model.DownloadAsync(progress => +{ + Console.Write($"\rDownloading model: {progress:F2}%"); + if (progress >= 100f) + { + Console.WriteLine(); + } +}); + +Console.Write($"Loading model {model.Id}..."); +await model.LoadAsync(); +Console.WriteLine("done."); + +var audioClient = await model.GetAudioClientAsync(); +var session = audioClient.CreateLiveTranscriptionSession(); +session.Settings.SampleRate = 16000; +session.Settings.Channels = 1; +session.Settings.Language = "en"; + +await session.StartAsync(); +Console.WriteLine(" Session started"); + +var readTask = Task.Run(async () => +{ + try + { + await foreach (var result in session.GetTranscriptionStream()) + { + if (result.IsFinal) + { + Console.WriteLine(); + Console.WriteLine($" [FINAL] {result.Text}"); + Console.Out.Flush(); + } + else if (!string.IsNullOrEmpty(result.Text)) + { + Console.ForegroundColor = ConsoleColor.Cyan; + Console.Write(result.Text); + Console.ResetColor(); + Console.Out.Flush(); + } + } + } + catch (OperationCanceledException) { } +}); + +using var waveIn = new WaveInEvent +{ + WaveFormat = new WaveFormat(rate: 16000, bits: 16, channels: 1), + BufferMilliseconds = 100 +}; + +waveIn.DataAvailable += (sender, e) => +{ + if (e.BytesRecorded > 0) + { + _ = session.AppendAsync(new ReadOnlyMemory(e.Buffer, 0, e.BytesRecorded)); + } +}; + +Console.WriteLine(); +Console.WriteLine("==========================================================="); +Console.WriteLine(" LIVE TRANSCRIPTION ACTIVE"); +Console.WriteLine(" Speak into your microphone."); +Console.WriteLine(" Transcription appears in real-time (cyan text)."); +Console.WriteLine(" Press ENTER to stop recording."); +Console.WriteLine("==========================================================="); +Console.WriteLine(); + +waveIn.StartRecording(); +Console.ReadLine(); +waveIn.StopRecording(); + +await session.StopAsync(); +await readTask; + +await model.UnloadAsync(); diff --git a/samples/cs/GettingStarted/windows/LiveAudioTranscriptionExample/LiveAudioTranscriptionExample.csproj b/samples/cs/GettingStarted/windows/LiveAudioTranscriptionExample/LiveAudioTranscriptionExample.csproj new file mode 100644 index 00000000..b4489af2 --- /dev/null +++ b/samples/cs/GettingStarted/windows/LiveAudioTranscriptionExample/LiveAudioTranscriptionExample.csproj @@ -0,0 +1,30 @@ + + + + Exe + enable + enable + + net9.0-windows10.0.26100 + false + ARM64;x64 + None + false + + + + $(NETCoreSdkRuntimeIdentifier) + + + + + + + + + + + + + + diff --git a/sdk/cs/README.md b/sdk/cs/README.md index f58e41e0..48f37d05 100644 --- a/sdk/cs/README.md +++ b/sdk/cs/README.md @@ -233,6 +233,64 @@ audioClient.Settings.Language = "en"; audioClient.Settings.Temperature = 0.0f; ``` +### Live Audio Transcription (Real-Time Streaming) + +For real-time microphone-to-text transcription, use `CreateLiveTranscriptionSession()`. Audio is pushed as raw PCM chunks and transcription results stream back as an `IAsyncEnumerable`. + +The streaming result type (`LiveAudioTranscriptionResponse`) extends `AudioCreateTranscriptionResponse` from the Betalgo OpenAI SDK, so it's compatible with the file-based transcription output format while adding streaming-specific fields. + +```csharp +var audioClient = await model.GetAudioClientAsync(); +var session = audioClient.CreateLiveTranscriptionSession(); + +// Configure audio format (must be set before StartAsync) +session.Settings.SampleRate = 16000; +session.Settings.Channels = 1; +session.Settings.Language = "en"; + +await session.StartAsync(); + +// Push audio from a microphone callback (thread-safe) +waveIn.DataAvailable += (sender, e) => +{ + _ = session.AppendAsync(new ReadOnlyMemory(e.Buffer, 0, e.BytesRecorded)); +}; + +// Read transcription results as they arrive +await foreach (var result in session.GetTranscriptionStream()) +{ + // result inherits from AudioCreateTranscriptionResponse + // - result.Text — incremental transcribed text (per chunk, not accumulated) + // - result.IsFinal — true for final results, false for interim hypotheses + // - result.Segments — segment-level timing data (Start/End in seconds) + // - result.Language — language code + Console.Write(result.Text); +} + +await session.StopAsync(); +``` + +#### Output Type + +| Field | Type | Description | +|-------|------|-------------| +| `Text` | `string` | Transcribed text from this audio chunk (inherited from `AudioCreateTranscriptionResponse`) | +| `IsFinal` | `bool` | Whether this is a final or interim result. Nemotron always returns `true`. | +| `Language` | `string` | Language code (inherited) | +| `Duration` | `float` | Audio duration in seconds (inherited) | +| `Segments` | `List` | Segment timing with `Start`/`End` offsets (inherited) | +| `Words` | `List` | Word-level timing (inherited, when available) | + +#### Session Lifecycle + +| Method | Description | +|--------|-------------| +| `StartAsync()` | Initialize the streaming session. Settings are frozen after this call. | +| `AppendAsync(pcmData)` | Push a chunk of raw PCM audio. Thread-safe (bounded internal queue). | +| `GetTranscriptionStream()` | Async enumerable of transcription results. | +| `StopAsync()` | Signal end-of-audio, flush remaining audio, and clean up. | +| `DisposeAsync()` | Calls `StopAsync` if needed. Use `await using` for automatic cleanup. | + ### Web Service Start an OpenAI-compatible REST endpoint for use by external tools or processes: @@ -297,6 +355,8 @@ Key types: | [`ModelVariant`](./docs/api/microsoft.ai.foundry.local.modelvariant.md) | Specific model variant (hardware/quantization) | | [`OpenAIChatClient`](./docs/api/microsoft.ai.foundry.local.openaichatclient.md) | Chat completions (sync + streaming) | | [`OpenAIAudioClient`](./docs/api/microsoft.ai.foundry.local.openaiaudioclient.md) | Audio transcription (sync + streaming) | +| [`LiveAudioTranscriptionSession`](./docs/api/microsoft.ai.foundry.local.openai.liveaudiotranscriptionsession.md) | Real-time audio streaming session | +| [`LiveAudioTranscriptionResponse`](./docs/api/microsoft.ai.foundry.local.openai.liveaudiotranscriptionresponse.md) | Streaming transcription result (extends `AudioCreateTranscriptionResponse`) | | [`ModelInfo`](./docs/api/microsoft.ai.foundry.local.modelinfo.md) | Full model metadata record | ## Tests diff --git a/sdk/cs/src/Detail/CoreInterop.cs b/sdk/cs/src/Detail/CoreInterop.cs index 8411473b..c5eba7ec 100644 --- a/sdk/cs/src/Detail/CoreInterop.cs +++ b/sdk/cs/src/Detail/CoreInterop.cs @@ -158,6 +158,31 @@ private static unsafe partial void CoreExecuteCommandWithCallback(RequestBuffer* nint callbackPtr, // NativeCallbackFn pointer nint userData); + [LibraryImport(LibraryName, EntryPoint = "execute_command_with_binary")] + [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] + private static unsafe partial void CoreExecuteCommandWithBinary(StreamingRequestBuffer* nativeRequest, + ResponseBuffer* nativeResponse); + + // --- Audio streaming P/Invoke imports (kept for future dedicated entry points) --- + + [LibraryImport(LibraryName, EntryPoint = "audio_stream_start")] + [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] + private static unsafe partial void CoreAudioStreamStart( + RequestBuffer* request, + ResponseBuffer* response); + + [LibraryImport(LibraryName, EntryPoint = "audio_stream_push")] + [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] + private static unsafe partial void CoreAudioStreamPush( + StreamingRequestBuffer* request, + ResponseBuffer* response); + + [LibraryImport(LibraryName, EntryPoint = "audio_stream_stop")] + [UnmanagedCallConv(CallConvs = new[] { typeof(System.Runtime.CompilerServices.CallConvCdecl) })] + private static unsafe partial void CoreAudioStreamStop( + RequestBuffer* request, + ResponseBuffer* response); + // helper to capture exceptions in callbacks internal class CallbackHelper { @@ -331,4 +356,94 @@ public Task ExecuteCommandWithCallbackAsync(string commandName, CoreIn return Task.Run(() => ExecuteCommandWithCallback(commandName, commandInput, callback), ct); } + /// + /// Marshal a ResponseBuffer from unmanaged memory into a managed Response and free the unmanaged memory. + /// + private Response MarshalResponse(ResponseBuffer response) + { + Response result = new(); + + if (response.Data != IntPtr.Zero && response.DataLength > 0) + { + byte[] managedResponse = new byte[response.DataLength]; + Marshal.Copy(response.Data, managedResponse, 0, response.DataLength); + result.Data = System.Text.Encoding.UTF8.GetString(managedResponse); + } + + if (response.Error != IntPtr.Zero && response.ErrorLength > 0) + { + result.Error = Marshal.PtrToStringUTF8(response.Error, response.ErrorLength)!; + } + + Marshal.FreeHGlobal(response.Data); + Marshal.FreeHGlobal(response.Error); + + return result; + } + + // --- Audio streaming managed implementations --- + // Route through the existing execute_command / execute_command_with_binary entry points. + // The Core handles audio_stream_start / audio_stream_stop as command cases in ExecuteCommandManaged, + // and audio_stream_push as a command case in ExecuteCommandWithBinaryManaged. + + public Response StartAudioStream(CoreInteropRequest request) + { + return ExecuteCommand("audio_stream_start", request); + } + + public Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory audioData) + { + try + { + var commandInputJson = request.ToJson(); + byte[] commandBytes = System.Text.Encoding.UTF8.GetBytes("audio_stream_push"); + byte[] inputBytes = System.Text.Encoding.UTF8.GetBytes(commandInputJson); + + IntPtr commandPtr = Marshal.AllocHGlobal(commandBytes.Length); + Marshal.Copy(commandBytes, 0, commandPtr, commandBytes.Length); + + IntPtr inputPtr = Marshal.AllocHGlobal(inputBytes.Length); + Marshal.Copy(inputBytes, 0, inputPtr, inputBytes.Length); + + // Pin the managed audio data so GC won't move it during the native call + using var audioHandle = audioData.Pin(); + + unsafe + { + var reqBuf = new StreamingRequestBuffer + { + Command = commandPtr, + CommandLength = commandBytes.Length, + Data = inputPtr, + DataLength = inputBytes.Length, + BinaryData = (nint)audioHandle.Pointer, + BinaryDataLength = audioData.Length + }; + + ResponseBuffer response = default; + + try + { + CoreExecuteCommandWithBinary(&reqBuf, &response); + } + finally + { + Marshal.FreeHGlobal(commandPtr); + Marshal.FreeHGlobal(inputPtr); + } + + return MarshalResponse(response); + } + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + throw new FoundryLocalException("Error executing audio_stream_push", ex, _logger); + } + } + + public Response StopAudioStream(CoreInteropRequest request) + { + return ExecuteCommand("audio_stream_stop", request); + } + } diff --git a/sdk/cs/src/Detail/ICoreInterop.cs b/sdk/cs/src/Detail/ICoreInterop.cs index 1fff9dde..b493dfb7 100644 --- a/sdk/cs/src/Detail/ICoreInterop.cs +++ b/sdk/cs/src/Detail/ICoreInterop.cs @@ -51,4 +51,21 @@ Task ExecuteCommandAsync(string commandName, CoreInteropRequest? comma Task ExecuteCommandWithCallbackAsync(string commandName, CoreInteropRequest? commandInput, CallbackFn callback, CancellationToken? ct = null); + + // --- Audio streaming session support --- + + [StructLayout(LayoutKind.Sequential)] + protected unsafe struct StreamingRequestBuffer + { + public nint Command; + public int CommandLength; + public nint Data; // JSON params + public int DataLength; + public nint BinaryData; // raw PCM audio bytes + public int BinaryDataLength; + } + + Response StartAudioStream(CoreInteropRequest request); + Response PushAudioData(CoreInteropRequest request, ReadOnlyMemory audioData); + Response StopAudioStream(CoreInteropRequest request); } diff --git a/sdk/cs/src/Detail/JsonSerializationContext.cs b/sdk/cs/src/Detail/JsonSerializationContext.cs index 894f9454..ea5f5c21 100644 --- a/sdk/cs/src/Detail/JsonSerializationContext.cs +++ b/sdk/cs/src/Detail/JsonSerializationContext.cs @@ -33,6 +33,11 @@ namespace Microsoft.AI.Foundry.Local.Detail; [JsonSerializable(typeof(IList))] [JsonSerializable(typeof(PropertyDefinition))] [JsonSerializable(typeof(IList))] +// --- Audio streaming types --- +[JsonSerializable(typeof(LiveAudioTranscriptionResponse))] +[JsonSerializable(typeof(LiveAudioTranscriptionRaw))] +[JsonSerializable(typeof(CoreErrorResponse))] +[JsonSerializable(typeof(AudioCreateTranscriptionResponse.Segment))] [JsonSourceGenerationOptions(DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, WriteIndented = false)] internal partial class JsonSerializationContext : JsonSerializerContext diff --git a/sdk/cs/src/Microsoft.AI.Foundry.Local.csproj b/sdk/cs/src/Microsoft.AI.Foundry.Local.csproj index 905f9652..9f203a9b 100644 --- a/sdk/cs/src/Microsoft.AI.Foundry.Local.csproj +++ b/sdk/cs/src/Microsoft.AI.Foundry.Local.csproj @@ -99,8 +99,8 @@ $(FoundryLocalCoreVersion) - 0.9.0.8-rc3 - 0.9.0.8-rc3 + 0.9.0-dev + 0.9.0-dev True diff --git a/sdk/cs/src/OpenAI/AudioClient.cs b/sdk/cs/src/OpenAI/AudioClient.cs index 564858f3..a8cbc1d7 100644 --- a/sdk/cs/src/OpenAI/AudioClient.cs +++ b/sdk/cs/src/OpenAI/AudioClient.cs @@ -8,7 +8,6 @@ namespace Microsoft.AI.Foundry.Local; using System.Runtime.CompilerServices; using System.Threading.Channels; - using Betalgo.Ranul.OpenAI.ObjectModels.RequestModels; using Betalgo.Ranul.OpenAI.ObjectModels.ResponseModels; @@ -85,6 +84,16 @@ public async IAsyncEnumerable TranscribeAudioS } } + /// + /// Create a real-time streaming transcription session. + /// Audio data is pushed in as PCM chunks and transcription results are returned as an async stream. + /// + /// A streaming session that must be disposed when done. + public LiveAudioTranscriptionSession CreateLiveTranscriptionSession() + { + return new LiveAudioTranscriptionSession(_modelId); + } + private async Task TranscribeAudioImplAsync(string audioFilePath, CancellationToken? ct) { diff --git a/sdk/cs/src/OpenAI/ChatClient.cs b/sdk/cs/src/OpenAI/ChatClient.cs index b9f889f2..e237ee31 100644 --- a/sdk/cs/src/OpenAI/ChatClient.cs +++ b/sdk/cs/src/OpenAI/ChatClient.cs @@ -171,7 +171,7 @@ private async IAsyncEnumerable ChatStreamingImplAs { var failed = false; - var response = await _coreInterop.ExecuteCommandWithCallbackAsync( + await _coreInterop.ExecuteCommandWithCallbackAsync( "chat_completions", request, async (callbackData) => @@ -196,17 +196,6 @@ private async IAsyncEnumerable ChatStreamingImplAs ct ).ConfigureAwait(false); - // If the native layer returned an error (e.g. missing model, invalid input) - // without invoking any callbacks, propagate it so the caller sees an exception - // instead of an empty stream. - if (!failed && response.Error != null) - { - channel.Writer.TryComplete( - new FoundryLocalException($"Error from chat_completions command: {response.Error}", _logger)); - failed = true; - return; - } - // use TryComplete as an exception in the callback may have already closed the channel _ = channel.Writer.TryComplete(); } diff --git a/sdk/cs/src/OpenAI/LiveAudioTranscriptionClient.cs b/sdk/cs/src/OpenAI/LiveAudioTranscriptionClient.cs new file mode 100644 index 00000000..4b4b6d9a --- /dev/null +++ b/sdk/cs/src/OpenAI/LiveAudioTranscriptionClient.cs @@ -0,0 +1,381 @@ +// -------------------------------------------------------------------------------------------------------------------- +// +// Copyright (c) Microsoft. All rights reserved. +// +// -------------------------------------------------------------------------------------------------------------------- + +namespace Microsoft.AI.Foundry.Local.OpenAI; + +using System.Runtime.CompilerServices; +using System.Globalization; +using System.Threading.Channels; +using Microsoft.AI.Foundry.Local; +using Microsoft.AI.Foundry.Local.Detail; +using Microsoft.Extensions.Logging; + +/// +/// Session for real-time audio streaming ASR (Automatic Speech Recognition). +/// Audio data from a microphone (or other source) is pushed in as PCM chunks, +/// and transcription results are returned as an async stream. +/// +/// Created via . +/// +/// Thread safety: AppendAsync can be called from any thread (including high-frequency +/// audio callbacks). Pushes are internally serialized via a bounded channel to prevent +/// unbounded memory growth and ensure ordering. +/// + +public sealed class LiveAudioTranscriptionSession : IAsyncDisposable +{ + private readonly string _modelId; + private readonly ICoreInterop _coreInterop = FoundryLocalManager.Instance.CoreInterop; + private readonly ILogger _logger = FoundryLocalManager.Instance.Logger; + + // Session state — protected by _lock + private readonly AsyncLock _lock = new(); + private string? _sessionHandle; + private bool _started; + private bool _stopped; + + // Output channel: native callback writes, user reads via GetTranscriptionStream + private Channel? _outputChannel; + + // Internal push queue: user writes audio chunks, background loop drains to native core. + // Bounded to prevent unbounded memory growth if native core is slower than real-time. + private Channel>? _pushChannel; + private Task? _pushLoopTask; + + // Dedicated CTS for the push loop — decoupled from StartAsync's caller token. + // Cancelled only during StopAsync/DisposeAsync to allow clean drain. + private CancellationTokenSource? _sessionCts; + + // Snapshot of settings captured at StartAsync — prevents mutation after session starts. + private LiveAudioTranscriptionOptions? _activeSettings; + + /// + /// Audio format settings for the streaming session. + /// Must be configured before calling . + /// Settings are frozen once the session starts. + /// + public record LiveAudioTranscriptionOptions + { + /// PCM sample rate in Hz. Default: 16000. + public int SampleRate { get; set; } = 16000; + + /// Number of audio channels. Default: 1 (mono). + public int Channels { get; set; } = 1; + + /// Optional BCP-47 language hint (e.g., "en", "zh"). + public string? Language { get; set; } + + /// + /// Maximum number of audio chunks buffered in the internal push queue. + /// If the queue is full, AppendAsync will asynchronously wait. + /// Default: 100 (~3 seconds of audio at typical chunk sizes). + /// + public int PushQueueCapacity { get; set; } = 100; + + internal LiveAudioTranscriptionOptions Snapshot() => this with { }; // record copy + } + + public LiveAudioTranscriptionOptions Settings { get; } = new(); + + internal LiveAudioTranscriptionSession(string modelId) + { + _modelId = modelId; + } + + /// + /// Start a real-time audio streaming session. + /// Must be called before or . + /// Settings are frozen after this call. + /// + /// Cancellation token. + public async Task StartAsync(CancellationToken ct = default) + { + using var disposable = await _lock.LockAsync().ConfigureAwait(false); + + if (_started) + { + throw new FoundryLocalException("Streaming session already started. Call StopAsync first."); + } + + // Freeze settings + _activeSettings = Settings.Snapshot(); + + _outputChannel = Channel.CreateUnbounded( + new UnboundedChannelOptions + { + SingleWriter = true, // only the native callback writes + SingleReader = true, + AllowSynchronousContinuations = true + }); + + _pushChannel = Channel.CreateBounded>( + new BoundedChannelOptions(_activeSettings.PushQueueCapacity) + { + SingleReader = true, // only the push loop reads + SingleWriter = false, // multiple threads may push audio data + FullMode = BoundedChannelFullMode.Wait + }); + + var request = new CoreInteropRequest + { + Params = new Dictionary + { + { "Model", _modelId }, + { "SampleRate", _activeSettings.SampleRate.ToString(CultureInfo.InvariantCulture) }, + { "Channels", _activeSettings.Channels.ToString(CultureInfo.InvariantCulture) }, + } + }; + + if (_activeSettings.Language != null) + { + request.Params["Language"] = _activeSettings.Language; + } + + // StartAudioStream uses existing execute_command entry point — synchronous P/Invoke + var response = await Task.Run( + () => _coreInterop.StartAudioStream(request), ct) + .ConfigureAwait(false); + + if (response.Error != null) + { + _outputChannel.Writer.TryComplete(); + throw new FoundryLocalException( + $"Error starting audio stream session: {response.Error}", _logger); + } + + _sessionHandle = response.Data + ?? throw new FoundryLocalException("Native core did not return a session handle.", _logger); + _started = true; + _stopped = false; + + _sessionCts?.Dispose(); + _sessionCts = new CancellationTokenSource(); +#pragma warning disable IDISP013 // Await in using — Task.Run is intentionally fire-and-forget here + _pushLoopTask = Task.Run(() => PushLoopAsync(_sessionCts.Token), CancellationToken.None); +#pragma warning restore IDISP013 + } + + /// + /// Push a chunk of raw PCM audio data to the streaming session. + /// Can be called from any thread (including audio device callbacks). + /// Chunks are internally queued and serialized to the native core. + /// + /// Raw PCM audio bytes matching the configured format. + /// Cancellation token. + public async ValueTask AppendAsync(ReadOnlyMemory pcmData, CancellationToken ct = default) + { + if (!_started || _stopped) + { + throw new FoundryLocalException("No active streaming session. Call StartAsync first."); + } + + // Copy the data to avoid issues if the caller reuses the buffer (e.g. NAudio reuses e.Buffer) + var copy = new byte[pcmData.Length]; + pcmData.CopyTo(copy); + + await _pushChannel!.Writer.WriteAsync(copy, ct).ConfigureAwait(false); + } + + /// + /// Internal loop that drains the push queue and sends chunks to native core one at a time. + /// Terminates the session on any native error. + /// + private async Task PushLoopAsync(CancellationToken ct) + { + try + { + await foreach (var audioData in _pushChannel!.Reader.ReadAllAsync(ct).ConfigureAwait(false)) + { + var request = new CoreInteropRequest + { + Params = new Dictionary { { "SessionHandle", _sessionHandle! } } + }; + + var response = _coreInterop.PushAudioData(request, audioData); + + if (response.Error != null) + { + var errorInfo = CoreErrorResponse.TryParse(response.Error); + var fatalEx = new FoundryLocalException( + $"Push failed (code={errorInfo?.Code ?? "UNKNOWN"}): {response.Error}", + _logger); + _logger.LogError("Terminating push loop due to push failure: {Error}", + response.Error); + _outputChannel?.Writer.TryComplete(fatalEx); + return; + } + + // Parse transcription result from push response and surface it + if (!string.IsNullOrEmpty(response.Data)) + { + try + { + var transcription = LiveAudioTranscriptionResponse.FromJson(response.Data); + if (!string.IsNullOrEmpty(transcription.Text)) + { + _outputChannel?.Writer.TryWrite(transcription); + } + } + catch (Exception parseEx) + { + // Non-fatal: log and continue if response isn't a transcription result + _logger.LogDebug(parseEx, "Could not parse push response as transcription result"); + } + } + } + } + catch (OperationCanceledException) + { + // Expected on cancellation — push loop exits cleanly + } + catch (Exception ex) + { + _logger.LogError(ex, "Push loop terminated with unexpected error"); + _outputChannel?.Writer.TryComplete( + new FoundryLocalException("Push loop terminated unexpectedly.", ex, _logger)); + } + } + + /// + /// Get the async stream of transcription results. + /// Results arrive as the native ASR engine processes audio data. + /// + /// Cancellation token. + /// Async enumerable of transcription results. + public async IAsyncEnumerable GetTranscriptionStream( + [EnumeratorCancellation] CancellationToken ct = default) + { + if (_outputChannel == null) + { + throw new FoundryLocalException("No active streaming session. Call StartAsync first."); + } + + await foreach (var item in _outputChannel.Reader.ReadAllAsync(ct).ConfigureAwait(false)) + { + yield return item; + } + } + + /// + /// Signal end-of-audio and stop the streaming session. + /// Any remaining buffered audio in the push queue will be drained to native core first. + /// Final results are delivered through before it completes. + /// + /// Cancellation token. + public async Task StopAsync(CancellationToken ct = default) + { + using var disposable = await _lock.LockAsync().ConfigureAwait(false); + + if (!_started || _stopped) + { + return; // already stopped or never started + } + + _stopped = true; + + // 1. Complete the push channel so the push loop drains remaining items and exits + _pushChannel?.Writer.TryComplete(); + + // 2. Wait for the push loop to finish draining + if (_pushLoopTask != null) + { + await _pushLoopTask.ConfigureAwait(false); + } + + // 3. Cancel the session CTS (no-op if push loop already exited) + _sessionCts?.Cancel(); + + // 4. Tell native core to flush and finalize. + // This MUST happen even if ct is cancelled — otherwise native session leaks. + var request = new CoreInteropRequest + { + Params = new Dictionary { { "SessionHandle", _sessionHandle! } } + }; + + ICoreInterop.Response? response = null; + try + { + response = await Task.Run( + () => _coreInterop.StopAudioStream(request), ct) + .ConfigureAwait(false); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + // ct fired, but we MUST still stop the native session to avoid a leak. + _logger.LogWarning("StopAsync cancelled — performing best-effort native session stop."); + try + { + response = await Task.Run( + () => _coreInterop.StopAudioStream(request)) + .ConfigureAwait(false); + } + catch (Exception cleanupEx) + { + _logger.LogError(cleanupEx, "Best-effort native session stop failed."); + } + + throw; // Re-throw the cancellation after cleanup + } + finally + { + // Parse final transcription from stop response before completing the channel + if (response?.Data != null) + { + try + { + var finalResult = LiveAudioTranscriptionResponse.FromJson(response.Data); + if (!string.IsNullOrEmpty(finalResult.Text)) + { + _outputChannel?.Writer.TryWrite(finalResult); + } + } + catch (Exception parseEx) + { + _logger.LogDebug(parseEx, "Could not parse stop response as transcription result"); + } + } + + _sessionHandle = null; + _started = false; + _sessionCts?.Dispose(); + _sessionCts = null; + + // Complete the output channel AFTER writing final result + _outputChannel?.Writer.TryComplete(); + } + + if (response?.Error != null) + { + throw new FoundryLocalException( + $"Error stopping audio stream session: {response.Error}", _logger); + } + } + + /// + /// Dispose the streaming session. Calls if the session is still active. + /// Safe to call multiple times. + /// + public async ValueTask DisposeAsync() + { + try + { + if (_started && !_stopped) + { + await StopAsync().ConfigureAwait(false); + } + } + catch (Exception ex) + { + // DisposeAsync must never throw — log and swallow + _logger.LogWarning(ex, "Error during DisposeAsync cleanup."); + } + finally + { + _sessionCts?.Dispose(); + _lock.Dispose(); + } + } +} \ No newline at end of file diff --git a/sdk/cs/src/OpenAI/LiveAudioTranscriptionTypes.cs b/sdk/cs/src/OpenAI/LiveAudioTranscriptionTypes.cs new file mode 100644 index 00000000..c9650232 --- /dev/null +++ b/sdk/cs/src/OpenAI/LiveAudioTranscriptionTypes.cs @@ -0,0 +1,104 @@ +namespace Microsoft.AI.Foundry.Local.OpenAI; + +using System.Text.Json; +using System.Text.Json.Serialization; +using Betalgo.Ranul.OpenAI.ObjectModels.ResponseModels; +using Microsoft.AI.Foundry.Local; +using Microsoft.AI.Foundry.Local.Detail; + +/// +/// Transcription result for real-time audio streaming sessions. +/// Extends to provide a consistent +/// output format with file-based transcription, while adding streaming-specific fields. +/// +public record LiveAudioTranscriptionResponse : AudioCreateTranscriptionResponse +{ + /// + /// Whether this is a final or partial (interim) result. + /// - Nemotron models always return true (every result is final). + /// - Other models (e.g., Azure Embedded) may return false for interim + /// hypotheses that will be replaced by a subsequent final result. + /// + [JsonPropertyName("is_final")] + public bool IsFinal { get; init; } + + internal static LiveAudioTranscriptionResponse FromJson(string json) + { + // Deserialize the core's JSON (which has is_final, text, start_time, end_time) + // into an intermediate record, then map to the response type. + var raw = JsonSerializer.Deserialize(json, + JsonSerializationContext.Default.LiveAudioTranscriptionRaw) + ?? throw new FoundryLocalException("Failed to deserialize live audio transcription result"); + + var response = new LiveAudioTranscriptionResponse + { + Text = raw.Text, + IsFinal = raw.IsFinal, + }; + + // Map start_time/end_time into a Segment for OpenAI-compatible output + if (raw.StartTime.HasValue || raw.EndTime.HasValue) + { + response.Segments = + [ + new Segment + { + Start = (float)(raw.StartTime ?? 0), + End = (float)(raw.EndTime ?? 0), + Text = raw.Text + } + ]; + } + + return response; + } +} + +/// +/// Internal raw deserialization target matching the Core's JSON format. +/// Mapped to in FromJson. +/// +internal record LiveAudioTranscriptionRaw +{ + [JsonPropertyName("is_final")] + public bool IsFinal { get; init; } + + [JsonPropertyName("text")] + public string Text { get; init; } = string.Empty; + + [JsonPropertyName("start_time")] + public double? StartTime { get; init; } + + [JsonPropertyName("end_time")] + public double? EndTime { get; init; } +} + +internal record CoreErrorResponse +{ + [JsonPropertyName("code")] + public string Code { get; init; } = ""; + + [JsonPropertyName("message")] + public string Message { get; init; } = ""; + + [JsonPropertyName("isTransient")] + public bool IsTransient { get; init; } + + /// + /// Attempt to parse a native error string as structured JSON. + /// Returns null if the error is not valid JSON or doesn't match the schema, + /// which should be treated as a permanent/unknown error. + /// + internal static CoreErrorResponse? TryParse(string errorString) + { + try + { + return JsonSerializer.Deserialize(errorString, + JsonSerializationContext.Default.CoreErrorResponse); + } + catch + { + return null; // unstructured error — treat as permanent + } + } +} \ No newline at end of file diff --git a/sdk/cs/test/FoundryLocal.Tests/LiveAudioTranscriptionTests.cs b/sdk/cs/test/FoundryLocal.Tests/LiveAudioTranscriptionTests.cs new file mode 100644 index 00000000..b29ecd77 --- /dev/null +++ b/sdk/cs/test/FoundryLocal.Tests/LiveAudioTranscriptionTests.cs @@ -0,0 +1,176 @@ +// -------------------------------------------------------------------------------------------------------------------- +// +// Copyright (c) Microsoft. All rights reserved. +// +// -------------------------------------------------------------------------------------------------------------------- + +namespace Microsoft.AI.Foundry.Local.Tests; + +using System.Text.Json; +using Microsoft.AI.Foundry.Local.Detail; +using Microsoft.AI.Foundry.Local.OpenAI; + +internal sealed class LiveAudioTranscriptionTests +{ + // --- LiveAudioTranscriptionResponse.FromJson tests --- + + [Test] + public async Task FromJson_ParsesTextAndIsFinal() + { + var json = """{"is_final":true,"text":"hello world","start_time":null,"end_time":null}"""; + + var result = LiveAudioTranscriptionResponse.FromJson(json); + + await Assert.That(result.Text).IsEqualTo("hello world"); + await Assert.That(result.IsFinal).IsTrue(); + await Assert.That(result.Segments).IsNull(); + } + + [Test] + public async Task FromJson_MapsTimingToSegments() + { + var json = """{"is_final":false,"text":"partial","start_time":1.5,"end_time":3.0}"""; + + var result = LiveAudioTranscriptionResponse.FromJson(json); + + await Assert.That(result.Text).IsEqualTo("partial"); + await Assert.That(result.IsFinal).IsFalse(); + await Assert.That(result.Segments).IsNotNull(); + await Assert.That(result.Segments!.Count).IsEqualTo(1); + await Assert.That(result.Segments[0].Start).IsEqualTo(1.5f); + await Assert.That(result.Segments[0].End).IsEqualTo(3.0f); + await Assert.That(result.Segments[0].Text).IsEqualTo("partial"); + } + + [Test] + public async Task FromJson_EmptyText_ParsesSuccessfully() + { + var json = """{"is_final":true,"text":"","start_time":null,"end_time":null}"""; + + var result = LiveAudioTranscriptionResponse.FromJson(json); + + await Assert.That(result.Text).IsEqualTo(""); + await Assert.That(result.IsFinal).IsTrue(); + } + + [Test] + public async Task FromJson_OnlyStartTime_CreatesSegment() + { + var json = """{"is_final":true,"text":"word","start_time":2.0,"end_time":null}"""; + + var result = LiveAudioTranscriptionResponse.FromJson(json); + + await Assert.That(result.Segments).IsNotNull(); + await Assert.That(result.Segments!.Count).IsEqualTo(1); + await Assert.That(result.Segments[0].Start).IsEqualTo(2.0f); + await Assert.That(result.Segments[0].End).IsEqualTo(0f); + } + + [Test] + public async Task FromJson_InvalidJson_Throws() + { + var ex = Assert.Throws(() => + LiveAudioTranscriptionResponse.FromJson("not valid json")); + await Assert.That(ex).IsNotNull(); + } + + [Test] + public async Task FromJson_InheritsFromAudioCreateTranscriptionResponse() + { + var json = """{"is_final":true,"text":"test","start_time":null,"end_time":null}"""; + + var result = LiveAudioTranscriptionResponse.FromJson(json); + + // Verify it's assignable to the base type + Betalgo.Ranul.OpenAI.ObjectModels.ResponseModels.AudioCreateTranscriptionResponse baseRef = result; + await Assert.That(baseRef.Text).IsEqualTo("test"); + } + + // --- LiveAudioTranscriptionOptions tests --- + + [Test] + public async Task Options_DefaultValues() + { + var options = new LiveAudioTranscriptionSession.LiveAudioTranscriptionOptions(); + + await Assert.That(options.SampleRate).IsEqualTo(16000); + await Assert.That(options.Channels).IsEqualTo(1); + await Assert.That(options.Language).IsNull(); + await Assert.That(options.PushQueueCapacity).IsEqualTo(100); + } + + // --- CoreErrorResponse tests --- + + [Test] + public async Task CoreErrorResponse_TryParse_ValidJson() + { + var json = """{"code":"ASR_SESSION_NOT_FOUND","message":"Session not found","isTransient":false}"""; + + var error = CoreErrorResponse.TryParse(json); + + await Assert.That(error).IsNotNull(); + await Assert.That(error!.Code).IsEqualTo("ASR_SESSION_NOT_FOUND"); + await Assert.That(error.Message).IsEqualTo("Session not found"); + await Assert.That(error.IsTransient).IsFalse(); + } + + [Test] + public async Task CoreErrorResponse_TryParse_InvalidJson_ReturnsNull() + { + var result = CoreErrorResponse.TryParse("not json"); + await Assert.That(result).IsNull(); + } + + [Test] + public async Task CoreErrorResponse_TryParse_TransientError() + { + var json = """{"code":"BUSY","message":"Model busy","isTransient":true}"""; + + var error = CoreErrorResponse.TryParse(json); + + await Assert.That(error).IsNotNull(); + await Assert.That(error!.IsTransient).IsTrue(); + } + + // --- Session state guard tests --- + + [Test] + public async Task AppendAsync_BeforeStart_Throws() + { + await using var session = new LiveAudioTranscriptionSession("test-model"); + var data = new ReadOnlyMemory(new byte[100]); + + FoundryLocalException? caught = null; + try + { + await session.AppendAsync(data); + } + catch (FoundryLocalException ex) + { + caught = ex; + } + + await Assert.That(caught).IsNotNull(); + } + + [Test] + public async Task GetTranscriptionStream_BeforeStart_Throws() + { + await using var session = new LiveAudioTranscriptionSession("test-model"); + + FoundryLocalException? caught = null; + try + { + await foreach (var _ in session.GetTranscriptionStream()) + { + // should not reach here + } + } + catch (FoundryLocalException ex) + { + caught = ex; + } + + await Assert.That(caught).IsNotNull(); + } +} diff --git a/sdk/cs/test/FoundryLocal.Tests/Microsoft.AI.Foundry.Local.Tests.csproj b/sdk/cs/test/FoundryLocal.Tests/Microsoft.AI.Foundry.Local.Tests.csproj index b0bd3cd0..60e9eb50 100644 --- a/sdk/cs/test/FoundryLocal.Tests/Microsoft.AI.Foundry.Local.Tests.csproj +++ b/sdk/cs/test/FoundryLocal.Tests/Microsoft.AI.Foundry.Local.Tests.csproj @@ -38,6 +38,10 @@ + + + + runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/sdk/cs/test/FoundryLocal.Tests/ModelTests.cs b/sdk/cs/test/FoundryLocal.Tests/ModelTests.cs index b5a49657..1f49560d 100644 --- a/sdk/cs/test/FoundryLocal.Tests/ModelTests.cs +++ b/sdk/cs/test/FoundryLocal.Tests/ModelTests.cs @@ -51,4 +51,4 @@ public async Task GetLastestVersion_Works() var latestB = model.GetLatestVersion(variants[2]); await Assert.That(latestB).IsEqualTo(variants[1]); } -} +} \ No newline at end of file diff --git a/sdk/js/script/install.cjs b/sdk/js/script/install.cjs index 3db771b8..600741ae 100644 --- a/sdk/js/script/install.cjs +++ b/sdk/js/script/install.cjs @@ -49,19 +49,19 @@ const ORT_FEED = 'https://pkgs.dev.azure.com/aiinfra/PublicPackages/_packaging/O const ORT_NIGHTLY_FEED = 'https://pkgs.dev.azure.com/aiinfra/PublicPackages/_packaging/ORT-Nightly/nuget/v3/index.json'; // If nightly is requested, pull Core/GenAI from the ORT-Nightly feed where nightly builds are published. -// Otherwise use the standard NuGet.org feed. +// Otherwise use the ORT stable feed where release Core packages are published. const CORE_FEED = useNightly ? ORT_NIGHTLY_FEED : NUGET_FEED; const FOUNDRY_LOCAL_CORE_ARTIFACT = { name: 'Microsoft.AI.Foundry.Local.Core', - version: '0.9.0.8-rc3', + version: '0.9.0-dev', feed: ORT_NIGHTLY_FEED, nightly: useNightly } const FOUNDRY_LOCAL_CORE_WINML_ARTIFACT = { name: 'Microsoft.AI.Foundry.Local.Core.WinML', - version: '0.9.0.8-rc3', + version: '0.9.0-dev', feed: ORT_NIGHTLY_FEED, nightly: useNightly } diff --git a/sdk/js/src/imodel.ts b/sdk/js/src/imodel.ts index be0913d6..c2984b62 100644 --- a/sdk/js/src/imodel.ts +++ b/sdk/js/src/imodel.ts @@ -1,5 +1,6 @@ import { ChatClient } from './openai/chatClient.js'; import { AudioClient } from './openai/audioClient.js'; +import { LiveAudioTranscriptionClient } from './openai/liveAudioTranscriptionClient.js'; import { ResponsesClient } from './openai/responsesClient.js'; export interface IModel { @@ -16,6 +17,18 @@ export interface IModel { createChatClient(): ChatClient; createAudioClient(): AudioClient; + + /** + * Creates a LiveAudioTranscriptionClient for real-time audio streaming ASR. + * The model must be loaded before calling this method. + * @returns A LiveAudioTranscriptionClient instance. + */ + createLiveTranscriptionClient(): LiveAudioTranscriptionClient; + /** + * Creates a LiveAudioTranscriptionClient for real-time audio streaming ASR. + * @returns A LiveAudioTranscriptionClient instance. + */ + createLiveTranscriptionClient(): LiveAudioTranscriptionClient; /** * Creates a ResponsesClient for interacting with the model via the Responses API. * Unlike createChatClient/createAudioClient (which use FFI), the Responses API diff --git a/sdk/js/src/index.ts b/sdk/js/src/index.ts index 7d7ee17a..63f971fd 100644 --- a/sdk/js/src/index.ts +++ b/sdk/js/src/index.ts @@ -6,6 +6,8 @@ export { ModelVariant } from './modelVariant.js'; export type { IModel } from './imodel.js'; export { ChatClient, ChatClientSettings } from './openai/chatClient.js'; export { AudioClient, AudioClientSettings } from './openai/audioClient.js'; +export { LiveAudioTranscriptionClient, LiveAudioTranscriptionSettings } from './openai/liveAudioTranscriptionClient.js'; +export type { LiveAudioTranscriptionResult, CoreErrorResponse } from './openai/liveAudioTranscriptionTypes.js'; export { ResponsesClient, ResponsesClientSettings, getOutputText } from './openai/responsesClient.js'; export { ModelLoadManager } from './detail/modelLoadManager.js'; /** @internal */ diff --git a/sdk/js/src/model.ts b/sdk/js/src/model.ts index e2b37119..2ea1da01 100644 --- a/sdk/js/src/model.ts +++ b/sdk/js/src/model.ts @@ -1,6 +1,7 @@ import { ModelVariant } from './modelVariant.js'; import { ChatClient } from './openai/chatClient.js'; import { AudioClient } from './openai/audioClient.js'; +import { LiveAudioTranscriptionClient } from './openai/liveAudioTranscriptionClient.js'; import { ResponsesClient } from './openai/responsesClient.js'; import { IModel } from './imodel.js'; @@ -159,6 +160,14 @@ export class Model implements IModel { return this.selectedVariant.createAudioClient(); } + /** + * Creates a LiveAudioTranscriptionClient for real-time audio streaming ASR. + * @returns A LiveAudioTranscriptionClient instance. + */ + public createLiveTranscriptionClient(): LiveAudioTranscriptionClient { + return this.selectedVariant.createLiveTranscriptionClient(); + } + /** * Creates a ResponsesClient for interacting with the model via the Responses API. * @param baseUrl - The base URL of the Foundry Local web service. diff --git a/sdk/js/src/modelVariant.ts b/sdk/js/src/modelVariant.ts index 4d3e2bee..c5bbf24e 100644 --- a/sdk/js/src/modelVariant.ts +++ b/sdk/js/src/modelVariant.ts @@ -3,6 +3,7 @@ import { ModelLoadManager } from './detail/modelLoadManager.js'; import { ModelInfo } from './types.js'; import { ChatClient } from './openai/chatClient.js'; import { AudioClient } from './openai/audioClient.js'; +import { LiveAudioTranscriptionClient } from './openai/liveAudioTranscriptionClient.js'; import { ResponsesClient } from './openai/responsesClient.js'; import { IModel } from './imodel.js'; @@ -129,6 +130,14 @@ export class ModelVariant implements IModel { return new AudioClient(this._modelInfo.id, this.coreInterop); } + /** + * Creates a LiveAudioTranscriptionClient for real-time audio streaming ASR. + * @returns A LiveAudioTranscriptionClient instance. + */ + public createLiveTranscriptionClient(): LiveAudioTranscriptionClient { + return new LiveAudioTranscriptionClient(this._modelInfo.id, this.coreInterop); + } + /** * Creates a ResponsesClient for interacting with the model via the Responses API. * @param baseUrl - The base URL of the Foundry Local web service. diff --git a/sdk/js/src/openai/liveAudioTranscriptionClient.ts b/sdk/js/src/openai/liveAudioTranscriptionClient.ts new file mode 100644 index 00000000..0857f840 --- /dev/null +++ b/sdk/js/src/openai/liveAudioTranscriptionClient.ts @@ -0,0 +1,369 @@ +import { CoreInterop } from '../detail/coreInterop.js'; +import { LiveAudioTranscriptionResult, tryParseCoreError } from './liveAudioTranscriptionTypes.js'; + +/** + * Audio format settings for a streaming session. + * Must be configured before calling start(). + * Settings are frozen once the session starts. + */ +export class LiveAudioTranscriptionSettings { + /** PCM sample rate in Hz. Default: 16000. */ + sampleRate: number = 16000; + /** Number of audio channels. Default: 1 (mono). */ + channels: number = 1; + /** Bits per sample. Default: 16. */ + bitsPerSample: number = 16; + /** Optional BCP-47 language hint (e.g., "en", "zh"). */ + language?: string; + /** Maximum number of audio chunks buffered in the internal push queue. Default: 100. */ + pushQueueCapacity: number = 100; + + /** @internal Create a frozen copy of these settings. */ + snapshot(): LiveAudioTranscriptionSettings { + const copy = new LiveAudioTranscriptionSettings(); + copy.sampleRate = this.sampleRate; + copy.channels = this.channels; + copy.bitsPerSample = this.bitsPerSample; + copy.language = this.language; + copy.pushQueueCapacity = this.pushQueueCapacity; + return Object.freeze(copy) as LiveAudioTranscriptionSettings; + } +} + +/** + * Internal async queue that acts like C#'s Channel. + * Supports a single consumer reading via async iteration and multiple producers writing. + * @internal + */ +class AsyncQueue { + private queue: T[] = []; + private waitingResolve: ((value: IteratorResult) => void) | null = null; + private completed = false; + private completionError: Error | null = null; + private maxCapacity: number; + private backpressureResolve: (() => void) | null = null; + + constructor(maxCapacity: number = Infinity) { + this.maxCapacity = maxCapacity; + } + + /** Push an item. If at capacity, waits until space is available. */ + async write(item: T): Promise { + if (this.completed) { + throw new Error('Cannot write to a completed queue.'); + } + + if (this.waitingResolve) { + const resolve = this.waitingResolve; + this.waitingResolve = null; + resolve({ value: item, done: false }); + return; + } + + if (this.queue.length >= this.maxCapacity) { + await new Promise((resolve) => { + this.backpressureResolve = resolve; + }); + } + + this.queue.push(item); + } + + /** Push an item synchronously (no backpressure wait). */ + tryWrite(item: T): boolean { + if (this.completed) return false; + + if (this.waitingResolve) { + const resolve = this.waitingResolve; + this.waitingResolve = null; + resolve({ value: item, done: false }); + return true; + } + + this.queue.push(item); + return true; + } + + /** Signal that no more items will be written. */ + complete(error?: Error): void { + if (this.completed) return; + this.completed = true; + this.completionError = error ?? null; + + if (this.backpressureResolve) { + this.backpressureResolve(); + this.backpressureResolve = null; + } + + if (this.waitingResolve) { + const resolve = this.waitingResolve; + this.waitingResolve = null; + resolve({ value: undefined as any, done: true }); + } + } + + get error(): Error | null { + return this.completionError; + } + + /** Async iterator for consuming items. */ + async *[Symbol.asyncIterator](): AsyncGenerator { + while (true) { + if (this.backpressureResolve && this.queue.length < this.maxCapacity) { + const resolve = this.backpressureResolve; + this.backpressureResolve = null; + resolve(); + } + + if (this.queue.length > 0) { + yield this.queue.shift()!; + continue; + } + + if (this.completed) { + if (this.completionError) { + throw this.completionError; + } + return; + } + + const result = await new Promise>((resolve) => { + this.waitingResolve = resolve; + }); + + if (result.done) { + if (this.completionError) { + throw this.completionError; + } + return; + } + + yield result.value; + } + } +} + +/** + * Client for real-time audio streaming ASR (Automatic Speech Recognition). + * Audio data from a microphone (or other source) is pushed in as PCM chunks, + * and transcription results are returned as an async iterable. + * + * Mirrors the C# LiveAudioTranscriptionSession. + */ +export class LiveAudioTranscriptionClient { + private modelId: string; + private coreInterop: CoreInterop; + + private sessionHandle: string | null = null; + private started = false; + private stopped = false; + + private outputQueue: AsyncQueue | null = null; + private pushQueue: AsyncQueue | null = null; + private pushLoopPromise: Promise | null = null; + private activeSettings: LiveAudioTranscriptionSettings | null = null; + private sessionAbortController: AbortController | null = null; + + /** + * Configuration settings for the streaming session. + * Must be configured before calling start(). Settings are frozen after start(). + */ + public settings = new LiveAudioTranscriptionSettings(); + + /** + * @internal + * Users should create clients via Model.createLiveTranscriptionClient(). + */ + constructor(modelId: string, coreInterop: CoreInterop) { + this.modelId = modelId; + this.coreInterop = coreInterop; + } + + /** + * Start a real-time audio streaming session. + * Must be called before pushAudioData() or getTranscriptionStream(). + * Settings are frozen after this call. + */ + public async start(): Promise { + if (this.started) { + throw new Error('Streaming session already started. Call stop() first.'); + } + + this.activeSettings = this.settings.snapshot(); + this.outputQueue = new AsyncQueue(); + this.pushQueue = new AsyncQueue(this.activeSettings.pushQueueCapacity); + + const params: Record = { + Model: this.modelId, + SampleRate: this.activeSettings.sampleRate.toString(), + Channels: this.activeSettings.channels.toString(), + BitsPerSample: this.activeSettings.bitsPerSample.toString(), + }; + + if (this.activeSettings.language) { + params['Language'] = this.activeSettings.language; + } + + try { + const response = this.coreInterop.executeCommand("audio_stream_start", { + Params: params + }); + + this.sessionHandle = response; + if (!this.sessionHandle) { + throw new Error('Native core did not return a session handle.'); + } + } catch (error) { + this.outputQueue.complete(); + throw new Error( + `Error starting audio stream session: ${error instanceof Error ? error.message : String(error)}`, + { cause: error } + ); + } + + this.started = true; + this.stopped = false; + + this.sessionAbortController = new AbortController(); + this.pushLoopPromise = this.pushLoop(); + } + + /** + * Push a chunk of raw PCM audio data to the streaming session. + * Can be called from any context. Chunks are internally queued + * and serialized to native core one at a time. + * + * @param pcmData - Raw PCM audio bytes matching the configured format. + */ + public async pushAudioData(pcmData: Uint8Array): Promise { + if (!this.started || this.stopped) { + throw new Error('No active streaming session. Call start() first.'); + } + + const copy = new Uint8Array(pcmData.length); + copy.set(pcmData); + + await this.pushQueue!.write(copy); + } + + /** + * Internal loop that drains the push queue and sends chunks to native core one at a time. + * Terminates the session on any native error. + * @internal + */ + private async pushLoop(): Promise { + try { + for await (const audioData of this.pushQueue!) { + if (this.sessionAbortController?.signal.aborted) { + break; + } + + try { + this.coreInterop.executeCommand("audio_stream_push", { + Params: { + SessionHandle: this.sessionHandle!, + AudioDataLength: audioData.length.toString() + } + }); + } catch (error) { + const errorMsg = error instanceof Error ? error.message : String(error); + const errorInfo = tryParseCoreError(errorMsg); + + const fatalError = new Error( + `Push failed (code=${errorInfo?.code ?? 'UNKNOWN'}): ${errorMsg}`, + { cause: error } + ); + console.error('Terminating push loop due to push failure:', errorMsg); + this.outputQueue?.complete(fatalError); + return; + } + } + } catch (error) { + if (this.sessionAbortController?.signal.aborted) { + return; + } + const err = error instanceof Error ? error : new Error(String(error)); + console.error('Push loop terminated with unexpected error:', err.message); + this.outputQueue?.complete(new Error('Push loop terminated unexpectedly.', { cause: err })); + } + } + + /** + * Get the async iterable of transcription results. + * Results arrive as the native ASR engine processes audio data. + * + * Usage: + * ```ts + * for await (const result of client.getTranscriptionStream()) { + * console.log(result.text); + * } + * ``` + */ + public async *getTranscriptionStream(): AsyncGenerator { + if (!this.outputQueue) { + throw new Error('No active streaming session. Call start() first.'); + } + + for await (const item of this.outputQueue) { + yield item; + } + } + + /** + * Signal end-of-audio and stop the streaming session. + * Any remaining buffered audio in the push queue will be drained to native core first. + * Final results are delivered through getTranscriptionStream() before it completes. + */ + public async stop(): Promise { + if (!this.started || this.stopped) { + return; + } + + this.stopped = true; + + this.pushQueue?.complete(); + + if (this.pushLoopPromise) { + await this.pushLoopPromise; + } + + this.sessionAbortController?.abort(); + + let stopError: Error | null = null; + try { + this.coreInterop.executeCommand("audio_stream_stop", { + Params: { SessionHandle: this.sessionHandle! } + }); + } catch (error) { + stopError = error instanceof Error ? error : new Error(String(error)); + console.error('Error stopping audio stream session:', stopError.message); + } + + this.sessionHandle = null; + this.started = false; + this.sessionAbortController = null; + + this.outputQueue?.complete(); + + if (stopError) { + throw new Error( + `Error stopping audio stream session: ${stopError.message}`, + { cause: stopError } + ); + } + } + + /** + * Dispose the client and stop any active session. + * Safe to call multiple times. + */ + public async dispose(): Promise { + try { + if (this.started && !this.stopped) { + await this.stop(); + } + } catch (error) { + console.warn('Error during dispose cleanup:', error instanceof Error ? error.message : String(error)); + } + } +} diff --git a/sdk/js/src/openai/liveAudioTranscriptionTypes.ts b/sdk/js/src/openai/liveAudioTranscriptionTypes.ts new file mode 100644 index 00000000..eb521cbd --- /dev/null +++ b/sdk/js/src/openai/liveAudioTranscriptionTypes.ts @@ -0,0 +1,49 @@ +/** + * Types for real-time audio streaming transcription results and structured errors. + * Mirrors the C# LiveAudioTranscriptionResponse and CoreErrorResponse. + */ + +/** + * A transcription result from a real-time audio streaming session. + * Mirrors the C# LiveAudioTranscriptionResponse which extends AudioCreateTranscriptionResponse. + */ +export interface LiveAudioTranscriptionResult { + /** Whether this is a partial (interim) or final result for this segment. */ + is_final: boolean; + /** The transcribed text. */ + text: string; + /** Start time offset of this segment in the audio stream (seconds). */ + start_time?: number | null; + /** End time offset of this segment in the audio stream (seconds). */ + end_time?: number | null; +} + +/** + * Structured error response from native core audio streaming commands. + * @internal + */ +export interface CoreErrorResponse { + /** Machine-readable error code. */ + code: string; + /** Human-readable error message. */ + message: string; + /** Whether this error is transient and may succeed on retry. */ + isTransient: boolean; +} + +/** + * Attempt to parse a native error string as a structured CoreErrorResponse. + * Returns null if the error is not valid JSON or doesn't match the schema. + * @internal + */ +export function tryParseCoreError(errorString: string): CoreErrorResponse | null { + try { + const parsed = JSON.parse(errorString); + if (typeof parsed.code === 'string' && typeof parsed.isTransient === 'boolean') { + return parsed as CoreErrorResponse; + } + return null; + } catch { + return null; + } +}