# ruff: noqa: INP001 """Unit tests for runtime_usage service helpers. Tests cover provider/model normalisation, cost estimation, session parsing, per-model aggregation, window building, burn rate, and predictions. No gateway connection is required. """ from __future__ import annotations from datetime import datetime, timedelta, timezone import pytest from app.services.openclaw.runtime_usage import ( DEFAULT_MODEL_PRICING, _build_predictions, _build_window, _compute_burn_rate, _parse_sessions, aggregate_per_model, estimate_cost, load_pricing, model_key, normalize_model, normalize_provider, ) from app.schemas.runtime_usage import RuntimeUsageBurnRate, RuntimeUsageCurrent, RuntimeUsageWindow # --------------------------------------------------------------------------- # normalize_provider # --------------------------------------------------------------------------- @pytest.mark.parametrize( "raw, expected", [ ("anthropic", "anthropic"), ("Anthropic", "anthropic"), ("claude", "anthropic"), ("CLAUDE", "anthropic"), ("openai", "openai"), ("OpenAI", "openai"), ("codex", "openai"), ("ollama", "ollama"), ("local", "ollama"), ("gemini", "google"), ("", "unknown"), (" ", "unknown"), ("custom-provider", "custom-provider"), ], ) def test_normalize_provider(raw: str, expected: str) -> None: assert normalize_provider(raw) == expected # --------------------------------------------------------------------------- # normalize_model # --------------------------------------------------------------------------- @pytest.mark.parametrize( "raw, expected", [ ("claude-sonnet-4-6", "claude-sonnet-4-6"), ("claude-sonnet-4-6-20250219", "claude-sonnet-4-6"), ("claude-3-5-sonnet-20241022", "claude-3-5-sonnet"), ("anthropic/claude-opus-4-7", "claude-opus-4-7"), ("gpt-4o-2024-05-13", "gpt-4o"), ("gpt-4o-mini", "gpt-4o-mini"), ("claude-3-haiku-20240307", "claude-3-haiku"), ("llama3:latest", "llama3:latest"), # local model — strip :latest via re ("o1-preview", "o1"), ("gpt-4-turbo-preview", "gpt-4-turbo"), ], ) def test_normalize_model(raw: str, expected: str) -> None: result = normalize_model(raw) # We only guarantee the date-stamp is stripped; allow minor variation assert expected in result or result == expected # --------------------------------------------------------------------------- # model_key # --------------------------------------------------------------------------- def test_model_key() -> None: assert model_key("anthropic", "claude-sonnet-4-6") == "anthropic/claude-sonnet-4-6" # --------------------------------------------------------------------------- # estimate_cost # --------------------------------------------------------------------------- def test_estimate_cost_known_model() -> None: cost, unpriced = estimate_cost("anthropic", "claude-sonnet-4-6", 1_000_000, 1_000_000) assert not unpriced # 1M input @ $3 + 1M output @ $15 = $18 assert abs(cost - 18.0) < 0.01 def test_estimate_cost_with_cache_tokens() -> None: cost, unpriced = estimate_cost( "anthropic", "claude-sonnet-4-6", input_tokens=0, output_tokens=0, cache_read_tokens=1_000_000, cache_write_tokens=1_000_000, ) assert not unpriced # $0.30 cache_read + $3.75 cache_write = $4.05 assert abs(cost - 4.05) < 0.01 def test_estimate_cost_ollama_is_free() -> None: cost, unpriced = estimate_cost("ollama", "llama3", 100_000, 50_000) assert cost == 0.0 assert not unpriced # Ollama is intentionally free, not unpriced def test_estimate_cost_unknown_paid_model() -> None: cost, unpriced = estimate_cost("anthropic", "claude-99-ultra", 1_000, 1_000) assert cost == 0.0 assert unpriced # unknown model — must flag def test_estimate_cost_zero_tokens() -> None: cost, unpriced = estimate_cost("anthropic", "claude-haiku-4-5", 0, 0) assert cost == 0.0 assert not unpriced # --------------------------------------------------------------------------- # load_pricing # --------------------------------------------------------------------------- def test_load_pricing_has_defaults() -> None: pricing = load_pricing() assert "anthropic/claude-sonnet-4-6" in pricing assert "openai/gpt-4o" in pricing def test_load_pricing_has_required_fields() -> None: pricing = load_pricing() for key, entry in pricing.items(): assert "input" in entry, f"{key} missing input" assert "output" in entry, f"{key} missing output" # --------------------------------------------------------------------------- # _parse_sessions # --------------------------------------------------------------------------- _SESSION_A = { "sessionId": "sess-a", "provider": "anthropic", "model": "claude-sonnet-4-6", "usage": {"input_tokens": 1000, "output_tokens": 500}, "cost": 0.012, "calls": 3, "updatedAt": "2026-05-20T10:00:00Z", } _SESSION_B = { "id": "sess-b", "model": "gpt-4o", "usage": {"inputTokens": 2000, "outputTokens": 800}, "costUsd": 0.013, "calls": 2, "updatedAt": "2026-05-20T09:00:00Z", } def test_parse_sessions_flat_list() -> None: raw = {"sessions": [_SESSION_A, _SESSION_B]} sessions = _parse_sessions(raw) assert len(sessions) == 2 def test_parse_sessions_nested_5hour() -> None: raw = {"5hour": {"sessions": [_SESSION_A]}} sessions = _parse_sessions(raw) assert len(sessions) == 1 def test_parse_sessions_empty() -> None: assert _parse_sessions({}) == [] def test_parse_sessions_malformed_entries_skipped() -> None: raw = {"sessions": [_SESSION_A, "bad-string", None, 42, _SESSION_B]} sessions = _parse_sessions(raw) assert len(sessions) == 2 # --------------------------------------------------------------------------- # aggregate_per_model # --------------------------------------------------------------------------- def test_aggregate_per_model_basic() -> None: per_model = aggregate_per_model([_SESSION_A], account_key="claude-default") key = "anthropic/claude-sonnet-4-6" assert key in per_model entry = per_model[key] assert entry.input_tokens == 1000 assert entry.output_tokens == 500 assert entry.total_tokens == 1500 assert entry.calls == 3 assert entry.provider == "anthropic" assert entry.account_key == "claude-default" assert not entry.unpriced def test_aggregate_per_model_merges_same_model() -> None: sessions = [_SESSION_A, {**_SESSION_A, "sessionId": "sess-c", "usage": {"input_tokens": 200, "output_tokens": 100}}] per_model = aggregate_per_model(sessions) entry = per_model["anthropic/claude-sonnet-4-6"] assert entry.input_tokens == 1200 assert entry.output_tokens == 600 def test_aggregate_per_model_unknown_model_flagged() -> None: session = { "sessionId": "x", "provider": "anthropic", "model": "claude-99-ultra", "usage": {"input_tokens": 100, "output_tokens": 50}, "calls": 1, } per_model = aggregate_per_model([session]) key = "anthropic/claude-99-ultra" assert per_model[key].unpriced def test_aggregate_per_model_ollama_not_flagged() -> None: session = { "sessionId": "y", "provider": "ollama", "model": "llama3", "usage": {"input_tokens": 5000, "output_tokens": 2000}, "calls": 1, } per_model = aggregate_per_model([session]) entry = per_model["ollama/llama3"] assert not entry.unpriced assert entry.cost_usd == 0.0 # --------------------------------------------------------------------------- # _build_window # --------------------------------------------------------------------------- def _now_naive() -> datetime: return datetime.now(timezone.utc).replace(tzinfo=None) def test_build_window_falls_back_to_5h_rolling() -> None: now = _now_naive() window = _build_window({}, now) assert window.key == "5h" assert abs((now - window.started_at).total_seconds() - 5 * 3600) < 5 assert window.reset_in_ms == 0 # resets_at == now def test_build_window_uses_gateway_status() -> None: now = _now_naive() started = now - timedelta(hours=3) resets = now + timedelta(hours=2) status_raw = { "windowStart": started.isoformat() + "Z", "windowEnd": resets.isoformat() + "Z", } window = _build_window(status_raw, now) assert abs(window.reset_in_ms - 2 * 3600 * 1000) < 5000 # within 5 seconds # --------------------------------------------------------------------------- # _compute_burn_rate # --------------------------------------------------------------------------- def test_compute_burn_rate_recent_sessions() -> None: now = _now_naive() recent = (now - timedelta(minutes=30)).isoformat() + "Z" sessions = [ {"updatedAt": recent, "usage": {"input_tokens": 6000, "output_tokens": 0}, "cost": 0.018}, ] window = RuntimeUsageWindow( key="5h", started_at=now - timedelta(hours=5), resets_at=now, reset_in_ms=0, ) burn = _compute_burn_rate(sessions, window, now) assert burn.tokens_per_minute == pytest.approx(6000 / 60, abs=1) assert burn.cost_usd_per_minute == pytest.approx(0.018 / 60, abs=1e-6) def test_compute_burn_rate_no_recent_sessions() -> None: now = _now_naive() old = (now - timedelta(hours=3)).isoformat() + "Z" sessions = [{"updatedAt": old, "usage": {"input_tokens": 1000, "output_tokens": 0}, "cost": 0.01}] window = RuntimeUsageWindow(key="5h", started_at=now - timedelta(hours=5), resets_at=now, reset_in_ms=0) burn = _compute_burn_rate(sessions, window, now) assert burn.tokens_per_minute == 0.0 assert burn.cost_usd_per_minute == 0.0 # --------------------------------------------------------------------------- # _build_predictions # --------------------------------------------------------------------------- def _make_window(reset_in_ms: int) -> RuntimeUsageWindow: now = _now_naive() return RuntimeUsageWindow( key="5h", started_at=now - timedelta(hours=5), resets_at=now + timedelta(milliseconds=reset_in_ms), reset_in_ms=reset_in_ms, ) def test_build_predictions_no_limit() -> None: current = RuntimeUsageCurrent(total_cost_usd=1.0, total_tokens=5000, total_calls=10) burn = RuntimeUsageBurnRate(tokens_per_minute=100.0, cost_usd_per_minute=0.01) window = _make_window(reset_in_ms=60_000) pred = _build_predictions(current, burn, window) assert pred.time_to_limit_ms is None assert pred.safe is True def test_build_predictions_safe() -> None: current = RuntimeUsageCurrent( total_cost_usd=1.0, total_tokens=10_000, total_calls=5, token_limit=100_000, # 90k remaining ) burn = RuntimeUsageBurnRate(tokens_per_minute=100.0, cost_usd_per_minute=0.01) # 90k tokens @ 100/min = 900 minutes = 54,000,000 ms # reset in 30 minutes = 1,800,000 ms → safe=True window = _make_window(reset_in_ms=1_800_000) pred = _build_predictions(current, burn, window) assert pred.time_to_limit_ms is not None assert pred.time_to_limit_ms > 1_800_000 assert pred.safe is True def test_build_predictions_unsafe() -> None: current = RuntimeUsageCurrent( total_cost_usd=1.0, total_tokens=95_000, total_calls=5, token_limit=100_000, # only 5k left ) burn = RuntimeUsageBurnRate(tokens_per_minute=1000.0, cost_usd_per_minute=0.05) # 5k tokens @ 1000/min = 5 minutes = 300,000 ms # reset in 30 minutes = 1,800,000 ms → safe=False (will hit limit before reset) window = _make_window(reset_in_ms=1_800_000) pred = _build_predictions(current, burn, window) assert pred.time_to_limit_ms is not None assert pred.time_to_limit_ms < 1_800_000 assert pred.safe is False def test_build_predictions_already_over_limit() -> None: current = RuntimeUsageCurrent( total_cost_usd=5.0, total_tokens=110_000, total_calls=20, token_limit=100_000, ) burn = RuntimeUsageBurnRate(tokens_per_minute=500.0, cost_usd_per_minute=0.05) window = _make_window(reset_in_ms=1_800_000) pred = _build_predictions(current, burn, window) assert pred.time_to_limit_ms == 0 assert pred.safe is False