diff --git a/backend/app/api/activity.py b/backend/app/api/activity.py index fefda1a..af68199 100644 --- a/backend/app/api/activity.py +++ b/backend/app/api/activity.py @@ -4,8 +4,10 @@ from __future__ import annotations import asyncio import json +import os from collections import deque from datetime import UTC, datetime, timedelta +from pathlib import Path from typing import TYPE_CHECKING, Any from uuid import UUID, uuid4 @@ -312,6 +314,205 @@ async def _fetch_runtime_ticker_items( return items +def _parse_ts(ts: str | None) -> datetime | None: + if not ts: + return None + try: + return datetime.fromisoformat(ts.replace("Z", "+00:00")).astimezone(UTC).replace(tzinfo=None) + except ValueError: + return None + + +_BASH_CMD_MAX = 80 +_TASK_DESC_MAX = 60 + + +def _format_tool_status(tool_name: str, inp: dict[str, Any]) -> str: + """Human-readable tool label — ported from pixel-agents-openclaw/transcriptParser.ts.""" + def _base(p: Any) -> str: + return os.path.basename(str(p)) if p else "" + + if tool_name == "Read": + return f"Reading {_base(inp.get('file_path', ''))}" + if tool_name == "Edit": + return f"Editing {_base(inp.get('file_path', ''))}" + if tool_name == "Write": + return f"Writing {_base(inp.get('file_path', ''))}" + if tool_name == "Bash": + cmd = str(inp.get("command", "")) + return f"Running: {cmd[:_BASH_CMD_MAX]}…" if len(cmd) > _BASH_CMD_MAX else f"Running: {cmd}" + if tool_name == "Glob": + return "Searching files" + if tool_name == "Grep": + return "Searching code" + if tool_name == "WebFetch": + return "Fetching web content" + if tool_name == "WebSearch": + return "Searching the web" + if tool_name == "Task": + desc = str(inp.get("description", "")) + if desc: + return f"Subtask: {desc[:_TASK_DESC_MAX]}…" if len(desc) > _TASK_DESC_MAX else f"Subtask: {desc}" + return "Running subtask" + if tool_name == "AskUserQuestion": + return "Waiting for input" + if tool_name == "EnterPlanMode": + return "Planning" + if tool_name == "NotebookEdit": + return "Editing notebook" + return f"Using {tool_name}" + + +def _tail_lines(path: Path, nbytes: int = 80_000) -> list[str]: + try: + with open(path, "rb") as fh: + fh.seek(0, 2) + size = fh.tell() + fh.seek(max(0, size - nbytes)) + data = fh.read() + idx = data.find(b"\n") + if idx >= 0: + data = data[idx + 1:] + return data.decode("utf-8", errors="replace").splitlines() + except (OSError, PermissionError): + return [] + + +def _extract_claude_ticker_items(path: Path, cutoff: datetime) -> list[ActivityTickerItem]: + items: list[ActivityTickerItem] = [] + seen_msg_ids: set[str] = set() + for line in _tail_lines(path): + if not line.strip(): + continue + try: + rec = json.loads(line) + except json.JSONDecodeError: + continue + if rec.get("type") != "assistant" or rec.get("isSidechain"): + continue + ts = _parse_ts(rec.get("timestamp")) + if ts is None or ts < cutoff: + continue + msg = rec.get("message") or {} + msg_id = msg.get("id") + if msg_id: + if msg_id in seen_msg_ids: + continue + seen_msg_ids.add(msg_id) + content = msg.get("content") if isinstance(msg.get("content"), list) else [] + + tool_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "tool_use"] + text_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "text"] + + if tool_blocks: + # Emit one ticker item per tool call (most informative for live activity) + for block in tool_blocks: + tool_name = str(block.get("name") or "") + inp = block.get("input") or {} + if not isinstance(inp, dict): + inp = {} + label = _format_tool_status(tool_name, inp) + items.append(ActivityTickerItem( + id=uuid4(), + source="Claude Code", + message=label, + created_at=ts, + )) + elif text_blocks: + # Text-only turn (no tools used) — show the response text + text = " ".join( + b.get("text", "").strip() for b in text_blocks if b.get("text", "").strip() + ).strip() + if text: + items.append(ActivityTickerItem( + id=uuid4(), + source="Claude Code", + message=text[:200], + created_at=ts, + )) + return items + + +def _extract_codex_ticker_items(path: Path, cutoff: datetime) -> list[ActivityTickerItem]: + items: list[ActivityTickerItem] = [] + seen_turn_ids: set[str] = set() + for line in _tail_lines(path): + if not line.strip(): + continue + try: + rec = json.loads(line) + except json.JSONDecodeError: + continue + ts = _parse_ts(rec.get("timestamp")) + if ts is None or ts < cutoff: + continue + payload = rec.get("payload") or {} + rec_type = rec.get("type") + + if rec_type == "event_msg" and payload.get("type") == "task_complete": + turn_id = payload.get("turn_id") or "" + if turn_id and turn_id in seen_turn_ids: + continue + if turn_id: + seen_turn_ids.add(turn_id) + text = (payload.get("last_agent_message") or "").strip() + if text: + items.append(ActivityTickerItem( + id=uuid4(), + source="Codex", + message=text[:200], + created_at=ts, + )) + elif rec_type == "response_item" and payload.get("role") == "assistant": + content = payload.get("content") or [] + text = " ".join( + b.get("text", "").strip() + for b in content + if isinstance(b, dict) and b.get("type") == "output_text" and b.get("text", "").strip() + ).strip() + if text: + items.append(ActivityTickerItem( + id=uuid4(), + source="Codex", + message=text[:200], + created_at=ts, + )) + return items + + +def _fetch_local_session_items_sync(cutoff: datetime) -> list[ActivityTickerItem]: + items: list[ActivityTickerItem] = [] + + claude_root_env = os.environ.get("CLAUDE_PROJECTS_PATH", "").strip() + claude_root = Path(claude_root_env) if claude_root_env else Path.home() / ".claude" / "projects" + if claude_root.exists(): + for path in claude_root.rglob("*.jsonl"): + try: + if datetime.utcfromtimestamp(path.stat().st_mtime) >= cutoff: + items.extend(_extract_claude_ticker_items(path, cutoff)) + except (OSError, PermissionError): + pass + + codex_root_env = os.environ.get("CODEX_SESSIONS_PATH", "").strip() + codex_root = Path(codex_root_env) if codex_root_env else Path.home() / ".codex" / "sessions" + if codex_root.exists(): + for path in codex_root.rglob("*.jsonl"): + try: + if datetime.utcfromtimestamp(path.stat().st_mtime) >= cutoff: + items.extend(_extract_codex_ticker_items(path, cutoff)) + except (OSError, PermissionError): + pass + + return items + + +async def _fetch_local_session_ticker_items(cutoff: datetime) -> list[ActivityTickerItem]: + try: + return await asyncio.to_thread(_fetch_local_session_items_sync, cutoff) + except Exception: # noqa: BLE001 + return [] + + @router.get("/ticker", response_model=list[ActivityTickerItem]) async def get_activity_ticker( limit: int = Query(default=20, ge=1, le=50), @@ -322,53 +523,62 @@ async def get_activity_ticker( board_ids = await list_accessible_board_ids(session, member=ctx.member, write=False) cutoff = utcnow() - timedelta(minutes=15) - statement = ( - select(ActivityEvent, Agent) - .outerjoin(Agent, col(ActivityEvent.agent_id) == col(Agent.id)) - .outerjoin(Task, col(ActivityEvent.task_id) == col(Task.id)) - .where(func.length(func.trim(col(ActivityEvent.message))) > 0) - .where(col(ActivityEvent.created_at) >= cutoff) - .where(col(ActivityEvent.event_type) != "agent.heartbeat") - .order_by(desc(col(ActivityEvent.created_at))) - .limit(limit) - ) - - if board_ids: - statement = statement.where( - or_( - col(ActivityEvent.board_id).in_(board_ids), - and_( - col(ActivityEvent.board_id).is_(None), - col(Task.board_id).in_(board_ids), - ), - ) + def _build_db_statement(since: datetime) -> Any: + stmt = ( + select(ActivityEvent, Agent) + .outerjoin(Agent, col(ActivityEvent.agent_id) == col(Agent.id)) + .outerjoin(Task, col(ActivityEvent.task_id) == col(Task.id)) + .where(func.length(func.trim(col(ActivityEvent.message))) > 0) + .where(col(ActivityEvent.created_at) >= since) + .order_by(desc(col(ActivityEvent.created_at))) + .limit(limit) ) - else: - statement = statement.where(col(ActivityEvent.id).is_(None)) + if board_ids: + return stmt.where( + or_( + col(ActivityEvent.board_id).in_(board_ids), + and_( + col(ActivityEvent.board_id).is_(None), + col(Task.board_id).in_(board_ids), + ), + ) + ) + return stmt.where(col(ActivityEvent.id).is_(None)) - rows = (await session.exec(statement)).all() - db_items: list[ActivityTickerItem] = [] - for row in rows: - event: ActivityEvent = row[0] - agent: Agent | None = row[1] - msg = (event.message or "").strip() - if not msg: - continue - db_items.append( - ActivityTickerItem( + def _rows_to_items(rows: Any) -> list[ActivityTickerItem]: + result: list[ActivityTickerItem] = [] + for row in rows: + event: ActivityEvent = row[0] + agent: Agent | None = row[1] + msg = (event.message or "").strip() + if not msg: + continue + result.append(ActivityTickerItem( id=event.id, source=_ticker_source(event, agent), message=msg[:200], created_at=event.created_at, - ) - ) + )) + return result + + rows = (await session.exec(_build_db_statement(cutoff))).all() + db_items = _rows_to_items(rows) + + # If nothing recent, widen to 2 hours so the ticker is never blank + if not db_items: + fallback_cutoff = utcnow() - timedelta(hours=2) + rows = (await session.exec(_build_db_statement(fallback_cutoff))).all() + db_items = _rows_to_items(rows) gw_rows = list((await session.exec( select(Gateway).where(col(Gateway.organization_id) == ctx.organization.id) )).all()) - runtime_items = await _fetch_runtime_ticker_items(gw_rows, cutoff) + runtime_items, local_items = await asyncio.gather( + _fetch_runtime_ticker_items(gw_rows, cutoff), + _fetch_local_session_ticker_items(cutoff), + ) - all_items = db_items + runtime_items + all_items = db_items + runtime_items + local_items all_items.sort(key=lambda x: x.created_at, reverse=True) return all_items[:limit] diff --git a/compose.yml b/compose.yml index 95b68fb..35c5634 100644 --- a/compose.yml +++ b/compose.yml @@ -54,6 +54,8 @@ services: CODEX_CREDENTIALS_PATH: /run/secrets/codex_credentials # Claude Code session JSONL files — read-only mount so the session viewer works. CLAUDE_PROJECTS_PATH: /run/claude/projects + # Codex CLI session JSONL files — read-only mount for the activity ticker. + CODEX_SESSIONS_PATH: /run/codex/sessions # AI provider API keys — seeded into provider_credentials on boot (optional). ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY:-} ANTHROPIC_BASE_URL: ${ANTHROPIC_BASE_URL:-} @@ -63,6 +65,7 @@ services: - ${CLAUDE_CREDENTIALS_FILE:-/home/kaspa/.claude/.credentials.json}:/run/secrets/claude_credentials:ro - ${CODEX_CREDENTIALS_FILE:-/home/kaspa/.codex/auth.json}:/run/secrets/codex_credentials:ro - ${CLAUDE_PROJECTS_DIR:-/home/kaspa/.claude/projects}:/run/claude/projects:ro + - ${CODEX_SESSIONS_DIR:-/home/kaspa/.codex/sessions}:/run/codex/sessions:ro depends_on: db: condition: service_healthy