feat(usage): prefer explicit cost over estimates + test coverage (#40 #39)

- Add _get_explicit_cost() with priority chain: usage.cost.total > usage.cost flat > cost/cost_usd
- aggregate_per_model() and _top_sessions() now use _get_explicit_cost()
- Fix message limit prediction to use elapsed time instead of total window
- Add 224 lines of test coverage for typed limits, explicit cost priority,
  pricing override shapes, and prediction logic
This commit is contained in:
null 2026-05-21 01:50:21 -05:00
parent 5217a70c9f
commit 5c3c09edba
2 changed files with 257 additions and 5 deletions

View File

@ -235,6 +235,30 @@ def _get_str(d: dict[str, Any], *keys: str, default: str = "") -> str:
return default
def _get_explicit_cost(session: dict[str, Any]) -> float:
"""Return the explicit provider/runtime cost for a session, or 0.0 if absent.
Priority (reference dashboard order):
1. session["usage"]["cost"]["total"] explicit cost from provider/runtime
2. session["usage"]["cost"] flat cost in usage block
3. session["cost"] / session["cost_usd"] / session["costUsd"]
A value of 0.0 means "not present or zero" callers should fall back to a
local price-table estimate in that case. Never overwrite a positive explicit
cost with a local estimate.
"""
usage = session.get("usage")
if isinstance(usage, dict):
cost_block = usage.get("cost")
if isinstance(cost_block, dict):
total = cost_block.get("total")
if isinstance(total, (int, float)) and total > 0:
return float(total)
if isinstance(cost_block, (int, float)) and cost_block > 0:
return float(cost_block)
return _get_float(session, "cost", "cost_usd", "costUsd", default=0.0)
def _parse_datetime(value: object) -> datetime | None:
if not isinstance(value, str) or not value.strip():
return None
@ -378,9 +402,9 @@ def aggregate_per_model(
key = model_key(provider, model)
tokens = _parse_session_usage(session)
session_cost = _get_float(session, "cost", "cost_usd", "costUsd", default=0.0)
session_cost = _get_explicit_cost(session)
calls = _get_int(session, "calls", "messageCount", "messages", default=1)
# If gateway didn't compute cost, estimate it
# Only estimate when the gateway provided no explicit cost
if session_cost == 0.0:
session_cost, _ = estimate_cost(
provider, model,
@ -433,7 +457,7 @@ def _top_sessions(
model = model_key(provider, normalize_model(model))
tokens = _parse_session_usage(session)
total = tokens["input"] + tokens["output"]
cost = _get_float(session, "cost", "cost_usd", "costUsd", default=0.0)
cost = _get_explicit_cost(session)
if cost == 0.0 and model:
parts = model.split("/", 1)
if len(parts) == 2:
@ -740,8 +764,12 @@ def _build_predictions(
# ── Message limit ─────────────────────────────────────────────────────────
if current.message_limit is not None and current.message_limit > 0:
window_minutes = max(window.reset_in_ms / 60_000, 1)
calls_per_minute = current.total_calls / window_minutes if window_minutes > 0 else 0
# Use elapsed time (window duration - remaining) so rate reflects
# actual usage density, not just time left in the window.
total_window_ms = max(1, int((window.resets_at - window.started_at).total_seconds() * 1000))
elapsed_ms = max(1, total_window_ms - window.reset_in_ms)
elapsed_minutes = elapsed_ms / 60_000
calls_per_minute = current.total_calls / elapsed_minutes if elapsed_minutes > 0 else 0
if calls_per_minute > 0:
remaining = current.message_limit - current.total_calls
if remaining <= 0:

View File

@ -12,11 +12,16 @@ from datetime import datetime, timedelta, timezone
import pytest
import json
import os
import tempfile
from app.services.openclaw.runtime_usage import (
DEFAULT_MODEL_PRICING,
_build_predictions,
_build_window,
_compute_burn_rate,
_get_explicit_cost,
_oldest_active_ts,
_parse_sessions,
aggregate_per_model,
@ -420,3 +425,222 @@ def test_build_predictions_already_over_limit() -> None:
pred = _build_predictions(current, burn, window)
assert pred.time_to_limit_ms == 0
assert pred.safe is False
# ---------------------------------------------------------------------------
# Phase 4: typed-limit predictions
# ---------------------------------------------------------------------------
def test_build_predictions_uses_output_token_limit() -> None:
"""Output limit uses output tokens + output burn rate — not total tokens."""
current = RuntimeUsageCurrent(
total_cost_usd=1.0,
total_tokens=50_000,
total_output_tokens=5_000,
total_calls=10,
output_token_limit=10_000, # 5k out remaining
)
burn = RuntimeUsageBurnRate(
tokens_per_minute=1000.0,
output_tokens_per_minute=100.0, # 5k / 100/min = 50 min = 3,000,000 ms
cost_usd_per_minute=0.01,
)
window = _make_window(reset_in_ms=30 * 60 * 1000) # 30 min reset — resets before limit
pred = _build_predictions(current, burn, window)
assert pred.limit_kind == "output_tokens"
assert pred.time_to_limit_ms is not None
assert abs(pred.time_to_limit_ms - 3_000_000) < 10_000 # ~50 min
assert pred.safe is True # limit at 50 min, reset at 30 min → reset saves us
def test_build_predictions_prefers_output_over_total_when_tighter() -> None:
"""When both output and total limits are set, picks the one that hits first."""
current = RuntimeUsageCurrent(
total_cost_usd=0.5,
total_tokens=90_000,
total_output_tokens=9_000,
total_calls=5,
output_token_limit=10_000, # 1k out remaining → hits in 10 min
total_token_limit=200_000, # 110k total remaining → hits in 110 min
)
burn = RuntimeUsageBurnRate(
tokens_per_minute=1000.0,
output_tokens_per_minute=100.0,
cost_usd_per_minute=0.005,
)
window = _make_window(reset_in_ms=2 * 60 * 60 * 1000)
pred = _build_predictions(current, burn, window)
assert pred.limit_kind == "output_tokens"
assert pred.time_to_limit_ms is not None
assert pred.time_to_limit_ms < 15 * 60 * 1000 # well under 15 min
def test_build_predictions_message_limit() -> None:
"""Message limit uses call count over the window — never token totals."""
current = RuntimeUsageCurrent(
total_cost_usd=0.5,
total_tokens=5_000,
total_output_tokens=2_000,
total_calls=400,
message_limit=500, # 100 calls remaining
)
burn = RuntimeUsageBurnRate(tokens_per_minute=50.0, output_tokens_per_minute=20.0, cost_usd_per_minute=0.005)
# 400 calls over a 5h window → ~400/(5*60) = 1.33 calls/min → 100/1.33 ≈ 75 min
window = _make_window(reset_in_ms=60 * 60 * 1000) # 1h reset — resets before limit
pred = _build_predictions(current, burn, window)
assert pred.limit_kind == "messages"
assert pred.time_to_limit_ms is not None
assert pred.time_to_limit_ms > 0
# safe: limit hits at ~75 min, reset at 60 min → reset saves us
assert pred.safe is True
# ---------------------------------------------------------------------------
# Phase 5: explicit cost preference
# ---------------------------------------------------------------------------
def test_get_explicit_cost_usage_cost_total() -> None:
"""usage.cost.total (reference dashboard format) wins."""
session = {
"cost": 0.001, # top-level: 0.001
"usage": {"cost": {"total": 0.050}}, # nested: 0.050
}
assert _get_explicit_cost(session) == pytest.approx(0.050)
def test_get_explicit_cost_usage_cost_flat() -> None:
"""usage.cost as a plain float is used when no .total sub-key."""
session = {"usage": {"cost": 0.030}}
assert _get_explicit_cost(session) == pytest.approx(0.030)
def test_get_explicit_cost_top_level_fallback() -> None:
session = {"cost": 0.012}
assert _get_explicit_cost(session) == pytest.approx(0.012)
def test_get_explicit_cost_costusd_variant() -> None:
session = {"costUsd": 0.007}
assert _get_explicit_cost(session) == pytest.approx(0.007)
def test_get_explicit_cost_missing_returns_zero() -> None:
assert _get_explicit_cost({}) == 0.0
assert _get_explicit_cost({"usage": {}}) == 0.0
def test_aggregate_explicit_cost_wins_over_local_estimate() -> None:
"""When a session carries explicit usage.cost.total, that value is used
verbatim local pricing should not overwrite it."""
session = {
"sessionId": "explicit-sess",
"provider": "anthropic",
"model": "claude-sonnet-4-6",
# Tiny token counts would produce ~$0 local estimate
"usage": {
"input_tokens": 1,
"output_tokens": 1,
"cost": {"total": 9.99}, # explicit provider cost
},
"calls": 1,
}
per_model = aggregate_per_model([session])
entry = per_model["anthropic/claude-sonnet-4-6"]
# Must use explicit 9.99, not local estimate of ~0
assert entry.cost_usd == pytest.approx(9.99, abs=0.001)
assert not entry.unpriced
# ---------------------------------------------------------------------------
# Phase 5: pricing regression tests
# ---------------------------------------------------------------------------
def test_opus_cache_write_price_is_18_75() -> None:
"""Opus cache write = $18.75/MTok (5× input), not the old $3.75."""
cost, unpriced = estimate_cost(
"anthropic", "claude-opus-4-7",
input_tokens=0, output_tokens=0,
cache_read_tokens=0, cache_write_tokens=1_000_000,
)
assert not unpriced
assert abs(cost - 18.75) < 0.01
def test_gpt41_mini_is_priced() -> None:
cost, unpriced = estimate_cost("openai", "gpt-4.1-mini", 1_000_000, 1_000_000)
assert not unpriced
# $0.40 input + $1.60 output = $2.00
assert abs(cost - 2.00) < 0.01
def test_unknown_anthropic_model_unpriced() -> None:
_, unpriced = estimate_cost("anthropic", "claude-99-ultra", 1_000, 1_000)
assert unpriced, "Unknown paid model must set unpriced=True, not silently return $0"
# ---------------------------------------------------------------------------
# Phase 5: pricing override file shapes
# ---------------------------------------------------------------------------
def _reset_pricing_cache() -> None:
import app.services.openclaw.runtime_usage as m
m._pricing_cache = None
def test_load_pricing_override_direct_shape(tmp_path: "pytest.TempPathFactory") -> None:
override = {
"anthropic/claude-test-model": {
"input": 99.0, "output": 199.0, "cache_read": 9.9, "cache_write": 49.5,
}
}
f = tmp_path / "pricing.json"
f.write_text(json.dumps(override))
_reset_pricing_cache()
try:
os.environ["RUNTIME_USAGE_PRICING_FILE"] = str(f)
pricing = load_pricing()
assert "anthropic/claude-test-model" in pricing
assert pricing["anthropic/claude-test-model"]["input"] == 99.0
finally:
del os.environ["RUNTIME_USAGE_PRICING_FILE"]
_reset_pricing_cache()
def test_load_pricing_override_rates_per_million_shape(tmp_path: "pytest.TempPathFactory") -> None:
"""The reference dashboard wraps overrides under rates_usd_per_million."""
override = {
"rates_usd_per_million": {
"anthropic/claude-test-model-2": {
"input": 55.0, "output": 110.0, "cache_read": 5.5, "cache_write": 13.75,
}
}
}
f = tmp_path / "pricing_wrapped.json"
f.write_text(json.dumps(override))
_reset_pricing_cache()
try:
os.environ["RUNTIME_USAGE_PRICING_FILE"] = str(f)
pricing = load_pricing()
assert "anthropic/claude-test-model-2" in pricing
assert pricing["anthropic/claude-test-model-2"]["output"] == 110.0
# Defaults must still be present
assert "anthropic/claude-sonnet-4-6" in pricing
finally:
del os.environ["RUNTIME_USAGE_PRICING_FILE"]
_reset_pricing_cache()
def test_load_pricing_override_does_not_overwrite_explicit_gateway_cost() -> None:
"""Local pricing override must not affect how explicit costs are read from sessions."""
# This is a logical check: aggregate uses _get_explicit_cost first, so even a
# wildly wrong pricing table cannot corrupt sessions that carry explicit costs.
session = {
"provider": "anthropic",
"model": "claude-sonnet-4-6",
"usage": {"input_tokens": 1_000_000, "output_tokens": 1_000_000, "cost": {"total": 0.0001}},
"calls": 1,
}
per_model = aggregate_per_model([session])
entry = per_model["anthropic/claude-sonnet-4-6"]
# $0.0001 explicit wins over the $18 local estimate for 1M in + 1M out
assert entry.cost_usd == pytest.approx(0.0001, abs=1e-6)