"""Reader for local Claude Code and Codex CLI data. Discovers sessions from ~/.claude/projects/**/*.jsonl, extracts token usage, model info, cost estimates, and activity status. Also reads ~/.claude/settings.json and ~/.codex/config.toml for the config scanner. All I/O is synchronous and file-local — no network calls. """ from __future__ import annotations import json import os import tomllib from dataclasses import dataclass, field from datetime import UTC, datetime, timedelta from pathlib import Path from typing import Any from app.core.logging import get_logger logger = get_logger(__name__) ACTIVE_WINDOW_MINUTES = 30 # --------------------------------------------------------------------------- # Pricing (USD per million tokens) — mirrors runtime_usage.DEFAULT_MODEL_PRICING # --------------------------------------------------------------------------- _PRICING: dict[str, dict[str, float]] = { "claude-opus-4-7": {"input": 15.00, "output": 75.00, "cache_read": 1.50, "cache_write": 18.75}, "claude-opus-4-5": {"input": 15.00, "output": 75.00, "cache_read": 1.50, "cache_write": 18.75}, "claude-sonnet-4-6": {"input": 3.00, "output": 15.00, "cache_read": 0.30, "cache_write": 3.75}, "claude-sonnet-4-5": {"input": 3.00, "output": 15.00, "cache_read": 0.30, "cache_write": 3.75}, "claude-haiku-4-5-20251001": {"input": 0.80, "output": 4.00, "cache_read": 0.08, "cache_write": 1.00}, "claude-3-5-sonnet": {"input": 3.00, "output": 15.00, "cache_read": 0.30, "cache_write": 3.75}, "claude-3-5-haiku": {"input": 0.80, "output": 4.00, "cache_read": 0.08, "cache_write": 1.00}, "claude-3-opus": {"input": 15.00, "output": 75.00, "cache_read": 1.50, "cache_write": 18.75}, "claude-3-haiku": {"input": 0.25, "output": 1.25, "cache_read": 0.03, "cache_write": 0.30}, } def _price(model: str, input_t: int, output_t: int, cache_read: int, cache_write: int) -> float: key = next((k for k in _PRICING if model.endswith(k) or k in model), None) if not key: return 0.0 p = _PRICING[key] return ( input_t * p["input"] / 1_000_000 + output_t * p["output"] / 1_000_000 + cache_read * p["cache_read"] / 1_000_000 + cache_write * p["cache_write"] / 1_000_000 ) def _parse_iso(ts: str | None) -> datetime | None: if not ts: return None try: return datetime.fromisoformat(ts.replace("Z", "+00:00")).astimezone(UTC).replace(tzinfo=None) except ValueError: return None # --------------------------------------------------------------------------- # Data classes # --------------------------------------------------------------------------- @dataclass class SessionTokens: input: int = 0 output: int = 0 cache_read: int = 0 cache_write: int = 0 @property def total(self) -> int: return self.input + self.output + self.cache_read + self.cache_write @dataclass class ClaudeSession: session_id: str project_dir: str # raw directory name under ~/.claude/projects/ cwd: str | None # actual working directory from JSONL records title: str | None models: list[str] tokens: SessionTokens cost_usd: float message_count: int # assistant turns first_message_at: datetime | None last_message_at: datetime | None is_active: bool entrypoints: list[str] # e.g. ["claude-vscode", "claude"] git_branch: str | None version: str | None @dataclass class SessionTextBlock: text: str truncated: bool @dataclass class SessionThinkingBlock: text: str truncated: bool @dataclass class SessionToolUse: tool_use_id: str tool_name: str input: dict[str, Any] input_truncated: bool result: str | None result_truncated: bool is_error: bool @dataclass class SessionTokenUsage: input: int output: int cache_read: int cache_write: int @dataclass class ParsedMessage: uuid: str role: str # "user" | "assistant" timestamp: datetime | None text_blocks: list[SessionTextBlock] thinking_blocks: list[SessionThinkingBlock] tool_uses: list[SessionToolUse] model: str | None tokens: SessionTokenUsage | None @dataclass class ClaudeConfig: claude_settings: dict[str, Any] = field(default_factory=dict) codex_config: dict[str, Any] = field(default_factory=dict) codex_rules: list[str] = field(default_factory=list) claude_credentials_path: str | None = None codex_credentials_path: str | None = None # --------------------------------------------------------------------------- # JSONL parser — one file = one session # --------------------------------------------------------------------------- def _parse_session_file(path: Path) -> ClaudeSession | None: session_id = path.stem project_dir = path.parent.name tokens = SessionTokens() models: set[str] = set() entrypoints: set[str] = set() first_ts: datetime | None = None last_ts: datetime | None = None title: str | None = None cwd: str | None = None git_branch: str | None = None version: str | None = None message_count = 0 try: with open(path, encoding="utf-8", errors="replace") as fh: for raw_line in fh: raw_line = raw_line.strip() if not raw_line: continue try: rec = json.loads(raw_line) except json.JSONDecodeError: continue ts = _parse_iso(rec.get("timestamp")) if ts: if first_ts is None or ts < first_ts: first_ts = ts if last_ts is None or ts > last_ts: last_ts = ts rec_type = rec.get("type") if rec_type == "ai-title": title = rec.get("title") or title if not cwd: cwd = rec.get("cwd") if not git_branch: git_branch = rec.get("gitBranch") if not version: version = rec.get("version") ep = rec.get("entrypoint") if ep: entrypoints.add(ep) if rec_type == "assistant": message_count += 1 msg = rec.get("message") or {} model = msg.get("model") if model: models.add(model) usage = msg.get("usage") or {} tokens.input += usage.get("input_tokens", 0) tokens.output += usage.get("output_tokens", 0) tokens.cache_read += usage.get("cache_read_input_tokens", 0) tokens.cache_write += usage.get("cache_creation_input_tokens", 0) except (OSError, PermissionError) as exc: logger.debug("claude_code_reader.session_read_error path=%s error=%s", path, exc) return None if message_count == 0 and first_ts is None: return None model_list = sorted(models) primary_model = model_list[0] if model_list else "" cost = _price(primary_model, tokens.input, tokens.output, tokens.cache_read, tokens.cache_write) for m in model_list[1:]: # Additional models — approximate with same token split (rare) cost += _price(m, 0, 0, 0, 0) now = datetime.utcnow() is_active = bool(last_ts and (now - last_ts) < timedelta(minutes=ACTIVE_WINDOW_MINUTES)) return ClaudeSession( session_id=session_id, project_dir=project_dir, cwd=cwd, title=title, models=model_list, tokens=tokens, cost_usd=round(cost, 6), message_count=message_count, first_message_at=first_ts, last_message_at=last_ts, is_active=is_active, entrypoints=sorted(entrypoints), git_branch=git_branch, version=version, ) # --------------------------------------------------------------------------- # Session listing # --------------------------------------------------------------------------- def _projects_dir() -> Path: override = os.environ.get("CLAUDE_PROJECTS_PATH", "").strip() if override: return Path(override) return Path.home() / ".claude" / "projects" def list_sessions( *, project_filter: str | None = None, active_only: bool = False, limit: int = 200, ) -> list[ClaudeSession]: """Return parsed sessions from ~/.claude/projects/, newest first.""" root = _projects_dir() if not root.exists(): return [] sessions: list[ClaudeSession] = [] jsonl_files = sorted(root.rglob("*.jsonl"), key=lambda p: p.stat().st_mtime, reverse=True) for path in jsonl_files: if project_filter and project_filter.lower() not in path.parent.name.lower(): continue session = _parse_session_file(path) if session is None: continue if active_only and not session.is_active: continue sessions.append(session) if len(sessions) >= limit: break return sessions def get_session(session_id: str) -> ClaudeSession | None: """Return a single parsed session by ID.""" root = _projects_dir() if not root.exists(): return None for path in root.rglob(f"{session_id}.jsonl"): return _parse_session_file(path) return None # --------------------------------------------------------------------------- # Session message reader # --------------------------------------------------------------------------- _CONTENT_TRUNCATE = 4000 _INPUT_VALUE_TRUNCATE = 2000 def _trunc(text: str, limit: int = _CONTENT_TRUNCATE) -> tuple[str, bool]: if len(text) <= limit: return text, False return text[:limit], True def _trunc_input(input_dict: dict[str, Any]) -> tuple[dict[str, Any], bool]: """Truncate long string values inside a tool input dict.""" truncated = False result: dict[str, Any] = {} for k, v in input_dict.items(): if isinstance(v, str) and len(v) > _INPUT_VALUE_TRUNCATE: result[k] = v[:_INPUT_VALUE_TRUNCATE] truncated = True else: result[k] = v return result, truncated def _extract_tool_result_text(raw: Any) -> str: """Normalise a tool_result content field to plain text.""" if isinstance(raw, str): return raw if isinstance(raw, list): return "\n".join( b.get("text", "") for b in raw if isinstance(b, dict) and b.get("type") == "text" ) return str(raw) if raw is not None else "" def get_session_messages( session_id: str, limit: int = 200, offset: int = 0, ) -> tuple[list[ParsedMessage], int] | None: """Parse the full conversation from a session JSONL file. Returns (messages[offset:offset+limit], total) or None if not found. Tool results are embedded in the tool_use blocks of the preceding assistant message. User records that contain only tool_results are suppressed from the returned list. """ root = _projects_dir() if not root.exists(): return None path: Path | None = None for p in root.rglob(f"{session_id}.jsonl"): path = p break if path is None: return None # tool_use_id -> (result_text, is_error, truncated) tool_results: dict[str, tuple[str, bool, bool]] = {} # Insertion-ordered list of "role:key" to preserve conversation order. # For assistant messages the key is message.id (deduplication handle). # For user messages the key is their uuid. message_order: list[str] = [] # Accumulated data per assistant message.id assistant_acc: dict[str, dict[str, Any]] = {} # User message data keyed by uuid user_acc: dict[str, dict[str, Any]] = {} try: with open(path, encoding="utf-8", errors="replace") as fh: for raw_line in fh: raw_line = raw_line.strip() if not raw_line: continue try: rec = json.loads(raw_line) except json.JSONDecodeError: continue if rec.get("isSidechain"): continue rec_type = rec.get("type") if rec_type not in ("user", "assistant"): continue ts = _parse_iso(rec.get("timestamp")) uuid = rec.get("uuid", "") if rec_type == "user": content = rec.get("message", {}).get("content") or [] if not isinstance(content, list): continue # Collect tool results for later embedding for block in content: if block.get("type") == "tool_result": tid = block.get("tool_use_id", "") text = _extract_tool_result_text(block.get("content", "")) t, trunc = _trunc(text) tool_results[tid] = (t, bool(block.get("is_error", False)), trunc) # Only surface user records that carry visible text has_text = any(b.get("type") == "text" for b in content) if has_text and uuid not in user_acc: user_acc[uuid] = {"ts": ts, "content": content} message_order.append(f"user:{uuid}") else: # assistant msg = rec.get("message") or {} msg_id = msg.get("id") or uuid content = msg.get("content") if isinstance(msg.get("content"), list) else [] if msg_id not in assistant_acc: assistant_acc[msg_id] = { "uuid": uuid, "ts": ts, "model": msg.get("model"), "usage": msg.get("usage") or {}, "blocks": [], "seen_block_ids": set(), } message_order.append(f"assistant:{msg_id}") acc = assistant_acc[msg_id] for block in content: bid = block.get("id") if bid: if bid in acc["seen_block_ids"]: continue acc["seen_block_ids"].add(bid) acc["blocks"].append(block) except (OSError, PermissionError) as exc: logger.debug("claude_code_reader.messages_read_error path=%s error=%s", path, exc) return None # Build the final parsed list parsed: list[ParsedMessage] = [] for key in message_order: role, key_id = key.split(":", 1) if role == "user": data = user_acc[key_id] text_blocks: list[SessionTextBlock] = [] for block in data["content"]: if block.get("type") == "text": text, trunc = _trunc(block.get("text", "")) if text.strip(): text_blocks.append(SessionTextBlock(text=text, truncated=trunc)) if not text_blocks: continue parsed.append(ParsedMessage( uuid=key_id, role="user", timestamp=data["ts"], text_blocks=text_blocks, thinking_blocks=[], tool_uses=[], model=None, tokens=None, )) else: # assistant data = assistant_acc[key_id] text_blocks = [] thinking_blocks: list[SessionThinkingBlock] = [] tool_uses: list[SessionToolUse] = [] for block in data["blocks"]: btype = block.get("type") if btype == "text": text, trunc = _trunc(block.get("text", "")) if text: text_blocks.append(SessionTextBlock(text=text, truncated=trunc)) elif btype == "thinking": text, trunc = _trunc(block.get("thinking", "")) if text: thinking_blocks.append(SessionThinkingBlock(text=text, truncated=trunc)) elif btype == "tool_use": tid = block.get("id", "") inp, inp_trunc = _trunc_input(block.get("input") or {}) result = tool_results.get(tid) tool_uses.append(SessionToolUse( tool_use_id=tid, tool_name=block.get("name", "unknown"), input=inp, input_truncated=inp_trunc, result=result[0] if result else None, result_truncated=result[2] if result else False, is_error=result[1] if result else False, )) usage = data["usage"] parsed.append(ParsedMessage( uuid=data["uuid"], role="assistant", timestamp=data["ts"], text_blocks=text_blocks, thinking_blocks=thinking_blocks, tool_uses=tool_uses, model=data["model"], tokens=SessionTokenUsage( input=usage.get("input_tokens", 0), output=usage.get("output_tokens", 0), cache_read=usage.get("cache_read_input_tokens", 0), cache_write=usage.get("cache_creation_input_tokens", 0), ), )) total = len(parsed) return parsed[offset : offset + limit], total # --------------------------------------------------------------------------- # Tool analytics # --------------------------------------------------------------------------- # Tools that expose a readable file path in their input _FILE_READ_TOOLS = {"Read"} _FILE_WRITE_TOOLS = {"Edit", "Write", "NotebookEdit"} def _bash_binary(command: str) -> str | None: """Extract the leading binary name from a shell command string.""" cmd = command.strip().lstrip("!").strip() if not cmd: return None first = cmd.split()[0] # Strip leading path separators and common shell prefixes binary = first.lstrip("./").rsplit("/", 1)[-1] return binary or None def get_tool_analytics( project_filter: str | None = None, days: int = 30, ) -> dict[str, Any]: """Scan JSONL session files and return aggregated tool-use statistics. Uses file mtime for the days filter (fast, no need to fully parse every record). Deduplicates tool_use blocks by their block id so streaming artefacts (duplicate JSONL records with the same message.id) are not double-counted. """ root = _projects_dir() if not root.exists(): return { "tool_counts": {}, "top_files_read": [], "top_files_written": [], "top_commands": [], "session_count": 0, "date_range_days": days, } cutoff = datetime.utcnow() - timedelta(days=days) jsonl_files = sorted(root.rglob("*.jsonl"), key=lambda p: p.stat().st_mtime, reverse=True) tool_counts: dict[str, int] = {} files_read: dict[str, int] = {} files_written: dict[str, int] = {} bash_cmds: dict[str, int] = {} session_count = 0 seen_sessions: set[str] = set() for path in jsonl_files: if project_filter and project_filter.lower() not in path.parent.name.lower(): continue try: mtime = datetime.utcfromtimestamp(path.stat().st_mtime) except OSError: continue if mtime < cutoff: continue session_id = path.stem if session_id in seen_sessions: continue seen_sessions.add(session_id) session_had_tools = False # Deduplicate tool_use blocks within this session by block id seen_block_ids: set[str] = set() try: with open(path, encoding="utf-8", errors="replace") as fh: for raw_line in fh: raw_line = raw_line.strip() if not raw_line: continue try: rec = json.loads(raw_line) except json.JSONDecodeError: continue if rec.get("isSidechain") or rec.get("type") != "assistant": continue content = (rec.get("message") or {}).get("content") if not isinstance(content, list): continue for block in content: if block.get("type") != "tool_use": continue bid = block.get("id", "") if bid and bid in seen_block_ids: continue if bid: seen_block_ids.add(bid) name = block.get("name") or "unknown" tool_counts[name] = tool_counts.get(name, 0) + 1 session_had_tools = True inp = block.get("input") or {} if name in _FILE_READ_TOOLS: fp = inp.get("file_path", "").strip() if fp: files_read[fp] = files_read.get(fp, 0) + 1 elif name in _FILE_WRITE_TOOLS: fp = inp.get("file_path", "").strip() if fp: files_written[fp] = files_written.get(fp, 0) + 1 elif name == "Bash": binary = _bash_binary(inp.get("command", "")) if binary: bash_cmds[binary] = bash_cmds.get(binary, 0) + 1 except (OSError, PermissionError) as exc: logger.debug("claude_code_reader.analytics_read_error path=%s error=%s", path, exc) continue if session_had_tools: session_count += 1 def _top(counter: dict[str, int], key: str, n: int = 20) -> list[dict[str, Any]]: return [ {key: k, "count": v} for k, v in sorted(counter.items(), key=lambda x: x[1], reverse=True)[:n] ] return { "tool_counts": dict(sorted(tool_counts.items(), key=lambda x: x[1], reverse=True)), "top_files_read": _top(files_read, "path"), "top_files_written": _top(files_written, "path"), "top_commands": _top(bash_cmds, "command"), "session_count": session_count, "date_range_days": days, } def list_projects() -> list[dict[str, Any]]: """Return discovered projects with aggregate stats.""" root = _projects_dir() if not root.exists(): return [] projects: dict[str, dict[str, Any]] = {} for path in root.rglob("*.jsonl"): project_dir = path.parent.name if project_dir not in projects: projects[project_dir] = { "project_dir": project_dir, "session_count": 0, "total_tokens": 0, "total_cost_usd": 0.0, "last_active_at": None, "cwd": None, "is_active": False, } session = _parse_session_file(path) if session is None: continue p = projects[project_dir] p["session_count"] += 1 p["total_tokens"] += session.tokens.total p["total_cost_usd"] = round(p["total_cost_usd"] + session.cost_usd, 6) if session.cwd and not p["cwd"]: p["cwd"] = session.cwd if session.last_message_at: current = p["last_active_at"] if current is None or session.last_message_at > current: p["last_active_at"] = session.last_message_at if session.is_active: p["is_active"] = True return sorted(projects.values(), key=lambda x: x["last_active_at"] or datetime.min, reverse=True) # --------------------------------------------------------------------------- # Config scanner # --------------------------------------------------------------------------- def _read_json(path: Path) -> dict[str, Any]: try: with open(path, encoding="utf-8") as fh: data = json.load(fh) return data if isinstance(data, dict) else {} except (OSError, json.JSONDecodeError): return {} def _read_toml(path: Path) -> dict[str, Any]: try: with open(path, "rb") as fh: return tomllib.load(fh) except (OSError, tomllib.TOMLDecodeError): return {} def _read_rules(path: Path) -> list[str]: try: return [line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip()] except (OSError, PermissionError): return [] def read_config() -> ClaudeConfig: """Read Claude Code and Codex CLI configuration from local files.""" claude_dir = Path.home() / ".claude" codex_dir = Path.home() / ".codex" # ~/.claude/settings.json claude_settings = _read_json(claude_dir / "settings.json") # ~/.codex/config.toml codex_config = _read_toml(codex_dir / "config.toml") # ~/.codex/rules/ — all .rules files codex_rules: list[str] = [] rules_dir = codex_dir / "rules" if rules_dir.exists(): for rules_file in sorted(rules_dir.glob("*.rules")): codex_rules.extend(_read_rules(rules_file)) claude_creds = os.environ.get("CLAUDE_CREDENTIALS_PATH", "").strip() or str(claude_dir / ".credentials.json") codex_creds = os.environ.get("CODEX_CREDENTIALS_PATH", "").strip() or str(codex_dir / "auth.json") return ClaudeConfig( claude_settings=claude_settings, codex_config=codex_config, codex_rules=codex_rules, claude_credentials_path=claude_creds if Path(claude_creds).exists() else None, codex_credentials_path=codex_creds if Path(codex_creds).exists() else None, ) # --------------------------------------------------------------------------- # Aggregate stats helper # --------------------------------------------------------------------------- def session_stats(sessions: list[ClaudeSession]) -> dict[str, Any]: total_tokens = sum(s.tokens.total for s in sessions) total_cost = round(sum(s.cost_usd for s in sessions), 6) active = sum(1 for s in sessions if s.is_active) all_models: set[str] = set() for s in sessions: all_models.update(s.models) return { "session_count": len(sessions), "active_sessions": active, "total_tokens": total_tokens, "total_cost_usd": total_cost, "models": sorted(all_models), }