"""Thin API wrappers for gateway CRUD and template synchronization.""" from __future__ import annotations import asyncio import json from typing import TYPE_CHECKING from uuid import UUID, uuid4 from fastapi import APIRouter, Depends, Query, Request from sqlmodel import col from sse_starlette.sse import EventSourceResponse from app.api.deps import require_org_admin, require_org_member from app.core.auth import AuthContext, get_auth_context from app.core.config import settings from app.core.time import utcnow from app.db import crud from app.db.pagination import paginate from app.db.session import get_session from app.models.agents import Agent from app.models.gateways import Gateway from app.models.skills import GatewayInstalledSkill from app.schemas.common import OkResponse from app.schemas.gateways import ( GatewayAgentImportPreviewResponse, GatewayAgentImportRequest, GatewayAgentImportResponse, GatewayCreate, GatewayRead, GatewayTemplatesSyncResult, GatewayUpdate, ) from app.schemas.gateway_ops import ( CronJobRead, CronStatusResponse, HealthSnapshotRead, SystemHealthResponse, ) from app.schemas.runtime_usage import ( ClaudeStatuslineUsageIn, ProviderUsageResponse, ProviderUsageScrapeResult, ProviderUsageWindow, RuntimeUsageResponse, ) from app.db.session import async_session_maker from app.services.activity_log import record_activity from app.services.openclaw.cron_status import ( compute_job_status, fetch_cron_jobs, ) from app.services.openclaw.system_health import ( DEFAULT_HISTORY_WINDOW_HOURS, fetch_health, get_history, ) from app.services.openclaw.runtime_activity import ( HISTORY_FETCH_LIMIT, POLL_HISTORY_SESSIONS_MAX, fetch_recent_events, ) from app.services.openclaw.gateway_rpc import openclaw_call from app.services.openclaw.runtime_usage import get_runtime_usage from app.services.openclaw.usage_scrapers import ( get_cached_claude_statusline_usage, get_provider_usage, store_claude_statusline_usage, ) from app.schemas.pagination import DefaultLimitOffsetPage from app.services.openclaw.admin_service import GatewayAdminLifecycleService from app.services.openclaw.session_service import GatewayTemplateSyncQuery if TYPE_CHECKING: from fastapi_pagination.limit_offset import LimitOffsetPage from sqlmodel.ext.asyncio.session import AsyncSession from app.services.organizations import OrganizationContext router = APIRouter(prefix="/gateways", tags=["gateways"]) SESSION_DEP = Depends(get_session) AUTH_DEP = Depends(get_auth_context) ORG_ADMIN_DEP = Depends(require_org_admin) ORG_MEMBER_DEP = Depends(require_org_member) INCLUDE_MAIN_QUERY = Query(default=True) RESET_SESSIONS_QUERY = Query(default=False) ROTATE_TOKENS_QUERY = Query(default=False) FORCE_BOOTSTRAP_QUERY = Query(default=False) OVERWRITE_QUERY = Query(default=False) LEAD_ONLY_QUERY = Query(default=False) BOARD_ID_QUERY = Query(default=None) _RUNTIME_TYPE_REFERENCES = (UUID,) def _template_sync_query( *, include_main: bool = INCLUDE_MAIN_QUERY, lead_only: bool = LEAD_ONLY_QUERY, reset_sessions: bool = RESET_SESSIONS_QUERY, rotate_tokens: bool = ROTATE_TOKENS_QUERY, force_bootstrap: bool = FORCE_BOOTSTRAP_QUERY, overwrite: bool = OVERWRITE_QUERY, board_id: UUID | None = BOARD_ID_QUERY, ) -> GatewayTemplateSyncQuery: return GatewayTemplateSyncQuery( include_main=include_main, lead_only=lead_only, reset_sessions=reset_sessions, rotate_tokens=rotate_tokens, force_bootstrap=force_bootstrap, overwrite=overwrite, board_id=board_id, ) SYNC_QUERY_DEP = Depends(_template_sync_query) @router.get("", response_model=DefaultLimitOffsetPage[GatewayRead]) async def list_gateways( session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_ADMIN_DEP, ) -> LimitOffsetPage[GatewayRead]: """List gateways for the caller's organization.""" statement = ( Gateway.objects.filter_by(organization_id=ctx.organization.id) .order_by(col(Gateway.created_at).desc()) .statement ) return await paginate(session, statement) @router.post("", response_model=GatewayRead) async def create_gateway( payload: GatewayCreate, session: AsyncSession = SESSION_DEP, auth: AuthContext = AUTH_DEP, ctx: OrganizationContext = ORG_ADMIN_DEP, ) -> Gateway: """Create a gateway and provision or refresh its main agent.""" service = GatewayAdminLifecycleService(session) await service.assert_gateway_runtime_compatible( url=payload.url, token=payload.token, allow_insecure_tls=payload.allow_insecure_tls, disable_device_pairing=payload.disable_device_pairing, ) data = payload.model_dump() gateway_id = uuid4() data["id"] = gateway_id data["organization_id"] = ctx.organization.id gateway = await crud.create(session, Gateway, **data) await service.ensure_main_agent(gateway, auth, action="provision") return gateway @router.get("/{gateway_id}", response_model=GatewayRead) async def get_gateway( gateway_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_ADMIN_DEP, ) -> Gateway: """Return one gateway by id for the caller's organization.""" service = GatewayAdminLifecycleService(session) gateway = await service.require_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) return gateway @router.patch("/{gateway_id}", response_model=GatewayRead) async def update_gateway( gateway_id: UUID, payload: GatewayUpdate, session: AsyncSession = SESSION_DEP, auth: AuthContext = AUTH_DEP, ctx: OrganizationContext = ORG_ADMIN_DEP, ) -> Gateway: """Patch a gateway and refresh the main-agent provisioning state.""" service = GatewayAdminLifecycleService(session) gateway = await service.require_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) updates = payload.model_dump(exclude_unset=True) if ( "url" in updates or "token" in updates or "allow_insecure_tls" in updates or "disable_device_pairing" in updates ): raw_next_url = updates.get("url", gateway.url) next_url = raw_next_url.strip() if isinstance(raw_next_url, str) else "" next_token = updates.get("token", gateway.token) next_allow_insecure_tls = bool( updates.get("allow_insecure_tls", gateway.allow_insecure_tls), ) next_disable_device_pairing = bool( updates.get("disable_device_pairing", gateway.disable_device_pairing), ) if next_url: await service.assert_gateway_runtime_compatible( url=next_url, token=next_token, allow_insecure_tls=next_allow_insecure_tls, disable_device_pairing=next_disable_device_pairing, ) await crud.patch(session, gateway, updates) await service.ensure_main_agent(gateway, auth, action="update") return gateway @router.post("/{gateway_id}/templates/sync", response_model=GatewayTemplatesSyncResult) async def sync_gateway_templates( gateway_id: UUID, sync_query: GatewayTemplateSyncQuery = SYNC_QUERY_DEP, session: AsyncSession = SESSION_DEP, auth: AuthContext = AUTH_DEP, ctx: OrganizationContext = ORG_ADMIN_DEP, ) -> GatewayTemplatesSyncResult: """Sync templates for a gateway and optionally rotate runtime settings.""" service = GatewayAdminLifecycleService(session) gateway = await service.require_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) return await service.sync_templates(gateway, query=sync_query, auth=auth) @router.get( "/{gateway_id}/agents/import-preview", response_model=GatewayAgentImportPreviewResponse, ) async def preview_import_gateway_agents( gateway_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_ADMIN_DEP, ) -> GatewayAgentImportPreviewResponse: """Preview existing gateway runtime agents that can be imported.""" service = GatewayAdminLifecycleService(session) gateway = await service.require_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) return await service.preview_gateway_agent_import(gateway=gateway) @router.post( "/{gateway_id}/agents/import", response_model=GatewayAgentImportResponse, ) async def import_gateway_agents( gateway_id: UUID, payload: GatewayAgentImportRequest, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_ADMIN_DEP, ) -> GatewayAgentImportResponse: """Import selected existing gateway runtime agents into Pipeline.""" service = GatewayAdminLifecycleService(session) gateway = await service.require_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) return await service.import_gateway_agents( gateway=gateway, gateway_agent_ids=payload.gateway_agent_ids, reconcile_after_import=payload.reconcile_after_import, rotate_tokens=payload.rotate_tokens, reset_sessions=payload.reset_sessions, force_bootstrap=payload.force_bootstrap, ) @router.get( "/{gateway_id}/runtime-usage", response_model=RuntimeUsageResponse, summary="Gateway runtime usage", description=( "Return model usage, token counts, estimated spend, burn rate, and " "time-remaining predictions for the specified gateway. " "Visible to all organisation members." ), ) async def get_gateway_runtime_usage( gateway_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> RuntimeUsageResponse: """Aggregate runtime usage from the gateway's usage.cost / usage.status RPC methods.""" from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig service = GatewayAdminLifecycleService(session) gateway = await service.require_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) config = GatewayClientConfig( url=gateway.url, token=gateway.token, allow_insecure_tls=gateway.allow_insecure_tls, disable_device_pairing=gateway.disable_device_pairing, ) account_key = gateway.name.lower().replace(" ", "-") if gateway.name else "default" return await get_runtime_usage( gateway_id=gateway.id, config=config, account_key=account_key, ) @router.get( "/{gateway_id}/provider-usage", response_model=ProviderUsageResponse, summary="Gateway provider-native usage (opt-in scraper)", description=( "Return provider-native subscription usage data scraped from the CLI " "(e.g. ``claude /usage``). Returns an empty results list when " "USAGE_SCRAPER_ENABLED=false (the default). " "Enable with USAGE_SCRAPER_ENABLED=true and ensure the required " "prerequisites (tmux, claude binary) are accessible from the backend process." ), ) async def get_gateway_provider_usage( gateway_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> ProviderUsageResponse: """Scrape provider-native usage for the specified gateway (opt-in).""" service = GatewayAdminLifecycleService(session) await service.require_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) now = utcnow() statusline_result = get_cached_claude_statusline_usage(str(gateway_id)) if not settings.usage_scraper_enabled and statusline_result is None: return ProviderUsageResponse( gateway_id=gateway_id, generated_at=now, scraper_enabled=False, results=[], ) scrape_results = [] if statusline_result is not None: scrape_results.append(statusline_result) if settings.usage_scraper_enabled: enabled_providers = [ p.strip() for p in settings.usage_scraper_providers.split(",") if p.strip() ] scrape_results.extend( await get_provider_usage( gateway_id=str(gateway_id), enabled_providers=enabled_providers, tmux_socket=settings.usage_scraper_tmux_socket, include_raw=settings.usage_scraper_include_raw, ), ) results = [ ProviderUsageScrapeResult( provider=r.provider, source_name=r.source_name, scraped_at=r.scraped_at, fresh=r.fresh, freshness_ttl_seconds=r.freshness_ttl_seconds, windows=[ ProviderUsageWindow( key=w.key, label=w.label, pct_used=w.pct_used, remaining_ms=w.remaining_ms, remaining_label=w.remaining_label, extra_text=w.extra_text, source=w.source, confidence=w.confidence, ) for w in r.parsed.windows ], current_pct=r.parsed.current_pct, remaining_ms=r.parsed.remaining_ms, remaining_label=r.parsed.remaining_label, weekly_messages_used=r.parsed.weekly_messages_used, weekly_messages_limit=r.parsed.weekly_messages_limit, weekly_tokens_used=r.parsed.weekly_tokens_used, weekly_cost_usd=r.parsed.weekly_cost_usd, raw_text=r.parsed.raw_text, error=r.error or r.parsed.error, source=r.parsed.source, confidence=r.parsed.confidence, ) for r in scrape_results ] return ProviderUsageResponse( gateway_id=gateway_id, generated_at=now, scraper_enabled=True, results=results, ) @router.post( "/{gateway_id}/provider-usage/claude/statusline", response_model=ProviderUsageScrapeResult, summary="Ingest Claude Code status-line usage", description=( "Store provider-native Claude Code usage windows from a local status-line " "collector. The payload should contain Claude Code's rate_limits object; " "Pipeline stores only usage percentages and reset times." ), ) async def ingest_gateway_claude_statusline_usage( gateway_id: UUID, payload: ClaudeStatuslineUsageIn, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> ProviderUsageScrapeResult: """Accept sanitized Claude Code status-line usage for the specified gateway.""" service = GatewayAdminLifecycleService(session) await service.require_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) scrape = store_claude_statusline_usage( str(gateway_id), payload.model_dump(exclude_none=True), ) return ProviderUsageScrapeResult( provider=scrape.provider, source_name=scrape.source_name, scraped_at=scrape.scraped_at, fresh=scrape.fresh, freshness_ttl_seconds=scrape.freshness_ttl_seconds, windows=[ ProviderUsageWindow( key=w.key, label=w.label, pct_used=w.pct_used, remaining_ms=w.remaining_ms, remaining_label=w.remaining_label, extra_text=w.extra_text, source=w.source, confidence=w.confidence, ) for w in scrape.parsed.windows ], current_pct=scrape.parsed.current_pct, remaining_ms=scrape.parsed.remaining_ms, remaining_label=scrape.parsed.remaining_label, raw_text=None, error=scrape.error or scrape.parsed.error, source=scrape.parsed.source, confidence=scrape.parsed.confidence, ) @router.get( "/{gateway_id}/cron", response_model=CronStatusResponse, summary="Gateway cron job status", description="Return the list of cron jobs configured on the gateway with their last-run status.", ) async def get_gateway_cron( gateway_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> CronStatusResponse: """Read cron job status from the gateway (read-only).""" from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig service = GatewayAdminLifecycleService(session) gateway = await service.require_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) config = GatewayClientConfig( url=gateway.url, token=gateway.token, allow_insecure_tls=gateway.allow_insecure_tls, disable_device_pairing=gateway.disable_device_pairing, ) jobs = await fetch_cron_jobs(config) return CronStatusResponse( gateway_id=gateway_id, generated_at=utcnow(), jobs=[ CronJobRead( name=j.name, schedule=j.schedule, enabled=j.enabled, status=compute_job_status(j), last_run=j.last_run, next_run=j.next_run, last_duration_ms=j.last_duration_ms, last_error=j.last_error, ) for j in jobs ], ) @router.get( "/{gateway_id}/models", summary="List models available on the gateway", description="Return the model IDs advertised by the OpenClaw gateway via models.list.", ) async def get_gateway_models( gateway_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> dict: """Return available model IDs from the gateway.""" from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig service = GatewayAdminLifecycleService(session) gateway = await service.require_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) config = GatewayClientConfig( url=gateway.url, token=gateway.token, allow_insecure_tls=gateway.allow_insecure_tls, disable_device_pairing=gateway.disable_device_pairing, ) try: raw = await openclaw_call("models.list", config=config) except Exception: raw = None if isinstance(raw, list): models = [str(m) for m in raw if m] elif isinstance(raw, dict): nested = raw.get("models") or raw.get("items") or raw.get("data") or [] models = [str(m) for m in nested if m] if isinstance(nested, list) else [] else: models = [] return {"gateway_id": str(gateway_id), "models": models} @router.get( "/{gateway_id}/health", response_model=SystemHealthResponse, summary="Gateway system health", description="Return current CPU, RAM, disk, and uptime stats plus a 24-hour history.", ) async def get_gateway_health( gateway_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> SystemHealthResponse: """Read system health from the gateway and append to the rolling history.""" from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig service = GatewayAdminLifecycleService(session) gateway = await service.require_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) config = GatewayClientConfig( url=gateway.url, token=gateway.token, allow_insecure_tls=gateway.allow_insecure_tls, disable_device_pairing=gateway.disable_device_pairing, ) snapshot = await fetch_health(str(gateway_id), config, record=True) history = get_history(str(gateway_id)) def _snap_read(s) -> HealthSnapshotRead: return HealthSnapshotRead( recorded_at=s.recorded_at, cpu_pct=s.cpu_pct, memory_pct=s.memory_pct, memory_used_bytes=s.memory_used_bytes, memory_total_bytes=s.memory_total_bytes, disk_pct=s.disk_pct, disk_used_bytes=s.disk_used_bytes, disk_total_bytes=s.disk_total_bytes, uptime_seconds=s.uptime_seconds, load_avg_1m=s.load_avg_1m, load_avg_5m=s.load_avg_5m, load_avg_15m=s.load_avg_15m, hostname=s.hostname, platform=s.platform, ) return SystemHealthResponse( gateway_id=gateway_id, generated_at=utcnow(), current=_snap_read(snapshot), history=[_snap_read(s) for s in history], history_window_hours=DEFAULT_HISTORY_WINDOW_HOURS, ) # --------------------------------------------------------------------------- # Admin: logs and config (org-admin gate) # --------------------------------------------------------------------------- _SECRET_KEY_PATTERNS = frozenset( {"token", "secret", "password", "key", "credential", "auth", "bearer", "apikey", "api_key"} ) def _mask_config_value(key: str, value: object) -> object: """Return the value with secrets replaced by a masked string.""" if isinstance(value, str) and any(p in key.lower() for p in _SECRET_KEY_PATTERNS): return f"••••{value[-4:]}" if len(value) > 4 else "••••" return value @router.get( "/{gateway_id}/logs", summary="Tail gateway logs (admin)", description="Return the most recent log lines from the gateway process. Admin-only.", ) async def get_gateway_logs( gateway_id: UUID, lines: int = Query(default=100, ge=1, le=500, description="Number of lines to return"), session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_ADMIN_DEP, ) -> dict: """Return recent log lines from the gateway (read-only).""" from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig service = GatewayAdminLifecycleService(session) gateway = await service.require_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) config = GatewayClientConfig( url=gateway.url, token=gateway.token, allow_insecure_tls=gateway.allow_insecure_tls, disable_device_pairing=gateway.disable_device_pairing, ) try: raw = await openclaw_call("logs.tail", {"lines": lines}, config=config) except Exception: raw = None if isinstance(raw, list): log_lines = [str(line) for line in raw if line is not None] elif isinstance(raw, str): log_lines = raw.splitlines() elif isinstance(raw, dict): nested = raw.get("lines") or raw.get("logs") or raw.get("output") or [] log_lines = [str(line) for line in nested] if isinstance(nested, list) else str(raw).splitlines() else: log_lines = [] return { "gateway_id": str(gateway_id), "generated_at": utcnow().isoformat(), "lines": log_lines, "lines_requested": lines, } @router.get( "/{gateway_id}/config", summary="Read gateway config (admin)", description="Return the current gateway configuration with secrets masked. Admin-only.", ) async def get_gateway_config( gateway_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_ADMIN_DEP, ) -> dict: """Return the gateway configuration with sensitive values masked.""" from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig service = GatewayAdminLifecycleService(session) gateway = await service.require_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) config = GatewayClientConfig( url=gateway.url, token=gateway.token, allow_insecure_tls=gateway.allow_insecure_tls, disable_device_pairing=gateway.disable_device_pairing, ) try: raw = await openclaw_call("config.get", config=config) except Exception: raw = None if isinstance(raw, dict): masked = {k: _mask_config_value(k, v) for k, v in raw.items()} else: masked = {} return { "gateway_id": str(gateway_id), "generated_at": utcnow().isoformat(), "config": masked, "available": bool(masked), } @router.patch( "/{gateway_id}/config", summary="Update gateway config (admin)", description=( "Apply partial config updates to the gateway using config.patch + config.apply. " "Secrets must be submitted in full — not masked values. " "Each call is audit-logged. Admin-only." ), ) async def patch_gateway_config( gateway_id: UUID, payload: dict, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_ADMIN_DEP, ) -> dict: """Apply config changes and write an audit event.""" from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig service = GatewayAdminLifecycleService(session) gateway = await service.require_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) gw_config = GatewayClientConfig( url=gateway.url, token=gateway.token, allow_insecure_tls=gateway.allow_insecure_tls, disable_device_pairing=gateway.disable_device_pairing, ) if not payload: from fastapi import HTTPException as _HTTPException raise _HTTPException(status_code=400, detail="No config keys provided.") # Strip any masked placeholder values (user accidentally sent ••••xxxx) clean_payload = {k: v for k, v in payload.items() if not str(v).startswith("••••")} if not clean_payload: from fastapi import HTTPException as _HTTPException raise _HTTPException(status_code=400, detail="All values appear to be masked placeholders.") try: await openclaw_call("config.patch", clean_payload, config=gw_config) await openclaw_call("config.apply", config=gw_config) except Exception as exc: from fastapi import HTTPException as _HTTPException raise _HTTPException(status_code=502, detail=f"Gateway config update failed: {exc}") from exc # Audit: record what changed (mask secret values in the log message) changed_keys = list(clean_payload.keys()) masked_summary = ", ".join( f"{k}={_mask_config_value(k, v)}" for k, v in clean_payload.items() ) actor = str(ctx.member.user_id) async with async_session_maker() as audit_session: record_activity( audit_session, event_type="gateway.config.updated", message=f"Gateway config updated by user {actor} — changed: {masked_summary}", ) await audit_session.commit() return { "gateway_id": str(gateway_id), "updated_at": utcnow().isoformat(), "changed_keys": changed_keys, "applied": True, } @router.get( "/{gateway_id}/runtime-activity", summary="Recent gateway runtime messages (REST snapshot)", description=( "Return the most recent messages from all active sessions on the gateway, " "normalised and redacted. Use the streaming endpoint for a live feed." ), ) async def get_gateway_runtime_activity( gateway_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> dict: """Non-streaming snapshot of recent gateway activity.""" from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig service = GatewayAdminLifecycleService(session) gateway = await service.require_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) config = GatewayClientConfig( url=gateway.url, token=gateway.token, allow_insecure_tls=gateway.allow_insecure_tls, disable_device_pairing=gateway.disable_device_pairing, ) events = await fetch_recent_events( config, max_sessions=POLL_HISTORY_SESSIONS_MAX, history_limit=HISTORY_FETCH_LIMIT, ) return { "gateway_id": str(gateway_id), "generated_at": utcnow().isoformat(), "events": [e.to_dict() for e in events], } _RUNTIME_ACTIVITY_POLL_SECONDS = 4 _RUNTIME_ACTIVITY_SEEN_MAX = 1000 @router.get( "/{gateway_id}/runtime-activity/stream", summary="Live gateway runtime activity (SSE)", description=( "Stream normalised gateway session messages as server-sent events. " "Each event is of type ``runtime_message``. The stream polls the " "gateway every few seconds and emits only new messages." ), ) async def stream_gateway_runtime_activity( request: Request, gateway_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> EventSourceResponse: """SSE stream of live gateway session messages.""" from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig service = GatewayAdminLifecycleService(session) gateway = await service.require_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) config = GatewayClientConfig( url=gateway.url, token=gateway.token, allow_insecure_tls=gateway.allow_insecure_tls, disable_device_pairing=gateway.disable_device_pairing, ) async def event_generator(): from collections import deque seen_ids: set[str] = set() seen_queue: deque[str] = deque() initial = True while True: if await request.is_disconnected(): break events = await fetch_recent_events( config, since_ids=None if initial else seen_ids, max_sessions=POLL_HISTORY_SESSIONS_MAX, history_limit=HISTORY_FETCH_LIMIT, ) initial = False for event in events: if event.event_id in seen_ids: continue seen_ids.add(event.event_id) seen_queue.append(event.event_id) if len(seen_queue) > _RUNTIME_ACTIVITY_SEEN_MAX: oldest = seen_queue.popleft() seen_ids.discard(oldest) yield { "event": "runtime_message", "data": json.dumps(event.to_dict()), } await asyncio.sleep(_RUNTIME_ACTIVITY_POLL_SECONDS) return EventSourceResponse(event_generator(), ping=15) @router.delete("/{gateway_id}", response_model=OkResponse) async def delete_gateway( gateway_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_ADMIN_DEP, ) -> OkResponse: """Delete a gateway in the caller's organization.""" service = GatewayAdminLifecycleService(session) gateway = await service.require_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) main_agent = await service.find_main_agent(gateway) if main_agent is not None: await service.clear_agent_foreign_keys(agent_id=main_agent.id) await session.delete(main_agent) duplicate_main_agents = await Agent.objects.filter_by( gateway_id=gateway.id, board_id=None, ).all(session) for agent in duplicate_main_agents: if main_agent is not None and agent.id == main_agent.id: continue await service.clear_agent_foreign_keys(agent_id=agent.id) await session.delete(agent) # NOTE: The migration declares `ondelete="CASCADE"` for gateway_installed_skills.gateway_id, # but some backends/test environments (e.g. SQLite without FK pragma) may not # enforce cascades. Delete rows explicitly to guarantee cleanup semantics. installed_skills = await GatewayInstalledSkill.objects.filter_by( gateway_id=gateway.id, ).all(session) for installed_skill in installed_skills: await session.delete(installed_skill) await session.delete(gateway) await session.commit() return OkResponse()