445 lines
14 KiB
Python
445 lines
14 KiB
Python
"""Forgejo API client service for making REST API calls."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import httpx
|
|
|
|
from app.core.logging import get_logger
|
|
|
|
# Module-level logger, created by the project's ``get_logger`` helper with this module's ``__name__``.
logger = get_logger(__name__)
|
|
|
|
|
|
class ForgejoClientError(Exception):
    """Common base class for Forgejo API client exceptions."""
|
|
|
|
|
|
class ForgejoAPIClient:
    """Async HTTP client for Forgejo REST API calls.

    Use as an async context manager: ``__aenter__`` builds the underlying
    ``httpx.AsyncClient`` and ``__aexit__`` closes it. Calling a request
    method outside of the context raises ``RuntimeError``. Unless a method
    documents otherwise, non-success HTTP statuses propagate from
    ``response.raise_for_status()``.
    """

    # Page size and page cap used by ``get_commit_line_stats_since``.
    _COMMIT_STATS_PAGE_SIZE = 50
    _COMMIT_STATS_MAX_PAGES = 5

    def __init__(
        self,
        base_url: str,
        token: str | None = None,
        timeout_connect: float = 5.0,
        timeout_read: float = 30.0,
    ) -> None:
        """Store connection settings; no network activity happens here.

        Args:
            base_url: Root URL of the Forgejo instance; trailing slashes are stripped.
            token: Optional API token sent in the ``Authorization`` header.
            timeout_connect: Connect timeout passed to ``httpx.Timeout``.
            timeout_read: Read timeout passed to ``httpx.Timeout``.
        """
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.timeout_connect = timeout_connect
        self.timeout_read = timeout_read
        # Created in __aenter__, cleared in __aexit__.
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self) -> ForgejoAPIClient:
        """Create the underlying ``httpx.AsyncClient`` and return ``self``."""
        self._client = httpx.AsyncClient(
            base_url=self.base_url,
            headers=self._build_headers(),
            timeout=httpx.Timeout(
                connect=self.timeout_connect,
                read=self.timeout_read,
                write=10.0,
                pool=5.0,
            ),
        )
        return self

    async def __aexit__(self, *args: object) -> None:
        """Close the underlying HTTP client, if one is open."""
        if self._client is not None:
            await self._client.aclose()
            self._client = None

    def _build_headers(self) -> dict[str, str]:
        """Return the default request headers (User-Agent, Accept, optional auth)."""
        headers = {
            "User-Agent": "Pipeline/ForgejoClient/1.0",
            "Accept": "application/json",
        }
        if self.token:
            headers["Authorization"] = f"token {self.token}"
        return headers

    async def _get_client(self) -> httpx.AsyncClient:
        """Return the active HTTP client.

        Raises:
            RuntimeError: If the instance is not inside an ``async with`` block.
        """
        if self._client is None:
            raise RuntimeError("ForgejoAPIClient must be used as async context manager")
        return self._client

    async def _collect_pages(self, path: str, limit: int) -> list[dict[str, object]]:
        """Fetch successive pages of a list endpoint and return all dict items.

        Requests ``path`` with ``limit``/``page`` query parameters starting at
        page 1 and stops at the first page that is empty, shorter than
        ``limit``, or identical to the previous page. Non-list payloads are
        treated as an empty page and non-dict items are dropped.
        """
        client = await self._get_client()
        collected: list[dict[str, object]] = []
        previous_page: list[object] | None = None
        page = 1
        while True:
            response = await client.get(path, params={"limit": limit, "page": page})
            response.raise_for_status()
            data = response.json()
            page_items = data if isinstance(data, list) else []
            # An empty page always ends the listing. Without this guard a
            # non-positive ``limit`` can never satisfy the short-page test
            # below and the loop would spin forever.
            if not page_items:
                break
            # A server that ignores ``page`` answers with the same payload
            # again; stop rather than loop forever accumulating duplicates.
            # NOTE(review): confirm which Forgejo endpoints honour ``page``.
            if page_items == previous_page:
                break
            collected.extend(item for item in page_items if isinstance(item, dict))
            if len(page_items) < limit:
                break
            previous_page = page_items
            page += 1
        return collected

    async def get_user(self) -> dict[str, object]:
        """Get authenticated user info — useful for connection validation."""
        client = await self._get_client()
        response = await client.get("/api/v1/user")
        response.raise_for_status()
        return response.json()

    async def list_issues(
        self,
        owner: str,
        repo: str,
        state: str = "open",
        page: int = 1,
        limit: int = 30,
        since: str | None = None,
    ) -> dict[str, object]:
        """
        List issues for a repository (excluding pull requests).

        Args:
            owner: Repository owner
            repo: Repository name
            state: Issue state (open, closed, all)
            page: Page number
            limit: Items per page
            since: RFC 3339 timestamp; only return issues updated at or after this time

        Returns:
            The decoded JSON response body.
            NOTE(review): this endpoint appears to return a JSON array, not an
            object; the annotation is left unchanged for compatibility — confirm.
        """
        client = await self._get_client()
        params: dict[str, object] = {
            "state": state,
            "page": page,
            # Forgejo's page-size parameter is ``limit`` (as used by every
            # other method here); ``per_page`` is the GitHub spelling.
            "limit": limit,
            "type": "issues",  # Exclude pull requests
        }
        if since:
            params["since"] = since
        response = await client.get(f"/api/v1/repos/{owner}/{repo}/issues", params=params)
        response.raise_for_status()
        return response.json()

    async def close_issue(
        self,
        owner: str,
        repo: str,
        issue_number: int,
    ) -> dict[str, object]:
        """
        Close a specific issue.

        Args:
            owner: Repository owner
            repo: Repository name
            issue_number: Issue number to close

        Returns:
            Updated issue data as dict
        """
        # Same PATCH request as edit_issue with only the state set.
        return await self.edit_issue(owner, repo, issue_number, state="closed")

    async def get_issue(
        self,
        owner: str,
        repo: str,
        issue_number: int,
    ) -> dict[str, object]:
        """Get full details for a single issue."""
        client = await self._get_client()
        response = await client.get(f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}")
        response.raise_for_status()
        return response.json()

    async def get_repository(
        self,
        owner: str,
        repo: str,
    ) -> dict[str, object]:
        """
        Get repository metadata.

        Args:
            owner: Repository owner
            repo: Repository name

        Returns:
            Repository data as dict
        """
        client = await self._get_client()
        response = await client.get(f"/api/v1/repos/{owner}/{repo}")
        response.raise_for_status()
        return response.json()

    async def list_labels(
        self,
        owner: str,
        repo: str,
        limit: int = 50,
    ) -> list[dict[str, object]]:
        """
        List labels defined on a repository.

        Only the first page is requested, so at most one page of labels is
        returned.

        Args:
            owner: Repository owner
            repo: Repository name
            limit: Page size sent with the request

        Returns:
            The decoded list of labels, or an empty list when the response
            body is not a JSON array.
        """
        client = await self._get_client()
        response = await client.get(
            f"/api/v1/repos/{owner}/{repo}/labels",
            params={"limit": limit, "page": 1},
        )
        response.raise_for_status()
        data = response.json()
        return list(data) if isinstance(data, list) else []

    async def list_issue_comments(
        self,
        owner: str,
        repo: str,
        issue_number: int,
        limit: int = 50,
    ) -> list[dict[str, object]]:
        """List all comments for an issue, following pagination."""
        return await self._collect_pages(
            f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}/comments", limit
        )

    async def list_issue_timeline(
        self,
        owner: str,
        repo: str,
        issue_number: int,
        limit: int = 50,
    ) -> list[dict[str, object]]:
        """List timeline events for an issue, following pagination."""
        return await self._collect_pages(
            f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}/timeline", limit
        )

    async def list_issue_reactions(
        self,
        owner: str,
        repo: str,
        issue_number: int,
        limit: int = 50,
    ) -> list[dict[str, object]]:
        """List reactions for an issue, following pagination."""
        return await self._collect_pages(
            f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}/reactions", limit
        )

    async def create_issue(
        self,
        owner: str,
        repo: str,
        title: str,
        body: str,
        labels: list[int] | None = None,
    ) -> dict[str, object]:
        """Open a new issue on a repository.

        ``labels`` is only included in the request when it is non-empty.
        """
        client = await self._get_client()
        payload: dict[str, object] = {"title": title, "body": body}
        if labels:
            payload["labels"] = labels
        response = await client.post(
            f"/api/v1/repos/{owner}/{repo}/issues",
            json=payload,
        )
        response.raise_for_status()
        return response.json()

    async def create_comment(
        self,
        owner: str,
        repo: str,
        issue_number: int,
        body: str,
    ) -> dict[str, object]:
        """Post a new comment on an issue."""
        client = await self._get_client()
        response = await client.post(
            f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}/comments",
            json={"body": body},
        )
        response.raise_for_status()
        return response.json()

    async def edit_issue(
        self,
        owner: str,
        repo: str,
        issue_number: int,
        *,
        title: str | None = None,
        body: str | None = None,
        state: str | None = None,
    ) -> dict[str, object]:
        """Edit an issue's title, body, and/or state.

        Only the fields that are not ``None`` are sent; if all are ``None``
        the PATCH request is still issued with an empty JSON object.
        """
        candidates = {"title": title, "body": body, "state": state}
        payload: dict[str, object] = {
            field: value for field, value in candidates.items() if value is not None
        }
        client = await self._get_client()
        response = await client.patch(
            f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}",
            json=payload,
        )
        response.raise_for_status()
        return response.json()

    async def list_user_repos(
        self,
        limit: int = 50,
        page: int = 1,
    ) -> list[dict[str, object]]:
        """
        List repositories accessible to the authenticated token.

        Uses /api/v1/repos/search which returns repos the token can access,
        including private repos and org repos.

        Returns:
            List of repository dicts with name, full_name, owner, default_branch, etc.
            An empty list is returned when the payload carries no repository list.
        """
        client = await self._get_client()
        response = await client.get("/api/v1/repos/search", params={"limit": limit, "page": page})
        response.raise_for_status()
        data = response.json()
        # Forgejo /repos/search returns {"data": [...], "ok": true}
        if isinstance(data, dict):
            # ``or []`` also covers an explicit ``"data": null``.
            return list(data.get("data") or [])
        return list(data) if isinstance(data, list) else []

    async def get_last_commit(
        self, owner: str, repo: str, branch: str | None = None
    ) -> dict | None:
        """Return the most-recent commit on the default (or given) branch, or None.

        A 404 or 409 response is treated as "no commit" rather than an error.
        """
        client = await self._get_client()
        params: dict[str, object] = {"limit": 1}
        if branch:
            params["sha"] = branch
        response = await client.get(
            f"/api/v1/repos/{owner}/{repo}/commits", params=params
        )
        if response.status_code in (404, 409):
            return None  # empty repo or not found
        response.raise_for_status()
        data = response.json()
        if isinstance(data, list):
            commits = data
        elif isinstance(data, dict):
            commits = data.get("commits") or data.get("data") or []
        else:
            # Any other JSON value (e.g. null) carries no commits.
            commits = []
        return commits[0] if commits else None

    async def get_commit_line_stats_since(
        self, owner: str, repo: str, since_iso: str
    ) -> tuple[int, int, dict[str, int]]:
        """Sum additions/deletions and count commits per day since ``since_iso``.

        Uses ``GET /repos/{owner}/{repo}/commits?since=…&stat=true`` which
        returns per-commit ``stats`` objects and is available on all Forgejo/
        Gitea versions. Returns ``(total_additions, total_deletions, day_counts)``
        where ``day_counts`` maps "YYYY-MM-DD" → commit count.

        A 404 or 409 response ends the scan and whatever was accumulated so
        far is returned.
        """
        client = await self._get_client()
        page_size = self._COMMIT_STATS_PAGE_SIZE
        total_adds = total_dels = 0
        day_counts: dict[str, int] = {}
        # Cap at 5 pages (250 commits) — prevents unbounded pagination on
        # very active repos where each page requires Forgejo to compute
        # per-commit diffs (?stat=true), which is the primary latency driver.
        for page in range(1, self._COMMIT_STATS_MAX_PAGES + 1):
            response = await client.get(
                f"/api/v1/repos/{owner}/{repo}/commits",
                params={"since": since_iso, "limit": page_size, "page": page, "stat": "true"},
            )
            if response.status_code in (404, 409):
                break  # empty or non-existent repo
            response.raise_for_status()
            data = response.json()
            commits = data if isinstance(data, list) else []
            if not commits:
                break
            for commit in commits:
                if not isinstance(commit, dict):
                    continue  # skip malformed entries instead of crashing on .get
                stats = commit.get("stats") or {}
                total_adds += int(stats.get("additions") or 0)
                total_dels += int(stats.get("deletions") or 0)
                # Extract commit date for per-day activity counting
                commit_obj = commit.get("commit") or {}
                author_obj = commit_obj.get("author") or {}
                date_str: str = author_obj.get("date") or commit.get("created") or ""
                if len(date_str) >= 10:
                    day = date_str[:10]
                    day_counts[day] = day_counts.get(day, 0) + 1
            if len(commits) < page_size:
                break  # last page
        return total_adds, total_dels, day_counts
|
|
|
|
|
|
def get_forgejo_client(
    connection: object,
) -> ForgejoAPIClient:
    """
    Factory function to create a ForgejoAPIClient from a connection object.

    If the connection's ``base_url`` points at the API root (contains
    ``/api/v1``), the URL is cut at that segment so the client receives the
    instance root. Any path prefix before ``/api/v1`` is preserved, so an
    instance served under a sub-path keeps working.

    Args:
        connection: ForgejoConnection object with base_url and token

    Returns:
        Configured ForgejoAPIClient instance
    """
    base_url = connection.base_url.rstrip("/")
    # Search only after the scheme separator so a host literally named "api"
    # (e.g. "https://api/v1") is not mistaken for the API path.
    scheme_end = base_url.find("://")
    search_from = scheme_end + 3 if scheme_end != -1 else 0
    api_index = base_url.find("/api/v1", search_from)
    if api_index != -1:
        # Cut at the API segment instead of reducing the URL to scheme://host:
        # this keeps a deployment sub-path and also handles scheme-less URLs,
        # which would otherwise keep "/api/v1" and produce doubled API paths.
        base_url = base_url[:api_index].rstrip("/")
    return ForgejoAPIClient(
        base_url=base_url,
        token=getattr(connection, "token", None),
    )
|