# Pipeline/backend/tests/test_runtime_usage_api.py
#
# 297 lines
# 11 KiB
# Python
# Raw Normal View History

# ruff: noqa: INP001
"""API integration tests for GET /api/v1/gateways/{gateway_id}/runtime-usage.
Uses an in-memory SQLite DB and patches `openclaw_call` to avoid real gateway
connections. Tests cover: success, gateway 404, org boundary, and graceful
degradation when the gateway RPC returns empty/error data.
"""
from __future__ import annotations
# 2026-05-21 04:25:31 -05:00
from collections.abc import AsyncIterator
from datetime import UTC, datetime
from unittest.mock import AsyncMock, patch
from uuid import UUID, uuid4

import pytest
from fastapi import APIRouter, FastAPI
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from sqlmodel import SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession  # sqlmodel's AsyncSession has .exec()

from app import models as _models  # noqa: F401 — registers SQLModel metadata
from app.api.gateways import router as gateways_router
from app.db.session import get_session
from app.models.gateways import Gateway
from app.models.organizations import Organization
from app.services.organizations import OrganizationContext
async def _make_engine() -> AsyncEngine:
    """Create an in-memory SQLite async engine with all SQLModel tables created.

    Returns:
        The engine, after ``SQLModel.metadata.create_all`` has been run on it.
    """
    db_engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    # create_all is a synchronous API, so it is bridged through run_sync.
    async with db_engine.begin() as connection:
        await connection.run_sync(SQLModel.metadata.create_all)
    return db_engine
def _make_session_maker(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]:
    """Return a session factory bound to ``engine``.

    Sessions are sqlmodel ``AsyncSession`` instances and are created with
    ``expire_on_commit=False``.
    """
    factory = async_sessionmaker(
        bind=engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )
    return factory
def _build_app(
    session_maker: async_sessionmaker[AsyncSession],
    org_id: UUID,
) -> FastAPI:
    """Build a FastAPI app that serves the gateways router with test overrides.

    The gateways router is mounted under ``/api/v1``. Three dependencies are
    overridden:

    * ``get_session`` yields a session from ``session_maker``.
    * ``require_org_member`` and ``require_org_admin`` both return an
      ``OrganizationContext`` for organization ``org_id`` with an
      admin-role member.

    Args:
        session_maker: Factory used to open a DB session per request.
        org_id: Organization the simulated caller is authenticated as.

    Returns:
        The configured application.
    """
    # Function-scope imports: these names are only needed while wiring the app.
    from app.api.deps import require_org_admin, require_org_member
    from app.models.organization_members import OrganizationMember

    def _org_context() -> OrganizationContext:
        # A fresh organization/member pair (with a random user id) per call.
        org = Organization(id=org_id, name="test-org")
        member = OrganizationMember(organization_id=org.id, user_id=uuid4(), role="admin")
        return OrganizationContext(organization=org, member=member)

    async def _override_session() -> AsyncIterator[AsyncSession]:
        # Async generator dependency: the session is closed when the
        # ``async with`` block exits after the request has been handled.
        async with session_maker() as s:
            yield s

    # Two distinct callables are kept so each overridden dependency maps to
    # its own function, while the construction logic lives in one place.
    async def _override_org_member() -> OrganizationContext:
        return _org_context()

    async def _override_org_admin() -> OrganizationContext:
        return _org_context()

    app = FastAPI()
    api_v1 = APIRouter(prefix="/api/v1")
    api_v1.include_router(gateways_router)
    app.include_router(api_v1)

    app.dependency_overrides[get_session] = _override_session
    app.dependency_overrides[require_org_member] = _override_org_member
    app.dependency_overrides[require_org_admin] = _override_org_admin
    return app
async def _seed_gateway(session: AsyncSession, org_id: UUID) -> Gateway:
    """Insert a test gateway owned by ``org_id``, commit it, and return it.

    Args:
        session: Open session used to add and commit the row.
        org_id: Value stored as the gateway's ``organization_id``.

    Returns:
        The ``Gateway`` instance that was added, with a freshly generated id.
    """
    attributes = {
        "id": uuid4(),
        "organization_id": org_id,
        "name": "test-gateway",
        "url": "ws://localhost:18789",
        "token": "test-token",
        "workspace_root": "/tmp/test-workspace",
        "allow_insecure_tls": True,
        "disable_device_pairing": True,
    }
    record = Gateway(**attributes)
    session.add(record)
    await session.commit()
    return record
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_runtime_usage_empty_gateway_response() -> None:
    """Returns zeroed-out response gracefully when gateway RPC returns nothing.

    ``openclaw_call`` is patched to return an empty dict; the endpoint is
    expected to answer 200 with zero totals, no per-model data, no top
    sessions and ``predictions.safe`` set to true.
    """
    engine = await _make_engine()
    try:
        session_maker = _make_session_maker(engine)
        org_id = uuid4()
        async with session_maker() as session:
            gateway = await _seed_gateway(session, org_id)
        app = _build_app(session_maker, org_id)

        with patch(
            "app.services.openclaw.runtime_usage.openclaw_call",
            new_callable=AsyncMock,
            return_value={},
        ):
            async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
                response = await client.get(f"/api/v1/gateways/{gateway.id}/runtime-usage")
    finally:
        # Close the engine's pooled connection even if the request fails.
        await engine.dispose()

    assert response.status_code == 200
    data = response.json()
    assert data["gateway_id"] == str(gateway.id)
    assert data["current"]["total_cost_usd"] == 0.0
    assert data["current"]["total_tokens"] == 0
    assert data["per_model"] == {}
    assert data["top_sessions"] == []
    assert data["predictions"]["safe"] is True
@pytest.mark.asyncio
async def test_runtime_usage_with_session_data() -> None:
    """Aggregates per-model usage correctly from a sessions list.

    The patched RPC returns two sessions for the ``usage.cost`` method and an
    empty dict for every other method. The second session carries no
    ``cost`` field.
    """
    rpc_cost_response = {
        "sessions": [
            {
                "sessionId": "sess-1",
                "provider": "anthropic",
                "model": "claude-sonnet-4-6",
                "usage": {"input_tokens": 10000, "output_tokens": 5000},
                "cost": 0.105,
                "calls": 5,
                "updatedAt": "2026-05-20T09:00:00Z",
            },
            {
                "sessionId": "sess-2",
                "provider": "anthropic",
                "model": "claude-haiku-4-5",
                "usage": {"input_tokens": 50000, "output_tokens": 20000},
                "calls": 10,
                "updatedAt": "2026-05-20T08:00:00Z",
            },
        ]
    }

    async def _mock_call(method: str, params=None, *, config):  # noqa: ANN001
        # Only the cost RPC returns data; all other methods look empty.
        if method == "usage.cost":
            return rpc_cost_response
        return {}

    engine = await _make_engine()
    try:
        session_maker = _make_session_maker(engine)
        org_id = uuid4()
        async with session_maker() as session:
            gateway = await _seed_gateway(session, org_id)
        app = _build_app(session_maker, org_id)

        with patch(
            "app.services.openclaw.runtime_usage.openclaw_call",
            side_effect=_mock_call,
        ):
            async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
                response = await client.get(f"/api/v1/gateways/{gateway.id}/runtime-usage")
    finally:
        # Close the engine's pooled connection even if the request fails.
        await engine.dispose()

    assert response.status_code == 200
    data = response.json()
    per_model = data["per_model"]
    assert "anthropic/claude-sonnet-4-6" in per_model
    assert "anthropic/claude-haiku-4-5" in per_model
    assert per_model["anthropic/claude-sonnet-4-6"]["input_tokens"] == 10000
    assert per_model["anthropic/claude-sonnet-4-6"]["calls"] == 5
    assert data["current"]["total_calls"] == 15
    # At least one top session
    assert len(data["top_sessions"]) >= 1
@pytest.mark.asyncio
async def test_runtime_usage_gateway_not_found() -> None:
    """Returns 404 when no gateway with the requested id exists.

    No gateway is seeded, so the randomly generated id cannot match any row.
    """
    engine = await _make_engine()
    try:
        session_maker = _make_session_maker(engine)
        org_id = uuid4()
        missing_gateway_id = str(uuid4())
        app = _build_app(session_maker, org_id)

        with patch(
            "app.services.openclaw.runtime_usage.openclaw_call",
            new_callable=AsyncMock,
            return_value={},
        ):
            async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
                response = await client.get(f"/api/v1/gateways/{missing_gateway_id}/runtime-usage")
    finally:
        # Close the engine's pooled connection even if the request fails.
        await engine.dispose()

    assert response.status_code == 404
# 2026-05-21 04:25:31 -05:00
@pytest.mark.asyncio
async def test_claude_statusline_ingest_surfaces_provider_usage() -> None:
    """Status-line snapshots are returned as provider-native usage windows.

    A status-line payload with ``five_hour`` and ``seven_day`` rate limits is
    POSTed, then the provider-usage endpoint is read back and its windows are
    checked by key.
    """
    # A reset time far in the future for both rate-limit windows.
    future_reset = int(datetime(2099, 1, 1, tzinfo=UTC).timestamp())
    statusline_payload = {
        "session_id": "claude-session-1",
        "rate_limits": {
            "five_hour": {
                "used_percentage": 77,
                "resets_at": future_reset,
            },
            "seven_day": {
                "used_percentage": 21,
                "resets_at": future_reset,
            },
        },
    }

    engine = await _make_engine()
    try:
        session_maker = _make_session_maker(engine)
        org_id = uuid4()
        async with session_maker() as session:
            gateway = await _seed_gateway(session, org_id)
        app = _build_app(session_maker, org_id)

        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            post_response = await client.post(
                f"/api/v1/gateways/{gateway.id}/provider-usage/claude/statusline",
                json=statusline_payload,
            )
            # Check the ingest first so a failed POST is reported as such
            # rather than as a confusing failure of the follow-up GET.
            assert post_response.status_code == 200
            get_response = await client.get(f"/api/v1/gateways/{gateway.id}/provider-usage")
    finally:
        # Close the engine's pooled connection even if a request fails.
        await engine.dispose()

    assert get_response.status_code == 200
    data = get_response.json()
    assert data["scraper_enabled"] is True
    assert data["results"][0]["source_name"] == "claude_code_statusline"
    by_key = {window["key"]: window for window in data["results"][0]["windows"]}
    assert by_key["current_session"]["pct_used"] == 77
    assert by_key["weekly_all_models"]["pct_used"] == 21
@pytest.mark.asyncio
async def test_runtime_usage_org_boundary() -> None:
    """A gateway created in a different org is not visible to another org.

    The gateway is seeded under ``org_a`` while the app authenticates the
    caller as ``org_b``; the endpoint is expected to answer 404.
    """
    engine = await _make_engine()
    try:
        session_maker = _make_session_maker(engine)
        org_a = uuid4()
        org_b = uuid4()
        # Seed gateway under org_a, but build app with org_b credentials
        async with session_maker() as session:
            gateway = await _seed_gateway(session, org_a)
        app = _build_app(session_maker, org_b)  # authenticated as org_b

        with patch(
            "app.services.openclaw.runtime_usage.openclaw_call",
            new_callable=AsyncMock,
            return_value={},
        ):
            async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
                response = await client.get(f"/api/v1/gateways/{gateway.id}/runtime-usage")
    finally:
        # Close the engine's pooled connection even if the request fails.
        await engine.dispose()

    assert response.status_code == 404
@pytest.mark.asyncio
async def test_runtime_usage_rpc_error_degrades_gracefully() -> None:
    """A gateway RPC failure returns zeroed usage rather than 500.

    ``openclaw_call`` is patched to raise ``OpenClawGatewayError``; the
    endpoint is expected to answer 200 with a zero total cost.
    """
    from app.services.openclaw.gateway_rpc import OpenClawGatewayError

    engine = await _make_engine()
    try:
        session_maker = _make_session_maker(engine)
        org_id = uuid4()
        async with session_maker() as session:
            gateway = await _seed_gateway(session, org_id)
        app = _build_app(session_maker, org_id)

        with patch(
            "app.services.openclaw.runtime_usage.openclaw_call",
            side_effect=OpenClawGatewayError("timeout"),
        ):
            async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
                response = await client.get(f"/api/v1/gateways/{gateway.id}/runtime-usage")
    finally:
        # Close the engine's pooled connection even if the request fails.
        await engine.dispose()

    assert response.status_code == 200
    data = response.json()
    assert data["current"]["total_cost_usd"] == 0.0