Skip to Content
GuidesAnomaly detection in 100 lines

Anomaly Detection in 100 Lines of Python

Rolling z-score anomaly detection on a Plexus telemetry stream — about 75 lines of Python, no ML. It polls a metric, computes a 30-second rolling z-score, and posts temperature.zscore back when three consecutive samples exceed |z| > 3, so an existing threshold monitor (and your Slack integration) fires.

Prerequisites

  • The ESP32 → alerts pipeline running and pushing temperature.
  • Python 3.8+ and an API key from your /dev/keys page.

Register the monitor

One threshold on the new derived metric so the Slack integration fires:

export PLEXUS_API_KEY=plx_xxx curl -sS -X POST https://app.plexus.company/api/monitors \ -H "x-api-key: $PLEXUS_API_KEY" \ -H "Content-Type: application/json" \ -d '{ "source_id": "esp32-bme280", "metric": "temperature.zscore", "threshold": { "max": 3.0, "severity": "warning", "message": "BME280 deviating from baseline" } }'

The detector

import collections, math, os, time import requests from plexus import Plexus SOURCE_ID = "esp32-bme280" METRIC = "temperature" WINDOW_SAMPLES = 15 # 30s at a 2s cadence Z_THRESHOLD = 3.0 CONSECUTIVE = 3 POLL_S = 1.0 API_BASE = os.environ.get("PLEXUS_ENDPOINT", "https://app.plexus.company") session = requests.Session() session.headers["x-api-key"] = os.environ["PLEXUS_API_KEY"] px = Plexus(source_id=SOURCE_ID) window = collections.deque(maxlen=WINDOW_SAMPLES) def zscore(values): if len(values) < 5: return 0.0 mean = sum(values) / len(values) std = math.sqrt(sum((v - mean) ** 2 for v in values) / len(values)) return 0.0 if std < 0.01 else (values[-1] - mean) / std def fetch_recent(): r = session.get(f"{API_BASE}/api/v1/telemetry", params={"source": SOURCE_ID, "metric": METRIC, "limit": 200}, timeout=5) r.raise_for_status() return sorted(r.json()["data"], key=lambda x: x["timestamp"]) def main(): last_ts, consecutive = None, 0 while True: for row in fetch_recent(): ts = row["timestamp"] if last_ts is not None and ts <= last_ts: continue last_ts = ts window.append(float(row["value"])) z = zscore(list(window)) consecutive = consecutive + 1 if abs(z) > Z_THRESHOLD else 0 px.send("temperature.zscore", abs(z) if consecutive >= CONSECUTIVE else 0.0) time.sleep(POLL_S) if __name__ == "__main__": main()
pip install plexus-python requests export PLEXUS_API_KEY=plx_xxx python detector.py

Press the BME280 between your fingers; within ~6 seconds of the streak holding, Slack lands the alert. Release it and Plexus emits alert.resolved.

Tuning knobs

All at the top of the script:

ConstantDefaultWhat it does
WINDOW_SAMPLES15rolling window size in samples
Z_THRESHOLD3.0how many σ from the mean counts as anomalous
CONSECUTIVE3samples in a row needed before firing
POLL_S1.0how often to query Plexus for new data

Change SOURCE_ID and METRIC to point it at anything — humidity, pressure, motor RPM, battery voltage. One process per metric.

Last updated on