# Source code for kani.ext.realtime.audio: audio playback (output) and microphone recording (input) helpers.

import asyncio
import base64
import concurrent.futures
import io
import queue
import threading
import time
import warnings
from typing import AsyncIterable

from pydub import AudioSegment


# ===== output =====
class AudioManagerBase:
    """Common interface for the audio-playback managers in this module.

    Concrete managers override :meth:`play`; the base implementation only raises.
    """

    def play(self, segment: AudioSegment):
        """Play, or queue for playback, the given pydub ``AudioSegment``. Must be overridden."""
        raise NotImplementedError


class PyAudioAudioManager(AudioManagerBase):
    """Audio manager using a PyAudio stream.

    Segments passed to :meth:`play` are pushed onto a FIFO queue and written to a single PyAudio output stream
    by a background daemon thread. The stream is opened lazily on the first call to :meth:`play`, using that
    first segment's sample width, channel count and frame rate; later segments are assumed to share that format.
    """

    def __init__(self):
        self.q = queue.Queue()  # segments waiting to be written to the output stream
        self.thread = None  # background writer thread, started on the first play()
        self.stream = None  # PyAudio output stream, opened on the first play()
        # play() may be called from several threads at once (e.g. executor workers), so the one-time lazy
        # initialization below must not run twice (two streams / two writer threads)
        self._lock = threading.Lock()

    def play(self, segment: AudioSegment):
        """Queue *segment* for playback, opening the stream and starting the writer thread if needed."""
        # push the segment onto the queue; the writer thread drains it in FIFO order
        self.q.put(segment)
        with self._lock:
            # open the stream (format is taken from the first segment ever played)
            if self.stream is None:
                p = pyaudio.PyAudio()
                self.stream = p.open(
                    format=p.get_format_from_width(segment.sample_width),
                    channels=segment.channels,
                    rate=segment.frame_rate,
                    output=True,
                )
            # start the thread to handle the queue
            if self.thread is None:
                self.thread = threading.Thread(target=self._thread_entrypoint, daemon=True)
                self.thread.start()

    def _thread_entrypoint(self):
        """Forever take segments off the queue and write them to the stream in 500 ms slices."""
        from pydub.utils import make_chunks

        # hack: 100ms sleep before reading from q to avoid startup crunchiness
        time.sleep(0.1)
        while True:
            segment = self.q.get()
            for chunk in make_chunks(segment, 500):
                # stream.write blocks until the data is handed to PyAudio, pacing this thread to playback
                self.stream.write(chunk.raw_data)


# class FFMPEGAudioManager(AudioManagerBase):
#     """Audio manager using a ffplay process with a byte pipe"""
#
#     def __init__(self):
#         self._lock = threading.Lock()
#         self.ffmpeg = None
#         self.ffplay = None
#
#     def play(self, segment: AudioSegment):
#         # start the ffplay process to consume from our byte pipe
#         # TODO fixme - seems like ffplay continues to seek if no data from pipe, won't play new data until it
#         # "catches up" with the seek. Maybe piping to ffmpeg and having ffmpeg merge with silence can help?
#         with self._lock:
#             if self.ffmpeg is None:
#                 self.ffmpeg = subprocess.Popen(
#                     [
#                         "ffmpeg",
#                         "-use_wallclock_as_timestamps",
#                         "true",
#                         "-f",
#                         "s16le",
#                         "-ar",
#                         "24000",
#                         "-ac",
#                         "1",
#                         # "-re",
#                         "-i",
#                         "pipe:0",
#                         # "-af",
#                         # "aresample=async=1",
#                         "-f",
#                         "wav",
#                         "pipe:",
#                     ],
#                     stdin=subprocess.PIPE,
#                     stdout=subprocess.PIPE,
#                     stderr=subprocess.DEVNULL,
#                 )
#                 self.ffplay = subprocess.Popen(
#                     ["ffplay", "-nodisp", "-i", "-"],
#                     stdin=self.ffmpeg.stdout,
#                     stdout=subprocess.DEVNULL,
#                     stderr=subprocess.DEVNULL,
#                 )
#                 # self.ffplay = subprocess.Popen(
#                 #     ["ffplay", "-nodisp", "-f", "s16le", "-ar", "24000", "-acodec", "pcm_s16le", "-i", "-"],
#                 #     stdin=self.ffmpeg.stdout,
#                 #     stdout=subprocess.DEVNULL,
#                 #     stderr=subprocess.DEVNULL,
#                 # )
#         # then send the bytes over the pipe
#         self.ffmpeg.stdin.write(segment.raw_data)
#         self.ffmpeg.stdin.flush()


class PyDubAudioManager(AudioManagerBase):
    """Fallback audio manager using pydub's default play if we don't have ffplay or pyaudio.

    Segments passed to :meth:`play` are concatenated into a single pending segment. A background daemon
    thread repeatedly takes the whole pending segment and plays it with ``pydub.playback.play``.
    """

    def __init__(self):
        self.pending_segment: AudioSegment | None = None  # audio queued since the consumer last took it
        self.thread = None  # background playback thread, started on the first play()
        self._has_pending = threading.Event()  # set while pending_segment holds audio
        self._lock = threading.Lock()  # guards pending_segment, _has_pending and thread start-up

    def play(self, segment: AudioSegment):
        """Append *segment* to the pending audio, starting the playback thread on first use."""
        with self._lock:
            # start the thread to play in the bg; done under the lock because play() may be called from several
            # threads at once, and two consumer threads would both wake on the event (one would then get None)
            if self.thread is None:
                self.thread = threading.Thread(target=self._thread_entrypoint, daemon=True)
                self.thread.start()
            # then do bookkeeping
            if self.pending_segment is not None:
                self.pending_segment += segment
            else:
                self.pending_segment = segment
                self._has_pending.set()

    def _thread_entrypoint(self):
        """Forever wait for pending audio, take all of it atomically, and play it (blocking)."""
        from pydub.playback import play

        while True:
            self._has_pending.wait()
            with self._lock:
                segment = self.pending_segment
                self.pending_segment = None
                self._has_pending.clear()
            # played outside the lock so producers can keep appending while this segment plays
            play(segment)


# Choose the process-wide playback backend once, at import time: PyAudio when it is importable, otherwise
# pydub's own player (with a warning). An ffplay/ffmpeg-based manager exists in this module only as
# commented-out code and is not selected.
try:
    import pyaudio
except ImportError:
    _has_pyaudio = False
    warnings.warn(
        "You do not have PyAudio installed. Playback from utilities like chat_in_terminal_audio may have choppy"
        " output. We recommend installing PyAudio for best audio performance, but it is unnecessary if you are not"
        ' playing audio on this machine. You can also use `pip install "kani-ext-realtime[all]"`.'
    )
    _global_audio_manager = PyDubAudioManager()
else:
    _has_pyaudio = True
    _global_audio_manager = PyAudioAudioManager()


# Dedicated single-worker executor used to hand segments to the global audio manager. With exactly one worker,
# play() calls run one at a time and in submission order, so consecutive audio chunks are never reordered and
# the managers' lazy initialization is never entered concurrently (the event loop's default executor may run
# several submitted calls in parallel).
_audio_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1, thread_name_prefix="kani-audio-play")


async def play_audio(audio_bytes: bytes):
    """
    Play the given audio at the next available opportunity, using a global audio queue.

    This is a callback that should be passed to :meth:`~kani.Kani.full_round_stream` or
    :meth:`~kani.Kani.chat_round_stream`, or :meth:`.OpenAIRealtimeKani.full_duplex` as the ``audio_callback``
    parameter.

    :param audio_bytes: Raw 16-bit, 24kHz, mono PCM audio to enqueue for playback.
    """
    # we assume we're running in "raw 16 bit PCM audio at 24kHz, 1 channel, little-endian" mode
    # if we're in G.711 this will probably break
    audio = AudioSegment(data=audio_bytes, sample_width=2, channels=1, frame_rate=24000)
    # fire-and-forget: the caller is not blocked on the manager's bookkeeping (which may open a stream)
    asyncio.get_running_loop().run_in_executor(_audio_executor, _global_audio_manager.play, audio)
# ===== input =====
def audio_to_b64(audio_bytes: bytes) -> str:
    """Encode an arbitrarily-encoded audio bytestring into the correct format.

    The bytes are decoded with pydub, converted to 24kHz mono 16-bit PCM, and returned base64-encoded.
    """
    # Load the audio file from the byte stream
    audio = AudioSegment.from_file(io.BytesIO(audio_bytes))
    # Resample to 24kHz mono pcm16
    pcm_audio = audio.set_frame_rate(24000).set_channels(1).set_sample_width(2).raw_data
    # Encode to base64 string
    pcm_base64 = base64.b64encode(pcm_audio).decode()
    return pcm_base64


if _has_pyaudio:

    class PyAudioInputManager:
        """Audio manager using a PyAudio stream. This class should NOT be constructed manually.

        A daemon thread polls a 24kHz mono 16-bit PyAudio input stream and forwards every chunk it reads to an
        ``asyncio.Queue``; iterating this object asynchronously yields those chunks as ``bytes`` forever.
        """

        def __init__(self, mic_id: int | None):
            self.q = asyncio.Queue()
            # frames are delivered into this loop from the recording thread; presumably the object should be
            # constructed from within the loop that will consume it -- NOTE(review): confirm with callers
            self.loop = asyncio.get_event_loop()
            # init pyaudio, create a recording stream
            p = pyaudio.PyAudio()
            self.stream = p.open(
                format=p.get_format_from_width(2),
                channels=1,
                rate=24000,
                frames_per_buffer=1200,
                input=True,
                input_device_index=mic_id,
            )
            # launch thread to start streaming from it
            self.thread = threading.Thread(target=self._thread_entrypoint, daemon=True)
            self.thread.start()

        def _thread_entrypoint(self):
            """Poll the input stream and hand every available chunk to the event loop's queue."""
            while True:
                n_available = self.stream.get_read_available()
                if not n_available:
                    time.sleep(0.05)
                    continue
                frame = self.stream.read(n_available, exception_on_overflow=False)
                try:
                    # Non-blocking hand-off: waiting for a round-trip through the loop here would stall reading
                    # whenever the loop is busy, and overflowed input is silently discarded (see read() above).
                    self.loop.call_soon_threadsafe(self.q.put_nowait, frame)
                except RuntimeError:
                    # the event loop has been closed, so nothing can consume frames any more
                    break

        def __aiter__(self):
            return self

        async def __anext__(self):
            return await self.q.get()

    def get_audio_stream(mic_id: int | None) -> AsyncIterable[bytes]:
        """Return an audio stream manager that yields audio frames from the given mic."""
        return PyAudioInputManager(mic_id)

    def list_mics():
        """Print a list of all microphones connected to this device.

        The printed ID is the global PyAudio device index, i.e. the value to pass as ``mic_id``.
        """
        p = pyaudio.PyAudio()
        try:
            info = p.get_host_api_info_by_index(0)
            n_devices = info.get("deviceCount")
            for i in range(n_devices):
                device = p.get_device_info_by_host_api_device_index(0, i)
                if device.get("maxInputChannels") > 0:
                    # "index" is the global device index expected by input_device_index; i is only the
                    # position within host API 0
                    print(f"ID: {device.get('index', i)} -- {device.get('name')}")
        finally:
            # release the PortAudio resources acquired by PyAudio()
            p.terminate()

else:

    def _missing(*_, **__):
        raise ImportError(
            "You must install PyAudio to record from the mic. You can install this"
            ' with `pip install "kani-ext-realtime[all]"`.'
        )

    get_audio_stream = _missing
    list_mics = _missing