"""Thin API wrappers for Forgejo connection CRUD.""" from __future__ import annotations from typing import TYPE_CHECKING from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, status from sqlmodel import select from app.api.deps import require_org_member from app.db import crud from app.db.session import get_session from app.models.forgejo_connections import ForgejoConnection from app.schemas.common import OkResponse from app.schemas.forgejo_connections import ( ForgejoConnectionCreate, ForgejoConnectionRead, ForgejoConnectionUpdate, ) from app.schemas.forgejo_validation import ForgejoConnectionValidationResponse from app.services.organizations import OrganizationContext if TYPE_CHECKING: from sqlalchemy.ext.asyncio.session import AsyncSession router = APIRouter(prefix="/forgejo/connections", tags=["forgejo-connections"]) SESSION_DEP = Depends(get_session) ORG_MEMBER_DEP = Depends(require_org_member) def _extract_token_last_eight(token: str | None) -> str | None: """Extract last 8 characters of token for display.""" if not token: return None return token[-8:] if len(token) >= 8 else token def _mask_connection(connection: ForgejoConnection) -> dict[str, object]: """Return connection dict with token removed but has_token and token_last_eight.""" return { "id": connection.id, "organization_id": connection.organization_id, "name": connection.name, "base_url": connection.base_url, "active": connection.active, "has_token": connection.token is not None, "token_last_eight": _extract_token_last_eight(connection.token), "created_at": connection.created_at, "updated_at": connection.updated_at, } @router.get("", response_model=list[ForgejoConnectionRead]) async def list_connections( session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> list[ForgejoConnectionRead]: """List Forgejo connections for the caller's organization.""" statement = ( select(ForgejoConnection) .where(ForgejoConnection.organization_id == ctx.organization.id) .order_by(ForgejoConnection.created_at.desc()) ) connections = (await session.exec(statement)).all() return [_mask_connection(c) for c in connections] @router.post("", response_model=ForgejoConnectionRead) async def create_connection( payload: ForgejoConnectionCreate, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> ForgejoConnectionRead: """Create a Forgejo connection for the caller's organization.""" data = payload.model_dump() # Extract token_last_eight for storage token = data.get("token") if token: data["token_last_eight"] = _extract_token_last_eight(token) else: data["token_last_eight"] = None data["organization_id"] = ctx.organization.id connection = await crud.create(session, ForgejoConnection, **data) return _mask_connection(connection) @router.get("/{connection_id}", response_model=ForgejoConnectionRead) async def get_connection( connection_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> ForgejoConnectionRead: """Return one Forgejo connection by id for the caller's organization.""" connection = await crud.get_by_id(session, ForgejoConnection, connection_id) if connection is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) if connection.organization_id != ctx.organization.id: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) return _mask_connection(connection) @router.patch("/{connection_id}", response_model=ForgejoConnectionRead) async def update_connection( connection_id: UUID, payload: ForgejoConnectionUpdate, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> ForgejoConnectionRead: """Patch a Forgejo connection for the caller's organization.""" connection = await crud.get_by_id(session, ForgejoConnection, connection_id) if connection is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) if connection.organization_id != ctx.organization.id: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) updates = payload.model_dump(exclude_unset=True) # Handle base_url normalization if "base_url" in updates: raw_url = updates["base_url"] if raw_url: raw_url = raw_url.strip() if not raw_url.startswith(("http://", "https://")): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="base_url must be http:// or https://", ) raw_url = raw_url.rstrip("/") if "/api/v1" in raw_url: import re match = re.match(r"(https?://[^/]+)", raw_url) if match: raw_url = match.group(1).rstrip("/") updates["base_url"] = raw_url # Handle token update - empty string leaves existing unchanged if "token" in updates: raw_token = updates["token"] if raw_token == "": # Empty string - leave existing token unchanged del updates["token"] elif raw_token is not None: updates["token_last_eight"] = _extract_token_last_eight(raw_token) # Apply updates for key, value in updates.items(): setattr(connection, key, value) from app.core.time import utcnow connection.updated_at = utcnow() await crud.save(session, connection) return _mask_connection(connection) @router.delete("/{connection_id}", response_model=OkResponse) async def delete_connection( connection_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> OkResponse: """Delete a Forgejo connection for the caller's organization.""" connection = await crud.get_by_id(session, ForgejoConnection, connection_id) if connection is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) if connection.organization_id != ctx.organization.id: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) await session.delete(connection) await session.commit() return OkResponse() @router.get( "/{connection_id}/repos", summary="List Available Repositories", description="Return repositories the connection token can access on the Forgejo instance.", ) async def list_connection_repos( connection_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> list[dict[str, object]]: """List repositories accessible via this connection's token.""" connection = await crud.get_by_id(session, ForgejoConnection, connection_id) if connection is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) if connection.organization_id != ctx.organization.id: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) if not connection.token: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="Connection has no token configured.", ) from app.services.forgejo_client import ForgejoClientError, get_forgejo_client try: async with get_forgejo_client(connection) as client: repos = await client.list_user_repos() except ForgejoClientError as e: raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(e)) except Exception as e: raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"Failed to fetch repositories: {e}", ) return [ { "full_name": r.get("full_name", ""), "name": r.get("name", ""), "owner": (r.get("owner") or {}).get("login", ""), "default_branch": r.get("default_branch", "main") or "main", "description": r.get("description") or None, "private": bool(r.get("private", False)), "html_url": r.get("html_url", ""), } for r in repos if r.get("full_name") and r.get("name") ] @router.post( "/{connection_id}/validate", response_model=ForgejoConnectionValidationResponse, summary="Validate Forgejo Connection", description="Test if a Forgejo connection can authenticate and access the API.", ) async def validate_connection( connection_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> ForgejoConnectionValidationResponse: """Validate a Forgejo connection by testing authenticated API access.""" connection = await crud.get_by_id(session, ForgejoConnection, connection_id) if connection is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) if connection.organization_id != ctx.organization.id: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) if not connection.base_url: from app.core.time import utcnow from app.schemas.forgejo_validation import ValidationStatus return ForgejoConnectionValidationResponse( connection_id=str(connection.id), status=ValidationStatus(ok=False, status="error", error_message="No base_url configured"), response_time_ms=0.0, validated_at=utcnow(), ) from app.core.time import utcnow from app.schemas.forgejo_validation import ValidationStatus import time start_time = time.time() try: from app.services.forgejo_client import get_forgejo_client async with get_forgejo_client(connection) as client: # Use /api/v1/user endpoint to validate authentication await client.get_user() response_time_ms = (time.time() - start_time) * 1000 return ForgejoConnectionValidationResponse( connection_id=str(connection.id), status=ValidationStatus(ok=True, status="ok"), response_time_ms=response_time_ms, validated_at=utcnow(), ) except HTTPException as e: response_time_ms = (time.time() - start_time) * 1000 return ForgejoConnectionValidationResponse( connection_id=str(connection.id), status=ValidationStatus(ok=False, status="error", error_message=str(e.detail)), response_time_ms=response_time_ms, validated_at=utcnow(), ) except Exception as e: response_time_ms = (time.time() - start_time) * 1000 return ForgejoConnectionValidationResponse( connection_id=str(connection.id), status=ValidationStatus(ok=False, status="error", error_message=str(e)), response_time_ms=response_time_ms, validated_at=utcnow(), )