Commands & Remote Control
Commands let you send instructions from the Plexus dashboard to your device in real time. You define commands in Python with type annotations, and the dashboard auto-generates UI controls -- sliders, dropdowns, toggles, and text inputs.
Defining a Command
Use the @px.command decorator to register a function as a remote command. Use @param to define typed parameters with constraints.
from plexus import Plexus, param
px = Plexus()
@px.command("set_speed", description="Set motor speed")
@param("rpm", type="float", min=0, max=10000, step=50, unit="rpm", default=0)
async def set_speed(rpm):
motor.set_rpm(rpm)
return {"actual_rpm": motor.read_rpm()}When you register this command, the dashboard shows a slider labeled "rpm" with a range of 0--10000 and a unit suffix.
Parameter Types
| Type | Dashboard Control | Example |
|---|---|---|
float | Slider or number input | Speed, temperature setpoint |
int | Slider or number input (whole numbers) | Servo angle, step count |
string | Text input | File path, label |
bool | Toggle switch | Enable/disable, on/off |
enum | Dropdown menu | Operating mode, gear selection |
Parameter Constraints
Use @param to define validation rules. The dashboard enforces these constraints in the UI, and the agent validates them server-side before executing.
| Constraint | Type | Description |
|---|---|---|
min | float | int | Minimum allowed value |
max | float | int | Maximum allowed value |
step | float | int | Step size for slider increments |
unit | str | Display unit (e.g., "rpm", "°C", "mm/s") |
choices | list | Valid options for enum type |
default | any | Default value shown in the dashboard |
description | str | Human-readable description of the parameter |
required | bool | Whether the parameter is required (default True; auto-set to False when default is provided) |
Full Example
Here is a complete example with two commands using different parameter types:
from plexus import Plexus, param
import asyncio
px = Plexus()
@px.command("set_speed", description="Set motor speed")
@param("rpm", type="float", min=0, max=10000, step=50, unit="rpm", default=0)
async def set_speed(rpm):
motor.set_rpm(rpm)
await asyncio.sleep(0.5) # Wait for motor to reach target
actual = motor.read_rpm()
return {"actual_rpm": actual, "at_target": abs(actual - rpm) < 50}
@px.command("set_mode", description="Set operating mode")
@param("mode", type="enum", choices=["idle", "run", "calibrate", "emergency_stop"], default="idle")
@param("verbose", type="bool", default=False)
async def set_mode(mode, verbose):
controller.set_mode(mode)
if verbose:
px.send("mode_change", mode, tags={"previous": controller.previous_mode})
return {"mode": mode, "status": "ok"}
@px.command("move_axis", description="Move linear axis to position")
@param("position", type="float", min=0, max=500, step=0.1, unit="mm", default=0)
@param("speed", type="float", min=1, max=100, step=1, unit="mm/s", default=10)
async def move_axis(position, speed):
axis.move_to(position, speed=speed)
while axis.is_moving():
px.send("axis.position", axis.read_position())
await asyncio.sleep(0.05)
return {"final_position": axis.read_position()}
# Start the agent from the command line:
# plexus runDashboard UI
When you register commands with @param decorators, the dashboard generates controls automatically:
float/intwithminandmaxrenders a sliderboolrenders a toggle switchenumwithchoicesrenders a dropdown menustringrenders a text input
Each command shows a "Send" button. The return value from your function is displayed in the dashboard as a JSON response.
Execution Flow
- You click "Send" in the dashboard (or call the API).
- The command is sent over WebSocket to the device via the PartyKit server.
- The agent validates all parameters against their constraints.
- If valid, the agent executes your function.
- The return value is sent back to the dashboard and displayed.
Dashboard → WebSocket → PartyKit → Device Agent
│
validate params
│
execute function
│
return result
│
Device Agent → PartyKit → WebSocket → DashboardIf validation fails (e.g., value out of range), the command is rejected before execution and the dashboard shows the error.
Sync and Async Handlers
Both synchronous and asynchronous command handlers are supported. The agent auto-detects coroutines and awaits them:
# Async handler
@px.command("read_sensor")
async def read_sensor():
value = await sensor.read_async()
return {"value": value}
# Sync handler — also works
@px.command("get_position")
def get_position():
return {"x": axis.x, "y": axis.y}Error Handling
If your command function raises an exception, the error message is sent back to the dashboard:
@px.command("set_speed", description="Set motor speed")
@param("rpm", type="float", min=0, max=10000)
async def set_speed(rpm):
if not motor.is_connected():
raise RuntimeError("Motor not connected")
motor.set_rpm(rpm)
return {"actual_rpm": motor.read_rpm()}The dashboard displays: Error: Motor not connected.
Next Steps
- CLI Reference — Run the agent from the command line
- Sending Data — Send telemetry alongside command responses