Pipeline/backend/app/api/codex_sessions.py

218 lines
7.5 KiB
Python

"""Codex CLI session API endpoints."""
from __future__ import annotations
import asyncio
from fastapi import APIRouter, Depends, HTTPException, Query, status
from app.api.deps import require_org_member
from app.schemas.agent_sessions import (
AgentSessionListResponse,
AgentSessionRead,
AgentSessionStatsRead,
CommandEntry,
FileEntry,
SessionMessage,
SessionMessagesResponse,
SessionTokensRead,
SessionTokenUsageRead,
TextBlock,
ThinkingBlock,
ToolAnalyticsResponse,
ToolUseBlock,
)
from app.services import codex_session_reader as reader
from app.services.organizations import OrganizationContext
router = APIRouter(prefix="/codex", tags=["codex"])
ORG_MEMBER_DEP = Depends(require_org_member)
def _session_to_read(session: reader.CodexSession) -> AgentSessionRead:
return AgentSessionRead(
session_id=session.session_id,
source="codex_cli",
provider_label=session.provider_label,
project_dir=session.project_dir,
cwd=session.cwd,
title=session.title,
models=session.models,
tokens=SessionTokensRead(
input=session.tokens.input,
output=session.tokens.output,
cache_read=session.tokens.cache_read,
cache_write=session.tokens.cache_write,
total=session.tokens.total,
),
cost_usd=session.cost_usd,
billing_source=session.billing_source,
message_count=session.message_count,
first_message_at=session.first_message_at,
last_message_at=session.last_message_at,
is_active=session.is_active,
entrypoints=session.entrypoints,
git_branch=session.git_branch,
version=session.version,
)
def _message_to_read(message: reader.ParsedMessage) -> SessionMessage:
return SessionMessage(
uuid=message.uuid,
role=message.role,
timestamp=message.timestamp,
text_blocks=[
TextBlock(text=block.text, truncated=block.truncated) for block in message.text_blocks
],
thinking_blocks=[
ThinkingBlock(text=block.text, truncated=block.truncated)
for block in message.thinking_blocks
],
tool_uses=[
ToolUseBlock(
tool_use_id=tool.tool_use_id,
tool_name=tool.tool_name,
input=tool.input,
input_truncated=tool.input_truncated,
result=tool.result,
result_truncated=tool.result_truncated,
is_error=tool.is_error,
)
for tool in message.tool_uses
],
model=message.model,
tokens=(
SessionTokenUsageRead(
input=message.tokens.input,
output=message.tokens.output,
cache_read=message.tokens.cache_read,
cache_write=message.tokens.cache_write,
)
if message.tokens
else None
),
)
@router.get(
"/sessions",
response_model=AgentSessionListResponse,
summary="List local Codex CLI sessions",
description=(
"Reads local Codex CLI JSONL session history from ~/.codex/sessions, or "
"CODEX_SESSIONS_PATH when configured. Missing history returns an empty "
"source-unavailable response rather than an error."
),
)
async def list_sessions(
project: str | None = Query(None, description="Filter by project directory name substring"),
active_only: bool = Query(False, description="Return only currently active sessions"),
limit: int = Query(100, ge=1, le=500, description="Maximum sessions to return"),
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> AgentSessionListResponse:
sessions = await asyncio.to_thread(
reader.list_sessions,
project_filter=project,
active_only=active_only,
limit=limit,
)
stats = reader.session_stats(sessions)
source = await asyncio.to_thread(reader.source_metadata)
return AgentSessionListResponse(
sessions=[_session_to_read(session) for session in sessions],
total=len(sessions),
stats=AgentSessionStatsRead(
session_count=stats["session_count"],
active_sessions=stats["active_sessions"],
total_tokens=stats["total_tokens"],
total_cost_usd=stats["total_cost_usd"],
models=stats["models"],
),
source="codex_cli",
provider_label=reader.PROVIDER_LABEL,
source_status=source.source_status,
source_path=source.source_path,
last_scanned_at=source.last_scanned_at,
unavailable_reason=source.unavailable_reason,
setup_hint=source.setup_hint,
)
@router.get(
"/sessions/{session_id}",
response_model=AgentSessionRead,
summary="Get a single Codex CLI session",
)
async def get_session(
session_id: str,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> AgentSessionRead:
session = await asyncio.to_thread(reader.get_session, session_id)
if session is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
return _session_to_read(session)
@router.get(
"/sessions/{session_id}/messages",
response_model=SessionMessagesResponse,
summary="Get conversation messages for a Codex CLI session",
)
async def get_session_messages(
session_id: str,
limit: int = Query(200, ge=1, le=500, description="Max messages to return"),
offset: int = Query(0, ge=0, description="Number of messages to skip"),
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> SessionMessagesResponse:
result = await asyncio.to_thread(reader.get_session_messages, session_id, limit, offset)
source = await asyncio.to_thread(reader.source_metadata)
if result is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
messages, total = result
return SessionMessagesResponse(
session_id=session_id,
source="codex_cli",
source_status=source.source_status,
source_path=source.source_path,
last_scanned_at=source.last_scanned_at,
messages=[_message_to_read(message) for message in messages],
total=total,
has_more=(offset + limit) < total,
)
@router.get(
"/analytics/tools",
response_model=ToolAnalyticsResponse,
summary="Aggregate tool-use statistics across Codex CLI sessions",
)
async def get_tool_analytics(
project: str | None = Query(None, description="Filter by project directory name substring"),
days: int = Query(30, ge=1, le=365, description="Number of days to look back"),
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> ToolAnalyticsResponse:
data = await asyncio.to_thread(reader.get_tool_analytics, project, days)
source = await asyncio.to_thread(reader.source_metadata)
return ToolAnalyticsResponse(
tool_counts=data["tool_counts"],
top_files_read=[
FileEntry(path=entry["path"], count=entry["count"]) for entry in data["top_files_read"]
],
top_files_written=[
FileEntry(path=entry["path"], count=entry["count"])
for entry in data["top_files_written"]
],
top_commands=[
CommandEntry(command=entry["command"], count=entry["count"])
for entry in data["top_commands"]
],
session_count=data["session_count"],
date_range_days=data["date_range_days"],
source="codex_cli",
source_status=source.source_status,
source_path=source.source_path,
last_scanned_at=source.last_scanned_at,
)