Pipeline/backend/tests/test_forgejo_sync_queue.py

231 lines
7.6 KiB
Python

# ruff: noqa: INP001
"""Scheduled Forgejo issue sync queue tests."""
from __future__ import annotations
from uuid import uuid4
import pytest
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from sqlmodel import SQLModel, col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from app import models as _models
from app.models.forgejo_connections import ForgejoConnection
from app.models.forgejo_repositories import ForgejoRepository
from app.models.organizations import Organization
from app.services import forgejo_sync_queue
from app.services.forgejo_sync_queue import (
enqueue_forgejo_issue_sync,
process_forgejo_sync_queue_task,
sync_active_forgejo_repositories,
)
from app.services.queue import QueuedTask
# Binds the ``app.models`` import to a module-level name so it stays referenced.
# NOTE(review): presumably importing the package is what registers every model
# table on ``SQLModel.metadata`` before ``create_all`` runs — confirm.
_MODEL_REGISTRY = _models
async def _make_engine() -> AsyncEngine:
    """Return an in-memory SQLite async engine with the SQLModel schema created."""
    memory_engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    create_schema = SQLModel.metadata.create_all
    async with memory_engine.begin() as connection:
        # ``create_all`` is a synchronous API, so it is bridged via ``run_sync``.
        await connection.run_sync(create_schema)
    return memory_engine
async def _seed_repositories(
    session: AsyncSession,
) -> tuple[ForgejoRepository, ForgejoRepository, ForgejoRepository]:
    """Persist one organization, one connection, and three repositories.

    Returns the repositories as ``(active, failing, inactive)``. The first two
    are stored with ``active=True`` and the last one with ``active=False``;
    all three share the same organization, connection, and owner.
    """
    org_id = uuid4()
    conn_id = uuid4()

    def _repository(repo_name: str, *, is_active: bool) -> ForgejoRepository:
        # Every seeded repository differs only in its name and active flag.
        return ForgejoRepository(
            id=uuid4(),
            organization_id=org_id,
            connection_id=conn_id,
            owner="openclaw",
            repo=repo_name,
            active=is_active,
        )

    organization = Organization(id=org_id, name=f"org-{org_id}")
    session.add(organization)
    connection = ForgejoConnection(
        id=conn_id,
        organization_id=org_id,
        name="Forgejo",
        base_url="https://forgejo.example.local",
    )
    session.add(connection)

    repositories = (
        _repository("pipeline", is_active=True),
        _repository("broken", is_active=True),
        _repository("inactive", is_active=False),
    )
    for repository in repositories:
        session.add(repository)

    await session.commit()
    return repositories
def test_enqueue_forgejo_issue_sync_uses_singleton_delayed_task(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Enqueueing hands a ``forgejo_issue_sync_all`` task to the delayed queue.

    The queued task must carry the ``active_repositories`` scope, a fixed
    epoch ``created_at``, and the caller-supplied delay.
    """
    recorded: dict[str, object] = {}

    def _record_enqueue(
        task: QueuedTask,
        queue_name: str,
        *,
        delay_seconds: float,
        redis_url: str | None = None,
    ) -> bool:
        # Stand-in for the real queue call: remember the arguments, report success.
        recorded.update(
            task=task,
            queue_name=queue_name,
            delay_seconds=delay_seconds,
            redis_url=redis_url,
        )
        return True

    sync_settings = forgejo_sync_queue.settings
    monkeypatch.setattr(sync_settings, "forgejo_sync_enabled", True)
    monkeypatch.setattr(sync_settings, "forgejo_sync_interval_seconds", 3600)
    monkeypatch.setattr(
        "app.services.forgejo_sync_queue.enqueue_task_with_delay",
        _record_enqueue,
    )

    enqueued = enqueue_forgejo_issue_sync(delay_seconds=30)

    assert enqueued is True
    queued = recorded["task"]
    assert isinstance(queued, QueuedTask)
    assert queued.task_type == "forgejo_issue_sync_all"
    assert queued.payload == {"scope": "active_repositories"}
    assert queued.created_at.isoformat() == "1970-01-01T00:00:00+00:00"
    assert recorded["delay_seconds"] == 30
def test_enqueue_forgejo_issue_sync_respects_disabled_setting(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With ``forgejo_sync_enabled`` off, enqueueing returns False without queueing."""

    def _fail_if_called(*args: object, **kwargs: object) -> object:
        # Any call into the queue layer means the disabled flag was ignored.
        del args, kwargs
        return pytest.fail("disabled sync should not enqueue")

    monkeypatch.setattr(forgejo_sync_queue.settings, "forgejo_sync_enabled", False)
    monkeypatch.setattr(
        "app.services.forgejo_sync_queue.enqueue_task_with_delay",
        _fail_if_called,
    )

    enqueued = enqueue_forgejo_issue_sync(delay_seconds=30)

    assert enqueued is False
@pytest.mark.asyncio
async def test_sync_active_forgejo_repositories_syncs_active_and_records_errors(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Only active repositories are synced, and a failing sync is recorded.

    The stub sync service succeeds for one active repository and raises for
    the other. The test checks the aggregated totals, the sequence of synced
    repository ids, and the ``last_sync_error`` stored on the failing and
    inactive repositories.
    """
    synced_ids: list[str] = []
    success_counts = {"created": 2, "updated": 3, "open": 4, "closed": 1, "total": 5}

    class _StubIssueSyncService:
        """Replacement for ``IssueSyncService`` that never talks to Forgejo."""

        def __init__(self, *, session: AsyncSession, organization_id: object) -> None:
            del session, organization_id

        async def sync_repository_issues(self, *, repository_id: object) -> dict[str, int]:
            synced_ids.append(str(repository_id))
            # ``failing_repo`` is bound later in the enclosing test; the closure
            # resolves it only when the service is actually invoked.
            if repository_id != failing_repo.id:
                return dict(success_counts)
            raise RuntimeError("Forgejo unavailable")

    monkeypatch.setattr(
        "app.services.forgejo_sync_queue.IssueSyncService",
        _StubIssueSyncService,
    )

    engine = await _make_engine()
    make_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    try:
        async with make_session() as session:
            active_repo, failing_repo, inactive_repo = await _seed_repositories(session)
            totals = await sync_active_forgejo_repositories(session)

            expected_totals = {
                "repositories": 2,
                "succeeded": 1,
                "failed": 1,
                **success_counts,
            }
            assert totals == expected_totals
            assert synced_ids == [str(active_repo.id), str(failing_repo.id)]

        # Read back through a fresh session so the check sees persisted state.
        async with make_session() as verify_session:
            stored: dict[object, ForgejoRepository] = {}
            for repo in (failing_repo, inactive_repo):
                statement = select(ForgejoRepository).where(
                    col(ForgejoRepository.id) == repo.id
                )
                stored[repo.id] = (await verify_session.exec(statement)).one()

            assert stored[failing_repo.id].last_sync_error == "Forgejo unavailable"
            assert stored[inactive_repo.id].last_sync_error is None
    finally:
        await engine.dispose()
@pytest.mark.asyncio
async def test_process_forgejo_sync_queue_task_reschedules_after_run(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Processing the queued task runs the sync first and then reschedules."""
    events: list[str] = []
    total_keys = (
        "repositories",
        "succeeded",
        "failed",
        "created",
        "updated",
        "open",
        "closed",
        "total",
    )

    async def _fake_sync(session: AsyncSession) -> dict[str, int]:
        del session
        events.append("sync")
        return dict.fromkeys(total_keys, 0)

    def _fake_reschedule() -> bool:
        events.append("reschedule")
        return True

    class _NullSessionContext:
        """Async context manager standing in for a real session factory call."""

        async def __aenter__(self) -> AsyncSession:
            # The patched sync function discards its session, so None suffices.
            return None  # type: ignore[return-value]

        async def __aexit__(self, *args: object) -> None:
            return None

    monkeypatch.setattr(forgejo_sync_queue.settings, "forgejo_sync_enabled", True)
    monkeypatch.setattr(forgejo_sync_queue.settings, "forgejo_sync_interval_seconds", 12)
    replacements = (
        ("async_session_maker", _NullSessionContext),
        ("sync_active_forgejo_repositories", _fake_sync),
        ("schedule_next_forgejo_issue_sync", _fake_reschedule),
    )
    for attribute, replacement in replacements:
        monkeypatch.setattr(f"app.services.forgejo_sync_queue.{attribute}", replacement)

    queued_task = QueuedTask(
        task_type="forgejo_issue_sync_all",
        payload={"scope": "active_repositories"},
        created_at=forgejo_sync_queue._SINGLETON_CREATED_AT,
    )
    await process_forgejo_sync_queue_task(queued_task)

    assert events == ["sync", "reschedule"]