feat(runtime-activity): live feed and activity correlation (batch 4, #33)

This commit is contained in:
null 2026-05-20 21:08:20 -05:00
parent 1a847133ce
commit 16b23eef2e
9 changed files with 1453 additions and 4 deletions

View File

@ -2,11 +2,14 @@
from __future__ import annotations
import asyncio
import json
from typing import TYPE_CHECKING
from uuid import UUID, uuid4
from fastapi import APIRouter, Depends, Query
from fastapi import APIRouter, Depends, Query, Request
from sqlmodel import col
from sse_starlette.sse import EventSourceResponse
from app.api.deps import require_org_admin, require_org_member
from app.core.auth import AuthContext, get_auth_context
@ -33,6 +36,11 @@ from app.schemas.runtime_usage import (
ProviderUsageScrapeResult,
RuntimeUsageResponse,
)
from app.services.openclaw.runtime_activity import (
HISTORY_FETCH_LIMIT,
POLL_HISTORY_SESSIONS_MAX,
fetch_recent_events,
)
from app.services.openclaw.runtime_usage import get_runtime_usage
from app.services.openclaw.usage_scrapers import get_provider_usage
from app.schemas.pagination import DefaultLimitOffsetPage
@ -352,6 +360,115 @@ async def get_gateway_provider_usage(
)
@router.get(
"/{gateway_id}/runtime-activity",
summary="Recent gateway runtime messages (REST snapshot)",
description=(
"Return the most recent messages from all active sessions on the gateway, "
"normalised and redacted. Use the streaming endpoint for a live feed."
),
)
async def get_gateway_runtime_activity(
gateway_id: UUID,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> dict:
"""Non-streaming snapshot of recent gateway activity."""
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
service = GatewayAdminLifecycleService(session)
gateway = await service.require_gateway(
gateway_id=gateway_id,
organization_id=ctx.organization.id,
)
config = GatewayClientConfig(
url=gateway.url,
token=gateway.token,
allow_insecure_tls=gateway.allow_insecure_tls,
disable_device_pairing=gateway.disable_device_pairing,
)
events = await fetch_recent_events(
config,
max_sessions=POLL_HISTORY_SESSIONS_MAX,
history_limit=HISTORY_FETCH_LIMIT,
)
return {
"gateway_id": str(gateway_id),
"generated_at": utcnow().isoformat(),
"events": [e.to_dict() for e in events],
}
_RUNTIME_ACTIVITY_POLL_SECONDS = 4
_RUNTIME_ACTIVITY_SEEN_MAX = 1000
@router.get(
"/{gateway_id}/runtime-activity/stream",
summary="Live gateway runtime activity (SSE)",
description=(
"Stream normalised gateway session messages as server-sent events. "
"Each event is of type ``runtime_message``. The stream polls the "
"gateway every few seconds and emits only new messages."
),
)
async def stream_gateway_runtime_activity(
request: Request,
gateway_id: UUID,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> EventSourceResponse:
"""SSE stream of live gateway session messages."""
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
service = GatewayAdminLifecycleService(session)
gateway = await service.require_gateway(
gateway_id=gateway_id,
organization_id=ctx.organization.id,
)
config = GatewayClientConfig(
url=gateway.url,
token=gateway.token,
allow_insecure_tls=gateway.allow_insecure_tls,
disable_device_pairing=gateway.disable_device_pairing,
)
async def event_generator():
from collections import deque
seen_ids: set[str] = set()
seen_queue: deque[str] = deque()
initial = True
while True:
if await request.is_disconnected():
break
events = await fetch_recent_events(
config,
since_ids=None if initial else seen_ids,
max_sessions=POLL_HISTORY_SESSIONS_MAX,
history_limit=HISTORY_FETCH_LIMIT,
)
initial = False
for event in events:
if event.event_id in seen_ids:
continue
seen_ids.add(event.event_id)
seen_queue.append(event.event_id)
if len(seen_queue) > _RUNTIME_ACTIVITY_SEEN_MAX:
oldest = seen_queue.popleft()
seen_ids.discard(oldest)
yield {
"event": "runtime_message",
"data": json.dumps(event.to_dict()),
}
await asyncio.sleep(_RUNTIME_ACTIVITY_POLL_SECONDS)
return EventSourceResponse(event_generator(), ping=15)
@router.delete("/{gateway_id}", response_model=OkResponse)
async def delete_gateway(
gateway_id: UUID,

View File

@ -0,0 +1,366 @@
"""Runtime activity service — fetch and normalize recent gateway session messages.
Data source: gateway ``chat.history`` RPC (returns recent messages per session).
This is supplemental to the DB-backed activity feed; it shows what is happening
in active gateway sessions in near-real-time without requiring any writes.
Design notes
------------
- Polling: callers poll this service on an interval; it does not maintain state.
- Deduplication: based on ``(session_key, message_index)`` because chat.history
does not return stable message IDs.
- Redaction: known-sensitive tool argument names are blanked; large content is
truncated to a short preview.
- Authorization: callers must have already verified gateway ownership before
passing a config here.
"""
from __future__ import annotations
import hashlib
import re
from datetime import datetime
from typing import Any
from app.core.logging import get_logger
from app.core.time import utcnow
from app.services.openclaw.gateway_rpc import (
GatewayConfig,
OpenClawGatewayError,
get_chat_history,
openclaw_call,
)
logger = get_logger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
CONTENT_PREVIEW_MAX = 300 # chars before truncation
HISTORY_FETCH_LIMIT = 20 # messages to fetch per session per poll
POLL_HISTORY_SESSIONS_MAX = 10 # max sessions to poll in one pass
# Argument names that should be fully redacted from tool call previews
_REDACT_TOOL_ARG_NAMES = frozenset(
{
"password", "passwd", "secret", "token", "api_key", "apikey",
"access_key", "private_key", "credential", "credentials",
"authorization", "bearer", "session_token", "refresh_token",
}
)
# Tool names whose entire input should be summarised rather than previewed
_SUMMARISE_TOOLS = frozenset({"bash", "computer", "str_replace_editor"})
# ---------------------------------------------------------------------------
# Normalized event type
# ---------------------------------------------------------------------------
class RuntimeMessageEvent:
"""Normalized representation of one gateway session message."""
__slots__ = (
"event_id",
"session_key",
"session_label",
"role",
"model",
"content_preview",
"content_truncated",
"has_tool_use",
"tool_names",
"timestamp",
"agent_id",
"board_id",
"message_index",
)
def __init__(
self,
*,
event_id: str,
session_key: str,
session_label: str | None,
role: str,
model: str | None,
content_preview: str,
content_truncated: bool,
has_tool_use: bool,
tool_names: list[str],
timestamp: datetime | None,
agent_id: str | None,
board_id: str | None,
message_index: int,
) -> None:
self.event_id = event_id
self.session_key = session_key
self.session_label = session_label
self.role = role
self.model = model
self.content_preview = content_preview
self.content_truncated = content_truncated
self.has_tool_use = has_tool_use
self.tool_names = tool_names
self.timestamp = timestamp
self.agent_id = agent_id
self.board_id = board_id
self.message_index = message_index
def to_dict(self) -> dict[str, Any]:
return {
"event_id": self.event_id,
"session_key": self.session_key,
"session_label": self.session_label,
"role": self.role,
"model": self.model,
"content_preview": self.content_preview,
"content_truncated": self.content_truncated,
"has_tool_use": self.has_tool_use,
"tool_names": self.tool_names,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"agent_id": self.agent_id,
"board_id": self.board_id,
"message_index": self.message_index,
}
# ---------------------------------------------------------------------------
# Content extraction and redaction
# ---------------------------------------------------------------------------
def _extract_text(content: object) -> tuple[str, bool]:
"""Return (preview_text, was_truncated) from a message content value."""
if content is None:
return "", False
if isinstance(content, str):
text = content
truncated = len(text) > CONTENT_PREVIEW_MAX
return text[:CONTENT_PREVIEW_MAX], truncated
if isinstance(content, list):
parts: list[str] = []
for block in content:
if not isinstance(block, dict):
continue
btype = block.get("type", "")
if btype == "text":
parts.append(str(block.get("text") or ""))
elif btype == "tool_use":
name = block.get("name", "tool")
if name in _SUMMARISE_TOOLS:
parts.append(f"[tool: {name}]")
else:
parts.append(f"[tool: {name}]")
elif btype == "tool_result":
result_content = block.get("content")
if isinstance(result_content, str):
parts.append(f"[result: {result_content[:80]}]")
else:
parts.append("[result]")
combined = " ".join(p for p in parts if p)
truncated = len(combined) > CONTENT_PREVIEW_MAX
return combined[:CONTENT_PREVIEW_MAX], truncated
return str(content)[:CONTENT_PREVIEW_MAX], False
def redact_tool_args(args: dict[str, Any]) -> dict[str, Any]:
"""Return a copy of tool args with sensitive keys replaced by ``[REDACTED]``."""
if not isinstance(args, dict):
return {}
result: dict[str, Any] = {}
for key, value in args.items():
if key.lower() in _REDACT_TOOL_ARG_NAMES:
result[key] = "[REDACTED]"
elif isinstance(value, str) and len(value) > 500:
result[key] = value[:200] + "…[truncated]"
else:
result[key] = value
return result
def _collect_tool_names(content: object) -> list[str]:
"""Return names of all tool_use blocks in a message content."""
if not isinstance(content, list):
return []
return [
str(block.get("name") or "unknown")
for block in content
if isinstance(block, dict) and block.get("type") == "tool_use"
]
def _parse_timestamp(msg: dict[str, Any]) -> datetime | None:
for key in ("timestamp", "created_at", "createdAt", "time"):
val = msg.get(key)
if isinstance(val, str) and val.strip():
try:
normalized = val.strip().replace("Z", "+00:00")
from datetime import timezone
parsed = datetime.fromisoformat(normalized)
if parsed.tzinfo is not None:
return parsed.astimezone(timezone.utc).replace(tzinfo=None)
return parsed
except ValueError:
pass
return None
# ---------------------------------------------------------------------------
# Session key → agent/board correlation
# ---------------------------------------------------------------------------
_LEAD_SESSION_RE = re.compile(
r"^agent:lead-(?P<board_id>[0-9a-fA-F-]{36}):main$"
)
_AGENT_SESSION_RE = re.compile(
r"^agent:(?P<agent_slug>[^:]+):(?:main|board-(?P<board_id>[0-9a-fA-F-]{36}))$"
)
def _correlate_session(session_key: str) -> tuple[str | None, str | None]:
"""Return (agent_slug_or_none, board_id_or_none) inferred from the session key."""
lead_m = _LEAD_SESSION_RE.match(session_key)
if lead_m:
return None, lead_m.group("board_id")
agent_m = _AGENT_SESSION_RE.match(session_key)
if agent_m:
return agent_m.group("agent_slug"), agent_m.group("board_id")
return None, None
# ---------------------------------------------------------------------------
# Message normaliser
# ---------------------------------------------------------------------------
def normalize_message(
session_key: str,
session_label: str | None,
msg: dict[str, Any],
index: int,
) -> RuntimeMessageEvent:
"""Convert one raw chat history message into a RuntimeMessageEvent."""
role = str(msg.get("role") or "unknown")
model = msg.get("model") or None
if model:
model = str(model)
content = msg.get("content")
preview, truncated = _extract_text(content)
tool_names = _collect_tool_names(content)
ts = _parse_timestamp(msg)
agent_id, board_id = _correlate_session(session_key)
# Stable deduplication key
event_id = hashlib.sha256(
f"{session_key}:{index}:{role}:{preview[:50]}".encode()
).hexdigest()[:16]
return RuntimeMessageEvent(
event_id=event_id,
session_key=session_key,
session_label=session_label,
role=role,
model=model,
content_preview=preview,
content_truncated=truncated,
has_tool_use=bool(tool_names),
tool_names=tool_names,
timestamp=ts,
agent_id=agent_id,
board_id=board_id,
message_index=index,
)
# ---------------------------------------------------------------------------
# Gateway data fetching
# ---------------------------------------------------------------------------
async def _safe_chat_history(
session_key: str,
config: GatewayConfig,
limit: int = HISTORY_FETCH_LIMIT,
) -> list[dict[str, Any]]:
"""Fetch chat history for one session; return [] on any error."""
try:
raw = await get_chat_history(session_key, config, limit=limit)
if isinstance(raw, dict):
messages = raw.get("messages") or raw.get("history") or []
elif isinstance(raw, list):
messages = raw
else:
return []
return [m for m in messages if isinstance(m, dict)]
except (OpenClawGatewayError, TimeoutError, OSError, RuntimeError) as exc:
logger.debug(
"runtime_activity.history_fetch_failed session_key=%s error=%s",
session_key,
exc,
)
return []
async def _list_active_sessions(config: GatewayConfig) -> list[dict[str, Any]]:
"""Return list of active session dicts from the gateway."""
try:
raw = await openclaw_call("sessions.list", {"limit": 50}, config=config)
if isinstance(raw, dict):
return [s for s in (raw.get("sessions") or []) if isinstance(s, dict)]
if isinstance(raw, list):
return [s for s in raw if isinstance(s, dict)]
except (OpenClawGatewayError, TimeoutError, OSError, RuntimeError) as exc:
logger.debug("runtime_activity.sessions_list_failed error=%s", exc)
return []
# ---------------------------------------------------------------------------
# Main poll function
# ---------------------------------------------------------------------------
async def fetch_recent_events(
config: GatewayConfig,
*,
since_ids: set[str] | None = None,
max_sessions: int = POLL_HISTORY_SESSIONS_MAX,
history_limit: int = HISTORY_FETCH_LIMIT,
) -> list[RuntimeMessageEvent]:
"""Fetch and normalize recent messages across all active sessions.
Args:
config: Gateway credentials/URL.
since_ids: Set of event_ids already seen; new events not in this set
are returned. Pass ``None`` for the initial load (returns all).
max_sessions: Cap on how many sessions to query per call.
history_limit: Number of messages to fetch per session.
Returns:
List of new RuntimeMessageEvent objects, oldest-first.
"""
sessions = await _list_active_sessions(config)
sessions = sessions[:max_sessions]
events: list[RuntimeMessageEvent] = []
for session in sessions:
key = session.get("key") or session.get("id")
if not isinstance(key, str) or not key.strip():
continue
label = session.get("label") or session.get("name") or None
messages = await _safe_chat_history(key, config, limit=history_limit)
for idx, msg in enumerate(messages):
event = normalize_message(key, label, msg, idx)
if since_ids is None or event.event_id not in since_ids:
events.append(event)
# Sort by timestamp (nulls last), then by session_key + index for stability
def sort_key(e: RuntimeMessageEvent) -> tuple:
return (
e.timestamp or datetime.min,
e.session_key,
e.message_index,
)
events.sort(key=sort_key)
return events

View File

@ -0,0 +1,306 @@
# ruff: noqa: INP001
"""Unit tests for runtime_activity service helpers.
All tests are pure-Python no gateway connection required.
"""
from __future__ import annotations
from datetime import datetime
import pytest
from app.services.openclaw.runtime_activity import (
RuntimeMessageEvent,
_collect_tool_names,
_correlate_session,
_extract_text,
_parse_timestamp,
normalize_message,
redact_tool_args,
)
# ---------------------------------------------------------------------------
# _extract_text
# ---------------------------------------------------------------------------
class TestExtractText:
def test_string_content_short(self):
text, truncated = _extract_text("Hello world")
assert text == "Hello world"
assert not truncated
def test_string_content_truncated(self):
long_text = "x" * 400
text, truncated = _extract_text(long_text)
assert len(text) == 300
assert truncated
def test_none_content(self):
text, truncated = _extract_text(None)
assert text == ""
assert not truncated
def test_list_text_block(self):
content = [{"type": "text", "text": "The answer is 42."}]
text, truncated = _extract_text(content)
assert "42" in text
assert not truncated
def test_list_tool_use_block(self):
content = [
{"type": "text", "text": "I'll run a command."},
{"type": "tool_use", "name": "bash", "input": {"command": "ls -la"}},
]
text, _ = _extract_text(content)
assert "run a command" in text
assert "[tool: bash]" in text
def test_list_tool_result_string(self):
content = [
{"type": "tool_result", "tool_use_id": "x", "content": "file1.txt\nfile2.txt"},
]
text, _ = _extract_text(content)
assert "[result:" in text
def test_list_multiple_text_blocks(self):
content = [
{"type": "text", "text": "First."},
{"type": "text", "text": "Second."},
]
text, _ = _extract_text(content)
assert "First" in text
assert "Second" in text
def test_empty_list(self):
text, truncated = _extract_text([])
assert text == ""
assert not truncated
def test_non_dict_blocks_skipped(self):
content = ["plain string", {"type": "text", "text": "valid"}]
text, _ = _extract_text(content)
assert "valid" in text
# ---------------------------------------------------------------------------
# redact_tool_args
# ---------------------------------------------------------------------------
class TestRedactToolArgs:
def test_no_sensitive_keys(self):
args = {"command": "ls -la", "cwd": "/home"}
result = redact_tool_args(args)
assert result["command"] == "ls -la"
assert result["cwd"] == "/home"
def test_password_redacted(self):
result = redact_tool_args({"password": "secret123", "user": "admin"})
assert result["password"] == "[REDACTED]"
assert result["user"] == "admin"
def test_token_redacted(self):
result = redact_tool_args({"token": "sk-abc123", "action": "read"})
assert result["token"] == "[REDACTED]"
def test_api_key_redacted(self):
result = redact_tool_args({"api_key": "abc", "model": "gpt-4"})
assert result["api_key"] == "[REDACTED]"
def test_case_insensitive(self):
result = redact_tool_args({"PASSWORD": "x", "Secret": "y"})
assert result["PASSWORD"] == "[REDACTED]"
assert result["Secret"] == "[REDACTED]"
def test_long_string_truncated(self):
result = redact_tool_args({"content": "A" * 1000})
assert len(result["content"]) < 1000
assert "truncated" in result["content"]
def test_empty_dict(self):
assert redact_tool_args({}) == {}
def test_non_dict_returns_empty(self):
assert redact_tool_args("bad") == {} # type: ignore[arg-type]
def test_credentials_key_redacted(self):
result = redact_tool_args({"credentials": "AKIA...", "region": "us-east-1"})
assert result["credentials"] == "[REDACTED]"
# ---------------------------------------------------------------------------
# _collect_tool_names
# ---------------------------------------------------------------------------
class TestCollectToolNames:
def test_no_tool_use(self):
assert _collect_tool_names("plain text") == []
def test_single_tool_use(self):
content = [{"type": "tool_use", "name": "bash", "input": {}}]
assert _collect_tool_names(content) == ["bash"]
def test_multiple_tool_uses(self):
content = [
{"type": "text", "text": "hi"},
{"type": "tool_use", "name": "read_file", "input": {}},
{"type": "tool_use", "name": "bash", "input": {}},
]
names = _collect_tool_names(content)
assert names == ["read_file", "bash"]
def test_no_name_fallback(self):
content = [{"type": "tool_use"}]
assert _collect_tool_names(content) == ["unknown"]
# ---------------------------------------------------------------------------
# _parse_timestamp
# ---------------------------------------------------------------------------
class TestParseTimestamp:
def test_iso_zulu(self):
msg = {"timestamp": "2026-05-21T10:00:00Z"}
ts = _parse_timestamp(msg)
assert isinstance(ts, datetime)
assert ts.year == 2026
assert ts.hour == 10
def test_iso_with_offset(self):
msg = {"timestamp": "2026-05-21T12:00:00+02:00"}
ts = _parse_timestamp(msg)
assert ts is not None
assert ts.hour == 10 # converted to UTC
def test_created_at_fallback(self):
msg = {"created_at": "2026-05-21T09:30:00Z"}
ts = _parse_timestamp(msg)
assert ts is not None
def test_no_timestamp_returns_none(self):
assert _parse_timestamp({}) is None
def test_malformed_returns_none(self):
assert _parse_timestamp({"timestamp": "not-a-date"}) is None
# ---------------------------------------------------------------------------
# _correlate_session
# ---------------------------------------------------------------------------
class TestCorrelateSession:
def test_lead_session_extracts_board_id(self):
board_id = "d8ec2aa9-fa86-4ab2-9f17-6518ccd600df"
agent_slug, bid = _correlate_session(f"agent:lead-{board_id}:main")
assert agent_slug is None
assert bid == board_id
def test_agent_session_extracts_slug(self):
slug, bid = _correlate_session("agent:my-agent:main")
assert slug == "my-agent"
assert bid is None
def test_unknown_format_returns_nones(self):
slug, bid = _correlate_session("some-random-key")
assert slug is None
assert bid is None
def test_empty_returns_nones(self):
slug, bid = _correlate_session("")
assert slug is None
assert bid is None
# ---------------------------------------------------------------------------
# normalize_message
# ---------------------------------------------------------------------------
class TestNormalizeMessage:
def _make_msg(self, **kwargs) -> dict:
return {
"role": "assistant",
"content": "Hello!",
"model": "claude-sonnet-4-6",
**kwargs,
}
def test_basic_assistant_message(self):
msg = self._make_msg()
event = normalize_message("agent:test:main", "Test", msg, 0)
assert isinstance(event, RuntimeMessageEvent)
assert event.role == "assistant"
assert event.model == "claude-sonnet-4-6"
assert event.content_preview == "Hello!"
assert not event.content_truncated
assert not event.has_tool_use
assert event.session_key == "agent:test:main"
assert event.session_label == "Test"
assert event.message_index == 0
def test_user_message(self):
msg = {"role": "user", "content": "What time is it?"}
event = normalize_message("session:1", None, msg, 1)
assert event.role == "user"
assert event.model is None
assert event.session_label is None
def test_tool_use_detected(self):
msg = {
"role": "assistant",
"content": [
{"type": "text", "text": "Running command."},
{"type": "tool_use", "name": "bash", "input": {"command": "ls"}},
],
}
event = normalize_message("s", None, msg, 0)
assert event.has_tool_use
assert "bash" in event.tool_names
def test_long_content_truncated(self):
long = "w" * 400
msg = self._make_msg(content=long)
event = normalize_message("s", None, msg, 0)
assert event.content_truncated
assert len(event.content_preview) == 300
def test_event_id_is_stable(self):
msg = self._make_msg()
e1 = normalize_message("s", None, msg, 0)
e2 = normalize_message("s", None, msg, 0)
assert e1.event_id == e2.event_id
def test_event_id_differs_by_index(self):
msg = self._make_msg()
e1 = normalize_message("s", None, msg, 0)
e2 = normalize_message("s", None, msg, 1)
assert e1.event_id != e2.event_id
def test_board_id_from_lead_session(self):
board_id = "d8ec2aa9-fa86-4ab2-9f17-6518ccd600df"
msg = self._make_msg()
event = normalize_message(f"agent:lead-{board_id}:main", None, msg, 0)
assert event.board_id == board_id
assert event.agent_id is None
def test_to_dict_serialisable(self):
msg = self._make_msg()
event = normalize_message("s", "Label", msg, 0)
d = event.to_dict()
assert d["role"] == "assistant"
assert d["session_label"] == "Label"
assert isinstance(d["tool_names"], list)
# timestamp is None here since no timestamp in msg
assert d["timestamp"] is None
def test_timestamp_parsed(self):
msg = self._make_msg(timestamp="2026-05-21T10:00:00Z")
event = normalize_message("s", None, msg, 0)
assert event.timestamp is not None
assert event.timestamp.hour == 10

View File

@ -42,6 +42,7 @@ import type {
ListGatewaySessionsApiV1GatewaysSessionsGetParams,
ListGatewaysApiV1GatewaysGetParams,
OkResponse,
ProviderUsageResponse,
RuntimeUsageResponse,
SendGatewaySessionMessageApiV1GatewaysSessionsSessionIdMessagePostParams,
SyncGatewayTemplatesApiV1GatewaysGatewayIdTemplatesSyncPostParams,
@ -3161,3 +3162,289 @@ export function useGetGatewayRuntimeUsageApiV1GatewaysGatewayIdRuntimeUsageGet<
return { ...query, queryKey: queryOptions.queryKey };
}
/**
* Return provider-native subscription usage data scraped from the CLI (e.g. ``claude /usage``). Returns an empty results list when USAGE_SCRAPER_ENABLED=false (the default). Enable with USAGE_SCRAPER_ENABLED=true and ensure the required prerequisites (tmux, claude binary) are accessible from the backend process.
* @summary Gateway provider-native usage (opt-in scraper)
*/
export type getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponse200 =
{
data: ProviderUsageResponse;
status: 200;
};
export type getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponse422 =
{
data: HTTPValidationError;
status: 422;
};
export type getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponseSuccess =
getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponse200 & {
headers: Headers;
};
export type getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponseError =
getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponse422 & {
headers: Headers;
};
export type getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponse =
| getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponseSuccess
| getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponseError;
export const getGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetUrl =
(gatewayId: string) => {
return `/api/v1/gateways/${gatewayId}/provider-usage`;
};
export const getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet =
async (
gatewayId: string,
options?: RequestInit,
): Promise<getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponse> => {
return customFetch<getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetResponse>(
getGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetUrl(
gatewayId,
),
{
...options,
method: "GET",
},
);
};
export const getGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetQueryKey =
(gatewayId: string) => {
return [`/api/v1/gateways/${gatewayId}/provider-usage`] as const;
};
export const getGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetQueryOptions =
<
TData = Awaited<
ReturnType<
typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet
>
>,
TError = HTTPValidationError,
>(
gatewayId: string,
options?: {
query?: Partial<
UseQueryOptions<
Awaited<
ReturnType<
typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet
>
>,
TError,
TData
>
>;
request?: SecondParameter<typeof customFetch>;
},
) => {
const { query: queryOptions, request: requestOptions } = options ?? {};
const queryKey =
queryOptions?.queryKey ??
getGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetQueryKey(
gatewayId,
);
const queryFn: QueryFunction<
Awaited<
ReturnType<
typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet
>
>
> = ({ signal }) =>
getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet(gatewayId, {
signal,
...requestOptions,
});
return {
queryKey,
queryFn,
enabled: !!gatewayId,
...queryOptions,
} as UseQueryOptions<
Awaited<
ReturnType<
typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet
>
>,
TError,
TData
> & { queryKey: DataTag<QueryKey, TData, TError> };
};
export type GetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetQueryResult =
NonNullable<
Awaited<
ReturnType<
typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet
>
>
>;
export type GetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetQueryError =
HTTPValidationError;
export function useGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet<
TData = Awaited<
ReturnType<
typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet
>
>,
TError = HTTPValidationError,
>(
gatewayId: string,
options: {
query: Partial<
UseQueryOptions<
Awaited<
ReturnType<
typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet
>
>,
TError,
TData
>
> &
Pick<
DefinedInitialDataOptions<
Awaited<
ReturnType<
typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet
>
>,
TError,
Awaited<
ReturnType<
typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet
>
>
>,
"initialData"
>;
request?: SecondParameter<typeof customFetch>;
},
queryClient?: QueryClient,
): DefinedUseQueryResult<TData, TError> & {
queryKey: DataTag<QueryKey, TData, TError>;
};
export function useGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet<
TData = Awaited<
ReturnType<
typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet
>
>,
TError = HTTPValidationError,
>(
gatewayId: string,
options?: {
query?: Partial<
UseQueryOptions<
Awaited<
ReturnType<
typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet
>
>,
TError,
TData
>
> &
Pick<
UndefinedInitialDataOptions<
Awaited<
ReturnType<
typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet
>
>,
TError,
Awaited<
ReturnType<
typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet
>
>
>,
"initialData"
>;
request?: SecondParameter<typeof customFetch>;
},
queryClient?: QueryClient,
): UseQueryResult<TData, TError> & {
queryKey: DataTag<QueryKey, TData, TError>;
};
export function useGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet<
TData = Awaited<
ReturnType<
typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet
>
>,
TError = HTTPValidationError,
>(
gatewayId: string,
options?: {
query?: Partial<
UseQueryOptions<
Awaited<
ReturnType<
typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet
>
>,
TError,
TData
>
>;
request?: SecondParameter<typeof customFetch>;
},
queryClient?: QueryClient,
): UseQueryResult<TData, TError> & {
queryKey: DataTag<QueryKey, TData, TError>;
};
/**
* @summary Gateway provider-native usage (opt-in scraper)
*/
export function useGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet<
TData = Awaited<
ReturnType<
typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet
>
>,
TError = HTTPValidationError,
>(
gatewayId: string,
options?: {
query?: Partial<
UseQueryOptions<
Awaited<
ReturnType<
typeof getGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGet
>
>,
TError,
TData
>
>;
request?: SecondParameter<typeof customFetch>;
},
queryClient?: QueryClient,
): UseQueryResult<TData, TError> & {
queryKey: DataTag<QueryKey, TData, TError>;
} {
const queryOptions =
getGetGatewayProviderUsageApiV1GatewaysGatewayIdProviderUsageGetQueryOptions(
gatewayId,
options,
);
const query = useQuery(queryOptions, queryClient) as UseQueryResult<
TData,
TError
> & { queryKey: DataTag<QueryKey, TData, TError> };
return { ...query, queryKey: queryOptions.queryKey };
}

View File

@ -225,6 +225,8 @@ export * from "./organizationMemberRead";
export * from "./organizationMemberUpdate";
export * from "./organizationRead";
export * from "./organizationUserRead";
export * from "./providerUsageResponse";
export * from "./providerUsageScrapeResult";
export * from "./readyzReadyzGet200";
export * from "./runtimeUsageBurnRate";
export * from "./runtimeUsageCurrent";

View File

@ -0,0 +1,17 @@
/**
* Generated by orval v8.3.0 🍺
* Do not edit manually.
* Mission Control API
* OpenAPI spec version: 0.1.0
*/
import type { ProviderUsageScrapeResult } from "./providerUsageScrapeResult";
/**
* Response envelope for GET /gateways/{id}/provider-usage.
*/
export interface ProviderUsageResponse {
gateway_id: string;
generated_at: string;
scraper_enabled: boolean;
results: ProviderUsageScrapeResult[];
}

View File

@ -0,0 +1,30 @@
/**
* Generated by orval v8.3.0 🍺
* Do not edit manually.
* Mission Control API
* OpenAPI spec version: 0.1.0
*/
/**
* Structured result from one provider-native usage scrape (e.g. Claude CLI /usage).
Returned by GET /gateways/{id}/provider-usage.
All fields are optional partial data is still useful and expected
when CLI output format changes or the session is quiet.
*/
export interface ProviderUsageScrapeResult {
provider: string;
source_name: string;
scraped_at: string;
fresh: boolean;
freshness_ttl_seconds: number;
current_pct?: number | null;
remaining_ms?: number | null;
remaining_label?: string | null;
weekly_messages_used?: number | null;
weekly_messages_limit?: number | null;
weekly_tokens_used?: number | null;
weekly_cost_usd?: number | null;
raw_text?: string | null;
error?: string | null;
}

View File

@ -32,6 +32,10 @@ import type {
} from "@/api/generated/model";
import { Markdown } from "@/components/atoms/Markdown";
import { ActivityFeed } from "@/components/activity/ActivityFeed";
import { RuntimeActivityFeed } from "@/components/activity/RuntimeActivityFeed";
import {
listGatewaysApiV1GatewaysGet,
} from "@/api/generated/gateways/gateways";
import { SignedOutPanel } from "@/components/auth/SignedOutPanel";
import { DashboardSidebar } from "@/components/organisms/DashboardSidebar";
import { DashboardShell } from "@/components/templates/DashboardShell";
@ -391,6 +395,23 @@ export default function ActivityPage() {
const { isSignedIn } = useAuth();
const searchParams = useSearchParams();
const isPageActive = usePageActive();
const activeTab = searchParams.get("tab") === "runtime" ? "runtime" : "feed";
// Gateways — loaded once for the Runtime tab
const [runtimeGateways, setRuntimeGateways] = useState<{ id: string; name: string }[]>([]);
useEffect(() => {
if (!isSignedIn) return;
listGatewaysApiV1GatewaysGet({ limit: 50, offset: 0 })
.then((res) => {
if (res.status === 200) {
setRuntimeGateways(
(res.data.items ?? []).map((gw) => ({ id: gw.id, name: gw.name })),
);
}
})
.catch(() => {/* gateways unavailable — runtime tab shows empty state */});
}, [isSignedIn]);
const selectedEventId = useMemo(() => {
const value = searchParams.get("eventId");
if (!value) return null;
@ -1514,19 +1535,45 @@ export default function ActivityPage() {
<div className="flex items-center gap-2">
<ActivityIcon className="h-5 w-5 text-muted-foreground" />
<h1 className="text-2xl font-semibold tracking-tight text-foreground">
Live feed
{activeTab === "runtime" ? "Runtime Activity" : "Live feed"}
</h1>
</div>
<p className="mt-1 text-sm text-muted-foreground">
Realtime task, approval, agent, and board-chat activity
across all boards.
{activeTab === "runtime"
? "Live session messages from gateway runtime."
: "Realtime task, approval, agent, and board-chat activity across all boards."}
</p>
</div>
</div>
{/* Tab bar */}
<div className="mt-3 flex gap-1 border-t border-border pt-3">
{(
[
{ key: "feed", label: "Board Feed" },
{ key: "runtime", label: "Runtime" },
] as const
).map(({ key, label }) => (
<Link
key={key}
href={key === "feed" ? "/activity" : `/activity?tab=${key}`}
className={`rounded-md px-3 py-1.5 text-sm font-medium transition ${
activeTab === key
? "bg-primary text-primary-foreground"
: "text-muted-foreground hover:bg-muted hover:text-foreground"
}`}
>
{label}
</Link>
))}
</div>
</div>
</div>
<div className="p-4 md:p-8">
{activeTab === "runtime" ? (
<RuntimeActivityFeed gateways={runtimeGateways} />
) : (
<>
{hasUnresolvedDeepLink ? (
<div className="mb-4 rounded-lg border border-amber-300 bg-amber-50 p-3 text-sm text-amber-800">
Requested activity item is not in the current feed window yet.
@ -1544,6 +1591,8 @@ export default function ActivityPage() {
/>
)}
/>
</>
)}
</div>
</main>
</SignedIn>

View File

@ -0,0 +1,275 @@
"use client";
import { useCallback, useEffect, useRef, useState } from "react";
import { AlertCircle, Bot, ChevronDown, Cpu, User, Wrench } from "lucide-react";
import { useAuth } from "@/auth/clerk";
import { getLocalAuthToken, isLocalAuthMode } from "@/auth/localAuth";
import { getApiBaseUrl } from "@/lib/api-base";
import { formatRelativeTimestamp } from "@/lib/formatters";
interface RuntimeMessage {
event_id: string;
session_key: string;
session_label: string | null;
role: string;
model: string | null;
content_preview: string;
content_truncated: boolean;
has_tool_use: boolean;
tool_names: string[];
timestamp: string | null;
agent_id: string | null;
board_id: string | null;
}
interface Gateway {
id: string;
name: string;
}
interface RuntimeActivityFeedProps {
gateways: Gateway[];
}
// ---------------------------------------------------------------------------
// Role colours + icons
// ---------------------------------------------------------------------------
const ROLE_STYLE: Record<string, { label: string; badge: string; icon: React.ElementType }> = {
user: { label: "User", badge: "bg-[color:var(--accent-soft)] text-[color:var(--accent)]", icon: User },
assistant: { label: "Assistant", badge: "bg-[color:rgba(52,211,153,0.15)] text-[color:var(--success)]", icon: Bot },
tool: { label: "Tool", badge: "bg-[color:rgba(251,191,36,0.15)] text-[color:var(--warning)]", icon: Wrench},
};
const DEFAULT_ROLE = { label: "System", badge: "bg-[color:var(--surface-strong)] text-muted", icon: Cpu };
function roleMeta(role: string) {
return ROLE_STYLE[role.toLowerCase()] ?? DEFAULT_ROLE;
}
// ---------------------------------------------------------------------------
// SSE connection helper
// ---------------------------------------------------------------------------
async function getAuthHeaders(): Promise<Record<string, string>> {
const headers: Record<string, string> = { Accept: "text/event-stream" };
if (isLocalAuthMode()) {
const token = getLocalAuthToken();
if (token) headers.Authorization = `Bearer ${token}`;
} else {
try {
const clerk = (window as unknown as { Clerk?: { session?: { getToken: () => Promise<string> } } }).Clerk;
if (clerk?.session) {
const token = await clerk.session.getToken();
if (token) headers.Authorization = `Bearer ${token}`;
}
} catch {
// Clerk not ready
}
}
return headers;
}
// ---------------------------------------------------------------------------
// Main component
// ---------------------------------------------------------------------------
export function RuntimeActivityFeed({ gateways }: RuntimeActivityFeedProps) {
const { isSignedIn } = useAuth();
const [gatewayId, setGatewayId] = useState<string>(() => gateways[0]?.id ?? "");
const [messages, setMessages] = useState<RuntimeMessage[]>([]);
const [connected, setConnected] = useState(false);
const [error, setError] = useState<string | null>(null);
const cancelledRef = useRef(false);
const seenIdsRef = useRef(new Set<string>());
const connect = useCallback(async () => {
if (!gatewayId || !isSignedIn) return;
cancelledRef.current = false;
setError(null);
const baseUrl = getApiBaseUrl();
const url = `${baseUrl}/api/v1/gateways/${gatewayId}/runtime-activity/stream`;
const headers = await getAuthHeaders();
try {
const response = await fetch(url, { headers });
if (!response.ok) {
setError(`Gateway returned ${response.status}`);
return;
}
if (!response.body) {
setError("Stream unavailable");
return;
}
setConnected(true);
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (!cancelledRef.current) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
buffer = buffer.replace(/\r\n/g, "\n");
let boundary = buffer.indexOf("\n\n");
while (boundary !== -1) {
const raw = buffer.slice(0, boundary);
buffer = buffer.slice(boundary + 2);
let eventType = "";
let data = "";
for (const line of raw.split("\n")) {
if (line.startsWith("event:")) eventType = line.slice(6).trim();
else if (line.startsWith("data:")) data += line.slice(5).trim();
}
if (eventType === "runtime_message" && data) {
try {
const msg = JSON.parse(data) as RuntimeMessage;
if (!seenIdsRef.current.has(msg.event_id)) {
seenIdsRef.current.add(msg.event_id);
setMessages((prev) => [msg, ...prev].slice(0, 300));
}
} catch {
// skip malformed event
}
}
boundary = buffer.indexOf("\n\n");
}
}
} catch (err) {
if (!cancelledRef.current) {
setError(err instanceof Error ? err.message : "Connection lost");
setConnected(false);
}
}
}, [gatewayId, isSignedIn]);
useEffect(() => {
setMessages([]);
seenIdsRef.current = new Set();
setConnected(false);
connect();
return () => {
cancelledRef.current = true;
};
}, [connect]);
if (!gateways.length) {
return (
<div className="flex flex-col items-center gap-3 rounded-xl border border-dashed border-border bg-muted/50 p-12 text-center">
<Cpu className="h-10 w-10 text-muted-foreground" />
<p className="font-medium text-foreground">No gateways configured</p>
<p className="text-sm text-muted-foreground">
Configure a gateway on a board to see its live activity here.
</p>
</div>
);
}
return (
<div className="flex flex-col gap-4">
{/* Gateway picker + status */}
<div className="flex flex-wrap items-center justify-between gap-3">
<div className="flex items-center gap-2">
<div
className={`h-2 w-2 rounded-full ${
connected
? "bg-[color:var(--success)]"
: "bg-[color:var(--border-strong)]"
}`}
/>
<span className="text-sm text-muted-foreground">
{connected ? "Live" : "Connecting…"}
</span>
</div>
{gateways.length > 1 && (
<div className="relative">
<select
value={gatewayId}
onChange={(e) => setGatewayId(e.target.value)}
className="appearance-none rounded-lg border border-border bg-card px-3 py-1.5 pr-8 text-sm text-foreground focus:outline-none focus:ring-2 focus:ring-ring"
>
{gateways.map((gw) => (
<option key={gw.id} value={gw.id}>
{gw.name}
</option>
))}
</select>
<ChevronDown className="pointer-events-none absolute right-2 top-1/2 h-4 w-4 -translate-y-1/2 text-muted-foreground" />
</div>
)}
</div>
{error && (
<div className="flex items-start gap-2 rounded-lg border border-destructive/40 bg-destructive/10 px-3 py-2 text-sm text-destructive">
<AlertCircle className="mt-0.5 h-4 w-4 shrink-0" />
<span>{error}</span>
</div>
)}
{/* Feed */}
<div className="space-y-2">
{messages.length === 0 && connected && (
<p className="py-8 text-center text-sm text-muted-foreground">
Waiting for session activity
</p>
)}
{messages.map((msg) => {
const meta = roleMeta(msg.role);
const Icon = meta.icon;
const shortModel = msg.model?.includes("/")
? msg.model.split("/")[1]
: msg.model;
return (
<div
key={msg.event_id}
className="rounded-lg border border-border bg-card px-4 py-3 shadow-sm"
>
<div className="flex flex-wrap items-start gap-2">
<span
className={`inline-flex items-center gap-1 rounded-full px-2 py-0.5 text-xs font-medium ${meta.badge}`}
>
<Icon className="h-3 w-3" />
{meta.label}
</span>
{msg.session_label && (
<span className="text-xs text-muted-foreground">
{msg.session_label}
</span>
)}
{shortModel && (
<span className="rounded-full bg-muted px-2 py-0.5 text-xs text-muted-foreground">
{shortModel}
</span>
)}
{msg.has_tool_use && msg.tool_names.length > 0 && (
<span className="rounded-full bg-[color:rgba(251,191,36,0.10)] px-2 py-0.5 text-xs text-[color:var(--warning)]">
{msg.tool_names.join(", ")}
</span>
)}
<span className="ml-auto text-xs text-muted-foreground">
{msg.timestamp
? formatRelativeTimestamp(msg.timestamp)
: null}
</span>
</div>
{msg.content_preview && (
<p className="mt-2 line-clamp-3 text-sm text-foreground/80">
{msg.content_preview}
{msg.content_truncated && (
<span className="ml-1 text-xs text-muted-foreground">
[truncated]
</span>
)}
</p>
)}
{msg.board_id && (
<p className="mt-1 text-xs text-muted-foreground">
board {msg.board_id.slice(0, 8)}
</p>
)}
</div>
);
})}
</div>
</div>
);
}