diff --git a/backend/app/api/gateways.py b/backend/app/api/gateways.py index 8393766..46eb812 100644 --- a/backend/app/api/gateways.py +++ b/backend/app/api/gateways.py @@ -38,6 +38,7 @@ from app.schemas.gateway_ops import ( SystemHealthResponse, ) from app.schemas.runtime_usage import ( + ClaudeStatuslineUsageIn, ProviderUsageResponse, ProviderUsageScrapeResult, ProviderUsageWindow, @@ -59,8 +60,13 @@ from app.services.openclaw.runtime_activity import ( POLL_HISTORY_SESSIONS_MAX, fetch_recent_events, ) +from app.services.openclaw.gateway_rpc import openclaw_call from app.services.openclaw.runtime_usage import get_runtime_usage -from app.services.openclaw.usage_scrapers import get_provider_usage +from app.services.openclaw.usage_scrapers import ( + get_cached_claude_statusline_usage, + get_provider_usage, + store_claude_statusline_usage, +) from app.schemas.pagination import DefaultLimitOffsetPage from app.services.openclaw.admin_service import GatewayAdminLifecycleService from app.services.openclaw.session_service import GatewayTemplateSyncQuery @@ -330,7 +336,9 @@ async def get_gateway_provider_usage( ) now = utcnow() - if not settings.usage_scraper_enabled: + statusline_result = get_cached_claude_statusline_usage(str(gateway_id)) + + if not settings.usage_scraper_enabled and statusline_result is None: return ProviderUsageResponse( gateway_id=gateway_id, generated_at=now, @@ -338,17 +346,24 @@ async def get_gateway_provider_usage( results=[], ) - enabled_providers = [ - p.strip() - for p in settings.usage_scraper_providers.split(",") - if p.strip() - ] - scrape_results = await get_provider_usage( - gateway_id=str(gateway_id), - enabled_providers=enabled_providers, - tmux_socket=settings.usage_scraper_tmux_socket, - include_raw=settings.usage_scraper_include_raw, - ) + scrape_results = [] + if statusline_result is not None: + scrape_results.append(statusline_result) + + if settings.usage_scraper_enabled: + enabled_providers = [ + p.strip() + for p in 
@router.post(
    "/{gateway_id}/provider-usage/claude/statusline",
    response_model=ProviderUsageScrapeResult,
    summary="Ingest Claude Code status-line usage",
    description=(
        "Store provider-native Claude Code usage windows from a local status-line "
        "collector. The payload should contain Claude Code's rate_limits object; "
        "Pipeline stores only usage percentages and reset times."
    ),
)
async def ingest_gateway_claude_statusline_usage(
    gateway_id: UUID,
    payload: ClaudeStatuslineUsageIn,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> ProviderUsageScrapeResult:
    """Store a Claude Code status-line snapshot for a gateway and echo it back.

    The gateway is looked up within the caller's organization first, then the
    payload (with unset fields dropped) is handed to
    ``store_claude_statusline_usage`` and the stored snapshot is converted to
    the API schema. ``raw_text`` is always ``None`` in the response.
    """
    # NOTE(review): require_gateway presumably raises when the gateway is
    # missing or owned by another organization — confirm against the service.
    lifecycle = GatewayAdminLifecycleService(session)
    await lifecycle.require_gateway(
        gateway_id=gateway_id,
        organization_id=ctx.organization.id,
    )

    snapshot = store_claude_statusline_usage(
        str(gateway_id),
        payload.model_dump(exclude_none=True),
    )
    parsed = snapshot.parsed

    # Convert each parsed window into its API schema counterpart.
    api_windows = []
    for window in parsed.windows:
        api_windows.append(
            ProviderUsageWindow(
                key=window.key,
                label=window.label,
                pct_used=window.pct_used,
                remaining_ms=window.remaining_ms,
                remaining_label=window.remaining_label,
                extra_text=window.extra_text,
                source=window.source,
                confidence=window.confidence,
            ),
        )

    return ProviderUsageScrapeResult(
        provider=snapshot.provider,
        source_name=snapshot.source_name,
        scraped_at=snapshot.scraped_at,
        fresh=snapshot.fresh,
        freshness_ttl_seconds=snapshot.freshness_ttl_seconds,
        windows=api_windows,
        current_pct=parsed.current_pct,
        remaining_ms=parsed.remaining_ms,
        remaining_label=parsed.remaining_label,
        raw_text=None,
        # Prefer the scrape-level error; fall back to the parser's own error.
        error=snapshot.error or parsed.error,
        source=parsed.source,
        confidence=parsed.confidence,
    )
+ """ + + session_id: str | None = None + model: dict[str, object] | None = None + workspace: dict[str, object] | None = None + rate_limits: dict[str, object] | None = None + + class RuntimeUsageResponse(SQLModel): """Complete runtime usage payload returned by GET /gateways/{id}/runtime-usage.""" diff --git a/backend/app/services/openclaw/usage_scrapers.py b/backend/app/services/openclaw/usage_scrapers.py index 3c233ff..49647ce 100644 --- a/backend/app/services/openclaw/usage_scrapers.py +++ b/backend/app/services/openclaw/usage_scrapers.py @@ -23,7 +23,7 @@ import re import tempfile from abc import ABC, abstractmethod from dataclasses import dataclass, field -from datetime import datetime +from datetime import UTC, datetime from pathlib import Path from typing import ClassVar @@ -609,6 +609,171 @@ class UsageScraperCache: _cache = UsageScraperCache() +# --------------------------------------------------------------------------- +# Claude Code status-line ingestion +# --------------------------------------------------------------------------- + +_STATUSLINE_SOURCE_NAME = "claude_code_statusline" +_STATUSLINE_PROVIDER = "anthropic" +_STATUSLINE_WINDOW_MAP: dict[str, tuple[str, str]] = { + "five_hour": ("current_session", "Current session"), + "seven_day": ("weekly_all_models", "All models"), +} + + +def _as_number(value: object) -> float | None: + if isinstance(value, bool): + return None + if isinstance(value, int | float): + return float(value) + if isinstance(value, str): + try: + return float(value.strip()) + except ValueError: + return None + return None + + +def _parse_resets_at_ms(value: object, *, now: datetime) -> tuple[int | None, str | None]: + """Parse Claude Code status-line ``resets_at`` into remaining milliseconds.""" + if value is None: + return None, None + + reset_dt: datetime | None = None + numeric = _as_number(value) + if numeric is not None: + # Claude documents Unix epoch seconds. Be defensive if a future build + # ever sends milliseconds. 
+ if numeric > 10_000_000_000: + numeric = numeric / 1000 + reset_dt = datetime.fromtimestamp(numeric, tz=UTC) + elif isinstance(value, str): + text = value.strip() + if text: + try: + reset_dt = datetime.fromisoformat(text.replace("Z", "+00:00")) + except ValueError: + reset_dt = None + + if reset_dt is None: + return None, None + if reset_dt.tzinfo is None: + reset_dt = reset_dt.replace(tzinfo=UTC) + + remaining_ms = max(0, int((reset_dt - now).total_seconds() * 1000)) + return remaining_ms, _format_remaining_ms(remaining_ms) + + +def _format_remaining_ms(ms: int) -> str: + seconds = max(0, int(round(ms / 1000))) + days, remainder = divmod(seconds, 86_400) + hours, remainder = divmod(remainder, 3_600) + minutes = remainder // 60 + parts: list[str] = [] + if days: + parts.append(f"{days}d") + if hours: + parts.append(f"{hours}h") + if minutes: + parts.append(f"{minutes}m") + if not parts: + return "< 1m" if seconds > 0 else "now" + return " ".join(parts[:2]) + + +def parse_claude_statusline_usage( + payload: dict[str, object], + *, + now: datetime | None = None, +) -> ParsedClaudeUsage: + """Parse Claude Code status-line JSON into provider-native usage windows. + + Official Claude Code status-line input exposes ``rate_limits.five_hour`` and + ``rate_limits.seven_day`` for Claude.ai subscribers. This is more stable + than screen-scraping ``/usage`` and should be preferred when available. 
+ """ + checked_at = now or utcnow() + if checked_at.tzinfo is None: + checked_at = checked_at.replace(tzinfo=UTC) + result = ParsedClaudeUsage(raw_text="") + result.source = "provider_native" + result.confidence = "high" + + rate_limits = payload.get("rate_limits") + if not isinstance(rate_limits, dict): + result.error = "status-line payload did not include rate_limits" + return result + + for upstream_key, upstream_value in rate_limits.items(): + if not isinstance(upstream_value, dict): + continue + key, label = _STATUSLINE_WINDOW_MAP.get( + str(upstream_key), + (str(upstream_key), str(upstream_key).replace("_", " ").title()), + ) + pct = _as_number(upstream_value.get("used_percentage")) + if pct is not None: + pct = max(0.0, min(100.0, pct)) + remaining_ms, remaining_label = _parse_resets_at_ms( + upstream_value.get("resets_at"), + now=checked_at, + ) + if pct is None and remaining_ms is None: + continue + result.windows.append( + ParsedClaudeUsageWindow( + key=key, + label=label, + pct_used=pct, + remaining_ms=remaining_ms, + remaining_label=remaining_label, + source="provider_native", + confidence="high", + ), + ) + + current_window = next((w for w in result.windows if w.key == "current_session"), None) + primary_window = current_window or next(iter(result.windows), None) + if primary_window: + result.current_pct = primary_window.pct_used + result.remaining_ms = primary_window.remaining_ms + result.remaining_label = primary_window.remaining_label + else: + result.error = "status-line payload had no parseable rate_limits windows" + return result + + +def store_claude_statusline_usage( + gateway_id: str, + payload: dict[str, object], +) -> ScrapeResult: + """Store the latest Claude Code status-line usage snapshot for a gateway.""" + now = utcnow() + parsed = parse_claude_statusline_usage(payload, now=now) + result = ScrapeResult( + provider=_STATUSLINE_PROVIDER, + source_name=_STATUSLINE_SOURCE_NAME, + scraped_at=now, + fresh=parsed.error is None, + 
freshness_ttl_seconds=_cache.ttl_seconds, + parsed=parsed, + error=parsed.error, + ) + _cache.set(gateway_id, _STATUSLINE_SOURCE_NAME, result) + logger.info( + "usage_statusline.ingested gateway_id=%s windows=%s error=%s", + gateway_id, + len(parsed.windows), + parsed.error, + ) + return result + + +def get_cached_claude_statusline_usage(gateway_id: str) -> ScrapeResult | None: + """Return the latest fresh Claude Code status-line snapshot, if any.""" + return _cache.get(gateway_id, _STATUSLINE_SOURCE_NAME) + + # --------------------------------------------------------------------------- # Public entry point # --------------------------------------------------------------------------- diff --git a/backend/tests/test_runtime_usage_api.py b/backend/tests/test_runtime_usage_api.py index c5a57c0..5905637 100644 --- a/backend/tests/test_runtime_usage_api.py +++ b/backend/tests/test_runtime_usage_api.py @@ -8,6 +8,7 @@ degradation when the gateway RPC returns empty/error data. from __future__ import annotations +from datetime import UTC, datetime from unittest.mock import AsyncMock, patch from uuid import uuid4 @@ -201,6 +202,48 @@ async def test_runtime_usage_gateway_not_found() -> None: assert response.status_code == 404 +@pytest.mark.asyncio +async def test_claude_statusline_ingest_surfaces_provider_usage() -> None: + """Status-line snapshots are returned as provider-native usage windows.""" + engine = await _make_engine() + session_maker = _make_session_maker(engine) + + org_id = uuid4() + async with session_maker() as session: + gateway = await _seed_gateway(session, org_id) + + app = _build_app(session_maker, org_id) + future_reset = int(datetime(2099, 1, 1, tzinfo=UTC).timestamp()) + + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: + post_response = await client.post( + f"/api/v1/gateways/{gateway.id}/provider-usage/claude/statusline", + json={ + "session_id": "claude-session-1", + "rate_limits": { + "five_hour": { + 
"used_percentage": 77, + "resets_at": future_reset, + }, + "seven_day": { + "used_percentage": 21, + "resets_at": future_reset, + }, + }, + }, + ) + get_response = await client.get(f"/api/v1/gateways/{gateway.id}/provider-usage") + + assert post_response.status_code == 200 + assert get_response.status_code == 200 + data = get_response.json() + assert data["scraper_enabled"] is True + assert data["results"][0]["source_name"] == "claude_code_statusline" + by_key = {window["key"]: window for window in data["results"][0]["windows"]} + assert by_key["current_session"]["pct_used"] == 77 + assert by_key["weekly_all_models"]["pct_used"] == 21 + + @pytest.mark.asyncio async def test_runtime_usage_org_boundary() -> None: """A gateway created in a different org is not visible to another org.""" diff --git a/backend/tests/test_runtime_usage_scrapers.py b/backend/tests/test_runtime_usage_scrapers.py index a46991a..207523a 100644 --- a/backend/tests/test_runtime_usage_scrapers.py +++ b/backend/tests/test_runtime_usage_scrapers.py @@ -11,7 +11,12 @@ from __future__ import annotations import pytest # We import the parse function directly — it is pure and has no side effects. 
class TestParseClaudeStatuslineUsage:
    """Checks for the status-line ``rate_limits`` parser."""

    def test_parses_official_rate_limits_fields(self):
        # Fixed reference time so the reset countdowns are deterministic.
        reference = datetime(2026, 5, 21, 12, 0, tzinfo=UTC)
        epoch = int(reference.timestamp())
        payload = {
            "rate_limits": {
                "five_hour": {
                    "used_percentage": 77,
                    "resets_at": epoch + (23 * 60),
                },
                "seven_day": {
                    "used_percentage": 21,
                    "resets_at": epoch + (12 * 3600) + (23 * 60),
                },
            },
        }

        r = parse_claude_statusline_usage(payload, now=reference)
        windows = {w.key: w for w in r.windows}
        session = windows["current_session"]
        weekly = windows["weekly_all_models"]

        assert r.error is None
        assert session.pct_used == pytest.approx(77.0)
        assert session.remaining_ms == 1_380_000
        assert session.remaining_label == "23m"
        assert weekly.pct_used == pytest.approx(21.0)
        assert weekly.remaining_label == "12h 23m"
        assert r.current_pct == pytest.approx(77.0)
        assert r.source == "provider_native"
        assert r.confidence == "high"

    def test_missing_rate_limits_is_visible_error(self):
        # An empty payload yields no windows and an explicit error message.
        r = parse_claude_statusline_usage({})
        assert r.windows == []
        assert r.error == "status-line payload did not include rate_limits"
This is preferred over API rate-limit headers because Claude Code exposes provider-native subscription windows: + +- `rate_limits.five_hour.used_percentage` -> Current session +- `rate_limits.five_hour.resets_at` -> Current session reset time +- `rate_limits.seven_day.used_percentage` -> All models +- `rate_limits.seven_day.resets_at` -> All models reset time + +API rate-limit headers are still useful diagnostics, but they do not report remaining subscription usage. + +## Configure Claude Code + +Add a status-line command to `~/.claude/settings.json` or `~/.config/claude/settings.json`: + +```json +{ + "statusLine": { + "type": "command", + "command": "PIPELINE_API_URL=http://localhost:8001 PIPELINE_AUTH_TOKEN=$LOCAL_AUTH_TOKEN PIPELINE_GATEWAY_ID=YOUR_GATEWAY_UUID /home/kaspa/.openclaw/Projects/Pipeline/scripts/claude-statusline-to-pipeline.py", + "padding": 0 + } +} +``` + +Replace `YOUR_GATEWAY_UUID` with the Pipeline gateway UUID shown in the dashboard/API. Use the backend URL that is reachable from the shell running Claude Code. + +The collector forwards only sanitized fields: `session_id`, `model`, `workspace`, and `rate_limits`. It ignores network errors so Claude Code is not interrupted if Pipeline is down. + +## Verification + +After Claude Code completes at least one model response, the dashboard Runtime Usage card should show provider-native rows: + +- Current session +- All models + +Both should display `% used` and reset time from Claude Code. If `rate_limits` is absent, Pipeline will not invent a subscription usage percentage. diff --git a/frontend/src/app/dashboard/page.tsx b/frontend/src/app/dashboard/page.tsx index 9835e7c..2ea2a43 100644 --- a/frontend/src/app/dashboard/page.tsx +++ b/frontend/src/app/dashboard/page.tsx @@ -816,9 +816,21 @@ export default function DashboardPage() { return windows; }, }); + const statuslineUsageWindows = providerUsageQuery.data ?? 
[]; + const statuslineProviders = new Set( + statuslineUsageWindows + .filter((window) => window.pctUsed !== null || window.remainingMs !== null) + .map((window) => window.provider), + ); const providerUsageWindows = [ - ...(credentialUsageQuery.data ?? []), - ...(providerUsageQuery.data ?? []), + ...statuslineUsageWindows, + ...(credentialUsageQuery.data ?? []).filter( + (window) => + !( + window.key.includes("subscription_unavailable") + && statuslineProviders.has(window.provider) + ), + ), ]; // Gateway health — query the first gateway only for the compact dashboard panel diff --git a/scripts/claude-statusline-to-pipeline.py b/scripts/claude-statusline-to-pipeline.py new file mode 100755 index 0000000..c3cbd01 --- /dev/null +++ b/scripts/claude-statusline-to-pipeline.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 +"""Forward Claude Code status-line usage windows to Pipeline. + +Claude Code passes a JSON snapshot on stdin to status-line commands. This helper +extracts only the documented rate_limits fields and posts them to Pipeline so the +dashboard can show provider-native Current session and All models reset windows. + +Required environment: + PIPELINE_API_URL e.g. 
http://localhost:8001 + PIPELINE_AUTH_TOKEN local bearer token or user token + PIPELINE_GATEWAY_ID gateway UUID to attach this local Claude Code session to + +Optional environment: + PIPELINE_STATUSLINE_TIMEOUT_SECONDS default: 2 + PIPELINE_STATUSLINE_PRINT set to 0 to suppress compact status output +""" + +from __future__ import annotations + +import json +import os +import sys +import urllib.error +import urllib.request + + +def _compact_status(payload: dict[str, object]) -> str: + model = payload.get("model") + model_name = "Claude" + if isinstance(model, dict): + display = model.get("display_name") + if isinstance(display, str) and display.strip(): + model_name = display.strip() + + rate_limits = payload.get("rate_limits") + if not isinstance(rate_limits, dict): + return f"[{model_name}]" + + parts: list[str] = [] + five = rate_limits.get("five_hour") + if isinstance(five, dict) and isinstance(five.get("used_percentage"), int | float): + parts.append(f"5h {five['used_percentage']:.0f}%") + week = rate_limits.get("seven_day") + if isinstance(week, dict) and isinstance(week.get("used_percentage"), int | float): + parts.append(f"7d {week['used_percentage']:.0f}%") + return f"[{model_name}] {' · '.join(parts)}" if parts else f"[{model_name}]" + + +def _sanitize(raw: dict[str, object]) -> dict[str, object]: + payload: dict[str, object] = {} + for key in ("session_id", "model", "workspace", "rate_limits"): + value = raw.get(key) + if value is not None: + payload[key] = value + return payload + + +def main() -> int: + try: + raw = json.load(sys.stdin) + except json.JSONDecodeError: + return 0 + if not isinstance(raw, dict): + return 0 + + payload = _sanitize(raw) + if os.environ.get("PIPELINE_STATUSLINE_PRINT") != "0": + print(_compact_status(payload)) + + api_url = os.environ.get("PIPELINE_API_URL", "").strip().rstrip("/") + token = os.environ.get("PIPELINE_AUTH_TOKEN", "").strip() + gateway_id = os.environ.get("PIPELINE_GATEWAY_ID", "").strip() + if not api_url or 
not token or not gateway_id: + return 0 + + url = f"{api_url}/api/v1/gateways/{gateway_id}/provider-usage/claude/statusline" + body = json.dumps(payload).encode("utf-8") + timeout = float(os.environ.get("PIPELINE_STATUSLINE_TIMEOUT_SECONDS", "2")) + request = urllib.request.Request( + url, + data=body, + method="POST", + headers={ + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + "User-Agent": "PipelineClaudeStatusline/1.0", + }, + ) + try: + with urllib.request.urlopen(request, timeout=timeout): + pass + except (OSError, urllib.error.URLError, urllib.error.HTTPError): + # Status-line hooks must never disrupt Claude Code if Pipeline is down. + return 0 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())