Pipeline/backend/tests/test_forgejo_webhooks_api.py

354 lines
12 KiB
Python
Raw Normal View History

2026-05-19 04:24:24 -05:00
# ruff: noqa: INP001
"""Integration tests for Forgejo issue webhook ingestion."""
from __future__ import annotations
import hashlib
import hmac
import json
from datetime import datetime
from uuid import uuid4
import pytest
from fastapi import APIRouter, FastAPI
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from sqlmodel import SQLModel, col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from app import models as _models
from app.api.forgejo_webhooks import router as forgejo_webhooks_router
from app.db.session import get_session
from app.models.activity_events import ActivityEvent
from app.models.board_repository_links import BoardRepositoryLink
from app.models.boards import Board
from app.models.forgejo_connections import ForgejoConnection
from app.models.forgejo_issues import ForgejoIssue
from app.models.forgejo_repositories import ForgejoRepository
from app.models.organizations import Organization
async def _make_engine() -> AsyncEngine:
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
return engine
def _build_test_app(
session_maker: async_sessionmaker[AsyncSession],
) -> FastAPI:
app = FastAPI()
api_v1 = APIRouter(prefix="/api/v1")
api_v1.include_router(forgejo_webhooks_router)
app.include_router(api_v1)
async def _override_get_session() -> AsyncSession:
async with session_maker() as session:
yield session
app.dependency_overrides[get_session] = _override_get_session
return app
async def _seed_repository(
session: AsyncSession,
*,
with_board_link: bool = False,
with_issue: bool = True,
) -> tuple[ForgejoRepository, ForgejoIssue | None, Board | None]:
organization_id = uuid4()
connection_id = uuid4()
repository_id = uuid4()
session.add(Organization(id=organization_id, name=f"org-{organization_id}"))
session.add(
ForgejoConnection(
id=connection_id,
organization_id=organization_id,
name="Forgejo",
base_url="https://forgejo.example.local",
)
)
repository = ForgejoRepository(
id=repository_id,
organization_id=organization_id,
connection_id=connection_id,
owner="openclaw",
repo="pipeline",
display_name="Pipeline",
webhook_secret="shared-secret",
)
session.add(repository)
board: Board | None = None
if with_board_link:
board = Board(
id=uuid4(),
organization_id=organization_id,
name="Pipeline Board",
slug=f"pipeline-{repository_id}",
description="Issue tracking board.",
)
session.add(board)
session.add(
BoardRepositoryLink(
id=uuid4(),
organization_id=organization_id,
board_id=board.id,
repository_id=repository_id,
)
)
issue: ForgejoIssue | None = None
if with_issue:
issue = ForgejoIssue(
id=uuid4(),
organization_id=organization_id,
repository_id=repository_id,
forgejo_issue_number=42,
title="Old title",
body_preview="Old body",
state="open",
is_pull_request=False,
labels=[],
assignees=[],
author="alice",
html_url="https://forgejo.example.local/openclaw/pipeline/issues/42",
forgejo_created_at=datetime(2026, 5, 18, 8, 0, 0),
forgejo_updated_at=datetime(2026, 5, 18, 9, 0, 0),
)
session.add(issue)
await session.commit()
return repository, issue, board
def _encoded_payload(payload: dict[str, object]) -> bytes:
return json.dumps(payload, separators=(",", ":")).encode("utf-8")
def _signature(raw_body: bytes, secret: str = "shared-secret") -> str:
digest = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
return f"sha256={digest}"
def _issue_payload(*, action: str, number: int = 42) -> dict[str, object]:
return {
"action": action,
"repository": {"full_name": "openclaw/pipeline"},
"issue": {
"number": number,
"title": "Fix webhook cache updates",
"body": "Webhook body from Forgejo.",
"state": "closed" if action == "closed" else "open",
"labels": [{"name": "bug", "color": "f43f5e", "description": "Bug report"}],
"assignees": [{"login": "codex", "id": 7, "avatar_url": "https://avatar.test/c"}],
"user": {"login": "kaspa"},
"html_url": f"https://forgejo.example.local/openclaw/pipeline/issues/{number}",
"created_at": "2026-05-18T10:00:00Z",
"updated_at": "2026-05-19T12:30:00Z",
"closed_at": "2026-05-19T12:45:00Z" if action == "closed" else None,
},
}
@pytest.mark.asyncio
async def test_forgejo_webhook_rejects_invalid_signature_without_updating_issue() -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
app = _build_test_app(session_maker)
async with session_maker() as session:
repository, issue, _ = await _seed_repository(session)
assert issue is not None
raw_body = _encoded_payload(_issue_payload(action="closed"))
try:
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://testserver",
) as client:
response = await client.post(
f"/api/v1/forgejo/webhooks/{repository.id}",
content=raw_body,
headers={
"Content-Type": "application/json",
"X-Forgejo-Signature": "sha256=bad",
"X-Forgejo-Event": "issues",
},
)
assert response.status_code == 403
async with session_maker() as session:
stored_issue = (
await session.exec(select(ForgejoIssue).where(col(ForgejoIssue.id) == issue.id))
).first()
assert stored_issue is not None
assert stored_issue.state == "open"
assert stored_issue.title == "Old title"
events = (await session.exec(select(ActivityEvent))).all()
assert events == []
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_forgejo_webhook_closes_issue_and_records_board_activity() -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
app = _build_test_app(session_maker)
async with session_maker() as session:
repository, issue, board = await _seed_repository(session, with_board_link=True)
assert issue is not None
assert board is not None
raw_body = _encoded_payload(_issue_payload(action="closed"))
try:
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://testserver",
) as client:
response = await client.post(
f"/api/v1/forgejo/webhooks/{repository.id}",
content=raw_body,
headers={
"Content-Type": "application/json",
"X-Forgejo-Signature": _signature(raw_body),
"X-Gitea-Event": "issues",
},
)
assert response.status_code == 202
body = response.json()
assert body["ignored"] is False
assert body["action"] == "closed"
assert body["issue_number"] == 42
async with session_maker() as session:
stored_issue = (
await session.exec(select(ForgejoIssue).where(col(ForgejoIssue.id) == issue.id))
).first()
assert stored_issue is not None
assert stored_issue.state == "closed"
assert stored_issue.title == "Fix webhook cache updates"
assert stored_issue.body_preview == "Webhook body from Forgejo."
assert stored_issue.labels == [
{"name": "bug", "color": "f43f5e", "description": "Bug report"}
]
assert stored_issue.assignees == [
{"login": "codex", "id": 7, "avatar_url": "https://avatar.test/c"}
]
assert stored_issue.author == "kaspa"
assert stored_issue.forgejo_closed_at == datetime(2026, 5, 19, 12, 45, 0)
events = (await session.exec(select(ActivityEvent))).all()
assert len(events) == 1
assert events[0].event_type == "forgejo.issue.closed"
assert events[0].board_id == board.id
assert "openclaw/pipeline#42" in str(events[0].message)
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_forgejo_webhook_reopens_issue_and_clears_closed_timestamp() -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
app = _build_test_app(session_maker)
async with session_maker() as session:
repository, issue, _ = await _seed_repository(session)
assert issue is not None
issue.state = "closed"
issue.forgejo_closed_at = datetime(2026, 5, 19, 12, 45, 0)
session.add(issue)
await session.commit()
raw_body = _encoded_payload(_issue_payload(action="reopened"))
try:
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://testserver",
) as client:
response = await client.post(
f"/api/v1/forgejo/webhooks/{repository.id}",
content=raw_body,
headers={
"Content-Type": "application/json",
"X-Hub-Signature-256": _signature(raw_body),
"X-Forgejo-Event": "issues",
},
)
assert response.status_code == 202
async with session_maker() as session:
stored_issue = (
await session.exec(select(ForgejoIssue).where(col(ForgejoIssue.id) == issue.id))
).first()
assert stored_issue is not None
assert stored_issue.state == "open"
assert stored_issue.forgejo_closed_at is None
events = (await session.exec(select(ActivityEvent))).all()
assert len(events) == 1
assert events[0].event_type == "forgejo.issue.reopened"
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_forgejo_webhook_ignores_pull_request_events() -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
app = _build_test_app(session_maker)
async with session_maker() as session:
repository, _, _ = await _seed_repository(session, with_issue=False)
payload = _issue_payload(action="opened", number=99)
assert isinstance(payload["issue"], dict)
payload["issue"]["pull_request"] = {"html_url": "https://forgejo.example.local/pr/99"}
raw_body = _encoded_payload(payload)
try:
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://testserver",
) as client:
response = await client.post(
f"/api/v1/forgejo/webhooks/{repository.id}",
content=raw_body,
headers={
"Content-Type": "application/json",
"X-Forgejo-Signature": _signature(raw_body),
"X-Forgejo-Event": "pull_request",
},
)
assert response.status_code == 202
body = response.json()
assert body["ignored"] is True
assert body["reason"] == "pull_request_ignored"
async with session_maker() as session:
issues = (
await session.exec(
select(ForgejoIssue).where(
ForgejoIssue.repository_id == repository.id,
ForgejoIssue.forgejo_issue_number == 99,
)
)
).all()
assert issues == []
events = (await session.exec(select(ActivityEvent))).all()
assert events == []
finally:
await engine.dispose()