Pipeline/backend/app/services/forgejo_issue_sync.py

633 lines
26 KiB
Python
Raw Permalink Normal View History

"""Forgejo issue sync service for pulling issues from remote repositories."""
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from uuid import UUID

from sqlmodel import select

from app.core.logging import get_logger
from app.core.time import utcnow
from app.db import crud
from app.models.forgejo_connections import ForgejoConnection
from app.models.forgejo_issues import ForgejoIssue
from app.models.forgejo_repositories import ForgejoRepository
from app.services.forgejo_client import get_forgejo_client
logger = get_logger(__name__)
2026-05-21 22:53:02 -05:00
def _as_dict(value: object) -> dict[str, object] | None:
return value if isinstance(value, dict) else None
def _as_dict_list(value: object) -> list[dict[str, object]]:
if not isinstance(value, list):
return []
return [item for item in value if isinstance(item, dict)]
def _author_login(issue_payload: dict[str, object]) -> str:
user = issue_payload.get("user")
if isinstance(user, dict):
login = user.get("login")
if isinstance(login, str):
return login
return ""
def _html_url(issue_payload: dict[str, object]) -> str:
value = issue_payload.get("html_url")
return value if isinstance(value, str) else ""
class IssueSyncService:
    """Service for syncing Forgejo issues from remote repositories.

    Two entry points are provided: :meth:`sync_repository_issues` (full sync
    with per-issue enrichment) and :meth:`sync_recent` (lightweight sync of
    recently updated issues). Both upsert ``ForgejoIssue`` rows for the
    organization the service was created for and return the same statistics
    dictionary.
    """

    # Number of leading characters of the issue body stored in ``body_preview``.
    _BODY_PREVIEW_CHARS = 1000
    # Staleness window (in days) the full sync uses when re-checking open issues.
    _FULL_SYNC_STALE_DAYS = 90
    # Maximum number of stale open issues re-verified against Forgejo per run.
    _STALE_CHECK_LIMIT = 50

    def __init__(self, session: object, organization_id: UUID) -> None:
        """Bind the service to a database session and an organization."""
        self.session = session
        self.organization_id = organization_id

    # ------------------------------------------------------------------
    # Public entry points
    # ------------------------------------------------------------------

    async def sync_repository_issues(
        self,
        repository_id: UUID,
        page: int = 1,
        limit: int = 30,
    ) -> dict[str, int]:
        """Sync issues from a Forgejo repository, with per-issue enrichment.

        Pages through all issues (any state) starting at ``page``. For every
        issue that is not a pull request, the full issue, its comments,
        timeline and reactions are fetched (each best-effort) and the cached
        row is created or updated. Afterwards the repository label catalog
        and remote metadata are refreshed (best-effort), stale open issues
        are re-checked, and the repository sync timestamp is saved.

        Items that are not dicts, or that carry no positive integer
        ``number``, are skipped.

        Args:
            repository_id: Primary key of the ``ForgejoRepository`` to sync.
            page: First remote page to fetch.
            limit: Page size; a page shorter than this ends the paging loop.

        Returns:
            Counts under the keys ``created``, ``updated``, ``stale_closed``,
            ``open``, ``closed`` and ``total`` (= created + updated).

        Raises:
            ValueError: If the repository does not exist, belongs to another
                organization, or has no connection.
        """
        repository, connection = await self._load_sync_targets(repository_id)
        stats = {"created": 0, "updated": 0, "open": 0, "closed": 0}

        current_page = page
        while True:
            # The client is kept open for the whole page because the
            # per-issue enrichment calls below reuse it.
            async with get_forgejo_client(connection) as client:
                response = await client.list_issues(
                    owner=repository.owner,
                    repo=repository.repo,
                    state="all",
                    page=current_page,
                    limit=limit,
                )
                issues = self._extract_issue_list(response)
                if not issues:
                    break

                candidates = self._issue_candidates(issues)
                # Batch-lookup existing issues for this page (one query vs N serial).
                existing_map = await self._find_issues_batch(
                    repository_id, [number for number, _ in candidates]
                )

                for forgejo_number, issue_data in candidates:
                    issue_payload, enrichment = await self._fetch_enrichment(
                        client, repository, repository_id, forgejo_number, issue_data
                    )
                    # Title and state come from the list item; everything
                    # else is read from the (possibly enriched) payload.
                    state = issue_data.get("state", "open")
                    was_created = await self._upsert_issue(
                        existing_map.get(forgejo_number),
                        repository_id=repository_id,
                        forgejo_number=forgejo_number,
                        title=issue_data.get("title", ""),
                        state=state,
                        payload=issue_payload,
                        enrichment=enrichment,
                    )
                    self._tally(stats, was_created, state)

            # If we got fewer than limit, we're done.
            if len(issues) < limit:
                break
            current_page += 1

        await self._sync_label_catalog(repository_id, connection, repository)
        await self._sync_repository_metadata(repository_id, connection, repository)

        # Close DB-open issues that Forgejo has since closed.
        stale_closed = await self._close_stale_opens(
            repository_id, connection, repository, days=self._FULL_SYNC_STALE_DAYS
        )
        return await self._finish_sync(repository, stats, stale_closed)

    async def sync_recent(
        self,
        repository_id: UUID,
        days: int = 7,
        limit: int = 50,
    ) -> dict[str, int]:
        """Lightweight sync: fetch only issues updated in the last `days` days.

        Uses a single batch DB query per page (no per-issue lookups) and skips
        enrichment calls (comments, timeline, reactions); cached enrichment
        payloads of existing rows are left untouched. After paging, closes
        any DB-open issues that Forgejo has since closed outside the time
        window.

        Items that are not dicts, or that carry no positive integer
        ``number``, are skipped.

        Args:
            repository_id: Primary key of the ``ForgejoRepository`` to sync.
            days: Size of the "updated since" window, also used as the
                staleness threshold for the open-issue re-check.
            limit: Page size; a page shorter than this ends the paging loop.

        Returns:
            Counts under the keys ``created``, ``updated``, ``stale_closed``,
            ``open``, ``closed`` and ``total`` (= created + updated).

        Raises:
            ValueError: If the repository does not exist, belongs to another
                organization, or has no connection.
        """
        repository, connection = await self._load_sync_targets(repository_id)
        stats = {"created": 0, "updated": 0, "open": 0, "closed": 0}

        since_iso = (utcnow() - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%SZ")

        page = 1
        while True:
            async with get_forgejo_client(connection) as client:
                response = await client.list_issues(
                    owner=repository.owner,
                    repo=repository.repo,
                    state="all",
                    page=page,
                    limit=limit,
                    since=since_iso,
                )
            issues = self._extract_issue_list(response)
            if not issues:
                break

            candidates = self._issue_candidates(issues)
            existing_map = await self._find_issues_batch(
                repository_id, [number for number, _ in candidates]
            )

            for forgejo_number, issue_data in candidates:
                state = issue_data.get("state", "open")
                was_created = await self._upsert_issue(
                    existing_map.get(forgejo_number),
                    repository_id=repository_id,
                    forgejo_number=forgejo_number,
                    title=issue_data.get("title", ""),
                    state=state,
                    payload=dict(issue_data),
                )
                self._tally(stats, was_created, state)

            if len(issues) < limit:
                break
            page += 1

        stale_closed = await self._close_stale_opens(
            repository_id, connection, repository, days
        )
        return await self._finish_sync(repository, stats, stale_closed)

    # ------------------------------------------------------------------
    # Shared sync plumbing
    # ------------------------------------------------------------------

    async def _load_sync_targets(
        self, repository_id: UUID
    ) -> tuple[ForgejoRepository, ForgejoConnection]:
        """Load the repository and its connection, enforcing organization scope."""
        repository = await crud.get_by_id(self.session, ForgejoRepository, repository_id)
        # "Missing" and "owned by another organization" deliberately share
        # one message.
        if repository is None or repository.organization_id != self.organization_id:
            raise ValueError(f"Repository {repository_id} not found or access denied")

        # Load connection separately (no ORM relationship).
        connection = await crud.get_by_id(
            self.session, ForgejoConnection, repository.connection_id
        )
        if connection is None:
            raise ValueError("Repository has no connection")
        return repository, connection

    async def _finish_sync(
        self,
        repository: ForgejoRepository,
        stats: dict[str, int],
        stale_closed: int,
    ) -> dict[str, int]:
        """Stamp the repository as synced, save it, and build the result dict."""
        repository.last_sync_at = utcnow()
        repository.last_sync_error = None
        await crud.save(self.session, repository)
        return {
            "created": stats["created"],
            "updated": stats["updated"],
            "stale_closed": stale_closed,
            "open": stats["open"],
            "closed": stats["closed"],
            "total": stats["created"] + stats["updated"],
        }

    @staticmethod
    def _tally(stats: dict[str, int], was_created: bool, state: object) -> None:
        """Record one processed issue in the running counters."""
        stats["created" if was_created else "updated"] += 1
        if state == "open":
            stats["open"] += 1
        elif state == "closed":
            stats["closed"] += 1

    @staticmethod
    def _extract_issue_list(response: object) -> list[object]:
        """Return the list of issues carried by a ``list_issues`` response.

        Forgejo returns issues as a JSON array, not wrapped in "items"; a
        dict wrapper with an ``items`` or ``data`` key is also accepted.
        Any other shape yields an empty list.
        """
        if isinstance(response, list):
            return response
        if isinstance(response, dict):
            wrapped = response.get("items", response.get("data", []))
            if isinstance(wrapped, list):
                return wrapped
        return []

    @staticmethod
    def _issue_number(issue_data: dict[str, object]) -> int | None:
        """Return the issue's positive integer number, or ``None`` if unusable."""
        try:
            number = int(issue_data.get("number", 0))
        except (TypeError, ValueError):
            return None
        return number if number > 0 else None

    @classmethod
    def _issue_candidates(
        cls, issues: list[object]
    ) -> list[tuple[int, dict[str, object]]]:
        """Pair every syncable item with its issue number.

        Items that are not dicts, that are pull requests (``pull_request``
        is not ``None``), or that have no usable number are dropped.
        """
        candidates: list[tuple[int, dict[str, object]]] = []
        for item in issues:
            if not isinstance(item, dict):
                continue
            if item.get("pull_request") is not None:
                continue
            number = cls._issue_number(item)
            if number is None:
                continue
            candidates.append((number, item))
        return candidates

    async def _fetch_enrichment(
        self,
        client: object,
        repository: ForgejoRepository,
        repository_id: UUID,
        forgejo_number: int,
        issue_data: dict[str, object],
    ) -> tuple[dict[str, object], dict[str, list[dict[str, object]]]]:
        """Fetch full detail, comments, timeline and reactions for one issue.

        Every call is best-effort: a failure is logged as a warning and the
        corresponding value falls back to a copy of the list item (detail)
        or an empty list (comments, timeline, reactions).

        Returns:
            ``(issue_payload, enrichment)`` where ``enrichment`` maps the
            ``ForgejoIssue`` payload attribute names to their fetched lists.
        """

        async def best_effort(method_name: str, event: str) -> object:
            try:
                fetch = getattr(client, method_name)
                return await fetch(
                    owner=repository.owner,
                    repo=repository.repo,
                    issue_number=forgejo_number,
                )
            except Exception as exc:
                logger.warning(
                    event,
                    extra={
                        "repository_id": str(repository_id),
                        "issue_number": forgejo_number,
                        "error": str(exc),
                    },
                )
                return None

        full_issue = _as_dict(await best_effort("get_issue", "issue_detail_sync_failed"))
        issue_payload = full_issue if full_issue is not None else dict(issue_data)
        enrichment = {
            "forgejo_comments_payload": _as_dict_list(
                await best_effort("list_issue_comments", "issue_comments_sync_failed")
            ),
            "forgejo_timeline_payload": _as_dict_list(
                await best_effort("list_issue_timeline", "issue_timeline_sync_failed")
            ),
            "forgejo_reactions_payload": _as_dict_list(
                await best_effort("list_issue_reactions", "issue_reactions_sync_failed")
            ),
        }
        return issue_payload, enrichment

    @staticmethod
    def _parse_labels(payload: dict[str, object]) -> list[dict[str, object]]:
        """Project the payload's ``labels`` onto id/name/color/description dicts."""
        return [
            {
                "id": label.get("id"),
                "name": label.get("name", ""),
                "color": label.get("color", ""),
                "description": label.get("description", ""),
            }
            for label in _dicts_only(payload.get("labels"))
        ]

    @staticmethod
    def _parse_assignees(payload: dict[str, object]) -> list[dict[str, object]]:
        """Project the payload's ``assignees`` onto login/id/avatar_url dicts."""
        return [
            {
                "login": assignee.get("login", ""),
                "id": assignee.get("id", 0),
                "avatar_url": assignee.get("avatar_url", ""),
            }
            for assignee in _dicts_only(payload.get("assignees"))
        ]

    @staticmethod
    def _parse_milestone(payload: dict[str, object]) -> dict[str, object] | None:
        """Return a trimmed copy of the payload's milestone, or ``None``."""
        raw_milestone = payload.get("milestone")
        if not raw_milestone or not isinstance(raw_milestone, dict):
            return None
        return {
            "id": raw_milestone.get("id"),
            "title": raw_milestone.get("title", ""),
            "state": raw_milestone.get("state", "open"),
            "description": raw_milestone.get("description") or None,
            "due_on": raw_milestone.get("due_on") or None,
            "closed_at": raw_milestone.get("closed_at") or None,
        }

    async def _upsert_issue(
        self,
        existing: ForgejoIssue | None,
        *,
        repository_id: UUID,
        forgejo_number: int,
        title: object,
        state: object,
        payload: dict[str, object],
        enrichment: dict[str, list[dict[str, object]]] | None = None,
    ) -> bool:
        """Create or update the cached row for one issue.

        Args:
            existing: The cached row, or ``None`` to create a new one.
            repository_id: Repository the issue belongs to.
            forgejo_number: Issue number on the Forgejo side.
            title: Title to store.
            state: State to store.
            payload: Issue payload that body, labels, assignees, milestone,
                author, URL and timestamps are read from; stored verbatim
                in ``forgejo_payload``.
            enrichment: Optional extra attribute values (comments, timeline,
                reactions payloads). When ``None`` those attributes are not
                passed on create and not touched on update.

        Returns:
            ``True`` if a new row was created, ``False`` if one was updated.
        """
        raw_body = payload.get("body")
        # Only a non-empty string counts as a body; anything else is stored as None.
        body_full = raw_body if isinstance(raw_body, str) and raw_body else None
        body_preview = body_full[: self._BODY_PREVIEW_CHARS] if body_full else None

        fields: dict[str, object] = {
            "title": title,
            "body": body_full,
            "body_preview": body_preview,
            "state": state,
            "labels": self._parse_labels(payload),
            "assignees": self._parse_assignees(payload),
            "milestone": self._parse_milestone(payload),
            "forgejo_payload": payload,
            "author": _author_login(payload),
            "html_url": _html_url(payload),
            # Required dates fall back to utcnow(); optional closed_at stays None.
            "forgejo_created_at": self._parse_iso_date(payload.get("created_at")) or utcnow(),
            "forgejo_updated_at": self._parse_iso_date(payload.get("updated_at")) or utcnow(),
            "forgejo_closed_at": self._parse_iso_date(payload.get("closed_at")),
        }
        if enrichment is not None:
            fields.update(enrichment)

        if existing is None:
            issue = ForgejoIssue(
                organization_id=self.organization_id,
                repository_id=repository_id,
                forgejo_issue_number=forgejo_number,
                is_pull_request=False,
                **fields,
            )
            self.session.add(issue)
            await self.session.flush()
            return True

        for attribute, value in fields.items():
            setattr(existing, attribute, value)
        existing.last_synced_at = utcnow()
        await crud.save(self.session, existing)
        return False

    async def _sync_label_catalog(
        self,
        repository_id: UUID,
        connection: object,
        repository: ForgejoRepository,
    ) -> None:
        """Refresh ``repository.labels`` from Forgejo; failures are only logged."""
        try:
            async with get_forgejo_client(connection) as client:
                fetched_labels = await client.list_labels(
                    owner=repository.owner,
                    repo=repository.repo,
                )
            repository.labels = [
                {
                    "id": lbl.get("id"),
                    "name": lbl.get("name", ""),
                    "color": lbl.get("color", ""),
                    "description": lbl.get("description") or "",
                }
                # Non-dict entries are dropped so one malformed item does
                # not discard the whole catalog.
                for lbl in _as_dict_list(fetched_labels)
                if lbl.get("name")
            ]
        except Exception as exc:
            logger.warning(
                "label_sync_failed",
                extra={
                    "repository_id": str(repository_id),
                    "error": str(exc),
                },
            )

    async def _sync_repository_metadata(
        self,
        repository_id: UUID,
        connection: object,
        repository: ForgejoRepository,
    ) -> None:
        """Refresh description, issue count, archived flag and topics.

        Failures are only logged; attributes assigned before the failure
        keep their new values.
        """
        try:
            async with get_forgejo_client(connection) as client:
                repo_meta = await client.get_repository(
                    owner=repository.owner,
                    repo=repository.repo,
                )
            repository.description = repo_meta.get("description") or None
            repository.open_issues_count = int(repo_meta.get("open_issues_count") or 0)
            repository.is_archived = bool(repo_meta.get("archived", False))
            raw_topics = repo_meta.get("topics")
            repository.topics = list(raw_topics) if isinstance(raw_topics, list) else []
        except Exception as exc:
            logger.warning(
                "repo_metadata_sync_failed",
                extra={
                    "repository_id": str(repository_id),
                    "error": str(exc),
                },
            )

    # ------------------------------------------------------------------
    # Database lookups and stale-issue verification
    # ------------------------------------------------------------------

    async def _find_issues_batch(
        self, repository_id: UUID, numbers: list[int]
    ) -> dict[int, ForgejoIssue]:
        """Batch lookup of issues by number — one query instead of N serial lookups.

        Returns a mapping of issue number to cached row; an empty ``numbers``
        list short-circuits to ``{}`` without querying.
        """
        if not numbers:
            return {}
        statement = select(ForgejoIssue).where(
            ForgejoIssue.repository_id == repository_id,
            ForgejoIssue.forgejo_issue_number.in_(numbers),
        )
        results = await self.session.exec(statement)
        return {issue.forgejo_issue_number: issue for issue in results.all()}

    async def _close_stale_opens(
        self,
        repository_id: UUID,
        connection: object,
        repository: ForgejoRepository,
        days: int,
    ) -> int:
        """Verify open DB issues not updated in `days` days; close any that Forgejo has closed.

        At most ``_STALE_CHECK_LIMIT`` issues are checked per call. A failure
        while checking one issue is logged and does not stop the others.

        Returns:
            The number of cached issues switched to ``closed``.
        """
        cutoff = utcnow() - timedelta(days=days)
        # NOTE(review): the query has no ORDER BY and issues confirmed as
        # still open are not modified, so the same rows may be re-checked on
        # every run while others are never reached — confirm whether an
        # ordering is wanted here.
        statement = (
            select(ForgejoIssue)
            .where(
                ForgejoIssue.repository_id == repository_id,
                ForgejoIssue.state == "open",
                ForgejoIssue.forgejo_updated_at < cutoff,
            )
            .limit(self._STALE_CHECK_LIMIT)
        )
        stale = (await self.session.exec(statement)).all()
        if not stale:
            return 0

        closed_count = 0
        async with get_forgejo_client(connection) as client:
            for issue in stale:
                try:
                    remote = await client.get_issue(
                        owner=repository.owner,
                        repo=repository.repo,
                        issue_number=issue.forgejo_issue_number,
                    )
                    if isinstance(remote, dict) and remote.get("state") == "closed":
                        issue.state = "closed"
                        issue.forgejo_closed_at = self._parse_iso_date(remote.get("closed_at"))
                        issue.forgejo_updated_at = (
                            self._parse_iso_date(remote.get("updated_at")) or utcnow()
                        )
                        issue.last_synced_at = utcnow()
                        await crud.save(self.session, issue)
                        closed_count += 1
                except Exception as exc:
                    logger.warning(
                        "stale_open_check_failed",
                        extra={
                            "repository_id": str(repository_id),
                            "issue_number": issue.forgejo_issue_number,
                            "error": str(exc),
                        },
                    )
        return closed_count

    async def _find_issue(
        self, repository_id: UUID, forgejo_issue_number: int
    ) -> ForgejoIssue | None:
        """Find an existing cached issue by repository and number."""
        statement = select(ForgejoIssue).where(
            ForgejoIssue.repository_id == repository_id,
            ForgejoIssue.forgejo_issue_number == forgejo_issue_number,
        )
        results = await self.session.exec(statement)
        return results.first()

    def _parse_iso_date(self, date_str: str | None) -> datetime | None:
        """Parse an ISO-8601 string into a naive datetime expressed in UTC.

        A timestamp carrying a UTC offset (or a trailing ``Z``) is converted
        to UTC before the timezone is dropped, so ``12:00+02:00`` becomes
        ``10:00``. A timestamp without an offset is returned as parsed.

        Returns:
            The parsed datetime, or ``None`` for absent, empty, non-string
            or unparseable values.
        """
        if not date_str or not isinstance(date_str, str):
            return None
        cleaned = date_str
        # ``datetime.fromisoformat`` only accepts a trailing "Z" from Python 3.11 on.
        if cleaned.endswith("Z"):
            cleaned = cleaned[:-1] + "+00:00"
        try:
            parsed = datetime.fromisoformat(cleaned)
        except ValueError:
            return None
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc)
        return parsed.replace(tzinfo=None)


def _dicts_only(value: object) -> list[dict[str, object]]:
    """Return the dict elements of *value* when it is a list, else ``[]``."""
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, dict)]