Pipeline/backend/tests/test_runtime_usage_service.py

647 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# ruff: noqa: INP001
"""Unit tests for runtime_usage service helpers.
Tests cover provider/model normalisation, cost estimation, session parsing,
per-model aggregation, window building, burn rate, and predictions.
No gateway connection is required.
"""
from __future__ import annotations

import json
import os
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path

import pytest

from app.schemas.runtime_usage import RuntimeUsageBurnRate, RuntimeUsageCurrent, RuntimeUsageWindow
from app.services.openclaw.runtime_usage import (
    DEFAULT_MODEL_PRICING,
    _build_predictions,
    _build_window,
    _compute_burn_rate,
    _get_explicit_cost,
    _oldest_active_ts,
    _parse_sessions,
    aggregate_per_model,
    estimate_cost,
    load_pricing,
    model_key,
    normalize_model,
    normalize_provider,
)
# ---------------------------------------------------------------------------
# normalize_provider
# ---------------------------------------------------------------------------
@pytest.mark.parametrize(
    "raw, expected",
    [
        # Spellings that the test expects to collapse to "anthropic".
        ("anthropic", "anthropic"),
        ("Anthropic", "anthropic"),
        ("claude", "anthropic"),
        ("CLAUDE", "anthropic"),
        # Spellings that the test expects to collapse to "openai".
        ("openai", "openai"),
        ("OpenAI", "openai"),
        ("codex", "openai"),
        # Local runtimes.
        ("ollama", "ollama"),
        ("local", "ollama"),
        ("gemini", "google"),
        # Blank input is expected to become "unknown".
        ("", "unknown"),
        (" ", "unknown"),
        # Unrecognised names are expected to pass through unchanged.
        ("custom-provider", "custom-provider"),
    ],
)
def test_normalize_provider(raw: str, expected: str) -> None:
    """Check that ``normalize_provider`` maps *raw* to the *expected* name."""
    canonical = normalize_provider(raw)
    assert canonical == expected
# ---------------------------------------------------------------------------
# normalize_model
# ---------------------------------------------------------------------------
@pytest.mark.parametrize(
    "raw, expected",
    [
        ("claude-sonnet-4-6", "claude-sonnet-4-6"),
        ("claude-sonnet-4-6-20250219", "claude-sonnet-4-6"),
        ("claude-3-5-sonnet-20241022", "claude-3-5-sonnet"),
        ("anthropic/claude-opus-4-7", "claude-opus-4-7"),
        ("gpt-4o-2024-05-13", "gpt-4o"),
        ("gpt-4o-mini", "gpt-4o-mini"),
        ("claude-3-haiku-20240307", "claude-3-haiku"),
        # Local model tag: the expected value here keeps the ":latest" suffix.
        ("llama3:latest", "llama3:latest"),
        ("o1-preview", "o1"),
        ("gpt-4-turbo-preview", "gpt-4-turbo"),
    ],
)
def test_normalize_model(raw: str, expected: str) -> None:
    """Check that the normalised form of *raw* contains the *expected* name.

    Only the removal of the date stamp is guaranteed, so the comparison is a
    substring match rather than strict equality.
    """
    normalized = normalize_model(raw)
    # NOTE(review): a containment check also passes when nothing was stripped
    # (e.g. "o1" is a substring of "o1-preview") — confirm this looseness is
    # intended before tightening it to equality.
    assert expected in normalized
# ---------------------------------------------------------------------------
# model_key
# ---------------------------------------------------------------------------
def test_model_key() -> None:
    """Check that provider and model are combined into a ``provider/model`` key."""
    combined = model_key("anthropic", "claude-sonnet-4-6")
    assert combined == "anthropic/claude-sonnet-4-6"
# ---------------------------------------------------------------------------
# estimate_cost
# ---------------------------------------------------------------------------
def test_estimate_cost_known_model() -> None:
    """A model present in the pricing table is priced and not flagged unpriced."""
    estimated, is_unpriced = estimate_cost(
        "anthropic", "claude-sonnet-4-6", 1_000_000, 1_000_000
    )
    assert not is_unpriced
    # Expected total: 1M input @ $3 + 1M output @ $15 = $18.
    difference = estimated - 18.0
    assert abs(difference) < 0.01
def test_estimate_cost_with_cache_tokens() -> None:
    """Cache read/write tokens alone still produce a priced estimate."""
    estimated, is_unpriced = estimate_cost(
        "anthropic",
        "claude-sonnet-4-6",
        input_tokens=0,
        output_tokens=0,
        cache_read_tokens=1_000_000,
        cache_write_tokens=1_000_000,
    )
    assert not is_unpriced
    # Expected total: $0.30 cache_read + $3.75 cache_write = $4.05.
    difference = estimated - 4.05
    assert abs(difference) < 0.01
def test_estimate_cost_ollama_is_free() -> None:
    """Ollama usage costs nothing and must not be reported as unpriced."""
    estimated, is_unpriced = estimate_cost("ollama", "llama3", 100_000, 50_000)
    assert estimated == 0.0
    # Free by design, which is different from "no price known".
    assert not is_unpriced
def test_estimate_cost_unknown_paid_model() -> None:
    """An unrecognised paid model yields $0 and is flagged as unpriced."""
    estimated, is_unpriced = estimate_cost("anthropic", "claude-99-ultra", 1_000, 1_000)
    assert estimated == 0.0
    # The flag is what distinguishes "unknown price" from "genuinely free".
    assert is_unpriced
def test_estimate_cost_zero_tokens() -> None:
    """Zero tokens on a known model cost $0 without raising the unpriced flag."""
    estimated, is_unpriced = estimate_cost("anthropic", "claude-haiku-4-5", 0, 0)
    assert estimated == 0.0
    assert not is_unpriced
# ---------------------------------------------------------------------------
# load_pricing
# ---------------------------------------------------------------------------
def test_load_pricing_has_defaults() -> None:
    """The loaded pricing table contains the expected default model keys."""
    table = load_pricing()
    assert "anthropic/claude-sonnet-4-6" in table
    assert "openai/gpt-4o" in table
def test_load_pricing_has_required_fields() -> None:
    """Every pricing entry defines both an ``input`` and an ``output`` rate."""
    table = load_pricing()
    for key, rates in table.items():
        assert "input" in rates, f"{key} missing input"
        assert "output" in rates, f"{key} missing output"
# ---------------------------------------------------------------------------
# _parse_sessions
# ---------------------------------------------------------------------------
# Session fixture keyed by "sessionId": explicit provider, snake_case usage
# counters and a top-level "cost" field.
_SESSION_A = {
    "sessionId": "sess-a",
    "provider": "anthropic",
    "model": "claude-sonnet-4-6",
    "usage": {"input_tokens": 1000, "output_tokens": 500},
    "cost": 0.012,
    "calls": 3,
    "updatedAt": "2026-05-20T10:00:00Z",
}

# Session fixture keyed by "id": no provider field, camelCase usage counters
# and a "costUsd" field.
_SESSION_B = {
    "id": "sess-b",
    "model": "gpt-4o",
    "usage": {"inputTokens": 2000, "outputTokens": 800},
    "costUsd": 0.013,
    "calls": 2,
    "updatedAt": "2026-05-20T09:00:00Z",
}
def test_parse_sessions_flat_list() -> None:
    """A top-level ``sessions`` list yields one parsed entry per session."""
    payload = {"sessions": [_SESSION_A, _SESSION_B]}
    parsed = _parse_sessions(payload)
    assert len(parsed) == 2
def test_parse_sessions_nested_5hour() -> None:
    """Sessions nested under a ``5hour`` key are found as well."""
    payload = {"5hour": {"sessions": [_SESSION_A]}}
    parsed = _parse_sessions(payload)
    assert len(parsed) == 1
def test_parse_sessions_empty() -> None:
    """An empty payload parses to an empty list."""
    parsed = _parse_sessions({})
    assert parsed == []
def test_parse_sessions_malformed_entries_skipped() -> None:
    """Non-dict items in the session list are dropped; the valid ones remain."""
    mixed_entries = [_SESSION_A, "bad-string", None, 42, _SESSION_B]
    parsed = _parse_sessions({"sessions": mixed_entries})
    assert len(parsed) == 2
# ---------------------------------------------------------------------------
# aggregate_per_model
# ---------------------------------------------------------------------------
def test_aggregate_per_model_basic() -> None:
    """A single session is aggregated under its ``provider/model`` key."""
    aggregated = aggregate_per_model([_SESSION_A], account_key="claude-default")
    assert "anthropic/claude-sonnet-4-6" in aggregated

    stats = aggregated["anthropic/claude-sonnet-4-6"]
    # Token counters are copied from the session and summed into the total.
    assert stats.input_tokens == 1000
    assert stats.output_tokens == 500
    assert stats.total_tokens == 1500
    assert stats.calls == 3
    assert stats.provider == "anthropic"
    assert stats.account_key == "claude-default"
    assert not stats.unpriced
def test_aggregate_per_model_merges_same_model() -> None:
    """Two sessions for the same model are summed into one entry."""
    # Same provider/model as _SESSION_A, different id and smaller usage.
    second_session = {
        **_SESSION_A,
        "sessionId": "sess-c",
        "usage": {"input_tokens": 200, "output_tokens": 100},
    }
    aggregated = aggregate_per_model([_SESSION_A, second_session])
    stats = aggregated["anthropic/claude-sonnet-4-6"]
    assert stats.input_tokens == 1200
    assert stats.output_tokens == 600
def test_aggregate_per_model_unknown_model_flagged() -> None:
    """A session for a model without pricing is aggregated as unpriced."""
    unknown_session = {
        "sessionId": "x",
        "provider": "anthropic",
        "model": "claude-99-ultra",
        "usage": {"input_tokens": 100, "output_tokens": 50},
        "calls": 1,
    }
    aggregated = aggregate_per_model([unknown_session])
    stats = aggregated["anthropic/claude-99-ultra"]
    assert stats.unpriced
def test_aggregate_per_model_ollama_not_flagged() -> None:
    """An Ollama session aggregates to zero cost without the unpriced flag."""
    local_session = {
        "sessionId": "y",
        "provider": "ollama",
        "model": "llama3",
        "usage": {"input_tokens": 5000, "output_tokens": 2000},
        "calls": 1,
    }
    aggregated = aggregate_per_model([local_session])
    stats = aggregated["ollama/llama3"]
    assert not stats.unpriced
    assert stats.cost_usd == 0.0
# ---------------------------------------------------------------------------
# _oldest_active_ts
# ---------------------------------------------------------------------------
def test_oldest_active_ts_ignores_events_outside_window() -> None:
    """The oldest timestamp is taken only from events inside the 5h window."""
    now = _now_naive()
    ages = [
        timedelta(hours=6),  # outside the 5h window
        timedelta(hours=2, minutes=30),
        timedelta(hours=1),
    ]
    sessions = [{"updatedAt": (now - age).isoformat() + "Z"} for age in ages]

    oldest = _oldest_active_ts(sessions, now)

    assert oldest is not None
    expected_oldest = now - timedelta(hours=2, minutes=30)
    drift_seconds = (oldest - expected_oldest).total_seconds()
    assert abs(drift_seconds) < 2
def test_oldest_active_ts_returns_none_when_all_events_outside_window() -> None:
    """When every event is older than the window, no timestamp is returned."""
    now = _now_naive()
    ages = [timedelta(hours=8), timedelta(hours=6)]
    sessions = [{"updatedAt": (now - age).isoformat() + "Z"} for age in ages]
    oldest = _oldest_active_ts(sessions, now)
    assert oldest is None
# ---------------------------------------------------------------------------
# _build_window
# ---------------------------------------------------------------------------
def _now_naive() -> datetime:
    """Return the current UTC time as a naive ``datetime`` (``tzinfo`` removed)."""
    aware_now = datetime.now(timezone.utc)
    return aware_now.replace(tzinfo=None)
def test_build_window_falls_back_to_5h_rolling() -> None:
    """With no status data the window is a low-confidence rolling 5h estimate."""
    now = _now_naive()
    window = _build_window({}, now)

    assert window.key == "5h"
    window_age_seconds = (now - window.started_at).total_seconds()
    assert abs(window_age_seconds - 5 * 3600) < 5
    # The fallback window ends at ``now``, so nothing remains until reset.
    assert window.reset_in_ms == 0
    assert window.source == "local_jsonl_estimate"
    assert window.confidence == "low"
def test_build_window_uses_oldest_active_event_for_local_reset() -> None:
    """The local window starts at the oldest active event and resets 5h later."""
    now = _now_naive()
    first_event = now - timedelta(hours=2, minutes=15)

    window = _build_window({}, now, oldest_event_ts=first_event)

    reset_at = first_event + timedelta(hours=5)
    remaining_ms = int((reset_at - now).total_seconds() * 1000)
    assert window.started_at == first_event
    assert window.resets_at == reset_at
    assert abs(window.reset_in_ms - remaining_ms) < 1000
    assert window.reset_in_ms > 0
    assert window.source == "local_jsonl_estimate"
    assert window.confidence == "low"
def test_build_window_uses_gateway_status() -> None:
    """Explicit ``windowStart``/``windowEnd`` values give a high-confidence window."""
    now = _now_naive()
    window_start = now - timedelta(hours=3)
    window_end = now + timedelta(hours=2)
    gateway_status = {
        "windowStart": window_start.isoformat() + "Z",
        "windowEnd": window_end.isoformat() + "Z",
    }

    window = _build_window(gateway_status, now)

    two_hours_ms = 2 * 3600 * 1000
    # Allow up to 5 seconds of drift.
    assert abs(window.reset_in_ms - two_hours_ms) < 5000
    assert window.source == "provider_native"
    assert window.confidence == "high"
def test_build_window_uses_ratelimit_reset_header_when_available() -> None:
    """An ``x_ratelimit_reset`` value gives a medium-confidence reset time."""
    now = _now_naive()
    # The header value is a delta in seconds: 1800 s = 30 min.
    header_status = {"x_ratelimit_reset": "1800"}

    window = _build_window(header_status, now)

    assert abs(window.reset_in_ms - 1_800_000) < 5000
    assert window.source == "provider_api_rate_limit"
    assert window.confidence == "medium"
# ---------------------------------------------------------------------------
# _compute_burn_rate
# ---------------------------------------------------------------------------
def test_compute_burn_rate_recent_sessions() -> None:
    """A session updated 30 minutes ago contributes to the per-minute burn rate."""
    now = _now_naive()
    updated_at = (now - timedelta(minutes=30)).isoformat() + "Z"
    recent_session = {
        "updatedAt": updated_at,
        "usage": {"input_tokens": 6000, "output_tokens": 0},
        "cost": 0.018,
    }
    window = RuntimeUsageWindow(
        key="5h",
        started_at=now - timedelta(hours=5),
        resets_at=now,
        reset_in_ms=0,
    )

    burn = _compute_burn_rate([recent_session], window, now)

    # The expected rates divide the session totals by 60.
    assert burn.tokens_per_minute == pytest.approx(6000 / 60, abs=1)
    assert burn.cost_usd_per_minute == pytest.approx(0.018 / 60, abs=1e-6)
def test_compute_burn_rate_no_recent_sessions() -> None:
    """A session last updated three hours ago yields a zero burn rate."""
    now = _now_naive()
    updated_at = (now - timedelta(hours=3)).isoformat() + "Z"
    stale_session = {
        "updatedAt": updated_at,
        "usage": {"input_tokens": 1000, "output_tokens": 0},
        "cost": 0.01,
    }
    window = RuntimeUsageWindow(
        key="5h",
        started_at=now - timedelta(hours=5),
        resets_at=now,
        reset_in_ms=0,
    )

    burn = _compute_burn_rate([stale_session], window, now)

    assert burn.tokens_per_minute == 0.0
    assert burn.cost_usd_per_minute == 0.0
# ---------------------------------------------------------------------------
# _build_predictions
# ---------------------------------------------------------------------------
def _make_window(reset_in_ms: int) -> RuntimeUsageWindow:
    """Build a ``5h`` window that began five hours ago and resets in *reset_in_ms*.

    Args:
        reset_in_ms: Milliseconds from now until the window resets.

    Returns:
        A ``RuntimeUsageWindow`` whose ``resets_at`` matches ``reset_in_ms``.
    """
    now = _now_naive()
    window_start = now - timedelta(hours=5)
    window_end = now + timedelta(milliseconds=reset_in_ms)
    return RuntimeUsageWindow(
        key="5h",
        started_at=window_start,
        resets_at=window_end,
        reset_in_ms=reset_in_ms,
    )
def test_build_predictions_no_limit() -> None:
    """Without any configured limit there is no time-to-limit and usage is safe."""
    usage = RuntimeUsageCurrent(total_cost_usd=1.0, total_tokens=5000, total_calls=10)
    rate = RuntimeUsageBurnRate(tokens_per_minute=100.0, cost_usd_per_minute=0.01)

    prediction = _build_predictions(usage, rate, _make_window(reset_in_ms=60_000))

    assert prediction.time_to_limit_ms is None
    assert prediction.safe is True
def test_build_predictions_safe() -> None:
    """The limit would be reached long after the reset, so usage is safe."""
    usage = RuntimeUsageCurrent(
        total_cost_usd=1.0,
        total_tokens=10_000,
        total_calls=5,
        token_limit=100_000,  # leaves 90k tokens
    )
    rate = RuntimeUsageBurnRate(tokens_per_minute=100.0, cost_usd_per_minute=0.01)
    # 90k tokens at 100/min = 900 minutes = 54,000,000 ms until the limit,
    # while the reset arrives after 30 minutes = 1,800,000 ms.
    reset_ms = 1_800_000

    prediction = _build_predictions(usage, rate, _make_window(reset_in_ms=reset_ms))

    assert prediction.time_to_limit_ms is not None
    assert prediction.time_to_limit_ms > 1_800_000
    assert prediction.safe is True
def test_build_predictions_unsafe() -> None:
    """The limit would be reached before the reset, so usage is not safe."""
    usage = RuntimeUsageCurrent(
        total_cost_usd=1.0,
        total_tokens=95_000,
        total_calls=5,
        token_limit=100_000,  # leaves only 5k tokens
    )
    rate = RuntimeUsageBurnRate(tokens_per_minute=1000.0, cost_usd_per_minute=0.05)
    # 5k tokens at 1000/min = 5 minutes = 300,000 ms until the limit,
    # but the reset is 30 minutes = 1,800,000 ms away.
    reset_ms = 1_800_000

    prediction = _build_predictions(usage, rate, _make_window(reset_in_ms=reset_ms))

    assert prediction.time_to_limit_ms is not None
    assert prediction.time_to_limit_ms < 1_800_000
    assert prediction.safe is False
def test_build_predictions_already_over_limit() -> None:
    """Usage already above the limit reports zero time left and is not safe."""
    usage = RuntimeUsageCurrent(
        total_cost_usd=5.0,
        total_tokens=110_000,
        total_calls=20,
        token_limit=100_000,
    )
    rate = RuntimeUsageBurnRate(tokens_per_minute=500.0, cost_usd_per_minute=0.05)

    prediction = _build_predictions(usage, rate, _make_window(reset_in_ms=1_800_000))

    assert prediction.time_to_limit_ms == 0
    assert prediction.safe is False
# ---------------------------------------------------------------------------
# Phase 4: typed-limit predictions
# ---------------------------------------------------------------------------
def test_build_predictions_uses_output_token_limit() -> None:
    """An output-token limit is projected from output tokens and output burn rate."""
    usage = RuntimeUsageCurrent(
        total_cost_usd=1.0,
        total_tokens=50_000,
        total_output_tokens=5_000,
        total_calls=10,
        output_token_limit=10_000,  # leaves 5k output tokens
    )
    rate = RuntimeUsageBurnRate(
        tokens_per_minute=1000.0,
        # 5k remaining / 100 per minute = 50 min = 3,000,000 ms
        output_tokens_per_minute=100.0,
        cost_usd_per_minute=0.01,
    )
    # The window resets after 30 minutes, i.e. before the limit would be hit.
    thirty_minutes_ms = 30 * 60 * 1000

    prediction = _build_predictions(usage, rate, _make_window(reset_in_ms=thirty_minutes_ms))

    assert prediction.limit_kind == "output_tokens"
    assert prediction.time_to_limit_ms is not None
    # Roughly 50 minutes, within a 10-second tolerance.
    assert abs(prediction.time_to_limit_ms - 3_000_000) < 10_000
    # Limit at ~50 min versus reset at 30 min: the reset arrives first.
    assert prediction.safe is True
def test_build_predictions_prefers_output_over_total_when_tighter() -> None:
    """With output and total limits both set, the one reached first is reported."""
    usage = RuntimeUsageCurrent(
        total_cost_usd=0.5,
        total_tokens=90_000,
        total_output_tokens=9_000,
        total_calls=5,
        output_token_limit=10_000,  # 1k output tokens left: about 10 min
        total_token_limit=200_000,  # 110k total tokens left: about 110 min
    )
    rate = RuntimeUsageBurnRate(
        tokens_per_minute=1000.0,
        output_tokens_per_minute=100.0,
        cost_usd_per_minute=0.005,
    )
    two_hours_ms = 2 * 60 * 60 * 1000

    prediction = _build_predictions(usage, rate, _make_window(reset_in_ms=two_hours_ms))

    assert prediction.limit_kind == "output_tokens"
    assert prediction.time_to_limit_ms is not None
    # Comfortably below 15 minutes.
    assert prediction.time_to_limit_ms < 15 * 60 * 1000
def test_build_predictions_message_limit() -> None:
    """A message limit is projected from the call count, not from token totals."""
    usage = RuntimeUsageCurrent(
        total_cost_usd=0.5,
        total_tokens=5_000,
        total_output_tokens=2_000,
        total_calls=400,
        message_limit=500,  # leaves 100 calls
    )
    rate = RuntimeUsageBurnRate(
        tokens_per_minute=50.0,
        output_tokens_per_minute=20.0,
        cost_usd_per_minute=0.005,
    )
    # 400 calls over a 5h window is about 400 / (5 * 60) = 1.33 calls/min,
    # so the remaining 100 calls last roughly 75 minutes.
    # The window resets after one hour, i.e. before the limit would be hit.
    one_hour_ms = 60 * 60 * 1000

    prediction = _build_predictions(usage, rate, _make_window(reset_in_ms=one_hour_ms))

    assert prediction.limit_kind == "messages"
    assert prediction.time_to_limit_ms is not None
    assert prediction.time_to_limit_ms > 0
    # Limit at ~75 min versus reset at 60 min: the reset arrives first.
    assert prediction.safe is True
# ---------------------------------------------------------------------------
# Phase 5: explicit cost preference
# ---------------------------------------------------------------------------
def test_get_explicit_cost_usage_cost_total() -> None:
    """``usage.cost.total`` takes precedence over the top-level ``cost``."""
    session = {
        "cost": 0.001,  # top-level value
        "usage": {"cost": {"total": 0.050}},  # nested value, expected to win
    }
    explicit = _get_explicit_cost(session)
    assert explicit == pytest.approx(0.050)
def test_get_explicit_cost_usage_cost_flat() -> None:
    """A plain float under ``usage.cost`` is used when there is no ``total`` key."""
    explicit = _get_explicit_cost({"usage": {"cost": 0.030}})
    assert explicit == pytest.approx(0.030)
def test_get_explicit_cost_top_level_fallback() -> None:
    """The top-level ``cost`` field is used when ``usage`` carries no cost."""
    explicit = _get_explicit_cost({"cost": 0.012})
    assert explicit == pytest.approx(0.012)
def test_get_explicit_cost_costusd_variant() -> None:
    """The ``costUsd`` spelling is accepted as an explicit cost."""
    explicit = _get_explicit_cost({"costUsd": 0.007})
    assert explicit == pytest.approx(0.007)
def test_get_explicit_cost_missing_returns_zero() -> None:
    """Sessions without any cost field report an explicit cost of 0.0."""
    no_fields_cost = _get_explicit_cost({})
    assert no_fields_cost == 0.0
    empty_usage_cost = _get_explicit_cost({"usage": {}})
    assert empty_usage_cost == 0.0
def test_aggregate_explicit_cost_wins_over_local_estimate() -> None:
    """An explicit ``usage.cost.total`` is aggregated verbatim.

    The local pricing estimate must not replace a cost the session already
    carries.
    """
    session = {
        "sessionId": "explicit-sess",
        "provider": "anthropic",
        "model": "claude-sonnet-4-6",
        # One token each way, so a local estimate would be close to $0.
        "usage": {
            "input_tokens": 1,
            "output_tokens": 1,
            "cost": {"total": 9.99},  # explicit provider cost
        },
        "calls": 1,
    }

    aggregated = aggregate_per_model([session])

    stats = aggregated["anthropic/claude-sonnet-4-6"]
    # The explicit 9.99 is expected, not the near-zero local estimate.
    assert stats.cost_usd == pytest.approx(9.99, abs=0.001)
    assert not stats.unpriced
# ---------------------------------------------------------------------------
# Phase 5: pricing regression tests
# ---------------------------------------------------------------------------
def test_opus_cache_write_price_is_18_75() -> None:
    """Regression: Opus cache writes cost $18.75/MTok, not the former $3.75."""
    estimated, is_unpriced = estimate_cost(
        "anthropic",
        "claude-opus-4-7",
        input_tokens=0,
        output_tokens=0,
        cache_read_tokens=0,
        cache_write_tokens=1_000_000,
    )
    assert not is_unpriced
    difference = estimated - 18.75
    assert abs(difference) < 0.01
def test_gpt41_mini_is_priced() -> None:
    """Regression: ``gpt-4.1-mini`` has a price entry."""
    estimated, is_unpriced = estimate_cost("openai", "gpt-4.1-mini", 1_000_000, 1_000_000)
    assert not is_unpriced
    # Expected total: $0.40 input + $1.60 output = $2.00.
    difference = estimated - 2.00
    assert abs(difference) < 0.01
def test_unknown_anthropic_model_unpriced() -> None:
    """Regression: an unknown paid model is flagged, not silently priced at $0."""
    result = estimate_cost("anthropic", "claude-99-ultra", 1_000, 1_000)
    # Only the flag matters here; the cost component is ignored.
    _, unpriced = result
    assert unpriced, "Unknown paid model must set unpriced=True, not silently return $0"
# ---------------------------------------------------------------------------
# Phase 5: pricing override file shapes
# ---------------------------------------------------------------------------
def _reset_pricing_cache() -> None:
    """Set the runtime-usage module's ``_pricing_cache`` attribute back to ``None``."""
    # Imported locally so the module object itself can be mutated.
    import app.services.openclaw.runtime_usage as usage_module

    usage_module._pricing_cache = None
def test_load_pricing_override_direct_shape(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """A flat ``{model_key: rates}`` override file shows up in ``load_pricing()``.

    Args:
        tmp_path: pytest's per-test temporary directory (a ``pathlib.Path``).
        monkeypatch: Used to set ``RUNTIME_USAGE_PRICING_FILE`` so that any
            pre-existing value is restored after the test instead of deleted.
    """
    override = {
        "anthropic/claude-test-model": {
            "input": 99.0,
            "output": 199.0,
            "cache_read": 9.9,
            "cache_write": 49.5,
        }
    }
    pricing_file = tmp_path / "pricing.json"
    pricing_file.write_text(json.dumps(override))

    monkeypatch.setenv("RUNTIME_USAGE_PRICING_FILE", str(pricing_file))
    # Clear the cache so the next load_pricing() call starts fresh.
    _reset_pricing_cache()
    try:
        pricing = load_pricing()
        assert "anthropic/claude-test-model" in pricing
        assert pricing["anthropic/claude-test-model"]["input"] == 99.0
    finally:
        # Do not leak the overridden table into later tests.
        _reset_pricing_cache()
def test_load_pricing_override_rates_per_million_shape(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """An override wrapped under ``rates_usd_per_million`` is also accepted.

    The reference dashboard wraps overrides under that key. Default entries
    must remain available alongside the override.

    Args:
        tmp_path: pytest's per-test temporary directory (a ``pathlib.Path``).
        monkeypatch: Used to set ``RUNTIME_USAGE_PRICING_FILE`` so that any
            pre-existing value is restored after the test instead of deleted.
    """
    override = {
        "rates_usd_per_million": {
            "anthropic/claude-test-model-2": {
                "input": 55.0,
                "output": 110.0,
                "cache_read": 5.5,
                "cache_write": 13.75,
            }
        }
    }
    pricing_file = tmp_path / "pricing_wrapped.json"
    pricing_file.write_text(json.dumps(override))

    monkeypatch.setenv("RUNTIME_USAGE_PRICING_FILE", str(pricing_file))
    # Clear the cache so the next load_pricing() call starts fresh.
    _reset_pricing_cache()
    try:
        pricing = load_pricing()
        assert "anthropic/claude-test-model-2" in pricing
        assert pricing["anthropic/claude-test-model-2"]["output"] == 110.0
        # Defaults must still be present.
        assert "anthropic/claude-sonnet-4-6" in pricing
    finally:
        # Do not leak the overridden table into later tests.
        _reset_pricing_cache()
def test_load_pricing_override_does_not_overwrite_explicit_gateway_cost() -> None:
    """An explicit session cost is kept even when the local estimate is far larger.

    No override file is loaded here. The check is that aggregation reports the
    cost carried by the session rather than one derived from the pricing table.
    """
    session = {
        "provider": "anthropic",
        "model": "claude-sonnet-4-6",
        "usage": {
            "input_tokens": 1_000_000,
            "output_tokens": 1_000_000,
            "cost": {"total": 0.0001},
        },
        "calls": 1,
    }

    aggregated = aggregate_per_model([session])

    stats = aggregated["anthropic/claude-sonnet-4-6"]
    # The explicit $0.0001 is expected, not the $18 local estimate for
    # 1M input + 1M output tokens.
    assert stats.cost_usd == pytest.approx(0.0001, abs=1e-6)