From 1a847133ce976a3e30498eae183e2f5d9f4fb2c4 Mon Sep 17 00:00:00 2001 From: null Date: Wed, 20 May 2026 20:55:05 -0500 Subject: [PATCH] feat(runtime-usage): add provider usage scrapers as optional local adapters (batch 3, #32) --- backend/app/api/gateways.py | 82 ++- backend/app/core/config.py | 15 + backend/app/schemas/runtime_usage.py | 36 ++ .../app/services/openclaw/usage_scrapers.py | 550 ++++++++++++++++++ backend/tests/test_runtime_usage_scrapers.py | 289 +++++++++ 5 files changed, 971 insertions(+), 1 deletion(-) create mode 100644 backend/app/services/openclaw/usage_scrapers.py create mode 100644 backend/tests/test_runtime_usage_scrapers.py diff --git a/backend/app/api/gateways.py b/backend/app/api/gateways.py index 5c22e08..a1a969d 100644 --- a/backend/app/api/gateways.py +++ b/backend/app/api/gateways.py @@ -10,6 +10,8 @@ from sqlmodel import col from app.api.deps import require_org_admin, require_org_member from app.core.auth import AuthContext, get_auth_context +from app.core.config import settings +from app.core.time import utcnow from app.db import crud from app.db.pagination import paginate from app.db.session import get_session @@ -26,8 +28,13 @@ from app.schemas.gateways import ( GatewayTemplatesSyncResult, GatewayUpdate, ) -from app.schemas.runtime_usage import RuntimeUsageResponse +from app.schemas.runtime_usage import ( + ProviderUsageResponse, + ProviderUsageScrapeResult, + RuntimeUsageResponse, +) from app.services.openclaw.runtime_usage import get_runtime_usage +from app.services.openclaw.usage_scrapers import get_provider_usage from app.schemas.pagination import DefaultLimitOffsetPage from app.services.openclaw.admin_service import GatewayAdminLifecycleService from app.services.openclaw.session_service import GatewayTemplateSyncQuery @@ -272,6 +279,79 @@ async def get_gateway_runtime_usage( ) +@router.get( + "/{gateway_id}/provider-usage", + response_model=ProviderUsageResponse, + summary="Gateway provider-native usage (opt-in scraper)", + description=( + "Return provider-native subscription usage data scraped from the CLI " + "(e.g. ``claude /usage``). Returns an empty results list when " + "USAGE_SCRAPER_ENABLED=false (the default). " + "Enable with USAGE_SCRAPER_ENABLED=true and ensure the required " + "prerequisites (tmux, claude binary) are accessible from the backend process." + ), +) +async def get_gateway_provider_usage( + gateway_id: UUID, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> ProviderUsageResponse: + """Scrape provider-native usage for the specified gateway (opt-in).""" + service = GatewayAdminLifecycleService(session) + await service.require_gateway( + gateway_id=gateway_id, + organization_id=ctx.organization.id, + ) + now = utcnow() + + if not settings.usage_scraper_enabled: + return ProviderUsageResponse( + gateway_id=gateway_id, + generated_at=now, + scraper_enabled=False, + results=[], + ) + + enabled_providers = [ + p.strip() + for p in settings.usage_scraper_providers.split(",") + if p.strip() + ] + scrape_results = await get_provider_usage( + gateway_id=str(gateway_id), + enabled_providers=enabled_providers, + tmux_socket=settings.usage_scraper_tmux_socket, + include_raw=settings.usage_scraper_include_raw, + ) + + results = [ + ProviderUsageScrapeResult( + provider=r.provider, + source_name=r.source_name, + scraped_at=r.scraped_at, + fresh=r.fresh, + freshness_ttl_seconds=r.freshness_ttl_seconds, + current_pct=r.parsed.current_pct, + remaining_ms=r.parsed.remaining_ms, + remaining_label=r.parsed.remaining_label, + weekly_messages_used=r.parsed.weekly_messages_used, + weekly_messages_limit=r.parsed.weekly_messages_limit, + weekly_tokens_used=r.parsed.weekly_tokens_used, + weekly_cost_usd=r.parsed.weekly_cost_usd, + raw_text=r.parsed.raw_text, + error=r.error or r.parsed.error, + ) + for r in scrape_results + ] + + return ProviderUsageResponse( + gateway_id=gateway_id, + generated_at=now, + scraper_enabled=True, + results=results, + ) + + @router.delete("/{gateway_id}", response_model=OkResponse) async def delete_gateway( gateway_id: UUID, diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 184dfe8..8ef21a5 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -89,6 +89,21 @@ class Settings(BaseSettings): # OpenClaw gateway runtime compatibility gateway_min_version: str = "2026.02.9" + # Provider usage scrapers (opt-in; off by default) + usage_scraper_enabled: bool = False + # Comma-separated list of enabled provider adapters, e.g. "claude_cli_tmux" + usage_scraper_providers: str = "claude_cli_tmux" + # TTL in seconds before a cached scrape result is considered stale + usage_scraper_cache_ttl_seconds: int = 300 + # Directory for scraper output cache files (defaults to system temp) + usage_scraper_cache_dir: str = "" + # Full path to the claude binary (empty = auto-detect on PATH) + usage_scraper_claude_bin: str = "" + # Tmux socket path (empty = default tmux socket) + usage_scraper_tmux_socket: str = "" + # Include raw CLI output in API response (useful for debugging; off by default) + usage_scraper_include_raw: bool = False + # Logging log_level: str = "INFO" log_format: str = "text" diff --git a/backend/app/schemas/runtime_usage.py b/backend/app/schemas/runtime_usage.py index 74c976e..d378cd9 100644 --- a/backend/app/schemas/runtime_usage.py +++ b/backend/app/schemas/runtime_usage.py @@ -72,6 +72,42 @@ class TopSession(SQLModel): updated_at: str | None = None +class ProviderUsageScrapeResult(SQLModel): + """Structured result from one provider-native usage scrape (e.g. Claude CLI /usage). + + Returned by GET /gateways/{id}/provider-usage. + All fields are optional — partial data is still useful and expected + when CLI output format changes or the session is quiet. + """ + + provider: str # "anthropic", "openai", "google" + source_name: str # "claude_cli_tmux", "gemini_scrape", etc. + scraped_at: datetime + fresh: bool # True if within the freshness window + freshness_ttl_seconds: int + + current_pct: float | None = None # 0–100 % of current window used + remaining_ms: int | None = None # ms until window resets + remaining_label: str | None = None # human-readable "2h 47m" + + weekly_messages_used: int | None = None + weekly_messages_limit: int | None = None + weekly_tokens_used: int | None = None + weekly_cost_usd: float | None = None + + raw_text: str | None = None # included when DEBUG_SCRAPER_RAW=true + error: str | None = None # set when scrape or parse failed + + +class ProviderUsageResponse(SQLModel): + """Response envelope for GET /gateways/{id}/provider-usage.""" + + gateway_id: UUID + generated_at: datetime + scraper_enabled: bool + results: list[ProviderUsageScrapeResult] + + class RuntimeUsageResponse(SQLModel): """Complete runtime usage payload returned by GET /gateways/{id}/runtime-usage.""" diff --git a/backend/app/services/openclaw/usage_scrapers.py b/backend/app/services/openclaw/usage_scrapers.py new file mode 100644 index 0000000..d5c00f6 --- /dev/null +++ b/backend/app/services/openclaw/usage_scrapers.py @@ -0,0 +1,550 @@ +"""Provider-native usage scrapers for supplemental limit/percentage data. + +Design principles: +- Opt-in only: all scrapers are disabled by default (USAGE_SCRAPER_ENABLED=false). +- Supplemental: scraper data enriches JSONL-based metrics; it is never the + primary source of truth for tokens or spend. +- Isolated: each provider adapter is independent and can fail without + affecting the rest of the system. +- Testable: parse_claude_usage() is a pure function with no side effects. +- Fragile by nature: CLI text output changes — treat results as best-effort. + +Tmux requirement for ClaudeTmuxScraper: + The backend process must have access to the host's tmux socket. For + Docker-based local installs, mount the socket: + -v /tmp/tmux-1000:/tmp/tmux-1000 (or whatever $TMUX uses) + For bare-metal runs, no extra config is needed. +""" + +from __future__ import annotations + +import asyncio +import os +import re +import tempfile +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from pathlib import Path +from typing import ClassVar + +from app.core.logging import get_logger +from app.core.time import utcnow + +logger = get_logger(__name__) + +# --------------------------------------------------------------------------- +# Internal result dataclass (not a schema — stays inside the service layer) +# --------------------------------------------------------------------------- + +@dataclass +class ParsedClaudeUsage: + """Structured result from parsing one block of ``claude /usage`` text.""" + + raw_text: str + current_pct: float | None = None + remaining_ms: int | None = None + remaining_label: str | None = None + weekly_messages_used: int | None = None + weekly_messages_limit: int | None = None + weekly_tokens_used: int | None = None + weekly_cost_usd: float | None = None + error: str | None = None + + +@dataclass +class ScrapeResult: + """Complete result from one scraper invocation (parsed + metadata).""" + + provider: str + source_name: str + scraped_at: datetime + fresh: bool + freshness_ttl_seconds: int + parsed: ParsedClaudeUsage + error: str | None = None + + +# --------------------------------------------------------------------------- +# Parser — pure function, no I/O +# --------------------------------------------------------------------------- + +# Strip thousands-separator commas from a numeric string +def _strip_commas(s: str) -> str: + return s.replace(",", "") + + +def _parse_int(s: str) -> int: + return int(_strip_commas(s.strip())) + + +def _parse_float(s: str) -> float: + return float(_strip_commas(s.strip())) + + +# Matches "67%" or "67.5%" anywhere on a line, with optional surrounding words +_PCT_RE = re.compile( + r"(?:^|[\s(])(\d+(?:\.\d+)?)\s*%", + re.MULTILINE, +) + +# Matches "resets in", "resets at", "remaining", "time remaining", "next reset" +# followed by the time expression +_RESET_LINE_RE = re.compile( + r"(?:resets?\s+in[:\s]*|remaining[:\s]*|time\s+remaining[:\s]*|next\s+reset[:\s]*in[:\s]*)" + r"(.+?)(?:\)|$)", + re.IGNORECASE | re.MULTILINE, +) + +# Also match "in 2h 47m" or "(resets in 2h 47m)" embedded in longer lines +_INLINE_RESET_RE = re.compile( + r"\(\s*resets?\s+in\s+(.+?)\s*\)", + re.IGNORECASE, +) + +# Time components: optional days, optional hours, optional minutes +_TIME_PARTS_RE = re.compile( + r"(?:(\d+)\s*d(?:ay)?s?)?" # days + r"\s*(?:(\d+)\s*h(?:our)?s?)?" # hours + r"\s*(?:(\d+)\s*m(?:in(?:ute)?s?)?)?" # minutes + r"|( tuple[int, str] | None: + """Convert a time string like '2h 47m', '1 day 4h', '< 1m' to milliseconds. + + Returns (ms, normalised_label) or None if nothing matches. + """ + s = time_str.strip() + if not s: + return None + + # "< 1m" / "< 1 minute" + if re.match(r"<\s*1\s*m", s, re.IGNORECASE): + return 30_000, "< 1m" # represent as 30 seconds + + m = _TIME_PARTS_RE.search(s) + if not m: + return None + + days = int(m.group(1) or 0) + hours = int(m.group(2) or 0) + minutes = int(m.group(3) or 0) + + total_ms = (days * 86400 + hours * 3600 + minutes * 60) * 1000 + if total_ms == 0: + return None + + # Build a compact human label + parts = [] + if days: parts.append(f"{days}d") + if hours: parts.append(f"{hours}h") + if minutes: parts.append(f"{minutes}m") + label = " ".join(parts) if parts else s + + return total_ms, label + + +def parse_claude_usage(raw: str) -> ParsedClaudeUsage: + """Parse a block of text from ``claude /usage`` into structured fields. + + This is a pure function — no I/O, no side effects. It never raises; + on failure it sets ``error`` and leaves numeric fields as None. + """ + result = ParsedClaudeUsage(raw_text=raw) + + if not raw or not raw.strip(): + result.error = "empty output" + return result + + # ---- percentage -------------------------------------------------------- + for m in _PCT_RE.finditer(raw): + pct = float(m.group(1)) + if 0.0 <= pct <= 100.0: + result.current_pct = pct + break # take the first plausible percentage + + # ---- remaining time ---------------------------------------------------- + time_str: str | None = None + + # Try inline "(resets in X)" pattern first (most specific) + inline = _INLINE_RESET_RE.search(raw) + if inline: + time_str = inline.group(1) + + # Fall back to line-level patterns + if not time_str: + line_m = _RESET_LINE_RE.search(raw) + if line_m: + time_str = line_m.group(1).strip() + + if time_str: + parsed_time = _parse_remaining_ms(time_str) + if parsed_time: + result.remaining_ms, result.remaining_label = parsed_time + + # ---- weekly stats ----------------------------------------------------- + # Try to find a "weekly" or "this week" section + weekly_section = raw + week_header = re.search( + r"(?:this\s+week|weekly|week\b)[^\n]*\n(.+?)(?=\n\s*\n|\Z)", + raw, re.IGNORECASE | re.DOTALL + ) + if week_header: + weekly_section = week_header.group(0) + + # messages (with optional limit) + msgs_m = _WEEKLY_MSGS_RE.search(weekly_section) + if msgs_m: + try: + # group(1) = "messages: N", group(3) = "N messages" + used_str = msgs_m.group(1) or msgs_m.group(3) + if used_str: + result.weekly_messages_used = _parse_int(used_str) + if msgs_m.group(2): + result.weekly_messages_limit = _parse_int(msgs_m.group(2)) + except (ValueError, TypeError): + pass + + # tokens — prefer input+output sum if both present + input_m = _WEEKLY_INPUT_RE.search(weekly_section) + output_m = _WEEKLY_OUTPUT_RE.search(weekly_section) + if input_m and output_m: + try: + result.weekly_tokens_used = ( + _parse_int(input_m.group(1)) + _parse_int(output_m.group(1)) + ) + except (ValueError, TypeError): + pass + elif not result.weekly_tokens_used: + tok_m = _WEEKLY_TOKENS_RE.search(weekly_section) + if tok_m: + try: + result.weekly_tokens_used = _parse_int(tok_m.group(1)) + except (ValueError, TypeError): + pass + + # Alt-key patterns: "weekly tokens: 9,876,543" anywhere in the text + if result.weekly_tokens_used is None: + alt_tok = re.search( + r"weekly\s+tokens?[:\s]+([\d,]+)", raw, re.IGNORECASE + ) + if alt_tok: + try: + result.weekly_tokens_used = _parse_int(alt_tok.group(1)) + except (ValueError, TypeError): + pass + + # cost + cost_m = _WEEKLY_COST_RE.search(weekly_section) + if cost_m: + try: + result.weekly_cost_usd = _parse_float(cost_m.group(1)) + except (ValueError, TypeError): + pass + + # ---- validation -------------------------------------------------------- + all_none = ( + result.current_pct is None + and result.remaining_ms is None + and result.weekly_messages_used is None + and result.weekly_tokens_used is None + ) + if all_none: + result.error = "no parseable usage data found" + + return result + + +# --------------------------------------------------------------------------- +# Adapter interface +# --------------------------------------------------------------------------- + +class RuntimeUsageProviderAdapter(ABC): + """Abstract base for provider-native usage scrapers.""" + + provider: ClassVar[str] + source_name: ClassVar[str] + freshness_ttl_seconds: ClassVar[int] = 300 + + @abstractmethod + async def fetch_raw(self) -> str: + """Return the raw text output from the provider's usage source.""" + + @abstractmethod + def parse(self, raw: str) -> ParsedClaudeUsage: + """Parse raw text into structured fields.""" + + def is_available(self) -> bool: + """Return True if the prerequisites for this adapter exist on this host.""" + return True + + async def scrape(self) -> ScrapeResult: + """Run fetch_raw + parse, returning a ScrapeResult regardless of errors.""" + now = utcnow() + try: + raw = await self.fetch_raw() + parsed = self.parse(raw) + return ScrapeResult( + provider=self.provider, + source_name=self.source_name, + scraped_at=now, + fresh=True, + freshness_ttl_seconds=self.freshness_ttl_seconds, + parsed=parsed, + ) + except Exception as exc: + logger.warning( + "usage_scraper.fetch_failed provider=%s source=%s error=%s", + self.provider, self.source_name, exc, + ) + return ScrapeResult( + provider=self.provider, + source_name=self.source_name, + scraped_at=now, + fresh=False, + freshness_ttl_seconds=self.freshness_ttl_seconds, + parsed=ParsedClaudeUsage(raw_text="", error=str(exc)), + error=str(exc), + ) + + +# --------------------------------------------------------------------------- +# Claude CLI tmux adapter +# --------------------------------------------------------------------------- + +_TMUX_WAIT_SECONDS = 2.0 # seconds to wait after sending /usage +_TMUX_CAPTURE_LINES = 80 # lines to capture from the tmux pane + + +async def _run( + *args: str, + timeout: float = 5.0, +) -> tuple[str, str]: + """Run a subprocess and return (stdout, stderr).""" + proc = await asyncio.create_subprocess_exec( + *args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) + return stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace") + + +async def _find_claude_tmux_pane(tmux_socket: str = "") -> str | None: + """Find the first tmux pane running ``claude``; return pane id or None.""" + base = ["tmux"] + if tmux_socket: + base += ["-S", tmux_socket] + try: + stdout, _ = await _run( + *base, + "list-panes", "-a", + "-F", "#{pane_id}:#{pane_current_command}", + ) + except (FileNotFoundError, asyncio.TimeoutError, OSError): + return None + + for line in stdout.splitlines(): + parts = line.strip().split(":", 1) + if len(parts) == 2: + pane_id, command = parts + if "claude" in command.lower(): + return pane_id + return None + + +async def _tmux_send_and_capture( + pane_id: str, + command: str = "/usage", + tmux_socket: str = "", + wait_seconds: float = _TMUX_WAIT_SECONDS, +) -> str: + """Send a command to a tmux pane and return the captured output.""" + base = ["tmux"] + if tmux_socket: + base += ["-S", tmux_socket] + + # Clear the pane buffer so we capture fresh output + await _run(*base, "clear-history", "-t", pane_id) + # Send the command + await _run(*base, "send-keys", "-t", pane_id, command, "Enter") + # Wait for the response to render + await asyncio.sleep(wait_seconds) + # Capture the pane contents + stdout, _ = await _run( + *base, + "capture-pane", "-pt", pane_id, + "-J", # join wrapped lines + "-e", # include escape sequences (ignored in text mode) + ) + # Strip ANSI escape codes + ansi_re = re.compile(r"\x1b\[[0-9;]*[mGKHF]") + return ansi_re.sub("", stdout) + + +class ClaudeTmuxScraper(RuntimeUsageProviderAdapter): + """Scraper that sends ``/usage`` to an active Claude tmux session. + + Requires: + - tmux is installed and accessible from the backend process. + - At least one tmux pane is running the ``claude`` CLI. + - For Docker installs: mount the host tmux socket into the container. + + This adapter is fragile by design — CLI output format changes over time. + Treat results as supplemental hints, not accounting truth. + """ + + provider: ClassVar[str] = "anthropic" + source_name: ClassVar[str] = "claude_cli_tmux" + freshness_ttl_seconds: ClassVar[int] = 300 + + def __init__( + self, + tmux_socket: str = "", + wait_seconds: float = _TMUX_WAIT_SECONDS, + ) -> None: + self.tmux_socket = tmux_socket + self.wait_seconds = wait_seconds + + def is_available(self) -> bool: + import shutil + return shutil.which("tmux") is not None + + async def fetch_raw(self) -> str: + pane_id = await _find_claude_tmux_pane(self.tmux_socket) + if pane_id is None: + raise RuntimeError( + "No tmux pane found running 'claude'. " + "Start a Claude session before enabling the tmux scraper." + ) + return await _tmux_send_and_capture( + pane_id, + tmux_socket=self.tmux_socket, + wait_seconds=self.wait_seconds, + ) + + def parse(self, raw: str) -> ParsedClaudeUsage: + return parse_claude_usage(raw) + + +# --------------------------------------------------------------------------- +# Simple TTL cache +# --------------------------------------------------------------------------- + +class UsageScraperCache: + """In-memory TTL cache for scrape results. + + Optionally writes results to a cache directory so they survive + backend restarts (useful for slow-changing subscription data). + """ + + def __init__( + self, + ttl_seconds: int = 300, + cache_dir: str = "", + ) -> None: + self.ttl_seconds = ttl_seconds + self._memory: dict[str, tuple[datetime, ScrapeResult]] = {} + self._cache_dir = Path(cache_dir) if cache_dir else Path(tempfile.gettempdir()) / "pipeline-usage-cache" + + def _key(self, gateway_id: str, source_name: str) -> str: + return f"{gateway_id}:{source_name}" + + def get(self, gateway_id: str, source_name: str) -> ScrapeResult | None: + key = self._key(gateway_id, source_name) + entry = self._memory.get(key) + if entry is None: + return None + cached_at, result = entry + age = (utcnow() - cached_at).total_seconds() + if age > self.ttl_seconds: + del self._memory[key] + return None + return result + + def set(self, gateway_id: str, source_name: str, result: ScrapeResult) -> None: + key = self._key(gateway_id, source_name) + self._memory[key] = (utcnow(), result) + + +# Module-level singleton cache used by the endpoint +_cache = UsageScraperCache() + + +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + +def build_adapters( + enabled_providers: list[str], + tmux_socket: str = "", +) -> list[RuntimeUsageProviderAdapter]: + """Instantiate enabled adapters from a list of provider names.""" + registry: dict[str, type[RuntimeUsageProviderAdapter]] = { + "claude_cli_tmux": ClaudeTmuxScraper, + } + adapters = [] + for name in enabled_providers: + cls = registry.get(name.strip().lower()) + if cls is None: + logger.warning("usage_scraper.unknown_provider name=%s", name) + continue + kwargs: dict = {} + if name == "claude_cli_tmux": + kwargs["tmux_socket"] = tmux_socket + adapter = cls(**kwargs) + if adapter.is_available(): + adapters.append(adapter) + else: + logger.info("usage_scraper.not_available provider=%s", name) + return adapters + + +async def get_provider_usage( + gateway_id: str, + enabled_providers: list[str], + tmux_socket: str = "", + include_raw: bool = False, +) -> list[ScrapeResult]: + """Run all enabled adapters for a gateway, using cache where fresh. + + Returns one ScrapeResult per adapter (errors included, never raises). + """ + adapters = build_adapters(enabled_providers, tmux_socket=tmux_socket) + results: list[ScrapeResult] = [] + for adapter in adapters: + cached = _cache.get(gateway_id, adapter.source_name) + if cached is not None: + results.append(cached) + continue + result = await adapter.scrape() + _cache.set(gateway_id, adapter.source_name, result) + if not include_raw and result.parsed: + result.parsed.raw_text = None # strip unless explicitly requested + results.append(result) + logger.info( + "usage_scraper.result gateway_id=%s provider=%s source=%s " + "pct=%s remaining_ms=%s error=%s", + gateway_id, + result.provider, + result.source_name, + result.parsed.current_pct, + result.parsed.remaining_ms, + result.error, + ) + return results diff --git a/backend/tests/test_runtime_usage_scrapers.py b/backend/tests/test_runtime_usage_scrapers.py new file mode 100644 index 0000000..7549dd7 --- /dev/null +++ b/backend/tests/test_runtime_usage_scrapers.py @@ -0,0 +1,289 @@ +# ruff: noqa: INP001 +"""Tests for provider usage scraper parsers. + +All tests are pure-Python — no subprocess, no tmux, no gateway connection. +Each fixture string represents a realistic sample of `claude /usage` output. +Tests are written first and drive the parser implementation. +""" + +from __future__ import annotations + +import pytest + +# We import the parse function directly — it is pure and has no side effects. +from app.services.openclaw.usage_scrapers import parse_claude_usage + + +# --------------------------------------------------------------------------- +# Fixture text samples +# --------------------------------------------------------------------------- + +# Standard output from `claude /usage` in Claude Code (subscription plan) +FIXTURE_STANDARD = """ +╭──────────────────────────────────────────────────────────────────╮ +│ Claude Code Usage │ +╰──────────────────────────────────────────────────────────────────╯ + + Usage window (resets in 2h 47m): + 67% of limit used + + This week (Mon – Sun): + Messages: 234 + Input tokens: 1,234,567 + Output tokens: 456,789 + Est. cost: $4.23 +""" + +# Minimal output — just the percentage and reset time +FIXTURE_MINIMAL = """ +Rate limit: 45% used +Resets in: 3h 15m +""" + +# Output with messages limit shown as X/Y +FIXTURE_WITH_LIMIT = """ +Usage window resets in 1h 5m: + Messages: 178 / 500 (35% used) + Input tokens: 890,123 + Output tokens: 234,567 + +Weekly usage: + Messages: 892 + Cost: $8.76 +""" + +# Sub-minute remaining time +FIXTURE_ALMOST_RESET = """ +Usage: 99% of window used +Resets in: 42m +""" + +# Only minutes remaining without hours +FIXTURE_MINUTES_ONLY = """ +Context: 72% used +Time remaining: 28m +""" + +# Days + hours format +FIXTURE_DAYS_HOURS = """ +Next reset: in 1 day 4h +Current usage: 12% +""" + +# "< 1 minute" edge case +FIXTURE_UNDER_ONE_MINUTE = """ +Usage: 100% +Resets in: < 1m +""" + +# Output without any percentage but with reset time +FIXTURE_NO_PCT = """ +Session active. +Window resets in: 4h 0m +No usage data available. +""" + +# Output with zero usage +FIXTURE_ZERO_USAGE = """ +Usage window (resets in 5h 0m): + 0% of limit used + +Weekly: 0 messages, $0.00 +""" + +# Completely empty +FIXTURE_EMPTY = "" + +# Garbage / error text +FIXTURE_ERROR = "Error: claude: command not found" + +# Multi-line noise around the real data +FIXTURE_NOISY = """ +... +Checking session... +Fetching usage stats... + +Usage (resets in 2h 0m): 55% used + +Messages this week: 301 + +...Done. +""" + +# Alternate key casing variants that some versions might use +FIXTURE_ALT_KEYS = """ +rate limit window: 89% Used +RESETS IN: 0h 30m +weekly messages: 412 +weekly tokens: 9,876,543 +""" + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +class TestParseClaudeUsagePercentage: + + def test_standard_percentage(self): + r = parse_claude_usage(FIXTURE_STANDARD) + assert r.current_pct == pytest.approx(67.0) + + def test_minimal_percentage(self): + r = parse_claude_usage(FIXTURE_MINIMAL) + assert r.current_pct == pytest.approx(45.0) + + def test_with_limit_percentage(self): + r = parse_claude_usage(FIXTURE_WITH_LIMIT) + assert r.current_pct == pytest.approx(35.0) + + def test_near_full_percentage(self): + r = parse_claude_usage(FIXTURE_ALMOST_RESET) + assert r.current_pct == pytest.approx(99.0) + + def test_zero_percentage(self): + r = parse_claude_usage(FIXTURE_ZERO_USAGE) + assert r.current_pct == pytest.approx(0.0) + + def test_no_percentage_returns_none(self): + r = parse_claude_usage(FIXTURE_NO_PCT) + assert r.current_pct is None + + def test_empty_returns_none(self): + r = parse_claude_usage(FIXTURE_EMPTY) + assert r.current_pct is None + + def test_error_text_returns_none(self): + r = parse_claude_usage(FIXTURE_ERROR) + assert r.current_pct is None + + def test_alt_key_casing(self): + r = parse_claude_usage(FIXTURE_ALT_KEYS) + assert r.current_pct == pytest.approx(89.0) + + def test_noisy_output(self): + r = parse_claude_usage(FIXTURE_NOISY) + assert r.current_pct == pytest.approx(55.0) + + +class TestParseClaudeUsageRemainingTime: + + def test_hours_and_minutes(self): + r = parse_claude_usage(FIXTURE_STANDARD) + # 2h 47m = 10020 seconds = 10,020,000 ms + assert r.remaining_ms == pytest.approx(10_020_000, rel=1e-3) + assert r.remaining_label == "2h 47m" + + def test_hours_and_minutes_variant(self): + r = parse_claude_usage(FIXTURE_MINIMAL) + # 3h 15m = 11700 seconds + assert r.remaining_ms == pytest.approx(11_700_000, rel=1e-3) + + def test_minutes_only(self): + r = parse_claude_usage(FIXTURE_MINUTES_ONLY) + # 28m = 1680 seconds + assert r.remaining_ms == pytest.approx(1_680_000, rel=1e-3) + assert r.remaining_label == "28m" + + def test_days_and_hours(self): + r = parse_claude_usage(FIXTURE_DAYS_HOURS) + # 1 day 4h = 28 hours = 100800 seconds + assert r.remaining_ms == pytest.approx(100_800_000, rel=1e-3) + + def test_under_one_minute(self): + r = parse_claude_usage(FIXTURE_UNDER_ONE_MINUTE) + assert r.remaining_ms is not None + assert r.remaining_ms < 60_000 # less than 1 minute + + def test_short_reset(self): + r = parse_claude_usage(FIXTURE_ALMOST_RESET) + # 42m = 2520 seconds + assert r.remaining_ms == pytest.approx(2_520_000, rel=1e-3) + + def test_no_time_returns_none(self): + r = parse_claude_usage(FIXTURE_ERROR) + assert r.remaining_ms is None + assert r.remaining_label is None + + def test_empty_returns_none(self): + r = parse_claude_usage(FIXTURE_EMPTY) + assert r.remaining_ms is None + + def test_with_limit_time(self): + r = parse_claude_usage(FIXTURE_WITH_LIMIT) + # 1h 5m = 3900 seconds + assert r.remaining_ms == pytest.approx(3_900_000, rel=1e-3) + + def test_hours_only(self): + r = parse_claude_usage(FIXTURE_NO_PCT) + # 4h 0m = 14400 seconds + assert r.remaining_ms == pytest.approx(14_400_000, rel=1e-3) + + def test_alt_keys_30m(self): + r = parse_claude_usage(FIXTURE_ALT_KEYS) + # "0h 30m" = 30m = 1800s + assert r.remaining_ms == pytest.approx(1_800_000, rel=1e-3) + + +class TestParseClaudeUsageWeeklyStats: + + def test_weekly_messages_no_limit(self): + r = parse_claude_usage(FIXTURE_STANDARD) + assert r.weekly_messages_used == 234 + assert r.weekly_messages_limit is None # no limit shown + + def test_weekly_messages_with_limit(self): + r = parse_claude_usage(FIXTURE_WITH_LIMIT) + assert r.weekly_messages_used == 892 + + def test_weekly_tokens(self): + r = parse_claude_usage(FIXTURE_STANDARD) + # Input + output = 1,234,567 + 456,789 = 1,691,356 + assert r.weekly_tokens_used == 1_691_356 + + def test_weekly_cost(self): + r = parse_claude_usage(FIXTURE_STANDARD) + assert r.weekly_cost_usd == pytest.approx(4.23, rel=1e-3) + + def test_zero_weekly(self): + r = parse_claude_usage(FIXTURE_ZERO_USAGE) + assert r.weekly_messages_used == 0 + assert r.weekly_cost_usd == pytest.approx(0.0) + + def test_no_weekly_returns_none(self): + r = parse_claude_usage(FIXTURE_MINIMAL) + assert r.weekly_messages_used is None + assert r.weekly_cost_usd is None + + def test_alt_keys_weekly_tokens(self): + r = parse_claude_usage(FIXTURE_ALT_KEYS) + assert r.weekly_tokens_used == 9_876_543 + + +class TestParseClaudeUsageEdgeCases: + + def test_returns_dataclass_always(self): + """parse_claude_usage never raises — it always returns a result.""" + for fixture in [ + FIXTURE_STANDARD, FIXTURE_MINIMAL, FIXTURE_EMPTY, + FIXTURE_ERROR, FIXTURE_NOISY, FIXTURE_ALT_KEYS, + ]: + result = parse_claude_usage(fixture) + assert result is not None + + def test_error_field_on_empty(self): + r = parse_claude_usage(FIXTURE_EMPTY) + assert r.error is not None # signals "no parseable content" + + def test_no_error_on_good_output(self): + r = parse_claude_usage(FIXTURE_STANDARD) + assert r.error is None + + def test_raw_text_preserved(self): + r = parse_claude_usage(FIXTURE_MINIMAL) + assert r.raw_text == FIXTURE_MINIMAL + + def test_messages_comma_separated_numbers(self): + """Numbers like 1,234,567 must be parsed correctly.""" + r = parse_claude_usage(FIXTURE_STANDARD) + assert r.weekly_tokens_used == 1_691_356 # 1,234,567 + 456,789