# Source path: Pipeline/backend/app/api/forgejo_webhooks.py
# (Forgejo file-view residue: 467 lines, 14 KiB, Python; blame timestamp 2026-05-19 04:16:32 -05:00)
"""Forgejo webhook ingest endpoints for cached issue updates."""
from __future__ import annotations
import hashlib
import hmac
import json
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlmodel import select
from app.core.config import settings
from app.core.logging import get_logger
from app.core.time import utcnow
from app.db import crud
from app.db.session import get_session
from app.models.board_repository_links import BoardRepositoryLink
from app.models.forgejo_issues import ForgejoIssue
from app.models.forgejo_repositories import ForgejoRepository
from app.schemas.forgejo_webhooks import ForgejoWebhookIngestResponse
from app.services.activity_log import record_activity
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
# Router for the webhook ingest routes; every path is prefixed with /forgejo/webhooks.
router = APIRouter(prefix="/forgejo/webhooks", tags=["forgejo-webhooks"])
# Shared dependency marker used as the default value of the endpoint's session parameter.
SESSION_DEP = Depends(get_session)
logger = get_logger(__name__)
# Webhook actions that update the issue cache; any other action is answered
# with an "ignored" response (reason "unsupported_action").
SUPPORTED_ISSUE_ACTIONS = {"opened", "edited", "closed", "reopened"}
# Request headers that may carry the HMAC-SHA256 signature, checked in this
# order; the first one with a non-empty value is used.
SIGNATURE_HEADERS = (
"x-forgejo-signature",
"x-gitea-signature",
"x-hub-signature-256",
)
def _normalize_signature(value: str) -> str:
    """Return the signature header value as a bare lowercase digest string.

    Surrounding whitespace is trimmed, the text is lowercased, and one
    leading ``sha256=`` prefix is dropped when present.
    """
    return value.strip().lower().removeprefix("sha256=")
def _verify_signature(
    *,
    secret: str | None,
    raw_body: bytes,
    request: Request,
) -> None:
    """Check the HMAC-SHA256 signature of a webhook request.

    The expected digest is computed over ``raw_body`` with ``secret`` as the
    key and compared against the first non-empty header listed in
    ``SIGNATURE_HEADERS``.

    Raises:
        HTTPException: 403 when no secret is configured, when no signature
            header is present, or when the signature does not match.
    """
    if not secret:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Forgejo webhook secret is not configured.",
        )
    signature = next(
        (
            request.headers.get(header)
            for header in SIGNATURE_HEADERS
            if request.headers.get(header)
        ),
        None,
    )
    if not signature:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Missing Forgejo webhook signature.",
        )
    expected = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
    provided = _normalize_signature(signature)
    # hmac.compare_digest raises TypeError for str arguments that contain
    # non-ASCII characters. The header is caller-controlled, so compare bytes
    # to make a malformed signature a 403 rather than an unhandled error.
    if not hmac.compare_digest(provided.encode("utf-8"), expected.encode("ascii")):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid Forgejo webhook signature.",
        )
async def _read_limited_body(request: Request) -> bytes:
    """Read the request body while enforcing the configured size limit.

    The declared ``content-length`` is checked first (an absent or
    unparsable value counts as 0), then the streamed chunks are counted as
    they arrive.

    Raises:
        HTTPException: 413 when the declared or the received size exceeds
            ``settings.webhook_max_payload_bytes``.
    """
    limit = settings.webhook_max_payload_bytes

    def _too_large() -> HTTPException:
        return HTTPException(
            status_code=status.HTTP_413_CONTENT_TOO_LARGE,
            detail=f"Payload exceeds maximum size of {limit} bytes.",
        )

    declared_header = request.headers.get("content-length")
    try:
        declared_size = int(declared_header) if declared_header else 0
    except (TypeError, ValueError):
        declared_size = 0
    if declared_size > limit:
        raise _too_large()

    received = 0
    pieces: list[bytes] = []
    async for piece in request.stream():
        received += len(piece)
        # Reject before keeping the chunk that pushes the total over the limit.
        if received > limit:
            raise _too_large()
        pieces.append(piece)
    return b"".join(pieces)
def _decode_json_body(raw_body: bytes) -> dict[str, Any]:
    """Decode the raw webhook body as a UTF-8 JSON object.

    Raises:
        HTTPException: 400 when the body is empty, is not valid UTF-8 JSON,
            or decodes to something other than a JSON object.
    """

    def _bad_request(detail: str) -> HTTPException:
        return HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=detail)

    if not raw_body:
        raise _bad_request("Webhook payload is required.")
    try:
        text = raw_body.decode("utf-8")
        document = json.loads(text)
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        raise _bad_request("Webhook payload must be valid JSON.") from exc
    if isinstance(document, dict):
        return document
    raise _bad_request("Webhook payload must be a JSON object.")
def _parse_iso_datetime(value: object) -> datetime:
if not isinstance(value, str) or not value.strip():
return utcnow()
normalized = value.strip().replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return utcnow()
if parsed.tzinfo is not None:
return parsed.astimezone(UTC).replace(tzinfo=None)
return parsed
def _parse_optional_iso_datetime(value: object) -> datetime | None:
if not isinstance(value, str) or not value.strip():
return None
normalized = value.strip().replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is not None:
return parsed.astimezone(UTC).replace(tzinfo=None)
return parsed
def _issue_number(issue_data: dict[str, Any]) -> int:
    """Extract the issue number from the webhook's issue object.

    Accepts an integer or a string made only of decimal digits.

    Raises:
        HTTPException: 400 when ``number`` is missing or is not a usable
            integer value.
    """
    number = issue_data.get("number")
    # bool is a subclass of int; a JSON true/false must not be read as 1/0.
    if isinstance(number, int) and not isinstance(number, bool):
        return number
    # str.isdigit() also accepts characters such as "²" that int() rejects
    # with ValueError; isdecimal() only admits characters int() can parse,
    # so malformed values stay on the 400 path.
    if isinstance(number, str) and number.isdecimal():
        return int(number)
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Issue payload is missing a valid number.",
    )
# (blame marker left over from the web file view: 2026-05-19 04:24:24 -05:00)
def _is_pull_request_payload(
    *,
    payload: dict[str, Any],
    issue_data: dict[str, Any],
    request: Request,
) -> bool:
    """Tell whether the webhook describes a pull request rather than an issue.

    True when the payload or the issue object carries a truthy
    ``pull_request`` entry, when ``event_type`` equals ``"pull_request"``, or
    when the event header (Forgejo, Gitea or GitHub name) says
    ``pull_request`` in any letter case.
    """
    if payload.get("pull_request") or issue_data.get("pull_request"):
        return True
    if payload.get("event_type") == "pull_request":
        return True
    headers = request.headers
    event_name = (
        headers.get("x-forgejo-event")
        or headers.get("x-gitea-event")
        or headers.get("x-github-event")
        or ""
    )
    return event_name.lower() == "pull_request"
def _repository_mismatch(
    *,
    repository: ForgejoRepository,
    payload: dict[str, Any],
) -> bool:
    """Tell whether the payload names a repository other than the tracked one.

    Comparison is case-insensitive. ``full_name`` is preferred when it is a
    non-blank string; otherwise ``name`` plus ``owner`` (either a string or
    an object with ``login``) are compared. A payload without usable
    repository information is not treated as a mismatch.
    """
    described = payload.get("repository")
    if not isinstance(described, dict):
        return False

    full_name = described.get("full_name")
    if isinstance(full_name, str) and full_name.strip():
        tracked = f"{repository.owner}/{repository.repo}".lower()
        return full_name.strip().lower() != tracked

    name = described.get("name")
    owner = described.get("owner")
    owner_login = owner.get("login") if isinstance(owner, dict) else owner
    if not (isinstance(name, str) and isinstance(owner_login, str)):
        return False
    if name.strip().lower() != repository.repo.lower():
        return True
    return owner_login.strip().lower() != repository.owner.lower()
def _labels(issue_data: dict[str, Any]) -> Any:
    """Build the cached label list from the issue's ``labels`` entries.

    Entries that are not JSON objects are skipped. Each kept label is
    reduced to ``name``, ``color`` and ``description`` strings, with missing
    or falsy values stored as ``""``.
    """
    raw_labels = issue_data.get("labels") or []
    return [
        {
            "name": str(entry.get("name") or ""),
            "color": str(entry.get("color") or ""),
            "description": str(entry.get("description") or ""),
        }
        for entry in raw_labels
        if isinstance(entry, dict)
    ]
def _assignees(issue_data: dict[str, Any]) -> Any:
    """Build the cached assignee list from the issue's ``assignees`` entries.

    Entries that are not JSON objects are skipped. Each kept assignee is
    reduced to ``login``, ``id`` and ``avatar_url``; a missing or falsy id is
    stored as ``0`` and missing strings as ``""``.
    """
    raw_assignees = issue_data.get("assignees") or []
    return [
        {
            "login": str(entry.get("login") or ""),
            "id": entry.get("id") or 0,
            "avatar_url": str(entry.get("avatar_url") or ""),
        }
        for entry in raw_assignees
        if isinstance(entry, dict)
    ]
def _author(issue_data: dict[str, Any]) -> str:
    """Return the login of the issue's ``user`` object, or ``""`` if absent.

    The empty string is also returned when ``user`` is not an object or its
    ``login`` is not a string.
    """
    user = issue_data.get("user")
    login = user.get("login") if isinstance(user, dict) else None
    return login if isinstance(login, str) else ""
def _issue_state(*, action: str, issue_data: dict[str, Any]) -> str:
    """Derive the cached issue state from the webhook action and payload.

    ``closed`` and ``reopened`` actions decide the state on their own
    (``"closed"`` / ``"open"``). For any other action the payload's ``state``
    is used when it is a non-empty string, falling back to ``"open"``.
    """
    forced_by_action = {"closed": "closed", "reopened": "open"}.get(action)
    if forced_by_action is not None:
        return forced_by_action
    reported = issue_data.get("state")
    if isinstance(reported, str) and reported:
        return reported
    return "open"
async def _linked_board_ids(
    *,
    session: AsyncSession,
    repository_id: UUID,
) -> list[UUID]:
    """Return the ``board_id`` of every board link row for the repository."""
    statement = select(BoardRepositoryLink.board_id).where(
        BoardRepositoryLink.repository_id == repository_id
    )
    result = await session.exec(statement)
    return list(result.all())
async def _upsert_issue(
    *,
    session: AsyncSession,
    repository: ForgejoRepository,
    action: str,
    issue_data: dict[str, Any],
) -> tuple[ForgejoIssue, bool]:
    """Create or refresh the cached issue row described by the webhook.

    The row is looked up by repository id and issue number. Changes are
    flushed but not committed.

    Returns:
        The issue row and ``True`` when it was newly created, ``False`` when
        an existing row was updated.

    Raises:
        HTTPException: 400 (from ``_issue_number``) when the payload has no
            valid issue number.
    """
    number = _issue_number(issue_data)
    lookup = select(ForgejoIssue).where(
        ForgejoIssue.repository_id == repository.id,
        ForgejoIssue.forgejo_issue_number == number,
    )
    cached = (await session.exec(lookup)).first()

    state = _issue_state(action=action, issue_data=issue_data)
    if state == "open":
        closed_at = None
    else:
        # Any non-open state needs a close time; use "now" if none was sent.
        closed_at = _parse_optional_iso_datetime(issue_data.get("closed_at")) or utcnow()

    # Field values shared by the create and the update path.
    title = str(issue_data.get("title") or "")
    body_preview = str(issue_data.get("body") or "")[:1000]
    labels = _labels(issue_data)
    assignees = _assignees(issue_data)
    author = _author(issue_data)
    html_url = str(issue_data.get("html_url") or "")
    forgejo_created_at = _parse_iso_datetime(issue_data.get("created_at"))
    forgejo_updated_at = _parse_iso_datetime(issue_data.get("updated_at"))

    if cached is not None:
        cached.title = title
        cached.body_preview = body_preview
        cached.state = state
        cached.is_pull_request = False
        cached.labels = labels
        cached.assignees = assignees
        cached.author = author
        cached.html_url = html_url
        cached.forgejo_created_at = forgejo_created_at
        cached.forgejo_updated_at = forgejo_updated_at
        cached.forgejo_closed_at = closed_at
        cached.last_synced_at = utcnow()
        cached.updated_at = utcnow()
        session.add(cached)
        await session.flush()
        return cached, False

    issue = ForgejoIssue(
        organization_id=repository.organization_id,
        repository_id=repository.id,
        forgejo_issue_number=number,
        title=title,
        body_preview=body_preview,
        state=state,
        is_pull_request=False,
        labels=labels,
        assignees=assignees,
        author=author,
        html_url=html_url,
        forgejo_created_at=forgejo_created_at,
        forgejo_updated_at=forgejo_updated_at,
        forgejo_closed_at=closed_at,
    )
    session.add(issue)
    await session.flush()
    return issue, True
def _activity_message(
    *,
    action: str,
    repository: ForgejoRepository,
    issue: ForgejoIssue,
) -> str:
    """Format the activity-log line for an issue event.

    The result has the shape
    ``Forgejo issue <action>: <owner>/<repo>#<number> - <title>``.
    """
    issue_ref = f"{repository.owner}/{repository.repo}#{issue.forgejo_issue_number}"
    return f"Forgejo issue {action}: {issue_ref} - {issue.title}"
async def _record_issue_activity(
    *,
    session: AsyncSession,
    action: str,
    repository: ForgejoRepository,
    issue: ForgejoIssue,
) -> None:
    """Record activity entries for issue close and reopen events.

    Other actions record nothing. When the repository has linked boards, one
    entry is recorded per board; otherwise a single entry without a board id
    is recorded.
    """
    if action not in {"closed", "reopened"}:
        return

    board_ids = await _linked_board_ids(session=session, repository_id=repository.id)
    event_type = f"forgejo.issue.{action}"
    message = _activity_message(action=action, repository=repository, issue=issue)

    if not board_ids:
        record_activity(session, event_type=event_type, message=message)
        return
    for board_id in board_ids:
        record_activity(
            session,
            event_type=event_type,
            message=message,
            board_id=board_id,
        )
@router.post(
    "/{repository_id}",
    response_model=ForgejoWebhookIngestResponse,
    status_code=status.HTTP_202_ACCEPTED,
)
async def ingest_forgejo_webhook(
    request: Request,
    repository_id: UUID,
    session: AsyncSession = SESSION_DEP,
) -> ForgejoWebhookIngestResponse:
    """Receive Forgejo issue webhooks and update cached issue records.

    Steps, in order: load the tracked repository, read the size-limited
    body, verify its signature, decode the JSON, filter out unsupported
    actions and pull-request events, then upsert the issue, record activity,
    and commit.

    Raises:
        HTTPException: 404 for an unknown repository, 410 for an inactive
            one, 422 when the payload names a different repository, 400 when
            the issue object is missing; helper functions raise 400/403/413
            for body, signature and size problems.
    """
    repository = await crud.get_by_id(session, ForgejoRepository, repository_id)
    if repository is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
    if not repository.active:
        raise HTTPException(
            status_code=status.HTTP_410_GONE,
            detail="Forgejo repository is inactive.",
        )

    # The signature is computed over the exact bytes received, so the body is
    # verified before it is parsed.
    raw_body = await _read_limited_body(request)
    _verify_signature(
        secret=repository.webhook_secret,
        raw_body=raw_body,
        request=request,
    )
    payload = _decode_json_body(raw_body)

    raw_action = payload.get("action")
    action = raw_action if isinstance(raw_action, str) else None

    def _ignored(reason: str) -> ForgejoWebhookIngestResponse:
        # Acknowledge the delivery without touching the issue cache.
        return ForgejoWebhookIngestResponse(
            repository_id=repository.id,
            action=action,
            ignored=True,
            reason=reason,
        )

    if action not in SUPPORTED_ISSUE_ACTIONS:
        return _ignored("unsupported_action")

    if _repository_mismatch(repository=repository, payload=payload):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="Webhook repository does not match tracked repository.",
        )

    issue_data = payload.get("issue")
    if not isinstance(issue_data, dict):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Webhook payload is missing issue data.",
        )

    if _is_pull_request_payload(payload=payload, issue_data=issue_data, request=request):
        return _ignored("pull_request_ignored")

    issue, created = await _upsert_issue(
        session=session,
        repository=repository,
        action=action,
        issue_data=issue_data,
    )
    await _record_issue_activity(
        session=session,
        action=action,
        repository=repository,
        issue=issue,
    )
    await session.commit()

    log_fields = {
        "repository_id": str(repository.id),
        "issue_id": str(issue.id),
        "issue_number": issue.forgejo_issue_number,
        "action": action,
        "issue_created": created,
    }
    logger.info("forgejo.webhook.issue.updated", extra=log_fields)

    return ForgejoWebhookIngestResponse(
        repository_id=repository.id,
        action=action,
        issue_id=issue.id,
        issue_number=issue.forgejo_issue_number,
        ignored=False,
    )