# Pipeline/backend/app/api/forgejo_repositories.py
# (352 lines, 14 KiB, Python)

"""Thin API wrappers for Forgejo repository CRUD."""
from __future__ import annotations
from typing import TYPE_CHECKING
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import select
from app.api.deps import require_org_member
from app.db import crud
from app.db.session import get_session
from app.models.forgejo_connections import ForgejoConnection
from app.models.forgejo_repositories import ForgejoRepository
from app.schemas.common import OkResponse
from app.schemas.forgejo_repositories import (
ForgejoRepositoryCreate,
ForgejoRepositoryRead,
ForgejoRepositoryUpdate,
)
from app.schemas.forgejo_validation import ForgejoRepositoryValidationResponse, ValidationStatus
from app.services.forgejo_client import get_forgejo_client
from app.services.organizations import OrganizationContext
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
# Router collecting every endpoint in this module under the /forgejo/repositories prefix.
router = APIRouter(prefix="/forgejo/repositories", tags=["forgejo-repositories"])
# Dependency markers created once and reused as parameter defaults by the endpoints below.
# SESSION_DEP resolves through get_session; ORG_MEMBER_DEP resolves through require_org_member
# (presumably yielding the caller's OrganizationContext -- confirm in app.api.deps).
SESSION_DEP = Depends(get_session)
ORG_MEMBER_DEP = Depends(require_org_member)
def _create_connection_info(connection: ForgejoConnection) -> dict[str, object]:
"""Create safe connection metadata for repository responses."""
return {
"id": connection.id,
"organization_id": connection.organization_id,
"name": connection.name,
"base_url": connection.base_url,
"has_token": connection.token is not None,
"token_last_eight": connection.token[-8:] if connection.token and len(connection.token) >= 8 else connection.token,
"active": connection.active,
}
@router.get("", response_model=list[ForgejoRepositoryRead])
async def list_repositories(
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> list[ForgejoRepositoryRead]:
    """List Forgejo repositories for the caller's organization.

    Repositories are returned newest first (ordered by ``created_at``
    descending). The connections they reference are loaded with a single
    query instead of one lookup per connection.

    Args:
        session: Database session used for both queries.
        ctx: Organization context of the caller; only repositories whose
            ``organization_id`` matches ``ctx.organization.id`` are listed.

    Returns:
        One masked repository dict per repository. The ``connection`` entry
        is ``None`` when the referenced connection row could not be found.
    """
    repo_query = (
        select(ForgejoRepository)
        .where(ForgejoRepository.organization_id == ctx.organization.id)
        .order_by(ForgejoRepository.created_at.desc())
    )
    repositories = (await session.exec(repo_query)).all()

    # Collect the distinct connections referenced by the page of repositories.
    connection_ids = {
        repo.connection_id for repo in repositories if repo.connection_id is not None
    }
    connections_by_id: dict[UUID, ForgejoConnection] = {}
    if connection_ids:
        # One IN query for all connections; skipped entirely when nothing is referenced.
        conn_query = select(ForgejoConnection).where(
            ForgejoConnection.id.in_(list(connection_ids))
        )
        connections_by_id = {
            conn.id: conn for conn in (await session.exec(conn_query)).all()
        }

    return [
        _mask_repository(repo, connections_by_id.get(repo.connection_id))
        for repo in repositories
    ]
@router.post("", response_model=ForgejoRepositoryRead)
async def create_repository(
    payload: ForgejoRepositoryCreate,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> ForgejoRepositoryRead:
    """Start tracking a Forgejo repository for the caller's organization.

    Args:
        payload: Fields of the repository to create, including the
            ``connection_id`` it should be attached to.
        session: Database session used for lookups and the insert.
        ctx: Organization context of the caller.

    Returns:
        The masked representation of the newly created repository.

    Raises:
        HTTPException: 422 when ``connection_id`` does not resolve to a
            connection of the caller's organization; 409 when the same
            connection/owner/repo combination is already tracked.
    """
    org_id = ctx.organization.id

    # A missing connection and a foreign one are reported identically.
    target_connection = await crud.get_by_id(session, ForgejoConnection, payload.connection_id)
    if target_connection is None or target_connection.organization_id != org_id:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="connection_id is invalid",
        )

    # Refuse to track the same owner/repo twice on one connection.
    duplicate = await crud.get_one_by(
        session,
        ForgejoRepository,
        organization_id=org_id,
        connection_id=payload.connection_id,
        owner=payload.owner,
        repo=payload.repo,
    )
    if duplicate is not None:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Repository {payload.owner}/{payload.repo} is already tracked",
        )

    # The organization always comes from the caller's context; connection_id
    # is set explicitly so the foreign key is always part of the insert.
    fields = {
        **payload.model_dump(),
        "organization_id": org_id,
        "connection_id": payload.connection_id,
    }
    created = await crud.create(session, ForgejoRepository, **fields)

    # Fetch the connection again after the insert to build the response.
    response_connection = await crud.get_by_id(session, ForgejoConnection, created.connection_id)
    return _mask_repository(created, response_connection)
@router.get("/{repository_id}", response_model=ForgejoRepositoryRead)
async def get_repository(
    repository_id: UUID,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> ForgejoRepositoryRead:
    """Fetch a single Forgejo repository owned by the caller's organization.

    Args:
        repository_id: Id of the repository to return.
        session: Database session used for the lookups.
        ctx: Organization context of the caller; the repository must carry
            the same ``organization_id``.

    Returns:
        The masked representation of the repository.

    Raises:
        HTTPException: 404 when no repository with this id exists in the
            caller's organization.
    """
    lookup = select(ForgejoRepository).where(
        ForgejoRepository.id == repository_id,
        ForgejoRepository.organization_id == ctx.organization.id,
    )
    rows = await session.exec(lookup)
    found = rows.first()
    if found is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    # The connection is only needed to enrich the response.
    linked_connection = await crud.get_by_id(session, ForgejoConnection, found.connection_id)
    return _mask_repository(found, linked_connection)
@router.patch("/{repository_id}", response_model=ForgejoRepositoryRead)
async def update_repository(
    repository_id: UUID,
    payload: ForgejoRepositoryUpdate,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> ForgejoRepositoryRead:
    """Patch a Forgejo repository for the caller's organization.

    Only fields explicitly present in the request body are applied.

    Args:
        repository_id: Id of the repository to modify.
        payload: Partial update; unset fields are left untouched.
        session: Database session used for lookups and the save.
        ctx: Organization context of the caller.

    Returns:
        The masked representation of the updated repository.

    Raises:
        HTTPException: 404 when the repository does not exist in the caller's
            organization; 500 when its current connection row is missing;
            422 when a new ``connection_id`` does not resolve to a connection
            of the caller's organization; 409 when the resulting
            connection/owner/repo combination is already tracked by another
            repository.
    """
    org_id = ctx.organization.id
    statement = select(ForgejoRepository).where(
        ForgejoRepository.id == repository_id,
        ForgejoRepository.organization_id == org_id,
    )
    repository = (await session.exec(statement)).first()
    if repository is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    # The current connection must exist. It is only checked here and is
    # deliberately not assigned onto the repository object, so that a
    # connection_id supplied in the payload is the only thing that decides
    # which connection the row points at when it is saved.
    current_connection = await crud.get_by_id(session, ForgejoConnection, repository.connection_id)
    if current_connection is None:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Referenced connection not found",
        )

    updates = payload.model_dump(exclude_unset=True)

    # A new connection must exist and belong to the caller's organization;
    # both failures are reported identically.
    if "connection_id" in updates:
        new_connection = await crud.get_by_id(session, ForgejoConnection, updates["connection_id"])
        if new_connection is None or new_connection.organization_id != org_id:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
                detail="connection_id is invalid",
            )

    # Re-check uniqueness whenever any part of the identifying triple changes.
    if any(key in updates for key in ("owner", "repo", "connection_id")):
        target_conn_id = updates.get("connection_id", repository.connection_id)
        target_owner = updates.get("owner", repository.owner)
        target_repo = updates.get("repo", repository.repo)
        existing = await crud.get_one_by(
            session,
            ForgejoRepository,
            organization_id=org_id,
            connection_id=target_conn_id,
            owner=target_owner,
            repo=target_repo,
        )
        # Matching the repository being edited is not a conflict.
        if existing is not None and existing.id != repository.id:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"Repository {target_owner}/{target_repo} is already tracked",
            )

    for key, value in updates.items():
        setattr(repository, key, value)

    from app.core.time import utcnow

    repository.updated_at = utcnow()
    await crud.save(session, repository)

    # Load the (possibly new) connection for the response.
    conn = await crud.get_by_id(session, ForgejoConnection, repository.connection_id)
    return _mask_repository(repository, conn)
@router.delete("/{repository_id}", response_model=OkResponse)
async def delete_repository(
    repository_id: UUID,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> OkResponse:
    """Remove a tracked Forgejo repository belonging to the caller's organization.

    Args:
        repository_id: Id of the repository to delete.
        session: Database session used for the lookup, delete and commit.
        ctx: Organization context of the caller.

    Returns:
        An ``OkResponse`` once the deletion has been committed.

    Raises:
        HTTPException: 404 when the repository does not exist or belongs to
            a different organization.
    """
    target = await crud.get_by_id(session, ForgejoRepository, repository_id)
    # Unknown ids and repositories of other organizations look the same to the caller.
    not_visible = target is None or target.organization_id != ctx.organization.id
    if not_visible:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    await session.delete(target)
    await session.commit()
    return OkResponse()
@router.post(
    "/{repository_id}/validate",
    response_model=ForgejoRepositoryValidationResponse,
    summary="Validate Forgejo Repository",
    description="Test if a Forgejo repository exists and can be accessed with the connection token.",
)
async def validate_repository(
    repository_id: UUID,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> ForgejoRepositoryValidationResponse:
    """Validate a Forgejo repository by testing API access.

    Fetches the repository through the Forgejo client of its connection and
    reports whether that call succeeded. A failed call is reported in the
    response body rather than as an HTTP error.

    Args:
        repository_id: Id of the repository to validate.
        session: Database session used for the lookups.
        ctx: Organization context of the caller.

    Returns:
        A validation response carrying the outcome, the elapsed time of the
        remote call in milliseconds and the validation timestamp.
        ``repo_exists`` is ``True`` only when the remote call succeeded.

    Raises:
        HTTPException: 404 when the repository or its connection does not
            exist or belongs to a different organization.
    """
    repository = await crud.get_by_id(session, ForgejoRepository, repository_id)
    if repository is None or repository.organization_id != ctx.organization.id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    connection = await crud.get_by_id(session, ForgejoConnection, repository.connection_id)
    if connection is None or connection.organization_id != ctx.organization.id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    from app.core.time import utcnow
    import time

    # perf_counter is monotonic, so the measured duration cannot be skewed
    # (or go negative) when the system clock is adjusted mid-request.
    started = time.perf_counter()
    error_message: str | None = None
    try:
        async with get_forgejo_client(connection) as client:
            await client.get_repository(owner=repository.owner, repo=repository.repo)
    except HTTPException as exc:
        # Prefer the structured detail over the exception's string form.
        error_message = str(exc.detail)
    except Exception as exc:  # noqa: BLE001 - every failure becomes a validation result
        error_message = str(exc)
    response_time_ms = (time.perf_counter() - started) * 1000

    accessible = error_message is None
    if accessible:
        outcome = ValidationStatus(ok=True, status="ok")
    else:
        outcome = ValidationStatus(ok=False, status="error", error_message=error_message)

    return ForgejoRepositoryValidationResponse(
        repository_id=str(repository.id),
        status=outcome,
        response_time_ms=response_time_ms,
        validated_at=utcnow(),
        repo_exists=accessible,
    )
@router.post(
    "/{repository_id}/sync",
    summary="Sync Issues from Repository",
    description="Sync issues from a Forgejo repository.",
)
async def sync_repository_issues(
    repository_id: UUID,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> dict[str, int]:
    """Sync issues from a Forgejo repository.

    Delegates to ``IssueSyncService``. When the sync fails, the error text is
    stored on the repository (``last_sync_error``) before the failure is
    reported to the caller.

    Args:
        repository_id: Id of the repository whose issues should be synced.
        session: Database session handed to the sync service and used to
            record a failure.
        ctx: Organization context of the caller.

    Returns:
        Whatever the sync service returns for this repository.

    Raises:
        HTTPException: 404 when the repository does not exist or belongs to
            a different organization; 422 when the sync raises a
            ``ValueError``; 500 for any other failure. The original exception
            is attached as the cause.
    """
    repository = await crud.get_by_id(session, ForgejoRepository, repository_id)
    if repository is None or repository.organization_id != ctx.organization.id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    from app.services.forgejo_issue_sync import IssueSyncService
    from app.core.time import utcnow

    try:
        sync_service = IssueSyncService(session=session, organization_id=ctx.organization.id)
        return await sync_service.sync_repository_issues(repository_id=repository_id)
    except Exception as exc:
        # ValueError is treated as a caller-correctable problem; everything
        # else is reported as a server error.
        failure_status = (
            status.HTTP_422_UNPROCESSABLE_CONTENT
            if isinstance(exc, ValueError)
            else status.HTTP_500_INTERNAL_SERVER_ERROR
        )
        # Persist the failure on the repository before reporting it.
        repository.last_sync_error = str(exc)
        repository.updated_at = utcnow()
        await crud.save(session, repository)
        # Chain the cause so logs and tracebacks show the original failure.
        raise HTTPException(status_code=failure_status, detail=str(exc)) from exc
def _mask_repository(repository: ForgejoRepository, connection: ForgejoConnection | None = None) -> dict[str, object]:
"""Return repository dict with safe connection metadata."""
return {
"id": repository.id,
"organization_id": repository.organization_id,
"connection_id": repository.connection_id,
"owner": repository.owner,
"repo": repository.repo,
"display_name": repository.display_name,
"default_branch": repository.default_branch,
"active": repository.active,
"connection": _create_connection_info(connection) if connection is not None else None,
"has_webhook_secret": bool(repository.webhook_secret),
"description": repository.description,
"open_issues_count": repository.open_issues_count if repository.open_issues_count is not None else 0,
"is_archived": bool(repository.is_archived),
"topics": repository.topics if repository.topics is not None else [],
"labels": repository.labels if repository.labels is not None else [],
"last_sync_at": repository.last_sync_at,
"last_sync_error": repository.last_sync_error,
"created_at": repository.created_at,
"updated_at": repository.updated_at,
}