# Live Audio Transcription — Foundry Local SDK Example (Python)
#
# Real-time microphone-to-text pipeline:
#   SDK (FoundryLocalManager) → Core (NativeAOT DLL) → onnxruntime-genai (StreamingProcessor)
#
# Usage:
#   pip install pyaudio
#   python app.py

import threading

import pyaudio
from foundry_local_sdk import Configuration, FoundryLocalManager

print("===========================================================")
print(" Foundry Local -- Live Audio Transcription Demo (Python)")
print("===========================================================")
print()

# Bring up the SDK singleton.
FoundryLocalManager.initialize(Configuration(app_name="foundry_local_samples"))
manager = FoundryLocalManager.instance

# Fetch, download and load the Nemotron ASR model.
asr_model = manager.catalog.get_model("nemotron")
asr_model.download(
    lambda pct: print(f"\rDownloading model: {pct:.2f}%", end="", flush=True)
)
print()
print(f"Loading model {asr_model.id}...", end="")
asr_model.load()
print("done.")

# Configure a live transcription session: 16 kHz mono, English hint.
session = asr_model.get_audio_client().create_live_transcription_session()
session.settings.sample_rate = 16000
session.settings.channels = 1
session.settings.language = "en"

session.start()
print(" Session started")


def _drain_transcripts():
    """Print transcription results on a background thread as they arrive."""
    try:
        for result in session.get_transcription_stream():
            text = result.content[0].text if result.content else ""
            if result.is_final:
                print()
                print(f" [FINAL] {text}")
            elif text:
                # Interim (partial) results are rendered in cyan.
                print(f"\033[96m{text}\033[0m", end="", flush=True)
    except Exception:
        pass


reader = threading.Thread(target=_drain_transcripts, daemon=True)
reader.start()

# Microphone parameters: 16 kHz, mono, 16-bit PCM, 100 ms chunks.
SAMPLE_RATE = 16000
NUM_CHANNELS = 1
SAMPLE_FORMAT = pyaudio.paInt16
FRAMES_PER_CHUNK = SAMPLE_RATE // 10  # 100ms chunks

audio = pyaudio.PyAudio()
mic_stream = audio.open(
    format=SAMPLE_FORMAT,
    channels=NUM_CHANNELS,
    rate=SAMPLE_RATE,
    input=True,
    frames_per_buffer=FRAMES_PER_CHUNK,
)

print()
print("===========================================================")
print(" LIVE TRANSCRIPTION ACTIVE")
print(" Speak into your microphone.")
print(" Transcription appears in real-time (cyan text).")
print(" Press ENTER to stop recording.")
print("===========================================================")
print()

shutdown_evt = threading.Event()


def _pump_microphone():
    """Read PCM chunks from the microphone and push to the streaming session."""
    while not shutdown_evt.is_set():
        try:
            chunk = mic_stream.read(FRAMES_PER_CHUNK, exception_on_overflow=False)
            if chunk:
                session.append(chunk)
        except Exception:
            break


pump = threading.Thread(target=_pump_microphone, daemon=True)
pump.start()

# Block until the user presses ENTER.
input()

# Tear down in dependency order: microphone → session → model.
shutdown_evt.set()
pump.join(timeout=2)

mic_stream.stop_stream()
mic_stream.close()
audio.terminate()

session.stop()
reader.join()

asr_model.unload()
+ """ + + _fields_ = [ + ("Command", ctypes.c_void_p), + ("CommandLength", ctypes.c_int), + ("Data", ctypes.c_void_p), + ("DataLength", ctypes.c_int), + ("BinaryData", ctypes.c_void_p), + ("BinaryDataLength", ctypes.c_int), + ] + + class ResponseBuffer(ctypes.Structure): """ctypes Structure matching the native ``ResponseBuffer`` C struct.""" @@ -108,6 +125,32 @@ class CoreInterop: # Returns c_int: 0 = continue, 1 = cancel. CALLBACK_TYPE = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p) + @staticmethod + def _load_dll_win(dll_path: str): + """Load a DLL on Windows using ``LOAD_WITH_ALTERED_SEARCH_PATH``. + + This flag tells Windows to resolve the DLL's dependencies starting from + the DLL's own directory rather than the process's default search path. + Prevents conflicts with stale same-named DLLs in system directories + (e.g. an old onnxruntime.dll in system32). + + The DLL is first loaded via ``LoadLibraryExW`` (which maps it into the + process), then wrapped in a ``ctypes.CDLL`` for Python access. + """ + kernel32 = ctypes.windll.kernel32 + kernel32.LoadLibraryExW.restype = ctypes.c_void_p + kernel32.LoadLibraryExW.argtypes = [ctypes.c_wchar_p, ctypes.c_void_p, ctypes.c_int] + _LOAD_WITH_ALTERED_SEARCH_PATH = 0x00000008 + + handle = kernel32.LoadLibraryExW(dll_path, None, _LOAD_WITH_ALTERED_SEARCH_PATH) + if not handle: + logger.warning("LoadLibraryExW failed for %s (error %d), falling back to ctypes.CDLL", + dll_path, kernel32.GetLastError()) + return ctypes.CDLL(dll_path) + + # DLL is now mapped; ctypes.CDLL will reuse the loaded module + return ctypes.CDLL(dll_path) + @staticmethod def _initialize_native_libraries() -> 'NativeBinaryPaths': """Load the native Foundry Local Core library and its dependencies. 
@@ -142,6 +185,11 @@ def _initialize_native_libraries() -> 'NativeBinaryPaths': for native_dir in paths.all_dirs(): os.add_dll_directory(str(native_dir)) + # Set the DLL search directory so that when ORT/GenAI load their + # own dependencies, they find sibling DLLs from the correct + # directory rather than stale copies in system directories. + ctypes.windll.kernel32.SetDllDirectoryW(str(paths.ort_dir)) + # Explicitly pre-load ORT and GenAI so their symbols are globally # available when Core does P/Invoke lookups at runtime. # On Windows the PATH manipulation above is sufficient; on @@ -149,8 +197,8 @@ def _initialize_native_libraries() -> 'NativeBinaryPaths': # Core native code can resolve ORT/GenAI symbols. # ORT must be loaded before GenAI (GenAI depends on ORT). if sys.platform.startswith("win"): - CoreInterop._ort_library = ctypes.CDLL(str(paths.ort)) - CoreInterop._genai_library = ctypes.CDLL(str(paths.genai)) + CoreInterop._ort_library = CoreInterop._load_dll_win(str(paths.ort)) + CoreInterop._genai_library = CoreInterop._load_dll_win(str(paths.genai)) else: CoreInterop._ort_library = ctypes.CDLL(str(paths.ort), mode=os.RTLD_GLOBAL) CoreInterop._genai_library = ctypes.CDLL(str(paths.genai), mode=os.RTLD_GLOBAL) @@ -173,6 +221,10 @@ def _initialize_native_libraries() -> 'NativeBinaryPaths': ctypes.c_void_p] # user_data lib.execute_command_with_callback.restype = None + lib.execute_command_with_binary.argtypes = [ctypes.POINTER(StreamingRequestBuffer), + ctypes.POINTER(ResponseBuffer)] + lib.execute_command_with_binary.restype = None + return paths @staticmethod @@ -295,6 +347,66 @@ def execute_command_with_callback(self, command_name: str, command_input: Option response = self._execute_command(command_name, command_input, callback) return response + def execute_command_with_binary(self, command_name: str, + command_input: Optional[InteropRequest], + binary_data: bytes) -> Response: + """Execute a command with both JSON parameters and a raw binary payload. 
+ + Used for operations like pushing PCM audio data alongside JSON metadata. + + Args: + command_name: The native command name (e.g. ``"audio_stream_push"``). + command_input: Optional request parameters (serialized as JSON). + binary_data: Raw binary payload (e.g. PCM audio bytes). + + Returns: + A ``Response`` with ``data`` on success or ``error`` on failure. + """ + logger.debug("Executing command with binary: %s Input: %s BinaryLen: %d", + command_name, command_input.params if command_input else None, len(binary_data)) + + cmd_ptr, cmd_len, cmd_buf = CoreInterop._to_c_buffer(command_name) + data_ptr, data_len, data_buf = CoreInterop._to_c_buffer( + command_input.to_json() if command_input else None + ) + + # Keep binary data alive for the duration of the native call + binary_buf = ctypes.create_string_buffer(binary_data) + binary_ptr = ctypes.cast(binary_buf, ctypes.c_void_p) + + req = StreamingRequestBuffer( + Command=cmd_ptr, CommandLength=cmd_len, + Data=data_ptr, DataLength=data_len, + BinaryData=binary_ptr, BinaryDataLength=len(binary_data), + ) + resp = ResponseBuffer() + lib = CoreInterop._flcore_library + + lib.execute_command_with_binary(ctypes.byref(req), ctypes.byref(resp)) + + req = None # Free Python reference to request + + response_str = ctypes.string_at(resp.Data, resp.DataLength).decode("utf-8") if resp.Data else None + error_str = ctypes.string_at(resp.Error, resp.ErrorLength).decode("utf-8") if resp.Error else None + + lib.free_response(resp) + + return Response(data=response_str, error=error_str) + + # --- Audio streaming session support --- + + def start_audio_stream(self, command_input: InteropRequest) -> Response: + """Start a real-time audio streaming session via ``audio_stream_start``.""" + return self.execute_command("audio_stream_start", command_input) + + def push_audio_data(self, command_input: InteropRequest, audio_data: bytes) -> Response: + """Push a chunk of raw PCM audio data via ``audio_stream_push``.""" + return 
self.execute_command_with_binary("audio_stream_push", command_input, audio_data) + + def stop_audio_stream(self, command_input: InteropRequest) -> Response: + """Stop a real-time audio streaming session via ``audio_stream_stop``.""" + return self.execute_command("audio_stream_stop", command_input) + def get_cached_model_ids(core_interop: CoreInterop) -> list[str]: """Get the list of models that have been downloaded and are cached.""" diff --git a/sdk/python/src/openai/__init__.py b/sdk/python/src/openai/__init__.py index e445ba1d..bd12172c 100644 --- a/sdk/python/src/openai/__init__.py +++ b/sdk/python/src/openai/__init__.py @@ -6,5 +6,21 @@ from .chat_client import ChatClient, ChatClientSettings from .audio_client import AudioClient +from .live_audio_transcription_client import LiveAudioTranscriptionSession +from .live_audio_transcription_types import ( + CoreErrorResponse, + LiveAudioTranscriptionOptions, + LiveAudioTranscriptionResponse, + TranscriptionContentPart, +) -__all__ = ["AudioClient", "ChatClient", "ChatClientSettings"] +__all__ = [ + "AudioClient", + "ChatClient", + "ChatClientSettings", + "CoreErrorResponse", + "LiveAudioTranscriptionOptions", + "LiveAudioTranscriptionResponse", + "LiveAudioTranscriptionSession", + "TranscriptionContentPart", +] diff --git a/sdk/python/src/openai/audio_client.py b/sdk/python/src/openai/audio_client.py index 0858e4aa..575e9abf 100644 --- a/sdk/python/src/openai/audio_client.py +++ b/sdk/python/src/openai/audio_client.py @@ -14,6 +14,7 @@ from ..detail.core_interop import CoreInterop, InteropRequest from ..exception import FoundryLocalException +from .live_audio_transcription_client import LiveAudioTranscriptionSession logger = logging.getLogger(__name__) @@ -61,6 +62,25 @@ def __init__(self, model_id: str, core_interop: CoreInterop): self.settings = AudioSettings() self._core_interop = core_interop + def create_live_transcription_session(self) -> LiveAudioTranscriptionSession: + """Create a real-time streaming 
transcription session. + + Audio data is pushed in as PCM chunks and transcription results are + returned as a synchronous generator. + + Returns: + A streaming session that should be stopped when done. + Supports use as a context manager:: + + with audio_client.create_live_transcription_session() as session: + session.settings.sample_rate = 16000 + session.start() + session.append(pcm_bytes) + for result in session.get_transcription_stream(): + print(result.content[0].text) + """ + return LiveAudioTranscriptionSession(self.model_id, self._core_interop) + @staticmethod def _validate_audio_file_path(audio_file_path: str) -> None: """Validate that the audio file path is a non-empty string.""" diff --git a/sdk/python/src/openai/live_audio_transcription_client.py b/sdk/python/src/openai/live_audio_transcription_client.py new file mode 100644 index 00000000..7f0c030f --- /dev/null +++ b/sdk/python/src/openai/live_audio_transcription_client.py @@ -0,0 +1,311 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +# -------------------------------------------------------------------------- +"""Live audio transcription streaming session. + +Provides :class:`LiveAudioTranscriptionSession` — a push-based streaming +session for real-time audio-to-text transcription via ONNX Runtime GenAI. +""" + +from __future__ import annotations + +import logging +import queue +import threading +from typing import Generator, Optional + +from ..detail.core_interop import CoreInterop, InteropRequest +from ..exception import FoundryLocalException +from .live_audio_transcription_types import ( + CoreErrorResponse, + LiveAudioTranscriptionOptions, + LiveAudioTranscriptionResponse, +) + +logger = logging.getLogger(__name__) + +_SENTINEL = object() + + +class LiveAudioTranscriptionSession: + """Session for real-time audio streaming ASR (Automatic Speech Recognition). 
    def __init__(self, model_id: str, core_interop: CoreInterop):
        # Identifier of the loaded ASR model and the native interop bridge
        # this session pushes audio through.
        self._model_id = model_id
        self._core_interop = core_interop

        # Public settings — user-tunable, mutable until start() freezes a snapshot
        self.settings = LiveAudioTranscriptionOptions()

        # Session state — every field below is protected by _lock
        self._lock = threading.Lock()
        self._session_handle: Optional[str] = None
        self._started = False
        self._stopped = False

        # Frozen settings snapshot taken by start()
        self._active_settings: Optional[LiveAudioTranscriptionOptions] = None

        # Output queue: push loop writes, user reads via get_transcription_stream
        self._output_queue: Optional[queue.Queue] = None

        # Internal push queue: user writes audio chunks, background loop drains
        # to native core (bounded to cap memory growth)
        self._push_queue: Optional[queue.Queue] = None
        self._push_thread: Optional[threading.Thread] = None
+ + Raises: + FoundryLocalException: If the session is already started or the + native core returns an error. + """ + with self._lock: + if self._started: + raise FoundryLocalException( + "Streaming session already started. Call stop() first." + ) + + # Freeze settings + self._active_settings = self.settings.snapshot() + + self._output_queue = queue.Queue() + self._push_queue = queue.Queue(maxsize=self._active_settings.push_queue_capacity) + + request = InteropRequest(params={ + "Model": self._model_id, + "SampleRate": str(self._active_settings.sample_rate), + "Channels": str(self._active_settings.channels), + "BitsPerSample": str(self._active_settings.bits_per_sample), + }) + + if self._active_settings.language is not None: + request.params["Language"] = self._active_settings.language + + response = self._core_interop.start_audio_stream(request) + + if response.error is not None: + raise FoundryLocalException( + f"Error starting audio stream session: {response.error}" + ) + + self._session_handle = response.data + if self._session_handle is None: + raise FoundryLocalException( + "Native core did not return a session handle." + ) + + self._started = True + self._stopped = False + + # Start the push loop thread (non-daemon so it blocks process + # exit until stop() is called — aligns with FL Core's no-daemon design) + self._push_thread = threading.Thread( + target=self._push_loop, daemon=False + ) + self._push_thread.start() + + def append(self, pcm_data: bytes) -> None: + """Push a chunk of raw PCM audio data to the streaming session. + + Can be called from any thread (including audio device callbacks). + Chunks are internally queued and serialized to the native core. + + The data is copied to avoid issues if the caller reuses the buffer. + + Args: + pcm_data: Raw PCM audio bytes matching the configured format. + + Raises: + FoundryLocalException: If no active streaming session exists. 
+ """ + # Copy the data to avoid issues if the caller reuses the buffer + data_copy = bytes(pcm_data) + + with self._lock: + if not self._started or self._stopped: + raise FoundryLocalException( + "No active streaming session. Call start() first." + ) + + push_queue = self._push_queue + if push_queue is None: + raise FoundryLocalException( + "No active streaming session. Call start() first." + ) + + push_queue.put(data_copy) + + def get_transcription_stream(self) -> Generator[LiveAudioTranscriptionResponse, None, None]: + """Get the stream of transcription results. + + Results arrive as the native ASR engine processes audio data. + The generator completes when :meth:`stop` is called and all + remaining audio has been processed. + + Yields: + Transcription results as ``LiveAudioTranscriptionResponse`` objects. + + Raises: + FoundryLocalException: If no active streaming session exists, + or if the push loop encountered a fatal error. + """ + q = self._output_queue + if q is None: + raise FoundryLocalException( + "No active streaming session. Call start() first." + ) + + while True: + item = q.get() + if item is _SENTINEL: + break + if isinstance(item, Exception): + raise item + yield item + + def stop(self) -> None: + """Signal end-of-audio and stop the streaming session. + + Any remaining buffered audio in the push queue will be drained to + native core first. Final results are delivered through + :meth:`get_transcription_stream` before it completes. + """ + with self._lock: + if not self._started or self._stopped: + return # already stopped or never started + + self._stopped = True + + # 1. Signal push loop to finish (put sentinel) + self._push_queue.put(_SENTINEL) + + # 2. Wait for push loop to finish draining + if self._push_thread is not None: + self._push_thread.join() + + # 3. 
Tell native core to flush and finalize + request = InteropRequest(params={"SessionHandle": self._session_handle}) + response = self._core_interop.stop_audio_stream(request) + + # Parse final transcription from stop response + if response.data: + try: + final_result = LiveAudioTranscriptionResponse.from_json(response.data) + text = (final_result.content[0].text + if final_result.content else "") + if text: + self._output_queue.put(final_result) + except Exception as parse_ex: + logger.debug("Could not parse stop response as transcription result: %s", parse_ex) + + # 4. Complete the output queue + self._output_queue.put(_SENTINEL) + + # 5. Clean up — set _output_queue to None so subsequent calls to + # get_transcription_stream() fail fast instead of hanging. + self._session_handle = None + self._started = False + self._output_queue = None + + if response.error is not None: + raise FoundryLocalException( + f"Error stopping audio stream session: {response.error}" + ) + + def _push_loop(self) -> None: + """Internal loop that drains the push queue and sends chunks to native core. + + Terminates the session on any native error. 
+ """ + try: + while True: + audio_data = self._push_queue.get() + if audio_data is _SENTINEL: + break + + request = InteropRequest( + params={"SessionHandle": self._session_handle} + ) + response = self._core_interop.push_audio_data(request, audio_data) + + if response.error is not None: + error_info = CoreErrorResponse.try_parse(response.error) + code = error_info.code if error_info else "UNKNOWN" + fatal_ex = FoundryLocalException( + f"Push failed (code={code}): {response.error}" + ) + logger.error("Terminating push loop due to push failure: %s", + response.error) + self._output_queue.put(fatal_ex) + self._output_queue.put(_SENTINEL) + return + + # Parse transcription result from push response and surface it + if response.data: + try: + transcription = LiveAudioTranscriptionResponse.from_json( + response.data + ) + text = (transcription.content[0].text + if transcription.content else "") + if text: + self._output_queue.put(transcription) + except Exception as parse_ex: + # Non-fatal: log and continue + logger.debug( + "Could not parse push response as transcription: %s", + parse_ex, + ) + except Exception as ex: + logger.error("Push loop terminated with unexpected error: %s", ex) + self._output_queue.put( + FoundryLocalException("Push loop terminated unexpectedly.", ex) + ) + self._output_queue.put(_SENTINEL) + + # --- Context manager support --- + + def __enter__(self) -> LiveAudioTranscriptionSession: + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + try: + if self._started and not self._stopped: + self.stop() + except Exception as ex: + logger.warning("Error during context manager cleanup: %s", ex) diff --git a/sdk/python/src/openai/live_audio_transcription_types.py b/sdk/python/src/openai/live_audio_transcription_types.py new file mode 100644 index 00000000..11ebbfae --- /dev/null +++ b/sdk/python/src/openai/live_audio_transcription_types.py @@ -0,0 +1,144 @@ +# 
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# --------------------------------------------------------------------------
"""Data types for live audio transcription streaming sessions."""

import json
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class TranscriptionContentPart:
    """A content part within a live transcription response.

    Mirrors the OpenAI Realtime API ``ContentPart`` structure so that
    ``result.content[0].text`` and ``result.content[0].transcript``
    both return the transcribed text.

    Attributes:
        text: The transcribed text for this content part.
        transcript: Alias for ``text`` (OpenAI Realtime API compatibility).
    """

    text: str = ""
    transcript: str = ""


@dataclass
class LiveAudioTranscriptionResponse:
    """Transcription result for real-time audio streaming sessions.

    Shaped like the OpenAI Realtime API ``ConversationItem`` so that
    consumers can access text via ``result.content[0].text`` or
    ``result.content[0].transcript``.

    Attributes:
        content: List of transcription content parts.
        is_final: Whether this is a final or partial (interim) result.
            Nemotron models always return ``True``.
        start_time: Start time offset of this segment in the audio stream (seconds).
        end_time: End time offset of this segment in the audio stream (seconds).
        id: Unique identifier for this result (if available).
    """

    content: List[TranscriptionContentPart] = field(default_factory=list)
    is_final: bool = True
    start_time: Optional[float] = None
    end_time: Optional[float] = None
    id: Optional[str] = None

    @staticmethod
    def from_json(json_str: str) -> "LiveAudioTranscriptionResponse":
        """Deserialize a native Core JSON response into a ``LiveAudioTranscriptionResponse``.

        The native JSON format uses flat fields (``text``, ``is_final``,
        ``start_time``, ``end_time``, optionally ``id``). This method maps
        them into the ``ConversationItem``-shaped structure with a
        ``content`` list.

        Args:
            json_str: Raw JSON string from the native core.

        Returns:
            A ``LiveAudioTranscriptionResponse`` instance.

        Raises:
            json.JSONDecodeError: If *json_str* is not valid JSON.
            Exception: If deserialization fails.
        """
        raw = json.loads(json_str)
        text = raw.get("text", "")
        return LiveAudioTranscriptionResponse(
            content=[TranscriptionContentPart(text=text, transcript=text)],
            is_final=raw.get("is_final", True),
            start_time=raw.get("start_time"),
            end_time=raw.get("end_time"),
            # Previously the declared ``id`` field was never populated;
            # map it when the native payload provides one (None otherwise).
            id=raw.get("id"),
        )


@dataclass
class LiveAudioTranscriptionOptions:
    """Audio format settings for a live transcription streaming session.

    Must be configured before calling :meth:`LiveAudioTranscriptionSession.start`.
    Settings are frozen (snapshot-copied) once the session starts.

    Attributes:
        sample_rate: PCM sample rate in Hz. Default: 16000.
        channels: Number of audio channels. Default: 1 (mono).
        bits_per_sample: Number of bits per audio sample. Default: 16.
        language: Optional BCP-47 language hint (e.g. ``"en"``, ``"zh"``).
        push_queue_capacity: Maximum number of audio chunks buffered in the
            internal push queue. Default: 100 (~3 s at typical chunk sizes).
    """

    sample_rate: int = 16000
    channels: int = 1
    bits_per_sample: int = 16
    language: Optional[str] = None
    push_queue_capacity: int = 100

    def snapshot(self) -> "LiveAudioTranscriptionOptions":
        """Return a shallow copy of these settings (freeze pattern)."""
        return LiveAudioTranscriptionOptions(
            sample_rate=self.sample_rate,
            channels=self.channels,
            bits_per_sample=self.bits_per_sample,
            language=self.language,
            push_queue_capacity=self.push_queue_capacity,
        )


@dataclass
class CoreErrorResponse:
    """Structured error response from the native core.

    Attributes:
        code: Error code string (e.g. ``"ASR_SESSION_NOT_FOUND"``).
        message: Human-readable error description.
        is_transient: Whether the error is transient and may succeed on retry.
    """

    code: str = ""
    message: str = ""
    is_transient: bool = False

    @staticmethod
    def try_parse(error_string: str) -> Optional["CoreErrorResponse"]:
        """Attempt to parse a native error string as structured JSON.

        Returns ``None`` if the error is not valid JSON or doesn't match
        the expected schema, which should be treated as a permanent/unknown error.
        """
        try:
            raw = json.loads(error_string)
            return CoreErrorResponse(
                code=raw.get("code", ""),
                message=raw.get("message", ""),
                is_transient=raw.get("isTransient", False),
            )
        except Exception:
            return None
+# --------------------------------------------------------------------------- + +def _get_e2e_test_pkgs_path(): + """Locate the e2e-test-pkgs directory by walking up from this file.""" + from pathlib import Path as _Path + current = _Path(__file__).resolve().parent + while True: + candidate = current / "samples" / "python" / "e2e-test-pkgs" + if candidate.exists(): + return candidate + parent = current.parent + if parent == current: + return None + current = parent + +if sys.platform.startswith("win"): + import ctypes + + def _preload_e2e_dlls(): + pkgs = _get_e2e_test_pkgs_path() + if pkgs is None: + return + + ort_dll = pkgs / "onnxruntime.dll" + genai_dll = pkgs / "onnxruntime-genai.dll" + if not (ort_dll.exists() and genai_dll.exists()): + return + + kernel32 = ctypes.windll.kernel32 + kernel32.SetDllDirectoryW(str(pkgs)) + os.add_dll_directory(str(pkgs)) + os.environ["ORT_LIB_PATH"] = str(ort_dll) + + kernel32.LoadLibraryExW.restype = ctypes.c_void_p + kernel32.LoadLibraryExW.argtypes = [ctypes.c_wchar_p, ctypes.c_void_p, ctypes.c_int] + kernel32.LoadLibraryExW(str(ort_dll), None, 0x00000008) + kernel32.LoadLibraryExW(str(genai_dll), None, 0x00000008) + + _preload_e2e_dlls() import pytest @@ -143,3 +191,134 @@ def core_interop(manager): def model_load_manager(manager): """Return the ModelLoadManager from the session-scoped manager (internal, for component tests).""" return manager._model_load_manager + + +# --------------------------------------------------------------------------- +# E2E fixtures for live audio transcription tests +# --------------------------------------------------------------------------- + +def _preload_and_init_e2e(): + """Pre-load DLLs from e2e-test-pkgs and initialize the SDK for E2E tests. + + Checks whether ORT is already loaded in the process (from the DLL preload + above) and skips the pre-load if so. Then loads Core.dll, sets up FFI + function signatures, and initializes FoundryLocalManager. 
+ """ + import ctypes + from foundry_local_sdk.detail.core_interop import ( + CoreInterop, + RequestBuffer, + ResponseBuffer, + StreamingRequestBuffer, + ) + from foundry_local_sdk.detail.utils import NativeBinaryPaths, _get_ext + + pkgs = _get_e2e_test_pkgs_path() + if pkgs is None: + return None, "e2e-test-pkgs directory not found" + + paths = NativeBinaryPaths( + core=pkgs / "Microsoft.AI.Foundry.Local.Core.dll", + ort=pkgs / "onnxruntime.dll", + genai=pkgs / "onnxruntime-genai.dll", + ) + + if not (paths.core.exists() and paths.ort.exists() and paths.genai.exists()): + return None, "E2E DLLs not found" + + kernel32 = ctypes.windll.kernel32 + + # Check if ORT is already loaded (e.g. from conftest preload) + kernel32.GetModuleHandleW.restype = ctypes.c_void_p + kernel32.GetModuleHandleW.argtypes = [ctypes.c_wchar_p] + ort_already_loaded = bool(kernel32.GetModuleHandleW("onnxruntime.dll")) + + if not ort_already_loaded: + kernel32.SetDllDirectoryW(str(pkgs)) + os.add_dll_directory(str(pkgs)) + os.environ["ORT_LIB_PATH"] = str(paths.ort) + + kernel32.LoadLibraryExW.restype = ctypes.c_void_p + kernel32.LoadLibraryExW.argtypes = [ctypes.c_wchar_p, ctypes.c_void_p, ctypes.c_int] + _LOAD_WITH_ALTERED_SEARCH_PATH = 0x00000008 + + h_ort = kernel32.LoadLibraryExW(str(paths.ort), None, _LOAD_WITH_ALTERED_SEARCH_PATH) + if not h_ort: + return None, f"Failed to load ORT (WinError {kernel32.GetLastError()})" + + h_genai = kernel32.LoadLibraryExW(str(paths.genai), None, _LOAD_WITH_ALTERED_SEARCH_PATH) + if not h_genai: + return None, f"Failed to load GenAI (WinError {kernel32.GetLastError()})" + + # Load Core.dll and set up function signatures + CoreInterop._flcore_library = ctypes.CDLL(str(paths.core)) + lib = CoreInterop._flcore_library + lib.execute_command.argtypes = [ctypes.POINTER(RequestBuffer), ctypes.POINTER(ResponseBuffer)] + lib.execute_command.restype = None + lib.free_response.argtypes = [ctypes.POINTER(ResponseBuffer)] + lib.free_response.restype = None + 
lib.execute_command_with_callback.argtypes = [ + ctypes.POINTER(RequestBuffer), ctypes.POINTER(ResponseBuffer), + ctypes.c_void_p, ctypes.c_void_p, + ] + lib.execute_command_with_callback.restype = None + lib.execute_command_with_binary.argtypes = [ + ctypes.POINTER(StreamingRequestBuffer), ctypes.POINTER(ResponseBuffer), + ] + lib.execute_command_with_binary.restype = None + CoreInterop._initialized = True + + # Initialize FoundryLocalManager + config = Configuration( + app_name="FoundryLocalE2ETest", + model_cache_dir=str(pkgs / "models"), + additional_settings={"Bootstrap": "false"}, + ) + flcore_lib_name = f"Microsoft.AI.Foundry.Local.Core{_get_ext()}" + config.foundry_local_core_path = str(paths.core_dir / flcore_lib_name) + config.additional_settings["OrtLibraryPath"] = str(paths.ort) + config.additional_settings["OrtGenAILibraryPath"] = str(paths.genai) + + FoundryLocalManager.instance = None + FoundryLocalManager.initialize(config) + + return FoundryLocalManager.instance, None + + +@pytest.fixture(scope="module") +def e2e_manager(): + """Initialize FoundryLocalManager with e2e-test-pkgs DLLs for E2E tests.""" + if not sys.platform.startswith("win"): + pytest.skip("E2E test requires Windows") + + from foundry_local_sdk.detail.core_interop import CoreInterop + + CoreInterop._initialized = False + CoreInterop._flcore_library = None + CoreInterop._ort_library = None + CoreInterop._genai_library = None + FoundryLocalManager.instance = None + + try: + mgr, error = _preload_and_init_e2e() + except Exception as e: + pytest.skip(f"Could not initialize: {e}") + return + + if error: + pytest.skip(error) + return + + yield mgr + + # Teardown + try: + catalog = mgr.catalog + for mv in catalog.get_loaded_models(): + try: + mv.unload() + except Exception: + pass + except Exception: + pass + FoundryLocalManager.instance = None diff --git a/sdk/python/test/openai/test_live_audio_transcription.py b/sdk/python/test/openai/test_live_audio_transcription.py new file mode 100644 
index 00000000..92284d29 --- /dev/null +++ b/sdk/python/test/openai/test_live_audio_transcription.py @@ -0,0 +1,380 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +# -------------------------------------------------------------------------- +"""Unit tests for live audio transcription — mirrors C# LiveAudioTranscriptionTests.cs. + +These tests cover: +- LiveAudioTranscriptionResponse.from_json deserialization +- LiveAudioTranscriptionOptions defaults and snapshot +- CoreErrorResponse.try_parse +- Session state guards (append/get_transcription_stream before start) +""" + +from __future__ import annotations + +import json +import threading +from unittest.mock import MagicMock + +import pytest + +from foundry_local_sdk.openai.live_audio_transcription_types import ( + CoreErrorResponse, + LiveAudioTranscriptionOptions, + LiveAudioTranscriptionResponse, + TranscriptionContentPart, +) +from foundry_local_sdk.openai.live_audio_transcription_client import ( + LiveAudioTranscriptionSession, +) +from foundry_local_sdk.detail.core_interop import CoreInterop, Response +from foundry_local_sdk.exception import FoundryLocalException + + +# --------------------------------------------------------------------------- +# LiveAudioTranscriptionResponse.from_json tests +# --------------------------------------------------------------------------- + + +class TestFromJson: + """LiveAudioTranscriptionResponse.from_json deserialization tests.""" + + def test_parses_text_and_is_final(self): + json_str = '{"is_final":true,"text":"hello world","start_time":null,"end_time":null}' + + result = LiveAudioTranscriptionResponse.from_json(json_str) + + assert result.content is not None + assert len(result.content) == 1 + assert result.content[0].text == "hello world" + assert result.content[0].transcript == "hello world" + assert result.is_final is True + + def 
test_maps_timing_fields(self): + json_str = '{"is_final":false,"text":"partial","start_time":1.5,"end_time":3.0}' + + result = LiveAudioTranscriptionResponse.from_json(json_str) + + assert result.content[0].text == "partial" + assert result.is_final is False + assert result.start_time == 1.5 + assert result.end_time == 3.0 + + def test_empty_text_parses_successfully(self): + json_str = '{"is_final":true,"text":"","start_time":null,"end_time":null}' + + result = LiveAudioTranscriptionResponse.from_json(json_str) + + assert result.content[0].text == "" + assert result.is_final is True + + def test_only_start_time_sets_start_time(self): + json_str = '{"is_final":true,"text":"word","start_time":2.0,"end_time":null}' + + result = LiveAudioTranscriptionResponse.from_json(json_str) + + assert result.start_time == 2.0 + assert result.end_time is None + assert result.content[0].text == "word" + + def test_invalid_json_throws(self): + with pytest.raises(Exception): + LiveAudioTranscriptionResponse.from_json("not valid json") + + def test_content_has_text_and_transcript(self): + json_str = '{"is_final":true,"text":"test","start_time":null,"end_time":null}' + + result = LiveAudioTranscriptionResponse.from_json(json_str) + + # Both Text and Transcript should have the same value + assert result.content[0].text == "test" + assert result.content[0].transcript == "test" + + def test_missing_fields_use_defaults(self): + json_str = '{}' + + result = LiveAudioTranscriptionResponse.from_json(json_str) + + assert result.content[0].text == "" + assert result.is_final is True + assert result.start_time is None + assert result.end_time is None + + +# --------------------------------------------------------------------------- +# LiveAudioTranscriptionOptions tests +# --------------------------------------------------------------------------- + + +class TestOptions: + """LiveAudioTranscriptionOptions tests.""" + + def test_default_values(self): + options = LiveAudioTranscriptionOptions() + + 
assert options.sample_rate == 16000 + assert options.channels == 1 + assert options.bits_per_sample == 16 + assert options.language is None + assert options.push_queue_capacity == 100 + + def test_snapshot_creates_independent_copy(self): + options = LiveAudioTranscriptionOptions(language="en") + snapshot = options.snapshot() + + # Modify original — snapshot should be unaffected + options.language = "zh" + options.sample_rate = 44100 + + assert snapshot.language == "en" + assert snapshot.sample_rate == 16000 + + +# --------------------------------------------------------------------------- +# CoreErrorResponse tests +# --------------------------------------------------------------------------- + + +class TestCoreErrorResponse: + """CoreErrorResponse.try_parse tests.""" + + def test_try_parse_valid_json(self): + json_str = '{"code":"ASR_SESSION_NOT_FOUND","message":"Session not found","isTransient":false}' + + error = CoreErrorResponse.try_parse(json_str) + + assert error is not None + assert error.code == "ASR_SESSION_NOT_FOUND" + assert error.message == "Session not found" + assert error.is_transient is False + + def test_try_parse_invalid_json_returns_none(self): + result = CoreErrorResponse.try_parse("not json") + assert result is None + + def test_try_parse_transient_error(self): + json_str = '{"code":"BUSY","message":"Model busy","isTransient":true}' + + error = CoreErrorResponse.try_parse(json_str) + + assert error is not None + assert error.is_transient is True + + +# --------------------------------------------------------------------------- +# Session state guard tests +# --------------------------------------------------------------------------- + + +class TestSessionStateGuards: + """Verify that append/get_transcription_stream raise before start.""" + + def _make_session(self) -> LiveAudioTranscriptionSession: + """Create a session with a mock CoreInterop (no native DLLs needed).""" + mock_interop = MagicMock(spec=CoreInterop) + return 
LiveAudioTranscriptionSession("test-model", mock_interop) + + def test_append_before_start_throws(self): + session = self._make_session() + data = b'\x00' * 100 + + with pytest.raises(FoundryLocalException): + session.append(data) + + def test_get_transcription_stream_before_start_throws(self): + session = self._make_session() + + with pytest.raises(FoundryLocalException): + # Attempt to iterate — should raise immediately + next(iter(session.get_transcription_stream())) + + def test_start_sets_started_flag(self): + session = self._make_session() + session._core_interop.start_audio_stream.return_value = Response( + data="handle-123", error=None + ) + session._core_interop.stop_audio_stream.return_value = Response( + data=None, error=None + ) + + session.start() + + assert session._started is True + assert session._session_handle == "handle-123" + + # Cleanup via public API + session.stop() + + def test_double_start_throws(self): + session = self._make_session() + session._core_interop.start_audio_stream.return_value = Response( + data="handle-123", error=None + ) + session._core_interop.stop_audio_stream.return_value = Response( + data=None, error=None + ) + + session.start() + + with pytest.raises(FoundryLocalException, match="already started"): + session.start() + + # Cleanup via public API + session.stop() + + def test_start_error_raises(self): + session = self._make_session() + session._core_interop.start_audio_stream.return_value = Response( + data=None, error="init failed" + ) + + with pytest.raises(FoundryLocalException, match="Error starting"): + session.start() + + def test_stop_without_start_is_noop(self): + session = self._make_session() + # Should not raise + session.stop() + + +# --------------------------------------------------------------------------- +# Session streaming integration test (mocked native core) +# --------------------------------------------------------------------------- + + +class TestSessionStreaming: + """Verify the full push → 
output pipeline with a mocked native core.""" + + def test_push_and_receive_transcription(self): + """Simulate pushing audio and receiving transcription results.""" + mock_interop = MagicMock(spec=CoreInterop) + + # start_audio_stream returns a handle + mock_interop.start_audio_stream.return_value = Response( + data="session-42", error=None + ) + + # push_audio_data returns a transcription result + push_response = json.dumps({ + "is_final": True, + "text": "hello world", + "start_time": 0.0, + "end_time": 1.5, + }) + mock_interop.push_audio_data.return_value = Response( + data=push_response, error=None + ) + + # stop_audio_stream returns empty (no final result) + mock_interop.stop_audio_stream.return_value = Response( + data=None, error=None + ) + + session = LiveAudioTranscriptionSession("test-model", mock_interop) + session.start() + + # Start reading results in background (must start before stop) + results = [] + + def read(): + for r in session.get_transcription_stream(): + results.append(r) + + reader = threading.Thread(target=read, daemon=True) + reader.start() + + # Push a chunk of audio + session.append(b'\x00' * 3200) + + # Stop to flush and complete + session.stop() + reader.join(timeout=5) + + assert len(results) == 1 + assert results[0].content[0].text == "hello world" + assert results[0].is_final is True + assert results[0].start_time == 0.0 + assert results[0].end_time == 1.5 + + def test_push_error_surfaces_as_exception(self): + """Verify that a native push error terminates the stream with an exception.""" + mock_interop = MagicMock(spec=CoreInterop) + + mock_interop.start_audio_stream.return_value = Response( + data="session-42", error=None + ) + mock_interop.push_audio_data.return_value = Response( + data=None, error='{"code":"ASR_ERROR","message":"decode failed","isTransient":false}' + ) + mock_interop.stop_audio_stream.return_value = Response( + data=None, error=None + ) + + session = LiveAudioTranscriptionSession("test-model", mock_interop) + 
session.start() + + session.append(b'\x00' * 3200) + + # Give push loop time to process + import time + time.sleep(0.5) + + with pytest.raises(FoundryLocalException, match="Push failed"): + for _ in session.get_transcription_stream(): + pass + + def test_context_manager_calls_stop(self): + """Verify context manager calls stop on exit.""" + mock_interop = MagicMock(spec=CoreInterop) + mock_interop.start_audio_stream.return_value = Response( + data="session-42", error=None + ) + mock_interop.push_audio_data.return_value = Response( + data=None, error=None + ) + mock_interop.stop_audio_stream.return_value = Response( + data=None, error=None + ) + + with LiveAudioTranscriptionSession("test-model", mock_interop) as session: + session.start() + + # stop_audio_stream should have been called via context manager + mock_interop.stop_audio_stream.assert_called_once() + + def test_stop_with_final_result(self): + """Verify that stop() parses and surfaces a final transcription result.""" + mock_interop = MagicMock(spec=CoreInterop) + mock_interop.start_audio_stream.return_value = Response( + data="session-42", error=None + ) + final_json = json.dumps({ + "is_final": True, + "text": "final words", + "start_time": 5.0, + "end_time": 6.0, + }) + mock_interop.stop_audio_stream.return_value = Response( + data=final_json, error=None + ) + + session = LiveAudioTranscriptionSession("test-model", mock_interop) + session.start() + + # Start reading results in background (must start before stop) + results = [] + + def read(): + for r in session.get_transcription_stream(): + results.append(r) + + reader = threading.Thread(target=read, daemon=True) + reader.start() + + # No audio pushed — just stop to get final result + session.stop() + reader.join(timeout=5) + + assert len(results) == 1 + assert results[0].content[0].text == "final words" diff --git a/sdk/python/test/openai/test_live_audio_transcription_e2e.py b/sdk/python/test/openai/test_live_audio_transcription_e2e.py new file mode 
100644 index 00000000..d15cbeae --- /dev/null +++ b/sdk/python/test/openai/test_live_audio_transcription_e2e.py @@ -0,0 +1,157 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +# -------------------------------------------------------------------------- +"""E2E test for live audio transcription using the e2e-test-pkgs assets. + +This test validates the full pipeline: + Audio input → streaming → transcription output + +Architecture: + Python SDK → Core.dll → onnxruntime.dll / onnxruntime-genai.dll + (Core.dll loads ORT/GenAI internally via P/Invoke) + +It uses synthetic PCM audio (440 Hz sine wave) to test the session lifecycle +without requiring a microphone. + +Prerequisites: + - e2e-test-pkgs must be present at samples/python/e2e-test-pkgs + - The nemotron model must be available in e2e-test-pkgs/models/ + - Native DLLs (Core.dll, onnxruntime.dll, onnxruntime-genai.dll) must be present + +DLL preloading and the ``e2e_manager`` fixture are provided by ``test/conftest.py``. 
+""" + +from __future__ import annotations + +import math +import struct +import sys +import threading + +import pytest + +from foundry_local_sdk.openai.live_audio_transcription_types import ( + LiveAudioTranscriptionResponse, +) + +# Import shared helper from conftest +from ..conftest import _get_e2e_test_pkgs_path + + +def _has_e2e_assets() -> bool: + """Check if all required E2E assets are present.""" + pkgs = _get_e2e_test_pkgs_path() + if pkgs is None: + return False + required = [ + pkgs / "Microsoft.AI.Foundry.Local.Core.dll", + pkgs / "onnxruntime.dll", + pkgs / "onnxruntime-genai.dll", + pkgs / "models" / "nemotron", + ] + return all(p.exists() for p in required) + + +def _generate_sine_wave_pcm( + sample_rate: int = 16000, + duration_seconds: float = 2.0, + frequency: float = 440.0, + amplitude: float = 0.5, +) -> bytes: + """Generate synthetic PCM audio (16-bit mono sine wave).""" + total_samples = int(sample_rate * duration_seconds) + pcm_bytes = bytearray(total_samples * 2) # 16-bit = 2 bytes per sample + + for i in range(total_samples): + t = i / sample_rate + sample = int(32767 * amplitude * math.sin(2 * math.pi * frequency * t)) + struct.pack_into(" 0 + assert result.content[0].text is not None + # text and transcript should match + assert result.content[0].transcript == result.content[0].text + + finally: + model.unload()