# mypy: ignore-errors
import queue
import asyncio
from typing import Any, Union, Callable, AsyncGenerator, cast

import numpy as np
import sounddevice as sd  # type: ignore
import numpy.typing as npt

from .. import _legacy_response
from .._response import StreamedBinaryAPIResponse, AsyncStreamedBinaryAPIResponse

SAMPLE_RATE = 24000


class LocalAudioPlayer:
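    """Play text-to-speech audio on the local output device via ``sounddevice``.

    Accepts raw numpy PCM buffers as well as complete or streamed binary API
    responses; playback runs at 24 kHz, mono, float32.
    """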
    def __init__(
        self,
        should_stop: Union[Callable[[], bool], None] = None,
    ):
        self.channels = 1
        self.dtype = np.float32
        # Optional predicate polled during playback; returning True stops the stream early.
        self.should_stop = should_stop

    async def _tts_response_to_buffer(
        self,
        response: Union[
            _legacy_response.HttpxBinaryResponseContent,
            AsyncStreamedBinaryAPIResponse,
            StreamedBinaryAPIResponse,
        ],
    ) -> npt.NDArray[np.float32]:
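        """Read a binary TTS response into a mono float32 buffer.

        The response body is consumed in 1024-byte chunks (synchronously or
        asynchronously, depending on the response type), interpreted as 16-bit
        PCM, and scaled to float32 samples shaped ``(frames, 1)``.
        """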
        chunks: list[bytes] = []
        if isinstance(response, _legacy_response.HttpxBinaryResponseContent) or isinstance(
            response, StreamedBinaryAPIResponse
        ):
            for chunk in response.iter_bytes(chunk_size=1024):
                if chunk:
                    chunks.append(chunk)
        else:
            async for chunk in response.iter_bytes(chunk_size=1024):
                if chunk:
                    chunks.append(chunk)

        audio_bytes = b"".join(chunks)
        # Decode 16-bit PCM and scale to float32 samples in roughly [-1.0, 1.0].
        audio_np = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32) / 32767.0
        # Shape as (frames, 1) for a mono output stream.
        audio_np = audio_np.reshape(-1, 1)
        return audio_np

    async def play(
        self,
        input: Union[
            npt.NDArray[np.int16],
            npt.NDArray[np.float32],
            _legacy_response.HttpxBinaryResponseContent,
            AsyncStreamedBinaryAPIResponse,
            StreamedBinaryAPIResponse,
        ],
    ) -> None:
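        """Play a complete audio buffer or TTS response, returning when playback ends.

        numpy input may be int16 (converted to float32) or float32 (assumed to
        already be shaped ``(frames, channels)``); response objects are first
        decoded via ``_tts_response_to_buffer``. Playback stops early if
        ``should_stop`` returns True.
        """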
        audio_content: npt.NDArray[np.float32]
        if isinstance(input, np.ndarray):
            if input.dtype == np.int16 and self.dtype == np.float32:
                audio_content = (input.astype(np.float32) / 32767.0).reshape(-1, self.channels)
            elif input.dtype == np.float32:
                # float32 input is assumed to already be shaped (frames, channels).
                audio_content = cast(npt.NDArray[np.float32], input)
            else:
                raise ValueError(f"Unsupported dtype: {input.dtype}")
        else:
            audio_content = await self._tts_response_to_buffer(input)

        loop = asyncio.get_event_loop()
        event = asyncio.Event()
        idx = 0

        def callback(
            outdata: npt.NDArray[np.float32],
            frame_count: int,
            _time_info: Any,
            _status: Any,
        ):
            nonlocal idx

            remainder = len(audio_content) - idx
            if remainder == 0 or (callable(self.should_stop) and self.should_stop()):
                # Done (or asked to stop): wake the waiting coroutine and end the stream.
                loop.call_soon_threadsafe(event.set)
                raise sd.CallbackStop
            # Copy as many frames as are available and zero-fill the rest of the block.
            valid_frames = frame_count if remainder >= frame_count else remainder
            outdata[:valid_frames] = audio_content[idx : idx + valid_frames]
            outdata[valid_frames:] = 0
            idx += valid_frames

        stream = sd.OutputStream(
            samplerate=SAMPLE_RATE,
            callback=callback,
            dtype=audio_content.dtype,
            channels=audio_content.shape[1],
        )
        with stream:
            await event.wait()

    async def play_stream(
        self,
        buffer_stream: AsyncGenerator[Union[npt.NDArray[np.float32], npt.NDArray[np.int16], None], None],
    ) -> None:
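        """Play audio buffers as they arrive from an async generator.

        A producer task feeds incoming buffers into a bounded queue; the
        sounddevice callback drains the queue and pads underruns with silence.
        A ``None`` buffer (or the end of the generator) signals completion.
        """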
        loop = asyncio.get_event_loop()
        event = asyncio.Event()
        buffer_queue: queue.Queue[Union[npt.NDArray[np.float32], npt.NDArray[np.int16], None]] = queue.Queue(maxsize=50)

        # Playback cursor shared between the event loop thread and the audio callback.
        current_buffer = None
        buffer_pos = 0

        async def buffer_producer():
            async for buffer in buffer_stream:
                if buffer is None:
                    break
                # queue.put blocks when the queue is full, so run it in an executor
                # to avoid stalling the event loop.
                await loop.run_in_executor(None, buffer_queue.put, buffer)
            await loop.run_in_executor(None, buffer_queue.put, None)  # Signal completion

        def callback(
            outdata: npt.NDArray[np.float32],
            frame_count: int,
            _time_info: Any,
            _status: Any,
        ):
            nonlocal current_buffer, buffer_pos

            frames_written = 0
            while frames_written < frame_count:
                if current_buffer is None or buffer_pos >= len(current_buffer):
                    try:
                        current_buffer = buffer_queue.get(timeout=0.1)
                        if current_buffer is None:
                            # End-of-stream sentinel: wake the waiting coroutine and stop the stream.
                            loop.call_soon_threadsafe(event.set)
                            raise sd.CallbackStop
                        buffer_pos = 0

                        if current_buffer.dtype == np.int16 and self.dtype == np.float32:
                            current_buffer = (current_buffer.astype(np.float32) / 32767.0).reshape(-1, self.channels)

                    except queue.Empty:
                        # Underrun: no buffer arrived in time, so pad the rest of the block with silence.
                        outdata[frames_written:] = 0
                        return

                remaining_frames = len(current_buffer) - buffer_pos
                frames_to_write = min(frame_count - frames_written, remaining_frames)
                outdata[frames_written : frames_written + frames_to_write] = current_buffer[
                    buffer_pos : buffer_pos + frames_to_write
                ]
                buffer_pos += frames_to_write
                frames_written += frames_to_write

        producer_task = asyncio.create_task(buffer_producer())

        with sd.OutputStream(
            samplerate=SAMPLE_RATE,
            channels=self.channels,
            dtype=self.dtype,
            callback=callback,
        ):
            await event.wait()

        await producer_task