diff --git a/backend/app/api/activity.py b/backend/app/api/activity.py index dca3c96..d49fc11 100644 --- a/backend/app/api/activity.py +++ b/backend/app/api/activity.py @@ -323,6 +323,10 @@ def _parse_ts(ts: str | None) -> datetime | None: return None +_TICKER_STREAM_POLL = 2 # seconds between stream poll cycles +_TICKER_GATEWAY_INTERVAL = 30 # seconds between gateway polls in stream +_TICKER_SEEN_MAX = 2000 + _BASH_CMD_MAX = 80 _TASK_DESC_MAX = 60 @@ -480,6 +484,115 @@ def _extract_codex_ticker_items(path: Path, cutoff: datetime) -> list[ActivityTi return items +def _read_new_lines(path: Path, offsets: dict[str, int]) -> list[str]: + """Read only new bytes since the last recorded offset. Updates offsets in-place.""" + key = str(path) + offset = offsets.get(key, 0) + try: + with open(path, "rb") as fh: + fh.seek(0, 2) + size = fh.tell() + if size <= offset: + return [] + fh.seek(offset) + data = fh.read() + last_nl = data.rfind(b"\n") + if last_nl < 0: + return [] + offsets[key] = offset + last_nl + 1 + return data[: last_nl + 1].decode("utf-8", errors="replace").splitlines() + except (OSError, PermissionError): + return [] + + +def _init_offsets_to_eof(roots: list[Path], offsets: dict[str, int]) -> None: + """Seed offsets to current EOF so the stream only emits future writes.""" + for root in roots: + if not root.exists(): + continue + for path in root.rglob("*.jsonl"): + try: + offsets[str(path)] = path.stat().st_size + except (OSError, PermissionError): + pass + + +def _claude_items_from_lines( + lines: list[str], + seen_msg_ids: set[str], + cutoff: datetime | None = None, +) -> list[tuple[str, ActivityTickerItem]]: + """Return (stable_key, item) pairs for tool-use blocks in Claude JSONL lines.""" + result: list[tuple[str, ActivityTickerItem]] = [] + for line in lines: + if not line.strip(): + continue + try: + rec = json.loads(line) + except json.JSONDecodeError: + continue + if rec.get("type") != "assistant" or rec.get("isSidechain"): + continue + ts = _parse_ts(rec.get("timestamp")) + if ts is None or (cutoff and ts < cutoff): + continue + msg = rec.get("message") or {} + msg_id = msg.get("id") or "" + if msg_id and msg_id in seen_msg_ids: + continue + if msg_id: + seen_msg_ids.add(msg_id) + content = msg.get("content") if isinstance(msg.get("content"), list) else [] + tool_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "tool_use"] + for i, block in enumerate(tool_blocks): + tool_name = str(block.get("name") or "") + inp = block.get("input") or {} + if not isinstance(inp, dict): + inp = {} + label = _format_tool_status(tool_name, inp) + block_id = str(block.get("id") or f"{msg_id}:{i}") + key = f"cc:{block_id}" + result.append((key, ActivityTickerItem( + id=uuid4(), source="Claude Code", message=label, created_at=ts, + ))) + return result + + +def _codex_items_from_lines( + lines: list[str], + seen_call_ids: set[str], + cutoff: datetime | None = None, +) -> list[tuple[str, ActivityTickerItem]]: + """Return (stable_key, item) pairs for tool-call records in Codex JSONL lines.""" + result: list[tuple[str, ActivityTickerItem]] = [] + for line in lines: + if not line.strip(): + continue + try: + rec = json.loads(line) + except json.JSONDecodeError: + continue + if rec.get("type") != "response_item": + continue + ts = _parse_ts(rec.get("timestamp")) + if ts is None or (cutoff and ts < cutoff): + continue + payload = rec.get("payload") or {} + call_id = str(payload.get("call_id") or "") + if call_id and call_id in seen_call_ids: + continue + if call_id: + seen_call_ids.add(call_id) + label = _format_codex_tool(payload) + if not label: + continue + key = f"cx:{call_id or uuid4()}" + result.append((key, ActivityTickerItem( + id=uuid4(), source="Codex", message=label, created_at=ts, + ))) + return result + + def _fetch_local_session_items_sync(cutoff: datetime) -> list[ActivityTickerItem]: items: list[ActivityTickerItem] = [] @@ -583,6 +696,167 @@ async def get_activity_ticker( return all_items[:limit] +@router.get("/ticker/stream", summary="Live activity ticker (SSE)") +async def stream_activity_ticker( + request: Request, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> EventSourceResponse: + """Push ticker items as SSE events. Event type: ``ticker_item``.""" + board_ids = await list_accessible_board_ids(session, member=ctx.member, write=False) + gw_rows = list((await session.exec( + select(Gateway).where(col(Gateway.organization_id) == ctx.organization.id) + )).all()) + + claude_root_env = os.environ.get("CLAUDE_PROJECTS_PATH", "").strip() + claude_root = Path(claude_root_env) if claude_root_env else Path.home() / ".claude" / "projects" + codex_root_env = os.environ.get("CODEX_SESSIONS_PATH", "").strip() + codex_root = Path(codex_root_env) if codex_root_env else Path.home() / ".codex" / "sessions" + + async def event_generator() -> Any: + seen_keys: set[str] = set() + seen_queue: deque[str] = deque() + file_offsets: dict[str, int] = {} + seen_cc_msg_ids: set[str] = set() + seen_cx_call_ids: set[str] = set() + + def _track(key: str) -> bool: + if key in seen_keys: + return False + seen_keys.add(key) + seen_queue.append(key) + if len(seen_queue) > _TICKER_SEEN_MAX: + seen_keys.discard(seen_queue.popleft()) + return True + + def _emit(key: str, item: ActivityTickerItem) -> dict[str, str] | None: + if not _track(key): + return None + return {"event": "ticker_item", "data": item.model_dump_json()} + + # ── Initial snapshot ────────────────────────────────────────────── + cutoff_2h = utcnow() - timedelta(hours=2) + + def _build_db_stmt(since: datetime) -> Any: + stmt = ( + select(ActivityEvent, Agent) + .outerjoin(Agent, col(ActivityEvent.agent_id) == col(Agent.id)) + .outerjoin(Task, col(ActivityEvent.task_id) == col(Task.id)) + .where(func.length(func.trim(col(ActivityEvent.message))) > 0) + .where(col(ActivityEvent.created_at) >= since) + .order_by(asc(col(ActivityEvent.created_at))) + .limit(50) + ) + if board_ids: + return stmt.where(or_( + col(ActivityEvent.board_id).in_(board_ids), + and_( + col(ActivityEvent.board_id).is_(None), + col(Task.board_id).in_(board_ids), + ), + )) + return stmt.where(col(ActivityEvent.id).is_(None)) + + async with async_session_maker() as init_db: + init_rows = (await init_db.exec(_build_db_stmt(cutoff_2h))).all() + + batch: list[tuple[str, ActivityTickerItem]] = [] + for row in init_rows: + ev: ActivityEvent = row[0] + ag: Agent | None = row[1] + msg = (ev.message or "").strip() + if msg: + batch.append((f"db:{ev.id}", ActivityTickerItem( + id=ev.id, source=_ticker_source(ev, ag), + message=msg[:200], created_at=ev.created_at, + ))) + + # Initial JSONL snapshot using tail approach + def _init_local() -> list[tuple[str, ActivityTickerItem]]: + local: list[tuple[str, ActivityTickerItem]] = [] + for root, extractor in ( + (claude_root, lambda lines: _claude_items_from_lines(lines, set(), cutoff_2h)), + (codex_root, lambda lines: _codex_items_from_lines(lines, set(), cutoff_2h)), + ): + if not root.exists(): + continue + for path in root.rglob("*.jsonl"): + try: + mtime = datetime.utcfromtimestamp(path.stat().st_mtime) + if mtime >= cutoff_2h: + local.extend(extractor(_tail_lines(path))) + except (OSError, PermissionError): + pass + return local + + batch.extend(await asyncio.to_thread(_init_local)) + batch.sort(key=lambda x: x[1].created_at) + for key, item in batch: + ev_data = _emit(key, item) + if ev_data: + yield ev_data + + # Seed offsets to EOF so we only stream new writes going forward + await asyncio.to_thread(_init_offsets_to_eof, [claude_root, codex_root], file_offsets) + + last_db_ts = utcnow() + last_gateway_poll: datetime = datetime.min + + # ── Live loop ───────────────────────────────────────────────────── + while True: + if await request.is_disconnected(): + break + + await asyncio.sleep(_TICKER_STREAM_POLL) + now = utcnow() + live_batch: list[tuple[str, ActivityTickerItem]] = [] + + # DB: events since last check + async with async_session_maker() as poll_db: + poll_rows = (await poll_db.exec(_build_db_stmt(last_db_ts))).all() + last_db_ts = now + for row in poll_rows: + ev, ag = row[0], row[1] + msg = (ev.message or "").strip() + if msg: + live_batch.append((f"db:{ev.id}", ActivityTickerItem( + id=ev.id, source=_ticker_source(ev, ag), + message=msg[:200], created_at=ev.created_at, + ))) + + # Local JSONL: incremental (new bytes only) + def _poll_local() -> list[tuple[str, ActivityTickerItem]]: + items: list[tuple[str, ActivityTickerItem]] = [] + for root, extractor in ( + (claude_root, lambda lines: _claude_items_from_lines(lines, seen_cc_msg_ids)), + (codex_root, lambda lines: _codex_items_from_lines(lines, seen_cx_call_ids)), + ): + if not root.exists(): + continue + for path in root.rglob("*.jsonl"): + new_lines = _read_new_lines(path, file_offsets) + if new_lines: + items.extend(extractor(new_lines)) + return items + + live_batch.extend(await asyncio.to_thread(_poll_local)) + + # Gateway: every 30s + if (now - last_gateway_poll).total_seconds() >= _TICKER_GATEWAY_INTERVAL: + gw_items = await _fetch_runtime_ticker_items(gw_rows, cutoff_2h) + for item in gw_items: + live_batch.append((f"gw:{item.id}", item)) + last_gateway_poll = now + + live_batch.sort(key=lambda x: x[1].created_at) + for key, item in live_batch: + ev_data = _emit(key, item) + if ev_data: + yield ev_data + + return EventSourceResponse(event_generator(), ping=15) + + @router.get("", response_model=DefaultLimitOffsetPage[ActivityEventRead]) async def list_activity( session: AsyncSession = SESSION_DEP, diff --git a/frontend/src/components/organisms/AgentActivityTicker.tsx b/frontend/src/components/organisms/AgentActivityTicker.tsx index 012f2c0..21735d7 100644 --- a/frontend/src/components/organisms/AgentActivityTicker.tsx +++ b/frontend/src/components/organisms/AgentActivityTicker.tsx @@ -3,7 +3,8 @@ import { useCallback, useEffect, useRef, useState } from "react"; import { useAuth } from "@/auth/clerk"; -import { customFetch } from "@/api/mutator"; +import { getLocalAuthToken } from "@/auth/localAuth"; +import { getApiBaseUrl } from "@/lib/api-base"; interface TickerItem { id: string; @@ -24,41 +25,103 @@ function fmtRelative(isoString: string): string { return `${Math.floor(h / 24)}d ago`; } -async function fetchTickerItems(limit = 20): Promise { - const res = await customFetch<{ data: TickerItem[]; status: number }>( - `/api/v1/activity/ticker?limit=${limit}`, - { method: "GET" }, - ); - if (res.status === 200) return res.data; - return []; +async function getAuthHeaders(): Promise> { + const headers: Record = {}; + const local = getLocalAuthToken(); + if (local) { + headers.Authorization = `Bearer ${local}`; + return headers; + } + try { + const clerk = (window as unknown as { Clerk?: { session?: { getToken: () => Promise } } }).Clerk; + if (clerk?.session) { + const token = await clerk.session.getToken(); + if (token) headers.Authorization = `Bearer ${token}`; + } + } catch {} + return headers; } +const MAX_ITEMS = 40; +const RECONNECT_DELAY_MS = 3_000; + export function AgentActivityTicker() { const { isSignedIn } = useAuth(); const [items, setItems] = useState([]); - const intervalRef = useRef | null>(null); + const cancelledRef = useRef(false); + const seenRef = useRef(new Set()); - const load = useCallback(async () => { + const connect = useCallback(async () => { + if (!isSignedIn) return; + cancelledRef.current = false; + + const url = `${getApiBaseUrl()}/api/v1/activity/ticker/stream`; + let headers: Record; try { - const data = await fetchTickerItems(20); - if (data.length > 0) setItems(data); + headers = await getAuthHeaders(); } catch { - // Silent — ticker is non-critical + return; } - }, []); + + try { + const res = await fetch(url, { headers }); + if (!res.ok || !res.body) return; + + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let buf = ""; + + while (!cancelledRef.current) { + const { value, done } = await reader.read(); + if (done) break; + buf += decoder.decode(value, { stream: true }); + buf = buf.replace(/\r\n/g, "\n"); + let boundary = buf.indexOf("\n\n"); + while (boundary !== -1) { + const raw = buf.slice(0, boundary); + buf = buf.slice(boundary + 2); + let eventType = ""; + let data = ""; + for (const line of raw.split("\n")) { + if (line.startsWith("event:")) eventType = line.slice(6).trim(); + else if (line.startsWith("data:")) data += line.slice(5).trim(); + } + if (eventType === "ticker_item" && data) { + try { + const item = JSON.parse(data) as TickerItem; + if (!seenRef.current.has(item.id)) { + seenRef.current.add(item.id); + setItems((prev) => [item, ...prev].slice(0, MAX_ITEMS)); + } + } catch {} + } + boundary = buf.indexOf("\n\n"); + } + } + } catch { + // fall through to reconnect + } + + // Reconnect unless the component unmounted + if (!cancelledRef.current) { + setTimeout(() => { + if (!cancelledRef.current) connect(); + }, RECONNECT_DELAY_MS); + } + }, [isSignedIn]); useEffect(() => { - if (!isSignedIn) return; - void load(); - intervalRef.current = setInterval(() => void load(), 30_000); + cancelledRef.current = false; + seenRef.current = new Set(); + setItems([]); + void connect(); return () => { - if (intervalRef.current) clearInterval(intervalRef.current); + cancelledRef.current = true; }; - }, [isSignedIn, load]); + }, [connect]); if (items.length === 0) return null; - // Duplicate items for a seamless loop (animate-ticker moves -50%) const display = [...items, ...items]; return (