feat: add schemas and services for agent session management

- Introduced provider-neutral schemas for local agent session sources in `agent_sessions.py`.
- Implemented source discovery helpers for local agent session providers in `agent_session_sources.py`.
- Created a reader for local Codex CLI session history in `codex_session_reader.py`, including parsing and redaction of sensitive information.
- Added tests for Codex session parsing and API endpoints to ensure functionality and data integrity.
This commit is contained in:
null 2026-05-24 18:07:35 -05:00
parent 55a99ac9fd
commit fe6d9f219a
9 changed files with 1622 additions and 10 deletions

View File

@ -0,0 +1,37 @@
"""Provider-neutral agent session source endpoints."""
from __future__ import annotations
import asyncio
from datetime import UTC, datetime
from fastapi import APIRouter, Depends
from app.api.deps import require_org_member
from app.schemas.agent_sessions import AgentSessionSourceRead, AgentSessionSourcesResponse
from app.services import agent_session_sources
from app.services.organizations import OrganizationContext
router = APIRouter(prefix="/agent-sessions", tags=["agent-sessions"])
ORG_MEMBER_DEP = Depends(require_org_member)
@router.get(
"/sources",
response_model=AgentSessionSourcesResponse,
summary="List available agent session sources",
description=(
"Returns source cards for Claude Code, Codex CLI, and OpenAI API session "
"history. OpenAI API history is only marked available once Pipeline has an "
"owned local event source."
),
)
async def list_sources(
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> AgentSessionSourcesResponse:
sources = await asyncio.to_thread(agent_session_sources.list_sources)
scanned_at = datetime.now(UTC).replace(tzinfo=None)
return AgentSessionSourcesResponse(
sources=[AgentSessionSourceRead(**source) for source in sources],
last_scanned_at=scanned_at,
)

View File

@ -0,0 +1,217 @@
"""Codex CLI session API endpoints."""
from __future__ import annotations
import asyncio
from fastapi import APIRouter, Depends, HTTPException, Query, status
from app.api.deps import require_org_member
from app.schemas.agent_sessions import (
AgentSessionListResponse,
AgentSessionRead,
AgentSessionStatsRead,
CommandEntry,
FileEntry,
SessionMessage,
SessionMessagesResponse,
SessionTokensRead,
SessionTokenUsageRead,
TextBlock,
ThinkingBlock,
ToolAnalyticsResponse,
ToolUseBlock,
)
from app.services import codex_session_reader as reader
from app.services.organizations import OrganizationContext
router = APIRouter(prefix="/codex", tags=["codex"])
ORG_MEMBER_DEP = Depends(require_org_member)
def _session_to_read(session: reader.CodexSession) -> AgentSessionRead:
return AgentSessionRead(
session_id=session.session_id,
source="codex_cli",
provider_label=session.provider_label,
project_dir=session.project_dir,
cwd=session.cwd,
title=session.title,
models=session.models,
tokens=SessionTokensRead(
input=session.tokens.input,
output=session.tokens.output,
cache_read=session.tokens.cache_read,
cache_write=session.tokens.cache_write,
total=session.tokens.total,
),
cost_usd=session.cost_usd,
billing_source=session.billing_source,
message_count=session.message_count,
first_message_at=session.first_message_at,
last_message_at=session.last_message_at,
is_active=session.is_active,
entrypoints=session.entrypoints,
git_branch=session.git_branch,
version=session.version,
)
def _message_to_read(message: reader.ParsedMessage) -> SessionMessage:
return SessionMessage(
uuid=message.uuid,
role=message.role,
timestamp=message.timestamp,
text_blocks=[
TextBlock(text=block.text, truncated=block.truncated) for block in message.text_blocks
],
thinking_blocks=[
ThinkingBlock(text=block.text, truncated=block.truncated)
for block in message.thinking_blocks
],
tool_uses=[
ToolUseBlock(
tool_use_id=tool.tool_use_id,
tool_name=tool.tool_name,
input=tool.input,
input_truncated=tool.input_truncated,
result=tool.result,
result_truncated=tool.result_truncated,
is_error=tool.is_error,
)
for tool in message.tool_uses
],
model=message.model,
tokens=(
SessionTokenUsageRead(
input=message.tokens.input,
output=message.tokens.output,
cache_read=message.tokens.cache_read,
cache_write=message.tokens.cache_write,
)
if message.tokens
else None
),
)
@router.get(
"/sessions",
response_model=AgentSessionListResponse,
summary="List local Codex CLI sessions",
description=(
"Reads local Codex CLI JSONL session history from ~/.codex/sessions, or "
"CODEX_SESSIONS_PATH when configured. Missing history returns an empty "
"source-unavailable response rather than an error."
),
)
async def list_sessions(
project: str | None = Query(None, description="Filter by project directory name substring"),
active_only: bool = Query(False, description="Return only currently active sessions"),
limit: int = Query(100, ge=1, le=500, description="Maximum sessions to return"),
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> AgentSessionListResponse:
sessions = await asyncio.to_thread(
reader.list_sessions,
project_filter=project,
active_only=active_only,
limit=limit,
)
stats = reader.session_stats(sessions)
source = await asyncio.to_thread(reader.source_metadata)
return AgentSessionListResponse(
sessions=[_session_to_read(session) for session in sessions],
total=len(sessions),
stats=AgentSessionStatsRead(
session_count=stats["session_count"],
active_sessions=stats["active_sessions"],
total_tokens=stats["total_tokens"],
total_cost_usd=stats["total_cost_usd"],
models=stats["models"],
),
source="codex_cli",
provider_label=reader.PROVIDER_LABEL,
source_status=source.source_status,
source_path=source.source_path,
last_scanned_at=source.last_scanned_at,
unavailable_reason=source.unavailable_reason,
setup_hint=source.setup_hint,
)
@router.get(
"/sessions/{session_id}",
response_model=AgentSessionRead,
summary="Get a single Codex CLI session",
)
async def get_session(
session_id: str,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> AgentSessionRead:
session = await asyncio.to_thread(reader.get_session, session_id)
if session is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
return _session_to_read(session)
@router.get(
"/sessions/{session_id}/messages",
response_model=SessionMessagesResponse,
summary="Get conversation messages for a Codex CLI session",
)
async def get_session_messages(
session_id: str,
limit: int = Query(200, ge=1, le=500, description="Max messages to return"),
offset: int = Query(0, ge=0, description="Number of messages to skip"),
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> SessionMessagesResponse:
result = await asyncio.to_thread(reader.get_session_messages, session_id, limit, offset)
source = await asyncio.to_thread(reader.source_metadata)
if result is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
messages, total = result
return SessionMessagesResponse(
session_id=session_id,
source="codex_cli",
source_status=source.source_status,
source_path=source.source_path,
last_scanned_at=source.last_scanned_at,
messages=[_message_to_read(message) for message in messages],
total=total,
has_more=(offset + limit) < total,
)
@router.get(
"/analytics/tools",
response_model=ToolAnalyticsResponse,
summary="Aggregate tool-use statistics across Codex CLI sessions",
)
async def get_tool_analytics(
project: str | None = Query(None, description="Filter by project directory name substring"),
days: int = Query(30, ge=1, le=365, description="Number of days to look back"),
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> ToolAnalyticsResponse:
data = await asyncio.to_thread(reader.get_tool_analytics, project, days)
source = await asyncio.to_thread(reader.source_metadata)
return ToolAnalyticsResponse(
tool_counts=data["tool_counts"],
top_files_read=[
FileEntry(path=entry["path"], count=entry["count"]) for entry in data["top_files_read"]
],
top_files_written=[
FileEntry(path=entry["path"], count=entry["count"])
for entry in data["top_files_written"]
],
top_commands=[
CommandEntry(command=entry["command"], count=entry["count"])
for entry in data["top_commands"]
],
session_count=data["session_count"],
date_range_days=data["date_range_days"],
source="codex_cli",
source_status=source.source_status,
source_path=source.source_path,
last_scanned_at=source.last_scanned_at,
)

View File

@ -74,8 +74,7 @@ def _warn_on_schema_drift() -> None:
from sqlalchemy import inspect as sa_inspect from sqlalchemy import inspect as sa_inspect
sync_url = ( sync_url = (
settings.database_url settings.database_url.replace("postgresql+asyncpg://", "postgresql+psycopg://")
.replace("postgresql+asyncpg://", "postgresql+psycopg://")
.replace("postgresql://", "postgresql+psycopg://") .replace("postgresql://", "postgresql+psycopg://")
.replace("postgres://", "postgresql+psycopg://") .replace("postgres://", "postgresql+psycopg://")
) )
@ -88,7 +87,8 @@ def _warn_on_schema_drift() -> None:
inspector = sa_inspect(engine) inspector = sa_inspect(engine)
except Exception as exc: except Exception as exc:
logger.error("schema_drift_check_failed: %s", str(exc)) logger.error("schema_drift_check_failed: %s", str(exc))
engine.dispose() if "engine" in dir() else None # type: ignore[name-defined] if "engine" in locals():
engine.dispose()
return return
missing: list[str] = [] missing: list[str] = []

View File

@ -14,6 +14,7 @@ from fastapi_pagination import add_pagination
from app.api.activity import router as activity_router from app.api.activity import router as activity_router
from app.api.agent import router as agent_router from app.api.agent import router as agent_router
from app.api.agent_forgejo import router as agent_forgejo_router from app.api.agent_forgejo import router as agent_forgejo_router
from app.api.agent_sessions import router as agent_sessions_router
from app.api.agents import router as agents_router from app.api.agents import router as agents_router
from app.api.approvals import router as approvals_router from app.api.approvals import router as approvals_router
from app.api.auth import router as auth_router from app.api.auth import router as auth_router
@ -24,27 +25,28 @@ from app.api.board_onboarding import router as board_onboarding_router
from app.api.board_repository_links import router as board_repository_links_router from app.api.board_repository_links import router as board_repository_links_router
from app.api.board_webhooks import router as board_webhooks_router from app.api.board_webhooks import router as board_webhooks_router
from app.api.boards import router as boards_router from app.api.boards import router as boards_router
from app.api.claude_code import router as claude_code_router
from app.api.codex_sessions import router as codex_sessions_router
from app.api.forgejo_connections import router as forgejo_connections_router from app.api.forgejo_connections import router as forgejo_connections_router
from app.api.provider_credentials import router as provider_credentials_router
from app.api.forgejo_issues import router as forgejo_issues_router from app.api.forgejo_issues import router as forgejo_issues_router
from app.api.forgejo_metrics import router as forgejo_metrics_router from app.api.forgejo_metrics import router as forgejo_metrics_router
from app.api.forgejo_repositories import router as forgejo_repositories_router from app.api.forgejo_repositories import router as forgejo_repositories_router
from app.api.forgejo_webhooks import router as forgejo_webhooks_router from app.api.forgejo_webhooks import router as forgejo_webhooks_router
from app.api.gateway import router as gateway_router from app.api.gateway import router as gateway_router
from app.api.claude_code import router as claude_code_router
from app.api.gateways import router as gateways_router from app.api.gateways import router as gateways_router
from app.api.metrics import router as metrics_router from app.api.metrics import router as metrics_router
from app.api.organizations import router as organizations_router from app.api.organizations import router as organizations_router
from app.api.provider_credentials import router as provider_credentials_router
from app.api.skills_marketplace import router as skills_marketplace_router from app.api.skills_marketplace import router as skills_marketplace_router
from app.api.souls_directory import router as souls_directory_router from app.api.souls_directory import router as souls_directory_router
from app.api.tags import router as tags_router from app.api.tags import router as tags_router
from app.api.task_custom_fields import router as task_custom_fields_router from app.api.task_custom_fields import router as task_custom_fields_router
from app.api.tasks import router as tasks_router from app.api.tasks import router as tasks_router
from app.api.users import router as users_router from app.api.users import router as users_router
from app.core.auth_mode import AuthMode
from app.core.config import settings from app.core.config import settings
from app.core.error_handling import install_error_handling from app.core.error_handling import install_error_handling
from app.core.logging import configure_logging, get_logger from app.core.logging import configure_logging, get_logger
from app.core.auth_mode import AuthMode
from app.core.rate_limit import validate_rate_limit_redis from app.core.rate_limit import validate_rate_limit_redis
from app.core.rate_limit_backend import RateLimitBackend from app.core.rate_limit_backend import RateLimitBackend
from app.core.security_headers import SecurityHeadersMiddleware from app.core.security_headers import SecurityHeadersMiddleware
@ -78,6 +80,14 @@ OPENAPI_TAGS = [
"name": "activity", "name": "activity",
"description": "Activity feed and audit timeline endpoints across boards and operations.", "description": "Activity feed and audit timeline endpoints across boards and operations.",
}, },
{
"name": "agent-sessions",
"description": "Provider-neutral local agent session source discovery endpoints.",
},
{
"name": "codex",
"description": "Local Codex CLI session, message, and tool analytics endpoints.",
},
{ {
"name": "gateways", "name": "gateways",
"description": "Gateway management, synchronization, and runtime control operations.", "description": "Gateway management, synchronization, and runtime control operations.",
@ -192,6 +202,8 @@ _JSON_SCHEMA_REF_PREFIX = "#/components/schemas/"
_OPENAPI_EXAMPLE_TAGS = { _OPENAPI_EXAMPLE_TAGS = {
"agents", "agents",
"activity", "activity",
"agent-sessions",
"codex",
"gateways", "gateways",
"metrics", "metrics",
"organizations", "organizations",
@ -480,10 +492,9 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]:
# (VS Code extension login/logout events). # (VS Code extension login/logout events).
credential_watcher_task = None credential_watcher_task = None
try: try:
from scripts.seed_provider_credentials import ( from scripts.seed_provider_credentials import seed as seed_providers
seed as seed_providers, from scripts.seed_provider_credentials import watch as watch_providers
watch as watch_providers,
)
changed = await seed_providers(verbose=False) changed = await seed_providers(verbose=False)
if changed: if changed:
logger.info("app.lifecycle.provider_credentials seeded count=%d", changed) logger.info("app.lifecycle.provider_credentials seeded count=%d", changed)
@ -599,6 +610,8 @@ api_v1.include_router(auth_router)
api_v1.include_router(agent_router) api_v1.include_router(agent_router)
api_v1.include_router(agents_router) api_v1.include_router(agents_router)
api_v1.include_router(activity_router) api_v1.include_router(activity_router)
api_v1.include_router(agent_sessions_router)
api_v1.include_router(codex_sessions_router)
api_v1.include_router(forgejo_connections_router) api_v1.include_router(forgejo_connections_router)
api_v1.include_router(forgejo_issues_router) api_v1.include_router(forgejo_issues_router)
api_v1.include_router(forgejo_metrics_router) api_v1.include_router(forgejo_metrics_router)

View File

@ -0,0 +1,149 @@
"""Provider-neutral schemas for local agent session sources."""
from __future__ import annotations
from datetime import datetime
from typing import Any, Literal
from sqlmodel import SQLModel
AgentSessionSource = Literal["claude_code", "codex_cli", "openai_api"]
AgentSourceStatus = Literal["available", "unavailable", "unsupported"]
class TextBlock(SQLModel):
text: str
truncated: bool
class ThinkingBlock(SQLModel):
text: str
truncated: bool
class ToolUseBlock(SQLModel):
tool_use_id: str
tool_name: str
input: dict[str, Any]
input_truncated: bool
result: str | None = None
result_truncated: bool = False
is_error: bool = False
class SessionTokenUsageRead(SQLModel):
input: int
output: int
cache_read: int
cache_write: int
class SessionMessage(SQLModel):
uuid: str
role: str
timestamp: datetime | None = None
text_blocks: list[TextBlock]
thinking_blocks: list[ThinkingBlock]
tool_uses: list[ToolUseBlock]
model: str | None = None
tokens: SessionTokenUsageRead | None = None
class SessionMessagesResponse(SQLModel):
session_id: str
source: AgentSessionSource
source_status: AgentSourceStatus
source_path: str | None = None
last_scanned_at: datetime | None = None
messages: list[SessionMessage]
total: int
has_more: bool
class SessionTokensRead(SQLModel):
input: int
output: int
cache_read: int
cache_write: int
total: int
class AgentSessionRead(SQLModel):
session_id: str
source: AgentSessionSource
provider_label: str
project_dir: str
cwd: str | None = None
title: str | None = None
models: list[str]
tokens: SessionTokensRead
cost_usd: float
billing_source: str
message_count: int
first_message_at: datetime | None = None
last_message_at: datetime | None = None
is_active: bool
entrypoints: list[str]
git_branch: str | None = None
version: str | None = None
class AgentSessionStatsRead(SQLModel):
session_count: int
active_sessions: int
total_tokens: int
total_cost_usd: float
models: list[str]
class AgentSessionListResponse(SQLModel):
sessions: list[AgentSessionRead]
total: int
stats: AgentSessionStatsRead
source: AgentSessionSource
provider_label: str
source_status: AgentSourceStatus
source_path: str | None = None
last_scanned_at: datetime | None = None
unavailable_reason: str | None = None
setup_hint: str | None = None
class FileEntry(SQLModel):
path: str
count: int
class CommandEntry(SQLModel):
command: str
count: int
class ToolAnalyticsResponse(SQLModel):
tool_counts: dict[str, int]
top_files_read: list[FileEntry]
top_files_written: list[FileEntry]
top_commands: list[CommandEntry]
session_count: int
date_range_days: int
source: AgentSessionSource | None = None
source_status: AgentSourceStatus | None = None
source_path: str | None = None
last_scanned_at: datetime | None = None
class AgentSessionSourceRead(SQLModel):
source: AgentSessionSource
provider_label: str
source_status: AgentSourceStatus
source_path: str | None = None
session_count: int
last_activity_at: datetime | None = None
last_scanned_at: datetime | None = None
unavailable_reason: str | None = None
setup_hint: str | None = None
class AgentSessionSourcesResponse(SQLModel):
sources: list[AgentSessionSourceRead]
last_scanned_at: datetime

View File

@ -0,0 +1,87 @@
"""Source discovery helpers for local agent session providers."""
from __future__ import annotations
import os
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from app.services import codex_session_reader
def _claude_projects_dir() -> Path:
override = os.environ.get("CLAUDE_PROJECTS_PATH", "").strip()
if override:
return Path(override)
return Path.home() / ".claude" / "projects"
def _claude_source_card(scanned_at: datetime) -> dict[str, Any]:
root = _claude_projects_dir()
files = (
sorted(root.rglob("*.jsonl"), key=lambda path: path.stat().st_mtime, reverse=True)
if root.exists()
else []
)
last_activity = None
if files:
last_activity = datetime.fromtimestamp(files[0].stat().st_mtime, UTC).replace(tzinfo=None)
return {
"source": "claude_code",
"provider_label": "Claude Code",
"source_status": "available" if files else "unavailable",
"source_path": str(root),
"session_count": len(files),
"last_activity_at": last_activity,
"last_scanned_at": scanned_at,
"unavailable_reason": (
None
if files
else "Claude Code session history was not found or contains no readable sessions."
),
"setup_hint": (
None
if files
else "Run Claude Code locally, or set CLAUDE_PROJECTS_PATH to a readable projects directory."
),
}
def list_sources() -> list[dict[str, Any]]:
"""Return availability cards for supported agent session sources."""
scanned_at = datetime.now(UTC).replace(tzinfo=None)
codex = codex_session_reader.source_metadata()
openai_reason = "Pipeline does not yet have an owned OpenAI API session event source."
openai_hint = (
"Connect gateway logs, stored API traces, an import file, or a future collector before "
"OpenAI API history can be shown."
)
return [
_claude_source_card(scanned_at),
{
"source": codex.source,
"provider_label": codex.provider_label,
"source_status": codex.source_status,
"source_path": codex.source_path,
"session_count": codex.session_count,
"last_activity_at": codex.last_activity_at,
"last_scanned_at": codex.last_scanned_at,
"unavailable_reason": codex.unavailable_reason,
"setup_hint": codex.setup_hint,
},
{
"source": "openai_api",
"provider_label": "OpenAI API",
"source_status": "unavailable",
"source_path": None,
"session_count": 0,
"last_activity_at": None,
"last_scanned_at": scanned_at,
"unavailable_reason": openai_reason,
"setup_hint": openai_hint,
},
]

View File

@ -0,0 +1,765 @@
"""Reader for local Codex CLI session history.
The reader only scans the discovered or explicitly configured sessions root.
It never reads Codex credential files and treats session logs as sensitive by
redacting likely secrets before returning tool inputs or outputs.
"""
from __future__ import annotations
import json
import os
import re
import shutil
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
from app.core.logging import get_logger
logger = get_logger(__name__)
ACTIVE_WINDOW_MINUTES = 30
CONTENT_TRUNCATE = 4000
INPUT_VALUE_TRUNCATE = 2000
SOURCE = "codex_cli"
PROVIDER_LABEL = "Codex CLI"
@dataclass
class SessionTokens:
input: int = 0
output: int = 0
cache_read: int = 0
cache_write: int = 0
@property
def total(self) -> int:
return self.input + self.output + self.cache_read + self.cache_write
@dataclass
class CodexSession:
session_id: str
source: str
provider_label: str
project_dir: str
cwd: str | None
title: str | None
models: list[str]
tokens: SessionTokens
cost_usd: float
billing_source: str
message_count: int
first_message_at: datetime | None
last_message_at: datetime | None
is_active: bool
entrypoints: list[str]
git_branch: str | None
version: str | None
path: Path
@dataclass
class SessionTextBlock:
text: str
truncated: bool
@dataclass
class SessionThinkingBlock:
text: str
truncated: bool
@dataclass
class SessionToolUse:
tool_use_id: str
tool_name: str
input: dict[str, Any]
input_truncated: bool
result: str | None
result_truncated: bool
is_error: bool
@dataclass
class SessionTokenUsage:
input: int
output: int
cache_read: int
cache_write: int
@dataclass
class ParsedMessage:
uuid: str
role: str
timestamp: datetime | None
text_blocks: list[SessionTextBlock]
thinking_blocks: list[SessionThinkingBlock]
tool_uses: list[SessionToolUse]
model: str | None
tokens: SessionTokenUsage | None
@dataclass
class SourceMetadata:
source: str = SOURCE
provider_label: str = PROVIDER_LABEL
source_status: str = "unavailable"
source_path: str | None = None
session_count: int = 0
last_activity_at: datetime | None = None
last_scanned_at: datetime = field(
default_factory=lambda: datetime.now(UTC).replace(tzinfo=None)
)
unavailable_reason: str | None = None
setup_hint: str | None = None
def _sessions_root() -> Path:
override = os.environ.get("CODEX_SESSIONS_PATH", "").strip()
if override:
return Path(override).expanduser()
return Path.home() / ".codex" / "sessions"
def _parse_iso(ts: str | None) -> datetime | None:
if not ts:
return None
try:
return (
datetime.fromisoformat(ts.replace("Z", "+00:00")).astimezone(UTC).replace(tzinfo=None)
)
except ValueError:
return None
def _is_relative_to(path: Path, root: Path) -> bool:
try:
path.resolve().relative_to(root.resolve())
return True
except (OSError, ValueError):
return False
def _iter_session_files(root: Path | None = None) -> list[Path]:
sessions_root = root or _sessions_root()
if not sessions_root.exists() or not sessions_root.is_dir():
return []
files: list[Path] = []
for path in sessions_root.rglob("*.jsonl"):
if not path.is_file() or not _is_relative_to(path, sessions_root):
continue
files.append(path)
return sorted(files, key=lambda p: p.stat().st_mtime, reverse=True)
def _read_records(path: Path) -> list[dict[str, Any]]:
records: list[dict[str, Any]] = []
try:
with open(path, encoding="utf-8", errors="replace") as fh:
for raw_line in fh:
raw_line = raw_line.strip()
if not raw_line:
continue
try:
rec = json.loads(raw_line)
except json.JSONDecodeError:
continue
if isinstance(rec, dict):
records.append(rec)
except (OSError, PermissionError) as exc:
logger.debug("codex_session_reader.read_error path=%s error=%s", path, exc)
return records
def _payload(rec: dict[str, Any]) -> dict[str, Any]:
payload = rec.get("payload")
return payload if isinstance(payload, dict) else {}
_SENSITIVE_KEY_RE = re.compile(
r"(api[_-]?key|authorization|bearer|cookie|credential|headers?|password|secret|token)",
re.I,
)
_SECRET_VALUE_RE = re.compile(
r"(?i)\b("
r"sk-[a-z0-9_-]{16,}|"
r"gh[pousr]_[a-z0-9_]{16,}|"
r"bearer\s+[a-z0-9._~+/=-]{12,}|"
r"xox[baprs]-[a-z0-9-]{12,}"
r")\b"
)
_ENV_ASSIGNMENT_RE = re.compile(
r"(?i)\b([a-z_]*(?:api[_-]?key|password|secret|token)[a-z_]*)=([^\s]+)"
)
_URL_SECRET_QUERY_RE = re.compile(
r"(?i)([?&](?:api[_-]?key|access_token|auth|password|secret|token)=)[^&#\s]+"
)
def _redact_text(value: str) -> str:
redacted = _SECRET_VALUE_RE.sub("[REDACTED]", value)
redacted = _ENV_ASSIGNMENT_RE.sub(lambda m: f"{m.group(1)}=[REDACTED]", redacted)
redacted = _URL_SECRET_QUERY_RE.sub(lambda m: f"{m.group(1)}[REDACTED]", redacted)
redacted = re.sub(r"(https?://)([^/@\s]+):([^/@\s]+)@", r"\1[REDACTED]@", redacted)
return redacted
def _redact_value(value: Any, key: str = "") -> Any:
if _SENSITIVE_KEY_RE.search(key):
return "[REDACTED]"
if isinstance(value, str):
return _redact_text(value)
if isinstance(value, dict):
return {str(k): _redact_value(v, str(k)) for k, v in value.items()}
if isinstance(value, list):
return [_redact_value(item, key) for item in value]
return value
def _trunc(text: str, limit: int = CONTENT_TRUNCATE) -> tuple[str, bool]:
text = _redact_text(text)
if len(text) <= limit:
return text, False
return text[:limit], True
def _trunc_input(input_dict: dict[str, Any]) -> tuple[dict[str, Any], bool]:
redacted = _redact_value(input_dict)
truncated = False
def _truncate_value(value: Any) -> Any:
nonlocal truncated
if isinstance(value, str) and len(value) > INPUT_VALUE_TRUNCATE:
truncated = True
return value[:INPUT_VALUE_TRUNCATE]
if isinstance(value, dict):
return {str(k): _truncate_value(v) for k, v in value.items()}
if isinstance(value, list):
return [_truncate_value(item) for item in value]
return value
output = _truncate_value(redacted)
return output if isinstance(output, dict) else {}, truncated
def _decode_arguments(raw: Any) -> tuple[dict[str, Any], bool]:
if isinstance(raw, dict):
return raw, False
if isinstance(raw, str):
try:
decoded = json.loads(raw)
if isinstance(decoded, dict):
return decoded, False
except json.JSONDecodeError:
return {"value": raw}, False
return {"value": decoded}, False
return {}, False
def _content_blocks(content: Any, *, output: bool) -> list[SessionTextBlock]:
if isinstance(content, str):
text, truncated = _trunc(content)
return [SessionTextBlock(text=text, truncated=truncated)] if text.strip() else []
if not isinstance(content, list):
return []
block_types = {"output_text", "text"} if output else {"input_text", "text"}
blocks: list[SessionTextBlock] = []
for block in content:
if not isinstance(block, dict) or block.get("type") not in block_types:
continue
raw_text = block.get("text")
if not isinstance(raw_text, str):
continue
text, truncated = _trunc(raw_text)
if text.strip():
blocks.append(SessionTextBlock(text=text, truncated=truncated))
return blocks
def _reasoning_text(payload: dict[str, Any]) -> str:
content = payload.get("content")
if isinstance(content, str) and content.strip():
return content
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, dict):
text = item.get("text") or item.get("summary")
if isinstance(text, str) and text.strip():
parts.append(text)
elif isinstance(item, str) and item.strip():
parts.append(item)
if parts:
return "\n".join(parts)
summary = payload.get("summary")
if isinstance(summary, str):
return summary
if isinstance(summary, list):
return "\n".join(str(item) for item in summary if str(item).strip())
return ""
def _usage_from_token_count(payload: dict[str, Any], key: str) -> SessionTokenUsage | None:
info = payload.get("info")
usage = info.get(key) if isinstance(info, dict) else None
if not isinstance(usage, dict):
return None
return SessionTokenUsage(
input=int(usage.get("input_tokens") or 0),
output=int(usage.get("output_tokens") or 0),
cache_read=int(usage.get("cached_input_tokens") or 0),
cache_write=0,
)
def _session_tokens_from_token_count(payload: dict[str, Any]) -> SessionTokens | None:
usage = _usage_from_token_count(payload, "total_token_usage")
if usage is None:
return None
return SessionTokens(
input=usage.input,
output=usage.output,
cache_read=usage.cache_read,
cache_write=usage.cache_write,
)
def _project_name(cwd: str | None, fallback: str) -> str:
if cwd:
name = Path(cwd).name
if name:
return name
return fallback
def _title_from_records(records: list[dict[str, Any]]) -> str | None:
for rec in records:
payload = _payload(rec)
if payload.get("type") != "message" or payload.get("role") != "user":
continue
blocks = _content_blocks(payload.get("content"), output=False)
for block in blocks:
text = " ".join(block.text.split())
if text:
return text[:80]
return None
def _parse_session_file(path: Path, title_hint: str | None = None) -> CodexSession | None:
records = _read_records(path)
if not records:
return None
session_id = path.stem
cwd: str | None = None
version: str | None = None
git_branch: str | None = None
models: set[str] = set()
entrypoints: set[str] = {"codex-cli"}
tokens = SessionTokens()
first_ts: datetime | None = None
last_ts: datetime | None = None
message_count = 0
for rec in records:
ts = _parse_iso(rec.get("timestamp") if isinstance(rec.get("timestamp"), str) else None)
if ts:
first_ts = ts if first_ts is None or ts < first_ts else first_ts
last_ts = ts if last_ts is None or ts > last_ts else last_ts
rec_type = rec.get("type")
payload = _payload(rec)
payload_type = payload.get("type")
if rec_type == "session_meta":
if isinstance(payload.get("id"), str):
session_id = payload["id"]
cwd = payload.get("cwd") if isinstance(payload.get("cwd"), str) else cwd
version = (
payload.get("cli_version")
if isinstance(payload.get("cli_version"), str)
else version
)
source = payload.get("source")
if isinstance(source, str) and source.strip():
entrypoints.add(source.strip())
git = payload.get("git")
if isinstance(git, dict) and isinstance(git.get("branch"), str):
git_branch = git["branch"]
elif rec_type == "turn_context":
cwd = payload.get("cwd") if isinstance(payload.get("cwd"), str) else cwd
model = payload.get("model")
if isinstance(model, str) and model.strip():
models.add(model.strip())
elif rec_type == "response_item" and payload_type == "message":
if payload.get("role") == "assistant":
message_count += 1
elif rec_type == "event_msg" and payload_type == "token_count":
latest = _session_tokens_from_token_count(payload)
if latest is not None:
tokens = latest
if message_count == 0 and first_ts is None:
return None
now = datetime.now(UTC).replace(tzinfo=None)
is_active = bool(last_ts and (now - last_ts) < timedelta(minutes=ACTIVE_WINDOW_MINUTES))
project_dir = _project_name(cwd, path.parent.name)
return CodexSession(
session_id=session_id,
source=SOURCE,
provider_label=PROVIDER_LABEL,
project_dir=project_dir,
cwd=cwd,
title=title_hint or _title_from_records(records),
models=sorted(models),
tokens=tokens,
cost_usd=0.0,
billing_source="subscription",
message_count=message_count,
first_message_at=first_ts,
last_message_at=last_ts,
is_active=is_active,
entrypoints=sorted(entrypoints),
git_branch=git_branch,
version=version,
path=path,
)
def source_metadata() -> SourceMetadata:
root = _sessions_root()
files = _iter_session_files(root)
installed = shutil.which("codex") is not None
meta = SourceMetadata(source_path=str(root))
if not root.exists():
meta.unavailable_reason = "Codex session history directory was not found."
meta.setup_hint = (
"Install and run Codex CLI, or set CODEX_SESSIONS_PATH to a readable history directory."
)
return meta
if not root.is_dir():
meta.unavailable_reason = "CODEX_SESSIONS_PATH does not point to a directory."
meta.setup_hint = "Set CODEX_SESSIONS_PATH to the Codex CLI sessions directory."
return meta
if not files:
meta.unavailable_reason = (
"Codex CLI is installed but no readable session history exists."
if installed
else "No readable Codex session history exists."
)
meta.setup_hint = (
"Start a Codex CLI session, or set CODEX_SESSIONS_PATH for nonstandard installs."
)
return meta
meta.source_status = "available"
meta.session_count = len(files)
latest_session = None
for path in files[:20]:
session = _parse_session_file(path)
if session and session.last_message_at:
if latest_session is None or session.last_message_at > latest_session:
latest_session = session.last_message_at
meta.last_activity_at = latest_session
return meta
def list_sessions(
*,
project_filter: str | None = None,
active_only: bool = False,
limit: int = 200,
) -> list[CodexSession]:
root = _sessions_root()
sessions: list[CodexSession] = []
for path in _iter_session_files(root):
session = _parse_session_file(path)
if session is None:
continue
if project_filter and project_filter.lower() not in session.project_dir.lower():
continue
if active_only and not session.is_active:
continue
sessions.append(session)
if len(sessions) >= limit:
break
return sessions
def get_session(session_id: str) -> CodexSession | None:
root = _sessions_root()
for path in _iter_session_files(root):
session = _parse_session_file(path)
if session and session.session_id == session_id:
return session
return None
def _find_session_path(session_id: str) -> Path | None:
root = _sessions_root()
for path in _iter_session_files(root):
if path.stem == session_id:
return path
session = _parse_session_file(path)
if session and session.session_id == session_id:
return path
return None
def get_session_messages(
session_id: str,
limit: int = 200,
offset: int = 0,
) -> tuple[list[ParsedMessage], int] | None:
path = _find_session_path(session_id)
if path is None:
return None
records = _read_records(path)
if not records:
return None
call_results: dict[str, tuple[str, bool, bool]] = {}
for rec in records:
payload = _payload(rec)
if payload.get("type") != "function_call_output":
continue
call_id = payload.get("call_id")
if not isinstance(call_id, str):
continue
output = payload.get("output")
text = output if isinstance(output, str) else json.dumps(output, default=str)
result_text, result_truncated = _trunc(text)
call_results[call_id] = (result_text, False, result_truncated)
parsed: list[ParsedMessage] = []
last_assistant: ParsedMessage | None = None
current_model: str | None = None
def _assistant_placeholder(uuid: str, ts: datetime | None) -> ParsedMessage:
msg = ParsedMessage(
uuid=uuid,
role="assistant",
timestamp=ts,
text_blocks=[],
thinking_blocks=[],
tool_uses=[],
model=current_model,
tokens=None,
)
parsed.append(msg)
return msg
for idx, rec in enumerate(records):
ts = _parse_iso(rec.get("timestamp") if isinstance(rec.get("timestamp"), str) else None)
payload = _payload(rec)
payload_type = payload.get("type")
if rec.get("type") == "turn_context":
model = payload.get("model")
if isinstance(model, str):
current_model = model
continue
if rec.get("type") == "response_item" and payload_type == "message":
role = payload.get("role")
if role == "user":
text_blocks = _content_blocks(payload.get("content"), output=False)
if text_blocks:
parsed.append(
ParsedMessage(
uuid=f"{path.stem}:user:{idx}",
role="user",
timestamp=ts,
text_blocks=text_blocks,
thinking_blocks=[],
tool_uses=[],
model=None,
tokens=None,
)
)
elif role == "assistant":
text_blocks = _content_blocks(payload.get("content"), output=True)
if text_blocks:
last_assistant = ParsedMessage(
uuid=f"{path.stem}:assistant:{idx}",
role="assistant",
timestamp=ts,
text_blocks=text_blocks,
thinking_blocks=[],
tool_uses=[],
model=current_model,
tokens=None,
)
parsed.append(last_assistant)
continue
if rec.get("type") != "response_item" and rec.get("type") != "event_msg":
continue
if payload_type == "reasoning":
text = _reasoning_text(payload)
if not text.strip():
continue
if last_assistant is None:
last_assistant = _assistant_placeholder(f"{path.stem}:assistant:{idx}", ts)
thinking, truncated = _trunc(text)
last_assistant.thinking_blocks.append(
SessionThinkingBlock(text=thinking, truncated=truncated)
)
elif payload_type in {"function_call", "web_search_call"}:
if last_assistant is None:
last_assistant = _assistant_placeholder(f"{path.stem}:assistant:{idx}", ts)
call_id = payload.get("call_id")
if not isinstance(call_id, str):
call_id = f"{path.stem}:tool:{idx}"
raw_input, input_was_truncated = _decode_arguments(payload.get("arguments"))
tool_input, input_truncated = _trunc_input(raw_input)
result = call_results.get(call_id)
last_assistant.tool_uses.append(
SessionToolUse(
tool_use_id=call_id,
tool_name=str(payload.get("name") or payload_type),
input=tool_input,
input_truncated=input_truncated or input_was_truncated,
result=result[0] if result else None,
result_truncated=result[2] if result else False,
is_error=result[1] if result else False,
)
)
elif payload_type == "token_count":
usage = _usage_from_token_count(payload, "last_token_usage")
if usage is not None and last_assistant is not None:
last_assistant.tokens = usage
total = len(parsed)
return parsed[offset : offset + limit], total
_FILE_READ_TOOLS = {"read_file", "view_image", "open", "screenshot"}
_FILE_WRITE_TOOLS = {"apply_patch", "imagegen"}
def _bash_binary(command: str) -> str | None:
cmd = command.strip().lstrip("!").strip()
if not cmd:
return None
parts = cmd.split()
while parts and "=" in parts[0] and not parts[0].startswith(("=", "./", "/")):
parts.pop(0)
if not parts:
return None
first = parts[0]
binary = first.lstrip("./").rsplit("/", 1)[-1]
return binary or None
def _extract_patch_files(value: Any) -> list[str]:
if not isinstance(value, str):
return []
files: list[str] = []
for line in value.splitlines():
prefix = None
for candidate in ("*** Update File: ", "*** Add File: ", "*** Delete File: "):
if line.startswith(candidate):
prefix = candidate
break
if prefix:
files.append(line[len(prefix) :].strip())
return files
def get_tool_analytics(project_filter: str | None = None, days: int = 30) -> dict[str, Any]:
root = _sessions_root()
cutoff = datetime.now(UTC).replace(tzinfo=None) - timedelta(days=days)
tool_counts: dict[str, int] = {}
files_read: dict[str, int] = {}
files_written: dict[str, int] = {}
commands: dict[str, int] = {}
session_count = 0
for session in list_sessions(project_filter=project_filter, limit=1000):
try:
mtime = datetime.fromtimestamp(session.path.stat().st_mtime, UTC).replace(tzinfo=None)
except OSError:
continue
if mtime < cutoff:
continue
result = get_session_messages(session.session_id, limit=1000, offset=0)
if result is None:
continue
messages, _ = result
session_had_tools = False
for message in messages:
for tool in message.tool_uses:
name = tool.tool_name
tool_counts[name] = tool_counts.get(name, 0) + 1
session_had_tools = True
inp = tool.input
path_value = inp.get("path") or inp.get("file_path")
if name in _FILE_READ_TOOLS and isinstance(path_value, str):
files_read[path_value] = files_read.get(path_value, 0) + 1
if name in _FILE_WRITE_TOOLS and isinstance(path_value, str):
files_written[path_value] = files_written.get(path_value, 0) + 1
if name == "exec_command":
cmd = inp.get("cmd")
if isinstance(cmd, str):
binary = _bash_binary(cmd)
if binary:
commands[binary] = commands.get(binary, 0) + 1
if name == "apply_patch":
for file_path in _extract_patch_files(inp.get("value") or inp.get("patch")):
files_written[file_path] = files_written.get(file_path, 0) + 1
if session_had_tools:
session_count += 1
def _top(counter: dict[str, int], key: str) -> list[dict[str, Any]]:
return [
{key: item, "count": count}
for item, count in sorted(counter.items(), key=lambda pair: pair[1], reverse=True)[:20]
]
return {
"tool_counts": dict(sorted(tool_counts.items(), key=lambda pair: pair[1], reverse=True)),
"top_files_read": _top(files_read, "path"),
"top_files_written": _top(files_written, "path"),
"top_commands": _top(commands, "command"),
"session_count": session_count,
"date_range_days": days,
"source_path": str(root),
}
def session_stats(sessions: list[CodexSession]) -> dict[str, Any]:
models: set[str] = set()
for session in sessions:
models.update(session.models)
return {
"session_count": len(sessions),
"active_sessions": sum(1 for session in sessions if session.is_active),
"total_tokens": sum(session.tokens.total for session in sessions),
"total_cost_usd": round(sum(session.cost_usd for session in sessions), 6),
"models": sorted(models),
}

View File

@ -0,0 +1,176 @@
# ruff: noqa: INP001
"""Tests for local Codex CLI session parsing."""
from __future__ import annotations
import json
import os
from pathlib import Path
from app.services import codex_session_reader as reader
def _write_jsonl(path: Path, records: list[dict]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(
"\n".join(json.dumps(record) for record in records) + "\n",
encoding="utf-8",
)
def _fixture_records(session_id: str = "session-1") -> list[dict]:
return [
{
"timestamp": "2026-05-20T12:00:00Z",
"type": "session_meta",
"payload": {
"id": session_id,
"cwd": "/work/pipeline",
"cli_version": "0.133.0",
"source": "vscode",
"git": {"branch": "feature/codex"},
},
},
{
"timestamp": "2026-05-20T12:00:01Z",
"type": "turn_context",
"payload": {"type": "turn_context", "cwd": "/work/pipeline", "model": "gpt-5.5"},
},
{
"timestamp": "2026-05-20T12:00:02Z",
"type": "response_item",
"payload": {
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": "Please inspect app.py"}],
},
},
{
"timestamp": "2026-05-20T12:00:03Z",
"type": "response_item",
"payload": {
"type": "message",
"role": "assistant",
"content": [{"type": "output_text", "text": "I will inspect it."}],
},
},
{
"timestamp": "2026-05-20T12:00:04Z",
"type": "response_item",
"payload": {
"type": "function_call",
"call_id": "call-1",
"name": "exec_command",
"arguments": json.dumps(
{
"cmd": "OPENAI_API_KEY=sk-testsecret1234567890 pytest",
"headers": {"Authorization": "Bearer abcdef1234567890"},
"url": "https://example.test/path?token=secret-token",
}
),
},
},
{
"timestamp": "2026-05-20T12:00:05Z",
"type": "response_item",
"payload": {
"type": "function_call_output",
"call_id": "call-1",
"output": "ok with sk-outputsecret1234567890",
},
},
{
"timestamp": "2026-05-20T12:00:06Z",
"type": "response_item",
"payload": {
"type": "function_call",
"call_id": "call-2",
"name": "apply_patch",
"arguments": "*** Begin Patch\n*** Update File: app.py\n+print('x')\n*** End Patch",
},
},
{
"timestamp": "2026-05-20T12:00:07Z",
"type": "event_msg",
"payload": {
"type": "token_count",
"info": {
"last_token_usage": {
"input_tokens": 10,
"cached_input_tokens": 4,
"output_tokens": 3,
},
"total_token_usage": {
"input_tokens": 100,
"cached_input_tokens": 40,
"output_tokens": 30,
},
},
},
},
]
def test_codex_sessions_parse_normalized_shape(tmp_path: Path, monkeypatch) -> None:
root = tmp_path / "sessions"
_write_jsonl(root / "2026/05/20/rollout-test.jsonl", _fixture_records())
monkeypatch.setenv("CODEX_SESSIONS_PATH", str(root))
sessions = reader.list_sessions()
assert len(sessions) == 1
session = sessions[0]
assert session.session_id == "session-1"
assert session.source == "codex_cli"
assert session.provider_label == "Codex CLI"
assert session.project_dir == "pipeline"
assert session.models == ["gpt-5.5"]
assert session.tokens.total == 170
assert session.git_branch == "feature/codex"
assert session.entrypoints == ["codex-cli", "vscode"]
def test_codex_messages_redact_and_paginate(tmp_path: Path, monkeypatch) -> None:
root = tmp_path / "sessions"
_write_jsonl(root / "rollout-test.jsonl", _fixture_records())
monkeypatch.setenv("CODEX_SESSIONS_PATH", str(root))
page = reader.get_session_messages("session-1", limit=1, offset=1)
assert page is not None
messages, total = page
assert total == 2
assert len(messages) == 1
assistant = messages[0]
assert assistant.role == "assistant"
assert assistant.tokens is not None
assert assistant.tokens.input == 10
assert assistant.tool_uses[0].input["cmd"] == "OPENAI_API_KEY=[REDACTED] pytest"
assert assistant.tool_uses[0].input["headers"] == "[REDACTED]"
assert assistant.tool_uses[0].input["url"] == "https://example.test/path?token=[REDACTED]"
assert assistant.tool_uses[0].result == "ok with [REDACTED]"
def test_codex_unavailable_source_is_not_error(tmp_path: Path, monkeypatch) -> None:
missing = tmp_path / "missing"
monkeypatch.setenv("CODEX_SESSIONS_PATH", str(missing))
assert reader.list_sessions() == []
metadata = reader.source_metadata()
assert metadata.source_status == "unavailable"
assert "not found" in (metadata.unavailable_reason or "")
def test_codex_tool_analytics(tmp_path: Path, monkeypatch) -> None:
root = tmp_path / "sessions"
session_path = root / "rollout-test.jsonl"
_write_jsonl(session_path, _fixture_records())
os.utime(session_path, None)
monkeypatch.setenv("CODEX_SESSIONS_PATH", str(root))
analytics = reader.get_tool_analytics(days=30)
assert analytics["tool_counts"] == {"exec_command": 1, "apply_patch": 1}
assert analytics["top_commands"] == [{"command": "pytest", "count": 1}]
assert analytics["top_files_written"] == [{"path": "app.py", "count": 1}]
assert analytics["session_count"] == 1

View File

@ -0,0 +1,168 @@
# ruff: noqa: INP001
"""API tests for Codex session endpoints."""
from __future__ import annotations
import json
from pathlib import Path
from uuid import uuid4
import pytest
from fastapi import APIRouter, FastAPI
from httpx import ASGITransport, AsyncClient
from app.api.agent_sessions import router as agent_sessions_router
from app.api.codex_sessions import router as codex_router
from app.services.organizations import OrganizationContext
def _write_jsonl(path: Path, records: list[dict]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(
"\n".join(json.dumps(record) for record in records) + "\n",
encoding="utf-8",
)
def _records() -> list[dict]:
return [
{
"timestamp": "2026-05-20T12:00:00Z",
"type": "session_meta",
"payload": {"id": "api-session", "cwd": "/work/pipeline", "source": "terminal"},
},
{
"timestamp": "2026-05-20T12:00:01Z",
"type": "turn_context",
"payload": {"model": "gpt-5.5", "cwd": "/work/pipeline"},
},
{
"timestamp": "2026-05-20T12:00:02Z",
"type": "response_item",
"payload": {
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": "hello"}],
},
},
{
"timestamp": "2026-05-20T12:00:03Z",
"type": "response_item",
"payload": {
"type": "message",
"role": "assistant",
"content": [{"type": "output_text", "text": "hi"}],
},
},
{
"timestamp": "2026-05-20T12:00:04Z",
"type": "response_item",
"payload": {
"type": "function_call",
"call_id": "call-api",
"name": "exec_command",
"arguments": json.dumps({"cmd": "npm test"}),
},
},
{
"timestamp": "2026-05-20T12:00:05Z",
"type": "event_msg",
"payload": {
"type": "token_count",
"info": {
"last_token_usage": {"input_tokens": 1, "output_tokens": 2},
"total_token_usage": {"input_tokens": 3, "output_tokens": 4},
},
},
},
]
def _build_app() -> FastAPI:
app = FastAPI()
api_v1 = APIRouter(prefix="/api/v1")
api_v1.include_router(codex_router)
api_v1.include_router(agent_sessions_router)
app.include_router(api_v1)
async def _override_org_member() -> OrganizationContext:
from app.models.organization_members import OrganizationMember
from app.models.organizations import Organization
org = Organization(id=uuid4(), name="test-org")
member = OrganizationMember(organization_id=org.id, user_id=uuid4(), role="admin")
return OrganizationContext(organization=org, member=member)
from app.api.deps import require_org_member
app.dependency_overrides[require_org_member] = _override_org_member
return app
@pytest.mark.asyncio
async def test_codex_sessions_endpoints(tmp_path: Path, monkeypatch) -> None:
root = tmp_path / "sessions"
_write_jsonl(root / "2026/05/20/rollout-api.jsonl", _records())
monkeypatch.setenv("CODEX_SESSIONS_PATH", str(root))
async with AsyncClient(
transport=ASGITransport(app=_build_app()), base_url="http://test"
) as client:
list_response = await client.get("/api/v1/codex/sessions")
assert list_response.status_code == 200
list_data = list_response.json()
assert list_data["source"] == "codex_cli"
assert list_data["source_status"] == "available"
assert list_data["total"] == 1
assert list_data["sessions"][0]["session_id"] == "api-session"
assert list_data["sessions"][0]["provider_label"] == "Codex CLI"
messages_response = await client.get("/api/v1/codex/sessions/api-session/messages")
assert messages_response.status_code == 200
messages_data = messages_response.json()
assert messages_data["source"] == "codex_cli"
assert messages_data["total"] == 2
assert messages_data["messages"][1]["tool_uses"][0]["tool_name"] == "exec_command"
analytics_response = await client.get("/api/v1/codex/analytics/tools?days=30")
assert analytics_response.status_code == 200
assert analytics_response.json()["top_commands"] == [{"command": "npm", "count": 1}]
@pytest.mark.asyncio
async def test_missing_codex_history_returns_source_unavailable(
tmp_path: Path, monkeypatch
) -> None:
monkeypatch.setenv("CODEX_SESSIONS_PATH", str(tmp_path / "missing"))
async with AsyncClient(
transport=ASGITransport(app=_build_app()), base_url="http://test"
) as client:
response = await client.get("/api/v1/codex/sessions")
assert response.status_code == 200
data = response.json()
assert data["sessions"] == []
assert data["source_status"] == "unavailable"
assert data["unavailable_reason"]
@pytest.mark.asyncio
async def test_agent_session_sources_include_openai_unavailable(
tmp_path: Path, monkeypatch
) -> None:
root = tmp_path / "sessions"
_write_jsonl(root / "rollout-api.jsonl", _records())
monkeypatch.setenv("CODEX_SESSIONS_PATH", str(root))
monkeypatch.setenv("CLAUDE_PROJECTS_PATH", str(tmp_path / "claude-missing"))
async with AsyncClient(
transport=ASGITransport(app=_build_app()), base_url="http://test"
) as client:
response = await client.get("/api/v1/agent-sessions/sources")
assert response.status_code == 200
sources = {source["source"]: source for source in response.json()["sources"]}
assert sources["codex_cli"]["source_status"] == "available"
assert sources["openai_api"]["source_status"] == "unavailable"
assert (
"owned OpenAI API session event source" in sources["openai_api"]["unavailable_reason"]
)