Pipeline/backend/tests/test_forgejo_issue_close_ap...

418 lines
14 KiB
Python
Raw Normal View History

2026-05-20 03:49:57 -05:00
# ruff: noqa: INP001
"""Integration tests for Forgejo issue close endpoints."""
from __future__ import annotations
from datetime import datetime
from types import SimpleNamespace
from uuid import uuid4
import pytest
from fastapi import APIRouter, FastAPI
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from sqlmodel import SQLModel, select
from sqlmodel.ext.asyncio.session import AsyncSession
from app import models as _models
from app.api import deps as deps_api
from app.api.agent_forgejo import router as agent_forgejo_router
from app.api.deps import require_org_member
2026-05-21 22:47:24 -05:00
from app.api.forgejo_issues import router as forgejo_issues_router
2026-05-20 03:49:57 -05:00
from app.core.agent_auth import AgentAuthContext, get_agent_auth_context
from app.core.auth import AuthContext, get_auth_context
from app.db.session import get_session
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.board_repository_links import BoardRepositoryLink
from app.models.boards import Board
from app.models.forgejo_connections import ForgejoConnection
from app.models.forgejo_issues import ForgejoIssue
from app.models.forgejo_repositories import ForgejoRepository
from app.models.gateways import Gateway
from app.models.organization_board_access import OrganizationBoardAccess
from app.models.organization_members import OrganizationMember
from app.models.organizations import Organization
from app.models.users import User
from app.services import forgejo_issue_close
from app.services.organizations import OrganizationContext
async def _make_engine() -> AsyncEngine:
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
return engine
def _build_test_app(
session_maker: async_sessionmaker[AsyncSession],
*,
org_ctx: OrganizationContext | None = None,
auth_ctx: AuthContext | None = None,
agent_ctx: AgentAuthContext | None = None,
board: Board | None = None,
) -> FastAPI:
app = FastAPI()
api_v1 = APIRouter(prefix="/api/v1")
api_v1.include_router(forgejo_issues_router)
api_v1.include_router(agent_forgejo_router)
app.include_router(api_v1)
async def _override_get_session() -> AsyncSession:
async with session_maker() as session:
yield session
app.dependency_overrides[get_session] = _override_get_session
if org_ctx is not None:
2026-05-21 22:47:24 -05:00
2026-05-20 03:49:57 -05:00
async def _override_require_org_member() -> OrganizationContext:
return org_ctx
app.dependency_overrides[require_org_member] = _override_require_org_member
if auth_ctx is not None:
2026-05-21 22:47:24 -05:00
2026-05-20 03:49:57 -05:00
async def _override_get_auth_context() -> AuthContext:
return auth_ctx
app.dependency_overrides[get_auth_context] = _override_get_auth_context
if agent_ctx is not None:
2026-05-21 22:47:24 -05:00
2026-05-20 03:49:57 -05:00
async def _override_get_agent_auth_context() -> AgentAuthContext:
return agent_ctx
app.dependency_overrides[get_agent_auth_context] = _override_get_agent_auth_context
if board is not None:
2026-05-21 22:47:24 -05:00
2026-05-20 03:49:57 -05:00
async def _override_get_board_for_actor_read() -> Board:
return board
2026-05-21 22:47:24 -05:00
app.dependency_overrides[deps_api.get_board_for_actor_read] = (
_override_get_board_for_actor_read
)
2026-05-20 03:49:57 -05:00
return app
async def _seed(session: AsyncSession) -> SimpleNamespace:
org = Organization(id=uuid4(), name="Pipeline Org")
user = User(
id=uuid4(),
clerk_user_id="user_123",
email="user@example.com",
name="User",
active_organization_id=org.id,
)
member = OrganizationMember(
id=uuid4(),
organization_id=org.id,
user_id=user.id,
role="member",
all_boards_read=False,
all_boards_write=False,
)
connection = ForgejoConnection(
id=uuid4(),
organization_id=org.id,
name="Dream Forgejo",
base_url="https://forgejo.example.local",
token="temp-token",
token_last_eight="p-token",
)
repository = ForgejoRepository(
id=uuid4(),
organization_id=org.id,
connection_id=connection.id,
owner="openclaw",
repo="pipeline",
display_name="Pipeline",
)
board = Board(
id=uuid4(),
organization_id=org.id,
name="Pipeline Board",
slug="pipeline-board",
)
board_access = OrganizationBoardAccess(
id=uuid4(),
organization_member_id=member.id,
board_id=board.id,
can_read=True,
can_write=True,
)
link = BoardRepositoryLink(
id=uuid4(),
organization_id=org.id,
board_id=board.id,
repository_id=repository.id,
)
issue = ForgejoIssue(
id=uuid4(),
organization_id=org.id,
repository_id=repository.id,
forgejo_issue_number=42,
title="Cached issue",
body_preview="Cached body",
state="open",
is_pull_request=False,
labels=[],
assignees=[],
2026-05-21 22:47:24 -05:00
forgejo_payload={"number": 42, "state": "open"},
2026-05-20 03:49:57 -05:00
author="kaspa",
html_url="https://forgejo.example.local/openclaw/pipeline/issues/42",
forgejo_created_at=datetime(2026, 5, 19, 12, 0, 0),
forgejo_updated_at=datetime(2026, 5, 19, 12, 5, 0),
)
gateway = Gateway(
id=uuid4(),
organization_id=org.id,
name="Gateway",
url="https://gateway.example.local",
workspace_root="/workspace",
)
lead_agent = Agent(
id=uuid4(),
board_id=board.id,
gateway_id=gateway.id,
name="Lead Agent",
is_board_lead=True,
)
worker_agent = Agent(
id=uuid4(),
board_id=board.id,
gateway_id=gateway.id,
name="Worker Agent",
is_board_lead=False,
)
session.add(org)
session.add(user)
session.add(member)
session.add(connection)
session.add(repository)
session.add(board)
session.add(board_access)
session.add(link)
session.add(issue)
session.add(gateway)
session.add(lead_agent)
session.add(worker_agent)
await session.commit()
return SimpleNamespace(
org=org,
user=user,
member=member,
board=board,
issue=issue,
lead_agent=lead_agent,
worker_agent=worker_agent,
)
class _StubForgejoClient:
def __init__(self, *, should_fail: bool) -> None:
self._should_fail = should_fail
async def __aenter__(self) -> _StubForgejoClient:
return self
async def __aexit__(self, *_args: object) -> None:
return None
async def close_issue(self, **_kwargs: object) -> dict[str, object]:
if self._should_fail:
raise RuntimeError("forgejo down")
return {"state": "closed"}
def _patch_forgejo_client(
monkeypatch: pytest.MonkeyPatch,
*,
should_fail: bool,
) -> None:
def _factory(_connection: object) -> _StubForgejoClient:
return _StubForgejoClient(should_fail=should_fail)
monkeypatch.setattr(forgejo_issue_close, "get_forgejo_client", _factory)
@pytest.mark.asyncio
async def test_human_writer_can_close_linked_issue_and_records_activity(
monkeypatch: pytest.MonkeyPatch,
) -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
try:
async with session_maker() as session:
seeded = await _seed(session)
_patch_forgejo_client(monkeypatch, should_fail=False)
org_ctx = OrganizationContext(organization=seeded.org, member=seeded.member)
auth_ctx = AuthContext(actor_type="user", user=seeded.user)
app = _build_test_app(session_maker, org_ctx=org_ctx, auth_ctx=auth_ctx)
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://testserver",
) as client:
response = await client.post(f"/api/v1/forgejo/issues/{seeded.issue.id}/close")
assert response.status_code == 200
payload = response.json()
assert payload["success"] is True
assert payload["state"] == "closed"
assert payload["forgejo_issue_number"] == 42
async with session_maker() as session:
stored_issue = await session.get(ForgejoIssue, seeded.issue.id)
assert stored_issue is not None
assert stored_issue.state == "closed"
assert stored_issue.forgejo_closed_at is not None
assert stored_issue.last_synced_at is not None
2026-05-21 22:47:24 -05:00
assert stored_issue.forgejo_payload is not None
assert stored_issue.forgejo_payload.get("state") == "closed"
assert stored_issue.forgejo_payload.get("closed_at")
2026-05-20 03:49:57 -05:00
events = (await session.exec(select(ActivityEvent))).all()
assert len(events) == 1
event = events[0]
assert event.event_type == "forgejo.issue.closed"
assert event.board_id == seeded.board.id
assert event.agent_id is None
assert event.message is not None
assert f"user {seeded.user.id}" in event.message
assert "openclaw/pipeline#42" in event.message
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_board_lead_agent_can_close_linked_issue_and_records_activity(
monkeypatch: pytest.MonkeyPatch,
) -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
try:
async with session_maker() as session:
seeded = await _seed(session)
_patch_forgejo_client(monkeypatch, should_fail=False)
agent_ctx = AgentAuthContext(actor_type="agent", agent=seeded.lead_agent)
app = _build_test_app(
session_maker,
agent_ctx=agent_ctx,
board=seeded.board,
)
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://testserver",
) as client:
response = await client.post(
f"/api/v1/agent/boards/{seeded.board.id}/git/issues/{seeded.issue.id}/close"
)
assert response.status_code == 200
payload = response.json()
assert payload["success"] is True
assert payload["state"] == "closed"
async with session_maker() as session:
stored_issue = await session.get(ForgejoIssue, seeded.issue.id)
assert stored_issue is not None
assert stored_issue.state == "closed"
events = (await session.exec(select(ActivityEvent))).all()
assert len(events) == 1
event = events[0]
assert event.event_type == "forgejo.issue.closed"
assert event.board_id == seeded.board.id
assert event.agent_id == seeded.lead_agent.id
assert event.message is not None
assert f"agent {seeded.lead_agent.id}" in event.message
assert "openclaw/pipeline#42" in event.message
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_worker_agent_cannot_close_issue(
monkeypatch: pytest.MonkeyPatch,
) -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
try:
async with session_maker() as session:
seeded = await _seed(session)
_patch_forgejo_client(monkeypatch, should_fail=False)
worker_ctx = AgentAuthContext(actor_type="agent", agent=seeded.worker_agent)
app = _build_test_app(
session_maker,
agent_ctx=worker_ctx,
board=seeded.board,
)
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://testserver",
) as client:
response = await client.post(
f"/api/v1/agent/boards/{seeded.board.id}/git/issues/{seeded.issue.id}/close"
)
assert response.status_code == 403
assert response.json()["detail"] == "Only board leads can close issues"
async with session_maker() as session:
stored_issue = await session.get(ForgejoIssue, seeded.issue.id)
assert stored_issue is not None
assert stored_issue.state == "open"
events = (await session.exec(select(ActivityEvent))).all()
assert events == []
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_forgejo_close_failure_does_not_update_local_issue(
monkeypatch: pytest.MonkeyPatch,
) -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
try:
async with session_maker() as session:
seeded = await _seed(session)
_patch_forgejo_client(monkeypatch, should_fail=True)
org_ctx = OrganizationContext(organization=seeded.org, member=seeded.member)
auth_ctx = AuthContext(actor_type="user", user=seeded.user)
app = _build_test_app(session_maker, org_ctx=org_ctx, auth_ctx=auth_ctx)
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://testserver",
) as client:
response = await client.post(f"/api/v1/forgejo/issues/{seeded.issue.id}/close")
assert response.status_code == 502
assert "Failed to close issue on Forgejo" in response.json()["detail"]
async with session_maker() as session:
stored_issue = await session.get(ForgejoIssue, seeded.issue.id)
assert stored_issue is not None
assert stored_issue.state == "open"
assert stored_issue.forgejo_closed_at is None
events = (await session.exec(select(ActivityEvent))).all()
assert events == []
finally:
await engine.dispose()