Claude Code: tool_use blocks only

This commit is contained in:
null 2026-05-25 13:22:36 -05:00
parent 00bae96490
commit b893941d71
1 changed files with 56 additions and 56 deletions

View File

@ -404,38 +404,52 @@ def _extract_claude_ticker_items(path: Path, cutoff: datetime) -> list[ActivityT
tool_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "tool_use"] tool_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "tool_use"]
text_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "text"] text_blocks = [b for b in content if isinstance(b, dict) and b.get("type") == "text"]
if tool_blocks: for block in tool_blocks:
# Emit one ticker item per tool call (most informative for live activity) tool_name = str(block.get("name") or "")
for block in tool_blocks: inp = block.get("input") or {}
tool_name = str(block.get("name") or "") if not isinstance(inp, dict):
inp = block.get("input") or {} inp = {}
if not isinstance(inp, dict): label = _format_tool_status(tool_name, inp)
inp = {} items.append(ActivityTickerItem(
label = _format_tool_status(tool_name, inp) id=uuid4(),
items.append(ActivityTickerItem( source="Claude Code",
id=uuid4(), message=label,
source="Claude Code", created_at=ts,
message=label, ))
created_at=ts,
))
elif text_blocks:
# Text-only turn (no tools used) — show the response text
text = " ".join(
b.get("text", "").strip() for b in text_blocks if b.get("text", "").strip()
).strip()
if text:
items.append(ActivityTickerItem(
id=uuid4(),
source="Claude Code",
message=text[:200],
created_at=ts,
))
return items return items
def _format_codex_tool(payload: dict[str, Any]) -> str | None:
"""Return a human-readable label for a Codex tool call record, or None to skip."""
ptype = payload.get("type")
name = str(payload.get("name") or "")
if ptype == "function_call":
raw_args = payload.get("arguments") or "{}"
try:
args = json.loads(raw_args) if isinstance(raw_args, str) else raw_args
except json.JSONDecodeError:
args = {}
if name == "exec_command":
cmd = str(args.get("cmd", "")).strip()
if not cmd:
return None
return f"Running: {cmd[:_BASH_CMD_MAX]}" if len(cmd) > _BASH_CMD_MAX else f"Running: {cmd}"
if name in ("write_stdin", "read_stdout", "create_session", "delete_session"):
return None # internal plumbing, not meaningful
return f"Using {name}"
if ptype == "custom_tool_call":
if name == "apply_patch":
return "Applying code patch"
return f"Using {name}"
return None
def _extract_codex_ticker_items(path: Path, cutoff: datetime) -> list[ActivityTickerItem]: def _extract_codex_ticker_items(path: Path, cutoff: datetime) -> list[ActivityTickerItem]:
items: list[ActivityTickerItem] = [] items: list[ActivityTickerItem] = []
seen_turn_ids: set[str] = set() seen_call_ids: set[str] = set()
for line in _tail_lines(path): for line in _tail_lines(path):
if not line.strip(): if not line.strip():
continue continue
@ -443,40 +457,26 @@ def _extract_codex_ticker_items(path: Path, cutoff: datetime) -> list[ActivityTi
rec = json.loads(line) rec = json.loads(line)
except json.JSONDecodeError: except json.JSONDecodeError:
continue continue
if rec.get("type") != "response_item":
continue
ts = _parse_ts(rec.get("timestamp")) ts = _parse_ts(rec.get("timestamp"))
if ts is None or ts < cutoff: if ts is None or ts < cutoff:
continue continue
payload = rec.get("payload") or {} payload = rec.get("payload") or {}
rec_type = rec.get("type") call_id = str(payload.get("call_id") or "")
if call_id:
if rec_type == "event_msg" and payload.get("type") == "task_complete": if call_id in seen_call_ids:
turn_id = payload.get("turn_id") or ""
if turn_id and turn_id in seen_turn_ids:
continue continue
if turn_id: seen_call_ids.add(call_id)
seen_turn_ids.add(turn_id) label = _format_codex_tool(payload)
text = (payload.get("last_agent_message") or "").strip() if not label:
if text: continue
items.append(ActivityTickerItem( items.append(ActivityTickerItem(
id=uuid4(), id=uuid4(),
source="Codex", source="Codex",
message=text[:200], message=label,
created_at=ts, created_at=ts,
)) ))
elif rec_type == "response_item" and payload.get("role") == "assistant":
content = payload.get("content") or []
text = " ".join(
b.get("text", "").strip()
for b in content
if isinstance(b, dict) and b.get("type") == "output_text" and b.get("text", "").strip()
).strip()
if text:
items.append(ActivityTickerItem(
id=uuid4(),
source="Codex",
message=text[:200],
created_at=ts,
))
return items return items