New POST /{repository_id}/sync/recent?days=7 endpoint.
This commit is contained in:
parent
5446fad843
commit
922a386871
|
|
@ -5,7 +5,7 @@ from __future__ import annotations
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
from fastapi import APIRouter, Depends, HTTPException, status
|
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
||||||
from sqlmodel import select
|
from sqlmodel import select
|
||||||
|
|
||||||
from app.api.deps import require_org_member
|
from app.api.deps import require_org_member
|
||||||
|
|
@ -328,6 +328,41 @@ async def sync_repository_issues(
|
||||||
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
|
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
@router.post(
    "/{repository_id}/sync/recent",
    summary="Light Sync Issues from Repository",
    description="Fetch only issues updated in the last `days` days. Faster than full sync — uses batch DB lookups and skips per-issue enrichment calls.",
)
async def sync_repository_issues_recent(
    repository_id: UUID,
    days: int = Query(default=7, ge=1, le=90),
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> dict[str, int]:
    """Light sync: fetch only issues updated in the last `days` days.

    Args:
        repository_id: ID of the repository to sync.
        days: Look-back window in days; validated to the range 1-90.
        session: Database session dependency.
        ctx: Organization context of the calling member.

    Returns:
        The counters returned by ``IssueSyncService.sync_recent``.

    Raises:
        HTTPException: 404 when the repository does not exist or belongs to
            another organization; 422 when the sync raises ``ValueError``;
            500 for any other failure.
    """
    repository = await crud.get_by_id(session, ForgejoRepository, repository_id)
    if repository is None or repository.organization_id != ctx.organization.id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    # Imported at call time, as in the original implementation.
    from app.core.time import utcnow
    from app.services.forgejo_issue_sync import IssueSyncService

    try:
        sync_service = IssueSyncService(session=session, organization_id=ctx.organization.id)
        return await sync_service.sync_recent(repository_id=repository_id, days=days)
    except Exception as e:
        # Record the failure on the repository before reporting it to the caller.
        repository.last_sync_error = str(e)
        repository.updated_at = utcnow()
        await crud.save(session, repository)

        # ValueError is reported as a client-side problem (422); anything else as 500.
        status_code = (
            status.HTTP_422_UNPROCESSABLE_CONTENT
            if isinstance(e, ValueError)
            else status.HTTP_500_INTERNAL_SERVER_ERROR
        )
        # Chain the original exception so the real cause stays in the traceback.
        raise HTTPException(status_code=status_code, detail=str(e)) from e
|
||||||
|
|
||||||
|
|
||||||
def _mask_repository(repository: ForgejoRepository, connection: ForgejoConnection | None = None) -> dict[str, object]:
|
def _mask_repository(repository: ForgejoRepository, connection: ForgejoConnection | None = None) -> dict[str, object]:
|
||||||
"""Return repository dict with safe connection metadata."""
|
"""Return repository dict with safe connection metadata."""
|
||||||
return {
|
return {
|
||||||
|
|
|
||||||
|
|
@ -80,6 +80,7 @@ class ForgejoAPIClient:
|
||||||
state: str = "open",
|
state: str = "open",
|
||||||
page: int = 1,
|
page: int = 1,
|
||||||
limit: int = 30,
|
limit: int = 30,
|
||||||
|
since: str | None = None,
|
||||||
) -> dict[str, object]:
|
) -> dict[str, object]:
|
||||||
"""
|
"""
|
||||||
List issues for a repository (excluding pull requests).
|
List issues for a repository (excluding pull requests).
|
||||||
|
|
@ -90,17 +91,20 @@ class ForgejoAPIClient:
|
||||||
state: Issue state (open, closed, all)
|
state: Issue state (open, closed, all)
|
||||||
page: Page number
|
page: Page number
|
||||||
limit: Items per page
|
limit: Items per page
|
||||||
|
since: RFC 3339 timestamp; only return issues updated at or after this time
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
API response as dict
|
API response as dict
|
||||||
"""
|
"""
|
||||||
client = await self._get_client()
|
client = await self._get_client()
|
||||||
params = {
|
params: dict[str, object] = {
|
||||||
"state": state,
|
"state": state,
|
||||||
"page": page,
|
"page": page,
|
||||||
"per_page": limit,
|
"per_page": limit,
|
||||||
"type": "issues", # Exclude pull requests
|
"type": "issues", # Exclude pull requests
|
||||||
}
|
}
|
||||||
|
if since:
|
||||||
|
params["since"] = since
|
||||||
response = await client.get(f"/api/v1/repos/{owner}/{repo}/issues", params=params)
|
response = await client.get(f"/api/v1/repos/{owner}/{repo}/issues", params=params)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
return response.json()
|
return response.json()
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime, timedelta
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
from sqlmodel import select
|
from sqlmodel import select
|
||||||
|
|
@ -350,6 +350,252 @@ class IssueSyncService:
|
||||||
"total": created + updated_count,
|
"total": created + updated_count,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async def sync_recent(
    self,
    repository_id: UUID,
    days: int = 7,
    limit: int = 50,
) -> dict[str, int]:
    """Lightweight sync: fetch only issues updated in the last `days` days.

    Runs one batch DB lookup per page instead of per-issue lookups and makes
    no comment/timeline/reaction calls. After paging, delegates to
    ``_close_stale_opens`` to close DB-open issues older than the window that
    Forgejo reports as closed.

    Args:
        repository_id: Repository to sync; must belong to this service's
            organization.
        days: Look-back window in days.
        limit: Page size requested from Forgejo; a page shorter than this
            ends pagination.

    Returns:
        Counters keyed ``created``, ``updated``, ``stale_closed``, ``open``,
        ``closed`` and ``total`` (``created + updated``).

    Raises:
        ValueError: If the repository is missing, belongs to another
            organization, or has no connection.
    """
    repository = await crud.get_by_id(self.session, ForgejoRepository, repository_id)
    if repository is None or repository.organization_id != self.organization_id:
        raise ValueError(f"Repository {repository_id} not found or access denied")

    connection = await crud.get_by_id(self.session, ForgejoConnection, repository.connection_id)
    if connection is None:
        raise ValueError("Repository has no connection")

    since_dt = utcnow() - timedelta(days=days)
    since_iso = since_dt.strftime("%Y-%m-%dT%H:%M:%SZ")

    created = 0
    updated_count = 0
    open_count = 0
    closed_count = 0
    page = 1

    while True:
        async with get_forgejo_client(connection) as client:
            response = await client.list_issues(
                owner=repository.owner,
                repo=repository.repo,
                state="all",
                page=page,
                limit=limit,
                since=since_iso,
            )

        # The payload may be a bare list or a dict wrapping the list; any
        # other shape is treated as "no issues" instead of crashing on .get.
        if isinstance(response, list):
            issues = response
        elif isinstance(response, dict):
            issues = response.get("items", response.get("data", []))
        else:
            issues = []
        if not isinstance(issues, list) or not issues:
            break

        # Validate each entry once so the batch lookup and the upsert loop
        # agree on which entries are processed: skip non-dicts, pull
        # requests, and entries without a usable positive issue number.
        parsed: list[tuple[int, dict]] = []
        for issue_data in issues:
            if not isinstance(issue_data, dict):
                continue
            if issue_data.get("pull_request") is not None:
                continue
            try:
                forgejo_number = int(issue_data.get("number"))
            except (TypeError, ValueError):
                continue
            if forgejo_number <= 0:
                continue
            parsed.append((forgejo_number, issue_data))

        existing_map = await self._find_issues_batch(
            repository_id, [number for number, _ in parsed]
        )

        for forgejo_number, issue_data in parsed:
            state = issue_data.get("state", "open")
            raw_body = issue_data.get("body") or ""
            body_full = raw_body if raw_body else None
            body_preview = raw_body[:1000] if raw_body else None

            labels_data = [
                {
                    "id": label.get("id"),
                    "name": label.get("name", ""),
                    "color": label.get("color", ""),
                    "description": label.get("description", ""),
                }
                for label in issue_data.get("labels") or []
                if isinstance(label, dict)
            ]

            assignees_data = [
                {
                    "login": assignee.get("login", ""),
                    "id": assignee.get("id", 0),
                    "avatar_url": assignee.get("avatar_url", ""),
                }
                for assignee in issue_data.get("assignees") or []
                if isinstance(assignee, dict)
            ]

            milestone_data = None
            raw_milestone = issue_data.get("milestone")
            if raw_milestone and isinstance(raw_milestone, dict):
                milestone_data = {
                    "id": raw_milestone.get("id"),
                    "title": raw_milestone.get("title", ""),
                    "state": raw_milestone.get("state", "open"),
                    "description": raw_milestone.get("description") or None,
                    "due_on": raw_milestone.get("due_on") or None,
                    "closed_at": raw_milestone.get("closed_at") or None,
                }

            # Fall back to "now" when the remote timestamps cannot be parsed.
            created_at = self._parse_iso_date(issue_data.get("created_at")) or utcnow()
            updated_at = self._parse_iso_date(issue_data.get("updated_at")) or utcnow()
            closed_at = self._parse_iso_date(issue_data.get("closed_at"))

            existing = existing_map.get(forgejo_number)

            if existing is None:
                issue = ForgejoIssue(
                    organization_id=self.organization_id,
                    repository_id=repository_id,
                    forgejo_issue_number=forgejo_number,
                    title=issue_data.get("title", ""),
                    body=body_full,
                    body_preview=body_preview,
                    state=state,
                    is_pull_request=False,
                    labels=labels_data,
                    assignees=assignees_data,
                    milestone=milestone_data,
                    forgejo_payload=dict(issue_data),
                    author=_author_login(issue_data),
                    html_url=_html_url(issue_data),
                    forgejo_created_at=created_at,
                    forgejo_updated_at=updated_at,
                    forgejo_closed_at=closed_at,
                )
                self.session.add(issue)
                await self.session.flush()
                # Remember the new row so a duplicate entry in the same page
                # updates it instead of inserting a second copy.
                existing_map[forgejo_number] = issue
                created += 1
            else:
                existing.title = issue_data.get("title", "")
                existing.body = body_full
                existing.body_preview = body_preview
                existing.state = state
                existing.labels = labels_data
                existing.assignees = assignees_data
                existing.milestone = milestone_data
                existing.forgejo_payload = dict(issue_data)
                existing.author = _author_login(issue_data)
                existing.html_url = _html_url(issue_data)
                existing.forgejo_created_at = created_at
                existing.forgejo_updated_at = updated_at
                existing.forgejo_closed_at = closed_at
                existing.last_synced_at = utcnow()
                await crud.save(self.session, existing)
                updated_count += 1

            if state == "open":
                open_count += 1
            elif state == "closed":
                closed_count += 1

        # A short page means there is nothing further to fetch.
        if len(issues) < limit:
            break
        page += 1

    stale_closed = await self._close_stale_opens(
        repository_id, connection, repository, days
    )

    repository.last_sync_at = utcnow()
    repository.last_sync_error = None
    await crud.save(self.session, repository)

    return {
        "created": created,
        "updated": updated_count,
        "stale_closed": stale_closed,
        "open": open_count,
        "closed": closed_count,
        "total": created + updated_count,
    }
|
||||||
|
|
||||||
|
async def _find_issues_batch(
    self, repository_id: UUID, numbers: list[int]
) -> dict[int, ForgejoIssue]:
    """Load the stored issues for the given numbers with a single query.

    Args:
        repository_id: Repository whose issues are searched.
        numbers: Forgejo issue numbers to look up.

    Returns:
        Mapping of issue number to the stored ``ForgejoIssue``; empty when
        ``numbers`` is empty (no query is issued in that case).
    """
    found: dict[int, ForgejoIssue] = {}
    if numbers:
        query = select(ForgejoIssue).where(
            ForgejoIssue.repository_id == repository_id,
            ForgejoIssue.forgejo_issue_number.in_(numbers),
        )
        rows = await self.session.exec(query)
        for row in rows.all():
            found[row.forgejo_issue_number] = row
    return found
|
||||||
|
|
||||||
|
async def _close_stale_opens(
    self,
    repository_id: UUID,
    connection: ForgejoConnection,
    repository: ForgejoRepository,
    days: int,
    max_checks: int = 50,
) -> int:
    """Verify open DB issues not updated in `days` days; close any that Forgejo has closed.

    Args:
        repository_id: Repository whose stored issues are checked.
        connection: Connection used to open the Forgejo client.
        repository: Repository row supplying ``owner`` and ``repo``.
        days: Issues whose ``forgejo_updated_at`` is older than this many
            days are candidates.
        max_checks: Upper bound on issues verified per call (one remote
            request each). Defaults to 50.

    Returns:
        Number of issues that were marked closed.
    """
    cutoff = utcnow() - timedelta(days=days)
    statement = (
        select(ForgejoIssue)
        .where(
            ForgejoIssue.repository_id == repository_id,
            ForgejoIssue.state == "open",
            ForgejoIssue.forgejo_updated_at < cutoff,
        )
        # Order explicitly so the limited batch is deterministic (oldest first).
        # NOTE(review): issues verified as still open stay at the front of this
        # ordering, so with more than `max_checks` stale issues the tail is
        # never reached — consider rotating on a "last checked" timestamp.
        .order_by(ForgejoIssue.forgejo_updated_at)
        .limit(max_checks)
    )
    stale = (await self.session.exec(statement)).all()
    if not stale:
        return 0

    closed_count = 0
    async with get_forgejo_client(connection) as client:
        for issue in stale:
            # Best effort: a failure on one issue is logged and must not
            # abort the checks for the remaining ones.
            try:
                remote = await client.get_issue(
                    owner=repository.owner,
                    repo=repository.repo,
                    issue_number=issue.forgejo_issue_number,
                )
                if isinstance(remote, dict) and remote.get("state") == "closed":
                    issue.state = "closed"
                    issue.forgejo_closed_at = self._parse_iso_date(remote.get("closed_at"))
                    issue.forgejo_updated_at = (
                        self._parse_iso_date(remote.get("updated_at")) or utcnow()
                    )
                    issue.last_synced_at = utcnow()
                    await crud.save(self.session, issue)
                    closed_count += 1
            except Exception as exc:
                logger.warning(
                    "stale_open_check_failed",
                    extra={
                        "repository_id": str(repository_id),
                        "issue_number": issue.forgejo_issue_number,
                        "error": str(exc),
                    },
                )
    return closed_count
|
||||||
|
|
||||||
async def _find_issue(
|
async def _find_issue(
|
||||||
self, repository_id: UUID, forgejo_issue_number: int
|
self, repository_id: UUID, forgejo_issue_number: int
|
||||||
) -> ForgejoIssue | None:
|
) -> ForgejoIssue | None:
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue