"""Runtime activity service — fetch and normalize recent gateway session messages. Data source: gateway ``chat.history`` RPC (returns recent messages per session). This is supplemental to the DB-backed activity feed; it shows what is happening in active gateway sessions in near-real-time without requiring any writes. Design notes ------------ - Polling: callers poll this service on an interval; it does not maintain state. - Deduplication: based on ``(session_key, message_index)`` because chat.history does not return stable message IDs. - Redaction: known-sensitive tool argument names are blanked; large content is truncated to a short preview. - Authorization: callers must have already verified gateway ownership before passing a config here. """ from __future__ import annotations import hashlib import re from datetime import datetime from typing import Any from app.core.logging import get_logger from app.core.time import utcnow from app.services.openclaw.gateway_rpc import ( GatewayConfig, OpenClawGatewayError, get_chat_history, openclaw_call, ) logger = get_logger(__name__) # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- CONTENT_PREVIEW_MAX = 300 # chars before truncation HISTORY_FETCH_LIMIT = 20 # messages to fetch per session per poll POLL_HISTORY_SESSIONS_MAX = 10 # max sessions to poll in one pass # Argument names that should be fully redacted from tool call previews _REDACT_TOOL_ARG_NAMES = frozenset( { "password", "passwd", "secret", "token", "api_key", "apikey", "access_key", "private_key", "credential", "credentials", "authorization", "bearer", "session_token", "refresh_token", } ) # Tool names whose entire input should be summarised rather than previewed _SUMMARISE_TOOLS = frozenset({"bash", "computer", "str_replace_editor"}) # --------------------------------------------------------------------------- # Normalized event type # 
class RuntimeMessageEvent:
    """Normalized representation of one gateway session message.

    Instances are plain value holders: every constructor argument is stored
    unchanged on the attribute of the same name. ``__slots__`` keeps the
    per-instance footprint small, since one instance is created per message
    on every poll.
    """

    __slots__ = (
        "event_id",
        "session_key",
        "session_label",
        "role",
        "model",
        "content_preview",
        "content_truncated",
        "has_tool_use",
        "tool_names",
        "timestamp",
        "agent_id",
        "board_id",
        "message_index",
    )

    def __init__(
        self,
        *,
        event_id: str,
        session_key: str,
        session_label: str | None,
        role: str,
        model: str | None,
        content_preview: str,
        content_truncated: bool,
        has_tool_use: bool,
        tool_names: list[str],
        timestamp: datetime | None,
        agent_id: str | None,
        board_id: str | None,
        message_index: int,
    ) -> None:
        self.event_id = event_id
        self.session_key = session_key
        self.session_label = session_label
        self.role = role
        self.model = model
        self.content_preview = content_preview
        self.content_truncated = content_truncated
        self.has_tool_use = has_tool_use
        self.tool_names = tool_names
        self.timestamp = timestamp
        self.agent_id = agent_id
        self.board_id = board_id
        self.message_index = message_index

    def __repr__(self) -> str:
        # Identify the event without dumping the (possibly long) preview.
        return (
            f"{type(self).__name__}("
            f"event_id={self.event_id!r}, "
            f"session_key={self.session_key!r}, "
            f"role={self.role!r}, "
            f"message_index={self.message_index!r})"
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-friendly dict of all fields.

        ``timestamp`` is rendered with ``datetime.isoformat()`` (or ``None``),
        and ``tool_names`` is copied so callers cannot mutate the event
        through the returned dict.
        """
        return {
            "event_id": self.event_id,
            "session_key": self.session_key,
            "session_label": self.session_label,
            "role": self.role,
            "model": self.model,
            "content_preview": self.content_preview,
            "content_truncated": self.content_truncated,
            "has_tool_use": self.has_tool_use,
            "tool_names": list(self.tool_names),
            "timestamp": self.timestamp.isoformat() if self.timestamp else None,
            "agent_id": self.agent_id,
            "board_id": self.board_id,
            "message_index": self.message_index,
        }


# ---------------------------------------------------------------------------
# Content extraction and redaction
# ---------------------------------------------------------------------------


def _extract_text(content: object) -> tuple[str, bool]:
    """Return ``(preview_text, was_truncated)`` from a message content value.

    Accepted shapes:

    - ``None``: empty preview.
    - ``str``: used as-is.
    - ``list`` of block dicts: ``text`` blocks contribute their text,
      ``tool_use`` blocks contribute ``[tool: <name>]`` (tool inputs are never
      rendered), ``tool_result`` blocks contribute ``[result: <first 80
      chars>]`` for string results and ``[result]`` otherwise. Non-dict
      entries and unknown block types are skipped.
    - anything else: ``str(content)``.

    The preview is cut to ``CONTENT_PREVIEW_MAX`` characters and the flag
    reports whether anything was cut, for every shape.
    """
    if content is None:
        return "", False

    if isinstance(content, str):
        text = content
    elif isinstance(content, list):
        parts: list[str] = []
        for block in content:
            if not isinstance(block, dict):
                continue
            btype = block.get("type", "")
            if btype == "text":
                parts.append(str(block.get("text") or ""))
            elif btype == "tool_use":
                # Only the tool name is shown; inputs are never rendered, so
                # no per-tool special-casing is needed here.
                name = block.get("name") or "tool"
                parts.append(f"[tool: {name}]")
            elif btype == "tool_result":
                result_content = block.get("content")
                if isinstance(result_content, str):
                    parts.append(f"[result: {result_content[:80]}]")
                else:
                    parts.append("[result]")
        text = " ".join(p for p in parts if p)
    else:
        text = str(content)

    return text[:CONTENT_PREVIEW_MAX], len(text) > CONTENT_PREVIEW_MAX


def _redact_arg_value(value: Any) -> Any:
    """Redact/truncate one tool-argument value, descending into containers."""
    if isinstance(value, dict):
        return redact_tool_args(value)
    if isinstance(value, list):
        return [_redact_arg_value(item) for item in value]
    if isinstance(value, str) and len(value) > 500:
        return value[:200] + "…[truncated]"
    return value


def redact_tool_args(args: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of tool args with sensitive keys replaced by ``[REDACTED]``.

    A key is sensitive when its lower-cased name is in
    ``_REDACT_TOOL_ARG_NAMES``. Nested dicts (including dicts inside lists)
    are redacted by the same rule so that a secret cannot slip through one
    level down. String values longer than 500 characters are cut to their
    first 200 characters plus a ``…[truncated]`` marker.

    Args:
        args: Raw tool-call arguments. Anything that is not a dict yields ``{}``.

    Returns:
        A new dict; the input is not modified.
    """
    if not isinstance(args, dict):
        return {}
    result: dict[str, Any] = {}
    for key, value in args.items():
        # Non-string keys cannot name a sensitive argument; pass them through
        # instead of failing on ``.lower()``.
        if isinstance(key, str) and key.lower() in _REDACT_TOOL_ARG_NAMES:
            result[key] = "[REDACTED]"
        else:
            result[key] = _redact_arg_value(value)
    return result


def _collect_tool_names(content: object) -> list[str]:
    """Return names of all ``tool_use`` blocks in a message content.

    Non-list content yields ``[]``; a ``tool_use`` block with a missing or
    empty name is reported as ``"unknown"``.
    """
    if not isinstance(content, list):
        return []
    return [
        str(block.get("name") or "unknown")
        for block in content
        if isinstance(block, dict) and block.get("type") == "tool_use"
    ]


def _parse_timestamp(msg: dict[str, Any]) -> datetime | None:
    """Return the message time as a naive UTC ``datetime``, or ``None``.

    The keys ``timestamp``, ``created_at``, ``createdAt`` and ``time`` are
    tried in that order; the first value that parses wins.

    - ISO-8601 strings are parsed with ``datetime.fromisoformat`` (a ``Z``
      suffix is accepted). Offset-aware values are converted to UTC and made
      naive; naive values are returned unchanged.
    - Positive numbers are treated as Unix epoch time. Values above ``1e11``
      are taken to be milliseconds, smaller ones seconds.
    """
    from datetime import timezone

    # 1e11 seconds is roughly year 5138, so anything larger is milliseconds.
    epoch_ms_threshold = 1e11

    for key in ("timestamp", "created_at", "createdAt", "time"):
        val = msg.get(key)

        if isinstance(val, str):
            text = val.strip()
            if not text:
                continue
            try:
                parsed = datetime.fromisoformat(text.replace("Z", "+00:00"))
            except ValueError:
                continue
            if parsed.tzinfo is not None:
                return parsed.astimezone(timezone.utc).replace(tzinfo=None)
            return parsed

        # bool is an int subclass; True/False is never a timestamp.
        if isinstance(val, (int, float)) and not isinstance(val, bool):
            if not val > 0:  # also rejects NaN
                continue
            seconds = val / 1000 if val > epoch_ms_threshold else val
            try:
                aware = datetime.fromtimestamp(seconds, tz=timezone.utc)
            except (ValueError, OverflowError, OSError):
                continue
            return aware.replace(tzinfo=None)

    return None
# ---------------------------------------------------------------------------
# Session key → agent/board correlation
# ---------------------------------------------------------------------------

# Board-lead session: ``agent:lead-<36-char board id>:main``.
_LEAD_SESSION_RE = re.compile(
    r"^agent:lead-(?P<board_id>[0-9a-fA-F-]{36}):main$"
)
# Regular agent session: ``agent:<slug>:main`` or
# ``agent:<slug>:board-<36-char board id>``. ``board_id`` is ``None`` for the
# ``:main`` form.
_AGENT_SESSION_RE = re.compile(
    r"^agent:(?P<agent_slug>[^:]+):(?:main|board-(?P<board_id>[0-9a-fA-F-]{36}))$"
)


def _correlate_session(session_key: str) -> tuple[str | None, str | None]:
    """Return ``(agent_slug_or_none, board_id_or_none)`` inferred from the session key.

    The lead pattern is checked first because a lead key would otherwise also
    match the generic agent pattern (with ``lead-<id>`` as the slug). Keys
    matching neither pattern yield ``(None, None)``.
    """
    lead_m = _LEAD_SESSION_RE.match(session_key)
    if lead_m:
        return None, lead_m.group("board_id")
    agent_m = _AGENT_SESSION_RE.match(session_key)
    if agent_m:
        return agent_m.group("agent_slug"), agent_m.group("board_id")
    return None, None


# ---------------------------------------------------------------------------
# Message normaliser
# ---------------------------------------------------------------------------


def normalize_message(
    session_key: str,
    session_label: str | None,
    msg: dict[str, Any],
    index: int,
) -> RuntimeMessageEvent:
    """Convert one raw chat history message into a RuntimeMessageEvent.

    Args:
        session_key: Gateway session key the message belongs to.
        session_label: Optional human-readable session label, passed through.
        msg: Raw message dict; ``role``, ``model``, ``content`` and the
            timestamp keys are read from it.
        index: Position of the message within the fetched history list.

    Returns:
        The normalized event. ``role`` falls back to ``"unknown"``; ``model``
        is ``None`` when missing or empty; ``agent_id`` carries the agent slug
        parsed from the session key.
    """
    role = str(msg.get("role") or "unknown")
    model = msg.get("model") or None
    if model:
        model = str(model)

    content = msg.get("content")
    preview, truncated = _extract_text(content)
    tool_names = _collect_tool_names(content)
    ts = _parse_timestamp(msg)
    agent_id, board_id = _correlate_session(session_key)

    # Deduplication key: chat.history has no message IDs, so derive one from
    # where the message sits and what it looks like.
    # NOTE(review): ``index`` is the position inside the fetched window; if
    # the gateway returns only the most recent N messages, the same message
    # may get a different id once the window slides — confirm against callers.
    event_id = hashlib.sha256(
        f"{session_key}:{index}:{role}:{preview[:50]}".encode()
    ).hexdigest()[:16]

    return RuntimeMessageEvent(
        event_id=event_id,
        session_key=session_key,
        session_label=session_label,
        role=role,
        model=model,
        content_preview=preview,
        content_truncated=truncated,
        has_tool_use=bool(tool_names),
        tool_names=tool_names,
        timestamp=ts,
        agent_id=agent_id,
        board_id=board_id,
        message_index=index,
    )


# ---------------------------------------------------------------------------
# Gateway data fetching
# ---------------------------------------------------------------------------
async def _safe_chat_history(
    session_key: str,
    config: GatewayConfig,
    limit: int = HISTORY_FETCH_LIMIT,
) -> list[dict[str, Any]]:
    """Fetch chat history for one session; return ``[]`` on any gateway error.

    The gateway reply may be a list of messages or a dict carrying the list
    under ``messages`` or ``history``. Any other shape yields ``[]``, and
    non-dict entries in the list are dropped.
    """
    try:
        raw = await get_chat_history(session_key, config, limit=limit)
    except (OpenClawGatewayError, TimeoutError, OSError, RuntimeError) as exc:
        # Best-effort: one unreachable session must not break the whole poll.
        logger.debug(
            "runtime_activity.history_fetch_failed session_key=%s error=%s",
            session_key,
            exc,
        )
        return []

    if isinstance(raw, dict):
        messages = raw.get("messages") or raw.get("history") or []
    else:
        messages = raw
    # Guard against a malformed payload (e.g. a dict or scalar where the list
    # should be) instead of iterating it.
    if not isinstance(messages, list):
        return []
    return [m for m in messages if isinstance(m, dict)]


async def _list_active_sessions(config: GatewayConfig) -> list[dict[str, Any]]:
    """Return the session dicts reported by the gateway's ``sessions.list`` call.

    Accepts either a bare list or a dict with a ``sessions`` list. Gateway
    errors and unexpected shapes yield ``[]``; non-dict entries are dropped.
    """
    try:
        raw = await openclaw_call("sessions.list", {"limit": 50}, config=config)
    except (OpenClawGatewayError, TimeoutError, OSError, RuntimeError) as exc:
        logger.debug("runtime_activity.sessions_list_failed error=%s", exc)
        return []

    sessions = raw.get("sessions") if isinstance(raw, dict) else raw
    if not isinstance(sessions, list):
        return []
    return [s for s in sessions if isinstance(s, dict)]


# ---------------------------------------------------------------------------
# Main poll function
# ---------------------------------------------------------------------------


async def fetch_recent_events(
    config: GatewayConfig,
    *,
    since_ids: set[str] | None = None,
    max_sessions: int = POLL_HISTORY_SESSIONS_MAX,
    history_limit: int = HISTORY_FETCH_LIMIT,
) -> list[RuntimeMessageEvent]:
    """Fetch and normalize recent messages across all active sessions.

    Args:
        config: Gateway credentials/URL.
        since_ids: Set of event_ids already seen; new events not in this set
            are returned. Pass ``None`` for the initial load (returns all).
        max_sessions: Cap on how many sessions to query per call. Values
            below zero are treated as zero.
        history_limit: Number of messages to fetch per session.

    Returns:
        List of new RuntimeMessageEvent objects, oldest-first. Events without
        a timestamp are placed after all timestamped events; ties are broken
        by session key and then message index so the order is stable.
    """
    sessions = await _list_active_sessions(config)
    # A negative cap would otherwise slice from the end of the list.
    sessions = sessions[: max(0, max_sessions)]

    events: list[RuntimeMessageEvent] = []
    for session in sessions:
        key = session.get("key") or session.get("id")
        if not isinstance(key, str) or not key.strip():
            continue
        label = session.get("label") or session.get("name") or None
        messages = await _safe_chat_history(key, config, limit=history_limit)
        for idx, msg in enumerate(messages):
            event = normalize_message(key, label, msg, idx)
            if since_ids is None or event.event_id not in since_ids:
                events.append(event)

    def sort_key(e: RuntimeMessageEvent) -> tuple[bool, datetime, str, int]:
        # The leading flag sorts missing timestamps last; ``datetime.min`` is
        # only a placeholder so the tuple stays comparable.
        return (
            e.timestamp is None,
            e.timestamp or datetime.min,
            e.session_key,
            e.message_index,
        )

    events.sort(key=sort_key)
    return events