# ruff: noqa: INP001 """Integration tests for Forgejo issue webhook ingestion.""" from __future__ import annotations import hashlib import hmac import json from datetime import datetime from uuid import uuid4 import pytest from fastapi import APIRouter, FastAPI from httpx import ASGITransport, AsyncClient from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine from sqlmodel import SQLModel, col, select from sqlmodel.ext.asyncio.session import AsyncSession from app import models as _models from app.api.forgejo_webhooks import router as forgejo_webhooks_router from app.db.session import get_session from app.models.activity_events import ActivityEvent from app.models.board_repository_links import BoardRepositoryLink from app.models.boards import Board from app.models.forgejo_connections import ForgejoConnection from app.models.forgejo_issues import ForgejoIssue from app.models.forgejo_repositories import ForgejoRepository from app.models.organizations import Organization async def _make_engine() -> AsyncEngine: engine = create_async_engine("sqlite+aiosqlite:///:memory:") async with engine.begin() as conn: await conn.run_sync(SQLModel.metadata.create_all) return engine def _build_test_app( session_maker: async_sessionmaker[AsyncSession], ) -> FastAPI: app = FastAPI() api_v1 = APIRouter(prefix="/api/v1") api_v1.include_router(forgejo_webhooks_router) app.include_router(api_v1) async def _override_get_session() -> AsyncSession: async with session_maker() as session: yield session app.dependency_overrides[get_session] = _override_get_session return app async def _seed_repository( session: AsyncSession, *, with_board_link: bool = False, with_issue: bool = True, ) -> tuple[ForgejoRepository, ForgejoIssue | None, Board | None]: organization_id = uuid4() connection_id = uuid4() repository_id = uuid4() session.add(Organization(id=organization_id, name=f"org-{organization_id}")) session.add( ForgejoConnection( id=connection_id, organization_id=organization_id, name="Forgejo", base_url="https://forgejo.example.local", ) ) repository = ForgejoRepository( id=repository_id, organization_id=organization_id, connection_id=connection_id, owner="openclaw", repo="pipeline", display_name="Pipeline", webhook_secret="shared-secret", ) session.add(repository) board: Board | None = None if with_board_link: board = Board( id=uuid4(), organization_id=organization_id, name="Pipeline Board", slug=f"pipeline-{repository_id}", description="Issue tracking board.", ) session.add(board) session.add( BoardRepositoryLink( id=uuid4(), organization_id=organization_id, board_id=board.id, repository_id=repository_id, ) ) issue: ForgejoIssue | None = None if with_issue: issue = ForgejoIssue( id=uuid4(), organization_id=organization_id, repository_id=repository_id, forgejo_issue_number=42, title="Old title", body_preview="Old body", state="open", is_pull_request=False, labels=[], assignees=[], author="alice", html_url="https://forgejo.example.local/openclaw/pipeline/issues/42", forgejo_created_at=datetime(2026, 5, 18, 8, 0, 0), forgejo_updated_at=datetime(2026, 5, 18, 9, 0, 0), ) session.add(issue) await session.commit() return repository, issue, board def _encoded_payload(payload: dict[str, object]) -> bytes: return json.dumps(payload, separators=(",", ":")).encode("utf-8") def _signature(raw_body: bytes, secret: str = "shared-secret") -> str: digest = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest() return f"sha256={digest}" def _issue_payload(*, action: str, number: int = 42) -> dict[str, object]: return { "action": action, "repository": {"full_name": "openclaw/pipeline"}, "issue": { "number": number, "title": "Fix webhook cache updates", "body": "Webhook body from Forgejo.", "state": "closed" if action == "closed" else "open", "labels": [{"name": "bug", "color": "f43f5e", "description": "Bug report"}], "assignees": [{"login": "codex", "id": 7, "avatar_url": "https://avatar.test/c"}], "user": {"login": "kaspa"}, "html_url": f"https://forgejo.example.local/openclaw/pipeline/issues/{number}", "created_at": "2026-05-18T10:00:00Z", "updated_at": "2026-05-19T12:30:00Z", "closed_at": "2026-05-19T12:45:00Z" if action == "closed" else None, }, } @pytest.mark.asyncio async def test_forgejo_webhook_rejects_invalid_signature_without_updating_issue() -> None: engine = await _make_engine() session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) app = _build_test_app(session_maker) async with session_maker() as session: repository, issue, _ = await _seed_repository(session) assert issue is not None raw_body = _encoded_payload(_issue_payload(action="closed")) try: async with AsyncClient( transport=ASGITransport(app=app), base_url="http://testserver", ) as client: response = await client.post( f"/api/v1/forgejo/webhooks/{repository.id}", content=raw_body, headers={ "Content-Type": "application/json", "X-Forgejo-Signature": "sha256=bad", "X-Forgejo-Event": "issues", }, ) assert response.status_code == 403 async with session_maker() as session: stored_issue = ( await session.exec(select(ForgejoIssue).where(col(ForgejoIssue.id) == issue.id)) ).first() assert stored_issue is not None assert stored_issue.state == "open" assert stored_issue.title == "Old title" events = (await session.exec(select(ActivityEvent))).all() assert events == [] finally: await engine.dispose() @pytest.mark.asyncio async def test_forgejo_webhook_closes_issue_and_records_board_activity() -> None: engine = await _make_engine() session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) app = _build_test_app(session_maker) async with session_maker() as session: repository, issue, board = await _seed_repository(session, with_board_link=True) assert issue is not None assert board is not None raw_body = _encoded_payload(_issue_payload(action="closed")) try: async with AsyncClient( transport=ASGITransport(app=app), base_url="http://testserver", ) as client: response = await client.post( f"/api/v1/forgejo/webhooks/{repository.id}", content=raw_body, headers={ "Content-Type": "application/json", "X-Forgejo-Signature": _signature(raw_body), "X-Gitea-Event": "issues", }, ) assert response.status_code == 202 body = response.json() assert body["ignored"] is False assert body["action"] == "closed" assert body["issue_number"] == 42 async with session_maker() as session: stored_issue = ( await session.exec(select(ForgejoIssue).where(col(ForgejoIssue.id) == issue.id)) ).first() assert stored_issue is not None assert stored_issue.state == "closed" assert stored_issue.title == "Fix webhook cache updates" assert stored_issue.body_preview == "Webhook body from Forgejo." assert stored_issue.labels == [ {"name": "bug", "color": "f43f5e", "description": "Bug report"} ] assert stored_issue.assignees == [ {"login": "codex", "id": 7, "avatar_url": "https://avatar.test/c"} ] assert stored_issue.author == "kaspa" assert stored_issue.forgejo_closed_at == datetime(2026, 5, 19, 12, 45, 0) events = (await session.exec(select(ActivityEvent))).all() assert len(events) == 1 assert events[0].event_type == "forgejo.issue.closed" assert events[0].board_id == board.id assert "openclaw/pipeline#42" in str(events[0].message) finally: await engine.dispose() @pytest.mark.asyncio async def test_forgejo_webhook_reopens_issue_and_clears_closed_timestamp() -> None: engine = await _make_engine() session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) app = _build_test_app(session_maker) async with session_maker() as session: repository, issue, _ = await _seed_repository(session) assert issue is not None issue.state = "closed" issue.forgejo_closed_at = datetime(2026, 5, 19, 12, 45, 0) session.add(issue) await session.commit() raw_body = _encoded_payload(_issue_payload(action="reopened")) try: async with AsyncClient( transport=ASGITransport(app=app), base_url="http://testserver", ) as client: response = await client.post( f"/api/v1/forgejo/webhooks/{repository.id}", content=raw_body, headers={ "Content-Type": "application/json", "X-Hub-Signature-256": _signature(raw_body), "X-Forgejo-Event": "issues", }, ) assert response.status_code == 202 async with session_maker() as session: stored_issue = ( await session.exec(select(ForgejoIssue).where(col(ForgejoIssue.id) == issue.id)) ).first() assert stored_issue is not None assert stored_issue.state == "open" assert stored_issue.forgejo_closed_at is None events = (await session.exec(select(ActivityEvent))).all() assert len(events) == 1 assert events[0].event_type == "forgejo.issue.reopened" finally: await engine.dispose() @pytest.mark.asyncio async def test_forgejo_webhook_ignores_pull_request_events() -> None: engine = await _make_engine() session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) app = _build_test_app(session_maker) async with session_maker() as session: repository, _, _ = await _seed_repository(session, with_issue=False) payload = _issue_payload(action="opened", number=99) assert isinstance(payload["issue"], dict) payload["issue"]["pull_request"] = {"html_url": "https://forgejo.example.local/pr/99"} raw_body = _encoded_payload(payload) try: async with AsyncClient( transport=ASGITransport(app=app), base_url="http://testserver", ) as client: response = await client.post( f"/api/v1/forgejo/webhooks/{repository.id}", content=raw_body, headers={ "Content-Type": "application/json", "X-Forgejo-Signature": _signature(raw_body), "X-Forgejo-Event": "pull_request", }, ) assert response.status_code == 202 body = response.json() assert body["ignored"] is True assert body["reason"] == "pull_request_ignored" async with session_maker() as session: issues = ( await session.exec( select(ForgejoIssue).where( ForgejoIssue.repository_id == repository.id, ForgejoIssue.forgejo_issue_number == 99, ) ) ).all() assert issues == [] events = (await session.exec(select(ActivityEvent))).all() assert events == [] finally: await engine.dispose()