Pipeline/backend/app/services/claude_code_reader.py

813 lines
28 KiB
Python
Raw Permalink Normal View History

"""Reader for local Claude Code and Codex CLI data.
Discovers sessions from ~/.claude/projects/**/*.jsonl, extracts token usage,
model info, cost estimates, and activity status. Also reads ~/.claude/settings.json
and ~/.codex/config.toml for the config scanner.
All I/O is synchronous and file-local — no network calls.
"""
from __future__ import annotations
import json
import os
import tomllib
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
from app.core.logging import get_logger
logger = get_logger(__name__)
# A session is reported as active when its newest record timestamp is less
# than this many minutes old.
ACTIVE_WINDOW_MINUTES = 30
# ---------------------------------------------------------------------------
# Billing classification
# ---------------------------------------------------------------------------
# Entrypoints that run against the user's Claude subscription (flat-rate, not
# per-token API billing). Sessions from these clients cost $0 in real money
# even though they consume tokens.
_SUBSCRIPTION_ENTRYPOINTS: frozenset[str] = frozenset(
    (
        "claude-vscode",
        "claude-jetbrains",
        "claude-web",
    )
)


def _billing_source(entrypoints: set[str]) -> str:
    """Classify a session's billing as ``"subscription"`` or ``"api"``.

    A session with no recorded entrypoints, or with at least one entrypoint
    listed in ``_SUBSCRIPTION_ENTRYPOINTS``, is classified as subscription.
    Only a session whose entrypoints are all outside that set is ``"api"``.
    """
    if not entrypoints:
        # No entrypoint metadata at all: default to subscription.
        return "subscription"
    if entrypoints.isdisjoint(_SUBSCRIPTION_ENTRYPOINTS):
        return "api"
    return "subscription"
# ---------------------------------------------------------------------------
# Pricing (USD per million tokens) — mirrors runtime_usage.DEFAULT_MODEL_PRICING
# ---------------------------------------------------------------------------
_PRICING: dict[str, dict[str, float]] = {
    "claude-opus-4-7": {"input": 15.00, "output": 75.00, "cache_read": 1.50, "cache_write": 18.75},
    "claude-opus-4-5": {"input": 15.00, "output": 75.00, "cache_read": 1.50, "cache_write": 18.75},
    "claude-sonnet-4-6": {"input": 3.00, "output": 15.00, "cache_read": 0.30, "cache_write": 3.75},
    "claude-sonnet-4-5": {"input": 3.00, "output": 15.00, "cache_read": 0.30, "cache_write": 3.75},
    "claude-haiku-4-5-20251001": {"input": 0.80, "output": 4.00, "cache_read": 0.08, "cache_write": 1.00},
    "claude-3-5-sonnet": {"input": 3.00, "output": 15.00, "cache_read": 0.30, "cache_write": 3.75},
    "claude-3-5-haiku": {"input": 0.80, "output": 4.00, "cache_read": 0.08, "cache_write": 1.00},
    "claude-3-opus": {"input": 15.00, "output": 75.00, "cache_read": 1.50, "cache_write": 18.75},
    "claude-3-haiku": {"input": 0.25, "output": 1.25, "cache_read": 0.03, "cache_write": 0.30},
}


def _price(model: str, input_t: int, output_t: int, cache_read: int, cache_write: int) -> float:
    """Estimate the USD cost of the given token counts for ``model``.

    The first ``_PRICING`` key (in table order) that occurs anywhere inside
    ``model`` selects the rate card; rates are USD per million tokens.
    Returns ``0.0`` when no key matches.
    """
    rates: dict[str, float] | None = None
    for known_name, rate_card in _PRICING.items():
        # A substring test also covers the "model ends with the key" case.
        if known_name in model:
            rates = rate_card
            break
    if rates is None:
        return 0.0

    per_million = 1_000_000
    return (
        input_t * rates["input"] / per_million
        + output_t * rates["output"] / per_million
        + cache_read * rates["cache_read"] / per_million
        + cache_write * rates["cache_write"] / per_million
    )
def _parse_iso(ts: str | None) -> datetime | None:
if not ts:
return None
try:
return datetime.fromisoformat(ts.replace("Z", "+00:00")).astimezone(UTC).replace(tzinfo=None)
except ValueError:
return None
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class SessionTokens:
    """Token counters for one session, split into four usage categories."""

    input: int = 0
    output: int = 0
    cache_read: int = 0
    cache_write: int = 0

    @property
    def total(self) -> int:
        """Return the sum of the four counters."""
        return sum((self.input, self.output, self.cache_read, self.cache_write))
@dataclass
class ClaudeSession:
    """Summary of one session file, as produced by the session parser."""

    # Identity / location
    session_id: str
    project_dir: str  # raw directory name under ~/.claude/projects/
    cwd: str | None  # actual working directory from JSONL records
    title: str | None

    # Usage and billing
    models: list[str]
    tokens: SessionTokens
    cost_usd: float  # $0 for subscription sessions, real cost for api sessions
    billing_source: str  # "subscription" | "api"
    message_count: int  # assistant turns

    # Activity window
    first_message_at: datetime | None
    last_message_at: datetime | None
    is_active: bool

    # Client metadata copied from the records
    entrypoints: list[str]  # e.g. ["claude-vscode", "claude"]
    git_branch: str | None
    version: str | None
@dataclass
class SessionTextBlock:
    """One visible text block of a message."""

    text: str
    truncated: bool  # True when ``text`` was cut short
@dataclass
class SessionThinkingBlock:
    """One thinking block of an assistant message."""

    text: str
    truncated: bool  # True when ``text`` was cut short
@dataclass
class SessionToolUse:
    """A tool invocation together with its (optional) result."""

    # The call
    tool_use_id: str
    tool_name: str
    input: dict[str, Any]
    input_truncated: bool  # True when a value inside ``input`` was cut short

    # The outcome; ``result`` is None when no result was recorded for the call
    result: str | None
    result_truncated: bool
    is_error: bool
@dataclass
class SessionTokenUsage:
    """Token usage attached to a single parsed assistant message."""

    input: int
    output: int
    cache_read: int
    cache_write: int
@dataclass
class ParsedMessage:
    """One conversation message in display-ready form."""

    uuid: str
    role: str  # "user" | "assistant"
    timestamp: datetime | None

    # Content, grouped by block kind
    text_blocks: list[SessionTextBlock]
    thinking_blocks: list[SessionThinkingBlock]
    tool_uses: list[SessionToolUse]

    # Model and usage; the message reader sets both to None for user messages
    model: str | None
    tokens: SessionTokenUsage | None
@dataclass
class ClaudeConfig:
    """Configuration gathered from local Claude Code and Codex CLI files."""

    # Parsed file contents; each instance gets its own empty container.
    claude_settings: dict[str, Any] = field(default_factory=dict)
    codex_config: dict[str, Any] = field(default_factory=dict)
    codex_rules: list[str] = field(default_factory=list)

    # Credential file locations; None when not set.
    claude_credentials_path: str | None = None
    codex_credentials_path: str | None = None
# ---------------------------------------------------------------------------
# JSONL parser — one file = one session
# ---------------------------------------------------------------------------
def _usage_counts(usage: Any) -> tuple[int, int, int, int]:
    """Return ``(input, output, cache_read, cache_write)`` from a usage mapping.

    Missing, null or non-integer values count as 0 so that one malformed
    record cannot abort parsing of the whole session.
    """
    if not isinstance(usage, dict):
        return 0, 0, 0, 0

    def _count(key: str) -> int:
        value = usage.get(key)
        if isinstance(value, int) and not isinstance(value, bool):
            return value
        return 0

    return (
        _count("input_tokens"),
        _count("output_tokens"),
        _count("cache_read_input_tokens"),
        _count("cache_creation_input_tokens"),
    )


def _parse_session_file(path: Path) -> ClaudeSession | None:
    """Parse one session JSONL file into a ``ClaudeSession`` summary.

    The session id is the file stem and the project directory is the name of
    the file's parent directory. Lines that are blank, are not valid JSON, or
    do not decode to a JSON object are skipped.

    Assistant records that repeat an already-seen ``message.id`` (falling back
    to the record ``uuid``) are counted once, so duplicate records for the
    same message do not inflate ``message_count`` or the token totals.

    Cost is computed per model: each model's tokens are priced at that
    model's own rates. Subscription sessions always cost 0.

    Returns:
        The session summary, or ``None`` when the file cannot be read or
        contains neither an assistant record nor any parseable timestamp.
    """
    session_id = path.stem
    project_dir = path.parent.name
    tokens = SessionTokens()
    # model name -> [input, output, cache_read, cache_write]; "" collects
    # usage from records that carry no model name (priced at 0).
    per_model: dict[str, list[int]] = {}
    seen_message_ids: set[str] = set()
    models: set[str] = set()
    entrypoints: set[str] = set()
    first_ts: datetime | None = None
    last_ts: datetime | None = None
    title: str | None = None
    cwd: str | None = None
    git_branch: str | None = None
    version: str | None = None
    message_count = 0

    try:
        with open(path, encoding="utf-8", errors="replace") as fh:
            for raw_line in fh:
                raw_line = raw_line.strip()
                if not raw_line:
                    continue
                try:
                    rec = json.loads(raw_line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(rec, dict):
                    continue

                raw_ts = rec.get("timestamp")
                ts = _parse_iso(raw_ts) if isinstance(raw_ts, str) else None
                if ts:
                    if first_ts is None or ts < first_ts:
                        first_ts = ts
                    if last_ts is None or ts > last_ts:
                        last_ts = ts

                rec_type = rec.get("type")
                if rec_type == "ai-title":
                    title = rec.get("title") or title

                # Session metadata: the first non-empty value wins.
                if not cwd:
                    cwd = rec.get("cwd")
                if not git_branch:
                    git_branch = rec.get("gitBranch")
                if not version:
                    version = rec.get("version")
                ep = rec.get("entrypoint")
                if isinstance(ep, str) and ep:
                    entrypoints.add(ep)

                if rec_type != "assistant":
                    continue

                msg = rec.get("message")
                if not isinstance(msg, dict):
                    msg = {}

                msg_id = msg.get("id") or rec.get("uuid")
                if isinstance(msg_id, str) and msg_id:
                    if msg_id in seen_message_ids:
                        continue
                    seen_message_ids.add(msg_id)

                message_count += 1
                model = msg.get("model")
                if isinstance(model, str) and model:
                    models.add(model)
                else:
                    model = ""

                counts = _usage_counts(msg.get("usage"))
                tokens.input += counts[0]
                tokens.output += counts[1]
                tokens.cache_read += counts[2]
                tokens.cache_write += counts[3]
                bucket = per_model.setdefault(model, [0, 0, 0, 0])
                for idx, amount in enumerate(counts):
                    bucket[idx] += amount
    except OSError as exc:
        logger.debug("claude_code_reader.session_read_error path=%s error=%s", path, exc)
        return None

    if message_count == 0 and first_ts is None:
        return None

    model_list = sorted(models)
    billing = _billing_source(entrypoints)
    if billing == "subscription":
        cost = 0.0
    else:
        cost = sum((_price(name, *bucket) for name, bucket in per_model.items()), 0.0)

    # Record timestamps are naive UTC, so compare against naive UTC "now".
    now = datetime.now(UTC).replace(tzinfo=None)
    is_active = bool(last_ts and (now - last_ts) < timedelta(minutes=ACTIVE_WINDOW_MINUTES))

    return ClaudeSession(
        session_id=session_id,
        project_dir=project_dir,
        cwd=cwd,
        title=title,
        models=model_list,
        tokens=tokens,
        cost_usd=round(cost, 6),
        billing_source=billing,
        message_count=message_count,
        first_message_at=first_ts,
        last_message_at=last_ts,
        is_active=is_active,
        entrypoints=sorted(entrypoints),
        git_branch=git_branch,
        version=version,
    )
# ---------------------------------------------------------------------------
# Session listing
# ---------------------------------------------------------------------------
def _projects_dir() -> Path:
    """Return the directory that holds the per-project session folders.

    A non-blank ``CLAUDE_PROJECTS_PATH`` environment variable takes
    precedence; otherwise the location is ``~/.claude/projects``.
    """
    configured = os.environ.get("CLAUDE_PROJECTS_PATH", "").strip()
    return Path(configured) if configured else Path.home() / ".claude" / "projects"
def list_sessions(
    *,
    project_filter: str | None = None,
    active_only: bool = False,
    limit: int = 200,
) -> list[ClaudeSession]:
    """Return parsed sessions from the projects directory, newest first.

    Files are ordered by modification time (most recent first) and parsed in
    that order until ``limit`` sessions have been collected.

    Args:
        project_filter: Case-insensitive substring that the session file's
            parent directory name must contain.
        active_only: Keep only sessions flagged as active.
        limit: Maximum number of sessions to return; a value of 0 or less
            yields an empty list.
    """
    root = _projects_dir()
    if limit <= 0 or not root.exists():
        return []

    def _mtime(candidate: Path) -> float:
        # A file may disappear between discovery and stat(); sort it last
        # instead of letting the whole listing fail.
        try:
            return candidate.stat().st_mtime
        except OSError:
            return float("-inf")

    needle = project_filter.lower() if project_filter else None
    sessions: list[ClaudeSession] = []
    for path in sorted(root.rglob("*.jsonl"), key=_mtime, reverse=True):
        if needle and needle not in path.parent.name.lower():
            continue
        session = _parse_session_file(path)
        if session is None:
            continue
        if active_only and not session.is_active:
            continue
        sessions.append(session)
        if len(sessions) >= limit:
            break
    return sessions
def get_session(session_id: str) -> ClaudeSession | None:
    """Return a single parsed session by ID, or ``None`` if it is not found.

    The ID is compared against each ``*.jsonl`` file stem literally rather
    than being interpolated into a glob pattern, so characters such as ``*``,
    ``?`` or ``[`` in ``session_id`` cannot match unrelated session files.
    """
    root = _projects_dir()
    if not session_id or not root.exists():
        return None
    for path in root.rglob("*.jsonl"):
        if path.stem == session_id:
            return _parse_session_file(path)
    return None
# ---------------------------------------------------------------------------
# Session message reader
# ---------------------------------------------------------------------------
_CONTENT_TRUNCATE = 4000
_INPUT_VALUE_TRUNCATE = 2000


def _trunc(text: str, limit: int = _CONTENT_TRUNCATE) -> tuple[str, bool]:
    """Clip ``text`` to at most ``limit`` characters.

    Returns the (possibly shortened) text and a flag that is True only when
    characters were actually removed.
    """
    was_cut = len(text) > limit
    return (text[:limit] if was_cut else text), was_cut
def _trunc_input(input_dict: dict[str, Any]) -> tuple[dict[str, Any], bool]:
    """Return a copy of a tool input dict with over-long string values clipped.

    Only top-level string values longer than ``_INPUT_VALUE_TRUNCATE`` are
    shortened; every other value is carried over unchanged. The second item
    of the result is True when at least one value was clipped.
    """
    clipped: dict[str, Any] = {}
    any_cut = False
    for name, value in input_dict.items():
        if isinstance(value, str) and len(value) > _INPUT_VALUE_TRUNCATE:
            value = value[:_INPUT_VALUE_TRUNCATE]
            any_cut = True
        clipped[name] = value
    return clipped, any_cut
def _extract_tool_result_text(raw: Any) -> str:
    """Flatten a tool_result ``content`` value into plain text.

    Strings are returned as-is. For a list, the ``text`` of every dict item
    whose ``type`` is ``"text"`` is joined with newlines. ``None`` becomes
    the empty string and any other value is passed through ``str()``.
    """
    if raw is None:
        return ""
    if isinstance(raw, str):
        return raw
    if isinstance(raw, list):
        pieces = [
            item.get("text", "")
            for item in raw
            if isinstance(item, dict) and item.get("type") == "text"
        ]
        return "\n".join(pieces)
    return str(raw)
def get_session_messages(
    session_id: str,
    limit: int = 200,
    offset: int = 0,
) -> tuple[list[ParsedMessage], int] | None:
    """Parse the full conversation from a session JSONL file.

    Returns ``(messages[offset:offset + limit], total)``, or ``None`` when the
    session file is not found or cannot be read. Negative ``offset`` or
    ``limit`` values are treated as 0.

    Tool results are embedded in the matching ``tool_use`` entry of the
    assistant message that issued the call. User records that carry no
    visible text (for example records that only contain tool results) are
    left out of the returned list. User records whose ``content`` is a plain
    string are surfaced as a single text block.

    Sidechain records and records of any type other than ``user`` /
    ``assistant`` are ignored. Assistant records sharing one ``message.id``
    are merged into a single message; model, usage and timestamp come from
    the first record seen for that id.
    """
    root = _projects_dir()
    if not session_id or not root.exists():
        return None

    # Compare the stem literally instead of building a glob pattern from the
    # caller-supplied id, so glob metacharacters cannot match other files.
    path = next((p for p in root.rglob("*.jsonl") if p.stem == session_id), None)
    if path is None:
        return None

    # tool_use_id -> (result_text, is_error, truncated)
    tool_results: dict[str, tuple[str, bool, bool]] = {}
    # (role, key) pairs in first-seen order. For assistant messages the key is
    # message.id (deduplication handle); for user messages it is the uuid.
    message_order: list[tuple[str, str]] = []
    # Accumulated data per assistant message.id
    assistant_acc: dict[str, dict[str, Any]] = {}
    # User message data keyed by uuid
    user_acc: dict[str, dict[str, Any]] = {}

    try:
        with open(path, encoding="utf-8", errors="replace") as fh:
            for raw_line in fh:
                raw_line = raw_line.strip()
                if not raw_line:
                    continue
                try:
                    rec = json.loads(raw_line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(rec, dict) or rec.get("isSidechain"):
                    continue
                rec_type = rec.get("type")
                if rec_type not in ("user", "assistant"):
                    continue

                raw_ts = rec.get("timestamp")
                ts = _parse_iso(raw_ts) if isinstance(raw_ts, str) else None
                uuid = str(rec.get("uuid") or "")
                msg = rec.get("message")
                if not isinstance(msg, dict):
                    # Covers a missing key as well as an explicit JSON null.
                    msg = {}
                raw_content = msg.get("content")

                if rec_type == "user":
                    if isinstance(raw_content, str):
                        # Plain-string content is a single visible text block.
                        content = [{"type": "text", "text": raw_content}]
                    elif isinstance(raw_content, list):
                        content = [b for b in raw_content if isinstance(b, dict)]
                    else:
                        continue

                    # Collect tool results for later embedding
                    for block in content:
                        if block.get("type") != "tool_result":
                            continue
                        tid = block.get("tool_use_id", "")
                        text, was_cut = _trunc(_extract_tool_result_text(block.get("content", "")))
                        tool_results[tid] = (text, bool(block.get("is_error", False)), was_cut)

                    # Only surface user records that carry visible text
                    has_text = any(b.get("type") == "text" for b in content)
                    if has_text and uuid not in user_acc:
                        user_acc[uuid] = {"ts": ts, "content": content}
                        message_order.append(("user", uuid))
                    continue

                # Assistant record
                msg_id = msg.get("id") or uuid
                if isinstance(raw_content, list):
                    content = [b for b in raw_content if isinstance(b, dict)]
                else:
                    content = []
                if msg_id not in assistant_acc:
                    usage = msg.get("usage")
                    assistant_acc[msg_id] = {
                        "uuid": uuid,
                        "ts": ts,
                        "model": msg.get("model"),
                        "usage": usage if isinstance(usage, dict) else {},
                        "blocks": [],
                        "seen_block_ids": set(),
                    }
                    message_order.append(("assistant", msg_id))
                acc = assistant_acc[msg_id]
                for block in content:
                    bid = block.get("id")
                    if bid:
                        # The same block id can appear in several records.
                        if bid in acc["seen_block_ids"]:
                            continue
                        acc["seen_block_ids"].add(bid)
                    acc["blocks"].append(block)
    except OSError as exc:
        logger.debug("claude_code_reader.messages_read_error path=%s error=%s", path, exc)
        return None

    # Build the final parsed list
    parsed: list[ParsedMessage] = []
    for role, key_id in message_order:
        if role == "user":
            data = user_acc[key_id]
            text_blocks: list[SessionTextBlock] = []
            for block in data["content"]:
                raw_text = block.get("text")
                if block.get("type") != "text" or not isinstance(raw_text, str):
                    continue
                text, was_cut = _trunc(raw_text)
                if text.strip():
                    text_blocks.append(SessionTextBlock(text=text, truncated=was_cut))
            if not text_blocks:
                continue
            parsed.append(ParsedMessage(
                uuid=key_id,
                role="user",
                timestamp=data["ts"],
                text_blocks=text_blocks,
                thinking_blocks=[],
                tool_uses=[],
                model=None,
                tokens=None,
            ))
            continue

        # Assistant message
        data = assistant_acc[key_id]
        text_blocks = []
        thinking_blocks: list[SessionThinkingBlock] = []
        tool_uses: list[SessionToolUse] = []
        for block in data["blocks"]:
            btype = block.get("type")
            if btype == "text":
                raw_text = block.get("text")
                if isinstance(raw_text, str) and raw_text:
                    text, was_cut = _trunc(raw_text)
                    text_blocks.append(SessionTextBlock(text=text, truncated=was_cut))
            elif btype == "thinking":
                raw_text = block.get("thinking")
                if isinstance(raw_text, str) and raw_text:
                    text, was_cut = _trunc(raw_text)
                    thinking_blocks.append(SessionThinkingBlock(text=text, truncated=was_cut))
            elif btype == "tool_use":
                tid = block.get("id", "")
                raw_input = block.get("input")
                inp, inp_trunc = _trunc_input(raw_input if isinstance(raw_input, dict) else {})
                result = tool_results.get(tid)
                tool_uses.append(SessionToolUse(
                    tool_use_id=tid,
                    tool_name=block.get("name", "unknown"),
                    input=inp,
                    input_truncated=inp_trunc,
                    result=result[0] if result else None,
                    result_truncated=result[2] if result else False,
                    is_error=result[1] if result else False,
                ))
        usage = data["usage"]
        parsed.append(ParsedMessage(
            uuid=data["uuid"],
            role="assistant",
            timestamp=data["ts"],
            text_blocks=text_blocks,
            thinking_blocks=thinking_blocks,
            tool_uses=tool_uses,
            model=data["model"],
            tokens=SessionTokenUsage(
                input=usage.get("input_tokens") or 0,
                output=usage.get("output_tokens") or 0,
                cache_read=usage.get("cache_read_input_tokens") or 0,
                cache_write=usage.get("cache_creation_input_tokens") or 0,
            ),
        ))

    total = len(parsed)
    start = max(offset, 0)
    count = max(limit, 0)
    return parsed[start : start + count], total
# ---------------------------------------------------------------------------
# Tool analytics
# ---------------------------------------------------------------------------
# Tool names whose input carries a "file_path" key. The analytics scan tallies
# that path as a file read or as a file write, depending on which set the
# tool name belongs to.
_FILE_READ_TOOLS = {"Read"}
_FILE_WRITE_TOOLS = {"Edit", "Write", "NotebookEdit"}
def _bash_binary(command: str) -> str | None:
    """Return the name of the first program in a shell command string.

    Surrounding whitespace and leading ``!`` characters are dropped, the
    first whitespace-separated word is taken, leading ``.`` and ``/``
    characters are removed from it, and only the part after the last ``/``
    is kept. Returns ``None`` when nothing is left.
    """
    words = command.strip().lstrip("!").split()
    if not words:
        return None
    # Drop leading "./", "../" or "/" characters, then keep the last path part.
    leaf = words[0].lstrip("./").rpartition("/")[2]
    return leaf if leaf else None
def get_tool_analytics(
    project_filter: str | None = None,
    days: int = 30,
) -> dict[str, Any]:
    """Scan JSONL session files and return aggregated tool-use statistics.

    Uses file mtime for the ``days`` filter (fast, no need to fully parse
    every record). ``tool_use`` blocks are deduplicated per session by their
    block id so that duplicate records for the same message are not counted
    twice. Sidechain records and non-assistant records are ignored.

    Args:
        project_filter: Case-insensitive substring that the session file's
            parent directory name must contain.
        days: Only files modified within this many days are scanned.

    Returns:
        A dict with ``tool_counts`` (tool name -> count, most used first),
        ``top_files_read`` / ``top_files_written`` / ``top_commands`` (up to
        20 ``{..., "count": n}`` entries each), ``session_count`` (sessions
        with at least one counted tool use) and ``date_range_days``.
    """
    tool_counts: dict[str, int] = {}
    files_read: dict[str, int] = {}
    files_written: dict[str, int] = {}
    bash_cmds: dict[str, int] = {}
    session_count = 0

    root = _projects_dir()
    candidates: list[tuple[float, Path]] = []
    if root.exists():
        # st_mtime is seconds since the epoch, so compare against an epoch
        # cutoff directly instead of building naive UTC datetimes.
        cutoff_ts = (datetime.now(UTC) - timedelta(days=days)).timestamp()
        needle = project_filter.lower() if project_filter else None
        for path in root.rglob("*.jsonl"):
            if needle and needle not in path.parent.name.lower():
                continue
            try:
                mtime = path.stat().st_mtime
            except OSError:
                # File vanished or is unreadable; skip it.
                continue
            if mtime >= cutoff_ts:
                candidates.append((mtime, path))
        # Newest first, matching the order used when listing sessions.
        candidates.sort(key=lambda item: item[0], reverse=True)

    seen_sessions: set[str] = set()
    for _mtime, path in candidates:
        session_id = path.stem
        if session_id in seen_sessions:
            continue
        seen_sessions.add(session_id)
        session_had_tools = False
        # Deduplicate tool_use blocks within this session by block id
        seen_block_ids: set[str] = set()
        try:
            with open(path, encoding="utf-8", errors="replace") as fh:
                for raw_line in fh:
                    raw_line = raw_line.strip()
                    if not raw_line:
                        continue
                    try:
                        rec = json.loads(raw_line)
                    except json.JSONDecodeError:
                        continue
                    if not isinstance(rec, dict):
                        continue
                    if rec.get("isSidechain") or rec.get("type") != "assistant":
                        continue
                    msg = rec.get("message")
                    content = msg.get("content") if isinstance(msg, dict) else None
                    if not isinstance(content, list):
                        continue
                    for block in content:
                        if not isinstance(block, dict) or block.get("type") != "tool_use":
                            continue
                        bid = block.get("id", "")
                        if bid:
                            if bid in seen_block_ids:
                                continue
                            seen_block_ids.add(bid)
                        name = block.get("name")
                        if not isinstance(name, str) or not name:
                            name = "unknown"
                        tool_counts[name] = tool_counts.get(name, 0) + 1
                        session_had_tools = True

                        inp = block.get("input")
                        if not isinstance(inp, dict):
                            inp = {}
                        if name in _FILE_READ_TOOLS or name in _FILE_WRITE_TOOLS:
                            fp = inp.get("file_path")
                            fp = fp.strip() if isinstance(fp, str) else ""
                            if fp:
                                target = files_read if name in _FILE_READ_TOOLS else files_written
                                target[fp] = target.get(fp, 0) + 1
                        elif name == "Bash":
                            command = inp.get("command")
                            binary = _bash_binary(command) if isinstance(command, str) else None
                            if binary:
                                bash_cmds[binary] = bash_cmds.get(binary, 0) + 1
        except OSError as exc:
            logger.debug("claude_code_reader.analytics_read_error path=%s error=%s", path, exc)
            continue
        if session_had_tools:
            session_count += 1

    def _top(counter: dict[str, int], key: str, n: int = 20) -> list[dict[str, Any]]:
        # The n highest counts as [{key: name, "count": value}, ...].
        ranked = sorted(counter.items(), key=lambda x: x[1], reverse=True)[:n]
        return [{key: k, "count": v} for k, v in ranked]

    return {
        "tool_counts": dict(sorted(tool_counts.items(), key=lambda x: x[1], reverse=True)),
        "top_files_read": _top(files_read, "path"),
        "top_files_written": _top(files_written, "path"),
        "top_commands": _top(bash_cmds, "command"),
        "session_count": session_count,
        "date_range_days": days,
    }
def list_projects() -> list[dict[str, Any]]:
    """Return one aggregate entry per project directory, most recent first.

    Every directory that contains at least one ``*.jsonl`` file gets an
    entry; session count, token total, cost, working directory, last
    activity and active flag are accumulated from the sessions that parse
    successfully. Entries without any activity timestamp sort last.
    """
    root = _projects_dir()
    if not root.exists():
        return []

    by_dir: dict[str, dict[str, Any]] = {}
    for jsonl_path in root.rglob("*.jsonl"):
        dir_name = jsonl_path.parent.name
        # The entry is registered before parsing, so a directory whose files
        # all fail to parse is still listed (with zero sessions).
        entry = by_dir.setdefault(
            dir_name,
            {
                "project_dir": dir_name,
                "session_count": 0,
                "total_tokens": 0,
                "total_cost_usd": 0.0,
                "last_active_at": None,
                "cwd": None,
                "is_active": False,
            },
        )

        session = _parse_session_file(jsonl_path)
        if session is None:
            continue

        entry["session_count"] += 1
        entry["total_tokens"] += session.tokens.total
        entry["total_cost_usd"] = round(entry["total_cost_usd"] + session.cost_usd, 6)

        # First non-empty working directory wins.
        if not entry["cwd"] and session.cwd:
            entry["cwd"] = session.cwd

        latest = session.last_message_at
        if latest:
            previous = entry["last_active_at"]
            if previous is None or latest > previous:
                entry["last_active_at"] = latest

        if session.is_active:
            entry["is_active"] = True

    ordered = list(by_dir.values())
    ordered.sort(key=lambda item: item["last_active_at"] or datetime.min, reverse=True)
    return ordered
# ---------------------------------------------------------------------------
# Config scanner
# ---------------------------------------------------------------------------
def _read_json(path: Path) -> dict[str, Any]:
    """Load a JSON file and return its top-level object.

    Returns an empty dict when the file cannot be opened, is not valid UTF-8,
    is not valid JSON, or when its top-level value is not a JSON object.
    """
    try:
        with open(path, encoding="utf-8") as fh:
            data = json.load(fh)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError as well as the
        # UnicodeDecodeError raised for bytes that are not valid UTF-8.
        return {}
    return data if isinstance(data, dict) else {}
def _read_toml(path: Path) -> dict[str, Any]:
try:
with open(path, "rb") as fh:
return tomllib.load(fh)
except (OSError, tomllib.TOMLDecodeError):
return {}
def _read_rules(path: Path) -> list[str]:
    """Return the non-blank lines of a rules file, stripped of whitespace.

    Returns an empty list when the file cannot be read or is not valid UTF-8.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # PermissionError is a subclass of OSError, so it is covered too.
        return []
    return [stripped for line in text.splitlines() if (stripped := line.strip())]
def read_config() -> ClaudeConfig:
    """Collect Claude Code and Codex CLI configuration from local files.

    Reads ``~/.claude/settings.json``, ``~/.codex/config.toml`` and every
    ``*.rules`` file under ``~/.codex/rules/`` (in sorted file order). The
    credential paths come from ``CLAUDE_CREDENTIALS_PATH`` /
    ``CODEX_CREDENTIALS_PATH`` when set to a non-blank value, otherwise from
    the default locations; a path is reported only if it exists.
    """
    home = Path.home()
    claude_dir = home / ".claude"
    codex_dir = home / ".codex"

    claude_settings = _read_json(claude_dir / "settings.json")
    codex_config = _read_toml(codex_dir / "config.toml")

    rules_dir = codex_dir / "rules"
    codex_rules: list[str] = []
    if rules_dir.exists():
        codex_rules = [
            rule
            for rules_file in sorted(rules_dir.glob("*.rules"))
            for rule in _read_rules(rules_file)
        ]

    def _existing_path(env_var: str, fallback: Path) -> str | None:
        # Environment override first, then the default; None if it is absent.
        candidate = os.environ.get(env_var, "").strip() or str(fallback)
        return candidate if Path(candidate).exists() else None

    return ClaudeConfig(
        claude_settings=claude_settings,
        codex_config=codex_config,
        codex_rules=codex_rules,
        claude_credentials_path=_existing_path("CLAUDE_CREDENTIALS_PATH", claude_dir / ".credentials.json"),
        codex_credentials_path=_existing_path("CODEX_CREDENTIALS_PATH", codex_dir / "auth.json"),
    )
# ---------------------------------------------------------------------------
# Aggregate stats helper
# ---------------------------------------------------------------------------
def session_stats(sessions: list[ClaudeSession]) -> dict[str, Any]:
    """Aggregate headline numbers over a list of sessions.

    Returns the number of sessions, how many are active, the summed token
    total, the summed cost rounded to 6 decimals, and the sorted set of all
    model names seen.
    """
    token_sum = 0
    cost_sum = 0
    active_count = 0
    model_names: set[str] = set()

    for session in sessions:
        token_sum += session.tokens.total
        cost_sum += session.cost_usd
        if session.is_active:
            active_count += 1
        model_names.update(session.models)

    return {
        "session_count": len(sessions),
        "active_sessions": active_count,
        "total_tokens": token_sum,
        "total_cost_usd": round(cost_sum, 6),
        "models": sorted(model_names),
    }