Pipeline/backend/app/services/claude_code_reader.py

372 lines
13 KiB
Python
Raw Normal View History

"""Reader for local Claude Code and Codex CLI data.
Discovers sessions from ~/.claude/projects/**/*.jsonl, extracts token usage,
model info, cost estimates, and activity status. Also reads ~/.claude/settings.json
and ~/.codex/config.toml for the config scanner.
All I/O is synchronous and file-local — no network calls.
"""
from __future__ import annotations
import json
import os
import tomllib
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
from app.core.logging import get_logger
logger = get_logger(__name__)
# A session counts as active when its newest record is younger than this
# many minutes.
ACTIVE_WINDOW_MINUTES = 30

# ---------------------------------------------------------------------------
# Pricing (USD per million tokens) — mirrors runtime_usage.DEFAULT_MODEL_PRICING
# ---------------------------------------------------------------------------
# Keys are matched as substrings of the reported model name and the first
# matching entry wins, so the insertion order below is significant.
_PRICING: dict[str, dict[str, float]] = {
    "claude-opus-4-7": {
        "input": 15.00,
        "output": 75.00,
        "cache_read": 1.50,
        "cache_write": 18.75,
    },
    "claude-opus-4-5": {
        "input": 15.00,
        "output": 75.00,
        "cache_read": 1.50,
        "cache_write": 18.75,
    },
    "claude-sonnet-4-6": {
        "input": 3.00,
        "output": 15.00,
        "cache_read": 0.30,
        "cache_write": 3.75,
    },
    "claude-sonnet-4-5": {
        "input": 3.00,
        "output": 15.00,
        "cache_read": 0.30,
        "cache_write": 3.75,
    },
    "claude-haiku-4-5-20251001": {
        "input": 0.80,
        "output": 4.00,
        "cache_read": 0.08,
        "cache_write": 1.00,
    },
    "claude-3-5-sonnet": {
        "input": 3.00,
        "output": 15.00,
        "cache_read": 0.30,
        "cache_write": 3.75,
    },
    "claude-3-5-haiku": {
        "input": 0.80,
        "output": 4.00,
        "cache_read": 0.08,
        "cache_write": 1.00,
    },
    "claude-3-opus": {
        "input": 15.00,
        "output": 75.00,
        "cache_read": 1.50,
        "cache_write": 18.75,
    },
    "claude-3-haiku": {
        "input": 0.25,
        "output": 1.25,
        "cache_read": 0.03,
        "cache_write": 0.30,
    },
}
def _price(model: str, input_t: int, output_t: int, cache_read: int, cache_write: int) -> float:
    """Estimate the USD cost of a token mix for *model*.

    The first ``_PRICING`` key that *model* ends with or contains selects the
    rate card. Rates are expressed per million tokens.

    Returns:
        The summed cost across the four token categories, or ``0.0`` when no
        pricing entry matches the model name.
    """
    matched = None
    for name in _PRICING:
        if model.endswith(name) or name in model:
            matched = name
            break
    if not matched:
        return 0.0

    rates = _PRICING[matched]
    input_cost = input_t * rates["input"] / 1_000_000
    output_cost = output_t * rates["output"] / 1_000_000
    cache_read_cost = cache_read * rates["cache_read"] / 1_000_000
    cache_write_cost = cache_write * rates["cache_write"] / 1_000_000
    return input_cost + output_cost + cache_read_cost + cache_write_cost
def _parse_iso(ts: str | None) -> datetime | None:
if not ts:
return None
try:
return datetime.fromisoformat(ts.replace("Z", "+00:00")).astimezone(UTC).replace(tzinfo=None)
except ValueError:
return None
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class SessionTokens:
    """Token counters for one session, split by usage category."""

    input: int = 0
    output: int = 0
    cache_read: int = 0
    cache_write: int = 0

    @property
    def total(self) -> int:
        """Return the sum of all four counters."""
        return sum((self.input, self.output, self.cache_read, self.cache_write))
@dataclass
class ClaudeSession:
    """Summary of one session parsed from a single JSONL file."""

    session_id: str
    # Raw directory name under ~/.claude/projects/.
    project_dir: str
    # Actual working directory taken from the JSONL records.
    cwd: str | None
    title: str | None
    models: list[str]
    tokens: SessionTokens
    cost_usd: float
    # Number of assistant turns.
    message_count: int
    first_message_at: datetime | None
    last_message_at: datetime | None
    is_active: bool
    # e.g. ["claude-vscode", "claude"]
    entrypoints: list[str]
    git_branch: str | None
    version: str | None
@dataclass
class ClaudeConfig:
    """Configuration gathered from local Claude Code and Codex CLI files."""

    # Parsed settings mapping for Claude Code.
    claude_settings: dict[str, Any] = field(default_factory=dict)
    # Parsed configuration mapping for the Codex CLI.
    codex_config: dict[str, Any] = field(default_factory=dict)
    # Rule lines collected for the Codex CLI.
    codex_rules: list[str] = field(default_factory=list)
    # Credential file locations; None when not set.
    claude_credentials_path: str | None = None
    codex_credentials_path: str | None = None
# ---------------------------------------------------------------------------
# JSONL parser — one file = one session
# ---------------------------------------------------------------------------
def _parse_session_file(path: Path) -> ClaudeSession | None:
    """Parse one JSONL session file into a ``ClaudeSession``.

    The file stem becomes the session id and the parent directory name the
    project directory. Lines that are blank, are not valid JSON, or do not
    decode to a JSON object are skipped.

    Token usage is tallied per model so that each model's tokens are priced
    with that model's own rates. Usage on records without a model name is
    included in the token totals but contributes no cost.

    Args:
        path: Location of the ``.jsonl`` file.

    Returns:
        The parsed session, or ``None`` when the file cannot be read or holds
        neither an assistant record nor any parsable timestamp.
    """
    totals = SessionTokens()
    # Model name -> tokens attributed to it; "" collects usage with no model.
    per_model: dict[str, SessionTokens] = {}
    entrypoints: set[str] = set()
    first_ts: datetime | None = None
    last_ts: datetime | None = None
    title: str | None = None
    cwd: str | None = None
    git_branch: str | None = None
    version: str | None = None
    message_count = 0

    try:
        # errors="replace" keeps a partially corrupted log readable.
        with open(path, encoding="utf-8", errors="replace") as fh:
            for raw_line in fh:
                raw_line = raw_line.strip()
                if not raw_line:
                    continue
                try:
                    rec = json.loads(raw_line)
                except json.JSONDecodeError:
                    continue
                # A line may hold valid JSON that is not an object.
                if not isinstance(rec, dict):
                    continue

                ts = _parse_iso(rec.get("timestamp"))
                if ts:
                    if first_ts is None or ts < first_ts:
                        first_ts = ts
                    if last_ts is None or ts > last_ts:
                        last_ts = ts

                rec_type = rec.get("type")
                if rec_type == "ai-title":
                    title = rec.get("title") or title

                # Session metadata: the first non-empty value seen wins.
                if not cwd:
                    cwd = rec.get("cwd")
                if not git_branch:
                    git_branch = rec.get("gitBranch")
                if not version:
                    version = rec.get("version")

                ep = rec.get("entrypoint")
                if ep and isinstance(ep, str):
                    entrypoints.add(ep)

                if rec_type == "assistant":
                    message_count += 1
                    msg = rec.get("message")
                    if not isinstance(msg, dict):
                        msg = {}
                    model = msg.get("model")
                    model_key = model if isinstance(model, str) else ""
                    usage = msg.get("usage")
                    if not isinstance(usage, dict):
                        usage = {}
                    _add_usage(totals, usage)
                    _add_usage(per_model.setdefault(model_key, SessionTokens()), usage)
    except OSError as exc:
        logger.debug("claude_code_reader.session_read_error path=%s error=%s", path, exc)
        return None

    if message_count == 0 and first_ts is None:
        return None

    model_list = sorted(name for name in per_model if name)
    # Price every model's own token share; unknown models price at 0.0.
    cost = sum(
        (
            _price(name, used.input, used.output, used.cache_read, used.cache_write)
            for name, used in sorted(per_model.items())
        ),
        0.0,
    )

    # Timestamps are stored as naive UTC, so compare against naive UTC "now".
    now = datetime.now(UTC).replace(tzinfo=None)
    is_active = bool(last_ts and (now - last_ts) < timedelta(minutes=ACTIVE_WINDOW_MINUTES))

    return ClaudeSession(
        session_id=path.stem,
        project_dir=path.parent.name,
        cwd=cwd,
        title=title,
        models=model_list,
        tokens=totals,
        cost_usd=round(cost, 6),
        message_count=message_count,
        first_message_at=first_ts,
        last_message_at=last_ts,
        is_active=is_active,
        entrypoints=sorted(entrypoints),
        git_branch=git_branch,
        version=version,
    )


def _token_count(value: Any) -> int:
    """Return *value* when it is an integer counter, otherwise 0."""
    return value if isinstance(value, int) else 0


def _add_usage(bucket: SessionTokens, usage: dict[str, Any]) -> None:
    """Add the counters of one ``usage`` mapping onto *bucket*."""
    bucket.input += _token_count(usage.get("input_tokens"))
    bucket.output += _token_count(usage.get("output_tokens"))
    bucket.cache_read += _token_count(usage.get("cache_read_input_tokens"))
    bucket.cache_write += _token_count(usage.get("cache_creation_input_tokens"))
# ---------------------------------------------------------------------------
# Session listing
# ---------------------------------------------------------------------------
def _projects_dir() -> Path:
    """Return the directory that holds per-project session logs.

    A non-blank ``CLAUDE_PROJECTS_PATH`` environment variable takes
    precedence; otherwise ``~/.claude/projects`` is used.
    """
    configured = os.environ.get("CLAUDE_PROJECTS_PATH", "").strip()
    return Path(configured) if configured else Path.home().joinpath(".claude", "projects")
def list_sessions(
    *,
    project_filter: str | None = None,
    active_only: bool = False,
    limit: int = 200,
) -> list[ClaudeSession]:
    """Return parsed sessions from the projects directory, newest first.

    "Newest" is decided by file modification time. Files are parsed in that
    order and parsing stops as soon as *limit* sessions have been collected.

    Args:
        project_filter: Case-insensitive substring that the session's project
            directory name must contain; ``None`` or empty disables it.
        active_only: When true, keep only sessions flagged as active.
        limit: Maximum number of sessions to return. Zero or a negative value
            yields an empty list.

    Returns:
        At most *limit* sessions. Files that cannot be parsed are skipped.
    """
    if limit <= 0:
        return []
    root = _projects_dir()
    if not root.exists():
        return []

    needle = project_filter.lower() if project_filter else None
    stamped: list[tuple[float, Path]] = []
    for path in root.rglob("*.jsonl"):
        if needle and needle not in path.parent.name.lower():
            continue
        try:
            stamped.append((path.stat().st_mtime, path))
        except OSError:
            # The file vanished or became unreadable after globbing.
            continue
    stamped.sort(key=lambda item: item[0], reverse=True)

    sessions: list[ClaudeSession] = []
    for _, path in stamped:
        session = _parse_session_file(path)
        if session is None:
            continue
        if active_only and not session.is_active:
            continue
        sessions.append(session)
        if len(sessions) >= limit:
            break
    return sessions
def get_session(session_id: str) -> ClaudeSession | None:
    """Return a single parsed session by ID.

    The ID is compared literally against file names instead of being placed
    into a glob pattern, so wildcard characters or path separators in
    *session_id* cannot select a different file.

    Args:
        session_id: Stem of the session's ``.jsonl`` file.

    Returns:
        The parsed session from the first file named ``<session_id>.jsonl``,
        or ``None`` when the ID is empty, the projects directory is missing,
        no such file exists, or the file cannot be parsed.
    """
    if not session_id:
        return None
    root = _projects_dir()
    if not root.exists():
        return None
    wanted = f"{session_id}.jsonl"
    for path in root.rglob("*.jsonl"):
        if path.name == wanted:
            return _parse_session_file(path)
    return None
def list_projects() -> list[dict[str, Any]]:
    """Return discovered projects with aggregate stats.

    Every directory containing a ``.jsonl`` file gets an entry, including
    directories whose files all fail to parse. Entries are ordered by most
    recent activity first; projects with no recorded activity sort last.
    """
    root = _projects_dir()
    if not root.exists():
        return []

    by_dir: dict[str, dict[str, Any]] = {}
    for jsonl_path in root.rglob("*.jsonl"):
        dir_name = jsonl_path.parent.name
        # Register the project before parsing so it is listed regardless.
        entry = by_dir.setdefault(
            dir_name,
            {
                "project_dir": dir_name,
                "session_count": 0,
                "total_tokens": 0,
                "total_cost_usd": 0.0,
                "last_active_at": None,
                "cwd": None,
                "is_active": False,
            },
        )

        session = _parse_session_file(jsonl_path)
        if session is None:
            continue

        entry["session_count"] += 1
        entry["total_tokens"] += session.tokens.total
        entry["total_cost_usd"] = round(entry["total_cost_usd"] + session.cost_usd, 6)

        # Keep the first working directory reported by any session.
        if session.cwd and not entry["cwd"]:
            entry["cwd"] = session.cwd

        stamp = session.last_message_at
        latest = entry["last_active_at"]
        if stamp and (latest is None or stamp > latest):
            entry["last_active_at"] = stamp

        if session.is_active:
            entry["is_active"] = True

    ranked = list(by_dir.values())
    ranked.sort(key=lambda item: item["last_active_at"] or datetime.min, reverse=True)
    return ranked
# ---------------------------------------------------------------------------
# Config scanner
# ---------------------------------------------------------------------------
def _read_json(path: Path) -> dict[str, Any]:
    """Load a JSON object from *path*, best-effort.

    Args:
        path: File to read as UTF-8 JSON.

    Returns:
        The decoded object, or an empty dict when the file is missing or
        unreadable, is not valid UTF-8, is not valid JSON, or its top-level
        value is not an object.
    """
    try:
        with open(path, encoding="utf-8") as fh:
            data = json.load(fh)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and the
        # UnicodeDecodeError raised for bytes that are not valid UTF-8.
        return {}
    return data if isinstance(data, dict) else {}
def _read_toml(path: Path) -> dict[str, Any]:
try:
with open(path, "rb") as fh:
return tomllib.load(fh)
except (OSError, tomllib.TOMLDecodeError):
return {}
def _read_rules(path: Path) -> list[str]:
    """Return the non-blank lines of a rules file, stripped of whitespace.

    Args:
        path: File to read as UTF-8 text.

    Returns:
        The stripped, non-empty lines in file order, or an empty list when
        the file is missing or unreadable or is not valid UTF-8.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return []
    return [stripped for line in text.splitlines() if (stripped := line.strip())]
def read_config() -> ClaudeConfig:
    """Read Claude Code and Codex CLI configuration from local files.

    Sources, relative to the home directory:
      * ``.claude/settings.json`` for Claude settings
      * ``.codex/config.toml`` for Codex configuration
      * every ``*.rules`` file in ``.codex/rules/``, in sorted order

    Credential paths come from ``CLAUDE_CREDENTIALS_PATH`` and
    ``CODEX_CREDENTIALS_PATH`` when set to a non-blank value, otherwise from
    ``.claude/.credentials.json`` and ``.codex/auth.json``. A path is reported
    only if it exists on disk.
    """
    home = Path.home()
    claude_dir = home / ".claude"
    codex_dir = home / ".codex"

    def existing_or_none(env_var: str, fallback: Path) -> str | None:
        # Resolve the candidate path and report it only when present.
        candidate = os.environ.get(env_var, "").strip() or str(fallback)
        return candidate if Path(candidate).exists() else None

    settings = _read_json(claude_dir / "settings.json")
    codex = _read_toml(codex_dir / "config.toml")

    rules: list[str] = []
    rules_dir = codex_dir / "rules"
    if rules_dir.exists():
        rules = [
            rule
            for rules_file in sorted(rules_dir.glob("*.rules"))
            for rule in _read_rules(rules_file)
        ]

    return ClaudeConfig(
        claude_settings=settings,
        codex_config=codex,
        codex_rules=rules,
        claude_credentials_path=existing_or_none(
            "CLAUDE_CREDENTIALS_PATH", claude_dir / ".credentials.json"
        ),
        codex_credentials_path=existing_or_none(
            "CODEX_CREDENTIALS_PATH", codex_dir / "auth.json"
        ),
    )
# ---------------------------------------------------------------------------
# Aggregate stats helper
# ---------------------------------------------------------------------------
def session_stats(sessions: list[ClaudeSession]) -> dict[str, Any]:
    """Aggregate headline numbers over *sessions*.

    Returns:
        A dict with the session count, the number of active sessions, the
        summed token total, the summed cost rounded to six decimals, and the
        sorted set of all model names seen.
    """
    token_sum = 0
    cost_sum = 0
    active_count = 0
    models_seen: set[str] = set()

    for session in sessions:
        token_sum += session.tokens.total
        cost_sum += session.cost_usd
        if session.is_active:
            active_count += 1
        models_seen.update(session.models)

    return {
        "session_count": len(sessions),
        "active_sessions": active_count,
        "total_tokens": token_sum,
        "total_cost_usd": round(cost_sum, 6),
        "models": sorted(models_seen),
    }