Pipeline/backend/app/api/activity.py

733 lines
26 KiB
Python

"""Activity listing and task-comment feed endpoints."""
from __future__ import annotations
import asyncio
import json
import os
from collections import deque
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import TYPE_CHECKING, Any
from uuid import UUID, uuid4
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sqlalchemy import and_, asc, desc, func, or_
from sqlmodel import col, select
from sse_starlette.sse import EventSourceResponse
from app.api.deps import ActorContext, require_org_member, require_user_or_agent
from app.core.time import utcnow
from app.db.pagination import paginate
from app.db.session import async_session_maker, get_session
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.tasks import Task
from app.schemas.activity_events import ActivityEventRead, ActivityTaskCommentFeedItemRead, ActivityTickerItem
from app.schemas.pagination import DefaultLimitOffsetPage
from app.services.organizations import (
OrganizationContext,
get_active_membership,
list_accessible_board_ids,
)
if TYPE_CHECKING:
from collections.abc import AsyncIterator, Sequence
from fastapi_pagination.limit_offset import LimitOffsetPage
from sqlmodel.ext.asyncio.session import AsyncSession
# Router for this module; every route below is registered under the /activity prefix.
router = APIRouter(prefix="/activity", tags=["activity"])
# Upper bound on how many event ids the SSE stream remembers for de-duplication.
SSE_SEEN_MAX = 2000
# Seconds the SSE stream sleeps between polling rounds.
STREAM_POLL_SECONDS = 2
# Expected arity of a task-comment row: (ActivityEvent, Task, Board, Agent | None).
TASK_COMMENT_ROW_LEN = 4
# Dependency and query markers built once and reused as endpoint parameter defaults.
SESSION_DEP = Depends(get_session)
ACTOR_DEP = Depends(require_user_or_agent)
ORG_MEMBER_DEP = Depends(require_org_member)
BOARD_ID_QUERY = Query(default=None)
SINCE_QUERY = Query(default=None)
# NOTE(review): presumably keeps UUID referenced at runtime because annotations
# are lazy under `from __future__ import annotations` — confirm before removing.
_RUNTIME_TYPE_REFERENCES = (UUID,)
def _parse_since(value: str | None) -> datetime | None:
if not value:
return None
normalized = value.strip()
if not normalized:
return None
normalized = normalized.replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is not None:
return parsed.astimezone(UTC).replace(tzinfo=None)
return parsed
def _agent_role(agent: Agent | None) -> str | None:
    """Return the trimmed ``role`` string from an agent's identity profile.

    Yields ``None`` when there is no agent, the profile is not a dict, the
    role is not a string, or the role is blank after trimming.
    """
    profile = agent.identity_profile if agent is not None else None
    if not isinstance(profile, dict):
        return None
    candidate = profile.get("role")
    if not isinstance(candidate, str):
        return None
    return candidate.strip() or None
def _build_activity_route(
    *,
    event: ActivityEvent,
    board_id: UUID | None,
) -> tuple[str, dict[str, str]]:
    """Pick the route name and params a client should open for *event*.

    Without a board the generic ``activity`` route is returned, carrying the
    event id, type, and creation time (plus the task id when present). With a
    board, the route depends on the event type: task comments deep-link to
    the comment, ``approval.*`` events go to the approvals view, ``board.*``
    events open the chat panel, and everything else opens the board
    (focused on the task when the event has one).
    """
    kind = event.event_type
    task_id = event.task_id

    if board_id is None:
        params = {
            "eventId": str(event.id),
            "eventType": kind,
            "createdAt": event.created_at.isoformat(),
        }
        if task_id is not None:
            params["taskId"] = str(task_id)
        return ("activity", params)

    base = {"boardId": str(board_id)}
    if kind == "task.comment" and task_id is not None:
        return ("board", {**base, "taskId": str(task_id), "commentId": str(event.id)})
    if kind.startswith("approval."):
        return ("board.approvals", base)
    if kind.startswith("board."):
        return ("board", {**base, "panel": "chat"})
    if task_id is not None:
        return ("board", {**base, "taskId": str(task_id)})
    return ("board", base)
def _feed_item(
    event: ActivityEvent,
    task: Task,
    board: Board,
    agent: Agent | None,
) -> ActivityTaskCommentFeedItemRead:
    """Flatten a joined comment row into the feed read schema.

    Event fields supply the id, timestamp, message and agent id; the task and
    board supply their id plus title/name; agent name and role are ``None``
    when no agent is attached.
    """
    fields: dict[str, Any] = {
        "id": event.id,
        "created_at": event.created_at,
        "message": event.message,
        "agent_id": event.agent_id,
        "agent_name": agent.name if agent else None,
        "agent_role": _agent_role(agent),
        "task_id": task.id,
        "task_title": task.title,
        "board_id": board.id,
        "board_name": board.name,
    }
    return ActivityTaskCommentFeedItemRead(**fields)
def _coerce_task_comment_rows(
    items: Sequence[Any],
) -> list[tuple[ActivityEvent, Task, Board, Agent | None]]:
    """Validate query rows and return them as typed 4-tuples.

    Each row must be a tuple, or any object supporting ``len()`` and integer
    indexing, with exactly ``TASK_COMMENT_ROW_LEN`` members typed as
    ``(ActivityEvent, Task, Board, Agent | None)``.

    Raises:
        TypeError: If a row has the wrong length or member types.
    """
    shape_error = "Expected (ActivityEvent, Task, Board, Agent | None) rows"
    coerced: list[tuple[ActivityEvent, Task, Board, Agent | None]] = []

    for row in items:
        event: Any
        task: Any
        board: Any
        agent: Any
        if isinstance(row, tuple):
            if len(row) != TASK_COMMENT_ROW_LEN:
                raise TypeError(shape_error)
            event, task, board, agent = row
        else:
            # Row-like objects: probe length and positional access defensively.
            try:
                width = len(row)
                event, task, board, agent = row[0], row[1], row[2], row[3]
            except (IndexError, KeyError, TypeError):
                raise TypeError(shape_error) from None
            if width != TASK_COMMENT_ROW_LEN:
                raise TypeError(shape_error)

        well_typed = (
            isinstance(event, ActivityEvent)
            and isinstance(task, Task)
            and isinstance(board, Board)
            and (agent is None or isinstance(agent, Agent))
        )
        if not well_typed:
            raise TypeError(shape_error)
        coerced.append((event, task, board, agent))

    return coerced
def _coerce_activity_rows(
    items: Sequence[Any],
) -> list[tuple[ActivityEvent, UUID | None, UUID | None]]:
    """Validate query rows and return them as typed 3-tuples.

    Each row must be a tuple, or any object supporting ``len()`` and integer
    indexing, with exactly three members: an ``ActivityEvent`` followed by two
    optional board ids (``UUID`` or ``None``).

    Raises:
        TypeError: If a row has the wrong length or member types.
    """
    shape_error = "Expected (ActivityEvent, event_board_id, task_board_id) rows"
    expected_width = 3
    coerced: list[tuple[ActivityEvent, UUID | None, UUID | None]] = []

    for row in items:
        event: Any
        event_board_id: Any
        task_board_id: Any
        if isinstance(row, tuple):
            if len(row) != expected_width:
                raise TypeError(shape_error)
            event, event_board_id, task_board_id = row
        else:
            # Row-like objects: probe length and positional access defensively.
            try:
                width = len(row)
                event, event_board_id, task_board_id = row[0], row[1], row[2]
            except (IndexError, KeyError, TypeError):
                raise TypeError(shape_error) from None
            if width != expected_width:
                raise TypeError(shape_error)

        if not isinstance(event, ActivityEvent):
            raise TypeError(shape_error)
        for board_ref in (event_board_id, task_board_id):
            if board_ref is not None and not isinstance(board_ref, UUID):
                raise TypeError(shape_error)
        coerced.append((event, event_board_id, task_board_id))

    return coerced
async def _fetch_task_comment_events(
    session: AsyncSession,
    since: datetime,
    *,
    board_id: UUID | None = None,
) -> Sequence[tuple[ActivityEvent, Task, Board, Agent | None]]:
    """Load non-blank ``task.comment`` events created at or after *since*.

    Events are joined to their task and board (inner joins) and to the
    authoring agent (outer join), ordered oldest first. When *board_id* is
    given, only comments on tasks of that board are returned.
    """
    comment_filters = [
        col(ActivityEvent.event_type) == "task.comment",
        col(ActivityEvent.created_at) >= since,
        func.length(func.trim(col(ActivityEvent.message))) > 0,
    ]
    if board_id is not None:
        comment_filters.append(col(Task.board_id) == board_id)

    query = (
        select(ActivityEvent, Task, Board, Agent)
        .join(Task, col(ActivityEvent.task_id) == col(Task.id))
        .join(Board, col(Task.board_id) == col(Board.id))
        .outerjoin(Agent, col(ActivityEvent.agent_id) == col(Agent.id))
        .where(*comment_filters)
        .order_by(asc(col(ActivityEvent.created_at)))
    )
    result = await session.exec(query)
    return _coerce_task_comment_rows(list(result))
def _ticker_source(event: ActivityEvent, agent: Agent | None) -> str:
    """Return the ticker source label: the agent's name, else a title built from the event type.

    Without an agent, dots and underscores in the event type become word
    breaks and each word is capitalized.
    """
    if agent is not None:
        return agent.name
    words = event.event_type.replace(".", " ").replace("_", " ").split()
    return " ".join(map(str.capitalize, words))
def _runtime_source(session_key: str, session_label: str | None, model: str | None) -> str:
    """Choose a display name for a runtime session.

    Preference order: the explicit session label; a model family name matched
    case-insensitively against the model string (falling back to the model
    string itself); the second ``:``-separated segment of the session key,
    title-cased; and finally the literal ``"Session"``.
    """
    if session_label:
        return session_label

    if model:
        lowered = model.lower()
        # Order matters: earlier families win when several needles match.
        families = (
            (("claude",), "Claude"),
            (("codex",), "Codex"),
            (("gpt", "openai"), "GPT"),
            (("gemini", "google"), "Gemini"),
        )
        for needles, family in families:
            if any(needle in lowered for needle in needles):
                return family
        return model

    segments = session_key.split(":")
    if len(segments) < 2:
        return "Session"
    return segments[1].replace("-", " ").replace("_", " ").title()
async def _fetch_runtime_ticker_items(
    gateways: list[Gateway],
    cutoff: datetime,
) -> list[ActivityTickerItem]:
    """Collect recent assistant messages from each gateway as ticker items.

    Gateways are queried one after another, each call capped at three
    seconds. Only events with role ``assistant``, a non-blank content preview,
    and a timestamp at or after *cutoff* are kept; messages are truncated to
    200 characters. The lookup is best-effort: a failing gateway is logged at
    debug level and skipped, and never fails the caller.
    """
    import logging  # noqa: PLC0415

    from app.services.openclaw.gateway_rpc import GatewayConfig  # noqa: PLC0415
    from app.services.openclaw.runtime_activity import fetch_recent_events  # noqa: PLC0415

    log = logging.getLogger(__name__)
    items: list[ActivityTickerItem] = []
    for gw in gateways:
        try:
            config = GatewayConfig(
                url=gw.url,
                token=gw.token,
                allow_insecure_tls=gw.allow_insecure_tls,
                disable_device_pairing=gw.disable_device_pairing,
            )
            events = await asyncio.wait_for(
                fetch_recent_events(config, max_sessions=10, history_limit=15),
                timeout=3.0,
            )
            for ev in events:
                if ev.role != "assistant":
                    continue
                msg = ev.content_preview.strip()
                if not msg:
                    continue
                ts = ev.timestamp
                if ts is None or ts < cutoff:
                    continue
                items.append(
                    ActivityTickerItem(
                        id=uuid4(),
                        source=_runtime_source(ev.session_key, ev.session_label, ev.model),
                        message=msg[:200],
                        created_at=ts,
                    )
                )
        except Exception:  # noqa: BLE001
            # Deliberately best-effort, but leave a trace instead of swallowing
            # silently. Debug level keeps a permanently unreachable gateway
            # from flooding the logs on every ticker poll.
            log.debug(
                "Runtime ticker fetch failed for gateway %s",
                getattr(gw, "url", "<unknown>"),
                exc_info=True,
            )
    return items
def _parse_ts(ts: str | None) -> datetime | None:
if not ts:
return None
try:
return datetime.fromisoformat(ts.replace("Z", "+00:00")).astimezone(UTC).replace(tzinfo=None)
except ValueError:
return None
# Display caps (in characters) for shell commands and subtask descriptions.
_BASH_CMD_MAX = 80
_TASK_DESC_MAX = 60


def _format_tool_status(tool_name: str, inp: dict[str, Any]) -> str:
    """Human-readable tool label — ported from pixel-agents-openclaw/transcriptParser.ts.

    File tools show the basename of ``file_path``; ``Bash`` shows the command
    capped at ``_BASH_CMD_MAX`` characters; ``Task`` shows its description
    capped at ``_TASK_DESC_MAX`` characters (or a generic label when empty);
    a handful of tools map to fixed labels; anything else becomes
    ``"Using <tool_name>"``.
    """
    file_verbs = {"Read": "Reading", "Edit": "Editing", "Write": "Writing"}
    fixed_labels = {
        "Glob": "Searching files",
        "Grep": "Searching code",
        "WebFetch": "Fetching web content",
        "WebSearch": "Searching the web",
        "AskUserQuestion": "Waiting for input",
        "EnterPlanMode": "Planning",
        "NotebookEdit": "Editing notebook",
    }

    if tool_name in file_verbs:
        target = inp.get("file_path", "")
        filename = os.path.basename(str(target)) if target else ""
        return f"{file_verbs[tool_name]} {filename}"
    if tool_name == "Bash":
        command = str(inp.get("command", ""))
        # Slicing is a no-op for commands already within the cap.
        return f"Running: {command[:_BASH_CMD_MAX]}"
    if tool_name == "Task":
        summary = str(inp.get("description", ""))
        return f"Subtask: {summary[:_TASK_DESC_MAX]}" if summary else "Running subtask"
    return fixed_labels.get(tool_name, f"Using {tool_name}")
def _tail_lines(path: Path, nbytes: int = 80_000) -> list[str]:
    """Return the complete lines found in the last *nbytes* bytes of *path*.

    When the file fits inside the window, every line is returned. Otherwise
    the read starts one byte before the window, so a window that begins
    exactly on a line boundary keeps its first line and only a genuinely
    truncated leading line is discarded. Bytes are decoded as UTF-8 with
    replacement characters for invalid sequences.

    Returns an empty list when the file cannot be opened or read.
    """
    try:
        with open(path, "rb") as fh:
            size = fh.seek(0, os.SEEK_END)
            start = max(0, size - nbytes)
            if start == 0:
                fh.seek(0)
                data = fh.read()
            else:
                # Include the byte before the window: if it is a newline the
                # window starts on a boundary and nothing needs dropping.
                fh.seek(start - 1)
                data = fh.read()
                newline_at = data.find(b"\n")
                # No newline at all means the window is the middle of one
                # long line, so there is no complete line to return.
                data = data[newline_at + 1:] if newline_at >= 0 else b""
    except OSError:
        return []
    return data.decode("utf-8", errors="replace").splitlines()
def _extract_claude_ticker_items(path: Path, cutoff: datetime) -> list[ActivityTickerItem]:
    """Turn recent assistant records from a Claude Code JSONL transcript into ticker items.

    Reads the tail of *path* and keeps non-sidechain ``assistant`` records
    whose timestamp is at or after *cutoff*, skipping repeated message ids.
    A record with tool calls yields one item per call (labelled via
    ``_format_tool_status``); a text-only record yields one item with its
    text truncated to 200 characters.

    Transcript lines are untrusted: lines that are not JSON objects and
    fields of unexpected types are skipped instead of raising, so one
    malformed record cannot abort the whole scan.
    """
    items: list[ActivityTickerItem] = []
    seen_msg_ids: set[str] = set()
    for line in _tail_lines(path):
        if not line.strip():
            continue
        try:
            rec = json.loads(line)
        except json.JSONDecodeError:
            continue
        # Valid JSON is not necessarily an object (bare list, string, number).
        if not isinstance(rec, dict):
            continue
        if rec.get("type") != "assistant" or rec.get("isSidechain"):
            continue
        raw_ts = rec.get("timestamp")
        ts = _parse_ts(raw_ts) if isinstance(raw_ts, str) else None
        if ts is None or ts < cutoff:
            continue
        msg = rec.get("message")
        if not isinstance(msg, dict):
            continue
        msg_id = msg.get("id")
        # Only string ids take part in de-duplication; other values may be unhashable.
        if isinstance(msg_id, str) and msg_id:
            if msg_id in seen_msg_ids:
                continue
            seen_msg_ids.add(msg_id)
        content = msg.get("content")
        if not isinstance(content, list):
            content = []
        tool_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "tool_use"]
        text_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "text"]
        if tool_blocks:
            # Emit one ticker item per tool call (most informative for live activity)
            for block in tool_blocks:
                tool_name = str(block.get("name") or "")
                inp = block.get("input") or {}
                if not isinstance(inp, dict):
                    inp = {}
                items.append(ActivityTickerItem(
                    id=uuid4(),
                    source="Claude Code",
                    message=_format_tool_status(tool_name, inp),
                    created_at=ts,
                ))
        elif text_blocks:
            # Text-only turn (no tools used) — show the response text
            fragments = [
                b["text"].strip() for b in text_blocks if isinstance(b.get("text"), str)
            ]
            text = " ".join(fragment for fragment in fragments if fragment).strip()
            if text:
                items.append(ActivityTickerItem(
                    id=uuid4(),
                    source="Claude Code",
                    message=text[:200],
                    created_at=ts,
                ))
    return items
def _extract_codex_ticker_items(path: Path, cutoff: datetime) -> list[ActivityTickerItem]:
    """Turn recent records from a Codex JSONL session file into ticker items.

    Reads the tail of *path* and keeps records timestamped at or after
    *cutoff*. Two record shapes produce items: ``event_msg`` records whose
    payload type is ``task_complete`` (using ``last_agent_message``, once per
    turn id) and ``response_item`` records with role ``assistant`` (joining
    their ``output_text`` blocks). Messages are truncated to 200 characters.

    Session lines are untrusted: lines that are not JSON objects and fields
    of unexpected types are skipped instead of raising, so one malformed
    record cannot abort the whole scan.
    """
    items: list[ActivityTickerItem] = []
    seen_turn_ids: set[str] = set()
    for line in _tail_lines(path):
        if not line.strip():
            continue
        try:
            rec = json.loads(line)
        except json.JSONDecodeError:
            continue
        # Valid JSON is not necessarily an object (bare list, string, number).
        if not isinstance(rec, dict):
            continue
        raw_ts = rec.get("timestamp")
        ts = _parse_ts(raw_ts) if isinstance(raw_ts, str) else None
        if ts is None or ts < cutoff:
            continue
        payload = rec.get("payload")
        if not isinstance(payload, dict):
            continue
        rec_type = rec.get("type")
        if rec_type == "event_msg" and payload.get("type") == "task_complete":
            turn_id = payload.get("turn_id")
            # Only string ids take part in de-duplication; other values may be unhashable.
            if isinstance(turn_id, str) and turn_id:
                if turn_id in seen_turn_ids:
                    continue
                seen_turn_ids.add(turn_id)
            last_message = payload.get("last_agent_message")
            text = last_message.strip() if isinstance(last_message, str) else ""
            if text:
                items.append(ActivityTickerItem(
                    id=uuid4(),
                    source="Codex",
                    message=text[:200],
                    created_at=ts,
                ))
        elif rec_type == "response_item" and payload.get("role") == "assistant":
            content = payload.get("content")
            if not isinstance(content, list):
                content = []
            fragments = [
                b["text"].strip()
                for b in content
                if isinstance(b, dict)
                and b.get("type") == "output_text"
                and isinstance(b.get("text"), str)
            ]
            text = " ".join(fragment for fragment in fragments if fragment).strip()
            if text:
                items.append(ActivityTickerItem(
                    id=uuid4(),
                    source="Codex",
                    message=text[:200],
                    created_at=ts,
                ))
    return items
def _fetch_local_session_items_sync(cutoff: datetime) -> list[ActivityTickerItem]:
items: list[ActivityTickerItem] = []
claude_root_env = os.environ.get("CLAUDE_PROJECTS_PATH", "").strip()
claude_root = Path(claude_root_env) if claude_root_env else Path.home() / ".claude" / "projects"
if claude_root.exists():
for path in claude_root.rglob("*.jsonl"):
try:
if datetime.utcfromtimestamp(path.stat().st_mtime) >= cutoff:
items.extend(_extract_claude_ticker_items(path, cutoff))
except (OSError, PermissionError):
pass
codex_root_env = os.environ.get("CODEX_SESSIONS_PATH", "").strip()
codex_root = Path(codex_root_env) if codex_root_env else Path.home() / ".codex" / "sessions"
if codex_root.exists():
for path in codex_root.rglob("*.jsonl"):
try:
if datetime.utcfromtimestamp(path.stat().st_mtime) >= cutoff:
items.extend(_extract_codex_ticker_items(path, cutoff))
except (OSError, PermissionError):
pass
return items
async def _fetch_local_session_ticker_items(cutoff: datetime) -> list[ActivityTickerItem]:
    """Run the blocking local-session scan in a worker thread.

    Best-effort by design: any exception from the scan results in an empty
    list rather than propagating to the caller.
    """
    collected: list[ActivityTickerItem] = []
    try:
        collected = await asyncio.to_thread(_fetch_local_session_items_sync, cutoff)
    except Exception:  # noqa: BLE001
        return []
    return collected
@router.get("/ticker", response_model=list[ActivityTickerItem])
async def get_activity_ticker(
    limit: int = Query(default=20, ge=1, le=50),
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> list[ActivityTickerItem]:
    """Return the newest activity items for the navbar ticker.

    Merges three sources, newest first, capped at *limit*: database activity
    events on boards the member can read (last 15 minutes, widened to two
    hours when that window is empty), runtime events from the organization's
    gateways, and items scanned from local session files.
    """
    board_ids = await list_accessible_board_ids(session, member=ctx.member, write=False)
    cutoff = utcnow() - timedelta(minutes=15)

    visibility: Any
    if board_ids:
        # An event is visible through its own board, or through its task's
        # board when the event itself carries no board id.
        visibility = or_(
            col(ActivityEvent.board_id).in_(board_ids),
            and_(
                col(ActivityEvent.board_id).is_(None),
                col(Task.board_id).in_(board_ids),
            ),
        )
    else:
        # No readable boards: a predicate that matches no row.
        visibility = col(ActivityEvent.id).is_(None)

    async def _load_db_items(since: datetime) -> list[ActivityTickerItem]:
        """Query visible, non-blank events created at or after *since*."""
        query = (
            select(ActivityEvent, Agent)
            .outerjoin(Agent, col(ActivityEvent.agent_id) == col(Agent.id))
            .outerjoin(Task, col(ActivityEvent.task_id) == col(Task.id))
            .where(func.length(func.trim(col(ActivityEvent.message))) > 0)
            .where(col(ActivityEvent.created_at) >= since)
            .where(visibility)
            .order_by(desc(col(ActivityEvent.created_at)))
            .limit(limit)
        )
        fetched = (await session.exec(query)).all()
        shaped: list[ActivityTickerItem] = []
        for row in fetched:
            event: ActivityEvent = row[0]
            agent: Agent | None = row[1]
            text = (event.message or "").strip()
            if text:
                shaped.append(ActivityTickerItem(
                    id=event.id,
                    source=_ticker_source(event, agent),
                    message=text[:200],
                    created_at=event.created_at,
                ))
        return shaped

    db_items = await _load_db_items(cutoff)
    if not db_items:
        # If nothing recent, widen to 2 hours so the ticker is never blank
        db_items = await _load_db_items(utcnow() - timedelta(hours=2))

    gateway_query = select(Gateway).where(col(Gateway.organization_id) == ctx.organization.id)
    gateways = list((await session.exec(gateway_query)).all())

    # NOTE(review): the local-session source takes no organization argument,
    # so its items are not scoped per org — confirm this is intended.
    runtime_items, local_items = await asyncio.gather(
        _fetch_runtime_ticker_items(gateways, cutoff),
        _fetch_local_session_ticker_items(cutoff),
    )

    merged = sorted(
        [*db_items, *runtime_items, *local_items],
        key=lambda item: item.created_at,
        reverse=True,
    )
    return merged[:limit]
@router.get("", response_model=DefaultLimitOffsetPage[ActivityEventRead])
async def list_activity(
    session: AsyncSession = SESSION_DEP,
    actor: ActorContext = ACTOR_DEP,
) -> LimitOffsetPage[ActivityEventRead]:
    """Return a page of activity events the caller may see, newest first.

    Agents see only their own events. Users see events on boards their
    active membership can read, matched through the event's board or, when
    the event has none, its task's board. Users without an active membership
    get a 403.
    """
    query: Any = select(
        ActivityEvent,
        col(ActivityEvent.board_id).label("event_board_id"),
        col(Task.board_id).label("task_board_id"),
    ).outerjoin(Task, col(ActivityEvent.task_id) == col(Task.id))

    # NOTE(review): an actor matching neither branch below gets no visibility
    # filter at all — confirm require_user_or_agent rules that case out.
    if actor.actor_type == "agent" and actor.agent:
        query = query.where(col(ActivityEvent.agent_id) == actor.agent.id)
    elif actor.actor_type == "user" and actor.user:
        member = await get_active_membership(session, actor.user)
        if member is None:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
        board_ids = await list_accessible_board_ids(session, member=member, write=False)
        visible: Any
        if board_ids:
            visible = or_(
                col(ActivityEvent.board_id).in_(board_ids),
                and_(
                    col(ActivityEvent.board_id).is_(None),
                    col(Task.board_id).in_(board_ids),
                ),
            )
        else:
            # No readable boards: a predicate that matches no row.
            visible = col(ActivityEvent.id).is_(None)
        query = query.where(visible)

    query = query.order_by(desc(col(ActivityEvent.created_at)))

    def _to_read_models(items: Sequence[Any]) -> Sequence[Any]:
        """Attach the resolved board id and client route to each event."""
        shaped: list[ActivityEventRead] = []
        for event, event_board_id, task_board_id in _coerce_activity_rows(items):
            read_model = ActivityEventRead.model_validate(event, from_attributes=True)
            board_ref = event_board_id or task_board_id
            read_model.board_id = board_ref
            read_model.route_name, read_model.route_params = _build_activity_route(
                event=event,
                board_id=board_ref,
            )
            shaped.append(read_model)
        return shaped

    return await paginate(session, query, transformer=_to_read_models)
@router.get(
    "/task-comments",
    response_model=DefaultLimitOffsetPage[ActivityTaskCommentFeedItemRead],
)
async def list_task_comment_feed(
    board_id: UUID | None = BOARD_ID_QUERY,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> LimitOffsetPage[ActivityTaskCommentFeedItemRead]:
    """Return a page of non-blank task comments, newest first.

    With *board_id*, the feed is limited to that board and a 403 is raised
    when the member cannot read it. Without it, the feed spans every board
    the member can read, and is empty when there are none.
    """
    accessible = await list_accessible_board_ids(session, member=ctx.member, write=False)

    scope: Any
    if board_id is not None:
        if board_id not in set(accessible):
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
        scope = col(Task.board_id) == board_id
    elif accessible:
        scope = col(Task.board_id).in_(accessible)
    else:
        # No readable boards: a predicate that matches no row.
        scope = col(Task.id).is_(None)

    query = (
        select(ActivityEvent, Task, Board, Agent)
        .join(Task, col(ActivityEvent.task_id) == col(Task.id))
        .join(Board, col(Task.board_id) == col(Board.id))
        .outerjoin(Agent, col(ActivityEvent.agent_id) == col(Agent.id))
        .where(col(ActivityEvent.event_type) == "task.comment")
        .where(func.length(func.trim(col(ActivityEvent.message))) > 0)
        .where(scope)
        .order_by(desc(col(ActivityEvent.created_at)))
    )

    def _to_feed_items(items: Sequence[Any]) -> Sequence[Any]:
        """Validate the joined rows and flatten each into a feed item."""
        return [_feed_item(*row) for row in _coerce_task_comment_rows(items)]

    return await paginate(session, query, transformer=_to_feed_items)
@router.get("/task-comments/stream")
async def stream_task_comment_feed(
    request: Request,
    board_id: UUID | None = BOARD_ID_QUERY,
    since: str | None = SINCE_QUERY,
    db_session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> EventSourceResponse:
    """Stream new task comments as server-sent ``comment`` events.

    Streaming starts from *since* (or now when it is absent or unparseable).
    Every ``STREAM_POLL_SECONDS`` the generator polls for comments newer than
    the last one emitted, until the client disconnects. Requesting a board the
    member cannot read yields a 403.
    """
    since_dt = _parse_since(since) or utcnow()
    allowed_ids = set(
        await list_accessible_board_ids(db_session, member=ctx.member, write=False),
    )
    if board_id is not None and board_id not in allowed_ids:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)

    # Bounded memory of emitted event ids: the set gives fast membership
    # checks, the queue gives eviction order once SSE_SEEN_MAX is exceeded.
    seen_ids: set[UUID] = set()
    seen_queue: deque[UUID] = deque()

    def _first_sighting(event_id: UUID) -> bool:
        """Record *event_id*; return False when it was already emitted."""
        if event_id in seen_ids:
            return False
        seen_ids.add(event_id)
        seen_queue.append(event_id)
        if len(seen_queue) > SSE_SEEN_MAX:
            seen_ids.discard(seen_queue.popleft())
        return True

    async def event_generator() -> AsyncIterator[dict[str, str]]:
        watermark = since_dt
        while not await request.is_disconnected():
            # A fresh short-lived session per poll.
            async with async_session_maker() as stream_session:
                if board_id is not None:
                    batch = await _fetch_task_comment_events(
                        stream_session,
                        watermark,
                        board_id=board_id,
                    )
                elif allowed_ids:
                    # NOTE(review): this fetches comments for every board and
                    # filters in Python; the watermark only advances on
                    # emitted events — confirm the cost is acceptable.
                    fetched = await _fetch_task_comment_events(stream_session, watermark)
                    batch = [row for row in fetched if row[1].board_id in allowed_ids]
                else:
                    batch = []

            for event, task, board, agent in batch:
                if not _first_sighting(event.id):
                    continue
                watermark = max(event.created_at, watermark)
                item = _feed_item(event, task, board, agent)
                body = {"comment": item.model_dump(mode="json")}
                yield {"event": "comment", "data": json.dumps(body)}

            await asyncio.sleep(STREAM_POLL_SECONDS)

    return EventSourceResponse(event_generator(), ping=15)