feat: webhooks

This commit is contained in:
null 2026-05-19 04:16:32 -05:00
parent ae3786f64b
commit ab73770d16
7 changed files with 513 additions and 63 deletions

View File

@ -177,7 +177,7 @@ async def close_issue(
link_statement = select(BoardRepositoryLink).where(
BoardRepositoryLink.repository_id == issue.repository_id,
)
link = await session.exec(link_statement).first()
link = (await session.exec(link_statement)).first()
if link is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,

View File

@ -343,6 +343,7 @@ def _mask_repository(repository: ForgejoRepository, connection: ForgejoConnectio
"default_branch": repository.default_branch,
"active": repository.active,
"connection": _create_connection_info(connection) if connection is not None else None,
"has_webhook_secret": bool(repository.webhook_secret),
"last_sync_at": repository.last_sync_at,
"last_sync_error": repository.last_sync_error,
"created_at": repository.created_at,

View File

@ -0,0 +1,458 @@
"""Forgejo webhook ingest endpoints for cached issue updates."""
from __future__ import annotations
import hashlib
import hmac
import json
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlmodel import select
from app.core.config import settings
from app.core.logging import get_logger
from app.core.time import utcnow
from app.db import crud
from app.db.session import get_session
from app.models.activity_events import ActivityEvent
from app.models.board_repository_links import BoardRepositoryLink
from app.models.forgejo_issues import ForgejoIssue
from app.models.forgejo_repositories import ForgejoRepository
from app.schemas.forgejo_webhooks import ForgejoWebhookIngestResponse
from app.services.activity_log import record_activity
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
router = APIRouter(prefix="/forgejo/webhooks", tags=["forgejo-webhooks"])
SESSION_DEP = Depends(get_session)
logger = get_logger(__name__)
SUPPORTED_ISSUE_ACTIONS = {"opened", "edited", "closed", "reopened"}
SIGNATURE_HEADERS = (
"x-forgejo-signature",
"x-gitea-signature",
"x-hub-signature-256",
)
def _normalize_signature(value: str) -> str:
value = value.strip().lower()
if value.startswith("sha256="):
return value[7:]
return value
def _verify_signature(
*,
secret: str | None,
raw_body: bytes,
request: Request,
) -> None:
if not secret:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Forgejo webhook secret is not configured.",
)
signature = next(
(request.headers.get(header) for header in SIGNATURE_HEADERS if request.headers.get(header)),
None,
)
if not signature:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Missing Forgejo webhook signature.",
)
expected = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
if not hmac.compare_digest(_normalize_signature(signature), expected):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid Forgejo webhook signature.",
)
async def _read_limited_body(request: Request) -> bytes:
max_payload_bytes = settings.webhook_max_payload_bytes
content_length = request.headers.get("content-length")
try:
parsed_content_length = int(content_length) if content_length else 0
except (TypeError, ValueError):
parsed_content_length = 0
if parsed_content_length > max_payload_bytes:
raise HTTPException(
status_code=status.HTTP_413_CONTENT_TOO_LARGE,
detail=f"Payload exceeds maximum size of {max_payload_bytes} bytes.",
)
chunks: list[bytes] = []
total_size = 0
async for chunk in request.stream():
total_size += len(chunk)
if total_size > max_payload_bytes:
raise HTTPException(
status_code=status.HTTP_413_CONTENT_TOO_LARGE,
detail=f"Payload exceeds maximum size of {max_payload_bytes} bytes.",
)
chunks.append(chunk)
return b"".join(chunks)
def _decode_json_body(raw_body: bytes) -> dict[str, Any]:
if not raw_body:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Webhook payload is required.",
)
try:
parsed = json.loads(raw_body.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError) as exc:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Webhook payload must be valid JSON.",
) from exc
if not isinstance(parsed, dict):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Webhook payload must be a JSON object.",
)
return parsed
def _parse_iso_datetime(value: object) -> datetime:
if not isinstance(value, str) or not value.strip():
return utcnow()
normalized = value.strip().replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return utcnow()
if parsed.tzinfo is not None:
return parsed.astimezone(UTC).replace(tzinfo=None)
return parsed
def _parse_optional_iso_datetime(value: object) -> datetime | None:
if not isinstance(value, str) or not value.strip():
return None
normalized = value.strip().replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is not None:
return parsed.astimezone(UTC).replace(tzinfo=None)
return parsed
def _issue_number(issue_data: dict[str, Any]) -> int:
number = issue_data.get("number")
if isinstance(number, int):
return number
if isinstance(number, str) and number.isdigit():
return int(number)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Issue payload is missing a valid number.",
)
def _is_pull_request_payload(payload: dict[str, Any], issue_data: dict[str, Any]) -> bool:
return bool(
payload.get("pull_request")
or issue_data.get("pull_request")
or payload.get("event_type") == "pull_request"
)
def _repository_mismatch(
*,
repository: ForgejoRepository,
payload: dict[str, Any],
) -> bool:
raw_repository = payload.get("repository")
if not isinstance(raw_repository, dict):
return False
full_name = raw_repository.get("full_name")
if isinstance(full_name, str) and full_name.strip():
return full_name.strip().lower() != f"{repository.owner}/{repository.repo}".lower()
name = raw_repository.get("name")
owner = raw_repository.get("owner")
owner_name = owner.get("login") if isinstance(owner, dict) else owner
if isinstance(name, str) and isinstance(owner_name, str):
return (
name.strip().lower() != repository.repo.lower()
or owner_name.strip().lower() != repository.owner.lower()
)
return False
def _labels(issue_data: dict[str, Any]) -> Any:
labels: list[dict[str, object]] = []
for label in issue_data.get("labels") or []:
if not isinstance(label, dict):
continue
labels.append(
{
"name": str(label.get("name") or ""),
"color": str(label.get("color") or ""),
"description": str(label.get("description") or ""),
}
)
return labels
def _assignees(issue_data: dict[str, Any]) -> Any:
assignees: list[dict[str, object]] = []
for assignee in issue_data.get("assignees") or []:
if not isinstance(assignee, dict):
continue
assignees.append(
{
"login": str(assignee.get("login") or ""),
"id": assignee.get("id") or 0,
"avatar_url": str(assignee.get("avatar_url") or ""),
}
)
return assignees
def _author(issue_data: dict[str, Any]) -> str:
user = issue_data.get("user")
if isinstance(user, dict):
login = user.get("login")
if isinstance(login, str):
return login
return ""
def _issue_state(*, action: str, issue_data: dict[str, Any]) -> str:
if action == "closed":
return "closed"
if action == "reopened":
return "open"
state = issue_data.get("state")
return state if isinstance(state, str) and state else "open"
async def _linked_board_ids(
*,
session: AsyncSession,
repository_id: UUID,
) -> list[UUID]:
rows = (
await session.exec(
select(BoardRepositoryLink.board_id).where(
BoardRepositoryLink.repository_id == repository_id
)
)
).all()
return list(rows)
async def _upsert_issue(
*,
session: AsyncSession,
repository: ForgejoRepository,
action: str,
issue_data: dict[str, Any],
) -> tuple[ForgejoIssue, bool]:
number = _issue_number(issue_data)
existing = (
await session.exec(
select(ForgejoIssue).where(
ForgejoIssue.repository_id == repository.id,
ForgejoIssue.forgejo_issue_number == number,
)
)
).first()
state = _issue_state(action=action, issue_data=issue_data)
closed_at = (
None
if state == "open"
else _parse_optional_iso_datetime(issue_data.get("closed_at")) or utcnow()
)
if existing is None:
issue = ForgejoIssue(
organization_id=repository.organization_id,
repository_id=repository.id,
forgejo_issue_number=number,
title=str(issue_data.get("title") or ""),
body_preview=str(issue_data.get("body") or "")[:1000],
state=state,
is_pull_request=False,
labels=_labels(issue_data),
assignees=_assignees(issue_data),
author=_author(issue_data),
html_url=str(issue_data.get("html_url") or ""),
forgejo_created_at=_parse_iso_datetime(issue_data.get("created_at")),
forgejo_updated_at=_parse_iso_datetime(issue_data.get("updated_at")),
forgejo_closed_at=closed_at,
)
session.add(issue)
await session.flush()
return issue, True
existing.title = str(issue_data.get("title") or "")
existing.body_preview = str(issue_data.get("body") or "")[:1000]
existing.state = state
existing.is_pull_request = False
existing.labels = _labels(issue_data)
existing.assignees = _assignees(issue_data)
existing.author = _author(issue_data)
existing.html_url = str(issue_data.get("html_url") or "")
existing.forgejo_created_at = _parse_iso_datetime(issue_data.get("created_at"))
existing.forgejo_updated_at = _parse_iso_datetime(issue_data.get("updated_at"))
existing.forgejo_closed_at = closed_at
existing.last_synced_at = utcnow()
existing.updated_at = utcnow()
session.add(existing)
await session.flush()
return existing, False
def _activity_message(
*,
action: str,
repository: ForgejoRepository,
issue: ForgejoIssue,
created: bool,
) -> str:
action_label = "created" if created else action
return (
f"Forgejo issue {action_label}: "
f"{repository.owner}/{repository.repo}#{issue.forgejo_issue_number} - {issue.title}"
)
async def _record_issue_activity(
*,
session: AsyncSession,
action: str,
repository: ForgejoRepository,
issue: ForgejoIssue,
created: bool,
) -> None:
if action not in {"closed", "reopened"}:
return
board_ids = await _linked_board_ids(session=session, repository_id=repository.id)
if not board_ids:
record_activity(
session,
event_type=f"forgejo.issue.{action}",
message=_activity_message(
action=action,
repository=repository,
issue=issue,
created=created,
),
)
return
for board_id in board_ids:
record_activity(
session,
event_type=f"forgejo.issue.{action}",
message=_activity_message(
action=action,
repository=repository,
issue=issue,
created=created,
),
board_id=board_id,
)
@router.post(
"/{repository_id}",
response_model=ForgejoWebhookIngestResponse,
status_code=status.HTTP_202_ACCEPTED,
)
async def ingest_forgejo_webhook(
request: Request,
repository_id: UUID,
session: AsyncSession = SESSION_DEP,
) -> ForgejoWebhookIngestResponse:
"""Receive Forgejo issue webhooks and update cached issue records."""
repository = await crud.get_by_id(session, ForgejoRepository, repository_id)
if repository is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if not repository.active:
raise HTTPException(
status_code=status.HTTP_410_GONE,
detail="Forgejo repository is inactive.",
)
raw_body = await _read_limited_body(request)
_verify_signature(
secret=repository.webhook_secret,
raw_body=raw_body,
request=request,
)
payload = _decode_json_body(raw_body)
action = payload.get("action")
action = action if isinstance(action, str) else None
if action not in SUPPORTED_ISSUE_ACTIONS:
return ForgejoWebhookIngestResponse(
repository_id=repository.id,
action=action,
ignored=True,
reason="unsupported_action",
)
if _repository_mismatch(repository=repository, payload=payload):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Webhook repository does not match tracked repository.",
)
issue_data = payload.get("issue")
if not isinstance(issue_data, dict):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Webhook payload is missing issue data.",
)
if _is_pull_request_payload(payload, issue_data):
return ForgejoWebhookIngestResponse(
repository_id=repository.id,
action=action,
ignored=True,
reason="pull_request_ignored",
)
issue, created = await _upsert_issue(
session=session,
repository=repository,
action=action,
issue_data=issue_data,
)
await _record_issue_activity(
session=session,
action=action,
repository=repository,
issue=issue,
created=created,
)
await session.commit()
logger.info(
"forgejo.webhook.issue.updated",
extra={
"repository_id": str(repository.id),
"issue_id": str(issue.id),
"issue_number": issue.forgejo_issue_number,
"action": action,
"created": created,
},
)
return ForgejoWebhookIngestResponse(
repository_id=repository.id,
action=action,
issue_id=issue.id,
issue_number=issue.forgejo_issue_number,
ignored=False,
)

View File

@ -26,6 +26,7 @@ class ForgejoRepository(QueryModel, table=True):
display_name: str = Field(default="")
default_branch: str = Field(default="main")
active: bool = Field(default=True)
webhook_secret: str | None = Field(default=None)
last_sync_at: datetime | None = Field(default=None)
last_sync_error: str | None = Field(default=None)
created_at: datetime = Field(default_factory=utcnow)

View File

@ -36,7 +36,20 @@ class ForgejoRepositoryBase(SQLModel):
class ForgejoRepositoryCreate(ForgejoRepositoryBase):
"""Payload for creating a Forgejo repository tracked configuration."""
connection_id: UUID
webhook_secret: str | None = None
@field_validator("webhook_secret", mode="before")
@classmethod
def normalize_webhook_secret(cls, value: object) -> str | None | object:
"""Normalize empty webhook secrets to null."""
if value is None:
return None
if isinstance(value, str):
value = value.strip()
return value or None
return value
class ForgejoRepositoryUpdate(SQLModel):
@ -48,6 +61,7 @@ class ForgejoRepositoryUpdate(SQLModel):
display_name: str | None = None
default_branch: str | None = None
active: bool | None = None
webhook_secret: str | None = None
@field_validator("owner", "repo", mode="before")
@classmethod
@ -62,6 +76,17 @@ class ForgejoRepositoryUpdate(SQLModel):
return value
return value
@field_validator("webhook_secret", mode="before")
@classmethod
def normalize_webhook_secret(cls, value: object) -> str | None | object:
"""Normalize empty webhook secrets to null."""
if value is None:
return None
if isinstance(value, str):
value = value.strip()
return value or None
return value
class ForgejoRepositoryConnectionInfo(SQLModel):
"""Safe connection metadata included in repository read responses."""
@ -82,6 +107,7 @@ class ForgejoRepositoryRead(ForgejoRepositoryBase):
organization_id: UUID
connection_id: UUID
connection: ForgejoRepositoryConnectionInfo
has_webhook_secret: bool = False
last_sync_at: datetime | None
last_sync_error: str | None
created_at: datetime

View File

@ -0,0 +1,22 @@
"""Schemas for Forgejo webhook ingest responses."""
from __future__ import annotations
from uuid import UUID
from sqlmodel import SQLModel
RUNTIME_ANNOTATION_TYPES = (UUID,)
class ForgejoWebhookIngestResponse(SQLModel):
"""Response returned after receiving a Forgejo webhook delivery."""
ok: bool = True
repository_id: UUID
action: str | None = None
issue_id: UUID | None = None
issue_number: int | None = None
ignored: bool = False
reason: str | None = None

View File

@ -1,79 +1,21 @@
"use client";
import { useMemo, useState } from "react";
import { useMemo } from "react";
import { type ColumnDef, getCoreRowModel, useReactTable } from "@tanstack/react-table";
import { MoreHorizontal, XCircle } from "lucide-react";
import { XCircle } from "lucide-react";
import { Button } from "@/components/ui/button";
import { Badge } from "@/components/ui/badge";
import { DataTable } from "@/components/tables/DataTable";
import { Dialog, DialogContent, DialogHeader, DialogTitle, DialogDescription, DialogFooter } from "@/components/ui/dialog";
import { CloseForgejoIssueDialog } from "@/components/git/CloseForgejoIssueDialog";
import type { ForgejoIssue } from "@/lib/api-forgejo";
import { closeForgejoIssue } from "@/lib/api-forgejo";
export type ForgejoIssuesTableProps = {
issues: ForgejoIssue[];
onRefresh: () => void;
};
type CloseIssueDialogProps = {
issue: ForgejoIssue | null;
open: boolean;
onOpenChange: (open: boolean) => void;
onCloseSuccess: () => void;
};
function CloseIssueDialog({ issue, open, onOpenChange, onCloseSuccess }: CloseIssueDialogProps) {
const [isClosing, setIsClosing] = useState(false);
const [error, setError] = useState<string | null>(null);
if (!issue) return null;
const handleClose = async () => {
setIsClosing(true);
setError(null);
try {
await closeForgejoIssue(issue.id);
onCloseSuccess();
onOpenChange(false);
} catch (err) {
const message = err instanceof Error ? err.message : "Failed to close issue";
setError(message);
} finally {
setIsClosing(false);
}
};
return (
<Dialog open={open} onOpenChange={onOpenChange}>
<DialogContent>
<DialogHeader>
<DialogTitle>Close Issue</DialogTitle>
<DialogDescription>
Are you sure you want to close issue{" "}
<span className="font-mono font-semibold">#{issue.forgejo_issue_number}</span> in{" "}
<span className="font-mono font-semibold">{issue.repository_id}</span>?
</DialogDescription>
</DialogHeader>
{error && (
<div className="rounded-lg border border-[color:var(--border)] bg-[color:var(--surface-muted)] p-3 text-xs text-muted">
{error}
</div>
)}
<DialogFooter>
<Button variant="outline" onClick={() => onOpenChange(false)}>
Cancel
</Button>
<Button onClick={handleClose} disabled={isClosing}>
{isClosing ? "Closing…" : "Close Issue"}
</Button>
</DialogFooter>
</DialogContent>
</Dialog>
);
}
export function ForgejoIssuesTable({ issues, onRefresh }: ForgejoIssuesTableProps) {
const [closeIssueDialogOpen, setCloseIssueDialogOpen] = useState(false);
const [issueToClose, setIssueToClose] = useState<ForgejoIssue | null>(null);
@ -230,7 +172,7 @@ export function ForgejoIssuesTable({ issues, onRefresh }: ForgejoIssuesTableProp
description: "Sync a repository to pull in issues, or adjust your filters.",
}}
/>
<CloseIssueDialog
<CloseForgejoIssueDialog
issue={issueToClose}
open={closeIssueDialogOpen}
onOpenChange={setCloseIssueDialogOpen}