Pipeline/backend/app/services/forgejo_issue_close.py

159 lines
5.0 KiB
Python

"""Service for closing Forgejo issues and updating local cache."""
from __future__ import annotations
from typing import TYPE_CHECKING
from uuid import UUID
from sqlmodel import select
from app.core.logging import get_logger
from app.core.time import utcnow
from app.db import crud
from app.models.forgejo_connections import ForgejoConnection
from app.models.forgejo_issues import ForgejoIssue
from app.models.forgejo_repositories import ForgejoRepository
from app.services.forgejo_client import get_forgejo_client
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio.session import AsyncSession
# Module-level logger, named after this module via ``__name__``.
logger = get_logger(__name__)
class CloseIssueError(Exception):
    """Root of the exception hierarchy raised while closing a Forgejo issue.

    Catch this type to handle every close-issue failure defined in this module.
    """

    pass
class CloseIssueNotFoundError(CloseIssueError):
    """Signal that the issue, its repository, or the repository's connection is missing."""

    pass
class CloseIssueAccessError(CloseIssueError):
    """Signal that closing the issue is not permitted (e.g. organization mismatch)."""

    pass
class CloseIssueRemoteError(CloseIssueError):
    """Signal that the call to the remote Forgejo API did not succeed."""

    pass
async def close_cached_issue(
    session: AsyncSession,
    issue: ForgejoIssue,
    actor_agent_id: UUID | None = None,
    actor_user_id: UUID | None = None,
) -> dict[str, object]:
    """Close a Forgejo issue remotely, then mirror the closure in the local cache.

    The remote close is attempted first; the cached row is only modified once
    that call has succeeded. The session is flushed but not committed here.

    Args:
        session: Async session for database operations.
        issue: Cached ForgejoIssue to close.
        actor_agent_id: Optional agent ID performing the close (only logged).
        actor_user_id: Optional user ID performing the close (only logged).

    Returns:
        Normalized result dict with success status, identifiers, the new
        state and ISO-8601 timestamps.

    Raises:
        CloseIssueNotFoundError: If the repository or connection is not found.
        CloseIssueAccessError: If the issue's organization differs from the
            repository's or the connection's organization.
        CloseIssueRemoteError: If the Forgejo API call fails; the original
            exception is attached as ``__cause__``.
    """
    # Load repository and connection.
    repository = await crud.get_by_id(session, ForgejoRepository, issue.repository_id)
    if repository is None:
        raise CloseIssueNotFoundError("Repository not found")

    connection = await crud.get_by_id(session, ForgejoConnection, repository.connection_id)
    if connection is None:
        raise CloseIssueNotFoundError("Connection not found")

    # Issue, repository and connection must all belong to the same organization.
    if issue.organization_id != repository.organization_id:
        raise CloseIssueAccessError("Organization mismatch")
    if issue.organization_id != connection.organization_id:
        raise CloseIssueAccessError("Organization mismatch")

    # Close the issue on Forgejo first so the cache never claims a closure
    # that did not happen remotely.
    try:
        async with get_forgejo_client(connection) as client:
            await client.close_issue(
                owner=repository.owner,
                repo=repository.repo,
                issue_number=issue.forgejo_issue_number,
            )
    except Exception as e:
        # Chain explicitly so callers and logs keep the underlying failure.
        raise CloseIssueRemoteError(f"Failed to close issue on Forgejo: {e}") from e

    # Only update the local cache if the Forgejo call succeeded. A single
    # timestamp keeps closed-at and last-synced consistent with each other.
    closed_at = utcnow()
    issue.state = "closed"
    issue.forgejo_closed_at = closed_at
    issue.last_synced_at = closed_at

    session.add(issue)
    await session.flush()

    logger.info(
        "forgejo.issue.closed",
        extra={
            "issue_id": str(issue.id),
            "forgejo_issue_number": issue.forgejo_issue_number,
            "repository_id": str(repository.id),
            "organization_id": str(issue.organization_id),
            "actor_agent_id": str(actor_agent_id) if actor_agent_id else None,
            "actor_user_id": str(actor_user_id) if actor_user_id else None,
        },
    )

    closed_at_iso = closed_at.isoformat()
    return {
        "success": True,
        "issue_id": str(issue.id),
        "forgejo_issue_number": issue.forgejo_issue_number,
        "repository_id": str(repository.id),
        "repository_full_name": f"{repository.owner}/{repository.repo}",
        "state": "closed",
        "forgejo_closed_at": closed_at_iso,
        "last_synced_at": closed_at_iso,
    }
async def close_issue_by_id(
    session: AsyncSession,
    issue_id: UUID,
    actor_agent_id: UUID | None = None,
    actor_user_id: UUID | None = None,
) -> dict[str, object]:
    """Look up a cached Forgejo issue by its local ID and close it.

    Args:
        session: Async session for database operations.
        issue_id: Local ForgejoIssue ID to close.
        actor_agent_id: Optional agent ID performing the close.
        actor_user_id: Optional user ID performing the close.

    Returns:
        The normalized result dict produced by ``close_cached_issue``.

    Raises:
        CloseIssueNotFoundError: If no issue with ``issue_id`` exists.
        CloseIssueAccessError: If access is denied.
        CloseIssueRemoteError: If the Forgejo API call fails.
    """
    lookup = select(ForgejoIssue).where(ForgejoIssue.id == issue_id)
    rows = await session.exec(lookup)
    cached_issue = rows.first()

    # Guard: nothing to close when the ID does not resolve to a cached issue.
    if cached_issue is None:
        raise CloseIssueNotFoundError("Issue not found")

    # Delegate the remote close and cache update to the shared implementation.
    result = await close_cached_issue(
        session=session,
        issue=cached_issue,
        actor_agent_id=actor_agent_id,
        actor_user_id=actor_user_id,
    )
    return result