Pipeline/backend/app/services/openclaw/usage_scrapers.py

839 lines
28 KiB
Python
Raw Normal View History

"""Provider-native usage scrapers for supplemental limit/percentage data.
Design principles:
- Opt-in only: all scrapers are disabled by default (USAGE_SCRAPER_ENABLED=false).
- Supplemental: scraper data enriches JSONL-based metrics; it is never the
primary source of truth for tokens or spend.
- Isolated: each provider adapter is independent and can fail without
affecting the rest of the system.
- Testable: parse_claude_usage() is a pure function with no side effects.
- Fragile by nature: CLI text output changes treat results as best-effort.
Tmux requirement for ClaudeTmuxScraper:
The backend process must have access to the host's tmux socket. For
Docker-based local installs, mount the socket:
-v /tmp/tmux-1000:/tmp/tmux-1000 (or whatever $TMUX uses)
For bare-metal runs, no extra config is needed.
"""
from __future__ import annotations
import asyncio
import re
import tempfile
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
2026-05-21 04:25:31 -05:00
from datetime import UTC, datetime
from pathlib import Path
from typing import ClassVar
from app.core.logging import get_logger
from app.core.time import utcnow
logger = get_logger(__name__)
# ---------------------------------------------------------------------------
# Internal result dataclass (not a schema — stays inside the service layer)
# ---------------------------------------------------------------------------
@dataclass
class ParsedClaudeUsageWindow:
"""One parsed usage window from `claude /usage` text."""
key: str # current_session | weekly_all_models | weekly_sonnet | extra_usage
label: str
pct_used: float | None = None
remaining_ms: int | None = None
remaining_label: str | None = None
extra_text: str | None = None
source: str = "provider_native"
confidence: str = "high"
@dataclass
class ParsedClaudeUsage:
"""Structured result from parsing one block of ``claude /usage`` text."""
raw_text: str
windows: list[ParsedClaudeUsageWindow] = field(default_factory=list)
current_pct: float | None = None
remaining_ms: int | None = None
remaining_label: str | None = None
weekly_messages_used: int | None = None
weekly_messages_limit: int | None = None
weekly_tokens_used: int | None = None
weekly_cost_usd: float | None = None
error: str | None = None
# Source and confidence for the parsed data
source: str | None = None # e.g., "provider_native" or "provider_api_rate_limit"
confidence: str | None = None # e.g., "high" or "medium"
@dataclass
class ScrapeResult:
"""Complete result from one scraper invocation (parsed + metadata)."""
provider: str
source_name: str
scraped_at: datetime
fresh: bool
freshness_ttl_seconds: int
parsed: ParsedClaudeUsage
error: str | None = None
# ---------------------------------------------------------------------------
# Parser — pure function, no I/O
# ---------------------------------------------------------------------------
# Strip thousands-separator commas from a numeric string
def _strip_commas(s: str) -> str:
return s.replace(",", "")
def _parse_int(s: str) -> int:
return int(_strip_commas(s.strip()))
def _parse_float(s: str) -> float:
return float(_strip_commas(s.strip()))
# Matches "67%" or "67.5%" anywhere on a line, with optional surrounding words
_PCT_RE = re.compile(
r"(?:^|[\s(])(\d+(?:\.\d+)?)\s*%",
re.MULTILINE,
)
# Matches "resets in", "resets at", "remaining", "time remaining", "next reset"
# followed by the time expression
_RESET_LINE_RE = re.compile(
r"(?:resets?\s+in[:\s]*|remaining[:\s]*|time\s+remaining[:\s]*|next\s+reset[:\s]*in[:\s]*)"
r"(.+?)(?:\)|$)",
re.IGNORECASE | re.MULTILINE,
)
# Also match "in 2h 47m" or "(resets in 2h 47m)" embedded in longer lines
_INLINE_RESET_RE = re.compile(
r"\(\s*resets?\s+in\s+(.+?)\s*\)",
re.IGNORECASE,
)
# Time components: optional days, optional hours, optional minutes
_TIME_PARTS_RE = re.compile(
r"(?:(\d+)\s*d(?:ay)?s?)?" # days
r"\s*(?:(\d+)\s*h(?:our)?s?)?" # hours
r"\s*(?:(\d+)\s*m(?:in(?:ute)?s?)?)?" # minutes
r"|(<?)\s*1\s*m", # "< 1m" edge case
re.IGNORECASE,
)
# Weekly/daily usage lines
_WEEKLY_MSGS_RE = re.compile(
# "messages: 234 / 500" OR "234 messages" OR "Messages: 234"
r"(?:messages?[:\s]+(\d[\d,]*)(?:\s*/\s*(\d[\d,]*))?)"
r"|(?:(\d[\d,]*)\s+messages?)",
re.IGNORECASE,
)
_WEEKLY_INPUT_RE = re.compile(r"input\s+tokens?[:\s]+([\d,]+)", re.IGNORECASE)
_WEEKLY_OUTPUT_RE = re.compile(r"output\s+tokens?[:\s]+([\d,]+)", re.IGNORECASE)
_WEEKLY_TOKENS_RE = re.compile(r"(?:total\s+)?tokens?[:\s]+([\d,]+)", re.IGNORECASE)
_WEEKLY_COST_RE = re.compile(r"\$\s*([\d]+\.[\d]{1,4})", re.IGNORECASE)
_SECTION_HEADER_CLEANUP_RE = re.compile(r"^[\s\-\*\|\u2022\u2502\u2500\u250c\u2510\u2514\u2518]+")
_WINDOW_HEADER_PATTERNS: tuple[tuple[re.Pattern[str], str, str], ...] = (
(re.compile(r"^current\s+session\b", re.IGNORECASE), "current_session", "Current session"),
(re.compile(r"^usage\s+window\b", re.IGNORECASE), "current_session", "Current session"),
(re.compile(r"^(?:this\s+week|weekly(?:\s+usage)?)\b", re.IGNORECASE), "weekly_all_models", "All models"),
(re.compile(r"^(?:all\s+models|weekly\s+all\s+models)\b", re.IGNORECASE), "weekly_all_models", "All models"),
(re.compile(r"^(?:weekly\s+sonnet|sonnet)\b", re.IGNORECASE), "weekly_sonnet", "Sonnet"),
(re.compile(r"^extra\s+usage\b", re.IGNORECASE), "extra_usage", "Extra usage"),
)
def _parse_remaining_ms(time_str: str) -> tuple[int, str] | None:
"""Convert a time string like '2h 47m', '1 day 4h', '< 1m' to milliseconds.
Returns (ms, normalised_label) or None if nothing matches.
"""
s = time_str.strip()
if not s:
return None
# "< 1m" / "< 1 minute"
if re.match(r"<\s*1\s*m", s, re.IGNORECASE):
return 30_000, "< 1m" # represent as 30 seconds
m = _TIME_PARTS_RE.search(s)
if not m:
return None
days = int(m.group(1) or 0)
hours = int(m.group(2) or 0)
minutes = int(m.group(3) or 0)
total_ms = (days * 86400 + hours * 3600 + minutes * 60) * 1000
if total_ms == 0:
return None
# Build a compact human label
parts = []
if days:
parts.append(f"{days}d")
if hours:
parts.append(f"{hours}h")
if minutes:
parts.append(f"{minutes}m")
label = " ".join(parts) if parts else s
return total_ms, label
def _extract_pct(text: str) -> float | None:
for match in _PCT_RE.finditer(text):
pct = float(match.group(1))
if 0.0 <= pct <= 100.0:
return pct
return None
def _extract_remaining(text: str) -> tuple[int, str] | None:
inline = _INLINE_RESET_RE.search(text)
if inline:
parsed = _parse_remaining_ms(inline.group(1))
if parsed:
return parsed
line_match = _RESET_LINE_RE.search(text)
if line_match:
return _parse_remaining_ms(line_match.group(1).strip())
return None
def _normalized_header(line: str) -> str:
return _SECTION_HEADER_CLEANUP_RE.sub("", line).strip()
def _detect_window_key(line: str) -> tuple[str, str] | None:
normalized = _normalized_header(line)
if not normalized:
return None
for pattern, key, label in _WINDOW_HEADER_PATTERNS:
if pattern.search(normalized):
return key, label
return None
def _split_usage_sections(raw: str) -> list[tuple[str, str, str]]:
sections: list[tuple[str, str, str]] = []
current_key: str | None = None
current_label: str | None = None
current_lines: list[str] = []
for line in raw.splitlines():
detected = _detect_window_key(line)
if detected is not None:
if current_key and current_lines:
sections.append((current_key, current_label or current_key, "\n".join(current_lines)))
current_key, current_label = detected
current_lines = [line]
continue
if current_key:
current_lines.append(line)
if current_key and current_lines:
sections.append((current_key, current_label or current_key, "\n".join(current_lines)))
return sections
def _parse_weekly_stats(result: ParsedClaudeUsage, section_text: str, raw: str) -> None:
msgs_m = _WEEKLY_MSGS_RE.search(section_text)
if msgs_m:
try:
used_str = msgs_m.group(1) or msgs_m.group(3)
if used_str:
result.weekly_messages_used = _parse_int(used_str)
if msgs_m.group(2):
result.weekly_messages_limit = _parse_int(msgs_m.group(2))
except (ValueError, TypeError):
pass
input_m = _WEEKLY_INPUT_RE.search(section_text)
output_m = _WEEKLY_OUTPUT_RE.search(section_text)
if input_m and output_m:
try:
result.weekly_tokens_used = (
_parse_int(input_m.group(1)) + _parse_int(output_m.group(1))
)
except (ValueError, TypeError):
pass
elif result.weekly_tokens_used is None:
tok_m = _WEEKLY_TOKENS_RE.search(section_text)
if tok_m:
try:
result.weekly_tokens_used = _parse_int(tok_m.group(1))
except (ValueError, TypeError):
pass
if result.weekly_tokens_used is None:
alt_tok = re.search(r"weekly\s+tokens?[:\s]+([\d,]+)", raw, re.IGNORECASE)
if alt_tok:
try:
result.weekly_tokens_used = _parse_int(alt_tok.group(1))
except (ValueError, TypeError):
pass
cost_m = _WEEKLY_COST_RE.search(section_text)
if cost_m:
try:
result.weekly_cost_usd = _parse_float(cost_m.group(1))
except (ValueError, TypeError):
pass
def parse_claude_usage(raw: str) -> ParsedClaudeUsage:
"""Parse a block of text from ``claude /usage`` into structured fields.
This is a pure function no I/O, no side effects. It never raises;
on failure it sets ``error`` and leaves numeric fields as None.
"""
result = ParsedClaudeUsage(raw_text=raw)
if not raw or not raw.strip():
result.error = "empty output"
return result
sections = _split_usage_sections(raw)
seen_keys: set[str] = set()
for key, label, text in sections:
if key in seen_keys and key != "extra_usage":
continue
pct = _extract_pct(text)
remaining = _extract_remaining(text)
remaining_ms = remaining[0] if remaining else None
remaining_label = remaining[1] if remaining else None
cleaned_text = text.strip()
extra_text = cleaned_text[:240] if cleaned_text else None
has_metrics = pct is not None or remaining_ms is not None
if not has_metrics and key != "extra_usage":
continue
if not has_metrics and not extra_text:
continue
result.windows.append(
ParsedClaudeUsageWindow(
key=key,
label=label,
pct_used=pct,
remaining_ms=remaining_ms,
remaining_label=remaining_label,
extra_text=extra_text,
),
)
seen_keys.add(key)
# Back-compat flattened fields still exist for existing clients.
current_window = next((w for w in result.windows if w.key == "current_session"), None)
fallback_window = next(
(w for w in result.windows if w.pct_used is not None or w.remaining_ms is not None),
None,
)
primary_window = current_window or fallback_window
if primary_window:
result.current_pct = primary_window.pct_used
result.remaining_ms = primary_window.remaining_ms
result.remaining_label = primary_window.remaining_label
else:
result.current_pct = _extract_pct(raw)
fallback_remaining = _extract_remaining(raw)
if fallback_remaining:
result.remaining_ms, result.remaining_label = fallback_remaining
weekly_section_text = raw
for key, _, text in sections:
if key == "weekly_all_models":
weekly_section_text = text
break
else:
week_header = re.search(
r"(?:this\s+week|weekly|week\b)[^\n]*\n(.+?)(?=\n\s*\n|\Z)",
raw,
re.IGNORECASE | re.DOTALL,
)
if week_header:
weekly_section_text = week_header.group(0)
_parse_weekly_stats(result, weekly_section_text, raw)
# ---- validation --------------------------------------------------------
all_none = (
len(result.windows) == 0
and result.current_pct is None
and result.remaining_ms is None
and result.weekly_messages_used is None
and result.weekly_tokens_used is None
)
if all_none:
result.error = "no parseable usage data found"
compatibility_none = (
result.current_pct is None
and result.remaining_ms is None
and result.weekly_messages_used is None
and result.weekly_tokens_used is None
)
if compatibility_none and result.error is None:
result.error = "no parseable usage data found"
return result
# ---------------------------------------------------------------------------
# Adapter interface
# ---------------------------------------------------------------------------
class RuntimeUsageProviderAdapter(ABC):
"""Abstract base for provider-native usage scrapers."""
provider: ClassVar[str]
source_name: ClassVar[str]
freshness_ttl_seconds: ClassVar[int] = 300
@abstractmethod
async def fetch_raw(self) -> str:
"""Return the raw text output from the provider's usage source."""
@abstractmethod
def parse(self, raw: str) -> ParsedClaudeUsage:
"""Parse raw text into structured fields."""
def is_available(self) -> bool:
"""Return True if the prerequisites for this adapter exist on this host."""
return True
async def scrape(self) -> ScrapeResult:
"""Run fetch_raw + parse, returning a ScrapeResult regardless of errors."""
now = utcnow()
try:
raw = await self.fetch_raw()
parsed = self.parse(raw)
return ScrapeResult(
provider=self.provider,
source_name=self.source_name,
scraped_at=now,
fresh=True,
freshness_ttl_seconds=self.freshness_ttl_seconds,
parsed=parsed,
)
except Exception as exc:
logger.warning(
"usage_scraper.fetch_failed provider=%s source=%s error=%s",
self.provider, self.source_name, exc,
)
return ScrapeResult(
provider=self.provider,
source_name=self.source_name,
scraped_at=now,
fresh=False,
freshness_ttl_seconds=self.freshness_ttl_seconds,
parsed=ParsedClaudeUsage(raw_text="", error=str(exc)),
error=str(exc),
)
# ---------------------------------------------------------------------------
# Claude CLI tmux adapter
# ---------------------------------------------------------------------------
_TMUX_WAIT_SECONDS = 2.0 # seconds to wait after sending /usage
_TMUX_CAPTURE_LINES = 80 # lines to capture from the tmux pane
async def _run(
*args: str,
timeout: float = 5.0,
) -> tuple[str, str]:
"""Run a subprocess and return (stdout, stderr)."""
proc = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
return stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace")
async def _find_claude_tmux_pane(tmux_socket: str = "") -> str | None:
"""Find the first tmux pane running ``claude``; return pane id or None."""
base = ["tmux"]
if tmux_socket:
base += ["-S", tmux_socket]
try:
stdout, _ = await _run(
*base,
"list-panes", "-a",
"-F", "#{pane_id}:#{pane_current_command}",
)
except (FileNotFoundError, asyncio.TimeoutError, OSError):
return None
for line in stdout.splitlines():
parts = line.strip().split(":", 1)
if len(parts) == 2:
pane_id, command = parts
if "claude" in command.lower():
return pane_id
return None
async def _tmux_send_and_capture(
pane_id: str,
command: str = "/usage",
tmux_socket: str = "",
wait_seconds: float = _TMUX_WAIT_SECONDS,
) -> str:
"""Send a command to a tmux pane and return the captured output."""
base = ["tmux"]
if tmux_socket:
base += ["-S", tmux_socket]
# Clear the pane buffer so we capture fresh output
await _run(*base, "clear-history", "-t", pane_id)
# Send the command
await _run(*base, "send-keys", "-t", pane_id, command, "Enter")
# Wait for the response to render
await asyncio.sleep(wait_seconds)
# Capture the pane contents
stdout, _ = await _run(
*base,
"capture-pane", "-pt", pane_id,
"-J", # join wrapped lines
"-e", # include escape sequences (ignored in text mode)
)
# Strip ANSI escape codes
ansi_re = re.compile(r"\x1b\[[0-9;]*[mGKHF]")
return ansi_re.sub("", stdout)
class ClaudeTmuxScraper(RuntimeUsageProviderAdapter):
"""Scraper that sends ``/usage`` to an active Claude tmux session.
Requires:
- tmux is installed and accessible from the backend process.
- At least one tmux pane is running the ``claude`` CLI.
- For Docker installs: mount the host tmux socket into the container.
This adapter is fragile by design CLI output format changes over time.
Treat results as supplemental hints, not accounting truth.
"""
provider: ClassVar[str] = "anthropic"
source_name: ClassVar[str] = "claude_cli_tmux"
freshness_ttl_seconds: ClassVar[int] = 300
def __init__(
self,
tmux_socket: str = "",
wait_seconds: float = _TMUX_WAIT_SECONDS,
) -> None:
self.tmux_socket = tmux_socket
self.wait_seconds = wait_seconds
def is_available(self) -> bool:
import shutil
return shutil.which("tmux") is not None
async def fetch_raw(self) -> str:
pane_id = await _find_claude_tmux_pane(self.tmux_socket)
if pane_id is None:
raise RuntimeError(
"No tmux pane found running 'claude'. "
"Start a Claude session before enabling the tmux scraper."
)
return await _tmux_send_and_capture(
pane_id,
tmux_socket=self.tmux_socket,
wait_seconds=self.wait_seconds,
)
def parse(self, raw: str) -> ParsedClaudeUsage:
result = parse_claude_usage(raw)
# Tag the parsed result with source and confidence
result.source = "provider_native"
result.confidence = "high"
return result
# ---------------------------------------------------------------------------
# Simple TTL cache
# ---------------------------------------------------------------------------
class UsageScraperCache:
"""In-memory TTL cache for scrape results.
Optionally writes results to a cache directory so they survive
backend restarts (useful for slow-changing subscription data).
"""
def __init__(
self,
ttl_seconds: int = 300,
cache_dir: str = "",
) -> None:
self.ttl_seconds = ttl_seconds
self._memory: dict[str, tuple[datetime, ScrapeResult]] = {}
self._cache_dir = Path(cache_dir) if cache_dir else Path(tempfile.gettempdir()) / "pipeline-usage-cache"
def _key(self, gateway_id: str, source_name: str) -> str:
return f"{gateway_id}:{source_name}"
def get(self, gateway_id: str, source_name: str) -> ScrapeResult | None:
key = self._key(gateway_id, source_name)
entry = self._memory.get(key)
if entry is None:
return None
cached_at, result = entry
age = (utcnow() - cached_at).total_seconds()
if age > self.ttl_seconds:
del self._memory[key]
return None
return result
def set(self, gateway_id: str, source_name: str, result: ScrapeResult) -> None:
key = self._key(gateway_id, source_name)
self._memory[key] = (utcnow(), result)
# Module-level singleton cache used by the endpoint
_cache = UsageScraperCache()
2026-05-21 04:25:31 -05:00
# ---------------------------------------------------------------------------
# Claude Code status-line ingestion
# ---------------------------------------------------------------------------
_STATUSLINE_SOURCE_NAME = "claude_code_statusline"
_STATUSLINE_PROVIDER = "anthropic"
_STATUSLINE_WINDOW_MAP: dict[str, tuple[str, str]] = {
"five_hour": ("current_session", "Current session"),
"seven_day": ("weekly_all_models", "All models"),
}
def _as_number(value: object) -> float | None:
if isinstance(value, bool):
return None
if isinstance(value, int | float):
return float(value)
if isinstance(value, str):
try:
return float(value.strip())
except ValueError:
return None
return None
def _parse_resets_at_ms(value: object, *, now: datetime) -> tuple[int | None, str | None]:
"""Parse Claude Code status-line ``resets_at`` into remaining milliseconds."""
if value is None:
return None, None
reset_dt: datetime | None = None
numeric = _as_number(value)
if numeric is not None:
# Claude documents Unix epoch seconds. Be defensive if a future build
# ever sends milliseconds.
if numeric > 10_000_000_000:
numeric = numeric / 1000
reset_dt = datetime.fromtimestamp(numeric, tz=UTC)
elif isinstance(value, str):
text = value.strip()
if text:
try:
reset_dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
except ValueError:
reset_dt = None
if reset_dt is None:
return None, None
if reset_dt.tzinfo is None:
reset_dt = reset_dt.replace(tzinfo=UTC)
remaining_ms = max(0, int((reset_dt - now).total_seconds() * 1000))
return remaining_ms, _format_remaining_ms(remaining_ms)
def _format_remaining_ms(ms: int) -> str:
seconds = max(0, int(round(ms / 1000)))
days, remainder = divmod(seconds, 86_400)
hours, remainder = divmod(remainder, 3_600)
minutes = remainder // 60
parts: list[str] = []
if days:
parts.append(f"{days}d")
if hours:
parts.append(f"{hours}h")
if minutes:
parts.append(f"{minutes}m")
if not parts:
return "< 1m" if seconds > 0 else "now"
return " ".join(parts[:2])
def parse_claude_statusline_usage(
payload: dict[str, object],
*,
now: datetime | None = None,
) -> ParsedClaudeUsage:
"""Parse Claude Code status-line JSON into provider-native usage windows.
Official Claude Code status-line input exposes ``rate_limits.five_hour`` and
``rate_limits.seven_day`` for Claude.ai subscribers. This is more stable
than screen-scraping ``/usage`` and should be preferred when available.
"""
checked_at = now or utcnow()
if checked_at.tzinfo is None:
checked_at = checked_at.replace(tzinfo=UTC)
result = ParsedClaudeUsage(raw_text="")
result.source = "provider_native"
result.confidence = "high"
rate_limits = payload.get("rate_limits")
if not isinstance(rate_limits, dict):
result.error = "status-line payload did not include rate_limits"
return result
for upstream_key, upstream_value in rate_limits.items():
if not isinstance(upstream_value, dict):
continue
key, label = _STATUSLINE_WINDOW_MAP.get(
str(upstream_key),
(str(upstream_key), str(upstream_key).replace("_", " ").title()),
)
pct = _as_number(upstream_value.get("used_percentage"))
if pct is not None:
pct = max(0.0, min(100.0, pct))
remaining_ms, remaining_label = _parse_resets_at_ms(
upstream_value.get("resets_at"),
now=checked_at,
)
if pct is None and remaining_ms is None:
continue
result.windows.append(
ParsedClaudeUsageWindow(
key=key,
label=label,
pct_used=pct,
remaining_ms=remaining_ms,
remaining_label=remaining_label,
source="provider_native",
confidence="high",
),
)
current_window = next((w for w in result.windows if w.key == "current_session"), None)
primary_window = current_window or next(iter(result.windows), None)
if primary_window:
result.current_pct = primary_window.pct_used
result.remaining_ms = primary_window.remaining_ms
result.remaining_label = primary_window.remaining_label
else:
result.error = "status-line payload had no parseable rate_limits windows"
return result
def store_claude_statusline_usage(
gateway_id: str,
payload: dict[str, object],
) -> ScrapeResult:
"""Store the latest Claude Code status-line usage snapshot for a gateway."""
now = utcnow()
parsed = parse_claude_statusline_usage(payload, now=now)
result = ScrapeResult(
provider=_STATUSLINE_PROVIDER,
source_name=_STATUSLINE_SOURCE_NAME,
scraped_at=now,
fresh=parsed.error is None,
freshness_ttl_seconds=_cache.ttl_seconds,
parsed=parsed,
error=parsed.error,
)
_cache.set(gateway_id, _STATUSLINE_SOURCE_NAME, result)
logger.info(
"usage_statusline.ingested gateway_id=%s windows=%s error=%s",
gateway_id,
len(parsed.windows),
parsed.error,
)
return result
def get_cached_claude_statusline_usage(gateway_id: str) -> ScrapeResult | None:
"""Return the latest fresh Claude Code status-line snapshot, if any."""
return _cache.get(gateway_id, _STATUSLINE_SOURCE_NAME)
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def build_adapters(
enabled_providers: list[str],
tmux_socket: str = "",
) -> list[RuntimeUsageProviderAdapter]:
"""Instantiate enabled adapters from a list of provider names."""
registry: dict[str, type[RuntimeUsageProviderAdapter]] = {
"claude_cli_tmux": ClaudeTmuxScraper,
}
adapters = []
for name in enabled_providers:
cls = registry.get(name.strip().lower())
if cls is None:
logger.warning("usage_scraper.unknown_provider name=%s", name)
continue
kwargs: dict = {}
if name == "claude_cli_tmux":
kwargs["tmux_socket"] = tmux_socket
adapter = cls(**kwargs)
if adapter.is_available():
adapters.append(adapter)
else:
logger.info("usage_scraper.not_available provider=%s", name)
return adapters
async def get_provider_usage(
gateway_id: str,
enabled_providers: list[str],
tmux_socket: str = "",
include_raw: bool = False,
) -> list[ScrapeResult]:
"""Run all enabled adapters for a gateway, using cache where fresh.
Returns one ScrapeResult per adapter (errors included, never raises).
"""
adapters = build_adapters(enabled_providers, tmux_socket=tmux_socket)
results: list[ScrapeResult] = []
for adapter in adapters:
cached = _cache.get(gateway_id, adapter.source_name)
if cached is not None:
results.append(cached)
continue
result = await adapter.scrape()
_cache.set(gateway_id, adapter.source_name, result)
if not include_raw and result.parsed:
result.parsed.raw_text = None # strip unless explicitly requested
results.append(result)
logger.info(
"usage_scraper.result gateway_id=%s provider=%s source=%s "
"pct=%s remaining_ms=%s error=%s",
gateway_id,
result.provider,
result.source_name,
result.parsed.current_pct,
result.parsed.remaining_ms,
result.error,
)
return results