Adapters
MQTT

MQTT Bridge

The MQTT adapter subscribes to MQTT topics and forwards messages as Plexus metrics. It automatically parses JSON, numeric, and string payloads.

Install

pip install plexus-agent[mqtt]

Quick Start

CLI

plexus run --mqtt localhost:1883
plexus run --mqtt localhost:1883 --mqtt-topic "sensors/#"

Python

from plexus.adapters import MQTTAdapter
 
adapter = MQTTAdapter(broker="localhost", topic="sensors/#")
adapter.connect()
adapter.run()

Parameters

ParameterTypeDefaultDescription
brokerstr"localhost"MQTT broker hostname
portint1883MQTT broker port
topicstr"#"Topic pattern to subscribe to
usernamestrNoneBroker authentication username
passwordstrNoneBroker authentication password
use_tlsboolFalseEnable TLS encryption
client_idstrNoneMQTT client ID (auto-generated if omitted)
prefixstr""Topic prefix to strip from metric names
qosint0Quality of Service level (0, 1, or 2)

Topic Patterns

Standard MQTT wildcards are supported:

PatternMatches
#All topics (default)
sensors/#All topics under sensors/
home/+/temperatureSingle-level wildcard (home/kitchen/temperature, home/bedroom/temperature)
factory/line1/motor3Exact topic match

Payload Parsing

The adapter automatically detects the payload format and converts it to metrics:

JSON objects — each key becomes a separate metric:

Topic: "device/status"
Payload: {"temp": 72.5, "humidity": 45}
→ Metric: "device.status.temp" = 72.5
→ Metric: "device.status.humidity" = 45

Numeric values — sent as a single metric:

Topic: "sensors/temperature"
Payload: "72.5"
→ Metric: "sensors.temperature" = 72.5

Strings — sent as a string metric:

Topic: "device/state"
Payload: "RUNNING"
→ Metric: "device.state" = "RUNNING"

Topic names are converted to metric names by replacing / with . and stripping any leading/trailing dots.

Prefix Stripping

Use prefix to remove a common topic prefix from metric names:

adapter = MQTTAdapter(
    broker="localhost",
    topic="factory/line1/#",
    prefix="factory/line1/",
)
 
# Topic "factory/line1/motor/rpm" → Metric "motor.rpm"

With Callback

def handle_data(metrics):
    for m in metrics:
        print(f"{m.name}: {m.value}")
 
adapter = MQTTAdapter(broker="localhost", topic="sensors/#")
adapter.connect()
adapter.run(on_data=handle_data)

With Authentication

adapter = MQTTAdapter(
    broker="mqtt.example.com",
    port=8883,
    topic="home/+/temperature",
    username="user",
    password="pass",
    use_tls=True,
)
adapter.connect()
adapter.run()

Non-Blocking Mode

adapter = MQTTAdapter(broker="localhost")
adapter.connect()
adapter.run(blocking=False)  # Returns immediately
 
# Do other work...
 
metrics = adapter.poll()  # Check for pending metrics
adapter.disconnect()

Next Steps

  • CAN Bus -- Read CAN bus data with DBC decoding
  • MAVLink -- Connect drones and autonomous vehicles
  • CLI Reference -- Full list of plexus commands and flags