feat: webhook

This commit is contained in:
null 2026-05-19 04:24:24 -05:00
parent ab73770d16
commit 4878724bed
6 changed files with 431 additions and 18 deletions

View File

@ -17,7 +17,6 @@ from app.core.logging import get_logger
from app.core.time import utcnow
from app.db import crud
from app.db.session import get_session
from app.models.activity_events import ActivityEvent
from app.models.board_repository_links import BoardRepositoryLink
from app.models.forgejo_issues import ForgejoIssue
from app.models.forgejo_repositories import ForgejoRepository
@ -58,7 +57,11 @@ def _verify_signature(
detail="Forgejo webhook secret is not configured.",
)
signature = next(
(request.headers.get(header) for header in SIGNATURE_HEADERS if request.headers.get(header)),
(
request.headers.get(header)
for header in SIGNATURE_HEADERS
if request.headers.get(header)
),
None,
)
if not signature:
@ -159,11 +162,23 @@ def _issue_number(issue_data: dict[str, Any]) -> int:
)
def _is_pull_request_payload(payload: dict[str, Any], issue_data: dict[str, Any]) -> bool:
def _is_pull_request_payload(
*,
payload: dict[str, Any],
issue_data: dict[str, Any],
request: Request,
) -> bool:
webhook_event = (
request.headers.get("x-forgejo-event")
or request.headers.get("x-gitea-event")
or request.headers.get("x-github-event")
or ""
)
return bool(
payload.get("pull_request")
or issue_data.get("pull_request")
or payload.get("event_type") == "pull_request"
or webhook_event.lower() == "pull_request"
)
@ -321,11 +336,9 @@ def _activity_message(
action: str,
repository: ForgejoRepository,
issue: ForgejoIssue,
created: bool,
) -> str:
action_label = "created" if created else action
return (
f"Forgejo issue {action_label}: "
f"Forgejo issue {action}: "
f"{repository.owner}/{repository.repo}#{issue.forgejo_issue_number} - {issue.title}"
)
@ -336,7 +349,6 @@ async def _record_issue_activity(
action: str,
repository: ForgejoRepository,
issue: ForgejoIssue,
created: bool,
) -> None:
if action not in {"closed", "reopened"}:
return
@ -349,7 +361,6 @@ async def _record_issue_activity(
action=action,
repository=repository,
issue=issue,
created=created,
),
)
return
@ -361,7 +372,6 @@ async def _record_issue_activity(
action=action,
repository=repository,
issue=issue,
created=created,
),
board_id=board_id,
)
@ -416,7 +426,7 @@ async def ingest_forgejo_webhook(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Webhook payload is missing issue data.",
)
if _is_pull_request_payload(payload, issue_data):
if _is_pull_request_payload(payload=payload, issue_data=issue_data, request=request):
return ForgejoWebhookIngestResponse(
repository_id=repository.id,
action=action,
@ -435,7 +445,6 @@ async def ingest_forgejo_webhook(
action=action,
repository=repository,
issue=issue,
created=created,
)
await session.commit()
logger.info(
@ -445,7 +454,7 @@ async def ingest_forgejo_webhook(
"issue_id": str(issue.id),
"issue_number": issue.forgejo_issue_number,
"action": action,
"created": created,
"issue_created": created,
},
)
return ForgejoWebhookIngestResponse(
@ -455,4 +464,3 @@ async def ingest_forgejo_webhook(
issue_number=issue.forgejo_issue_number,
ignored=False,
)

View File

@ -12,6 +12,7 @@ from fastapi_pagination import add_pagination
from app.api.activity import router as activity_router
from app.api.agent import router as agent_router
from app.api.agent_forgejo import router as agent_forgejo_router
from app.api.agents import router as agents_router
from app.api.approvals import router as approvals_router
from app.api.auth import router as auth_router
@ -19,14 +20,14 @@ from app.api.board_group_memory import router as board_group_memory_router
from app.api.board_groups import router as board_groups_router
from app.api.board_memory import router as board_memory_router
from app.api.board_onboarding import router as board_onboarding_router
from app.api.board_repository_links import router as board_repository_links_router
from app.api.board_webhooks import router as board_webhooks_router
from app.api.boards import router as boards_router
from app.api.forgejo_connections import router as forgejo_connections_router
from app.api.forgejo_issues import router as forgejo_issues_router
from app.api.forgejo_metrics import router as forgejo_metrics_router
from app.api.forgejo_repositories import router as forgejo_repositories_router
from app.api.board_repository_links import router as board_repository_links_router
from app.api.agent_forgejo import router as agent_forgejo_router
from app.api.forgejo_webhooks import router as forgejo_webhooks_router
from app.api.gateway import router as gateway_router
from app.api.gateways import router as gateways_router
from app.api.metrics import router as metrics_router
@ -88,6 +89,10 @@ OPENAPI_TAGS = [
"name": "forgejo-repositories",
"description": "Forgejo repository tracking and sync management endpoints.",
},
{
"name": "forgejo-webhooks",
"description": "Forgejo webhook receiver endpoints for cached issue updates.",
},
{
"name": "metrics",
"description": "Aggregated operational and board analytics metrics endpoints.",
@ -192,6 +197,7 @@ _OPENAPI_EXAMPLE_TAGS = {
"boards",
"board-memory",
"board-webhooks",
"forgejo-webhooks",
"board-onboarding",
"approvals",
"tasks",
@ -563,6 +569,7 @@ api_v1.include_router(forgejo_connections_router)
api_v1.include_router(forgejo_issues_router)
api_v1.include_router(forgejo_metrics_router)
api_v1.include_router(forgejo_repositories_router)
api_v1.include_router(forgejo_webhooks_router)
api_v1.include_router(board_repository_links_router)
api_v1.include_router(agent_forgejo_router)
api_v1.include_router(gateway_router)

View File

@ -13,6 +13,7 @@ from app.models.board_webhook_payloads import BoardWebhookPayload
from app.models.board_webhooks import BoardWebhook
from app.models.boards import Board
from app.models.forgejo_connections import ForgejoConnection
from app.models.forgejo_issues import ForgejoIssue
from app.models.forgejo_repositories import ForgejoRepository
from app.models.gateways import Gateway
from app.models.organization_board_access import OrganizationBoardAccess
@ -47,6 +48,7 @@ __all__ = [
"Board",
"BoardRepositoryLink",
"ForgejoConnection",
"ForgejoIssue",
"ForgejoRepository",
"Gateway",
"GatewayInstalledSkill",

View File

@ -5,7 +5,7 @@ from __future__ import annotations
from datetime import datetime
from uuid import UUID, uuid4
from sqlalchemy import Column, JSON
from sqlalchemy import JSON, Column
from sqlmodel import Field, Index, SQLModel
from app.core.time import utcnow
@ -29,7 +29,7 @@ class ForgejoIssue(SQLModel, table=True):
is_pull_request: bool = Field(default=False)
# JSON fields for complex data
labels: dict[str, object] = Field(default_factory=dict, sa_column=Column(JSON))
labels: list[dict[str, object]] = Field(default_factory=list, sa_column=Column(JSON))
assignees: list[dict[str, object]] = Field(default_factory=list, sa_column=Column(JSON))
author: str
@ -44,5 +44,7 @@ class ForgejoIssue(SQLModel, table=True):
updated_at: datetime = Field(default_factory=utcnow)
__table_args__ = (
Index("ix_forgejo_issues_repo_number", "repository_id", "forgejo_issue_number", unique=True),
Index(
"ix_forgejo_issues_repo_number", "repository_id", "forgejo_issue_number", unique=True
),
)

View File

@ -0,0 +1,41 @@
"""Add webhook secret to Forgejo repositories.
Revision ID: b6c7d8e9f0a1
Revises: a1b2c3d4e5f7
Create Date: 2026-05-19 12:00:00.000000
"""
from __future__ import annotations
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "b6c7d8e9f0a1"
down_revision = "a1b2c3d4e5f7"
branch_labels = None
depends_on = None
def _has_column(table_name: str, column_name: str) -> bool:
bind = op.get_bind()
inspector = sa.inspect(bind)
if not inspector.has_table(table_name):
return False
return any(column["name"] == column_name for column in inspector.get_columns(table_name))
def upgrade() -> None:
"""Store the shared secret used by Forgejo webhooks."""
if not _has_column("forgejo_repositories", "webhook_secret"):
op.add_column(
"forgejo_repositories",
sa.Column("webhook_secret", sa.String(), nullable=True),
)
def downgrade() -> None:
"""Remove the Forgejo webhook shared secret column."""
if _has_column("forgejo_repositories", "webhook_secret"):
op.drop_column("forgejo_repositories", "webhook_secret")

View File

@ -0,0 +1,353 @@
# ruff: noqa: INP001
"""Integration tests for Forgejo issue webhook ingestion."""
from __future__ import annotations
import hashlib
import hmac
import json
from datetime import datetime
from uuid import uuid4
import pytest
from fastapi import APIRouter, FastAPI
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from sqlmodel import SQLModel, col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from app import models as _models
from app.api.forgejo_webhooks import router as forgejo_webhooks_router
from app.db.session import get_session
from app.models.activity_events import ActivityEvent
from app.models.board_repository_links import BoardRepositoryLink
from app.models.boards import Board
from app.models.forgejo_connections import ForgejoConnection
from app.models.forgejo_issues import ForgejoIssue
from app.models.forgejo_repositories import ForgejoRepository
from app.models.organizations import Organization
async def _make_engine() -> AsyncEngine:
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
return engine
def _build_test_app(
session_maker: async_sessionmaker[AsyncSession],
) -> FastAPI:
app = FastAPI()
api_v1 = APIRouter(prefix="/api/v1")
api_v1.include_router(forgejo_webhooks_router)
app.include_router(api_v1)
async def _override_get_session() -> AsyncSession:
async with session_maker() as session:
yield session
app.dependency_overrides[get_session] = _override_get_session
return app
async def _seed_repository(
session: AsyncSession,
*,
with_board_link: bool = False,
with_issue: bool = True,
) -> tuple[ForgejoRepository, ForgejoIssue | None, Board | None]:
organization_id = uuid4()
connection_id = uuid4()
repository_id = uuid4()
session.add(Organization(id=organization_id, name=f"org-{organization_id}"))
session.add(
ForgejoConnection(
id=connection_id,
organization_id=organization_id,
name="Forgejo",
base_url="https://forgejo.example.local",
)
)
repository = ForgejoRepository(
id=repository_id,
organization_id=organization_id,
connection_id=connection_id,
owner="openclaw",
repo="pipeline",
display_name="Pipeline",
webhook_secret="shared-secret",
)
session.add(repository)
board: Board | None = None
if with_board_link:
board = Board(
id=uuid4(),
organization_id=organization_id,
name="Pipeline Board",
slug=f"pipeline-{repository_id}",
description="Issue tracking board.",
)
session.add(board)
session.add(
BoardRepositoryLink(
id=uuid4(),
organization_id=organization_id,
board_id=board.id,
repository_id=repository_id,
)
)
issue: ForgejoIssue | None = None
if with_issue:
issue = ForgejoIssue(
id=uuid4(),
organization_id=organization_id,
repository_id=repository_id,
forgejo_issue_number=42,
title="Old title",
body_preview="Old body",
state="open",
is_pull_request=False,
labels=[],
assignees=[],
author="alice",
html_url="https://forgejo.example.local/openclaw/pipeline/issues/42",
forgejo_created_at=datetime(2026, 5, 18, 8, 0, 0),
forgejo_updated_at=datetime(2026, 5, 18, 9, 0, 0),
)
session.add(issue)
await session.commit()
return repository, issue, board
def _encoded_payload(payload: dict[str, object]) -> bytes:
return json.dumps(payload, separators=(",", ":")).encode("utf-8")
def _signature(raw_body: bytes, secret: str = "shared-secret") -> str:
digest = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
return f"sha256={digest}"
def _issue_payload(*, action: str, number: int = 42) -> dict[str, object]:
return {
"action": action,
"repository": {"full_name": "openclaw/pipeline"},
"issue": {
"number": number,
"title": "Fix webhook cache updates",
"body": "Webhook body from Forgejo.",
"state": "closed" if action == "closed" else "open",
"labels": [{"name": "bug", "color": "f43f5e", "description": "Bug report"}],
"assignees": [{"login": "codex", "id": 7, "avatar_url": "https://avatar.test/c"}],
"user": {"login": "kaspa"},
"html_url": f"https://forgejo.example.local/openclaw/pipeline/issues/{number}",
"created_at": "2026-05-18T10:00:00Z",
"updated_at": "2026-05-19T12:30:00Z",
"closed_at": "2026-05-19T12:45:00Z" if action == "closed" else None,
},
}
@pytest.mark.asyncio
async def test_forgejo_webhook_rejects_invalid_signature_without_updating_issue() -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
app = _build_test_app(session_maker)
async with session_maker() as session:
repository, issue, _ = await _seed_repository(session)
assert issue is not None
raw_body = _encoded_payload(_issue_payload(action="closed"))
try:
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://testserver",
) as client:
response = await client.post(
f"/api/v1/forgejo/webhooks/{repository.id}",
content=raw_body,
headers={
"Content-Type": "application/json",
"X-Forgejo-Signature": "sha256=bad",
"X-Forgejo-Event": "issues",
},
)
assert response.status_code == 403
async with session_maker() as session:
stored_issue = (
await session.exec(select(ForgejoIssue).where(col(ForgejoIssue.id) == issue.id))
).first()
assert stored_issue is not None
assert stored_issue.state == "open"
assert stored_issue.title == "Old title"
events = (await session.exec(select(ActivityEvent))).all()
assert events == []
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_forgejo_webhook_closes_issue_and_records_board_activity() -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
app = _build_test_app(session_maker)
async with session_maker() as session:
repository, issue, board = await _seed_repository(session, with_board_link=True)
assert issue is not None
assert board is not None
raw_body = _encoded_payload(_issue_payload(action="closed"))
try:
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://testserver",
) as client:
response = await client.post(
f"/api/v1/forgejo/webhooks/{repository.id}",
content=raw_body,
headers={
"Content-Type": "application/json",
"X-Forgejo-Signature": _signature(raw_body),
"X-Gitea-Event": "issues",
},
)
assert response.status_code == 202
body = response.json()
assert body["ignored"] is False
assert body["action"] == "closed"
assert body["issue_number"] == 42
async with session_maker() as session:
stored_issue = (
await session.exec(select(ForgejoIssue).where(col(ForgejoIssue.id) == issue.id))
).first()
assert stored_issue is not None
assert stored_issue.state == "closed"
assert stored_issue.title == "Fix webhook cache updates"
assert stored_issue.body_preview == "Webhook body from Forgejo."
assert stored_issue.labels == [
{"name": "bug", "color": "f43f5e", "description": "Bug report"}
]
assert stored_issue.assignees == [
{"login": "codex", "id": 7, "avatar_url": "https://avatar.test/c"}
]
assert stored_issue.author == "kaspa"
assert stored_issue.forgejo_closed_at == datetime(2026, 5, 19, 12, 45, 0)
events = (await session.exec(select(ActivityEvent))).all()
assert len(events) == 1
assert events[0].event_type == "forgejo.issue.closed"
assert events[0].board_id == board.id
assert "openclaw/pipeline#42" in str(events[0].message)
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_forgejo_webhook_reopens_issue_and_clears_closed_timestamp() -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
app = _build_test_app(session_maker)
async with session_maker() as session:
repository, issue, _ = await _seed_repository(session)
assert issue is not None
issue.state = "closed"
issue.forgejo_closed_at = datetime(2026, 5, 19, 12, 45, 0)
session.add(issue)
await session.commit()
raw_body = _encoded_payload(_issue_payload(action="reopened"))
try:
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://testserver",
) as client:
response = await client.post(
f"/api/v1/forgejo/webhooks/{repository.id}",
content=raw_body,
headers={
"Content-Type": "application/json",
"X-Hub-Signature-256": _signature(raw_body),
"X-Forgejo-Event": "issues",
},
)
assert response.status_code == 202
async with session_maker() as session:
stored_issue = (
await session.exec(select(ForgejoIssue).where(col(ForgejoIssue.id) == issue.id))
).first()
assert stored_issue is not None
assert stored_issue.state == "open"
assert stored_issue.forgejo_closed_at is None
events = (await session.exec(select(ActivityEvent))).all()
assert len(events) == 1
assert events[0].event_type == "forgejo.issue.reopened"
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_forgejo_webhook_ignores_pull_request_events() -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
app = _build_test_app(session_maker)
async with session_maker() as session:
repository, _, _ = await _seed_repository(session, with_issue=False)
payload = _issue_payload(action="opened", number=99)
assert isinstance(payload["issue"], dict)
payload["issue"]["pull_request"] = {"html_url": "https://forgejo.example.local/pr/99"}
raw_body = _encoded_payload(payload)
try:
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://testserver",
) as client:
response = await client.post(
f"/api/v1/forgejo/webhooks/{repository.id}",
content=raw_body,
headers={
"Content-Type": "application/json",
"X-Forgejo-Signature": _signature(raw_body),
"X-Forgejo-Event": "pull_request",
},
)
assert response.status_code == 202
body = response.json()
assert body["ignored"] is True
assert body["reason"] == "pull_request_ignored"
async with session_maker() as session:
issues = (
await session.exec(
select(ForgejoIssue).where(
ForgejoIssue.repository_id == repository.id,
ForgejoIssue.forgejo_issue_number == 99,
)
)
).all()
assert issues == []
events = (await session.exec(select(ActivityEvent))).all()
assert events == []
finally:
await engine.dispose()