Pipeline/backend/app/services/forgejo_sync_queue.py

148 lines
4.8 KiB
Python

"""Queue task helpers for scheduled Forgejo issue cache sync."""
from __future__ import annotations
from datetime import UTC, datetime
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.config import settings
from app.core.logging import get_logger
from app.core.time import utcnow
from app.db import crud
from app.db.session import async_session_maker
from app.models.forgejo_repositories import ForgejoRepository
from app.services.forgejo_issue_sync import IssueSyncService
from app.services.queue import QueuedTask, enqueue_task_with_delay
from app.services.queue import requeue_if_failed as generic_requeue_if_failed
# Module-level logger; every event emitted by this module goes through it.
logger = get_logger(__name__)
# Task type stamped on the task built by ``_task()`` and checked by
# ``process_forgejo_sync_queue_task`` before any work is done.
TASK_TYPE = "forgejo_issue_sync_all"
# Fixed Unix-epoch (UTC) timestamp used as ``created_at`` for every task built
# by ``_task()``, so all such tasks carry identical fields.
# NOTE(review): presumably this lets the queue treat the task as a singleton /
# de-duplicate it — confirm against ``app.services.queue``.
_SINGLETON_CREATED_AT = datetime(1970, 1, 1, tzinfo=UTC)
def _task() -> QueuedTask:
    """Build the queue task describing one Forgejo issue sync run.

    Every call produces a task with the same type, payload and fixed
    ``created_at`` value.
    """
    task_fields = {
        "task_type": TASK_TYPE,
        "payload": {"scope": "active_repositories"},
        "created_at": _SINGLETON_CREATED_AT,
    }
    return QueuedTask(**task_fields)
def _sync_enabled() -> bool:
    """Report whether scheduled sync is switched on with a positive interval."""
    enabled = settings.forgejo_sync_enabled
    if not enabled:
        # Hand back the falsy flag itself, exactly as ``flag and ...`` would.
        return enabled
    return settings.forgejo_sync_interval_seconds > 0
def enqueue_forgejo_issue_sync(*, delay_seconds: float) -> bool:
    """Put the singleton Forgejo issue sync task on the queue.

    Args:
        delay_seconds: Delay handed to the queue; it is converted to ``float``
            and negative values are clamped to ``0.0``.

    Returns:
        ``False`` when sync is disabled; otherwise whatever
        ``enqueue_task_with_delay`` reports.
    """
    if _sync_enabled():
        clamped_delay = max(0.0, float(delay_seconds))
        enqueued = enqueue_task_with_delay(
            _task(),
            settings.rq_queue_name,
            delay_seconds=clamped_delay,
            redis_url=settings.rq_redis_url,
        )
        if enqueued:
            logger.info(
                "forgejo.sync.queue.enqueued",
                extra={"delay_seconds": clamped_delay},
            )
        return enqueued

    # Sync is switched off: record that and enqueue nothing.
    logger.info("forgejo.sync.queue.disabled")
    return False
def seed_scheduled_forgejo_sync() -> bool:
    """Enqueue the first periodic sync, using the configured startup delay.

    Intended to be called when the API process starts.

    Returns:
        The result of ``enqueue_forgejo_issue_sync``.
    """
    startup_delay = settings.forgejo_sync_startup_delay_seconds
    return enqueue_forgejo_issue_sync(delay_seconds=startup_delay)
def schedule_next_forgejo_issue_sync() -> bool:
    """Enqueue the following periodic sync, one configured interval from now.

    Returns:
        The result of ``enqueue_forgejo_issue_sync``.
    """
    interval = settings.forgejo_sync_interval_seconds
    return enqueue_forgejo_issue_sync(delay_seconds=interval)
async def sync_active_forgejo_repositories(session: AsyncSession) -> dict[str, int]:
    """Sync issues for every active Forgejo repository and tally the results.

    Repositories are processed one at a time, ordered by ``created_at``
    ascending. A failure in one repository is written to that repository's
    ``last_sync_error``, logged as a warning, and does not stop the rest.

    Args:
        session: Async database session used for the query, the per-repository
            sync service, and saving error details.

    Returns:
        A dict with the keys ``repositories``, ``succeeded``, ``failed``,
        ``created``, ``updated``, ``open``, ``closed`` and ``total``.
    """
    issue_counters = ("created", "updated", "open", "closed", "total")

    statement = (
        select(ForgejoRepository)
        .where(ForgejoRepository.active.is_(True))
        .order_by(ForgejoRepository.created_at.asc())
    )
    query_result = await session.exec(statement)
    repositories = query_result.all()

    totals = {
        "repositories": len(repositories),
        "succeeded": 0,
        "failed": 0,
        **dict.fromkeys(issue_counters, 0),
    }

    for repo in repositories:
        try:
            # The service is built inside the ``try`` so that construction
            # errors are counted as a failure of this repository too.
            outcome = await IssueSyncService(
                session=session,
                organization_id=repo.organization_id,
            ).sync_repository_issues(repository_id=repo.id)
        except Exception as exc:
            message = str(exc)
            # Persist the failure on the repository row before moving on.
            repo.last_sync_error = message
            repo.updated_at = utcnow()
            await crud.save(session, repo)
            totals["failed"] += 1
            logger.warning(
                "forgejo.sync.repository_failed",
                extra={
                    "repository_id": str(repo.id),
                    "owner": repo.owner,
                    "repo": repo.repo,
                    "error": message,
                },
            )
        else:
            totals["succeeded"] += 1
            for counter in issue_counters:
                totals[counter] += int(outcome.get(counter, 0))

    return totals
async def process_forgejo_sync_queue_task(task: QueuedTask) -> None:
    """Worker entrypoint for scheduled Forgejo issue sync.

    Runs one sync pass over all active repositories and then schedules the
    next periodic run, whether or not the pass succeeded. If sync is disabled
    the task is skipped and no follow-up run is scheduled.

    Args:
        task: The dequeued task; its ``task_type`` must be ``TASK_TYPE`` or
            ``"legacy"``.

    Raises:
        ValueError: If ``task.task_type`` is neither of the accepted values.
    """
    if task.task_type not in {TASK_TYPE, "legacy"}:
        raise ValueError(f"Unexpected task_type={task.task_type!r}; expected {TASK_TYPE!r}")
    if not _sync_enabled():
        logger.info("forgejo.sync.skipped_disabled")
        return
    try:
        async with async_session_maker() as session:
            totals = await sync_active_forgejo_repositories(session)
            logger.info(
                "forgejo.sync.completed",
                extra={f"sync_{key}": value for key, value in totals.items()},
            )
    except Exception as exc:
        # Logged and not re-raised, so a failed pass does not surface to the
        # worker as a task failure.
        # NOTE(review): confirm this best-effort behaviour is intended.
        logger.exception("forgejo.sync.failed", extra={"error": str(exc)})
    finally:
        # This task is the only thing that schedules the next run; if that
        # enqueue fails the periodic chain stops, so it must not go unnoticed.
        rescheduled = schedule_next_forgejo_issue_sync()
        if not rescheduled and _sync_enabled():
            logger.error(
                "forgejo.sync.reschedule_failed",
                extra={"interval_seconds": settings.forgejo_sync_interval_seconds},
            )
def requeue_forgejo_sync_queue_task(task: QueuedTask, *, delay_seconds: float = 0) -> bool:
    """Hand a failed Forgejo sync task to the generic requeue helper.

    Args:
        task: The task to requeue.
        delay_seconds: Delay passed to the helper; negative values are clamped
            to ``0.0``.

    Returns:
        The result of the generic ``requeue_if_failed`` helper, which is given
        ``settings.rq_dispatch_max_retries`` as its retry cap.
    """
    non_negative_delay = max(0.0, delay_seconds)
    return generic_requeue_if_failed(
        task,
        settings.rq_queue_name,
        max_retries=settings.rq_dispatch_max_retries,
        redis_url=settings.rq_redis_url,
        delay_seconds=non_negative_delay,
    )