Pipeline/backend/app/services/openclaw/system_health.py

270 lines
9.4 KiB
Python

"""System health service — CPU, RAM, disk, uptime, and 24-hour history.
Calls the gateway ``health`` or ``status`` RPC methods and normalises
the response into structured health snapshots. A module-level
``HealthHistory`` instance accumulates snapshots per gateway for up to
24 hours.
Parser is deliberately defensive: unknown keys are ignored and all
numeric fields default to None when absent or non-numeric.
"""
from __future__ import annotations

import math
from datetime import datetime, timedelta
from typing import Any

from app.core.logging import get_logger
from app.core.time import utcnow
from app.services.openclaw.gateway_rpc import (
    GatewayConfig,
    OpenClawGatewayError,
    openclaw_call,
)
logger = get_logger(__name__)

# Retention window, in hours, used by HealthHistory when none is passed.
DEFAULT_HISTORY_WINDOW_HOURS: int = 24
# ---------------------------------------------------------------------------
# HealthSnapshot
# ---------------------------------------------------------------------------
class HealthSnapshot:
    """Normalised point-in-time system health reading.

    Every metric is optional: anything the gateway did not report (or
    reported in an unusable form) is ``None``. ``recorded_at`` is the time
    the snapshot was taken.
    """

    # Field order here is also the key order of ``to_dict`` and ``repr``.
    __slots__ = (
        "recorded_at",
        "cpu_pct",
        "memory_pct",
        "memory_used_bytes",
        "memory_total_bytes",
        "disk_pct",
        "disk_used_bytes",
        "disk_total_bytes",
        "uptime_seconds",
        "load_avg_1m",
        "load_avg_5m",
        "load_avg_15m",
        "hostname",
        "platform",
    )

    def __init__(
        self,
        *,
        recorded_at: datetime,
        cpu_pct: float | None = None,
        memory_pct: float | None = None,
        memory_used_bytes: int | None = None,
        memory_total_bytes: int | None = None,
        disk_pct: float | None = None,
        disk_used_bytes: int | None = None,
        disk_total_bytes: int | None = None,
        uptime_seconds: int | None = None,
        load_avg_1m: float | None = None,
        load_avg_5m: float | None = None,
        load_avg_15m: float | None = None,
        hostname: str | None = None,
        platform: str | None = None,
    ) -> None:
        self.recorded_at = recorded_at
        self.cpu_pct = cpu_pct
        self.memory_pct = memory_pct
        self.memory_used_bytes = memory_used_bytes
        self.memory_total_bytes = memory_total_bytes
        self.disk_pct = disk_pct
        self.disk_used_bytes = disk_used_bytes
        self.disk_total_bytes = disk_total_bytes
        self.uptime_seconds = uptime_seconds
        self.load_avg_1m = load_avg_1m
        self.load_avg_5m = load_avg_5m
        self.load_avg_15m = load_avg_15m
        self.hostname = hostname
        self.platform = platform

    def to_dict(self) -> dict[str, Any]:
        """Return every field as a plain dict, with ``recorded_at`` in ISO-8601.

        Derived from ``__slots__`` so a newly added field cannot be forgotten.
        """
        data = {name: getattr(self, name) for name in HealthSnapshot.__slots__}
        data["recorded_at"] = self.recorded_at.isoformat()
        return data

    def __repr__(self) -> str:
        fields = ", ".join(
            f"{name}={getattr(self, name)!r}" for name in HealthSnapshot.__slots__
        )
        return f"{type(self).__name__}({fields})"
# ---------------------------------------------------------------------------
# Parser — pure function
# ---------------------------------------------------------------------------
def _float(d: dict[str, Any], *keys: str) -> float | None:
for k in keys:
v = d.get(k)
if v is not None:
try:
return float(v)
except (TypeError, ValueError):
pass
return None
def _int(d: dict[str, Any], *keys: str) -> int | None:
    """Return the first value under *keys* in *d* as an int, truncated toward zero.

    Lookup and coercion are delegated to ``_float``. A non-finite result
    (NaN/inf) is treated as absent, because ``int()`` cannot represent it and
    would raise.
    """
    value = _float(d, *keys)
    if value is None or not math.isfinite(value):
        return None
    return int(value)
def _pct_from_used_total(used: int | None, total: int | None) -> float | None:
if used is not None and total and total > 0:
return round(used / total * 100, 1)
return None
def _load_avg_at(values: object, index: int) -> float | None:
    """Return ``values[index]`` as a float if *values* is a long-enough list, else None."""
    if isinstance(values, list) and index < len(values):
        # Reuse _float's coercion rules (via a one-item dict) so a malformed
        # entry such as None or "n/a" yields None instead of raising.
        return _float({"value": values[index]}, "value")
    return None


def _parse_cpu(
    raw: dict[str, Any],
) -> tuple[float | None, float | None, float | None, float | None]:
    """Extract ``(cpu_pct, load_1m, load_5m, load_15m)`` from a health payload."""
    cpu_block = raw.get("cpu")
    if isinstance(cpu_block, dict) and cpu_block:
        cpu_pct = _float(cpu_block, "usage", "percent", "pct")
        load_avgs = cpu_block.get("loadAvg") or cpu_block.get("load_avg")
        return (
            cpu_pct,
            _load_avg_at(load_avgs, 0),
            _load_avg_at(load_avgs, 1),
            _load_avg_at(load_avgs, 2),
        )
    # No nested block: fall back to flat top-level keys (no load averages).
    cpu_pct = _float(raw, "cpuUsage", "cpu_usage", "cpu_pct", "cpu_percent")
    return cpu_pct, None, None, None


def _parse_usage(
    raw: dict[str, Any],
    *,
    block_names: tuple[str, ...],
    nested_used: tuple[str, ...],
    nested_total: tuple[str, ...],
    flat_used: tuple[str, ...],
    flat_total: tuple[str, ...],
) -> tuple[int | None, int | None, float | None]:
    """Extract ``(used, total, pct)`` for a memory- or disk-style resource.

    The first truthy ``raw[name]`` among *block_names* is used as a nested
    block. Otherwise the flat top-level keys are read. An explicit percentage
    is only available from the nested block; otherwise it is derived from
    used/total.
    """
    block: object = None
    for name in block_names:
        block = raw.get(name)
        if block:
            break
    if isinstance(block, dict) and block:
        used = _int(block, *nested_used)
        total = _int(block, *nested_total)
        pct = _float(block, "percent", "pct", "usage")
    else:
        used = _int(raw, *flat_used)
        total = _int(raw, *flat_total)
        pct = None
    # ``is None`` rather than ``or``: a reported 0.0% is a real reading and must
    # not be replaced by the derived value (or by None when used/total are absent).
    if pct is None:
        pct = _pct_from_used_total(used, total)
    return used, total, pct


def parse_health_response(raw: object) -> HealthSnapshot:
    """Parse a gateway ``health`` / ``status`` response into a HealthSnapshot.

    Accepts nested blocks (``cpu``, ``memory``/``mem``, ``disk``/``storage``)
    or flat top-level keys (``cpuUsage``, ``memUsed``, ``diskUsed``, ...).
    Unknown keys are ignored. Malformed values are treated as absent rather
    than raised. Non-dict input yields an empty snapshot.

    Args:
        raw: Decoded RPC result; anything other than a dict is ignored.

    Returns:
        A HealthSnapshot whose ``recorded_at`` is ``utcnow()`` at parse time.
    """
    now = utcnow()
    if not isinstance(raw, dict):
        return HealthSnapshot(recorded_at=now)

    cpu_pct, load_avg_1m, load_avg_5m, load_avg_15m = _parse_cpu(raw)
    mem_used, mem_total, mem_pct = _parse_usage(
        raw,
        block_names=("memory", "mem"),
        nested_used=("used", "rss", "heapUsed"),
        nested_total=("total", "heapTotal"),
        flat_used=("memUsed", "mem_used", "memory_used"),
        flat_total=("memTotal", "mem_total", "memory_total"),
    )
    disk_used, disk_total, disk_pct = _parse_usage(
        raw,
        block_names=("disk", "storage"),
        nested_used=("used",),
        nested_total=("total",),
        flat_used=("diskUsed", "disk_used"),
        flat_total=("diskTotal", "disk_total"),
    )
    uptime = _int(raw, "uptime", "uptimeSeconds", "uptime_seconds")
    hostname = raw.get("hostname") or raw.get("host")
    platform = raw.get("platform") or raw.get("os")

    return HealthSnapshot(
        recorded_at=now,
        cpu_pct=cpu_pct,
        memory_pct=mem_pct,
        memory_used_bytes=mem_used,
        memory_total_bytes=mem_total,
        disk_pct=disk_pct,
        disk_used_bytes=disk_used,
        disk_total_bytes=disk_total,
        uptime_seconds=uptime,
        load_avg_1m=load_avg_1m,
        load_avg_5m=load_avg_5m,
        load_avg_15m=load_avg_15m,
        hostname=str(hostname) if hostname else None,
        platform=str(platform) if platform else None,
    )
# ---------------------------------------------------------------------------
# Rolling history
# ---------------------------------------------------------------------------
class HealthHistory:
    """In-memory rolling window of HealthSnapshots per gateway ID.

    Each gateway's bucket is kept sorted oldest-first. Snapshots older than
    ``window_hours`` before ``utcnow()`` are dropped lazily:
    - every bucket is pruned on ``add``;
    - the requested bucket is pruned on ``get``/``latest``.
    As a result, reads never return expired data, and gateways that stop
    reporting are eventually evicted.
    """

    def __init__(self, window_hours: int = DEFAULT_HISTORY_WINDOW_HOURS) -> None:
        self.window_hours = window_hours
        self._data: dict[str, list[HealthSnapshot]] = {}

    def _cutoff(self) -> datetime:
        """Return the oldest ``recorded_at`` still inside the window."""
        return utcnow() - timedelta(hours=self.window_hours)

    def _prune(self, gateway_id: str, cutoff: datetime) -> list[HealthSnapshot]:
        """Drop expired snapshots for *gateway_id* and return its live bucket."""
        bucket = self._data.get(gateway_id)
        if not bucket:
            self._data.pop(gateway_id, None)
            return []
        # Buckets are sorted, so expired entries form a prefix; the common
        # case (nothing expired) stops at the first element.
        keep_from = next(
            (i for i, snap in enumerate(bucket) if snap.recorded_at >= cutoff),
            len(bucket),
        )
        if keep_from:
            del bucket[:keep_from]
        if not bucket:
            del self._data[gateway_id]
        return bucket

    def add(self, gateway_id: str, snapshot: HealthSnapshot) -> None:
        """Insert *snapshot* in time order, then prune every gateway's bucket."""
        bucket = self._data.setdefault(gateway_id, [])
        bucket.append(snapshot)
        if len(bucket) > 1 and snapshot.recorded_at < bucket[-2].recorded_at:
            # Rare out-of-order arrival: re-sort. The sort is stable, so equal
            # timestamps keep their arrival order.
            bucket.sort(key=lambda s: s.recorded_at)
        cutoff = self._cutoff()
        for gid in list(self._data):
            self._prune(gid, cutoff)

    def get(self, gateway_id: str) -> list[HealthSnapshot]:
        """Return a copy of the in-window snapshots for *gateway_id*, oldest-first."""
        return list(self._prune(gateway_id, self._cutoff()))

    def latest(self, gateway_id: str) -> HealthSnapshot | None:
        """Return the newest in-window snapshot for *gateway_id*, or None."""
        bucket = self._prune(gateway_id, self._cutoff())
        return bucket[-1] if bucket else None
# ---------------------------------------------------------------------------
# Gateway fetch
# ---------------------------------------------------------------------------
# Module-level singleton shared across requests; retains
# DEFAULT_HISTORY_WINDOW_HOURS worth of snapshots per gateway.
_history: HealthHistory = HealthHistory(window_hours=DEFAULT_HISTORY_WINDOW_HOURS)
async def fetch_health(
    gateway_id: str,
    config: GatewayConfig,
    *,
    record: bool = True,
) -> HealthSnapshot:
    """Query the gateway for a health reading and normalise it.

    The ``health`` RPC is tried first, then ``status``; the first call that
    returns a non-empty dict is used. A failed call is logged and the next
    method is tried. If neither yields data, the result is an empty snapshot.
    When *record* is true, the snapshot (empty or not) is added to the
    module-level history under *gateway_id*.
    """
    payload: dict[str, Any] = {}
    for rpc_name in ("health", "status"):
        try:
            response = await openclaw_call(rpc_name, config=config)
            usable = isinstance(response, dict) and bool(response)
        except (OpenClawGatewayError, TimeoutError, OSError, RuntimeError) as exc:
            logger.debug("system_health.fetch_failed method=%s error=%s", rpc_name, exc)
            continue
        except Exception as exc:
            # Best-effort boundary: an unexpected error must not break the caller.
            logger.warning("system_health.fetch_unexpected method=%s error=%s", rpc_name, exc)
            continue
        if usable:
            payload = response
            break

    snapshot = parse_health_response(payload)
    if record:
        _history.add(gateway_id, snapshot)
    return snapshot
def get_history(gateway_id: str) -> list[HealthSnapshot]:
    """Return the retained snapshots for *gateway_id*, oldest first.

    Thin wrapper over the module-level history singleton; how far back it
    reaches is governed by that instance's ``window_hours``.
    """
    snapshots = _history.get(gateway_id)
    return snapshots