diff --git a/backend/app/api/forgejo_webhooks.py b/backend/app/api/forgejo_webhooks.py index d9f1556..1f5f3e2 100644 --- a/backend/app/api/forgejo_webhooks.py +++ b/backend/app/api/forgejo_webhooks.py @@ -17,7 +17,6 @@ from app.core.logging import get_logger from app.core.time import utcnow from app.db import crud from app.db.session import get_session -from app.models.activity_events import ActivityEvent from app.models.board_repository_links import BoardRepositoryLink from app.models.forgejo_issues import ForgejoIssue from app.models.forgejo_repositories import ForgejoRepository @@ -58,7 +57,11 @@ def _verify_signature( detail="Forgejo webhook secret is not configured.", ) signature = next( - (request.headers.get(header) for header in SIGNATURE_HEADERS if request.headers.get(header)), + ( + request.headers.get(header) + for header in SIGNATURE_HEADERS + if request.headers.get(header) + ), None, ) if not signature: @@ -159,11 +162,23 @@ def _issue_number(issue_data: dict[str, Any]) -> int: ) -def _is_pull_request_payload(payload: dict[str, Any], issue_data: dict[str, Any]) -> bool: +def _is_pull_request_payload( + *, + payload: dict[str, Any], + issue_data: dict[str, Any], + request: Request, +) -> bool: + webhook_event = ( + request.headers.get("x-forgejo-event") + or request.headers.get("x-gitea-event") + or request.headers.get("x-github-event") + or "" + ) return bool( payload.get("pull_request") or issue_data.get("pull_request") or payload.get("event_type") == "pull_request" + or webhook_event.lower() == "pull_request" ) @@ -321,11 +336,9 @@ def _activity_message( action: str, repository: ForgejoRepository, issue: ForgejoIssue, - created: bool, ) -> str: - action_label = "created" if created else action return ( - f"Forgejo issue {action_label}: " + f"Forgejo issue {action}: " f"{repository.owner}/{repository.repo}#{issue.forgejo_issue_number} - {issue.title}" ) @@ -336,7 +349,6 @@ async def _record_issue_activity( action: str, repository: ForgejoRepository, issue: ForgejoIssue, - created: bool, ) -> None: if action not in {"closed", "reopened"}: return @@ -349,7 +361,6 @@ async def _record_issue_activity( action=action, repository=repository, issue=issue, - created=created, ), ) return @@ -361,7 +372,6 @@ async def _record_issue_activity( action=action, repository=repository, issue=issue, - created=created, ), board_id=board_id, ) @@ -416,7 +426,7 @@ async def ingest_forgejo_webhook( status_code=status.HTTP_400_BAD_REQUEST, detail="Webhook payload is missing issue data.", ) - if _is_pull_request_payload(payload, issue_data): + if _is_pull_request_payload(payload=payload, issue_data=issue_data, request=request): return ForgejoWebhookIngestResponse( repository_id=repository.id, action=action, @@ -435,7 +445,6 @@ async def ingest_forgejo_webhook( action=action, repository=repository, issue=issue, - created=created, ) await session.commit() logger.info( @@ -445,7 +454,7 @@ async def ingest_forgejo_webhook( "issue_id": str(issue.id), "issue_number": issue.forgejo_issue_number, "action": action, - "created": created, + "issue_created": created, }, ) return ForgejoWebhookIngestResponse( @@ -455,4 +464,3 @@ async def ingest_forgejo_webhook( issue_number=issue.forgejo_issue_number, ignored=False, ) - diff --git a/backend/app/main.py b/backend/app/main.py index 2bf33e6..f8cd2fd 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -12,6 +12,7 @@ from fastapi_pagination import add_pagination from app.api.activity import router as activity_router from app.api.agent import router as agent_router +from app.api.agent_forgejo import router as agent_forgejo_router from app.api.agents import router as agents_router from app.api.approvals import router as approvals_router from app.api.auth import router as auth_router @@ -19,14 +20,14 @@ from app.api.board_group_memory import router as board_group_memory_router from app.api.board_groups import router as board_groups_router from app.api.board_memory import router as board_memory_router from app.api.board_onboarding import router as board_onboarding_router +from app.api.board_repository_links import router as board_repository_links_router from app.api.board_webhooks import router as board_webhooks_router from app.api.boards import router as boards_router from app.api.forgejo_connections import router as forgejo_connections_router from app.api.forgejo_issues import router as forgejo_issues_router from app.api.forgejo_metrics import router as forgejo_metrics_router from app.api.forgejo_repositories import router as forgejo_repositories_router -from app.api.board_repository_links import router as board_repository_links_router -from app.api.agent_forgejo import router as agent_forgejo_router +from app.api.forgejo_webhooks import router as forgejo_webhooks_router from app.api.gateway import router as gateway_router from app.api.gateways import router as gateways_router from app.api.metrics import router as metrics_router @@ -88,6 +89,10 @@ OPENAPI_TAGS = [ "name": "forgejo-repositories", "description": "Forgejo repository tracking and sync management endpoints.", }, + { + "name": "forgejo-webhooks", + "description": "Forgejo webhook receiver endpoints for cached issue updates.", + }, { "name": "metrics", "description": "Aggregated operational and board analytics metrics endpoints.", @@ -192,6 +197,7 @@ _OPENAPI_EXAMPLE_TAGS = { "boards", "board-memory", "board-webhooks", + "forgejo-webhooks", "board-onboarding", "approvals", "tasks", @@ -563,6 +569,7 @@ api_v1.include_router(forgejo_connections_router) api_v1.include_router(forgejo_issues_router) api_v1.include_router(forgejo_metrics_router) api_v1.include_router(forgejo_repositories_router) +api_v1.include_router(forgejo_webhooks_router) api_v1.include_router(board_repository_links_router) api_v1.include_router(agent_forgejo_router) api_v1.include_router(gateway_router) diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 0c16d8c..36f8201 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -13,6 +13,7 @@ from app.models.board_webhook_payloads import BoardWebhookPayload from app.models.board_webhooks import BoardWebhook from app.models.boards import Board from app.models.forgejo_connections import ForgejoConnection +from app.models.forgejo_issues import ForgejoIssue from app.models.forgejo_repositories import ForgejoRepository from app.models.gateways import Gateway from app.models.organization_board_access import OrganizationBoardAccess @@ -47,6 +48,7 @@ __all__ = [ "Board", "BoardRepositoryLink", "ForgejoConnection", + "ForgejoIssue", "ForgejoRepository", "Gateway", "GatewayInstalledSkill", diff --git a/backend/app/models/forgejo_issues.py b/backend/app/models/forgejo_issues.py index 16079b8..f4325de 100644 --- a/backend/app/models/forgejo_issues.py +++ b/backend/app/models/forgejo_issues.py @@ -5,7 +5,7 @@ from __future__ import annotations from datetime import datetime from uuid import UUID, uuid4 -from sqlalchemy import Column, JSON +from sqlalchemy import JSON, Column from sqlmodel import Field, Index, SQLModel from app.core.time import utcnow @@ -29,7 +29,7 @@ class ForgejoIssue(SQLModel, table=True): is_pull_request: bool = Field(default=False) # JSON fields for complex data - labels: dict[str, object] = Field(default_factory=dict, sa_column=Column(JSON)) + labels: list[dict[str, object]] = Field(default_factory=list, sa_column=Column(JSON)) assignees: list[dict[str, object]] = Field(default_factory=list, sa_column=Column(JSON)) author: str @@ -44,5 +44,7 @@ class ForgejoIssue(SQLModel, table=True): updated_at: datetime = Field(default_factory=utcnow) __table_args__ = ( - Index("ix_forgejo_issues_repo_number", "repository_id", "forgejo_issue_number", unique=True), + Index( + "ix_forgejo_issues_repo_number", "repository_id", "forgejo_issue_number", unique=True + ), ) diff --git a/backend/migrations/versions/b6c7d8e9f0a1_add_forgejo_repository_webhook_secret.py b/backend/migrations/versions/b6c7d8e9f0a1_add_forgejo_repository_webhook_secret.py new file mode 100644 index 0000000..d055ca4 --- /dev/null +++ b/backend/migrations/versions/b6c7d8e9f0a1_add_forgejo_repository_webhook_secret.py @@ -0,0 +1,41 @@ +"""Add webhook secret to Forgejo repositories. + +Revision ID: b6c7d8e9f0a1 +Revises: a1b2c3d4e5f7 +Create Date: 2026-05-19 12:00:00.000000 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "b6c7d8e9f0a1" +down_revision = "a1b2c3d4e5f7" +branch_labels = None +depends_on = None + + +def _has_column(table_name: str, column_name: str) -> bool: + bind = op.get_bind() + inspector = sa.inspect(bind) + if not inspector.has_table(table_name): + return False + return any(column["name"] == column_name for column in inspector.get_columns(table_name)) + + +def upgrade() -> None: + """Store the shared secret used by Forgejo webhooks.""" + if not _has_column("forgejo_repositories", "webhook_secret"): + op.add_column( + "forgejo_repositories", + sa.Column("webhook_secret", sa.String(), nullable=True), + ) + + +def downgrade() -> None: + """Remove the Forgejo webhook shared secret column.""" + if _has_column("forgejo_repositories", "webhook_secret"): + op.drop_column("forgejo_repositories", "webhook_secret") diff --git a/backend/tests/test_forgejo_webhooks_api.py b/backend/tests/test_forgejo_webhooks_api.py new file mode 100644 index 0000000..1a36650 --- /dev/null +++ b/backend/tests/test_forgejo_webhooks_api.py @@ -0,0 +1,353 @@ +# ruff: noqa: INP001 +"""Integration tests for Forgejo issue webhook ingestion.""" + +from __future__ import annotations + +import hashlib +import hmac +import json +from datetime import datetime +from uuid import uuid4 + +import pytest +from fastapi import APIRouter, FastAPI +from httpx import ASGITransport, AsyncClient +from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine +from sqlmodel import SQLModel, col, select +from sqlmodel.ext.asyncio.session import AsyncSession + +from app import models as _models +from app.api.forgejo_webhooks import router as forgejo_webhooks_router +from app.db.session import get_session +from app.models.activity_events import ActivityEvent +from app.models.board_repository_links import BoardRepositoryLink +from app.models.boards import Board +from app.models.forgejo_connections import ForgejoConnection +from app.models.forgejo_issues import ForgejoIssue +from app.models.forgejo_repositories import ForgejoRepository +from app.models.organizations import Organization + + +async def _make_engine() -> AsyncEngine: + engine = create_async_engine("sqlite+aiosqlite:///:memory:") + async with engine.begin() as conn: + await conn.run_sync(SQLModel.metadata.create_all) + return engine + + +def _build_test_app( + session_maker: async_sessionmaker[AsyncSession], +) -> FastAPI: + app = FastAPI() + api_v1 = APIRouter(prefix="/api/v1") + api_v1.include_router(forgejo_webhooks_router) + app.include_router(api_v1) + + async def _override_get_session() -> AsyncSession: + async with session_maker() as session: + yield session + + app.dependency_overrides[get_session] = _override_get_session + return app + + +async def _seed_repository( + session: AsyncSession, + *, + with_board_link: bool = False, + with_issue: bool = True, +) -> tuple[ForgejoRepository, ForgejoIssue | None, Board | None]: + organization_id = uuid4() + connection_id = uuid4() + repository_id = uuid4() + + session.add(Organization(id=organization_id, name=f"org-{organization_id}")) + session.add( + ForgejoConnection( + id=connection_id, + organization_id=organization_id, + name="Forgejo", + base_url="https://forgejo.example.local", + ) + ) + repository = ForgejoRepository( + id=repository_id, + organization_id=organization_id, + connection_id=connection_id, + owner="openclaw", + repo="pipeline", + display_name="Pipeline", + webhook_secret="shared-secret", + ) + session.add(repository) + + board: Board | None = None + if with_board_link: + board = Board( + id=uuid4(), + organization_id=organization_id, + name="Pipeline Board", + slug=f"pipeline-{repository_id}", + description="Issue tracking board.", + ) + session.add(board) + session.add( + BoardRepositoryLink( + id=uuid4(), + organization_id=organization_id, + board_id=board.id, + repository_id=repository_id, + ) + ) + + issue: ForgejoIssue | None = None + if with_issue: + issue = ForgejoIssue( + id=uuid4(), + organization_id=organization_id, + repository_id=repository_id, + forgejo_issue_number=42, + title="Old title", + body_preview="Old body", + state="open", + is_pull_request=False, + labels=[], + assignees=[], + author="alice", + html_url="https://forgejo.example.local/openclaw/pipeline/issues/42", + forgejo_created_at=datetime(2026, 5, 18, 8, 0, 0), + forgejo_updated_at=datetime(2026, 5, 18, 9, 0, 0), + ) + session.add(issue) + + await session.commit() + return repository, issue, board + + +def _encoded_payload(payload: dict[str, object]) -> bytes: + return json.dumps(payload, separators=(",", ":")).encode("utf-8") + + +def _signature(raw_body: bytes, secret: str = "shared-secret") -> str: + digest = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest() + return f"sha256={digest}" + + +def _issue_payload(*, action: str, number: int = 42) -> dict[str, object]: + return { + "action": action, + "repository": {"full_name": "openclaw/pipeline"}, + "issue": { + "number": number, + "title": "Fix webhook cache updates", + "body": "Webhook body from Forgejo.", + "state": "closed" if action == "closed" else "open", + "labels": [{"name": "bug", "color": "f43f5e", "description": "Bug report"}], + "assignees": [{"login": "codex", "id": 7, "avatar_url": "https://avatar.test/c"}], + "user": {"login": "kaspa"}, + "html_url": f"https://forgejo.example.local/openclaw/pipeline/issues/{number}", + "created_at": "2026-05-18T10:00:00Z", + "updated_at": "2026-05-19T12:30:00Z", + "closed_at": "2026-05-19T12:45:00Z" if action == "closed" else None, + }, + } + + +@pytest.mark.asyncio +async def test_forgejo_webhook_rejects_invalid_signature_without_updating_issue() -> None: + engine = await _make_engine() + session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + app = _build_test_app(session_maker) + + async with session_maker() as session: + repository, issue, _ = await _seed_repository(session) + assert issue is not None + + raw_body = _encoded_payload(_issue_payload(action="closed")) + + try: + async with AsyncClient( + transport=ASGITransport(app=app), + base_url="http://testserver", + ) as client: + response = await client.post( + f"/api/v1/forgejo/webhooks/{repository.id}", + content=raw_body, + headers={ + "Content-Type": "application/json", + "X-Forgejo-Signature": "sha256=bad", + "X-Forgejo-Event": "issues", + }, + ) + + assert response.status_code == 403 + + async with session_maker() as session: + stored_issue = ( + await session.exec(select(ForgejoIssue).where(col(ForgejoIssue.id) == issue.id)) + ).first() + assert stored_issue is not None + assert stored_issue.state == "open" + assert stored_issue.title == "Old title" + events = (await session.exec(select(ActivityEvent))).all() + assert events == [] + finally: + await engine.dispose() + + +@pytest.mark.asyncio +async def test_forgejo_webhook_closes_issue_and_records_board_activity() -> None: + engine = await _make_engine() + session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + app = _build_test_app(session_maker) + + async with session_maker() as session: + repository, issue, board = await _seed_repository(session, with_board_link=True) + assert issue is not None + assert board is not None + + raw_body = _encoded_payload(_issue_payload(action="closed")) + + try: + async with AsyncClient( + transport=ASGITransport(app=app), + base_url="http://testserver", + ) as client: + response = await client.post( + f"/api/v1/forgejo/webhooks/{repository.id}", + content=raw_body, + headers={ + "Content-Type": "application/json", + "X-Forgejo-Signature": _signature(raw_body), + "X-Gitea-Event": "issues", + }, + ) + + assert response.status_code == 202 + body = response.json() + assert body["ignored"] is False + assert body["action"] == "closed" + assert body["issue_number"] == 42 + + async with session_maker() as session: + stored_issue = ( + await session.exec(select(ForgejoIssue).where(col(ForgejoIssue.id) == issue.id)) + ).first() + assert stored_issue is not None + assert stored_issue.state == "closed" + assert stored_issue.title == "Fix webhook cache updates" + assert stored_issue.body_preview == "Webhook body from Forgejo." + assert stored_issue.labels == [ + {"name": "bug", "color": "f43f5e", "description": "Bug report"} + ] + assert stored_issue.assignees == [ + {"login": "codex", "id": 7, "avatar_url": "https://avatar.test/c"} + ] + assert stored_issue.author == "kaspa" + assert stored_issue.forgejo_closed_at == datetime(2026, 5, 19, 12, 45, 0) + + events = (await session.exec(select(ActivityEvent))).all() + assert len(events) == 1 + assert events[0].event_type == "forgejo.issue.closed" + assert events[0].board_id == board.id + assert "openclaw/pipeline#42" in str(events[0].message) + finally: + await engine.dispose() + + +@pytest.mark.asyncio +async def test_forgejo_webhook_reopens_issue_and_clears_closed_timestamp() -> None: + engine = await _make_engine() + session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + app = _build_test_app(session_maker) + + async with session_maker() as session: + repository, issue, _ = await _seed_repository(session) + assert issue is not None + issue.state = "closed" + issue.forgejo_closed_at = datetime(2026, 5, 19, 12, 45, 0) + session.add(issue) + await session.commit() + + raw_body = _encoded_payload(_issue_payload(action="reopened")) + + try: + async with AsyncClient( + transport=ASGITransport(app=app), + base_url="http://testserver", + ) as client: + response = await client.post( + f"/api/v1/forgejo/webhooks/{repository.id}", + content=raw_body, + headers={ + "Content-Type": "application/json", + "X-Hub-Signature-256": _signature(raw_body), + "X-Forgejo-Event": "issues", + }, + ) + + assert response.status_code == 202 + + async with session_maker() as session: + stored_issue = ( + await session.exec(select(ForgejoIssue).where(col(ForgejoIssue.id) == issue.id)) + ).first() + assert stored_issue is not None + assert stored_issue.state == "open" + assert stored_issue.forgejo_closed_at is None + + events = (await session.exec(select(ActivityEvent))).all() + assert len(events) == 1 + assert events[0].event_type == "forgejo.issue.reopened" + finally: + await engine.dispose() + + +@pytest.mark.asyncio +async def test_forgejo_webhook_ignores_pull_request_events() -> None: + engine = await _make_engine() + session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + app = _build_test_app(session_maker) + + async with session_maker() as session: + repository, _, _ = await _seed_repository(session, with_issue=False) + + payload = _issue_payload(action="opened", number=99) + assert isinstance(payload["issue"], dict) + payload["issue"]["pull_request"] = {"html_url": "https://forgejo.example.local/pr/99"} + raw_body = _encoded_payload(payload) + + try: + async with AsyncClient( + transport=ASGITransport(app=app), + base_url="http://testserver", + ) as client: + response = await client.post( + f"/api/v1/forgejo/webhooks/{repository.id}", + content=raw_body, + headers={ + "Content-Type": "application/json", + "X-Forgejo-Signature": _signature(raw_body), + "X-Forgejo-Event": "pull_request", + }, + ) + + assert response.status_code == 202 + body = response.json() + assert body["ignored"] is True + assert body["reason"] == "pull_request_ignored" + + async with session_maker() as session: + issues = ( + await session.exec( + select(ForgejoIssue).where( + ForgejoIssue.repository_id == repository.id, + ForgejoIssue.forgejo_issue_number == 99, + ) + ) + ).all() + assert issues == [] + events = (await session.exec(select(ActivityEvent))).all() + assert events == [] + finally: + await engine.dispose()