"""Provider-native usage scrapers for supplemental limit/percentage data. Design principles: - Opt-in only: all scrapers are disabled by default (USAGE_SCRAPER_ENABLED=false). - Supplemental: scraper data enriches JSONL-based metrics; it is never the primary source of truth for tokens or spend. - Isolated: each provider adapter is independent and can fail without affecting the rest of the system. - Testable: parse_claude_usage() is a pure function with no side effects. - Fragile by nature: CLI text output changes — treat results as best-effort. Tmux requirement for ClaudeTmuxScraper: The backend process must have access to the host's tmux socket. For Docker-based local installs, mount the socket: -v /tmp/tmux-1000:/tmp/tmux-1000 (or whatever $TMUX uses) For bare-metal runs, no extra config is needed. """ from __future__ import annotations import asyncio import os import re import tempfile from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime, timedelta from pathlib import Path from typing import ClassVar from app.core.logging import get_logger from app.core.time import utcnow logger = get_logger(__name__) # --------------------------------------------------------------------------- # Internal result dataclass (not a schema — stays inside the service layer) # --------------------------------------------------------------------------- @dataclass class ParsedClaudeUsage: """Structured result from parsing one block of ``claude /usage`` text.""" raw_text: str current_pct: float | None = None remaining_ms: int | None = None remaining_label: str | None = None weekly_messages_used: int | None = None weekly_messages_limit: int | None = None weekly_tokens_used: int | None = None weekly_cost_usd: float | None = None error: str | None = None @dataclass class ScrapeResult: """Complete result from one scraper invocation (parsed + metadata).""" provider: str source_name: str scraped_at: datetime fresh: bool freshness_ttl_seconds: int parsed: ParsedClaudeUsage error: str | None = None # --------------------------------------------------------------------------- # Parser — pure function, no I/O # --------------------------------------------------------------------------- # Strip thousands-separator commas from a numeric string def _strip_commas(s: str) -> str: return s.replace(",", "") def _parse_int(s: str) -> int: return int(_strip_commas(s.strip())) def _parse_float(s: str) -> float: return float(_strip_commas(s.strip())) # Matches "67%" or "67.5%" anywhere on a line, with optional surrounding words _PCT_RE = re.compile( r"(?:^|[\s(])(\d+(?:\.\d+)?)\s*%", re.MULTILINE, ) # Matches "resets in", "resets at", "remaining", "time remaining", "next reset" # followed by the time expression _RESET_LINE_RE = re.compile( r"(?:resets?\s+in[:\s]*|remaining[:\s]*|time\s+remaining[:\s]*|next\s+reset[:\s]*in[:\s]*)" r"(.+?)(?:\)|$)", re.IGNORECASE | re.MULTILINE, ) # Also match "in 2h 47m" or "(resets in 2h 47m)" embedded in longer lines _INLINE_RESET_RE = re.compile( r"\(\s*resets?\s+in\s+(.+?)\s*\)", re.IGNORECASE, ) # Time components: optional days, optional hours, optional minutes _TIME_PARTS_RE = re.compile( r"(?:(\d+)\s*d(?:ay)?s?)?" # days r"\s*(?:(\d+)\s*h(?:our)?s?)?" # hours r"\s*(?:(\d+)\s*m(?:in(?:ute)?s?)?)?" # minutes r"|( tuple[int, str] | None: """Convert a time string like '2h 47m', '1 day 4h', '< 1m' to milliseconds. Returns (ms, normalised_label) or None if nothing matches. """ s = time_str.strip() if not s: return None # "< 1m" / "< 1 minute" if re.match(r"<\s*1\s*m", s, re.IGNORECASE): return 30_000, "< 1m" # represent as 30 seconds m = _TIME_PARTS_RE.search(s) if not m: return None days = int(m.group(1) or 0) hours = int(m.group(2) or 0) minutes = int(m.group(3) or 0) total_ms = (days * 86400 + hours * 3600 + minutes * 60) * 1000 if total_ms == 0: return None # Build a compact human label parts = [] if days: parts.append(f"{days}d") if hours: parts.append(f"{hours}h") if minutes: parts.append(f"{minutes}m") label = " ".join(parts) if parts else s return total_ms, label def parse_claude_usage(raw: str) -> ParsedClaudeUsage: """Parse a block of text from ``claude /usage`` into structured fields. This is a pure function — no I/O, no side effects. It never raises; on failure it sets ``error`` and leaves numeric fields as None. """ result = ParsedClaudeUsage(raw_text=raw) if not raw or not raw.strip(): result.error = "empty output" return result # ---- percentage -------------------------------------------------------- for m in _PCT_RE.finditer(raw): pct = float(m.group(1)) if 0.0 <= pct <= 100.0: result.current_pct = pct break # take the first plausible percentage # ---- remaining time ---------------------------------------------------- time_str: str | None = None # Try inline "(resets in X)" pattern first (most specific) inline = _INLINE_RESET_RE.search(raw) if inline: time_str = inline.group(1) # Fall back to line-level patterns if not time_str: line_m = _RESET_LINE_RE.search(raw) if line_m: time_str = line_m.group(1).strip() if time_str: parsed_time = _parse_remaining_ms(time_str) if parsed_time: result.remaining_ms, result.remaining_label = parsed_time # ---- weekly stats ----------------------------------------------------- # Try to find a "weekly" or "this week" section weekly_section = raw week_header = re.search( r"(?:this\s+week|weekly|week\b)[^\n]*\n(.+?)(?=\n\s*\n|\Z)", raw, re.IGNORECASE | re.DOTALL ) if week_header: weekly_section = week_header.group(0) # messages (with optional limit) msgs_m = _WEEKLY_MSGS_RE.search(weekly_section) if msgs_m: try: # group(1) = "messages: N", group(3) = "N messages" used_str = msgs_m.group(1) or msgs_m.group(3) if used_str: result.weekly_messages_used = _parse_int(used_str) if msgs_m.group(2): result.weekly_messages_limit = _parse_int(msgs_m.group(2)) except (ValueError, TypeError): pass # tokens — prefer input+output sum if both present input_m = _WEEKLY_INPUT_RE.search(weekly_section) output_m = _WEEKLY_OUTPUT_RE.search(weekly_section) if input_m and output_m: try: result.weekly_tokens_used = ( _parse_int(input_m.group(1)) + _parse_int(output_m.group(1)) ) except (ValueError, TypeError): pass elif not result.weekly_tokens_used: tok_m = _WEEKLY_TOKENS_RE.search(weekly_section) if tok_m: try: result.weekly_tokens_used = _parse_int(tok_m.group(1)) except (ValueError, TypeError): pass # Alt-key patterns: "weekly tokens: 9,876,543" anywhere in the text if result.weekly_tokens_used is None: alt_tok = re.search( r"weekly\s+tokens?[:\s]+([\d,]+)", raw, re.IGNORECASE ) if alt_tok: try: result.weekly_tokens_used = _parse_int(alt_tok.group(1)) except (ValueError, TypeError): pass # cost cost_m = _WEEKLY_COST_RE.search(weekly_section) if cost_m: try: result.weekly_cost_usd = _parse_float(cost_m.group(1)) except (ValueError, TypeError): pass # ---- validation -------------------------------------------------------- all_none = ( result.current_pct is None and result.remaining_ms is None and result.weekly_messages_used is None and result.weekly_tokens_used is None ) if all_none: result.error = "no parseable usage data found" return result # --------------------------------------------------------------------------- # Adapter interface # --------------------------------------------------------------------------- class RuntimeUsageProviderAdapter(ABC): """Abstract base for provider-native usage scrapers.""" provider: ClassVar[str] source_name: ClassVar[str] freshness_ttl_seconds: ClassVar[int] = 300 @abstractmethod async def fetch_raw(self) -> str: """Return the raw text output from the provider's usage source.""" @abstractmethod def parse(self, raw: str) -> ParsedClaudeUsage: """Parse raw text into structured fields.""" def is_available(self) -> bool: """Return True if the prerequisites for this adapter exist on this host.""" return True async def scrape(self) -> ScrapeResult: """Run fetch_raw + parse, returning a ScrapeResult regardless of errors.""" now = utcnow() try: raw = await self.fetch_raw() parsed = self.parse(raw) return ScrapeResult( provider=self.provider, source_name=self.source_name, scraped_at=now, fresh=True, freshness_ttl_seconds=self.freshness_ttl_seconds, parsed=parsed, ) except Exception as exc: logger.warning( "usage_scraper.fetch_failed provider=%s source=%s error=%s", self.provider, self.source_name, exc, ) return ScrapeResult( provider=self.provider, source_name=self.source_name, scraped_at=now, fresh=False, freshness_ttl_seconds=self.freshness_ttl_seconds, parsed=ParsedClaudeUsage(raw_text="", error=str(exc)), error=str(exc), ) # --------------------------------------------------------------------------- # Claude CLI tmux adapter # --------------------------------------------------------------------------- _TMUX_WAIT_SECONDS = 2.0 # seconds to wait after sending /usage _TMUX_CAPTURE_LINES = 80 # lines to capture from the tmux pane async def _run( *args: str, timeout: float = 5.0, ) -> tuple[str, str]: """Run a subprocess and return (stdout, stderr).""" proc = await asyncio.create_subprocess_exec( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) return stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace") async def _find_claude_tmux_pane(tmux_socket: str = "") -> str | None: """Find the first tmux pane running ``claude``; return pane id or None.""" base = ["tmux"] if tmux_socket: base += ["-S", tmux_socket] try: stdout, _ = await _run( *base, "list-panes", "-a", "-F", "#{pane_id}:#{pane_current_command}", ) except (FileNotFoundError, asyncio.TimeoutError, OSError): return None for line in stdout.splitlines(): parts = line.strip().split(":", 1) if len(parts) == 2: pane_id, command = parts if "claude" in command.lower(): return pane_id return None async def _tmux_send_and_capture( pane_id: str, command: str = "/usage", tmux_socket: str = "", wait_seconds: float = _TMUX_WAIT_SECONDS, ) -> str: """Send a command to a tmux pane and return the captured output.""" base = ["tmux"] if tmux_socket: base += ["-S", tmux_socket] # Clear the pane buffer so we capture fresh output await _run(*base, "clear-history", "-t", pane_id) # Send the command await _run(*base, "send-keys", "-t", pane_id, command, "Enter") # Wait for the response to render await asyncio.sleep(wait_seconds) # Capture the pane contents stdout, _ = await _run( *base, "capture-pane", "-pt", pane_id, "-J", # join wrapped lines "-e", # include escape sequences (ignored in text mode) ) # Strip ANSI escape codes ansi_re = re.compile(r"\x1b\[[0-9;]*[mGKHF]") return ansi_re.sub("", stdout) class ClaudeTmuxScraper(RuntimeUsageProviderAdapter): """Scraper that sends ``/usage`` to an active Claude tmux session. Requires: - tmux is installed and accessible from the backend process. - At least one tmux pane is running the ``claude`` CLI. - For Docker installs: mount the host tmux socket into the container. This adapter is fragile by design — CLI output format changes over time. Treat results as supplemental hints, not accounting truth. """ provider: ClassVar[str] = "anthropic" source_name: ClassVar[str] = "claude_cli_tmux" freshness_ttl_seconds: ClassVar[int] = 300 def __init__( self, tmux_socket: str = "", wait_seconds: float = _TMUX_WAIT_SECONDS, ) -> None: self.tmux_socket = tmux_socket self.wait_seconds = wait_seconds def is_available(self) -> bool: import shutil return shutil.which("tmux") is not None async def fetch_raw(self) -> str: pane_id = await _find_claude_tmux_pane(self.tmux_socket) if pane_id is None: raise RuntimeError( "No tmux pane found running 'claude'. " "Start a Claude session before enabling the tmux scraper." ) return await _tmux_send_and_capture( pane_id, tmux_socket=self.tmux_socket, wait_seconds=self.wait_seconds, ) def parse(self, raw: str) -> ParsedClaudeUsage: return parse_claude_usage(raw) # --------------------------------------------------------------------------- # Simple TTL cache # --------------------------------------------------------------------------- class UsageScraperCache: """In-memory TTL cache for scrape results. Optionally writes results to a cache directory so they survive backend restarts (useful for slow-changing subscription data). """ def __init__( self, ttl_seconds: int = 300, cache_dir: str = "", ) -> None: self.ttl_seconds = ttl_seconds self._memory: dict[str, tuple[datetime, ScrapeResult]] = {} self._cache_dir = Path(cache_dir) if cache_dir else Path(tempfile.gettempdir()) / "pipeline-usage-cache" def _key(self, gateway_id: str, source_name: str) -> str: return f"{gateway_id}:{source_name}" def get(self, gateway_id: str, source_name: str) -> ScrapeResult | None: key = self._key(gateway_id, source_name) entry = self._memory.get(key) if entry is None: return None cached_at, result = entry age = (utcnow() - cached_at).total_seconds() if age > self.ttl_seconds: del self._memory[key] return None return result def set(self, gateway_id: str, source_name: str, result: ScrapeResult) -> None: key = self._key(gateway_id, source_name) self._memory[key] = (utcnow(), result) # Module-level singleton cache used by the endpoint _cache = UsageScraperCache() # --------------------------------------------------------------------------- # Public entry point # --------------------------------------------------------------------------- def build_adapters( enabled_providers: list[str], tmux_socket: str = "", ) -> list[RuntimeUsageProviderAdapter]: """Instantiate enabled adapters from a list of provider names.""" registry: dict[str, type[RuntimeUsageProviderAdapter]] = { "claude_cli_tmux": ClaudeTmuxScraper, } adapters = [] for name in enabled_providers: cls = registry.get(name.strip().lower()) if cls is None: logger.warning("usage_scraper.unknown_provider name=%s", name) continue kwargs: dict = {} if name == "claude_cli_tmux": kwargs["tmux_socket"] = tmux_socket adapter = cls(**kwargs) if adapter.is_available(): adapters.append(adapter) else: logger.info("usage_scraper.not_available provider=%s", name) return adapters async def get_provider_usage( gateway_id: str, enabled_providers: list[str], tmux_socket: str = "", include_raw: bool = False, ) -> list[ScrapeResult]: """Run all enabled adapters for a gateway, using cache where fresh. Returns one ScrapeResult per adapter (errors included, never raises). """ adapters = build_adapters(enabled_providers, tmux_socket=tmux_socket) results: list[ScrapeResult] = [] for adapter in adapters: cached = _cache.get(gateway_id, adapter.source_name) if cached is not None: results.append(cached) continue result = await adapter.scrape() _cache.set(gateway_id, adapter.source_name, result) if not include_raw and result.parsed: result.parsed.raw_text = None # strip unless explicitly requested results.append(result) logger.info( "usage_scraper.result gateway_id=%s provider=%s source=%s " "pct=%s remaining_ms=%s error=%s", gateway_id, result.provider, result.source_name, result.parsed.current_pct, result.parsed.remaining_ms, result.error, ) return results