Skip to content

Microphone

Runnable example

python/examples/microphone_send.py — generates a 440 Hz / 3 s sine wave and pushes it into the virtual mic.

resoio.microphone.MicrophoneClient

MicrophoneClient(socket_path: str | None = None)

Bases: _BaseClient[MicrophoneStub]

Async client for the Resonite IO Microphone service over a UDS.

Use as an async context manager so the gRPC channel is closed deterministically. The wire format is fixed at 48 kHz / Mono / float32 LE (:data:SAMPLE_RATE, :data:CHANNELS, :data:DTYPE).

Source code in src/resoio/_client.py
def __init__(self, socket_path: str | None = None) -> None:
    # Defer resolution to __aenter__ so env vars patched between
    # construction and connection are honoured, and so resolution
    # errors surface at the connect site.
    self._explicit_path: str | None = socket_path
    self._channel: Channel | None = None
    self._stub: TStub | None = None
    self._resolved_path: str | None = None

stream async

stream(chunks: AsyncIterable[NDArray[float32]]) -> MicrophoneStreamSummary

Stream microphone chunks to the server and await the summary.

chunks is consumed lazily. Callers hand in plain 1-D float32 samples arrays (no dtype coercion happens here — tobytes blindly serialises whatever buffer it gets). The client owns frame_id (auto-incremented from 0) and stamps :func:time.time_ns on every chunk. Raises :class:RuntimeError if called outside async with.

Source code in src/resoio/microphone.py
async def stream(
    self, chunks: AsyncIterable[NDArray[np.float32]]
) -> MicrophoneStreamSummary:
    """Stream microphone chunks to the server and await the summary.

    ``chunks`` is consumed lazily. Callers hand in plain 1-D float32
    ``samples`` arrays (no dtype coercion happens here — ``tobytes``
    blindly serialises whatever buffer it gets). The client owns
    ``frame_id`` (auto-incremented from 0) and stamps
    :func:`time.time_ns` on every chunk. Raises :class:`RuntimeError`
    if called outside ``async with``.
    """
    stub = self._require_stub()

    async def _wire() -> AsyncIterator[MicrophoneAudioFrame]:
        frame_id = 0
        async for samples in chunks:
            yield MicrophoneAudioFrame(
                frame_id=frame_id,
                unix_nanos=time.time_ns(),
                sample_count=int(samples.shape[0]),
                samples=samples.tobytes(),
            )
            frame_id += 1

    summary: _WireSummary = await stub.stream_audio(_wire())
    return MicrophoneStreamSummary(
        received_frames=summary.received_frames,
        received_samples=summary.received_samples,
        dropped_frames=summary.dropped_frames,
        unix_nanos=summary.unix_nanos,
    )

resoio.microphone.MicrophoneStreamSummary dataclass

MicrophoneStreamSummary(
    received_frames: int, received_samples: int, dropped_frames: int, unix_nanos: int
)

Server-side summary returned when a StreamAudio stream ends.

dropped_frames counts frames the bridge discarded on ring buffer overflow (client outpacing engine consumption); a healthy run is 0.

resoio.microphone.paced async

paced(
    chunks: AsyncIterable[NDArray[float32]], sample_rate: int = SAMPLE_RATE
) -> AsyncIterator[NDArray[np.float32]]

Yield chunks at wall-clock pace for replaying a pre-loaded buffer.

Opt-in helper for sources that hand over their whole payload at once (e.g. a WAV file): yields each ndarray, then sleeps for its natural duration before pulling the next one, so the downstream Bridge ring buffer never overflows on long inputs.

Do not wrap real-time producers (live mic, TTS streams) — they pace themselves; the extra sleep would compound into latency.

Source code in src/resoio/microphone.py
async def paced(
    chunks: AsyncIterable[NDArray[np.float32]],
    sample_rate: int = SAMPLE_RATE,
) -> AsyncIterator[NDArray[np.float32]]:
    """Yield chunks at wall-clock pace for replaying a pre-loaded buffer.

    Opt-in helper for sources that hand over their whole payload at once
    (e.g. a WAV file): yields each ndarray, then sleeps for its natural
    duration before pulling the next one, so the downstream Bridge ring
    buffer never overflows on long inputs.

    Do **not** wrap real-time producers (live mic, TTS streams) — they
    pace themselves; the extra sleep would compound into latency.
    """
    async for samples in chunks:
        yield samples
        n_samples = int(samples.shape[0])
        if n_samples > 0:
            await asyncio.sleep(n_samples / sample_rate)