"""Queue task helpers for scheduled Forgejo issue cache sync.""" from __future__ import annotations from datetime import UTC, datetime from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from app.core.config import settings from app.core.logging import get_logger from app.core.time import utcnow from app.db import crud from app.db.session import async_session_maker from app.models.forgejo_repositories import ForgejoRepository from app.services.forgejo_issue_sync import IssueSyncService from app.services.queue import QueuedTask, enqueue_task_with_delay from app.services.queue import requeue_if_failed as generic_requeue_if_failed logger = get_logger(__name__) TASK_TYPE = "forgejo_issue_sync_all" _SINGLETON_CREATED_AT = datetime(1970, 1, 1, tzinfo=UTC) def _task() -> QueuedTask: return QueuedTask( task_type=TASK_TYPE, payload={"scope": "active_repositories"}, created_at=_SINGLETON_CREATED_AT, ) def _sync_enabled() -> bool: return settings.forgejo_sync_enabled and settings.forgejo_sync_interval_seconds > 0 def enqueue_forgejo_issue_sync(*, delay_seconds: float) -> bool: """Schedule the singleton Forgejo issue sync task.""" if not _sync_enabled(): logger.info("forgejo.sync.queue.disabled") return False delay = max(0.0, float(delay_seconds)) ok = enqueue_task_with_delay( _task(), settings.rq_queue_name, delay_seconds=delay, redis_url=settings.rq_redis_url, ) if ok: logger.info( "forgejo.sync.queue.enqueued", extra={"delay_seconds": delay}, ) return ok def seed_scheduled_forgejo_sync() -> bool: """Seed periodic Forgejo sync when the API process starts.""" return enqueue_forgejo_issue_sync( delay_seconds=settings.forgejo_sync_startup_delay_seconds, ) def schedule_next_forgejo_issue_sync() -> bool: """Schedule the next periodic Forgejo issue sync run.""" return enqueue_forgejo_issue_sync( delay_seconds=settings.forgejo_sync_interval_seconds, ) async def sync_active_forgejo_repositories(session: AsyncSession) -> dict[str, int]: """Sync all active Forgejo repositories and return aggregate counts.""" repositories = ( await session.exec( select(ForgejoRepository) .where(ForgejoRepository.active.is_(True)) .order_by(ForgejoRepository.created_at.asc()) ) ).all() totals = { "repositories": len(repositories), "succeeded": 0, "failed": 0, "created": 0, "updated": 0, "open": 0, "closed": 0, "total": 0, } for repository in repositories: try: result = await IssueSyncService( session=session, organization_id=repository.organization_id, ).sync_repository_issues(repository_id=repository.id) except Exception as exc: repository.last_sync_error = str(exc) repository.updated_at = utcnow() await crud.save(session, repository) totals["failed"] += 1 logger.warning( "forgejo.sync.repository_failed", extra={ "repository_id": str(repository.id), "owner": repository.owner, "repo": repository.repo, "error": str(exc), }, ) continue totals["succeeded"] += 1 for key in ("created", "updated", "open", "closed", "total"): totals[key] += int(result.get(key, 0)) return totals async def process_forgejo_sync_queue_task(task: QueuedTask) -> None: """Worker entrypoint for scheduled Forgejo issue sync.""" if task.task_type not in {TASK_TYPE, "legacy"}: raise ValueError(f"Unexpected task_type={task.task_type!r}; expected {TASK_TYPE!r}") if not _sync_enabled(): logger.info("forgejo.sync.skipped_disabled") return try: async with async_session_maker() as session: totals = await sync_active_forgejo_repositories(session) logger.info( "forgejo.sync.completed", extra={f"sync_{key}": value for key, value in totals.items()}, ) except Exception as exc: logger.exception("forgejo.sync.failed", extra={"error": str(exc)}) finally: schedule_next_forgejo_issue_sync() def requeue_forgejo_sync_queue_task(task: QueuedTask, *, delay_seconds: float = 0) -> bool: """Requeue a failed Forgejo sync task with capped retries.""" return generic_requeue_if_failed( task, settings.rq_queue_name, max_retries=settings.rq_dispatch_max_retries, redis_url=settings.rq_redis_url, delay_seconds=max(0.0, delay_seconds), )