# Pipeline/backend/app/api/forgejo_webhooks.py
#
# 556 lines
# 18 KiB
# Python

"""Forgejo webhook ingest endpoints for cached issue updates."""
from __future__ import annotations
import hashlib
import hmac
import json
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlmodel import select
from app.core.config import settings
from app.core.logging import get_logger
from app.core.time import utcnow
from app.db import crud
from app.db.session import get_session
from app.models.board_repository_links import BoardRepositoryLink
from app.models.forgejo_connections import ForgejoConnection
from app.models.forgejo_issues import ForgejoIssue
from app.models.forgejo_repositories import ForgejoRepository
from app.schemas.forgejo_webhooks import ForgejoWebhookIngestResponse
from app.services.activity_log import record_activity
from app.services.forgejo_client import get_forgejo_client
if TYPE_CHECKING:
    # Only imported for type checking / annotations.
    from sqlmodel.ext.asyncio.session import AsyncSession

logger = get_logger(__name__)

# Every route in this module is served under /forgejo/webhooks.
router = APIRouter(prefix="/forgejo/webhooks", tags=["forgejo-webhooks"])

# Module-level dependency marker, reused as the ``session`` parameter default.
SESSION_DEP = Depends(get_session)

# Issue actions that update the cache; any other action is acknowledged as ignored.
SUPPORTED_ISSUE_ACTIONS = {"opened", "edited", "closed", "reopened"}

# Signature header names, checked in this order; the first non-empty one is used.
SIGNATURE_HEADERS = ("x-forgejo-signature", "x-gitea-signature", "x-hub-signature-256")
def _normalize_signature(value: str) -> str:
    """Return a signature header value as bare lowercase hex.

    Surrounding whitespace is stripped, the text is lowercased, and a single
    leading ``sha256=`` prefix, if present, is removed.
    """
    return value.strip().lower().removeprefix("sha256=")
def _verify_signature(
    *,
    secret: str | None,
    raw_body: bytes,
    request: Request,
) -> None:
    """Reject the request unless it carries a valid HMAC-SHA256 body signature.

    The signature is taken from the first non-empty header in
    ``SIGNATURE_HEADERS`` and compared against
    ``HMAC-SHA256(secret, raw_body)`` in hex.

    Args:
        secret: Per-repository webhook secret; when unset or empty every
            delivery is rejected.
        raw_body: Exact request body bytes the sender signed.
        request: Incoming request whose headers carry the signature.

    Raises:
        HTTPException: 403 when the secret is not configured, no signature
            header is present, or the signature does not match.
    """
    if not secret:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Forgejo webhook secret is not configured.",
        )
    signature: str | None = None
    for header in SIGNATURE_HEADERS:
        value = request.headers.get(header)
        if value:
            signature = value
            break
    if not signature:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Missing Forgejo webhook signature.",
        )
    expected = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
    # Compare as bytes: hmac.compare_digest raises TypeError when given str
    # arguments containing non-ASCII characters. Header values are
    # attacker-controlled, so a str comparison could surface as a 500
    # instead of a 403.
    provided = _normalize_signature(signature).encode("utf-8")
    if not hmac.compare_digest(provided, expected.encode("ascii")):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid Forgejo webhook signature.",
        )
async def _read_limited_body(request: Request) -> bytes:
    """Read the full request body, enforcing ``settings.webhook_max_payload_bytes``.

    The declared ``Content-Length`` is checked first so obviously oversized
    requests are rejected before streaming. The streamed size is checked as
    well, because the header may be missing, unparseable, or wrong.

    Raises:
        HTTPException: 413 when the declared or the streamed size exceeds the limit.
    """
    limit = settings.webhook_max_payload_bytes

    declared = request.headers.get("content-length")
    try:
        declared_size = int(declared) if declared else 0
    except (TypeError, ValueError):
        # An unparseable header is ignored; the streaming check still applies.
        declared_size = 0
    if declared_size > limit:
        raise HTTPException(
            status_code=status.HTTP_413_CONTENT_TOO_LARGE,
            detail=f"Payload exceeds maximum size of {limit} bytes.",
        )

    buffer = bytearray()
    async for piece in request.stream():
        if len(buffer) + len(piece) > limit:
            raise HTTPException(
                status_code=status.HTTP_413_CONTENT_TOO_LARGE,
                detail=f"Payload exceeds maximum size of {limit} bytes.",
            )
        buffer.extend(piece)
    return bytes(buffer)
def _decode_json_body(raw_body: bytes) -> dict[str, Any]:
    """Decode a UTF-8 webhook body into a JSON object.

    Args:
        raw_body: Raw request body bytes.

    Returns:
        The parsed top-level JSON object.

    Raises:
        HTTPException: 400 when the body is empty, is not valid UTF-8, cannot
            be parsed as JSON (including over-long integer literals and
            excessively deep nesting), or is not a JSON object.
    """
    if not raw_body:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Webhook payload is required.",
        )
    try:
        parsed = json.loads(raw_body.decode("utf-8"))
    except (ValueError, RecursionError) as exc:
        # ValueError covers UnicodeDecodeError and json.JSONDecodeError, and
        # also the plain ValueError raised for integer literals beyond the
        # int/str conversion digit limit. RecursionError comes from very deeply
        # nested arrays/objects. Both previously escaped as a 500.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Webhook payload must be valid JSON.",
        ) from exc
    if not isinstance(parsed, dict):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Webhook payload must be a JSON object.",
        )
    return parsed
def _parse_iso_datetime(value: object) -> datetime:
    """Parse an ISO-8601 timestamp, falling back to the current time.

    Shares its parsing and normalization with ``_parse_optional_iso_datetime``:
    ``Z`` is accepted as UTC, aware values are converted to UTC and made naive,
    and naive values are returned unchanged. Non-string, blank, or unparseable
    input yields ``utcnow()`` instead of ``None``.
    """
    parsed = _parse_optional_iso_datetime(value)
    return utcnow() if parsed is None else parsed
def _parse_optional_iso_datetime(value: object) -> datetime | None:
if not isinstance(value, str) or not value.strip():
return None
normalized = value.strip().replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is not None:
return parsed.astimezone(UTC).replace(tzinfo=None)
return parsed
def _issue_number(issue_data: dict[str, Any]) -> int:
    """Return the issue number from a webhook ``issue`` object.

    Accepts a JSON integer or a string made only of ASCII digits.

    Raises:
        HTTPException: 400 when ``number`` is missing, is a boolean, or is a
            string that is not a plain ASCII decimal integer.
    """
    number = issue_data.get("number")
    # bool is a subclass of int; a JSON ``true`` is not an issue number.
    if isinstance(number, int) and not isinstance(number, bool):
        return number
    # str.isdigit() alone also accepts characters such as "²" that int()
    # rejects, so require ASCII and still guard int() (over-long digit strings
    # exceed Python's int/str conversion limit and raise ValueError).
    if isinstance(number, str) and number.isascii() and number.isdigit():
        try:
            return int(number)
        except ValueError:
            pass
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Issue payload is missing a valid number.",
    )
def _is_pull_request_payload(
    *,
    payload: dict[str, Any],
    issue_data: dict[str, Any],
    request: Request,
) -> bool:
    """Return True when the delivery describes a pull request rather than an issue.

    A delivery counts as a pull request when any of the following holds:
    - the payload or its ``issue`` object has a truthy ``pull_request`` entry;
    - the payload's ``event_type`` equals ``"pull_request"``;
    - the first non-empty header among ``x-forgejo-event``, ``x-gitea-event``
      and ``x-github-event`` equals ``pull_request``, case-insensitively.
    """
    if payload.get("pull_request") or issue_data.get("pull_request"):
        return True
    if payload.get("event_type") == "pull_request":
        return True
    headers = request.headers
    event_header = (
        headers.get("x-forgejo-event")
        or headers.get("x-gitea-event")
        or headers.get("x-github-event")
        or ""
    )
    return event_header.lower() == "pull_request"
def _repository_mismatch(
    *,
    repository: ForgejoRepository,
    payload: dict[str, Any],
) -> bool:
    """Return True when the payload names a different repository than ``repository``.

    A non-blank ``repository.full_name`` is compared case-insensitively with
    ``owner/repo``. Otherwise the payload's ``name`` and owner are compared;
    the owner is either the ``login`` of an ``owner`` object or a plain
    ``owner`` string. A payload that identifies no repository in either form
    is not treated as a mismatch (returns False).
    """
    repo_info = payload.get("repository")
    if not isinstance(repo_info, dict):
        return False

    full_name = repo_info.get("full_name")
    if isinstance(full_name, str) and full_name.strip():
        tracked = f"{repository.owner}/{repository.repo}".lower()
        return full_name.strip().lower() != tracked

    name = repo_info.get("name")
    owner = repo_info.get("owner")
    login = owner.get("login") if isinstance(owner, dict) else owner
    if not (isinstance(name, str) and isinstance(login, str)):
        return False
    return not (
        name.strip().lower() == repository.repo.lower()
        and login.strip().lower() == repository.owner.lower()
    )
def _labels(issue_data: dict[str, Any]) -> Any:
    """Project the issue's ``labels`` list onto name/color/description strings.

    Non-dict entries are skipped; missing or falsy fields become ``""``.
    A missing or falsy ``labels`` value yields an empty list.
    """
    return [
        {
            "name": str(entry.get("name") or ""),
            "color": str(entry.get("color") or ""),
            "description": str(entry.get("description") or ""),
        }
        for entry in (issue_data.get("labels") or [])
        if isinstance(entry, dict)
    ]
def _assignees(issue_data: dict[str, Any]) -> Any:
    """Project the issue's ``assignees`` list onto login/id/avatar_url fields.

    Non-dict entries are skipped. A missing/falsy ``login`` or ``avatar_url``
    becomes ``""``, and a missing/falsy ``id`` becomes ``0``.
    """
    return [
        {
            "login": str(person.get("login") or ""),
            "id": person.get("id") or 0,
            "avatar_url": str(person.get("avatar_url") or ""),
        }
        for person in (issue_data.get("assignees") or [])
        if isinstance(person, dict)
    ]
def _milestone(issue_data: dict[str, Any]) -> dict[str, object] | None:
    """Normalize the issue's ``milestone`` object, or return ``None`` if it is absent.

    ``title`` defaults to ``""`` and ``state`` to ``"open"``. Falsy
    ``description``/``due_on``/``closed_at`` values become ``None``, and
    ``id`` is passed through unchanged.
    """
    milestone = issue_data.get("milestone")
    if not isinstance(milestone, dict):
        return None
    title = str(milestone.get("title") or "")
    state = str(milestone.get("state") or "open")
    return {
        "id": milestone.get("id"),
        "title": title,
        "state": state,
        "description": milestone.get("description") or None,
        "due_on": milestone.get("due_on") or None,
        "closed_at": milestone.get("closed_at") or None,
    }
def _author(issue_data: dict[str, Any]) -> str:
    """Return ``user.login`` from the issue payload, or ``""`` when it is unavailable."""
    user = issue_data.get("user")
    login = user.get("login") if isinstance(user, dict) else None
    return login if isinstance(login, str) else ""
def _issue_state(*, action: str, issue_data: dict[str, Any]) -> str:
if action == "closed":
return "closed"
if action == "reopened":
return "open"
state = issue_data.get("state")
return state if isinstance(state, str) and state else "open"
async def _linked_board_ids(
    *,
    session: AsyncSession,
    repository_id: UUID,
) -> list[UUID]:
    """Return the ids of every board linked to ``repository_id`` via ``BoardRepositoryLink``."""
    statement = select(BoardRepositoryLink.board_id).where(
        BoardRepositoryLink.repository_id == repository_id
    )
    result = await session.exec(statement)
    return list(result.all())
async def _upsert_issue(
    *,
    session: AsyncSession,
    repository: ForgejoRepository,
    action: str,
    issue_data: dict[str, Any],
    issue_payload: dict[str, Any] | None = None,
    comments_payload: list[dict[str, object]] | None = None,
    timeline_payload: list[dict[str, object]] | None = None,
    reactions_payload: list[dict[str, object]] | None = None,
) -> tuple[ForgejoIssue, bool]:
    """Create or update the cached ``ForgejoIssue`` for a webhook delivery.

    The row is keyed by ``(repository.id, issue number)``. The number, title,
    state and closed time come from the webhook's ``issue_data``. Body,
    labels, assignees, milestone, author, URL, timestamps and the stored raw
    payload come from ``issue_payload`` when it is given (e.g. a copy fetched
    from the Forgejo API), otherwise from ``issue_data``.

    On create, missing comment/timeline/reaction payloads are stored as empty
    lists. On update, a ``None`` payload leaves the stored value untouched, and
    ``last_synced_at``/``updated_at`` are refreshed. The session is flushed but
    not committed.

    Returns:
        ``(issue, created)``, where ``created`` is True for a newly inserted row.
    """
    source = issue_data if issue_payload is None else issue_payload
    number = _issue_number(issue_data)
    lookup = select(ForgejoIssue).where(
        ForgejoIssue.repository_id == repository.id,
        ForgejoIssue.forgejo_issue_number == number,
    )
    existing = (await session.exec(lookup)).first()

    state = _issue_state(action=action, issue_data=issue_data)
    if state == "open":
        closed_at = None
    else:
        # A closed issue without a usable closed_at is stamped with "now".
        closed_at = _parse_optional_iso_datetime(issue_data.get("closed_at")) or utcnow()

    raw_body = source.get("body") or ""
    body_text = str(raw_body) if raw_body else None
    # Preview keeps at most the first 1000 characters of the body.
    body_preview = str(raw_body)[:1000] if raw_body else None
    title = str(issue_data.get("title") or "")
    milestone = _milestone(source)
    labels = _labels(source)
    assignees = _assignees(source)
    author = _author(source)
    html_url = str(source.get("html_url") or "")
    created_at = _parse_iso_datetime(source.get("created_at"))
    updated_at = _parse_iso_datetime(source.get("updated_at"))

    if existing is not None:
        existing.title = title
        existing.body = body_text
        existing.body_preview = body_preview
        existing.milestone = milestone
        existing.state = state
        existing.is_pull_request = False
        existing.labels = labels
        existing.assignees = assignees
        existing.forgejo_payload = dict(source)
        if comments_payload is not None:
            existing.forgejo_comments_payload = comments_payload
        if timeline_payload is not None:
            existing.forgejo_timeline_payload = timeline_payload
        if reactions_payload is not None:
            existing.forgejo_reactions_payload = reactions_payload
        existing.author = author
        existing.html_url = html_url
        existing.forgejo_created_at = created_at
        existing.forgejo_updated_at = updated_at
        existing.forgejo_closed_at = closed_at
        existing.last_synced_at = utcnow()
        existing.updated_at = utcnow()
        session.add(existing)
        await session.flush()
        return existing, False

    issue = ForgejoIssue(
        organization_id=repository.organization_id,
        repository_id=repository.id,
        forgejo_issue_number=number,
        title=title,
        body=body_text,
        body_preview=body_preview,
        state=state,
        is_pull_request=False,
        labels=labels,
        assignees=assignees,
        milestone=milestone,
        forgejo_payload=dict(source),
        forgejo_comments_payload=comments_payload or [],
        forgejo_timeline_payload=timeline_payload or [],
        forgejo_reactions_payload=reactions_payload or [],
        author=author,
        html_url=html_url,
        forgejo_created_at=created_at,
        forgejo_updated_at=updated_at,
        forgejo_closed_at=closed_at,
    )
    session.add(issue)
    await session.flush()
    return issue, True
def _activity_message(
    *,
    action: str,
    repository: ForgejoRepository,
    issue: ForgejoIssue,
) -> str:
    """Build the activity-log text, e.g. ``Forgejo issue closed: acme/widgets#7 - Title``."""
    reference = f"{repository.owner}/{repository.repo}#{issue.forgejo_issue_number}"
    return f"Forgejo issue {action}: {reference} - {issue.title}"
async def _record_issue_activity(
    *,
    session: AsyncSession,
    action: str,
    repository: ForgejoRepository,
    issue: ForgejoIssue,
) -> None:
    """Record ``forgejo.issue.<action>`` activity for close/reopen actions.

    Other actions record nothing. One entry is written per board linked to the
    repository. When no board is linked, a single entry without ``board_id`` is
    written instead.
    """
    if action not in {"closed", "reopened"}:
        return
    board_ids = await _linked_board_ids(session=session, repository_id=repository.id)
    event_type = f"forgejo.issue.{action}"
    message = _activity_message(action=action, repository=repository, issue=issue)
    if not board_ids:
        record_activity(session, event_type=event_type, message=message)
        return
    for board_id in board_ids:
        record_activity(
            session,
            event_type=event_type,
            message=message,
            board_id=board_id,
        )
async def _fetch_issue_enrichment(
    *,
    session: AsyncSession,
    repository: ForgejoRepository,
    issue_number: int,
    fallback_issue: dict[str, Any],
) -> tuple[
    dict[str, Any],
    list[dict[str, object]] | None,
    list[dict[str, object]] | None,
    list[dict[str, object]] | None,
]:
    """Best-effort fetch of the issue plus its comments, timeline and reactions.

    Args:
        session: Session used to load the repository's ``ForgejoConnection``.
        repository: Tracked repository the issue belongs to.
        issue_number: Number of the issue to fetch.
        fallback_issue: Webhook ``issue`` object, used when no fetched copy is available.

    Returns:
        ``(issue_payload, comments, timeline, reactions)``. ``issue_payload``
        is the fetched issue when the client returned a dict; otherwise it is a
        shallow copy of ``fallback_issue``. Each list is ``None`` when it was not
        obtained: there is no connection, the connection has no token, or an
        API call failed before it was assigned.
    """
    issue_payload: dict[str, Any] = dict(fallback_issue)
    comments: list[dict[str, object]] | None = None
    timeline: list[dict[str, object]] | None = None
    reactions: list[dict[str, object]] | None = None

    connection = await crud.get_by_id(session, ForgejoConnection, repository.connection_id)
    if connection is None or not connection.token:
        return issue_payload, comments, timeline, reactions

    try:
        async with get_forgejo_client(connection) as client:
            remote_issue = await client.get_issue(
                owner=repository.owner,
                repo=repository.repo,
                issue_number=issue_number,
            )
            if isinstance(remote_issue, dict):
                issue_payload = dict(remote_issue)
            raw_comments = await client.list_issue_comments(
                owner=repository.owner,
                repo=repository.repo,
                issue_number=issue_number,
            )
            raw_timeline = await client.list_issue_timeline(
                owner=repository.owner,
                repo=repository.repo,
                issue_number=issue_number,
            )
            raw_reactions = await client.list_issue_reactions(
                owner=repository.owner,
                repo=repository.repo,
                issue_number=issue_number,
            )
            comments = [item for item in raw_comments if isinstance(item, dict)]
            timeline = [item for item in raw_timeline if isinstance(item, dict)]
            reactions = [item for item in raw_reactions if isinstance(item, dict)]
    except Exception as exc:
        # Enrichment is optional: log and continue with whatever was gathered.
        logger.warning(
            "forgejo.webhook.issue.enrichment_failed",
            extra={
                "repository_id": str(repository.id),
                "issue_number": issue_number,
                "error": str(exc),
                # str(exc) can be empty for some exception types; keep the class name too.
                "error_type": type(exc).__name__,
            },
        )
    return issue_payload, comments, timeline, reactions


@router.post(
    "/{repository_id}",
    response_model=ForgejoWebhookIngestResponse,
    status_code=status.HTTP_202_ACCEPTED,
)
async def ingest_forgejo_webhook(
    request: Request,
    repository_id: UUID,
    session: AsyncSession = SESSION_DEP,
) -> ForgejoWebhookIngestResponse:
    """Receive Forgejo issue webhooks and update cached issue records.

    Flow:
    1. Look up the tracked repository (404 if unknown, 410 if inactive).
    2. Read the size-limited body (413), verify its HMAC signature (403), and
       decode it as a JSON object (400).
    3. Acknowledge unsupported actions and pull-request deliveries with
       ``ignored=True``. Reject a payload naming another repository with 422
       and a missing ``issue`` object with 400.
    4. Optionally enrich the issue from the Forgejo API, then upsert it.
    5. Record activity for ``closed``/``reopened`` actions and commit.
    """
    repository = await crud.get_by_id(session, ForgejoRepository, repository_id)
    if repository is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
    if not repository.active:
        raise HTTPException(
            status_code=status.HTTP_410_GONE,
            detail="Forgejo repository is inactive.",
        )

    # The signature covers the exact raw bytes, so verify before parsing.
    raw_body = await _read_limited_body(request)
    _verify_signature(
        secret=repository.webhook_secret,
        raw_body=raw_body,
        request=request,
    )
    payload = _decode_json_body(raw_body)

    action = payload.get("action")
    action = action if isinstance(action, str) else None
    if action not in SUPPORTED_ISSUE_ACTIONS:
        return ForgejoWebhookIngestResponse(
            repository_id=repository.id,
            action=action,
            ignored=True,
            reason="unsupported_action",
        )
    if _repository_mismatch(repository=repository, payload=payload):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="Webhook repository does not match tracked repository.",
        )
    issue_data = payload.get("issue")
    if not isinstance(issue_data, dict):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Webhook payload is missing issue data.",
        )
    if _is_pull_request_payload(payload=payload, issue_data=issue_data, request=request):
        return ForgejoWebhookIngestResponse(
            repository_id=repository.id,
            action=action,
            ignored=True,
            reason="pull_request_ignored",
        )

    issue_number = _issue_number(issue_data)
    (
        issue_payload_for_cache,
        comments_payload,
        timeline_payload,
        reactions_payload,
    ) = await _fetch_issue_enrichment(
        session=session,
        repository=repository,
        issue_number=issue_number,
        fallback_issue=issue_data,
    )

    issue, created = await _upsert_issue(
        session=session,
        repository=repository,
        action=action,
        issue_data=issue_data,
        issue_payload=issue_payload_for_cache,
        comments_payload=comments_payload,
        timeline_payload=timeline_payload,
        reactions_payload=reactions_payload,
    )
    await _record_issue_activity(
        session=session,
        action=action,
        repository=repository,
        issue=issue,
    )
    await session.commit()
    logger.info(
        "forgejo.webhook.issue.updated",
        extra={
            "repository_id": str(repository.id),
            "issue_id": str(issue.id),
            "issue_number": issue.forgejo_issue_number,
            "action": action,
            "issue_created": created,
        },
    )
    return ForgejoWebhookIngestResponse(
        repository_id=repository.id,
        action=action,
        issue_id=issue.id,
        issue_number=issue.forgejo_issue_number,
        ignored=False,
    )