Pipeline/backend/app/services/openclaw/runtime_activity.py

367 lines
12 KiB
Python

"""Runtime activity service — fetch and normalize recent gateway session messages.
Data source: gateway ``chat.history`` RPC (returns recent messages per session).
This is supplemental to the DB-backed activity feed; it shows what is happening
in active gateway sessions in near-real-time without requiring any writes.
Design notes
------------
- Polling: callers poll this service on an interval; it does not maintain state.
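- Deduplication: each event carries a short hashed ``event_id`` derived from the
session key plus per-message fields (position or timestamp, role, content
preview), because chat.history does not return stable message IDs.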
- Redaction: known-sensitive tool argument names are blanked; large content is
truncated to a short preview.
- Authorization: callers must have already verified gateway ownership before
passing a config here.
"""
from __future__ import annotations
import hashlib
import re
from datetime import datetime
from typing import Any
from app.core.logging import get_logger
from app.core.time import utcnow
from app.services.openclaw.gateway_rpc import (
GatewayConfig,
OpenClawGatewayError,
get_chat_history,
openclaw_call,
)
# Module-level logger obtained from the project's logging helper, named after this module.
logger = get_logger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
CONTENT_PREVIEW_MAX = 300  # max characters kept in a content preview
HISTORY_FETCH_LIMIT = 20  # messages requested per session on each poll
POLL_HISTORY_SESSIONS_MAX = 10  # max sessions queried in a single poll pass

# Tool-argument keys (compared lower-cased by ``redact_tool_args``) whose
# values are replaced with ``[REDACTED]`` in previews.
_REDACT_TOOL_ARG_NAMES = frozenset(
    (
        "access_key",
        "api_key",
        "apikey",
        "authorization",
        "bearer",
        "credential",
        "credentials",
        "passwd",
        "password",
        "private_key",
        "refresh_token",
        "secret",
        "session_token",
        "token",
    )
)

# Tool names whose entire input should be summarised rather than previewed.
_SUMMARISE_TOOLS = frozenset(("bash", "computer", "str_replace_editor"))
# ---------------------------------------------------------------------------
# Normalized event type
# ---------------------------------------------------------------------------
class RuntimeMessageEvent:
    """Normalized representation of one gateway session message.

    A plain value holder (constructed by ``normalize_message``); use
    ``to_dict`` to obtain a JSON-friendly view of the event.
    """

    __slots__ = (
        "event_id",  # short hash used to deduplicate events across polls
        "session_key",
        "session_label",
        "role",
        "model",
        "content_preview",
        "content_truncated",
        "has_tool_use",
        "tool_names",
        "timestamp",
        "agent_id",
        "board_id",
        "message_index",  # position of the message in the fetched history
    )

    def __init__(
        self,
        *,
        event_id: str,
        session_key: str,
        session_label: str | None,
        role: str,
        model: str | None,
        content_preview: str,
        content_truncated: bool,
        has_tool_use: bool,
        tool_names: list[str],
        timestamp: datetime | None,
        agent_id: str | None,
        board_id: str | None,
        message_index: int,
    ) -> None:
        self.event_id = event_id
        self.session_key = session_key
        self.session_label = session_label
        self.role = role
        self.model = model
        self.content_preview = content_preview
        self.content_truncated = content_truncated
        self.has_tool_use = has_tool_use
        self.tool_names = tool_names
        self.timestamp = timestamp
        self.agent_id = agent_id
        self.board_id = board_id
        self.message_index = message_index

    def __repr__(self) -> str:
        # Identifying fields only; content is kept out of reprs/log lines.
        return (
            f"{type(self).__name__}(event_id={self.event_id!r}, "
            f"session_key={self.session_key!r}, role={self.role!r}, "
            f"message_index={self.message_index!r})"
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serializable dict of every field.

        ``timestamp`` is rendered with ``isoformat()`` (or ``None``), and
        ``tool_names`` is copied so mutating the returned dict cannot alter
        this event.
        """
        return {
            "event_id": self.event_id,
            "session_key": self.session_key,
            "session_label": self.session_label,
            "role": self.role,
            "model": self.model,
            "content_preview": self.content_preview,
            "content_truncated": self.content_truncated,
            "has_tool_use": self.has_tool_use,
            "tool_names": list(self.tool_names),
            "timestamp": self.timestamp.isoformat() if self.timestamp else None,
            "agent_id": self.agent_id,
            "board_id": self.board_id,
            "message_index": self.message_index,
        }
# ---------------------------------------------------------------------------
# Content extraction and redaction
# ---------------------------------------------------------------------------
def _tool_result_text(result_content: object) -> str | None:
    """Return the text carried by a ``tool_result`` block's content, or None.

    Strings are returned as-is; lists contribute the text of their ``text``
    blocks.  Anything else (or a list with no non-empty text) yields None.
    """
    if isinstance(result_content, str):
        return result_content
    if isinstance(result_content, list):
        texts = [
            str(item.get("text") or "")
            for item in result_content
            if isinstance(item, dict) and item.get("type") == "text"
        ]
        joined = " ".join(t for t in texts if t)
        return joined or None
    return None


def _extract_text(content: object) -> tuple[str, bool]:
    """Return ``(preview_text, was_truncated)`` for a message content value.

    ``content`` may be None, a string, or a list of content blocks (dicts with
    a ``type`` key):

    * ``text`` blocks contribute their text;
    * ``tool_use`` blocks contribute ``[tool: <name>]`` -- tool inputs are
      never copied into the preview;
    * ``tool_result`` blocks contribute ``[result: <first 80 chars>]`` when
      they carry text, otherwise ``[result]``.

    Any other value is stringified.  The returned text is at most
    ``CONTENT_PREVIEW_MAX`` characters and ``was_truncated`` reports whether
    anything was cut off.
    """
    if content is None:
        return "", False
    if isinstance(content, str):
        text = content
    elif isinstance(content, list):
        parts: list[str] = []
        for block in content:
            if not isinstance(block, dict):
                continue
            btype = block.get("type", "")
            if btype == "text":
                parts.append(str(block.get("text") or ""))
            elif btype == "tool_use":
                # Every tool is summarised by name only: inputs may contain
                # secrets or large payloads and are deliberately omitted.
                parts.append(f"[tool: {block.get('name') or 'tool'}]")
            elif btype == "tool_result":
                result_text = _tool_result_text(block.get("content"))
                if result_text is None:
                    parts.append("[result]")
                else:
                    parts.append(f"[result: {result_text[:80]}]")
        text = " ".join(p for p in parts if p)
    else:
        text = str(content)
    # Computed uniformly so the flag is also correct for the str() fallback.
    return text[:CONTENT_PREVIEW_MAX], len(text) > CONTENT_PREVIEW_MAX
def redact_tool_args(args: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of tool args with sensitive keys replaced by ``[REDACTED]``.

    Keys are compared after ``str()``, lower-casing and mapping ``-`` to ``_``
    (so ``API-Key`` matches ``api_key``).  Nested dicts, including dicts inside
    lists, are redacted recursively.  Strings longer than 500 characters are
    cut to their first 200 characters plus ``…[truncated]``.  A non-dict
    ``args`` yields ``{}``.
    """
    if not isinstance(args, dict):
        return {}

    def _scrub(value: Any) -> Any:
        # Recurse into containers so nested secrets are not leaked verbatim.
        if isinstance(value, dict):
            return redact_tool_args(value)
        if isinstance(value, list):
            return [_scrub(item) for item in value]
        if isinstance(value, str) and len(value) > 500:
            return value[:200] + "…[truncated]"
        return value

    result: dict[str, Any] = {}
    for key, value in args.items():
        # str() guards against non-string keys, which would crash .lower().
        normalized = str(key).lower().replace("-", "_")
        if normalized in _REDACT_TOOL_ARG_NAMES:
            result[key] = "[REDACTED]"
        else:
            result[key] = _scrub(value)
    return result
def _collect_tool_names(content: object) -> list[str]:
    """List the names of every ``tool_use`` block in ``content``.

    Non-list content yields an empty list; non-dict items are skipped, and a
    tool block with a missing or falsy name is reported as ``"unknown"``.
    """
    names: list[str] = []
    if isinstance(content, list):
        for block in content:
            if not isinstance(block, dict):
                continue
            if block.get("type") != "tool_use":
                continue
            names.append(str(block.get("name") or "unknown"))
    return names
def _parse_timestamp(msg: dict[str, Any]) -> datetime | None:
    """Return the message timestamp as a naive UTC ``datetime``, or None.

    The keys ``timestamp``, ``created_at``, ``createdAt`` and ``time`` are
    tried in order; the first holding a parseable value wins.  Accepted:

    * ISO-8601 strings (a ``Z`` is treated as ``+00:00``).  Offset-aware
      values are converted to UTC and made naive; naive values are returned
      unchanged.
    * Numeric Unix epochs (bools excluded).  Magnitudes >= 1e11 are treated
      as milliseconds, smaller ones as seconds.

    Unparseable values are skipped and the next key is tried.
    """
    from datetime import timezone  # hoisted out of the loop

    for key in ("timestamp", "created_at", "createdAt", "time"):
        val = msg.get(key)
        try:
            if isinstance(val, str) and val.strip():
                parsed = datetime.fromisoformat(val.strip().replace("Z", "+00:00"))
                if parsed.tzinfo is not None:
                    return parsed.astimezone(timezone.utc).replace(tzinfo=None)
                return parsed
            if isinstance(val, (int, float)) and not isinstance(val, bool):
                # 1e11 s is ~year 5138 while 1e11 ms is ~1973, so the
                # magnitude reliably separates milliseconds from seconds.
                seconds = val / 1000 if abs(val) >= 1e11 else val
                return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
                    tzinfo=None
                )
        except (ValueError, OverflowError, OSError):
            continue
    return None
# ---------------------------------------------------------------------------
# Session key → agent/board correlation
# ---------------------------------------------------------------------------
# Canonical 8-4-4-4-12 hexadecimal UUID text used for board ids in session keys.
# (The previous ``[0-9a-fA-F-]{36}`` also accepted e.g. 36 hyphens.)
_UUID_PATTERN = (
    r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}"
)

# Board lead sessions: ``agent:lead-<board_uuid>:main``.
_LEAD_SESSION_RE = re.compile(
    rf"^agent:lead-(?P<board_id>{_UUID_PATTERN}):main$"
)
# Other agent sessions: ``agent:<slug>:main`` or ``agent:<slug>:board-<board_uuid>``.
_AGENT_SESSION_RE = re.compile(
    rf"^agent:(?P<agent_slug>[^:]+):(?:main|board-(?P<board_id>{_UUID_PATTERN}))$"
)
def _correlate_session(session_key: str) -> tuple[str | None, str | None]:
    """Infer ``(agent_slug, board_id)`` from a gateway session key.

    Keys matching ``_LEAD_SESSION_RE`` yield only the board id.  Keys matching
    ``_AGENT_SESSION_RE`` yield the agent slug plus the board id when the key
    ends in ``board-<id>`` (None for ``:main``).  Anything else yields
    ``(None, None)``.
    """
    match = _LEAD_SESSION_RE.match(session_key)
    if match is not None:
        return None, match.group("board_id")
    match = _AGENT_SESSION_RE.match(session_key)
    if match is None:
        return None, None
    return match.group("agent_slug"), match.group("board_id")
# ---------------------------------------------------------------------------
# Message normaliser
# ---------------------------------------------------------------------------
def normalize_message(
    session_key: str,
    session_label: str | None,
    msg: dict[str, Any],
    index: int,
) -> RuntimeMessageEvent:
    """Convert one raw chat history message into a RuntimeMessageEvent.

    Args:
        session_key: Gateway session the message belongs to.
        session_label: Optional human-readable session label.
        msg: Raw message dict from chat history.
        index: Position of ``msg`` within the fetched history window.

    ``event_id`` is a 16-hex-char SHA-256 prefix used for cross-poll
    deduplication.  When the message has a parseable timestamp, the id is
    derived from the session key, timestamp, role and content preview.
    Otherwise it falls back to session key, index, role and a 50-char
    preview prefix.
    """
    role = str(msg.get("role") or "unknown")
    raw_model = msg.get("model")
    model = str(raw_model) if raw_model else None
    content = msg.get("content")
    preview, truncated = _extract_text(content)
    tool_names = _collect_tool_names(content)
    ts = _parse_timestamp(msg)
    agent_id, board_id = _correlate_session(session_key)
    # chat.history returns the *most recent* N messages, so ``index`` shifts
    # as new messages arrive.  An index-based id would then change between
    # polls and re-emit already-seen messages.  Prefer the timestamp, which is
    # stable; keep the legacy index-based identity when no timestamp exists.
    # (Two identical messages with the same role and timestamp share an id.)
    if ts is not None:
        identity = f"{session_key}:ts:{ts.isoformat()}:{role}:{preview}"
    else:
        identity = f"{session_key}:{index}:{role}:{preview[:50]}"
    # surrogatepass: lone surrogates in JSON-decoded text must not crash
    # hashing; bytes for ordinary text are unchanged.
    event_id = hashlib.sha256(
        identity.encode("utf-8", "surrogatepass")
    ).hexdigest()[:16]
    return RuntimeMessageEvent(
        event_id=event_id,
        session_key=session_key,
        session_label=session_label,
        role=role,
        model=model,
        content_preview=preview,
        content_truncated=truncated,
        has_tool_use=bool(tool_names),
        tool_names=tool_names,
        timestamp=ts,
        agent_id=agent_id,
        board_id=board_id,
        message_index=index,
    )
# ---------------------------------------------------------------------------
# Gateway data fetching
# ---------------------------------------------------------------------------
async def _safe_chat_history(
    session_key: str,
    config: GatewayConfig,
    limit: int = HISTORY_FETCH_LIMIT,
) -> list[dict[str, Any]]:
    """Fetch chat history for one session; return [] on error or bad shape.

    Accepts either a bare list of messages or a dict with a ``messages`` (or
    ``history``) list.  Non-dict entries are dropped.  Gateway, timeout, OS
    and runtime errors are logged at debug level and yield ``[]``.
    """
    # Before Python 3.11, asyncio.TimeoutError is not the builtin TimeoutError.
    import asyncio

    try:
        raw = await get_chat_history(session_key, config, limit=limit)
    except (
        OpenClawGatewayError,
        TimeoutError,
        asyncio.TimeoutError,
        OSError,
        RuntimeError,
    ) as exc:
        logger.debug(
            "runtime_activity.history_fetch_failed session_key=%s error=%s",
            session_key,
            exc,
        )
        return []
    if isinstance(raw, dict):
        messages = raw.get("messages") or raw.get("history") or []
    elif isinstance(raw, list):
        messages = raw
    else:
        return []
    # Guard against unexpected payload shapes (e.g. a number) that would
    # otherwise raise TypeError when iterated.
    if not isinstance(messages, list):
        return []
    return [m for m in messages if isinstance(m, dict)]
async def _list_active_sessions(
    config: GatewayConfig,
    limit: int = 50,
) -> list[dict[str, Any]]:
    """Return active session dicts from the gateway ``sessions.list`` RPC.

    Args:
        config: Gateway credentials/URL.
        limit: ``limit`` parameter sent with the RPC (default 50).

    Accepts a bare list or a dict with a ``sessions`` list; non-dict entries
    are dropped.  Gateway, timeout, OS and runtime errors are logged at debug
    level and yield ``[]``.
    """
    # Before Python 3.11, asyncio.TimeoutError is not the builtin TimeoutError.
    import asyncio

    try:
        raw = await openclaw_call("sessions.list", {"limit": limit}, config=config)
    except (
        OpenClawGatewayError,
        TimeoutError,
        asyncio.TimeoutError,
        OSError,
        RuntimeError,
    ) as exc:
        logger.debug("runtime_activity.sessions_list_failed error=%s", exc)
        return []
    if isinstance(raw, dict):
        sessions = raw.get("sessions") or []
    elif isinstance(raw, list):
        sessions = raw
    else:
        return []
    if not isinstance(sessions, list):
        return []
    return [s for s in sessions if isinstance(s, dict)]
# ---------------------------------------------------------------------------
# Main poll function
# ---------------------------------------------------------------------------
async def fetch_recent_events(
    config: GatewayConfig,
    *,
    since_ids: set[str] | None = None,
    max_sessions: int = POLL_HISTORY_SESSIONS_MAX,
    history_limit: int = HISTORY_FETCH_LIMIT,
) -> list[RuntimeMessageEvent]:
    """Fetch and normalize recent messages across all active sessions.

    Args:
        config: Gateway credentials/URL.
        since_ids: Set of event_ids already seen; only events not in this set
            are returned. Pass ``None`` for the initial load (returns all).
        max_sessions: Cap on how many sessions to query per call (values
            below zero are treated as zero).
        history_limit: Number of messages to fetch per session.

    Returns:
        List of new RuntimeMessageEvent objects, oldest-first.  Events without
        a parseable timestamp come after all timestamped events.  Ties are
        ordered by session key, then message index.
    """
    sessions = (await _list_active_sessions(config))[: max(max_sessions, 0)]
    events: list[RuntimeMessageEvent] = []
    for session in sessions:
        key = session.get("key") or session.get("id")
        if not isinstance(key, str) or not key.strip():
            continue
        raw_label = session.get("label") or session.get("name")
        # The event model declares session_label as str | None.
        label = str(raw_label) if raw_label else None
        messages = await _safe_chat_history(key, config, limit=history_limit)
        for idx, msg in enumerate(messages):
            event = normalize_message(key, label, msg, idx)
            if since_ids is None or event.event_id not in since_ids:
                events.append(event)

    def sort_key(e: RuntimeMessageEvent) -> tuple:
        # Leading flag: False (timestamped) sorts before True (untimed), so
        # untimed events really go last instead of first via datetime.min.
        return (
            e.timestamp is None,
            e.timestamp or datetime.min,
            e.session_key,
            e.message_index,
        )

    events.sort(key=sort_key)
    return events