Pipeline/backend/app/services/forgejo_client.py

432 lines
14 KiB
Python

"""Forgejo API client service for making REST API calls."""
from __future__ import annotations
import httpx
from app.core.logging import get_logger
logger = get_logger(__name__)
class ForgejoClientError(Exception):
"""Base exception for Forgejo API client errors."""
class ForgejoAPIClient:
"""HTTP client for Forgejo REST API calls."""
def __init__(
self,
base_url: str,
token: str | None = None,
timeout_connect: float = 5.0,
timeout_read: float = 30.0,
) -> None:
"""Initialize the Forgejo API client."""
self.base_url = base_url.rstrip("/")
self.token = token
self.timeout_connect = timeout_connect
self.timeout_read = timeout_read
self._client: httpx.AsyncClient | None = None
async def __aenter__(self) -> ForgejoAPIClient:
"""Enter async context manager."""
self._client = httpx.AsyncClient(
base_url=self.base_url,
headers=self._build_headers(),
timeout=httpx.Timeout(
connect=self.timeout_connect,
read=self.timeout_read,
write=10.0,
pool=5.0,
),
)
return self
async def __aexit__(self, *args: object) -> None:
"""Exit async context manager."""
if self._client:
await self._client.aclose()
self._client = None
def _build_headers(self) -> dict[str, str]:
"""Build request headers including auth and User-Agent."""
headers = {
"User-Agent": "Pipeline/ForgejoClient/1.0",
"Accept": "application/json",
}
if self.token:
headers["Authorization"] = f"token {self.token}"
return headers
async def _get_client(self) -> httpx.AsyncClient:
"""Get or create the HTTP client."""
if self._client is None:
raise RuntimeError("ForgejoAPIClient must be used as async context manager")
return self._client
async def get_user(self) -> dict[str, object]:
"""Get authenticated user info — useful for connection validation."""
client = await self._get_client()
response = await client.get("/api/v1/user")
response.raise_for_status()
return response.json()
async def list_issues(
self,
owner: str,
repo: str,
state: str = "open",
page: int = 1,
limit: int = 30,
) -> dict[str, object]:
"""
List issues for a repository (excluding pull requests).
Args:
owner: Repository owner
repo: Repository name
state: Issue state (open, closed, all)
page: Page number
limit: Items per page
Returns:
API response as dict
"""
client = await self._get_client()
params = {
"state": state,
"page": page,
"per_page": limit,
"type": "issues", # Exclude pull requests
}
response = await client.get(f"/api/v1/repos/{owner}/{repo}/issues", params=params)
response.raise_for_status()
return response.json()
async def close_issue(
self,
owner: str,
repo: str,
issue_number: int,
) -> dict[str, object]:
"""
Close a specific issue.
Args:
owner: Repository owner
repo: Repository name
issue_number: Issue number to close
Returns:
Updated issue data as dict
"""
client = await self._get_client()
payload = {"state": "closed"}
response = await client.patch(
f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}",
json=payload,
)
response.raise_for_status()
return response.json()
async def get_issue(
self,
owner: str,
repo: str,
issue_number: int,
) -> dict[str, object]:
"""Get full details for a single issue."""
client = await self._get_client()
response = await client.get(f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}")
response.raise_for_status()
return response.json()
async def get_repository(
self,
owner: str,
repo: str,
) -> dict[str, object]:
"""
Get repository metadata.
Args:
owner: Repository owner
repo: Repository name
Returns:
Repository data as dict
"""
client = await self._get_client()
response = await client.get(f"/api/v1/repos/{owner}/{repo}")
response.raise_for_status()
return response.json()
async def list_labels(
self,
owner: str,
repo: str,
limit: int = 50,
) -> list[dict[str, object]]:
"""
List all labels defined on a repository.
Args:
owner: Repository owner
repo: Repository name
limit: Max labels to fetch per page
Returns:
List of label dicts with id, name, color, description.
"""
client = await self._get_client()
response = await client.get(
f"/api/v1/repos/{owner}/{repo}/labels",
params={"limit": limit, "page": 1},
)
response.raise_for_status()
data = response.json()
return list(data) if isinstance(data, list) else []
async def list_issue_comments(
self,
owner: str,
repo: str,
issue_number: int,
limit: int = 50,
) -> list[dict[str, object]]:
"""List all comments for an issue."""
client = await self._get_client()
comments: list[dict[str, object]] = []
page = 1
while True:
response = await client.get(
f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}/comments",
params={"limit": limit, "page": page},
)
response.raise_for_status()
data = response.json()
page_items = data if isinstance(data, list) else []
comments.extend(item for item in page_items if isinstance(item, dict))
if len(page_items) < limit:
break
page += 1
return comments
async def list_issue_timeline(
self,
owner: str,
repo: str,
issue_number: int,
limit: int = 50,
) -> list[dict[str, object]]:
"""List timeline events for an issue."""
client = await self._get_client()
timeline: list[dict[str, object]] = []
page = 1
while True:
response = await client.get(
f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}/timeline",
params={"limit": limit, "page": page},
)
response.raise_for_status()
data = response.json()
page_items = data if isinstance(data, list) else []
timeline.extend(item for item in page_items if isinstance(item, dict))
if len(page_items) < limit:
break
page += 1
return timeline
async def list_issue_reactions(
self,
owner: str,
repo: str,
issue_number: int,
limit: int = 50,
) -> list[dict[str, object]]:
"""List reactions for an issue."""
client = await self._get_client()
reactions: list[dict[str, object]] = []
page = 1
while True:
response = await client.get(
f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}/reactions",
params={"limit": limit, "page": page},
)
response.raise_for_status()
data = response.json()
page_items = data if isinstance(data, list) else []
reactions.extend(item for item in page_items if isinstance(item, dict))
if len(page_items) < limit:
break
page += 1
return reactions
async def create_issue(
self,
owner: str,
repo: str,
title: str,
body: str,
labels: list[int] | None = None,
) -> dict[str, object]:
"""Open a new issue on a repository."""
client = await self._get_client()
payload: dict[str, object] = {"title": title, "body": body}
if labels:
payload["labels"] = labels
response = await client.post(
f"/api/v1/repos/{owner}/{repo}/issues",
json=payload,
)
response.raise_for_status()
return response.json()
async def create_comment(
self,
owner: str,
repo: str,
issue_number: int,
body: str,
) -> dict[str, object]:
"""Post a new comment on an issue."""
client = await self._get_client()
response = await client.post(
f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}/comments",
json={"body": body},
)
response.raise_for_status()
return response.json()
async def edit_issue(
self,
owner: str,
repo: str,
issue_number: int,
*,
title: str | None = None,
body: str | None = None,
state: str | None = None,
) -> dict[str, object]:
"""Edit an issue's title, body, and/or state."""
payload: dict[str, object] = {}
if title is not None:
payload["title"] = title
if body is not None:
payload["body"] = body
if state is not None:
payload["state"] = state
client = await self._get_client()
response = await client.patch(
f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}",
json=payload,
)
response.raise_for_status()
return response.json()
async def list_user_repos(
self,
limit: int = 50,
page: int = 1,
) -> list[dict[str, object]]:
"""
List repositories accessible to the authenticated token.
Uses /api/v1/repos/search which returns repos the token can access,
including private repos and org repos.
Returns:
List of repository dicts with name, full_name, owner, default_branch, etc.
"""
client = await self._get_client()
response = await client.get("/api/v1/repos/search", params={"limit": limit, "page": page})
response.raise_for_status()
data = response.json()
# Forgejo /repos/search returns {"data": [...], "ok": true}
if isinstance(data, dict):
return list(data.get("data", []))
return list(data)
async def get_last_commit(
self, owner: str, repo: str, branch: str | None = None
) -> dict | None:
"""Return the most-recent commit on the default (or given) branch, or None."""
client = await self._get_client()
params: dict[str, object] = {"limit": 1}
if branch:
params["sha"] = branch
response = await client.get(
f"/api/v1/repos/{owner}/{repo}/commits", params=params
)
if response.status_code in (404, 409):
return None # empty repo or not found
response.raise_for_status()
data = response.json()
commits = data if isinstance(data, list) else data.get("commits") or data.get("data") or []
return commits[0] if commits else None
async def get_commit_line_stats_since(
self, owner: str, repo: str, since_iso: str
) -> tuple[int, int]:
"""Sum additions and deletions across all commits since ``since_iso``.
Uses ``GET /repos/{owner}/{repo}/commits?since=…&stat=true`` which
returns per-commit ``stats`` objects and is available on all Forgejo/
Gitea versions. Returns ``(total_additions, total_deletions)``.
"""
client = await self._get_client()
total_adds = total_dels = 0
page = 1
# Cap at 5 pages (250 commits) — prevents unbounded pagination on
# very active repos where each page requires Forgejo to compute
# per-commit diffs (?stat=true), which is the primary latency driver.
while page <= 5:
response = await client.get(
f"/api/v1/repos/{owner}/{repo}/commits",
params={"since": since_iso, "limit": 50, "page": page, "stat": "true"},
)
if response.status_code in (404, 409):
break # empty or non-existent repo
response.raise_for_status()
data = response.json()
commits = data if isinstance(data, list) else []
if not commits:
break
for commit in commits:
s = commit.get("stats") or {}
total_adds += int(s.get("additions") or 0)
total_dels += int(s.get("deletions") or 0)
if len(commits) < 50:
break # last page
page += 1
return total_adds, total_dels
def get_forgejo_client(
connection: object,
) -> ForgejoAPIClient:
"""
Factory function to create a ForgejoAPIClient from a connection object.
Args:
connection: ForgejoConnection object with base_url and token
Returns:
Configured ForgejoAPIClient instance
"""
base_url = connection.base_url.rstrip("/")
# Remove /api/v1 if present to get base URL
if "/api/v1" in base_url:
import re
match = re.match(r"(https?://[^/]+)", base_url)
if match:
base_url = match.group(1).rstrip("/")
return ForgejoAPIClient(
base_url=base_url,
token=getattr(connection, "token", None),
)