"""Reader for local Codex CLI session history. The reader only scans the discovered or explicitly configured sessions root. It never reads Codex credential files and treats session logs as sensitive by redacting likely secrets before returning tool inputs or outputs. """ from __future__ import annotations import json import os import re import shutil from dataclasses import dataclass, field from datetime import UTC, datetime, timedelta from pathlib import Path from typing import Any from app.core.logging import get_logger logger = get_logger(__name__) ACTIVE_WINDOW_MINUTES = 30 CONTENT_TRUNCATE = 4000 INPUT_VALUE_TRUNCATE = 2000 SOURCE = "codex_cli" PROVIDER_LABEL = "Codex CLI" @dataclass class SessionTokens: input: int = 0 output: int = 0 cache_read: int = 0 cache_write: int = 0 @property def total(self) -> int: return self.input + self.output + self.cache_read + self.cache_write @dataclass class CodexSession: session_id: str source: str provider_label: str project_dir: str cwd: str | None title: str | None models: list[str] tokens: SessionTokens cost_usd: float billing_source: str message_count: int first_message_at: datetime | None last_message_at: datetime | None is_active: bool entrypoints: list[str] git_branch: str | None version: str | None path: Path @dataclass class SessionTextBlock: text: str truncated: bool @dataclass class SessionThinkingBlock: text: str truncated: bool @dataclass class SessionToolUse: tool_use_id: str tool_name: str input: dict[str, Any] input_truncated: bool result: str | None result_truncated: bool is_error: bool @dataclass class SessionTokenUsage: input: int output: int cache_read: int cache_write: int @dataclass class ParsedMessage: uuid: str role: str timestamp: datetime | None text_blocks: list[SessionTextBlock] thinking_blocks: list[SessionThinkingBlock] tool_uses: list[SessionToolUse] model: str | None tokens: SessionTokenUsage | None @dataclass class SourceMetadata: source: str = SOURCE provider_label: str = PROVIDER_LABEL 
source_status: str = "unavailable" source_path: str | None = None session_count: int = 0 last_activity_at: datetime | None = None last_scanned_at: datetime = field( default_factory=lambda: datetime.now(UTC).replace(tzinfo=None) ) unavailable_reason: str | None = None setup_hint: str | None = None def _sessions_root() -> Path: override = os.environ.get("CODEX_SESSIONS_PATH", "").strip() if override: return Path(override).expanduser() return Path.home() / ".codex" / "sessions" def _parse_iso(ts: str | None) -> datetime | None: if not ts: return None try: return ( datetime.fromisoformat(ts.replace("Z", "+00:00")).astimezone(UTC).replace(tzinfo=None) ) except ValueError: return None def _is_relative_to(path: Path, root: Path) -> bool: try: path.resolve().relative_to(root.resolve()) return True except (OSError, ValueError): return False def _iter_session_files(root: Path | None = None) -> list[Path]: sessions_root = root or _sessions_root() if not sessions_root.exists() or not sessions_root.is_dir(): return [] files: list[Path] = [] for path in sessions_root.rglob("*.jsonl"): if not path.is_file() or not _is_relative_to(path, sessions_root): continue files.append(path) return sorted(files, key=lambda p: p.stat().st_mtime, reverse=True) def _read_records(path: Path) -> list[dict[str, Any]]: records: list[dict[str, Any]] = [] try: with open(path, encoding="utf-8", errors="replace") as fh: for raw_line in fh: raw_line = raw_line.strip() if not raw_line: continue try: rec = json.loads(raw_line) except json.JSONDecodeError: continue if isinstance(rec, dict): records.append(rec) except (OSError, PermissionError) as exc: logger.debug("codex_session_reader.read_error path=%s error=%s", path, exc) return records def _payload(rec: dict[str, Any]) -> dict[str, Any]: payload = rec.get("payload") return payload if isinstance(payload, dict) else {} _SENSITIVE_KEY_RE = re.compile( r"(api[_-]?key|authorization|bearer|cookie|credential|headers?|password|secret|token)", re.I, ) 
# Well-known credential shapes that are replaced wherever they appear in text.
_SECRET_VALUE_RE = re.compile(
    r"(?i)\b("
    r"sk-[a-z0-9_-]{16,}|"
    r"gh[pousr]_[a-z0-9_]{16,}|"
    r"bearer\s+[a-z0-9._~+/=-]{12,}|"
    r"xox[baprs]-[a-z0-9-]{12,}"
    r")\b"
)
# NAME=value assignments whose name mentions a key, password, secret or token.
_ENV_ASSIGNMENT_RE = re.compile(
    r"(?i)\b([a-z_]*(?:api[_-]?key|password|secret|token)[a-z_]*)=([^\s]+)"
)
# Secret-bearing query parameters inside URLs.
_URL_SECRET_QUERY_RE = re.compile(
    r"(?i)([?&](?:api[_-]?key|access_token|auth|password|secret|token)=)[^&#\s]+"
)


def _redact_text(value: str) -> str:
    """Return ``value`` with likely secrets replaced by ``[REDACTED]``.

    Covers known token shapes, secret-looking ``NAME=value`` assignments,
    secret query parameters and ``user:password@`` URL credentials.
    """
    redacted = _SECRET_VALUE_RE.sub("[REDACTED]", value)
    redacted = _ENV_ASSIGNMENT_RE.sub(lambda m: f"{m.group(1)}=[REDACTED]", redacted)
    redacted = _URL_SECRET_QUERY_RE.sub(lambda m: f"{m.group(1)}[REDACTED]", redacted)
    redacted = re.sub(r"(https?://)([^/@\s]+):([^/@\s]+)@", r"\1[REDACTED]@", redacted)
    return redacted


def _redact_value(value: Any, key: str = "") -> Any:
    """Recursively redact a JSON-like value.

    A value stored under a sensitive-looking ``key`` is replaced entirely;
    strings are scrubbed with ``_redact_text``; dicts and lists are walked.
    List items inherit the key of the list that holds them.
    """
    if _SENSITIVE_KEY_RE.search(key):
        return "[REDACTED]"
    if isinstance(value, str):
        return _redact_text(value)
    if isinstance(value, dict):
        return {str(k): _redact_value(v, str(k)) for k, v in value.items()}
    if isinstance(value, list):
        return [_redact_value(item, key) for item in value]
    return value


def _trunc(text: str, limit: int = CONTENT_TRUNCATE) -> tuple[str, bool]:
    """Redact ``text`` and cut it to ``limit`` characters.

    Returns:
        ``(text, truncated)`` where ``truncated`` tells whether anything was cut.
    """
    text = _redact_text(text)
    if len(text) <= limit:
        return text, False
    return text[:limit], True


def _trunc_input(input_dict: dict[str, Any]) -> tuple[dict[str, Any], bool]:
    """Redact a tool input and shorten every over-long string inside it.

    Returns:
        ``(input, truncated)``; ``truncated`` is True when at least one string
        exceeded ``INPUT_VALUE_TRUNCATE`` characters.
    """
    redacted = _redact_value(input_dict)
    truncated = False

    def _truncate_value(value: Any) -> Any:
        nonlocal truncated
        if isinstance(value, str) and len(value) > INPUT_VALUE_TRUNCATE:
            truncated = True
            return value[:INPUT_VALUE_TRUNCATE]
        if isinstance(value, dict):
            return {str(k): _truncate_value(v) for k, v in value.items()}
        if isinstance(value, list):
            return [_truncate_value(item) for item in value]
        return value

    output = _truncate_value(redacted)
    return (output if isinstance(output, dict) else {}), truncated


def _decode_arguments(raw: Any) -> tuple[dict[str, Any], bool]:
    """Turn recorded tool-call arguments into a dict.

    Dicts pass through. Strings are parsed as JSON; a non-object result or
    unparseable text is wrapped as ``{"value": ...}``. Anything else yields an
    empty dict. The second element is always ``False`` (nothing is cut here).
    """
    if isinstance(raw, dict):
        return raw, False
    if not isinstance(raw, str):
        return {}, False
    try:
        decoded = json.loads(raw)
    except json.JSONDecodeError:
        return {"value": raw}, False
    if isinstance(decoded, dict):
        return decoded, False
    return {"value": decoded}, False


def _content_blocks(content: Any, *, output: bool) -> list[SessionTextBlock]:
    """Extract non-blank text blocks from a message ``content`` field.

    Args:
        content: Either a plain string or a list of typed block dicts.
        output: When True accept ``output_text`` blocks (assistant side),
            otherwise ``input_text`` blocks (user side); ``text`` blocks are
            accepted in both cases.
    """
    if isinstance(content, str):
        text, truncated = _trunc(content)
        return [SessionTextBlock(text=text, truncated=truncated)] if text.strip() else []
    if not isinstance(content, list):
        return []

    block_types = {"output_text", "text"} if output else {"input_text", "text"}
    blocks: list[SessionTextBlock] = []
    for block in content:
        if not isinstance(block, dict) or block.get("type") not in block_types:
            continue
        raw_text = block.get("text")
        if not isinstance(raw_text, str):
            continue
        text, truncated = _trunc(raw_text)
        if text.strip():
            blocks.append(SessionTextBlock(text=text, truncated=truncated))
    return blocks


def _reasoning_parts(items: list[Any], *, stringify_other: bool) -> list[str]:
    """Collect the non-blank text pieces of a reasoning ``content``/``summary`` list.

    Dict items contribute their ``text`` (or ``summary``) string. Plain strings
    are kept as they are. Remaining scalar items are stringified only when
    ``stringify_other`` is set.
    """
    parts: list[str] = []
    for item in items:
        if isinstance(item, dict):
            piece = item.get("text") or item.get("summary")
        elif isinstance(item, str) or stringify_other:
            piece = str(item)
        else:
            piece = None
        if isinstance(piece, str) and piece.strip():
            parts.append(piece)
    return parts


def _reasoning_text(payload: dict[str, Any]) -> str:
    """Return the readable text of a reasoning payload, or ``""`` if there is none.

    ``content`` is preferred; ``summary`` is the fallback.
    """
    content = payload.get("content")
    if isinstance(content, str) and content.strip():
        return content
    if isinstance(content, list):
        parts = _reasoning_parts(content, stringify_other=False)
        if parts:
            return "\n".join(parts)

    summary = payload.get("summary")
    if isinstance(summary, str):
        return summary
    if isinstance(summary, list):
        # Summary entries may be objects carrying a "text" field; read that
        # field instead of str()-ing the dict, which leaked a Python repr
        # into the thinking block.
        return "\n".join(_reasoning_parts(summary, stringify_other=True))
    return ""


def _token_int(value: Any) -> int:
    """Coerce a recorded token count to ``int``; missing or malformed values count as 0."""
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        return 0


def _usage_from_token_count(payload: dict[str, Any], key: str) -> SessionTokenUsage | None:
    """Read ``payload["info"][key]`` as a ``SessionTokenUsage``.

    Returns ``None`` when ``info`` or the requested usage entry is not a dict.
    Malformed counters are treated as 0 so that one bad record cannot abort
    parsing of the whole session.
    """
    info = payload.get("info")
    usage = info.get(key) if isinstance(info, dict) else None
    if not isinstance(usage, dict):
        return None
    return SessionTokenUsage(
        input=_token_int(usage.get("input_tokens")),
        output=_token_int(usage.get("output_tokens")),
        cache_read=_token_int(usage.get("cached_input_tokens")),
        cache_write=0,
    )


def _session_tokens_from_token_count(payload: dict[str, Any]) -> SessionTokens | None:
    """Convert the ``total_token_usage`` entry of a token-count event, if present."""
    usage = _usage_from_token_count(payload, "total_token_usage")
    if usage is None:
        return None
    return SessionTokens(
        input=usage.input,
        output=usage.output,
        cache_read=usage.cache_read,
        cache_write=usage.cache_write,
    )


def _project_name(cwd: str | None, fallback: str) -> str:
    """Return the last path component of ``cwd``, or ``fallback`` when it is empty."""
    if cwd:
        name = Path(cwd).name
        if name:
            return name
    return fallback


def _title_from_records(records: list[dict[str, Any]]) -> str | None:
    """Return the first user text (whitespace-collapsed, at most 80 chars) as a title."""
    for rec in records:
        payload = _payload(rec)
        if payload.get("type") != "message" or payload.get("role") != "user":
            continue
        for block in _content_blocks(payload.get("content"), output=False):
            text = " ".join(block.text.split())
            if text:
                return text[:80]
    return None


def _record_timestamp(rec: dict[str, Any]) -> datetime | None:
    """Return the record's ``timestamp`` as naive UTC, or ``None`` if absent or invalid."""
    raw = rec.get("timestamp")
    return _parse_iso(raw if isinstance(raw, str) else None)


def _parse_session_file(path: Path, title_hint: str | None = None) -> CodexSession | None:
    """Build a ``CodexSession`` summary from one session log.

    Args:
        path: The ``.jsonl`` session file.
        title_hint: Title to use instead of the one derived from the first
            user message.

    Returns:
        The summary, or ``None`` when the file has no usable records (nothing
        readable, or neither an assistant message nor any timestamp).
    """
    records = _read_records(path)
    if not records:
        return None

    session_id = path.stem  # overridden by session_meta.id when present
    cwd: str | None = None
    version: str | None = None
    git_branch: str | None = None
    models: set[str] = set()
    entrypoints: set[str] = {"codex-cli"}
    tokens = SessionTokens()
    first_ts: datetime | None = None
    last_ts: datetime | None = None
    message_count = 0

    for rec in records:
        ts = _record_timestamp(rec)
        if ts:
            first_ts = ts if first_ts is None or ts < first_ts else first_ts
            last_ts = ts if last_ts is None or ts > last_ts else last_ts

        rec_type = rec.get("type")
        payload = _payload(rec)
        payload_type = payload.get("type")

        if rec_type == "session_meta":
            if isinstance(payload.get("id"), str):
                session_id = payload["id"]
            cwd = payload.get("cwd") if isinstance(payload.get("cwd"), str) else cwd
            version = (
                payload.get("cli_version")
                if isinstance(payload.get("cli_version"), str)
                else version
            )
            source = payload.get("source")
            if isinstance(source, str) and source.strip():
                entrypoints.add(source.strip())
            git = payload.get("git")
            if isinstance(git, dict) and isinstance(git.get("branch"), str):
                git_branch = git["branch"]
        elif rec_type == "turn_context":
            cwd = payload.get("cwd") if isinstance(payload.get("cwd"), str) else cwd
            model = payload.get("model")
            if isinstance(model, str) and model.strip():
                models.add(model.strip())
        elif rec_type == "response_item" and payload_type == "message":
            if payload.get("role") == "assistant":
                message_count += 1
        elif rec_type == "event_msg" and payload_type == "token_count":
            # The latest running total replaces any earlier one.
            latest = _session_tokens_from_token_count(payload)
            if latest is not None:
                tokens = latest

    if message_count == 0 and first_ts is None:
        return None

    now = datetime.now(UTC).replace(tzinfo=None)
    is_active = bool(last_ts and (now - last_ts) < timedelta(minutes=ACTIVE_WINDOW_MINUTES))
    project_dir = _project_name(cwd, path.parent.name)
    return CodexSession(
        session_id=session_id,
        source=SOURCE,
        provider_label=PROVIDER_LABEL,
        project_dir=project_dir,
        cwd=cwd,
        title=title_hint or _title_from_records(records),
        models=sorted(models),
        tokens=tokens,
        cost_usd=0.0,
        billing_source="subscription",
        message_count=message_count,
        first_message_at=first_ts,
        last_message_at=last_ts,
        is_active=is_active,
        entrypoints=sorted(entrypoints),
        git_branch=git_branch,
        version=version,
        path=path,
    )


def source_metadata() -> SourceMetadata:
    """Describe whether Codex CLI session history is available.

    Returns:
        Metadata with ``source_status == "available"`` when at least one
        session file exists; otherwise the default ``"unavailable"`` status
        with a reason and a setup hint. The last-activity time is taken from
        the 20 most recently modified files only.
    """
    root = _sessions_root()
    files = _iter_session_files(root)
    installed = shutil.which("codex") is not None
    meta = SourceMetadata(source_path=str(root))

    if not root.exists():
        meta.unavailable_reason = "Codex session history directory was not found."
        meta.setup_hint = (
            "Install and run Codex CLI, or set CODEX_SESSIONS_PATH to a readable history directory."
        )
        return meta
    if not root.is_dir():
        meta.unavailable_reason = "CODEX_SESSIONS_PATH does not point to a directory."
        meta.setup_hint = "Set CODEX_SESSIONS_PATH to the Codex CLI sessions directory."
        return meta
    if not files:
        meta.unavailable_reason = (
            "Codex CLI is installed but no readable session history exists."
            if installed
            else "No readable Codex session history exists."
        )
        meta.setup_hint = (
            "Start a Codex CLI session, or set CODEX_SESSIONS_PATH for nonstandard installs."
        )
        return meta

    meta.source_status = "available"
    meta.session_count = len(files)
    latest_activity: datetime | None = None
    for path in files[:20]:
        session = _parse_session_file(path)
        if session is None or session.last_message_at is None:
            continue
        if latest_activity is None or session.last_message_at > latest_activity:
            latest_activity = session.last_message_at
    meta.last_activity_at = latest_activity
    return meta


def list_sessions(
    *,
    project_filter: str | None = None,
    active_only: bool = False,
    limit: int = 200,
) -> list[CodexSession]:
    """Return parsed sessions, most recently modified file first.

    Args:
        project_filter: Case-insensitive substring that must occur in the
            session's project directory name.
        active_only: Keep only sessions flagged as active.
        limit: Maximum number of sessions to return; values <= 0 yield ``[]``.
    """
    # Checked up front: the in-loop test runs only after an append, so a
    # non-positive limit used to let exactly one session through.
    if limit <= 0:
        return []

    needle = project_filter.lower() if project_filter else None
    sessions: list[CodexSession] = []
    for path in _iter_session_files(_sessions_root()):
        session = _parse_session_file(path)
        if session is None:
            continue
        if needle and needle not in session.project_dir.lower():
            continue
        if active_only and not session.is_active:
            continue
        sessions.append(session)
        if len(sessions) >= limit:
            break
    return sessions


def get_session(session_id: str) -> CodexSession | None:
    """Return the session whose parsed id equals ``session_id``, or ``None``."""
    for path in _iter_session_files(_sessions_root()):
        session = _parse_session_file(path)
        if session and session.session_id == session_id:
            return session
    return None


def _find_session_path(session_id: str) -> Path | None:
    """Locate the log file for ``session_id``.

    A file whose stem equals the id matches without being parsed; otherwise
    the file is parsed and its recorded session id is compared.
    """
    for path in _iter_session_files(_sessions_root()):
        if path.stem == session_id:
            return path
        session = _parse_session_file(path)
        if session and session.session_id == session_id:
            return path
    return None


def _collect_call_results(
    records: list[dict[str, Any]],
) -> dict[str, tuple[str, bool, bool]]:
    """Map ``call_id`` to ``(result_text, is_error, truncated)`` for every tool output.

    ``is_error`` is always ``False``; the records are not inspected for errors.
    """
    results: dict[str, tuple[str, bool, bool]] = {}
    for rec in records:
        payload = _payload(rec)
        if payload.get("type") != "function_call_output":
            continue
        call_id = payload.get("call_id")
        if not isinstance(call_id, str):
            continue
        output = payload.get("output")
        text = output if isinstance(output, str) else json.dumps(output, default=str)
        result_text, result_truncated = _trunc(text)
        results[call_id] = (result_text, False, result_truncated)
    return results


def _parse_messages(path: Path, records: list[dict[str, Any]]) -> list[ParsedMessage]:
    """Reconstruct the ordered user/assistant messages of one session.

    Reasoning blocks, tool calls and per-turn token usage are attached to the
    most recent assistant message; a placeholder assistant message is created
    when none exists yet.
    """
    call_results = _collect_call_results(records)
    parsed: list[ParsedMessage] = []
    last_assistant: ParsedMessage | None = None
    current_model: str | None = None

    def _assistant_placeholder(uuid: str, ts: datetime | None) -> ParsedMessage:
        msg = ParsedMessage(
            uuid=uuid,
            role="assistant",
            timestamp=ts,
            text_blocks=[],
            thinking_blocks=[],
            tool_uses=[],
            model=current_model,
            tokens=None,
        )
        parsed.append(msg)
        return msg

    for idx, rec in enumerate(records):
        ts = _record_timestamp(rec)
        rec_type = rec.get("type")
        payload = _payload(rec)
        payload_type = payload.get("type")

        if rec_type == "turn_context":
            model = payload.get("model")
            if isinstance(model, str):
                current_model = model
            continue

        if rec_type == "response_item" and payload_type == "message":
            role = payload.get("role")
            if role == "user":
                # NOTE(review): a user message does not reset `last_assistant`,
                # so reasoning/tool items that precede the next assistant text
                # attach to the previous assistant message - confirm intended.
                text_blocks = _content_blocks(payload.get("content"), output=False)
                if text_blocks:
                    parsed.append(
                        ParsedMessage(
                            uuid=f"{path.stem}:user:{idx}",
                            role="user",
                            timestamp=ts,
                            text_blocks=text_blocks,
                            thinking_blocks=[],
                            tool_uses=[],
                            model=None,
                            tokens=None,
                        )
                    )
            elif role == "assistant":
                text_blocks = _content_blocks(payload.get("content"), output=True)
                if text_blocks:
                    last_assistant = ParsedMessage(
                        uuid=f"{path.stem}:assistant:{idx}",
                        role="assistant",
                        timestamp=ts,
                        text_blocks=text_blocks,
                        thinking_blocks=[],
                        tool_uses=[],
                        model=current_model,
                        tokens=None,
                    )
                    parsed.append(last_assistant)
            continue

        if rec_type != "response_item" and rec_type != "event_msg":
            continue

        if payload_type == "reasoning":
            text = _reasoning_text(payload)
            if not text.strip():
                continue
            if last_assistant is None:
                last_assistant = _assistant_placeholder(f"{path.stem}:assistant:{idx}", ts)
            thinking, truncated = _trunc(text)
            last_assistant.thinking_blocks.append(
                SessionThinkingBlock(text=thinking, truncated=truncated)
            )
        elif payload_type in {"function_call", "web_search_call"}:
            if last_assistant is None:
                last_assistant = _assistant_placeholder(f"{path.stem}:assistant:{idx}", ts)
            call_id = payload.get("call_id")
            if not isinstance(call_id, str):
                # Synthesize a stable id for calls recorded without one.
                call_id = f"{path.stem}:tool:{idx}"
            raw_input, input_was_truncated = _decode_arguments(payload.get("arguments"))
            tool_input, input_truncated = _trunc_input(raw_input)
            result = call_results.get(call_id)
            last_assistant.tool_uses.append(
                SessionToolUse(
                    tool_use_id=call_id,
                    tool_name=str(payload.get("name") or payload_type),
                    input=tool_input,
                    input_truncated=input_truncated or input_was_truncated,
                    result=result[0] if result else None,
                    result_truncated=result[2] if result else False,
                    is_error=result[1] if result else False,
                )
            )
        elif payload_type == "token_count":
            usage = _usage_from_token_count(payload, "last_token_usage")
            if usage is not None and last_assistant is not None:
                last_assistant.tokens = usage

    return parsed


def get_session_messages(
    session_id: str,
    limit: int = 200,
    offset: int = 0,
) -> tuple[list[ParsedMessage], int] | None:
    """Return one page of messages for a session.

    Args:
        session_id: File stem or recorded session id.
        limit: Page size; negative values are treated as 0.
        offset: Index of the first message; negative values are treated as 0.

    Returns:
        ``(page, total_message_count)``, or ``None`` when the session cannot
        be found or its log holds no records.
    """
    path = _find_session_path(session_id)
    if path is None:
        return None
    records = _read_records(path)
    if not records:
        return None

    parsed = _parse_messages(path, records)
    # Clamp so a negative value cannot turn the slice into "count from the end".
    start = max(offset, 0)
    count = max(limit, 0)
    return parsed[start : start + count], len(parsed)


_FILE_READ_TOOLS = {"read_file", "view_image", "open", "screenshot"}
_FILE_WRITE_TOOLS = {"apply_patch", "imagegen"}


def _bash_binary(command: str) -> str | None:
    """Return the executable name a shell command starts with, or ``None``.

    Leading ``!`` markers and ``NAME=value`` environment assignments are
    skipped, and any directory part of the executable is dropped.
    """
    cmd = command.strip().lstrip("!").strip()
    if not cmd:
        return None
    parts = cmd.split()
    while parts and "=" in parts[0] and not parts[0].startswith(("=", "./", "/")):
        parts.pop(0)
    if not parts:
        return None
    # Taking the final path component already handles "./x" and "/usr/bin/x".
    # The previous `lstrip("./")` stripped a *character set* and therefore
    # mangled dot-prefixed names (".tool" became "tool").
    binary = parts[0].rsplit("/", 1)[-1]
    return binary or None


def _extract_patch_files(value: Any) -> list[str]:
    """Return the file paths named by the ``*** Update/Add/Delete File:`` lines of a patch."""
    if not isinstance(value, str):
        return []
    prefixes = ("*** Update File: ", "*** Add File: ", "*** Delete File: ")
    files: list[str] = []
    for line in value.splitlines():
        for prefix in prefixes:
            if line.startswith(prefix):
                files.append(line[len(prefix) :].strip())
                break
    return files


def get_tool_analytics(project_filter: str | None = None, days: int = 30) -> dict[str, Any]:
    """Aggregate tool usage over recently modified sessions.

    Args:
        project_filter: Forwarded to ``list_sessions``.
        days: Only sessions whose file was modified within this many days count.

    Returns:
        A dict with ``tool_counts``, the top 20 ``top_files_read``,
        ``top_files_written`` and ``top_commands``, plus ``session_count``
        (sessions with at least one tool call), ``date_range_days`` and
        ``source_path``.
    """
    root = _sessions_root()
    cutoff = datetime.now(UTC).replace(tzinfo=None) - timedelta(days=days)
    tool_counts: dict[str, int] = {}
    files_read: dict[str, int] = {}
    files_written: dict[str, int] = {}
    commands: dict[str, int] = {}
    session_count = 0

    for session in list_sessions(project_filter=project_filter, limit=1000):
        try:
            mtime = datetime.fromtimestamp(session.path.stat().st_mtime, UTC).replace(tzinfo=None)
        except OSError:
            continue
        if mtime < cutoff:
            continue

        # Parse the file this session came from directly. Looking it up by id
        # rescanned and reparsed every session file once per session.
        records = _read_records(session.path)
        if not records:
            continue
        messages = _parse_messages(session.path, records)[:1000]

        session_had_tools = False
        for message in messages:
            for tool in message.tool_uses:
                name = tool.tool_name
                tool_counts[name] = tool_counts.get(name, 0) + 1
                session_had_tools = True
                inp = tool.input
                path_value = inp.get("path") or inp.get("file_path")
                if name in _FILE_READ_TOOLS and isinstance(path_value, str):
                    files_read[path_value] = files_read.get(path_value, 0) + 1
                if name in _FILE_WRITE_TOOLS and isinstance(path_value, str):
                    files_written[path_value] = files_written.get(path_value, 0) + 1
                if name == "exec_command":
                    cmd = inp.get("cmd")
                    if isinstance(cmd, str):
                        binary = _bash_binary(cmd)
                        if binary:
                            commands[binary] = commands.get(binary, 0) + 1
                if name == "apply_patch":
                    for file_path in _extract_patch_files(inp.get("value") or inp.get("patch")):
                        files_written[file_path] = files_written.get(file_path, 0) + 1
        if session_had_tools:
            session_count += 1

    def _top(counter: dict[str, int], key: str) -> list[dict[str, Any]]:
        ranked = sorted(counter.items(), key=lambda pair: pair[1], reverse=True)
        return [{key: item, "count": count} for item, count in ranked[:20]]

    return {
        "tool_counts": dict(sorted(tool_counts.items(), key=lambda pair: pair[1], reverse=True)),
        "top_files_read": _top(files_read, "path"),
        "top_files_written": _top(files_written, "path"),
        "top_commands": _top(commands, "command"),
        "session_count": session_count,
        "date_range_days": days,
        "source_path": str(root),
    }


def session_stats(sessions: list[CodexSession]) -> dict[str, Any]:
    """Summarise a list of sessions: counts, total tokens, total cost and models seen."""
    models: set[str] = set()
    for session in sessions:
        models.update(session.models)
    return {
        "session_count": len(sessions),
        "active_sessions": sum(1 for session in sessions if session.is_active),
        "total_tokens": sum(session.tokens.total for session in sessions),
        "total_cost_usd": round(sum(session.cost_usd for session in sessions), 6),
        "models": sorted(models),
    }