From e083e4e10c33f7dc6b9b2e1f18b83875dbef5826 Mon Sep 17 00:00:00 2001 From: null Date: Wed, 20 May 2026 04:13:32 -0500 Subject: [PATCH] fix(heatmap&import_agent): corrections --- backend/app/api/forgejo_metrics.py | 5 +- backend/app/api/gateways.py | 47 ++ backend/app/schemas/gateways.py | 59 +++ .../app/services/openclaw/admin_service.py | 354 +++++++++++++- .../c1d2e3f4a5b6_unique_agent_session_id.py | 40 ++ .../tests/test_gateway_agent_import_api.py | 448 ++++++++++++++++++ frontend/src/app/gateways/page.tsx | 21 + .../gateways/GatewayAgentImportDialog.tsx | 345 ++++++++++++++ .../src/components/gateways/GatewaysTable.tsx | 29 +- .../src/components/git/ForgejoHeatmap.tsx | 17 +- frontend/src/lib/api-gateway-agent-import.ts | 80 ++++ 11 files changed, 1432 insertions(+), 13 deletions(-) create mode 100644 backend/migrations/versions/c1d2e3f4a5b6_unique_agent_session_id.py create mode 100644 backend/tests/test_gateway_agent_import_api.py create mode 100644 frontend/src/components/gateways/GatewayAgentImportDialog.tsx create mode 100644 frontend/src/lib/api-gateway-agent-import.ts diff --git a/backend/app/api/forgejo_metrics.py b/backend/app/api/forgejo_metrics.py index d862633..ac7c218 100644 --- a/backend/app/api/forgejo_metrics.py +++ b/backend/app/api/forgejo_metrics.py @@ -7,7 +7,9 @@ from typing import TYPE_CHECKING from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlalchemy import Date as SADate from sqlalchemy import and_ +from sqlalchemy import cast as sa_cast from sqlmodel import func, select from app.api.deps import ORG_MEMBER_DEP, OrganizationContext @@ -267,9 +269,6 @@ async def get_forgejo_heatmap( ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> HeatmapResponse: """Return per-day issue event counts (created + closed) for the last 365 days.""" - from sqlalchemy import Date as SADate - from sqlalchemy import cast as sa_cast - if organization_id and organization_id != ctx.organization.id: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) diff --git a/backend/app/api/gateways.py b/backend/app/api/gateways.py index 2429c8a..0c8a798 100644 --- a/backend/app/api/gateways.py +++ b/backend/app/api/gateways.py @@ -18,6 +18,9 @@ from app.models.gateways import Gateway from app.models.skills import GatewayInstalledSkill from app.schemas.common import OkResponse from app.schemas.gateways import ( + GatewayAgentImportPreviewResponse, + GatewayAgentImportRequest, + GatewayAgentImportResponse, GatewayCreate, GatewayRead, GatewayTemplatesSyncResult, @@ -185,6 +188,50 @@ async def sync_gateway_templates( return await service.sync_templates(gateway, query=sync_query, auth=auth) +@router.get( + "/{gateway_id}/agents/import-preview", + response_model=GatewayAgentImportPreviewResponse, +) +async def preview_import_gateway_agents( + gateway_id: UUID, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_ADMIN_DEP, +) -> GatewayAgentImportPreviewResponse: + """Preview existing gateway runtime agents that can be imported.""" + service = GatewayAdminLifecycleService(session) + gateway = await service.require_gateway( + gateway_id=gateway_id, + organization_id=ctx.organization.id, + ) + return await service.preview_gateway_agent_import(gateway=gateway) + + +@router.post( + "/{gateway_id}/agents/import", + response_model=GatewayAgentImportResponse, +) +async def import_gateway_agents( + gateway_id: UUID, + payload: GatewayAgentImportRequest, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_ADMIN_DEP, +) -> GatewayAgentImportResponse: + """Import selected existing gateway runtime agents into Pipeline.""" + service = GatewayAdminLifecycleService(session) + gateway = await service.require_gateway( + gateway_id=gateway_id, + organization_id=ctx.organization.id, + ) + return await service.import_gateway_agents( + gateway=gateway, + gateway_agent_ids=payload.gateway_agent_ids, + reconcile_after_import=payload.reconcile_after_import, + rotate_tokens=payload.rotate_tokens, + reset_sessions=payload.reset_sessions, + force_bootstrap=payload.force_bootstrap, + ) + + @router.delete("/{gateway_id}", response_model=OkResponse) async def delete_gateway( gateway_id: UUID, diff --git a/backend/app/schemas/gateways.py b/backend/app/schemas/gateways.py index 9d30699..8ed163d 100644 --- a/backend/app/schemas/gateways.py +++ b/backend/app/schemas/gateways.py @@ -89,3 +89,62 @@ class GatewayTemplatesSyncResult(SQLModel): agents_skipped: int main_updated: bool errors: list[GatewayTemplatesSyncError] = Field(default_factory=list) + + +class GatewayAgentImportCandidate(SQLModel): + """One gateway runtime agent candidate discovered for import.""" + + gateway_agent_id: str + gateway_agent_name: str | None = None + session_key: str | None = None + existing_agent_id: UUID | None = None + importable: bool + inferred_board_id: UUID | None = None + inferred_is_board_lead: bool = False + + +class GatewayAgentImportPreviewResponse(SQLModel): + """Discovery preview response for importing existing gateway agents.""" + + gateway_id: UUID + discovered_count: int + importable_count: int + already_tracked_count: int + candidates: list[GatewayAgentImportCandidate] = Field(default_factory=list) + + +class GatewayAgentImportRequest(SQLModel): + """Request payload listing gateway runtime agents to import.""" + + gateway_agent_ids: list[str] = Field(default_factory=list, min_length=1) + reconcile_after_import: bool = False + rotate_tokens: bool = True + reset_sessions: bool = True + force_bootstrap: bool = True + + +class GatewayAgentReconcileError(SQLModel): + """One reconciliation error for an imported agent.""" + + agent_id: UUID | None = None + message: str + + +class GatewayAgentImportReconcileSummary(SQLModel): + """Summary for optional post-import reconcile execution.""" + + attempted: int + updated: int + skipped: int + errors: list[GatewayAgentReconcileError] = Field(default_factory=list) + + +class GatewayAgentImportResponse(SQLModel): + """Import summary response for existing gateway runtime agents.""" + + gateway_id: UUID + imported_count: int + skipped_count: int + imported_agent_ids: list[UUID] = Field(default_factory=list) + skipped_gateway_agent_ids: list[str] = Field(default_factory=list) + reconcile: GatewayAgentImportReconcileSummary | None = None diff --git a/backend/app/services/openclaw/admin_service.py b/backend/app/services/openclaw/admin_service.py index 5a58e18..0456b86 100644 --- a/backend/app/services/openclaw/admin_service.py +++ b/backend/app/services/openclaw/admin_service.py @@ -3,11 +3,13 @@ from __future__ import annotations from abc import ABC, abstractmethod +import re from typing import TYPE_CHECKING from uuid import UUID from fastapi import HTTPException, status from sqlmodel import col +from sqlmodel import select from app.core.auth import AuthContext from app.core.logging import TRACE_LEVEL @@ -16,16 +18,25 @@ from app.db import crud from app.models.activity_events import ActivityEvent from app.models.agents import Agent from app.models.approvals import Approval +from app.models.boards import Board from app.models.board_webhooks import BoardWebhook from app.models.gateways import Gateway from app.models.tasks import Task -from app.schemas.gateways import GatewayTemplatesSyncResult +from app.schemas.gateways import ( + GatewayAgentImportCandidate, + GatewayAgentImportPreviewResponse, + GatewayAgentImportReconcileSummary, + GatewayAgentImportResponse, + GatewayAgentReconcileError, + GatewayTemplatesSyncResult, +) from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG from app.services.openclaw.db_service import OpenClawDBService from app.services.openclaw.error_messages import normalize_gateway_error_message from app.services.openclaw.gateway_compat import check_gateway_version_compatibility from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig from app.services.openclaw.gateway_rpc import OpenClawGatewayError, openclaw_call +from app.services.openclaw.internal.agent_key import agent_key as resolve_agent_key from app.services.openclaw.lifecycle_orchestrator import AgentLifecycleOrchestrator from app.services.openclaw.provisioning_db import ( GatewayTemplateSyncOptions, @@ -40,6 +51,44 @@ if TYPE_CHECKING: from app.models.users import User +_LEAD_SESSION_KEY_RE = re.compile( + r"^agent:lead-(?P[0-9a-fA-F-]{36}):main$" +) + + +def _as_dict_list(value: object) -> list[dict[str, object]]: + if isinstance(value, dict): + for key in ("agents", "sessions", "items", "data"): + nested = value.get(key) + if isinstance(nested, list): + return [item for item in nested if isinstance(item, dict)] + return [] + if isinstance(value, list): + return [item for item in value if isinstance(item, dict)] + return [] + + +def _derive_session_key_for_runtime_agent( + runtime_agent_id: str, + session_by_agent_key: dict[str, str], +) -> str: + existing = session_by_agent_key.get(runtime_agent_id) + if existing: + return existing + return f"agent:{runtime_agent_id}:main" + + +def _parse_lead_board_id(session_key: str) -> UUID | None: + matched = _LEAD_SESSION_KEY_RE.match(session_key) + if matched is None: + return None + board_id_raw = matched.group("board_id") + try: + return UUID(board_id_raw) + except ValueError: + return None + + class AbstractGatewayMainAgentManager(ABC): """Abstract manager for gateway-main agent naming/profile behavior.""" @@ -287,6 +336,309 @@ class GatewayAdminLifecycleService(OpenClawDBService): notify=False, ) + async def preview_gateway_agent_import( + self, + *, + gateway: Gateway, + ) -> GatewayAgentImportPreviewResponse: + """Return runtime agents that can be imported into Pipeline metadata.""" + config = GatewayClientConfig( + url=gateway.url, + token=gateway.token, + allow_insecure_tls=gateway.allow_insecure_tls, + disable_device_pairing=gateway.disable_device_pairing, + ) + try: + runtime_agents_raw = await openclaw_call("agents.list", config=config) + runtime_sessions_raw = await openclaw_call( + "sessions.list", + {"limit": 500}, + config=config, + ) + except OpenClawGatewayError as exc: + detail = normalize_gateway_error_message(str(exc)) + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail=f"Gateway runtime query failed: {detail}", + ) from exc + + runtime_agents = _as_dict_list(runtime_agents_raw) + runtime_sessions = _as_dict_list(runtime_sessions_raw) + + session_by_agent_key: dict[str, str] = {} + for runtime_session in runtime_sessions: + key = runtime_session.get("key") + if not isinstance(key, str): + continue + parts = key.split(":") + if len(parts) < 2 or parts[0] != "agent": + continue + session_by_agent_key[parts[1]] = key + + existing_agents = await Agent.objects.filter_by(gateway_id=gateway.id).all(self.session) + existing_by_session_key = { + (agent.openclaw_session_id or "").strip(): agent + for agent in existing_agents + if (agent.openclaw_session_id or "").strip() + } + existing_by_runtime_key = { + resolve_agent_key(agent): agent + for agent in existing_agents + } + + lead_board_ids: set[UUID] = set() + normalized_runtime_agents: list[tuple[str, str | None, str]] = [] + for runtime_agent in runtime_agents: + runtime_id = runtime_agent.get("id") + if not isinstance(runtime_id, str) or not runtime_id.strip(): + continue + runtime_key = runtime_id.strip() + runtime_name = runtime_agent.get("name") + normalized_name = runtime_name.strip() if isinstance(runtime_name, str) else None + session_key = _derive_session_key_for_runtime_agent(runtime_key, session_by_agent_key) + lead_board_id = _parse_lead_board_id(session_key) + if lead_board_id is not None: + lead_board_ids.add(lead_board_id) + normalized_runtime_agents.append((runtime_key, normalized_name, session_key)) + + board_by_id: dict[UUID, Board] = {} + if lead_board_ids: + board_rows = ( + await self.session.exec( + select(Board).where( + col(Board.id).in_(lead_board_ids), + col(Board.organization_id) == gateway.organization_id, + col(Board.gateway_id) == gateway.id, + ) + ) + ).all() + board_by_id = {board.id: board for board in board_rows} + + candidates: list[GatewayAgentImportCandidate] = [] + for runtime_id, runtime_name, session_key in normalized_runtime_agents: + tracked_agent = existing_by_session_key.get(session_key) + if tracked_agent is None: + tracked_agent = existing_by_runtime_key.get(runtime_id) + lead_board_id = _parse_lead_board_id(session_key) + inferred_board_id = ( + lead_board_id if lead_board_id is not None and lead_board_id in board_by_id else None + ) + inferred_is_board_lead = inferred_board_id is not None + candidates.append( + GatewayAgentImportCandidate( + gateway_agent_id=runtime_id, + gateway_agent_name=runtime_name, + session_key=session_key, + existing_agent_id=tracked_agent.id if tracked_agent is not None else None, + importable=tracked_agent is None, + inferred_board_id=inferred_board_id, + inferred_is_board_lead=inferred_is_board_lead, + ) + ) + + candidates.sort(key=lambda item: item.gateway_agent_id.lower()) + discovered_count = len(candidates) + importable_count = sum(1 for item in candidates if item.importable) + already_tracked_count = discovered_count - importable_count + return GatewayAgentImportPreviewResponse( + gateway_id=gateway.id, + discovered_count=discovered_count, + importable_count=importable_count, + already_tracked_count=already_tracked_count, + candidates=candidates, + ) + + async def import_gateway_agents( + self, + *, + gateway: Gateway, + gateway_agent_ids: list[str], + reconcile_after_import: bool = False, + rotate_tokens: bool = True, + reset_sessions: bool = True, + force_bootstrap: bool = True, + ) -> GatewayAgentImportResponse: + """Import selected runtime agents as Pipeline agent rows.""" + preview = await self.preview_gateway_agent_import(gateway=gateway) + candidates_by_runtime_id = { + item.gateway_agent_id: item + for item in preview.candidates + } + seen: set[str] = set() + deduped_ids: list[str] = [] + for runtime_id in gateway_agent_ids: + cleaned = runtime_id.strip() + if not cleaned or cleaned in seen: + continue + seen.add(cleaned) + deduped_ids.append(cleaned) + + imported_agent_ids: list[UUID] = [] + skipped_gateway_agent_ids: list[str] = [] + for runtime_id in deduped_ids: + candidate = candidates_by_runtime_id.get(runtime_id) + if candidate is None or not candidate.importable: + skipped_gateway_agent_ids.append(runtime_id) + continue + imported = Agent( + name=candidate.gateway_agent_name or f"Imported {runtime_id}", + board_id=candidate.inferred_board_id, + gateway_id=gateway.id, + status="offline", + heartbeat_config=DEFAULT_HEARTBEAT_CONFIG.copy(), + openclaw_session_id=candidate.session_key, + is_board_lead=candidate.inferred_is_board_lead, + ) + self.session.add(imported) + await self.session.flush() + imported_agent_ids.append(imported.id) + + await self.session.commit() + reconcile_summary: GatewayAgentImportReconcileSummary | None = None + if reconcile_after_import and imported_agent_ids: + reconcile_summary = await self.reconcile_imported_agents( + gateway=gateway, + agent_ids=imported_agent_ids, + rotate_tokens=rotate_tokens, + reset_sessions=reset_sessions, + force_bootstrap=force_bootstrap, + ) + + return GatewayAgentImportResponse( + gateway_id=gateway.id, + imported_count=len(imported_agent_ids), + skipped_count=len(skipped_gateway_agent_ids), + imported_agent_ids=imported_agent_ids, + skipped_gateway_agent_ids=skipped_gateway_agent_ids, + reconcile=reconcile_summary, + ) + + async def reconcile_imported_agents( + self, + *, + gateway: Gateway, + agent_ids: list[UUID], + rotate_tokens: bool, + reset_sessions: bool, + force_bootstrap: bool, + ) -> GatewayAgentImportReconcileSummary: + """Run post-import lifecycle reconcile for imported agent rows.""" + if not agent_ids: + return GatewayAgentImportReconcileSummary( + attempted=0, + updated=0, + skipped=0, + errors=[], + ) + + agents = ( + await self.session.exec( + select(Agent).where( + col(Agent.id).in_(agent_ids), + col(Agent.gateway_id) == gateway.id, + ) + ) + ).all() + boards_by_id: dict[UUID, Board] = {} + board_ids = [agent.board_id for agent in agents if agent.board_id is not None] + if board_ids: + board_rows = ( + await self.session.exec( + select(Board).where( + col(Board.id).in_(board_ids), + col(Board.gateway_id) == gateway.id, + col(Board.organization_id) == gateway.organization_id, + ) + ) + ).all() + boards_by_id = {board.id: board for board in board_rows} + + updated = 0 + skipped = 0 + errors: list[GatewayAgentReconcileError] = [] + main_session_key = GatewayAgentIdentity.session_key(gateway) + for agent in agents: + board: Board | None = None + if agent.board_id is not None: + board = boards_by_id.get(agent.board_id) + if board is None: + skipped += 1 + errors.append( + GatewayAgentReconcileError( + agent_id=agent.id, + message="Skipping reconcile: board is missing or not mapped to gateway.", + ) + ) + continue + elif (agent.openclaw_session_id or "").strip() != main_session_key: + skipped += 1 + errors.append( + GatewayAgentReconcileError( + agent_id=agent.id, + message=( + "Skipping reconcile: boardless imported agent is not the gateway-main " + "session and cannot be reprovisioned automatically." + ), + ) + ) + continue + + if not rotate_tokens: + skipped += 1 + errors.append( + GatewayAgentReconcileError( + agent_id=agent.id, + message=( + "Skipping reconcile: rotate_tokens=false is unsupported for imported " + "agents without an existing backend token hash." + ), + ) + ) + continue + + try: + await AgentLifecycleOrchestrator(self.session).run_lifecycle( + gateway=gateway, + agent_id=agent.id, + board=board, + user=None, + action="update", + auth_token=None, + force_bootstrap=force_bootstrap, + reset_session=reset_sessions, + wake=False, + deliver_wakeup=False, + wakeup_verb="updated", + clear_confirm_token=False, + raise_gateway_errors=True, + ) + except HTTPException as exc: + skipped += 1 + message = exc.detail if isinstance(exc.detail, str) else str(exc.detail) + errors.append( + GatewayAgentReconcileError( + agent_id=agent.id, + message=f"Reconcile failed: {message}", + ) + ) + except Exception as exc: + skipped += 1 + errors.append( + GatewayAgentReconcileError( + agent_id=agent.id, + message=f"Reconcile failed: {exc}", + ) + ) + else: + updated += 1 + + return GatewayAgentImportReconcileSummary( + attempted=len(agents), + updated=updated, + skipped=skipped, + errors=errors, + ) + async def clear_agent_foreign_keys(self, *, agent_id: UUID) -> None: now = utcnow() await crud.update_where( diff --git a/backend/migrations/versions/c1d2e3f4a5b6_unique_agent_session_id.py b/backend/migrations/versions/c1d2e3f4a5b6_unique_agent_session_id.py new file mode 100644 index 0000000..636cbf1 --- /dev/null +++ b/backend/migrations/versions/c1d2e3f4a5b6_unique_agent_session_id.py @@ -0,0 +1,40 @@ +"""Add unique constraint to agents.openclaw_session_id. + +Revision ID: c1d2e3f4a5b6 +Revises: b6c7d8e9f0a1 +Create Date: 2026-05-20 12:00:00.000000 + +""" + +from __future__ import annotations + +from alembic import op + +revision = "c1d2e3f4a5b6" +down_revision = "b6c7d8e9f0a1" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Drop the plain index first, then replace with a unique one. + # NULL values are excluded from unique constraints in PostgreSQL so + # multiple agents with NULL openclaw_session_id are still allowed. + op.drop_index("ix_agents_openclaw_session_id", table_name="agents") + op.create_index( + "ix_agents_openclaw_session_id", + "agents", + ["openclaw_session_id"], + unique=True, + postgresql_where="openclaw_session_id IS NOT NULL", + ) + + +def downgrade() -> None: + op.drop_index("ix_agents_openclaw_session_id", table_name="agents") + op.create_index( + "ix_agents_openclaw_session_id", + "agents", + ["openclaw_session_id"], + unique=False, + ) diff --git a/backend/tests/test_gateway_agent_import_api.py b/backend/tests/test_gateway_agent_import_api.py new file mode 100644 index 0000000..2185c8c --- /dev/null +++ b/backend/tests/test_gateway_agent_import_api.py @@ -0,0 +1,448 @@ +# ruff: noqa: INP001 +"""Integration tests for gateway runtime agent import preview and import APIs.""" + +from __future__ import annotations + +from types import SimpleNamespace +from uuid import uuid4 + +import pytest +from fastapi import APIRouter, FastAPI +from httpx import ASGITransport, AsyncClient +from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine +from sqlmodel import SQLModel, select +from sqlmodel.ext.asyncio.session import AsyncSession + +from app import models as _models +from app.api.deps import require_org_admin +from app.api.gateways import router as gateways_router +from app.db.session import get_session +from app.models.agents import Agent +from app.models.boards import Board +from app.models.gateways import Gateway +from app.models.organization_members import OrganizationMember +from app.models.organizations import Organization +from app.models.users import User +import app.services.openclaw.admin_service as admin_service +from app.core.agent_tokens import hash_agent_token +from app.services.organizations import OrganizationContext + + +async def _make_engine() -> AsyncEngine: + engine = create_async_engine("sqlite+aiosqlite:///:memory:") + async with engine.begin() as conn: + await conn.run_sync(SQLModel.metadata.create_all) + return engine + + +def _build_test_app( + session_maker: async_sessionmaker[AsyncSession], + ctx: OrganizationContext, +) -> FastAPI: + app = FastAPI() + api_v1 = APIRouter(prefix="/api/v1") + api_v1.include_router(gateways_router) + app.include_router(api_v1) + + async def _override_get_session() -> AsyncSession: + async with session_maker() as session: + yield session + + async def _override_require_org_admin() -> OrganizationContext: + return ctx + + app.dependency_overrides[get_session] = _override_get_session + app.dependency_overrides[require_org_admin] = _override_require_org_admin + return app + + +@pytest.mark.asyncio +async def test_gateway_agent_import_preview_reports_importable_and_tracked( + monkeypatch: pytest.MonkeyPatch, +) -> None: + engine = await _make_engine() + session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + organization = Organization(id=uuid4(), name="Pipeline") + user = User(id=uuid4(), clerk_user_id="user_123", email="user@example.com") + member = OrganizationMember( + id=uuid4(), + organization_id=organization.id, + user_id=user.id, + role="owner", + all_boards_read=True, + all_boards_write=True, + ) + gateway = Gateway( + id=uuid4(), + organization_id=organization.id, + name="Main Gateway", + url="ws://gateway.example.local", + workspace_root="/workspace", + ) + board = Board( + id=uuid4(), + organization_id=organization.id, + name="Platform", + slug="platform", + gateway_id=gateway.id, + ) + tracked_agent = Agent( + id=uuid4(), + board_id=None, + gateway_id=gateway.id, + name="Already Tracked", + openclaw_session_id="agent:existing-main:main", + is_board_lead=False, + ) + + lead_runtime_id = f"lead-{board.id}" + + async def _fake_openclaw_call( + method: str, + params: object | None = None, + *, + config: object, + ) -> object: + _ = params, config + if method == "agents.list": + return { + "agents": [ + {"id": "existing-main", "name": "Existing Main"}, + {"id": "legacy-worker", "name": "Legacy Worker"}, + {"id": lead_runtime_id, "name": "Legacy Lead"}, + ] + } + if method == "sessions.list": + return { + "sessions": [ + {"key": "agent:existing-main:main"}, + {"key": "agent:legacy-worker:main"}, + {"key": f"agent:{lead_runtime_id}:main"}, + ] + } + raise AssertionError(f"Unexpected method {method}") + + monkeypatch.setattr(admin_service, "openclaw_call", _fake_openclaw_call) + + try: + async with session_maker() as session: + session.add(organization) + session.add(user) + session.add(member) + session.add(gateway) + session.add(board) + session.add(tracked_agent) + await session.commit() + + app = _build_test_app( + session_maker, + OrganizationContext(organization=organization, member=member), + ) + + async with AsyncClient( + transport=ASGITransport(app=app), + base_url="http://testserver", + ) as client: + response = await client.get( + f"/api/v1/gateways/{gateway.id}/agents/import-preview" + ) + + assert response.status_code == 200 + data = response.json() + assert data["discovered_count"] == 3 + assert data["importable_count"] == 2 + assert data["already_tracked_count"] == 1 + + candidates = {item["gateway_agent_id"]: item for item in data["candidates"]} + assert candidates["existing-main"]["importable"] is False + assert candidates["existing-main"]["existing_agent_id"] == str(tracked_agent.id) + assert candidates["legacy-worker"]["importable"] is True + assert candidates[lead_runtime_id]["importable"] is True + assert candidates[lead_runtime_id]["inferred_board_id"] == str(board.id) + assert candidates[lead_runtime_id]["inferred_is_board_lead"] is True + finally: + await engine.dispose() + + +@pytest.mark.asyncio +async def test_gateway_agent_import_creates_selected_candidates( + monkeypatch: pytest.MonkeyPatch, +) -> None: + engine = await _make_engine() + session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + organization = Organization(id=uuid4(), name="Pipeline") + user = User(id=uuid4(), clerk_user_id="user_123", email="user@example.com") + member = OrganizationMember( + id=uuid4(), + organization_id=organization.id, + user_id=user.id, + role="owner", + all_boards_read=True, + all_boards_write=True, + ) + gateway = Gateway( + id=uuid4(), + organization_id=organization.id, + name="Main Gateway", + url="ws://gateway.example.local", + workspace_root="/workspace", + ) + board = Board( + id=uuid4(), + organization_id=organization.id, + name="Platform", + slug="platform", + gateway_id=gateway.id, + ) + tracked_agent = Agent( + id=uuid4(), + board_id=None, + gateway_id=gateway.id, + name="Already Tracked", + openclaw_session_id="agent:existing-main:main", + is_board_lead=False, + ) + + lead_runtime_id = f"lead-{board.id}" + + async def _fake_openclaw_call( + method: str, + params: object | None = None, + *, + config: object, + ) -> object: + _ = params, config + if method == "agents.list": + return { + "agents": [ + {"id": "existing-main", "name": "Existing Main"}, + {"id": "legacy-worker", "name": "Legacy Worker"}, + {"id": lead_runtime_id, "name": "Legacy Lead"}, + ] + } + if method == "sessions.list": + return { + "sessions": [ + {"key": "agent:existing-main:main"}, + {"key": "agent:legacy-worker:main"}, + {"key": f"agent:{lead_runtime_id}:main"}, + ] + } + raise AssertionError(f"Unexpected method {method}") + + monkeypatch.setattr(admin_service, "openclaw_call", _fake_openclaw_call) + + try: + async with session_maker() as session: + session.add(organization) + session.add(user) + session.add(member) + session.add(gateway) + session.add(board) + session.add(tracked_agent) + await session.commit() + + app = _build_test_app( + session_maker, + OrganizationContext(organization=organization, member=member), + ) + + async with AsyncClient( + transport=ASGITransport(app=app), + base_url="http://testserver", + ) as client: + response = await client.post( + f"/api/v1/gateways/{gateway.id}/agents/import", + json={ + "gateway_agent_ids": [ + "existing-main", + "legacy-worker", + lead_runtime_id, + "unknown-agent", + ] + }, + ) + + assert response.status_code == 200 + data = response.json() + assert data["imported_count"] == 2 + assert data["skipped_count"] == 2 + assert set(data["skipped_gateway_agent_ids"]) == {"existing-main", "unknown-agent"} + assert len(data["imported_agent_ids"]) == 2 + + async with session_maker() as session: + gateway_agents = ( + await session.exec( + select(Agent).where(Agent.gateway_id == gateway.id) + ) + ).all() + # original tracked + imported worker + imported lead + assert len(gateway_agents) == 3 + by_name = {agent.name: agent for agent in gateway_agents} + imported_worker = by_name["Legacy Worker"] + imported_lead = by_name["Legacy Lead"] + assert imported_worker.board_id is None + assert imported_worker.is_board_lead is False + assert imported_worker.openclaw_session_id == "agent:legacy-worker:main" + assert imported_lead.board_id == board.id + assert imported_lead.is_board_lead is True + assert imported_lead.openclaw_session_id == f"agent:{lead_runtime_id}:main" + finally: + await engine.dispose() + + +@pytest.mark.asyncio +async def test_gateway_agent_import_reconcile_updates_supported_agents( + monkeypatch: pytest.MonkeyPatch, +) -> None: + engine = await _make_engine() + session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + organization = Organization(id=uuid4(), name="Pipeline") + user = User(id=uuid4(), clerk_user_id="user_123", email="user@example.com") + member = OrganizationMember( + id=uuid4(), + organization_id=organization.id, + user_id=user.id, + role="owner", + all_boards_read=True, + all_boards_write=True, + ) + gateway = Gateway( + id=uuid4(), + organization_id=organization.id, + name="Main Gateway", + url="ws://gateway.example.local", + workspace_root="/workspace", + ) + board = Board( + id=uuid4(), + organization_id=organization.id, + name="Platform", + slug="platform", + gateway_id=gateway.id, + ) + + lead_runtime_id = f"lead-{board.id}" + + async def _fake_openclaw_call( + method: str, + params: object | None = None, + *, + config: object, + ) -> object: + _ = params, config + if method == "agents.list": + return { + "agents": [ + {"id": "legacy-worker", "name": "Legacy Worker"}, + {"id": lead_runtime_id, "name": "Legacy Lead"}, + ] + } + if method == "sessions.list": + return { + "sessions": [ + {"key": "agent:legacy-worker:main"}, + {"key": f"agent:{lead_runtime_id}:main"}, + ] + } + raise AssertionError(f"Unexpected method {method}") + + async def _fake_run_lifecycle( + self: object, + *, + gateway: Gateway, + agent_id: object, + board: Board | None, + user: object, + action: str, + auth_token: str | None, + force_bootstrap: bool, + reset_session: bool, + wake: bool, + deliver_wakeup: bool, + wakeup_verb: str | None, + clear_confirm_token: bool, + raise_gateway_errors: bool, + ) -> Agent: + _ = ( + user, + action, + auth_token, + force_bootstrap, + reset_session, + wake, + deliver_wakeup, + wakeup_verb, + clear_confirm_token, + raise_gateway_errors, + ) + assert board is not None + session = getattr(self, "session") + agent = await session.get(Agent, agent_id) + assert agent is not None + assert agent.gateway_id == gateway.id + agent.status = "online" + agent.agent_token_hash = hash_agent_token("rotated-token") + session.add(agent) + await session.commit() + await session.refresh(agent) + return agent + + monkeypatch.setattr(admin_service, "openclaw_call", _fake_openclaw_call) + monkeypatch.setattr( + admin_service.AgentLifecycleOrchestrator, + "run_lifecycle", + _fake_run_lifecycle, + ) + + try: + async with session_maker() as session: + session.add(organization) + session.add(user) + session.add(member) + session.add(gateway) + session.add(board) + await session.commit() + + app = _build_test_app( + session_maker, + OrganizationContext(organization=organization, member=member), + ) + + async with AsyncClient( + transport=ASGITransport(app=app), + base_url="http://testserver", + ) as client: + response = await client.post( + f"/api/v1/gateways/{gateway.id}/agents/import", + json={ + "gateway_agent_ids": ["legacy-worker", lead_runtime_id], + "reconcile_after_import": True, + }, + ) + + assert response.status_code == 200 + data = response.json() + assert data["imported_count"] == 2 + assert data["reconcile"] is not None + assert data["reconcile"]["attempted"] == 2 + assert data["reconcile"]["updated"] == 1 + assert data["reconcile"]["skipped"] == 1 + assert len(data["reconcile"]["errors"]) == 1 + + async with session_maker() as session: + agents = ( + await session.exec( + select(Agent).where(Agent.gateway_id == gateway.id) + ) + ).all() + by_name = {agent.name: agent for agent in agents} + assert by_name["Legacy Lead"].status == "online" + assert by_name["Legacy Lead"].agent_token_hash + assert by_name["Legacy Worker"].status == "offline" + assert by_name["Legacy Worker"].agent_token_hash is None + finally: + await engine.dispose() diff --git a/frontend/src/app/gateways/page.tsx b/frontend/src/app/gateways/page.tsx index e4dc868..02bcea1 100644 --- a/frontend/src/app/gateways/page.tsx +++ b/frontend/src/app/gateways/page.tsx @@ -8,6 +8,7 @@ import Link from "next/link"; import { useAuth } from "@/auth/clerk"; import { useQueryClient } from "@tanstack/react-query"; +import { GatewayAgentImportDialog } from "@/components/gateways/GatewayAgentImportDialog"; import { GatewaysTable } from "@/components/gateways/GatewaysTable"; import { DashboardPageLayout } from "@/components/templates/DashboardPageLayout"; import { buttonVariants } from "@/components/ui/button"; @@ -20,6 +21,7 @@ import { useDeleteGatewayApiV1GatewaysGatewayIdDelete, useListGatewaysApiV1GatewaysGet, } from "@/api/generated/gateways/gateways"; +import { getListAgentsApiV1AgentsGetQueryKey } from "@/api/generated/agents/agents"; import { createOptimisticListDeleteMutation } from "@/lib/list-delete"; import { useOrganizationMembership } from "@/lib/use-organization-membership"; import type { GatewayRead } from "@/api/generated/model"; @@ -38,8 +40,10 @@ export default function GatewaysPage() { const { isAdmin } = useOrganizationMembership(isSignedIn); const [deleteTarget, setDeleteTarget] = useState(null); + const [importTarget, setImportTarget] = useState(null); const gatewaysKey = getListGatewaysApiV1GatewaysGetQueryKey(); + const agentsKey = getListAgentsApiV1AgentsGetQueryKey(); const gatewaysQuery = useListGatewaysApiV1GatewaysGet< listGatewaysApiV1GatewaysGetResponse, ApiError @@ -87,6 +91,11 @@ export default function GatewaysPage() { deleteMutation.mutate({ gatewayId: deleteTarget.id }); }; + const handleImported = async () => { + await queryClient.invalidateQueries({ queryKey: gatewaysKey }); + await queryClient.invalidateQueries({ queryKey: agentsKey }); + }; + return ( <> + + { + if (!open) { + setImportTarget(null); + } + }} + gateway={importTarget} + onImported={handleImported} + /> ); } diff --git a/frontend/src/components/gateways/GatewayAgentImportDialog.tsx b/frontend/src/components/gateways/GatewayAgentImportDialog.tsx new file mode 100644 index 0000000..cc2946a --- /dev/null +++ b/frontend/src/components/gateways/GatewayAgentImportDialog.tsx @@ -0,0 +1,345 @@ +"use client"; + +import { useMemo, useState } from "react"; + +import { useMutation, useQuery } from "@tanstack/react-query"; + +import type { GatewayRead } from "@/api/generated/model"; +import { ApiError } from "@/api/mutator"; +import { Badge } from "@/components/ui/badge"; +import { Button } from "@/components/ui/button"; +import { + Dialog, + DialogContent, + DialogDescription, + DialogFooter, + DialogHeader, + DialogTitle, +} from "@/components/ui/dialog"; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from "@/components/ui/select"; +import { + getGatewayAgentImportPreview, + importGatewayAgents, + type GatewayAgentImportCandidate, + type GatewayAgentImportPreviewResponse, + type GatewayAgentImportResponse, +} from "@/lib/api-gateway-agent-import"; + +type ImportMode = "import_only" | "import_and_reconcile"; + +type GatewayAgentImportDialogProps = { + open: boolean; + onOpenChange: (open: boolean) => void; + gateway: GatewayRead | null; + onImported: (result: GatewayAgentImportResponse) => void; +}; + +function shortUuid(value: string | null): string { + if (!value) { + return "—"; + } + return value.length > 14 ? `${value.slice(0, 6)}…${value.slice(-6)}` : value; +} + +export function GatewayAgentImportDialog({ + open, + onOpenChange, + gateway, + onImported, +}: GatewayAgentImportDialogProps) { + const [manualSelectedAgentIds, setManualSelectedAgentIds] = useState | null>(null); + const [importMode, setImportMode] = useState("import_and_reconcile"); + const [resultMessage, setResultMessage] = useState(null); + + const previewQuery = useQuery({ + queryKey: ["gateway-agent-import-preview", gateway?.id], + queryFn: async () => { + if (!gateway) { + throw new Error("Gateway is required"); + } + return getGatewayAgentImportPreview(gateway.id); + }, + enabled: open && Boolean(gateway?.id), + staleTime: 0, + refetchOnMount: "always", + }); + + const importableCandidates = useMemo( + () => + (previewQuery.data?.candidates ?? []).filter( + (candidate) => candidate.importable, + ), + [previewQuery.data?.candidates], + ); + const defaultSelectedAgentIds = useMemo( + () => + new Set( + importableCandidates.map((candidate) => candidate.gateway_agent_id), + ), + [importableCandidates], + ); + const selectedAgentIds = manualSelectedAgentIds ?? defaultSelectedAgentIds; + + const importMutation = useMutation< + GatewayAgentImportResponse, + ApiError, + string[] + >({ + mutationFn: async (gatewayAgentIds) => { + if (!gateway) { + throw new Error("Gateway is required"); + } + return importGatewayAgents(gateway.id, { + gateway_agent_ids: gatewayAgentIds, + reconcile_after_import: importMode === "import_and_reconcile", + rotate_tokens: importMode === "import_and_reconcile", + reset_sessions: importMode === "import_and_reconcile", + force_bootstrap: importMode === "import_and_reconcile", + }); + }, + onSuccess: (result) => { + const reconcileText = result.reconcile + ? ` Reconcile updated ${result.reconcile.updated}/${result.reconcile.attempted}.` + : ""; + setResultMessage( + `Imported ${result.imported_count} agent${result.imported_count === 1 ? "" : "s"}.${reconcileText}`, + ); + onImported(result); + }, + }); + + const selectedCount = selectedAgentIds.size; + const canImport = selectedCount > 0 && !importMutation.isPending; + + const toggleCandidate = (candidate: GatewayAgentImportCandidate) => { + if (!candidate.importable) { + return; + } + setManualSelectedAgentIds((prev) => { + const next = new Set(prev ?? selectedAgentIds); + if (next.has(candidate.gateway_agent_id)) { + next.delete(candidate.gateway_agent_id); + } else { + next.add(candidate.gateway_agent_id); + } + return next; + }); + }; + + const selectAllImportable = () => { + setManualSelectedAgentIds( + new Set(importableCandidates.map((candidate) => candidate.gateway_agent_id)), + ); + }; + + const clearSelection = () => { + setManualSelectedAgentIds(new Set()); + }; + + const handleImport = () => { + importMutation.mutate([...selectedAgentIds]); + }; + + const handleOpenChange = (nextOpen: boolean) => { + if (!nextOpen) { + setManualSelectedAgentIds(null); + setResultMessage(null); + } + onOpenChange(nextOpen); + }; + + return ( + + + + Import Existing Gateway Agents + + Preview gateway runtime agents for {gateway?.name}, + then import selected entries into Pipeline. + + + +
+
+
+
Discovered
+
+ {previewQuery.data?.discovered_count ?? 0} +
+
+
+
Importable
+
+ {previewQuery.data?.importable_count ?? 0} +
+
+
+
Already Tracked
+
+ {previewQuery.data?.already_tracked_count ?? 0} +
+
+
+ +
+
+ Selected {selectedCount} of {importableCandidates.length} importable + agents +
+
+ + +
+
+ +
+ + + + + + + + + + + + {previewQuery.isLoading ? ( + + + + ) : previewQuery.error ? ( + + + + ) : previewQuery.data?.candidates.length ? ( + previewQuery.data.candidates.map((candidate) => { + const isSelected = selectedAgentIds.has( + candidate.gateway_agent_id, + ); + return ( + + + + + + + + ); + }) + ) : ( + + + + )} + +
AgentSessionBoardStatusSelect
+ Loading import preview… +
+ {previewQuery.error.message} +
+
+ {candidate.gateway_agent_name ?? candidate.gateway_agent_id} +
+
+ {candidate.gateway_agent_id} +
+
+ {candidate.session_key ?? "—"} + + {shortUuid(candidate.inferred_board_id)} + + {candidate.importable ? ( + Importable + ) : ( + Tracked + )} + + {candidate.importable ? ( + + ) : ( + + — + + )} +
+ No gateway agents found. +
+
+ +
+ + +

+ Reconcile rotates tokens and pushes templates so imported agents can + check in with Pipeline auth. +

+
+ + {resultMessage ? ( +

{resultMessage}

+ ) : null} + {importMutation.error ? ( +

{importMutation.error.message}

+ ) : null} +
+ + + + + +
+
+ ); +} diff --git a/frontend/src/components/gateways/GatewaysTable.tsx b/frontend/src/components/gateways/GatewaysTable.tsx index a6aaf3b..92865f4 100644 --- a/frontend/src/components/gateways/GatewaysTable.tsx +++ b/frontend/src/components/gateways/GatewaysTable.tsx @@ -30,6 +30,7 @@ type GatewaysTableProps = { columnOrder?: string[]; disableSorting?: boolean; onDelete?: (gateway: GatewayRead) => void; + onImport?: (gateway: GatewayRead) => void; emptyMessage?: string; emptyState?: Omit & { icon?: DataTableEmptyState["icon"]; @@ -62,6 +63,7 @@ export function GatewaysTable({ columnOrder, disableSorting = false, onDelete, + onImport, emptyMessage = "No gateways found.", emptyState, }: GatewaysTableProps) { @@ -139,8 +141,31 @@ export function GatewaysTable({ rowActions={ showActions ? { - getEditHref: (gateway) => `/gateways/${gateway.id}/edit`, - onDelete, + actions: [ + { + key: "edit", + label: "Edit", + href: (gateway) => `/gateways/${gateway.id}/edit`, + }, + ...(onImport + ? [ + { + key: "import-agents", + label: "Import Agents", + onClick: onImport, + }, + ] + : []), + ...(onDelete + ? [ + { + key: "delete", + label: "Delete", + onClick: onDelete, + }, + ] + : []), + ], } : undefined } diff --git a/frontend/src/components/git/ForgejoHeatmap.tsx b/frontend/src/components/git/ForgejoHeatmap.tsx index fa2a4a8..489a653 100644 --- a/frontend/src/components/git/ForgejoHeatmap.tsx +++ b/frontend/src/components/git/ForgejoHeatmap.tsx @@ -51,12 +51,13 @@ export function ForgejoHeatmap({ const { weeks, monthLabels } = useMemo(() => { const data = new Map(days.map((d) => [d.date, d.count])); - // Start on the Sunday that is ~52 weeks before today + // Start on the Sunday 52 weeks before the current week's Sunday, + // so the last column always contains today and future cells are clipped. const today = new Date(); today.setHours(0, 0, 0, 0); const start = new Date(today); - start.setDate(start.getDate() - WEEKS * 7 + 1); - start.setDate(start.getDate() - start.getDay()); // rewind to Sunday + start.setDate(start.getDate() - start.getDay()); // rewind to this week's Sunday + start.setDate(start.getDate() - (WEEKS - 1) * 7); // go back 52 more weeks type Cell = { date: string; count: number; future: boolean }; const builtWeeks: Cell[][] = []; @@ -96,7 +97,7 @@ export function ForgejoHeatmap({ return (
-
+
))} - {/* Day-of-week labels: Mon=row1, Wed=row3, Fri=row5 */} + {/* Day-of-week labels: Mon=row1, Wed=row3, Fri=row5 + y = cell top + CELL - 2 puts the baseline near the bottom of the + 10px cell, which reads as vertically centred for a 10px font. */} {(["Mon", "Wed", "Fri"] as const).map((label, i) => ( {/* Legend */} -
+
Less {LEVEL_FILL.map((fill, i) => (
= { + data: T; + status: number; + headers: Headers; +}; + +export type GatewayAgentImportCandidate = { + gateway_agent_id: string; + gateway_agent_name: string | null; + session_key: string | null; + existing_agent_id: string | null; + importable: boolean; + inferred_board_id: string | null; + inferred_is_board_lead: boolean; +}; + +export type GatewayAgentImportPreviewResponse = { + gateway_id: string; + discovered_count: number; + importable_count: number; + already_tracked_count: number; + candidates: GatewayAgentImportCandidate[]; +}; + +export type GatewayAgentReconcileError = { + agent_id: string | null; + message: string; +}; + +export type GatewayAgentImportReconcileSummary = { + attempted: number; + updated: number; + skipped: number; + errors: GatewayAgentReconcileError[]; +}; + +export type GatewayAgentImportResponse = { + gateway_id: string; + imported_count: number; + skipped_count: number; + imported_agent_ids: string[]; + skipped_gateway_agent_ids: string[]; + reconcile: GatewayAgentImportReconcileSummary | null; +}; + +export type GatewayAgentImportRequest = { + gateway_agent_ids: string[]; + reconcile_after_import: boolean; + rotate_tokens: boolean; + reset_sessions: boolean; + force_bootstrap: boolean; +}; + +async function fetchJson(path: string, init?: RequestInit): Promise { + const response = await customFetch>(path, init ?? {}); + return response.data; +} + +export async function getGatewayAgentImportPreview( + gatewayId: string, +): Promise { + return fetchJson( + `/api/v1/gateways/${gatewayId}/agents/import-preview`, + ); +} + +export async function importGatewayAgents( + gatewayId: string, + payload: GatewayAgentImportRequest, +): Promise { + return fetchJson( + `/api/v1/gateways/${gatewayId}/agents/import`, + { + method: "POST", + body: JSON.stringify(payload), + }, + ); +}