# ruff: noqa: INP001 """Integration tests for Forgejo repository settings API responses.""" from __future__ import annotations from datetime import datetime from uuid import uuid4 import pytest from fastapi import APIRouter, FastAPI from httpx import ASGITransport, AsyncClient from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine from sqlmodel import SQLModel from sqlmodel.ext.asyncio.session import AsyncSession from app import models as _models from app.api.deps import require_org_member from app.api.forgejo_repositories import router as forgejo_repositories_router from app.db.session import get_session from app.models.board_repository_links import BoardRepositoryLink from app.models.boards import Board from app.models.forgejo_connections import ForgejoConnection from app.models.forgejo_import_runs import ForgejoImportRun from app.models.forgejo_repositories import ForgejoRepository from app.models.organization_members import OrganizationMember from app.models.organizations import Organization from app.services.organizations import OrganizationContext async def _make_engine() -> AsyncEngine: engine = create_async_engine("sqlite+aiosqlite:///:memory:") async with engine.begin() as conn: await conn.run_sync(SQLModel.metadata.create_all) return engine def _build_test_app( session_maker: async_sessionmaker[AsyncSession], ctx: OrganizationContext, ) -> FastAPI: app = FastAPI() api_v1 = APIRouter(prefix="/api/v1") api_v1.include_router(forgejo_repositories_router) app.include_router(api_v1) async def _override_get_session() -> AsyncSession: async with session_maker() as session: yield session async def _override_require_org_member() -> OrganizationContext: return ctx app.dependency_overrides[get_session] = _override_get_session app.dependency_overrides[require_org_member] = _override_require_org_member return app @pytest.mark.asyncio async def test_list_repositories_embeds_linked_board_summaries() -> None: engine = await _make_engine() session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) organization = Organization(id=uuid4(), name="Pipeline") member = OrganizationMember( id=uuid4(), organization_id=organization.id, user_id=uuid4(), role="owner", ) app = _build_test_app( session_maker, OrganizationContext(organization=organization, member=member), ) connection = ForgejoConnection( id=uuid4(), organization_id=organization.id, name="Dream Forgejo", base_url="https://forgejo.example.local", ) repository = ForgejoRepository( id=uuid4(), organization_id=organization.id, connection_id=connection.id, owner="openclaw", repo="pipeline", display_name="Pipeline", ) board = Board( id=uuid4(), organization_id=organization.id, name="Pipeline Board", slug="pipeline-board", ) link = BoardRepositoryLink( id=uuid4(), organization_id=organization.id, board_id=board.id, repository_id=repository.id, ) try: async with session_maker() as session: session.add(organization) session.add(connection) session.add(repository) session.add(board) session.add(link) await session.commit() async with AsyncClient( transport=ASGITransport(app=app), base_url="http://testserver", ) as client: response = await client.get("/api/v1/forgejo/repositories") assert response.status_code == 200 data = response.json() assert data[0]["linked_boards"] == [{"id": str(board.id), "name": "Pipeline Board"}] finally: await engine.dispose() @pytest.mark.asyncio async def test_import_runs_route_returns_persisted_history() -> None: engine = await _make_engine() session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) organization = Organization(id=uuid4(), name="Pipeline") member = OrganizationMember( id=uuid4(), organization_id=organization.id, user_id=uuid4(), role="owner", ) app = _build_test_app( session_maker, OrganizationContext(organization=organization, member=member), ) repository_id = uuid4() run = ForgejoImportRun( id=uuid4(), organization_id=organization.id, requested_by_user_id=member.user_id, repository_ids=[str(repository_id)], results=[ { "repository_id": str(repository_id), "name": "openclaw/pipeline", "created": 2, "updated": 3, "stale_closed": 1, "open": 5, "closed": 1, "total": 6, "error": None, } ], total_created=2, total_updated=3, total_stale_closed=1, succeeded=1, failed=0, started_at=datetime(2026, 5, 25, 12, 0, 0), finished_at=datetime(2026, 5, 25, 12, 0, 3), duration_ms=3000, created_at=datetime(2026, 5, 25, 12, 0, 3), ) try: async with session_maker() as session: session.add(organization) session.add(run) await session.commit() async with AsyncClient( transport=ASGITransport(app=app), base_url="http://testserver", ) as client: response = await client.get("/api/v1/forgejo/repositories/import-runs") assert response.status_code == 200 data = response.json() assert data[0]["id"] == str(run.id) assert data[0]["results"][0]["created"] == 2 assert data[0]["repository_ids"] == [str(repository_id)] finally: await engine.dispose()