"""Thin API wrappers for Forgejo repository CRUD.""" from __future__ import annotations import time from typing import TYPE_CHECKING from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlmodel import col, select from app.api.deps import require_org_member from app.core.time import utcnow from app.db import crud from app.db.session import get_session from app.models.boards import Board from app.models.board_repository_links import BoardRepositoryLink from app.models.forgejo_connections import ForgejoConnection from app.models.forgejo_import_runs import ForgejoImportRun from app.models.forgejo_repositories import ForgejoRepository from app.schemas.boards import BoardRead from app.schemas.common import OkResponse from app.schemas.forgejo_repositories import ( ForgejoRepositoryCreate, ForgejoRepositoryRead, ForgejoRepositoryUpdate, MassImportRequest, MassImportResponse, MassImportRunRead, MassImportRepoResult, ) from app.schemas.forgejo_validation import ForgejoRepositoryValidationResponse, ValidationStatus from app.services.forgejo_client import get_forgejo_client from app.services.forgejo_cleanup import delete_repository_with_dependents from app.services.organizations import OrganizationContext if TYPE_CHECKING: from sqlmodel.ext.asyncio.session import AsyncSession router = APIRouter(prefix="/forgejo/repositories", tags=["forgejo-repositories"]) SESSION_DEP = Depends(get_session) ORG_MEMBER_DEP = Depends(require_org_member) def _create_connection_info(connection: ForgejoConnection) -> dict[str, object]: """Create safe connection metadata for repository responses.""" return { "id": connection.id, "organization_id": connection.organization_id, "name": connection.name, "base_url": connection.base_url, "has_token": connection.token is not None, "token_last_eight": ( connection.token[-8:] if connection.token and len(connection.token) >= 8 else connection.token ), "active": connection.active, } async def _repository_board_summary_map( session: AsyncSession, *, organization_id: UUID, repository_ids: list[UUID], ) -> dict[UUID, list[dict[str, object]]]: """Return linked board summaries keyed by repository id.""" if not repository_ids: return {} rows = ( await session.exec( select(BoardRepositoryLink.repository_id, Board.id, Board.name) .join(Board, BoardRepositoryLink.board_id == Board.id) .where( BoardRepositoryLink.organization_id == organization_id, col(BoardRepositoryLink.repository_id).in_(repository_ids), Board.organization_id == organization_id, ) .order_by(Board.name.asc()) ) ).all() result: dict[UUID, list[dict[str, object]]] = {} for repository_id, board_id, board_name in rows: result.setdefault(repository_id, []).append( { "id": board_id, "name": board_name, } ) return result def _mask_import_run(run: ForgejoImportRun) -> MassImportRunRead: """Return a typed persisted import run summary.""" return MassImportRunRead( id=run.id, organization_id=run.organization_id, requested_by_user_id=run.requested_by_user_id, repository_ids=[UUID(str(repository_id)) for repository_id in run.repository_ids], results=[MassImportRepoResult.model_validate(result) for result in run.results], total_created=run.total_created, total_updated=run.total_updated, total_stale_closed=run.total_stale_closed, succeeded=run.succeeded, failed=run.failed, started_at=run.started_at, finished_at=run.finished_at, duration_ms=run.duration_ms, created_at=run.created_at, ) @router.get("", response_model=list[ForgejoRepositoryRead]) async def list_repositories( session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> list[ForgejoRepositoryRead]: """List Forgejo repositories for the caller's organization.""" statement = ( select(ForgejoRepository) .where(ForgejoRepository.organization_id == ctx.organization.id) .order_by(ForgejoRepository.created_at.desc()) ) repositories = (await session.exec(statement)).all() # Fetch connections in batch for response building conn_ids = {r.connection_id for r in repositories} conn_map: dict[UUID, ForgejoConnection] = {} for cid in conn_ids: c = await crud.get_by_id(session, ForgejoConnection, cid) if c is not None: conn_map[cid] = c board_map = await _repository_board_summary_map( session, organization_id=ctx.organization.id, repository_ids=[r.id for r in repositories], ) result = [] for r in repositories: result.append( _mask_repository( r, conn_map.get(r.connection_id), linked_boards=board_map.get(r.id, []), ) ) return result @router.post("", response_model=ForgejoRepositoryRead) async def create_repository( payload: ForgejoRepositoryCreate, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> ForgejoRepositoryRead: """Create a Forgejo repository tracked for the caller's organization.""" # Validate connection belongs to caller's org connection = await crud.get_by_id(session, ForgejoConnection, payload.connection_id) if connection is None: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="connection_id is invalid", ) if connection.organization_id != ctx.organization.id: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="connection_id is invalid", ) # Check for duplicate active repo existing = await crud.get_one_by( session, ForgejoRepository, organization_id=ctx.organization.id, connection_id=payload.connection_id, owner=payload.owner, repo=payload.repo, ) if existing is not None: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail=f"Repository {payload.owner}/{payload.repo} is already tracked", ) data = payload.model_dump() data["organization_id"] = ctx.organization.id # Ensure connection_id is included for foreign key relationship data["connection_id"] = payload.connection_id repository = await crud.create(session, ForgejoRepository, **data) # Load connection for response conn = await crud.get_by_id(session, ForgejoConnection, repository.connection_id) return _mask_repository(repository, conn) @router.post( "/import", response_model=MassImportResponse, summary="Mass Import Issues", description="Full issue import for all active repositories (or a subset). Updates existing records, closes stale open issues.", ) async def mass_import_repositories( payload: MassImportRequest | None = None, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> MassImportResponse: """Run a full sync across all (or selected) active repositories for the org.""" from app.services.forgejo_issue_sync import IssueSyncService started_at = utcnow() monotonic_start = time.monotonic() statement = select(ForgejoRepository).where( ForgejoRepository.organization_id == ctx.organization.id, ForgejoRepository.active == True, # noqa: E712 ) all_repos = (await session.exec(statement)).all() if payload and payload.repository_ids: id_set = set(payload.repository_ids) repos_to_import = [r for r in all_repos if r.id in id_set] else: repos_to_import = list(all_repos) sync_service = IssueSyncService(session=session, organization_id=ctx.organization.id) results: list[MassImportRepoResult] = [] total_created = 0 total_updated = 0 total_stale_closed = 0 succeeded = 0 failed = 0 for repo in repos_to_import: try: stats = await sync_service.sync_repository_issues(repository_id=repo.id) results.append( MassImportRepoResult( repository_id=repo.id, name=f"{repo.owner}/{repo.repo}", created=stats.get("created", 0), updated=stats.get("updated", 0), stale_closed=stats.get("stale_closed", 0), open=stats.get("open", 0), closed=stats.get("closed", 0), total=stats.get("total", 0), error=None, ) ) total_created += stats.get("created", 0) total_updated += stats.get("updated", 0) total_stale_closed += stats.get("stale_closed", 0) succeeded += 1 except Exception as e: results.append( MassImportRepoResult( repository_id=repo.id, name=f"{repo.owner}/{repo.repo}", error=str(e), ) ) failed += 1 finished_at = utcnow() run = await crud.create( session, ForgejoImportRun, organization_id=ctx.organization.id, requested_by_user_id=ctx.member.user_id, repository_ids=[str(repo.id) for repo in repos_to_import], results=[result.model_dump(mode="json") for result in results], total_created=total_created, total_updated=total_updated, total_stale_closed=total_stale_closed, succeeded=succeeded, failed=failed, started_at=started_at, finished_at=finished_at, duration_ms=int((time.monotonic() - monotonic_start) * 1000), created_at=finished_at, ) return MassImportResponse( results=results, total_created=total_created, total_updated=total_updated, total_stale_closed=total_stale_closed, succeeded=succeeded, failed=failed, run=_mask_import_run(run), ) @router.get("/import-runs", response_model=list[MassImportRunRead]) async def list_mass_import_runs( limit: int = Query(default=5, ge=1, le=25), session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> list[MassImportRunRead]: """Return recent persisted mass import run summaries.""" runs = ( await session.exec( select(ForgejoImportRun) .where(ForgejoImportRun.organization_id == ctx.organization.id) .order_by(col(ForgejoImportRun.created_at).desc()) .limit(limit) ) ).all() return [_mask_import_run(run) for run in runs] @router.get("/{repository_id}", response_model=ForgejoRepositoryRead) async def get_repository( repository_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> ForgejoRepositoryRead: """Return one Forgejo repository by id for the caller's organization.""" statement = select(ForgejoRepository).where( ForgejoRepository.id == repository_id, ForgejoRepository.organization_id == ctx.organization.id, ) repository = (await session.exec(statement)).first() if repository is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) # Load connection for response conn = await crud.get_by_id(session, ForgejoConnection, repository.connection_id) board_map = await _repository_board_summary_map( session, organization_id=ctx.organization.id, repository_ids=[repository.id], ) return _mask_repository( repository, conn, linked_boards=board_map.get(repository.id, []), ) @router.patch("/{repository_id}", response_model=ForgejoRepositoryRead) async def update_repository( repository_id: UUID, payload: ForgejoRepositoryUpdate, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> ForgejoRepositoryRead: """Patch a Forgejo repository for the caller's organization.""" # Get repository statement = select(ForgejoRepository).where( ForgejoRepository.id == repository_id, ForgejoRepository.organization_id == ctx.organization.id, ) repository = (await session.exec(statement)).first() if repository is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) # Load connection for updates validation conn = await crud.get_by_id(session, ForgejoConnection, repository.connection_id) if conn is None: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Referenced connection not found", ) updates = payload.model_dump(exclude_unset=True) # Handle connection_id validation if "connection_id" in updates: new_conn_id = updates["connection_id"] connection = await crud.get_by_id(session, ForgejoConnection, new_conn_id) if connection is None: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="connection_id is invalid", ) if connection.organization_id != ctx.organization.id: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="connection_id is invalid", ) # Check for duplicate active repo (same connection/owner/repo) if "owner" in updates or "repo" in updates or "connection_id" in updates: current_conn = repository.connection_id if "connection_id" in updates: current_conn = updates["connection_id"] current_owner = updates.get("owner", repository.owner) current_repo = updates.get("repo", repository.repo) existing = await crud.get_one_by( session, ForgejoRepository, organization_id=ctx.organization.id, connection_id=current_conn, owner=current_owner, repo=current_repo, ) if existing is not None and existing.id != repository.id: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail=f"Repository {current_owner}/{current_repo} is already tracked", ) # Apply updates for key, value in updates.items(): setattr(repository, key, value) from app.core.time import utcnow repository.updated_at = utcnow() # Reload connection to get latest state await crud.save(session, repository) # Load connection for response conn = await crud.get_by_id(session, ForgejoConnection, repository.connection_id) board_map = await _repository_board_summary_map( session, organization_id=ctx.organization.id, repository_ids=[repository.id], ) return _mask_repository( repository, conn, linked_boards=board_map.get(repository.id, []), ) @router.delete("/{repository_id}", response_model=OkResponse) async def delete_repository( repository_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> OkResponse: """Delete a Forgejo repository for the caller's organization.""" repository = await crud.get_by_id(session, ForgejoRepository, repository_id) if repository is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) if repository.organization_id != ctx.organization.id: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) await delete_repository_with_dependents(session, repository) await session.commit() return OkResponse() @router.post( "/{repository_id}/validate", response_model=ForgejoRepositoryValidationResponse, summary="Validate Forgejo Repository", description="Test if a Forgejo repository exists and can be accessed with the connection token.", ) async def validate_repository( repository_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> ForgejoRepositoryValidationResponse: """Validate a Forgejo repository by testing API access.""" repository = await crud.get_by_id(session, ForgejoRepository, repository_id) if repository is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) if repository.organization_id != ctx.organization.id: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) # Load connection connection = await crud.get_by_id(session, ForgejoConnection, repository.connection_id) if connection is None or connection.organization_id != ctx.organization.id: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) from app.core.time import utcnow import time start_time = time.time() repo_exists = None try: async with get_forgejo_client(connection) as client: # Test if repository exists and is accessible await client.get_repository(owner=repository.owner, repo=repository.repo) repo_exists = True response_time_ms = (time.time() - start_time) * 1000 return ForgejoRepositoryValidationResponse( repository_id=str(repository.id), status=ValidationStatus(ok=True, status="ok"), response_time_ms=response_time_ms, validated_at=utcnow(), repo_exists=repo_exists, ) except HTTPException as e: response_time_ms = (time.time() - start_time) * 1000 return ForgejoRepositoryValidationResponse( repository_id=str(repository.id), status=ValidationStatus(ok=False, status="error", error_message=str(e.detail)), response_time_ms=response_time_ms, validated_at=utcnow(), repo_exists=False, ) except Exception as e: response_time_ms = (time.time() - start_time) * 1000 return ForgejoRepositoryValidationResponse( repository_id=str(repository.id), status=ValidationStatus(ok=False, status="error", error_message=str(e)), response_time_ms=response_time_ms, validated_at=utcnow(), repo_exists=False, ) @router.post( "/{repository_id}/sync", summary="Sync Issues from Repository", description="Sync issues from a Forgejo repository.", ) async def sync_repository_issues( repository_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> dict[str, int]: """Sync issues from a Forgejo repository.""" repository = await crud.get_by_id(session, ForgejoRepository, repository_id) if repository is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) if repository.organization_id != ctx.organization.id: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) from app.services.forgejo_issue_sync import IssueSyncService from app.core.time import utcnow try: sync_service = IssueSyncService(session=session, organization_id=ctx.organization.id) result = await sync_service.sync_repository_issues(repository_id=repository_id) return result except ValueError as e: # Update error on repository repository.last_sync_error = str(e) repository.updated_at = utcnow() await crud.save(session, repository) raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail=str(e)) except Exception as e: # Update error on repository repository.last_sync_error = str(e) repository.updated_at = utcnow() await crud.save(session, repository) raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) @router.post( "/{repository_id}/sync/recent", summary="Light Sync Issues from Repository", description="Fetch only issues updated in the last `days` days. Faster than full sync — uses batch DB lookups and skips per-issue enrichment calls.", ) async def sync_repository_issues_recent( repository_id: UUID, days: int = Query(default=7, ge=1, le=90), session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> dict[str, int]: """Light sync: fetch only issues updated in the last `days` days.""" repository = await crud.get_by_id(session, ForgejoRepository, repository_id) if repository is None or repository.organization_id != ctx.organization.id: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) from app.services.forgejo_issue_sync import IssueSyncService from app.core.time import utcnow try: sync_service = IssueSyncService(session=session, organization_id=ctx.organization.id) result = await sync_service.sync_recent(repository_id=repository_id, days=days) return result except ValueError as e: repository.last_sync_error = str(e) repository.updated_at = utcnow() await crud.save(session, repository) raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail=str(e)) except Exception as e: repository.last_sync_error = str(e) repository.updated_at = utcnow() await crud.save(session, repository) raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) def _mask_repository( repository: ForgejoRepository, connection: ForgejoConnection | None = None, *, linked_boards: list[dict[str, object]] | None = None, ) -> dict[str, object]: """Return repository dict with safe connection metadata.""" return { "id": repository.id, "organization_id": repository.organization_id, "connection_id": repository.connection_id, "owner": repository.owner, "repo": repository.repo, "display_name": repository.display_name, "default_branch": repository.default_branch, "active": repository.active, "connection": _create_connection_info(connection) if connection is not None else None, "has_webhook_secret": bool(repository.webhook_secret), "description": repository.description, "open_issues_count": ( repository.open_issues_count if repository.open_issues_count is not None else 0 ), "is_archived": bool(repository.is_archived), "topics": repository.topics if repository.topics is not None else [], "labels": repository.labels if repository.labels is not None else [], "linked_boards": linked_boards or [], "last_sync_at": repository.last_sync_at, "last_sync_error": repository.last_sync_error, "created_at": repository.created_at, "updated_at": repository.updated_at, } @router.get("/{repository_id}/boards", response_model=list[BoardRead]) async def list_boards_for_repository( repository_id: UUID, session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> list[BoardRead]: """Return all boards that have this repository linked to them.""" repository = await crud.get_by_id(session, ForgejoRepository, repository_id) if repository is None or repository.organization_id != ctx.organization.id: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Repository not found") links = ( await session.exec( select(BoardRepositoryLink).where( BoardRepositoryLink.organization_id == ctx.organization.id, BoardRepositoryLink.repository_id == repository_id, ) ) ).all() if not links: return [] board_ids = [link.board_id for link in links] boards = ( await session.exec( select(Board).where( Board.id.in_(board_ids), Board.organization_id == ctx.organization.id, ) ) ).all() return [BoardRead.model_validate(b) for b in boards]