diff --git a/backend/app/api/gateways.py b/backend/app/api/gateways.py index a1a969d..445155f 100644 --- a/backend/app/api/gateways.py +++ b/backend/app/api/gateways.py @@ -2,11 +2,14 @@ from __future__ import annotations +import asyncio +import json from typing import TYPE_CHECKING from uuid import UUID, uuid4 -from fastapi import APIRouter, Depends, Query +from fastapi import APIRouter, Depends, Query, Request from sqlmodel import col +from sse_starlette.sse import EventSourceResponse from app.api.deps import require_org_admin, require_org_member from app.core.auth import AuthContext, get_auth_context @@ -33,6 +36,11 @@ from app.schemas.runtime_usage import ( ProviderUsageScrapeResult, RuntimeUsageResponse, ) +from app.services.openclaw.runtime_activity import ( + HISTORY_FETCH_LIMIT, + POLL_HISTORY_SESSIONS_MAX, + fetch_recent_events, +) from app.services.openclaw.runtime_usage import get_runtime_usage from app.services.openclaw.usage_scrapers import get_provider_usage from app.schemas.pagination import DefaultLimitOffsetPage @@ -352,6 +360,115 @@ async def get_gateway_provider_usage( ) +@router.get( + "/{gateway_id}/runtime-activity", + summary="Recent gateway runtime messages (REST snapshot)", + description=( + "Return the most recent messages from all active sessions on the gateway, " + "normalised and redacted. Use the streaming endpoint for a live feed." + ), +) +async def get_gateway_runtime_activity( + gateway_id: UUID, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> dict: + """Non-streaming snapshot of recent gateway activity.""" + from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig + + service = GatewayAdminLifecycleService(session) + gateway = await service.require_gateway( + gateway_id=gateway_id, + organization_id=ctx.organization.id, + ) + config = GatewayClientConfig( + url=gateway.url, + token=gateway.token, + allow_insecure_tls=gateway.allow_insecure_tls, + disable_device_pairing=gateway.disable_device_pairing, + ) + events = await fetch_recent_events( + config, + max_sessions=POLL_HISTORY_SESSIONS_MAX, + history_limit=HISTORY_FETCH_LIMIT, + ) + return { + "gateway_id": str(gateway_id), + "generated_at": utcnow().isoformat(), + "events": [e.to_dict() for e in events], + } + + +_RUNTIME_ACTIVITY_POLL_SECONDS = 4 +_RUNTIME_ACTIVITY_SEEN_MAX = 1000 + + +@router.get( + "/{gateway_id}/runtime-activity/stream", + summary="Live gateway runtime activity (SSE)", + description=( + "Stream normalised gateway session messages as server-sent events. " + "Each event is of type ``runtime_message``. The stream polls the " + "gateway every few seconds and emits only new messages." + ), +) +async def stream_gateway_runtime_activity( + request: Request, + gateway_id: UUID, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> EventSourceResponse: + """SSE stream of live gateway session messages.""" + from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig + + service = GatewayAdminLifecycleService(session) + gateway = await service.require_gateway( + gateway_id=gateway_id, + organization_id=ctx.organization.id, + ) + config = GatewayClientConfig( + url=gateway.url, + token=gateway.token, + allow_insecure_tls=gateway.allow_insecure_tls, + disable_device_pairing=gateway.disable_device_pairing, + ) + + async def event_generator(): + from collections import deque + seen_ids: set[str] = set() + seen_queue: deque[str] = deque() + initial = True + + while True: + if await request.is_disconnected(): + break + + events = await fetch_recent_events( + config, + since_ids=None if initial else seen_ids, + max_sessions=POLL_HISTORY_SESSIONS_MAX, + history_limit=HISTORY_FETCH_LIMIT, + ) + initial = False + + for event in events: + if event.event_id in seen_ids: + continue + seen_ids.add(event.event_id) + seen_queue.append(event.event_id) + if len(seen_queue) > _RUNTIME_ACTIVITY_SEEN_MAX: + oldest = seen_queue.popleft() + seen_ids.discard(oldest) + yield { + "event": "runtime_message", + "data": json.dumps(event.to_dict()), + } + + await asyncio.sleep(_RUNTIME_ACTIVITY_POLL_SECONDS) + + return EventSourceResponse(event_generator(), ping=15) + + @router.delete("/{gateway_id}", response_model=OkResponse) async def delete_gateway( gateway_id: UUID, diff --git a/backend/app/services/openclaw/runtime_activity.py b/backend/app/services/openclaw/runtime_activity.py new file mode 100644 index 0000000..82a2e08 --- /dev/null +++ b/backend/app/services/openclaw/runtime_activity.py @@ -0,0 +1,366 @@ +"""Runtime activity service — fetch and normalize recent gateway session messages. + +Data source: gateway ``chat.history`` RPC (returns recent messages per session). +This is supplemental to the DB-backed activity feed; it shows what is happening +in active gateway sessions in near-real-time without requiring any writes. + +Design notes +------------ +- Polling: callers poll this service on an interval; it does not maintain state. +- Deduplication: based on ``(session_key, message_index)`` because chat.history + does not return stable message IDs. +- Redaction: known-sensitive tool argument names are blanked; large content is + truncated to a short preview. +- Authorization: callers must have already verified gateway ownership before + passing a config here. +""" + +from __future__ import annotations + +import hashlib +import re +from datetime import datetime +from typing import Any + +from app.core.logging import get_logger +from app.core.time import utcnow +from app.services.openclaw.gateway_rpc import ( + GatewayConfig, + OpenClawGatewayError, + get_chat_history, + openclaw_call, +) + +logger = get_logger(__name__) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +CONTENT_PREVIEW_MAX = 300 # chars before truncation +HISTORY_FETCH_LIMIT = 20 # messages to fetch per session per poll +POLL_HISTORY_SESSIONS_MAX = 10 # max sessions to poll in one pass + +# Argument names that should be fully redacted from tool call previews +_REDACT_TOOL_ARG_NAMES = frozenset( + { + "password", "passwd", "secret", "token", "api_key", "apikey", + "access_key", "private_key", "credential", "credentials", + "authorization", "bearer", "session_token", "refresh_token", + } +) + +# Tool names whose entire input should be summarised rather than previewed +_SUMMARISE_TOOLS = frozenset({"bash", "computer", "str_replace_editor"}) + +# --------------------------------------------------------------------------- +# Normalized event type +# --------------------------------------------------------------------------- + +class RuntimeMessageEvent: + """Normalized representation of one gateway session message.""" + + __slots__ = ( + "event_id", + "session_key", + "session_label", + "role", + "model", + "content_preview", + "content_truncated", + "has_tool_use", + "tool_names", + "timestamp", + "agent_id", + "board_id", + "message_index", + ) + + def __init__( + self, + *, + event_id: str, + session_key: str, + session_label: str | None, + role: str, + model: str | None, + content_preview: str, + content_truncated: bool, + has_tool_use: bool, + tool_names: list[str], + timestamp: datetime | None, + agent_id: str | None, + board_id: str | None, + message_index: int, + ) -> None: + self.event_id = event_id + self.session_key = session_key + self.session_label = session_label + self.role = role + self.model = model + self.content_preview = content_preview + self.content_truncated = content_truncated + self.has_tool_use = has_tool_use + self.tool_names = tool_names + self.timestamp = timestamp + self.agent_id = agent_id + self.board_id = board_id + self.message_index = message_index + + def to_dict(self) -> dict[str, Any]: + return { + "event_id": self.event_id, + "session_key": self.session_key, + "session_label": self.session_label, + "role": self.role, + "model": self.model, + "content_preview": self.content_preview, + "content_truncated": self.content_truncated, + "has_tool_use": self.has_tool_use, + "tool_names": self.tool_names, + "timestamp": self.timestamp.isoformat() if self.timestamp else None, + "agent_id": self.agent_id, + "board_id": self.board_id, + "message_index": self.message_index, + } + + +# --------------------------------------------------------------------------- +# Content extraction and redaction +# --------------------------------------------------------------------------- + +def _extract_text(content: object) -> tuple[str, bool]: + """Return (preview_text, was_truncated) from a message content value.""" + if content is None: + return "", False + + if isinstance(content, str): + text = content + truncated = len(text) > CONTENT_PREVIEW_MAX + return text[:CONTENT_PREVIEW_MAX], truncated + + if isinstance(content, list): + parts: list[str] = [] + for block in content: + if not isinstance(block, dict): + continue + btype = block.get("type", "") + if btype == "text": + parts.append(str(block.get("text") or "")) + elif btype == "tool_use": + name = block.get("name", "tool") + if name in _SUMMARISE_TOOLS: + parts.append(f"[tool: {name}]") + else: + parts.append(f"[tool: {name}]") + elif btype == "tool_result": + result_content = block.get("content") + if isinstance(result_content, str): + parts.append(f"[result: {result_content[:80]}]") + else: + parts.append("[result]") + combined = " ".join(p for p in parts if p) + truncated = len(combined) > CONTENT_PREVIEW_MAX + return combined[:CONTENT_PREVIEW_MAX], truncated + + return str(content)[:CONTENT_PREVIEW_MAX], False + + +def redact_tool_args(args: dict[str, Any]) -> dict[str, Any]: + """Return a copy of tool args with sensitive keys replaced by ``[REDACTED]``.""" + if not isinstance(args, dict): + return {} + result: dict[str, Any] = {} + for key, value in args.items(): + if key.lower() in _REDACT_TOOL_ARG_NAMES: + result[key] = "[REDACTED]" + elif isinstance(value, str) and len(value) > 500: + result[key] = value[:200] + "…[truncated]" + else: + result[key] = value + return result + + +def _collect_tool_names(content: object) -> list[str]: + """Return names of all tool_use blocks in a message content.""" + if not isinstance(content, list): + return [] + return [ + str(block.get("name") or "unknown") + for block in content + if isinstance(block, dict) and block.get("type") == "tool_use" + ] + + +def _parse_timestamp(msg: dict[str, Any]) -> datetime | None: + for key in ("timestamp", "created_at", "createdAt", "time"): + val = msg.get(key) + if isinstance(val, str) and val.strip(): + try: + normalized = val.strip().replace("Z", "+00:00") + from datetime import timezone + parsed = datetime.fromisoformat(normalized) + if parsed.tzinfo is not None: + return parsed.astimezone(timezone.utc).replace(tzinfo=None) + return parsed + except ValueError: + pass + return None + + +# --------------------------------------------------------------------------- +# Session key → agent/board correlation +# --------------------------------------------------------------------------- + +_LEAD_SESSION_RE = re.compile( + r"^agent:lead-(?P[0-9a-fA-F-]{36}):main$" +) +_AGENT_SESSION_RE = re.compile( + r"^agent:(?P[^:]+):(?:main|board-(?P[0-9a-fA-F-]{36}))$" +) + + +def _correlate_session(session_key: str) -> tuple[str | None, str | None]: + """Return (agent_slug_or_none, board_id_or_none) inferred from the session key.""" + lead_m = _LEAD_SESSION_RE.match(session_key) + if lead_m: + return None, lead_m.group("board_id") + agent_m = _AGENT_SESSION_RE.match(session_key) + if agent_m: + return agent_m.group("agent_slug"), agent_m.group("board_id") + return None, None + + +# --------------------------------------------------------------------------- +# Message normaliser +# --------------------------------------------------------------------------- + +def normalize_message( + session_key: str, + session_label: str | None, + msg: dict[str, Any], + index: int, +) -> RuntimeMessageEvent: + """Convert one raw chat history message into a RuntimeMessageEvent.""" + role = str(msg.get("role") or "unknown") + model = msg.get("model") or None + if model: + model = str(model) + content = msg.get("content") + preview, truncated = _extract_text(content) + tool_names = _collect_tool_names(content) + ts = _parse_timestamp(msg) + agent_id, board_id = _correlate_session(session_key) + + # Stable deduplication key + event_id = hashlib.sha256( + f"{session_key}:{index}:{role}:{preview[:50]}".encode() + ).hexdigest()[:16] + + return RuntimeMessageEvent( + event_id=event_id, + session_key=session_key, + session_label=session_label, + role=role, + model=model, + content_preview=preview, + content_truncated=truncated, + has_tool_use=bool(tool_names), + tool_names=tool_names, + timestamp=ts, + agent_id=agent_id, + board_id=board_id, + message_index=index, + ) + + +# --------------------------------------------------------------------------- +# Gateway data fetching +# --------------------------------------------------------------------------- + +async def _safe_chat_history( + session_key: str, + config: GatewayConfig, + limit: int = HISTORY_FETCH_LIMIT, +) -> list[dict[str, Any]]: + """Fetch chat history for one session; return [] on any error.""" + try: + raw = await get_chat_history(session_key, config, limit=limit) + if isinstance(raw, dict): + messages = raw.get("messages") or raw.get("history") or [] + elif isinstance(raw, list): + messages = raw + else: + return [] + return [m for m in messages if isinstance(m, dict)] + except (OpenClawGatewayError, TimeoutError, OSError, RuntimeError) as exc: + logger.debug( + "runtime_activity.history_fetch_failed session_key=%s error=%s", + session_key, + exc, + ) + return [] + + +async def _list_active_sessions(config: GatewayConfig) -> list[dict[str, Any]]: + """Return list of active session dicts from the gateway.""" + try: + raw = await openclaw_call("sessions.list", {"limit": 50}, config=config) + if isinstance(raw, dict): + return [s for s in (raw.get("sessions") or []) if isinstance(s, dict)] + if isinstance(raw, list): + return [s for s in raw if isinstance(s, dict)] + except (OpenClawGatewayError, TimeoutError, OSError, RuntimeError) as exc: + logger.debug("runtime_activity.sessions_list_failed error=%s", exc) + return [] + + +# --------------------------------------------------------------------------- +# Main poll function +# --------------------------------------------------------------------------- + +async def fetch_recent_events( + config: GatewayConfig, + *, + since_ids: set[str] | None = None, + max_sessions: int = POLL_HISTORY_SESSIONS_MAX, + history_limit: int = HISTORY_FETCH_LIMIT, +) -> list[RuntimeMessageEvent]: + """Fetch and normalize recent messages across all active sessions. + + Args: + config: Gateway credentials/URL. + since_ids: Set of event_ids already seen; new events not in this set + are returned. Pass ``None`` for the initial load (returns all). + max_sessions: Cap on how many sessions to query per call. + history_limit: Number of messages to fetch per session. + + Returns: + List of new RuntimeMessageEvent objects, oldest-first. + """ + sessions = await _list_active_sessions(config) + sessions = sessions[:max_sessions] + + events: list[RuntimeMessageEvent] = [] + for session in sessions: + key = session.get("key") or session.get("id") + if not isinstance(key, str) or not key.strip(): + continue + label = session.get("label") or session.get("name") or None + + messages = await _safe_chat_history(key, config, limit=history_limit) + for idx, msg in enumerate(messages): + event = normalize_message(key, label, msg, idx) + if since_ids is None or event.event_id not in since_ids: + events.append(event) + + # Sort by timestamp (nulls last), then by session_key + index for stability + def sort_key(e: RuntimeMessageEvent) -> tuple: + return ( + e.timestamp or datetime.min, + e.session_key, + e.message_index, + ) + + events.sort(key=sort_key) + return events diff --git a/backend/tests/test_runtime_activity.py b/backend/tests/test_runtime_activity.py new file mode 100644 index 0000000..9f7fa11 --- /dev/null +++ b/backend/tests/test_runtime_activity.py @@ -0,0 +1,306 @@ +# ruff: noqa: INP001 +"""Unit tests for runtime_activity service helpers. + +All tests are pure-Python — no gateway connection required. +""" + +from __future__ import annotations + +from datetime import datetime + +import pytest + +from app.services.openclaw.runtime_activity import ( + RuntimeMessageEvent, + _collect_tool_names, + _correlate_session, + _extract_text, + _parse_timestamp, + normalize_message, + redact_tool_args, +) + + +# --------------------------------------------------------------------------- +# _extract_text +# --------------------------------------------------------------------------- + +class TestExtractText: + + def test_string_content_short(self): + text, truncated = _extract_text("Hello world") + assert text == "Hello world" + assert not truncated + + def test_string_content_truncated(self): + long_text = "x" * 400 + text, truncated = _extract_text(long_text) + assert len(text) == 300 + assert truncated + + def test_none_content(self): + text, truncated = _extract_text(None) + assert text == "" + assert not truncated + + def test_list_text_block(self): + content = [{"type": "text", "text": "The answer is 42."}] + text, truncated = _extract_text(content) + assert "42" in text + assert not truncated + + def test_list_tool_use_block(self): + content = [ + {"type": "text", "text": "I'll run a command."}, + {"type": "tool_use", "name": "bash", "input": {"command": "ls -la"}}, + ] + text, _ = _extract_text(content) + assert "run a command" in text + assert "[tool: bash]" in text + + def test_list_tool_result_string(self): + content = [ + {"type": "tool_result", "tool_use_id": "x", "content": "file1.txt\nfile2.txt"}, + ] + text, _ = _extract_text(content) + assert "[result:" in text + + def test_list_multiple_text_blocks(self): + content = [ + {"type": "text", "text": "First."}, + {"type": "text", "text": "Second."}, + ] + text, _ = _extract_text(content) + assert "First" in text + assert "Second" in text + + def test_empty_list(self): + text, truncated = _extract_text([]) + assert text == "" + assert not truncated + + def test_non_dict_blocks_skipped(self): + content = ["plain string", {"type": "text", "text": "valid"}] + text, _ = _extract_text(content) + assert "valid" in text + + +# --------------------------------------------------------------------------- +# redact_tool_args +# --------------------------------------------------------------------------- + +class TestRedactToolArgs: + + def test_no_sensitive_keys(self): + args = {"command": "ls -la", "cwd": "/home"} + result = redact_tool_args(args) + assert result["command"] == "ls -la" + assert result["cwd"] == "/home" + + def test_password_redacted(self): + result = redact_tool_args({"password": "secret123", "user": "admin"}) + assert result["password"] == "[REDACTED]" + assert result["user"] == "admin" + + def test_token_redacted(self): + result = redact_tool_args({"token": "sk-abc123", "action": "read"}) + assert result["token"] == "[REDACTED]" + + def test_api_key_redacted(self): + result = redact_tool_args({"api_key": "abc", "model": "gpt-4"}) + assert result["api_key"] == "[REDACTED]" + + def test_case_insensitive(self): + result = redact_tool_args({"PASSWORD": "x", "Secret": "y"}) + assert result["PASSWORD"] == "[REDACTED]" + assert result["Secret"] == "[REDACTED]" + + def test_long_string_truncated(self): + result = redact_tool_args({"content": "A" * 1000}) + assert len(result["content"]) < 1000 + assert "truncated" in result["content"] + + def test_empty_dict(self): + assert redact_tool_args({}) == {} + + def test_non_dict_returns_empty(self): + assert redact_tool_args("bad") == {} # type: ignore[arg-type] + + def test_credentials_key_redacted(self): + result = redact_tool_args({"credentials": "AKIA...", "region": "us-east-1"}) + assert result["credentials"] == "[REDACTED]" + + +# --------------------------------------------------------------------------- +# _collect_tool_names +# --------------------------------------------------------------------------- + +class TestCollectToolNames: + + def test_no_tool_use(self): + assert _collect_tool_names("plain text") == [] + + def test_single_tool_use(self): + content = [{"type": "tool_use", "name": "bash", "input": {}}] + assert _collect_tool_names(content) == ["bash"] + + def test_multiple_tool_uses(self): + content = [ + {"type": "text", "text": "hi"}, + {"type": "tool_use", "name": "read_file", "input": {}}, + {"type": "tool_use", "name": "bash", "input": {}}, + ] + names = _collect_tool_names(content) + assert names == ["read_file", "bash"] + + def test_no_name_fallback(self): + content = [{"type": "tool_use"}] + assert _collect_tool_names(content) == ["unknown"] + + +# --------------------------------------------------------------------------- +# _parse_timestamp +# --------------------------------------------------------------------------- + +class TestParseTimestamp: + + def test_iso_zulu(self): + msg = {"timestamp": "2026-05-21T10:00:00Z"} + ts = _parse_timestamp(msg) + assert isinstance(ts, datetime) + assert ts.year == 2026 + assert ts.hour == 10 + + def test_iso_with_offset(self): + msg = {"timestamp": "2026-05-21T12:00:00+02:00"} + ts = _parse_timestamp(msg) + assert ts is not None + assert ts.hour == 10 # converted to UTC + + def test_created_at_fallback(self): + msg = {"created_at": "2026-05-21T09:30:00Z"} + ts = _parse_timestamp(msg) + assert ts is not None + + def test_no_timestamp_returns_none(self): + assert _parse_timestamp({}) is None + + def test_malformed_returns_none(self): + assert _parse_timestamp({"timestamp": "not-a-date"}) is None + + +# --------------------------------------------------------------------------- +# _correlate_session +# --------------------------------------------------------------------------- + +class TestCorrelateSession: + + def test_lead_session_extracts_board_id(self): + board_id = "d8ec2aa9-fa86-4ab2-9f17-6518ccd600df" + agent_slug, bid = _correlate_session(f"agent:lead-{board_id}:main") + assert agent_slug is None + assert bid == board_id + + def test_agent_session_extracts_slug(self): + slug, bid = _correlate_session("agent:my-agent:main") + assert slug == "my-agent" + assert bid is None + + def test_unknown_format_returns_nones(self): + slug, bid = _correlate_session("some-random-key") + assert slug is None + assert bid is None + + def test_empty_returns_nones(self): + slug, bid = _correlate_session("") + assert slug is None + assert bid is None + + +# --------------------------------------------------------------------------- +# normalize_message +# --------------------------------------------------------------------------- + +class TestNormalizeMessage: + + def _make_msg(self, **kwargs) -> dict: + return { + "role": "assistant", + "content": "Hello!", + "model": "claude-sonnet-4-6", + **kwargs, + } + + def test_basic_assistant_message(self): + msg = self._make_msg() + event = normalize_message("agent:test:main", "Test", msg, 0) + assert isinstance(event, RuntimeMessageEvent) + assert event.role == "assistant" + assert event.model == "claude-sonnet-4-6" + assert event.content_preview == "Hello!" + assert not event.content_truncated + assert not event.has_tool_use + assert event.session_key == "agent:test:main" + assert event.session_label == "Test" + assert event.message_index == 0 + + def test_user_message(self): + msg = {"role": "user", "content": "What time is it?"} + event = normalize_message("session:1", None, msg, 1) + assert event.role == "user" + assert event.model is None + assert event.session_label is None + + def test_tool_use_detected(self): + msg = { + "role": "assistant", + "content": [ + {"type": "text", "text": "Running command."}, + {"type": "tool_use", "name": "bash", "input": {"command": "ls"}}, + ], + } + event = normalize_message("s", None, msg, 0) + assert event.has_tool_use + assert "bash" in event.tool_names + + def test_long_content_truncated(self): + long = "w" * 400 + msg = self._make_msg(content=long) + event = normalize_message("s", None, msg, 0) + assert event.content_truncated + assert len(event.content_preview) == 300 + + def test_event_id_is_stable(self): + msg = self._make_msg() + e1 = normalize_message("s", None, msg, 0) + e2 = normalize_message("s", None, msg, 0) + assert e1.event_id == e2.event_id + + def test_event_id_differs_by_index(self): + msg = self._make_msg() + e1 = normalize_message("s", None, msg, 0) + e2 = normalize_message("s", None, msg, 1) + assert e1.event_id != e2.event_id + + def test_board_id_from_lead_session(self): + board_id = "d8ec2aa9-fa86-4ab2-9f17-6518ccd600df" + msg = self._make_msg() + event = normalize_message(f"agent:lead-{board_id}:main", None, msg, 0) + assert event.board_id == board_id + assert event.agent_id is None + + def test_to_dict_serialisable(self): + msg = self._make_msg() + event = normalize_message("s", "Label", msg, 0) + d = event.to_dict() + assert d["role"] == "assistant" + assert d["session_label"] == "Label" + assert isinstance(d["tool_names"], list) + # timestamp is None here since no timestamp in msg + assert d["timestamp"] is None + + def test_timestamp_parsed(self): + msg = self._make_msg(timestamp="2026-05-21T10:00:00Z") + event = normalize_message("s", None, msg, 0) + assert event.timestamp is not None + assert event.timestamp.hour == 10 diff --git a/frontend/src/api/generated/gateways/gateways.ts b/frontend/src/api/generated/gateways/gateways.ts index 0c70654..f978958 100644 --- a/frontend/src/api/generated/gateways/gateways.ts +++ b/frontend/src/api/generated/gateways/gateways.ts @@ -42,6 +42,7 @@ import type { ListGatewaySessionsApiV1GatewaysSessionsGetParams, ListGatewaysApiV1GatewaysGetParams, OkResponse, + ProviderUsageResponse, RuntimeUsageResponse, SendGatewaySessionMessageApiV1GatewaysSessionsSessionIdMessagePostParams, SyncGatewayTemplatesApiV1GatewaysGatewayIdTemplatesSyncPostParams, @@ -3161,3 +3162,289 @@ export function useGetGatewayRuntimeUsageApiV1GatewaysGatewayIdRuntimeUsageGet< return { ...query, queryKey: queryOptions.queryKey }; } + +/** + * Return provider-native subscription usage data scraped from the CLI (e.g. ``claude /usage``). Returns an empty results list when USAGE_SCRAPER_ENABLED=false (the default). Enable with USAGE_SCRAPER_ENABLED=true and ensure the required prerequisites (tmux, claude binary) are accessible from the backend process. + * @summary Gateway provider-native usage (opt-in scraper) + */ +export type getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponse200 = + { + data: ProviderUsageResponse; + status: 200; + }; + +export type getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponse422 = + { + data: HTTPValidationError; + status: 422; + }; + +export type getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponseSuccess = + getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponse200 & { + headers: Headers; + }; +export type getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponseError = + getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponse422 & { + headers: Headers; + }; + +export type getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponse = + + | getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponseSuccess + | getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponseError; + +export const getGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetUrl = + (gatewayId: string) => { + return `/api/v1/gateways/${gatewayId}/provider-usage`; + }; + +export const getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet = + async ( + gatewayId: string, + options?: RequestInit, + ): Promise => { + return customFetch( + getGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetUrl( + gatewayId, + ), + { + ...options, + method: "GET", + }, + ); + }; + +export const getGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetQueryKey = + (gatewayId: string) => { + return [`/api/v1/gateways/${gatewayId}/provider-usage`] as const; + }; + +export const getGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetQueryOptions = + < + TData = Awaited< + ReturnType< + typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet + > + >, + TError = HTTPValidationError, + >( + gatewayId: string, + options?: { + query?: Partial< + UseQueryOptions< + Awaited< + ReturnType< + typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet + > + >, + TError, + TData + > + >; + request?: SecondParameter; + }, + ) => { + const { query: queryOptions, request: requestOptions } = options ?? {}; + + const queryKey = + queryOptions?.queryKey ?? + getGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetQueryKey( + gatewayId, + ); + + const queryFn: QueryFunction< + Awaited< + ReturnType< + typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet + > + > + > = ({ signal }) => + getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet(gatewayId, { + signal, + ...requestOptions, + }); + + return { + queryKey, + queryFn, + enabled: !!gatewayId, + ...queryOptions, + } as UseQueryOptions< + Awaited< + ReturnType< + typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet + > + >, + TError, + TData + > & { queryKey: DataTag }; + }; + +export type GetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetQueryResult = + NonNullable< + Awaited< + ReturnType< + typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet + > + > + >; +export type GetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetQueryError = + HTTPValidationError; + +export function useGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet< + TData = Awaited< + ReturnType< + typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet + > + >, + TError = HTTPValidationError, +>( + gatewayId: string, + options: { + query: Partial< + UseQueryOptions< + Awaited< + ReturnType< + typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet + > + >, + TError, + TData + > + > & + Pick< + DefinedInitialDataOptions< + Awaited< + ReturnType< + typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet + > + >, + TError, + Awaited< + ReturnType< + typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet + > + > + >, + "initialData" + >; + request?: SecondParameter; + }, + queryClient?: QueryClient, +): DefinedUseQueryResult & { + queryKey: DataTag; +}; +export function useGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet< + TData = Awaited< + ReturnType< + typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet + > + >, + TError = HTTPValidationError, +>( + gatewayId: string, + options?: { + query?: Partial< + UseQueryOptions< + Awaited< + ReturnType< + typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet + > + >, + TError, + TData + > + > & + Pick< + UndefinedInitialDataOptions< + Awaited< + ReturnType< + typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet + > + >, + TError, + Awaited< + ReturnType< + typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet + > + > + >, + "initialData" + >; + request?: SecondParameter; + }, + queryClient?: QueryClient, +): UseQueryResult & { + queryKey: DataTag; +}; +export function useGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet< + TData = Awaited< + ReturnType< + typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet + > + >, + TError = HTTPValidationError, +>( + gatewayId: string, + options?: { + query?: Partial< + UseQueryOptions< + Awaited< + ReturnType< + typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet + > + >, + TError, + TData + > + >; + request?: SecondParameter; + }, + queryClient?: QueryClient, +): UseQueryResult & { + queryKey: DataTag; +}; +/** + * @summary Gateway provider-native usage (opt-in scraper) + */ + +export function useGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet< + TData = Awaited< + ReturnType< + typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet + > + >, + TError = HTTPValidationError, +>( + gatewayId: string, + options?: { + query?: Partial< + UseQueryOptions< + Awaited< + ReturnType< + typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet + > + >, + TError, + TData + > + >; + request?: SecondParameter; + }, + queryClient?: QueryClient, +): UseQueryResult & { + queryKey: DataTag; +} { + const queryOptions = + getGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetQueryOptions( + gatewayId, + options, + ); + + const query = useQuery(queryOptions, queryClient) as UseQueryResult< + TData, + TError + > & { queryKey: DataTag }; + + return { ...query, queryKey: queryOptions.queryKey }; +} diff --git a/frontend/src/api/generated/model/index.ts b/frontend/src/api/generated/model/index.ts index 7cfb186..cf6145e 100644 --- a/frontend/src/api/generated/model/index.ts +++ b/frontend/src/api/generated/model/index.ts @@ -225,6 +225,8 @@ export * from "./organizationMemberRead"; export * from "./organizationMemberUpdate"; export * from "./organizationRead"; export * from "./organizationUserRead"; +export * from "./providerUsageResponse"; +export * from "./providerUsageScrapeResult"; export * from "./readyzReadyzGet200"; export * from "./runtimeUsageBurnRate"; export * from "./runtimeUsageCurrent"; diff --git a/frontend/src/api/generated/model/providerUsageResponse.ts b/frontend/src/api/generated/model/providerUsageResponse.ts new file mode 100644 index 0000000..c1938f6 --- /dev/null +++ b/frontend/src/api/generated/model/providerUsageResponse.ts @@ -0,0 +1,17 @@ +/** + * Generated by orval v8.3.0 🍺 + * Do not edit manually. + * Mission Control API + * OpenAPI spec version: 0.1.0 + */ +import type { ProviderUsageScrapeResult } from "./providerUsageScrapeResult"; + +/** + * Response envelope for GET /gateways/{id}/provider-usage. + */ +export interface ProviderUsageResponse { + gateway_id: string; + generated_at: string; + scraper_enabled: boolean; + results: ProviderUsageScrapeResult[]; +} diff --git a/frontend/src/api/generated/model/providerUsageScrapeResult.ts b/frontend/src/api/generated/model/providerUsageScrapeResult.ts new file mode 100644 index 0000000..6cb1e37 --- /dev/null +++ b/frontend/src/api/generated/model/providerUsageScrapeResult.ts @@ -0,0 +1,30 @@ +/** + * Generated by orval v8.3.0 🍺 + * Do not edit manually. + * Mission Control API + * OpenAPI spec version: 0.1.0 + */ + +/** + * Structured result from one provider-native usage scrape (e.g. Claude CLI /usage). + +Returned by GET /gateways/{id}/provider-usage. +All fields are optional — partial data is still useful and expected +when CLI output format changes or the session is quiet. + */ +export interface ProviderUsageScrapeResult { + provider: string; + source_name: string; + scraped_at: string; + fresh: boolean; + freshness_ttl_seconds: number; + current_pct?: number | null; + remaining_ms?: number | null; + remaining_label?: string | null; + weekly_messages_used?: number | null; + weekly_messages_limit?: number | null; + weekly_tokens_used?: number | null; + weekly_cost_usd?: number | null; + raw_text?: string | null; + error?: string | null; +} diff --git a/frontend/src/app/activity/page.tsx b/frontend/src/app/activity/page.tsx index 0344fe5..f84eb29 100644 --- a/frontend/src/app/activity/page.tsx +++ b/frontend/src/app/activity/page.tsx @@ -32,6 +32,10 @@ import type { } from "@/api/generated/model"; import { Markdown } from "@/components/atoms/Markdown"; import { ActivityFeed } from "@/components/activity/ActivityFeed"; +import { RuntimeActivityFeed } from "@/components/activity/RuntimeActivityFeed"; +import { + listGatewaysApiV1GatewaysGet, +} from "@/api/generated/gateways/gateways"; import { SignedOutPanel } from "@/components/auth/SignedOutPanel"; import { DashboardSidebar } from "@/components/organisms/DashboardSidebar"; import { DashboardShell } from "@/components/templates/DashboardShell"; @@ -391,6 +395,23 @@ export default function ActivityPage() { const { isSignedIn } = useAuth(); const searchParams = useSearchParams(); const isPageActive = usePageActive(); + const activeTab = searchParams.get("tab") === "runtime" ? "runtime" : "feed"; + + // Gateways — loaded once for the Runtime tab + const [runtimeGateways, setRuntimeGateways] = useState<{ id: string; name: string }[]>([]); + useEffect(() => { + if (!isSignedIn) return; + listGatewaysApiV1GatewaysGet({ limit: 50, offset: 0 }) + .then((res) => { + if (res.status === 200) { + setRuntimeGateways( + (res.data.items ?? []).map((gw) => ({ id: gw.id, name: gw.name })), + ); + } + }) + .catch(() => {/* gateways unavailable — runtime tab shows empty state */}); + }, [isSignedIn]); + const selectedEventId = useMemo(() => { const value = searchParams.get("eventId"); if (!value) return null; @@ -1514,19 +1535,45 @@ export default function ActivityPage() {

- Live feed + {activeTab === "runtime" ? "Runtime Activity" : "Live feed"}

- Realtime task, approval, agent, and board-chat activity - across all boards. + {activeTab === "runtime" + ? "Live session messages from gateway runtime." + : "Realtime task, approval, agent, and board-chat activity across all boards."}

+ {/* Tab bar */} +
+ {( + [ + { key: "feed", label: "Board Feed" }, + { key: "runtime", label: "Runtime" }, + ] as const + ).map(({ key, label }) => ( + + {label} + + ))} +
+ {activeTab === "runtime" ? ( + + ) : ( + <> {hasUnresolvedDeepLink ? (
Requested activity item is not in the current feed window yet. @@ -1544,6 +1591,8 @@ export default function ActivityPage() { /> )} /> + + )}
diff --git a/frontend/src/components/activity/RuntimeActivityFeed.tsx b/frontend/src/components/activity/RuntimeActivityFeed.tsx new file mode 100644 index 0000000..2fbec07 --- /dev/null +++ b/frontend/src/components/activity/RuntimeActivityFeed.tsx @@ -0,0 +1,275 @@ +"use client"; + +import { useCallback, useEffect, useRef, useState } from "react"; +import { AlertCircle, Bot, ChevronDown, Cpu, User, Wrench } from "lucide-react"; +import { useAuth } from "@/auth/clerk"; +import { getLocalAuthToken, isLocalAuthMode } from "@/auth/localAuth"; +import { getApiBaseUrl } from "@/lib/api-base"; +import { formatRelativeTimestamp } from "@/lib/formatters"; + +interface RuntimeMessage { + event_id: string; + session_key: string; + session_label: string | null; + role: string; + model: string | null; + content_preview: string; + content_truncated: boolean; + has_tool_use: boolean; + tool_names: string[]; + timestamp: string | null; + agent_id: string | null; + board_id: string | null; +} + +interface Gateway { + id: string; + name: string; +} + +interface RuntimeActivityFeedProps { + gateways: Gateway[]; +} + +// --------------------------------------------------------------------------- +// Role colours + icons +// --------------------------------------------------------------------------- +const ROLE_STYLE: Record = { + user: { label: "User", badge: "bg-[color:var(--accent-soft)] text-[color:var(--accent)]", icon: User }, + assistant: { label: "Assistant", badge: "bg-[color:rgba(52,211,153,0.15)] text-[color:var(--success)]", icon: Bot }, + tool: { label: "Tool", badge: "bg-[color:rgba(251,191,36,0.15)] text-[color:var(--warning)]", icon: Wrench}, +}; +const DEFAULT_ROLE = { label: "System", badge: "bg-[color:var(--surface-strong)] text-muted", icon: Cpu }; + +function roleMeta(role: string) { + return ROLE_STYLE[role.toLowerCase()] ?? DEFAULT_ROLE; +} + +// --------------------------------------------------------------------------- +// SSE connection helper +// --------------------------------------------------------------------------- +async function getAuthHeaders(): Promise> { + const headers: Record = { Accept: "text/event-stream" }; + if (isLocalAuthMode()) { + const token = getLocalAuthToken(); + if (token) headers.Authorization = `Bearer ${token}`; + } else { + try { + const clerk = (window as unknown as { Clerk?: { session?: { getToken: () => Promise } } }).Clerk; + if (clerk?.session) { + const token = await clerk.session.getToken(); + if (token) headers.Authorization = `Bearer ${token}`; + } + } catch { + // Clerk not ready + } + } + return headers; +} + +// --------------------------------------------------------------------------- +// Main component +// --------------------------------------------------------------------------- +export function RuntimeActivityFeed({ gateways }: RuntimeActivityFeedProps) { + const { isSignedIn } = useAuth(); + const [gatewayId, setGatewayId] = useState(() => gateways[0]?.id ?? ""); + const [messages, setMessages] = useState([]); + const [connected, setConnected] = useState(false); + const [error, setError] = useState(null); + const cancelledRef = useRef(false); + const seenIdsRef = useRef(new Set()); + + const connect = useCallback(async () => { + if (!gatewayId || !isSignedIn) return; + cancelledRef.current = false; + setError(null); + + const baseUrl = getApiBaseUrl(); + const url = `${baseUrl}/api/v1/gateways/${gatewayId}/runtime-activity/stream`; + const headers = await getAuthHeaders(); + + try { + const response = await fetch(url, { headers }); + if (!response.ok) { + setError(`Gateway returned ${response.status}`); + return; + } + if (!response.body) { + setError("Stream unavailable"); + return; + } + setConnected(true); + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + while (!cancelledRef.current) { + const { value, done } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + buffer = buffer.replace(/\r\n/g, "\n"); + let boundary = buffer.indexOf("\n\n"); + while (boundary !== -1) { + const raw = buffer.slice(0, boundary); + buffer = buffer.slice(boundary + 2); + let eventType = ""; + let data = ""; + for (const line of raw.split("\n")) { + if (line.startsWith("event:")) eventType = line.slice(6).trim(); + else if (line.startsWith("data:")) data += line.slice(5).trim(); + } + if (eventType === "runtime_message" && data) { + try { + const msg = JSON.parse(data) as RuntimeMessage; + if (!seenIdsRef.current.has(msg.event_id)) { + seenIdsRef.current.add(msg.event_id); + setMessages((prev) => [msg, ...prev].slice(0, 300)); + } + } catch { + // skip malformed event + } + } + boundary = buffer.indexOf("\n\n"); + } + } + } catch (err) { + if (!cancelledRef.current) { + setError(err instanceof Error ? err.message : "Connection lost"); + setConnected(false); + } + } + }, [gatewayId, isSignedIn]); + + useEffect(() => { + setMessages([]); + seenIdsRef.current = new Set(); + setConnected(false); + connect(); + return () => { + cancelledRef.current = true; + }; + }, [connect]); + + if (!gateways.length) { + return ( +
+ +

No gateways configured

+

+ Configure a gateway on a board to see its live activity here. +

+
+ ); + } + + return ( +
+ {/* Gateway picker + status */} +
+
+
+ + {connected ? "Live" : "Connecting…"} + +
+ + {gateways.length > 1 && ( +
+ + +
+ )} +
+ + {error && ( +
+ + {error} +
+ )} + + {/* Feed */} +
+ {messages.length === 0 && connected && ( +

+ Waiting for session activity… +

+ )} + {messages.map((msg) => { + const meta = roleMeta(msg.role); + const Icon = meta.icon; + const shortModel = msg.model?.includes("/") + ? msg.model.split("/")[1] + : msg.model; + return ( +
+
+ + + {meta.label} + + {msg.session_label && ( + + {msg.session_label} + + )} + {shortModel && ( + + {shortModel} + + )} + {msg.has_tool_use && msg.tool_names.length > 0 && ( + + {msg.tool_names.join(", ")} + + )} + + {msg.timestamp + ? formatRelativeTimestamp(msg.timestamp) + : null} + +
+ {msg.content_preview && ( +

+ {msg.content_preview} + {msg.content_truncated && ( + + [truncated] + + )} +

+ )} + {msg.board_id && ( +

+ board {msg.board_id.slice(0, 8)}… +

+ )} +
+ ); + })} +
+
+ ); +}