
Locomotion

Runnable example

python/examples/locomotion_drive.py — a 6 s scripted scenario (forward → strafe → yaw → jump → neutral) closed with reset().

resoio.locomotion.LocomotionClient

LocomotionClient(socket_path: str | None = None)

Bases: _BaseClient[LocomotionStub]

Async client for the Resonite IO Locomotion service over a Unix domain socket (UDS).

Use as an async context manager so that the gRPC channel, and the Drive stream opened lazily by send, are closed deterministically. Socket resolution mirrors resoio.ConnectionClient.

Source code in src/resoio/locomotion.py
def __init__(self, socket_path: str | None = None) -> None:
    super().__init__(socket_path)
    self._queue: asyncio.Queue[LocomotionCommand | None] = asyncio.Queue()
    self._drive_task: asyncio.Task[DriveSummary] | None = None
    self._drive_summary: DriveSummary | None = None

drive_summary property

drive_summary: DriveSummary | None

The DriveSummary resolved after the context exits.

None until the async context manager has exited. It also stays None if send was never called, because no Drive stream was opened.
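As a rough illustration of how a caller might drive the client, the sketch below runs a short scenario (forward, strafe, yaw, jump, neutral, then reset) against any object that exposes send and reset with the keyword signatures documented on this page. It is not the bundled example script. RecordingClient and run_scenario are hypothetical names introduced only for this sketch, RecordingClient is an in-memory stand-in rather than part of resoio, and the field values are arbitrary.

```python
import asyncio
from typing import Any


class RecordingClient:
    """Hypothetical in-memory stand-in that records calls (not part of resoio)."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, dict[str, Any]]] = []

    async def send(self, **fields: Any) -> None:
        self.calls.append(("send", fields))

    async def reset(self, **flags: bool) -> None:
        self.calls.append(("reset", flags))


async def run_scenario(client: Any, step_seconds: float = 1.0) -> None:
    """Forward, strafe, yaw, jump, neutral, then reset everything."""
    # Each step only names the fields it changes; values are illustrative.
    steps = [
        {"move_forward": 1.0},
        {"move_forward": 0.0, "move_right": 1.0},
        {"move_right": 0.0, "yaw_rate": 1.0},
        {"yaw_rate": 0.0, "jump": True},
        {"move_forward": 0.0, "move_right": 0.0, "yaw_rate": 0.0},
    ]
    for fields in steps:
        await client.send(**fields)
        await asyncio.sleep(step_seconds)
    # Calling reset with all defaults asks the service to reset every group.
    await client.reset()


if __name__ == "__main__":
    demo = RecordingClient()
    asyncio.run(run_scenario(demo, step_seconds=0.0))
    for name, kwargs in demo.calls:
        print(name, kwargs)
```

With the real library, the same coroutine would presumably be awaited inside an async with block around LocomotionClient (assuming the context manager yields the client), and drive_summary would be read only after that block has exited.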

send async

send(
    *,
    move_forward: float | None = None,
    move_right: float | None = None,
    move_up: float | None = None,
    yaw_rate: float | None = None,
    pitch_rate: float | None = None,
    jump: bool | None = None,
    velocity: float | None = None,
    crouch: float | None = None,
) -> None

Send a partial locomotion update to the bridge.

Only the kwargs that are not None are set on the wire; omitted fields keep their previous value on the (stateful) bridge. The first call lazily opens the Drive client-streaming RPC and starts the background drain task; subsequent calls enqueue onto it. unix_nanos is stamped here at send time; callers should not set it. gRPC failures surface (from the awaited stream) as grpclib.exceptions.GRPCError when the context exits.
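The partial-update rule can be pictured with a small model of the bridge's state. The sketch below is an assumption about the observable behaviour, not the bridge's actual implementation: BridgeState and apply_partial are hypothetical names, the defaults are taken from the values that reset is documented to restore, and the initial jump value of False is a guess.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class BridgeState:
    """Hypothetical model of the state the bridge keeps between commands."""

    move_forward: float = 0.0
    move_right: float = 0.0
    move_up: float = 0.0
    yaw_rate: float = 0.0
    pitch_rate: float = 0.0
    jump: bool = False  # assumed initial value
    velocity: float = 1.0
    crouch: float = 0.0


def apply_partial(state: BridgeState, **fields: float | bool | None) -> BridgeState:
    """Return a new state where only the non-None fields are overwritten."""
    updates = {name: value for name, value in fields.items() if value is not None}
    return replace(state, **updates)


state = BridgeState()
state = apply_partial(state, move_forward=1.0)
# move_forward is omitted (None) here, so it keeps its previous value.
state = apply_partial(state, yaw_rate=0.5, move_forward=None)
print(state.move_forward, state.yaw_rate)  # prints: 1.0 0.5
```

Note that an explicit 0.0 is not None, so under this model passing move_forward=0.0 stops forward motion, whereas omitting the argument leaves it unchanged.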

Source code in src/resoio/locomotion.py
async def send(
    self,
    *,
    move_forward: float | None = None,
    move_right: float | None = None,
    move_up: float | None = None,
    yaw_rate: float | None = None,
    pitch_rate: float | None = None,
    jump: bool | None = None,
    velocity: float | None = None,
    crouch: float | None = None,
) -> None:
    """Send a partial locomotion update to the bridge.

    Only the kwargs that are not ``None`` are set on the wire; omitted
    fields keep their previous value on the (stateful) bridge. The
    first call lazily opens the ``Drive`` client-streaming RPC and
    starts the background drain task; subsequent calls enqueue onto it.
    ``unix_nanos`` is stamped here at send time; callers should not set
    it. gRPC failures surface (from the awaited stream) as
    :class:`grpclib.exceptions.GRPCError` when the context exits.
    """
    command = LocomotionCommand(
        move_forward=move_forward,
        move_right=move_right,
        move_up=move_up,
        yaw_rate=yaw_rate,
        pitch_rate=pitch_rate,
        jump=jump,
        velocity=velocity,
        crouch=crouch,
        unix_nanos=time.time_ns(),
    )
    if self._drive_task is None:
        stub = self._require_stub()
        self._drive_task = asyncio.create_task(self._run_drive(stub))
    await self._queue.put(command)
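The lazy-open pattern used by send can be sketched without gRPC. In the example below, LazyStreamer is a hypothetical stand-in: the first send starts a background task that drains an asyncio.Queue, and close pushes a None sentinel to end the drain. Using None as the end-of-stream marker is an assumption inferred from the queue's element type above; the body of _run_drive is not shown on this page.

```python
from __future__ import annotations

import asyncio


class LazyStreamer:
    """Hypothetical stand-in for the queue-and-drain pattern (no gRPC involved)."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue[str | None] = asyncio.Queue()
        self._task: asyncio.Task[int] | None = None

    async def _drain(self) -> int:
        # Consume items until the None sentinel arrives, then report the count.
        count = 0
        while (await self._queue.get()) is not None:
            count += 1
        return count

    async def send(self, item: str) -> None:
        # The first call starts the background task; later calls only enqueue.
        if self._task is None:
            self._task = asyncio.create_task(self._drain())
        await self._queue.put(item)

    async def close(self) -> int | None:
        # Returns None when send was never called, similar to drive_summary.
        if self._task is None:
            return None
        await self._queue.put(None)
        return await self._task


async def main() -> int | None:
    streamer = LazyStreamer()
    await streamer.send("a")
    await streamer.send("b")
    return await streamer.close()


print(asyncio.run(main()))  # prints: 2
```

Deferring task creation to the first send means a client that is opened and closed without sending never starts a stream, which matches the documented case where drive_summary stays None.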

reset async

reset(
    *, move: bool = False, look: bool = False, crouch: bool = False, jump: bool = False
) -> ResetSummary

Reset selected locomotion fields on the bridge.

Each flag clears the corresponding group: move → move_forward / move_right / move_up / velocity back to (0, 0, 0, 1.0), look → yaw_rate / pitch_rate to 0, crouch → 0, jump → drop an un-consumed pulse. Calling with all defaults (every flag False) means "reset everything"; the service canonicalises that to all-true because proto3 cannot distinguish "unset" from "explicit false". The returned ResetSummary echoes that canonicalised request. Safe to call concurrently with an in-flight send stream. unix_nanos is stamped here at send time. gRPC failures surface as grpclib.exceptions.GRPCError.
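The canonicalisation rule can be written down in a few lines. The sketch below is a client-side model of what the service is described as doing, not the service's code; ResetFlags and canonicalise are hypothetical names used only here.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ResetFlags:
    """Hypothetical mirror of the four reset flags."""

    move: bool = False
    look: bool = False
    crouch: bool = False
    jump: bool = False


def canonicalise(flags: ResetFlags) -> ResetFlags:
    """Expand an all-false request into all-true; leave any other request as is."""
    if not (flags.move or flags.look or flags.crouch or flags.jump):
        return ResetFlags(move=True, look=True, crouch=True, jump=True)
    return flags


print(canonicalise(ResetFlags()))           # every group selected
print(canonicalise(ResetFlags(look=True)))  # only the look group selected
```

One consequence of this rule is that a request cannot express "reset nothing": as soon as no flag is set, every group is reset.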

Source code in src/resoio/locomotion.py
async def reset(
    self,
    *,
    move: bool = False,
    look: bool = False,
    crouch: bool = False,
    jump: bool = False,
) -> ResetSummary:
    """Reset selected locomotion fields on the bridge.

    Each flag clears the corresponding group: ``move`` →
    ``move_forward`` / ``move_right`` / ``move_up`` / ``velocity``
    back to ``(0, 0, 0, 1.0)``, ``look`` → ``yaw_rate`` /
    ``pitch_rate`` to ``0``, ``crouch``
    → ``0``, ``jump`` → drop an un-consumed pulse. Calling with all
    defaults (every flag ``False``) means "reset everything"; the
    service canonicalises that to all-true because proto3 cannot
    distinguish "unset" from "explicit false". The returned
    :class:`ResetSummary` echoes that canonicalised request. Safe
    to call concurrently with an in-flight :meth:`send` stream.
    ``unix_nanos`` is stamped here at send time. gRPC failures
    surface as :class:`grpclib.exceptions.GRPCError`.
    """
    stub = self._require_stub()
    request = LocomotionResetRequest(
        move=move,
        look=look,
        crouch=crouch,
        jump=jump,
        unix_nanos=time.time_ns(),
    )
    summary = await stub.reset(request)
    return ResetSummary(
        move=summary.move,
        look=summary.look,
        crouch=summary.crouch,
        jump=summary.jump,
        unix_nanos=summary.unix_nanos,
    )

resoio.locomotion.DriveSummary dataclass

DriveSummary(received_count: int, dropped_count: int, unix_nanos: int)

Server-side summary returned when a Drive stream ends.

dropped_count is reserved for a future non-blocking bridge and is always 0 today (the bridge applies commands serially).

resoio.locomotion.ResetSummary dataclass

ResetSummary(move: bool, look: bool, crouch: bool, jump: bool, unix_nanos: int)

Server-side echo of the canonicalised reset request.

Returned by LocomotionClient.reset. Each bool reflects the request after the service canonicalises it (an all-false request is expanded into all-true), not whether the engine has already applied the reset; application happens on the next engine tick.