@router.get(
    "/sessions/{session_id}/messages",
    response_model=SessionMessagesResponse,
    summary="Get conversation messages for a Claude Code session",
    description=(
        "Parses the raw JSONL file for a session and returns the full conversation "
        "thread: user turns, assistant text/thinking blocks, and tool calls with "
        "embedded results. Tool-result-only user turns are suppressed. "
        "Duplicate assistant message records (streaming artefacts) are merged."
    ),
)
async def get_session_messages(
    session_id: str,
    limit: int = Query(200, ge=1, le=500, description="Max messages to return"),
    offset: int = Query(0, ge=0, description="Number of messages to skip"),
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> SessionMessagesResponse:
    """Serve one page of the conversation thread of a Claude Code session.

    The blocking reader call runs in a worker thread. A ``None`` result from
    the reader is reported as HTTP 404. Otherwise every parsed message is
    copied field by field into its response schema.
    """
    outcome = await asyncio.to_thread(reader.get_session_messages, session_id, limit, offset)
    if outcome is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")

    page, total_count = outcome

    payload: list[SessionMessage] = []
    for item in page:
        # Token usage is optional on the parsed message; mirror that in the schema.
        usage = None
        if item.tokens:
            usage = SessionTokenUsageRead(
                input=item.tokens.input,
                output=item.tokens.output,
                cache_read=item.tokens.cache_read,
                cache_write=item.tokens.cache_write,
            )

        texts = [TextBlock(text=blk.text, truncated=blk.truncated) for blk in item.text_blocks]
        thoughts = [
            ThinkingBlock(text=blk.text, truncated=blk.truncated) for blk in item.thinking_blocks
        ]
        calls = [
            ToolUseBlock(
                tool_use_id=call.tool_use_id,
                tool_name=call.tool_name,
                input=call.input,
                input_truncated=call.input_truncated,
                result=call.result,
                result_truncated=call.result_truncated,
                is_error=call.is_error,
            )
            for call in item.tool_uses
        ]

        payload.append(
            SessionMessage(
                uuid=item.uuid,
                role=item.role,
                timestamp=item.timestamp,
                text_blocks=texts,
                thinking_blocks=thoughts,
                tool_uses=calls,
                model=item.model,
                tokens=usage,
            )
        )

    # More pages exist while the window's end has not reached the total.
    more_available = total_count > offset + limit
    return SessionMessagesResponse(
        session_id=session_id,
        messages=payload,
        total=total_count,
        has_more=more_available,
    )
# Response schemas for the session-messages endpoint
# (backend/app/schemas/claude_code.py).

# One piece of visible text plus a flag marking whether it was cut short.
class TextBlock(SQLModel):
    text: str
    truncated: bool


# One piece of assistant "thinking" text; same shape as TextBlock.
class ThinkingBlock(SQLModel):
    text: str
    truncated: bool


# A tool invocation with its (optional) embedded result.
class ToolUseBlock(SQLModel):
    tool_use_id: str
    tool_name: str
    input: dict[str, Any]
    input_truncated: bool
    result: str | None = None  # None when no result is attached
    result_truncated: bool = False
    is_error: bool = False


# Token counters attached to a single message.
class SessionTokenUsageRead(SQLModel):
    input: int
    output: int
    cache_read: int
    cache_write: int


# One conversation entry: its text, thinking and tool-use blocks plus metadata.
class SessionMessage(SQLModel):
    uuid: str
    role: str  # "user" | "assistant"
    timestamp: datetime | None = None
    text_blocks: list[TextBlock]
    thinking_blocks: list[ThinkingBlock]
    tool_uses: list[ToolUseBlock]
    model: str | None = None
    tokens: SessionTokenUsageRead | None = None


# Paginated response envelope: one page of messages plus paging info.
class SessionMessagesResponse(SQLModel):
    session_id: str
    messages: list[SessionMessage]
    total: int
    has_more: bool


# Reader-side dataclasses (backend/app/services/claude_code_reader.py).
# Their fields parallel the response schemas above, except that nothing has a
# default: every field must be supplied.

@dataclass
class SessionTextBlock:
    """Text content with a flag telling whether it was truncated."""

    text: str
    truncated: bool


@dataclass
class SessionThinkingBlock:
    """Thinking content with a flag telling whether it was truncated."""

    text: str
    truncated: bool


@dataclass
class SessionToolUse:
    """A tool call together with its result, if one is attached."""

    tool_use_id: str
    tool_name: str
    input: dict[str, Any]
    input_truncated: bool
    result: str | None  # None when no result is attached
    result_truncated: bool
    is_error: bool


@dataclass
class SessionTokenUsage:
    """Token counters for one message."""

    input: int
    output: int
    cache_read: int
    cache_write: int


@dataclass
class ParsedMessage:
    """One conversation message as produced by the session reader."""

    uuid: str
    role: str  # "user" | "assistant"
    timestamp: datetime | None
    text_blocks: list[SessionTextBlock]
    thinking_blocks: list[SessionThinkingBlock]
    tool_uses: list[SessionToolUse]
    model: str | None
    tokens: SessionTokenUsage | None
# ---------------------------------------------------------------------------
# Session message reader
# ---------------------------------------------------------------------------

_CONTENT_TRUNCATE = 4000
_INPUT_VALUE_TRUNCATE = 2000


def _trunc(text: str, limit: int = _CONTENT_TRUNCATE) -> tuple[str, bool]:
    """Cap *text* at *limit* characters.

    Returns:
        ``(text, truncated)`` where *truncated* is True when characters were
        dropped.
    """
    if len(text) <= limit:
        return text, False
    return text[:limit], True


def _trunc_input(input_dict: dict[str, Any]) -> tuple[dict[str, Any], bool]:
    """Truncate long string values inside a tool input dict.

    Only top-level string values longer than ``_INPUT_VALUE_TRUNCATE`` are
    shortened; every other value is passed through untouched.

    Returns:
        ``(new_dict, truncated)`` where *truncated* is True when at least one
        value was shortened. The input dict itself is not modified.
    """
    truncated = False
    result: dict[str, Any] = {}
    for key, value in input_dict.items():
        if isinstance(value, str) and len(value) > _INPUT_VALUE_TRUNCATE:
            value = value[:_INPUT_VALUE_TRUNCATE]
            truncated = True
        result[key] = value
    return result, truncated


def _as_text(value: Any) -> str:
    """Return *value* if it is a string, otherwise an empty string."""
    return value if isinstance(value, str) else ""


def _as_blocks(content: Any) -> list[dict[str, Any]]:
    """Normalise a message ``content`` field to a list of block dicts.

    A plain string becomes a single text block, so string-content messages
    are handled like block-list messages. List entries that are not dicts
    are dropped so later ``.get`` calls cannot fail. Anything else yields an
    empty list.
    """
    if isinstance(content, str):
        return [{"type": "text", "text": content}] if content else []
    if isinstance(content, list):
        return [block for block in content if isinstance(block, dict)]
    return []


def _token_count(usage: dict[str, Any], key: str) -> int:
    """Read an integer counter from a usage dict, treating null/missing as 0."""
    value = usage.get(key)
    return value if isinstance(value, int) else 0


def _extract_tool_result_text(raw: Any) -> str:
    """Normalise a tool_result content field to plain text.

    Strings are returned unchanged. Lists contribute the ``text`` of each
    ``type == "text"`` dict, joined by newlines (a non-string ``text`` counts
    as empty). ``None`` becomes ``""`` and anything else is ``str()``-ed.
    """
    if isinstance(raw, str):
        return raw
    if isinstance(raw, list):
        return "\n".join(
            _as_text(block.get("text"))
            for block in raw
            if isinstance(block, dict) and block.get("type") == "text"
        )
    return str(raw) if raw is not None else ""


def _find_session_file(root: Path, session_id: str) -> Path | None:
    """Locate ``<session_id>.jsonl`` anywhere below *root*.

    The id is compared literally against file names instead of being
    interpolated into a glob pattern, so ids containing ``*``, ``?`` or
    ``[...]`` cannot select another session's file. An empty id never
    matches.
    """
    if not session_id:
        return None
    wanted = f"{session_id}.jsonl"
    for candidate in root.rglob("*.jsonl"):
        if candidate.name == wanted:
            return candidate
    return None


def _build_user_message(uuid: str, data: dict[str, Any]) -> ParsedMessage | None:
    """Build a user message, or None when it holds no non-blank text."""
    text_blocks: list[SessionTextBlock] = []
    for block in data["content"]:
        if block.get("type") != "text":
            continue
        text, was_cut = _trunc(_as_text(block.get("text")))
        if text.strip():
            text_blocks.append(SessionTextBlock(text=text, truncated=was_cut))
    if not text_blocks:
        return None
    return ParsedMessage(
        uuid=uuid,
        role="user",
        timestamp=data["ts"],
        text_blocks=text_blocks,
        thinking_blocks=[],
        tool_uses=[],
        model=None,
        tokens=None,
    )


def _build_assistant_message(
    data: dict[str, Any],
    tool_results: dict[str, tuple[str, bool, bool]],
) -> ParsedMessage:
    """Build an assistant message, attaching known results to its tool calls."""
    text_blocks: list[SessionTextBlock] = []
    thinking_blocks: list[SessionThinkingBlock] = []
    tool_uses: list[SessionToolUse] = []

    for block in data["blocks"]:
        btype = block.get("type")
        if btype == "text":
            text, was_cut = _trunc(_as_text(block.get("text")))
            if text:
                text_blocks.append(SessionTextBlock(text=text, truncated=was_cut))
        elif btype == "thinking":
            text, was_cut = _trunc(_as_text(block.get("thinking")))
            if text:
                thinking_blocks.append(SessionThinkingBlock(text=text, truncated=was_cut))
        elif btype == "tool_use":
            tid = _as_text(block.get("id"))
            raw_input = block.get("input")
            # A malformed (non-dict) input is shown as empty rather than crashing.
            inp, inp_cut = _trunc_input(raw_input if isinstance(raw_input, dict) else {})
            name = block.get("name", "unknown")
            result = tool_results.get(tid)
            tool_uses.append(SessionToolUse(
                tool_use_id=tid,
                tool_name=name if isinstance(name, str) else "unknown",
                input=inp,
                input_truncated=inp_cut,
                result=result[0] if result else None,
                result_truncated=result[2] if result else False,
                is_error=result[1] if result else False,
            ))

    usage = data["usage"]
    return ParsedMessage(
        uuid=data["uuid"],
        role="assistant",
        timestamp=data["ts"],
        text_blocks=text_blocks,
        thinking_blocks=thinking_blocks,
        tool_uses=tool_uses,
        model=data["model"],
        tokens=SessionTokenUsage(
            input=_token_count(usage, "input_tokens"),
            output=_token_count(usage, "output_tokens"),
            cache_read=_token_count(usage, "cache_read_input_tokens"),
            cache_write=_token_count(usage, "cache_creation_input_tokens"),
        ),
    )


def get_session_messages(
    session_id: str,
    limit: int = 200,
    offset: int = 0,
) -> tuple[list[ParsedMessage], int] | None:
    """Parse the full conversation from a session JSONL file.

    Tool results are embedded in the ``tool_use`` blocks of the assistant
    message that issued the call. User records that contain only tool
    results are suppressed from the returned list. Assistant records that
    share a ``message.id`` are merged into one message. Blank lines, lines
    that are not valid JSON objects and sidechain records are skipped, so a
    single malformed record cannot fail the whole read.

    Args:
        session_id: Session identifier; the file ``<session_id>.jsonl`` is
            looked up by exact name below the projects directory.
        limit: Maximum number of messages to return (negative counts as 0).
        offset: Number of leading messages to skip (negative counts as 0).

    Returns:
        ``(messages[offset:offset + limit], total)``, or ``None`` when the
        projects directory or session file does not exist or cannot be read.
    """
    root = _projects_dir()
    if not root.exists():
        return None

    path = _find_session_file(root, session_id)
    if path is None:
        return None

    # tool_use_id -> (result_text, is_error, truncated)
    tool_results: dict[str, tuple[str, bool, bool]] = {}

    # (role, key) pairs in first-seen order, which preserves conversation
    # order. The key is message.id for assistant messages (the deduplication
    # handle) and the record uuid for user messages.
    message_order: list[tuple[str, str]] = []

    # Accumulated data per assistant message.id
    assistant_acc: dict[str, dict[str, Any]] = {}

    # User message data keyed by uuid
    user_acc: dict[str, dict[str, Any]] = {}

    try:
        with open(path, encoding="utf-8", errors="replace") as fh:
            for raw_line in fh:
                raw_line = raw_line.strip()
                if not raw_line:
                    continue
                try:
                    rec = json.loads(raw_line)
                except json.JSONDecodeError:
                    continue

                # A valid JSON line may still be a list or scalar.
                if not isinstance(rec, dict) or rec.get("isSidechain"):
                    continue

                rec_type = rec.get("type")
                if rec_type not in ("user", "assistant"):
                    continue

                ts = _parse_iso(rec.get("timestamp"))
                uuid = _as_text(rec.get("uuid"))
                msg = rec.get("message")
                if not isinstance(msg, dict):  # covers a missing or null message
                    msg = {}
                blocks = _as_blocks(msg.get("content"))

                if rec_type == "user":
                    # Collect tool results for later embedding
                    for block in blocks:
                        if block.get("type") != "tool_result":
                            continue
                        tid = block.get("tool_use_id")
                        if not isinstance(tid, str) or not tid:
                            continue  # nothing to attach the result to
                        text, was_cut = _trunc(
                            _extract_tool_result_text(block.get("content", ""))
                        )
                        tool_results[tid] = (text, bool(block.get("is_error", False)), was_cut)

                    # Only surface user records that carry visible text
                    has_text = any(block.get("type") == "text" for block in blocks)
                    if has_text and uuid not in user_acc:
                        user_acc[uuid] = {"ts": ts, "content": blocks}
                        message_order.append(("user", uuid))

                else:  # assistant
                    msg_id = _as_text(msg.get("id")) or uuid

                    acc = assistant_acc.get(msg_id)
                    if acc is None:
                        usage = msg.get("usage")
                        model = msg.get("model")
                        # NOTE(review): metadata and usage come from the first
                        # record seen for this message id; confirm later
                        # duplicates never carry more complete token counts.
                        acc = assistant_acc[msg_id] = {
                            "uuid": uuid,
                            "ts": ts,
                            "model": model if isinstance(model, str) else None,
                            "usage": usage if isinstance(usage, dict) else {},
                            "blocks": [],
                            "seen_block_ids": set(),
                        }
                        message_order.append(("assistant", msg_id))

                    for block in blocks:
                        bid = block.get("id")
                        # Only blocks with a string id can be de-duplicated.
                        if isinstance(bid, str) and bid:
                            if bid in acc["seen_block_ids"]:
                                continue
                            acc["seen_block_ids"].add(bid)
                        acc["blocks"].append(block)

    except OSError as exc:  # PermissionError is a subclass of OSError
        logger.debug("claude_code_reader.messages_read_error path=%s error=%s", path, exc)
        return None

    # Build the final parsed list
    parsed: list[ParsedMessage] = []
    for role, key_id in message_order:
        if role == "user":
            message = _build_user_message(key_id, user_acc[key_id])
            if message is not None:
                parsed.append(message)
        else:
            parsed.append(_build_assistant_message(assistant_acc[key_id], tool_results))

    total = len(parsed)
    # Clamp so direct callers cannot slice from the end with negative values.
    start = max(offset, 0)
    return parsed[start : start + max(limit, 0)], total