"""Forgejo webhook ingest endpoints for cached issue updates.""" from __future__ import annotations import hashlib import hmac import json from datetime import UTC, datetime from typing import TYPE_CHECKING, Any from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Request, status from sqlmodel import select from app.core.config import settings from app.core.logging import get_logger from app.core.time import utcnow from app.db import crud from app.db.session import get_session from app.models.board_repository_links import BoardRepositoryLink from app.models.forgejo_connections import ForgejoConnection from app.models.forgejo_issues import ForgejoIssue from app.models.forgejo_repositories import ForgejoRepository from app.schemas.forgejo_webhooks import ForgejoWebhookIngestResponse from app.services.activity_log import record_activity from app.services.forgejo_client import get_forgejo_client if TYPE_CHECKING: from sqlmodel.ext.asyncio.session import AsyncSession router = APIRouter(prefix="/forgejo/webhooks", tags=["forgejo-webhooks"]) SESSION_DEP = Depends(get_session) logger = get_logger(__name__) SUPPORTED_ISSUE_ACTIONS = {"opened", "edited", "closed", "reopened"} SIGNATURE_HEADERS = ( "x-forgejo-signature", "x-gitea-signature", "x-hub-signature-256", ) def _normalize_signature(value: str) -> str: value = value.strip().lower() if value.startswith("sha256="): return value[7:] return value def _verify_signature( *, secret: str | None, raw_body: bytes, request: Request, ) -> None: if not secret: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Forgejo webhook secret is not configured.", ) signature = next( ( request.headers.get(header) for header in SIGNATURE_HEADERS if request.headers.get(header) ), None, ) if not signature: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Missing Forgejo webhook signature.", ) expected = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest() if not hmac.compare_digest(_normalize_signature(signature), expected): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Invalid Forgejo webhook signature.", ) async def _read_limited_body(request: Request) -> bytes: max_payload_bytes = settings.webhook_max_payload_bytes content_length = request.headers.get("content-length") try: parsed_content_length = int(content_length) if content_length else 0 except (TypeError, ValueError): parsed_content_length = 0 if parsed_content_length > max_payload_bytes: raise HTTPException( status_code=status.HTTP_413_CONTENT_TOO_LARGE, detail=f"Payload exceeds maximum size of {max_payload_bytes} bytes.", ) chunks: list[bytes] = [] total_size = 0 async for chunk in request.stream(): total_size += len(chunk) if total_size > max_payload_bytes: raise HTTPException( status_code=status.HTTP_413_CONTENT_TOO_LARGE, detail=f"Payload exceeds maximum size of {max_payload_bytes} bytes.", ) chunks.append(chunk) return b"".join(chunks) def _decode_json_body(raw_body: bytes) -> dict[str, Any]: if not raw_body: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Webhook payload is required.", ) try: parsed = json.loads(raw_body.decode("utf-8")) except (UnicodeDecodeError, json.JSONDecodeError) as exc: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Webhook payload must be valid JSON.", ) from exc if not isinstance(parsed, dict): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Webhook payload must be a JSON object.", ) return parsed def _parse_iso_datetime(value: object) -> datetime: if not isinstance(value, str) or not value.strip(): return utcnow() normalized = value.strip().replace("Z", "+00:00") try: parsed = datetime.fromisoformat(normalized) except ValueError: return utcnow() if parsed.tzinfo is not None: return parsed.astimezone(UTC).replace(tzinfo=None) return parsed def _parse_optional_iso_datetime(value: object) -> datetime | None: if not isinstance(value, str) or not value.strip(): return None normalized = value.strip().replace("Z", "+00:00") try: parsed = datetime.fromisoformat(normalized) except ValueError: return None if parsed.tzinfo is not None: return parsed.astimezone(UTC).replace(tzinfo=None) return parsed def _issue_number(issue_data: dict[str, Any]) -> int: number = issue_data.get("number") if isinstance(number, int): return number if isinstance(number, str) and number.isdigit(): return int(number) raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Issue payload is missing a valid number.", ) def _is_pull_request_payload( *, payload: dict[str, Any], issue_data: dict[str, Any], request: Request, ) -> bool: webhook_event = ( request.headers.get("x-forgejo-event") or request.headers.get("x-gitea-event") or request.headers.get("x-github-event") or "" ) return bool( payload.get("pull_request") or issue_data.get("pull_request") or payload.get("event_type") == "pull_request" or webhook_event.lower() == "pull_request" ) def _repository_mismatch( *, repository: ForgejoRepository, payload: dict[str, Any], ) -> bool: raw_repository = payload.get("repository") if not isinstance(raw_repository, dict): return False full_name = raw_repository.get("full_name") if isinstance(full_name, str) and full_name.strip(): return full_name.strip().lower() != f"{repository.owner}/{repository.repo}".lower() name = raw_repository.get("name") owner = raw_repository.get("owner") owner_name = owner.get("login") if isinstance(owner, dict) else owner if isinstance(name, str) and isinstance(owner_name, str): return ( name.strip().lower() != repository.repo.lower() or owner_name.strip().lower() != repository.owner.lower() ) return False def _labels(issue_data: dict[str, Any]) -> Any: labels: list[dict[str, object]] = [] for label in issue_data.get("labels") or []: if not isinstance(label, dict): continue labels.append( { "name": str(label.get("name") or ""), "color": str(label.get("color") or ""), "description": str(label.get("description") or ""), } ) return labels def _assignees(issue_data: dict[str, Any]) -> Any: assignees: list[dict[str, object]] = [] for assignee in issue_data.get("assignees") or []: if not isinstance(assignee, dict): continue assignees.append( { "login": str(assignee.get("login") or ""), "id": assignee.get("id") or 0, "avatar_url": str(assignee.get("avatar_url") or ""), } ) return assignees def _milestone(issue_data: dict[str, Any]) -> dict[str, object] | None: raw = issue_data.get("milestone") if not isinstance(raw, dict): return None return { "id": raw.get("id"), "title": str(raw.get("title") or ""), "state": str(raw.get("state") or "open"), "description": raw.get("description") or None, "due_on": raw.get("due_on") or None, "closed_at": raw.get("closed_at") or None, } def _author(issue_data: dict[str, Any]) -> str: user = issue_data.get("user") if isinstance(user, dict): login = user.get("login") if isinstance(login, str): return login return "" def _issue_state(*, action: str, issue_data: dict[str, Any]) -> str: if action == "closed": return "closed" if action == "reopened": return "open" state = issue_data.get("state") return state if isinstance(state, str) and state else "open" async def _linked_board_ids( *, session: AsyncSession, repository_id: UUID, ) -> list[UUID]: rows = ( await session.exec( select(BoardRepositoryLink.board_id).where( BoardRepositoryLink.repository_id == repository_id ) ) ).all() return list(rows) async def _upsert_issue( *, session: AsyncSession, repository: ForgejoRepository, action: str, issue_data: dict[str, Any], issue_payload: dict[str, Any] | None = None, comments_payload: list[dict[str, object]] | None = None, timeline_payload: list[dict[str, object]] | None = None, reactions_payload: list[dict[str, object]] | None = None, ) -> tuple[ForgejoIssue, bool]: payload = issue_payload if issue_payload is not None else issue_data number = _issue_number(issue_data) existing = ( await session.exec( select(ForgejoIssue).where( ForgejoIssue.repository_id == repository.id, ForgejoIssue.forgejo_issue_number == number, ) ) ).first() state = _issue_state(action=action, issue_data=issue_data) closed_at = ( None if state == "open" else _parse_optional_iso_datetime(issue_data.get("closed_at")) or utcnow() ) raw_body = payload.get("body") or "" body_text = str(raw_body) if raw_body else None milestone = _milestone(payload) if existing is None: issue = ForgejoIssue( organization_id=repository.organization_id, repository_id=repository.id, forgejo_issue_number=number, title=str(issue_data.get("title") or ""), body=body_text, body_preview=str(raw_body)[:1000] if raw_body else None, state=state, is_pull_request=False, labels=_labels(payload), assignees=_assignees(payload), milestone=milestone, forgejo_payload=dict(payload), forgejo_comments_payload=comments_payload or [], forgejo_timeline_payload=timeline_payload or [], forgejo_reactions_payload=reactions_payload or [], author=_author(payload), html_url=str(payload.get("html_url") or ""), forgejo_created_at=_parse_iso_datetime(payload.get("created_at")), forgejo_updated_at=_parse_iso_datetime(payload.get("updated_at")), forgejo_closed_at=closed_at, ) session.add(issue) await session.flush() return issue, True existing.title = str(issue_data.get("title") or "") existing.body = body_text existing.body_preview = str(raw_body)[:1000] if raw_body else None existing.milestone = milestone existing.state = state existing.is_pull_request = False existing.labels = _labels(payload) existing.assignees = _assignees(payload) existing.forgejo_payload = dict(payload) if comments_payload is not None: existing.forgejo_comments_payload = comments_payload if timeline_payload is not None: existing.forgejo_timeline_payload = timeline_payload if reactions_payload is not None: existing.forgejo_reactions_payload = reactions_payload existing.author = _author(payload) existing.html_url = str(payload.get("html_url") or "") existing.forgejo_created_at = _parse_iso_datetime(payload.get("created_at")) existing.forgejo_updated_at = _parse_iso_datetime(payload.get("updated_at")) existing.forgejo_closed_at = closed_at existing.last_synced_at = utcnow() existing.updated_at = utcnow() session.add(existing) await session.flush() return existing, False def _activity_message( *, action: str, repository: ForgejoRepository, issue: ForgejoIssue, ) -> str: return ( f"Forgejo issue {action}: " f"{repository.owner}/{repository.repo}#{issue.forgejo_issue_number} - {issue.title}" ) async def _record_issue_activity( *, session: AsyncSession, action: str, repository: ForgejoRepository, issue: ForgejoIssue, ) -> None: if action not in {"closed", "reopened"}: return board_ids = await _linked_board_ids(session=session, repository_id=repository.id) if not board_ids: record_activity( session, event_type=f"forgejo.issue.{action}", message=_activity_message( action=action, repository=repository, issue=issue, ), ) return for board_id in board_ids: record_activity( session, event_type=f"forgejo.issue.{action}", message=_activity_message( action=action, repository=repository, issue=issue, ), board_id=board_id, ) @router.post( "/{repository_id}", response_model=ForgejoWebhookIngestResponse, status_code=status.HTTP_202_ACCEPTED, ) async def ingest_forgejo_webhook( request: Request, repository_id: UUID, session: AsyncSession = SESSION_DEP, ) -> ForgejoWebhookIngestResponse: """Receive Forgejo issue webhooks and update cached issue records.""" repository = await crud.get_by_id(session, ForgejoRepository, repository_id) if repository is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) if not repository.active: raise HTTPException( status_code=status.HTTP_410_GONE, detail="Forgejo repository is inactive.", ) raw_body = await _read_limited_body(request) _verify_signature( secret=repository.webhook_secret, raw_body=raw_body, request=request, ) payload = _decode_json_body(raw_body) action = payload.get("action") action = action if isinstance(action, str) else None if action not in SUPPORTED_ISSUE_ACTIONS: return ForgejoWebhookIngestResponse( repository_id=repository.id, action=action, ignored=True, reason="unsupported_action", ) if _repository_mismatch(repository=repository, payload=payload): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="Webhook repository does not match tracked repository.", ) issue_data = payload.get("issue") if not isinstance(issue_data, dict): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Webhook payload is missing issue data.", ) if _is_pull_request_payload(payload=payload, issue_data=issue_data, request=request): return ForgejoWebhookIngestResponse( repository_id=repository.id, action=action, ignored=True, reason="pull_request_ignored", ) issue_number = _issue_number(issue_data) issue_payload_for_cache: dict[str, Any] = dict(issue_data) comments_payload: list[dict[str, object]] | None = None timeline_payload: list[dict[str, object]] | None = None reactions_payload: list[dict[str, object]] | None = None connection = await crud.get_by_id(session, ForgejoConnection, repository.connection_id) if connection is not None and connection.token: try: async with get_forgejo_client(connection) as client: remote_issue = await client.get_issue( owner=repository.owner, repo=repository.repo, issue_number=issue_number, ) if isinstance(remote_issue, dict): issue_payload_for_cache = dict(remote_issue) raw_comments = await client.list_issue_comments( owner=repository.owner, repo=repository.repo, issue_number=issue_number, ) raw_timeline = await client.list_issue_timeline( owner=repository.owner, repo=repository.repo, issue_number=issue_number, ) raw_reactions = await client.list_issue_reactions( owner=repository.owner, repo=repository.repo, issue_number=issue_number, ) comments_payload = [item for item in raw_comments if isinstance(item, dict)] timeline_payload = [item for item in raw_timeline if isinstance(item, dict)] reactions_payload = [item for item in raw_reactions if isinstance(item, dict)] except Exception as exc: logger.warning( "forgejo.webhook.issue.enrichment_failed", extra={ "repository_id": str(repository.id), "issue_number": issue_number, "error": str(exc), }, ) issue, created = await _upsert_issue( session=session, repository=repository, action=action, issue_data=issue_data, issue_payload=issue_payload_for_cache, comments_payload=comments_payload, timeline_payload=timeline_payload, reactions_payload=reactions_payload, ) await _record_issue_activity( session=session, action=action, repository=repository, issue=issue, ) await session.commit() logger.info( "forgejo.webhook.issue.updated", extra={ "repository_id": str(repository.id), "issue_id": str(issue.id), "issue_number": issue.forgejo_issue_number, "action": action, "issue_created": created, }, ) return ForgejoWebhookIngestResponse( repository_id=repository.id, action=action, issue_id=issue.id, issue_number=issue.forgejo_issue_number, ignored=False, )