551 lines
18 KiB
Python
551 lines
18 KiB
Python
"""Provider-native usage scrapers for supplemental limit/percentage data.
|
|
|
|
Design principles:
|
|
- Opt-in only: all scrapers are disabled by default (USAGE_SCRAPER_ENABLED=false).
|
|
- Supplemental: scraper data enriches JSONL-based metrics; it is never the
|
|
primary source of truth for tokens or spend.
|
|
- Isolated: each provider adapter is independent and can fail without
|
|
affecting the rest of the system.
|
|
- Testable: parse_claude_usage() is a pure function with no side effects.
|
|
- Fragile by nature: the CLI's text output can change between releases — treat results as best-effort.
|
|
|
|
Tmux requirement for ClaudeTmuxScraper:
|
|
The backend process must have access to the host's tmux socket. For
|
|
Docker-based local installs, mount the socket:
|
|
-v /tmp/tmux-1000:/tmp/tmux-1000   (adjust to match the socket path in $TMUX)
|
|
For bare-metal runs, no extra config is needed.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import re
|
|
import tempfile
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import ClassVar
|
|
|
|
from app.core.logging import get_logger
|
|
from app.core.time import utcnow
|
|
|
|
# Module logger. Messages in this module use "usage_scraper.<event> key=value"
# text with lazy %-style arguments.
logger = get_logger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal result dataclass (not a schema — stays inside the service layer)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
class ParsedClaudeUsage:
    """Structured result from parsing one block of ``claude /usage`` text.

    Every numeric field is None when the corresponding value could not be
    found in the text; ``error`` explains why nothing usable was parsed.
    """

    # Text that was parsed. get_provider_usage() replaces it with None when the
    # caller did not request raw output, so the type must allow None.
    raw_text: str | None
    current_pct: float | None = None  # first 0-100 percentage found in the text
    remaining_ms: int | None = None  # time until reset, in milliseconds
    remaining_label: str | None = None  # compact form of remaining_ms, e.g. "2h 47m"
    weekly_messages_used: int | None = None
    weekly_messages_limit: int | None = None  # the "/ N" in "messages: M / N", if shown
    weekly_tokens_used: int | None = None  # input+output when both shown, else a single total
    weekly_cost_usd: float | None = None  # dollar amount found in the weekly section
    error: str | None = None  # e.g. "empty output" / "no parseable usage data found"
|
|
|
|
|
|
@dataclass
class ScrapeResult:
    """Complete result from one scraper invocation (parsed fields + metadata).

    ``error`` reflects only exceptions raised while fetching or parsing; a parse
    that merely found nothing is reported through ``parsed.error`` instead.
    """

    provider: str  # adapter's provider id (e.g. "anthropic" for ClaudeTmuxScraper)
    source_name: str  # adapter's source id; also part of the cache key
    scraped_at: datetime  # utcnow() taken when scrape() started, before fetching
    fresh: bool  # True when fetch + parse completed without raising
    freshness_ttl_seconds: int  # copied from the adapter's class attribute
    parsed: ParsedClaudeUsage  # on fetch failure: empty instance carrying the error text
    error: str | None = None  # str(exception) when fetch/parse raised, else None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Parser — pure function, no I/O
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _strip_commas(s: str) -> str:
    """Remove every ``,`` from *s* (thousands separators: ``"1,234"`` -> ``"1234"``)."""
    return "".join(s.split(","))
|
|
|
|
|
|
def _parse_int(s: str) -> int:
    """Convert *s* to ``int`` after trimming whitespace and dropping commas.

    Raises:
        ValueError: if the cleaned text is not an integer literal.
    """
    digits = _strip_commas(s.strip())
    return int(digits)
|
|
|
|
|
|
def _parse_float(s: str) -> float:
    """Convert *s* to ``float`` after trimming whitespace and dropping commas.

    Raises:
        ValueError: if the cleaned text is not a float literal.
    """
    number_text = _strip_commas(s.strip())
    return float(number_text)
|
|
|
|
|
|
# Matches "67%" or "67.5%" at the start of a line or after whitespace / "(".
_PCT_RE = re.compile(
    r"(?:^|[\s(])(\d+(?:\.\d+)?)\s*%",
    re.MULTILINE,
)

# Matches "resets in", "remaining", "time remaining" or "next reset in" and
# captures the time expression that follows, up to ")" or the end of the line.
# NOTE: the separator class [:\s]* can cross a newline, so a trailing
# "remaining" captures the text of the following line.
_RESET_LINE_RE = re.compile(
    r"(?:resets?\s+in[:\s]*|remaining[:\s]*|time\s+remaining[:\s]*|next\s+reset[:\s]*in[:\s]*)"
    r"(.+?)(?:\)|$)",
    re.IGNORECASE | re.MULTILINE,
)

# Matches "(resets in 2h 47m)" embedded in a longer line; captures "2h 47m".
_INLINE_RESET_RE = re.compile(
    r"\(\s*resets?\s+in\s+(.+?)\s*\)",
    re.IGNORECASE,
)

# Duration components, in order: days (group 1), hours (group 2), minutes (group 3).
# The leading lookahead anchors a match on a number followed by a unit letter, so
# search() skips leading words ("about 2h") instead of returning an empty match at
# position 0. Each unit must end at a non-letter so "5 messages" is not read as
# 5 minutes. "< 1m" is handled separately by _parse_remaining_ms.
_TIME_PARTS_RE = re.compile(
    r"(?=\d+\s*[dhm])"
    r"(?:(\d+)\s*d(?:ays?)?(?![a-z]))?"  # d, day, days
    r"\s*(?:(\d+)\s*h(?:(?:ou)?rs?)?(?![a-z]))?"  # h, hr, hrs, hour, hours
    r"\s*(?:(\d+)\s*m(?:in(?:ute)?s?)?(?![a-z]))?",  # m, min, mins, minute, minutes
    re.IGNORECASE,
)

# Weekly/daily usage lines
_WEEKLY_MSGS_RE = re.compile(
    # "messages: 234 / 500" OR "234 messages" OR "Messages: 234"
    r"(?:messages?[:\s]+(\d[\d,]*)(?:\s*/\s*(\d[\d,]*))?)"
    r"|(?:(\d[\d,]*)\s+messages?)",
    re.IGNORECASE,
)

_WEEKLY_INPUT_RE = re.compile(r"input\s+tokens?[:\s]+([\d,]+)", re.IGNORECASE)

_WEEKLY_OUTPUT_RE = re.compile(r"output\s+tokens?[:\s]+([\d,]+)", re.IGNORECASE)

_WEEKLY_TOKENS_RE = re.compile(r"(?:total\s+)?tokens?[:\s]+([\d,]+)", re.IGNORECASE)

# Dollar amount with decimals; thousands separators are allowed ("$1,234.56")
# because _parse_float strips commas before converting.
_WEEKLY_COST_RE = re.compile(r"\$\s*(\d[\d,]*\.\d{1,4})", re.IGNORECASE)
|
|
|
|
|
|
def _parse_remaining_ms(time_str: str) -> tuple[int, str] | None:
    """Convert a duration like ``'2h 47m'``, ``'1 day 4h'`` or ``'< 1m'`` to milliseconds.

    Returns:
        ``(milliseconds, label)`` where *label* is a compact normalised form such
        as ``'1d 4h'``; ``'< 1m'`` is reported as 30 000 ms. None when the string
        is blank or contains no non-zero duration.
    """
    s = time_str.strip()
    if not s:
        return None

    # "< 1m" / "< 1 minute": a sub-minute remainder, represented as 30 seconds.
    if re.match(r"<\s*1\s*m", s, re.IGNORECASE):
        return 30_000, "< 1m"

    # _TIME_PARTS_RE can yield empty or unit-less matches (e.g. at a leading
    # word), so scan all matches and use the first that captured a non-zero duration.
    for m in _TIME_PARTS_RE.finditer(s):
        days, hours, minutes = (int(g or 0) for g in m.group(1, 2, 3))
        total_ms = (days * 86_400 + hours * 3_600 + minutes * 60) * 1_000
        if total_ms:
            break
    else:
        return None

    label = " ".join(
        f"{value}{unit}"
        for value, unit in ((days, "d"), (hours, "h"), (minutes, "m"))
        if value
    )
    return total_ms, label
|
|
|
|
|
|
# Header line of the weekly block ("This week", "Weekly ...", "... week ...")
# plus the lines after it, up to the next blank line or the end of the text.
_WEEK_SECTION_RE = re.compile(
    r"(?:this\s+week|weekly|week\b)[^\n]*\n(.+?)(?=\n\s*\n|\Z)",
    re.IGNORECASE | re.DOTALL,
)

# Last-resort token count: "weekly tokens: 9,876,543" anywhere in the text.
_WEEKLY_ALT_TOKENS_RE = re.compile(r"weekly\s+tokens?[:\s]+([\d,]+)", re.IGNORECASE)


def _try_parse_int(text: str | None) -> int | None:
    """Return ``_parse_int(text)``, or None when *text* is empty or not an integer."""
    if not text:
        return None
    try:
        return _parse_int(text)
    except (ValueError, TypeError):
        return None


def _extract_percentage(raw: str) -> float | None:
    """Return the first percentage in *raw* that lies within 0-100, or None."""
    for m in _PCT_RE.finditer(raw):
        pct = float(m.group(1))
        if 0.0 <= pct <= 100.0:
            return pct
    return None


def _extract_remaining(raw: str) -> tuple[int, str] | None:
    """Return ``(ms, label)`` for the time until reset, or None.

    The inline "(resets in X)" form is tried first because it is the most
    specific; if it is absent or unparseable, the line-level patterns are tried.
    """
    inline = _INLINE_RESET_RE.search(raw)
    if inline:
        parsed = _parse_remaining_ms(inline.group(1))
        if parsed:
            return parsed
    line_m = _RESET_LINE_RE.search(raw)
    if line_m:
        return _parse_remaining_ms(line_m.group(1))
    return None


def _fill_weekly_stats(result: ParsedClaudeUsage, raw: str) -> None:
    """Populate the weekly message/token/cost fields of *result* from *raw*.

    Searches the weekly section when a week header is found, else the whole text.
    """
    header = _WEEK_SECTION_RE.search(raw)
    section = header.group(0) if header else raw

    # Messages, with an optional "/ limit".
    msgs_m = _WEEKLY_MSGS_RE.search(section)
    if msgs_m:
        # group(1) = "messages: N", group(3) = "N messages", group(2) = limit
        result.weekly_messages_used = _try_parse_int(msgs_m.group(1) or msgs_m.group(3))
        result.weekly_messages_limit = _try_parse_int(msgs_m.group(2))

    # Tokens: prefer the input+output sum when both are present.
    input_m = _WEEKLY_INPUT_RE.search(section)
    output_m = _WEEKLY_OUTPUT_RE.search(section)
    if input_m and output_m:
        input_tokens = _try_parse_int(input_m.group(1))
        output_tokens = _try_parse_int(output_m.group(1))
        if input_tokens is not None and output_tokens is not None:
            result.weekly_tokens_used = input_tokens + output_tokens
    else:
        tok_m = _WEEKLY_TOKENS_RE.search(section)
        if tok_m:
            result.weekly_tokens_used = _try_parse_int(tok_m.group(1))

    if result.weekly_tokens_used is None:
        alt_m = _WEEKLY_ALT_TOKENS_RE.search(raw)
        if alt_m:
            result.weekly_tokens_used = _try_parse_int(alt_m.group(1))

    cost_m = _WEEKLY_COST_RE.search(section)
    if cost_m:
        try:
            result.weekly_cost_usd = _parse_float(cost_m.group(1))
        except (ValueError, TypeError):
            pass


def parse_claude_usage(raw: str) -> ParsedClaudeUsage:
    """Parse a block of text from ``claude /usage`` into structured fields.

    This is a pure function with no I/O and no side effects. It never raises;
    on failure it sets ``error`` and leaves the numeric fields as None.
    """
    result = ParsedClaudeUsage(raw_text=raw)

    if not raw or not raw.strip():
        result.error = "empty output"
        return result

    result.current_pct = _extract_percentage(raw)

    remaining = _extract_remaining(raw)
    if remaining:
        result.remaining_ms, result.remaining_label = remaining

    _fill_weekly_stats(result, raw)

    # Cost alone does not count as usable data (same rule as before).
    if all(
        value is None
        for value in (
            result.current_pct,
            result.remaining_ms,
            result.weekly_messages_used,
            result.weekly_tokens_used,
        )
    ):
        result.error = "no parseable usage data found"

    return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Adapter interface
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class RuntimeUsageProviderAdapter(ABC):
    """Base class for provider-native usage scrapers.

    Subclasses set ``provider`` and ``source_name`` (optionally overriding
    ``freshness_ttl_seconds``) and implement :meth:`fetch_raw` and :meth:`parse`.
    :meth:`scrape` runs both and turns any ``Exception`` into an error result.
    """

    provider: ClassVar[str]
    source_name: ClassVar[str]
    freshness_ttl_seconds: ClassVar[int] = 300

    @abstractmethod
    async def fetch_raw(self) -> str:
        """Fetch and return the provider's raw usage text."""

    @abstractmethod
    def parse(self, raw: str) -> ParsedClaudeUsage:
        """Turn raw usage text into a ParsedClaudeUsage."""

    def is_available(self) -> bool:
        """Report whether this host meets the adapter's prerequisites (default: yes)."""
        return True

    async def scrape(self) -> ScrapeResult:
        """Fetch and parse once, always returning a ScrapeResult.

        ``scraped_at`` is taken before fetching starts. Success yields
        ``fresh=True``; an exception is logged as a warning and returned with
        ``fresh=False`` and its text in ``error`` (and in ``parsed.error``).
        """
        started_at = utcnow()
        metadata = dict(
            provider=self.provider,
            source_name=self.source_name,
            scraped_at=started_at,
            freshness_ttl_seconds=self.freshness_ttl_seconds,
        )
        try:
            parsed = self.parse(await self.fetch_raw())
        except Exception as exc:
            logger.warning(
                "usage_scraper.fetch_failed provider=%s source=%s error=%s",
                self.provider, self.source_name, exc,
            )
            message = str(exc)
            return ScrapeResult(
                **metadata,
                fresh=False,
                parsed=ParsedClaudeUsage(raw_text="", error=message),
                error=message,
            )
        else:
            return ScrapeResult(**metadata, fresh=True, parsed=parsed)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Claude CLI tmux adapter
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Fixed delay between sending the command and capturing the pane. The pane is
# captured once after this delay, so slower-rendering output may be missed.
_TMUX_WAIT_SECONDS = 2.0  # seconds to wait after sending /usage
# NOTE(review): not referenced anywhere else in this module — capture-pane is
# invoked without -S, so only the visible screen is captured. Wire up or remove.
_TMUX_CAPTURE_LINES = 80  # lines to capture from the tmux pane
|
|
|
|
|
|
async def _run(
    *args: str,
    timeout: float = 5.0,
) -> tuple[str, str]:
    """Run a subprocess (no shell) and return its decoded ``(stdout, stderr)``.

    Output is decoded as UTF-8 with invalid bytes replaced. The exit status is
    not checked.

    Raises:
        asyncio.TimeoutError: if the process does not finish within *timeout*
            seconds (the child is killed and reaped before this propagates).
        OSError: e.g. FileNotFoundError when the executable cannot be started.
    """
    proc = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    finally:
        # wait_for() cancels communicate() on timeout (or outer cancellation) but
        # leaves the child running; kill and reap it so it cannot linger.
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the check and kill()
            await proc.wait()
    return stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace")
|
|
|
|
|
|
async def _find_claude_tmux_pane(tmux_socket: str = "") -> str | None:
    """Return the id of the first tmux pane whose current command contains ``claude``.

    Lists panes across all sessions (``list-panes -a``) on the default server, or
    on *tmux_socket* when given. The match is case-insensitive. Returns None if
    tmux cannot be run, times out, raises an OS error, or no pane matches.
    """
    socket_args = ["-S", tmux_socket] if tmux_socket else []
    try:
        listing, _stderr = await _run(
            "tmux", *socket_args,
            "list-panes", "-a",
            "-F", "#{pane_id}:#{pane_current_command}",
        )
    except (FileNotFoundError, asyncio.TimeoutError, OSError):
        return None

    for entry in listing.splitlines():
        pane_id, sep, current_command = entry.strip().partition(":")
        if sep and "claude" in current_command.lower():
            return pane_id
    return None
|
|
|
|
|
|
# CSI sequences (e.g. "\x1b[31m", "\x1b[2K") and OSC sequences (e.g. OSC 8
# hyperlinks) that can appear in terminal text.
_ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]|\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)")


async def _tmux_send_and_capture(
    pane_id: str,
    command: str = "/usage",
    tmux_socket: str = "",
    wait_seconds: float = _TMUX_WAIT_SECONDS,
) -> str:
    """Type *command* + Enter into a tmux pane, wait, and return the pane text.

    Args:
        pane_id: tmux target pane (e.g. a ``#{pane_id}`` such as ``%3``).
        command: Keys sent with ``send-keys`` before ``Enter``.
        tmux_socket: Optional socket path, passed to tmux as ``-S``.
        wait_seconds: Fixed delay before capturing, to let the output render.

    Returns:
        The visible pane contents with wrapped lines joined and any ANSI
        escape sequences removed.
    """
    base = ["tmux", *(["-S", tmux_socket] if tmux_socket else [])]

    # Drop the pane's scrollback history. This does not clear the visible
    # screen, and capture-pane below reads only the visible screen.
    await _run(*base, "clear-history", "-t", pane_id)
    await _run(*base, "send-keys", "-t", pane_id, command, "Enter")
    # There is no completion signal; give the CLI time to render its output.
    await asyncio.sleep(wait_seconds)
    # -p prints to stdout and -J joins wrapped lines. -e is deliberately omitted
    # so tmux returns plain text rather than embedding colour/attribute escapes.
    stdout, _ = await _run(*base, "capture-pane", "-p", "-t", pane_id, "-J")
    # Defensive: remove any escape sequences that still slip through.
    return _ANSI_ESCAPE_RE.sub("", stdout)
|
|
|
|
|
|
class ClaudeTmuxScraper(RuntimeUsageProviderAdapter):
    """Scraper that types ``/usage`` into a running Claude CLI tmux pane.

    Prerequisites:
      - tmux is installed and on PATH for the backend process.
      - At least one tmux pane's current command contains ``claude``.
      - Docker installs: the host tmux socket is mounted into the container.

    CLI output formats drift over time, so treat these results as supplemental
    hints rather than accounting truth.
    """

    provider: ClassVar[str] = "anthropic"
    source_name: ClassVar[str] = "claude_cli_tmux"
    freshness_ttl_seconds: ClassVar[int] = 300

    def __init__(
        self,
        tmux_socket: str = "",
        wait_seconds: float = _TMUX_WAIT_SECONDS,
    ) -> None:
        # tmux_socket: optional "-S" socket path; wait_seconds: render delay.
        self.tmux_socket, self.wait_seconds = tmux_socket, wait_seconds

    def is_available(self) -> bool:
        """True when a ``tmux`` executable can be found on PATH."""
        from shutil import which

        return which("tmux") is not None

    async def fetch_raw(self) -> str:
        """Send ``/usage`` to the first Claude pane and return the captured text.

        Raises:
            RuntimeError: if no tmux pane running ``claude`` is found.
        """
        pane = await _find_claude_tmux_pane(self.tmux_socket)
        if pane is not None:
            return await _tmux_send_and_capture(
                pane,
                tmux_socket=self.tmux_socket,
                wait_seconds=self.wait_seconds,
            )
        raise RuntimeError(
            "No tmux pane found running 'claude'. "
            "Start a Claude session before enabling the tmux scraper."
        )

    def parse(self, raw: str) -> ParsedClaudeUsage:
        """Delegate to the pure :func:`parse_claude_usage` parser."""
        return parse_claude_usage(raw)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Simple TTL cache
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class UsageScraperCache:
    """In-memory TTL cache of ScrapeResult objects, keyed by gateway and source.

    Entries older than ``ttl_seconds`` are evicted lazily when looked up.
    Storage is process-local only: ``cache_dir`` is recorded on the instance,
    but nothing in this class reads or writes it, so entries do not survive a
    restart.
    """

    def __init__(
        self,
        ttl_seconds: int = 300,
        cache_dir: str = "",
    ) -> None:
        self.ttl_seconds = ttl_seconds
        # "<gateway_id>:<source_name>" -> (time stored, result)
        self._memory: dict[str, tuple[datetime, ScrapeResult]] = {}
        # NOTE(review): reserved for on-disk persistence; currently unused.
        if cache_dir:
            self._cache_dir = Path(cache_dir)
        else:
            self._cache_dir = Path(tempfile.gettempdir()) / "pipeline-usage-cache"

    def _key(self, gateway_id: str, source_name: str) -> str:
        """Build the composite key ``"<gateway_id>:<source_name>"``."""
        return "{}:{}".format(gateway_id, source_name)

    def get(self, gateway_id: str, source_name: str) -> ScrapeResult | None:
        """Return the cached result if it is at most ``ttl_seconds`` old, else None.

        An expired entry is removed as a side effect.
        """
        key = self._key(gateway_id, source_name)
        try:
            stored_at, cached = self._memory[key]
        except KeyError:
            return None
        if (utcnow() - stored_at).total_seconds() <= self.ttl_seconds:
            return cached
        self._memory.pop(key, None)
        return None

    def set(self, gateway_id: str, source_name: str, result: ScrapeResult) -> None:
        """Store *result* stamped with the current time, replacing any previous entry."""
        self._memory[self._key(gateway_id, source_name)] = (utcnow(), result)
|
|
|
|
|
|
# Module-level singleton cache shared by every get_provider_usage() call in this
# process (default 300 s TTL, keyed by "gateway_id:source_name").
_cache = UsageScraperCache()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def build_adapters(
    enabled_providers: list[str],
    tmux_socket: str = "",
) -> list[RuntimeUsageProviderAdapter]:
    """Instantiate the enabled, available adapters from a list of adapter names.

    Names are matched case-insensitively after trimming whitespace; blank names
    are skipped. Unknown names are logged as a warning. Adapters whose
    ``is_available()`` is False are logged and omitted.

    Args:
        enabled_providers: Adapter names, e.g. ``["claude_cli_tmux"]``.
        tmux_socket: Socket path forwarded to tmux-based adapters.
    """
    registry: dict[str, type[RuntimeUsageProviderAdapter]] = {
        "claude_cli_tmux": ClaudeTmuxScraper,
    }
    adapters: list[RuntimeUsageProviderAdapter] = []
    for name in enabled_providers:
        key = name.strip().lower()
        if not key:
            continue  # tolerate empty entries, e.g. from a "a,,b" config string
        cls = registry.get(key)
        if cls is None:
            logger.warning("usage_scraper.unknown_provider name=%s", name)
            continue
        kwargs: dict = {}
        # Compare the normalised key: the lookup above is case/whitespace
        # insensitive, so the socket must be forwarded on the same basis.
        if key == "claude_cli_tmux":
            kwargs["tmux_socket"] = tmux_socket
        adapter = cls(**kwargs)
        if adapter.is_available():
            adapters.append(adapter)
        else:
            logger.info("usage_scraper.not_available provider=%s", name)
    return adapters
|
|
|
|
|
|
async def get_provider_usage(
    gateway_id: str,
    enabled_providers: list[str],
    tmux_socket: str = "",
    include_raw: bool = False,
) -> list[ScrapeResult]:
    """Run all enabled adapters for a gateway, serving cached results while fresh.

    Args:
        gateway_id: Cache namespace; results are cached per (gateway_id, source_name).
        enabled_providers: Adapter names, resolved by build_adapters().
        tmux_socket: Optional tmux socket path forwarded to tmux-based adapters.
        include_raw: When False, returned results have ``parsed.raw_text`` set
            to None (cached and freshly scraped results alike).

    Returns:
        One ScrapeResult per available adapter, in adapter order. Scrape
        failures are carried inside the results rather than raised.
    """
    from dataclasses import replace  # module-level import only brings in dataclass/field

    adapters = build_adapters(enabled_providers, tmux_socket=tmux_socket)
    results: list[ScrapeResult] = []
    for adapter in adapters:
        result = _cache.get(gateway_id, adapter.source_name)
        if result is None:
            result = await adapter.scrape()
            # Cache the complete result; stripping below works on a copy so a
            # later include_raw=True request still gets the raw text.
            _cache.set(gateway_id, adapter.source_name, result)
            logger.info(
                "usage_scraper.result gateway_id=%s provider=%s source=%s "
                "pct=%s remaining_ms=%s error=%s",
                gateway_id,
                result.provider,
                result.source_name,
                result.parsed.current_pct,
                result.parsed.remaining_ms,
                result.error,
            )
        if not include_raw and result.parsed is not None:
            # Strip on every path (including cache hits) without mutating the
            # object held by the cache.
            result = replace(result, parsed=replace(result.parsed, raw_text=None))
        results.append(result)
    return results
|