Pipeline/backend/app/api/forgejo_connections.py

285 lines
11 KiB
Python

"""Thin API wrappers for Forgejo connection CRUD."""
from __future__ import annotations
from typing import TYPE_CHECKING
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import select
from app.api.deps import require_org_member
from app.db import crud
from app.db.session import get_session
from app.models.forgejo_connections import ForgejoConnection
from app.schemas.common import OkResponse
from app.schemas.forgejo_connections import (
ForgejoConnectionCreate,
ForgejoConnectionRead,
ForgejoConnectionUpdate,
)
from app.schemas.forgejo_validation import ForgejoConnectionValidationResponse
from app.services.organizations import OrganizationContext
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio.session import AsyncSession
router = APIRouter(prefix="/forgejo/connections", tags=["forgejo-connections"])
SESSION_DEP = Depends(get_session)
ORG_MEMBER_DEP = Depends(require_org_member)
def _extract_token_last_eight(token: str | None) -> str | None:
"""Extract last 8 characters of token for display."""
if not token:
return None
return token[-8:] if len(token) >= 8 else token
def _mask_connection(connection: ForgejoConnection) -> dict[str, object]:
"""Return connection dict with token removed but has_token and token_last_eight."""
return {
"id": connection.id,
"organization_id": connection.organization_id,
"name": connection.name,
"base_url": connection.base_url,
"active": connection.active,
"has_token": connection.token is not None,
"token_last_eight": _extract_token_last_eight(connection.token),
"created_at": connection.created_at,
"updated_at": connection.updated_at,
}
@router.get("", response_model=list[ForgejoConnectionRead])
async def list_connections(
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> list[ForgejoConnectionRead]:
"""List Forgejo connections for the caller's organization."""
statement = (
select(ForgejoConnection)
.where(ForgejoConnection.organization_id == ctx.organization.id)
.order_by(ForgejoConnection.created_at.desc())
)
connections = (await session.exec(statement)).all()
return [_mask_connection(c) for c in connections]
@router.post("", response_model=ForgejoConnectionRead)
async def create_connection(
payload: ForgejoConnectionCreate,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> ForgejoConnectionRead:
"""Create a Forgejo connection for the caller's organization."""
data = payload.model_dump()
# Extract token_last_eight for storage
token = data.get("token")
if token:
data["token_last_eight"] = _extract_token_last_eight(token)
else:
data["token_last_eight"] = None
data["organization_id"] = ctx.organization.id
connection = await crud.create(session, ForgejoConnection, **data)
return _mask_connection(connection)
@router.get("/{connection_id}", response_model=ForgejoConnectionRead)
async def get_connection(
connection_id: UUID,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> ForgejoConnectionRead:
"""Return one Forgejo connection by id for the caller's organization."""
connection = await crud.get_by_id(session, ForgejoConnection, connection_id)
if connection is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if connection.organization_id != ctx.organization.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return _mask_connection(connection)
@router.patch("/{connection_id}", response_model=ForgejoConnectionRead)
async def update_connection(
connection_id: UUID,
payload: ForgejoConnectionUpdate,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> ForgejoConnectionRead:
"""Patch a Forgejo connection for the caller's organization."""
connection = await crud.get_by_id(session, ForgejoConnection, connection_id)
if connection is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if connection.organization_id != ctx.organization.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
updates = payload.model_dump(exclude_unset=True)
# Handle base_url normalization
if "base_url" in updates:
raw_url = updates["base_url"]
if raw_url:
raw_url = raw_url.strip()
if not raw_url.startswith(("http://", "https://")):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="base_url must be http:// or https://",
)
raw_url = raw_url.rstrip("/")
if "/api/v1" in raw_url:
import re
match = re.match(r"(https?://[^/]+)", raw_url)
if match:
raw_url = match.group(1).rstrip("/")
updates["base_url"] = raw_url
# Handle token update - empty string leaves existing unchanged
if "token" in updates:
raw_token = updates["token"]
if raw_token == "":
# Empty string - leave existing token unchanged
del updates["token"]
elif raw_token is not None:
updates["token_last_eight"] = _extract_token_last_eight(raw_token)
# Apply updates
for key, value in updates.items():
setattr(connection, key, value)
from app.core.time import utcnow
connection.updated_at = utcnow()
await crud.save(session, connection)
return _mask_connection(connection)
@router.delete("/{connection_id}", response_model=OkResponse)
async def delete_connection(
connection_id: UUID,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> OkResponse:
"""Delete a Forgejo connection for the caller's organization."""
connection = await crud.get_by_id(session, ForgejoConnection, connection_id)
if connection is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if connection.organization_id != ctx.organization.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
await session.delete(connection)
await session.commit()
return OkResponse()
@router.get(
"/{connection_id}/repos",
summary="List Available Repositories",
description="Return repositories the connection token can access on the Forgejo instance.",
)
async def list_connection_repos(
connection_id: UUID,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> list[dict[str, object]]:
"""List repositories accessible via this connection's token."""
connection = await crud.get_by_id(session, ForgejoConnection, connection_id)
if connection is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if connection.organization_id != ctx.organization.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if not connection.token:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Connection has no token configured.",
)
from app.services.forgejo_client import ForgejoClientError, get_forgejo_client
try:
async with get_forgejo_client(connection) as client:
repos = await client.list_user_repos()
except ForgejoClientError as e:
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(e))
except Exception as e:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Failed to fetch repositories: {e}",
)
return [
{
"full_name": r.get("full_name", ""),
"name": r.get("name", ""),
"owner": (r.get("owner") or {}).get("login", ""),
"default_branch": r.get("default_branch", "main") or "main",
"description": r.get("description") or None,
"private": bool(r.get("private", False)),
"html_url": r.get("html_url", ""),
}
for r in repos
if r.get("full_name") and r.get("name")
]
@router.post(
"/{connection_id}/validate",
response_model=ForgejoConnectionValidationResponse,
summary="Validate Forgejo Connection",
description="Test if a Forgejo connection can authenticate and access the API.",
)
async def validate_connection(
connection_id: UUID,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> ForgejoConnectionValidationResponse:
"""Validate a Forgejo connection by testing authenticated API access."""
connection = await crud.get_by_id(session, ForgejoConnection, connection_id)
if connection is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if connection.organization_id != ctx.organization.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if not connection.base_url:
from app.core.time import utcnow
from app.schemas.forgejo_validation import ValidationStatus
return ForgejoConnectionValidationResponse(
connection_id=str(connection.id),
status=ValidationStatus(ok=False, status="error", error_message="No base_url configured"),
response_time_ms=0.0,
validated_at=utcnow(),
)
from app.core.time import utcnow
from app.schemas.forgejo_validation import ValidationStatus
import time
start_time = time.time()
try:
from app.services.forgejo_client import get_forgejo_client
async with get_forgejo_client(connection) as client:
# Use /api/v1/user endpoint to validate authentication
await client.get_user()
response_time_ms = (time.time() - start_time) * 1000
return ForgejoConnectionValidationResponse(
connection_id=str(connection.id),
status=ValidationStatus(ok=True, status="ok"),
response_time_ms=response_time_ms,
validated_at=utcnow(),
)
except HTTPException as e:
response_time_ms = (time.time() - start_time) * 1000
return ForgejoConnectionValidationResponse(
connection_id=str(connection.id),
status=ValidationStatus(ok=False, status="error", error_message=str(e.detail)),
response_time_ms=response_time_ms,
validated_at=utcnow(),
)
except Exception as e:
response_time_ms = (time.time() - start_time) * 1000
return ForgejoConnectionValidationResponse(
connection_id=str(connection.id),
status=ValidationStatus(ok=False, status="error", error_message=str(e)),
response_time_ms=response_time_ms,
validated_at=utcnow(),
)