"""Activity listing and task-comment feed endpoints.""" from __future__ import annotations import asyncio import json import os from collections import deque from datetime import UTC, datetime, timedelta from pathlib import Path from typing import TYPE_CHECKING, Any from uuid import UUID, uuid4 from fastapi import APIRouter, Depends, HTTPException, Query, Request, status from sqlalchemy import and_, asc, desc, func, or_ from sqlmodel import col, select from sse_starlette.sse import EventSourceResponse from app.api.deps import ActorContext, require_org_member, require_user_or_agent from app.core.time import utcnow from app.db.pagination import paginate from app.db.session import async_session_maker, get_session from app.models.activity_events import ActivityEvent from app.models.agents import Agent from app.models.boards import Board from app.models.gateways import Gateway from app.models.tasks import Task from app.schemas.activity_events import ActivityEventRead, ActivityTaskCommentFeedItemRead, ActivityTickerItem from app.schemas.pagination import DefaultLimitOffsetPage from app.services.organizations import ( OrganizationContext, get_active_membership, list_accessible_board_ids, ) if TYPE_CHECKING: from collections.abc import AsyncIterator, Sequence from fastapi_pagination.limit_offset import LimitOffsetPage from sqlmodel.ext.asyncio.session import AsyncSession router = APIRouter(prefix="/activity", tags=["activity"]) SSE_SEEN_MAX = 2000 STREAM_POLL_SECONDS = 2 TASK_COMMENT_ROW_LEN = 4 SESSION_DEP = Depends(get_session) ACTOR_DEP = Depends(require_user_or_agent) ORG_MEMBER_DEP = Depends(require_org_member) BOARD_ID_QUERY = Query(default=None) SINCE_QUERY = Query(default=None) _RUNTIME_TYPE_REFERENCES = (UUID,) def _parse_since(value: str | None) -> datetime | None: if not value: return None normalized = value.strip() if not normalized: return None normalized = normalized.replace("Z", "+00:00") try: parsed = datetime.fromisoformat(normalized) except ValueError: return None if parsed.tzinfo is not None: return parsed.astimezone(UTC).replace(tzinfo=None) return parsed def _agent_role(agent: Agent | None) -> str | None: if agent is None: return None profile = agent.identity_profile if not isinstance(profile, dict): return None raw = profile.get("role") if isinstance(raw, str): role = raw.strip() return role or None return None def _build_activity_route( *, event: ActivityEvent, board_id: UUID | None, ) -> tuple[str, dict[str, str]]: if board_id is not None: board_id_str = str(board_id) board_params = {"boardId": board_id_str} if event.event_type == "task.comment" and event.task_id is not None: return ( "board", { **board_params, "taskId": str(event.task_id), "commentId": str(event.id), }, ) if event.event_type.startswith("approval."): return ("board.approvals", board_params) if event.event_type.startswith("board."): return ("board", {**board_params, "panel": "chat"}) if event.task_id is not None: return ("board", {**board_params, "taskId": str(event.task_id)}) return ("board", board_params) fallback_params = { "eventId": str(event.id), "eventType": event.event_type, "createdAt": event.created_at.isoformat(), } if event.task_id is not None: fallback_params["taskId"] = str(event.task_id) return ("activity", fallback_params) def _feed_item( event: ActivityEvent, task: Task, board: Board, agent: Agent | None, ) -> ActivityTaskCommentFeedItemRead: return ActivityTaskCommentFeedItemRead( id=event.id, created_at=event.created_at, message=event.message, agent_id=event.agent_id, agent_name=agent.name if agent else None, agent_role=_agent_role(agent), task_id=task.id, task_title=task.title, board_id=board.id, board_name=board.name, ) def _coerce_task_comment_rows( items: Sequence[Any], ) -> list[tuple[ActivityEvent, Task, Board, Agent | None]]: rows: list[tuple[ActivityEvent, Task, Board, Agent | None]] = [] for item in items: first: Any second: Any third: Any fourth: Any if isinstance(item, tuple): if len(item) != TASK_COMMENT_ROW_LEN: msg = "Expected (ActivityEvent, Task, Board, Agent | None) rows" raise TypeError(msg) first, second, third, fourth = item else: try: row_len = len(item) first = item[0] second = item[1] third = item[2] fourth = item[3] except (IndexError, KeyError, TypeError): msg = "Expected (ActivityEvent, Task, Board, Agent | None) rows" raise TypeError(msg) from None if row_len != TASK_COMMENT_ROW_LEN: msg = "Expected (ActivityEvent, Task, Board, Agent | None) rows" raise TypeError(msg) if ( isinstance(first, ActivityEvent) and isinstance(second, Task) and isinstance(third, Board) and (isinstance(fourth, Agent) or fourth is None) ): rows.append((first, second, third, fourth)) continue msg = "Expected (ActivityEvent, Task, Board, Agent | None) rows" raise TypeError(msg) return rows def _coerce_activity_rows( items: Sequence[Any], ) -> list[tuple[ActivityEvent, UUID | None, UUID | None]]: rows: list[tuple[ActivityEvent, UUID | None, UUID | None]] = [] for item in items: first: Any second: Any third: Any if isinstance(item, tuple): if len(item) != 3: msg = "Expected (ActivityEvent, event_board_id, task_board_id) rows" raise TypeError(msg) first, second, third = item else: try: row_len = len(item) first = item[0] second = item[1] third = item[2] except (IndexError, KeyError, TypeError): msg = "Expected (ActivityEvent, event_board_id, task_board_id) rows" raise TypeError(msg) from None if row_len != 3: msg = "Expected (ActivityEvent, event_board_id, task_board_id) rows" raise TypeError(msg) if not isinstance(first, ActivityEvent): msg = "Expected (ActivityEvent, event_board_id, task_board_id) rows" raise TypeError(msg) if second is not None and not isinstance(second, UUID): msg = "Expected (ActivityEvent, event_board_id, task_board_id) rows" raise TypeError(msg) if third is not None and not isinstance(third, UUID): msg = "Expected (ActivityEvent, event_board_id, task_board_id) rows" raise TypeError(msg) rows.append((first, second, third)) return rows async def _fetch_task_comment_events( session: AsyncSession, since: datetime, *, board_id: UUID | None = None, ) -> Sequence[tuple[ActivityEvent, Task, Board, Agent | None]]: statement = ( select(ActivityEvent, Task, Board, Agent) .join(Task, col(ActivityEvent.task_id) == col(Task.id)) .join(Board, col(Task.board_id) == col(Board.id)) .outerjoin(Agent, col(ActivityEvent.agent_id) == col(Agent.id)) .where(col(ActivityEvent.event_type) == "task.comment") .where(col(ActivityEvent.created_at) >= since) .where(func.length(func.trim(col(ActivityEvent.message))) > 0) .order_by(asc(col(ActivityEvent.created_at))) ) if board_id is not None: statement = statement.where(col(Task.board_id) == board_id) return _coerce_task_comment_rows(list(await session.exec(statement))) def _ticker_source(event: ActivityEvent, agent: Agent | None) -> str: if agent is not None: return agent.name parts = event.event_type.replace(".", " ").replace("_", " ").split() return " ".join(p.capitalize() for p in parts) def _runtime_source(session_key: str, session_label: str | None, model: str | None) -> str: if session_label: return session_label if model: m = model.lower() if "claude" in m: return "Claude" if "codex" in m: return "Codex" if "gpt" in m or "openai" in m: return "GPT" if "gemini" in m or "google" in m: return "Gemini" return model parts = session_key.split(":") if len(parts) >= 2: return parts[1].replace("-", " ").replace("_", " ").title() return "Session" async def _fetch_runtime_ticker_items( gateways: list[Gateway], cutoff: datetime, ) -> list[ActivityTickerItem]: from app.services.openclaw.gateway_rpc import GatewayConfig # noqa: PLC0415 from app.services.openclaw.runtime_activity import fetch_recent_events # noqa: PLC0415 items: list[ActivityTickerItem] = [] for gw in gateways: try: config = GatewayConfig( url=gw.url, token=gw.token, allow_insecure_tls=gw.allow_insecure_tls, disable_device_pairing=gw.disable_device_pairing, ) events = await asyncio.wait_for( fetch_recent_events(config, max_sessions=10, history_limit=15), timeout=3.0, ) for ev in events: if ev.role != "assistant": continue msg = ev.content_preview.strip() if not msg: continue ts = ev.timestamp if ts is None or ts < cutoff: continue items.append( ActivityTickerItem( id=uuid4(), source=_runtime_source(ev.session_key, ev.session_label, ev.model), message=msg[:200], created_at=ts, ) ) except Exception: # noqa: BLE001 pass return items def _parse_ts(ts: str | None) -> datetime | None: if not ts: return None try: return datetime.fromisoformat(ts.replace("Z", "+00:00")).astimezone(UTC).replace(tzinfo=None) except ValueError: return None _TICKER_STREAM_POLL = 2 # seconds between stream poll cycles _TICKER_GATEWAY_INTERVAL = 30 # seconds between gateway polls in stream _TICKER_SEEN_MAX = 2000 _BASH_CMD_MAX = 80 _TASK_DESC_MAX = 60 def _format_tool_status(tool_name: str, inp: dict[str, Any]) -> str: """Human-readable tool label — ported from pixel-agents-openclaw/transcriptParser.ts.""" def _base(p: Any) -> str: return os.path.basename(str(p)) if p else "" if tool_name == "Read": return f"Reading {_base(inp.get('file_path', ''))}" if tool_name == "Edit": return f"Editing {_base(inp.get('file_path', ''))}" if tool_name == "Write": return f"Writing {_base(inp.get('file_path', ''))}" if tool_name == "Bash": cmd = str(inp.get("command", "")) return f"Running: {cmd[:_BASH_CMD_MAX]}…" if len(cmd) > _BASH_CMD_MAX else f"Running: {cmd}" if tool_name == "Glob": return "Searching files" if tool_name == "Grep": return "Searching code" if tool_name == "WebFetch": return "Fetching web content" if tool_name == "WebSearch": return "Searching the web" if tool_name == "Task": desc = str(inp.get("description", "")) if desc: return f"Subtask: {desc[:_TASK_DESC_MAX]}…" if len(desc) > _TASK_DESC_MAX else f"Subtask: {desc}" return "Running subtask" if tool_name == "AskUserQuestion": return "Waiting for input" if tool_name == "EnterPlanMode": return "Planning" if tool_name == "NotebookEdit": return "Editing notebook" return f"Using {tool_name}" def _tail_lines(path: Path, nbytes: int = 80_000) -> list[str]: try: with open(path, "rb") as fh: fh.seek(0, 2) size = fh.tell() fh.seek(max(0, size - nbytes)) data = fh.read() idx = data.find(b"\n") if idx >= 0: data = data[idx + 1:] return data.decode("utf-8", errors="replace").splitlines() except (OSError, PermissionError): return [] def _extract_claude_ticker_items(path: Path, cutoff: datetime) -> list[ActivityTickerItem]: items: list[ActivityTickerItem] = [] seen_msg_ids: set[str] = set() for line in _tail_lines(path): if not line.strip(): continue try: rec = json.loads(line) except json.JSONDecodeError: continue if rec.get("type") != "assistant" or rec.get("isSidechain"): continue ts = _parse_ts(rec.get("timestamp")) if ts is None or ts < cutoff: continue msg = rec.get("message") or {} msg_id = msg.get("id") if msg_id: if msg_id in seen_msg_ids: continue seen_msg_ids.add(msg_id) content = msg.get("content") if isinstance(msg.get("content"), list) else [] tool_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "tool_use"] text_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "text"] for block in tool_blocks: tool_name = str(block.get("name") or "") inp = block.get("input") or {} if not isinstance(inp, dict): inp = {} label = _format_tool_status(tool_name, inp) items.append(ActivityTickerItem( id=uuid4(), source="Claude Code", message=label, created_at=ts, )) return items def _format_codex_tool(payload: dict[str, Any]) -> str | None: """Return a human-readable label for a Codex tool call record, or None to skip.""" ptype = payload.get("type") name = str(payload.get("name") or "") if ptype == "function_call": raw_args = payload.get("arguments") or "{}" try: args = json.loads(raw_args) if isinstance(raw_args, str) else raw_args except json.JSONDecodeError: args = {} if name == "exec_command": cmd = str(args.get("cmd", "")).strip() if not cmd: return None return f"Running: {cmd[:_BASH_CMD_MAX]}…" if len(cmd) > _BASH_CMD_MAX else f"Running: {cmd}" if name in ("write_stdin", "read_stdout", "create_session", "delete_session"): return None # internal plumbing, not meaningful return f"Using {name}" if ptype == "custom_tool_call": if name == "apply_patch": return "Applying code patch" return f"Using {name}" return None def _extract_codex_ticker_items(path: Path, cutoff: datetime) -> list[ActivityTickerItem]: items: list[ActivityTickerItem] = [] seen_call_ids: set[str] = set() for line in _tail_lines(path): if not line.strip(): continue try: rec = json.loads(line) except json.JSONDecodeError: continue if rec.get("type") != "response_item": continue ts = _parse_ts(rec.get("timestamp")) if ts is None or ts < cutoff: continue payload = rec.get("payload") or {} call_id = str(payload.get("call_id") or "") if call_id: if call_id in seen_call_ids: continue seen_call_ids.add(call_id) label = _format_codex_tool(payload) if not label: continue items.append(ActivityTickerItem( id=uuid4(), source="Codex", message=label, created_at=ts, )) return items def _read_new_lines(path: Path, offsets: dict[str, int]) -> list[str]: """Read only new bytes since the last recorded offset. Updates offsets in-place.""" key = str(path) offset = offsets.get(key, 0) try: with open(path, "rb") as fh: fh.seek(0, 2) size = fh.tell() if size <= offset: return [] fh.seek(offset) data = fh.read() last_nl = data.rfind(b"\n") if last_nl < 0: return [] offsets[key] = offset + last_nl + 1 return data[: last_nl + 1].decode("utf-8", errors="replace").splitlines() except (OSError, PermissionError): return [] def _init_offsets_to_eof(roots: list[Path], offsets: dict[str, int]) -> None: """Seed offsets to current EOF so the stream only emits future writes.""" for root in roots: if not root.exists(): continue for path in root.rglob("*.jsonl"): try: offsets[str(path)] = path.stat().st_size except (OSError, PermissionError): pass def _claude_items_from_lines( lines: list[str], seen_msg_ids: set[str], cutoff: datetime | None = None, ) -> list[tuple[str, ActivityTickerItem]]: """Return (stable_key, item) pairs for tool-use blocks in Claude JSONL lines.""" result: list[tuple[str, ActivityTickerItem]] = [] for line in lines: if not line.strip(): continue try: rec = json.loads(line) except json.JSONDecodeError: continue if rec.get("type") != "assistant" or rec.get("isSidechain"): continue ts = _parse_ts(rec.get("timestamp")) if ts is None or (cutoff and ts < cutoff): continue msg = rec.get("message") or {} msg_id = msg.get("id") or "" if msg_id and msg_id in seen_msg_ids: continue if msg_id: seen_msg_ids.add(msg_id) content = msg.get("content") if isinstance(msg.get("content"), list) else [] tool_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "tool_use"] for i, block in enumerate(tool_blocks): tool_name = str(block.get("name") or "") inp = block.get("input") or {} if not isinstance(inp, dict): inp = {} label = _format_tool_status(tool_name, inp) block_id = str(block.get("id") or f"{msg_id}:{i}") key = f"cc:{block_id}" result.append((key, ActivityTickerItem( id=uuid4(), source="Claude Code", message=label, created_at=ts, ))) return result def _codex_items_from_lines( lines: list[str], seen_call_ids: set[str], cutoff: datetime | None = None, ) -> list[tuple[str, ActivityTickerItem]]: """Return (stable_key, item) pairs for tool-call records in Codex JSONL lines.""" result: list[tuple[str, ActivityTickerItem]] = [] for line in lines: if not line.strip(): continue try: rec = json.loads(line) except json.JSONDecodeError: continue if rec.get("type") != "response_item": continue ts = _parse_ts(rec.get("timestamp")) if ts is None or (cutoff and ts < cutoff): continue payload = rec.get("payload") or {} call_id = str(payload.get("call_id") or "") if call_id and call_id in seen_call_ids: continue if call_id: seen_call_ids.add(call_id) label = _format_codex_tool(payload) if not label: continue key = f"cx:{call_id or uuid4()}" result.append((key, ActivityTickerItem( id=uuid4(), source="Codex", message=label, created_at=ts, ))) return result def _fetch_local_session_items_sync(cutoff: datetime) -> list[ActivityTickerItem]: items: list[ActivityTickerItem] = [] claude_root_env = os.environ.get("CLAUDE_PROJECTS_PATH", "").strip() claude_root = Path(claude_root_env) if claude_root_env else Path.home() / ".claude" / "projects" if claude_root.exists(): for path in claude_root.rglob("*.jsonl"): try: if datetime.utcfromtimestamp(path.stat().st_mtime) >= cutoff: items.extend(_extract_claude_ticker_items(path, cutoff)) except (OSError, PermissionError): pass codex_root_env = os.environ.get("CODEX_SESSIONS_PATH", "").strip() codex_root = Path(codex_root_env) if codex_root_env else Path.home() / ".codex" / "sessions" if codex_root.exists(): for path in codex_root.rglob("*.jsonl"): try: if datetime.utcfromtimestamp(path.stat().st_mtime) >= cutoff: items.extend(_extract_codex_ticker_items(path, cutoff)) except (OSError, PermissionError): pass return items async def _fetch_local_session_ticker_items(cutoff: datetime) -> list[ActivityTickerItem]: try: return await asyncio.to_thread(_fetch_local_session_items_sync, cutoff) except Exception: # noqa: BLE001 return [] @router.get("/ticker", response_model=list[ActivityTickerItem]) async def get_activity_ticker( limit: int = Query(default=20, ge=1, le=50), session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> list[ActivityTickerItem]: """Return recent activity items shaped for the navbar ticker.""" board_ids = await list_accessible_board_ids(session, member=ctx.member, write=False) cutoff = utcnow() - timedelta(minutes=15) def _build_db_statement(since: datetime) -> Any: stmt = ( select(ActivityEvent, Agent) .outerjoin(Agent, col(ActivityEvent.agent_id) == col(Agent.id)) .outerjoin(Task, col(ActivityEvent.task_id) == col(Task.id)) .where(func.length(func.trim(col(ActivityEvent.message))) > 0) .where(col(ActivityEvent.created_at) >= since) .order_by(desc(col(ActivityEvent.created_at))) .limit(limit) ) if board_ids: return stmt.where( or_( col(ActivityEvent.board_id).in_(board_ids), and_( col(ActivityEvent.board_id).is_(None), col(Task.board_id).in_(board_ids), ), ) ) return stmt.where(col(ActivityEvent.id).is_(None)) def _rows_to_items(rows: Any) -> list[ActivityTickerItem]: result: list[ActivityTickerItem] = [] for row in rows: event: ActivityEvent = row[0] agent: Agent | None = row[1] msg = (event.message or "").strip() if not msg: continue result.append(ActivityTickerItem( id=event.id, source=_ticker_source(event, agent), message=msg[:200], created_at=event.created_at, )) return result rows = (await session.exec(_build_db_statement(cutoff))).all() db_items = _rows_to_items(rows) # If nothing recent, widen to 2 hours so the ticker is never blank if not db_items: fallback_cutoff = utcnow() - timedelta(hours=2) rows = (await session.exec(_build_db_statement(fallback_cutoff))).all() db_items = _rows_to_items(rows) gw_rows = list((await session.exec( select(Gateway).where(col(Gateway.organization_id) == ctx.organization.id) )).all()) runtime_items, local_items = await asyncio.gather( _fetch_runtime_ticker_items(gw_rows, cutoff), _fetch_local_session_ticker_items(cutoff), ) all_items = db_items + runtime_items + local_items all_items.sort(key=lambda x: x.created_at, reverse=True) return all_items[:limit] @router.get("/ticker/stream", summary="Live activity ticker (SSE)") async def stream_activity_ticker( request: Request, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> EventSourceResponse: """Push ticker items as SSE events. Event type: ``ticker_item``.""" board_ids = await list_accessible_board_ids(session, member=ctx.member, write=False) gw_rows = list((await session.exec( select(Gateway).where(col(Gateway.organization_id) == ctx.organization.id) )).all()) claude_root_env = os.environ.get("CLAUDE_PROJECTS_PATH", "").strip() claude_root = Path(claude_root_env) if claude_root_env else Path.home() / ".claude" / "projects" codex_root_env = os.environ.get("CODEX_SESSIONS_PATH", "").strip() codex_root = Path(codex_root_env) if codex_root_env else Path.home() / ".codex" / "sessions" async def event_generator() -> Any: seen_keys: set[str] = set() seen_queue: deque[str] = deque() file_offsets: dict[str, int] = {} seen_cc_msg_ids: set[str] = set() seen_cx_call_ids: set[str] = set() def _track(key: str) -> bool: if key in seen_keys: return False seen_keys.add(key) seen_queue.append(key) if len(seen_queue) > _TICKER_SEEN_MAX: seen_keys.discard(seen_queue.popleft()) return True def _emit(key: str, item: ActivityTickerItem) -> dict[str, str] | None: if not _track(key): return None return {"event": "ticker_item", "data": item.model_dump_json()} # ── Initial snapshot ────────────────────────────────────────────── cutoff_2h = utcnow() - timedelta(hours=2) def _build_db_stmt(since: datetime) -> Any: stmt = ( select(ActivityEvent, Agent) .outerjoin(Agent, col(ActivityEvent.agent_id) == col(Agent.id)) .outerjoin(Task, col(ActivityEvent.task_id) == col(Task.id)) .where(func.length(func.trim(col(ActivityEvent.message))) > 0) .where(col(ActivityEvent.created_at) >= since) .order_by(asc(col(ActivityEvent.created_at))) .limit(50) ) if board_ids: return stmt.where(or_( col(ActivityEvent.board_id).in_(board_ids), and_( col(ActivityEvent.board_id).is_(None), col(Task.board_id).in_(board_ids), ), )) return stmt.where(col(ActivityEvent.id).is_(None)) async with async_session_maker() as init_db: init_rows = (await init_db.exec(_build_db_stmt(cutoff_2h))).all() batch: list[tuple[str, ActivityTickerItem]] = [] for row in init_rows: ev: ActivityEvent = row[0] ag: Agent | None = row[1] msg = (ev.message or "").strip() if msg: batch.append((f"db:{ev.id}", ActivityTickerItem( id=ev.id, source=_ticker_source(ev, ag), message=msg[:200], created_at=ev.created_at, ))) # Initial JSONL snapshot using tail approach def _init_local() -> list[tuple[str, ActivityTickerItem]]: local: list[tuple[str, ActivityTickerItem]] = [] for root, extractor in ( (claude_root, lambda lines: _claude_items_from_lines(lines, set(), cutoff_2h)), (codex_root, lambda lines: _codex_items_from_lines(lines, set(), cutoff_2h)), ): if not root.exists(): continue for path in root.rglob("*.jsonl"): try: mtime = datetime.utcfromtimestamp(path.stat().st_mtime) if mtime >= cutoff_2h: local.extend(extractor(_tail_lines(path))) except (OSError, PermissionError): pass return local batch.extend(await asyncio.to_thread(_init_local)) batch.sort(key=lambda x: x[1].created_at) for key, item in batch: ev_data = _emit(key, item) if ev_data: yield ev_data # Seed offsets to EOF so we only stream new writes going forward await asyncio.to_thread(_init_offsets_to_eof, [claude_root, codex_root], file_offsets) last_db_ts = utcnow() last_gateway_poll: datetime = datetime.min # ── Live loop ───────────────────────────────────────────────────── while True: if await request.is_disconnected(): break await asyncio.sleep(_TICKER_STREAM_POLL) now = utcnow() live_batch: list[tuple[str, ActivityTickerItem]] = [] # DB: events since last check async with async_session_maker() as poll_db: poll_rows = (await poll_db.exec(_build_db_stmt(last_db_ts))).all() last_db_ts = now for row in poll_rows: ev, ag = row[0], row[1] msg = (ev.message or "").strip() if msg: live_batch.append((f"db:{ev.id}", ActivityTickerItem( id=ev.id, source=_ticker_source(ev, ag), message=msg[:200], created_at=ev.created_at, ))) # Local JSONL: incremental (new bytes only) def _poll_local() -> list[tuple[str, ActivityTickerItem]]: items: list[tuple[str, ActivityTickerItem]] = [] for root, extractor in ( (claude_root, lambda lines: _claude_items_from_lines(lines, seen_cc_msg_ids)), (codex_root, lambda lines: _codex_items_from_lines(lines, seen_cx_call_ids)), ): if not root.exists(): continue for path in root.rglob("*.jsonl"): new_lines = _read_new_lines(path, file_offsets) if new_lines: items.extend(extractor(new_lines)) return items live_batch.extend(await asyncio.to_thread(_poll_local)) # Gateway: every 30s if (now - last_gateway_poll).total_seconds() >= _TICKER_GATEWAY_INTERVAL: gw_items = await _fetch_runtime_ticker_items(gw_rows, cutoff_2h) for item in gw_items: live_batch.append((f"gw:{item.id}", item)) last_gateway_poll = now live_batch.sort(key=lambda x: x[1].created_at) for key, item in live_batch: ev_data = _emit(key, item) if ev_data: yield ev_data return EventSourceResponse(event_generator(), ping=15) @router.get("", response_model=DefaultLimitOffsetPage[ActivityEventRead]) async def list_activity( session: AsyncSession = SESSION_DEP, actor: ActorContext = ACTOR_DEP, ) -> LimitOffsetPage[ActivityEventRead]: """List activity events visible to the calling actor.""" statement: Any = select( ActivityEvent, col(ActivityEvent.board_id).label("event_board_id"), col(Task.board_id).label("task_board_id"), ).outerjoin(Task, col(ActivityEvent.task_id) == col(Task.id)) if actor.actor_type == "agent" and actor.agent: statement = statement.where(col(ActivityEvent.agent_id) == actor.agent.id) elif actor.actor_type == "user" and actor.user: member = await get_active_membership(session, actor.user) if member is None: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) board_ids = await list_accessible_board_ids(session, member=member, write=False) if not board_ids: statement = statement.where(col(ActivityEvent.id).is_(None)) else: statement = statement.where( or_( col(ActivityEvent.board_id).in_(board_ids), and_( col(ActivityEvent.board_id).is_(None), col(Task.board_id).in_(board_ids), ), ), ) statement = statement.order_by(desc(col(ActivityEvent.created_at))) def _transform(items: Sequence[Any]) -> Sequence[Any]: rows = _coerce_activity_rows(items) events: list[ActivityEventRead] = [] for event, event_board_id, task_board_id in rows: payload = ActivityEventRead.model_validate(event, from_attributes=True) resolved_board_id = event_board_id or task_board_id payload.board_id = resolved_board_id route_name, route_params = _build_activity_route( event=event, board_id=resolved_board_id, ) payload.route_name = route_name payload.route_params = route_params events.append(payload) return events return await paginate(session, statement, transformer=_transform) @router.get( "/task-comments", response_model=DefaultLimitOffsetPage[ActivityTaskCommentFeedItemRead], ) async def list_task_comment_feed( board_id: UUID | None = BOARD_ID_QUERY, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> LimitOffsetPage[ActivityTaskCommentFeedItemRead]: """List task-comment feed items for accessible boards.""" statement = ( select(ActivityEvent, Task, Board, Agent) .join(Task, col(ActivityEvent.task_id) == col(Task.id)) .join(Board, col(Task.board_id) == col(Board.id)) .outerjoin(Agent, col(ActivityEvent.agent_id) == col(Agent.id)) .where(col(ActivityEvent.event_type) == "task.comment") .where(func.length(func.trim(col(ActivityEvent.message))) > 0) .order_by(desc(col(ActivityEvent.created_at))) ) board_ids = await list_accessible_board_ids(session, member=ctx.member, write=False) if board_id is not None: if board_id not in set(board_ids): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) statement = statement.where(col(Task.board_id) == board_id) elif board_ids: statement = statement.where(col(Task.board_id).in_(board_ids)) else: statement = statement.where(col(Task.id).is_(None)) def _transform(items: Sequence[Any]) -> Sequence[Any]: rows = _coerce_task_comment_rows(items) return [_feed_item(event, task, board, agent) for event, task, board, agent in rows] return await paginate(session, statement, transformer=_transform) @router.get("/task-comments/stream") async def stream_task_comment_feed( request: Request, board_id: UUID | None = BOARD_ID_QUERY, since: str | None = SINCE_QUERY, db_session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> EventSourceResponse: """Stream task-comment events for accessible boards.""" since_dt = _parse_since(since) or utcnow() board_ids = await list_accessible_board_ids( db_session, member=ctx.member, write=False, ) allowed_ids = set(board_ids) if board_id is not None and board_id not in allowed_ids: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) seen_ids: set[UUID] = set() seen_queue: deque[UUID] = deque() async def event_generator() -> AsyncIterator[dict[str, str]]: last_seen = since_dt while True: if await request.is_disconnected(): break async with async_session_maker() as stream_session: if board_id is not None: rows = await _fetch_task_comment_events( stream_session, last_seen, board_id=board_id, ) elif allowed_ids: rows = await _fetch_task_comment_events(stream_session, last_seen) rows = [row for row in rows if row[1].board_id in allowed_ids] else: rows = [] for event, task, board, agent in rows: event_id = event.id if event_id in seen_ids: continue seen_ids.add(event_id) seen_queue.append(event_id) if len(seen_queue) > SSE_SEEN_MAX: oldest = seen_queue.popleft() seen_ids.discard(oldest) last_seen = max(event.created_at, last_seen) payload = { "comment": _feed_item( event, task, board, agent, ).model_dump(mode="json"), } yield {"event": "comment", "data": json.dumps(payload)} await asyncio.sleep(STREAM_POLL_SECONDS) return EventSourceResponse(event_generator(), ping=15)