MQTT Bridge
The MQTT adapter subscribes to MQTT topics and forwards messages as Plexus metrics. It automatically parses JSON, numeric, and string payloads.
Install
pip install plexus-agent[mqtt]Quick Start
CLI
plexus run --mqtt localhost:1883
plexus run --mqtt localhost:1883 --mqtt-topic "sensors/#"Python
from plexus.adapters import MQTTAdapter
adapter = MQTTAdapter(broker="localhost", topic="sensors/#")
adapter.connect()
adapter.run()Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
broker | str | "localhost" | MQTT broker hostname |
port | int | 1883 | MQTT broker port |
topic | str | "#" | Topic pattern to subscribe to |
username | str | None | Broker authentication username |
password | str | None | Broker authentication password |
use_tls | bool | False | Enable TLS encryption |
client_id | str | None | MQTT client ID (auto-generated if omitted) |
prefix | str | "" | Topic prefix to strip from metric names |
qos | int | 0 | Quality of Service level (0, 1, or 2) |
Topic Patterns
Standard MQTT wildcards are supported:
| Pattern | Matches |
|---|---|
# | All topics (default) |
sensors/# | All topics under sensors/ |
home/+/temperature | Single-level wildcard (home/kitchen/temperature, home/bedroom/temperature) |
factory/line1/motor3 | Exact topic match |
Payload Parsing
The adapter automatically detects the payload format and converts it to metrics:
JSON objects — each key becomes a separate metric:
Topic: "device/status"
Payload: {"temp": 72.5, "humidity": 45}
→ Metric: "device.status.temp" = 72.5
→ Metric: "device.status.humidity" = 45Numeric values — sent as a single metric:
Topic: "sensors/temperature"
Payload: "72.5"
→ Metric: "sensors.temperature" = 72.5Strings — sent as a string metric:
Topic: "device/state"
Payload: "RUNNING"
→ Metric: "device.state" = "RUNNING"Topic names are converted to metric names by replacing / with . and stripping any leading/trailing dots.
Prefix Stripping
Use prefix to remove a common topic prefix from metric names:
adapter = MQTTAdapter(
broker="localhost",
topic="factory/line1/#",
prefix="factory/line1/",
)
# Topic "factory/line1/motor/rpm" → Metric "motor.rpm"With Callback
def handle_data(metrics):
for m in metrics:
print(f"{m.name}: {m.value}")
adapter = MQTTAdapter(broker="localhost", topic="sensors/#")
adapter.connect()
adapter.run(on_data=handle_data)With Authentication
adapter = MQTTAdapter(
broker="mqtt.example.com",
port=8883,
topic="home/+/temperature",
username="user",
password="pass",
use_tls=True,
)
adapter.connect()
adapter.run()Non-Blocking Mode
adapter = MQTTAdapter(broker="localhost")
adapter.connect()
adapter.run(blocking=False) # Returns immediately
# Do other work...
metrics = adapter.poll() # Check for pending metrics
adapter.disconnect()Next Steps
- CAN Bus -- Read CAN bus data with DBC decoding
- MAVLink -- Connect drones and autonomous vehicles
- CLI Reference -- Full list of
plexuscommands and flags