feat(runtime-usage): add provider usage scrapers as optional local adapters (batch 3, #32)

This commit is contained in:
null 2026-05-20 20:55:05 -05:00
parent 609d04095d
commit 1a847133ce
5 changed files with 971 additions and 1 deletions

View File

@ -10,6 +10,8 @@ from sqlmodel import col
from app.api.deps import require_org_admin, require_org_member from app.api.deps import require_org_admin, require_org_member
from app.core.auth import AuthContext, get_auth_context from app.core.auth import AuthContext, get_auth_context
from app.core.config import settings
from app.core.time import utcnow
from app.db import crud from app.db import crud
from app.db.pagination import paginate from app.db.pagination import paginate
from app.db.session import get_session from app.db.session import get_session
@ -26,8 +28,13 @@ from app.schemas.gateways import (
GatewayTemplatesSyncResult, GatewayTemplatesSyncResult,
GatewayUpdate, GatewayUpdate,
) )
from app.schemas.runtime_usage import RuntimeUsageResponse from app.schemas.runtime_usage import (
ProviderUsageResponse,
ProviderUsageScrapeResult,
RuntimeUsageResponse,
)
from app.services.openclaw.runtime_usage import get_runtime_usage from app.services.openclaw.runtime_usage import get_runtime_usage
from app.services.openclaw.usage_scrapers import get_provider_usage
from app.schemas.pagination import DefaultLimitOffsetPage from app.schemas.pagination import DefaultLimitOffsetPage
from app.services.openclaw.admin_service import GatewayAdminLifecycleService from app.services.openclaw.admin_service import GatewayAdminLifecycleService
from app.services.openclaw.session_service import GatewayTemplateSyncQuery from app.services.openclaw.session_service import GatewayTemplateSyncQuery
@ -272,6 +279,79 @@ async def get_gateway_runtime_usage(
) )
@router.get(
"/{gateway_id}/provider-usage",
response_model=ProviderUsageResponse,
summary="Gateway provider-native usage (opt-in scraper)",
description=(
"Return provider-native subscription usage data scraped from the CLI "
"(e.g. ``claude /usage``). Returns an empty results list when "
"USAGE_SCRAPER_ENABLED=false (the default). "
"Enable with USAGE_SCRAPER_ENABLED=true and ensure the required "
"prerequisites (tmux, claude binary) are accessible from the backend process."
),
)
async def get_gateway_provider_usage(
gateway_id: UUID,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> ProviderUsageResponse:
"""Scrape provider-native usage for the specified gateway (opt-in)."""
service = GatewayAdminLifecycleService(session)
await service.require_gateway(
gateway_id=gateway_id,
organization_id=ctx.organization.id,
)
now = utcnow()
if not settings.usage_scraper_enabled:
return ProviderUsageResponse(
gateway_id=gateway_id,
generated_at=now,
scraper_enabled=False,
results=[],
)
enabled_providers = [
p.strip()
for p in settings.usage_scraper_providers.split(",")
if p.strip()
]
scrape_results = await get_provider_usage(
gateway_id=str(gateway_id),
enabled_providers=enabled_providers,
tmux_socket=settings.usage_scraper_tmux_socket,
include_raw=settings.usage_scraper_include_raw,
)
results = [
ProviderUsageScrapeResult(
provider=r.provider,
source_name=r.source_name,
scraped_at=r.scraped_at,
fresh=r.fresh,
freshness_ttl_seconds=r.freshness_ttl_seconds,
current_pct=r.parsed.current_pct,
remaining_ms=r.parsed.remaining_ms,
remaining_label=r.parsed.remaining_label,
weekly_messages_used=r.parsed.weekly_messages_used,
weekly_messages_limit=r.parsed.weekly_messages_limit,
weekly_tokens_used=r.parsed.weekly_tokens_used,
weekly_cost_usd=r.parsed.weekly_cost_usd,
raw_text=r.parsed.raw_text,
error=r.error or r.parsed.error,
)
for r in scrape_results
]
return ProviderUsageResponse(
gateway_id=gateway_id,
generated_at=now,
scraper_enabled=True,
results=results,
)
@router.delete("/{gateway_id}", response_model=OkResponse) @router.delete("/{gateway_id}", response_model=OkResponse)
async def delete_gateway( async def delete_gateway(
gateway_id: UUID, gateway_id: UUID,

View File

@ -89,6 +89,21 @@ class Settings(BaseSettings):
# OpenClaw gateway runtime compatibility # OpenClaw gateway runtime compatibility
gateway_min_version: str = "2026.02.9" gateway_min_version: str = "2026.02.9"
# Provider usage scrapers (opt-in; off by default)
usage_scraper_enabled: bool = False
# Comma-separated list of enabled provider adapters, e.g. "claude_cli_tmux"
usage_scraper_providers: str = "claude_cli_tmux"
# TTL in seconds before a cached scrape result is considered stale
usage_scraper_cache_ttl_seconds: int = 300
# Directory for scraper output cache files (defaults to system temp)
usage_scraper_cache_dir: str = ""
# Full path to the claude binary (empty = auto-detect on PATH)
usage_scraper_claude_bin: str = ""
# Tmux socket path (empty = default tmux socket)
usage_scraper_tmux_socket: str = ""
# Include raw CLI output in API response (useful for debugging; off by default)
usage_scraper_include_raw: bool = False
# Logging # Logging
log_level: str = "INFO" log_level: str = "INFO"
log_format: str = "text" log_format: str = "text"

View File

@ -72,6 +72,42 @@ class TopSession(SQLModel):
updated_at: str | None = None updated_at: str | None = None
class ProviderUsageScrapeResult(SQLModel):
"""Structured result from one provider-native usage scrape (e.g. Claude CLI /usage).
Returned by GET /gateways/{id}/provider-usage.
All fields are optional partial data is still useful and expected
when CLI output format changes or the session is quiet.
"""
provider: str # "anthropic", "openai", "google"
source_name: str # "claude_cli_tmux", "gemini_scrape", etc.
scraped_at: datetime
fresh: bool # True if within the freshness window
freshness_ttl_seconds: int
current_pct: float | None = None # 0100 % of current window used
remaining_ms: int | None = None # ms until window resets
remaining_label: str | None = None # human-readable "2h 47m"
weekly_messages_used: int | None = None
weekly_messages_limit: int | None = None
weekly_tokens_used: int | None = None
weekly_cost_usd: float | None = None
raw_text: str | None = None # included when DEBUG_SCRAPER_RAW=true
error: str | None = None # set when scrape or parse failed
class ProviderUsageResponse(SQLModel):
"""Response envelope for GET /gateways/{id}/provider-usage."""
gateway_id: UUID
generated_at: datetime
scraper_enabled: bool
results: list[ProviderUsageScrapeResult]
class RuntimeUsageResponse(SQLModel): class RuntimeUsageResponse(SQLModel):
"""Complete runtime usage payload returned by GET /gateways/{id}/runtime-usage.""" """Complete runtime usage payload returned by GET /gateways/{id}/runtime-usage."""

View File

@ -0,0 +1,550 @@
"""Provider-native usage scrapers for supplemental limit/percentage data.
Design principles:
- Opt-in only: all scrapers are disabled by default (USAGE_SCRAPER_ENABLED=false).
- Supplemental: scraper data enriches JSONL-based metrics; it is never the
primary source of truth for tokens or spend.
- Isolated: each provider adapter is independent and can fail without
affecting the rest of the system.
- Testable: parse_claude_usage() is a pure function with no side effects.
- Fragile by nature: CLI text output changes treat results as best-effort.
Tmux requirement for ClaudeTmuxScraper:
The backend process must have access to the host's tmux socket. For
Docker-based local installs, mount the socket:
-v /tmp/tmux-1000:/tmp/tmux-1000 (or whatever $TMUX uses)
For bare-metal runs, no extra config is needed.
"""
from __future__ import annotations
import asyncio
import os
import re
import tempfile
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import ClassVar
from app.core.logging import get_logger
from app.core.time import utcnow
logger = get_logger(__name__)
# ---------------------------------------------------------------------------
# Internal result dataclass (not a schema — stays inside the service layer)
# ---------------------------------------------------------------------------
@dataclass
class ParsedClaudeUsage:
"""Structured result from parsing one block of ``claude /usage`` text."""
raw_text: str
current_pct: float | None = None
remaining_ms: int | None = None
remaining_label: str | None = None
weekly_messages_used: int | None = None
weekly_messages_limit: int | None = None
weekly_tokens_used: int | None = None
weekly_cost_usd: float | None = None
error: str | None = None
@dataclass
class ScrapeResult:
"""Complete result from one scraper invocation (parsed + metadata)."""
provider: str
source_name: str
scraped_at: datetime
fresh: bool
freshness_ttl_seconds: int
parsed: ParsedClaudeUsage
error: str | None = None
# ---------------------------------------------------------------------------
# Parser — pure function, no I/O
# ---------------------------------------------------------------------------
# Strip thousands-separator commas from a numeric string
def _strip_commas(s: str) -> str:
return s.replace(",", "")
def _parse_int(s: str) -> int:
return int(_strip_commas(s.strip()))
def _parse_float(s: str) -> float:
return float(_strip_commas(s.strip()))
# Matches "67%" or "67.5%" anywhere on a line, with optional surrounding words
_PCT_RE = re.compile(
r"(?:^|[\s(])(\d+(?:\.\d+)?)\s*%",
re.MULTILINE,
)
# Matches "resets in", "resets at", "remaining", "time remaining", "next reset"
# followed by the time expression
_RESET_LINE_RE = re.compile(
r"(?:resets?\s+in[:\s]*|remaining[:\s]*|time\s+remaining[:\s]*|next\s+reset[:\s]*in[:\s]*)"
r"(.+?)(?:\)|$)",
re.IGNORECASE | re.MULTILINE,
)
# Also match "in 2h 47m" or "(resets in 2h 47m)" embedded in longer lines
_INLINE_RESET_RE = re.compile(
r"\(\s*resets?\s+in\s+(.+?)\s*\)",
re.IGNORECASE,
)
# Time components: optional days, optional hours, optional minutes
_TIME_PARTS_RE = re.compile(
r"(?:(\d+)\s*d(?:ay)?s?)?" # days
r"\s*(?:(\d+)\s*h(?:our)?s?)?" # hours
r"\s*(?:(\d+)\s*m(?:in(?:ute)?s?)?)?" # minutes
r"|(<?)\s*1\s*m", # "< 1m" edge case
re.IGNORECASE,
)
# Weekly/daily usage lines
_WEEKLY_MSGS_RE = re.compile(
# "messages: 234 / 500" OR "234 messages" OR "Messages: 234"
r"(?:messages?[:\s]+(\d[\d,]*)(?:\s*/\s*(\d[\d,]*))?)"
r"|(?:(\d[\d,]*)\s+messages?)",
re.IGNORECASE,
)
_WEEKLY_INPUT_RE = re.compile(r"input\s+tokens?[:\s]+([\d,]+)", re.IGNORECASE)
_WEEKLY_OUTPUT_RE = re.compile(r"output\s+tokens?[:\s]+([\d,]+)", re.IGNORECASE)
_WEEKLY_TOKENS_RE = re.compile(r"(?:total\s+)?tokens?[:\s]+([\d,]+)", re.IGNORECASE)
_WEEKLY_COST_RE = re.compile(r"\$\s*([\d]+\.[\d]{1,4})", re.IGNORECASE)
def _parse_remaining_ms(time_str: str) -> tuple[int, str] | None:
"""Convert a time string like '2h 47m', '1 day 4h', '< 1m' to milliseconds.
Returns (ms, normalised_label) or None if nothing matches.
"""
s = time_str.strip()
if not s:
return None
# "< 1m" / "< 1 minute"
if re.match(r"<\s*1\s*m", s, re.IGNORECASE):
return 30_000, "< 1m" # represent as 30 seconds
m = _TIME_PARTS_RE.search(s)
if not m:
return None
days = int(m.group(1) or 0)
hours = int(m.group(2) or 0)
minutes = int(m.group(3) or 0)
total_ms = (days * 86400 + hours * 3600 + minutes * 60) * 1000
if total_ms == 0:
return None
# Build a compact human label
parts = []
if days: parts.append(f"{days}d")
if hours: parts.append(f"{hours}h")
if minutes: parts.append(f"{minutes}m")
label = " ".join(parts) if parts else s
return total_ms, label
def parse_claude_usage(raw: str) -> ParsedClaudeUsage:
"""Parse a block of text from ``claude /usage`` into structured fields.
This is a pure function no I/O, no side effects. It never raises;
on failure it sets ``error`` and leaves numeric fields as None.
"""
result = ParsedClaudeUsage(raw_text=raw)
if not raw or not raw.strip():
result.error = "empty output"
return result
# ---- percentage --------------------------------------------------------
for m in _PCT_RE.finditer(raw):
pct = float(m.group(1))
if 0.0 <= pct <= 100.0:
result.current_pct = pct
break # take the first plausible percentage
# ---- remaining time ----------------------------------------------------
time_str: str | None = None
# Try inline "(resets in X)" pattern first (most specific)
inline = _INLINE_RESET_RE.search(raw)
if inline:
time_str = inline.group(1)
# Fall back to line-level patterns
if not time_str:
line_m = _RESET_LINE_RE.search(raw)
if line_m:
time_str = line_m.group(1).strip()
if time_str:
parsed_time = _parse_remaining_ms(time_str)
if parsed_time:
result.remaining_ms, result.remaining_label = parsed_time
# ---- weekly stats -----------------------------------------------------
# Try to find a "weekly" or "this week" section
weekly_section = raw
week_header = re.search(
r"(?:this\s+week|weekly|week\b)[^\n]*\n(.+?)(?=\n\s*\n|\Z)",
raw, re.IGNORECASE | re.DOTALL
)
if week_header:
weekly_section = week_header.group(0)
# messages (with optional limit)
msgs_m = _WEEKLY_MSGS_RE.search(weekly_section)
if msgs_m:
try:
# group(1) = "messages: N", group(3) = "N messages"
used_str = msgs_m.group(1) or msgs_m.group(3)
if used_str:
result.weekly_messages_used = _parse_int(used_str)
if msgs_m.group(2):
result.weekly_messages_limit = _parse_int(msgs_m.group(2))
except (ValueError, TypeError):
pass
# tokens — prefer input+output sum if both present
input_m = _WEEKLY_INPUT_RE.search(weekly_section)
output_m = _WEEKLY_OUTPUT_RE.search(weekly_section)
if input_m and output_m:
try:
result.weekly_tokens_used = (
_parse_int(input_m.group(1)) + _parse_int(output_m.group(1))
)
except (ValueError, TypeError):
pass
elif not result.weekly_tokens_used:
tok_m = _WEEKLY_TOKENS_RE.search(weekly_section)
if tok_m:
try:
result.weekly_tokens_used = _parse_int(tok_m.group(1))
except (ValueError, TypeError):
pass
# Alt-key patterns: "weekly tokens: 9,876,543" anywhere in the text
if result.weekly_tokens_used is None:
alt_tok = re.search(
r"weekly\s+tokens?[:\s]+([\d,]+)", raw, re.IGNORECASE
)
if alt_tok:
try:
result.weekly_tokens_used = _parse_int(alt_tok.group(1))
except (ValueError, TypeError):
pass
# cost
cost_m = _WEEKLY_COST_RE.search(weekly_section)
if cost_m:
try:
result.weekly_cost_usd = _parse_float(cost_m.group(1))
except (ValueError, TypeError):
pass
# ---- validation --------------------------------------------------------
all_none = (
result.current_pct is None
and result.remaining_ms is None
and result.weekly_messages_used is None
and result.weekly_tokens_used is None
)
if all_none:
result.error = "no parseable usage data found"
return result
# ---------------------------------------------------------------------------
# Adapter interface
# ---------------------------------------------------------------------------
class RuntimeUsageProviderAdapter(ABC):
"""Abstract base for provider-native usage scrapers."""
provider: ClassVar[str]
source_name: ClassVar[str]
freshness_ttl_seconds: ClassVar[int] = 300
@abstractmethod
async def fetch_raw(self) -> str:
"""Return the raw text output from the provider's usage source."""
@abstractmethod
def parse(self, raw: str) -> ParsedClaudeUsage:
"""Parse raw text into structured fields."""
def is_available(self) -> bool:
"""Return True if the prerequisites for this adapter exist on this host."""
return True
async def scrape(self) -> ScrapeResult:
"""Run fetch_raw + parse, returning a ScrapeResult regardless of errors."""
now = utcnow()
try:
raw = await self.fetch_raw()
parsed = self.parse(raw)
return ScrapeResult(
provider=self.provider,
source_name=self.source_name,
scraped_at=now,
fresh=True,
freshness_ttl_seconds=self.freshness_ttl_seconds,
parsed=parsed,
)
except Exception as exc:
logger.warning(
"usage_scraper.fetch_failed provider=%s source=%s error=%s",
self.provider, self.source_name, exc,
)
return ScrapeResult(
provider=self.provider,
source_name=self.source_name,
scraped_at=now,
fresh=False,
freshness_ttl_seconds=self.freshness_ttl_seconds,
parsed=ParsedClaudeUsage(raw_text="", error=str(exc)),
error=str(exc),
)
# ---------------------------------------------------------------------------
# Claude CLI tmux adapter
# ---------------------------------------------------------------------------
_TMUX_WAIT_SECONDS = 2.0 # seconds to wait after sending /usage
_TMUX_CAPTURE_LINES = 80 # lines to capture from the tmux pane
async def _run(
*args: str,
timeout: float = 5.0,
) -> tuple[str, str]:
"""Run a subprocess and return (stdout, stderr)."""
proc = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
return stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace")
async def _find_claude_tmux_pane(tmux_socket: str = "") -> str | None:
"""Find the first tmux pane running ``claude``; return pane id or None."""
base = ["tmux"]
if tmux_socket:
base += ["-S", tmux_socket]
try:
stdout, _ = await _run(
*base,
"list-panes", "-a",
"-F", "#{pane_id}:#{pane_current_command}",
)
except (FileNotFoundError, asyncio.TimeoutError, OSError):
return None
for line in stdout.splitlines():
parts = line.strip().split(":", 1)
if len(parts) == 2:
pane_id, command = parts
if "claude" in command.lower():
return pane_id
return None
async def _tmux_send_and_capture(
pane_id: str,
command: str = "/usage",
tmux_socket: str = "",
wait_seconds: float = _TMUX_WAIT_SECONDS,
) -> str:
"""Send a command to a tmux pane and return the captured output."""
base = ["tmux"]
if tmux_socket:
base += ["-S", tmux_socket]
# Clear the pane buffer so we capture fresh output
await _run(*base, "clear-history", "-t", pane_id)
# Send the command
await _run(*base, "send-keys", "-t", pane_id, command, "Enter")
# Wait for the response to render
await asyncio.sleep(wait_seconds)
# Capture the pane contents
stdout, _ = await _run(
*base,
"capture-pane", "-pt", pane_id,
"-J", # join wrapped lines
"-e", # include escape sequences (ignored in text mode)
)
# Strip ANSI escape codes
ansi_re = re.compile(r"\x1b\[[0-9;]*[mGKHF]")
return ansi_re.sub("", stdout)
class ClaudeTmuxScraper(RuntimeUsageProviderAdapter):
"""Scraper that sends ``/usage`` to an active Claude tmux session.
Requires:
- tmux is installed and accessible from the backend process.
- At least one tmux pane is running the ``claude`` CLI.
- For Docker installs: mount the host tmux socket into the container.
This adapter is fragile by design CLI output format changes over time.
Treat results as supplemental hints, not accounting truth.
"""
provider: ClassVar[str] = "anthropic"
source_name: ClassVar[str] = "claude_cli_tmux"
freshness_ttl_seconds: ClassVar[int] = 300
def __init__(
self,
tmux_socket: str = "",
wait_seconds: float = _TMUX_WAIT_SECONDS,
) -> None:
self.tmux_socket = tmux_socket
self.wait_seconds = wait_seconds
def is_available(self) -> bool:
import shutil
return shutil.which("tmux") is not None
async def fetch_raw(self) -> str:
pane_id = await _find_claude_tmux_pane(self.tmux_socket)
if pane_id is None:
raise RuntimeError(
"No tmux pane found running 'claude'. "
"Start a Claude session before enabling the tmux scraper."
)
return await _tmux_send_and_capture(
pane_id,
tmux_socket=self.tmux_socket,
wait_seconds=self.wait_seconds,
)
def parse(self, raw: str) -> ParsedClaudeUsage:
return parse_claude_usage(raw)
# ---------------------------------------------------------------------------
# Simple TTL cache
# ---------------------------------------------------------------------------
class UsageScraperCache:
"""In-memory TTL cache for scrape results.
Optionally writes results to a cache directory so they survive
backend restarts (useful for slow-changing subscription data).
"""
def __init__(
self,
ttl_seconds: int = 300,
cache_dir: str = "",
) -> None:
self.ttl_seconds = ttl_seconds
self._memory: dict[str, tuple[datetime, ScrapeResult]] = {}
self._cache_dir = Path(cache_dir) if cache_dir else Path(tempfile.gettempdir()) / "pipeline-usage-cache"
def _key(self, gateway_id: str, source_name: str) -> str:
return f"{gateway_id}:{source_name}"
def get(self, gateway_id: str, source_name: str) -> ScrapeResult | None:
key = self._key(gateway_id, source_name)
entry = self._memory.get(key)
if entry is None:
return None
cached_at, result = entry
age = (utcnow() - cached_at).total_seconds()
if age > self.ttl_seconds:
del self._memory[key]
return None
return result
def set(self, gateway_id: str, source_name: str, result: ScrapeResult) -> None:
key = self._key(gateway_id, source_name)
self._memory[key] = (utcnow(), result)
# Module-level singleton cache used by the endpoint
_cache = UsageScraperCache()
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def build_adapters(
enabled_providers: list[str],
tmux_socket: str = "",
) -> list[RuntimeUsageProviderAdapter]:
"""Instantiate enabled adapters from a list of provider names."""
registry: dict[str, type[RuntimeUsageProviderAdapter]] = {
"claude_cli_tmux": ClaudeTmuxScraper,
}
adapters = []
for name in enabled_providers:
cls = registry.get(name.strip().lower())
if cls is None:
logger.warning("usage_scraper.unknown_provider name=%s", name)
continue
kwargs: dict = {}
if name == "claude_cli_tmux":
kwargs["tmux_socket"] = tmux_socket
adapter = cls(**kwargs)
if adapter.is_available():
adapters.append(adapter)
else:
logger.info("usage_scraper.not_available provider=%s", name)
return adapters
async def get_provider_usage(
gateway_id: str,
enabled_providers: list[str],
tmux_socket: str = "",
include_raw: bool = False,
) -> list[ScrapeResult]:
"""Run all enabled adapters for a gateway, using cache where fresh.
Returns one ScrapeResult per adapter (errors included, never raises).
"""
adapters = build_adapters(enabled_providers, tmux_socket=tmux_socket)
results: list[ScrapeResult] = []
for adapter in adapters:
cached = _cache.get(gateway_id, adapter.source_name)
if cached is not None:
results.append(cached)
continue
result = await adapter.scrape()
_cache.set(gateway_id, adapter.source_name, result)
if not include_raw and result.parsed:
result.parsed.raw_text = None # strip unless explicitly requested
results.append(result)
logger.info(
"usage_scraper.result gateway_id=%s provider=%s source=%s "
"pct=%s remaining_ms=%s error=%s",
gateway_id,
result.provider,
result.source_name,
result.parsed.current_pct,
result.parsed.remaining_ms,
result.error,
)
return results

View File

@ -0,0 +1,289 @@
# ruff: noqa: INP001
"""Tests for provider usage scraper parsers.
All tests are pure-Python no subprocess, no tmux, no gateway connection.
Each fixture string represents a realistic sample of `claude /usage` output.
Tests are written first and drive the parser implementation.
"""
from __future__ import annotations
import pytest
# We import the parse function directly — it is pure and has no side effects.
from app.services.openclaw.usage_scrapers import parse_claude_usage
# ---------------------------------------------------------------------------
# Fixture text samples
# ---------------------------------------------------------------------------
# Standard output from `claude /usage` in Claude Code (subscription plan)
FIXTURE_STANDARD = """
Claude Code Usage
Usage window (resets in 2h 47m):
67% of limit used
This week (Mon Sun):
Messages: 234
Input tokens: 1,234,567
Output tokens: 456,789
Est. cost: $4.23
"""
# Minimal output — just the percentage and reset time
FIXTURE_MINIMAL = """
Rate limit: 45% used
Resets in: 3h 15m
"""
# Output with messages limit shown as X/Y
FIXTURE_WITH_LIMIT = """
Usage window resets in 1h 5m:
Messages: 178 / 500 (35% used)
Input tokens: 890,123
Output tokens: 234,567
Weekly usage:
Messages: 892
Cost: $8.76
"""
# Sub-minute remaining time
FIXTURE_ALMOST_RESET = """
Usage: 99% of window used
Resets in: 42m
"""
# Only minutes remaining without hours
FIXTURE_MINUTES_ONLY = """
Context: 72% used
Time remaining: 28m
"""
# Days + hours format
FIXTURE_DAYS_HOURS = """
Next reset: in 1 day 4h
Current usage: 12%
"""
# "< 1 minute" edge case
FIXTURE_UNDER_ONE_MINUTE = """
Usage: 100%
Resets in: < 1m
"""
# Output without any percentage but with reset time
FIXTURE_NO_PCT = """
Session active.
Window resets in: 4h 0m
No usage data available.
"""
# Output with zero usage
FIXTURE_ZERO_USAGE = """
Usage window (resets in 5h 0m):
0% of limit used
Weekly: 0 messages, $0.00
"""
# Completely empty
FIXTURE_EMPTY = ""
# Garbage / error text
FIXTURE_ERROR = "Error: claude: command not found"
# Multi-line noise around the real data
FIXTURE_NOISY = """
...
Checking session...
Fetching usage stats...
Usage (resets in 2h 0m): 55% used
Messages this week: 301
...Done.
"""
# Alternate key casing variants that some versions might use
FIXTURE_ALT_KEYS = """
rate limit window: 89% Used
RESETS IN: 0h 30m
weekly messages: 412
weekly tokens: 9,876,543
"""
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestParseClaudeUsagePercentage:
def test_standard_percentage(self):
r = parse_claude_usage(FIXTURE_STANDARD)
assert r.current_pct == pytest.approx(67.0)
def test_minimal_percentage(self):
r = parse_claude_usage(FIXTURE_MINIMAL)
assert r.current_pct == pytest.approx(45.0)
def test_with_limit_percentage(self):
r = parse_claude_usage(FIXTURE_WITH_LIMIT)
assert r.current_pct == pytest.approx(35.0)
def test_near_full_percentage(self):
r = parse_claude_usage(FIXTURE_ALMOST_RESET)
assert r.current_pct == pytest.approx(99.0)
def test_zero_percentage(self):
r = parse_claude_usage(FIXTURE_ZERO_USAGE)
assert r.current_pct == pytest.approx(0.0)
def test_no_percentage_returns_none(self):
r = parse_claude_usage(FIXTURE_NO_PCT)
assert r.current_pct is None
def test_empty_returns_none(self):
r = parse_claude_usage(FIXTURE_EMPTY)
assert r.current_pct is None
def test_error_text_returns_none(self):
r = parse_claude_usage(FIXTURE_ERROR)
assert r.current_pct is None
def test_alt_key_casing(self):
r = parse_claude_usage(FIXTURE_ALT_KEYS)
assert r.current_pct == pytest.approx(89.0)
def test_noisy_output(self):
r = parse_claude_usage(FIXTURE_NOISY)
assert r.current_pct == pytest.approx(55.0)
class TestParseClaudeUsageRemainingTime:
def test_hours_and_minutes(self):
r = parse_claude_usage(FIXTURE_STANDARD)
# 2h 47m = 10020 seconds = 10,020,000 ms
assert r.remaining_ms == pytest.approx(10_020_000, rel=1e-3)
assert r.remaining_label == "2h 47m"
def test_hours_and_minutes_variant(self):
r = parse_claude_usage(FIXTURE_MINIMAL)
# 3h 15m = 11700 seconds
assert r.remaining_ms == pytest.approx(11_700_000, rel=1e-3)
def test_minutes_only(self):
r = parse_claude_usage(FIXTURE_MINUTES_ONLY)
# 28m = 1680 seconds
assert r.remaining_ms == pytest.approx(1_680_000, rel=1e-3)
assert r.remaining_label == "28m"
def test_days_and_hours(self):
r = parse_claude_usage(FIXTURE_DAYS_HOURS)
# 1 day 4h = 28 hours = 100800 seconds
assert r.remaining_ms == pytest.approx(100_800_000, rel=1e-3)
def test_under_one_minute(self):
r = parse_claude_usage(FIXTURE_UNDER_ONE_MINUTE)
assert r.remaining_ms is not None
assert r.remaining_ms < 60_000 # less than 1 minute
def test_short_reset(self):
r = parse_claude_usage(FIXTURE_ALMOST_RESET)
# 42m = 2520 seconds
assert r.remaining_ms == pytest.approx(2_520_000, rel=1e-3)
def test_no_time_returns_none(self):
r = parse_claude_usage(FIXTURE_ERROR)
assert r.remaining_ms is None
assert r.remaining_label is None
def test_empty_returns_none(self):
r = parse_claude_usage(FIXTURE_EMPTY)
assert r.remaining_ms is None
def test_with_limit_time(self):
r = parse_claude_usage(FIXTURE_WITH_LIMIT)
# 1h 5m = 3900 seconds
assert r.remaining_ms == pytest.approx(3_900_000, rel=1e-3)
def test_hours_only(self):
r = parse_claude_usage(FIXTURE_NO_PCT)
# 4h 0m = 14400 seconds
assert r.remaining_ms == pytest.approx(14_400_000, rel=1e-3)
def test_alt_keys_30m(self):
r = parse_claude_usage(FIXTURE_ALT_KEYS)
# "0h 30m" = 30m = 1800s
assert r.remaining_ms == pytest.approx(1_800_000, rel=1e-3)
class TestParseClaudeUsageWeeklyStats:
def test_weekly_messages_no_limit(self):
r = parse_claude_usage(FIXTURE_STANDARD)
assert r.weekly_messages_used == 234
assert r.weekly_messages_limit is None # no limit shown
def test_weekly_messages_with_limit(self):
r = parse_claude_usage(FIXTURE_WITH_LIMIT)
assert r.weekly_messages_used == 892
def test_weekly_tokens(self):
r = parse_claude_usage(FIXTURE_STANDARD)
# Input + output = 1,234,567 + 456,789 = 1,691,356
assert r.weekly_tokens_used == 1_691_356
def test_weekly_cost(self):
r = parse_claude_usage(FIXTURE_STANDARD)
assert r.weekly_cost_usd == pytest.approx(4.23, rel=1e-3)
def test_zero_weekly(self):
r = parse_claude_usage(FIXTURE_ZERO_USAGE)
assert r.weekly_messages_used == 0
assert r.weekly_cost_usd == pytest.approx(0.0)
def test_no_weekly_returns_none(self):
r = parse_claude_usage(FIXTURE_MINIMAL)
assert r.weekly_messages_used is None
assert r.weekly_cost_usd is None
def test_alt_keys_weekly_tokens(self):
r = parse_claude_usage(FIXTURE_ALT_KEYS)
assert r.weekly_tokens_used == 9_876_543
class TestParseClaudeUsageEdgeCases:
def test_returns_dataclass_always(self):
"""parse_claude_usage never raises — it always returns a result."""
for fixture in [
FIXTURE_STANDARD, FIXTURE_MINIMAL, FIXTURE_EMPTY,
FIXTURE_ERROR, FIXTURE_NOISY, FIXTURE_ALT_KEYS,
]:
result = parse_claude_usage(fixture)
assert result is not None
def test_error_field_on_empty(self):
r = parse_claude_usage(FIXTURE_EMPTY)
assert r.error is not None # signals "no parseable content"
def test_no_error_on_good_output(self):
r = parse_claude_usage(FIXTURE_STANDARD)
assert r.error is None
def test_raw_text_preserved(self):
r = parse_claude_usage(FIXTURE_MINIMAL)
assert r.raw_text == FIXTURE_MINIMAL
def test_messages_comma_separated_numbers(self):
"""Numbers like 1,234,567 must be parsed correctly."""
r = parse_claude_usage(FIXTURE_STANDARD)
assert r.weekly_tokens_used == 1_691_356 # 1,234,567 + 456,789