diff --git a/backend/app/api/agent_sessions.py b/backend/app/api/agent_sessions.py new file mode 100644 index 0000000..99519cb --- /dev/null +++ b/backend/app/api/agent_sessions.py @@ -0,0 +1,37 @@ +"""Provider-neutral agent session source endpoints.""" + +from __future__ import annotations + +import asyncio +from datetime import UTC, datetime + +from fastapi import APIRouter, Depends + +from app.api.deps import require_org_member +from app.schemas.agent_sessions import AgentSessionSourceRead, AgentSessionSourcesResponse +from app.services import agent_session_sources +from app.services.organizations import OrganizationContext + +router = APIRouter(prefix="/agent-sessions", tags=["agent-sessions"]) +ORG_MEMBER_DEP = Depends(require_org_member) + + +@router.get( + "/sources", + response_model=AgentSessionSourcesResponse, + summary="List available agent session sources", + description=( + "Returns source cards for Claude Code, Codex CLI, and OpenAI API session " + "history. OpenAI API history is only marked available once Pipeline has an " + "owned local event source." + ), +) +async def list_sources( + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> AgentSessionSourcesResponse: + sources = await asyncio.to_thread(agent_session_sources.list_sources) + scanned_at = datetime.now(UTC).replace(tzinfo=None) + return AgentSessionSourcesResponse( + sources=[AgentSessionSourceRead(**source) for source in sources], + last_scanned_at=scanned_at, + ) diff --git a/backend/app/api/codex_sessions.py b/backend/app/api/codex_sessions.py new file mode 100644 index 0000000..906f2bf --- /dev/null +++ b/backend/app/api/codex_sessions.py @@ -0,0 +1,217 @@ +"""Codex CLI session API endpoints.""" + +from __future__ import annotations + +import asyncio + +from fastapi import APIRouter, Depends, HTTPException, Query, status + +from app.api.deps import require_org_member +from app.schemas.agent_sessions import ( + AgentSessionListResponse, + AgentSessionRead, + AgentSessionStatsRead, + CommandEntry, + FileEntry, + SessionMessage, + SessionMessagesResponse, + SessionTokensRead, + SessionTokenUsageRead, + TextBlock, + ThinkingBlock, + ToolAnalyticsResponse, + ToolUseBlock, +) +from app.services import codex_session_reader as reader +from app.services.organizations import OrganizationContext + +router = APIRouter(prefix="/codex", tags=["codex"]) +ORG_MEMBER_DEP = Depends(require_org_member) + + +def _session_to_read(session: reader.CodexSession) -> AgentSessionRead: + return AgentSessionRead( + session_id=session.session_id, + source="codex_cli", + provider_label=session.provider_label, + project_dir=session.project_dir, + cwd=session.cwd, + title=session.title, + models=session.models, + tokens=SessionTokensRead( + input=session.tokens.input, + output=session.tokens.output, + cache_read=session.tokens.cache_read, + cache_write=session.tokens.cache_write, + total=session.tokens.total, + ), + cost_usd=session.cost_usd, + billing_source=session.billing_source, + message_count=session.message_count, + first_message_at=session.first_message_at, + last_message_at=session.last_message_at, + is_active=session.is_active, + entrypoints=session.entrypoints, + git_branch=session.git_branch, + version=session.version, + ) + + +def _message_to_read(message: reader.ParsedMessage) -> SessionMessage: + return SessionMessage( + uuid=message.uuid, + role=message.role, + timestamp=message.timestamp, + text_blocks=[ + TextBlock(text=block.text, truncated=block.truncated) for block in message.text_blocks + ], + thinking_blocks=[ + ThinkingBlock(text=block.text, truncated=block.truncated) + for block in message.thinking_blocks + ], + tool_uses=[ + ToolUseBlock( + tool_use_id=tool.tool_use_id, + tool_name=tool.tool_name, + input=tool.input, + input_truncated=tool.input_truncated, + result=tool.result, + result_truncated=tool.result_truncated, + is_error=tool.is_error, + ) + for tool in message.tool_uses + ], + model=message.model, + tokens=( + SessionTokenUsageRead( + input=message.tokens.input, + output=message.tokens.output, + cache_read=message.tokens.cache_read, + cache_write=message.tokens.cache_write, + ) + if message.tokens + else None + ), + ) + + +@router.get( + "/sessions", + response_model=AgentSessionListResponse, + summary="List local Codex CLI sessions", + description=( + "Reads local Codex CLI JSONL session history from ~/.codex/sessions, or " + "CODEX_SESSIONS_PATH when configured. Missing history returns an empty " + "source-unavailable response rather than an error." + ), +) +async def list_sessions( + project: str | None = Query(None, description="Filter by project directory name substring"), + active_only: bool = Query(False, description="Return only currently active sessions"), + limit: int = Query(100, ge=1, le=500, description="Maximum sessions to return"), + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> AgentSessionListResponse: + sessions = await asyncio.to_thread( + reader.list_sessions, + project_filter=project, + active_only=active_only, + limit=limit, + ) + stats = reader.session_stats(sessions) + source = await asyncio.to_thread(reader.source_metadata) + return AgentSessionListResponse( + sessions=[_session_to_read(session) for session in sessions], + total=len(sessions), + stats=AgentSessionStatsRead( + session_count=stats["session_count"], + active_sessions=stats["active_sessions"], + total_tokens=stats["total_tokens"], + total_cost_usd=stats["total_cost_usd"], + models=stats["models"], + ), + source="codex_cli", + provider_label=reader.PROVIDER_LABEL, + source_status=source.source_status, + source_path=source.source_path, + last_scanned_at=source.last_scanned_at, + unavailable_reason=source.unavailable_reason, + setup_hint=source.setup_hint, + ) + + +@router.get( + "/sessions/{session_id}", + response_model=AgentSessionRead, + summary="Get a single Codex CLI session", +) +async def get_session( + session_id: str, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> AgentSessionRead: + session = await asyncio.to_thread(reader.get_session, session_id) + if session is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found") + return _session_to_read(session) + + +@router.get( + "/sessions/{session_id}/messages", + response_model=SessionMessagesResponse, + summary="Get conversation messages for a Codex CLI session", +) +async def get_session_messages( + session_id: str, + limit: int = Query(200, ge=1, le=500, description="Max messages to return"), + offset: int = Query(0, ge=0, description="Number of messages to skip"), + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> SessionMessagesResponse: + result = await asyncio.to_thread(reader.get_session_messages, session_id, limit, offset) + source = await asyncio.to_thread(reader.source_metadata) + if result is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found") + + messages, total = result + return SessionMessagesResponse( + session_id=session_id, + source="codex_cli", + source_status=source.source_status, + source_path=source.source_path, + last_scanned_at=source.last_scanned_at, + messages=[_message_to_read(message) for message in messages], + total=total, + has_more=(offset + limit) < total, + ) + + +@router.get( + "/analytics/tools", + response_model=ToolAnalyticsResponse, + summary="Aggregate tool-use statistics across Codex CLI sessions", +) +async def get_tool_analytics( + project: str | None = Query(None, description="Filter by project directory name substring"), + days: int = Query(30, ge=1, le=365, description="Number of days to look back"), + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> ToolAnalyticsResponse: + data = await asyncio.to_thread(reader.get_tool_analytics, project, days) + source = await asyncio.to_thread(reader.source_metadata) + return ToolAnalyticsResponse( + tool_counts=data["tool_counts"], + top_files_read=[ + FileEntry(path=entry["path"], count=entry["count"]) for entry in data["top_files_read"] + ], + top_files_written=[ + FileEntry(path=entry["path"], count=entry["count"]) + for entry in data["top_files_written"] + ], + top_commands=[ + CommandEntry(command=entry["command"], count=entry["count"]) + for entry in data["top_commands"] + ], + session_count=data["session_count"], + date_range_days=data["date_range_days"], + source="codex_cli", + source_status=source.source_status, + source_path=source.source_path, + last_scanned_at=source.last_scanned_at, + ) diff --git a/backend/app/db/session.py b/backend/app/db/session.py index b872e66..1d19023 100644 --- a/backend/app/db/session.py +++ b/backend/app/db/session.py @@ -74,8 +74,7 @@ def _warn_on_schema_drift() -> None: from sqlalchemy import inspect as sa_inspect sync_url = ( - settings.database_url - .replace("postgresql+asyncpg://", "postgresql+psycopg://") + settings.database_url.replace("postgresql+asyncpg://", "postgresql+psycopg://") .replace("postgresql://", "postgresql+psycopg://") .replace("postgres://", "postgresql+psycopg://") ) @@ -88,7 +87,8 @@ def _warn_on_schema_drift() -> None: inspector = sa_inspect(engine) except Exception as exc: logger.error("schema_drift_check_failed: %s", str(exc)) - engine.dispose() if "engine" in dir() else None # type: ignore[name-defined] + if "engine" in locals(): + engine.dispose() return missing: list[str] = [] diff --git a/backend/app/main.py b/backend/app/main.py index df289c1..7de42a6 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -14,6 +14,7 @@ from fastapi_pagination import add_pagination from app.api.activity import router as activity_router from app.api.agent import router as agent_router from app.api.agent_forgejo import router as agent_forgejo_router +from app.api.agent_sessions import router as agent_sessions_router from app.api.agents import router as agents_router from app.api.approvals import router as approvals_router from app.api.auth import router as auth_router @@ -24,27 +25,28 @@ from app.api.board_onboarding import router as board_onboarding_router from app.api.board_repository_links import router as board_repository_links_router from app.api.board_webhooks import router as board_webhooks_router from app.api.boards import router as boards_router +from app.api.claude_code import router as claude_code_router +from app.api.codex_sessions import router as codex_sessions_router from app.api.forgejo_connections import router as forgejo_connections_router -from app.api.provider_credentials import router as provider_credentials_router from app.api.forgejo_issues import router as forgejo_issues_router from app.api.forgejo_metrics import router as forgejo_metrics_router from app.api.forgejo_repositories import router as forgejo_repositories_router from app.api.forgejo_webhooks import router as forgejo_webhooks_router from app.api.gateway import router as gateway_router -from app.api.claude_code import router as claude_code_router from app.api.gateways import router as gateways_router from app.api.metrics import router as metrics_router from app.api.organizations import router as organizations_router +from app.api.provider_credentials import router as provider_credentials_router from app.api.skills_marketplace import router as skills_marketplace_router from app.api.souls_directory import router as souls_directory_router from app.api.tags import router as tags_router from app.api.task_custom_fields import router as task_custom_fields_router from app.api.tasks import router as tasks_router from app.api.users import router as users_router +from app.core.auth_mode import AuthMode from app.core.config import settings from app.core.error_handling import install_error_handling from app.core.logging import configure_logging, get_logger -from app.core.auth_mode import AuthMode from app.core.rate_limit import validate_rate_limit_redis from app.core.rate_limit_backend import RateLimitBackend from app.core.security_headers import SecurityHeadersMiddleware @@ -78,6 +80,14 @@ OPENAPI_TAGS = [ "name": "activity", "description": "Activity feed and audit timeline endpoints across boards and operations.", }, + { + "name": "agent-sessions", + "description": "Provider-neutral local agent session source discovery endpoints.", + }, + { + "name": "codex", + "description": "Local Codex CLI session, message, and tool analytics endpoints.", + }, { "name": "gateways", "description": "Gateway management, synchronization, and runtime control operations.", @@ -192,6 +202,8 @@ _JSON_SCHEMA_REF_PREFIX = "#/components/schemas/" _OPENAPI_EXAMPLE_TAGS = { "agents", "activity", + "agent-sessions", + "codex", "gateways", "metrics", "organizations", @@ -480,10 +492,9 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]: # (VS Code extension login/logout events). credential_watcher_task = None try: - from scripts.seed_provider_credentials import ( - seed as seed_providers, - watch as watch_providers, - ) + from scripts.seed_provider_credentials import seed as seed_providers + from scripts.seed_provider_credentials import watch as watch_providers + changed = await seed_providers(verbose=False) if changed: logger.info("app.lifecycle.provider_credentials seeded count=%d", changed) @@ -599,6 +610,8 @@ api_v1.include_router(auth_router) api_v1.include_router(agent_router) api_v1.include_router(agents_router) api_v1.include_router(activity_router) +api_v1.include_router(agent_sessions_router) +api_v1.include_router(codex_sessions_router) api_v1.include_router(forgejo_connections_router) api_v1.include_router(forgejo_issues_router) api_v1.include_router(forgejo_metrics_router) diff --git a/backend/app/schemas/agent_sessions.py b/backend/app/schemas/agent_sessions.py new file mode 100644 index 0000000..70388fa --- /dev/null +++ b/backend/app/schemas/agent_sessions.py @@ -0,0 +1,149 @@ +"""Provider-neutral schemas for local agent session sources.""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any, Literal + +from sqlmodel import SQLModel + +AgentSessionSource = Literal["claude_code", "codex_cli", "openai_api"] +AgentSourceStatus = Literal["available", "unavailable", "unsupported"] + + +class TextBlock(SQLModel): + text: str + truncated: bool + + +class ThinkingBlock(SQLModel): + text: str + truncated: bool + + +class ToolUseBlock(SQLModel): + tool_use_id: str + tool_name: str + input: dict[str, Any] + input_truncated: bool + result: str | None = None + result_truncated: bool = False + is_error: bool = False + + +class SessionTokenUsageRead(SQLModel): + input: int + output: int + cache_read: int + cache_write: int + + +class SessionMessage(SQLModel): + uuid: str + role: str + timestamp: datetime | None = None + text_blocks: list[TextBlock] + thinking_blocks: list[ThinkingBlock] + tool_uses: list[ToolUseBlock] + model: str | None = None + tokens: SessionTokenUsageRead | None = None + + +class SessionMessagesResponse(SQLModel): + session_id: str + source: AgentSessionSource + source_status: AgentSourceStatus + source_path: str | None = None + last_scanned_at: datetime | None = None + messages: list[SessionMessage] + total: int + has_more: bool + + +class SessionTokensRead(SQLModel): + input: int + output: int + cache_read: int + cache_write: int + total: int + + +class AgentSessionRead(SQLModel): + session_id: str + source: AgentSessionSource + provider_label: str + project_dir: str + cwd: str | None = None + title: str | None = None + models: list[str] + tokens: SessionTokensRead + cost_usd: float + billing_source: str + message_count: int + first_message_at: datetime | None = None + last_message_at: datetime | None = None + is_active: bool + entrypoints: list[str] + git_branch: str | None = None + version: str | None = None + + +class AgentSessionStatsRead(SQLModel): + session_count: int + active_sessions: int + total_tokens: int + total_cost_usd: float + models: list[str] + + +class AgentSessionListResponse(SQLModel): + sessions: list[AgentSessionRead] + total: int + stats: AgentSessionStatsRead + source: AgentSessionSource + provider_label: str + source_status: AgentSourceStatus + source_path: str | None = None + last_scanned_at: datetime | None = None + unavailable_reason: str | None = None + setup_hint: str | None = None + + +class FileEntry(SQLModel): + path: str + count: int + + +class CommandEntry(SQLModel): + command: str + count: int + + +class ToolAnalyticsResponse(SQLModel): + tool_counts: dict[str, int] + top_files_read: list[FileEntry] + top_files_written: list[FileEntry] + top_commands: list[CommandEntry] + session_count: int + date_range_days: int + source: AgentSessionSource | None = None + source_status: AgentSourceStatus | None = None + source_path: str | None = None + last_scanned_at: datetime | None = None + + +class AgentSessionSourceRead(SQLModel): + source: AgentSessionSource + provider_label: str + source_status: AgentSourceStatus + source_path: str | None = None + session_count: int + last_activity_at: datetime | None = None + last_scanned_at: datetime | None = None + unavailable_reason: str | None = None + setup_hint: str | None = None + + +class AgentSessionSourcesResponse(SQLModel): + sources: list[AgentSessionSourceRead] + last_scanned_at: datetime diff --git a/backend/app/services/agent_session_sources.py b/backend/app/services/agent_session_sources.py new file mode 100644 index 0000000..3a84011 --- /dev/null +++ b/backend/app/services/agent_session_sources.py @@ -0,0 +1,87 @@ +"""Source discovery helpers for local agent session providers.""" + +from __future__ import annotations + +import os +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +from app.services import codex_session_reader + + +def _claude_projects_dir() -> Path: + override = os.environ.get("CLAUDE_PROJECTS_PATH", "").strip() + if override: + return Path(override) + return Path.home() / ".claude" / "projects" + + +def _claude_source_card(scanned_at: datetime) -> dict[str, Any]: + root = _claude_projects_dir() + files = ( + sorted(root.rglob("*.jsonl"), key=lambda path: path.stat().st_mtime, reverse=True) + if root.exists() + else [] + ) + last_activity = None + if files: + last_activity = datetime.fromtimestamp(files[0].stat().st_mtime, UTC).replace(tzinfo=None) + return { + "source": "claude_code", + "provider_label": "Claude Code", + "source_status": "available" if files else "unavailable", + "source_path": str(root), + "session_count": len(files), + "last_activity_at": last_activity, + "last_scanned_at": scanned_at, + "unavailable_reason": ( + None + if files + else "Claude Code session history was not found or contains no readable sessions." + ), + "setup_hint": ( + None + if files + else "Run Claude Code locally, or set CLAUDE_PROJECTS_PATH to a readable projects directory." + ), + } + + +def list_sources() -> list[dict[str, Any]]: + """Return availability cards for supported agent session sources.""" + scanned_at = datetime.now(UTC).replace(tzinfo=None) + + codex = codex_session_reader.source_metadata() + + openai_reason = "Pipeline does not yet have an owned OpenAI API session event source." + openai_hint = ( + "Connect gateway logs, stored API traces, an import file, or a future collector before " + "OpenAI API history can be shown." + ) + + return [ + _claude_source_card(scanned_at), + { + "source": codex.source, + "provider_label": codex.provider_label, + "source_status": codex.source_status, + "source_path": codex.source_path, + "session_count": codex.session_count, + "last_activity_at": codex.last_activity_at, + "last_scanned_at": codex.last_scanned_at, + "unavailable_reason": codex.unavailable_reason, + "setup_hint": codex.setup_hint, + }, + { + "source": "openai_api", + "provider_label": "OpenAI API", + "source_status": "unavailable", + "source_path": None, + "session_count": 0, + "last_activity_at": None, + "last_scanned_at": scanned_at, + "unavailable_reason": openai_reason, + "setup_hint": openai_hint, + }, + ] diff --git a/backend/app/services/codex_session_reader.py b/backend/app/services/codex_session_reader.py new file mode 100644 index 0000000..a6ae53c --- /dev/null +++ b/backend/app/services/codex_session_reader.py @@ -0,0 +1,765 @@ +"""Reader for local Codex CLI session history. + +The reader only scans the discovered or explicitly configured sessions root. +It never reads Codex credential files and treats session logs as sensitive by +redacting likely secrets before returning tool inputs or outputs. +""" + +from __future__ import annotations + +import json +import os +import re +import shutil +from dataclasses import dataclass, field +from datetime import UTC, datetime, timedelta +from pathlib import Path +from typing import Any + +from app.core.logging import get_logger + +logger = get_logger(__name__) + +ACTIVE_WINDOW_MINUTES = 30 +CONTENT_TRUNCATE = 4000 +INPUT_VALUE_TRUNCATE = 2000 + +SOURCE = "codex_cli" +PROVIDER_LABEL = "Codex CLI" + + +@dataclass +class SessionTokens: + input: int = 0 + output: int = 0 + cache_read: int = 0 + cache_write: int = 0 + + @property + def total(self) -> int: + return self.input + self.output + self.cache_read + self.cache_write + + +@dataclass +class CodexSession: + session_id: str + source: str + provider_label: str + project_dir: str + cwd: str | None + title: str | None + models: list[str] + tokens: SessionTokens + cost_usd: float + billing_source: str + message_count: int + first_message_at: datetime | None + last_message_at: datetime | None + is_active: bool + entrypoints: list[str] + git_branch: str | None + version: str | None + path: Path + + +@dataclass +class SessionTextBlock: + text: str + truncated: bool + + +@dataclass +class SessionThinkingBlock: + text: str + truncated: bool + + +@dataclass +class SessionToolUse: + tool_use_id: str + tool_name: str + input: dict[str, Any] + input_truncated: bool + result: str | None + result_truncated: bool + is_error: bool + + +@dataclass +class SessionTokenUsage: + input: int + output: int + cache_read: int + cache_write: int + + +@dataclass +class ParsedMessage: + uuid: str + role: str + timestamp: datetime | None + text_blocks: list[SessionTextBlock] + thinking_blocks: list[SessionThinkingBlock] + tool_uses: list[SessionToolUse] + model: str | None + tokens: SessionTokenUsage | None + + +@dataclass +class SourceMetadata: + source: str = SOURCE + provider_label: str = PROVIDER_LABEL + source_status: str = "unavailable" + source_path: str | None = None + session_count: int = 0 + last_activity_at: datetime | None = None + last_scanned_at: datetime = field( + default_factory=lambda: datetime.now(UTC).replace(tzinfo=None) + ) + unavailable_reason: str | None = None + setup_hint: str | None = None + + +def _sessions_root() -> Path: + override = os.environ.get("CODEX_SESSIONS_PATH", "").strip() + if override: + return Path(override).expanduser() + return Path.home() / ".codex" / "sessions" + + +def _parse_iso(ts: str | None) -> datetime | None: + if not ts: + return None + try: + return ( + datetime.fromisoformat(ts.replace("Z", "+00:00")).astimezone(UTC).replace(tzinfo=None) + ) + except ValueError: + return None + + +def _is_relative_to(path: Path, root: Path) -> bool: + try: + path.resolve().relative_to(root.resolve()) + return True + except (OSError, ValueError): + return False + + +def _iter_session_files(root: Path | None = None) -> list[Path]: + sessions_root = root or _sessions_root() + if not sessions_root.exists() or not sessions_root.is_dir(): + return [] + + files: list[Path] = [] + for path in sessions_root.rglob("*.jsonl"): + if not path.is_file() or not _is_relative_to(path, sessions_root): + continue + files.append(path) + return sorted(files, key=lambda p: p.stat().st_mtime, reverse=True) + + +def _read_records(path: Path) -> list[dict[str, Any]]: + records: list[dict[str, Any]] = [] + try: + with open(path, encoding="utf-8", errors="replace") as fh: + for raw_line in fh: + raw_line = raw_line.strip() + if not raw_line: + continue + try: + rec = json.loads(raw_line) + except json.JSONDecodeError: + continue + if isinstance(rec, dict): + records.append(rec) + except (OSError, PermissionError) as exc: + logger.debug("codex_session_reader.read_error path=%s error=%s", path, exc) + return records + + +def _payload(rec: dict[str, Any]) -> dict[str, Any]: + payload = rec.get("payload") + return payload if isinstance(payload, dict) else {} + + +_SENSITIVE_KEY_RE = re.compile( + r"(api[_-]?key|authorization|bearer|cookie|credential|headers?|password|secret|token)", + re.I, +) +_SECRET_VALUE_RE = re.compile( + r"(?i)\b(" + r"sk-[a-z0-9_-]{16,}|" + r"gh[pousr]_[a-z0-9_]{16,}|" + r"bearer\s+[a-z0-9._~+/=-]{12,}|" + r"xox[baprs]-[a-z0-9-]{12,}" + r")\b" +) +_ENV_ASSIGNMENT_RE = re.compile( + r"(?i)\b([a-z_]*(?:api[_-]?key|password|secret|token)[a-z_]*)=([^\s]+)" +) +_URL_SECRET_QUERY_RE = re.compile( + r"(?i)([?&](?:api[_-]?key|access_token|auth|password|secret|token)=)[^&#\s]+" +) + + +def _redact_text(value: str) -> str: + redacted = _SECRET_VALUE_RE.sub("[REDACTED]", value) + redacted = _ENV_ASSIGNMENT_RE.sub(lambda m: f"{m.group(1)}=[REDACTED]", redacted) + redacted = _URL_SECRET_QUERY_RE.sub(lambda m: f"{m.group(1)}[REDACTED]", redacted) + redacted = re.sub(r"(https?://)([^/@\s]+):([^/@\s]+)@", r"\1[REDACTED]@", redacted) + return redacted + + +def _redact_value(value: Any, key: str = "") -> Any: + if _SENSITIVE_KEY_RE.search(key): + return "[REDACTED]" + if isinstance(value, str): + return _redact_text(value) + if isinstance(value, dict): + return {str(k): _redact_value(v, str(k)) for k, v in value.items()} + if isinstance(value, list): + return [_redact_value(item, key) for item in value] + return value + + +def _trunc(text: str, limit: int = CONTENT_TRUNCATE) -> tuple[str, bool]: + text = _redact_text(text) + if len(text) <= limit: + return text, False + return text[:limit], True + + +def _trunc_input(input_dict: dict[str, Any]) -> tuple[dict[str, Any], bool]: + redacted = _redact_value(input_dict) + truncated = False + + def _truncate_value(value: Any) -> Any: + nonlocal truncated + if isinstance(value, str) and len(value) > INPUT_VALUE_TRUNCATE: + truncated = True + return value[:INPUT_VALUE_TRUNCATE] + if isinstance(value, dict): + return {str(k): _truncate_value(v) for k, v in value.items()} + if isinstance(value, list): + return [_truncate_value(item) for item in value] + return value + + output = _truncate_value(redacted) + return output if isinstance(output, dict) else {}, truncated + + +def _decode_arguments(raw: Any) -> tuple[dict[str, Any], bool]: + if isinstance(raw, dict): + return raw, False + if isinstance(raw, str): + try: + decoded = json.loads(raw) + if isinstance(decoded, dict): + return decoded, False + except json.JSONDecodeError: + return {"value": raw}, False + return {"value": decoded}, False + return {}, False + + +def _content_blocks(content: Any, *, output: bool) -> list[SessionTextBlock]: + if isinstance(content, str): + text, truncated = _trunc(content) + return [SessionTextBlock(text=text, truncated=truncated)] if text.strip() else [] + if not isinstance(content, list): + return [] + + block_types = {"output_text", "text"} if output else {"input_text", "text"} + blocks: list[SessionTextBlock] = [] + for block in content: + if not isinstance(block, dict) or block.get("type") not in block_types: + continue + raw_text = block.get("text") + if not isinstance(raw_text, str): + continue + text, truncated = _trunc(raw_text) + if text.strip(): + blocks.append(SessionTextBlock(text=text, truncated=truncated)) + return blocks + + +def _reasoning_text(payload: dict[str, Any]) -> str: + content = payload.get("content") + if isinstance(content, str) and content.strip(): + return content + if isinstance(content, list): + parts: list[str] = [] + for item in content: + if isinstance(item, dict): + text = item.get("text") or item.get("summary") + if isinstance(text, str) and text.strip(): + parts.append(text) + elif isinstance(item, str) and item.strip(): + parts.append(item) + if parts: + return "\n".join(parts) + summary = payload.get("summary") + if isinstance(summary, str): + return summary + if isinstance(summary, list): + return "\n".join(str(item) for item in summary if str(item).strip()) + return "" + + +def _usage_from_token_count(payload: dict[str, Any], key: str) -> SessionTokenUsage | None: + info = payload.get("info") + usage = info.get(key) if isinstance(info, dict) else None + if not isinstance(usage, dict): + return None + return SessionTokenUsage( + input=int(usage.get("input_tokens") or 0), + output=int(usage.get("output_tokens") or 0), + cache_read=int(usage.get("cached_input_tokens") or 0), + cache_write=0, + ) + + +def _session_tokens_from_token_count(payload: dict[str, Any]) -> SessionTokens | None: + usage = _usage_from_token_count(payload, "total_token_usage") + if usage is None: + return None + return SessionTokens( + input=usage.input, + output=usage.output, + cache_read=usage.cache_read, + cache_write=usage.cache_write, + ) + + +def _project_name(cwd: str | None, fallback: str) -> str: + if cwd: + name = Path(cwd).name + if name: + return name + return fallback + + +def _title_from_records(records: list[dict[str, Any]]) -> str | None: + for rec in records: + payload = _payload(rec) + if payload.get("type") != "message" or payload.get("role") != "user": + continue + blocks = _content_blocks(payload.get("content"), output=False) + for block in blocks: + text = " ".join(block.text.split()) + if text: + return text[:80] + return None + + +def _parse_session_file(path: Path, title_hint: str | None = None) -> CodexSession | None: + records = _read_records(path) + if not records: + return None + + session_id = path.stem + cwd: str | None = None + version: str | None = None + git_branch: str | None = None + models: set[str] = set() + entrypoints: set[str] = {"codex-cli"} + tokens = SessionTokens() + first_ts: datetime | None = None + last_ts: datetime | None = None + message_count = 0 + + for rec in records: + ts = _parse_iso(rec.get("timestamp") if isinstance(rec.get("timestamp"), str) else None) + if ts: + first_ts = ts if first_ts is None or ts < first_ts else first_ts + last_ts = ts if last_ts is None or ts > last_ts else last_ts + + rec_type = rec.get("type") + payload = _payload(rec) + payload_type = payload.get("type") + + if rec_type == "session_meta": + if isinstance(payload.get("id"), str): + session_id = payload["id"] + cwd = payload.get("cwd") if isinstance(payload.get("cwd"), str) else cwd + version = ( + payload.get("cli_version") + if isinstance(payload.get("cli_version"), str) + else version + ) + source = payload.get("source") + if isinstance(source, str) and source.strip(): + entrypoints.add(source.strip()) + git = payload.get("git") + if isinstance(git, dict) and isinstance(git.get("branch"), str): + git_branch = git["branch"] + + elif rec_type == "turn_context": + cwd = payload.get("cwd") if isinstance(payload.get("cwd"), str) else cwd + model = payload.get("model") + if isinstance(model, str) and model.strip(): + models.add(model.strip()) + + elif rec_type == "response_item" and payload_type == "message": + if payload.get("role") == "assistant": + message_count += 1 + + elif rec_type == "event_msg" and payload_type == "token_count": + latest = _session_tokens_from_token_count(payload) + if latest is not None: + tokens = latest + + if message_count == 0 and first_ts is None: + return None + + now = datetime.now(UTC).replace(tzinfo=None) + is_active = bool(last_ts and (now - last_ts) < timedelta(minutes=ACTIVE_WINDOW_MINUTES)) + project_dir = _project_name(cwd, path.parent.name) + + return CodexSession( + session_id=session_id, + source=SOURCE, + provider_label=PROVIDER_LABEL, + project_dir=project_dir, + cwd=cwd, + title=title_hint or _title_from_records(records), + models=sorted(models), + tokens=tokens, + cost_usd=0.0, + billing_source="subscription", + message_count=message_count, + first_message_at=first_ts, + last_message_at=last_ts, + is_active=is_active, + entrypoints=sorted(entrypoints), + git_branch=git_branch, + version=version, + path=path, + ) + + +def source_metadata() -> SourceMetadata: + root = _sessions_root() + files = _iter_session_files(root) + installed = shutil.which("codex") is not None + meta = SourceMetadata(source_path=str(root)) + + if not root.exists(): + meta.unavailable_reason = "Codex session history directory was not found." + meta.setup_hint = ( + "Install and run Codex CLI, or set CODEX_SESSIONS_PATH to a readable history directory." + ) + return meta + if not root.is_dir(): + meta.unavailable_reason = "CODEX_SESSIONS_PATH does not point to a directory." + meta.setup_hint = "Set CODEX_SESSIONS_PATH to the Codex CLI sessions directory." + return meta + if not files: + meta.unavailable_reason = ( + "Codex CLI is installed but no readable session history exists." + if installed + else "No readable Codex session history exists." + ) + meta.setup_hint = ( + "Start a Codex CLI session, or set CODEX_SESSIONS_PATH for nonstandard installs." + ) + return meta + + meta.source_status = "available" + meta.session_count = len(files) + latest_session = None + for path in files[:20]: + session = _parse_session_file(path) + if session and session.last_message_at: + if latest_session is None or session.last_message_at > latest_session: + latest_session = session.last_message_at + meta.last_activity_at = latest_session + return meta + + +def list_sessions( + *, + project_filter: str | None = None, + active_only: bool = False, + limit: int = 200, +) -> list[CodexSession]: + root = _sessions_root() + sessions: list[CodexSession] = [] + + for path in _iter_session_files(root): + session = _parse_session_file(path) + if session is None: + continue + if project_filter and project_filter.lower() not in session.project_dir.lower(): + continue + if active_only and not session.is_active: + continue + sessions.append(session) + if len(sessions) >= limit: + break + return sessions + + +def get_session(session_id: str) -> CodexSession | None: + root = _sessions_root() + for path in _iter_session_files(root): + session = _parse_session_file(path) + if session and session.session_id == session_id: + return session + return None + + +def _find_session_path(session_id: str) -> Path | None: + root = _sessions_root() + for path in _iter_session_files(root): + if path.stem == session_id: + return path + session = _parse_session_file(path) + if session and session.session_id == session_id: + return path + return None + + +def get_session_messages( + session_id: str, + limit: int = 200, + offset: int = 0, +) -> tuple[list[ParsedMessage], int] | None: + path = _find_session_path(session_id) + if path is None: + return None + + records = _read_records(path) + if not records: + return None + + call_results: dict[str, tuple[str, bool, bool]] = {} + for rec in records: + payload = _payload(rec) + if payload.get("type") != "function_call_output": + continue + call_id = payload.get("call_id") + if not isinstance(call_id, str): + continue + output = payload.get("output") + text = output if isinstance(output, str) else json.dumps(output, default=str) + result_text, result_truncated = _trunc(text) + call_results[call_id] = (result_text, False, result_truncated) + + parsed: list[ParsedMessage] = [] + last_assistant: ParsedMessage | None = None + current_model: str | None = None + + def _assistant_placeholder(uuid: str, ts: datetime | None) -> ParsedMessage: + msg = ParsedMessage( + uuid=uuid, + role="assistant", + timestamp=ts, + text_blocks=[], + thinking_blocks=[], + tool_uses=[], + model=current_model, + tokens=None, + ) + parsed.append(msg) + return msg + + for idx, rec in enumerate(records): + ts = _parse_iso(rec.get("timestamp") if isinstance(rec.get("timestamp"), str) else None) + payload = _payload(rec) + payload_type = payload.get("type") + + if rec.get("type") == "turn_context": + model = payload.get("model") + if isinstance(model, str): + current_model = model + continue + + if rec.get("type") == "response_item" and payload_type == "message": + role = payload.get("role") + if role == "user": + text_blocks = _content_blocks(payload.get("content"), output=False) + if text_blocks: + parsed.append( + ParsedMessage( + uuid=f"{path.stem}:user:{idx}", + role="user", + timestamp=ts, + text_blocks=text_blocks, + thinking_blocks=[], + tool_uses=[], + model=None, + tokens=None, + ) + ) + elif role == "assistant": + text_blocks = _content_blocks(payload.get("content"), output=True) + if text_blocks: + last_assistant = ParsedMessage( + uuid=f"{path.stem}:assistant:{idx}", + role="assistant", + timestamp=ts, + text_blocks=text_blocks, + thinking_blocks=[], + tool_uses=[], + model=current_model, + tokens=None, + ) + parsed.append(last_assistant) + continue + + if rec.get("type") != "response_item" and rec.get("type") != "event_msg": + continue + + if payload_type == "reasoning": + text = _reasoning_text(payload) + if not text.strip(): + continue + if last_assistant is None: + last_assistant = _assistant_placeholder(f"{path.stem}:assistant:{idx}", ts) + thinking, truncated = _trunc(text) + last_assistant.thinking_blocks.append( + SessionThinkingBlock(text=thinking, truncated=truncated) + ) + + elif payload_type in {"function_call", "web_search_call"}: + if last_assistant is None: + last_assistant = _assistant_placeholder(f"{path.stem}:assistant:{idx}", ts) + call_id = payload.get("call_id") + if not isinstance(call_id, str): + call_id = f"{path.stem}:tool:{idx}" + raw_input, input_was_truncated = _decode_arguments(payload.get("arguments")) + tool_input, input_truncated = _trunc_input(raw_input) + result = call_results.get(call_id) + last_assistant.tool_uses.append( + SessionToolUse( + tool_use_id=call_id, + tool_name=str(payload.get("name") or payload_type), + input=tool_input, + input_truncated=input_truncated or input_was_truncated, + result=result[0] if result else None, + result_truncated=result[2] if result else False, + is_error=result[1] if result else False, + ) + ) + + elif payload_type == "token_count": + usage = _usage_from_token_count(payload, "last_token_usage") + if usage is not None and last_assistant is not None: + last_assistant.tokens = usage + + total = len(parsed) + return parsed[offset : offset + limit], total + + +_FILE_READ_TOOLS = {"read_file", "view_image", "open", "screenshot"} +_FILE_WRITE_TOOLS = {"apply_patch", "imagegen"} + + +def _bash_binary(command: str) -> str | None: + cmd = command.strip().lstrip("!").strip() + if not cmd: + return None + parts = cmd.split() + while parts and "=" in parts[0] and not parts[0].startswith(("=", "./", "/")): + parts.pop(0) + if not parts: + return None + first = parts[0] + binary = first.lstrip("./").rsplit("/", 1)[-1] + return binary or None + + +def _extract_patch_files(value: Any) -> list[str]: + if not isinstance(value, str): + return [] + files: list[str] = [] + for line in value.splitlines(): + prefix = None + for candidate in ("*** Update File: ", "*** Add File: ", "*** Delete File: "): + if line.startswith(candidate): + prefix = candidate + break + if prefix: + files.append(line[len(prefix) :].strip()) + return files + + +def get_tool_analytics(project_filter: str | None = None, days: int = 30) -> dict[str, Any]: + root = _sessions_root() + cutoff = datetime.now(UTC).replace(tzinfo=None) - timedelta(days=days) + tool_counts: dict[str, int] = {} + files_read: dict[str, int] = {} + files_written: dict[str, int] = {} + commands: dict[str, int] = {} + session_count = 0 + + for session in list_sessions(project_filter=project_filter, limit=1000): + try: + mtime = datetime.fromtimestamp(session.path.stat().st_mtime, UTC).replace(tzinfo=None) + except OSError: + continue + if mtime < cutoff: + continue + + result = get_session_messages(session.session_id, limit=1000, offset=0) + if result is None: + continue + messages, _ = result + session_had_tools = False + for message in messages: + for tool in message.tool_uses: + name = tool.tool_name + tool_counts[name] = tool_counts.get(name, 0) + 1 + session_had_tools = True + inp = tool.input + + path_value = inp.get("path") or inp.get("file_path") + if name in _FILE_READ_TOOLS and isinstance(path_value, str): + files_read[path_value] = files_read.get(path_value, 0) + 1 + if name in _FILE_WRITE_TOOLS and isinstance(path_value, str): + files_written[path_value] = files_written.get(path_value, 0) + 1 + + if name == "exec_command": + cmd = inp.get("cmd") + if isinstance(cmd, str): + binary = _bash_binary(cmd) + if binary: + commands[binary] = commands.get(binary, 0) + 1 + + if name == "apply_patch": + for file_path in _extract_patch_files(inp.get("value") or inp.get("patch")): + files_written[file_path] = files_written.get(file_path, 0) + 1 + + if session_had_tools: + session_count += 1 + + def _top(counter: dict[str, int], key: str) -> list[dict[str, Any]]: + return [ + {key: item, "count": count} + for item, count in sorted(counter.items(), key=lambda pair: pair[1], reverse=True)[:20] + ] + + return { + "tool_counts": dict(sorted(tool_counts.items(), key=lambda pair: pair[1], reverse=True)), + "top_files_read": _top(files_read, "path"), + "top_files_written": _top(files_written, "path"), + "top_commands": _top(commands, "command"), + "session_count": session_count, + "date_range_days": days, + "source_path": str(root), + } + + +def session_stats(sessions: list[CodexSession]) -> dict[str, Any]: + models: set[str] = set() + for session in sessions: + models.update(session.models) + return { + "session_count": len(sessions), + "active_sessions": sum(1 for session in sessions if session.is_active), + "total_tokens": sum(session.tokens.total for session in sessions), + "total_cost_usd": round(sum(session.cost_usd for session in sessions), 6), + "models": sorted(models), + } diff --git a/backend/tests/test_codex_session_reader.py b/backend/tests/test_codex_session_reader.py new file mode 100644 index 0000000..76f8a3b --- /dev/null +++ b/backend/tests/test_codex_session_reader.py @@ -0,0 +1,176 @@ +# ruff: noqa: INP001 +"""Tests for local Codex CLI session parsing.""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +from app.services import codex_session_reader as reader + + +def _write_jsonl(path: Path, records: list[dict]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text( + "\n".join(json.dumps(record) for record in records) + "\n", + encoding="utf-8", + ) + + +def _fixture_records(session_id: str = "session-1") -> list[dict]: + return [ + { + "timestamp": "2026-05-20T12:00:00Z", + "type": "session_meta", + "payload": { + "id": session_id, + "cwd": "/work/pipeline", + "cli_version": "0.133.0", + "source": "vscode", + "git": {"branch": "feature/codex"}, + }, + }, + { + "timestamp": "2026-05-20T12:00:01Z", + "type": "turn_context", + "payload": {"type": "turn_context", "cwd": "/work/pipeline", "model": "gpt-5.5"}, + }, + { + "timestamp": "2026-05-20T12:00:02Z", + "type": "response_item", + "payload": { + "type": "message", + "role": "user", + "content": [{"type": "input_text", "text": "Please inspect app.py"}], + }, + }, + { + "timestamp": "2026-05-20T12:00:03Z", + "type": "response_item", + "payload": { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "I will inspect it."}], + }, + }, + { + "timestamp": "2026-05-20T12:00:04Z", + "type": "response_item", + "payload": { + "type": "function_call", + "call_id": "call-1", + "name": "exec_command", + "arguments": json.dumps( + { + "cmd": "OPENAI_API_KEY=sk-testsecret1234567890 pytest", + "headers": {"Authorization": "Bearer abcdef1234567890"}, + "url": "https://example.test/path?token=secret-token", + } + ), + }, + }, + { + "timestamp": "2026-05-20T12:00:05Z", + "type": "response_item", + "payload": { + "type": "function_call_output", + "call_id": "call-1", + "output": "ok with sk-outputsecret1234567890", + }, + }, + { + "timestamp": "2026-05-20T12:00:06Z", + "type": "response_item", + "payload": { + "type": "function_call", + "call_id": "call-2", + "name": "apply_patch", + "arguments": "*** Begin Patch\n*** Update File: app.py\n+print('x')\n*** End Patch", + }, + }, + { + "timestamp": "2026-05-20T12:00:07Z", + "type": "event_msg", + "payload": { + "type": "token_count", + "info": { + "last_token_usage": { + "input_tokens": 10, + "cached_input_tokens": 4, + "output_tokens": 3, + }, + "total_token_usage": { + "input_tokens": 100, + "cached_input_tokens": 40, + "output_tokens": 30, + }, + }, + }, + }, + ] + + +def test_codex_sessions_parse_normalized_shape(tmp_path: Path, monkeypatch) -> None: + root = tmp_path / "sessions" + _write_jsonl(root / "2026/05/20/rollout-test.jsonl", _fixture_records()) + monkeypatch.setenv("CODEX_SESSIONS_PATH", str(root)) + + sessions = reader.list_sessions() + + assert len(sessions) == 1 + session = sessions[0] + assert session.session_id == "session-1" + assert session.source == "codex_cli" + assert session.provider_label == "Codex CLI" + assert session.project_dir == "pipeline" + assert session.models == ["gpt-5.5"] + assert session.tokens.total == 170 + assert session.git_branch == "feature/codex" + assert session.entrypoints == ["codex-cli", "vscode"] + + +def test_codex_messages_redact_and_paginate(tmp_path: Path, monkeypatch) -> None: + root = tmp_path / "sessions" + _write_jsonl(root / "rollout-test.jsonl", _fixture_records()) + monkeypatch.setenv("CODEX_SESSIONS_PATH", str(root)) + + page = reader.get_session_messages("session-1", limit=1, offset=1) + + assert page is not None + messages, total = page + assert total == 2 + assert len(messages) == 1 + assistant = messages[0] + assert assistant.role == "assistant" + assert assistant.tokens is not None + assert assistant.tokens.input == 10 + assert assistant.tool_uses[0].input["cmd"] == "OPENAI_API_KEY=[REDACTED] pytest" + assert assistant.tool_uses[0].input["headers"] == "[REDACTED]" + assert assistant.tool_uses[0].input["url"] == "https://example.test/path?token=[REDACTED]" + assert assistant.tool_uses[0].result == "ok with [REDACTED]" + + +def test_codex_unavailable_source_is_not_error(tmp_path: Path, monkeypatch) -> None: + missing = tmp_path / "missing" + monkeypatch.setenv("CODEX_SESSIONS_PATH", str(missing)) + + assert reader.list_sessions() == [] + metadata = reader.source_metadata() + assert metadata.source_status == "unavailable" + assert "not found" in (metadata.unavailable_reason or "") + + +def test_codex_tool_analytics(tmp_path: Path, monkeypatch) -> None: + root = tmp_path / "sessions" + session_path = root / "rollout-test.jsonl" + _write_jsonl(session_path, _fixture_records()) + os.utime(session_path, None) + monkeypatch.setenv("CODEX_SESSIONS_PATH", str(root)) + + analytics = reader.get_tool_analytics(days=30) + + assert analytics["tool_counts"] == {"exec_command": 1, "apply_patch": 1} + assert analytics["top_commands"] == [{"command": "pytest", "count": 1}] + assert analytics["top_files_written"] == [{"path": "app.py", "count": 1}] + assert analytics["session_count"] == 1 diff --git a/backend/tests/test_codex_sessions_api.py b/backend/tests/test_codex_sessions_api.py new file mode 100644 index 0000000..c23887a --- /dev/null +++ b/backend/tests/test_codex_sessions_api.py @@ -0,0 +1,168 @@ +# ruff: noqa: INP001 +"""API tests for Codex session endpoints.""" + +from __future__ import annotations + +import json +from pathlib import Path +from uuid import uuid4 + +import pytest +from fastapi import APIRouter, FastAPI +from httpx import ASGITransport, AsyncClient + +from app.api.agent_sessions import router as agent_sessions_router +from app.api.codex_sessions import router as codex_router +from app.services.organizations import OrganizationContext + + +def _write_jsonl(path: Path, records: list[dict]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text( + "\n".join(json.dumps(record) for record in records) + "\n", + encoding="utf-8", + ) + + +def _records() -> list[dict]: + return [ + { + "timestamp": "2026-05-20T12:00:00Z", + "type": "session_meta", + "payload": {"id": "api-session", "cwd": "/work/pipeline", "source": "terminal"}, + }, + { + "timestamp": "2026-05-20T12:00:01Z", + "type": "turn_context", + "payload": {"model": "gpt-5.5", "cwd": "/work/pipeline"}, + }, + { + "timestamp": "2026-05-20T12:00:02Z", + "type": "response_item", + "payload": { + "type": "message", + "role": "user", + "content": [{"type": "input_text", "text": "hello"}], + }, + }, + { + "timestamp": "2026-05-20T12:00:03Z", + "type": "response_item", + "payload": { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "hi"}], + }, + }, + { + "timestamp": "2026-05-20T12:00:04Z", + "type": "response_item", + "payload": { + "type": "function_call", + "call_id": "call-api", + "name": "exec_command", + "arguments": json.dumps({"cmd": "npm test"}), + }, + }, + { + "timestamp": "2026-05-20T12:00:05Z", + "type": "event_msg", + "payload": { + "type": "token_count", + "info": { + "last_token_usage": {"input_tokens": 1, "output_tokens": 2}, + "total_token_usage": {"input_tokens": 3, "output_tokens": 4}, + }, + }, + }, + ] + + +def _build_app() -> FastAPI: + app = FastAPI() + api_v1 = APIRouter(prefix="/api/v1") + api_v1.include_router(codex_router) + api_v1.include_router(agent_sessions_router) + app.include_router(api_v1) + + async def _override_org_member() -> OrganizationContext: + from app.models.organization_members import OrganizationMember + from app.models.organizations import Organization + + org = Organization(id=uuid4(), name="test-org") + member = OrganizationMember(organization_id=org.id, user_id=uuid4(), role="admin") + return OrganizationContext(organization=org, member=member) + + from app.api.deps import require_org_member + + app.dependency_overrides[require_org_member] = _override_org_member + return app + + +@pytest.mark.asyncio +async def test_codex_sessions_endpoints(tmp_path: Path, monkeypatch) -> None: + root = tmp_path / "sessions" + _write_jsonl(root / "2026/05/20/rollout-api.jsonl", _records()) + monkeypatch.setenv("CODEX_SESSIONS_PATH", str(root)) + + async with AsyncClient( + transport=ASGITransport(app=_build_app()), base_url="http://test" + ) as client: + list_response = await client.get("/api/v1/codex/sessions") + assert list_response.status_code == 200 + list_data = list_response.json() + assert list_data["source"] == "codex_cli" + assert list_data["source_status"] == "available" + assert list_data["total"] == 1 + assert list_data["sessions"][0]["session_id"] == "api-session" + assert list_data["sessions"][0]["provider_label"] == "Codex CLI" + + messages_response = await client.get("/api/v1/codex/sessions/api-session/messages") + assert messages_response.status_code == 200 + messages_data = messages_response.json() + assert messages_data["source"] == "codex_cli" + assert messages_data["total"] == 2 + assert messages_data["messages"][1]["tool_uses"][0]["tool_name"] == "exec_command" + + analytics_response = await client.get("/api/v1/codex/analytics/tools?days=30") + assert analytics_response.status_code == 200 + assert analytics_response.json()["top_commands"] == [{"command": "npm", "count": 1}] + + +@pytest.mark.asyncio +async def test_missing_codex_history_returns_source_unavailable( + tmp_path: Path, monkeypatch +) -> None: + monkeypatch.setenv("CODEX_SESSIONS_PATH", str(tmp_path / "missing")) + + async with AsyncClient( + transport=ASGITransport(app=_build_app()), base_url="http://test" + ) as client: + response = await client.get("/api/v1/codex/sessions") + assert response.status_code == 200 + data = response.json() + assert data["sessions"] == [] + assert data["source_status"] == "unavailable" + assert data["unavailable_reason"] + + +@pytest.mark.asyncio +async def test_agent_session_sources_include_openai_unavailable( + tmp_path: Path, monkeypatch +) -> None: + root = tmp_path / "sessions" + _write_jsonl(root / "rollout-api.jsonl", _records()) + monkeypatch.setenv("CODEX_SESSIONS_PATH", str(root)) + monkeypatch.setenv("CLAUDE_PROJECTS_PATH", str(tmp_path / "claude-missing")) + + async with AsyncClient( + transport=ASGITransport(app=_build_app()), base_url="http://test" + ) as client: + response = await client.get("/api/v1/agent-sessions/sources") + assert response.status_code == 200 + sources = {source["source"]: source for source in response.json()["sources"]} + assert sources["codex_cli"]["source_status"] == "available" + assert sources["openai_api"]["source_status"] == "unavailable" + assert ( + "owned OpenAI API session event source" in sources["openai_api"]["unavailable_reason"] + )