Anomaly Detection in 100 Lines of Python
Rolling z-score anomaly detection on a Plexus telemetry stream — about 75 lines of Python, no ML. It polls a metric, computes a 30-second rolling z-score, and posts temperature.zscore back when three consecutive samples exceed |z| > 3, so an existing threshold monitor (and your Slack integration) fires.
Prerequisites
- The ESP32 → alerts pipeline running and pushing
temperature. - Python 3.8+ and an API key from your
/dev/keyspage.
Register the monitor
One threshold on the new derived metric so the Slack integration fires:
export PLEXUS_API_KEY=plx_xxx
curl -sS -X POST https://app.plexus.company/api/monitors \
-H "x-api-key: $PLEXUS_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"source_id": "esp32-bme280",
"metric": "temperature.zscore",
"threshold": { "max": 3.0, "severity": "warning", "message": "BME280 deviating from baseline" }
}'The detector
import collections, math, os, time
import requests
from plexus import Plexus
SOURCE_ID = "esp32-bme280"
METRIC = "temperature"
WINDOW_SAMPLES = 15 # 30s at a 2s cadence
Z_THRESHOLD = 3.0
CONSECUTIVE = 3
POLL_S = 1.0
API_BASE = os.environ.get("PLEXUS_ENDPOINT", "https://app.plexus.company")
session = requests.Session()
session.headers["x-api-key"] = os.environ["PLEXUS_API_KEY"]
px = Plexus(source_id=SOURCE_ID)
window = collections.deque(maxlen=WINDOW_SAMPLES)
def zscore(values):
if len(values) < 5:
return 0.0
mean = sum(values) / len(values)
std = math.sqrt(sum((v - mean) ** 2 for v in values) / len(values))
return 0.0 if std < 0.01 else (values[-1] - mean) / std
def fetch_recent():
r = session.get(f"{API_BASE}/api/v1/telemetry",
params={"source": SOURCE_ID, "metric": METRIC, "limit": 200}, timeout=5)
r.raise_for_status()
return sorted(r.json()["data"], key=lambda x: x["timestamp"])
def main():
last_ts, consecutive = None, 0
while True:
for row in fetch_recent():
ts = row["timestamp"]
if last_ts is not None and ts <= last_ts:
continue
last_ts = ts
window.append(float(row["value"]))
z = zscore(list(window))
consecutive = consecutive + 1 if abs(z) > Z_THRESHOLD else 0
px.send("temperature.zscore", abs(z) if consecutive >= CONSECUTIVE else 0.0)
time.sleep(POLL_S)
if __name__ == "__main__":
main()pip install plexus-python requests
export PLEXUS_API_KEY=plx_xxx
python detector.pyPress the BME280 between your fingers; within ~6 seconds of the streak holding, Slack lands the alert. Release it and Plexus emits alert.resolved.
Tuning knobs
All at the top of the script:
| Constant | Default | What it does |
|---|---|---|
WINDOW_SAMPLES | 15 | rolling window size in samples |
Z_THRESHOLD | 3.0 | how many σ from the mean counts as anomalous |
CONSECUTIVE | 3 | samples in a row needed before firing |
POLL_S | 1.0 | how often to query Plexus for new data |
Change SOURCE_ID and METRIC to point it at anything — humidity, pressure, motor RPM, battery voltage. One process per metric.
Last updated on