fix(heatmap&import_agent): corrections

This commit is contained in:
null 2026-05-20 04:13:32 -05:00
parent ac6320f6de
commit e083e4e10c
11 changed files with 1432 additions and 13 deletions

View File

@ -7,7 +7,9 @@ from typing import TYPE_CHECKING
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import Date as SADate
from sqlalchemy import and_
from sqlalchemy import cast as sa_cast
from sqlmodel import func, select
from app.api.deps import ORG_MEMBER_DEP, OrganizationContext
@ -267,9 +269,6 @@ async def get_forgejo_heatmap(
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> HeatmapResponse:
"""Return per-day issue event counts (created + closed) for the last 365 days."""
from sqlalchemy import Date as SADate
from sqlalchemy import cast as sa_cast
if organization_id and organization_id != ctx.organization.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)

View File

@ -18,6 +18,9 @@ from app.models.gateways import Gateway
from app.models.skills import GatewayInstalledSkill
from app.schemas.common import OkResponse
from app.schemas.gateways import (
GatewayAgentImportPreviewResponse,
GatewayAgentImportRequest,
GatewayAgentImportResponse,
GatewayCreate,
GatewayRead,
GatewayTemplatesSyncResult,
@ -185,6 +188,50 @@ async def sync_gateway_templates(
return await service.sync_templates(gateway, query=sync_query, auth=auth)
@router.get(
"/{gateway_id}/agents/import-preview",
response_model=GatewayAgentImportPreviewResponse,
)
async def preview_import_gateway_agents(
gateway_id: UUID,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_ADMIN_DEP,
) -> GatewayAgentImportPreviewResponse:
"""Preview existing gateway runtime agents that can be imported."""
service = GatewayAdminLifecycleService(session)
gateway = await service.require_gateway(
gateway_id=gateway_id,
organization_id=ctx.organization.id,
)
return await service.preview_gateway_agent_import(gateway=gateway)
@router.post(
"/{gateway_id}/agents/import",
response_model=GatewayAgentImportResponse,
)
async def import_gateway_agents(
gateway_id: UUID,
payload: GatewayAgentImportRequest,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_ADMIN_DEP,
) -> GatewayAgentImportResponse:
"""Import selected existing gateway runtime agents into Pipeline."""
service = GatewayAdminLifecycleService(session)
gateway = await service.require_gateway(
gateway_id=gateway_id,
organization_id=ctx.organization.id,
)
return await service.import_gateway_agents(
gateway=gateway,
gateway_agent_ids=payload.gateway_agent_ids,
reconcile_after_import=payload.reconcile_after_import,
rotate_tokens=payload.rotate_tokens,
reset_sessions=payload.reset_sessions,
force_bootstrap=payload.force_bootstrap,
)
@router.delete("/{gateway_id}", response_model=OkResponse)
async def delete_gateway(
gateway_id: UUID,

View File

@ -89,3 +89,62 @@ class GatewayTemplatesSyncResult(SQLModel):
agents_skipped: int
main_updated: bool
errors: list[GatewayTemplatesSyncError] = Field(default_factory=list)
class GatewayAgentImportCandidate(SQLModel):
"""One gateway runtime agent candidate discovered for import."""
gateway_agent_id: str
gateway_agent_name: str | None = None
session_key: str | None = None
existing_agent_id: UUID | None = None
importable: bool
inferred_board_id: UUID | None = None
inferred_is_board_lead: bool = False
class GatewayAgentImportPreviewResponse(SQLModel):
"""Discovery preview response for importing existing gateway agents."""
gateway_id: UUID
discovered_count: int
importable_count: int
already_tracked_count: int
candidates: list[GatewayAgentImportCandidate] = Field(default_factory=list)
class GatewayAgentImportRequest(SQLModel):
"""Request payload listing gateway runtime agents to import."""
gateway_agent_ids: list[str] = Field(default_factory=list, min_length=1)
reconcile_after_import: bool = False
rotate_tokens: bool = True
reset_sessions: bool = True
force_bootstrap: bool = True
class GatewayAgentReconcileError(SQLModel):
"""One reconciliation error for an imported agent."""
agent_id: UUID | None = None
message: str
class GatewayAgentImportReconcileSummary(SQLModel):
"""Summary for optional post-import reconcile execution."""
attempted: int
updated: int
skipped: int
errors: list[GatewayAgentReconcileError] = Field(default_factory=list)
class GatewayAgentImportResponse(SQLModel):
"""Import summary response for existing gateway runtime agents."""
gateway_id: UUID
imported_count: int
skipped_count: int
imported_agent_ids: list[UUID] = Field(default_factory=list)
skipped_gateway_agent_ids: list[str] = Field(default_factory=list)
reconcile: GatewayAgentImportReconcileSummary | None = None

View File

@ -3,11 +3,13 @@
from __future__ import annotations
from abc import ABC, abstractmethod
import re
from typing import TYPE_CHECKING
from uuid import UUID
from fastapi import HTTPException, status
from sqlmodel import col
from sqlmodel import select
from app.core.auth import AuthContext
from app.core.logging import TRACE_LEVEL
@ -16,16 +18,25 @@ from app.db import crud
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.approvals import Approval
from app.models.boards import Board
from app.models.board_webhooks import BoardWebhook
from app.models.gateways import Gateway
from app.models.tasks import Task
from app.schemas.gateways import GatewayTemplatesSyncResult
from app.schemas.gateways import (
GatewayAgentImportCandidate,
GatewayAgentImportPreviewResponse,
GatewayAgentImportReconcileSummary,
GatewayAgentImportResponse,
GatewayAgentReconcileError,
GatewayTemplatesSyncResult,
)
from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG
from app.services.openclaw.db_service import OpenClawDBService
from app.services.openclaw.error_messages import normalize_gateway_error_message
from app.services.openclaw.gateway_compat import check_gateway_version_compatibility
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
from app.services.openclaw.gateway_rpc import OpenClawGatewayError, openclaw_call
from app.services.openclaw.internal.agent_key import agent_key as resolve_agent_key
from app.services.openclaw.lifecycle_orchestrator import AgentLifecycleOrchestrator
from app.services.openclaw.provisioning_db import (
GatewayTemplateSyncOptions,
@ -40,6 +51,44 @@ if TYPE_CHECKING:
from app.models.users import User
_LEAD_SESSION_KEY_RE = re.compile(
r"^agent:lead-(?P<board_id>[0-9a-fA-F-]{36}):main$"
)
def _as_dict_list(value: object) -> list[dict[str, object]]:
if isinstance(value, dict):
for key in ("agents", "sessions", "items", "data"):
nested = value.get(key)
if isinstance(nested, list):
return [item for item in nested if isinstance(item, dict)]
return []
if isinstance(value, list):
return [item for item in value if isinstance(item, dict)]
return []
def _derive_session_key_for_runtime_agent(
runtime_agent_id: str,
session_by_agent_key: dict[str, str],
) -> str:
existing = session_by_agent_key.get(runtime_agent_id)
if existing:
return existing
return f"agent:{runtime_agent_id}:main"
def _parse_lead_board_id(session_key: str) -> UUID | None:
matched = _LEAD_SESSION_KEY_RE.match(session_key)
if matched is None:
return None
board_id_raw = matched.group("board_id")
try:
return UUID(board_id_raw)
except ValueError:
return None
class AbstractGatewayMainAgentManager(ABC):
"""Abstract manager for gateway-main agent naming/profile behavior."""
@ -287,6 +336,309 @@ class GatewayAdminLifecycleService(OpenClawDBService):
notify=False,
)
async def preview_gateway_agent_import(
self,
*,
gateway: Gateway,
) -> GatewayAgentImportPreviewResponse:
"""Return runtime agents that can be imported into Pipeline metadata."""
config = GatewayClientConfig(
url=gateway.url,
token=gateway.token,
allow_insecure_tls=gateway.allow_insecure_tls,
disable_device_pairing=gateway.disable_device_pairing,
)
try:
runtime_agents_raw = await openclaw_call("agents.list", config=config)
runtime_sessions_raw = await openclaw_call(
"sessions.list",
{"limit": 500},
config=config,
)
except OpenClawGatewayError as exc:
detail = normalize_gateway_error_message(str(exc))
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Gateway runtime query failed: {detail}",
) from exc
runtime_agents = _as_dict_list(runtime_agents_raw)
runtime_sessions = _as_dict_list(runtime_sessions_raw)
session_by_agent_key: dict[str, str] = {}
for runtime_session in runtime_sessions:
key = runtime_session.get("key")
if not isinstance(key, str):
continue
parts = key.split(":")
if len(parts) < 2 or parts[0] != "agent":
continue
session_by_agent_key[parts[1]] = key
existing_agents = await Agent.objects.filter_by(gateway_id=gateway.id).all(self.session)
existing_by_session_key = {
(agent.openclaw_session_id or "").strip(): agent
for agent in existing_agents
if (agent.openclaw_session_id or "").strip()
}
existing_by_runtime_key = {
resolve_agent_key(agent): agent
for agent in existing_agents
}
lead_board_ids: set[UUID] = set()
normalized_runtime_agents: list[tuple[str, str | None, str]] = []
for runtime_agent in runtime_agents:
runtime_id = runtime_agent.get("id")
if not isinstance(runtime_id, str) or not runtime_id.strip():
continue
runtime_key = runtime_id.strip()
runtime_name = runtime_agent.get("name")
normalized_name = runtime_name.strip() if isinstance(runtime_name, str) else None
session_key = _derive_session_key_for_runtime_agent(runtime_key, session_by_agent_key)
lead_board_id = _parse_lead_board_id(session_key)
if lead_board_id is not None:
lead_board_ids.add(lead_board_id)
normalized_runtime_agents.append((runtime_key, normalized_name, session_key))
board_by_id: dict[UUID, Board] = {}
if lead_board_ids:
board_rows = (
await self.session.exec(
select(Board).where(
col(Board.id).in_(lead_board_ids),
col(Board.organization_id) == gateway.organization_id,
col(Board.gateway_id) == gateway.id,
)
)
).all()
board_by_id = {board.id: board for board in board_rows}
candidates: list[GatewayAgentImportCandidate] = []
for runtime_id, runtime_name, session_key in normalized_runtime_agents:
tracked_agent = existing_by_session_key.get(session_key)
if tracked_agent is None:
tracked_agent = existing_by_runtime_key.get(runtime_id)
lead_board_id = _parse_lead_board_id(session_key)
inferred_board_id = (
lead_board_id if lead_board_id is not None and lead_board_id in board_by_id else None
)
inferred_is_board_lead = inferred_board_id is not None
candidates.append(
GatewayAgentImportCandidate(
gateway_agent_id=runtime_id,
gateway_agent_name=runtime_name,
session_key=session_key,
existing_agent_id=tracked_agent.id if tracked_agent is not None else None,
importable=tracked_agent is None,
inferred_board_id=inferred_board_id,
inferred_is_board_lead=inferred_is_board_lead,
)
)
candidates.sort(key=lambda item: item.gateway_agent_id.lower())
discovered_count = len(candidates)
importable_count = sum(1 for item in candidates if item.importable)
already_tracked_count = discovered_count - importable_count
return GatewayAgentImportPreviewResponse(
gateway_id=gateway.id,
discovered_count=discovered_count,
importable_count=importable_count,
already_tracked_count=already_tracked_count,
candidates=candidates,
)
async def import_gateway_agents(
self,
*,
gateway: Gateway,
gateway_agent_ids: list[str],
reconcile_after_import: bool = False,
rotate_tokens: bool = True,
reset_sessions: bool = True,
force_bootstrap: bool = True,
) -> GatewayAgentImportResponse:
"""Import selected runtime agents as Pipeline agent rows."""
preview = await self.preview_gateway_agent_import(gateway=gateway)
candidates_by_runtime_id = {
item.gateway_agent_id: item
for item in preview.candidates
}
seen: set[str] = set()
deduped_ids: list[str] = []
for runtime_id in gateway_agent_ids:
cleaned = runtime_id.strip()
if not cleaned or cleaned in seen:
continue
seen.add(cleaned)
deduped_ids.append(cleaned)
imported_agent_ids: list[UUID] = []
skipped_gateway_agent_ids: list[str] = []
for runtime_id in deduped_ids:
candidate = candidates_by_runtime_id.get(runtime_id)
if candidate is None or not candidate.importable:
skipped_gateway_agent_ids.append(runtime_id)
continue
imported = Agent(
name=candidate.gateway_agent_name or f"Imported {runtime_id}",
board_id=candidate.inferred_board_id,
gateway_id=gateway.id,
status="offline",
heartbeat_config=DEFAULT_HEARTBEAT_CONFIG.copy(),
openclaw_session_id=candidate.session_key,
is_board_lead=candidate.inferred_is_board_lead,
)
self.session.add(imported)
await self.session.flush()
imported_agent_ids.append(imported.id)
await self.session.commit()
reconcile_summary: GatewayAgentImportReconcileSummary | None = None
if reconcile_after_import and imported_agent_ids:
reconcile_summary = await self.reconcile_imported_agents(
gateway=gateway,
agent_ids=imported_agent_ids,
rotate_tokens=rotate_tokens,
reset_sessions=reset_sessions,
force_bootstrap=force_bootstrap,
)
return GatewayAgentImportResponse(
gateway_id=gateway.id,
imported_count=len(imported_agent_ids),
skipped_count=len(skipped_gateway_agent_ids),
imported_agent_ids=imported_agent_ids,
skipped_gateway_agent_ids=skipped_gateway_agent_ids,
reconcile=reconcile_summary,
)
async def reconcile_imported_agents(
self,
*,
gateway: Gateway,
agent_ids: list[UUID],
rotate_tokens: bool,
reset_sessions: bool,
force_bootstrap: bool,
) -> GatewayAgentImportReconcileSummary:
"""Run post-import lifecycle reconcile for imported agent rows."""
if not agent_ids:
return GatewayAgentImportReconcileSummary(
attempted=0,
updated=0,
skipped=0,
errors=[],
)
agents = (
await self.session.exec(
select(Agent).where(
col(Agent.id).in_(agent_ids),
col(Agent.gateway_id) == gateway.id,
)
)
).all()
boards_by_id: dict[UUID, Board] = {}
board_ids = [agent.board_id for agent in agents if agent.board_id is not None]
if board_ids:
board_rows = (
await self.session.exec(
select(Board).where(
col(Board.id).in_(board_ids),
col(Board.gateway_id) == gateway.id,
col(Board.organization_id) == gateway.organization_id,
)
)
).all()
boards_by_id = {board.id: board for board in board_rows}
updated = 0
skipped = 0
errors: list[GatewayAgentReconcileError] = []
main_session_key = GatewayAgentIdentity.session_key(gateway)
for agent in agents:
board: Board | None = None
if agent.board_id is not None:
board = boards_by_id.get(agent.board_id)
if board is None:
skipped += 1
errors.append(
GatewayAgentReconcileError(
agent_id=agent.id,
message="Skipping reconcile: board is missing or not mapped to gateway.",
)
)
continue
elif (agent.openclaw_session_id or "").strip() != main_session_key:
skipped += 1
errors.append(
GatewayAgentReconcileError(
agent_id=agent.id,
message=(
"Skipping reconcile: boardless imported agent is not the gateway-main "
"session and cannot be reprovisioned automatically."
),
)
)
continue
if not rotate_tokens:
skipped += 1
errors.append(
GatewayAgentReconcileError(
agent_id=agent.id,
message=(
"Skipping reconcile: rotate_tokens=false is unsupported for imported "
"agents without an existing backend token hash."
),
)
)
continue
try:
await AgentLifecycleOrchestrator(self.session).run_lifecycle(
gateway=gateway,
agent_id=agent.id,
board=board,
user=None,
action="update",
auth_token=None,
force_bootstrap=force_bootstrap,
reset_session=reset_sessions,
wake=False,
deliver_wakeup=False,
wakeup_verb="updated",
clear_confirm_token=False,
raise_gateway_errors=True,
)
except HTTPException as exc:
skipped += 1
message = exc.detail if isinstance(exc.detail, str) else str(exc.detail)
errors.append(
GatewayAgentReconcileError(
agent_id=agent.id,
message=f"Reconcile failed: {message}",
)
)
except Exception as exc:
skipped += 1
errors.append(
GatewayAgentReconcileError(
agent_id=agent.id,
message=f"Reconcile failed: {exc}",
)
)
else:
updated += 1
return GatewayAgentImportReconcileSummary(
attempted=len(agents),
updated=updated,
skipped=skipped,
errors=errors,
)
async def clear_agent_foreign_keys(self, *, agent_id: UUID) -> None:
now = utcnow()
await crud.update_where(

View File

@ -0,0 +1,40 @@
"""Add unique constraint to agents.openclaw_session_id.
Revision ID: c1d2e3f4a5b6
Revises: b6c7d8e9f0a1
Create Date: 2026-05-20 12:00:00.000000
"""
from __future__ import annotations
from alembic import op
revision = "c1d2e3f4a5b6"
down_revision = "b6c7d8e9f0a1"
branch_labels = None
depends_on = None
def upgrade() -> None:
# Drop the plain index first, then replace with a unique one.
# NULL values are excluded from unique constraints in PostgreSQL so
# multiple agents with NULL openclaw_session_id are still allowed.
op.drop_index("ix_agents_openclaw_session_id", table_name="agents")
op.create_index(
"ix_agents_openclaw_session_id",
"agents",
["openclaw_session_id"],
unique=True,
postgresql_where="openclaw_session_id IS NOT NULL",
)
def downgrade() -> None:
op.drop_index("ix_agents_openclaw_session_id", table_name="agents")
op.create_index(
"ix_agents_openclaw_session_id",
"agents",
["openclaw_session_id"],
unique=False,
)

View File

@ -0,0 +1,448 @@
# ruff: noqa: INP001
"""Integration tests for gateway runtime agent import preview and import APIs."""
from __future__ import annotations
from types import SimpleNamespace
from uuid import uuid4
import pytest
from fastapi import APIRouter, FastAPI
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from sqlmodel import SQLModel, select
from sqlmodel.ext.asyncio.session import AsyncSession
from app import models as _models
from app.api.deps import require_org_admin
from app.api.gateways import router as gateways_router
from app.db.session import get_session
from app.models.agents import Agent
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.organization_members import OrganizationMember
from app.models.organizations import Organization
from app.models.users import User
import app.services.openclaw.admin_service as admin_service
from app.core.agent_tokens import hash_agent_token
from app.services.organizations import OrganizationContext
async def _make_engine() -> AsyncEngine:
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
return engine
def _build_test_app(
session_maker: async_sessionmaker[AsyncSession],
ctx: OrganizationContext,
) -> FastAPI:
app = FastAPI()
api_v1 = APIRouter(prefix="/api/v1")
api_v1.include_router(gateways_router)
app.include_router(api_v1)
async def _override_get_session() -> AsyncSession:
async with session_maker() as session:
yield session
async def _override_require_org_admin() -> OrganizationContext:
return ctx
app.dependency_overrides[get_session] = _override_get_session
app.dependency_overrides[require_org_admin] = _override_require_org_admin
return app
@pytest.mark.asyncio
async def test_gateway_agent_import_preview_reports_importable_and_tracked(
monkeypatch: pytest.MonkeyPatch,
) -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
organization = Organization(id=uuid4(), name="Pipeline")
user = User(id=uuid4(), clerk_user_id="user_123", email="user@example.com")
member = OrganizationMember(
id=uuid4(),
organization_id=organization.id,
user_id=user.id,
role="owner",
all_boards_read=True,
all_boards_write=True,
)
gateway = Gateway(
id=uuid4(),
organization_id=organization.id,
name="Main Gateway",
url="ws://gateway.example.local",
workspace_root="/workspace",
)
board = Board(
id=uuid4(),
organization_id=organization.id,
name="Platform",
slug="platform",
gateway_id=gateway.id,
)
tracked_agent = Agent(
id=uuid4(),
board_id=None,
gateway_id=gateway.id,
name="Already Tracked",
openclaw_session_id="agent:existing-main:main",
is_board_lead=False,
)
lead_runtime_id = f"lead-{board.id}"
async def _fake_openclaw_call(
method: str,
params: object | None = None,
*,
config: object,
) -> object:
_ = params, config
if method == "agents.list":
return {
"agents": [
{"id": "existing-main", "name": "Existing Main"},
{"id": "legacy-worker", "name": "Legacy Worker"},
{"id": lead_runtime_id, "name": "Legacy Lead"},
]
}
if method == "sessions.list":
return {
"sessions": [
{"key": "agent:existing-main:main"},
{"key": "agent:legacy-worker:main"},
{"key": f"agent:{lead_runtime_id}:main"},
]
}
raise AssertionError(f"Unexpected method {method}")
monkeypatch.setattr(admin_service, "openclaw_call", _fake_openclaw_call)
try:
async with session_maker() as session:
session.add(organization)
session.add(user)
session.add(member)
session.add(gateway)
session.add(board)
session.add(tracked_agent)
await session.commit()
app = _build_test_app(
session_maker,
OrganizationContext(organization=organization, member=member),
)
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://testserver",
) as client:
response = await client.get(
f"/api/v1/gateways/{gateway.id}/agents/import-preview"
)
assert response.status_code == 200
data = response.json()
assert data["discovered_count"] == 3
assert data["importable_count"] == 2
assert data["already_tracked_count"] == 1
candidates = {item["gateway_agent_id"]: item for item in data["candidates"]}
assert candidates["existing-main"]["importable"] is False
assert candidates["existing-main"]["existing_agent_id"] == str(tracked_agent.id)
assert candidates["legacy-worker"]["importable"] is True
assert candidates[lead_runtime_id]["importable"] is True
assert candidates[lead_runtime_id]["inferred_board_id"] == str(board.id)
assert candidates[lead_runtime_id]["inferred_is_board_lead"] is True
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_gateway_agent_import_creates_selected_candidates(
monkeypatch: pytest.MonkeyPatch,
) -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
organization = Organization(id=uuid4(), name="Pipeline")
user = User(id=uuid4(), clerk_user_id="user_123", email="user@example.com")
member = OrganizationMember(
id=uuid4(),
organization_id=organization.id,
user_id=user.id,
role="owner",
all_boards_read=True,
all_boards_write=True,
)
gateway = Gateway(
id=uuid4(),
organization_id=organization.id,
name="Main Gateway",
url="ws://gateway.example.local",
workspace_root="/workspace",
)
board = Board(
id=uuid4(),
organization_id=organization.id,
name="Platform",
slug="platform",
gateway_id=gateway.id,
)
tracked_agent = Agent(
id=uuid4(),
board_id=None,
gateway_id=gateway.id,
name="Already Tracked",
openclaw_session_id="agent:existing-main:main",
is_board_lead=False,
)
lead_runtime_id = f"lead-{board.id}"
async def _fake_openclaw_call(
method: str,
params: object | None = None,
*,
config: object,
) -> object:
_ = params, config
if method == "agents.list":
return {
"agents": [
{"id": "existing-main", "name": "Existing Main"},
{"id": "legacy-worker", "name": "Legacy Worker"},
{"id": lead_runtime_id, "name": "Legacy Lead"},
]
}
if method == "sessions.list":
return {
"sessions": [
{"key": "agent:existing-main:main"},
{"key": "agent:legacy-worker:main"},
{"key": f"agent:{lead_runtime_id}:main"},
]
}
raise AssertionError(f"Unexpected method {method}")
monkeypatch.setattr(admin_service, "openclaw_call", _fake_openclaw_call)
try:
async with session_maker() as session:
session.add(organization)
session.add(user)
session.add(member)
session.add(gateway)
session.add(board)
session.add(tracked_agent)
await session.commit()
app = _build_test_app(
session_maker,
OrganizationContext(organization=organization, member=member),
)
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://testserver",
) as client:
response = await client.post(
f"/api/v1/gateways/{gateway.id}/agents/import",
json={
"gateway_agent_ids": [
"existing-main",
"legacy-worker",
lead_runtime_id,
"unknown-agent",
]
},
)
assert response.status_code == 200
data = response.json()
assert data["imported_count"] == 2
assert data["skipped_count"] == 2
assert set(data["skipped_gateway_agent_ids"]) == {"existing-main", "unknown-agent"}
assert len(data["imported_agent_ids"]) == 2
async with session_maker() as session:
gateway_agents = (
await session.exec(
select(Agent).where(Agent.gateway_id == gateway.id)
)
).all()
# original tracked + imported worker + imported lead
assert len(gateway_agents) == 3
by_name = {agent.name: agent for agent in gateway_agents}
imported_worker = by_name["Legacy Worker"]
imported_lead = by_name["Legacy Lead"]
assert imported_worker.board_id is None
assert imported_worker.is_board_lead is False
assert imported_worker.openclaw_session_id == "agent:legacy-worker:main"
assert imported_lead.board_id == board.id
assert imported_lead.is_board_lead is True
assert imported_lead.openclaw_session_id == f"agent:{lead_runtime_id}:main"
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_gateway_agent_import_reconcile_updates_supported_agents(
monkeypatch: pytest.MonkeyPatch,
) -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
organization = Organization(id=uuid4(), name="Pipeline")
user = User(id=uuid4(), clerk_user_id="user_123", email="user@example.com")
member = OrganizationMember(
id=uuid4(),
organization_id=organization.id,
user_id=user.id,
role="owner",
all_boards_read=True,
all_boards_write=True,
)
gateway = Gateway(
id=uuid4(),
organization_id=organization.id,
name="Main Gateway",
url="ws://gateway.example.local",
workspace_root="/workspace",
)
board = Board(
id=uuid4(),
organization_id=organization.id,
name="Platform",
slug="platform",
gateway_id=gateway.id,
)
lead_runtime_id = f"lead-{board.id}"
async def _fake_openclaw_call(
method: str,
params: object | None = None,
*,
config: object,
) -> object:
_ = params, config
if method == "agents.list":
return {
"agents": [
{"id": "legacy-worker", "name": "Legacy Worker"},
{"id": lead_runtime_id, "name": "Legacy Lead"},
]
}
if method == "sessions.list":
return {
"sessions": [
{"key": "agent:legacy-worker:main"},
{"key": f"agent:{lead_runtime_id}:main"},
]
}
raise AssertionError(f"Unexpected method {method}")
async def _fake_run_lifecycle(
self: object,
*,
gateway: Gateway,
agent_id: object,
board: Board | None,
user: object,
action: str,
auth_token: str | None,
force_bootstrap: bool,
reset_session: bool,
wake: bool,
deliver_wakeup: bool,
wakeup_verb: str | None,
clear_confirm_token: bool,
raise_gateway_errors: bool,
) -> Agent:
_ = (
user,
action,
auth_token,
force_bootstrap,
reset_session,
wake,
deliver_wakeup,
wakeup_verb,
clear_confirm_token,
raise_gateway_errors,
)
assert board is not None
session = getattr(self, "session")
agent = await session.get(Agent, agent_id)
assert agent is not None
assert agent.gateway_id == gateway.id
agent.status = "online"
agent.agent_token_hash = hash_agent_token("rotated-token")
session.add(agent)
await session.commit()
await session.refresh(agent)
return agent
monkeypatch.setattr(admin_service, "openclaw_call", _fake_openclaw_call)
monkeypatch.setattr(
admin_service.AgentLifecycleOrchestrator,
"run_lifecycle",
_fake_run_lifecycle,
)
try:
async with session_maker() as session:
session.add(organization)
session.add(user)
session.add(member)
session.add(gateway)
session.add(board)
await session.commit()
app = _build_test_app(
session_maker,
OrganizationContext(organization=organization, member=member),
)
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://testserver",
) as client:
response = await client.post(
f"/api/v1/gateways/{gateway.id}/agents/import",
json={
"gateway_agent_ids": ["legacy-worker", lead_runtime_id],
"reconcile_after_import": True,
},
)
assert response.status_code == 200
data = response.json()
assert data["imported_count"] == 2
assert data["reconcile"] is not None
assert data["reconcile"]["attempted"] == 2
assert data["reconcile"]["updated"] == 1
assert data["reconcile"]["skipped"] == 1
assert len(data["reconcile"]["errors"]) == 1
async with session_maker() as session:
agents = (
await session.exec(
select(Agent).where(Agent.gateway_id == gateway.id)
)
).all()
by_name = {agent.name: agent for agent in agents}
assert by_name["Legacy Lead"].status == "online"
assert by_name["Legacy Lead"].agent_token_hash
assert by_name["Legacy Worker"].status == "offline"
assert by_name["Legacy Worker"].agent_token_hash is None
finally:
await engine.dispose()

View File

@ -8,6 +8,7 @@ import Link from "next/link";
import { useAuth } from "@/auth/clerk";
import { useQueryClient } from "@tanstack/react-query";
import { GatewayAgentImportDialog } from "@/components/gateways/GatewayAgentImportDialog";
import { GatewaysTable } from "@/components/gateways/GatewaysTable";
import { DashboardPageLayout } from "@/components/templates/DashboardPageLayout";
import { buttonVariants } from "@/components/ui/button";
@ -20,6 +21,7 @@ import {
useDeleteGatewayApiV1GatewaysGatewayIdDelete,
useListGatewaysApiV1GatewaysGet,
} from "@/api/generated/gateways/gateways";
import { getListAgentsApiV1AgentsGetQueryKey } from "@/api/generated/agents/agents";
import { createOptimisticListDeleteMutation } from "@/lib/list-delete";
import { useOrganizationMembership } from "@/lib/use-organization-membership";
import type { GatewayRead } from "@/api/generated/model";
@ -38,8 +40,10 @@ export default function GatewaysPage() {
const { isAdmin } = useOrganizationMembership(isSignedIn);
const [deleteTarget, setDeleteTarget] = useState<GatewayRead | null>(null);
const [importTarget, setImportTarget] = useState<GatewayRead | null>(null);
const gatewaysKey = getListGatewaysApiV1GatewaysGetQueryKey();
const agentsKey = getListAgentsApiV1AgentsGetQueryKey();
const gatewaysQuery = useListGatewaysApiV1GatewaysGet<
listGatewaysApiV1GatewaysGetResponse,
ApiError
@ -87,6 +91,11 @@ export default function GatewaysPage() {
deleteMutation.mutate({ gatewayId: deleteTarget.id });
};
const handleImported = async () => {
await queryClient.invalidateQueries({ queryKey: gatewaysKey });
await queryClient.invalidateQueries({ queryKey: agentsKey });
};
return (
<>
<DashboardPageLayout
@ -122,6 +131,7 @@ export default function GatewaysPage() {
showActions
stickyHeader
onDelete={setDeleteTarget}
onImport={setImportTarget}
emptyState={{
title: "No gateways yet",
description:
@ -155,6 +165,17 @@ export default function GatewaysPage() {
onConfirm={handleDelete}
isConfirming={deleteMutation.isPending}
/>
<GatewayAgentImportDialog
open={Boolean(importTarget)}
onOpenChange={(open) => {
if (!open) {
setImportTarget(null);
}
}}
gateway={importTarget}
onImported={handleImported}
/>
</>
);
}

View File

@ -0,0 +1,345 @@
"use client";
import { useMemo, useState } from "react";
import { useMutation, useQuery } from "@tanstack/react-query";
import type { GatewayRead } from "@/api/generated/model";
import { ApiError } from "@/api/mutator";
import { Badge } from "@/components/ui/badge";
import { Button } from "@/components/ui/button";
import {
Dialog,
DialogContent,
DialogDescription,
DialogFooter,
DialogHeader,
DialogTitle,
} from "@/components/ui/dialog";
import {
Select,
SelectContent,
SelectItem,
SelectTrigger,
SelectValue,
} from "@/components/ui/select";
import {
getGatewayAgentImportPreview,
importGatewayAgents,
type GatewayAgentImportCandidate,
type GatewayAgentImportPreviewResponse,
type GatewayAgentImportResponse,
} from "@/lib/api-gateway-agent-import";
type ImportMode = "import_only" | "import_and_reconcile";
type GatewayAgentImportDialogProps = {
open: boolean;
onOpenChange: (open: boolean) => void;
gateway: GatewayRead | null;
onImported: (result: GatewayAgentImportResponse) => void;
};
function shortUuid(value: string | null): string {
if (!value) {
return "—";
}
return value.length > 14 ? `${value.slice(0, 6)}${value.slice(-6)}` : value;
}
export function GatewayAgentImportDialog({
open,
onOpenChange,
gateway,
onImported,
}: GatewayAgentImportDialogProps) {
const [manualSelectedAgentIds, setManualSelectedAgentIds] = useState<Set<string> | null>(null);
const [importMode, setImportMode] = useState<ImportMode>("import_and_reconcile");
const [resultMessage, setResultMessage] = useState<string | null>(null);
const previewQuery = useQuery<GatewayAgentImportPreviewResponse, ApiError>({
queryKey: ["gateway-agent-import-preview", gateway?.id],
queryFn: async () => {
if (!gateway) {
throw new Error("Gateway is required");
}
return getGatewayAgentImportPreview(gateway.id);
},
enabled: open && Boolean(gateway?.id),
staleTime: 0,
refetchOnMount: "always",
});
const importableCandidates = useMemo(
() =>
(previewQuery.data?.candidates ?? []).filter(
(candidate) => candidate.importable,
),
[previewQuery.data?.candidates],
);
const defaultSelectedAgentIds = useMemo(
() =>
new Set(
importableCandidates.map((candidate) => candidate.gateway_agent_id),
),
[importableCandidates],
);
const selectedAgentIds = manualSelectedAgentIds ?? defaultSelectedAgentIds;
const importMutation = useMutation<
GatewayAgentImportResponse,
ApiError,
string[]
>({
mutationFn: async (gatewayAgentIds) => {
if (!gateway) {
throw new Error("Gateway is required");
}
return importGatewayAgents(gateway.id, {
gateway_agent_ids: gatewayAgentIds,
reconcile_after_import: importMode === "import_and_reconcile",
rotate_tokens: importMode === "import_and_reconcile",
reset_sessions: importMode === "import_and_reconcile",
force_bootstrap: importMode === "import_and_reconcile",
});
},
onSuccess: (result) => {
const reconcileText = result.reconcile
? ` Reconcile updated ${result.reconcile.updated}/${result.reconcile.attempted}.`
: "";
setResultMessage(
`Imported ${result.imported_count} agent${result.imported_count === 1 ? "" : "s"}.${reconcileText}`,
);
onImported(result);
},
});
const selectedCount = selectedAgentIds.size;
const canImport = selectedCount > 0 && !importMutation.isPending;
const toggleCandidate = (candidate: GatewayAgentImportCandidate) => {
if (!candidate.importable) {
return;
}
setManualSelectedAgentIds((prev) => {
const next = new Set(prev ?? selectedAgentIds);
if (next.has(candidate.gateway_agent_id)) {
next.delete(candidate.gateway_agent_id);
} else {
next.add(candidate.gateway_agent_id);
}
return next;
});
};
const selectAllImportable = () => {
setManualSelectedAgentIds(
new Set(importableCandidates.map((candidate) => candidate.gateway_agent_id)),
);
};
const clearSelection = () => {
setManualSelectedAgentIds(new Set());
};
const handleImport = () => {
importMutation.mutate([...selectedAgentIds]);
};
const handleOpenChange = (nextOpen: boolean) => {
if (!nextOpen) {
setManualSelectedAgentIds(null);
setResultMessage(null);
}
onOpenChange(nextOpen);
};
return (
<Dialog open={open} onOpenChange={handleOpenChange}>
<DialogContent className="max-w-4xl">
<DialogHeader>
<DialogTitle>Import Existing Gateway Agents</DialogTitle>
<DialogDescription>
Preview gateway runtime agents for <strong>{gateway?.name}</strong>,
then import selected entries into Pipeline.
</DialogDescription>
</DialogHeader>
<div className="space-y-4">
<div className="grid gap-3 sm:grid-cols-3">
<div className="rounded-lg border border-border bg-muted/30 p-3 text-sm">
<div className="text-muted-foreground">Discovered</div>
<div className="text-base font-semibold text-foreground">
{previewQuery.data?.discovered_count ?? 0}
</div>
</div>
<div className="rounded-lg border border-border bg-muted/30 p-3 text-sm">
<div className="text-muted-foreground">Importable</div>
<div className="text-base font-semibold text-foreground">
{previewQuery.data?.importable_count ?? 0}
</div>
</div>
<div className="rounded-lg border border-border bg-muted/30 p-3 text-sm">
<div className="text-muted-foreground">Already Tracked</div>
<div className="text-base font-semibold text-foreground">
{previewQuery.data?.already_tracked_count ?? 0}
</div>
</div>
</div>
<div className="flex flex-col gap-2 rounded-lg border border-border bg-card p-3 sm:flex-row sm:items-center sm:justify-between">
<div className="text-sm text-muted-foreground">
Selected {selectedCount} of {importableCandidates.length} importable
agents
</div>
<div className="flex items-center gap-2">
<Button
type="button"
variant="secondary"
size="sm"
onClick={selectAllImportable}
>
Select All
</Button>
<Button
type="button"
variant="ghost"
size="sm"
onClick={clearSelection}
>
Clear
</Button>
</div>
</div>
<div className="max-h-[360px] overflow-auto rounded-lg border border-border">
<table className="w-full text-sm">
<thead className="sticky top-0 bg-muted/70 text-xs uppercase text-muted-foreground">
<tr>
<th className="px-3 py-2 text-left">Agent</th>
<th className="px-3 py-2 text-left">Session</th>
<th className="px-3 py-2 text-left">Board</th>
<th className="px-3 py-2 text-left">Status</th>
<th className="px-3 py-2 text-right">Select</th>
</tr>
</thead>
<tbody>
{previewQuery.isLoading ? (
<tr>
<td colSpan={5} className="px-3 py-6 text-center text-muted-foreground">
Loading import preview
</td>
</tr>
) : previewQuery.error ? (
<tr>
<td colSpan={5} className="px-3 py-6 text-center text-red-500">
{previewQuery.error.message}
</td>
</tr>
) : previewQuery.data?.candidates.length ? (
previewQuery.data.candidates.map((candidate) => {
const isSelected = selectedAgentIds.has(
candidate.gateway_agent_id,
);
return (
<tr
key={candidate.gateway_agent_id}
className="border-t border-border"
>
<td className="px-3 py-2">
<div className="font-medium text-foreground">
{candidate.gateway_agent_name ?? candidate.gateway_agent_id}
</div>
<div className="text-xs text-muted-foreground">
{candidate.gateway_agent_id}
</div>
</td>
<td className="px-3 py-2 text-xs text-muted-foreground">
{candidate.session_key ?? "—"}
</td>
<td className="px-3 py-2 text-xs text-muted-foreground">
{shortUuid(candidate.inferred_board_id)}
</td>
<td className="px-3 py-2">
{candidate.importable ? (
<Badge variant="outline">Importable</Badge>
) : (
<Badge>Tracked</Badge>
)}
</td>
<td className="px-3 py-2 text-right">
{candidate.importable ? (
<Button
type="button"
size="sm"
variant={isSelected ? "primary" : "secondary"}
onClick={() => toggleCandidate(candidate)}
>
{isSelected ? "Selected" : "Select"}
</Button>
) : (
<span className="text-xs text-muted-foreground">
</span>
)}
</td>
</tr>
);
})
) : (
<tr>
<td colSpan={5} className="px-3 py-6 text-center text-muted-foreground">
No gateway agents found.
</td>
</tr>
)}
</tbody>
</table>
</div>
<div className="grid gap-2 sm:max-w-sm">
<label className="text-sm font-medium text-foreground">Import Mode</label>
<Select
value={importMode}
onValueChange={(value) => setImportMode(value as ImportMode)}
>
<SelectTrigger>
<SelectValue />
</SelectTrigger>
<SelectContent>
<SelectItem value="import_and_reconcile">
Import + Reconcile
</SelectItem>
<SelectItem value="import_only">Import Only</SelectItem>
</SelectContent>
</Select>
<p className="text-xs text-muted-foreground">
Reconcile rotates tokens and pushes templates so imported agents can
check in with Pipeline auth.
</p>
</div>
{resultMessage ? (
<p className="text-sm text-foreground">{resultMessage}</p>
) : null}
{importMutation.error ? (
<p className="text-sm text-red-500">{importMutation.error.message}</p>
) : null}
</div>
<DialogFooter>
<Button
type="button"
variant="ghost"
onClick={() => handleOpenChange(false)}
>
Close
</Button>
<Button type="button" onClick={handleImport} disabled={!canImport}>
{importMutation.isPending ? "Importing…" : "Import Selected"}
</Button>
</DialogFooter>
</DialogContent>
</Dialog>
);
}

View File

@ -30,6 +30,7 @@ type GatewaysTableProps = {
columnOrder?: string[];
disableSorting?: boolean;
onDelete?: (gateway: GatewayRead) => void;
onImport?: (gateway: GatewayRead) => void;
emptyMessage?: string;
emptyState?: Omit<DataTableEmptyState, "icon"> & {
icon?: DataTableEmptyState["icon"];
@ -62,6 +63,7 @@ export function GatewaysTable({
columnOrder,
disableSorting = false,
onDelete,
onImport,
emptyMessage = "No gateways found.",
emptyState,
}: GatewaysTableProps) {
@ -139,8 +141,31 @@ export function GatewaysTable({
rowActions={
showActions
? {
getEditHref: (gateway) => `/gateways/${gateway.id}/edit`,
onDelete,
actions: [
{
key: "edit",
label: "Edit",
href: (gateway) => `/gateways/${gateway.id}/edit`,
},
...(onImport
? [
{
key: "import-agents",
label: "Import Agents",
onClick: onImport,
},
]
: []),
...(onDelete
? [
{
key: "delete",
label: "Delete",
onClick: onDelete,
},
]
: []),
],
}
: undefined
}

View File

@ -51,12 +51,13 @@ export function ForgejoHeatmap({
const { weeks, monthLabels } = useMemo(() => {
const data = new Map<string, number>(days.map((d) => [d.date, d.count]));
// Start on the Sunday that is ~52 weeks before today
// Start on the Sunday 52 weeks before the current week's Sunday,
// so the last column always contains today and future cells are clipped.
const today = new Date();
today.setHours(0, 0, 0, 0);
const start = new Date(today);
start.setDate(start.getDate() - WEEKS * 7 + 1);
start.setDate(start.getDate() - start.getDay()); // rewind to Sunday
start.setDate(start.getDate() - start.getDay()); // rewind to this week's Sunday
start.setDate(start.getDate() - (WEEKS - 1) * 7); // go back 52 more weeks
type Cell = { date: string; count: number; future: boolean };
const builtWeeks: Cell[][] = [];
@ -96,7 +97,7 @@ export function ForgejoHeatmap({
return (
<div>
<div style={{ overflowX: "auto" }}>
<div style={{ overflowX: "auto" }} className="flex justify-center">
<svg
width={svgW}
height={svgH}
@ -117,12 +118,14 @@ export function ForgejoHeatmap({
</text>
))}
{/* Day-of-week labels: Mon=row1, Wed=row3, Fri=row5 */}
{/* Day-of-week labels: Mon=row1, Wed=row3, Fri=row5
y = cell top + CELL - 2 puts the baseline near the bottom of the
10px cell, which reads as vertically centred for a 10px font. */}
{(["Mon", "Wed", "Fri"] as const).map((label, i) => (
<text
key={label}
x={0}
y={TOP + (i * 2 + 1) * STRIDE - GAP}
y={TOP + (i * 2 + 1) * STRIDE + CELL - 2}
fontSize={10}
fill="currentColor"
className="text-muted"
@ -158,7 +161,7 @@ export function ForgejoHeatmap({
</div>
{/* Legend */}
<div className="mt-2 flex items-center justify-end gap-1.5 text-xs text-muted">
<div className="mt-2 flex items-center justify-center gap-1.5 text-xs text-muted">
<span>Less</span>
{LEVEL_FILL.map((fill, i) => (
<div

View File

@ -0,0 +1,80 @@
import { customFetch } from "@/api/mutator";
type ApiResponse<T> = {
data: T;
status: number;
headers: Headers;
};
export type GatewayAgentImportCandidate = {
gateway_agent_id: string;
gateway_agent_name: string | null;
session_key: string | null;
existing_agent_id: string | null;
importable: boolean;
inferred_board_id: string | null;
inferred_is_board_lead: boolean;
};
export type GatewayAgentImportPreviewResponse = {
gateway_id: string;
discovered_count: number;
importable_count: number;
already_tracked_count: number;
candidates: GatewayAgentImportCandidate[];
};
export type GatewayAgentReconcileError = {
agent_id: string | null;
message: string;
};
export type GatewayAgentImportReconcileSummary = {
attempted: number;
updated: number;
skipped: number;
errors: GatewayAgentReconcileError[];
};
export type GatewayAgentImportResponse = {
gateway_id: string;
imported_count: number;
skipped_count: number;
imported_agent_ids: string[];
skipped_gateway_agent_ids: string[];
reconcile: GatewayAgentImportReconcileSummary | null;
};
export type GatewayAgentImportRequest = {
gateway_agent_ids: string[];
reconcile_after_import: boolean;
rotate_tokens: boolean;
reset_sessions: boolean;
force_bootstrap: boolean;
};
async function fetchJson<T>(path: string, init?: RequestInit): Promise<T> {
const response = await customFetch<ApiResponse<T>>(path, init ?? {});
return response.data;
}
export async function getGatewayAgentImportPreview(
gatewayId: string,
): Promise<GatewayAgentImportPreviewResponse> {
return fetchJson<GatewayAgentImportPreviewResponse>(
`/api/v1/gateways/${gatewayId}/agents/import-preview`,
);
}
export async function importGatewayAgents(
gatewayId: string,
payload: GatewayAgentImportRequest,
): Promise<GatewayAgentImportResponse> {
return fetchJson<GatewayAgentImportResponse>(
`/api/v1/gateways/${gatewayId}/agents/import`,
{
method: "POST",
body: JSON.stringify(payload),
},
);
}