Pipeline/backend/app/services/forgejo_client.py

177 lines
4.7 KiB
Python

"""Forgejo API client service for making REST API calls."""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
import httpx
from app.core.logging import get_logger
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
logger = get_logger(__name__)
class ForgejoClientError(Exception):
"""Base exception for Forgejo API client errors."""
class ForgejoAPIClient:
"""HTTP client for Forgejo REST API calls."""
def __init__(
self,
base_url: str,
token: str | None = None,
timeout_connect: float = 5.0,
timeout_read: float = 30.0,
) -> None:
"""Initialize the Forgejo API client."""
self.base_url = base_url.rstrip("/")
self.token = token
self.timeout_connect = timeout_connect
self.timeout_read = timeout_read
self._client: httpx.AsyncClient | None = None
async def __aenter__(self) -> ForgejoAPIClient:
"""Enter async context manager."""
self._client = httpx.AsyncClient(
base_url=self.base_url,
headers=self._build_headers(),
timeout=httpx.Timeout(
connect=self.timeout_connect,
read=self.timeout_read,
write=10.0,
pool=5.0,
),
)
return self
async def __aexit__(self, *args: object) -> None:
"""Exit async context manager."""
if self._client:
await self._client.aclose()
self._client = None
def _build_headers(self) -> dict[str, str]:
"""Build request headers including auth and User-Agent."""
headers = {
"User-Agent": "Pipeline/ForgejoClient/1.0",
"Accept": "application/json",
}
if self.token:
headers["Authorization"] = f"token {self.token}"
return headers
async def _get_client(self) -> httpx.AsyncClient:
"""Get or create the HTTP client."""
if self._client is None:
raise RuntimeError("ForgejoAPIClient must be used as async context manager")
return self._client
async def list_issues(
self,
owner: str,
repo: str,
state: str = "open",
page: int = 1,
limit: int = 30,
) -> dict[str, object]:
"""
List issues for a repository (excluding pull requests).
Args:
owner: Repository owner
repo: Repository name
state: Issue state (open, closed, all)
page: Page number
limit: Items per page
Returns:
API response as dict
"""
client = await self._get_client()
params = {
"state": state,
"page": page,
"per_page": limit,
"type": "issues", # Exclude pull requests
}
response = await client.get(f"/repos/{owner}/{repo}/issues", params=params)
response.raise_for_status()
return response.json()
async def close_issue(
self,
owner: str,
repo: str,
issue_number: int,
) -> dict[str, object]:
"""
Close a specific issue.
Args:
owner: Repository owner
repo: Repository name
issue_number: Issue number to close
Returns:
Updated issue data as dict
"""
client = await self._get_client()
payload = {"state": "closed"}
response = await client.patch(
f"/repos/{owner}/{repo}/issues/{issue_number}",
json=payload,
)
response.raise_for_status()
return response.json()
async def get_repository(
self,
owner: str,
repo: str,
) -> dict[str, object]:
"""
Get repository metadata.
Args:
owner: Repository owner
repo: Repository name
Returns:
Repository data as dict
"""
client = await self._get_client()
response = await client.get(f"/repos/{owner}/{repo}")
response.raise_for_status()
return response.json()
async def get_forgejo_client(
connection: object,
) -> ForgejoAPIClient:
"""
Factory function to create a ForgejoAPIClient from a connection object.
Args:
connection: ForgejoConnection object with base_url and token
Returns:
Configured ForgejoAPIClient instance
"""
base_url = connection.base_url.rstrip("/")
# Remove /api/v1 if present to get base URL
if "/api/v1" in base_url:
import re
match = re.match(r"(https?://[^/]+)", base_url)
if match:
base_url = match.group(1).rstrip("/")
return ForgejoAPIClient(
base_url=base_url,
token=getattr(connection, "token", None),
)