"""Service for creating new Forgejo issues from Pipeline and caching them locally."""

from __future__ import annotations

from collections.abc import Mapping
from datetime import datetime, timezone
from typing import TYPE_CHECKING
from uuid import UUID

from sqlmodel import select

from app.core.logging import get_logger
from app.core.time import utcnow
from app.db import crud
from app.models.board_repository_links import BoardRepositoryLink
from app.models.forgejo_connections import ForgejoConnection
from app.models.forgejo_issues import ForgejoIssue
from app.models.forgejo_repositories import ForgejoRepository
from app.services.forgejo_client import get_forgejo_client

if TYPE_CHECKING:
    from sqlalchemy.ext.asyncio.session import AsyncSession

logger = get_logger(__name__)

# Maximum number of characters of the issue body kept in ``body_preview``.
_BODY_PREVIEW_LIMIT = 1000


class CreateIssueError(Exception):
    """Base exception for issue creation errors."""


class CreateIssueNotFoundError(CreateIssueError):
    """Raised when a required repository or connection is not found."""


class CreateIssueRemoteError(CreateIssueError):
    """Raised when the Forgejo API call fails."""


def _parse_forgejo_timestamp(value: object, *, default: datetime) -> datetime:
    """Parse an ISO-8601 timestamp from a Forgejo payload into a naive UTC datetime.

    Args:
        value: The raw ``created_at`` value from the API response.
        default: Value returned when ``value`` is not a string or cannot be parsed.

    Returns:
        A timezone-naive datetime. Offset-aware inputs are converted to UTC
        before the tzinfo is dropped, so ``12:00+02:00`` becomes ``10:00``
        rather than ``12:00``.
    """
    if not isinstance(value, str):
        return default
    try:
        # ``fromisoformat`` only accepts a trailing "Z" on Python 3.11+, so
        # normalise it to an explicit offset first.
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        # Best-effort: an unparseable timestamp must not fail issue creation.
        return default
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc)
    return parsed.replace(tzinfo=None)


async def create_issue_on_repository(
    session: AsyncSession,
    *,
    repository: ForgejoRepository,
    title: str,
    body: str,
    labels: list[int] | None = None,
) -> ForgejoIssue:
    """Create an issue on Forgejo and cache it locally.

    The connection for ``repository`` is loaded, the issue is created through
    the Forgejo client, and a ``ForgejoIssue`` row built from the response is
    added to ``session`` and flushed (not committed).

    Args:
        session: Database session used for lookups and for persisting the cache row.
        repository: Repository on which the issue is created.
        title: Issue title; also stored as the cached title.
        body: Issue body; used for the cache when the response carries no body.
        labels: Optional label ids forwarded to the Forgejo client.

    Returns:
        The flushed ``ForgejoIssue`` cache record.

    Raises:
        CreateIssueNotFoundError: If the connection is not found.
        CreateIssueRemoteError: If the Forgejo API call fails or returns a
            response that is not a mapping.
    """
    connection = await crud.get_by_id(session, ForgejoConnection, repository.connection_id)
    if connection is None:
        raise CreateIssueNotFoundError("Repository connection not found")

    try:
        async with get_forgejo_client(connection) as client:
            raw = await client.create_issue(
                owner=repository.owner,
                repo=repository.repo,
                title=title,
                body=body,
                labels=labels,
            )
    except Exception as exc:
        raise CreateIssueRemoteError(f"Failed to create issue on Forgejo: {exc}") from exc

    # Everything below reads the response with ``.get``; surface a malformed
    # response as a CreateIssueError so callers that skip failed repositories
    # keep working instead of crashing on AttributeError.
    if not isinstance(raw, Mapping):
        raise CreateIssueRemoteError(
            "Failed to create issue on Forgejo: "
            f"unexpected response type {type(raw).__name__}"
        )

    # NOTE(review): assumes utcnow() yields a naive UTC datetime, matching the
    # naive values produced by _parse_forgejo_timestamp — confirm.
    now = utcnow()

    raw_number = raw.get("number", 0)
    forgejo_number = int(raw_number) if isinstance(raw_number, int | float) else 0

    issue_body = (raw.get("body") or "") or body

    raw_user = raw.get("user")
    # ``or ""`` guards against an explicit JSON null for the login.
    author = (raw_user.get("login") or "") if isinstance(raw_user, dict) else ""

    # ``or ""`` prevents a null html_url from being stored as the string "None".
    html_url = str(raw.get("html_url") or "")

    forgejo_created_at = _parse_forgejo_timestamp(raw.get("created_at"), default=now)

    issue = ForgejoIssue(
        organization_id=repository.organization_id,
        repository_id=repository.id,
        forgejo_issue_number=forgejo_number,
        title=title,
        body=issue_body,
        body_preview=issue_body[:_BODY_PREVIEW_LIMIT],
        state="open",
        is_pull_request=False,
        labels=[],
        assignees=[],
        author=author,
        html_url=html_url,
        forgejo_created_at=forgejo_created_at,
        # A freshly created issue has not been updated yet.
        forgejo_updated_at=forgejo_created_at,
        forgejo_payload=dict(raw) if isinstance(raw, dict) else None,
        last_synced_at=now,
    )
    session.add(issue)
    # Flush so ``issue.id`` is populated for the log record and the caller.
    await session.flush()

    logger.info(
        "forgejo.issue.created",
        extra={
            "issue_id": str(issue.id),
            "forgejo_issue_number": forgejo_number,
            "repository_id": str(repository.id),
            "organization_id": str(repository.organization_id),
        },
    )
    return issue


async def create_issue_on_board_repositories(
    session: AsyncSession,
    *,
    board_id: UUID,
    organization_id: UUID,
    title: str,
    body: str,
    labels: list[int] | None = None,
) -> list[ForgejoIssue]:
    """Create an issue on all repositories linked to a board.

    Args:
        session: Database session used for lookups and persistence.
        board_id: Board whose repository links are used.
        organization_id: Organization the links must belong to.
        title: Issue title used for every repository.
        body: Issue body used for every repository.
        labels: Optional label ids forwarded to each creation call.

    Returns:
        A list of created ForgejoIssue records (one per linked repository).
        Links whose repository row no longer exists are skipped silently;
        repositories whose creation raises ``CreateIssueError`` are logged
        and skipped.
    """
    links = (
        await session.exec(
            select(BoardRepositoryLink).where(
                BoardRepositoryLink.board_id == board_id,
                BoardRepositoryLink.organization_id == organization_id,
            )
        )
    ).all()

    created: list[ForgejoIssue] = []
    for link in links:
        repository = await crud.get_by_id(session, ForgejoRepository, link.repository_id)
        if repository is None:
            continue
        try:
            issue = await create_issue_on_repository(
                session,
                repository=repository,
                title=title,
                body=body,
                labels=labels,
            )
        except CreateIssueError as exc:
            # One failing repository must not block the remaining ones.
            logger.warning(
                "error_escalation.issue_create_failed",
                extra={
                    "board_id": str(board_id),
                    "repository_id": str(link.repository_id),
                    "error": str(exc),
                },
            )
        else:
            created.append(issue)
    return created