2026-05-20 20:15:02 -05:00
|
|
|
# ruff: noqa: INP001
|
|
|
|
|
"""API integration tests for GET /api/v1/gateways/{gateway_id}/runtime-usage.
|
|
|
|
|
|
|
|
|
|
Uses an in-memory SQLite DB and patches `openclaw_call` to avoid real gateway
|
|
|
|
|
connections. Tests cover: success, gateway 404, org boundary, and graceful
|
|
|
|
|
degradation when the gateway RPC returns empty/error data.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
2026-05-21 04:25:31 -05:00
|
|
|
from datetime import UTC, datetime
|
2026-05-20 20:15:02 -05:00
|
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
|
from uuid import UUID, uuid4
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
from fastapi import APIRouter, FastAPI
|
|
|
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
|
|
|
|
|
from sqlmodel import SQLModel
|
|
|
|
|
from sqlmodel.ext.asyncio.session import AsyncSession # sqlmodel's AsyncSession has .exec()
|
|
|
|
|
|
|
|
|
|
from app import models as _models # noqa: F401 — registers SQLModel metadata
|
|
|
|
|
from app.api.gateways import router as gateways_router
|
|
|
|
|
from app.db.session import get_session
|
|
|
|
|
from app.models.gateways import Gateway
|
|
|
|
|
from app.models.organizations import Organization
|
|
|
|
|
from app.services.organizations import OrganizationContext
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _make_engine() -> AsyncEngine:
    """Create an in-memory SQLite async engine and create all SQLModel tables on it."""
    memory_engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    create_schema = SQLModel.metadata.create_all
    # create_all is a synchronous API, so it is dispatched through run_sync.
    async with memory_engine.begin() as connection:
        await connection.run_sync(create_schema)
    return memory_engine
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _make_session_maker(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]:
    """Return a session factory bound to ``engine`` that produces SQLModel async sessions."""
    session_factory = async_sessionmaker(
        bind=engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )
    return session_factory
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _build_app(
    session_maker: async_sessionmaker[AsyncSession],
    org_id: UUID,
) -> FastAPI:
    """Build a FastAPI app that mounts the gateways router under ``/api/v1``.

    The session dependency and both organization auth dependencies are
    overridden, so requests use sessions from ``session_maker`` and are
    authenticated as an ``admin`` member of the organization ``org_id``.

    Args:
        session_maker: Factory used to open one DB session per request.
        org_id: Organization id that every request is authenticated against.

    Returns:
        The configured FastAPI application.
    """
    # Deferred imports, kept function-local like the original helper did.
    from collections.abc import AsyncIterator

    from app.api.deps import require_org_admin, require_org_member
    from app.models.organization_members import OrganizationMember

    app = FastAPI()
    api_v1 = APIRouter(prefix="/api/v1")
    api_v1.include_router(gateways_router)
    app.include_router(api_v1)

    async def _override_session() -> AsyncIterator[AsyncSession]:
        # Async generator dependency: the session is closed once the request ends.
        async with session_maker() as s:
            yield s

    def _admin_context() -> OrganizationContext:
        # Single source of truth for the fake auth context used by both overrides.
        org = Organization(id=org_id, name="test-org")
        member = OrganizationMember(organization_id=org.id, user_id=uuid4(), role="admin")
        return OrganizationContext(organization=org, member=member)

    async def _override_org_member() -> OrganizationContext:
        return _admin_context()

    async def _override_org_admin() -> OrganizationContext:
        return _admin_context()

    app.dependency_overrides[get_session] = _override_session
    app.dependency_overrides[require_org_member] = _override_org_member
    app.dependency_overrides[require_org_admin] = _override_org_admin
    return app
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _seed_gateway(session: AsyncSession, org_id: UUID) -> Gateway:
    """Add a test gateway owned by ``org_id`` to ``session``, commit, and return it."""
    gateway_fields = {
        "id": uuid4(),
        "organization_id": org_id,
        "name": "test-gateway",
        "url": "ws://localhost:18789",
        "token": "test-token",
        "workspace_root": "/tmp/test-workspace",
        "allow_insecure_tls": True,
        "disable_device_pairing": True,
    }
    seeded = Gateway(**gateway_fields)
    session.add(seeded)
    await session.commit()
    return seeded
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Tests
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_runtime_usage_empty_gateway_response() -> None:
    """Returns zeroed-out response gracefully when gateway RPC returns nothing."""
    engine = await _make_engine()
    try:
        session_maker = _make_session_maker(engine)

        org_id = uuid4()
        async with session_maker() as session:
            gateway = await _seed_gateway(session, org_id)

        app = _build_app(session_maker, org_id)

        # Stub the gateway RPC so every call resolves to an empty payload.
        with patch(
            "app.services.openclaw.runtime_usage.openclaw_call",
            new_callable=AsyncMock,
            return_value={},
        ):
            async with AsyncClient(
                transport=ASGITransport(app=app),
                base_url="http://test",
            ) as client:
                response = await client.get(f"/api/v1/gateways/{gateway.id}/runtime-usage")
    finally:
        # Close the engine's pooled connection so it does not outlive the test.
        await engine.dispose()

    assert response.status_code == 200
    data = response.json()
    assert data["gateway_id"] == str(gateway.id)
    assert data["current"]["total_cost_usd"] == 0.0
    assert data["current"]["total_tokens"] == 0
    assert data["per_model"] == {}
    assert data["top_sessions"] == []
    assert data["predictions"]["safe"] is True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_runtime_usage_with_session_data() -> None:
    """Aggregates per-model usage correctly from a sessions list."""
    engine = await _make_engine()
    try:
        session_maker = _make_session_maker(engine)

        org_id = uuid4()
        async with session_maker() as session:
            gateway = await _seed_gateway(session, org_id)

        # Payload served for the "usage.cost" RPC; the second session has no "cost" key.
        rpc_cost_response = {
            "sessions": [
                {
                    "sessionId": "sess-1",
                    "provider": "anthropic",
                    "model": "claude-sonnet-4-6",
                    "usage": {"input_tokens": 10000, "output_tokens": 5000},
                    "cost": 0.105,
                    "calls": 5,
                    "updatedAt": "2026-05-20T09:00:00Z",
                },
                {
                    "sessionId": "sess-2",
                    "provider": "anthropic",
                    "model": "claude-haiku-4-5",
                    "usage": {"input_tokens": 50000, "output_tokens": 20000},
                    "calls": 10,
                    "updatedAt": "2026-05-20T08:00:00Z",
                },
            ]
        }

        app = _build_app(session_maker, org_id)

        async def _mock_call(method: str, params=None, *, config):  # noqa: ANN001
            # Only the cost RPC returns data; every other method yields an empty payload.
            if method == "usage.cost":
                return rpc_cost_response
            return {}

        # AsyncMock is requested explicitly, matching the other tests in this module,
        # instead of relying on patch() auto-detecting an async target.
        with patch(
            "app.services.openclaw.runtime_usage.openclaw_call",
            new_callable=AsyncMock,
            side_effect=_mock_call,
        ):
            async with AsyncClient(
                transport=ASGITransport(app=app),
                base_url="http://test",
            ) as client:
                response = await client.get(f"/api/v1/gateways/{gateway.id}/runtime-usage")
    finally:
        # Close the engine's pooled connection so it does not outlive the test.
        await engine.dispose()

    assert response.status_code == 200
    data = response.json()
    per_model = data["per_model"]
    assert "anthropic/claude-sonnet-4-6" in per_model
    assert "anthropic/claude-haiku-4-5" in per_model
    assert per_model["anthropic/claude-sonnet-4-6"]["input_tokens"] == 10000
    assert per_model["anthropic/claude-sonnet-4-6"]["calls"] == 5
    assert data["current"]["total_calls"] == 15
    # At least one top session
    assert len(data["top_sessions"]) >= 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_runtime_usage_gateway_not_found() -> None:
    """Returns 404 when no gateway exists for the requested gateway_id."""
    engine = await _make_engine()
    try:
        session_maker = _make_session_maker(engine)

        org_id = uuid4()
        # No gateway is seeded, so this random id cannot match any row.
        missing_gateway_id = uuid4()

        app = _build_app(session_maker, org_id)

        with patch(
            "app.services.openclaw.runtime_usage.openclaw_call",
            new_callable=AsyncMock,
            return_value={},
        ):
            async with AsyncClient(
                transport=ASGITransport(app=app),
                base_url="http://test",
            ) as client:
                response = await client.get(
                    f"/api/v1/gateways/{missing_gateway_id}/runtime-usage"
                )
    finally:
        # Close the engine's pooled connection so it does not outlive the test.
        await engine.dispose()

    assert response.status_code == 404
|
|
|
|
|
|
|
|
|
|
|
2026-05-21 04:25:31 -05:00
|
|
|
@pytest.mark.asyncio
async def test_claude_statusline_ingest_surfaces_provider_usage() -> None:
    """Status-line snapshots are returned as provider-native usage windows."""
    engine = await _make_engine()
    try:
        session_maker = _make_session_maker(engine)

        org_id = uuid4()
        async with session_maker() as session:
            gateway = await _seed_gateway(session, org_id)

        app = _build_app(session_maker, org_id)
        # Epoch-seconds reset time set far in the future (year 2099).
        future_reset = int(datetime(2099, 1, 1, tzinfo=UTC).timestamp())

        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="http://test",
        ) as client:
            post_response = await client.post(
                f"/api/v1/gateways/{gateway.id}/provider-usage/claude/statusline",
                json={
                    "session_id": "claude-session-1",
                    "rate_limits": {
                        "five_hour": {
                            "used_percentage": 77,
                            "resets_at": future_reset,
                        },
                        "seven_day": {
                            "used_percentage": 21,
                            "resets_at": future_reset,
                        },
                    },
                },
            )
            # Check the ingest first so a failed POST is reported at the right step
            # rather than as a confusing failure of the dependent GET.
            assert post_response.status_code == 200
            get_response = await client.get(f"/api/v1/gateways/{gateway.id}/provider-usage")
    finally:
        # Close the engine's pooled connection so it does not outlive the test.
        await engine.dispose()

    assert get_response.status_code == 200
    data = get_response.json()
    assert data["scraper_enabled"] is True
    assert data["results"][0]["source_name"] == "claude_code_statusline"
    by_key = {window["key"]: window for window in data["results"][0]["windows"]}
    assert by_key["current_session"]["pct_used"] == 77
    assert by_key["weekly_all_models"]["pct_used"] == 21
|
|
|
|
|
|
|
|
|
|
|
2026-05-20 20:15:02 -05:00
|
|
|
@pytest.mark.asyncio
async def test_runtime_usage_org_boundary() -> None:
    """A gateway created in a different org is not visible to another org."""
    engine = await _make_engine()
    try:
        session_maker = _make_session_maker(engine)

        org_a = uuid4()
        org_b = uuid4()

        # Seed gateway under org_a, but build app with org_b credentials
        async with session_maker() as session:
            gateway = await _seed_gateway(session, org_a)

        app = _build_app(session_maker, org_b)  # authenticated as org_b

        with patch(
            "app.services.openclaw.runtime_usage.openclaw_call",
            new_callable=AsyncMock,
            return_value={},
        ):
            async with AsyncClient(
                transport=ASGITransport(app=app),
                base_url="http://test",
            ) as client:
                response = await client.get(f"/api/v1/gateways/{gateway.id}/runtime-usage")
    finally:
        # Close the engine's pooled connection so it does not outlive the test.
        await engine.dispose()

    assert response.status_code == 404
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_runtime_usage_rpc_error_degrades_gracefully() -> None:
    """A gateway RPC failure returns zeroed usage rather than 500."""
    from app.services.openclaw.gateway_rpc import OpenClawGatewayError

    engine = await _make_engine()
    try:
        session_maker = _make_session_maker(engine)

        org_id = uuid4()
        async with session_maker() as session:
            gateway = await _seed_gateway(session, org_id)

        app = _build_app(session_maker, org_id)

        # Every RPC call raises. AsyncMock is requested explicitly, matching the
        # other tests, instead of relying on patch() auto-detecting an async target.
        with patch(
            "app.services.openclaw.runtime_usage.openclaw_call",
            new_callable=AsyncMock,
            side_effect=OpenClawGatewayError("timeout"),
        ):
            async with AsyncClient(
                transport=ASGITransport(app=app),
                base_url="http://test",
            ) as client:
                response = await client.get(f"/api/v1/gateways/{gateway.id}/runtime-usage")
    finally:
        # Close the engine's pooled connection so it does not outlive the test.
        await engine.dispose()

    assert response.status_code == 200
    data = response.json()
    assert data["current"]["total_cost_usd"] == 0.0
|