# ruff: noqa: INP001 """Integration tests for Forgejo issue close endpoints.""" from __future__ import annotations from datetime import datetime from types import SimpleNamespace from uuid import uuid4 import pytest from fastapi import APIRouter, FastAPI from httpx import ASGITransport, AsyncClient from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine from sqlmodel import SQLModel, select from sqlmodel.ext.asyncio.session import AsyncSession from app import models as _models from app.api import deps as deps_api from app.api.agent_forgejo import router as agent_forgejo_router from app.api.forgejo_issues import router as forgejo_issues_router from app.api.deps import require_org_member from app.core.agent_auth import AgentAuthContext, get_agent_auth_context from app.core.auth import AuthContext, get_auth_context from app.db.session import get_session from app.models.activity_events import ActivityEvent from app.models.agents import Agent from app.models.board_repository_links import BoardRepositoryLink from app.models.boards import Board from app.models.forgejo_connections import ForgejoConnection from app.models.forgejo_issues import ForgejoIssue from app.models.forgejo_repositories import ForgejoRepository from app.models.gateways import Gateway from app.models.organization_board_access import OrganizationBoardAccess from app.models.organization_members import OrganizationMember from app.models.organizations import Organization from app.models.users import User from app.services import forgejo_issue_close from app.services.organizations import OrganizationContext async def _make_engine() -> AsyncEngine: engine = create_async_engine("sqlite+aiosqlite:///:memory:") async with engine.begin() as conn: await conn.run_sync(SQLModel.metadata.create_all) return engine def _build_test_app( session_maker: async_sessionmaker[AsyncSession], *, org_ctx: OrganizationContext | None = None, auth_ctx: AuthContext | None = None, agent_ctx: AgentAuthContext | None = None, board: Board | None = None, ) -> FastAPI: app = FastAPI() api_v1 = APIRouter(prefix="/api/v1") api_v1.include_router(forgejo_issues_router) api_v1.include_router(agent_forgejo_router) app.include_router(api_v1) async def _override_get_session() -> AsyncSession: async with session_maker() as session: yield session app.dependency_overrides[get_session] = _override_get_session if org_ctx is not None: async def _override_require_org_member() -> OrganizationContext: return org_ctx app.dependency_overrides[require_org_member] = _override_require_org_member if auth_ctx is not None: async def _override_get_auth_context() -> AuthContext: return auth_ctx app.dependency_overrides[get_auth_context] = _override_get_auth_context if agent_ctx is not None: async def _override_get_agent_auth_context() -> AgentAuthContext: return agent_ctx app.dependency_overrides[get_agent_auth_context] = _override_get_agent_auth_context if board is not None: async def _override_get_board_for_actor_read() -> Board: return board app.dependency_overrides[deps_api.get_board_for_actor_read] = _override_get_board_for_actor_read return app async def _seed(session: AsyncSession) -> SimpleNamespace: org = Organization(id=uuid4(), name="Pipeline Org") user = User( id=uuid4(), clerk_user_id="user_123", email="user@example.com", name="User", active_organization_id=org.id, ) member = OrganizationMember( id=uuid4(), organization_id=org.id, user_id=user.id, role="member", all_boards_read=False, all_boards_write=False, ) connection = ForgejoConnection( id=uuid4(), organization_id=org.id, name="Dream Forgejo", base_url="https://forgejo.example.local", token="temp-token", token_last_eight="p-token", ) repository = ForgejoRepository( id=uuid4(), organization_id=org.id, connection_id=connection.id, owner="openclaw", repo="pipeline", display_name="Pipeline", ) board = Board( id=uuid4(), organization_id=org.id, name="Pipeline Board", slug="pipeline-board", ) board_access = OrganizationBoardAccess( id=uuid4(), organization_member_id=member.id, board_id=board.id, can_read=True, can_write=True, ) link = BoardRepositoryLink( id=uuid4(), organization_id=org.id, board_id=board.id, repository_id=repository.id, ) issue = ForgejoIssue( id=uuid4(), organization_id=org.id, repository_id=repository.id, forgejo_issue_number=42, title="Cached issue", body_preview="Cached body", state="open", is_pull_request=False, labels=[], assignees=[], author="kaspa", html_url="https://forgejo.example.local/openclaw/pipeline/issues/42", forgejo_created_at=datetime(2026, 5, 19, 12, 0, 0), forgejo_updated_at=datetime(2026, 5, 19, 12, 5, 0), ) gateway = Gateway( id=uuid4(), organization_id=org.id, name="Gateway", url="https://gateway.example.local", workspace_root="/workspace", ) lead_agent = Agent( id=uuid4(), board_id=board.id, gateway_id=gateway.id, name="Lead Agent", is_board_lead=True, ) worker_agent = Agent( id=uuid4(), board_id=board.id, gateway_id=gateway.id, name="Worker Agent", is_board_lead=False, ) session.add(org) session.add(user) session.add(member) session.add(connection) session.add(repository) session.add(board) session.add(board_access) session.add(link) session.add(issue) session.add(gateway) session.add(lead_agent) session.add(worker_agent) await session.commit() return SimpleNamespace( org=org, user=user, member=member, board=board, issue=issue, lead_agent=lead_agent, worker_agent=worker_agent, ) class _StubForgejoClient: def __init__(self, *, should_fail: bool) -> None: self._should_fail = should_fail async def __aenter__(self) -> _StubForgejoClient: return self async def __aexit__(self, *_args: object) -> None: return None async def close_issue(self, **_kwargs: object) -> dict[str, object]: if self._should_fail: raise RuntimeError("forgejo down") return {"state": "closed"} def _patch_forgejo_client( monkeypatch: pytest.MonkeyPatch, *, should_fail: bool, ) -> None: def _factory(_connection: object) -> _StubForgejoClient: return _StubForgejoClient(should_fail=should_fail) monkeypatch.setattr(forgejo_issue_close, "get_forgejo_client", _factory) @pytest.mark.asyncio async def test_human_writer_can_close_linked_issue_and_records_activity( monkeypatch: pytest.MonkeyPatch, ) -> None: engine = await _make_engine() session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) try: async with session_maker() as session: seeded = await _seed(session) _patch_forgejo_client(monkeypatch, should_fail=False) org_ctx = OrganizationContext(organization=seeded.org, member=seeded.member) auth_ctx = AuthContext(actor_type="user", user=seeded.user) app = _build_test_app(session_maker, org_ctx=org_ctx, auth_ctx=auth_ctx) async with AsyncClient( transport=ASGITransport(app=app), base_url="http://testserver", ) as client: response = await client.post(f"/api/v1/forgejo/issues/{seeded.issue.id}/close") assert response.status_code == 200 payload = response.json() assert payload["success"] is True assert payload["state"] == "closed" assert payload["forgejo_issue_number"] == 42 async with session_maker() as session: stored_issue = await session.get(ForgejoIssue, seeded.issue.id) assert stored_issue is not None assert stored_issue.state == "closed" assert stored_issue.forgejo_closed_at is not None assert stored_issue.last_synced_at is not None events = (await session.exec(select(ActivityEvent))).all() assert len(events) == 1 event = events[0] assert event.event_type == "forgejo.issue.closed" assert event.board_id == seeded.board.id assert event.agent_id is None assert event.message is not None assert f"user {seeded.user.id}" in event.message assert "openclaw/pipeline#42" in event.message finally: await engine.dispose() @pytest.mark.asyncio async def test_board_lead_agent_can_close_linked_issue_and_records_activity( monkeypatch: pytest.MonkeyPatch, ) -> None: engine = await _make_engine() session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) try: async with session_maker() as session: seeded = await _seed(session) _patch_forgejo_client(monkeypatch, should_fail=False) agent_ctx = AgentAuthContext(actor_type="agent", agent=seeded.lead_agent) app = _build_test_app( session_maker, agent_ctx=agent_ctx, board=seeded.board, ) async with AsyncClient( transport=ASGITransport(app=app), base_url="http://testserver", ) as client: response = await client.post( f"/api/v1/agent/boards/{seeded.board.id}/git/issues/{seeded.issue.id}/close" ) assert response.status_code == 200 payload = response.json() assert payload["success"] is True assert payload["state"] == "closed" async with session_maker() as session: stored_issue = await session.get(ForgejoIssue, seeded.issue.id) assert stored_issue is not None assert stored_issue.state == "closed" events = (await session.exec(select(ActivityEvent))).all() assert len(events) == 1 event = events[0] assert event.event_type == "forgejo.issue.closed" assert event.board_id == seeded.board.id assert event.agent_id == seeded.lead_agent.id assert event.message is not None assert f"agent {seeded.lead_agent.id}" in event.message assert "openclaw/pipeline#42" in event.message finally: await engine.dispose() @pytest.mark.asyncio async def test_worker_agent_cannot_close_issue( monkeypatch: pytest.MonkeyPatch, ) -> None: engine = await _make_engine() session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) try: async with session_maker() as session: seeded = await _seed(session) _patch_forgejo_client(monkeypatch, should_fail=False) worker_ctx = AgentAuthContext(actor_type="agent", agent=seeded.worker_agent) app = _build_test_app( session_maker, agent_ctx=worker_ctx, board=seeded.board, ) async with AsyncClient( transport=ASGITransport(app=app), base_url="http://testserver", ) as client: response = await client.post( f"/api/v1/agent/boards/{seeded.board.id}/git/issues/{seeded.issue.id}/close" ) assert response.status_code == 403 assert response.json()["detail"] == "Only board leads can close issues" async with session_maker() as session: stored_issue = await session.get(ForgejoIssue, seeded.issue.id) assert stored_issue is not None assert stored_issue.state == "open" events = (await session.exec(select(ActivityEvent))).all() assert events == [] finally: await engine.dispose() @pytest.mark.asyncio async def test_forgejo_close_failure_does_not_update_local_issue( monkeypatch: pytest.MonkeyPatch, ) -> None: engine = await _make_engine() session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) try: async with session_maker() as session: seeded = await _seed(session) _patch_forgejo_client(monkeypatch, should_fail=True) org_ctx = OrganizationContext(organization=seeded.org, member=seeded.member) auth_ctx = AuthContext(actor_type="user", user=seeded.user) app = _build_test_app(session_maker, org_ctx=org_ctx, auth_ctx=auth_ctx) async with AsyncClient( transport=ASGITransport(app=app), base_url="http://testserver", ) as client: response = await client.post(f"/api/v1/forgejo/issues/{seeded.issue.id}/close") assert response.status_code == 502 assert "Failed to close issue on Forgejo" in response.json()["detail"] async with session_maker() as session: stored_issue = await session.get(ForgejoIssue, seeded.issue.id) assert stored_issue is not None assert stored_issue.state == "open" assert stored_issue.forgejo_closed_at is None events = (await session.exec(select(ActivityEvent))).all() assert events == [] finally: await engine.dispose()