839 lines
28 KiB
Python
839 lines
28 KiB
Python
"""Provider-native usage scrapers for supplemental limit/percentage data.

Design principles:
- Opt-in only: all scrapers are disabled by default (USAGE_SCRAPER_ENABLED=false).
- Supplemental: scraper data enriches JSONL-based metrics; it is never the
  primary source of truth for tokens or spend.
- Isolated: each provider adapter is independent and can fail without
  affecting the rest of the system.
- Testable: parse_claude_usage() is a pure function with no side effects.
- Fragile by nature: CLI text output changes — treat results as best-effort.

Tmux requirement for ClaudeTmuxScraper:
  The backend process must have access to the host's tmux socket. For
  Docker-based local installs, mount the socket:
    -v /tmp/tmux-1000:/tmp/tmux-1000  (or whatever $TMUX uses)
  For bare-metal runs, no extra config is needed.
"""
|
|
|
|
from __future__ import annotations

import asyncio
import math
import re
import tempfile
from abc import ABC, abstractmethod
from dataclasses import dataclass, field, replace
from datetime import UTC, datetime
from pathlib import Path
from typing import ClassVar

from app.core.logging import get_logger
from app.core.time import utcnow
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal result dataclass (not a schema — stays inside the service layer)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
|
|
class ParsedClaudeUsageWindow:
|
|
"""One parsed usage window from `claude /usage` text."""
|
|
|
|
key: str # current_session | weekly_all_models | weekly_sonnet | extra_usage
|
|
label: str
|
|
pct_used: float | None = None
|
|
remaining_ms: int | None = None
|
|
remaining_label: str | None = None
|
|
extra_text: str | None = None
|
|
source: str = "provider_native"
|
|
confidence: str = "high"
|
|
|
|
|
|
@dataclass
class ParsedClaudeUsage:
    """Structured result from parsing one block of ``claude /usage`` text.

    ``windows`` holds the per-window breakdown; the flattened ``current_pct``
    / ``remaining_*`` fields mirror the primary window for older clients.
    ``error`` is set when nothing useful could be parsed.
    """

    # Raw text that was parsed. ``None`` is allowed because
    # ``get_provider_usage`` blanks this field when raw output is not requested.
    raw_text: str | None
    # Per-window breakdown, in the order the windows were found.
    windows: list[ParsedClaudeUsageWindow] = field(default_factory=list)
    # Flattened fields mirroring the primary window.
    current_pct: float | None = None
    remaining_ms: int | None = None
    remaining_label: str | None = None
    # Weekly statistics, when present in the parsed text.
    weekly_messages_used: int | None = None
    weekly_messages_limit: int | None = None
    weekly_tokens_used: int | None = None
    weekly_cost_usd: float | None = None
    # Human-readable reason when parsing produced no usable data.
    error: str | None = None

    # Source and confidence for the parsed data
    source: str | None = None  # e.g., "provider_native" or "provider_api_rate_limit"
    confidence: str | None = None  # e.g., "high" or "medium"
|
|
|
|
|
|
@dataclass
class ScrapeResult:
    """Outcome of a single scraper run: the parsed payload plus metadata."""

    # Provider the data belongs to (for example "anthropic").
    provider: str
    # Name of the adapter / ingestion path that produced the result.
    source_name: str
    # Timestamp recorded for the scrape.
    scraped_at: datetime
    # Whether the producer considers this result usable as fresh data.
    fresh: bool
    # Lifetime, in seconds, the producer advertises for this result.
    freshness_ttl_seconds: int
    # Parsed usage payload.
    parsed: ParsedClaudeUsage
    # Top-level error message, if the run failed.
    error: str | None = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Parser — pure function, no I/O
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _strip_commas(s: str) -> str:
    """Return *s* with every thousands-separator comma removed."""
    return "".join(s.split(","))
|
|
|
|
|
|
def _parse_int(s: str) -> int:
    """Parse an integer that may carry surrounding whitespace and commas.

    Raises ``ValueError`` when the cleaned text is not a valid integer.
    """
    digits = _strip_commas(s.strip())
    return int(digits)
|
|
|
|
|
|
def _parse_float(s: str) -> float:
    """Parse a float that may carry surrounding whitespace and commas.

    Raises ``ValueError`` when the cleaned text is not a valid float.
    """
    number_text = _strip_commas(s.strip())
    return float(number_text)
|
|
|
|
|
|
# Matches "67%" or "67.5%" anywhere on a line. The number must be preceded by
# start-of-line, whitespace, or "(" so that digits embedded in a longer token
# are not picked up. Group 1 is the numeric value (without the "%").
_PCT_RE = re.compile(
    r"(?:^|[\s(])(\d+(?:\.\d+)?)\s*%",
    re.MULTILINE,
)

# Matches "resets in", "remaining", "time remaining", or "next reset in",
# followed by the time expression. Group 1 is that expression, captured lazily
# up to the first ")" or the end of the line.
# NOTE(review): an earlier version of this comment also listed "resets at",
# but no alternative in the pattern matches that phrasing.
_RESET_LINE_RE = re.compile(
    r"(?:resets?\s+in[:\s]*|remaining[:\s]*|time\s+remaining[:\s]*|next\s+reset[:\s]*in[:\s]*)"
    r"(.+?)(?:\)|$)",
    re.IGNORECASE | re.MULTILINE,
)

# Matches a parenthesised "(resets in 2h 47m)" embedded in a longer line.
# Group 1 is the text between "resets in" and the closing ")".
_INLINE_RESET_RE = re.compile(
    r"\(\s*resets?\s+in\s+(.+?)\s*\)",
    re.IGNORECASE,
)

# Time components: optional days (group 1), optional hours (group 2),
# optional minutes (group 3). Group 4 belongs to the "< 1m" alternative.
# NOTE: every component of the first alternative is optional, so the pattern
# can match the empty string; callers must check that a group actually
# captured something before trusting a match.
_TIME_PARTS_RE = re.compile(
    r"(?:(\d+)\s*d(?:ay)?s?)?"                # days
    r"\s*(?:(\d+)\s*h(?:our)?s?)?"            # hours
    r"\s*(?:(\d+)\s*m(?:in(?:ute)?s?)?)?"     # minutes
    r"|(<?)\s*1\s*m",                         # "< 1m" edge case
    re.IGNORECASE,
)
|
|
|
|
# Weekly/daily usage lines
#
# _WEEKLY_MSGS_RE groups: 1 = used count in the "messages: N" form,
# 2 = optional limit after "/", 3 = used count in the "N messages" form.
_WEEKLY_MSGS_RE = re.compile(
    # "messages: 234 / 500"  OR  "234 messages"  OR  "Messages: 234"
    r"(?:messages?[:\s]+(\d[\d,]*)(?:\s*/\s*(\d[\d,]*))?)"
    r"|(?:(\d[\d,]*)\s+messages?)",
    re.IGNORECASE,
)
# "input tokens: 1,234" / "output tokens: 5,678"; group 1 is the count
# (digits and commas).
_WEEKLY_INPUT_RE = re.compile(r"input\s+tokens?[:\s]+([\d,]+)", re.IGNORECASE)
_WEEKLY_OUTPUT_RE = re.compile(r"output\s+tokens?[:\s]+([\d,]+)", re.IGNORECASE)
# "tokens: N" with an optional "total " prefix. Because the prefix is optional
# this also matches the tail of "input tokens: N" / "output tokens: N".
_WEEKLY_TOKENS_RE = re.compile(r"(?:total\s+)?tokens?[:\s]+([\d,]+)", re.IGNORECASE)
# Dollar amount with one to four decimals, e.g. "$12.34". Amounts without a
# decimal part or with thousands separators before the "." do not match.
_WEEKLY_COST_RE = re.compile(r"\$\s*([\d]+\.[\d]{1,4})", re.IGNORECASE)
|
|
|
|
# Leading decoration stripped from a line before header detection: whitespace,
# "-", "*", "|", the bullet U+2022, and box-drawing characters.
_SECTION_HEADER_CLEANUP_RE = re.compile(r"^[\s\-\*\|\u2022\u2502\u2500\u250c\u2510\u2514\u2518]+")

# (pattern, window key, display label) triples, consulted in order; the first
# pattern that matches the normalised line wins. More specific headers must
# therefore come before more general ones: the generic "weekly" pattern would
# otherwise swallow "Weekly Sonnet" and label it as the all-models window.
_WINDOW_HEADER_PATTERNS: tuple[tuple[re.Pattern[str], str, str], ...] = (
    (re.compile(r"^current\s+session\b", re.IGNORECASE), "current_session", "Current session"),
    (re.compile(r"^usage\s+window\b", re.IGNORECASE), "current_session", "Current session"),
    (re.compile(r"^(?:weekly\s+sonnet|sonnet)\b", re.IGNORECASE), "weekly_sonnet", "Sonnet"),
    (re.compile(r"^(?:all\s+models|weekly\s+all\s+models)\b", re.IGNORECASE), "weekly_all_models", "All models"),
    (re.compile(r"^(?:this\s+week|weekly(?:\s+usage)?)\b", re.IGNORECASE), "weekly_all_models", "All models"),
    (re.compile(r"^extra\s+usage\b", re.IGNORECASE), "extra_usage", "Extra usage"),
)
|
|
|
|
|
|
def _parse_remaining_ms(time_str: str) -> tuple[int, str] | None:
|
|
"""Convert a time string like '2h 47m', '1 day 4h', '< 1m' to milliseconds.
|
|
|
|
Returns (ms, normalised_label) or None if nothing matches.
|
|
"""
|
|
s = time_str.strip()
|
|
if not s:
|
|
return None
|
|
|
|
# "< 1m" / "< 1 minute"
|
|
if re.match(r"<\s*1\s*m", s, re.IGNORECASE):
|
|
return 30_000, "< 1m" # represent as 30 seconds
|
|
|
|
m = _TIME_PARTS_RE.search(s)
|
|
if not m:
|
|
return None
|
|
|
|
days = int(m.group(1) or 0)
|
|
hours = int(m.group(2) or 0)
|
|
minutes = int(m.group(3) or 0)
|
|
|
|
total_ms = (days * 86400 + hours * 3600 + minutes * 60) * 1000
|
|
if total_ms == 0:
|
|
return None
|
|
|
|
# Build a compact human label
|
|
parts = []
|
|
if days:
|
|
parts.append(f"{days}d")
|
|
if hours:
|
|
parts.append(f"{hours}h")
|
|
if minutes:
|
|
parts.append(f"{minutes}m")
|
|
label = " ".join(parts) if parts else s
|
|
|
|
return total_ms, label
|
|
|
|
|
|
def _extract_pct(text: str) -> float | None:
    """Return the first percentage in *text* that lies within 0–100.

    Percentages outside that range are skipped; ``None`` is returned when no
    candidate qualifies.
    """
    candidates = (float(found.group(1)) for found in _PCT_RE.finditer(text))
    return next((value for value in candidates if 0.0 <= value <= 100.0), None)
|
|
|
|
|
|
def _extract_remaining(text: str) -> tuple[int, str] | None:
    """Find a "time until reset" expression in *text* and parse it.

    The parenthesised "(resets in ...)" form is tried first; if it is absent
    or its contents do not parse, the looser line-based pattern is used.
    Returns ``(ms, label)`` or ``None``.
    """
    parenthesised = _INLINE_RESET_RE.search(text)
    if parenthesised is not None:
        from_inline = _parse_remaining_ms(parenthesised.group(1))
        if from_inline is not None:
            return from_inline

    generic = _RESET_LINE_RE.search(text)
    if generic is None:
        return None
    return _parse_remaining_ms(generic.group(1).strip())
|
|
|
|
|
|
def _normalized_header(line: str) -> str:
    """Strip leading decoration from *line*, then surrounding whitespace."""
    without_decoration = _SECTION_HEADER_CLEANUP_RE.sub("", line)
    return without_decoration.strip()
|
|
|
|
|
|
def _detect_window_key(line: str) -> tuple[str, str] | None:
    """Classify *line* as a usage-window header.

    Returns the ``(key, label)`` of the first entry in
    ``_WINDOW_HEADER_PATTERNS`` whose pattern matches the normalised line, or
    ``None`` when the line is blank after normalisation or matches nothing.
    """
    header = _normalized_header(line)
    if not header:
        return None
    return next(
        (
            (key, label)
            for pattern, key, label in _WINDOW_HEADER_PATTERNS
            if pattern.search(header)
        ),
        None,
    )
|
|
|
|
|
|
def _split_usage_sections(raw: str) -> list[tuple[str, str, str]]:
    """Split *raw* into ``(key, label, text)`` sections, one per header line.

    A section starts at a line recognised by ``_detect_window_key`` and runs
    until the next recognised header. Its text includes the header line itself.
    Lines before the first header are dropped.
    """
    collected: list[tuple[str, str, list[str]]] = []

    for line in raw.splitlines():
        header = _detect_window_key(line)
        if header is not None:
            # A new header closes the previous section and opens a new one.
            key, label = header
            collected.append((key, label, [line]))
        elif collected:
            collected[-1][2].append(line)

    return [(key, label or key, "\n".join(body)) for key, label, body in collected]
|
|
|
|
|
|
def _parse_weekly_stats(result: ParsedClaudeUsage, section_text: str, raw: str) -> None:
    """Fill the weekly message/token/cost fields of *result* in place.

    Message counts, token counts and cost are read from *section_text*. If no
    token count is found there, an explicit "weekly tokens: N" line is looked
    for in the whole *raw* text. Numbers that fail to parse are skipped and
    leave the corresponding field untouched.
    """
    unparseable = (ValueError, TypeError)

    # --- messages: "messages: 234 / 500" or "234 messages" ------------------
    messages = _WEEKLY_MSGS_RE.search(section_text)
    if messages is not None:
        colon_form_used, limit, bare_form_used = messages.groups()
        try:
            used = colon_form_used or bare_form_used
            if used:
                result.weekly_messages_used = _parse_int(used)
            if limit:
                result.weekly_messages_limit = _parse_int(limit)
        except unparseable:
            pass

    # --- tokens: prefer input + output, otherwise a single "tokens:" figure --
    tokens_in = _WEEKLY_INPUT_RE.search(section_text)
    tokens_out = _WEEKLY_OUTPUT_RE.search(section_text)
    if tokens_in and tokens_out:
        try:
            result.weekly_tokens_used = sum(
                _parse_int(found.group(1)) for found in (tokens_in, tokens_out)
            )
        except unparseable:
            pass
    elif result.weekly_tokens_used is None:
        total = _WEEKLY_TOKENS_RE.search(section_text)
        if total:
            try:
                result.weekly_tokens_used = _parse_int(total.group(1))
            except unparseable:
                pass

    # Last resort: an explicit "weekly tokens: N" anywhere in the raw text.
    if result.weekly_tokens_used is None:
        labelled = re.search(r"weekly\s+tokens?[:\s]+([\d,]+)", raw, re.IGNORECASE)
        if labelled:
            try:
                result.weekly_tokens_used = _parse_int(labelled.group(1))
            except unparseable:
                pass

    # --- cost ----------------------------------------------------------------
    cost = _WEEKLY_COST_RE.search(section_text)
    if cost:
        try:
            result.weekly_cost_usd = _parse_float(cost.group(1))
        except unparseable:
            pass
|
|
|
|
|
|
def parse_claude_usage(raw: str) -> ParsedClaudeUsage:
    """Turn the text printed by ``claude /usage`` into structured fields.

    Performs no I/O. Problems are reported through ``result.error`` rather
    than by raising: empty input yields ``"empty output"``, and input with no
    usable flattened metrics yields ``"no parseable usage data found"``.
    """
    result = ParsedClaudeUsage(raw_text=raw)

    if not (raw and raw.strip()):
        result.error = "empty output"
        return result

    sections = _split_usage_sections(raw)

    # ---- per-window breakdown ---------------------------------------------
    accepted: set[str] = set()
    for key, label, text in sections:
        is_extra = key == "extra_usage"
        # Only the first accepted section per key is kept, except that
        # "extra_usage" sections may repeat.
        if key in accepted and not is_extra:
            continue

        pct = _extract_pct(text)
        remaining_ms, remaining_label = _extract_remaining(text) or (None, None)
        snippet = text.strip()[:240] or None

        has_metrics = pct is not None or remaining_ms is not None
        # Sections without metrics are dropped, unless this is an
        # "extra_usage" section that at least carries some text.
        if not has_metrics and not (is_extra and snippet):
            continue

        result.windows.append(
            ParsedClaudeUsageWindow(
                key=key,
                label=label,
                pct_used=pct,
                remaining_ms=remaining_ms,
                remaining_label=remaining_label,
                extra_text=snippet,
            ),
        )
        accepted.add(key)

    # ---- flattened back-compat fields --------------------------------------
    windows = result.windows
    primary = next((w for w in windows if w.key == "current_session"), None)
    if primary is None:
        primary = next(
            (w for w in windows if w.pct_used is not None or w.remaining_ms is not None),
            None,
        )

    if primary is not None:
        result.current_pct = primary.pct_used
        result.remaining_ms = primary.remaining_ms
        result.remaining_label = primary.remaining_label
    else:
        # No usable window: fall back to scanning the whole text.
        result.current_pct = _extract_pct(raw)
        whole_text_remaining = _extract_remaining(raw)
        if whole_text_remaining:
            result.remaining_ms, result.remaining_label = whole_text_remaining

    # ---- weekly statistics ---------------------------------------------------
    weekly_text = next(
        (text for key, _, text in sections if key == "weekly_all_models"),
        None,
    )
    if weekly_text is None:
        # No recognised weekly section: use a loosely matched "week" paragraph
        # if there is one, otherwise the entire text.
        weekly_text = raw
        week_block = re.search(
            r"(?:this\s+week|weekly|week\b)[^\n]*\n(.+?)(?=\n\s*\n|\Z)",
            raw,
            re.IGNORECASE | re.DOTALL,
        )
        if week_block:
            weekly_text = week_block.group(0)

    _parse_weekly_stats(result, weekly_text, raw)

    # ---- validation ----------------------------------------------------------
    # The flattened fields decide whether the parse counts as successful; when
    # all of them are missing the result is flagged even if windows exist.
    flattened = (
        result.current_pct,
        result.remaining_ms,
        result.weekly_messages_used,
        result.weekly_tokens_used,
    )
    if all(value is None for value in flattened):
        result.error = "no parseable usage data found"

    return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Adapter interface
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class RuntimeUsageProviderAdapter(ABC):
    """Abstract base for provider-native usage scrapers.

    Subclasses set ``provider`` and ``source_name`` and implement
    ``fetch_raw`` and ``parse``; ``scrape`` combines the two and converts any
    failure into an error-carrying ``ScrapeResult``.
    """

    provider: ClassVar[str]
    source_name: ClassVar[str]
    freshness_ttl_seconds: ClassVar[int] = 300

    @abstractmethod
    async def fetch_raw(self) -> str:
        """Return the raw text output from the provider's usage source."""

    @abstractmethod
    def parse(self, raw: str) -> ParsedClaudeUsage:
        """Parse raw text into structured fields."""

    def is_available(self) -> bool:
        """Return True if the prerequisites for this adapter exist on this host."""
        return True

    async def scrape(self) -> ScrapeResult:
        """Run fetch_raw + parse, returning a ScrapeResult regardless of errors.

        On success the result has ``fresh=True``. If ``fetch_raw`` or ``parse``
        raises, the exception is logged and a result with ``fresh=False`` and a
        non-empty ``error`` is returned instead.
        """
        now = utcnow()
        try:
            raw = await self.fetch_raw()
            parsed = self.parse(raw)
        except Exception as exc:
            # Some exceptions (e.g. asyncio.TimeoutError) stringify to "";
            # fall back to the class name so the error is never falsy.
            message = str(exc) or type(exc).__name__
            logger.warning(
                "usage_scraper.fetch_failed provider=%s source=%s error=%s",
                self.provider, self.source_name, message,
            )
            return ScrapeResult(
                provider=self.provider,
                source_name=self.source_name,
                scraped_at=now,
                fresh=False,
                freshness_ttl_seconds=self.freshness_ttl_seconds,
                parsed=ParsedClaudeUsage(raw_text="", error=message),
                error=message,
            )
        return ScrapeResult(
            provider=self.provider,
            source_name=self.source_name,
            scraped_at=now,
            fresh=True,
            freshness_ttl_seconds=self.freshness_ttl_seconds,
            parsed=parsed,
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Claude CLI tmux adapter
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Default delay between sending the command to the pane and capturing it.
_TMUX_WAIT_SECONDS = 2.0   # seconds to wait after sending /usage
# NOTE(review): not referenced anywhere in the visible part of this module.
_TMUX_CAPTURE_LINES = 80   # lines to capture from the tmux pane
|
|
|
|
|
|
async def _run(
    *args: str,
    timeout: float = 5.0,
) -> tuple[str, str]:
    """Run a subprocess and return (stdout, stderr) decoded as UTF-8.

    Undecodable bytes are replaced rather than raising. The exit status is not
    inspected.

    Raises:
        asyncio.TimeoutError: if the process does not finish within *timeout*
            seconds. The child is killed and reaped before the error
            propagates.
        OSError: if the executable cannot be started (includes
            FileNotFoundError).
    """
    proc = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except (asyncio.TimeoutError, asyncio.CancelledError):
        # wait_for() only cancels communicate(); the child itself would keep
        # running, so terminate and reap it explicitly.
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the check and the kill
            await proc.wait()
        raise
    return stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace")
|
|
|
|
|
|
async def _find_claude_tmux_pane(tmux_socket: str = "") -> str | None:
    """Locate a tmux pane whose current command mentions ``claude``.

    Lists all panes (through the given socket when *tmux_socket* is set) and
    returns the id of the first pane whose current command contains "claude",
    case-insensitively. Returns ``None`` when no pane matches or when tmux
    cannot be run or times out.
    """
    tmux_cmd = ["tmux", "-S", tmux_socket] if tmux_socket else ["tmux"]
    try:
        listing, _stderr = await _run(
            *tmux_cmd,
            "list-panes", "-a",
            "-F", "#{pane_id}:#{pane_current_command}",
        )
    except (FileNotFoundError, asyncio.TimeoutError, OSError):
        return None

    for row in listing.splitlines():
        # Each row is "<pane_id>:<command>"; rows without a ":" are ignored.
        pane_id, separator, running = row.strip().partition(":")
        if separator and "claude" in running.lower():
            return pane_id
    return None
|
|
|
|
|
|
async def _tmux_send_and_capture(
    pane_id: str,
    command: str = "/usage",
    tmux_socket: str = "",
    wait_seconds: float = _TMUX_WAIT_SECONDS,
) -> str:
    """Send a command to a tmux pane and return the captured output.

    Clears the pane history, types *command* followed by Enter, waits
    *wait_seconds* for the output to render, captures the pane, and returns
    the captured text with terminal escape sequences removed.

    Exceptions raised by ``_run`` (timeout, missing tmux binary) propagate.
    """
    base = ["tmux"]
    if tmux_socket:
        base += ["-S", tmux_socket]

    # Clear the pane buffer so we capture fresh output
    await _run(*base, "clear-history", "-t", pane_id)
    # Send the command
    await _run(*base, "send-keys", "-t", pane_id, command, "Enter")
    # Wait for the response to render
    await asyncio.sleep(wait_seconds)
    # Capture the pane contents
    stdout, _ = await _run(
        *base,
        "capture-pane", "-pt", pane_id,
        "-J",   # join wrapped lines
        "-e",   # include escape sequences; they are stripped below
    )
    # Strip terminal escape sequences. "-e" makes tmux emit them, so cover
    # CSI sequences with any final byte as well as OSC sequences (terminated
    # by BEL or ESC-backslash), not just a handful of CSI finals.
    escape_re = re.compile(
        r"\x1b(?:\[[0-?]*[ -/]*[@-~]|\][^\x07\x1b]*(?:\x07|\x1b\\))"
    )
    return escape_re.sub("", stdout)
|
|
|
|
|
|
class ClaudeTmuxScraper(RuntimeUsageProviderAdapter):
    """Adapter that types ``/usage`` into a live Claude tmux pane.

    Prerequisites:
    - tmux is installed and reachable from the backend process.
    - At least one tmux pane is running the ``claude`` CLI.
    - For Docker installs: the host tmux socket is mounted into the container.

    The CLI output format changes over time, so this adapter is fragile by
    design. Treat its results as supplemental hints, not accounting truth.
    """

    provider: ClassVar[str] = "anthropic"
    source_name: ClassVar[str] = "claude_cli_tmux"
    freshness_ttl_seconds: ClassVar[int] = 300

    def __init__(
        self,
        tmux_socket: str = "",
        wait_seconds: float = _TMUX_WAIT_SECONDS,
    ) -> None:
        """Store the tmux socket path ("" = default) and the render delay."""
        self.tmux_socket = tmux_socket
        self.wait_seconds = wait_seconds

    def is_available(self) -> bool:
        """Report whether a ``tmux`` executable can be found on PATH."""
        from shutil import which

        return which("tmux") is not None

    async def fetch_raw(self) -> str:
        """Send ``/usage`` to the Claude pane and return the captured text.

        Raises:
            RuntimeError: if no tmux pane running ``claude`` is found.
        """
        pane_id = await _find_claude_tmux_pane(self.tmux_socket)
        if pane_id is not None:
            return await _tmux_send_and_capture(
                pane_id,
                tmux_socket=self.tmux_socket,
                wait_seconds=self.wait_seconds,
            )
        raise RuntimeError(
            "No tmux pane found running 'claude'. "
            "Start a Claude session before enabling the tmux scraper."
        )

    def parse(self, raw: str) -> ParsedClaudeUsage:
        """Parse *raw* with ``parse_claude_usage`` and tag source/confidence."""
        parsed = parse_claude_usage(raw)
        parsed.source = "provider_native"
        parsed.confidence = "high"
        return parsed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Simple TTL cache
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class UsageScraperCache:
    """In-memory TTL cache for scrape results.

    Entries are keyed by ``gateway_id`` and ``source_name`` and expire
    ``ttl_seconds`` after they were stored. A cache directory path is
    resolved and kept on the instance, but this class itself only reads and
    writes the in-memory mapping.
    """

    def __init__(
        self,
        ttl_seconds: int = 300,
        cache_dir: str = "",
    ) -> None:
        """Create an empty cache with the given TTL and cache directory."""
        self.ttl_seconds = ttl_seconds
        self._memory: dict[str, tuple[datetime, ScrapeResult]] = {}
        if cache_dir:
            self._cache_dir = Path(cache_dir)
        else:
            # Default location lives under the system temp directory.
            self._cache_dir = Path(tempfile.gettempdir()) / "pipeline-usage-cache"

    def _key(self, gateway_id: str, source_name: str) -> str:
        """Build the mapping key for a gateway/source pair."""
        return f"{gateway_id}:{source_name}"

    def get(self, gateway_id: str, source_name: str) -> ScrapeResult | None:
        """Return the stored result, or ``None`` if absent or expired.

        Expired entries are evicted as a side effect of the lookup.
        """
        key = self._key(gateway_id, source_name)
        try:
            stored_at, result = self._memory[key]
        except KeyError:
            return None
        if (utcnow() - stored_at).total_seconds() > self.ttl_seconds:
            del self._memory[key]
            return None
        return result

    def set(self, gateway_id: str, source_name: str, result: ScrapeResult) -> None:
        """Store *result* for the gateway/source pair, stamped with the current time."""
        self._memory[self._key(gateway_id, source_name)] = (utcnow(), result)
|
|
|
|
|
|
# Module-level singleton cache used by the endpoint. It is shared by the
# adapter path (get_provider_usage) and the status-line ingestion path, and is
# created with the default TTL.
_cache = UsageScraperCache()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Claude Code status-line ingestion
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Source name under which status-line snapshots are cached and reported.
_STATUSLINE_SOURCE_NAME = "claude_code_statusline"
# Provider reported for status-line snapshots.
_STATUSLINE_PROVIDER = "anthropic"
# Maps upstream ``rate_limits`` keys to this module's (window key, label)
# pairs. Keys not listed here are passed through under their upstream name.
_STATUSLINE_WINDOW_MAP: dict[str, tuple[str, str]] = {
    "five_hour": ("current_session", "Current session"),
    "seven_day": ("weekly_all_models", "All models"),
}
|
|
|
|
|
|
def _as_number(value: object) -> float | None:
|
|
if isinstance(value, bool):
|
|
return None
|
|
if isinstance(value, int | float):
|
|
return float(value)
|
|
if isinstance(value, str):
|
|
try:
|
|
return float(value.strip())
|
|
except ValueError:
|
|
return None
|
|
return None
|
|
|
|
|
|
def _parse_resets_at_ms(value: object, *, now: datetime) -> tuple[int | None, str | None]:
    """Parse Claude Code status-line ``resets_at`` into remaining milliseconds.

    *value* may be a Unix timestamp (number or numeric string) or an ISO-8601
    string. *now* is the reference instant; it must be timezone-aware.

    Returns ``(remaining_ms, label)``, with ``remaining_ms`` clamped at 0 for
    reset times in the past, or ``(None, None)`` when *value* is missing,
    unparseable, or outside the representable timestamp range.
    """
    if value is None:
        return None, None

    reset_dt: datetime | None = None
    numeric = _as_number(value)
    if numeric is not None:
        # Claude documents Unix epoch seconds. Be defensive if a future build
        # ever sends milliseconds.
        if numeric > 10_000_000_000:
            numeric = numeric / 1000
        try:
            reset_dt = datetime.fromtimestamp(numeric, tz=UTC)
        except (OverflowError, OSError, ValueError):
            # The payload is external input; an out-of-range timestamp is
            # treated as "unknown" rather than a crash.
            return None, None
    elif isinstance(value, str):
        text = value.strip()
        if text:
            try:
                # A trailing "Z" is rewritten to an explicit UTC offset.
                reset_dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
            except ValueError:
                reset_dt = None

    if reset_dt is None:
        return None, None
    if reset_dt.tzinfo is None:
        # Naive ISO strings are interpreted as UTC.
        reset_dt = reset_dt.replace(tzinfo=UTC)

    remaining_ms = max(0, int((reset_dt - now).total_seconds() * 1000))
    return remaining_ms, _format_remaining_ms(remaining_ms)
|
|
|
|
|
|
def _format_remaining_ms(ms: int) -> str:
    """Render a millisecond duration as a compact label such as "2h 47m".

    At most the two largest non-zero units (days, hours, minutes) are shown.
    Durations that round to under a minute give "< 1m", and durations that
    round to zero seconds (or are negative) give "now".
    """
    total_seconds = max(0, int(round(ms / 1000)))
    days, rest = divmod(total_seconds, 86_400)
    hours, rest = divmod(rest, 3_600)
    minutes = rest // 60

    pieces = [
        f"{amount}{unit}"
        for amount, unit in ((days, "d"), (hours, "h"), (minutes, "m"))
        if amount
    ]
    if pieces:
        return " ".join(pieces[:2])
    return "< 1m" if total_seconds > 0 else "now"
|
|
|
|
|
|
def parse_claude_statusline_usage(
    payload: dict[str, object],
    *,
    now: datetime | None = None,
) -> ParsedClaudeUsage:
    """Convert a Claude Code status-line JSON payload into usage windows.

    Reads ``payload["rate_limits"]``, a mapping of window name to a dict with
    ``used_percentage`` and ``resets_at``. Known names (``five_hour``,
    ``seven_day``) are mapped to this module's window keys; other names are
    passed through with a title-cased label. Percentages are clamped to
    0–100. *now* defaults to the current time; a naive value is taken as UTC.

    On a missing ``rate_limits`` mapping, or when no window yields data,
    ``error`` is set on the returned object.
    """
    reference = now or utcnow()
    if reference.tzinfo is None:
        reference = reference.replace(tzinfo=UTC)

    result = ParsedClaudeUsage(
        raw_text="",
        source="provider_native",
        confidence="high",
    )

    rate_limits = payload.get("rate_limits")
    if not isinstance(rate_limits, dict):
        result.error = "status-line payload did not include rate_limits"
        return result

    for upstream_key, upstream_value in rate_limits.items():
        if not isinstance(upstream_value, dict):
            continue

        upstream_name = str(upstream_key)
        passthrough = (upstream_name, upstream_name.replace("_", " ").title())
        key, label = _STATUSLINE_WINDOW_MAP.get(upstream_name, passthrough)

        pct = _as_number(upstream_value.get("used_percentage"))
        if pct is not None:
            pct = max(0.0, min(100.0, pct))

        remaining_ms, remaining_label = _parse_resets_at_ms(
            upstream_value.get("resets_at"),
            now=reference,
        )

        # A window with neither a percentage nor a reset time is dropped.
        if pct is None and remaining_ms is None:
            continue

        result.windows.append(
            ParsedClaudeUsageWindow(
                key=key,
                label=label,
                pct_used=pct,
                remaining_ms=remaining_ms,
                remaining_label=remaining_label,
                source="provider_native",
                confidence="high",
            ),
        )

    # Flattened fields mirror the current-session window when present,
    # otherwise the first window found.
    windows = result.windows
    primary = next((w for w in windows if w.key == "current_session"), None)
    if primary is None and windows:
        primary = windows[0]

    if primary is None:
        result.error = "status-line payload had no parseable rate_limits windows"
        return result

    result.current_pct = primary.pct_used
    result.remaining_ms = primary.remaining_ms
    result.remaining_label = primary.remaining_label
    return result
|
|
|
|
|
|
def store_claude_statusline_usage(
    gateway_id: str,
    payload: dict[str, object],
) -> ScrapeResult:
    """Parse a status-line payload and cache it as the gateway's latest snapshot.

    The snapshot is stored even when parsing fails; in that case it carries
    ``fresh=False`` and the parse error. Returns the stored ``ScrapeResult``.
    """
    received_at = utcnow()
    usage = parse_claude_statusline_usage(payload, now=received_at)
    parse_error = usage.error

    snapshot = ScrapeResult(
        provider=_STATUSLINE_PROVIDER,
        source_name=_STATUSLINE_SOURCE_NAME,
        scraped_at=received_at,
        fresh=parse_error is None,
        freshness_ttl_seconds=_cache.ttl_seconds,
        parsed=usage,
        error=parse_error,
    )
    _cache.set(gateway_id, _STATUSLINE_SOURCE_NAME, snapshot)

    logger.info(
        "usage_statusline.ingested gateway_id=%s windows=%s error=%s",
        gateway_id,
        len(usage.windows),
        parse_error,
    )
    return snapshot
|
|
|
|
|
|
def get_cached_claude_statusline_usage(gateway_id: str) -> ScrapeResult | None:
    """Look up the cached status-line snapshot for *gateway_id*.

    Returns ``None`` when the module cache holds no entry for the gateway or
    the entry has outlived the cache TTL.
    """
    snapshot = _cache.get(gateway_id, _STATUSLINE_SOURCE_NAME)
    return snapshot
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def build_adapters(
    enabled_providers: list[str],
    tmux_socket: str = "",
) -> list[RuntimeUsageProviderAdapter]:
    """Instantiate enabled adapters from a list of provider names.

    Names are matched case-insensitively and ignoring surrounding whitespace.
    Unknown names are logged and skipped, as are adapters whose
    ``is_available()`` returns False. *tmux_socket* is forwarded to the tmux
    scraper.
    """
    registry: dict[str, type[RuntimeUsageProviderAdapter]] = {
        "claude_cli_tmux": ClaudeTmuxScraper,
    }
    adapters: list[RuntimeUsageProviderAdapter] = []
    for name in enabled_providers:
        # Normalise once and use the same key for both the lookup and the
        # per-adapter configuration, so " Claude_CLI_Tmux " still receives its
        # tmux socket.
        key = name.strip().lower()
        cls = registry.get(key)
        if cls is None:
            logger.warning("usage_scraper.unknown_provider name=%s", name)
            continue
        kwargs: dict[str, object] = {}
        if key == "claude_cli_tmux":
            kwargs["tmux_socket"] = tmux_socket
        adapter = cls(**kwargs)
        if adapter.is_available():
            adapters.append(adapter)
        else:
            logger.info("usage_scraper.not_available provider=%s", name)
    return adapters
|
|
|
|
|
|
async def get_provider_usage(
    gateway_id: str,
    enabled_providers: list[str],
    tmux_socket: str = "",
    include_raw: bool = False,
) -> list[ScrapeResult]:
    """Run all enabled adapters for a gateway, using cache where fresh.

    Returns one ScrapeResult per available adapter; scrape failures are
    reported inside the result rather than raised. Unless *include_raw* is
    true, the returned results carry ``parsed.raw_text = None``.

    The cache always keeps the unmodified result. Raw text is stripped on a
    copy, so ``include_raw`` is honoured the same way for cached and freshly
    scraped results and the cached entry is never mutated.
    """
    adapters = build_adapters(enabled_providers, tmux_socket=tmux_socket)
    results: list[ScrapeResult] = []
    for adapter in adapters:
        result = _cache.get(gateway_id, adapter.source_name)
        if result is None:
            result = await adapter.scrape()
            _cache.set(gateway_id, adapter.source_name, result)
            logger.info(
                "usage_scraper.result gateway_id=%s provider=%s source=%s "
                "pct=%s remaining_ms=%s error=%s",
                gateway_id,
                result.provider,
                result.source_name,
                result.parsed.current_pct,
                result.parsed.remaining_ms,
                result.error,
            )
        if not include_raw and result.parsed:
            # strip unless explicitly requested — on a copy, not the cached object
            result = replace(result, parsed=replace(result.parsed, raw_text=None))
        results.append(result)
    return results
|