Pipeline/backend/app/services/codex_session_reader.py

766 lines
25 KiB
Python
Raw Permalink Normal View History

"""Reader for local Codex CLI session history.
The reader only scans the discovered or explicitly configured sessions root.
It never reads Codex credential files and treats session logs as sensitive by
redacting likely secrets before returning tool inputs or outputs.
"""
from __future__ import annotations
import json
import os
import re
import shutil
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
from app.core.logging import get_logger
logger = get_logger(__name__)
# A session counts as active when its newest record timestamp is less than
# this many minutes old.
ACTIVE_WINDOW_MINUTES = 30
# Default character cap applied by `_trunc` to text, reasoning and tool output.
CONTENT_TRUNCATE = 4000
# Character cap applied to each string value inside a tool-call input.
INPUT_VALUE_TRUNCATE = 2000
# Source identifier and display label stamped on sessions and source metadata.
SOURCE = "codex_cli"
PROVIDER_LABEL = "Codex CLI"
@dataclass
class SessionTokens:
    """Token counters accumulated for a whole session; every counter defaults to 0."""

    input: int = 0
    output: int = 0
    cache_read: int = 0
    cache_write: int = 0

    @property
    def total(self) -> int:
        """Return the sum of the four counters."""
        return sum((self.input, self.output, self.cache_read, self.cache_write))
@dataclass
class CodexSession:
    """Summary of one Codex CLI session file, as built by `_parse_session_file`."""

    # Identity and provenance.
    session_id: str
    source: str
    provider_label: str
    # Project naming and working directory.
    project_dir: str
    cwd: str | None
    title: str | None
    # Usage and billing.
    models: list[str]
    tokens: SessionTokens
    cost_usd: float
    billing_source: str
    # Activity.
    message_count: int
    first_message_at: datetime | None
    last_message_at: datetime | None
    is_active: bool
    # Environment details read from the log.
    entrypoints: list[str]
    git_branch: str | None
    version: str | None
    # Location of the session file on disk.
    path: Path
@dataclass
class SessionTextBlock:
    """A piece of message text plus a flag saying whether it was cut short."""

    text: str
    truncated: bool
@dataclass
class SessionThinkingBlock:
    """A piece of reasoning text plus a flag saying whether it was cut short."""

    text: str
    truncated: bool
@dataclass
class SessionToolUse:
    """One tool invocation together with its recorded result, if any."""

    # The call.
    tool_use_id: str
    tool_name: str
    input: dict[str, Any]
    input_truncated: bool
    # The matching output; `result` is None when no output was recorded.
    result: str | None
    result_truncated: bool
    is_error: bool
@dataclass
class SessionTokenUsage:
    """Token counters attached to a single message; all four are required."""

    input: int
    output: int
    cache_read: int
    cache_write: int
@dataclass
class ParsedMessage:
    """One transcript entry assembled from session records."""

    uuid: str
    role: str
    timestamp: datetime | None
    # Content grouped by kind.
    text_blocks: list[SessionTextBlock]
    thinking_blocks: list[SessionThinkingBlock]
    tool_uses: list[SessionToolUse]
    # Model name and token usage; either may be None.
    model: str | None
    tokens: SessionTokenUsage | None
@dataclass
class SourceMetadata:
    """Availability report for the Codex session source.

    A default instance describes an unavailable source with zero sessions.
    """

    source: str = SOURCE
    provider_label: str = PROVIDER_LABEL
    source_status: str = "unavailable"
    source_path: str | None = None
    session_count: int = 0
    last_activity_at: datetime | None = None
    # Naive UTC time captured when the instance is created.
    last_scanned_at: datetime = field(
        default_factory=lambda: datetime.now(UTC).replace(tzinfo=None),
    )
    unavailable_reason: str | None = None
    setup_hint: str | None = None
def _sessions_root() -> Path:
    """Return the directory to scan for session files.

    A non-blank ``CODEX_SESSIONS_PATH`` environment variable wins (with ``~``
    expanded); otherwise the result is ``~/.codex/sessions``.
    """
    configured = os.environ.get("CODEX_SESSIONS_PATH", "").strip()
    if not configured:
        return Path.home() / ".codex" / "sessions"
    return Path(configured).expanduser()
def _parse_iso(ts: str | None) -> datetime | None:
if not ts:
return None
try:
return (
datetime.fromisoformat(ts.replace("Z", "+00:00")).astimezone(UTC).replace(tzinfo=None)
)
except ValueError:
return None
def _is_relative_to(path: Path, root: Path) -> bool:
    """Report whether ``path`` resolves to ``root`` or to something beneath it.

    Both paths are resolved before comparison; a failure while resolving
    yields ``False``.
    """
    try:
        resolved_path = path.resolve()
        return resolved_path.is_relative_to(root.resolve())
    except (OSError, ValueError):
        return False
def _iter_session_files(root: Path | None = None) -> list[Path]:
    """List ``*.jsonl`` files under the sessions root, newest first.

    Args:
        root: Directory to scan; defaults to `_sessions_root()`.

    Returns:
        Regular files that resolve inside the root, sorted by modification
        time in descending order. Empty when the root is missing or is not a
        directory.
    """
    sessions_root = root or _sessions_root()
    if not sessions_root.is_dir():
        return []
    dated: list[tuple[float, Path]] = []
    for path in sessions_root.rglob("*.jsonl"):
        if not path.is_file() or not _is_relative_to(path, sessions_root):
            continue
        try:
            modified = path.stat().st_mtime
        except OSError:
            # The file was removed or became unreadable after discovery; skip
            # it rather than failing the whole listing.
            continue
        dated.append((modified, path))
    # Sort on the captured mtime only, so no stat() call runs during the sort.
    dated.sort(key=lambda pair: pair[0], reverse=True)
    return [path for _, path in dated]
def _read_records(path: Path) -> list[dict[str, Any]]:
    """Load the JSON-object lines of a session file.

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are skipped. A read error is logged at debug level and whatever
    was collected so far is returned.
    """
    collected: list[dict[str, Any]] = []
    try:
        with open(path, encoding="utf-8", errors="replace") as handle:
            for line in handle:
                stripped = line.strip()
                if stripped:
                    try:
                        candidate = json.loads(stripped)
                    except json.JSONDecodeError:
                        continue
                    if isinstance(candidate, dict):
                        collected.append(candidate)
    except (OSError, PermissionError) as exc:
        logger.debug("codex_session_reader.read_error path=%s error=%s", path, exc)
    return collected
def _payload(rec: dict[str, Any]) -> dict[str, Any]:
    """Return the record's ``payload`` when it is a dict, otherwise an empty dict."""
    candidate = rec.get("payload")
    if isinstance(candidate, dict):
        return candidate
    return {}
# Matched (case-insensitively, anywhere in the name) against dict keys; a hit
# makes `_redact_value` replace the whole value with "[REDACTED]".
_SENSITIVE_KEY_RE = re.compile(
r"(api[_-]?key|authorization|bearer|cookie|credential|headers?|password|secret|token)",
re.I,
)
# Secret-looking tokens inside free text: "sk-" keys, "gh?_" tokens, "bearer"
# credentials and "xox?-" tokens (presumably OpenAI/GitHub/Slack formats —
# confirm against the intended providers).
_SECRET_VALUE_RE = re.compile(
r"(?i)\b("
r"sk-[a-z0-9_-]{16,}|"
r"gh[pousr]_[a-z0-9_]{16,}|"
r"bearer\s+[a-z0-9._~+/=-]{12,}|"
r"xox[baprs]-[a-z0-9-]{12,}"
r")\b"
)
# NAME=value assignments whose name contains api_key/password/secret/token;
# group 1 is the name, group 2 the value.
_ENV_ASSIGNMENT_RE = re.compile(
r"(?i)\b([a-z_]*(?:api[_-]?key|password|secret|token)[a-z_]*)=([^\s]+)"
)
# URL query parameters that carry credentials; group 1 is the "?name=" or
# "&name=" prefix, the remainder up to the next "&", "#" or whitespace is the value.
_URL_SECRET_QUERY_RE = re.compile(
r"(?i)([?&](?:api[_-]?key|access_token|auth|password|secret|token)=)[^&#\s]+"
)
def _redact_text(value: str) -> str:
    """Mask likely secrets inside free text.

    Four passes run in order: secret-shaped tokens, sensitive ``NAME=value``
    assignments, sensitive URL query parameters, and ``user:password@``
    credentials in http(s) URLs.
    """
    masked = _SECRET_VALUE_RE.sub("[REDACTED]", value)
    # The variable name / query prefix (group 1) is kept; only the value is masked.
    masked = _ENV_ASSIGNMENT_RE.sub(r"\g<1>=[REDACTED]", masked)
    masked = _URL_SECRET_QUERY_RE.sub(r"\g<1>[REDACTED]", masked)
    return re.sub(r"(https?://)([^/@\s]+):([^/@\s]+)@", r"\1[REDACTED]@", masked)
def _redact_value(value: Any, key: str = "") -> Any:
    """Recursively mask secrets in a JSON-like value.

    A value stored under a sensitive-looking key is replaced wholesale.
    Otherwise strings go through `_redact_text`, dicts and lists are rebuilt
    element by element (list items inherit the parent key), and anything else
    is returned unchanged.
    """
    if _SENSITIVE_KEY_RE.search(key):
        return "[REDACTED]"
    if isinstance(value, dict):
        return {str(name): _redact_value(child, str(name)) for name, child in value.items()}
    if isinstance(value, list):
        return [_redact_value(element, key) for element in value]
    if isinstance(value, str):
        return _redact_text(value)
    return value
def _trunc(text: str, limit: int = CONTENT_TRUNCATE) -> tuple[str, bool]:
    """Redact ``text`` and then cap it at ``limit`` characters.

    Returns:
        The resulting text and a flag that is True when characters were cut.
    """
    cleaned = _redact_text(text)
    overflow = len(cleaned) > limit
    return (cleaned[:limit] if overflow else cleaned), overflow
def _trunc_input(input_dict: dict[str, Any]) -> tuple[dict[str, Any], bool]:
    """Redact a tool input and cap every string value inside it.

    Returns:
        The cleaned dict (empty when the cleaned value is not a dict) and a
        flag that is True when at least one string was shortened.
    """
    clipped_any = False

    def _clip(node: Any) -> Any:
        # Walk dicts and lists; shorten strings longer than INPUT_VALUE_TRUNCATE.
        nonlocal clipped_any
        if isinstance(node, dict):
            return {str(name): _clip(child) for name, child in node.items()}
        if isinstance(node, list):
            return [_clip(child) for child in node]
        if isinstance(node, str) and len(node) > INPUT_VALUE_TRUNCATE:
            clipped_any = True
            return node[:INPUT_VALUE_TRUNCATE]
        return node

    # Redaction runs first so the length cap applies to the masked text.
    clipped = _clip(_redact_value(input_dict))
    if not isinstance(clipped, dict):
        clipped = {}
    return clipped, clipped_any
def _decode_arguments(raw: Any) -> tuple[dict[str, Any], bool]:
    """Turn a tool call's ``arguments`` field into a dict.

    A dict passes through as-is. A string is parsed as JSON: an object is
    returned directly, any other JSON value is wrapped as ``{"value": ...}``,
    and unparseable text is wrapped unparsed. Every other type gives ``{}``.
    The second tuple element is always False in this implementation.
    """
    if isinstance(raw, dict):
        return raw, False
    if not isinstance(raw, str):
        return {}, False
    try:
        decoded = json.loads(raw)
    except json.JSONDecodeError:
        return {"value": raw}, False
    arguments = decoded if isinstance(decoded, dict) else {"value": decoded}
    return arguments, False
def _content_blocks(content: Any, *, output: bool) -> list[SessionTextBlock]:
    """Extract redacted, truncated text blocks from a message ``content`` field.

    Args:
        content: A plain string or a list of typed block dicts.
        output: When True accept ``output_text``/``text`` blocks, otherwise
            ``input_text``/``text`` blocks.

    Returns:
        One block per non-blank text; empty for any other content type.
    """
    if isinstance(content, str):
        text, truncated = _trunc(content)
        if not text.strip():
            return []
        return [SessionTextBlock(text=text, truncated=truncated)]
    if not isinstance(content, list):
        return []

    accepted = {"output_text", "text"} if output else {"input_text", "text"}
    collected: list[SessionTextBlock] = []
    for entry in content:
        if not isinstance(entry, dict) or entry.get("type") not in accepted:
            continue
        raw_text = entry.get("text")
        if isinstance(raw_text, str):
            text, truncated = _trunc(raw_text)
            if text.strip():
                collected.append(SessionTextBlock(text=text, truncated=truncated))
    return collected
def _reasoning_text(payload: dict[str, Any]) -> str:
    """Extract human-readable reasoning text from a reasoning payload.

    ``content`` is preferred (a non-blank string, or a list of strings and
    dicts carrying ``text``/``summary``); ``summary`` is the fallback.

    Args:
        payload: The reasoning record's payload dict.

    Returns:
        The text with multiple parts joined by newlines, or ``""`` when
        nothing usable is present.
    """

    def _entry_text(entry: Any) -> str:
        # Pull text out of one list entry: a plain string, or a dict that
        # carries the text under "text" or "summary".
        if isinstance(entry, str):
            return entry
        if isinstance(entry, dict):
            text = entry.get("text") or entry.get("summary")
            return text if isinstance(text, str) else ""
        return ""

    content = payload.get("content")
    if isinstance(content, str) and content.strip():
        return content
    if isinstance(content, list):
        parts = [text for text in map(_entry_text, content) if text.strip()]
        if parts:
            return "\n".join(parts)

    summary = payload.get("summary")
    if isinstance(summary, str):
        return summary
    if isinstance(summary, list):
        parts = []
        for item in summary:
            if isinstance(item, (str, dict)):
                # Dict entries are read for their text instead of being
                # stringified into a dict repr.
                text = _entry_text(item)
            else:
                text = "" if item is None else str(item)
            if text.strip():
                parts.append(text)
        return "\n".join(parts)
    return ""
def _usage_from_token_count(payload: dict[str, Any], key: str) -> SessionTokenUsage | None:
    """Read one usage entry from a ``token_count`` payload.

    Args:
        payload: The event payload; usage is expected under ``payload["info"][key]``.
        key: Name of the usage entry, e.g. ``"total_token_usage"``.

    Returns:
        The counters, or ``None`` when ``info`` or the entry is not a dict.
        ``cache_write`` is always 0. Missing or malformed counters become 0.
    """
    info = payload.get("info")
    usage = info.get(key) if isinstance(info, dict) else None
    if not isinstance(usage, dict):
        return None

    def _count(name: str) -> int:
        # Logs are untrusted input: a non-numeric counter must not abort parsing.
        try:
            return int(usage.get(name) or 0)
        except (TypeError, ValueError, OverflowError):
            return 0

    # NOTE(review): if the log's input_tokens already includes
    # cached_input_tokens, adding input and cache_read in a total would
    # double-count — confirm against the Codex log format.
    return SessionTokenUsage(
        input=_count("input_tokens"),
        output=_count("output_tokens"),
        cache_read=_count("cached_input_tokens"),
        cache_write=0,
    )
def _session_tokens_from_token_count(payload: dict[str, Any]) -> SessionTokens | None:
    """Build session-level totals from the payload's ``total_token_usage`` entry.

    Returns ``None`` when that entry cannot be read.
    """
    totals = _usage_from_token_count(payload, "total_token_usage")
    if totals is not None:
        return SessionTokens(
            input=totals.input,
            output=totals.output,
            cache_read=totals.cache_read,
            cache_write=totals.cache_write,
        )
    return None
def _project_name(cwd: str | None, fallback: str) -> str:
    """Return the last path component of ``cwd``, or ``fallback`` when there is none."""
    leaf = Path(cwd).name if cwd else ""
    return leaf or fallback
def _title_from_records(records: list[dict[str, Any]]) -> str | None:
    """Derive a session title from the first non-blank user message text.

    Whitespace runs are collapsed to single spaces and the result is capped
    at 80 characters. Returns ``None`` when no user message has text.
    """
    for rec in records:
        payload = _payload(rec)
        is_user_message = payload.get("type") == "message" and payload.get("role") == "user"
        if not is_user_message:
            continue
        for block in _content_blocks(payload.get("content"), output=False):
            collapsed = " ".join(block.text.split())
            if collapsed:
                return collapsed[:80]
    return None
def _parse_session_file(path: Path, title_hint: str | None = None) -> CodexSession | None:
    """Summarise one session file into a `CodexSession`.

    Args:
        path: Session ``.jsonl`` file.
        title_hint: Title to use instead of one derived from the records.

    Returns:
        The summary, or ``None`` when the file has no records, or has neither
        an assistant message nor any parseable timestamp.
    """
    records = _read_records(path)
    if not records:
        return None

    # Defaults; records seen later in the file override earlier values.
    session_id = path.stem
    cwd: str | None = None
    version: str | None = None
    git_branch: str | None = None
    models: set[str] = set()
    entrypoints: set[str] = {"codex-cli"}
    tokens = SessionTokens()
    first_ts: datetime | None = None
    last_ts: datetime | None = None
    message_count = 0

    for rec in records:
        raw_ts = rec.get("timestamp")
        ts = _parse_iso(raw_ts if isinstance(raw_ts, str) else None)
        if ts:
            if first_ts is None or ts < first_ts:
                first_ts = ts
            if last_ts is None or ts > last_ts:
                last_ts = ts

        kind = rec.get("type")
        payload = _payload(rec)
        payload_kind = payload.get("type")

        if kind == "session_meta":
            meta_id = payload.get("id")
            if isinstance(meta_id, str):
                session_id = meta_id
            meta_cwd = payload.get("cwd")
            if isinstance(meta_cwd, str):
                cwd = meta_cwd
            cli_version = payload.get("cli_version")
            if isinstance(cli_version, str):
                version = cli_version
            origin = payload.get("source")
            if isinstance(origin, str) and origin.strip():
                entrypoints.add(origin.strip())
            git = payload.get("git")
            if isinstance(git, dict) and isinstance(git.get("branch"), str):
                git_branch = git["branch"]
        elif kind == "turn_context":
            turn_cwd = payload.get("cwd")
            if isinstance(turn_cwd, str):
                cwd = turn_cwd
            model = payload.get("model")
            if isinstance(model, str) and model.strip():
                models.add(model.strip())
        elif kind == "response_item" and payload_kind == "message":
            # Only assistant messages are counted.
            if payload.get("role") == "assistant":
                message_count += 1
        elif kind == "event_msg" and payload_kind == "token_count":
            # Each readable token_count event replaces the running totals.
            latest = _session_tokens_from_token_count(payload)
            if latest is not None:
                tokens = latest

    if message_count == 0 and first_ts is None:
        return None

    now = datetime.now(UTC).replace(tzinfo=None)
    window = timedelta(minutes=ACTIVE_WINDOW_MINUTES)
    is_active = last_ts is not None and (now - last_ts) < window

    return CodexSession(
        session_id=session_id,
        source=SOURCE,
        provider_label=PROVIDER_LABEL,
        project_dir=_project_name(cwd, path.parent.name),
        cwd=cwd,
        title=title_hint or _title_from_records(records),
        models=sorted(models),
        tokens=tokens,
        cost_usd=0.0,
        billing_source="subscription",
        message_count=message_count,
        first_message_at=first_ts,
        last_message_at=last_ts,
        is_active=is_active,
        entrypoints=sorted(entrypoints),
        git_branch=git_branch,
        version=version,
        path=path,
    )
def source_metadata() -> SourceMetadata:
    """Describe whether Codex session history is available.

    Returns:
        Metadata that stays ``"unavailable"`` (with a reason and setup hint)
        when the root is missing, is not a directory, or holds no session
        files. Otherwise the status is ``"available"`` with the file count
        and the newest activity time found in the 20 most recently modified
        files.
    """
    root = _sessions_root()
    files = _iter_session_files(root)
    installed = shutil.which("codex") is not None
    meta = SourceMetadata(source_path=str(root))

    if not root.exists():
        meta.unavailable_reason = "Codex session history directory was not found."
        meta.setup_hint = (
            "Install and run Codex CLI, or set CODEX_SESSIONS_PATH to a readable history directory."
        )
        return meta

    if not root.is_dir():
        meta.unavailable_reason = "CODEX_SESSIONS_PATH does not point to a directory."
        meta.setup_hint = "Set CODEX_SESSIONS_PATH to the Codex CLI sessions directory."
        return meta

    if not files:
        if installed:
            meta.unavailable_reason = (
                "Codex CLI is installed but no readable session history exists."
            )
        else:
            meta.unavailable_reason = "No readable Codex session history exists."
        meta.setup_hint = (
            "Start a Codex CLI session, or set CODEX_SESSIONS_PATH for nonstandard installs."
        )
        return meta

    meta.source_status = "available"
    meta.session_count = len(files)

    newest: datetime | None = None
    for candidate in files[:20]:
        parsed = _parse_session_file(candidate)
        stamp = parsed.last_message_at if parsed else None
        if stamp and (newest is None or stamp > newest):
            newest = stamp
    meta.last_activity_at = newest
    return meta
def list_sessions(
    *,
    project_filter: str | None = None,
    active_only: bool = False,
    limit: int = 200,
) -> list[CodexSession]:
    """List parsed sessions, most recently modified file first.

    Args:
        project_filter: Case-insensitive substring that ``project_dir`` must contain.
        active_only: Keep only sessions flagged active.
        limit: Maximum number of sessions to return; values of zero or less
            yield an empty list.

    Returns:
        At most ``limit`` sessions that pass the filters.
    """
    # Checked up front: the loop only tests the limit after appending, so a
    # non-positive limit would otherwise still return one session.
    if limit <= 0:
        return []
    needle = project_filter.lower() if project_filter else None
    sessions: list[CodexSession] = []
    for path in _iter_session_files(_sessions_root()):
        session = _parse_session_file(path)
        if session is None:
            continue
        if needle and needle not in session.project_dir.lower():
            continue
        if active_only and not session.is_active:
            continue
        sessions.append(session)
        if len(sessions) >= limit:
            break
    return sessions
def get_session(session_id: str) -> CodexSession | None:
    """Return the first parsed session whose id equals ``session_id``, else ``None``.

    Files are parsed lazily in newest-first order until a match is found.
    """
    parsed_sessions = (
        _parse_session_file(candidate) for candidate in _iter_session_files(_sessions_root())
    )
    return next(
        (found for found in parsed_sessions if found and found.session_id == session_id),
        None,
    )
def _find_session_path(session_id: str) -> Path | None:
    """Locate the file that belongs to ``session_id``.

    A file matches when its stem equals the id; otherwise it is parsed and
    matched on the parsed session id. Returns ``None`` when nothing matches.
    """
    for candidate in _iter_session_files(_sessions_root()):
        # The cheap filename comparison runs before the file is parsed.
        if candidate.stem == session_id:
            return candidate
        parsed = _parse_session_file(candidate)
        if parsed is not None and parsed.session_id == session_id:
            return candidate
    return None
def get_session_messages(
    session_id: str,
    limit: int = 200,
    offset: int = 0,
) -> tuple[list[ParsedMessage], int] | None:
    """Build the transcript of a session and return one page of it.

    Reasoning, tool calls and per-response token usage are attached to the
    most recent assistant message of the current turn. If the turn has no
    assistant message yet, an empty placeholder is created to hold them.

    Args:
        session_id: Session id or session file stem.
        limit: Maximum number of messages in the page; negative values count as 0.
        offset: Index of the first message in the page; negative values count as 0.

    Returns:
        ``(page, total)`` where ``total`` is the number of messages before
        paging, or ``None`` when the session is not found or has no records.
    """
    path = _find_session_path(session_id)
    if path is None:
        return None
    records = _read_records(path)
    if not records:
        return None

    # Pass 1: index tool outputs by call id so each call can be paired with
    # its result. Tuple layout: (text, is_error, truncated); is_error is
    # always False because no error flag is read from the log.
    call_results: dict[str, tuple[str, bool, bool]] = {}
    for rec in records:
        payload = _payload(rec)
        if payload.get("type") != "function_call_output":
            continue
        call_id = payload.get("call_id")
        if not isinstance(call_id, str):
            continue
        output = payload.get("output")
        text = output if isinstance(output, str) else json.dumps(output, default=str)
        result_text, result_truncated = _trunc(text)
        call_results[call_id] = (result_text, False, result_truncated)

    parsed: list[ParsedMessage] = []
    last_assistant: ParsedMessage | None = None
    current_model: str | None = None

    def _assistant_placeholder(uuid: str, ts: datetime | None) -> ParsedMessage:
        # Append and return an empty assistant message to hold reasoning or
        # tool calls that arrive before any assistant text.
        msg = ParsedMessage(
            uuid=uuid,
            role="assistant",
            timestamp=ts,
            text_blocks=[],
            thinking_blocks=[],
            tool_uses=[],
            model=current_model,
            tokens=None,
        )
        parsed.append(msg)
        return msg

    # Pass 2: assemble messages in log order.
    for idx, rec in enumerate(records):
        raw_ts = rec.get("timestamp")
        ts = _parse_iso(raw_ts if isinstance(raw_ts, str) else None)
        payload = _payload(rec)
        payload_type = payload.get("type")
        rec_type = rec.get("type")

        if rec_type == "turn_context":
            model = payload.get("model")
            if isinstance(model, str):
                current_model = model
            continue

        if rec_type == "response_item" and payload_type == "message":
            role = payload.get("role")
            if role == "user":
                text_blocks = _content_blocks(payload.get("content"), output=False)
                if text_blocks:
                    parsed.append(
                        ParsedMessage(
                            uuid=f"{path.stem}:user:{idx}",
                            role="user",
                            timestamp=ts,
                            text_blocks=text_blocks,
                            thinking_blocks=[],
                            tool_uses=[],
                            model=None,
                            tokens=None,
                        )
                    )
                    # A visible user message starts a new turn. Later
                    # reasoning and tool calls must not attach to the
                    # assistant message that precedes this prompt.
                    last_assistant = None
            elif role == "assistant":
                text_blocks = _content_blocks(payload.get("content"), output=True)
                if text_blocks:
                    last_assistant = ParsedMessage(
                        uuid=f"{path.stem}:assistant:{idx}",
                        role="assistant",
                        timestamp=ts,
                        text_blocks=text_blocks,
                        thinking_blocks=[],
                        tool_uses=[],
                        model=current_model,
                        tokens=None,
                    )
                    parsed.append(last_assistant)
            continue

        if rec_type not in ("response_item", "event_msg"):
            continue

        if payload_type == "reasoning":
            text = _reasoning_text(payload)
            if not text.strip():
                continue
            if last_assistant is None:
                last_assistant = _assistant_placeholder(f"{path.stem}:assistant:{idx}", ts)
            thinking, truncated = _trunc(text)
            last_assistant.thinking_blocks.append(
                SessionThinkingBlock(text=thinking, truncated=truncated)
            )
        elif payload_type in ("function_call", "web_search_call"):
            if last_assistant is None:
                last_assistant = _assistant_placeholder(f"{path.stem}:assistant:{idx}", ts)
            call_id = payload.get("call_id")
            if not isinstance(call_id, str):
                # Synthesize a stable id when the log does not provide one.
                call_id = f"{path.stem}:tool:{idx}"
            raw_input, input_was_truncated = _decode_arguments(payload.get("arguments"))
            tool_input, input_truncated = _trunc_input(raw_input)
            result = call_results.get(call_id)
            last_assistant.tool_uses.append(
                SessionToolUse(
                    tool_use_id=call_id,
                    tool_name=str(payload.get("name") or payload_type),
                    input=tool_input,
                    input_truncated=input_truncated or input_was_truncated,
                    result=result[0] if result else None,
                    result_truncated=result[2] if result else False,
                    is_error=result[1] if result else False,
                )
            )
        elif payload_type == "token_count":
            usage = _usage_from_token_count(payload, "last_token_usage")
            if usage is not None and last_assistant is not None:
                last_assistant.tokens = usage

    total = len(parsed)
    # Clamp paging inputs: negative values would otherwise slice from the end.
    start = max(offset, 0)
    count = max(limit, 0)
    return parsed[start : start + count], total
# Tool names whose `path`/`file_path` input is counted as a file read, and as
# a file write, respectively, by `get_tool_analytics`.
_FILE_READ_TOOLS = {"read_file", "view_image", "open", "screenshot"}
_FILE_WRITE_TOOLS = {"apply_patch", "imagegen"}
def _bash_binary(command: str) -> str | None:
    """Return the name of the program a shell command line invokes.

    Leading ``!`` characters and leading ``NAME=value`` environment
    assignments are skipped, and any directory part of the first remaining
    word is dropped.

    Args:
        command: Raw command line.

    Returns:
        The program's base name, or ``None`` when the command is empty,
        consists only of assignments, or its first word has no base name.
    """
    cmd = command.strip().lstrip("!").strip()
    if not cmd:
        return None
    parts = cmd.split()
    # Words such as "./a=b" or "/opt/a=b" are paths, not assignments.
    while parts and "=" in parts[0] and not parts[0].startswith(("=", "./", "/")):
        parts.pop(0)
    if not parts:
        return None
    # Keep only the final path component. str.lstrip("./") is not used because
    # it strips a set of characters rather than a prefix, which would turn
    # ".hidden-tool" into "hidden-tool".
    binary = parts[0].rsplit("/", 1)[-1]
    if binary in ("", ".", ".."):
        return None
    return binary
def _extract_patch_files(value: Any) -> list[str]:
    """List the file paths named by the file headers of a patch text.

    Lines starting with an Update, Add or Delete file marker contribute the
    stripped remainder of the line. Non-string input yields an empty list.
    """
    if not isinstance(value, str):
        return []
    markers = ("*** Update File: ", "*** Add File: ", "*** Delete File: ")
    found: list[str] = []
    for line in value.splitlines():
        marker = next((m for m in markers if line.startswith(m)), None)
        if marker is not None:
            found.append(line[len(marker) :].strip())
    return found
def get_tool_analytics(project_filter: str | None = None, days: int = 30) -> dict[str, Any]:
    """Aggregate tool usage over sessions modified within the last ``days`` days.

    Args:
        project_filter: Passed through to `list_sessions`.
        days: Age cutoff, compared against each session file's modification time.

    Returns:
        A dict with per-tool counts (descending), the top 20 files read,
        files written and command binaries, the number of sessions that used
        any tool, the requested day range, and the scanned root path.
    """
    root = _sessions_root()
    cutoff = datetime.now(UTC).replace(tzinfo=None) - timedelta(days=days)
    tool_counts: dict[str, int] = {}
    files_read: dict[str, int] = {}
    files_written: dict[str, int] = {}
    commands: dict[str, int] = {}
    session_count = 0

    def _bump(counter: dict[str, int], item: str) -> None:
        counter[item] = counter.get(item, 0) + 1

    def _ranked(counter: dict[str, int]) -> list[tuple[str, int]]:
        # Highest count first; ties keep first-seen order.
        return sorted(counter.items(), key=lambda pair: pair[1], reverse=True)

    def _top(counter: dict[str, int], key: str) -> list[dict[str, Any]]:
        return [{key: item, "count": count} for item, count in _ranked(counter)[:20]]

    for session in list_sessions(project_filter=project_filter, limit=1000):
        try:
            modified = datetime.fromtimestamp(session.path.stat().st_mtime, UTC).replace(
                tzinfo=None
            )
        except OSError:
            continue
        if modified < cutoff:
            continue
        result = get_session_messages(session.session_id, limit=1000, offset=0)
        if result is None:
            continue

        tools = [tool for message in result[0] for tool in message.tool_uses]
        for tool in tools:
            name = tool.tool_name
            inp = tool.input
            _bump(tool_counts, name)

            path_value = inp.get("path") or inp.get("file_path")
            if isinstance(path_value, str):
                if name in _FILE_READ_TOOLS:
                    _bump(files_read, path_value)
                if name in _FILE_WRITE_TOOLS:
                    _bump(files_written, path_value)

            if name == "exec_command":
                cmd = inp.get("cmd")
                binary = _bash_binary(cmd) if isinstance(cmd, str) else None
                if binary:
                    _bump(commands, binary)

            if name == "apply_patch":
                # Patches name their targets in the patch body, not in a path field.
                for file_path in _extract_patch_files(inp.get("value") or inp.get("patch")):
                    _bump(files_written, file_path)

        if tools:
            session_count += 1

    return {
        "tool_counts": dict(_ranked(tool_counts)),
        "top_files_read": _top(files_read, "path"),
        "top_files_written": _top(files_written, "path"),
        "top_commands": _top(commands, "command"),
        "session_count": session_count,
        "date_range_days": days,
        "source_path": str(root),
    }
def session_stats(sessions: list[CodexSession]) -> dict[str, Any]:
    """Summarise a list of sessions.

    Returns:
        Session count, number of active sessions, summed token totals, summed
        cost rounded to six decimals, and the sorted set of model names.
    """
    distinct_models = {model for session in sessions for model in session.models}
    active = [session for session in sessions if session.is_active]
    token_total = sum(session.tokens.total for session in sessions)
    cost_total = sum(session.cost_usd for session in sessions)
    return {
        "session_count": len(sessions),
        "active_sessions": len(active),
        "total_tokens": token_total,
        "total_cost_usd": round(cost_total, 6),
        "models": sorted(distinct_models),
    }