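# Source path: Pipeline/backend/tests/test_runtime_usage_service.py
#
# File-viewer metadata captured with the original paste:
#   367 lines, 12 KiB, Python
#   (toolbar labels: Raw / Normal View / History)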

# ruff: noqa: INP001
"""Unit tests for runtime_usage service helpers.
Tests cover provider/model normalisation, cost estimation, session parsing,
per-model aggregation, window building, burn rate, and predictions.
No gateway connection is required.
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
import pytest
from app.services.openclaw.runtime_usage import (
DEFAULT_MODEL_PRICING,
_build_predictions,
_build_window,
_compute_burn_rate,
_parse_sessions,
aggregate_per_model,
estimate_cost,
load_pricing,
model_key,
normalize_model,
normalize_provider,
)
from app.schemas.runtime_usage import RuntimeUsageBurnRate, RuntimeUsageCurrent, RuntimeUsageWindow
# ---------------------------------------------------------------------------
# normalize_provider
# ---------------------------------------------------------------------------
@pytest.mark.parametrize(
    ("raw", "expected"),
    [
        # Anthropic and its alias, in mixed case.
        ("anthropic", "anthropic"),
        ("Anthropic", "anthropic"),
        ("claude", "anthropic"),
        ("CLAUDE", "anthropic"),
        # OpenAI and its alias.
        ("openai", "openai"),
        ("OpenAI", "openai"),
        ("codex", "openai"),
        # Ollama and its alias.
        ("ollama", "ollama"),
        ("local", "ollama"),
        ("gemini", "google"),
        # Empty or whitespace-only input.
        ("", "unknown"),
        (" ", "unknown"),
        # A name with no alias listed here is expected back unchanged.
        ("custom-provider", "custom-provider"),
    ],
)
def test_normalize_provider(raw: str, expected: str) -> None:
    """Check that ``normalize_provider`` maps each raw name to the expected one."""
    normalized = normalize_provider(raw)
    assert normalized == expected
# ---------------------------------------------------------------------------
# normalize_model
# ---------------------------------------------------------------------------
@pytest.mark.parametrize(
    "raw, expected",
    [
        ("claude-sonnet-4-6", "claude-sonnet-4-6"),
        ("claude-sonnet-4-6-20250219", "claude-sonnet-4-6"),
        ("claude-3-5-sonnet-20241022", "claude-3-5-sonnet"),
        ("anthropic/claude-opus-4-7", "claude-opus-4-7"),
        ("gpt-4o-2024-05-13", "gpt-4o"),
        ("gpt-4o-mini", "gpt-4o-mini"),
        ("claude-3-haiku-20240307", "claude-3-haiku"),
        # Local model: the expected value keeps the ":latest" tag, so the
        # containment check below fails if the tag is removed.
        # NOTE(review): the previous comment said ":latest" is stripped, which
        # contradicts this expectation — confirm the intended behaviour.
        ("llama3:latest", "llama3:latest"),
        ("o1-preview", "o1"),
        ("gpt-4-turbo-preview", "gpt-4-turbo"),
    ],
)
def test_normalize_model(raw: str, expected: str) -> None:
    """Check that ``normalize_model`` keeps the base name and drops any date stamp.

    Minor variation around the expected name is tolerated, so the name check
    is containment rather than equality.  Containment alone would also accept
    an untouched input (for example ``"gpt-4o"`` is a substring of
    ``"gpt-4o-2024-05-13"``), so the one property this test guarantees — that
    a trailing date stamp is removed — is asserted explicitly.
    """
    # Imported locally so this test does not rely on a module-level import.
    import re

    result = normalize_model(raw)
    assert expected in result
    # Trailing YYYYMMDD or YYYY-MM-DD means the date stamp survived.
    assert not re.search(r"(\d{8}|\d{4}-\d{2}-\d{2})$", result), (
        f"date stamp not stripped from {raw!r}: got {result!r}"
    )
# ---------------------------------------------------------------------------
# model_key
# ---------------------------------------------------------------------------
def test_model_key() -> None:
    """Provider "anthropic" and model "claude-sonnet-4-6" yield the combined key."""
    combined = model_key("anthropic", "claude-sonnet-4-6")
    assert combined == "anthropic/claude-sonnet-4-6"
# ---------------------------------------------------------------------------
# estimate_cost
# ---------------------------------------------------------------------------
def test_estimate_cost_known_model() -> None:
    """A priced model with 1M input and 1M output tokens costs about $18."""
    one_million = 1_000_000
    # 1M input @ $3 + 1M output @ $15 = $18
    expected_usd = 18.0

    cost, unpriced = estimate_cost(
        "anthropic", "claude-sonnet-4-6", one_million, one_million
    )

    assert not unpriced
    assert abs(cost - expected_usd) < 0.01
def test_estimate_cost_with_cache_tokens() -> None:
    """Cache-read and cache-write tokens alone produce a cost of about $4.05."""
    token_counts = {
        "input_tokens": 0,
        "output_tokens": 0,
        "cache_read_tokens": 1_000_000,
        "cache_write_tokens": 1_000_000,
    }
    # $0.30 cache_read + $3.75 cache_write = $4.05
    expected_usd = 4.05

    cost, unpriced = estimate_cost("anthropic", "claude-sonnet-4-6", **token_counts)

    assert not unpriced
    assert abs(cost - expected_usd) < 0.01
def test_estimate_cost_ollama_is_free() -> None:
    """An Ollama model costs nothing and is not flagged as unpriced."""
    result = estimate_cost("ollama", "llama3", 100_000, 50_000)
    cost, unpriced = result
    assert cost == 0.0
    # Ollama is intentionally free, not unpriced.
    assert not unpriced
def test_estimate_cost_unknown_paid_model() -> None:
    """An unrecognised Anthropic model yields zero cost and the unpriced flag."""
    result = estimate_cost("anthropic", "claude-99-ultra", 1_000, 1_000)
    cost, unpriced = result
    assert cost == 0.0
    # Unknown model — must be flagged.
    assert unpriced
def test_estimate_cost_zero_tokens() -> None:
    """Zero input and output tokens cost nothing and are not flagged unpriced."""
    result = estimate_cost("anthropic", "claude-haiku-4-5", 0, 0)
    cost, unpriced = result
    assert cost == 0.0
    assert not unpriced
# ---------------------------------------------------------------------------
# load_pricing
# ---------------------------------------------------------------------------
def test_load_pricing_has_defaults() -> None:
    """The loaded pricing table contains the two checked default model keys."""
    table = load_pricing()
    for required_key in ("anthropic/claude-sonnet-4-6", "openai/gpt-4o"):
        assert required_key in table
def test_load_pricing_has_required_fields() -> None:
    """Every pricing entry must carry both an ``input`` and an ``output`` key."""
    pricing = load_pricing()
    # Guard against a vacuous pass: with an empty table the loop below would
    # never execute a single assertion.
    assert pricing, "load_pricing() returned no entries"
    for key, entry in pricing.items():
        assert "input" in entry, f"{key} missing input"
        assert "output" in entry, f"{key} missing output"
# ---------------------------------------------------------------------------
# _parse_sessions
# ---------------------------------------------------------------------------
_SESSION_A = {
"sessionId": "sess-a",
"provider": "anthropic",
"model": "claude-sonnet-4-6",
"usage": {"input_tokens": 1000, "output_tokens": 500},
"cost": 0.012,
"calls": 3,
"updatedAt": "2026-05-20T10:00:00Z",
}
_SESSION_B = {
"id": "sess-b",
"model": "gpt-4o",
"usage": {"inputTokens": 2000, "outputTokens": 800},
"costUsd": 0.013,
"calls": 2,
"updatedAt": "2026-05-20T09:00:00Z",
}
def test_parse_sessions_flat_list() -> None:
    """A top-level "sessions" list with two entries parses to two sessions."""
    payload = {"sessions": [_SESSION_A, _SESSION_B]}
    parsed = _parse_sessions(payload)
    assert len(parsed) == 2
def test_parse_sessions_nested_5hour() -> None:
    """A "sessions" list nested under the "5hour" key is still picked up."""
    payload = {"5hour": {"sessions": [_SESSION_A]}}
    parsed = _parse_sessions(payload)
    assert len(parsed) == 1
def test_parse_sessions_empty() -> None:
    """An empty payload parses to an empty list."""
    parsed = _parse_sessions({})
    assert parsed == []
def test_parse_sessions_malformed_entries_skipped() -> None:
    """Non-dict entries between two valid sessions do not count as sessions."""
    malformed = ["bad-string", None, 42]
    payload = {"sessions": [_SESSION_A, *malformed, _SESSION_B]}
    parsed = _parse_sessions(payload)
    assert len(parsed) == 2
# ---------------------------------------------------------------------------
# aggregate_per_model
# ---------------------------------------------------------------------------
def test_aggregate_per_model_basic() -> None:
    """A single session is aggregated under its provider/model key."""
    expected_key = "anthropic/claude-sonnet-4-6"

    aggregated = aggregate_per_model([_SESSION_A], account_key="claude-default")
    assert expected_key in aggregated

    entry = aggregated[expected_key]
    # Token counts and call count mirror the session fixture.
    assert entry.input_tokens == 1000
    assert entry.output_tokens == 500
    assert entry.total_tokens == 1500
    assert entry.calls == 3
    # Identity fields come from the session and the account_key argument.
    assert entry.provider == "anthropic"
    assert entry.account_key == "claude-default"
    assert not entry.unpriced
def test_aggregate_per_model_merges_same_model() -> None:
    """Two sessions for the same model have their token counts summed."""
    # Same provider and model as _SESSION_A, with a different id and usage.
    second_session = {
        **_SESSION_A,
        "sessionId": "sess-c",
        "usage": {"input_tokens": 200, "output_tokens": 100},
    }

    aggregated = aggregate_per_model([_SESSION_A, second_session])

    merged = aggregated["anthropic/claude-sonnet-4-6"]
    assert merged.input_tokens == 1200
    assert merged.output_tokens == 600
def test_aggregate_per_model_unknown_model_flagged() -> None:
    """A session for an unrecognised Anthropic model is flagged as unpriced."""
    unknown_model_session = {
        "sessionId": "x",
        "provider": "anthropic",
        "model": "claude-99-ultra",
        "usage": {"input_tokens": 100, "output_tokens": 50},
        "calls": 1,
    }

    aggregated = aggregate_per_model([unknown_model_session])

    entry = aggregated["anthropic/claude-99-ultra"]
    assert entry.unpriced
def test_aggregate_per_model_ollama_not_flagged() -> None:
    """An Ollama session aggregates to zero cost without the unpriced flag."""
    ollama_session = {
        "sessionId": "y",
        "provider": "ollama",
        "model": "llama3",
        "usage": {"input_tokens": 5000, "output_tokens": 2000},
        "calls": 1,
    }

    aggregated = aggregate_per_model([ollama_session])

    ollama_entry = aggregated["ollama/llama3"]
    assert not ollama_entry.unpriced
    assert ollama_entry.cost_usd == 0.0
# ---------------------------------------------------------------------------
# _build_window
# ---------------------------------------------------------------------------
def _now_naive() -> datetime:
return datetime.now(timezone.utc).replace(tzinfo=None)
def test_build_window_falls_back_to_5h_rolling() -> None:
    """With an empty status payload the window is a "5h" one ending at ``now``."""
    now = _now_naive()
    five_hours_s = 5 * 3600

    window = _build_window({}, now)

    assert window.key == "5h"
    # The window start lies five hours back, within a 5-second tolerance.
    window_age_s = (now - window.started_at).total_seconds()
    assert abs(window_age_s - five_hours_s) < 5
    # resets_at == now
    assert window.reset_in_ms == 0
def test_build_window_uses_gateway_status() -> None:
    """A status payload with windowStart/windowEnd drives the reset countdown."""
    now = _now_naive()
    window_start = now - timedelta(hours=3)
    window_end = now + timedelta(hours=2)
    status_payload = {
        "windowStart": f"{window_start.isoformat()}Z",
        "windowEnd": f"{window_end.isoformat()}Z",
    }

    window = _build_window(status_payload, now)

    two_hours_ms = 2 * 3600 * 1000
    # Within 5 seconds of the expected two-hour countdown.
    assert abs(window.reset_in_ms - two_hours_ms) < 5000
# ---------------------------------------------------------------------------
# _compute_burn_rate
# ---------------------------------------------------------------------------
def test_compute_burn_rate_recent_sessions() -> None:
    """A session updated 30 minutes ago contributes to both burn-rate figures."""
    now = _now_naive()
    half_hour_ago = now - timedelta(minutes=30)
    recent_session = {
        "updatedAt": f"{half_hour_ago.isoformat()}Z",
        "usage": {"input_tokens": 6000, "output_tokens": 0},
        "cost": 0.018,
    }
    window = RuntimeUsageWindow(
        key="5h",
        started_at=now - timedelta(hours=5),
        resets_at=now,
        reset_in_ms=0,
    )

    burn = _compute_burn_rate([recent_session], window, now)

    assert burn.tokens_per_minute == pytest.approx(6000 / 60, abs=1)
    assert burn.cost_usd_per_minute == pytest.approx(0.018 / 60, abs=1e-6)
def test_compute_burn_rate_no_recent_sessions() -> None:
    """A session last updated three hours ago yields a zero burn rate."""
    now = _now_naive()
    three_hours_ago = now - timedelta(hours=3)
    stale_session = {
        "updatedAt": f"{three_hours_ago.isoformat()}Z",
        "usage": {"input_tokens": 1000, "output_tokens": 0},
        "cost": 0.01,
    }
    window = RuntimeUsageWindow(
        key="5h",
        started_at=now - timedelta(hours=5),
        resets_at=now,
        reset_in_ms=0,
    )

    burn = _compute_burn_rate([stale_session], window, now)

    assert burn.tokens_per_minute == 0.0
    assert burn.cost_usd_per_minute == 0.0
# ---------------------------------------------------------------------------
# _build_predictions
# ---------------------------------------------------------------------------
def _make_window(reset_in_ms: int) -> RuntimeUsageWindow:
    """Build a "5h" window that began five hours ago and resets in ``reset_in_ms`` ms.

    Args:
        reset_in_ms: Milliseconds from now until the window resets; stored on
            the window and also used to derive ``resets_at``.

    Returns:
        A ``RuntimeUsageWindow`` anchored on the current naive UTC time.
    """
    now = _now_naive()
    window_start = now - timedelta(hours=5)
    window_reset = now + timedelta(milliseconds=reset_in_ms)
    return RuntimeUsageWindow(
        key="5h",
        started_at=window_start,
        resets_at=window_reset,
        reset_in_ms=reset_in_ms,
    )
def test_build_predictions_no_limit() -> None:
    """Without a token limit there is no time-to-limit and the state is safe."""
    usage = RuntimeUsageCurrent(total_cost_usd=1.0, total_tokens=5000, total_calls=10)
    burn_rate = RuntimeUsageBurnRate(tokens_per_minute=100.0, cost_usd_per_minute=0.01)
    one_minute_window = _make_window(reset_in_ms=60_000)

    prediction = _build_predictions(usage, burn_rate, one_minute_window)

    assert prediction.time_to_limit_ms is None
    assert prediction.safe is True
def test_build_predictions_safe() -> None:
    """The limit would be reached only after the window resets, so it is safe."""
    reset_ms = 1_800_000  # reset in 30 minutes
    usage = RuntimeUsageCurrent(
        total_cost_usd=1.0,
        total_tokens=10_000,
        total_calls=5,
        token_limit=100_000,  # 90k remaining
    )
    burn_rate = RuntimeUsageBurnRate(tokens_per_minute=100.0, cost_usd_per_minute=0.01)
    window = _make_window(reset_in_ms=reset_ms)

    prediction = _build_predictions(usage, burn_rate, window)

    # 90k tokens @ 100/min = 900 minutes = 54,000,000 ms, well past the
    # 1,800,000 ms reset → safe=True
    assert prediction.time_to_limit_ms is not None
    assert prediction.time_to_limit_ms > 1_800_000
    assert prediction.safe is True
def test_build_predictions_unsafe() -> None:
    """The limit would be reached before the window resets, so it is unsafe."""
    reset_ms = 1_800_000  # reset in 30 minutes
    usage = RuntimeUsageCurrent(
        total_cost_usd=1.0,
        total_tokens=95_000,
        total_calls=5,
        token_limit=100_000,  # only 5k left
    )
    burn_rate = RuntimeUsageBurnRate(tokens_per_minute=1000.0, cost_usd_per_minute=0.05)
    window = _make_window(reset_in_ms=reset_ms)

    prediction = _build_predictions(usage, burn_rate, window)

    # 5k tokens @ 1000/min = 5 minutes = 300,000 ms, before the
    # 1,800,000 ms reset → safe=False (will hit limit before reset)
    assert prediction.time_to_limit_ms is not None
    assert prediction.time_to_limit_ms < 1_800_000
    assert prediction.safe is False
def test_build_predictions_already_over_limit() -> None:
    """Usage already above the token limit gives a zero time-to-limit, unsafe."""
    usage = RuntimeUsageCurrent(
        total_cost_usd=5.0,
        total_tokens=110_000,  # above the 100k limit set below
        total_calls=20,
        token_limit=100_000,
    )
    burn_rate = RuntimeUsageBurnRate(tokens_per_minute=500.0, cost_usd_per_minute=0.05)
    window = _make_window(reset_in_ms=1_800_000)

    prediction = _build_predictions(usage, burn_rate, window)

    assert prediction.time_to_limit_ms == 0
    assert prediction.safe is False