Pipeline/backend/app/services/openclaw/usage_scrapers.py

839 lines
28 KiB
Python

"""Provider-native usage scrapers for supplemental limit/percentage data.
Design principles:
- Opt-in only: all scrapers are disabled by default (USAGE_SCRAPER_ENABLED=false).
- Supplemental: scraper data enriches JSONL-based metrics; it is never the
primary source of truth for tokens or spend.
- Isolated: each provider adapter is independent and can fail without
affecting the rest of the system.
- Testable: parse_claude_usage() is a pure function with no side effects.
- Fragile by nature: CLI text output changes — treat results as best-effort.
Tmux requirement for ClaudeTmuxScraper:
The backend process must have access to the host's tmux socket. For
Docker-based local installs, mount the socket:
-v /tmp/tmux-1000:/tmp/tmux-1000 (or whatever $TMUX uses)
For bare-metal runs, no extra config is needed.
"""
from __future__ import annotations
import asyncio
import re
import tempfile
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import ClassVar
from app.core.logging import get_logger
from app.core.time import utcnow
logger = get_logger(__name__)
# ---------------------------------------------------------------------------
# Internal result dataclass (not a schema — stays inside the service layer)
# ---------------------------------------------------------------------------
@dataclass
class ParsedClaudeUsageWindow:
    """One parsed usage window from `claude /usage` text.

    Instances are also produced by the status-line ingestion path in this
    module, which fills the same fields from structured JSON.
    """

    # Stable identifier for the window. The text parser emits one of the keys
    # listed on the right; status-line ingestion passes unmapped upstream keys
    # through unchanged.
    key: str # current_session | weekly_all_models | weekly_sonnet | extra_usage
    # Human-readable name shown for the window (e.g. "Current session").
    label: str
    # Percentage of the window already consumed, when one could be parsed.
    pct_used: float | None = None
    # Time left until the window resets, in milliseconds.
    remaining_ms: int | None = None
    # Compact rendering of the remaining time (e.g. "2h 47m").
    remaining_label: str | None = None
    # Leading excerpt of the section text; only set by the text parser.
    extra_text: str | None = None
    # Provenance tag and a coarse confidence level for this window.
    source: str = "provider_native"
    confidence: str = "high"
@dataclass
class ParsedClaudeUsage:
    """Structured result from parsing one block of ``claude /usage`` text.

    The flattened ``current_pct`` / ``remaining_*`` fields mirror the primary
    window and are kept alongside ``windows`` for existing clients.
    """

    # Raw provider text the result was parsed from. get_provider_usage() sets
    # this to None when raw output was not requested, hence the optional type.
    raw_text: str | None
    # Per-window breakdown, in the order the windows were encountered.
    windows: list[ParsedClaudeUsageWindow] = field(default_factory=list)
    # Flattened view of the primary window.
    current_pct: float | None = None
    remaining_ms: int | None = None
    remaining_label: str | None = None
    # Weekly statistics extracted by the text parser, when present.
    weekly_messages_used: int | None = None
    weekly_messages_limit: int | None = None
    weekly_tokens_used: int | None = None
    weekly_cost_usd: float | None = None
    # Human-readable reason when nothing usable could be parsed.
    error: str | None = None
    # Source and confidence for the parsed data
    source: str | None = None # e.g., "provider_native" or "provider_api_rate_limit"
    confidence: str | None = None # e.g., "high" or "medium"
@dataclass
class ScrapeResult:
    """Complete result from one scraper invocation (parsed + metadata)."""

    # Provider identifier (e.g. "anthropic") and the name of the concrete
    # source that produced the data (e.g. "claude_cli_tmux").
    provider: str
    source_name: str
    # Timestamp taken when the scrape / ingestion started.
    scraped_at: datetime
    # False when the producer hit an error and ``parsed`` is only a placeholder
    # or carries a parse error.
    fresh: bool
    # How long, in seconds, the producer considers this data usable.
    freshness_ttl_seconds: int
    parsed: ParsedClaudeUsage
    # Error message for the invocation as a whole, if any.
    error: str | None = None
# ---------------------------------------------------------------------------
# Parser — pure function, no I/O
# ---------------------------------------------------------------------------
# Strip thousands-separator commas from a numeric string
def _strip_commas(s: str) -> str:
return s.replace(",", "")
def _parse_int(s: str) -> int:
return int(_strip_commas(s.strip()))
def _parse_float(s: str) -> float:
return float(_strip_commas(s.strip()))
# Matches "67%" or "67.5%" when the number sits at the start of a line or is
# preceded by whitespace or "(". Group 1 is the numeric part; whitespace
# between the number and "%" is tolerated.
_PCT_RE = re.compile(
    r"(?:^|[\s(])(\d+(?:\.\d+)?)\s*%",
    re.MULTILINE,
)
# Matches "resets in", "remaining", "time remaining" or "next reset ... in"
# followed by the time expression. Group 1 is the (lazy) text up to the next
# ")" or the end of the line. Note that "resets at" is NOT matched.
_RESET_LINE_RE = re.compile(
    r"(?:resets?\s+in[:\s]*|remaining[:\s]*|time\s+remaining[:\s]*|next\s+reset[:\s]*in[:\s]*)"
    r"(.+?)(?:\)|$)",
    re.IGNORECASE | re.MULTILINE,
)
# Matches a parenthesised "(resets in 2h 47m)" embedded in a longer line.
# Group 1 is the time expression with surrounding whitespace removed.
_INLINE_RESET_RE = re.compile(
    r"\(\s*resets?\s+in\s+(.+?)\s*\)",
    re.IGNORECASE,
)
# Time components: optional days, optional hours, optional minutes
_TIME_PARTS_RE = re.compile(
r"(?:(\d+)\s*d(?:ay)?s?)?" # days
r"\s*(?:(\d+)\s*h(?:our)?s?)?" # hours
r"\s*(?:(\d+)\s*m(?:in(?:ute)?s?)?)?" # minutes
r"|(<?)\s*1\s*m", # "< 1m" edge case
re.IGNORECASE,
)
# Weekly/daily usage lines
# Message counts. Groups 1/2 are "used" and the optional "limit" of the
# "messages: 234 / 500" form; group 3 is the count of the "234 messages" form.
_WEEKLY_MSGS_RE = re.compile(
    # "messages: 234 / 500" OR "234 messages" OR "Messages: 234"
    r"(?:messages?[:\s]+(\d[\d,]*)(?:\s*/\s*(\d[\d,]*))?)"
    r"|(?:(\d[\d,]*)\s+messages?)",
    re.IGNORECASE,
)
# Token counters; group 1 is the (possibly comma-separated) number.
_WEEKLY_INPUT_RE = re.compile(r"input\s+tokens?[:\s]+([\d,]+)", re.IGNORECASE)
_WEEKLY_OUTPUT_RE = re.compile(r"output\s+tokens?[:\s]+([\d,]+)", re.IGNORECASE)
_WEEKLY_TOKENS_RE = re.compile(r"(?:total\s+)?tokens?[:\s]+([\d,]+)", re.IGNORECASE)
# Dollar amount with 1-4 decimal places; group 1 excludes the "$".
_WEEKLY_COST_RE = re.compile(r"\$\s*([\d]+\.[\d]{1,4})", re.IGNORECASE)
# Leading whitespace, list markers and box-drawing characters that may
# precede a section header in the terminal capture.
_SECTION_HEADER_CLEANUP_RE = re.compile(r"^[\s\-\*\|\u2022\u2502\u2500\u250c\u2510\u2514\u2518]+")
# (pattern, window key, display label) triples, tried in order against a
# cleaned-up line; the first pattern that matches decides the window.
_WINDOW_HEADER_PATTERNS: tuple[tuple[re.Pattern[str], str, str], ...] = (
    (re.compile(r"^current\s+session\b", re.IGNORECASE), "current_session", "Current session"),
    (re.compile(r"^usage\s+window\b", re.IGNORECASE), "current_session", "Current session"),
    (re.compile(r"^(?:this\s+week|weekly(?:\s+usage)?)\b", re.IGNORECASE), "weekly_all_models", "All models"),
    (re.compile(r"^(?:all\s+models|weekly\s+all\s+models)\b", re.IGNORECASE), "weekly_all_models", "All models"),
    (re.compile(r"^(?:weekly\s+sonnet|sonnet)\b", re.IGNORECASE), "weekly_sonnet", "Sonnet"),
    (re.compile(r"^extra\s+usage\b", re.IGNORECASE), "extra_usage", "Extra usage"),
)
def _parse_remaining_ms(time_str: str) -> tuple[int, str] | None:
    """Turn a duration such as '2h 47m', '1 day 4h' or '< 1m' into milliseconds.

    Returns ``(milliseconds, compact_label)``, or None when the text is empty
    or no non-zero duration can be read from it.
    """
    text = time_str.strip()
    if not text:
        return None

    # "< 1m" / "< 1 minute" is reported as half a minute.
    if re.match(r"<\s*1\s*m", text, re.IGNORECASE):
        return 30_000, "< 1m"

    found = _TIME_PARTS_RE.search(text)
    if found is None:
        return None

    # Groups 1-3 are days / hours / minutes; a missing group counts as zero.
    days, hours, minutes = (int(found.group(index) or 0) for index in (1, 2, 3))
    total_ms = (days * 86400 + hours * 3600 + minutes * 60) * 1000
    if total_ms == 0:
        return None

    # Compact human label built from the non-zero components only.
    pieces = [
        f"{amount}{unit}"
        for amount, unit in ((days, "d"), (hours, "h"), (minutes, "m"))
        if amount
    ]
    return total_ms, (" ".join(pieces) if pieces else text)
def _extract_pct(text: str) -> float | None:
    """Return the first percentage in *text* that lies within 0-100, else None."""
    candidates = (float(hit.group(1)) for hit in _PCT_RE.finditer(text))
    return next((value for value in candidates if 0.0 <= value <= 100.0), None)
def _extract_remaining(text: str) -> tuple[int, str] | None:
    """Find a reset / remaining-time expression in *text* and parse it.

    A parenthesised "(resets in ...)" is tried first; when that is absent or
    unparseable, the looser line-level pattern is used instead.
    """
    inline_hit = _INLINE_RESET_RE.search(text)
    if inline_hit is not None:
        from_inline = _parse_remaining_ms(inline_hit.group(1))
        if from_inline:
            return from_inline

    line_hit = _RESET_LINE_RE.search(text)
    if line_hit is None:
        return None
    return _parse_remaining_ms(line_hit.group(1).strip())
def _normalized_header(line: str) -> str:
    """Strip leading decoration (bullets, box-drawing, dashes) and whitespace."""
    undecorated = _SECTION_HEADER_CLEANUP_RE.sub("", line)
    return undecorated.strip()
def _detect_window_key(line: str) -> tuple[str, str] | None:
    """Classify *line* as a window header.

    Returns ``(key, label)`` of the first header pattern that matches the
    cleaned-up line, or None when the line is blank or not a known header.
    """
    header = _normalized_header(line)
    if not header:
        return None
    return next(
        (
            (key, label)
            for pattern, key, label in _WINDOW_HEADER_PATTERNS
            if pattern.search(header)
        ),
        None,
    )
def _split_usage_sections(raw: str) -> list[tuple[str, str, str]]:
    """Cut *raw* into ``(key, label, text)`` sections, one per window header.

    A section starts at a recognised header line (which is kept as the first
    line of its text) and runs until the next header. Lines that appear
    before the first header are discarded.
    """
    sections: list[tuple[str, str, str]] = []
    open_key: str | None = None
    open_label: str | None = None
    body: list[str] = []

    for line in raw.splitlines():
        header = _detect_window_key(line)
        if header is None:
            # Ordinary line: it belongs to the open section, if there is one.
            if open_key:
                body.append(line)
            continue
        # New header: close the section in progress, then open the next one.
        if open_key and body:
            sections.append((open_key, open_label or open_key, "\n".join(body)))
        open_key, open_label = header
        body = [line]

    if open_key and body:
        sections.append((open_key, open_label or open_key, "\n".join(body)))
    return sections
def _parse_weekly_stats(result: ParsedClaudeUsage, section_text: str, raw: str) -> None:
    """Fill the weekly message / token / cost fields of *result* in place.

    Values are looked up in *section_text*; the whole *raw* text is consulted
    only as a last resort for an explicit "weekly tokens" figure. Numbers
    that fail to parse are skipped, leaving the corresponding field untouched.
    """
    # Messages: "messages: 234 / 500", "Messages: 234" or "234 messages".
    if (msg_hit := _WEEKLY_MSGS_RE.search(section_text)) is not None:
        try:
            used_text = msg_hit.group(1) or msg_hit.group(3)
            if used_text:
                result.weekly_messages_used = _parse_int(used_text)
            limit_text = msg_hit.group(2)
            if limit_text:
                result.weekly_messages_limit = _parse_int(limit_text)
        except (ValueError, TypeError):
            pass

    # Tokens: prefer input + output, otherwise a single total.
    input_hit = _WEEKLY_INPUT_RE.search(section_text)
    output_hit = _WEEKLY_OUTPUT_RE.search(section_text)
    if input_hit and output_hit:
        try:
            input_tokens = _parse_int(input_hit.group(1))
            output_tokens = _parse_int(output_hit.group(1))
            result.weekly_tokens_used = input_tokens + output_tokens
        except (ValueError, TypeError):
            pass
    elif result.weekly_tokens_used is None:
        if (total_hit := _WEEKLY_TOKENS_RE.search(section_text)) is not None:
            try:
                result.weekly_tokens_used = _parse_int(total_hit.group(1))
            except (ValueError, TypeError):
                pass

    # Last resort: an explicit "weekly tokens: N" anywhere in the raw text.
    if result.weekly_tokens_used is None:
        raw_hit = re.search(r"weekly\s+tokens?[:\s]+([\d,]+)", raw, re.IGNORECASE)
        if raw_hit is not None:
            try:
                result.weekly_tokens_used = _parse_int(raw_hit.group(1))
            except (ValueError, TypeError):
                pass

    # Cost: first dollar amount in the section.
    if (cost_hit := _WEEKLY_COST_RE.search(section_text)) is not None:
        try:
            result.weekly_cost_usd = _parse_float(cost_hit.group(1))
        except (ValueError, TypeError):
            pass
def parse_claude_usage(raw: str) -> ParsedClaudeUsage:
    """Parse a block of text from ``claude /usage`` into structured fields.

    Pure function: no I/O and no side effects. Problems are reported through
    the ``error`` field of the returned object, with numeric fields left as
    None, rather than by raising.
    """
    parsed = ParsedClaudeUsage(raw_text=raw)
    if not (raw and raw.strip()):
        parsed.error = "empty output"
        return parsed

    # ---- per-window parsing --------------------------------------------------
    sections = _split_usage_sections(raw)
    emitted: set[str] = set()
    for key, label, text in sections:
        is_extra = key == "extra_usage"
        # Only "extra_usage" may appear more than once.
        if key in emitted and not is_extra:
            continue

        pct = _extract_pct(text)
        reset = _extract_remaining(text)
        remaining_ms, remaining_label = reset if reset else (None, None)
        snippet = text.strip()[:240] or None

        has_metrics = not (pct is None and remaining_ms is None)
        # Without metrics a window is kept only if it is "extra_usage" and
        # has some text to show.
        if not has_metrics and (not is_extra or snippet is None):
            continue

        parsed.windows.append(
            ParsedClaudeUsageWindow(
                key=key,
                label=label,
                pct_used=pct,
                remaining_ms=remaining_ms,
                remaining_label=remaining_label,
                extra_text=snippet,
            ),
        )
        emitted.add(key)

    # ---- flattened back-compat fields ----------------------------------------
    # Prefer the current-session window, then the first window with metrics.
    primary = next((w for w in parsed.windows if w.key == "current_session"), None)
    if primary is None:
        primary = next(
            (
                w
                for w in parsed.windows
                if w.pct_used is not None or w.remaining_ms is not None
            ),
            None,
        )
    if primary is not None:
        parsed.current_pct = primary.pct_used
        parsed.remaining_ms = primary.remaining_ms
        parsed.remaining_label = primary.remaining_label
    else:
        # No usable window: scan the whole text instead.
        parsed.current_pct = _extract_pct(raw)
        whole_text_reset = _extract_remaining(raw)
        if whole_text_reset:
            parsed.remaining_ms, parsed.remaining_label = whole_text_reset

    # ---- weekly statistics ---------------------------------------------------
    # Use the weekly section when one was split out; otherwise look for a
    # "week"-style header paragraph, and finally fall back to the whole text.
    weekly_text = next(
        (text for key, _, text in sections if key == "weekly_all_models"),
        None,
    )
    if weekly_text is None:
        week_paragraph = re.search(
            r"(?:this\s+week|weekly|week\b)[^\n]*\n(.+?)(?=\n\s*\n|\Z)",
            raw,
            re.IGNORECASE | re.DOTALL,
        )
        weekly_text = week_paragraph.group(0) if week_paragraph else raw
    _parse_weekly_stats(parsed, weekly_text, raw)

    # ---- validation ----------------------------------------------------------
    # An error is reported whenever none of the flattened fields could be
    # filled, whether or not any window was recorded.
    nothing_usable = (
        parsed.current_pct is None
        and parsed.remaining_ms is None
        and parsed.weekly_messages_used is None
        and parsed.weekly_tokens_used is None
    )
    if nothing_usable:
        parsed.error = "no parseable usage data found"
    return parsed
# ---------------------------------------------------------------------------
# Adapter interface
# ---------------------------------------------------------------------------
class RuntimeUsageProviderAdapter(ABC):
    """Abstract base for provider-native usage scrapers.

    Subclasses set ``provider`` / ``source_name`` and implement
    ``fetch_raw`` and ``parse``; ``scrape`` combines the two and converts any
    failure into an error-carrying ``ScrapeResult``.
    """

    provider: ClassVar[str]
    source_name: ClassVar[str]
    freshness_ttl_seconds: ClassVar[int] = 300

    @abstractmethod
    async def fetch_raw(self) -> str:
        """Return the raw text output from the provider's usage source."""

    @abstractmethod
    def parse(self, raw: str) -> ParsedClaudeUsage:
        """Parse raw text into structured fields."""

    def is_available(self) -> bool:
        """Return True if the prerequisites for this adapter exist on this host."""
        return True

    async def scrape(self) -> ScrapeResult:
        """Run fetch_raw + parse, returning a ScrapeResult regardless of errors.

        On failure the result has ``fresh=False``, an empty placeholder in
        ``parsed`` and a non-empty ``error`` message.
        """
        now = utcnow()
        try:
            raw = await self.fetch_raw()
            parsed = self.parse(raw)
        except Exception as exc:
            # Some exceptions (notably asyncio.TimeoutError) stringify to "",
            # which would make the failure look like "no error" to callers
            # that test ``result.error`` for truthiness.
            message = str(exc) or type(exc).__name__
            logger.warning(
                "usage_scraper.fetch_failed provider=%s source=%s error=%s",
                self.provider, self.source_name, message,
            )
            return ScrapeResult(
                provider=self.provider,
                source_name=self.source_name,
                scraped_at=now,
                fresh=False,
                freshness_ttl_seconds=self.freshness_ttl_seconds,
                parsed=ParsedClaudeUsage(raw_text="", error=message),
                error=message,
            )
        return ScrapeResult(
            provider=self.provider,
            source_name=self.source_name,
            scraped_at=now,
            fresh=True,
            freshness_ttl_seconds=self.freshness_ttl_seconds,
            parsed=parsed,
        )
# ---------------------------------------------------------------------------
# Claude CLI tmux adapter
# ---------------------------------------------------------------------------
# Default delay between sending the command and capturing the pane.
_TMUX_WAIT_SECONDS = 2.0 # seconds to wait after sending /usage
# NOTE(review): not referenced anywhere in the visible part of this module;
# the capture call does not currently limit the number of lines.
_TMUX_CAPTURE_LINES = 80 # lines to capture from the tmux pane
async def _run(
*args: str,
timeout: float = 5.0,
) -> tuple[str, str]:
"""Run a subprocess and return (stdout, stderr)."""
proc = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
return stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace")
async def _find_claude_tmux_pane(tmux_socket: str = "") -> str | None:
    """Find the first tmux pane running ``claude``; return pane id or None.

    None is also returned when tmux cannot be run or does not answer in time.
    """
    tmux_cmd = ["tmux", "-S", tmux_socket] if tmux_socket else ["tmux"]
    try:
        listing, _ = await _run(
            *tmux_cmd,
            "list-panes", "-a",
            "-F", "#{pane_id}:#{pane_current_command}",
        )
    except (FileNotFoundError, asyncio.TimeoutError, OSError):
        return None

    # Each line has the form "<pane_id>:<current command>".
    for entry in listing.splitlines():
        pane_id, separator, running = entry.strip().partition(":")
        if separator and "claude" in running.lower():
            return pane_id
    return None
async def _tmux_send_and_capture(
    pane_id: str,
    command: str = "/usage",
    tmux_socket: str = "",
    wait_seconds: float = _TMUX_WAIT_SECONDS,
) -> str:
    """Type *command* into a tmux pane and return the pane text afterwards.

    The command is sent followed by Enter, the coroutine sleeps for
    *wait_seconds* so the response can render, and the pane is then captured
    with colour/cursor escape sequences removed.
    """
    tmux_cmd = ["tmux", "-S", tmux_socket] if tmux_socket else ["tmux"]

    # Drop the pane's history first (intended to keep stale output out of
    # the capture), then type the command.
    await _run(*tmux_cmd, "clear-history", "-t", pane_id)
    await _run(*tmux_cmd, "send-keys", "-t", pane_id, command, "Enter")

    # Give the CLI time to render its answer.
    await asyncio.sleep(wait_seconds)

    # -J joins wrapped lines; -e keeps escape sequences in the capture,
    # which are stripped again just below.
    captured, _ = await _run(
        *tmux_cmd,
        "capture-pane", "-pt", pane_id,
        "-J",
        "-e",
    )
    return re.sub(r"\x1b\[[0-9;]*[mGKHF]", "", captured)
class ClaudeTmuxScraper(RuntimeUsageProviderAdapter):
    """Adapter that types ``/usage`` into a running Claude tmux pane.

    Prerequisites:
    - tmux is installed and reachable from the backend process.
    - Some tmux pane is currently running the ``claude`` CLI.
    - Docker installs must mount the host tmux socket into the container.

    The CLI's text output changes over time, so this adapter is inherently
    brittle; its results are supplemental hints, not accounting truth.
    """

    provider: ClassVar[str] = "anthropic"
    source_name: ClassVar[str] = "claude_cli_tmux"
    freshness_ttl_seconds: ClassVar[int] = 300

    def __init__(
        self,
        tmux_socket: str = "",
        wait_seconds: float = _TMUX_WAIT_SECONDS,
    ) -> None:
        # An empty socket path means "use tmux's default socket".
        self.tmux_socket = tmux_socket
        self.wait_seconds = wait_seconds

    def is_available(self) -> bool:
        """Report whether a ``tmux`` executable can be found on PATH."""
        import shutil

        tmux_path = shutil.which("tmux")
        return tmux_path is not None

    async def fetch_raw(self) -> str:
        """Locate the Claude pane, send ``/usage`` and return the captured text.

        Raises RuntimeError when no pane running ``claude`` is found.
        """
        target_pane = await _find_claude_tmux_pane(self.tmux_socket)
        if target_pane is None:
            raise RuntimeError(
                "No tmux pane found running 'claude'. "
                "Start a Claude session before enabling the tmux scraper."
            )
        return await _tmux_send_and_capture(
            target_pane,
            tmux_socket=self.tmux_socket,
            wait_seconds=self.wait_seconds,
        )

    def parse(self, raw: str) -> ParsedClaudeUsage:
        """Parse the captured text and tag it with source and confidence."""
        tagged = parse_claude_usage(raw)
        tagged.source = "provider_native"
        tagged.confidence = "high"
        return tagged
# ---------------------------------------------------------------------------
# Simple TTL cache
# ---------------------------------------------------------------------------
class UsageScraperCache:
    """In-memory TTL cache for scrape results.

    Entries are keyed by gateway id and source name and expire
    ``ttl_seconds`` after they were stored. A cache directory path is
    resolved at construction time, but none of the methods defined here
    read from or write to it.
    """

    def __init__(
        self,
        ttl_seconds: int = 300,
        cache_dir: str = "",
    ) -> None:
        self.ttl_seconds = ttl_seconds
        # key -> (time stored, result)
        self._memory: dict[str, tuple[datetime, ScrapeResult]] = {}
        # Falls back to a fixed sub-directory of the system temp dir.
        self._cache_dir = Path(cache_dir) if cache_dir else Path(tempfile.gettempdir()) / "pipeline-usage-cache"

    def _key(self, gateway_id: str, source_name: str) -> str:
        """Build the dictionary key for a gateway/source pair."""
        return f"{gateway_id}:{source_name}"

    def get(self, gateway_id: str, source_name: str) -> ScrapeResult | None:
        """Return the stored result, or None when absent or expired.

        An expired entry is removed as a side effect of the lookup.
        """
        cache_key = self._key(gateway_id, source_name)
        stored = self._memory.get(cache_key)
        if stored is None:
            return None
        stored_at, result = stored
        if (utcnow() - stored_at).total_seconds() > self.ttl_seconds:
            del self._memory[cache_key]
            return None
        return result

    def set(self, gateway_id: str, source_name: str, result: ScrapeResult) -> None:
        """Store *result*, stamped with the current time."""
        self._memory[self._key(gateway_id, source_name)] = (utcnow(), result)
# Module-level singleton cache used by the endpoint. It is shared by the
# adapter-based scrapes and the status-line snapshots in this module, which
# use different source names and therefore different keys.
_cache = UsageScraperCache()
# ---------------------------------------------------------------------------
# Claude Code status-line ingestion
# ---------------------------------------------------------------------------
# Source name / provider recorded on results built from status-line payloads.
_STATUSLINE_SOURCE_NAME = "claude_code_statusline"
_STATUSLINE_PROVIDER = "anthropic"
# Upstream ``rate_limits`` key -> (window key, display label). Keys missing
# from this map are passed through with a title-cased label.
_STATUSLINE_WINDOW_MAP: dict[str, tuple[str, str]] = {
    "five_hour": ("current_session", "Current session"),
    "seven_day": ("weekly_all_models", "All models"),
}
def _as_number(value: object) -> float | None:
if isinstance(value, bool):
return None
if isinstance(value, int | float):
return float(value)
if isinstance(value, str):
try:
return float(value.strip())
except ValueError:
return None
return None
def _parse_resets_at_ms(value: object, *, now: datetime) -> tuple[int | None, str | None]:
    """Parse Claude Code status-line ``resets_at`` into remaining milliseconds.

    *value* may be a Unix epoch (number or numeric string, in seconds or
    milliseconds) or an ISO-8601 string. *now* is the reference instant.

    Returns ``(remaining_ms, label)``; the remaining time is clamped at zero
    for resets in the past. ``(None, None)`` is returned when the value is
    missing, unparseable, or outside the range the platform can represent.
    """
    if value is None:
        return None, None
    reset_dt: datetime | None = None
    numeric = _as_number(value)
    if numeric is not None:
        # Claude documents Unix epoch seconds. Be defensive if a future build
        # ever sends milliseconds.
        if numeric > 10_000_000_000:
            numeric = numeric / 1000
        try:
            reset_dt = datetime.fromtimestamp(numeric, tz=UTC)
        except (OverflowError, OSError, ValueError):
            # The payload comes from outside this process; an absurd epoch
            # must not abort ingestion of the remaining windows.
            return None, None
    elif isinstance(value, str):
        text = value.strip()
        if text:
            try:
                reset_dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
            except ValueError:
                reset_dt = None
    if reset_dt is None:
        return None, None
    # ISO strings without an offset are interpreted as UTC.
    if reset_dt.tzinfo is None:
        reset_dt = reset_dt.replace(tzinfo=UTC)
    remaining_ms = max(0, int((reset_dt - now).total_seconds() * 1000))
    return remaining_ms, _format_remaining_ms(remaining_ms)
def _format_remaining_ms(ms: int) -> str:
seconds = max(0, int(round(ms / 1000)))
days, remainder = divmod(seconds, 86_400)
hours, remainder = divmod(remainder, 3_600)
minutes = remainder // 60
parts: list[str] = []
if days:
parts.append(f"{days}d")
if hours:
parts.append(f"{hours}h")
if minutes:
parts.append(f"{minutes}m")
if not parts:
return "< 1m" if seconds > 0 else "now"
return " ".join(parts[:2])
def parse_claude_statusline_usage(
    payload: dict[str, object],
    *,
    now: datetime | None = None,
) -> ParsedClaudeUsage:
    """Convert Claude Code status-line JSON into provider-native usage windows.

    The status-line input carries ``rate_limits.five_hour`` and
    ``rate_limits.seven_day`` for Claude.ai subscribers, which is a more
    stable source than screen-scraping ``/usage`` and should be preferred
    when available. Other keys under ``rate_limits`` are kept under their
    own name. *now* is the reference time for computing the remaining
    duration and defaults to the current time; a naive value is treated as
    UTC. Problems are reported through the ``error`` field of the result.
    """
    reference = now if now is not None else utcnow()
    if reference.tzinfo is None:
        reference = reference.replace(tzinfo=UTC)

    parsed = ParsedClaudeUsage(raw_text="", source="provider_native", confidence="high")

    limits = payload.get("rate_limits")
    if not isinstance(limits, dict):
        parsed.error = "status-line payload did not include rate_limits"
        return parsed

    for upstream_key, entry in limits.items():
        if not isinstance(entry, dict):
            continue

        upstream_name = str(upstream_key)
        mapped = _STATUSLINE_WINDOW_MAP.get(upstream_name)
        if mapped is None:
            # Unknown window: keep its key and derive a readable label.
            mapped = (upstream_name, upstream_name.replace("_", " ").title())
        key, label = mapped

        pct = _as_number(entry.get("used_percentage"))
        if pct is not None:
            # Clamp into the 0-100 range.
            pct = max(0.0, min(100.0, pct))
        remaining_ms, remaining_label = _parse_resets_at_ms(
            entry.get("resets_at"),
            now=reference,
        )
        # A window with neither a percentage nor a reset time is useless.
        if pct is None and remaining_ms is None:
            continue

        parsed.windows.append(
            ParsedClaudeUsageWindow(
                key=key,
                label=label,
                pct_used=pct,
                remaining_ms=remaining_ms,
                remaining_label=remaining_label,
                source="provider_native",
                confidence="high",
            ),
        )

    # Flattened fields follow the current-session window when there is one,
    # otherwise the first window that was recorded.
    primary = next((w for w in parsed.windows if w.key == "current_session"), None)
    if primary is None and parsed.windows:
        primary = parsed.windows[0]

    if primary is None:
        parsed.error = "status-line payload had no parseable rate_limits windows"
    else:
        parsed.current_pct = primary.pct_used
        parsed.remaining_ms = primary.remaining_ms
        parsed.remaining_label = primary.remaining_label
    return parsed
def store_claude_statusline_usage(
    gateway_id: str,
    payload: dict[str, object],
) -> ScrapeResult:
    """Parse a status-line payload and cache it as the gateway's latest snapshot.

    The snapshot is stored in the module-level cache even when parsing
    reported an error; in that case it is marked as not fresh. The stored
    result is also returned.
    """
    received_at = utcnow()
    usage = parse_claude_statusline_usage(payload, now=received_at)
    parse_error = usage.error

    snapshot = ScrapeResult(
        provider=_STATUSLINE_PROVIDER,
        source_name=_STATUSLINE_SOURCE_NAME,
        scraped_at=received_at,
        fresh=parse_error is None,
        freshness_ttl_seconds=_cache.ttl_seconds,
        parsed=usage,
        error=parse_error,
    )
    _cache.set(gateway_id, _STATUSLINE_SOURCE_NAME, snapshot)

    logger.info(
        "usage_statusline.ingested gateway_id=%s windows=%s error=%s",
        gateway_id,
        len(usage.windows),
        parse_error,
    )
    return snapshot
def get_cached_claude_statusline_usage(gateway_id: str) -> ScrapeResult | None:
    """Look up the gateway's status-line snapshot in the module-level cache.

    Returns None when nothing is stored for the gateway or the stored
    snapshot has outlived the cache TTL.
    """
    snapshot = _cache.get(gateway_id, _STATUSLINE_SOURCE_NAME)
    return snapshot
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def build_adapters(
    enabled_providers: list[str],
    tmux_socket: str = "",
) -> list[RuntimeUsageProviderAdapter]:
    """Instantiate enabled adapters from a list of provider names.

    Names are matched case-insensitively and with surrounding whitespace
    ignored. Unknown names are logged and skipped, as are adapters whose
    prerequisites are missing on this host (``is_available()`` is False).
    """
    registry: dict[str, type[RuntimeUsageProviderAdapter]] = {
        "claude_cli_tmux": ClaudeTmuxScraper,
    }
    adapters: list[RuntimeUsageProviderAdapter] = []
    for name in enabled_providers:
        # Normalize once and use the same value for every decision below;
        # comparing the raw name would silently drop the tmux socket for
        # inputs such as " Claude_CLI_Tmux ".
        normalized = name.strip().lower()
        cls = registry.get(normalized)
        if cls is None:
            logger.warning("usage_scraper.unknown_provider name=%s", name)
            continue
        kwargs: dict[str, object] = {}
        if normalized == "claude_cli_tmux":
            kwargs["tmux_socket"] = tmux_socket
        adapter = cls(**kwargs)
        if adapter.is_available():
            adapters.append(adapter)
        else:
            logger.info("usage_scraper.not_available provider=%s", name)
    return adapters
async def get_provider_usage(
    gateway_id: str,
    enabled_providers: list[str],
    tmux_socket: str = "",
    include_raw: bool = False,
) -> list[ScrapeResult]:
    """Run all enabled adapters for a gateway, using cache where fresh.

    Returns one ScrapeResult per adapter; scrape failures are reported inside
    the result rather than raised. Unless *include_raw* is True, the returned
    results have ``parsed.raw_text`` set to None. This applies to cache hits
    as well as fresh scrapes.
    """
    from dataclasses import replace

    adapters = build_adapters(enabled_providers, tmux_socket=tmux_socket)
    results: list[ScrapeResult] = []
    for adapter in adapters:
        result = _cache.get(gateway_id, adapter.source_name)
        if result is None:
            result = await adapter.scrape()
            # The cache keeps the complete result, raw text included.
            _cache.set(gateway_id, adapter.source_name, result)
            logger.info(
                "usage_scraper.result gateway_id=%s provider=%s source=%s "
                "pct=%s remaining_ms=%s error=%s",
                gateway_id,
                result.provider,
                result.source_name,
                result.parsed.current_pct,
                result.parsed.remaining_ms,
                result.error,
            )
        if not include_raw and result.parsed is not None:
            # Strip the raw text on a copy. Mutating the cached object would
            # permanently drop the text for later include_raw=True calls, and
            # returning a cached object untouched could leak raw text to a
            # caller that did not ask for it.
            result = replace(result, parsed=replace(result.parsed, raw_text=None))
        results.append(result)
    return results