Pipeline/backend/tests/test_gateway_agent_import_a...

449 lines
15 KiB
Python
Raw Permalink Normal View History

2026-05-20 04:13:32 -05:00
# ruff: noqa: INP001
"""Integration tests for gateway runtime agent import preview and import APIs."""
from __future__ import annotations
from types import SimpleNamespace
from uuid import uuid4
import pytest
from fastapi import APIRouter, FastAPI
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from sqlmodel import SQLModel, select
from sqlmodel.ext.asyncio.session import AsyncSession
from app import models as _models
from app.api.deps import require_org_admin
from app.api.gateways import router as gateways_router
from app.db.session import get_session
from app.models.agents import Agent
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.organization_members import OrganizationMember
from app.models.organizations import Organization
from app.models.users import User
import app.services.openclaw.admin_service as admin_service
from app.core.agent_tokens import hash_agent_token
from app.services.organizations import OrganizationContext
async def _make_engine() -> AsyncEngine:
    """Return an async SQLite engine with the SQLModel schema created.

    The engine points at an in-memory database (``:memory:``), and every table
    registered on ``SQLModel.metadata`` is created before it is returned.
    Callers are responsible for disposing of the engine.
    """
    memory_engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with memory_engine.begin() as connection:
        # create_all is a synchronous API, so it is run through run_sync.
        await connection.run_sync(SQLModel.metadata.create_all)
    return memory_engine
def _build_test_app(
    session_maker: async_sessionmaker[AsyncSession],
    ctx: OrganizationContext,
) -> FastAPI:
    """Assemble a FastAPI app exposing the gateways router under ``/api/v1``.

    Two dependencies are overridden so the routes can run inside a test:

    * ``get_session`` yields a session opened from ``session_maker``.
    * ``require_org_admin`` returns ``ctx`` unconditionally.

    Args:
        session_maker: Factory used to open a database session per request.
        ctx: Organization context handed to routes that require an org admin.

    Returns:
        The configured FastAPI application.
    """

    async def _override_get_session() -> AsyncSession:
        # Async generator: the session stays open while the consumer uses it
        # and is closed when the ``async with`` block exits.
        async with session_maker() as db_session:
            yield db_session

    async def _override_require_org_admin() -> OrganizationContext:
        return ctx

    versioned_router = APIRouter(prefix="/api/v1")
    versioned_router.include_router(gateways_router)

    test_app = FastAPI()
    test_app.include_router(versioned_router)
    test_app.dependency_overrides[get_session] = _override_get_session
    test_app.dependency_overrides[require_org_admin] = _override_require_org_admin
    return test_app
@pytest.mark.asyncio
async def test_gateway_agent_import_preview_reports_importable_and_tracked(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """The import-preview endpoint separates importable agents from tracked ones.

    The patched gateway call reports three runtime agents. One of them shares
    its session key with an ``Agent`` row that already exists, so the preview
    is expected to count 3 discovered, 2 importable and 1 already tracked.
    The runtime agent whose id is ``lead-<board id>`` is expected to be
    reported with that board inferred and flagged as its lead.
    """
    engine = await _make_engine()
    session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    org = Organization(id=uuid4(), name="Pipeline")
    owner = User(id=uuid4(), clerk_user_id="user_123", email="user@example.com")
    membership = OrganizationMember(
        id=uuid4(),
        organization_id=org.id,
        user_id=owner.id,
        role="owner",
        all_boards_read=True,
        all_boards_write=True,
    )
    main_gateway = Gateway(
        id=uuid4(),
        organization_id=org.id,
        name="Main Gateway",
        url="ws://gateway.example.local",
        workspace_root="/workspace",
    )
    platform_board = Board(
        id=uuid4(),
        organization_id=org.id,
        name="Platform",
        slug="platform",
        gateway_id=main_gateway.id,
    )
    already_tracked = Agent(
        id=uuid4(),
        board_id=None,
        gateway_id=main_gateway.id,
        name="Already Tracked",
        openclaw_session_id="agent:existing-main:main",
        is_board_lead=False,
    )
    lead_runtime_id = f"lead-{platform_board.id}"
    preview_url = f"/api/v1/gateways/{main_gateway.id}/agents/import-preview"

    async def _fake_openclaw_call(
        method: str,
        params: object | None = None,
        *,
        config: object,
    ) -> object:
        """Stand-in for the gateway RPC; answers only the two list methods."""
        del params, config  # accepted for signature compatibility only
        # Rebuilt on every call so each caller receives a fresh payload object.
        canned: dict[str, object] = {
            "agents.list": {
                "agents": [
                    {"id": "existing-main", "name": "Existing Main"},
                    {"id": "legacy-worker", "name": "Legacy Worker"},
                    {"id": lead_runtime_id, "name": "Legacy Lead"},
                ]
            },
            "sessions.list": {
                "sessions": [
                    {"key": "agent:existing-main:main"},
                    {"key": "agent:legacy-worker:main"},
                    {"key": f"agent:{lead_runtime_id}:main"},
                ]
            },
        }
        if method not in canned:
            raise AssertionError(f"Unexpected method {method}")
        return canned[method]

    monkeypatch.setattr(admin_service, "openclaw_call", _fake_openclaw_call)

    try:
        # Seed the database with the organization, gateway, board and tracked agent.
        async with session_maker() as session:
            session.add_all(
                [org, owner, membership, main_gateway, platform_board, already_tracked]
            )
            await session.commit()

        app = _build_test_app(
            session_maker,
            OrganizationContext(organization=org, member=membership),
        )
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://testserver") as client:
            response = await client.get(preview_url)

        assert response.status_code == 200
        payload = response.json()
        assert payload["discovered_count"] == 3
        assert payload["importable_count"] == 2
        assert payload["already_tracked_count"] == 1

        by_runtime_id = {entry["gateway_agent_id"]: entry for entry in payload["candidates"]}

        tracked_entry = by_runtime_id["existing-main"]
        assert tracked_entry["importable"] is False
        assert tracked_entry["existing_agent_id"] == str(already_tracked.id)

        assert by_runtime_id["legacy-worker"]["importable"] is True

        lead_entry = by_runtime_id[lead_runtime_id]
        assert lead_entry["importable"] is True
        assert lead_entry["inferred_board_id"] == str(platform_board.id)
        assert lead_entry["inferred_is_board_lead"] is True
    finally:
        await engine.dispose()
@pytest.mark.asyncio
async def test_gateway_agent_import_creates_selected_candidates(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """The import endpoint creates rows only for importable selected agents.

    Four runtime ids are submitted: one already tracked, two importable and
    one that the patched gateway call never reports. The response is expected
    to show 2 imported and 2 skipped, and the database is expected to hold the
    original agent plus the two new ones. The worker should have no board,
    and the ``lead-<board id>`` agent should be attached to that board as its
    lead.
    """
    engine = await _make_engine()
    session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    org = Organization(id=uuid4(), name="Pipeline")
    owner = User(id=uuid4(), clerk_user_id="user_123", email="user@example.com")
    membership = OrganizationMember(
        id=uuid4(),
        organization_id=org.id,
        user_id=owner.id,
        role="owner",
        all_boards_read=True,
        all_boards_write=True,
    )
    main_gateway = Gateway(
        id=uuid4(),
        organization_id=org.id,
        name="Main Gateway",
        url="ws://gateway.example.local",
        workspace_root="/workspace",
    )
    platform_board = Board(
        id=uuid4(),
        organization_id=org.id,
        name="Platform",
        slug="platform",
        gateway_id=main_gateway.id,
    )
    already_tracked = Agent(
        id=uuid4(),
        board_id=None,
        gateway_id=main_gateway.id,
        name="Already Tracked",
        openclaw_session_id="agent:existing-main:main",
        is_board_lead=False,
    )
    lead_runtime_id = f"lead-{platform_board.id}"
    import_url = f"/api/v1/gateways/{main_gateway.id}/agents/import"

    async def _fake_openclaw_call(
        method: str,
        params: object | None = None,
        *,
        config: object,
    ) -> object:
        """Stand-in for the gateway RPC; answers only the two list methods."""
        del params, config  # accepted for signature compatibility only
        # Rebuilt on every call so each caller receives a fresh payload object.
        canned: dict[str, object] = {
            "agents.list": {
                "agents": [
                    {"id": "existing-main", "name": "Existing Main"},
                    {"id": "legacy-worker", "name": "Legacy Worker"},
                    {"id": lead_runtime_id, "name": "Legacy Lead"},
                ]
            },
            "sessions.list": {
                "sessions": [
                    {"key": "agent:existing-main:main"},
                    {"key": "agent:legacy-worker:main"},
                    {"key": f"agent:{lead_runtime_id}:main"},
                ]
            },
        }
        if method not in canned:
            raise AssertionError(f"Unexpected method {method}")
        return canned[method]

    monkeypatch.setattr(admin_service, "openclaw_call", _fake_openclaw_call)

    try:
        # Seed the database with the organization, gateway, board and tracked agent.
        async with session_maker() as session:
            session.add_all(
                [org, owner, membership, main_gateway, platform_board, already_tracked]
            )
            await session.commit()

        app = _build_test_app(
            session_maker,
            OrganizationContext(organization=org, member=membership),
        )
        requested_ids = [
            "existing-main",
            "legacy-worker",
            lead_runtime_id,
            "unknown-agent",
        ]
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://testserver") as client:
            response = await client.post(
                import_url,
                json={"gateway_agent_ids": requested_ids},
            )

        assert response.status_code == 200
        payload = response.json()
        assert payload["imported_count"] == 2
        assert payload["skipped_count"] == 2
        assert set(payload["skipped_gateway_agent_ids"]) == {"existing-main", "unknown-agent"}
        assert len(payload["imported_agent_ids"]) == 2

        # Check what was persisted, using a fresh session.
        async with session_maker() as session:
            result = await session.exec(
                select(Agent).where(Agent.gateway_id == main_gateway.id)
            )
            gateway_agents = result.all()

        # original tracked + imported worker + imported lead
        assert len(gateway_agents) == 3
        agents_by_name = {row.name: row for row in gateway_agents}

        worker_row = agents_by_name["Legacy Worker"]
        lead_row = agents_by_name["Legacy Lead"]

        assert worker_row.board_id is None
        assert worker_row.is_board_lead is False
        assert worker_row.openclaw_session_id == "agent:legacy-worker:main"

        assert lead_row.board_id == platform_board.id
        assert lead_row.is_board_lead is True
        assert lead_row.openclaw_session_id == f"agent:{lead_runtime_id}:main"
    finally:
        await engine.dispose()
@pytest.mark.asyncio
async def test_gateway_agent_import_reconcile_updates_supported_agents(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Importing with ``reconcile_after_import`` reconciles the imported agents.

    Two runtime agents are imported: a worker and a ``lead-<board id>`` agent.
    ``AgentLifecycleOrchestrator.run_lifecycle`` is replaced with a fake that
    marks the agent online and stores a token hash. The response is expected
    to report 2 attempted, 1 updated, 1 skipped and 1 error; afterwards the
    lead is online with a token hash while the worker is still offline with
    none.

    NOTE(review): presumably the worker is the skipped one because it has no
    board; whether the service skips it or the fake's ``board is not None``
    assertion trips is not visible here. Confirm against admin_service.
    """
    engine = await _make_engine()
    session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    org = Organization(id=uuid4(), name="Pipeline")
    owner = User(id=uuid4(), clerk_user_id="user_123", email="user@example.com")
    membership = OrganizationMember(
        id=uuid4(),
        organization_id=org.id,
        user_id=owner.id,
        role="owner",
        all_boards_read=True,
        all_boards_write=True,
    )
    main_gateway = Gateway(
        id=uuid4(),
        organization_id=org.id,
        name="Main Gateway",
        url="ws://gateway.example.local",
        workspace_root="/workspace",
    )
    platform_board = Board(
        id=uuid4(),
        organization_id=org.id,
        name="Platform",
        slug="platform",
        gateway_id=main_gateway.id,
    )
    lead_runtime_id = f"lead-{platform_board.id}"
    import_url = f"/api/v1/gateways/{main_gateway.id}/agents/import"

    async def _fake_openclaw_call(
        method: str,
        params: object | None = None,
        *,
        config: object,
    ) -> object:
        """Stand-in for the gateway RPC; answers only the two list methods."""
        del params, config  # accepted for signature compatibility only
        # Rebuilt on every call so each caller receives a fresh payload object.
        canned: dict[str, object] = {
            "agents.list": {
                "agents": [
                    {"id": "legacy-worker", "name": "Legacy Worker"},
                    {"id": lead_runtime_id, "name": "Legacy Lead"},
                ]
            },
            "sessions.list": {
                "sessions": [
                    {"key": "agent:legacy-worker:main"},
                    {"key": f"agent:{lead_runtime_id}:main"},
                ]
            },
        }
        if method not in canned:
            raise AssertionError(f"Unexpected method {method}")
        return canned[method]

    async def _fake_run_lifecycle(
        self: object,
        *,
        gateway: Gateway,
        agent_id: object,
        board: Board | None,
        user: object,
        action: str,
        auth_token: str | None,
        force_bootstrap: bool,
        reset_session: bool,
        wake: bool,
        deliver_wakeup: bool,
        wakeup_verb: str | None,
        clear_confirm_token: bool,
        raise_gateway_errors: bool,
    ) -> Agent:
        """Replacement lifecycle step: mark the agent online and store a token hash."""
        # These keyword arguments are part of the patched signature but unused here.
        del (
            user,
            action,
            auth_token,
            force_bootstrap,
            reset_session,
            wake,
            deliver_wakeup,
            wakeup_verb,
            clear_confirm_token,
            raise_gateway_errors,
        )
        # Checked before any database access, so a board-less call fails early.
        assert board is not None

        # ``self`` is typed as ``object``; the session is looked up by name.
        db = getattr(self, "session")
        target = await db.get(Agent, agent_id)
        assert target is not None
        assert target.gateway_id == gateway.id

        target.status = "online"
        target.agent_token_hash = hash_agent_token("rotated-token")
        db.add(target)
        await db.commit()
        await db.refresh(target)
        return target

    monkeypatch.setattr(admin_service, "openclaw_call", _fake_openclaw_call)
    monkeypatch.setattr(
        admin_service.AgentLifecycleOrchestrator,
        "run_lifecycle",
        _fake_run_lifecycle,
    )

    try:
        # Seed the database; no Agent rows exist before the import.
        async with session_maker() as session:
            session.add_all([org, owner, membership, main_gateway, platform_board])
            await session.commit()

        app = _build_test_app(
            session_maker,
            OrganizationContext(organization=org, member=membership),
        )
        request_body = {
            "gateway_agent_ids": ["legacy-worker", lead_runtime_id],
            "reconcile_after_import": True,
        }
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://testserver") as client:
            response = await client.post(import_url, json=request_body)

        assert response.status_code == 200
        payload = response.json()
        assert payload["imported_count"] == 2
        assert payload["reconcile"] is not None
        assert payload["reconcile"]["attempted"] == 2
        assert payload["reconcile"]["updated"] == 1
        assert payload["reconcile"]["skipped"] == 1
        assert len(payload["reconcile"]["errors"]) == 1

        # Check the persisted state, using a fresh session.
        async with session_maker() as session:
            result = await session.exec(
                select(Agent).where(Agent.gateway_id == main_gateway.id)
            )
            stored_agents = result.all()

        agents_by_name = {row.name: row for row in stored_agents}
        assert agents_by_name["Legacy Lead"].status == "online"
        assert agents_by_name["Legacy Lead"].agent_token_hash
        assert agents_by_name["Legacy Worker"].status == "offline"
        assert agents_by_name["Legacy Worker"].agent_token_hash is None
    finally:
        await engine.dispose()