Replaced 30s setInterval with a persistent fetch + ReadableStream SSE consumer

This commit is contained in:
null 2026-05-25 13:33:04 -05:00
parent b893941d71
commit ea113fcacb
2 changed files with 357 additions and 20 deletions

View File

@ -323,6 +323,10 @@ def _parse_ts(ts: str | None) -> datetime | None:
return None return None
_TICKER_STREAM_POLL = 2 # seconds between stream poll cycles
_TICKER_GATEWAY_INTERVAL = 30 # seconds between gateway polls in stream
_TICKER_SEEN_MAX = 2000
_BASH_CMD_MAX = 80 _BASH_CMD_MAX = 80
_TASK_DESC_MAX = 60 _TASK_DESC_MAX = 60
@ -480,6 +484,115 @@ def _extract_codex_ticker_items(path: Path, cutoff: datetime) -> list[ActivityTi
return items return items
def _read_new_lines(path: Path, offsets: dict[str, int]) -> list[str]:
"""Read only new bytes since the last recorded offset. Updates offsets in-place."""
key = str(path)
offset = offsets.get(key, 0)
try:
with open(path, "rb") as fh:
fh.seek(0, 2)
size = fh.tell()
if size <= offset:
return []
fh.seek(offset)
data = fh.read()
last_nl = data.rfind(b"\n")
if last_nl < 0:
return []
offsets[key] = offset + last_nl + 1
return data[: last_nl + 1].decode("utf-8", errors="replace").splitlines()
except (OSError, PermissionError):
return []
def _init_offsets_to_eof(roots: list[Path], offsets: dict[str, int]) -> None:
"""Seed offsets to current EOF so the stream only emits future writes."""
for root in roots:
if not root.exists():
continue
for path in root.rglob("*.jsonl"):
try:
offsets[str(path)] = path.stat().st_size
except (OSError, PermissionError):
pass
def _claude_items_from_lines(
lines: list[str],
seen_msg_ids: set[str],
cutoff: datetime | None = None,
) -> list[tuple[str, ActivityTickerItem]]:
"""Return (stable_key, item) pairs for tool-use blocks in Claude JSONL lines."""
result: list[tuple[str, ActivityTickerItem]] = []
for line in lines:
if not line.strip():
continue
try:
rec = json.loads(line)
except json.JSONDecodeError:
continue
if rec.get("type") != "assistant" or rec.get("isSidechain"):
continue
ts = _parse_ts(rec.get("timestamp"))
if ts is None or (cutoff and ts < cutoff):
continue
msg = rec.get("message") or {}
msg_id = msg.get("id") or ""
if msg_id and msg_id in seen_msg_ids:
continue
if msg_id:
seen_msg_ids.add(msg_id)
content = msg.get("content") if isinstance(msg.get("content"), list) else []
tool_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "tool_use"]
for i, block in enumerate(tool_blocks):
tool_name = str(block.get("name") or "")
inp = block.get("input") or {}
if not isinstance(inp, dict):
inp = {}
label = _format_tool_status(tool_name, inp)
block_id = str(block.get("id") or f"{msg_id}:{i}")
key = f"cc:{block_id}"
result.append((key, ActivityTickerItem(
id=uuid4(), source="Claude Code", message=label, created_at=ts,
)))
return result
def _codex_items_from_lines(
lines: list[str],
seen_call_ids: set[str],
cutoff: datetime | None = None,
) -> list[tuple[str, ActivityTickerItem]]:
"""Return (stable_key, item) pairs for tool-call records in Codex JSONL lines."""
result: list[tuple[str, ActivityTickerItem]] = []
for line in lines:
if not line.strip():
continue
try:
rec = json.loads(line)
except json.JSONDecodeError:
continue
if rec.get("type") != "response_item":
continue
ts = _parse_ts(rec.get("timestamp"))
if ts is None or (cutoff and ts < cutoff):
continue
payload = rec.get("payload") or {}
call_id = str(payload.get("call_id") or "")
if call_id and call_id in seen_call_ids:
continue
if call_id:
seen_call_ids.add(call_id)
label = _format_codex_tool(payload)
if not label:
continue
key = f"cx:{call_id or uuid4()}"
result.append((key, ActivityTickerItem(
id=uuid4(), source="Codex", message=label, created_at=ts,
)))
return result
def _fetch_local_session_items_sync(cutoff: datetime) -> list[ActivityTickerItem]: def _fetch_local_session_items_sync(cutoff: datetime) -> list[ActivityTickerItem]:
items: list[ActivityTickerItem] = [] items: list[ActivityTickerItem] = []
@ -583,6 +696,167 @@ async def get_activity_ticker(
return all_items[:limit] return all_items[:limit]
@router.get("/ticker/stream", summary="Live activity ticker (SSE)")
async def stream_activity_ticker(
request: Request,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> EventSourceResponse:
"""Push ticker items as SSE events. Event type: ``ticker_item``."""
board_ids = await list_accessible_board_ids(session, member=ctx.member, write=False)
gw_rows = list((await session.exec(
select(Gateway).where(col(Gateway.organization_id) == ctx.organization.id)
)).all())
claude_root_env = os.environ.get("CLAUDE_PROJECTS_PATH", "").strip()
claude_root = Path(claude_root_env) if claude_root_env else Path.home() / ".claude" / "projects"
codex_root_env = os.environ.get("CODEX_SESSIONS_PATH", "").strip()
codex_root = Path(codex_root_env) if codex_root_env else Path.home() / ".codex" / "sessions"
async def event_generator() -> Any:
seen_keys: set[str] = set()
seen_queue: deque[str] = deque()
file_offsets: dict[str, int] = {}
seen_cc_msg_ids: set[str] = set()
seen_cx_call_ids: set[str] = set()
def _track(key: str) -> bool:
if key in seen_keys:
return False
seen_keys.add(key)
seen_queue.append(key)
if len(seen_queue) > _TICKER_SEEN_MAX:
seen_keys.discard(seen_queue.popleft())
return True
def _emit(key: str, item: ActivityTickerItem) -> dict[str, str] | None:
if not _track(key):
return None
return {"event": "ticker_item", "data": item.model_dump_json()}
# ── Initial snapshot ──────────────────────────────────────────────
cutoff_2h = utcnow() - timedelta(hours=2)
def _build_db_stmt(since: datetime) -> Any:
stmt = (
select(ActivityEvent, Agent)
.outerjoin(Agent, col(ActivityEvent.agent_id) == col(Agent.id))
.outerjoin(Task, col(ActivityEvent.task_id) == col(Task.id))
.where(func.length(func.trim(col(ActivityEvent.message))) > 0)
.where(col(ActivityEvent.created_at) >= since)
.order_by(asc(col(ActivityEvent.created_at)))
.limit(50)
)
if board_ids:
return stmt.where(or_(
col(ActivityEvent.board_id).in_(board_ids),
and_(
col(ActivityEvent.board_id).is_(None),
col(Task.board_id).in_(board_ids),
),
))
return stmt.where(col(ActivityEvent.id).is_(None))
async with async_session_maker() as init_db:
init_rows = (await init_db.exec(_build_db_stmt(cutoff_2h))).all()
batch: list[tuple[str, ActivityTickerItem]] = []
for row in init_rows:
ev: ActivityEvent = row[0]
ag: Agent | None = row[1]
msg = (ev.message or "").strip()
if msg:
batch.append((f"db:{ev.id}", ActivityTickerItem(
id=ev.id, source=_ticker_source(ev, ag),
message=msg[:200], created_at=ev.created_at,
)))
# Initial JSONL snapshot using tail approach
def _init_local() -> list[tuple[str, ActivityTickerItem]]:
local: list[tuple[str, ActivityTickerItem]] = []
for root, extractor in (
(claude_root, lambda lines: _claude_items_from_lines(lines, set(), cutoff_2h)),
(codex_root, lambda lines: _codex_items_from_lines(lines, set(), cutoff_2h)),
):
if not root.exists():
continue
for path in root.rglob("*.jsonl"):
try:
mtime = datetime.utcfromtimestamp(path.stat().st_mtime)
if mtime >= cutoff_2h:
local.extend(extractor(_tail_lines(path)))
except (OSError, PermissionError):
pass
return local
batch.extend(await asyncio.to_thread(_init_local))
batch.sort(key=lambda x: x[1].created_at)
for key, item in batch:
ev_data = _emit(key, item)
if ev_data:
yield ev_data
# Seed offsets to EOF so we only stream new writes going forward
await asyncio.to_thread(_init_offsets_to_eof, [claude_root, codex_root], file_offsets)
last_db_ts = utcnow()
last_gateway_poll: datetime = datetime.min
# ── Live loop ─────────────────────────────────────────────────────
while True:
if await request.is_disconnected():
break
await asyncio.sleep(_TICKER_STREAM_POLL)
now = utcnow()
live_batch: list[tuple[str, ActivityTickerItem]] = []
# DB: events since last check
async with async_session_maker() as poll_db:
poll_rows = (await poll_db.exec(_build_db_stmt(last_db_ts))).all()
last_db_ts = now
for row in poll_rows:
ev, ag = row[0], row[1]
msg = (ev.message or "").strip()
if msg:
live_batch.append((f"db:{ev.id}", ActivityTickerItem(
id=ev.id, source=_ticker_source(ev, ag),
message=msg[:200], created_at=ev.created_at,
)))
# Local JSONL: incremental (new bytes only)
def _poll_local() -> list[tuple[str, ActivityTickerItem]]:
items: list[tuple[str, ActivityTickerItem]] = []
for root, extractor in (
(claude_root, lambda lines: _claude_items_from_lines(lines, seen_cc_msg_ids)),
(codex_root, lambda lines: _codex_items_from_lines(lines, seen_cx_call_ids)),
):
if not root.exists():
continue
for path in root.rglob("*.jsonl"):
new_lines = _read_new_lines(path, file_offsets)
if new_lines:
items.extend(extractor(new_lines))
return items
live_batch.extend(await asyncio.to_thread(_poll_local))
# Gateway: every 30s
if (now - last_gateway_poll).total_seconds() >= _TICKER_GATEWAY_INTERVAL:
gw_items = await _fetch_runtime_ticker_items(gw_rows, cutoff_2h)
for item in gw_items:
live_batch.append((f"gw:{item.id}", item))
last_gateway_poll = now
live_batch.sort(key=lambda x: x[1].created_at)
for key, item in live_batch:
ev_data = _emit(key, item)
if ev_data:
yield ev_data
return EventSourceResponse(event_generator(), ping=15)
@router.get("", response_model=DefaultLimitOffsetPage[ActivityEventRead]) @router.get("", response_model=DefaultLimitOffsetPage[ActivityEventRead])
async def list_activity( async def list_activity(
session: AsyncSession = SESSION_DEP, session: AsyncSession = SESSION_DEP,

View File

@ -3,7 +3,8 @@
import { useCallback, useEffect, useRef, useState } from "react"; import { useCallback, useEffect, useRef, useState } from "react";
import { useAuth } from "@/auth/clerk"; import { useAuth } from "@/auth/clerk";
import { customFetch } from "@/api/mutator"; import { getLocalAuthToken } from "@/auth/localAuth";
import { getApiBaseUrl } from "@/lib/api-base";
interface TickerItem { interface TickerItem {
id: string; id: string;
@ -24,41 +25,103 @@ function fmtRelative(isoString: string): string {
return `${Math.floor(h / 24)}d ago`; return `${Math.floor(h / 24)}d ago`;
} }
async function fetchTickerItems(limit = 20): Promise<TickerItem[]> { async function getAuthHeaders(): Promise<Record<string, string>> {
const res = await customFetch<{ data: TickerItem[]; status: number }>( const headers: Record<string, string> = {};
`/api/v1/activity/ticker?limit=${limit}`, const local = getLocalAuthToken();
{ method: "GET" }, if (local) {
); headers.Authorization = `Bearer ${local}`;
if (res.status === 200) return res.data; return headers;
return [];
} }
try {
const clerk = (window as unknown as { Clerk?: { session?: { getToken: () => Promise<string> } } }).Clerk;
if (clerk?.session) {
const token = await clerk.session.getToken();
if (token) headers.Authorization = `Bearer ${token}`;
}
} catch {}
return headers;
}
const MAX_ITEMS = 40;
const RECONNECT_DELAY_MS = 3_000;
export function AgentActivityTicker() { export function AgentActivityTicker() {
const { isSignedIn } = useAuth(); const { isSignedIn } = useAuth();
const [items, setItems] = useState<TickerItem[]>([]); const [items, setItems] = useState<TickerItem[]>([]);
const intervalRef = useRef<ReturnType<typeof setInterval> | null>(null); const cancelledRef = useRef(false);
const seenRef = useRef(new Set<string>());
const load = useCallback(async () => { const connect = useCallback(async () => {
if (!isSignedIn) return;
cancelledRef.current = false;
const url = `${getApiBaseUrl()}/api/v1/activity/ticker/stream`;
let headers: Record<string, string>;
try { try {
const data = await fetchTickerItems(20); headers = await getAuthHeaders();
if (data.length > 0) setItems(data);
} catch { } catch {
// Silent — ticker is non-critical return;
} }
}, []);
try {
const res = await fetch(url, { headers });
if (!res.ok || !res.body) return;
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buf = "";
while (!cancelledRef.current) {
const { value, done } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
buf = buf.replace(/\r\n/g, "\n");
let boundary = buf.indexOf("\n\n");
while (boundary !== -1) {
const raw = buf.slice(0, boundary);
buf = buf.slice(boundary + 2);
let eventType = "";
let data = "";
for (const line of raw.split("\n")) {
if (line.startsWith("event:")) eventType = line.slice(6).trim();
else if (line.startsWith("data:")) data += line.slice(5).trim();
}
if (eventType === "ticker_item" && data) {
try {
const item = JSON.parse(data) as TickerItem;
if (!seenRef.current.has(item.id)) {
seenRef.current.add(item.id);
setItems((prev) => [item, ...prev].slice(0, MAX_ITEMS));
}
} catch {}
}
boundary = buf.indexOf("\n\n");
}
}
} catch {
// fall through to reconnect
}
// Reconnect unless the component unmounted
if (!cancelledRef.current) {
setTimeout(() => {
if (!cancelledRef.current) connect();
}, RECONNECT_DELAY_MS);
}
}, [isSignedIn]);
useEffect(() => { useEffect(() => {
if (!isSignedIn) return; cancelledRef.current = false;
void load(); seenRef.current = new Set();
intervalRef.current = setInterval(() => void load(), 30_000); setItems([]);
void connect();
return () => { return () => {
if (intervalRef.current) clearInterval(intervalRef.current); cancelledRef.current = true;
}; };
}, [isSignedIn, load]); }, [connect]);
if (items.length === 0) return null; if (items.length === 0) return null;
// Duplicate items for a seamless loop (animate-ticker moves -50%)
const display = [...items, ...items]; const display = [...items, ...items];
return ( return (