"""Forgejo issue sync service for pulling issues from remote repositories."""

from __future__ import annotations

from datetime import datetime, timezone
from typing import Any
from uuid import UUID

from sqlmodel import select

from app.core.logging import get_logger
from app.core.time import utcnow
from app.db import crud
from app.models.forgejo_connections import ForgejoConnection
from app.models.forgejo_issues import ForgejoIssue
from app.models.forgejo_repositories import ForgejoRepository
from app.services.forgejo_client import ForgejoAPIClient, get_forgejo_client

logger = get_logger(__name__)


class IssueSyncService:
    """Service for syncing Forgejo issues from remote repositories.

    Each instance is bound to one database session and one organization;
    repositories owned by a different organization are rejected.
    """

    def __init__(self, session: object, organization_id: UUID) -> None:
        """Store the database session and the organization to sync for."""
        self.session = session
        self.organization_id = organization_id

    async def sync_repository_issues(
        self,
        repository_id: UUID,
        page: int = 1,
        limit: int = 30,
    ) -> dict[str, int]:
        """Sync issues from a Forgejo repository into the local cache.

        Pages through the remote issue list (all states) starting at
        ``page``, creating or updating one ``ForgejoIssue`` row per remote
        issue. Entries carrying a non-null ``pull_request`` key are skipped.

        Args:
            repository_id: Primary key of the ``ForgejoRepository`` to sync.
            page: First remote page to fetch.
            limit: Page size requested from the remote API.

        Returns:
            Counters with the keys ``created``, ``updated``, ``open``,
            ``closed`` and ``total`` (``created + updated``).

        Raises:
            ValueError: If the repository does not exist, belongs to another
                organization, or its connection cannot be loaded.
        """
        repository = await crud.get_by_id(self.session, ForgejoRepository, repository_id)
        # Same message for "missing" and "foreign organization" so callers
        # cannot distinguish the two cases.
        if repository is None or repository.organization_id != self.organization_id:
            raise ValueError(f"Repository {repository_id} not found or access denied")

        # Load connection separately (no ORM relationship)
        connection = await crud.get_by_id(
            self.session, ForgejoConnection, repository.connection_id
        )
        if connection is None:
            raise ValueError("Repository has no connection")

        created = 0
        updated_count = 0
        open_count = 0
        closed_count = 0
        current_page = page

        while True:
            async with get_forgejo_client(connection) as client:
                response = await client.list_issues(
                    owner=repository.owner,
                    repo=repository.repo,
                    state="all",
                    page=current_page,
                    limit=limit,
                )

            # Forgejo returns issues as a JSON array, not wrapped in "items";
            # the wrapped shapes are accepted as a fallback.
            if isinstance(response, list):
                issues = response
            else:
                issues = response.get("items", response.get("data", []))
            if not isinstance(issues, list) or not issues:
                break

            for issue_data in issues:
                # Skip pull requests
                if issue_data.get("pull_request") is not None:
                    continue

                forgejo_number = issue_data.get("number", 0)
                fields = self._issue_fields(issue_data)
                state = fields["state"]

                existing = await self._find_issue(repository_id, forgejo_number)
                if existing is None:
                    issue = ForgejoIssue(
                        organization_id=self.organization_id,
                        repository_id=repository_id,
                        forgejo_issue_number=forgejo_number,
                        is_pull_request=False,
                        **fields,
                    )
                    self.session.add(issue)
                    await self.session.flush()
                    created += 1
                else:
                    for attr_name, value in fields.items():
                        setattr(existing, attr_name, value)
                    existing.last_synced_at = utcnow()
                    await crud.save(self.session, existing)
                    updated_count += 1

                if state == "open":
                    open_count += 1
                elif state == "closed":
                    closed_count += 1

            # A short page is treated as the last one.
            # NOTE(review): if the server caps its page size below `limit`,
            # this stops after the first page — confirm the server maximum.
            if len(issues) < limit:
                break
            current_page += 1

        # Update repository sync metadata
        repository.last_sync_at = utcnow()
        repository.last_sync_error = None
        await crud.save(self.session, repository)

        return {
            "created": created,
            "updated": updated_count,
            "open": open_count,
            "closed": closed_count,
            "total": created + updated_count,
        }

    def _issue_fields(self, issue_data: dict[str, Any]) -> dict[str, Any]:
        """Map a remote issue payload to the ``ForgejoIssue`` fields set on both create and update."""
        labels_data = [
            {
                "name": label.get("name", ""),
                "color": label.get("color", ""),
                "description": label.get("description", ""),
            }
            for label in (issue_data.get("labels") or [])
        ]
        assignees_data = [
            {
                "login": assignee.get("login", ""),
                "id": assignee.get("id", 0),
                "avatar_url": assignee.get("avatar_url", ""),
            }
            for assignee in (issue_data.get("assignees") or [])
        ]
        # "user" may be present but null; `.get("user", {})` would then
        # return None and crash on the chained `.get`.
        author = (issue_data.get("user") or {}).get("login", "")

        # NOTE(review): a missing "closed_at" (e.g. on open issues) becomes
        # the current time via _parse_iso_date — confirm this is intended.
        return {
            "title": issue_data.get("title", ""),
            "body_preview": (issue_data.get("body") or "")[:1000],
            "state": issue_data.get("state", "open"),
            "labels": labels_data,
            "assignees": assignees_data,
            "author": author,
            "html_url": issue_data.get("html_url", ""),
            "forgejo_created_at": self._parse_iso_date(issue_data.get("created_at")),
            "forgejo_updated_at": self._parse_iso_date(issue_data.get("updated_at")),
            "forgejo_closed_at": self._parse_iso_date(issue_data.get("closed_at")),
        }

    async def _find_issue(
        self, repository_id: UUID, forgejo_issue_number: int
    ) -> ForgejoIssue | None:
        """Find an existing cached issue by repository and number."""
        statement = select(ForgejoIssue).where(
            ForgejoIssue.repository_id == repository_id,
            ForgejoIssue.forgejo_issue_number == forgejo_issue_number,
        )
        results = await self.session.exec(statement)
        return results.first()

    def _parse_iso_date(self, date_str: str | None) -> datetime:
        """Parse an ISO 8601 string into a naive datetime expressed in UTC.

        Timezone-aware inputs are converted to UTC before the offset is
        dropped; inputs without an offset are returned as parsed. A missing
        or unparseable value falls back to ``utcnow()``.
        """
        if not date_str:
            return utcnow()
        try:
            # A trailing "Z" is not accepted by fromisoformat before Python 3.11.
            parsed = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
            if parsed.tzinfo is not None:
                # Normalize to UTC first; stripping tzinfo directly would keep
                # the local wall-clock time and silently shift the instant.
                parsed = parsed.astimezone(timezone.utc)
        except (ValueError, AttributeError, OverflowError):
            return utcnow()
        return parsed.replace(tzinfo=None)