Pipeline/backend/app/api/forgejo_issues.py


"""API endpoints for Forgejo issue operations."""
from __future__ import annotations
from typing import TYPE_CHECKING
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import String, cast
from sqlmodel import func, select
from app.api.deps import require_org_member
from app.core.auth import AuthContext, get_auth_context
from app.db import crud
from app.db.session import get_session
from app.models.board_repository_links import BoardRepositoryLink
from app.models.forgejo_issues import ForgejoIssue
from app.schemas.forgejo_issues import (
CloseIssueResponse,
EditIssueRequest,
EditIssueResponse,
ForgejoIssueDetailRead,
ForgejoIssueListResponse,
ForgejoIssueRead,
PostCommentRequest,
PostCommentResponse,
)
from app.services.activity_log import record_activity
from app.services.forgejo_issue_close import (
CloseIssueAccessError,
CloseIssueNotFoundError,
CloseIssueRemoteError,
close_issue_by_id,
)
from app.services.forgejo_issue_comment import (
PostCommentNotFoundError,
PostCommentRemoteError,
post_comment_by_issue_id,
)
from app.services.forgejo_issue_edit import (
EditIssueNotFoundError,
EditIssueRemoteError,
edit_issue_by_id,
)
from app.services.organizations import OrganizationContext, list_accessible_board_ids
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
# Router for the cached Forgejo issue endpoints; every route below is served under /forgejo/issues.
router = APIRouter(prefix="/forgejo/issues", tags=["forgejo-issues"])
# Dependency markers built once and reused as parameter defaults by the endpoints below.
SESSION_DEP = Depends(get_session)
# Supplies the AuthContext; the mutating endpoints read auth.user from it and return 401 when it is None.
AUTH_DEP = Depends(get_auth_context)
# Supplies the OrganizationContext (ctx.organization, ctx.member) used for tenant scoping.
# NOTE(review): presumably rejects callers who are not members of the organization —
# confirm in app.api.deps, since every endpoint here relies on it for tenant isolation.
ORG_MEMBER_DEP = Depends(require_org_member)
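# Escape character handed to ``ilike(..., escape=...)`` for the substring filters below.
_LIKE_ESCAPE = "/"


def _contains_pattern(value: str) -> str:
    """Return a LIKE pattern that matches *value* as a literal substring.

    ``%``, ``_`` and the escape character itself are escaped so caller-supplied
    text cannot act as a wildcard. Pair the result with ``escape=_LIKE_ESCAPE``.
    """
    escaped = (
        value.replace(_LIKE_ESCAPE, _LIKE_ESCAPE * 2)
        .replace("%", f"{_LIKE_ESCAPE}%")
        .replace("_", f"{_LIKE_ESCAPE}_")
    )
    return f"%{escaped}%"


@router.get("", response_model=ForgejoIssueListResponse)
async def list_issues(
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
    repository_id: str | None = Query(None, description="Filter by repository ID"),
    board_id: str | None = Query(
        None, description="Filter by board ID (returns issues from all repos linked to this board)"
    ),
    state: str | None = Query(None, description="Filter by state (open, closed)"),
    label: str | None = Query(None, description="Filter by label name"),
    assignee: str | None = Query(None, description="Filter by assignee login"),
    search: str | None = Query(None, description="Search in title and body"),
    page: int = Query(1, ge=1, description="Page number"),
    limit: int = Query(30, ge=1, le=100, description="Items per page"),
) -> ForgejoIssueListResponse:
    """List cached issues with optional filters.

    Results are limited to the caller's organization and exclude pull requests.
    The label, assignee and search filters match case-insensitive substrings;
    ``%`` and ``_`` in those values are matched literally.
    """
    # Every filter is collected once and applied to both the page query and the
    # count query, so ``total`` cannot drift from the rows actually returned.
    # The organization predicate comes first: it is the tenant boundary.
    conditions = [
        ForgejoIssue.organization_id == ctx.organization.id,
        ForgejoIssue.is_pull_request.is_(False),
    ]

    if board_id:
        try:
            board_uuid = UUID(board_id)
        except ValueError:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="Invalid board_id format"
            ) from None
        # The link lookup is scoped to the caller's organization as well, so a
        # board id from another organization resolves to no repositories.
        linked_repo_ids = (
            await session.exec(
                select(BoardRepositoryLink.repository_id).where(
                    BoardRepositoryLink.board_id == board_uuid,
                    BoardRepositoryLink.organization_id == ctx.organization.id,
                )
            )
        ).all()
        if not linked_repo_ids:
            return ForgejoIssueListResponse(items=[], total=0, page=page, limit=limit)
        conditions.append(ForgejoIssue.repository_id.in_(linked_repo_ids))

    if repository_id:
        try:
            repo_uuid = UUID(repository_id)
        except ValueError:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
                detail="Invalid repository_id format",
            ) from None
        conditions.append(ForgejoIssue.repository_id == repo_uuid)

    if state:
        conditions.append(ForgejoIssue.state == state)

    if label:
        # Substring match over the JSON labels array cast to text.
        # NOTE(review): because the whole serialized value is searched, this can
        # also match text that is not a label name (for example a JSON key) —
        # confirm against the stored shape of ``labels``.
        conditions.append(
            cast(ForgejoIssue.labels, String).ilike(_contains_pattern(label), escape=_LIKE_ESCAPE)
        )

    if assignee:
        # Same approach and same caveat as the label filter, on the assignees array.
        conditions.append(
            cast(ForgejoIssue.assignees, String).ilike(
                _contains_pattern(assignee), escape=_LIKE_ESCAPE
            )
        )

    if search:
        pattern = _contains_pattern(search)
        conditions.append(
            (ForgejoIssue.title.ilike(pattern, escape=_LIKE_ESCAPE))
            | (ForgejoIssue.body_preview.ilike(pattern, escape=_LIKE_ESCAPE))
            | (ForgejoIssue.body.ilike(pattern, escape=_LIKE_ESCAPE))
        )

    total_statement = select(func.count()).select_from(ForgejoIssue).where(*conditions)
    total = (await session.exec(total_statement)).one()

    # ``page`` is validated as >= 1 by the Query declaration, so the offset is never negative.
    offset = (page - 1) * limit
    statement = (
        select(ForgejoIssue)
        .where(*conditions)
        .order_by(ForgejoIssue.forgejo_issue_number.desc())
        .offset(offset)
        .limit(limit)
    )
    issues = (await session.exec(statement)).all()

    return ForgejoIssueListResponse(
        items=[ForgejoIssueRead.model_validate(issue) for issue in issues],
        total=total,
        page=page,
        limit=limit,
    )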
@router.get("/{issue_id}", response_model=ForgejoIssueDetailRead)
async def get_issue(
    issue_id: str,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> ForgejoIssueDetailRead:
    """Get one cached issue by ID.

    The issue must belong to the caller's organization.
    """
    try:
        issue_uuid = UUID(issue_id)
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="Invalid issue_id format"
        )

    record = await crud.get_by_id(session, ForgejoIssue, issue_uuid)
    # An unknown id and an id owned by another organization get the same bare
    # 404, so the response does not reveal whether the issue exists elsewhere.
    if record is None or record.organization_id != ctx.organization.id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    return ForgejoIssueDetailRead.model_validate(record)
@router.post(
    "/{issue_id}/close",
    response_model=CloseIssueResponse,
    summary="Close a Forgejo issue (human user)",
    description=(
        "Close a Forgejo issue by its local ID. The user must have write access "
        "to the board that the issue's repository is linked to."
    ),
    responses={
        status.HTTP_200_OK: {
            "description": "Issue closed successfully",
            "content": {
                "application/json": {
                    "example": {
                        "success": True,
                        "issue_id": "123e4567-e89b-12d3-a456-426614174000",
                        "forgejo_issue_number": 42,
                        "state": "closed",
                        "forgejo_closed_at": "2026-05-19T03:43:00+00:00",
                        "last_synced_at": "2026-05-19T03:43:00+00:00",
                    }
                }
            },
        },
        status.HTTP_404_NOT_FOUND: {
            "description": "Issue not found or not linked to a board",
        },
        status.HTTP_403_FORBIDDEN: {
            "description": "User lacks write access to the board",
        },
        status.HTTP_409_CONFLICT: {
            "description": "Organization mismatch or access denied",
        },
        status.HTTP_502_BAD_GATEWAY: {
            "description": "Forgejo API call failed",
        },
    },
)
async def close_issue(
    issue_id: str,
    session: AsyncSession = SESSION_DEP,
    auth: AuthContext = AUTH_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> CloseIssueResponse:
    """Close a Forgejo issue on behalf of the authenticated user.

    The caller needs write access to at least one board that the issue's
    repository is linked to within the caller's organization.
    """
    try:
        issue_uuid = UUID(issue_id)
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="Invalid issue_id format"
        )

    record = await crud.get_by_id(session, ForgejoIssue, issue_uuid)
    # Missing and foreign-organization issues share one 404 so the response
    # does not confirm that an id exists in another organization.
    if record is None or record.organization_id != ctx.organization.id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Issue not found")

    actor = auth.user
    if actor is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)

    # Boards linked to the issue's repository, restricted to this organization.
    link_query = select(BoardRepositoryLink).where(
        BoardRepositoryLink.organization_id == ctx.organization.id,
        BoardRepositoryLink.repository_id == record.repository_id,
    )
    repo_links = (await session.exec(link_query)).all()
    if not repo_links:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Issue repository is not linked to any board",
        )

    # Authorization: the first linked board the member may write to is the one
    # the close is attributed to in the activity log.
    writable_board_ids = set(
        await list_accessible_board_ids(session, member=ctx.member, write=True)
    )
    authorized_board_id = None
    for link in repo_links:
        if link.board_id in writable_board_ids:
            authorized_board_id = link.board_id
            break
    if authorized_board_id is None:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Board access denied",
        )

    # NOTE(review): the service exception text is returned to the client as the
    # error detail; confirm these messages never carry upstream response bodies,
    # internal URLs or credentials.
    try:
        result = await close_issue_by_id(
            session=session,
            issue_id=issue_uuid,
            actor_user_id=actor.id,
        )
    except CloseIssueNotFoundError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc))
    except CloseIssueAccessError as exc:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc))
    except CloseIssueRemoteError as exc:
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc))

    # Audit record: who closed which issue, attributed to the authorizing board.
    repository_full_name = str(result.get("repository_full_name") or "unknown/unknown")
    issue_number = result["forgejo_issue_number"]
    record_activity(
        session,
        event_type="forgejo.issue.closed",
        message=f"Forgejo issue closed by user {actor.id}: {repository_full_name}#{issue_number}",
        board_id=authorized_board_id,
    )
    await session.commit()

    return CloseIssueResponse(
        success=result["success"],
        issue_id=result["issue_id"],
        forgejo_issue_number=result["forgejo_issue_number"],
        state=result["state"],
        forgejo_closed_at=result.get("forgejo_closed_at"),
        last_synced_at=result.get("last_synced_at") or "",
    )
@router.post(
    "/{issue_id}/comments",
    response_model=PostCommentResponse,
    summary="Post a comment on a Forgejo issue",
    responses={
        status.HTTP_404_NOT_FOUND: {"description": "Issue not found"},
        status.HTTP_502_BAD_GATEWAY: {"description": "Forgejo API call failed"},
    },
)
async def post_comment(
    issue_id: str,
    body: PostCommentRequest,
    session: AsyncSession = SESSION_DEP,
    auth: AuthContext = AUTH_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> PostCommentResponse:
    """Post a comment on a Forgejo issue as an authenticated user."""
    try:
        issue_uuid = UUID(issue_id)
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="Invalid issue_id format"
        )

    record = await crud.get_by_id(session, ForgejoIssue, issue_uuid)
    # Tenant scoping: an issue from another organization is reported as missing.
    if record is None or record.organization_id != ctx.organization.id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Issue not found")

    actor = auth.user
    if actor is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)

    # NOTE(review): only organization membership is checked here. close_issue in
    # this file also requires board write access before it writes to Forgejo —
    # confirm whether comments are meant to be open to every member, or whether
    # post_comment_by_issue_id enforces board access itself.
    # NOTE(review): no record_activity call is made for comments, unlike the
    # close and edit endpoints — confirm the audit trail is covered elsewhere.
    comment_text = body.body
    try:
        result = await post_comment_by_issue_id(
            session=session,
            issue_id=issue_uuid,
            body=comment_text,
            actor_user_id=actor.id,
        )
    except PostCommentNotFoundError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc))
    except PostCommentRemoteError as exc:
        # NOTE(review): the service message is returned to the client verbatim;
        # confirm it never carries upstream response bodies or credentials.
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc))

    await session.commit()

    return PostCommentResponse(
        success=True,
        issue_id=issue_uuid,
        comment_id=result.get("comment_id"),
        body=body.body,
        created_at=str(result.get("created_at") or ""),
    )
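@router.patch(
    "/{issue_id}",
    response_model=EditIssueResponse,
    summary="Edit a Forgejo issue",
    responses={
        status.HTTP_403_FORBIDDEN: {"description": "User lacks write access to the board"},
        status.HTTP_404_NOT_FOUND: {"description": "Issue not found or not linked to a board"},
        status.HTTP_502_BAD_GATEWAY: {"description": "Forgejo API call failed"},
    },
)
async def edit_issue(
    issue_id: str,
    body: EditIssueRequest,
    session: AsyncSession = SESSION_DEP,
    auth: AuthContext = AUTH_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> EditIssueResponse:
    """Edit a Forgejo issue's title, body, and/or state.

    The user must have write access to a board that the issue's repository is
    linked to.
    """
    try:
        issue_uuid = UUID(issue_id)
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="Invalid issue_id format"
        ) from None

    issue = await crud.get_by_id(session, ForgejoIssue, issue_uuid)
    # Tenant scoping: an issue from another organization is reported as missing.
    if issue is None or issue.organization_id != ctx.organization.id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Issue not found")
    if auth.user is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
    if body.title is None and body.body is None and body.state is None:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="At least one field must be provided",
        )

    # This endpoint can change ``state``, so it reaches the same outcome as
    # close_issue. Apply the same board write-access check here; otherwise any
    # organization member could change an issue's state through PATCH without
    # the access that the close endpoint requires.
    links = (
        await session.exec(
            select(BoardRepositoryLink).where(
                BoardRepositoryLink.organization_id == ctx.organization.id,
                BoardRepositoryLink.repository_id == issue.repository_id,
            )
        )
    ).all()
    if not links:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Issue repository is not linked to any board",
        )
    allowed_board_ids = set(await list_accessible_board_ids(session, member=ctx.member, write=True))
    authorized_board_id = next(
        (link.board_id for link in links if link.board_id in allowed_board_ids),
        None,
    )
    if authorized_board_id is None:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Board access denied",
        )

    # NOTE(review): the service exception text is returned to the client as the
    # error detail; confirm these messages never carry upstream response bodies,
    # internal URLs or credentials.
    try:
        result = await edit_issue_by_id(
            session=session,
            issue_id=issue_uuid,
            title=body.title,
            body=body.body,
            state=body.state,
            actor_user_id=auth.user.id,
        )
    except EditIssueNotFoundError as e:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) from e
    except EditIssueRemoteError as e:
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(e)) from e

    # Audit record, attributed to the board that authorized the edit (the same
    # attribution close_issue uses).
    record_activity(
        session,
        event_type="forgejo.issue.edited",
        message=f"Forgejo issue edited by user {auth.user.id}: #{result['forgejo_issue_number']}",
        board_id=authorized_board_id,
    )
    await session.commit()

    # A non-string body from the service is reported as None, not coerced.
    updated_body = result.get("body")
    return EditIssueResponse(
        success=True,
        issue_id=issue_uuid,
        forgejo_issue_number=int(result["forgejo_issue_number"]),
        title=str(result["title"]),
        body=updated_body if isinstance(updated_body, str) else None,
        state=str(result["state"]),
        forgejo_updated_at=str(result["forgejo_updated_at"]),
    )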