From 90a4abde30b4dd8cc6e57b3c272402cb1faaea44 Mon Sep 17 00:00:00 2001 From: null Date: Mon, 25 May 2026 12:35:00 -0500 Subject: [PATCH] Runtime gateway sessions (Claude CLI, Codex, GPT, Ollama) are fetched via fetch_recent_events for all org gateways, --- backend/app/api/activity.py | 80 ++++++++++++++++++- .../organisms/AgentActivityTicker.tsx | 3 +- 2 files changed, 78 insertions(+), 5 deletions(-) diff --git a/backend/app/api/activity.py b/backend/app/api/activity.py index 8f992cc..fefda1a 100644 --- a/backend/app/api/activity.py +++ b/backend/app/api/activity.py @@ -7,7 +7,7 @@ import json from collections import deque from datetime import UTC, datetime, timedelta from typing import TYPE_CHECKING, Any -from uuid import UUID +from uuid import UUID, uuid4 from fastapi import APIRouter, Depends, HTTPException, Query, Request, status from sqlalchemy import and_, asc, desc, func, or_ @@ -21,6 +21,7 @@ from app.db.session import async_session_maker, get_session from app.models.activity_events import ActivityEvent from app.models.agents import Agent from app.models.boards import Board +from app.models.gateways import Gateway from app.models.tasks import Task from app.schemas.activity_events import ActivityEventRead, ActivityTaskCommentFeedItemRead, ActivityTickerItem from app.schemas.pagination import DefaultLimitOffsetPage @@ -249,6 +250,68 @@ def _ticker_source(event: ActivityEvent, agent: Agent | None) -> str: return " ".join(p.capitalize() for p in parts) +def _runtime_source(session_key: str, session_label: str | None, model: str | None) -> str: + if session_label: + return session_label + if model: + m = model.lower() + if "claude" in m: + return "Claude" + if "codex" in m: + return "Codex" + if "gpt" in m or "openai" in m: + return "GPT" + if "gemini" in m or "google" in m: + return "Gemini" + return model + parts = session_key.split(":") + if len(parts) >= 2: + return parts[1].replace("-", " ").replace("_", " ").title() + return "Session" + + +async def _fetch_runtime_ticker_items( + gateways: list[Gateway], + cutoff: datetime, +) -> list[ActivityTickerItem]: + from app.services.openclaw.gateway_rpc import GatewayConfig # noqa: PLC0415 + from app.services.openclaw.runtime_activity import fetch_recent_events # noqa: PLC0415 + + items: list[ActivityTickerItem] = [] + for gw in gateways: + try: + config = GatewayConfig( + url=gw.url, + token=gw.token, + allow_insecure_tls=gw.allow_insecure_tls, + disable_device_pairing=gw.disable_device_pairing, + ) + events = await asyncio.wait_for( + fetch_recent_events(config, max_sessions=10, history_limit=15), + timeout=3.0, + ) + for ev in events: + if ev.role != "assistant": + continue + msg = ev.content_preview.strip() + if not msg: + continue + ts = ev.timestamp + if ts is None or ts < cutoff: + continue + items.append( + ActivityTickerItem( + id=uuid4(), + source=_runtime_source(ev.session_key, ev.session_label, ev.model), + message=msg[:200], + created_at=ts, + ) + ) + except Exception: # noqa: BLE001 + pass + return items + + @router.get("/ticker", response_model=list[ActivityTickerItem]) async def get_activity_ticker( limit: int = Query(default=20, ge=1, le=50), @@ -265,6 +328,7 @@ async def get_activity_ticker( .outerjoin(Task, col(ActivityEvent.task_id) == col(Task.id)) .where(func.length(func.trim(col(ActivityEvent.message))) > 0) .where(col(ActivityEvent.created_at) >= cutoff) + .where(col(ActivityEvent.event_type) != "agent.heartbeat") .order_by(desc(col(ActivityEvent.created_at))) .limit(limit) ) @@ -283,14 +347,14 @@ async def get_activity_ticker( statement = statement.where(col(ActivityEvent.id).is_(None)) rows = (await session.exec(statement)).all() - items: list[ActivityTickerItem] = [] + db_items: list[ActivityTickerItem] = [] for row in rows: event: ActivityEvent = row[0] agent: Agent | None = row[1] msg = (event.message or "").strip() if not msg: continue - items.append( + db_items.append( ActivityTickerItem( id=event.id, source=_ticker_source(event, agent), @@ -298,7 +362,15 @@ async def get_activity_ticker( created_at=event.created_at, ) ) - return items + + gw_rows = list((await session.exec( + select(Gateway).where(col(Gateway.organization_id) == ctx.organization.id) + )).all()) + runtime_items = await _fetch_runtime_ticker_items(gw_rows, cutoff) + + all_items = db_items + runtime_items + all_items.sort(key=lambda x: x.created_at, reverse=True) + return all_items[:limit] @router.get("", response_model=DefaultLimitOffsetPage[ActivityEventRead]) diff --git a/frontend/src/components/organisms/AgentActivityTicker.tsx b/frontend/src/components/organisms/AgentActivityTicker.tsx index 88c5067..012f2c0 100644 --- a/frontend/src/components/organisms/AgentActivityTicker.tsx +++ b/frontend/src/components/organisms/AgentActivityTicker.tsx @@ -13,7 +13,8 @@ interface TickerItem { } function fmtRelative(isoString: string): string { - const diffMs = Date.now() - new Date(isoString).getTime(); + const utc = isoString.endsWith("Z") || isoString.includes("+") ? isoString : isoString + "Z"; + const diffMs = Date.now() - new Date(utc).getTime(); const s = Math.round(diffMs / 1000); if (s < 60) return `${s}s ago`; const m = Math.floor(s / 60);