Auto-Generated Python API Reference

This page is generated directly from source code docstrings using mkdocstrings.


Robot

threewe.Robot

AI-First interface for controlling a 3we robot.

Supports three backends:

- "gazebo": Gazebo Harmonic simulation (CPU, CI, quick iteration)
- "isaac_sim": NVIDIA Isaac Sim (GPU RL training, domain randomization)
- "real": Physical robot via ROS2 (Pi 5 + ESP32-S3 + micro-ROS)

All backends return data in identical formats — code written for one backend works on all others without modification.
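
For example, a minimal session looks like the sketch below (it assumes a running Gazebo simulation with the default "office_v2" scene; swapping the backend argument is the only change needed for "isaac_sim" or "real"):

import asyncio

from threewe import Robot


async def main() -> None:
    # Swap backend="real" or backend="isaac_sim" without touching the rest.
    async with Robot(backend="gazebo") as robot:
        image = robot.get_camera_image()   # (H, W, 3) uint8
        scan = robot.get_lidar_scan()      # LaserScan with .ranges / .angles
        print(image.shape, scan.ranges.shape)

        result = await robot.move_to(x=1.0, y=0.5)
        print("reached:", result.success, "reason:", result.reason)


asyncio.run(main())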

Source code in sdk/threewe/src/threewe/robot.py
class Robot:
    """AI-First interface for controlling a 3we robot.

    Supports three backends:
    - "gazebo": Gazebo Harmonic simulation (CPU, CI, quick iteration)
    - "isaac_sim": NVIDIA Isaac Sim (GPU RL training, domain randomization)
    - "real": Physical robot via ROS2 (Pi 5 + ESP32-S3 + micro-ROS)

    All backends return data in identical formats — code written for one backend
    works on all others without modification.
    """

    def __init__(
        self,
        backend: str = "gazebo",
        config: str | RobotConfig = "standard_v2",
        *,
        hardware: str = "3we_standard_v2",
        scene: str = "office_v2",
        auto_connect: bool = True,
        verbose: bool = False,
    ) -> None:
        if isinstance(config, str):
            self._config = load_config(config)
        else:
            self._config = config

        from threewe.hal import load_hardware_profile

        self._hardware = load_hardware_profile(hardware)
        self._backend_name = backend
        self._scene = scene
        self._verbose = verbose
        self._backend: BackendBase = self._create_backend(backend)
        self._auto_connect = auto_connect

    async def __aenter__(self) -> Robot:
        if self._auto_connect:
            self.connect()
        return self

    async def __aexit__(self, *_args: object) -> None:
        self.disconnect()

    @property
    def config(self) -> RobotConfig:
        return self._config

    @property
    def hardware(self) -> HardwareProfileData:
        return self._hardware

    @property
    def backend_name(self) -> str:
        return self._backend_name

    @property
    def is_connected(self) -> bool:
        return self._backend.is_connected

    @property
    def ros2_node(self):
        """Advanced: access the underlying ROS2 node for custom subscriptions.

        Returns None for backends without ROS2 (mock, isaac_sim).
        """
        if hasattr(self._backend, "_ros2_node"):
            return self._backend._ros2_node
        return None

    def connect(self) -> None:
        """Connect to the robot or simulator."""
        self._backend.connect()

    def disconnect(self) -> None:
        """Disconnect from the robot or simulator."""
        if self._backend.is_connected:
            self._backend.disconnect()

    # ─── Perception ───

    def get_camera_image(self) -> np.ndarray:
        """Get RGB image. Returns (H, W, 3) uint8 numpy array."""
        self._ensure_connected()
        return self._backend.get_camera_image()

    get_image = get_camera_image

    def get_rgbd_image(self) -> RGBDImage:
        """Get RGB-D image with depth in meters."""
        self._ensure_connected()
        return self._backend.get_rgbd_image()

    def get_lidar_scan(self) -> LaserScan:
        """Get 2D laser scan."""
        self._ensure_connected()
        return self._backend.get_lidar_scan()

    def get_pose(self) -> Pose2D:
        """Get robot pose in the map frame."""
        self._ensure_connected()
        return self._backend.get_pose()

    def get_velocity(self) -> Velocity:
        """Get current body-frame velocity."""
        self._ensure_connected()
        return self._backend.get_velocity()

    def get_imu(self) -> IMUData:
        """Get IMU sensor data."""
        self._ensure_connected()
        return self._backend.get_imu()

    def get_battery_state(self) -> BatteryState:
        """Get battery voltage, percentage, and charging state."""
        self._ensure_connected()
        return self._backend.get_battery_state()

    def get_map(self) -> OccupancyGrid:
        """Get current SLAM occupancy grid map."""
        self._ensure_connected()
        return self._backend.get_map()

    def get_wheel_speeds(self) -> np.ndarray:
        """Get wheel encoder speeds. Returns (4,) float32 in RPM."""
        self._ensure_connected()
        return self._backend.get_wheel_speeds()

    def get_motor_current(self) -> np.ndarray:
        """Get motor current draw. Returns (4,) float32 in Amps."""
        self._ensure_connected()
        return self._backend.get_motor_current()

    _DEFAULT_MODALITIES: tuple[str, ...] = ("image", "lidar", "pose", "velocity")

    def get_observation(self, modalities: list[str] | None = None) -> dict[str, np.ndarray]:
        """Get standardized observation dict for VLA/RL model ingestion.

        Args:
            modalities: Which sensor modalities to include. Defaults to
                ("image", "lidar", "pose", "velocity"). Supported:
                "image", "depth", "lidar", "pose", "velocity", "imu", "map".
        """
        self._ensure_connected()
        keys = modalities if modalities is not None else list(self._DEFAULT_MODALITIES)
        obs: dict[str, np.ndarray] = {}

        for key in keys:
            if key == "image":
                obs["image"] = self.get_camera_image()
            elif key == "depth":
                rgbd = self.get_rgbd_image()
                obs["depth"] = rgbd.depth
            elif key == "lidar":
                scan = self.get_lidar_scan()
                obs["lidar"] = scan.ranges
            elif key == "pose":
                pose = self.get_pose()
                obs["pose"] = np.array([pose.x, pose.y, pose.theta], dtype=np.float32)
            elif key == "velocity":
                vel = self.get_velocity()
                obs["velocity"] = np.array([vel.vx, vel.vy, vel.omega], dtype=np.float32)
            elif key == "imu":
                imu = self.get_imu()
                obs["imu"] = np.concatenate(
                    [imu.acceleration, imu.angular_velocity, imu.orientation]
                )
            elif key == "map":
                grid = self.get_map()
                obs["map"] = grid.data.astype(np.float32)
            else:
                raise ValueError(
                    f"Unknown modality '{key}'. Supported: "
                    "image, depth, lidar, pose, velocity, imu, map"
                )

        return obs

    # ─── Action ───

    def set_velocity(self, vx: float, vy: float, omega: float) -> None:
        """Command body velocity. Must be called continuously or timeout stops motors."""
        self._ensure_connected()
        self._clamp_and_send(vx, vy, omega)

    def set_wheel_velocities(self, speeds: list[float]) -> None:
        """Command individual wheel speeds via inverse kinematics.

        Args:
            speeds: (4,) target wheel speeds in RPM [FL, FR, RL, RR].
                Converted to body velocity via mecanum forward kinematics
                and sent as cmd_vel.
        """
        self._ensure_connected()
        r = self._hardware.wheel_radius
        k = self._hardware.wheel_separation / 2.0
        rad_s = [s * (2.0 * np.pi) / 60.0 for s in speeds]
        vx = r * (rad_s[0] + rad_s[1] + rad_s[2] + rad_s[3]) / 4.0
        vy = r * (-rad_s[0] + rad_s[1] + rad_s[2] - rad_s[3]) / 4.0
        omega = r * (-rad_s[0] + rad_s[1] - rad_s[2] + rad_s[3]) / (4.0 * k)
        self._clamp_and_send(vx, vy, omega)

    def stop(self) -> None:
        """Immediately stop all motion."""
        self._ensure_connected()
        self._backend.stop()

    async def move_to(
        self, x: float, y: float, theta: float | None = None, timeout: float = 60.0
    ) -> MoveResult:
        """Navigate to target pose using Nav2 path planning."""
        self._ensure_connected()
        return await self._backend.move_to(x, y, theta, timeout)

    async def move_forward(self, distance: float) -> MoveResult:
        """Drive straight forward by the specified distance (meters)."""
        self._ensure_connected()
        return await self._backend.move_forward(distance)

    async def rotate(self, angle: float) -> MoveResult:
        """Rotate in place by the specified angle (radians, positive=CCW)."""
        self._ensure_connected()
        return await self._backend.rotate(angle)

    async def explore(self, timeout: float = 60.0) -> ExploreResult:
        """Autonomously explore unknown areas until map is complete or timeout."""
        self._ensure_connected()
        return await self._backend.explore(timeout)

    async def follow_path(self, waypoints: list[Pose2D]) -> MoveResult:
        """Follow a sequence of waypoints in order."""
        self._ensure_connected()
        return await self._backend.follow_path(waypoints)

    # ─── AI Integration ───

    def execute_action(self, action: np.ndarray) -> None:
        """Execute a policy network output action vector.

        Expected shape: (3,) normalized to [-1, 1] as [vx, vy, omega].
        Scaled by configured max velocities.
        """
        self._ensure_connected()
        limits = self._config.limits
        vx = float(action[0]) * limits.max_linear_velocity
        vy = float(action[1]) * limits.max_linear_velocity
        omega = float(action[2]) * limits.max_angular_velocity
        self._clamp_and_send(vx, vy, omega)

    async def execute_instruction(
        self, instruction: str, mode: str = "blocking"
    ) -> ExecutionResult:
        """Execute a natural language instruction using VLM reasoning.

        Args:
            instruction: Natural language task description.
            mode: "blocking" for sequential perception-action loop,
                  "async" for decoupled VLM inference + 20Hz control loop.

        Requires the `ai` extra: pip install threewe[ai]
        """
        self._ensure_connected()
        if mode == "async":
            from threewe.ai.vlm_runner import execute_vlm_instruction_async

            return await execute_vlm_instruction_async(self, instruction)

        from threewe.ai.vlm_runner import execute_vlm_instruction

        return await execute_vlm_instruction(self, instruction)

    # ─── Internal ───

    def _ensure_connected(self) -> None:
        if not self._backend.is_connected:
            raise RobotConnectionError("Robot is not connected. Call robot.connect() first.")

    def _clamp_and_send(self, vx: float, vy: float, omega: float) -> None:
        limits = self._config.limits
        vx = max(-limits.max_linear_velocity, min(limits.max_linear_velocity, vx))
        vy = max(-limits.max_linear_velocity, min(limits.max_linear_velocity, vy))
        omega = max(-limits.max_angular_velocity, min(limits.max_angular_velocity, omega))
        self._backend.set_velocity(vx, vy, omega)

    def _create_backend(self, backend: str) -> BackendBase:
        if backend == "gazebo":
            from threewe.backends.gazebo import GazeboBackend

            return GazeboBackend(config=self._config, scene=self._scene)
        elif backend == "real":
            from threewe.backends.real import RealBackend

            return RealBackend(config=self._config)
        elif backend == "isaac_sim":
            from threewe.backends.isaac_sim import IsaacSimBackend, IsaacSimConfig

            return IsaacSimBackend(config=IsaacSimConfig(scene=self._scene))
        elif backend == "mock":
            from threewe.backends.mock import MockBackend

            return MockBackend(config=self._config, scene=self._scene, verbose=self._verbose)
        else:
            raise ValueError(
                f"Unknown backend '{backend}'. Choose from: 'gazebo', 'real', 'isaac_sim', 'mock'"
            )

ros2_node property

Advanced: access the underlying ROS2 node for custom subscriptions.

Returns None for backends without ROS2 (mock, isaac_sim).

connect()

Connect to the robot or simulator.

Source code in sdk/threewe/src/threewe/robot.py
def connect(self) -> None:
    """Connect to the robot or simulator."""
    self._backend.connect()

disconnect()

Disconnect from the robot or simulator.

Source code in sdk/threewe/src/threewe/robot.py
def disconnect(self) -> None:
    """Disconnect from the robot or simulator."""
    if self._backend.is_connected:
        self._backend.disconnect()

get_camera_image()

Get RGB image. Returns (H, W, 3) uint8 numpy array.

Source code in sdk/threewe/src/threewe/robot.py
def get_camera_image(self) -> np.ndarray:
    """Get RGB image. Returns (H, W, 3) uint8 numpy array."""
    self._ensure_connected()
    return self._backend.get_camera_image()

get_rgbd_image()

Get RGB-D image with depth in meters.

Source code in sdk/threewe/src/threewe/robot.py
def get_rgbd_image(self) -> RGBDImage:
    """Get RGB-D image with depth in meters."""
    self._ensure_connected()
    return self._backend.get_rgbd_image()

get_lidar_scan()

Get 2D laser scan.

Source code in sdk/threewe/src/threewe/robot.py
def get_lidar_scan(self) -> LaserScan:
    """Get 2D laser scan."""
    self._ensure_connected()
    return self._backend.get_lidar_scan()

get_pose()

Get robot pose in the map frame.

Source code in sdk/threewe/src/threewe/robot.py
def get_pose(self) -> Pose2D:
    """Get robot pose in the map frame."""
    self._ensure_connected()
    return self._backend.get_pose()

get_velocity()

Get current body-frame velocity.

Source code in sdk/threewe/src/threewe/robot.py
def get_velocity(self) -> Velocity:
    """Get current body-frame velocity."""
    self._ensure_connected()
    return self._backend.get_velocity()

get_imu()

Get IMU sensor data.

Source code in sdk/threewe/src/threewe/robot.py
def get_imu(self) -> IMUData:
    """Get IMU sensor data."""
    self._ensure_connected()
    return self._backend.get_imu()

get_battery_state()

Get battery voltage, percentage, and charging state.

Source code in sdk/threewe/src/threewe/robot.py
def get_battery_state(self) -> BatteryState:
    """Get battery voltage, percentage, and charging state."""
    self._ensure_connected()
    return self._backend.get_battery_state()

get_map()

Get current SLAM occupancy grid map.

Source code in sdk/threewe/src/threewe/robot.py
def get_map(self) -> OccupancyGrid:
    """Get current SLAM occupancy grid map."""
    self._ensure_connected()
    return self._backend.get_map()

get_wheel_speeds()

Get wheel encoder speeds. Returns (4,) float32 in RPM.

Source code in sdk/threewe/src/threewe/robot.py
def get_wheel_speeds(self) -> np.ndarray:
    """Get wheel encoder speeds. Returns (4,) float32 in RPM."""
    self._ensure_connected()
    return self._backend.get_wheel_speeds()

get_motor_current()

Get motor current draw. Returns (4,) float32 in Amps.

Source code in sdk/threewe/src/threewe/robot.py
def get_motor_current(self) -> np.ndarray:
    """Get motor current draw. Returns (4,) float32 in Amps."""
    self._ensure_connected()
    return self._backend.get_motor_current()

get_observation(modalities=None)

Get standardized observation dict for VLA/RL model ingestion.

Parameters:

    modalities (list[str] | None, default None): Which sensor modalities to
        include. Defaults to ("image", "lidar", "pose", "velocity"). Supported:
        "image", "depth", "lidar", "pose", "velocity", "imu", "map".
Source code in sdk/threewe/src/threewe/robot.py
def get_observation(self, modalities: list[str] | None = None) -> dict[str, np.ndarray]:
    """Get standardized observation dict for VLA/RL model ingestion.

    Args:
        modalities: Which sensor modalities to include. Defaults to
            ("image", "lidar", "pose", "velocity"). Supported:
            "image", "depth", "lidar", "pose", "velocity", "imu", "map".
    """
    self._ensure_connected()
    keys = modalities if modalities is not None else list(self._DEFAULT_MODALITIES)
    obs: dict[str, np.ndarray] = {}

    for key in keys:
        if key == "image":
            obs["image"] = self.get_camera_image()
        elif key == "depth":
            rgbd = self.get_rgbd_image()
            obs["depth"] = rgbd.depth
        elif key == "lidar":
            scan = self.get_lidar_scan()
            obs["lidar"] = scan.ranges
        elif key == "pose":
            pose = self.get_pose()
            obs["pose"] = np.array([pose.x, pose.y, pose.theta], dtype=np.float32)
        elif key == "velocity":
            vel = self.get_velocity()
            obs["velocity"] = np.array([vel.vx, vel.vy, vel.omega], dtype=np.float32)
        elif key == "imu":
            imu = self.get_imu()
            obs["imu"] = np.concatenate(
                [imu.acceleration, imu.angular_velocity, imu.orientation]
            )
        elif key == "map":
            grid = self.get_map()
            obs["map"] = grid.data.astype(np.float32)
        else:
            raise ValueError(
                f"Unknown modality '{key}'. Supported: "
                "image, depth, lidar, pose, velocity, imu, map"
            )

    return obs
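
As an illustration, a sketch of feeding the observation dict into a policy, run with a connected robot; the policy object is a hypothetical stand-in and not part of the SDK:

obs = robot.get_observation(modalities=["image", "lidar", "pose"])
print(obs["image"].shape)   # (H, W, 3) uint8
print(obs["lidar"].shape)   # (N,) float32 ranges in meters
print(obs["pose"])          # [x, y, theta] float32

action = policy(obs)        # hypothetical model mapping obs -> (3,) action in [-1, 1]
robot.execute_action(action)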

set_velocity(vx, vy, omega)

Command body velocity. Must be called continuously or timeout stops motors.

Source code in sdk/threewe/src/threewe/robot.py
def set_velocity(self, vx: float, vy: float, omega: float) -> None:
    """Command body velocity. Must be called continuously or timeout stops motors."""
    self._ensure_connected()
    self._clamp_and_send(vx, vy, omega)

set_wheel_velocities(speeds)

Command individual wheel speeds via inverse kinematics.

Parameters:

    speeds (list[float], required): (4,) target wheel speeds in RPM
        [FL, FR, RL, RR]. Converted to body velocity via mecanum forward
        kinematics and sent as cmd_vel.
Source code in sdk/threewe/src/threewe/robot.py
def set_wheel_velocities(self, speeds: list[float]) -> None:
    """Command individual wheel speeds via inverse kinematics.

    Args:
        speeds: (4,) target wheel speeds in RPM [FL, FR, RL, RR].
            Converted to body velocity via mecanum forward kinematics
            and sent as cmd_vel.
    """
    self._ensure_connected()
    r = self._hardware.wheel_radius
    k = self._hardware.wheel_separation / 2.0
    rad_s = [s * (2.0 * np.pi) / 60.0 for s in speeds]
    vx = r * (rad_s[0] + rad_s[1] + rad_s[2] + rad_s[3]) / 4.0
    vy = r * (-rad_s[0] + rad_s[1] + rad_s[2] - rad_s[3]) / 4.0
    omega = r * (-rad_s[0] + rad_s[1] - rad_s[2] + rad_s[3]) / (4.0 * k)
    self._clamp_and_send(vx, vy, omega)
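
For instance, under the forward-kinematics mapping above, equal wheel speeds cancel the lateral and rotational terms; a sketch assuming a connected robot:

# All four wheels at +60 RPM: vy and omega cancel, leaving pure forward motion.
robot.set_wheel_velocities([60.0, 60.0, 60.0, 60.0])

# FL/RR negative, FR/RL positive: vx and omega cancel, leaving lateral (+vy) motion.
robot.set_wheel_velocities([-60.0, 60.0, 60.0, -60.0])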

stop()

Immediately stop all motion.

Source code in sdk/threewe/src/threewe/robot.py
def stop(self) -> None:
    """Immediately stop all motion."""
    self._ensure_connected()
    self._backend.stop()

move_to(x, y, theta=None, timeout=60.0) async

Navigate to target pose using Nav2 path planning.

Source code in sdk/threewe/src/threewe/robot.py
async def move_to(
    self, x: float, y: float, theta: float | None = None, timeout: float = 60.0
) -> MoveResult:
    """Navigate to target pose using Nav2 path planning."""
    self._ensure_connected()
    return await self._backend.move_to(x, y, theta, timeout)

move_forward(distance) async

Drive straight forward by the specified distance (meters).

Source code in sdk/threewe/src/threewe/robot.py
async def move_forward(self, distance: float) -> MoveResult:
    """Drive straight forward by the specified distance (meters)."""
    self._ensure_connected()
    return await self._backend.move_forward(distance)

rotate(angle) async

Rotate in place by the specified angle (radians, positive=CCW).

Source code in sdk/threewe/src/threewe/robot.py
async def rotate(self, angle: float) -> MoveResult:
    """Rotate in place by the specified angle (radians, positive=CCW)."""
    self._ensure_connected()
    return await self._backend.rotate(angle)

explore(timeout=60.0) async

Autonomously explore unknown areas until map is complete or timeout.

Source code in sdk/threewe/src/threewe/robot.py
async def explore(self, timeout: float = 60.0) -> ExploreResult:
    """Autonomously explore unknown areas until map is complete or timeout."""
    self._ensure_connected()
    return await self._backend.explore(timeout)

follow_path(waypoints) async

Follow a sequence of waypoints in order.

Source code in sdk/threewe/src/threewe/robot.py
async def follow_path(self, waypoints: list[Pose2D]) -> MoveResult:
    """Follow a sequence of waypoints in order."""
    self._ensure_connected()
    return await self._backend.follow_path(waypoints)

execute_action(action)

Execute a policy network output action vector.

Expected shape: (3,) normalized to [-1, 1] as [vx, vy, omega]. Scaled by configured max velocities.

Source code in sdk/threewe/src/threewe/robot.py
def execute_action(self, action: np.ndarray) -> None:
    """Execute a policy network output action vector.

    Expected shape: (3,) normalized to [-1, 1] as [vx, vy, omega].
    Scaled by configured max velocities.
    """
    self._ensure_connected()
    limits = self._config.limits
    vx = float(action[0]) * limits.max_linear_velocity
    vy = float(action[1]) * limits.max_linear_velocity
    omega = float(action[2]) * limits.max_angular_velocity
    self._clamp_and_send(vx, vy, omega)
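
A minimal sketch with an illustrative action vector (assumes a connected robot):

import numpy as np

# Half of max forward speed, no strafe, a quarter of max turn rate.
action = np.array([0.5, 0.0, 0.25], dtype=np.float32)
robot.execute_action(action)   # scaled by config.limits, then clamped and sent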

execute_instruction(instruction, mode='blocking') async

Execute a natural language instruction using VLM reasoning.

Parameters:

    instruction (str, required): Natural language task description.
    mode (str, default "blocking"): "blocking" for sequential perception-action
        loop, "async" for decoupled VLM inference + 20Hz control loop.

Requires the ai extra: pip install threewe[ai]

Source code in sdk/threewe/src/threewe/robot.py
async def execute_instruction(
    self, instruction: str, mode: str = "blocking"
) -> ExecutionResult:
    """Execute a natural language instruction using VLM reasoning.

    Args:
        instruction: Natural language task description.
        mode: "blocking" for sequential perception-action loop,
              "async" for decoupled VLM inference + 20Hz control loop.

    Requires the `ai` extra: pip install threewe[ai]
    """
    self._ensure_connected()
    if mode == "async":
        from threewe.ai.vlm_runner import execute_vlm_instruction_async

        return await execute_vlm_instruction_async(self, instruction)

    from threewe.ai.vlm_runner import execute_vlm_instruction

    return await execute_vlm_instruction(self, instruction)
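
Usage sketch, inside an async context with a connected robot and the ai extra installed; the instruction text is an arbitrary example:

# pip install threewe[ai]
result = await robot.execute_instruction(
    "drive to the door and stop in front of it",
    mode="async",   # decoupled VLM inference + control loop
)
print(result.success, result.description)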

Types

threewe.types

Core data types for the threewe API.

All sensor data is returned as numpy arrays with standardized shapes and dtypes. Coordinate system: right-hand, X forward, Y left, Z up. Angles in radians.
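
For example, a short sketch of how these types are typically consumed (assumes a connected Robot instance named robot):

import numpy as np

from threewe.types import Pose2D

goal = Pose2D(x=2.0, y=-1.0, theta=np.pi / 2)   # theta is radians, CCW-positive

rgbd = robot.get_rgbd_image()
valid = rgbd.depth > 0.0                        # invalid depth is encoded as 0.0
mean_depth = float(rgbd.depth[valid].mean()) if valid.any() else float("nan")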

Pose2D dataclass

2D pose in the map frame.

Source code in sdk/threewe/src/threewe/types.py
@dataclass(frozen=True, slots=True)
class Pose2D:
    """2D pose in the map frame."""

    x: float = 0.0
    y: float = 0.0
    theta: float = 0.0

Velocity dataclass

Body-frame velocity.

Source code in sdk/threewe/src/threewe/types.py
@dataclass(frozen=True, slots=True)
class Velocity:
    """Body-frame velocity."""

    vx: float = 0.0
    vy: float = 0.0
    omega: float = 0.0

CameraIntrinsics dataclass

Pinhole camera intrinsic parameters.

Source code in sdk/threewe/src/threewe/types.py
@dataclass(frozen=True, slots=True)
class CameraIntrinsics:
    """Pinhole camera intrinsic parameters."""

    fx: float = 0.0
    fy: float = 0.0
    cx: float = 0.0
    cy: float = 0.0
    width: int = 640
    height: int = 480

RGBDImage dataclass

RGB-D image pair with intrinsics.

Source code in sdk/threewe/src/threewe/types.py
@dataclass(slots=True)
class RGBDImage:
    """RGB-D image pair with intrinsics."""

    rgb: np.ndarray  # (H, W, 3) uint8
    depth: np.ndarray  # (H, W) float32, meters
    intrinsics: CameraIntrinsics
    timestamp: float = 0.0

LaserScan dataclass

2D laser scan.

Source code in sdk/threewe/src/threewe/types.py
@dataclass(slots=True)
class LaserScan:
    """2D laser scan."""

    ranges: np.ndarray  # (N,) float32, meters
    angles: np.ndarray  # (N,) float32, radians
    angle_min: float = 0.0
    angle_max: float = 0.0
    range_max: float = 12.0
    timestamp: float = 0.0

OccupancyGrid dataclass

2D occupancy grid map.

Source code in sdk/threewe/src/threewe/types.py
@dataclass(slots=True)
class OccupancyGrid:
    """2D occupancy grid map."""

    data: np.ndarray  # (H, W) int8: 0=free, 100=occupied, -1=unknown
    resolution: float = 0.05
    origin: Pose2D = field(default_factory=Pose2D)
    timestamp: float = 0.0

MoveResult dataclass

Result of a navigation action.

Source code in sdk/threewe/src/threewe/types.py
@dataclass(frozen=True, slots=True)
class MoveResult:
    """Result of a navigation action."""

    success: bool
    final_pose: Pose2D
    duration: float = 0.0
    distance: float = 0.0
    reason: str = "reached"

IMUData dataclass

IMU sensor reading.

Source code in sdk/threewe/src/threewe/types.py
@dataclass(slots=True)
class IMUData:
    """IMU sensor reading."""

    acceleration: np.ndarray  # (3,) m/s²
    angular_velocity: np.ndarray  # (3,) rad/s
    orientation: np.ndarray  # (4,) quaternion [x, y, z, w]
    timestamp: float = 0.0

BatteryState dataclass

Battery status.

Source code in sdk/threewe/src/threewe/types.py
@dataclass(frozen=True, slots=True)
class BatteryState:
    """Battery status."""

    voltage: float = 0.0
    percentage: float = 0.0
    is_charging: bool = False

ExploreResult dataclass

Result of an exploration action.

Source code in sdk/threewe/src/threewe/types.py
@dataclass(frozen=True, slots=True)
class ExploreResult:
    """Result of an exploration action."""

    coverage: float = 0.0
    duration: float = 0.0
    cells_explored: int = 0
    timed_out: bool = False

ExecutionResult dataclass

Result of a VLM instruction execution.

Source code in sdk/threewe/src/threewe/types.py
@dataclass(frozen=True, slots=True)
class ExecutionResult:
    """Result of a VLM instruction execution."""

    success: bool = False
    description: str = ""
    images: list[np.ndarray] = field(default_factory=list)

Configuration

threewe.config

Configuration loading and validation.

load_config(name='standard_v2')

Load a robot configuration by name.

Searches in order:

1. Absolute/relative path if name contains a path separator
2. Built-in configs directory (sdk/threewe/configs/)
3. Current working directory

Source code in sdk/threewe/src/threewe/config.py
def load_config(name: str = "standard_v2") -> RobotConfig:
    """Load a robot configuration by name.

    Searches in order:
    1. Absolute/relative path if name contains a path separator
    2. Built-in configs directory (sdk/threewe/configs/)
    3. Current working directory
    """
    path = _resolve_config_path(name)
    if path is None:
        return RobotConfig()

    with open(path) as f:
        raw = yaml.safe_load(f) or {}

    return _parse_config(raw)
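
A brief sketch (the YAML path in the comment is hypothetical):

from threewe.config import load_config

cfg = load_config("standard_v2")           # built-in profile
print(cfg.limits.max_linear_velocity)      # used by Robot to clamp velocity commands

# A path also works, e.g. a local override file:
# cfg = load_config("./my_robot.yaml")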

Exceptions

threewe.exceptions

Exception hierarchy for threewe.

All exceptions inherit from ThreeweError to allow blanket catching.
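
A sketch of blanket error handling; whether a given failure surfaces as an exception or as a MoveResult with success=False is backend-dependent:

from threewe.exceptions import NavigationError, ThreeweError

try:
    result = await robot.move_to(x=5.0, y=0.0, timeout=30.0)
except NavigationError:
    robot.stop()                    # e.g. blocked path or unreachable goal
except ThreeweError as exc:
    print(f"SDK error: {exc}")      # blanket catch for any other SDK failure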

ThreeweError

Bases: Exception

Base exception for all threewe errors.

Source code in sdk/threewe/src/threewe/exceptions.py
class ThreeweError(Exception):
    """Base exception for all threewe errors."""

RobotConnectionError

Bases: ThreeweError

Cannot connect to the robot or simulator.

Source code in sdk/threewe/src/threewe/exceptions.py
class RobotConnectionError(ThreeweError):
    """Cannot connect to the robot or simulator."""

NavigationError

Bases: ThreeweError

Navigation action failed (path blocked, goal unreachable).

Source code in sdk/threewe/src/threewe/exceptions.py
class NavigationError(ThreeweError):
    """Navigation action failed (path blocked, goal unreachable)."""

HardwareError

Bases: ThreeweError

Hardware fault detected (motor overheat, sensor disconnect).

Source code in sdk/threewe/src/threewe/exceptions.py
class HardwareError(ThreeweError):
    """Hardware fault detected (motor overheat, sensor disconnect)."""

EmergencyStopError

Bases: ThreeweError

Emergency stop triggered.

Source code in sdk/threewe/src/threewe/exceptions.py
class EmergencyStopError(ThreeweError):
    """Emergency stop triggered."""

RobotTimeoutError

Bases: ThreeweError

Operation timed out.

Source code in sdk/threewe/src/threewe/exceptions.py
class RobotTimeoutError(ThreeweError):
    """Operation timed out."""

SafetyError

Bases: ThreeweError

Safety constraint violated (speed limit, boundary breach).

Source code in sdk/threewe/src/threewe/exceptions.py
class SafetyError(ThreeweError):
    """Safety constraint violated (speed limit, boundary breach)."""

Backends

threewe.backends

Backend Abstraction Layer — the Sim2Real consistency contract.

All backends (Gazebo, Isaac Sim, Real Hardware) implement this interface. Users interact with the Robot class which delegates to the active backend.

BackendBase

Bases: ABC

Abstract base for all robot backends.

Consistency contract guarantees:

- Image: (H, W, 3) uint8 RGB
- Depth: (H, W) float32, meters, invalid=0.0
- LiDAR: (N,) float32, meters, uniform angular sampling
- Pose: right-hand, X forward, Y left, Z up, radians
- Velocity: m/s linear, rad/s angular
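
A minimal sanity check of this contract, sketched against a connected robot on whichever backend is active:

import numpy as np

img = robot.get_camera_image()
assert img.dtype == np.uint8 and img.ndim == 3 and img.shape[2] == 3

rgbd = robot.get_rgbd_image()
assert rgbd.depth.dtype == np.float32 and rgbd.depth.ndim == 2

scan = robot.get_lidar_scan()
assert scan.ranges.dtype == np.float32 and scan.ranges.ndim == 1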

Source code in sdk/threewe/src/threewe/backends/__init__.py
class BackendBase(ABC):
    """Abstract base for all robot backends.

    Consistency contract guarantees:
    - Image: (H, W, 3) uint8 RGB
    - Depth: (H, W) float32, meters, invalid=0.0
    - LiDAR: (N,) float32, meters, uniform angular sampling
    - Pose: right-hand, X forward, Y left, Z up, radians
    - Velocity: m/s linear, rad/s angular
    """

    @abstractmethod
    def connect(self) -> None:
        """Establish connection to the backend."""

    @abstractmethod
    def disconnect(self) -> None:
        """Cleanly disconnect from the backend."""

    @property
    @abstractmethod
    def is_connected(self) -> bool:
        """Whether the backend is currently connected."""

    @abstractmethod
    def get_camera_image(self) -> np.ndarray:
        """Get RGB image. Returns (H, W, 3) uint8."""

    @abstractmethod
    def get_rgbd_image(self) -> RGBDImage:
        """Get RGB-D image pair."""

    @abstractmethod
    def get_lidar_scan(self) -> LaserScan:
        """Get 2D laser scan."""

    @abstractmethod
    def get_pose(self) -> Pose2D:
        """Get robot pose in map frame."""

    @abstractmethod
    def get_velocity(self) -> Velocity:
        """Get current body velocity."""

    @abstractmethod
    def get_imu(self) -> IMUData:
        """Get IMU reading."""

    @abstractmethod
    def get_battery_state(self) -> BatteryState:
        """Get battery status."""

    @abstractmethod
    def get_map(self) -> OccupancyGrid:
        """Get current occupancy grid map."""

    @abstractmethod
    def get_wheel_speeds(self) -> np.ndarray:
        """Get wheel encoder speeds. Returns (4,) float32, RPM."""

    @abstractmethod
    def get_motor_current(self) -> np.ndarray:
        """Get motor current draw. Returns (4,) float32, Amps."""

    @abstractmethod
    def set_velocity(self, vx: float, vy: float, omega: float) -> None:
        """Command body velocity. Must be called continuously or timeout stops motors."""

    @abstractmethod
    def stop(self) -> None:
        """Immediately stop all motion."""

    @abstractmethod
    async def move_to(
        self, x: float, y: float, theta: float | None = None, timeout: float = 60.0
    ) -> MoveResult:
        """Navigate to target pose using path planning."""

    @abstractmethod
    async def move_forward(self, distance: float) -> MoveResult:
        """Drive forward by the specified distance in meters."""

    @abstractmethod
    async def rotate(self, angle: float) -> MoveResult:
        """Rotate in place by the specified angle in radians."""

    @abstractmethod
    async def explore(self, timeout: float = 60.0) -> ExploreResult:
        """Autonomously explore unknown areas."""

    @abstractmethod
    async def follow_path(self, waypoints: list) -> MoveResult:
        """Follow a sequence of Pose2D waypoints in order."""

is_connected abstractmethod property

Whether the backend is currently connected.

connect() abstractmethod

Establish connection to the backend.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
def connect(self) -> None:
    """Establish connection to the backend."""

disconnect() abstractmethod

Cleanly disconnect from the backend.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
def disconnect(self) -> None:
    """Cleanly disconnect from the backend."""

get_camera_image() abstractmethod

Get RGB image. Returns (H, W, 3) uint8.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
def get_camera_image(self) -> np.ndarray:
    """Get RGB image. Returns (H, W, 3) uint8."""

get_rgbd_image() abstractmethod

Get RGB-D image pair.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
def get_rgbd_image(self) -> RGBDImage:
    """Get RGB-D image pair."""

get_lidar_scan() abstractmethod

Get 2D laser scan.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
def get_lidar_scan(self) -> LaserScan:
    """Get 2D laser scan."""

get_pose() abstractmethod

Get robot pose in map frame.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
def get_pose(self) -> Pose2D:
    """Get robot pose in map frame."""

get_velocity() abstractmethod

Get current body velocity.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
def get_velocity(self) -> Velocity:
    """Get current body velocity."""

get_imu() abstractmethod

Get IMU reading.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
def get_imu(self) -> IMUData:
    """Get IMU reading."""

get_battery_state() abstractmethod

Get battery status.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
def get_battery_state(self) -> BatteryState:
    """Get battery status."""

get_map() abstractmethod

Get current occupancy grid map.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
def get_map(self) -> OccupancyGrid:
    """Get current occupancy grid map."""

get_wheel_speeds() abstractmethod

Get wheel encoder speeds. Returns (4,) float32, RPM.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
def get_wheel_speeds(self) -> np.ndarray:
    """Get wheel encoder speeds. Returns (4,) float32, RPM."""

get_motor_current() abstractmethod

Get motor current draw. Returns (4,) float32, Amps.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
def get_motor_current(self) -> np.ndarray:
    """Get motor current draw. Returns (4,) float32, Amps."""

set_velocity(vx, vy, omega) abstractmethod

Command body velocity. Must be called continuously or timeout stops motors.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
def set_velocity(self, vx: float, vy: float, omega: float) -> None:
    """Command body velocity. Must be called continuously or timeout stops motors."""

stop() abstractmethod

Immediately stop all motion.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
def stop(self) -> None:
    """Immediately stop all motion."""

move_to(x, y, theta=None, timeout=60.0) abstractmethod async

Navigate to target pose using path planning.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
async def move_to(
    self, x: float, y: float, theta: float | None = None, timeout: float = 60.0
) -> MoveResult:
    """Navigate to target pose using path planning."""

move_forward(distance) abstractmethod async

Drive forward by the specified distance in meters.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
async def move_forward(self, distance: float) -> MoveResult:
    """Drive forward by the specified distance in meters."""

rotate(angle) abstractmethod async

Rotate in place by the specified angle in radians.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
async def rotate(self, angle: float) -> MoveResult:
    """Rotate in place by the specified angle in radians."""

explore(timeout=60.0) abstractmethod async

Autonomously explore unknown areas.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
async def explore(self, timeout: float = 60.0) -> ExploreResult:
    """Autonomously explore unknown areas."""

follow_path(waypoints) abstractmethod async

Follow a sequence of Pose2D waypoints in order.

Source code in sdk/threewe/src/threewe/backends/__init__.py
@abstractmethod
async def follow_path(self, waypoints: list) -> MoveResult:
    """Follow a sequence of Pose2D waypoints in order."""

Benchmark

threewe.benchmark

Benchmark framework for standardized robot evaluation.

Provides reproducible evaluation tasks with standard metrics:

- PointNav: Point-to-point navigation (SPL, Success Rate)
- ObjectNav: Object-goal navigation (SPL, Success Rate)
- Exploration: Coverage exploration (Coverage %, Duration)

Usage:

    threewe benchmark run --task pointnav --episodes 100 --backend gazebo
    threewe benchmark compare --result result.json --baseline nav2_pointnav_office
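
The same benchmarks can be driven from Python; a sketch, assuming a running Gazebo simulation:

import asyncio

from threewe.benchmark.runner import BenchmarkRunner


async def main() -> None:
    runner = BenchmarkRunner(backend="gazebo", scene="office_v2")
    report = await runner.run_pointnav(num_episodes=10, seed=42)
    print(report.success_rate, report.spl)


asyncio.run(main())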

LeaderboardEntry dataclass

A single leaderboard submission entry.

Source code in sdk/threewe/src/threewe/benchmark/leaderboard.py
@dataclass(frozen=True)
class LeaderboardEntry:
    """A single leaderboard submission entry."""

    agent_name: str
    task: str
    scene: str
    success_rate: float
    spl: float
    hardware: str
    timestamp: str

ObjectNavEpisodeConfig dataclass

Configuration for a single ObjectNav episode.

Source code in sdk/threewe/src/threewe/benchmark/objectnav_runner.py
@dataclass(frozen=True)
class ObjectNavEpisodeConfig:
    """Configuration for a single ObjectNav episode."""

    target_category: str
    target_position: Pose2D
    start_pose: Pose2D
    success_radius: float = 1.0
    timeout: float = 60.0

BenchmarkRunner

Runs standardized benchmark episodes on the robot.

Source code in sdk/threewe/src/threewe/benchmark/runner.py
class BenchmarkRunner:
    """Runs standardized benchmark episodes on the robot."""

    def __init__(
        self,
        backend: str = "gazebo",
        scene: str = "office_v2",
    ) -> None:
        self._backend = backend
        self._scene = scene

    async def run_pointnav(
        self,
        num_episodes: int = 100,
        seed: int = 42,
    ) -> BenchmarkReport:
        """Run point-to-point navigation benchmark using scene poses."""
        import numpy as np

        from threewe import Robot
        from threewe.scenes import load_scene

        rng = np.random.default_rng(seed)
        scene = load_scene(self._scene)
        episodes: list[EpisodeResult] = []

        async with Robot(backend=self._backend, auto_connect=True) as robot:
            for ep in range(num_episodes):
                goal = scene.goal_poses[rng.integers(0, len(scene.goal_poses))]

                start_pose = robot.get_pose()
                optimal = math.sqrt((goal.x - start_pose.x) ** 2 + (goal.y - start_pose.y) ** 2)

                start_time = time.time()
                result = await robot.move_to(x=goal.x, y=goal.y, timeout=30.0)
                duration = time.time() - start_time

                episodes.append(
                    EpisodeResult(
                        episode_id=ep,
                        success=result.success,
                        duration=duration,
                        path_length=result.distance,
                        optimal_length=optimal,
                        reason=result.reason,
                    )
                )

        return self._build_report("pointnav", episodes)

    async def run_exploration(
        self,
        num_episodes: int = 10,
        timeout_per_episode: float = 120.0,
    ) -> BenchmarkReport:
        """Run exploration coverage benchmark."""
        from threewe import Robot

        episodes: list[EpisodeResult] = []

        async with Robot(backend=self._backend, auto_connect=True) as robot:
            for ep in range(num_episodes):
                start_time = time.time()
                result = await robot.explore(timeout=timeout_per_episode)
                duration = time.time() - start_time

                episodes.append(
                    EpisodeResult(
                        episode_id=ep,
                        success=result.coverage >= 0.8,
                        duration=duration,
                        path_length=0.0,
                        coverage=result.coverage,
                        reason="timed_out" if result.timed_out else "completed",
                    )
                )

        return self._build_report("exploration", episodes)

    async def run_objectnav(
        self,
        num_episodes: int = 100,
        seed: int = 42,
    ) -> BenchmarkReport:
        """Run object-goal navigation benchmark using scene poses."""
        from threewe.benchmark.objectnav_runner import run_objectnav_benchmark

        episodes = await run_objectnav_benchmark(
            backend=self._backend,
            scene=self._scene,
            num_episodes=num_episodes,
            seed=seed,
        )
        return self._build_report("objectnav", episodes)

    def _build_report(self, task: str, episodes: list[EpisodeResult]) -> BenchmarkReport:
        successes = [ep.success for ep in episodes]
        path_lengths = [ep.path_length for ep in episodes]
        optimal_lengths = [ep.optimal_length for ep in episodes]
        durations = [ep.duration for ep in episodes]
        coverages = [ep.coverage for ep in episodes]

        return BenchmarkReport(
            task=task,
            backend=self._backend,
            num_episodes=len(episodes),
            success_rate=compute_success_rate(successes),
            spl=compute_spl(successes, path_lengths, optimal_lengths),
            avg_duration=sum(durations) / len(durations) if durations else 0.0,
            avg_path_length=sum(path_lengths) / len(path_lengths) if path_lengths else 0.0,
            coverage=sum(coverages) / len(coverages) if coverages else 0.0,
            episodes=episodes,
            timestamp=time.strftime("%Y-%m-%dT%H:%M:%S"),
        )

run_pointnav(num_episodes=100, seed=42) async

Run point-to-point navigation benchmark using scene poses.

Source code in sdk/threewe/src/threewe/benchmark/runner.py
async def run_pointnav(
    self,
    num_episodes: int = 100,
    seed: int = 42,
) -> BenchmarkReport:
    """Run point-to-point navigation benchmark using scene poses."""
    import numpy as np

    from threewe import Robot
    from threewe.scenes import load_scene

    rng = np.random.default_rng(seed)
    scene = load_scene(self._scene)
    episodes: list[EpisodeResult] = []

    async with Robot(backend=self._backend, auto_connect=True) as robot:
        for ep in range(num_episodes):
            goal = scene.goal_poses[rng.integers(0, len(scene.goal_poses))]

            start_pose = robot.get_pose()
            optimal = math.sqrt((goal.x - start_pose.x) ** 2 + (goal.y - start_pose.y) ** 2)

            start_time = time.time()
            result = await robot.move_to(x=goal.x, y=goal.y, timeout=30.0)
            duration = time.time() - start_time

            episodes.append(
                EpisodeResult(
                    episode_id=ep,
                    success=result.success,
                    duration=duration,
                    path_length=result.distance,
                    optimal_length=optimal,
                    reason=result.reason,
                )
            )

    return self._build_report("pointnav", episodes)

run_exploration(num_episodes=10, timeout_per_episode=120.0) async

Run exploration coverage benchmark.

Source code in sdk/threewe/src/threewe/benchmark/runner.py
async def run_exploration(
    self,
    num_episodes: int = 10,
    timeout_per_episode: float = 120.0,
) -> BenchmarkReport:
    """Run exploration coverage benchmark."""
    from threewe import Robot

    episodes: list[EpisodeResult] = []

    async with Robot(backend=self._backend, auto_connect=True) as robot:
        for ep in range(num_episodes):
            start_time = time.time()
            result = await robot.explore(timeout=timeout_per_episode)
            duration = time.time() - start_time

            episodes.append(
                EpisodeResult(
                    episode_id=ep,
                    success=result.coverage >= 0.8,
                    duration=duration,
                    path_length=0.0,
                    coverage=result.coverage,
                    reason="timed_out" if result.timed_out else "completed",
                )
            )

    return self._build_report("exploration", episodes)

run_objectnav(num_episodes=100, seed=42) async

Run object-goal navigation benchmark using scene poses.

Source code in sdk/threewe/src/threewe/benchmark/runner.py
async def run_objectnav(
    self,
    num_episodes: int = 100,
    seed: int = 42,
) -> BenchmarkReport:
    """Run object-goal navigation benchmark using scene poses."""
    from threewe.benchmark.objectnav_runner import run_objectnav_benchmark

    episodes = await run_objectnav_benchmark(
        backend=self._backend,
        scene=self._scene,
        num_episodes=num_episodes,
        seed=seed,
    )
    return self._build_report("objectnav", episodes)

compare_to_baseline(baseline_name, success_rate, spl, avg_duration, coverage=0.0)

Compare new results to a stored baseline.

Parameters:

    baseline_name (str, required): Key in BASELINES dict.
    success_rate (float, required): New success rate.
    spl (float, required): New SPL value.
    avg_duration (float, required): New average duration.
    coverage (float, default 0.0): New coverage (for exploration tasks).

Returns:

    ComparisonResult: ComparisonResult with deltas and improvement flag.

Raises:

    ValueError: If baseline name is unknown.

Source code in sdk/threewe/src/threewe/benchmark/baselines.py
def compare_to_baseline(
    baseline_name: str,
    success_rate: float,
    spl: float,
    avg_duration: float,
    coverage: float = 0.0,
) -> ComparisonResult:
    """Compare new results to a stored baseline.

    Args:
        baseline_name: Key in BASELINES dict.
        success_rate: New success rate.
        spl: New SPL value.
        avg_duration: New average duration.
        coverage: New coverage (for exploration tasks).

    Returns:
        ComparisonResult with deltas and improvement flag.

    Raises:
        ValueError: If baseline name is unknown.
    """
    if baseline_name not in BASELINES:
        raise ValueError(
            f"Unknown baseline: '{baseline_name}'. Available: {list(BASELINES.keys())}"
        )

    baseline = BASELINES[baseline_name]
    sr_delta = success_rate - baseline.success_rate
    spl_delta = spl - baseline.spl
    dur_delta = avg_duration - baseline.avg_duration
    cov_delta = coverage - baseline.coverage

    improved = sr_delta >= 0 and spl_delta >= 0

    return ComparisonResult(
        baseline_name=baseline_name,
        baseline=baseline,
        success_rate_delta=sr_delta,
        spl_delta=spl_delta,
        duration_delta=dur_delta,
        coverage_delta=cov_delta,
        improved=improved,
    )

list_baselines()

Return names of all available baselines.

Source code in sdk/threewe/src/threewe/benchmark/baselines.py
def list_baselines() -> list[str]:
    """Return names of all available baselines."""
    return list(BASELINES.keys())

rank_entries(entries, metric='spl')

Rank leaderboard entries by a given metric (descending).

Parameters:

    entries (list[LeaderboardEntry], required): List of entries to rank.
    metric (str, default "spl"): Metric to sort by ("spl", "success_rate").

Returns:

    list[LeaderboardEntry]: Sorted list from best to worst.

Source code in sdk/threewe/src/threewe/benchmark/leaderboard.py
def rank_entries(entries: list[LeaderboardEntry], metric: str = "spl") -> list[LeaderboardEntry]:
    """Rank leaderboard entries by a given metric (descending).

    Args:
        entries: List of entries to rank.
        metric: Metric to sort by ("spl", "success_rate").

    Returns:
        Sorted list from best to worst.
    """
    if metric not in ("spl", "success_rate"):
        raise ValueError(f"Invalid metric: '{metric}'. Choose from: 'spl', 'success_rate'")

    return sorted(entries, key=lambda e: getattr(e, metric), reverse=True)
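
A short sketch with two made-up entries:

from threewe.benchmark.leaderboard import LeaderboardEntry, rank_entries

entries = [
    LeaderboardEntry(
        agent_name="baseline_nav2", task="pointnav", scene="office_v2",
        success_rate=0.78, spl=0.65, hardware="3we_standard_v2",
        timestamp="2024-01-01T00:00:00",
    ),
    LeaderboardEntry(
        agent_name="my_policy", task="pointnav", scene="office_v2",
        success_rate=0.84, spl=0.72, hardware="3we_standard_v2",
        timestamp="2024-01-02T00:00:00",
    ),
]
best_first = rank_entries(entries, metric="spl")
print([e.agent_name for e in best_first])  # ['my_policy', 'baseline_nav2']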

validate_submission(data)

Validate a leaderboard submission dict.

Returns:

    list[str]: List of error messages. Empty list means valid.

Source code in sdk/threewe/src/threewe/benchmark/leaderboard.py
def validate_submission(data: dict) -> list[str]:
    """Validate a leaderboard submission dict.

    Returns:
        List of error messages. Empty list means valid.
    """
    errors: list[str] = []

    for key, expected_type in SUBMISSION_SCHEMA.items():
        if key not in data:
            errors.append(f"Missing required field: '{key}'")
        elif not isinstance(data[key], expected_type):
            errors.append(
                f"Field '{key}' must be {expected_type.__name__}, got {type(data[key]).__name__}"
            )

    if "task" in data and data["task"] not in _VALID_TASKS:
        errors.append(f"Invalid task: '{data['task']}'. Must be one of: {_VALID_TASKS}")

    if "success_rate" in data and isinstance(data["success_rate"], (int, float)):
        if not (0.0 <= data["success_rate"] <= 1.0):
            errors.append("success_rate must be between 0.0 and 1.0")

    if "spl" in data and isinstance(data["spl"], (int, float)):
        if not (0.0 <= data["spl"] <= 1.0):
            errors.append("spl must be between 0.0 and 1.0")

    return errors

compute_spl(successes, path_lengths, optimal_lengths)

Compute Success weighted by Path Length (SPL).

SPL = (1/N) * sum( S_i * (L_i / max(P_i, L_i)) ) where S_i = success, L_i = optimal path, P_i = actual path.

Source code in sdk/threewe/src/threewe/benchmark/metrics.py
def compute_spl(
    successes: list[bool],
    path_lengths: list[float],
    optimal_lengths: list[float],
) -> float:
    """Compute Success weighted by Path Length (SPL).

    SPL = (1/N) * sum( S_i * (L_i / max(P_i, L_i)) )
    where S_i = success, L_i = optimal path, P_i = actual path.
    """
    if not successes:
        return 0.0

    n = len(successes)
    total = 0.0
    for success, path_len, opt_len in zip(successes, path_lengths, optimal_lengths, strict=True):
        if success:
            total += opt_len / max(path_len, opt_len) if max(path_len, opt_len) > 0 else 0.0

    return total / n
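
A quick numeric sketch of the metric:

from threewe.benchmark.metrics import compute_spl

# Episode 1: success, drove 12 m where 10 m was optimal -> weight 10/12 ≈ 0.833
# Episode 2: failure -> contributes 0 regardless of path length
spl = compute_spl(
    successes=[True, False],
    path_lengths=[12.0, 5.0],
    optimal_lengths=[10.0, 8.0],
)
print(spl)  # (0.833 + 0.0) / 2 ≈ 0.417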

compute_success_rate(successes)

Compute success rate as fraction of successful episodes.

Source code in sdk/threewe/src/threewe/benchmark/metrics.py
def compute_success_rate(successes: list[bool]) -> float:
    """Compute success rate as fraction of successful episodes."""
    if not successes:
        return 0.0
    return sum(successes) / len(successes)

generate_objectnav_episodes(scene_name, num_episodes, seed=42)

Generate ObjectNav episode configs from a scene's goal poses.

Each episode picks a random object category and goal pose from the scene.

Source code in sdk/threewe/src/threewe/benchmark/objectnav_runner.py
def generate_objectnav_episodes(
    scene_name: str,
    num_episodes: int,
    seed: int = 42,
) -> list[ObjectNavEpisodeConfig]:
    """Generate ObjectNav episode configs from a scene's goal poses.

    Each episode picks a random object category and goal pose from the scene.
    """
    import numpy as np

    from threewe.benchmark.tasks import ObjectNavTask
    from threewe.scenes import load_scene

    rng = np.random.default_rng(seed)
    scene = load_scene(scene_name)
    task = ObjectNavTask()

    episodes: list[ObjectNavEpisodeConfig] = []
    for _ in range(num_episodes):
        goal = scene.goal_poses[rng.integers(0, len(scene.goal_poses))]
        category = task.object_categories[rng.integers(0, len(task.object_categories))]
        start = scene.start_poses[rng.integers(0, len(scene.start_poses))]

        episodes.append(
            ObjectNavEpisodeConfig(
                target_category=category,
                target_position=goal,
                start_pose=start,
                success_radius=1.0,
                timeout=60.0,
            )
        )

    return episodes

run_objectnav_episode(robot, config) async

Run a single ObjectNav episode.

The robot navigates toward the target object position. Success is determined by reaching within config.success_radius of the target.

Source code in sdk/threewe/src/threewe/benchmark/objectnav_runner.py
async def run_objectnav_episode(robot, config: ObjectNavEpisodeConfig) -> EpisodeResult:
    """Run a single ObjectNav episode.

    The robot navigates toward the target object position. Success is
    determined by reaching within `config.success_radius` of the target.
    """
    start_time = time.time()

    start_pose = robot.get_pose()
    optimal_length = math.sqrt(
        (config.target_position.x - start_pose.x) ** 2
        + (config.target_position.y - start_pose.y) ** 2
    )

    result = await robot.move_to(
        x=config.target_position.x,
        y=config.target_position.y,
        timeout=config.timeout,
    )
    duration = time.time() - start_time

    final_pose = robot.get_pose()
    dist_to_target = math.sqrt(
        (config.target_position.x - final_pose.x) ** 2
        + (config.target_position.y - final_pose.y) ** 2
    )
    success = dist_to_target <= config.success_radius

    reason = "reached_object" if success else result.reason

    return EpisodeResult(
        episode_id=0,
        success=success,
        duration=duration,
        path_length=result.distance,
        optimal_length=optimal_length,
        reason=reason,
    )

get_task(name)

Get a benchmark task by name.

Raises:

    ValueError: If task name is unknown.

Source code in sdk/threewe/src/threewe/benchmark/tasks.py
def get_task(name: str) -> BenchmarkTask:
    """Get a benchmark task by name.

    Raises:
        ValueError: If task name is unknown.
    """
    if name not in TASK_REGISTRY:
        raise ValueError(f"Unknown task: '{name}'. Available: {list(TASK_REGISTRY.keys())}")
    return TASK_REGISTRY[name]

list_tasks()

Return names of all registered benchmark tasks.

Source code in sdk/threewe/src/threewe/benchmark/tasks.py
def list_tasks() -> list[str]:
    """Return names of all registered benchmark tasks."""
    return list(TASK_REGISTRY.keys())

AI Integration

threewe.ai

AI integration sub-package — VLM, VLA, and text encoding runners.

TextEncoder

Encode text instructions into fixed-dimensional vectors.

Methods:

    "tfidf": Zero-dependency TF-IDF-like bag-of-words encoding. Fast,
        deterministic, no downloads. Suitable for low-dimensional conditioning
        signals in simple VLA models.
    "sentence_transformers": Uses the sentence-transformers library for
        high-quality semantic embeddings. Requires the optional dependency.

Source code in sdk/threewe/src/threewe/ai/text_encoder.py
class TextEncoder:
    """Encode text instructions into fixed-dimensional vectors.

    Methods:
        - "tfidf": Zero-dependency TF-IDF-like bag-of-words encoding.
            Fast, deterministic, no downloads. Suitable for low-dimensional
            conditioning signals in simple VLA models.
        - "sentence_transformers": Uses sentence-transformers library for
            high-quality semantic embeddings. Requires optional dependency.
    """

    def __init__(self, method: str = "tfidf", dim: int = 64) -> None:
        if method not in ("tfidf", "sentence_transformers"):
            raise ValueError(
                f"Unknown method '{method}'. Choose from: 'tfidf', 'sentence_transformers'"
            )
        self._method = method
        self._dim = dim
        self._st_model = None

        if method == "sentence_transformers":
            self._init_sentence_transformers()

    @property
    def method(self) -> str:
        return self._method

    @property
    def dim(self) -> int:
        return self._dim

    def encode(self, text: str) -> np.ndarray:
        """Encode text into a fixed-size float32 vector.

        Args:
            text: The instruction text to encode.

        Returns:
            numpy array of shape (dim,) with dtype float32.
        """
        if self._method == "tfidf":
            return self._encode_tfidf(text)
        else:
            return self._encode_sentence_transformers(text)

    def _encode_tfidf(self, text: str) -> np.ndarray:
        """Deterministic hash-based bag-of-words encoding."""
        tokens = text.lower().split()
        vec = np.zeros(self._dim, dtype=np.float32)

        if not tokens:
            return vec

        for token in tokens:
            h = hashlib.sha256(token.encode()).hexdigest()
            idx = int(h[:8], 16) % self._dim
            sign = 1.0 if int(h[8:16], 16) % 2 == 0 else -1.0
            idf = 1.0 / math.log(2.0 + len(token))
            vec[idx] += sign * idf

        norm = np.linalg.norm(vec)
        if norm > 0:
            vec /= norm

        return vec

    def _init_sentence_transformers(self) -> None:
        try:
            from sentence_transformers import SentenceTransformer
        except ImportError as e:
            raise ImportError(
                "sentence-transformers is required for method='sentence_transformers'. "
                "Install with: pip install sentence-transformers"
            ) from e

        self._st_model = SentenceTransformer("all-MiniLM-L6-v2")

    def _encode_sentence_transformers(self, text: str) -> np.ndarray:
        if self._st_model is None:
            raise RuntimeError("Sentence transformer model not initialized.")

        embedding = self._st_model.encode(text, convert_to_numpy=True)
        embedding = embedding.astype(np.float32)

        if embedding.shape[0] != self._dim:
            if embedding.shape[0] > self._dim:
                embedding = embedding[: self._dim]
            else:
                padded = np.zeros(self._dim, dtype=np.float32)
                padded[: embedding.shape[0]] = embedding
                embedding = padded

        norm = np.linalg.norm(embedding)
        if norm > 0:
            embedding /= norm

        return embedding

encode(text)

Encode text into a fixed-size float32 vector.

Parameters:

    text (str, required): The instruction text to encode.

Returns:

    ndarray: numpy array of shape (dim,) with dtype float32.

Source code in sdk/threewe/src/threewe/ai/text_encoder.py
def encode(self, text: str) -> np.ndarray:
    """Encode text into a fixed-size float32 vector.

    Args:
        text: The instruction text to encode.

    Returns:
        numpy array of shape (dim,) with dtype float32.
    """
    if self._method == "tfidf":
        return self._encode_tfidf(text)
    else:
        return self._encode_sentence_transformers(text)
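
A minimal usage sketch with the zero-dependency "tfidf" method; the import path mirrors the source location shown above.

from threewe.ai.text_encoder import TextEncoder

encoder = TextEncoder(method="tfidf", dim=64)
vec = encoder.encode("go to the red chair")

print(vec.shape, vec.dtype)      # (64,) float32
print(float((vec ** 2).sum()))   # ~1.0: non-empty encodings are L2-normalized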

VLARunner

Run VLA models for end-to-end robot control.

A VLA model takes observations (image + state) and optionally a language instruction, and directly outputs action vectors (velocities/joint commands).

Source code in sdk/threewe/src/threewe/ai/vla_runner.py
class VLARunner:
    """Run VLA models for end-to-end robot control.

    A VLA model takes observations (image + state) and optionally a language
    instruction, and directly outputs action vectors (velocities/joint commands).
    """

    def __init__(self, model_path: str, device: str = "cpu") -> None:
        self._model_path = model_path
        self._device = device
        self._model: Any = None
        self._config: dict = {}
        self._text_encoder: Any = None

    @classmethod
    def from_pretrained(
        cls, model_id: str, device: str = "cpu", revision: str | None = None
    ) -> VLARunner:
        """Load a VLA model from HuggingFace Hub.

        Args:
            model_id: HuggingFace model identifier (e.g., "lerobot/act_3we_nav")
            device: Target device ("cpu", "cuda", "mps")
            revision: Specific model revision (commit hash) to pin the download.
        """
        try:
            from huggingface_hub import snapshot_download
        except ImportError as e:
            raise ImportError(
                "huggingface_hub is required to download VLA models. "
                "Install with: pip install huggingface-hub"
            ) from e

        local_dir = snapshot_download(repo_id=model_id, revision=revision)
        return cls._load_from_dir(local_dir, device)

    @classmethod
    def from_local(cls, path: str, device: str = "cpu") -> VLARunner:
        """Load a local VLA model (ONNX, PyTorch, or Hailo HEF).

        Args:
            path: Path to model directory or file
            device: Target device ("cpu", "cuda", "mps", "hailo")
        """
        if device == "hailo":
            return cls._load_hailo(path)
        return cls._load_from_dir(path, device)

    @classmethod
    def _load_from_dir(cls, path: str, device: str) -> VLARunner:
        """Load model from a local directory."""
        model_dir = Path(path)
        runner = cls(str(model_dir), device)

        onnx_path = model_dir / "model.onnx"
        if onnx_path.exists():
            runner._load_onnx(onnx_path)
            return runner

        pt_path = model_dir / "model.pt"
        if not pt_path.exists():
            pt_path = model_dir / "pytorch_model.bin"
        if pt_path.exists():
            runner._load_pytorch(pt_path)
            return runner

        raise FileNotFoundError(
            f"No model file found in {model_dir}. "
            "Expected model.onnx, model.pt, or pytorch_model.bin"
        )

    @classmethod
    def _load_hailo(cls, path: str) -> VLARunner:
        """Load a HEF model for Hailo-8L inference."""
        from threewe.ai.hailo import HailoRunner

        runner = cls(path, "hailo")
        runner._model = HailoRunner.from_hef(path)
        runner._config["runtime"] = "hailo"
        return runner

    def _load_onnx(self, path: Path) -> None:
        try:
            import onnxruntime as ort
        except ImportError as e:
            raise ImportError(
                "onnxruntime is required for ONNX VLA models. Install with: pip install onnxruntime"
            ) from e

        providers = ["CPUExecutionProvider"]
        if self._device == "cuda":
            providers.insert(0, "CUDAExecutionProvider")

        self._model = ort.InferenceSession(str(path), providers=providers)
        self._config["runtime"] = "onnx"

    def _load_pytorch(self, path: Path) -> None:
        try:
            import torch
        except ImportError as e:
            raise ImportError(
                "PyTorch is required for PyTorch VLA models. Install with: pip install torch"
            ) from e

        self._model = torch.load(  # nosemgrep
            str(path), map_location=self._device, weights_only=True
        )
        if hasattr(self._model, "eval"):
            self._model.eval()
        self._config["runtime"] = "pytorch"

    def predict(self, obs: dict[str, np.ndarray], instruction: str = "") -> np.ndarray:
        """Predict action from observation.

        Args:
            obs: Observation dict with keys like "image", "state", "lidar"
            instruction: Optional language instruction for conditioned policies

        Returns:
            Action vector as numpy array. Shape depends on model
            (typically (3,) for [vx, vy, omega] or (7,) for arm joints).
        """
        if self._model is None:
            raise RuntimeError("Model not loaded. Use from_pretrained() or from_local().")

        runtime = self._config.get("runtime", "")

        if runtime == "onnx":
            return self._predict_onnx(obs, instruction)
        elif runtime == "pytorch":
            return self._predict_pytorch(obs, instruction)
        elif runtime == "hailo":
            return self._predict_hailo(obs, instruction)
        else:
            raise RuntimeError(f"Unknown runtime: {runtime}")

    def _encode_instruction(self, instruction: str) -> np.ndarray:
        """Encode a language instruction into a fixed-size embedding."""
        if self._text_encoder is None:
            from threewe.ai.text_encoder import TextEncoder

            self._text_encoder = TextEncoder(method="tfidf", dim=64)
        return self._text_encoder.encode(instruction)

    def _predict_onnx(self, obs: dict[str, np.ndarray], instruction: str) -> np.ndarray:
        feed = {}
        for key, value in obs.items():
            if isinstance(value, np.ndarray):
                if value.ndim == 3:
                    feed[key] = value[np.newaxis].astype(np.float32) / 255.0
                else:
                    feed[key] = value[np.newaxis].astype(np.float32)

        if instruction:
            feed["instruction_embedding"] = self._encode_instruction(instruction)[
                np.newaxis
            ].astype(np.float32)

        outputs = self._model.run(None, feed)
        return outputs[0][0].astype(np.float32)

    def _predict_pytorch(self, obs: dict[str, np.ndarray], instruction: str) -> np.ndarray:
        import torch

        with torch.no_grad():
            tensors = {}
            for key, value in obs.items():
                if isinstance(value, np.ndarray):
                    t = torch.from_numpy(value).float().to(self._device)
                    if t.ndim == 3:
                        t = t.permute(2, 0, 1).unsqueeze(0) / 255.0
                    else:
                        t = t.unsqueeze(0)
                    tensors[key] = t

            if hasattr(self._model, "forward"):
                output = self._model(**tensors)
            else:
                output = self._model(tensors)

            if isinstance(output, torch.Tensor):
                return output[0].cpu().numpy().astype(np.float32)
            from threewe.logging import get_logger

            get_logger(__name__).warning("pytorch_non_tensor_output", output_type=str(type(output)))
            return np.zeros(self.action_dim, dtype=np.float32)

    @property
    def action_dim(self) -> int:
        """Expected output action dimension."""
        return self._config.get("action_dim", 3)

    def _predict_hailo(self, obs: dict[str, np.ndarray], instruction: str) -> np.ndarray:
        """Run inference on Hailo-8L NPU."""
        feed = {}
        for key, value in obs.items():
            if isinstance(value, np.ndarray):
                if value.ndim == 3:
                    feed[key] = (value.astype(np.float32) / 255.0)[np.newaxis]
                else:
                    feed[key] = value[np.newaxis].astype(np.float32)

        if instruction:
            feed["instruction_embedding"] = self._encode_instruction(instruction)[
                np.newaxis
            ].astype(np.float32)

        results = self._model.infer(feed)
        first_output = next(iter(results.values()))
        return first_output[0].astype(np.float32)

action_dim property

Expected output action dimension.

from_pretrained(model_id, device='cpu', revision=None) classmethod

Load a VLA model from HuggingFace Hub.

Parameters:

    model_id (str, required): HuggingFace model identifier (e.g., "lerobot/act_3we_nav")
    device (str, default 'cpu'): Target device ("cpu", "cuda", "mps")
    revision (str | None, default None): Specific model revision (commit hash) to pin the download.
Source code in sdk/threewe/src/threewe/ai/vla_runner.py
@classmethod
def from_pretrained(
    cls, model_id: str, device: str = "cpu", revision: str | None = None
) -> VLARunner:
    """Load a VLA model from HuggingFace Hub.

    Args:
        model_id: HuggingFace model identifier (e.g., "lerobot/act_3we_nav")
        device: Target device ("cpu", "cuda", "mps")
        revision: Specific model revision (commit hash) to pin the download.
    """
    try:
        from huggingface_hub import snapshot_download
    except ImportError as e:
        raise ImportError(
            "huggingface_hub is required to download VLA models. "
            "Install with: pip install huggingface-hub"
        ) from e

    local_dir = snapshot_download(repo_id=model_id, revision=revision)
    return cls._load_from_dir(local_dir, device)

from_local(path, device='cpu') classmethod

Load a local VLA model (ONNX, PyTorch, or Hailo HEF).

Parameters:

    path (str, required): Path to model directory or file
    device (str, default 'cpu'): Target device ("cpu", "cuda", "mps", "hailo")
Source code in sdk/threewe/src/threewe/ai/vla_runner.py
@classmethod
def from_local(cls, path: str, device: str = "cpu") -> VLARunner:
    """Load a local VLA model (ONNX, PyTorch, or Hailo HEF).

    Args:
        path: Path to model directory or file
        device: Target device ("cpu", "cuda", "mps", "hailo")
    """
    if device == "hailo":
        return cls._load_hailo(path)
    return cls._load_from_dir(path, device)

predict(obs, instruction='')

Predict action from observation.

Parameters:

    obs (dict[str, ndarray], required): Observation dict with keys like "image", "state", "lidar"
    instruction (str, default ''): Optional language instruction for conditioned policies

Returns:

    ndarray: Action vector as numpy array. Shape depends on model
        (typically (3,) for [vx, vy, omega] or (7,) for arm joints).

Source code in sdk/threewe/src/threewe/ai/vla_runner.py
def predict(self, obs: dict[str, np.ndarray], instruction: str = "") -> np.ndarray:
    """Predict action from observation.

    Args:
        obs: Observation dict with keys like "image", "state", "lidar"
        instruction: Optional language instruction for conditioned policies

    Returns:
        Action vector as numpy array. Shape depends on model
        (typically (3,) for [vx, vy, omega] or (7,) for arm joints).
    """
    if self._model is None:
        raise RuntimeError("Model not loaded. Use from_pretrained() or from_local().")

    runtime = self._config.get("runtime", "")

    if runtime == "onnx":
        return self._predict_onnx(obs, instruction)
    elif runtime == "pytorch":
        return self._predict_pytorch(obs, instruction)
    elif runtime == "hailo":
        return self._predict_hailo(obs, instruction)
    else:
        raise RuntimeError(f"Unknown runtime: {runtime}")
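
A closed-loop sketch using a local ONNX policy. The model directory and the observation keys ("image", "state") are placeholders: the keys must match the input names the exported model was built with, and the state vector here is a dummy.

import numpy as np

from threewe import Robot
from threewe.ai.vla_runner import VLARunner

runner = VLARunner.from_local("models/nav_policy", device="cpu")   # placeholder path

robot = Robot(backend="gazebo")
robot.connect()

obs = {
    "image": robot.get_camera_image(),        # (H, W, 3) uint8, scaled to [0, 1] internally
    "state": np.zeros(6, dtype=np.float32),   # placeholder proprioceptive state
}
action = runner.predict(obs, instruction="drive to the doorway")
print(action)                                  # e.g. [vx, vy, omega]

robot.disconnect()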

VLMRunner

Runs VLM inference for robot instruction execution.

Source code in sdk/threewe/src/threewe/ai/vlm_runner.py
class VLMRunner:
    """Runs VLM inference for robot instruction execution."""

    def __init__(
        self,
        model: str = "gpt-4o",
        api_key: str | None = None,
        base_url: str | None = None,
        max_steps: int = 20,
        temperature: float = 0.0,
    ) -> None:
        self._model = model
        self._api_key = api_key
        self._base_url = base_url
        self._max_steps = max_steps
        self._temperature = temperature
        self._client = None

    @classmethod
    def from_env(cls, max_steps: int = 20, temperature: float = 0.0) -> VLMRunner:
        """Create a VLMRunner from environment variables.

        Reads:
            THREEWE_VLM_MODEL: Model name (default: gpt-4o)
            OPENAI_API_KEY: API key for OpenAI or compatible endpoint
            THREEWE_VLM_BASE_URL: Custom API endpoint (for Qwen-VL, local models)
            THREEWE_VLM_MAX_STEPS: Max steps per instruction (default: 20)
            THREEWE_VLM_TEMPERATURE: Sampling temperature (default: 0.0)
        """
        return cls(
            model=os.environ.get("THREEWE_VLM_MODEL", "gpt-4o"),
            api_key=os.environ.get("OPENAI_API_KEY"),
            base_url=os.environ.get("THREEWE_VLM_BASE_URL"),
            max_steps=int(os.environ.get("THREEWE_VLM_MAX_STEPS", str(max_steps))),
            temperature=float(os.environ.get("THREEWE_VLM_TEMPERATURE", str(temperature))),
        )

    def _get_client(self):
        if self._client is not None:
            return self._client
        try:
            from openai import OpenAI
        except ImportError as e:
            raise ImportError(
                "openai package is required for VLM integration. "
                "Install with: pip install threewe[ai]"
            ) from e

        kwargs = {}
        if self._api_key is not None:
            kwargs["api_key"] = self._api_key
        if self._base_url is not None:
            kwargs["base_url"] = self._base_url

        self._client = OpenAI(**kwargs)
        return self._client

    def _encode_image(self, image: np.ndarray) -> str:
        try:
            from PIL import Image
        except ImportError as e:
            raise ImportError(
                "Pillow is required for image encoding. Install with: pip install threewe[ai]"
            ) from e

        if image.ndim == 2:
            pil_image = Image.fromarray(image, mode="L")
        elif image.ndim == 3 and image.shape[2] == 3:
            pil_image = Image.fromarray(image[:, :, ::-1])
        elif image.ndim == 3 and image.shape[2] == 4:
            pil_image = Image.fromarray(image[:, :, 2::-1])  # BGRA -> RGB
        else:
            pil_image = Image.fromarray(image)

        buffer = io.BytesIO()
        pil_image.save(buffer, format="JPEG", quality=85)
        return base64.b64encode(buffer.getvalue()).decode("utf-8")

    def _build_system_prompt(self, instruction: str) -> str:
        base_prompt = (
            "You are a robot navigation assistant. "
            "Given an image from the robot's camera "
            "and an instruction, output a single JSON action: "
            '{"action": "move_forward"|"rotate_left"|'
            '"rotate_right"|"stop"|"done", '
            '"distance": <meters>, "angle": <radians>, '
            '"reason": "<brief>"}. '
            "Only output valid JSON, nothing else."
        )
        has_cjk = any("一" <= ch <= "鿿" for ch in instruction)
        if has_cjk:
            base_prompt += (
                " The instruction is in Chinese. "
                "Respond with reason in Chinese but keep action keys in English."
            )
        return base_prompt

    def plan(self, image: np.ndarray, instruction: str) -> str:
        """Get a single-step action plan from the VLM given an image and instruction."""
        client = self._get_client()
        b64_image = self._encode_image(image)

        response = client.chat.completions.create(
            model=self._model,
            temperature=self._temperature,
            messages=[
                {
                    "role": "system",
                    "content": self._build_system_prompt(instruction),
                },
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": instruction},
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/jpeg;base64,{b64_image}"},
                        },
                    ],
                },
            ],
            max_tokens=200,
        )

        return response.choices[0].message.content or '{"action": "stop", "reason": "no response"}'

from_env(max_steps=20, temperature=0.0) classmethod

Create a VLMRunner from environment variables.

Reads

    THREEWE_VLM_MODEL: Model name (default: gpt-4o)
    OPENAI_API_KEY: API key for OpenAI or compatible endpoint
    THREEWE_VLM_BASE_URL: Custom API endpoint (for Qwen-VL, local models)
    THREEWE_VLM_MAX_STEPS: Max steps per instruction (default: 20)
    THREEWE_VLM_TEMPERATURE: Sampling temperature (default: 0.0)

Source code in sdk/threewe/src/threewe/ai/vlm_runner.py
@classmethod
def from_env(cls, max_steps: int = 20, temperature: float = 0.0) -> VLMRunner:
    """Create a VLMRunner from environment variables.

    Reads:
        THREEWE_VLM_MODEL: Model name (default: gpt-4o)
        OPENAI_API_KEY: API key for OpenAI or compatible endpoint
        THREEWE_VLM_BASE_URL: Custom API endpoint (for Qwen-VL, local models)
        THREEWE_VLM_MAX_STEPS: Max steps per instruction (default: 20)
        THREEWE_VLM_TEMPERATURE: Sampling temperature (default: 0.0)
    """
    return cls(
        model=os.environ.get("THREEWE_VLM_MODEL", "gpt-4o"),
        api_key=os.environ.get("OPENAI_API_KEY"),
        base_url=os.environ.get("THREEWE_VLM_BASE_URL"),
        max_steps=int(os.environ.get("THREEWE_VLM_MAX_STEPS", str(max_steps))),
        temperature=float(os.environ.get("THREEWE_VLM_TEMPERATURE", str(temperature))),
    )
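
A sketch of configuring the runner through these variables; the endpoint URL and key below are placeholders.

import os

from threewe.ai.vlm_runner import VLMRunner

os.environ["THREEWE_VLM_MODEL"] = "gpt-4o"
os.environ["THREEWE_VLM_BASE_URL"] = "http://localhost:8000/v1"   # e.g. an OpenAI-compatible local server
os.environ["OPENAI_API_KEY"] = "sk-placeholder"

runner = VLMRunner.from_env(max_steps=30)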

plan(image, instruction)

Get a single-step action plan from the VLM given an image and instruction.

Source code in sdk/threewe/src/threewe/ai/vlm_runner.py
def plan(self, image: np.ndarray, instruction: str) -> str:
    """Get a single-step action plan from the VLM given an image and instruction."""
    client = self._get_client()
    b64_image = self._encode_image(image)

    response = client.chat.completions.create(
        model=self._model,
        temperature=self._temperature,
        messages=[
            {
                "role": "system",
                "content": self._build_system_prompt(instruction),
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": instruction},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/jpeg;base64,{b64_image}"},
                    },
                ],
            },
        ],
        max_tokens=200,
    )

    return response.choices[0].message.content or '{"action": "stop", "reason": "no response"}'

execute_vlm_instruction(robot, instruction, model='gpt-4o', api_key=None, base_url=None, max_steps=20, on_step=None) async

Execute a natural language instruction using a VLM in a perception-action loop.

The VLM observes camera images and generates navigation actions until the instruction is fulfilled or max_steps is reached.

Parameters:

    robot (Robot, required): Connected Robot instance.
    instruction (str, required): Natural language task description.
    model (str, default 'gpt-4o'): VLM model name.
    api_key (str | None, default None): Optional API key override.
    base_url (str | None, default None): Optional API base URL override.
    max_steps (int, default 20): Maximum perception-action cycles.
    on_step (StepCallback | None, default None): Optional callback invoked after each step with (step_number, raw_response, parsed_action).
Source code in sdk/threewe/src/threewe/ai/vlm_runner.py
async def execute_vlm_instruction(
    robot: Robot,
    instruction: str,
    model: str = "gpt-4o",
    api_key: str | None = None,
    base_url: str | None = None,
    max_steps: int = 20,
    on_step: StepCallback | None = None,
) -> ExecutionResult:
    """Execute a natural language instruction using a VLM in a perception-action loop.

    The VLM observes camera images and generates navigation actions
    until the instruction is fulfilled or max_steps is reached.

    Args:
        robot: Connected Robot instance.
        instruction: Natural language task description.
        model: VLM model name.
        api_key: Optional API key override.
        base_url: Optional API base URL override.
        max_steps: Maximum perception-action cycles.
        on_step: Optional callback invoked after each step with
            (step_number, raw_response, parsed_action).
    """
    import json

    runner = VLMRunner(model=model, api_key=api_key, base_url=base_url, max_steps=max_steps)
    images_collected: list = []

    for step in range(max_steps):
        image = robot.get_image()
        images_collected.append(image)

        try:
            response_text = runner.plan(image, instruction)
            action = json.loads(response_text)
        except json.JSONDecodeError:
            if on_step:
                on_step(step, response_text if "response_text" in dir() else "", {})
            continue
        except Exception as e:
            from threewe.logging import get_logger

            get_logger(__name__).warning("vlm_step_error", step=step, error=str(e))
            if on_step:
                on_step(step, "", {})
            continue

        if on_step:
            on_step(step, response_text, action)

        cmd = action.get("action", "stop")

        if cmd == "done":
            return ExecutionResult(
                success=True,
                description=action.get("reason", "instruction completed"),
                images=images_collected,
            )
        elif cmd == "move_forward":
            dist = float(action.get("distance", 0.3))
            await robot.move_forward(dist)
        elif cmd == "rotate_left":
            angle = float(action.get("angle", 0.5))
            await robot.rotate(angle)
        elif cmd == "rotate_right":
            angle = float(action.get("angle", 0.5))
            await robot.rotate(-angle)
        elif cmd == "stop":
            robot.stop()

    return ExecutionResult(
        success=False,
        description=f"max steps ({max_steps}) reached without completion",
        images=images_collected,
    )
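
A sketch of running the loop end to end from an async entry point. The instruction text and the print callback are only for illustration.

import asyncio

from threewe import Robot
from threewe.ai.vlm_runner import execute_vlm_instruction


async def main() -> None:
    async with Robot(backend="gazebo") as robot:
        result = await execute_vlm_instruction(
            robot,
            "find the doorway and stop in front of it",
            max_steps=15,
            on_step=lambda step, raw, action: print(step, action.get("action")),
        )
        print(result.success, result.description, len(result.images))


asyncio.run(main())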

Data Recording

threewe.data

Data recording and export for trajectory collection.

Supports:

- HDF5 format for efficient storage
- LeRobot-compatible Parquet + MP4 export
- HuggingFace Hub push/pull

TrajectoryRecorder

Records robot trajectories for imitation learning.

Usage

    recorder = TrajectoryRecorder(robot)
    recorder.start_episode()
    # ... control robot ...
    recorder.record_step(action=[0.1, 0.0, 0.0])
    recorder.end_episode()
    recorder.save_hdf5("trajectories.h5")

Source code in sdk/threewe/src/threewe/data/recorder.py
class TrajectoryRecorder:
    """Records robot trajectories for imitation learning.

    Usage:
        recorder = TrajectoryRecorder(robot)
        recorder.start_episode()
        # ... control robot ...
        recorder.record_step(action=[0.1, 0.0, 0.0])
        recorder.end_episode()
        recorder.save_hdf5("trajectories.h5")
    """

    def __init__(
        self,
        robot: Robot,
        record_lidar: bool = False,
        record_depth: bool = False,
    ) -> None:
        self._robot = robot
        self._record_lidar = record_lidar
        self._record_depth = record_depth
        self._episodes: list[Episode] = []
        self._current_episode: Episode | None = None

    @property
    def num_episodes(self) -> int:
        return len(self._episodes)

    @property
    def total_steps(self) -> int:
        return sum(ep.length for ep in self._episodes)

    def start_episode(self, metadata: dict | None = None) -> None:
        """Start recording a new episode."""
        self._current_episode = Episode(metadata=metadata or {})

    def record_step(self, action: list[float] | np.ndarray) -> None:
        """Record current observation and the action taken."""
        if self._current_episode is None:
            raise RuntimeError("Call start_episode() before recording steps.")

        image = self._robot.get_image()
        pose = self._robot.get_pose()
        velocity = self._robot.get_velocity()

        lidar = None
        if self._record_lidar:
            scan = self._robot.get_lidar_scan()
            lidar = scan.ranges

        depth = None
        if self._record_depth:
            rgbd = self._robot.get_rgbd_image()
            depth = rgbd.depth

        step = TimeStep(
            timestamp=time.time(),
            image=image,
            pose=np.array([pose.x, pose.y, pose.theta], dtype=np.float32),
            velocity=np.array([velocity.vx, velocity.vy, velocity.omega], dtype=np.float32),
            action=np.asarray(action, dtype=np.float32),
            lidar=lidar,
            depth=depth,
        )
        self._current_episode.steps.append(step)

    def end_episode(self) -> Episode:
        """End current episode and store it."""
        if self._current_episode is None:
            raise RuntimeError("No episode in progress.")
        episode = self._current_episode
        self._episodes.append(episode)
        self._current_episode = None
        return episode

    def save_hdf5(self, path: str | Path) -> None:
        """Save all episodes to HDF5 format.

        Structure:
            /episode_000/
                images: (T, H, W, 3) uint8
                poses: (T, 3) float32
                velocities: (T, 3) float32
                actions: (T, 3) float32
                timestamps: (T,) float64
                [lidar: (T, N) float32]
                [depth: (T, H, W) float32]
        """
        try:
            import h5py
        except ImportError as e:
            raise ImportError(
                "h5py is required for HDF5 export. Install with: pip install threewe[data]"
            ) from e

        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)

        with h5py.File(path, "w") as f:
            f.attrs["num_episodes"] = len(self._episodes)
            f.attrs["total_steps"] = self.total_steps

            for i, episode in enumerate(self._episodes):
                grp = f.create_group(f"episode_{i:03d}")
                grp.attrs["length"] = episode.length
                grp.attrs["duration"] = episode.duration
                for key, value in episode.metadata.items():
                    grp.attrs[key] = value

                if episode.length == 0:
                    continue

                images = np.stack([s.image for s in episode.steps])
                poses = np.stack([s.pose for s in episode.steps])
                velocities = np.stack([s.velocity for s in episode.steps])
                actions = np.stack([s.action for s in episode.steps])
                timestamps = np.array([s.timestamp for s in episode.steps], dtype=np.float64)

                grp.create_dataset("images", data=images, compression="gzip", compression_opts=4)
                grp.create_dataset("poses", data=poses)
                grp.create_dataset("velocities", data=velocities)
                grp.create_dataset("actions", data=actions)
                grp.create_dataset("timestamps", data=timestamps)

                if self._record_lidar and episode.steps[0].lidar is not None:
                    lidar_data = np.stack([s.lidar for s in episode.steps])
                    grp.create_dataset("lidar", data=lidar_data)

                if self._record_depth and episode.steps[0].depth is not None:
                    depth_data = np.stack([s.depth for s in episode.steps])
                    grp.create_dataset(
                        "depth", data=depth_data, compression="gzip", compression_opts=4
                    )

    def save_lerobot(self, output_dir: str | Path) -> None:
        """Export episodes in LeRobot-compatible format (Parquet + MP4).

        Creates:
            output_dir/
                data/
                    episode_000.parquet
                    episode_001.parquet
                    ...
                videos/
                    episode_000.mp4
                    episode_001.mp4
                    ...
                meta/
                    info.json
        """
        import json

        output_dir = Path(output_dir)
        data_dir = output_dir / "data"
        videos_dir = output_dir / "videos"
        meta_dir = output_dir / "meta"

        data_dir.mkdir(parents=True, exist_ok=True)
        videos_dir.mkdir(parents=True, exist_ok=True)
        meta_dir.mkdir(parents=True, exist_ok=True)

        for i, episode in enumerate(self._episodes):
            if episode.length == 0:
                continue

            self._export_episode_parquet(episode, data_dir / f"episode_{i:03d}.parquet")
            self._export_episode_video(episode, videos_dir / f"episode_{i:03d}.mp4")

        info = {
            "num_episodes": len(self._episodes),
            "total_steps": self.total_steps,
            "fps": 10,
            "observation_keys": ["image", "pose", "velocity"],
            "action_key": "action",
            "action_dim": 3,
        }
        (meta_dir / "info.json").write_text(json.dumps(info, indent=2))

    def _export_episode_parquet(self, episode: Episode, path: Path) -> None:
        """Export a single episode as Parquet."""
        try:
            import pyarrow as pa
            import pyarrow.parquet as pq
        except ImportError:
            rows = []
            for step in episode.steps:
                rows.append(
                    {
                        "timestamp": step.timestamp,
                        "pose_x": step.pose[0],
                        "pose_y": step.pose[1],
                        "pose_theta": step.pose[2],
                        "vel_vx": step.velocity[0],
                        "vel_vy": step.velocity[1],
                        "vel_omega": step.velocity[2],
                        "action_vx": step.action[0],
                        "action_vy": step.action[1],
                        "action_omega": step.action[2],
                    }
                )
            import csv

            with open(path.with_suffix(".csv"), "w", newline="") as f:
                writer = csv.DictWriter(f, fieldnames=rows[0].keys())
                writer.writeheader()
                writer.writerows(rows)
            return

        table = pa.table(
            {
                "timestamp": [s.timestamp for s in episode.steps],
                "pose_x": [float(s.pose[0]) for s in episode.steps],
                "pose_y": [float(s.pose[1]) for s in episode.steps],
                "pose_theta": [float(s.pose[2]) for s in episode.steps],
                "vel_vx": [float(s.velocity[0]) for s in episode.steps],
                "vel_vy": [float(s.velocity[1]) for s in episode.steps],
                "vel_omega": [float(s.velocity[2]) for s in episode.steps],
                "action_vx": [float(s.action[0]) for s in episode.steps],
                "action_vy": [float(s.action[1]) for s in episode.steps],
                "action_omega": [float(s.action[2]) for s in episode.steps],
            }
        )
        pq.write_table(table, path)

    def _export_episode_video(self, episode: Episode, path: Path) -> None:
        """Export episode images as MP4 video."""
        try:
            import cv2

            h, w = episode.steps[0].image.shape[:2]
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            writer = cv2.VideoWriter(str(path), fourcc, 10.0, (w, h))
            for step in episode.steps:
                writer.write(step.image)
            writer.release()
        except ImportError:
            pass

start_episode(metadata=None)

Start recording a new episode.

Source code in sdk/threewe/src/threewe/data/recorder.py
def start_episode(self, metadata: dict | None = None) -> None:
    """Start recording a new episode."""
    self._current_episode = Episode(metadata=metadata or {})

record_step(action)

Record current observation and the action taken.

Source code in sdk/threewe/src/threewe/data/recorder.py
def record_step(self, action: list[float] | np.ndarray) -> None:
    """Record current observation and the action taken."""
    if self._current_episode is None:
        raise RuntimeError("Call start_episode() before recording steps.")

    image = self._robot.get_image()
    pose = self._robot.get_pose()
    velocity = self._robot.get_velocity()

    lidar = None
    if self._record_lidar:
        scan = self._robot.get_lidar_scan()
        lidar = scan.ranges

    depth = None
    if self._record_depth:
        rgbd = self._robot.get_rgbd_image()
        depth = rgbd.depth

    step = TimeStep(
        timestamp=time.time(),
        image=image,
        pose=np.array([pose.x, pose.y, pose.theta], dtype=np.float32),
        velocity=np.array([velocity.vx, velocity.vy, velocity.omega], dtype=np.float32),
        action=np.asarray(action, dtype=np.float32),
        lidar=lidar,
        depth=depth,
    )
    self._current_episode.steps.append(step)

end_episode()

End current episode and store it.

Source code in sdk/threewe/src/threewe/data/recorder.py
def end_episode(self) -> Episode:
    """End current episode and store it."""
    if self._current_episode is None:
        raise RuntimeError("No episode in progress.")
    episode = self._current_episode
    self._episodes.append(episode)
    self._current_episode = None
    return episode

save_hdf5(path)

Save all episodes to HDF5 format.

Structure

    /episode_000/
        images: (T, H, W, 3) uint8
        poses: (T, 3) float32
        velocities: (T, 3) float32
        actions: (T, 3) float32
        timestamps: (T,) float64
        [lidar: (T, N) float32]
        [depth: (T, H, W) float32]

Source code in sdk/threewe/src/threewe/data/recorder.py
def save_hdf5(self, path: str | Path) -> None:
    """Save all episodes to HDF5 format.

    Structure:
        /episode_000/
            images: (T, H, W, 3) uint8
            poses: (T, 3) float32
            velocities: (T, 3) float32
            actions: (T, 3) float32
            timestamps: (T,) float64
            [lidar: (T, N) float32]
            [depth: (T, H, W) float32]
    """
    try:
        import h5py
    except ImportError as e:
        raise ImportError(
            "h5py is required for HDF5 export. Install with: pip install threewe[data]"
        ) from e

    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)

    with h5py.File(path, "w") as f:
        f.attrs["num_episodes"] = len(self._episodes)
        f.attrs["total_steps"] = self.total_steps

        for i, episode in enumerate(self._episodes):
            grp = f.create_group(f"episode_{i:03d}")
            grp.attrs["length"] = episode.length
            grp.attrs["duration"] = episode.duration
            for key, value in episode.metadata.items():
                grp.attrs[key] = value

            if episode.length == 0:
                continue

            images = np.stack([s.image for s in episode.steps])
            poses = np.stack([s.pose for s in episode.steps])
            velocities = np.stack([s.velocity for s in episode.steps])
            actions = np.stack([s.action for s in episode.steps])
            timestamps = np.array([s.timestamp for s in episode.steps], dtype=np.float64)

            grp.create_dataset("images", data=images, compression="gzip", compression_opts=4)
            grp.create_dataset("poses", data=poses)
            grp.create_dataset("velocities", data=velocities)
            grp.create_dataset("actions", data=actions)
            grp.create_dataset("timestamps", data=timestamps)

            if self._record_lidar and episode.steps[0].lidar is not None:
                lidar_data = np.stack([s.lidar for s in episode.steps])
                grp.create_dataset("lidar", data=lidar_data)

            if self._record_depth and episode.steps[0].depth is not None:
                depth_data = np.stack([s.depth for s in episode.steps])
                grp.create_dataset(
                    "depth", data=depth_data, compression="gzip", compression_opts=4
                )

save_lerobot(output_dir)

Export episodes in LeRobot-compatible format (Parquet + MP4).

Creates

    output_dir/
        data/
            episode_000.parquet
            episode_001.parquet
            ...
        videos/
            episode_000.mp4
            episode_001.mp4
            ...
        meta/
            info.json

Source code in sdk/threewe/src/threewe/data/recorder.py
def save_lerobot(self, output_dir: str | Path) -> None:
    """Export episodes in LeRobot-compatible format (Parquet + MP4).

    Creates:
        output_dir/
            data/
                episode_000.parquet
                episode_001.parquet
                ...
            videos/
                episode_000.mp4
                episode_001.mp4
                ...
            meta/
                info.json
    """
    import json

    output_dir = Path(output_dir)
    data_dir = output_dir / "data"
    videos_dir = output_dir / "videos"
    meta_dir = output_dir / "meta"

    data_dir.mkdir(parents=True, exist_ok=True)
    videos_dir.mkdir(parents=True, exist_ok=True)
    meta_dir.mkdir(parents=True, exist_ok=True)

    for i, episode in enumerate(self._episodes):
        if episode.length == 0:
            continue

        self._export_episode_parquet(episode, data_dir / f"episode_{i:03d}.parquet")
        self._export_episode_video(episode, videos_dir / f"episode_{i:03d}.mp4")

    info = {
        "num_episodes": len(self._episodes),
        "total_steps": self.total_steps,
        "fps": 10,
        "observation_keys": ["image", "pose", "velocity"],
        "action_key": "action",
        "action_dim": 3,
    }
    (meta_dir / "info.json").write_text(json.dumps(info, indent=2))

pull_dataset(repo_id, local_dir, token=None, revision=None)

Pull a LeRobot dataset from HuggingFace Hub.

Parameters:

    repo_id (str, required): HuggingFace repo ID.
    local_dir (str | Path, required): Local directory to save to.
    token (str | None, default None): HuggingFace API token.
    revision (str | None, default None): Specific dataset revision (commit hash) to pin the download.

Returns:

    Path: Path to the downloaded dataset.

Raises:

    ImportError: If huggingface-hub is not installed.

Source code in sdk/threewe/src/threewe/data/hub.py
def pull_dataset(
    repo_id: str,
    local_dir: str | Path,
    token: str | None = None,
    revision: str | None = None,
) -> Path:
    """Pull a LeRobot dataset from HuggingFace Hub.

    Args:
        repo_id: HuggingFace repo ID.
        local_dir: Local directory to save to.
        token: HuggingFace API token.
        revision: Specific dataset revision (commit hash) to pin the download.

    Returns:
        Path to the downloaded dataset.

    Raises:
        ImportError: If huggingface-hub is not installed.
    """
    try:
        from huggingface_hub import snapshot_download
    except ImportError as exc:
        raise ImportError(
            "huggingface-hub is required for Hub operations. Install with: pip install threewe[hub]"
        ) from exc

    local_dir = Path(local_dir)
    local_dir.mkdir(parents=True, exist_ok=True)

    path = snapshot_download(
        repo_id=repo_id,
        repo_type="dataset",
        local_dir=str(local_dir),
        token=token,
        revision=revision,
    )
    return Path(path)

push_dataset(local_dir, repo_id, token=None)

Push a LeRobot dataset directory to HuggingFace Hub.

Parameters:

    local_dir (str | Path, required): Local directory containing data/, videos/, meta/.
    repo_id (str, required): HuggingFace repo ID (e.g. 'user/dataset-name').
    token (str | None, default None): HuggingFace API token. Uses cached token if None.

Returns:

    str: URL of the uploaded dataset.

Raises:

    ImportError: If huggingface-hub is not installed.
    FileNotFoundError: If local_dir doesn't exist.

Source code in sdk/threewe/src/threewe/data/hub.py
def push_dataset(
    local_dir: str | Path,
    repo_id: str,
    token: str | None = None,
) -> str:
    """Push a LeRobot dataset directory to HuggingFace Hub.

    Args:
        local_dir: Local directory containing data/, videos/, meta/.
        repo_id: HuggingFace repo ID (e.g. 'user/dataset-name').
        token: HuggingFace API token. Uses cached token if None.

    Returns:
        URL of the uploaded dataset.

    Raises:
        ImportError: If huggingface-hub is not installed.
        FileNotFoundError: If local_dir doesn't exist.
    """
    try:
        from huggingface_hub import HfApi
    except ImportError as exc:
        raise ImportError(
            "huggingface-hub is required for Hub operations. Install with: pip install threewe[hub]"
        ) from exc

    local_dir = Path(local_dir)
    if not local_dir.exists():
        raise FileNotFoundError(f"Dataset directory not found: {local_dir}")

    api = HfApi(token=token)
    api.create_repo(repo_id=repo_id, repo_type="dataset", exist_ok=True)
    api.upload_folder(
        folder_path=str(local_dir),
        repo_id=repo_id,
        repo_type="dataset",
    )

    return f"https://huggingface.co/datasets/{repo_id}"
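
A sketch tying recording, LeRobot export, and Hub sharing together. The repository ID and directory names are placeholders, push_dataset needs a HuggingFace token with write access, and the import paths are assumed from the source locations shown above.

from threewe import Robot
from threewe.data.hub import pull_dataset, push_dataset
from threewe.data.recorder import TrajectoryRecorder

robot = Robot(backend="gazebo")
robot.connect()

recorder = TrajectoryRecorder(robot)
recorder.start_episode(metadata={"scene": "office_v2"})
# ... drive the robot, calling recorder.record_step(action=[vx, vy, omega]) each control tick ...
recorder.end_episode()
recorder.save_lerobot("datasets/office_nav")

url = push_dataset("datasets/office_nav", "your-user/office-nav-demo")
print(url)

# On another machine, pull the dataset (optionally pinning a revision):
pull_dataset("your-user/office-nav-demo", "datasets/local_copy")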