2026-05-22 04:23:27 -05:00
|
|
|
"""Reader for local Claude Code and Codex CLI data.
|
|
|
|
|
|
|
|
|
|
Discovers sessions from ~/.claude/projects/**/*.jsonl, extracts token usage,
|
|
|
|
|
model info, cost estimates, and activity status. Also reads ~/.claude/settings.json
|
|
|
|
|
and ~/.codex/config.toml for the config scanner.
|
|
|
|
|
|
|
|
|
|
All I/O is synchronous and file-local — no network calls.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
import os
|
|
|
|
|
import tomllib
|
|
|
|
|
from dataclasses import dataclass, field
|
|
|
|
|
from datetime import UTC, datetime, timedelta
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
from typing import Any
|
|
|
|
|
|
|
|
|
|
from app.core.logging import get_logger
|
|
|
|
|
|
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
|
|
|
|
ACTIVE_WINDOW_MINUTES = 30
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Pricing (USD per million tokens) — mirrors runtime_usage.DEFAULT_MODEL_PRICING
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
_PRICING: dict[str, dict[str, float]] = {
|
|
|
|
|
"claude-opus-4-7": {"input": 15.00, "output": 75.00, "cache_read": 1.50, "cache_write": 18.75},
|
|
|
|
|
"claude-opus-4-5": {"input": 15.00, "output": 75.00, "cache_read": 1.50, "cache_write": 18.75},
|
|
|
|
|
"claude-sonnet-4-6": {"input": 3.00, "output": 15.00, "cache_read": 0.30, "cache_write": 3.75},
|
|
|
|
|
"claude-sonnet-4-5": {"input": 3.00, "output": 15.00, "cache_read": 0.30, "cache_write": 3.75},
|
|
|
|
|
"claude-haiku-4-5-20251001": {"input": 0.80, "output": 4.00, "cache_read": 0.08, "cache_write": 1.00},
|
|
|
|
|
"claude-3-5-sonnet": {"input": 3.00, "output": 15.00, "cache_read": 0.30, "cache_write": 3.75},
|
|
|
|
|
"claude-3-5-haiku": {"input": 0.80, "output": 4.00, "cache_read": 0.08, "cache_write": 1.00},
|
|
|
|
|
"claude-3-opus": {"input": 15.00, "output": 75.00, "cache_read": 1.50, "cache_write": 18.75},
|
|
|
|
|
"claude-3-haiku": {"input": 0.25, "output": 1.25, "cache_read": 0.03, "cache_write": 0.30},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _price(model: str, input_t: int, output_t: int, cache_read: int, cache_write: int) -> float:
|
|
|
|
|
key = next((k for k in _PRICING if model.endswith(k) or k in model), None)
|
|
|
|
|
if not key:
|
|
|
|
|
return 0.0
|
|
|
|
|
p = _PRICING[key]
|
|
|
|
|
return (
|
|
|
|
|
input_t * p["input"] / 1_000_000
|
|
|
|
|
+ output_t * p["output"] / 1_000_000
|
|
|
|
|
+ cache_read * p["cache_read"] / 1_000_000
|
|
|
|
|
+ cache_write * p["cache_write"] / 1_000_000
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_iso(ts: str | None) -> datetime | None:
|
|
|
|
|
if not ts:
|
|
|
|
|
return None
|
|
|
|
|
try:
|
|
|
|
|
return datetime.fromisoformat(ts.replace("Z", "+00:00")).astimezone(UTC).replace(tzinfo=None)
|
|
|
|
|
except ValueError:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Data classes
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class SessionTokens:
|
|
|
|
|
input: int = 0
|
|
|
|
|
output: int = 0
|
|
|
|
|
cache_read: int = 0
|
|
|
|
|
cache_write: int = 0
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def total(self) -> int:
|
|
|
|
|
return self.input + self.output + self.cache_read + self.cache_write
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class ClaudeSession:
|
|
|
|
|
session_id: str
|
|
|
|
|
project_dir: str # raw directory name under ~/.claude/projects/
|
|
|
|
|
cwd: str | None # actual working directory from JSONL records
|
|
|
|
|
title: str | None
|
|
|
|
|
models: list[str]
|
|
|
|
|
tokens: SessionTokens
|
|
|
|
|
cost_usd: float
|
|
|
|
|
message_count: int # assistant turns
|
|
|
|
|
first_message_at: datetime | None
|
|
|
|
|
last_message_at: datetime | None
|
|
|
|
|
is_active: bool
|
|
|
|
|
entrypoints: list[str] # e.g. ["claude-vscode", "claude"]
|
|
|
|
|
git_branch: str | None
|
|
|
|
|
version: str | None
|
|
|
|
|
|
|
|
|
|
|
2026-05-24 16:19:24 -05:00
|
|
|
@dataclass
|
|
|
|
|
class SessionTextBlock:
|
|
|
|
|
text: str
|
|
|
|
|
truncated: bool
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class SessionThinkingBlock:
|
|
|
|
|
text: str
|
|
|
|
|
truncated: bool
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class SessionToolUse:
|
|
|
|
|
tool_use_id: str
|
|
|
|
|
tool_name: str
|
|
|
|
|
input: dict[str, Any]
|
|
|
|
|
input_truncated: bool
|
|
|
|
|
result: str | None
|
|
|
|
|
result_truncated: bool
|
|
|
|
|
is_error: bool
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class SessionTokenUsage:
|
|
|
|
|
input: int
|
|
|
|
|
output: int
|
|
|
|
|
cache_read: int
|
|
|
|
|
cache_write: int
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class ParsedMessage:
|
|
|
|
|
uuid: str
|
|
|
|
|
role: str # "user" | "assistant"
|
|
|
|
|
timestamp: datetime | None
|
|
|
|
|
text_blocks: list[SessionTextBlock]
|
|
|
|
|
thinking_blocks: list[SessionThinkingBlock]
|
|
|
|
|
tool_uses: list[SessionToolUse]
|
|
|
|
|
model: str | None
|
|
|
|
|
tokens: SessionTokenUsage | None
|
|
|
|
|
|
|
|
|
|
|
2026-05-22 04:23:27 -05:00
|
|
|
@dataclass
|
|
|
|
|
class ClaudeConfig:
|
|
|
|
|
claude_settings: dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
codex_config: dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
codex_rules: list[str] = field(default_factory=list)
|
|
|
|
|
claude_credentials_path: str | None = None
|
|
|
|
|
codex_credentials_path: str | None = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# JSONL parser — one file = one session
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def _parse_session_file(path: Path) -> ClaudeSession | None:
|
|
|
|
|
session_id = path.stem
|
|
|
|
|
project_dir = path.parent.name
|
|
|
|
|
|
|
|
|
|
tokens = SessionTokens()
|
|
|
|
|
models: set[str] = set()
|
|
|
|
|
entrypoints: set[str] = set()
|
|
|
|
|
first_ts: datetime | None = None
|
|
|
|
|
last_ts: datetime | None = None
|
|
|
|
|
title: str | None = None
|
|
|
|
|
cwd: str | None = None
|
|
|
|
|
git_branch: str | None = None
|
|
|
|
|
version: str | None = None
|
|
|
|
|
message_count = 0
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
with open(path, encoding="utf-8", errors="replace") as fh:
|
|
|
|
|
for raw_line in fh:
|
|
|
|
|
raw_line = raw_line.strip()
|
|
|
|
|
if not raw_line:
|
|
|
|
|
continue
|
|
|
|
|
try:
|
|
|
|
|
rec = json.loads(raw_line)
|
|
|
|
|
except json.JSONDecodeError:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
ts = _parse_iso(rec.get("timestamp"))
|
|
|
|
|
if ts:
|
|
|
|
|
if first_ts is None or ts < first_ts:
|
|
|
|
|
first_ts = ts
|
|
|
|
|
if last_ts is None or ts > last_ts:
|
|
|
|
|
last_ts = ts
|
|
|
|
|
|
|
|
|
|
rec_type = rec.get("type")
|
|
|
|
|
|
|
|
|
|
if rec_type == "ai-title":
|
|
|
|
|
title = rec.get("title") or title
|
|
|
|
|
|
|
|
|
|
if not cwd:
|
|
|
|
|
cwd = rec.get("cwd")
|
|
|
|
|
if not git_branch:
|
|
|
|
|
git_branch = rec.get("gitBranch")
|
|
|
|
|
if not version:
|
|
|
|
|
version = rec.get("version")
|
|
|
|
|
|
|
|
|
|
ep = rec.get("entrypoint")
|
|
|
|
|
if ep:
|
|
|
|
|
entrypoints.add(ep)
|
|
|
|
|
|
|
|
|
|
if rec_type == "assistant":
|
|
|
|
|
message_count += 1
|
|
|
|
|
msg = rec.get("message") or {}
|
|
|
|
|
model = msg.get("model")
|
|
|
|
|
if model:
|
|
|
|
|
models.add(model)
|
|
|
|
|
usage = msg.get("usage") or {}
|
|
|
|
|
tokens.input += usage.get("input_tokens", 0)
|
|
|
|
|
tokens.output += usage.get("output_tokens", 0)
|
|
|
|
|
tokens.cache_read += usage.get("cache_read_input_tokens", 0)
|
|
|
|
|
tokens.cache_write += usage.get("cache_creation_input_tokens", 0)
|
|
|
|
|
|
|
|
|
|
except (OSError, PermissionError) as exc:
|
|
|
|
|
logger.debug("claude_code_reader.session_read_error path=%s error=%s", path, exc)
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
if message_count == 0 and first_ts is None:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
model_list = sorted(models)
|
|
|
|
|
primary_model = model_list[0] if model_list else ""
|
|
|
|
|
cost = _price(primary_model, tokens.input, tokens.output, tokens.cache_read, tokens.cache_write)
|
|
|
|
|
for m in model_list[1:]:
|
|
|
|
|
# Additional models — approximate with same token split (rare)
|
|
|
|
|
cost += _price(m, 0, 0, 0, 0)
|
|
|
|
|
|
|
|
|
|
now = datetime.utcnow()
|
|
|
|
|
is_active = bool(last_ts and (now - last_ts) < timedelta(minutes=ACTIVE_WINDOW_MINUTES))
|
|
|
|
|
|
|
|
|
|
return ClaudeSession(
|
|
|
|
|
session_id=session_id,
|
|
|
|
|
project_dir=project_dir,
|
|
|
|
|
cwd=cwd,
|
|
|
|
|
title=title,
|
|
|
|
|
models=model_list,
|
|
|
|
|
tokens=tokens,
|
|
|
|
|
cost_usd=round(cost, 6),
|
|
|
|
|
message_count=message_count,
|
|
|
|
|
first_message_at=first_ts,
|
|
|
|
|
last_message_at=last_ts,
|
|
|
|
|
is_active=is_active,
|
|
|
|
|
entrypoints=sorted(entrypoints),
|
|
|
|
|
git_branch=git_branch,
|
|
|
|
|
version=version,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Session listing
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def _projects_dir() -> Path:
|
|
|
|
|
override = os.environ.get("CLAUDE_PROJECTS_PATH", "").strip()
|
|
|
|
|
if override:
|
|
|
|
|
return Path(override)
|
|
|
|
|
return Path.home() / ".claude" / "projects"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def list_sessions(
|
|
|
|
|
*,
|
|
|
|
|
project_filter: str | None = None,
|
|
|
|
|
active_only: bool = False,
|
|
|
|
|
limit: int = 200,
|
|
|
|
|
) -> list[ClaudeSession]:
|
|
|
|
|
"""Return parsed sessions from ~/.claude/projects/, newest first."""
|
|
|
|
|
root = _projects_dir()
|
|
|
|
|
if not root.exists():
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
sessions: list[ClaudeSession] = []
|
|
|
|
|
jsonl_files = sorted(root.rglob("*.jsonl"), key=lambda p: p.stat().st_mtime, reverse=True)
|
|
|
|
|
|
|
|
|
|
for path in jsonl_files:
|
|
|
|
|
if project_filter and project_filter.lower() not in path.parent.name.lower():
|
|
|
|
|
continue
|
|
|
|
|
session = _parse_session_file(path)
|
|
|
|
|
if session is None:
|
|
|
|
|
continue
|
|
|
|
|
if active_only and not session.is_active:
|
|
|
|
|
continue
|
|
|
|
|
sessions.append(session)
|
|
|
|
|
if len(sessions) >= limit:
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
return sessions
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_session(session_id: str) -> ClaudeSession | None:
|
|
|
|
|
"""Return a single parsed session by ID."""
|
|
|
|
|
root = _projects_dir()
|
|
|
|
|
if not root.exists():
|
|
|
|
|
return None
|
|
|
|
|
for path in root.rglob(f"{session_id}.jsonl"):
|
|
|
|
|
return _parse_session_file(path)
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
2026-05-24 16:19:24 -05:00
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Session message reader
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
_CONTENT_TRUNCATE = 4000
|
|
|
|
|
_INPUT_VALUE_TRUNCATE = 2000
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _trunc(text: str, limit: int = _CONTENT_TRUNCATE) -> tuple[str, bool]:
|
|
|
|
|
if len(text) <= limit:
|
|
|
|
|
return text, False
|
|
|
|
|
return text[:limit], True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _trunc_input(input_dict: dict[str, Any]) -> tuple[dict[str, Any], bool]:
|
|
|
|
|
"""Truncate long string values inside a tool input dict."""
|
|
|
|
|
truncated = False
|
|
|
|
|
result: dict[str, Any] = {}
|
|
|
|
|
for k, v in input_dict.items():
|
|
|
|
|
if isinstance(v, str) and len(v) > _INPUT_VALUE_TRUNCATE:
|
|
|
|
|
result[k] = v[:_INPUT_VALUE_TRUNCATE]
|
|
|
|
|
truncated = True
|
|
|
|
|
else:
|
|
|
|
|
result[k] = v
|
|
|
|
|
return result, truncated
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_tool_result_text(raw: Any) -> str:
|
|
|
|
|
"""Normalise a tool_result content field to plain text."""
|
|
|
|
|
if isinstance(raw, str):
|
|
|
|
|
return raw
|
|
|
|
|
if isinstance(raw, list):
|
|
|
|
|
return "\n".join(
|
|
|
|
|
b.get("text", "") for b in raw if isinstance(b, dict) and b.get("type") == "text"
|
|
|
|
|
)
|
|
|
|
|
return str(raw) if raw is not None else ""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_session_messages(
|
|
|
|
|
session_id: str,
|
|
|
|
|
limit: int = 200,
|
|
|
|
|
offset: int = 0,
|
|
|
|
|
) -> tuple[list[ParsedMessage], int] | None:
|
|
|
|
|
"""Parse the full conversation from a session JSONL file.
|
|
|
|
|
|
|
|
|
|
Returns (messages[offset:offset+limit], total) or None if not found.
|
|
|
|
|
Tool results are embedded in the tool_use blocks of the preceding
|
|
|
|
|
assistant message. User records that contain only tool_results are
|
|
|
|
|
suppressed from the returned list.
|
|
|
|
|
"""
|
|
|
|
|
root = _projects_dir()
|
|
|
|
|
if not root.exists():
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
path: Path | None = None
|
|
|
|
|
for p in root.rglob(f"{session_id}.jsonl"):
|
|
|
|
|
path = p
|
|
|
|
|
break
|
|
|
|
|
if path is None:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
# tool_use_id -> (result_text, is_error, truncated)
|
|
|
|
|
tool_results: dict[str, tuple[str, bool, bool]] = {}
|
|
|
|
|
|
|
|
|
|
# Insertion-ordered list of "role:key" to preserve conversation order.
|
|
|
|
|
# For assistant messages the key is message.id (deduplication handle).
|
|
|
|
|
# For user messages the key is their uuid.
|
|
|
|
|
message_order: list[str] = []
|
|
|
|
|
|
|
|
|
|
# Accumulated data per assistant message.id
|
|
|
|
|
assistant_acc: dict[str, dict[str, Any]] = {}
|
|
|
|
|
|
|
|
|
|
# User message data keyed by uuid
|
|
|
|
|
user_acc: dict[str, dict[str, Any]] = {}
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
with open(path, encoding="utf-8", errors="replace") as fh:
|
|
|
|
|
for raw_line in fh:
|
|
|
|
|
raw_line = raw_line.strip()
|
|
|
|
|
if not raw_line:
|
|
|
|
|
continue
|
|
|
|
|
try:
|
|
|
|
|
rec = json.loads(raw_line)
|
|
|
|
|
except json.JSONDecodeError:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
if rec.get("isSidechain"):
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
rec_type = rec.get("type")
|
|
|
|
|
if rec_type not in ("user", "assistant"):
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
ts = _parse_iso(rec.get("timestamp"))
|
|
|
|
|
uuid = rec.get("uuid", "")
|
|
|
|
|
|
|
|
|
|
if rec_type == "user":
|
|
|
|
|
content = rec.get("message", {}).get("content") or []
|
|
|
|
|
if not isinstance(content, list):
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# Collect tool results for later embedding
|
|
|
|
|
for block in content:
|
|
|
|
|
if block.get("type") == "tool_result":
|
|
|
|
|
tid = block.get("tool_use_id", "")
|
|
|
|
|
text = _extract_tool_result_text(block.get("content", ""))
|
|
|
|
|
t, trunc = _trunc(text)
|
|
|
|
|
tool_results[tid] = (t, bool(block.get("is_error", False)), trunc)
|
|
|
|
|
|
|
|
|
|
# Only surface user records that carry visible text
|
|
|
|
|
has_text = any(b.get("type") == "text" for b in content)
|
|
|
|
|
if has_text and uuid not in user_acc:
|
|
|
|
|
user_acc[uuid] = {"ts": ts, "content": content}
|
|
|
|
|
message_order.append(f"user:{uuid}")
|
|
|
|
|
|
|
|
|
|
else: # assistant
|
|
|
|
|
msg = rec.get("message") or {}
|
|
|
|
|
msg_id = msg.get("id") or uuid
|
|
|
|
|
content = msg.get("content") if isinstance(msg.get("content"), list) else []
|
|
|
|
|
|
|
|
|
|
if msg_id not in assistant_acc:
|
|
|
|
|
assistant_acc[msg_id] = {
|
|
|
|
|
"uuid": uuid,
|
|
|
|
|
"ts": ts,
|
|
|
|
|
"model": msg.get("model"),
|
|
|
|
|
"usage": msg.get("usage") or {},
|
|
|
|
|
"blocks": [],
|
|
|
|
|
"seen_block_ids": set(),
|
|
|
|
|
}
|
|
|
|
|
message_order.append(f"assistant:{msg_id}")
|
|
|
|
|
|
|
|
|
|
acc = assistant_acc[msg_id]
|
|
|
|
|
for block in content:
|
|
|
|
|
bid = block.get("id")
|
|
|
|
|
if bid:
|
|
|
|
|
if bid in acc["seen_block_ids"]:
|
|
|
|
|
continue
|
|
|
|
|
acc["seen_block_ids"].add(bid)
|
|
|
|
|
acc["blocks"].append(block)
|
|
|
|
|
|
|
|
|
|
except (OSError, PermissionError) as exc:
|
|
|
|
|
logger.debug("claude_code_reader.messages_read_error path=%s error=%s", path, exc)
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
# Build the final parsed list
|
|
|
|
|
parsed: list[ParsedMessage] = []
|
|
|
|
|
|
|
|
|
|
for key in message_order:
|
|
|
|
|
role, key_id = key.split(":", 1)
|
|
|
|
|
|
|
|
|
|
if role == "user":
|
|
|
|
|
data = user_acc[key_id]
|
|
|
|
|
text_blocks: list[SessionTextBlock] = []
|
|
|
|
|
for block in data["content"]:
|
|
|
|
|
if block.get("type") == "text":
|
|
|
|
|
text, trunc = _trunc(block.get("text", ""))
|
|
|
|
|
if text.strip():
|
|
|
|
|
text_blocks.append(SessionTextBlock(text=text, truncated=trunc))
|
|
|
|
|
if not text_blocks:
|
|
|
|
|
continue
|
|
|
|
|
parsed.append(ParsedMessage(
|
|
|
|
|
uuid=key_id,
|
|
|
|
|
role="user",
|
|
|
|
|
timestamp=data["ts"],
|
|
|
|
|
text_blocks=text_blocks,
|
|
|
|
|
thinking_blocks=[],
|
|
|
|
|
tool_uses=[],
|
|
|
|
|
model=None,
|
|
|
|
|
tokens=None,
|
|
|
|
|
))
|
|
|
|
|
|
|
|
|
|
else: # assistant
|
|
|
|
|
data = assistant_acc[key_id]
|
|
|
|
|
text_blocks = []
|
|
|
|
|
thinking_blocks: list[SessionThinkingBlock] = []
|
|
|
|
|
tool_uses: list[SessionToolUse] = []
|
|
|
|
|
|
|
|
|
|
for block in data["blocks"]:
|
|
|
|
|
btype = block.get("type")
|
|
|
|
|
if btype == "text":
|
|
|
|
|
text, trunc = _trunc(block.get("text", ""))
|
|
|
|
|
if text:
|
|
|
|
|
text_blocks.append(SessionTextBlock(text=text, truncated=trunc))
|
|
|
|
|
elif btype == "thinking":
|
|
|
|
|
text, trunc = _trunc(block.get("thinking", ""))
|
|
|
|
|
if text:
|
|
|
|
|
thinking_blocks.append(SessionThinkingBlock(text=text, truncated=trunc))
|
|
|
|
|
elif btype == "tool_use":
|
|
|
|
|
tid = block.get("id", "")
|
|
|
|
|
inp, inp_trunc = _trunc_input(block.get("input") or {})
|
|
|
|
|
result = tool_results.get(tid)
|
|
|
|
|
tool_uses.append(SessionToolUse(
|
|
|
|
|
tool_use_id=tid,
|
|
|
|
|
tool_name=block.get("name", "unknown"),
|
|
|
|
|
input=inp,
|
|
|
|
|
input_truncated=inp_trunc,
|
|
|
|
|
result=result[0] if result else None,
|
|
|
|
|
result_truncated=result[2] if result else False,
|
|
|
|
|
is_error=result[1] if result else False,
|
|
|
|
|
))
|
|
|
|
|
|
|
|
|
|
usage = data["usage"]
|
|
|
|
|
parsed.append(ParsedMessage(
|
|
|
|
|
uuid=data["uuid"],
|
|
|
|
|
role="assistant",
|
|
|
|
|
timestamp=data["ts"],
|
|
|
|
|
text_blocks=text_blocks,
|
|
|
|
|
thinking_blocks=thinking_blocks,
|
|
|
|
|
tool_uses=tool_uses,
|
|
|
|
|
model=data["model"],
|
|
|
|
|
tokens=SessionTokenUsage(
|
|
|
|
|
input=usage.get("input_tokens", 0),
|
|
|
|
|
output=usage.get("output_tokens", 0),
|
|
|
|
|
cache_read=usage.get("cache_read_input_tokens", 0),
|
|
|
|
|
cache_write=usage.get("cache_creation_input_tokens", 0),
|
|
|
|
|
),
|
|
|
|
|
))
|
|
|
|
|
|
|
|
|
|
total = len(parsed)
|
|
|
|
|
return parsed[offset : offset + limit], total
|
|
|
|
|
|
|
|
|
|
|
2026-05-22 04:23:27 -05:00
|
|
|
def list_projects() -> list[dict[str, Any]]:
|
|
|
|
|
"""Return discovered projects with aggregate stats."""
|
|
|
|
|
root = _projects_dir()
|
|
|
|
|
if not root.exists():
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
projects: dict[str, dict[str, Any]] = {}
|
|
|
|
|
for path in root.rglob("*.jsonl"):
|
|
|
|
|
project_dir = path.parent.name
|
|
|
|
|
if project_dir not in projects:
|
|
|
|
|
projects[project_dir] = {
|
|
|
|
|
"project_dir": project_dir,
|
|
|
|
|
"session_count": 0,
|
|
|
|
|
"total_tokens": 0,
|
|
|
|
|
"total_cost_usd": 0.0,
|
|
|
|
|
"last_active_at": None,
|
|
|
|
|
"cwd": None,
|
|
|
|
|
"is_active": False,
|
|
|
|
|
}
|
|
|
|
|
session = _parse_session_file(path)
|
|
|
|
|
if session is None:
|
|
|
|
|
continue
|
|
|
|
|
p = projects[project_dir]
|
|
|
|
|
p["session_count"] += 1
|
|
|
|
|
p["total_tokens"] += session.tokens.total
|
|
|
|
|
p["total_cost_usd"] = round(p["total_cost_usd"] + session.cost_usd, 6)
|
|
|
|
|
if session.cwd and not p["cwd"]:
|
|
|
|
|
p["cwd"] = session.cwd
|
|
|
|
|
if session.last_message_at:
|
|
|
|
|
current = p["last_active_at"]
|
|
|
|
|
if current is None or session.last_message_at > current:
|
|
|
|
|
p["last_active_at"] = session.last_message_at
|
|
|
|
|
if session.is_active:
|
|
|
|
|
p["is_active"] = True
|
|
|
|
|
|
|
|
|
|
return sorted(projects.values(), key=lambda x: x["last_active_at"] or datetime.min, reverse=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Config scanner
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def _read_json(path: Path) -> dict[str, Any]:
|
|
|
|
|
try:
|
|
|
|
|
with open(path, encoding="utf-8") as fh:
|
|
|
|
|
data = json.load(fh)
|
|
|
|
|
return data if isinstance(data, dict) else {}
|
|
|
|
|
except (OSError, json.JSONDecodeError):
|
|
|
|
|
return {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _read_toml(path: Path) -> dict[str, Any]:
|
|
|
|
|
try:
|
|
|
|
|
with open(path, "rb") as fh:
|
|
|
|
|
return tomllib.load(fh)
|
|
|
|
|
except (OSError, tomllib.TOMLDecodeError):
|
|
|
|
|
return {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _read_rules(path: Path) -> list[str]:
|
|
|
|
|
try:
|
|
|
|
|
return [line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip()]
|
|
|
|
|
except (OSError, PermissionError):
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def read_config() -> ClaudeConfig:
|
|
|
|
|
"""Read Claude Code and Codex CLI configuration from local files."""
|
|
|
|
|
claude_dir = Path.home() / ".claude"
|
|
|
|
|
codex_dir = Path.home() / ".codex"
|
|
|
|
|
|
|
|
|
|
# ~/.claude/settings.json
|
|
|
|
|
claude_settings = _read_json(claude_dir / "settings.json")
|
|
|
|
|
|
|
|
|
|
# ~/.codex/config.toml
|
|
|
|
|
codex_config = _read_toml(codex_dir / "config.toml")
|
|
|
|
|
|
|
|
|
|
# ~/.codex/rules/ — all .rules files
|
|
|
|
|
codex_rules: list[str] = []
|
|
|
|
|
rules_dir = codex_dir / "rules"
|
|
|
|
|
if rules_dir.exists():
|
|
|
|
|
for rules_file in sorted(rules_dir.glob("*.rules")):
|
|
|
|
|
codex_rules.extend(_read_rules(rules_file))
|
|
|
|
|
|
|
|
|
|
claude_creds = os.environ.get("CLAUDE_CREDENTIALS_PATH", "").strip() or str(claude_dir / ".credentials.json")
|
|
|
|
|
codex_creds = os.environ.get("CODEX_CREDENTIALS_PATH", "").strip() or str(codex_dir / "auth.json")
|
|
|
|
|
|
|
|
|
|
return ClaudeConfig(
|
|
|
|
|
claude_settings=claude_settings,
|
|
|
|
|
codex_config=codex_config,
|
|
|
|
|
codex_rules=codex_rules,
|
|
|
|
|
claude_credentials_path=claude_creds if Path(claude_creds).exists() else None,
|
|
|
|
|
codex_credentials_path=codex_creds if Path(codex_creds).exists() else None,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Aggregate stats helper
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def session_stats(sessions: list[ClaudeSession]) -> dict[str, Any]:
|
|
|
|
|
total_tokens = sum(s.tokens.total for s in sessions)
|
|
|
|
|
total_cost = round(sum(s.cost_usd for s in sessions), 6)
|
|
|
|
|
active = sum(1 for s in sessions if s.is_active)
|
|
|
|
|
all_models: set[str] = set()
|
|
|
|
|
for s in sessions:
|
|
|
|
|
all_models.update(s.models)
|
|
|
|
|
return {
|
|
|
|
|
"session_count": len(sessions),
|
|
|
|
|
"active_sessions": active,
|
|
|
|
|
"total_tokens": total_tokens,
|
|
|
|
|
"total_cost_usd": total_cost,
|
|
|
|
|
"models": sorted(all_models),
|
|
|
|
|
}
|