From 922a386871de1177682ed394707368d4e55dc41a Mon Sep 17 00:00:00 2001 From: null Date: Sun, 24 May 2026 19:42:07 -0500 Subject: [PATCH] New POST /{repository_id}/sync/recent?days=7 endpoint. --- backend/app/api/forgejo_repositories.py | 37 ++- backend/app/services/forgejo_client.py | 6 +- backend/app/services/forgejo_issue_sync.py | 248 ++++++++++++++++++++- 3 files changed, 288 insertions(+), 3 deletions(-) diff --git a/backend/app/api/forgejo_repositories.py b/backend/app/api/forgejo_repositories.py index e5c6ba7..dc0c04b 100644 --- a/backend/app/api/forgejo_repositories.py +++ b/backend/app/api/forgejo_repositories.py @@ -5,7 +5,7 @@ from __future__ import annotations from typing import TYPE_CHECKING from uuid import UUID -from fastapi import APIRouter, Depends, HTTPException, status +from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlmodel import select from app.api.deps import require_org_member @@ -328,6 +328,41 @@ async def sync_repository_issues( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) +@router.post( + "/{repository_id}/sync/recent", + summary="Light Sync Issues from Repository", + description="Fetch only issues updated in the last `days` days. Faster than full sync — uses batch DB lookups and skips per-issue enrichment calls.", +) +async def sync_repository_issues_recent( + repository_id: UUID, + days: int = Query(default=7, ge=1, le=90), + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> dict[str, int]: + """Light sync: fetch only issues updated in the last `days` days.""" + repository = await crud.get_by_id(session, ForgejoRepository, repository_id) + if repository is None or repository.organization_id != ctx.organization.id: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) + + from app.services.forgejo_issue_sync import IssueSyncService + from app.core.time import utcnow + + try: + sync_service = IssueSyncService(session=session, organization_id=ctx.organization.id) + result = await sync_service.sync_recent(repository_id=repository_id, days=days) + return result + except ValueError as e: + repository.last_sync_error = str(e) + repository.updated_at = utcnow() + await crud.save(session, repository) + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail=str(e)) + except Exception as e: + repository.last_sync_error = str(e) + repository.updated_at = utcnow() + await crud.save(session, repository) + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) + + def _mask_repository(repository: ForgejoRepository, connection: ForgejoConnection | None = None) -> dict[str, object]: """Return repository dict with safe connection metadata.""" return { diff --git a/backend/app/services/forgejo_client.py b/backend/app/services/forgejo_client.py index 1265137..e7574a5 100644 --- a/backend/app/services/forgejo_client.py +++ b/backend/app/services/forgejo_client.py @@ -80,6 +80,7 @@ class ForgejoAPIClient: state: str = "open", page: int = 1, limit: int = 30, + since: str | None = None, ) -> dict[str, object]: """ List issues for a repository (excluding pull requests). @@ -90,17 +91,20 @@ class ForgejoAPIClient: state: Issue state (open, closed, all) page: Page number limit: Items per page + since: RFC 3339 timestamp; only return issues updated at or after this time Returns: API response as dict """ client = await self._get_client() - params = { + params: dict[str, object] = { "state": state, "page": page, "per_page": limit, "type": "issues", # Exclude pull requests } + if since: + params["since"] = since response = await client.get(f"/api/v1/repos/{owner}/{repo}/issues", params=params) response.raise_for_status() return response.json() diff --git a/backend/app/services/forgejo_issue_sync.py b/backend/app/services/forgejo_issue_sync.py index b677858..c3c086f 100644 --- a/backend/app/services/forgejo_issue_sync.py +++ b/backend/app/services/forgejo_issue_sync.py @@ -2,7 +2,7 @@ from __future__ import annotations -from datetime import datetime +from datetime import datetime, timedelta from uuid import UUID from sqlmodel import select @@ -350,6 +350,252 @@ class IssueSyncService: "total": created + updated_count, } + async def sync_recent( + self, + repository_id: UUID, + days: int = 7, + limit: int = 50, + ) -> dict[str, int]: + """Lightweight sync: fetch only issues updated in the last `days` days. + + Uses a single batch DB query per page (no per-issue lookups) and skips + enrichment calls (comments, timeline, reactions). After paging, closes + any DB-open issues that Forgejo has since closed outside the time window. + """ + repository = await crud.get_by_id(self.session, ForgejoRepository, repository_id) + if repository is None: + raise ValueError(f"Repository {repository_id} not found or access denied") + if repository.organization_id != self.organization_id: + raise ValueError(f"Repository {repository_id} not found or access denied") + + connection = await crud.get_by_id(self.session, ForgejoConnection, repository.connection_id) + if connection is None: + raise ValueError("Repository has no connection") + + since_dt = utcnow() - timedelta(days=days) + since_iso = since_dt.strftime("%Y-%m-%dT%H:%M:%SZ") + + created = 0 + updated_count = 0 + open_count = 0 + closed_count = 0 + page = 1 + + while True: + async with get_forgejo_client(connection) as client: + response = await client.list_issues( + owner=repository.owner, + repo=repository.repo, + state="all", + page=page, + limit=limit, + since=since_iso, + ) + + issues = ( + response + if isinstance(response, list) + else response.get("items", response.get("data", [])) + ) + if not isinstance(issues, list) or not issues: + break + + numbers = [ + int(i.get("number", 0)) + for i in issues + if isinstance(i, dict) and i.get("number") and i.get("pull_request") is None + ] + existing_map = await self._find_issues_batch(repository_id, numbers) + + for issue_data in issues: + if not isinstance(issue_data, dict): + continue + if issue_data.get("pull_request") is not None: + continue + + raw_number = issue_data.get("number", 0) + try: + forgejo_number = int(raw_number) + except (TypeError, ValueError): + continue + + state = issue_data.get("state", "open") + raw_body = issue_data.get("body") or "" + body_full = raw_body if raw_body else None + body_preview = raw_body[:1000] if raw_body else None + + labels_data = [] + for label in issue_data.get("labels") or []: + if isinstance(label, dict): + labels_data.append( + { + "id": label.get("id"), + "name": label.get("name", ""), + "color": label.get("color", ""), + "description": label.get("description", ""), + } + ) + + assignees_data = [] + for assignee in issue_data.get("assignees") or []: + if isinstance(assignee, dict): + assignees_data.append( + { + "login": assignee.get("login", ""), + "id": assignee.get("id", 0), + "avatar_url": assignee.get("avatar_url", ""), + } + ) + + milestone_data = None + raw_milestone = issue_data.get("milestone") + if raw_milestone and isinstance(raw_milestone, dict): + milestone_data = { + "id": raw_milestone.get("id"), + "title": raw_milestone.get("title", ""), + "state": raw_milestone.get("state", "open"), + "description": raw_milestone.get("description") or None, + "due_on": raw_milestone.get("due_on") or None, + "closed_at": raw_milestone.get("closed_at") or None, + } + + created_at = self._parse_iso_date(issue_data.get("created_at")) or utcnow() + updated_at = self._parse_iso_date(issue_data.get("updated_at")) or utcnow() + closed_at = self._parse_iso_date(issue_data.get("closed_at")) + + existing = existing_map.get(forgejo_number) + + if existing is None: + issue = ForgejoIssue( + organization_id=self.organization_id, + repository_id=repository_id, + forgejo_issue_number=forgejo_number, + title=issue_data.get("title", ""), + body=body_full, + body_preview=body_preview, + state=state, + is_pull_request=False, + labels=labels_data, + assignees=assignees_data, + milestone=milestone_data, + forgejo_payload=dict(issue_data), + author=_author_login(issue_data), + html_url=_html_url(issue_data), + forgejo_created_at=created_at, + forgejo_updated_at=updated_at, + forgejo_closed_at=closed_at, + ) + self.session.add(issue) + await self.session.flush() + created += 1 + else: + existing.title = issue_data.get("title", "") + existing.body = body_full + existing.body_preview = body_preview + existing.state = state + existing.labels = labels_data + existing.assignees = assignees_data + existing.milestone = milestone_data + existing.forgejo_payload = dict(issue_data) + existing.author = _author_login(issue_data) + existing.html_url = _html_url(issue_data) + existing.forgejo_created_at = created_at + existing.forgejo_updated_at = updated_at + existing.forgejo_closed_at = closed_at + existing.last_synced_at = utcnow() + await crud.save(self.session, existing) + updated_count += 1 + + if state == "open": + open_count += 1 + elif state == "closed": + closed_count += 1 + + if len(issues) < limit: + break + page += 1 + + stale_closed = await self._close_stale_opens( + repository_id, connection, repository, days + ) + + repository.last_sync_at = utcnow() + repository.last_sync_error = None + await crud.save(self.session, repository) + + return { + "created": created, + "updated": updated_count, + "stale_closed": stale_closed, + "open": open_count, + "closed": closed_count, + "total": created + updated_count, + } + + async def _find_issues_batch( + self, repository_id: UUID, numbers: list[int] + ) -> dict[int, ForgejoIssue]: + """Batch lookup of issues by number — one query instead of N serial lookups.""" + if not numbers: + return {} + statement = select(ForgejoIssue).where( + ForgejoIssue.repository_id == repository_id, + ForgejoIssue.forgejo_issue_number.in_(numbers), + ) + results = await self.session.exec(statement) + return {issue.forgejo_issue_number: issue for issue in results.all()} + + async def _close_stale_opens( + self, + repository_id: UUID, + connection: object, + repository: ForgejoRepository, + days: int, + ) -> int: + """Verify open DB issues not updated in `days` days; close any that Forgejo has closed.""" + cutoff = utcnow() - timedelta(days=days) + statement = ( + select(ForgejoIssue) + .where( + ForgejoIssue.repository_id == repository_id, + ForgejoIssue.state == "open", + ForgejoIssue.forgejo_updated_at < cutoff, + ) + .limit(50) + ) + stale = (await self.session.exec(statement)).all() + if not stale: + return 0 + + closed_count = 0 + async with get_forgejo_client(connection) as client: + for issue in stale: + try: + remote = await client.get_issue( + owner=repository.owner, + repo=repository.repo, + issue_number=issue.forgejo_issue_number, + ) + if isinstance(remote, dict) and remote.get("state") == "closed": + issue.state = "closed" + issue.forgejo_closed_at = self._parse_iso_date(remote.get("closed_at")) + issue.forgejo_updated_at = ( + self._parse_iso_date(remote.get("updated_at")) or utcnow() + ) + issue.last_synced_at = utcnow() + await crud.save(self.session, issue) + closed_count += 1 + except Exception as exc: + logger.warning( + "stale_open_check_failed", + extra={ + "repository_id": str(repository_id), + "issue_number": issue.forgejo_issue_number, + "error": str(exc), + }, + ) + return closed_count + async def _find_issue( self, repository_id: UUID, forgejo_issue_number: int ) -> ForgejoIssue | None: