Pipeline/backend/tests/test_runtime_usage_service.py

647 lines
23 KiB
Python
Raw Normal View History

# ruff: noqa: INP001
"""Unit tests for runtime_usage service helpers.
Tests cover provider/model normalisation, cost estimation, session parsing,
per-model aggregation, window building, burn rate, and predictions.
No gateway connection is required.
"""
from __future__ import annotations

import json
import os
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path

import pytest

from app.schemas.runtime_usage import RuntimeUsageBurnRate, RuntimeUsageCurrent, RuntimeUsageWindow
from app.services.openclaw.runtime_usage import (
    DEFAULT_MODEL_PRICING,
    _build_predictions,
    _build_window,
    _compute_burn_rate,
    _get_explicit_cost,
    _oldest_active_ts,
    _parse_sessions,
    aggregate_per_model,
    estimate_cost,
    load_pricing,
    model_key,
    normalize_model,
    normalize_provider,
)
# ---------------------------------------------------------------------------
# normalize_provider
# ---------------------------------------------------------------------------
@pytest.mark.parametrize(
    "raw, expected",
    [
        # Anthropic spellings and aliases
        ("anthropic", "anthropic"),
        ("Anthropic", "anthropic"),
        ("claude", "anthropic"),
        ("CLAUDE", "anthropic"),
        # OpenAI spellings and aliases
        ("openai", "openai"),
        ("OpenAI", "openai"),
        ("codex", "openai"),
        # Local runtimes
        ("ollama", "ollama"),
        ("local", "ollama"),
        # Google alias
        ("gemini", "google"),
        # Blank input collapses to the "unknown" sentinel
        ("", "unknown"),
        (" ", "unknown"),
        # Unrecognised providers pass through unchanged
        ("custom-provider", "custom-provider"),
    ],
)
def test_normalize_provider(raw: str, expected: str) -> None:
    """Each raw provider label must map to the expected canonical name."""
    normalized = normalize_provider(raw)
    assert normalized == expected
# ---------------------------------------------------------------------------
# normalize_model
# ---------------------------------------------------------------------------
@pytest.mark.parametrize(
    "raw, expected",
    [
        ("claude-sonnet-4-6", "claude-sonnet-4-6"),
        ("claude-sonnet-4-6-20250219", "claude-sonnet-4-6"),
        ("claude-3-5-sonnet-20241022", "claude-3-5-sonnet"),
        ("anthropic/claude-opus-4-7", "claude-opus-4-7"),
        ("gpt-4o-2024-05-13", "gpt-4o"),
        ("gpt-4o-mini", "gpt-4o-mini"),
        ("claude-3-haiku-20240307", "claude-3-haiku"),
        # Local model tag: the expectation requires ":latest" to still be present.
        ("llama3:latest", "llama3:latest"),
        ("o1-preview", "o1"),
        ("gpt-4-turbo-preview", "gpt-4-turbo"),
    ],
)
def test_normalize_model(raw: str, expected: str) -> None:
    """The normalised model name must contain the expected canonical name.

    The check is deliberately a substring match: only removal of the date
    stamp is guaranteed, so minor variation around the canonical name is
    tolerated.
    """
    normalized = normalize_model(raw)
    # Exact equality is the ideal outcome; containment is the accepted minimum.
    assert normalized == expected or expected in normalized
# ---------------------------------------------------------------------------
# model_key
# ---------------------------------------------------------------------------
def test_model_key() -> None:
    """model_key combines provider and model into a "provider/model" string."""
    combined = model_key("anthropic", "claude-sonnet-4-6")
    assert combined == "anthropic/claude-sonnet-4-6"
# ---------------------------------------------------------------------------
# estimate_cost
# ---------------------------------------------------------------------------
def test_estimate_cost_known_model() -> None:
    """A priced model returns a dollar estimate and is not flagged as unpriced."""
    total_usd, is_unpriced = estimate_cost(
        "anthropic",
        "claude-sonnet-4-6",
        1_000_000,
        1_000_000,
    )
    assert not is_unpriced
    # 1M input @ $3 + 1M output @ $15 = $18
    assert abs(total_usd - 18.0) < 0.01
def test_estimate_cost_with_cache_tokens() -> None:
    """Cache read/write tokens are billed even when plain input/output are zero."""
    total_usd, is_unpriced = estimate_cost(
        "anthropic",
        "claude-sonnet-4-6",
        input_tokens=0,
        output_tokens=0,
        cache_read_tokens=1_000_000,
        cache_write_tokens=1_000_000,
    )
    assert not is_unpriced
    # $0.30 cache_read + $3.75 cache_write = $4.05
    assert abs(total_usd - 4.05) < 0.01
def test_estimate_cost_ollama_is_free() -> None:
    """Ollama usage costs nothing and must not be reported as unpriced."""
    total_usd, is_unpriced = estimate_cost("ollama", "llama3", 100_000, 50_000)
    assert total_usd == 0.0
    # Ollama is intentionally free, not unpriced
    assert not is_unpriced
def test_estimate_cost_unknown_paid_model() -> None:
    """An unrecognised model on a paid provider yields $0 and sets the unpriced flag."""
    total_usd, is_unpriced = estimate_cost("anthropic", "claude-99-ultra", 1_000, 1_000)
    assert total_usd == 0.0
    # unknown model — must flag
    assert is_unpriced
def test_estimate_cost_zero_tokens() -> None:
    """Zero tokens on a known model cost exactly $0 and are not unpriced."""
    total_usd, is_unpriced = estimate_cost("anthropic", "claude-haiku-4-5", 0, 0)
    assert total_usd == 0.0
    assert not is_unpriced
# ---------------------------------------------------------------------------
# load_pricing
# ---------------------------------------------------------------------------
def test_load_pricing_has_defaults() -> None:
    """The loaded pricing table includes the built-in Anthropic and OpenAI entries."""
    table = load_pricing()
    assert "anthropic/claude-sonnet-4-6" in table
    assert "openai/gpt-4o" in table
def test_load_pricing_has_required_fields() -> None:
    """Every pricing entry must carry both an "input" and an "output" rate."""
    table = load_pricing()
    for key, rates in table.items():
        assert "input" in rates, f"{key} missing input"
        assert "output" in rates, f"{key} missing output"
# ---------------------------------------------------------------------------
# _parse_sessions
# ---------------------------------------------------------------------------
_SESSION_A = {
"sessionId": "sess-a",
"provider": "anthropic",
"model": "claude-sonnet-4-6",
"usage": {"input_tokens": 1000, "output_tokens": 500},
"cost": 0.012,
"calls": 3,
"updatedAt": "2026-05-20T10:00:00Z",
}
_SESSION_B = {
"id": "sess-b",
"model": "gpt-4o",
"usage": {"inputTokens": 2000, "outputTokens": 800},
"costUsd": 0.013,
"calls": 2,
"updatedAt": "2026-05-20T09:00:00Z",
}
def test_parse_sessions_flat_list() -> None:
    """A top-level "sessions" list is returned with one entry per session."""
    payload = {"sessions": [_SESSION_A, _SESSION_B]}
    parsed = _parse_sessions(payload)
    assert len(parsed) == 2
def test_parse_sessions_nested_5hour() -> None:
    """Sessions nested under a "5hour" wrapper key are still discovered."""
    payload = {"5hour": {"sessions": [_SESSION_A]}}
    parsed = _parse_sessions(payload)
    assert len(parsed) == 1
def test_parse_sessions_empty() -> None:
    """An empty payload parses to an empty list."""
    parsed = _parse_sessions({})
    assert parsed == []
def test_parse_sessions_malformed_entries_skipped() -> None:
    """Non-dict entries (strings, None, ints) are dropped; valid sessions are kept."""
    mixed_entries = [_SESSION_A, "bad-string", None, 42, _SESSION_B]
    parsed = _parse_sessions({"sessions": mixed_entries})
    assert len(parsed) == 2
# ---------------------------------------------------------------------------
# aggregate_per_model
# ---------------------------------------------------------------------------
def test_aggregate_per_model_basic() -> None:
    """A single session produces one per-model entry mirroring its fields."""
    aggregated = aggregate_per_model([_SESSION_A], account_key="claude-default")
    key = "anthropic/claude-sonnet-4-6"
    assert key in aggregated

    row = aggregated[key]
    # Token counters
    assert row.input_tokens == 1000
    assert row.output_tokens == 500
    assert row.total_tokens == 1500
    # Bookkeeping fields
    assert row.calls == 3
    assert row.provider == "anthropic"
    assert row.account_key == "claude-default"
    assert not row.unpriced
def test_aggregate_per_model_merges_same_model() -> None:
    """Two sessions for the same model are summed into one entry."""
    second_session = {
        **_SESSION_A,
        "sessionId": "sess-c",
        "usage": {"input_tokens": 200, "output_tokens": 100},
    }
    aggregated = aggregate_per_model([_SESSION_A, second_session])
    row = aggregated["anthropic/claude-sonnet-4-6"]
    assert row.input_tokens == 1200
    assert row.output_tokens == 600
def test_aggregate_per_model_unknown_model_flagged() -> None:
    """A model unknown to the pricing table is aggregated with unpriced set."""
    unknown_session = {
        "sessionId": "x",
        "provider": "anthropic",
        "model": "claude-99-ultra",
        "usage": {"input_tokens": 100, "output_tokens": 50},
        "calls": 1,
    }
    aggregated = aggregate_per_model([unknown_session])
    row = aggregated["anthropic/claude-99-ultra"]
    assert row.unpriced
def test_aggregate_per_model_ollama_not_flagged() -> None:
    """Ollama sessions aggregate at zero cost without the unpriced flag."""
    local_session = {
        "sessionId": "y",
        "provider": "ollama",
        "model": "llama3",
        "usage": {"input_tokens": 5000, "output_tokens": 2000},
        "calls": 1,
    }
    aggregated = aggregate_per_model([local_session])
    row = aggregated["ollama/llama3"]
    assert not row.unpriced
    assert row.cost_usd == 0.0
# ---------------------------------------------------------------------------
# _oldest_active_ts
# ---------------------------------------------------------------------------
def test_oldest_active_ts_ignores_events_outside_window() -> None:
    """The oldest timestamp returned is the oldest one still inside the window."""
    now = _now_naive()
    ages = (
        timedelta(hours=6),  # outside 5h window
        timedelta(hours=2, minutes=30),
        timedelta(hours=1),
    )
    sessions = [{"updatedAt": (now - age).isoformat() + "Z"} for age in ages]

    oldest = _oldest_active_ts(sessions, now)

    assert oldest is not None
    expected_oldest = now - timedelta(hours=2, minutes=30)
    # Allow a small tolerance for timestamp round-tripping.
    assert abs((oldest - expected_oldest).total_seconds()) < 2
def test_oldest_active_ts_returns_none_when_all_events_outside_window() -> None:
    """When every event is 6 or more hours old, no active timestamp is reported."""
    now = _now_naive()
    stale_ages = (timedelta(hours=8), timedelta(hours=6))
    sessions = [{"updatedAt": (now - age).isoformat() + "Z"} for age in stale_ages]
    assert _oldest_active_ts(sessions, now) is None
# ---------------------------------------------------------------------------
# _build_window
# ---------------------------------------------------------------------------
def _now_naive() -> datetime:
return datetime.now(timezone.utc).replace(tzinfo=None)
def test_build_window_falls_back_to_5h_rolling() -> None:
    """With no status data the window is a low-confidence rolling 5h estimate."""
    now = _now_naive()
    window = _build_window({}, now)

    assert window.key == "5h"
    # The window should have started five hours before "now".
    elapsed_seconds = (now - window.started_at).total_seconds()
    assert abs(elapsed_seconds - 5 * 3600) < 5
    assert window.reset_in_ms == 0  # resets_at == now
    assert window.source == "local_jsonl_estimate"
    assert window.confidence == "low"
def test_build_window_uses_oldest_active_event_for_local_reset() -> None:
    """A supplied oldest-event timestamp anchors the window start and its 5h reset."""
    now = _now_naive()
    oldest_event = now - timedelta(hours=2, minutes=15)
    expected_reset = oldest_event + timedelta(hours=5)
    expected_ms = int((expected_reset - now).total_seconds() * 1000)

    window = _build_window({}, now, oldest_event_ts=oldest_event)

    assert window.started_at == oldest_event
    assert window.resets_at == expected_reset
    assert abs(window.reset_in_ms - expected_ms) < 1000
    assert window.reset_in_ms > 0
    assert window.source == "local_jsonl_estimate"
    assert window.confidence == "low"
def test_build_window_uses_gateway_status() -> None:
    """windowStart/windowEnd in the status payload produce a high-confidence window."""
    now = _now_naive()
    window_start = now - timedelta(hours=3)
    window_end = now + timedelta(hours=2)
    status_raw = {
        "windowStart": window_start.isoformat() + "Z",
        "windowEnd": window_end.isoformat() + "Z",
    }

    window = _build_window(status_raw, now)

    two_hours_ms = 2 * 3600 * 1000
    assert abs(window.reset_in_ms - two_hours_ms) < 5000  # within 5 seconds
    assert window.source == "provider_native"
    assert window.confidence == "high"
def test_build_window_uses_ratelimit_reset_header_when_available() -> None:
    """An x_ratelimit_reset value in delta-seconds yields a medium-confidence window."""
    now = _now_naive()
    # "1800" is a delta in seconds, i.e. 30 minutes until reset.
    status_raw = {"x_ratelimit_reset": "1800"}

    window = _build_window(status_raw, now)

    assert abs(window.reset_in_ms - 1_800_000) < 5000
    assert window.source == "provider_api_rate_limit"
    assert window.confidence == "medium"
# ---------------------------------------------------------------------------
# _compute_burn_rate
# ---------------------------------------------------------------------------
def test_compute_burn_rate_recent_sessions() -> None:
    """A session updated 30 minutes ago contributes to the per-minute burn rate."""
    now = _now_naive()
    updated_at = (now - timedelta(minutes=30)).isoformat() + "Z"
    recent_session = {
        "updatedAt": updated_at,
        "usage": {"input_tokens": 6000, "output_tokens": 0},
        "cost": 0.018,
    }
    window = RuntimeUsageWindow(
        key="5h",
        started_at=now - timedelta(hours=5),
        resets_at=now,
        reset_in_ms=0,
    )

    burn = _compute_burn_rate([recent_session], window, now)

    # Expected values are the session totals divided by 60 — presumably a
    # 60-minute averaging period inside _compute_burn_rate (TODO confirm).
    assert burn.tokens_per_minute == pytest.approx(6000 / 60, abs=1)
    assert burn.cost_usd_per_minute == pytest.approx(0.018 / 60, abs=1e-6)
def test_compute_burn_rate_no_recent_sessions() -> None:
    """A session last updated three hours ago yields a zero burn rate."""
    now = _now_naive()
    stale_session = {
        "updatedAt": (now - timedelta(hours=3)).isoformat() + "Z",
        "usage": {"input_tokens": 1000, "output_tokens": 0},
        "cost": 0.01,
    }
    window = RuntimeUsageWindow(
        key="5h",
        started_at=now - timedelta(hours=5),
        resets_at=now,
        reset_in_ms=0,
    )

    burn = _compute_burn_rate([stale_session], window, now)

    assert burn.tokens_per_minute == 0.0
    assert burn.cost_usd_per_minute == 0.0
# ---------------------------------------------------------------------------
# _build_predictions
# ---------------------------------------------------------------------------
def _make_window(reset_in_ms: int) -> RuntimeUsageWindow:
    """Build a "5h" window that began five hours ago and resets in *reset_in_ms*."""
    now = _now_naive()
    window_start = now - timedelta(hours=5)
    window_reset = now + timedelta(milliseconds=reset_in_ms)
    return RuntimeUsageWindow(
        key="5h",
        started_at=window_start,
        resets_at=window_reset,
        reset_in_ms=reset_in_ms,
    )
def test_build_predictions_no_limit() -> None:
    """Without any configured limit there is no time-to-limit and usage is safe."""
    usage = RuntimeUsageCurrent(total_cost_usd=1.0, total_tokens=5000, total_calls=10)
    rate = RuntimeUsageBurnRate(tokens_per_minute=100.0, cost_usd_per_minute=0.01)

    prediction = _build_predictions(usage, rate, _make_window(reset_in_ms=60_000))

    assert prediction.time_to_limit_ms is None
    assert prediction.safe is True
def test_build_predictions_safe() -> None:
    """When the window resets before the limit would be reached, usage is safe."""
    usage = RuntimeUsageCurrent(
        total_cost_usd=1.0,
        total_tokens=10_000,
        total_calls=5,
        token_limit=100_000,  # 90k remaining
    )
    rate = RuntimeUsageBurnRate(tokens_per_minute=100.0, cost_usd_per_minute=0.01)
    # 90k tokens @ 100/min = 900 minutes = 54,000,000 ms
    # reset in 30 minutes = 1,800,000 ms → safe=True
    window = _make_window(reset_in_ms=1_800_000)

    prediction = _build_predictions(usage, rate, window)

    assert prediction.time_to_limit_ms is not None
    assert prediction.time_to_limit_ms > 1_800_000
    assert prediction.safe is True
def test_build_predictions_unsafe() -> None:
    """When the limit would be hit before the window resets, usage is unsafe."""
    usage = RuntimeUsageCurrent(
        total_cost_usd=1.0,
        total_tokens=95_000,
        total_calls=5,
        token_limit=100_000,  # only 5k left
    )
    rate = RuntimeUsageBurnRate(tokens_per_minute=1000.0, cost_usd_per_minute=0.05)
    # 5k tokens @ 1000/min = 5 minutes = 300,000 ms
    # reset in 30 minutes = 1,800,000 ms → safe=False (will hit limit before reset)
    window = _make_window(reset_in_ms=1_800_000)

    prediction = _build_predictions(usage, rate, window)

    assert prediction.time_to_limit_ms is not None
    assert prediction.time_to_limit_ms < 1_800_000
    assert prediction.safe is False
def test_build_predictions_already_over_limit() -> None:
    """Usage already past the limit reports zero time remaining and is unsafe."""
    usage = RuntimeUsageCurrent(
        total_cost_usd=5.0,
        total_tokens=110_000,
        total_calls=20,
        token_limit=100_000,
    )
    rate = RuntimeUsageBurnRate(tokens_per_minute=500.0, cost_usd_per_minute=0.05)
    window = _make_window(reset_in_ms=1_800_000)

    prediction = _build_predictions(usage, rate, window)

    assert prediction.time_to_limit_ms == 0
    assert prediction.safe is False
# ---------------------------------------------------------------------------
# Phase 4: typed-limit predictions
# ---------------------------------------------------------------------------
def test_build_predictions_uses_output_token_limit() -> None:
    """An output-token limit is projected from output tokens and output burn rate.

    Total tokens and the total burn rate must not drive this prediction.
    """
    usage = RuntimeUsageCurrent(
        total_cost_usd=1.0,
        total_tokens=50_000,
        total_output_tokens=5_000,
        total_calls=10,
        output_token_limit=10_000,  # 5k out remaining
    )
    rate = RuntimeUsageBurnRate(
        tokens_per_minute=1000.0,
        output_tokens_per_minute=100.0,  # 5k / 100/min = 50 min = 3,000,000 ms
        cost_usd_per_minute=0.01,
    )
    # 30 min reset — resets before limit
    window = _make_window(reset_in_ms=30 * 60 * 1000)

    prediction = _build_predictions(usage, rate, window)

    assert prediction.limit_kind == "output_tokens"
    assert prediction.time_to_limit_ms is not None
    assert abs(prediction.time_to_limit_ms - 3_000_000) < 10_000  # ~50 min
    # limit at 50 min, reset at 30 min → reset saves us
    assert prediction.safe is True
def test_build_predictions_prefers_output_over_total_when_tighter() -> None:
    """With output and total limits both set, the one reached first is reported."""
    usage = RuntimeUsageCurrent(
        total_cost_usd=0.5,
        total_tokens=90_000,
        total_output_tokens=9_000,
        total_calls=5,
        output_token_limit=10_000,  # 1k out remaining → hits in 10 min
        total_token_limit=200_000,  # 110k total remaining → hits in 110 min
    )
    rate = RuntimeUsageBurnRate(
        tokens_per_minute=1000.0,
        output_tokens_per_minute=100.0,
        cost_usd_per_minute=0.005,
    )
    window = _make_window(reset_in_ms=2 * 60 * 60 * 1000)

    prediction = _build_predictions(usage, rate, window)

    assert prediction.limit_kind == "output_tokens"
    assert prediction.time_to_limit_ms is not None
    assert prediction.time_to_limit_ms < 15 * 60 * 1000  # well under 15 min
def test_build_predictions_message_limit() -> None:
    """A message limit is projected from call count over the window, not tokens."""
    usage = RuntimeUsageCurrent(
        total_cost_usd=0.5,
        total_tokens=5_000,
        total_output_tokens=2_000,
        total_calls=400,
        message_limit=500,  # 100 calls remaining
    )
    rate = RuntimeUsageBurnRate(
        tokens_per_minute=50.0,
        output_tokens_per_minute=20.0,
        cost_usd_per_minute=0.005,
    )
    # 400 calls over a 5h window → ~400/(5*60) = 1.33 calls/min → 100/1.33 ≈ 75 min
    # 1h reset — resets before limit
    window = _make_window(reset_in_ms=60 * 60 * 1000)

    prediction = _build_predictions(usage, rate, window)

    assert prediction.limit_kind == "messages"
    assert prediction.time_to_limit_ms is not None
    assert prediction.time_to_limit_ms > 0
    # safe: limit hits at ~75 min, reset at 60 min → reset saves us
    assert prediction.safe is True
# ---------------------------------------------------------------------------
# Phase 5: explicit cost preference
# ---------------------------------------------------------------------------
def test_get_explicit_cost_usage_cost_total() -> None:
    """usage.cost.total (the reference dashboard format) beats the top-level cost."""
    session = {
        "cost": 0.001,  # top-level: 0.001
        "usage": {"cost": {"total": 0.050}},  # nested: 0.050
    }
    explicit = _get_explicit_cost(session)
    assert explicit == pytest.approx(0.050)
def test_get_explicit_cost_usage_cost_flat() -> None:
    """A plain float at usage.cost is accepted when there is no "total" sub-key."""
    explicit = _get_explicit_cost({"usage": {"cost": 0.030}})
    assert explicit == pytest.approx(0.030)
def test_get_explicit_cost_top_level_fallback() -> None:
    """A top-level "cost" field is used when usage carries no cost."""
    explicit = _get_explicit_cost({"cost": 0.012})
    assert explicit == pytest.approx(0.012)
def test_get_explicit_cost_costusd_variant() -> None:
    """The camelCase "costUsd" spelling is recognised as an explicit cost."""
    explicit = _get_explicit_cost({"costUsd": 0.007})
    assert explicit == pytest.approx(0.007)
def test_get_explicit_cost_missing_returns_zero() -> None:
    """Sessions with no cost information report an explicit cost of 0.0."""
    for costless_session in ({}, {"usage": {}}):
        assert _get_explicit_cost(costless_session) == 0.0
def test_aggregate_explicit_cost_wins_over_local_estimate() -> None:
    """An explicit usage.cost.total is used verbatim during aggregation.

    Local pricing must not overwrite a cost the session already carries.
    """
    session = {
        "sessionId": "explicit-sess",
        "provider": "anthropic",
        "model": "claude-sonnet-4-6",
        # Tiny token counts would produce ~$0 local estimate
        "usage": {
            "input_tokens": 1,
            "output_tokens": 1,
            "cost": {"total": 9.99},  # explicit provider cost
        },
        "calls": 1,
    }

    aggregated = aggregate_per_model([session])
    row = aggregated["anthropic/claude-sonnet-4-6"]

    # Must use explicit 9.99, not local estimate of ~0
    assert row.cost_usd == pytest.approx(9.99, abs=0.001)
    assert not row.unpriced
# ---------------------------------------------------------------------------
# Phase 5: pricing regression tests
# ---------------------------------------------------------------------------
def test_opus_cache_write_price_is_18_75() -> None:
    """Regression: Opus cache writes cost $18.75/MTok, not the old $3.75."""
    total_usd, is_unpriced = estimate_cost(
        "anthropic",
        "claude-opus-4-7",
        input_tokens=0,
        output_tokens=0,
        cache_read_tokens=0,
        cache_write_tokens=1_000_000,
    )
    assert not is_unpriced
    assert abs(total_usd - 18.75) < 0.01
def test_gpt41_mini_is_priced() -> None:
    """Regression: gpt-4.1-mini has a pricing entry and a $2.00 total per 1M in/out."""
    total_usd, is_unpriced = estimate_cost("openai", "gpt-4.1-mini", 1_000_000, 1_000_000)
    assert not is_unpriced
    # $0.40 input + $1.60 output = $2.00
    assert abs(total_usd - 2.00) < 0.01
def test_unknown_anthropic_model_unpriced() -> None:
    """Regression: an unknown paid model must be flagged rather than silently free."""
    result = estimate_cost("anthropic", "claude-99-ultra", 1_000, 1_000)
    _, is_unpriced = result
    assert is_unpriced, "Unknown paid model must set unpriced=True, not silently return $0"
# ---------------------------------------------------------------------------
# Phase 5: pricing override file shapes
# ---------------------------------------------------------------------------
def _reset_pricing_cache() -> None:
    """Set the runtime_usage module's ``_pricing_cache`` attribute back to None."""
    # Imported locally so the attribute is assigned on the module object itself.
    from app.services.openclaw import runtime_usage as usage_module

    usage_module._pricing_cache = None
def test_load_pricing_override_direct_shape(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """An override file mapping model keys directly to rates is loaded.

    The file path is supplied through the RUNTIME_USAGE_PRICING_FILE
    environment variable. ``monkeypatch`` sets it so that any value present
    before the test is restored afterwards instead of being deleted.
    """
    override = {
        "anthropic/claude-test-model": {
            "input": 99.0,
            "output": 199.0,
            "cache_read": 9.9,
            "cache_write": 49.5,
        }
    }
    pricing_file = tmp_path / "pricing.json"
    pricing_file.write_text(json.dumps(override))

    monkeypatch.setenv("RUNTIME_USAGE_PRICING_FILE", str(pricing_file))
    # Drop any cached table so load_pricing() runs again with the override set.
    _reset_pricing_cache()
    try:
        pricing = load_pricing()
        assert "anthropic/claude-test-model" in pricing
        assert pricing["anthropic/claude-test-model"]["input"] == 99.0
    finally:
        # Never leak the override-backed table into later tests.
        _reset_pricing_cache()
def test_load_pricing_override_rates_per_million_shape(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """An override file wrapped under ``rates_usd_per_million`` is loaded.

    This is the shape the reference dashboard uses. The built-in defaults
    must remain present alongside the override. ``monkeypatch`` sets
    RUNTIME_USAGE_PRICING_FILE so that any value present before the test is
    restored afterwards instead of being deleted.
    """
    override = {
        "rates_usd_per_million": {
            "anthropic/claude-test-model-2": {
                "input": 55.0,
                "output": 110.0,
                "cache_read": 5.5,
                "cache_write": 13.75,
            }
        }
    }
    pricing_file = tmp_path / "pricing_wrapped.json"
    pricing_file.write_text(json.dumps(override))

    monkeypatch.setenv("RUNTIME_USAGE_PRICING_FILE", str(pricing_file))
    # Drop any cached table so load_pricing() runs again with the override set.
    _reset_pricing_cache()
    try:
        pricing = load_pricing()
        assert "anthropic/claude-test-model-2" in pricing
        assert pricing["anthropic/claude-test-model-2"]["output"] == 110.0
        # Defaults must still be present
        assert "anthropic/claude-sonnet-4-6" in pricing
    finally:
        # Never leak the override-backed table into later tests.
        _reset_pricing_cache()
def test_load_pricing_override_does_not_overwrite_explicit_gateway_cost() -> None:
    """Explicit session costs are reported as-is, whatever local pricing would say."""
    # This is a logical check: per the original author's note, aggregation
    # consults _get_explicit_cost first, so even a wildly wrong pricing table
    # cannot corrupt sessions that carry explicit costs.
    usage = {
        "input_tokens": 1_000_000,
        "output_tokens": 1_000_000,
        "cost": {"total": 0.0001},
    }
    session = {
        "provider": "anthropic",
        "model": "claude-sonnet-4-6",
        "usage": usage,
        "calls": 1,
    }

    aggregated = aggregate_per_model([session])
    row = aggregated["anthropic/claude-sonnet-4-6"]

    # $0.0001 explicit wins over the $18 local estimate for 1M in + 1M out
    assert row.cost_usd == pytest.approx(0.0001, abs=1e-6)