Pipeline/backend/app/services/forgejo_client.py

233 lines
6.6 KiB
Python

"""Forgejo API client service for making REST API calls."""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
import httpx
from app.core.logging import get_logger
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
logger = get_logger(__name__)
class ForgejoClientError(Exception):
"""Base exception for Forgejo API client errors."""
class ForgejoAPIClient:
"""HTTP client for Forgejo REST API calls."""
def __init__(
self,
base_url: str,
token: str | None = None,
timeout_connect: float = 5.0,
timeout_read: float = 30.0,
) -> None:
"""Initialize the Forgejo API client."""
self.base_url = base_url.rstrip("/")
self.token = token
self.timeout_connect = timeout_connect
self.timeout_read = timeout_read
self._client: httpx.AsyncClient | None = None
async def __aenter__(self) -> ForgejoAPIClient:
"""Enter async context manager."""
self._client = httpx.AsyncClient(
base_url=self.base_url,
headers=self._build_headers(),
timeout=httpx.Timeout(
connect=self.timeout_connect,
read=self.timeout_read,
write=10.0,
pool=5.0,
),
)
return self
async def __aexit__(self, *args: object) -> None:
"""Exit async context manager."""
if self._client:
await self._client.aclose()
self._client = None
def _build_headers(self) -> dict[str, str]:
"""Build request headers including auth and User-Agent."""
headers = {
"User-Agent": "Pipeline/ForgejoClient/1.0",
"Accept": "application/json",
}
if self.token:
headers["Authorization"] = f"token {self.token}"
return headers
async def _get_client(self) -> httpx.AsyncClient:
"""Get or create the HTTP client."""
if self._client is None:
raise RuntimeError("ForgejoAPIClient must be used as async context manager")
return self._client
async def get_user(self) -> dict[str, object]:
"""Get authenticated user info — useful for connection validation."""
client = await self._get_client()
response = await client.get("/api/v1/user")
response.raise_for_status()
return response.json()
async def list_issues(
self,
owner: str,
repo: str,
state: str = "open",
page: int = 1,
limit: int = 30,
) -> dict[str, object]:
"""
List issues for a repository (excluding pull requests).
Args:
owner: Repository owner
repo: Repository name
state: Issue state (open, closed, all)
page: Page number
limit: Items per page
Returns:
API response as dict
"""
client = await self._get_client()
params = {
"state": state,
"page": page,
"per_page": limit,
"type": "issues", # Exclude pull requests
}
response = await client.get(f"/api/v1/repos/{owner}/{repo}/issues", params=params)
response.raise_for_status()
return response.json()
async def close_issue(
self,
owner: str,
repo: str,
issue_number: int,
) -> dict[str, object]:
"""
Close a specific issue.
Args:
owner: Repository owner
repo: Repository name
issue_number: Issue number to close
Returns:
Updated issue data as dict
"""
client = await self._get_client()
payload = {"state": "closed"}
response = await client.patch(
f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}",
json=payload,
)
response.raise_for_status()
return response.json()
async def get_repository(
self,
owner: str,
repo: str,
) -> dict[str, object]:
"""
Get repository metadata.
Args:
owner: Repository owner
repo: Repository name
Returns:
Repository data as dict
"""
client = await self._get_client()
response = await client.get(f"/api/v1/repos/{owner}/{repo}")
response.raise_for_status()
return response.json()
async def list_labels(
self,
owner: str,
repo: str,
limit: int = 50,
) -> list[dict[str, object]]:
"""
List all labels defined on a repository.
Args:
owner: Repository owner
repo: Repository name
limit: Max labels to fetch per page
Returns:
List of label dicts with id, name, color, description.
"""
client = await self._get_client()
response = await client.get(
f"/api/v1/repos/{owner}/{repo}/labels",
params={"limit": limit, "page": 1},
)
response.raise_for_status()
data = response.json()
return list(data) if isinstance(data, list) else []
async def list_user_repos(
self,
limit: int = 50,
page: int = 1,
) -> list[dict[str, object]]:
"""
List repositories accessible to the authenticated token.
Uses /api/v1/repos/search which returns repos the token can access,
including private repos and org repos.
Returns:
List of repository dicts with name, full_name, owner, default_branch, etc.
"""
client = await self._get_client()
response = await client.get("/api/v1/repos/search", params={"limit": limit, "page": page})
response.raise_for_status()
data = response.json()
# Forgejo /repos/search returns {"data": [...], "ok": true}
if isinstance(data, dict):
return list(data.get("data", []))
return list(data)
def get_forgejo_client(
connection: object,
) -> ForgejoAPIClient:
"""
Factory function to create a ForgejoAPIClient from a connection object.
Args:
connection: ForgejoConnection object with base_url and token
Returns:
Configured ForgejoAPIClient instance
"""
base_url = connection.base_url.rstrip("/")
# Remove /api/v1 if present to get base URL
if "/api/v1" in base_url:
import re
match = re.match(r"(https?://[^/]+)", base_url)
if match:
base_url = match.group(1).rstrip("/")
return ForgejoAPIClient(
base_url=base_url,
token=getattr(connection, "token", None),
)