Pipeline/backend/app/services/forgejo_issue_create.py

167 lines
5.1 KiB
Python
Raw Normal View History

2026-05-22 01:44:39 -05:00
"""Service for creating new Forgejo issues from Pipeline and caching them locally."""
from __future__ import annotations
from typing import TYPE_CHECKING
from uuid import UUID
from sqlmodel import select
from app.core.logging import get_logger
from app.core.time import utcnow
from app.db import crud
from app.models.board_repository_links import BoardRepositoryLink
from app.models.forgejo_connections import ForgejoConnection
from app.models.forgejo_issues import ForgejoIssue
from app.models.forgejo_repositories import ForgejoRepository
from app.services.forgejo_client import get_forgejo_client
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio.session import AsyncSession
# Module-level logger, obtained from the project's logging helper and named
# after this module via ``__name__``.
logger = get_logger(__name__)
class CreateIssueError(Exception):
    """Common ancestor of every error raised while creating a Forgejo issue."""
class CreateIssueNotFoundError(CreateIssueError):
    """Signals that a repository or connection needed for issue creation is missing."""
class CreateIssueRemoteError(CreateIssueError):
    """Signals that the call to the Forgejo API did not succeed."""
async def create_issue_on_repository(
    session: AsyncSession,
    *,
    repository: ForgejoRepository,
    title: str,
    body: str,
    labels: list[int] | None = None,
) -> ForgejoIssue:
    """Create an issue on Forgejo and cache it locally.

    The issue is created remotely through the Forgejo client obtained for the
    repository's connection. A ``ForgejoIssue`` row built from the API
    response is then added to ``session`` and flushed (this function does not
    commit).

    Args:
        session: Database session used to load the connection and to store
            the cached issue.
        repository: Repository on which the issue is created.
        title: Issue title; sent to Forgejo and stored verbatim on the cached
            row.
        body: Issue body; sent to Forgejo and used for the cached row when the
            response carries no body.
        labels: Optional list of integer labels forwarded unchanged to the
            Forgejo client.

    Returns:
        The flushed ``ForgejoIssue`` cache record.

    Raises:
        CreateIssueNotFoundError: If the connection is not found.
        CreateIssueRemoteError: If the Forgejo API call fails.
    """
    connection = await crud.get_by_id(session, ForgejoConnection, repository.connection_id)
    if connection is None:
        raise CreateIssueNotFoundError("Repository connection not found")

    try:
        async with get_forgejo_client(connection) as client:
            raw = await client.create_issue(
                owner=repository.owner,
                repo=repository.repo,
                title=title,
                body=body,
                labels=labels,
            )
    except Exception as exc:
        # Remote boundary: any client failure is re-raised as the single
        # domain error callers are expected to catch.
        raise CreateIssueRemoteError(f"Failed to create issue on Forgejo: {exc}") from exc

    now = utcnow()

    # A missing or non-numeric issue number is cached as 0 (best effort).
    raw_number = raw.get("number", 0)
    forgejo_number = int(raw_number) if isinstance(raw_number, int | float) else 0

    # Prefer the body echoed by the API, falling back to what was submitted.
    cached_body = (raw.get("body") or "") or body

    # ``or ""`` also covers keys that are present but null, which would
    # otherwise be stored as ``None`` (author) or the text "None" (URL).
    raw_user = raw.get("user")
    author = str(raw_user.get("login") or "") if isinstance(raw_user, dict) else ""
    html_url = str(raw.get("html_url") or "")

    # Creation time defaults to the local "now" when the response has no
    # usable timestamp.
    forgejo_created_at = now
    raw_created = raw.get("created_at")
    if isinstance(raw_created, str):
        from datetime import datetime, timezone

        try:
            parsed = datetime.fromisoformat(raw_created.replace("Z", "+00:00"))
        except ValueError:
            # Best effort: keep the fallback, but leave a trace of the
            # unexpected value instead of swallowing it silently.
            logger.warning(
                "forgejo.issue.created_at_unparseable",
                extra={
                    "repository_id": str(repository.id),
                    "raw_created_at": raw_created,
                },
            )
        else:
            # Convert offset-aware timestamps to UTC *before* dropping tzinfo;
            # stripping the offset directly would store server-local wall
            # time next to the UTC values from ``utcnow()``.
            # NOTE(review): assumes timestamps are persisted as naive UTC,
            # matching the ``utcnow()`` fallback - confirm against the model.
            if parsed.tzinfo is not None:
                parsed = parsed.astimezone(timezone.utc)
            forgejo_created_at = parsed.replace(tzinfo=None)

    issue = ForgejoIssue(
        organization_id=repository.organization_id,
        repository_id=repository.id,
        forgejo_issue_number=forgejo_number,
        title=title,
        body=cached_body,
        body_preview=cached_body[:1000],
        state="open",
        is_pull_request=False,
        labels=[],
        assignees=[],
        author=author,
        html_url=html_url,
        forgejo_created_at=forgejo_created_at,
        # A freshly created issue has not been updated yet.
        forgejo_updated_at=forgejo_created_at,
        forgejo_payload=dict(raw) if isinstance(raw, dict) else None,
        last_synced_at=now,
    )
    session.add(issue)
    # Flush so ``issue.id`` is available for the log record below.
    await session.flush()

    logger.info(
        "forgejo.issue.created",
        extra={
            "issue_id": str(issue.id),
            "forgejo_issue_number": forgejo_number,
            "repository_id": str(repository.id),
            "organization_id": str(repository.organization_id),
        },
    )
    return issue
async def create_issue_on_board_repositories(
    session: AsyncSession,
    *,
    board_id: UUID,
    organization_id: UUID,
    title: str,
    body: str,
    labels: list[int] | None = None,
) -> list[ForgejoIssue]:
    """Create the same issue on every repository linked to a board.

    Links are selected by ``board_id`` and ``organization_id``. A link whose
    repository record cannot be loaded is skipped silently; a repository for
    which creation raises ``CreateIssueError`` is logged as a warning and
    skipped.

    Returns:
        The ``ForgejoIssue`` records that were created, one per repository
        for which creation succeeded.
    """
    link_query = select(BoardRepositoryLink).where(
        BoardRepositoryLink.board_id == board_id,
        BoardRepositoryLink.organization_id == organization_id,
    )
    link_rows = await session.exec(link_query)

    issues: list[ForgejoIssue] = []
    for board_link in link_rows.all():
        repo = await crud.get_by_id(session, ForgejoRepository, board_link.repository_id)
        if repo is None:
            # Dangling link: nothing to create the issue on.
            continue

        try:
            new_issue = await create_issue_on_repository(
                session,
                repository=repo,
                title=title,
                body=body,
                labels=labels,
            )
        except CreateIssueError as failure:
            # One failing repository must not stop the remaining ones.
            logger.warning(
                "error_escalation.issue_create_failed",
                extra={
                    "board_id": str(board_id),
                    "repository_id": str(board_link.repository_id),
                    "error": str(failure),
                },
            )
        else:
            issues.append(new_issue)

    return issues