Commands & Remote Control

Commands let you send instructions from the Plexus dashboard to your device in real time. You define commands in Python with typed parameters, and the dashboard auto-generates matching UI controls: sliders, dropdowns, toggles, and text inputs.

Defining a Command

Use the @px.command decorator to register a function as a remote command. Use @param to define typed parameters with constraints.

from plexus import Plexus, param
 
px = Plexus()
 
@px.command("set_speed", description="Set motor speed")
@param("rpm", type="float", min=0, max=10000, step=50, unit="rpm", default=0)
async def set_speed(rpm):
    # `motor` stands in for your own hardware driver object, defined elsewhere.
    motor.set_rpm(rpm)
    return {"actual_rpm": motor.read_rpm()}

Once this command is registered, the dashboard shows a slider labeled "rpm" with a range of 0 to 10000 and a unit suffix.

Parameter Types

Type      Dashboard Control                         Example
float     Slider or number input                    Speed, temperature setpoint
int       Slider or number input (whole numbers)    Servo angle, step count
string    Text input                                File path, label
bool      Toggle switch                             Enable/disable, on/off
enum      Dropdown menu                             Operating mode, gear selection

Parameter Constraints

Use @param to define validation rules. The dashboard enforces these constraints in the UI, and the agent validates them again on the device before executing the command.

Constraint     Type           Description
min            float | int    Minimum allowed value
max            float | int    Maximum allowed value
step           float | int    Step size for slider increments
unit           str            Display unit (e.g., "rpm", "°C", "mm/s")
choices        list           Valid options for enum type
default        any            Default value shown in the dashboard
description    str            Human-readable description of the parameter
required       bool           Whether the parameter is required (default True; auto-set to False when default is provided)
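
To make the constraint semantics concrete, here is a minimal sketch of how a parameter spec could be checked before a command runs. It is not the Plexus implementation: the `validate_param` function, the dict-based spec, and the error messages are all hypothetical, and only the constraint names (min, max, choices, default, required) come from the table above.

```python
from typing import Any, Optional


def validate_param(spec: dict, value: Any) -> Optional[str]:
    """Return an error message if value violates spec, else None (hypothetical helper)."""
    name = spec["name"]
    if value is None:
        # Mirrors the table: required is True unless a default is provided.
        required = spec.get("required", "default" not in spec)
        return f"{name} is required" if required else None
    if spec.get("type") in ("float", "int"):
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return f"{name} must be a number"
        if "min" in spec and value < spec["min"]:
            return f"{name} must be >= {spec['min']}"
        if "max" in spec and value > spec["max"]:
            return f"{name} must be <= {spec['max']}"
    if spec.get("type") == "enum":
        choices = spec.get("choices", [])
        if value not in choices:
            return f"{name} must be one of {choices}"
    return None


rpm_spec = {"name": "rpm", "type": "float", "min": 0, "max": 10000, "default": 0}
mode_spec = {"name": "mode", "type": "enum", "choices": ["idle", "run"]}

print(validate_param(rpm_spec, 2500))      # None
print(validate_param(rpm_spec, 12000))     # rpm must be <= 10000
print(validate_param(mode_spec, "turbo"))  # mode must be one of ['idle', 'run']
```

Because `rpm_spec` has a default, a missing value passes this check, while `mode_spec` without a default would be reported as required.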

Full Example

Here is a complete example with three commands using different parameter types:

from plexus import Plexus, param
import asyncio
 
px = Plexus()

# `motor`, `controller`, and `axis` stand in for your own hardware driver objects.
 
@px.command("set_speed", description="Set motor speed")
@param("rpm", type="float", min=0, max=10000, step=50, unit="rpm", default=0)
async def set_speed(rpm):
    motor.set_rpm(rpm)
    await asyncio.sleep(0.5)  # Wait for motor to reach target
    actual = motor.read_rpm()
    return {"actual_rpm": actual, "at_target": abs(actual - rpm) < 50}
 
 
@px.command("set_mode", description="Set operating mode")
@param("mode", type="enum", choices=["idle", "run", "calibrate", "emergency_stop"], default="idle")
@param("verbose", type="bool", default=False)
async def set_mode(mode, verbose):
    controller.set_mode(mode)
    if verbose:
        px.send("mode_change", mode, tags={"previous": controller.previous_mode})
    return {"mode": mode, "status": "ok"}
 
 
@px.command("move_axis", description="Move linear axis to position")
@param("position", type="float", min=0, max=500, step=0.1, unit="mm", default=0)
@param("speed", type="float", min=1, max=100, step=1, unit="mm/s", default=10)
async def move_axis(position, speed):
    axis.move_to(position, speed=speed)
    while axis.is_moving():
        px.send("axis.position", axis.read_position())
        await asyncio.sleep(0.05)
    return {"final_position": axis.read_position()}
 
 
# Start the agent from the command line:
#   plexus run

Dashboard UI

When you register commands with @param decorators, the dashboard generates controls automatically:

  • float / int with min and max renders a slider
  • bool renders a toggle switch
  • enum with choices renders a dropdown menu
  • string renders a text input

Each command shows a "Send" button. The return value from your function is displayed in the dashboard as a JSON response.
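
The mapping from parameter type to control can be summarized as a small lookup. The sketch below is only an illustration of the rules listed above; `control_for` is a hypothetical name, and the fallback choices (a number input when min or max is missing, a text input for unknown types) are assumptions rather than documented dashboard behavior.

```python
def control_for(spec: dict) -> str:
    """Pick a dashboard control name for a parameter spec (illustrative only)."""
    kind = spec.get("type")
    if kind in ("float", "int"):
        # A slider needs both ends of the range; otherwise assume a number input.
        has_range = "min" in spec and "max" in spec
        return "slider" if has_range else "number input"
    controls = {"bool": "toggle switch", "enum": "dropdown menu", "string": "text input"}
    # Assumption: unknown types fall back to a plain text input.
    return controls.get(kind, "text input")


print(control_for({"type": "float", "min": 0, "max": 10000}))  # slider
print(control_for({"type": "int"}))                            # number input
print(control_for({"type": "enum", "choices": ["idle", "run"]}))  # dropdown menu
```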

Execution Flow

  1. You click "Send" in the dashboard (or call the API).
  2. The command is sent over WebSocket to the device via the PartyKit server.
  3. The agent validates all parameters against their constraints.
  4. If valid, the agent executes your function.
  5. The return value is sent back to the dashboard and displayed.

Dashboard → WebSocket → PartyKit → Device Agent
                                        ↓
                                 validate params
                                        ↓
                                 execute function
                                        ↓
                                  return result
                                        ↓
Device Agent → PartyKit → WebSocket → Dashboard

If validation fails (e.g., value out of range), the command is rejected before execution and the dashboard shows the error.
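
As a rough illustration of steps 3 to 5, the sketch below validates an incoming message, runs the handler only if validation passes, and serializes the outcome. The JSON message shape (`command`, `params`), the `handle_message` function, and the response fields are hypothetical; the actual wire format used between the dashboard, PartyKit, and the agent is not described here.

```python
import json


def handle_message(raw: str, handlers: dict, limits: dict) -> str:
    """Validate, execute, and answer one command message (hypothetical format)."""
    msg = json.loads(raw)
    name, params = msg["command"], msg.get("params", {})
    # Step 3: validate every constrained parameter before touching the handler.
    for key, (low, high) in limits.get(name, {}).items():
        value = params.get(key)
        if value is None or not (low <= value <= high):
            error = f"{key} must be between {low} and {high}"
            return json.dumps({"ok": False, "error": error})
    # Step 4: execute the registered function.
    result = handlers[name](**params)
    # Step 5: send the return value back.
    return json.dumps({"ok": True, "result": result})


calls = []  # records which values actually reached the handler


def set_speed(rpm):
    calls.append(rpm)
    return {"actual_rpm": rpm}


handlers = {"set_speed": set_speed}
limits = {"set_speed": {"rpm": (0, 10000)}}

ok = handle_message('{"command": "set_speed", "params": {"rpm": 1500}}', handlers, limits)
rejected = handle_message('{"command": "set_speed", "params": {"rpm": 20000}}', handlers, limits)
print(ok)
print(rejected)
print(calls)  # [1500]
```

The out-of-range request never reaches `set_speed`, which is the property the rejection step is meant to guarantee.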

Sync and Async Handlers

Both synchronous and asynchronous command handlers are supported. The agent auto-detects coroutines and awaits them:

# Async handler
@px.command("read_sensor")
async def read_sensor():
    value = await sensor.read_async()
    return {"value": value}
 
# Sync handler — also works
@px.command("get_position")
def get_position():
    return {"x": axis.x, "y": axis.y}
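
One common way to support both handler styles is to call the function and await the result only if it is awaitable. The sketch below shows that pattern with the standard library; `call_handler` is a hypothetical helper, and the Plexus agent may detect coroutines differently.

```python
import asyncio
import inspect


async def call_handler(handler, **params):
    """Call a sync or async handler and return its result (illustrative helper)."""
    result = handler(**params)
    # Async functions return a coroutine object, which must be awaited.
    if inspect.isawaitable(result):
        result = await result
    return result


async def read_sensor():
    await asyncio.sleep(0)  # stands in for real asynchronous I/O
    return {"value": 21.5}


def get_position():
    return {"x": 1.0, "y": 2.0}


async def main():
    return await call_handler(read_sensor), await call_handler(get_position)


async_result, sync_result = asyncio.run(main())
print(async_result)  # {'value': 21.5}
print(sync_result)   # {'x': 1.0, 'y': 2.0}
```

In this sketch a sync handler runs directly on the event loop, so a long blocking call would stall other work; `asyncio.to_thread` is one way to move such calls off the loop.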

Error Handling

If your command function raises an exception, the error message is sent back to the dashboard:

@px.command("set_speed", description="Set motor speed")
@param("rpm", type="float", min=0, max=10000)
async def set_speed(rpm):
    if not motor.is_connected():
        raise RuntimeError("Motor not connected")
    motor.set_rpm(rpm)
    return {"actual_rpm": motor.read_rpm()}

The dashboard displays: Error: Motor not connected.
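
Conceptually, this amounts to wrapping the handler call and turning any exception into an error response. The following is a minimal sketch under that assumption; `run_command` and the response fields are hypothetical, and only the "Error: Motor not connected" text comes from the example above.

```python
def run_command(handler, **params) -> dict:
    """Run a handler and convert any exception into an error response (illustrative)."""
    try:
        return {"status": "ok", "result": handler(**params)}
    except Exception as exc:
        # str(exc) is the message passed to the exception constructor.
        return {"status": "error", "message": f"Error: {exc}"}


def set_speed(rpm):
    # Simulates the disconnected-motor case from the example above.
    raise RuntimeError("Motor not connected")


response = run_command(set_speed, rpm=100)
print(response["message"])  # Error: Motor not connected
```

Since the exception message is what the operator sees, short and specific messages tend to be more useful than a bare exception type.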

Next Steps