# ruff: noqa: INP001
"""Scheduled Forgejo issue sync queue tests."""

from __future__ import annotations

from uuid import uuid4

import pytest
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from sqlmodel import SQLModel, col, select
from sqlmodel.ext.asyncio.session import AsyncSession

from app import models as _models
from app.models.forgejo_connections import ForgejoConnection
from app.models.forgejo_repositories import ForgejoRepository
from app.models.organizations import Organization
from app.services import forgejo_sync_queue
from app.services.forgejo_sync_queue import (
    enqueue_forgejo_issue_sync,
    process_forgejo_sync_queue_task,
    sync_active_forgejo_repositories,
)
from app.services.queue import QueuedTask

# Holds a reference to the imported models package.
# NOTE(review): presumably this keeps the import (and the table registration it
# triggers) from being treated as unused -- confirm.
_MODEL_REGISTRY = _models


async def _make_engine() -> AsyncEngine:
    """Return an in-memory SQLite async engine with all SQLModel tables created."""
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with engine.begin() as connection:
        await connection.run_sync(SQLModel.metadata.create_all)
    return engine


async def _seed_repositories(
    session: AsyncSession,
) -> tuple[ForgejoRepository, ForgejoRepository, ForgejoRepository]:
    """Insert one organization, one connection and three repositories, then commit.

    Returns the repositories as ``(active, failing, inactive)``. The first two
    are stored with ``active=True`` and the last one with ``active=False``.
    """
    organization_id = uuid4()
    connection_id = uuid4()

    def _build_repository(repo_name: str, *, is_active: bool) -> ForgejoRepository:
        # All seeded repositories share the same organization, connection and owner.
        return ForgejoRepository(
            id=uuid4(),
            organization_id=organization_id,
            connection_id=connection_id,
            owner="openclaw",
            repo=repo_name,
            active=is_active,
        )

    session.add(Organization(id=organization_id, name=f"org-{organization_id}"))
    session.add(
        ForgejoConnection(
            id=connection_id,
            organization_id=organization_id,
            name="Forgejo",
            base_url="https://forgejo.example.local",
        )
    )

    active_repo = _build_repository("pipeline", is_active=True)
    failing_repo = _build_repository("broken", is_active=True)
    inactive_repo = _build_repository("inactive", is_active=False)
    for repository in (active_repo, failing_repo, inactive_repo):
        session.add(repository)

    await session.commit()
    return active_repo, failing_repo, inactive_repo


def test_enqueue_forgejo_issue_sync_uses_singleton_delayed_task(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Enqueueing passes a fixed-timestamp ``forgejo_issue_sync_all`` task and the delay."""
    captured: dict[str, object] = {}

    def _fake_enqueue_with_delay(
        task: QueuedTask,
        queue_name: str,
        *,
        delay_seconds: float,
        redis_url: str | None = None,
    ) -> bool:
        # Record every argument so the assertions below can inspect the call.
        captured.update(
            task=task,
            queue_name=queue_name,
            delay_seconds=delay_seconds,
            redis_url=redis_url,
        )
        return True

    sync_settings = forgejo_sync_queue.settings
    monkeypatch.setattr(sync_settings, "forgejo_sync_enabled", True)
    monkeypatch.setattr(sync_settings, "forgejo_sync_interval_seconds", 3600)
    monkeypatch.setattr(
        "app.services.forgejo_sync_queue.enqueue_task_with_delay",
        _fake_enqueue_with_delay,
    )

    enqueued = enqueue_forgejo_issue_sync(delay_seconds=30)

    assert enqueued is True
    queued_task = captured["task"]
    assert isinstance(queued_task, QueuedTask)
    assert queued_task.task_type == "forgejo_issue_sync_all"
    assert queued_task.payload == {"scope": "active_repositories"}
    assert queued_task.created_at.isoformat() == "1970-01-01T00:00:00+00:00"
    assert captured["delay_seconds"] == 30


def test_enqueue_forgejo_issue_sync_respects_disabled_setting(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With ``forgejo_sync_enabled`` off, enqueue returns False and never enqueues."""

    def _must_not_enqueue(*args: object, **kwargs: object) -> object:
        # Any call here fails the test: the disabled path must not reach the queue.
        return pytest.fail("disabled sync should not enqueue")

    monkeypatch.setattr(forgejo_sync_queue.settings, "forgejo_sync_enabled", False)
    monkeypatch.setattr(
        "app.services.forgejo_sync_queue.enqueue_task_with_delay",
        _must_not_enqueue,
    )

    enqueued = enqueue_forgejo_issue_sync(delay_seconds=30)

    assert enqueued is False


@pytest.mark.asyncio
async def test_sync_active_forgejo_repositories_syncs_active_and_records_errors(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Active repositories are synced; a failing one is counted and its error stored."""
    synced_ids: list[str] = []

    class _FakeIssueSyncService:
        """Stand-in sync service that fails for ``failing_repo`` only."""

        def __init__(self, *, session: AsyncSession, organization_id: object) -> None:
            del session, organization_id

        async def sync_repository_issues(self, *, repository_id: object) -> dict[str, int]:
            synced_ids.append(str(repository_id))
            # ``failing_repo`` is bound further down in the enclosing test; it is
            # resolved when this method runs, after seeding has happened.
            if repository_id == failing_repo.id:
                raise RuntimeError("Forgejo unavailable")
            return {"created": 2, "updated": 3, "open": 4, "closed": 1, "total": 5}

    async def _load_repository(
        db_session: AsyncSession, repository_id: object
    ) -> ForgejoRepository:
        # Fetch exactly one stored repository row by primary key.
        statement = select(ForgejoRepository).where(col(ForgejoRepository.id) == repository_id)
        result = await db_session.exec(statement)
        return result.one()

    expected_totals = {
        "repositories": 2,
        "succeeded": 1,
        "failed": 1,
        "created": 2,
        "updated": 3,
        "open": 4,
        "closed": 1,
        "total": 5,
    }

    engine = await _make_engine()
    session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    try:
        async with session_maker() as session:
            active_repo, failing_repo, inactive_repo = await _seed_repositories(session)
            monkeypatch.setattr(
                "app.services.forgejo_sync_queue.IssueSyncService",
                _FakeIssueSyncService,
            )

            totals = await sync_active_forgejo_repositories(session)

        assert totals == expected_totals
        assert synced_ids == [str(active_repo.id), str(failing_repo.id)]

        # Re-read through a fresh session to check what was actually persisted.
        async with session_maker() as session:
            stored_failing_repo = await _load_repository(session, failing_repo.id)
            stored_inactive_repo = await _load_repository(session, inactive_repo.id)

        assert stored_failing_repo.last_sync_error == "Forgejo unavailable"
        assert stored_inactive_repo.last_sync_error is None
    finally:
        await engine.dispose()


@pytest.mark.asyncio
async def test_process_forgejo_sync_queue_task_reschedules_after_run(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Processing the queue task runs the sync first and then the reschedule hook."""
    events: list[str] = []

    async def _fake_sync_active_repositories(session: AsyncSession) -> dict[str, int]:
        del session
        events.append("sync")
        return dict.fromkeys(
            (
                "repositories",
                "succeeded",
                "failed",
                "created",
                "updated",
                "open",
                "closed",
                "total",
            ),
            0,
        )

    def _fake_schedule_next() -> bool:
        events.append("reschedule")
        return True

    class _StubSessionContext:
        """Async context manager that yields no real session."""

        async def __aenter__(self) -> AsyncSession:
            return None  # type: ignore[return-value]

        async def __aexit__(self, *args: object) -> None:
            return None

    sync_settings = forgejo_sync_queue.settings
    monkeypatch.setattr(sync_settings, "forgejo_sync_enabled", True)
    monkeypatch.setattr(sync_settings, "forgejo_sync_interval_seconds", 12)
    monkeypatch.setattr("app.services.forgejo_sync_queue.async_session_maker", _StubSessionContext)
    monkeypatch.setattr(
        "app.services.forgejo_sync_queue.sync_active_forgejo_repositories",
        _fake_sync_active_repositories,
    )
    monkeypatch.setattr(
        "app.services.forgejo_sync_queue.schedule_next_forgejo_issue_sync",
        _fake_schedule_next,
    )

    singleton_task = QueuedTask(
        task_type="forgejo_issue_sync_all",
        payload={"scope": "active_repositories"},
        created_at=forgejo_sync_queue._SINGLETON_CREATED_AT,
    )
    await process_forgejo_sync_queue_task(singleton_task)

    assert events == ["sync", "reschedule"]