feat(scripts): pull more backened

This commit is contained in:
null 2026-05-21 22:53:02 -05:00
parent e56f252da6
commit 5cc0d75636
6 changed files with 314 additions and 36 deletions

View File

@ -18,10 +18,12 @@ from app.core.time import utcnow
from app.db import crud
from app.db.session import get_session
from app.models.board_repository_links import BoardRepositoryLink
from app.models.forgejo_connections import ForgejoConnection
from app.models.forgejo_issues import ForgejoIssue
from app.models.forgejo_repositories import ForgejoRepository
from app.schemas.forgejo_webhooks import ForgejoWebhookIngestResponse
from app.services.activity_log import record_activity
from app.services.forgejo_client import get_forgejo_client
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
@ -289,7 +291,12 @@ async def _upsert_issue(
repository: ForgejoRepository,
action: str,
issue_data: dict[str, Any],
issue_payload: dict[str, Any] | None = None,
comments_payload: list[dict[str, object]] | None = None,
timeline_payload: list[dict[str, object]] | None = None,
reactions_payload: list[dict[str, object]] | None = None,
) -> tuple[ForgejoIssue, bool]:
payload = issue_payload if issue_payload is not None else issue_data
number = _issue_number(issue_data)
existing = (
await session.exec(
@ -306,9 +313,9 @@ async def _upsert_issue(
else _parse_optional_iso_datetime(issue_data.get("closed_at")) or utcnow()
)
raw_body = issue_data.get("body") or ""
raw_body = payload.get("body") or ""
body_text = str(raw_body) if raw_body else None
milestone = _milestone(issue_data)
milestone = _milestone(payload)
if existing is None:
issue = ForgejoIssue(
@ -320,14 +327,17 @@ async def _upsert_issue(
body_preview=str(raw_body)[:1000] if raw_body else None,
state=state,
is_pull_request=False,
labels=_labels(issue_data),
assignees=_assignees(issue_data),
labels=_labels(payload),
assignees=_assignees(payload),
milestone=milestone,
forgejo_payload=dict(issue_data),
author=_author(issue_data),
html_url=str(issue_data.get("html_url") or ""),
forgejo_created_at=_parse_iso_datetime(issue_data.get("created_at")),
forgejo_updated_at=_parse_iso_datetime(issue_data.get("updated_at")),
forgejo_payload=dict(payload),
forgejo_comments_payload=comments_payload or [],
forgejo_timeline_payload=timeline_payload or [],
forgejo_reactions_payload=reactions_payload or [],
author=_author(payload),
html_url=str(payload.get("html_url") or ""),
forgejo_created_at=_parse_iso_datetime(payload.get("created_at")),
forgejo_updated_at=_parse_iso_datetime(payload.get("updated_at")),
forgejo_closed_at=closed_at,
)
session.add(issue)
@ -340,13 +350,19 @@ async def _upsert_issue(
existing.milestone = milestone
existing.state = state
existing.is_pull_request = False
existing.labels = _labels(issue_data)
existing.assignees = _assignees(issue_data)
existing.forgejo_payload = dict(issue_data)
existing.author = _author(issue_data)
existing.html_url = str(issue_data.get("html_url") or "")
existing.forgejo_created_at = _parse_iso_datetime(issue_data.get("created_at"))
existing.forgejo_updated_at = _parse_iso_datetime(issue_data.get("updated_at"))
existing.labels = _labels(payload)
existing.assignees = _assignees(payload)
existing.forgejo_payload = dict(payload)
if comments_payload is not None:
existing.forgejo_comments_payload = comments_payload
if timeline_payload is not None:
existing.forgejo_timeline_payload = timeline_payload
if reactions_payload is not None:
existing.forgejo_reactions_payload = reactions_payload
existing.author = _author(payload)
existing.html_url = str(payload.get("html_url") or "")
existing.forgejo_created_at = _parse_iso_datetime(payload.get("created_at"))
existing.forgejo_updated_at = _parse_iso_datetime(payload.get("updated_at"))
existing.forgejo_closed_at = closed_at
existing.last_synced_at = utcnow()
existing.updated_at = utcnow()
@ -458,11 +474,60 @@ async def ingest_forgejo_webhook(
reason="pull_request_ignored",
)
issue_number = _issue_number(issue_data)
issue_payload_for_cache: dict[str, Any] = dict(issue_data)
comments_payload: list[dict[str, object]] | None = None
timeline_payload: list[dict[str, object]] | None = None
reactions_payload: list[dict[str, object]] | None = None
connection = await crud.get_by_id(session, ForgejoConnection, repository.connection_id)
if connection is not None and connection.token:
try:
async with get_forgejo_client(connection) as client:
remote_issue = await client.get_issue(
owner=repository.owner,
repo=repository.repo,
issue_number=issue_number,
)
if isinstance(remote_issue, dict):
issue_payload_for_cache = dict(remote_issue)
raw_comments = await client.list_issue_comments(
owner=repository.owner,
repo=repository.repo,
issue_number=issue_number,
)
raw_timeline = await client.list_issue_timeline(
owner=repository.owner,
repo=repository.repo,
issue_number=issue_number,
)
raw_reactions = await client.list_issue_reactions(
owner=repository.owner,
repo=repository.repo,
issue_number=issue_number,
)
comments_payload = [item for item in raw_comments if isinstance(item, dict)]
timeline_payload = [item for item in raw_timeline if isinstance(item, dict)]
reactions_payload = [item for item in raw_reactions if isinstance(item, dict)]
except Exception as exc:
logger.warning(
"forgejo.webhook.issue.enrichment_failed",
extra={
"repository_id": str(repository.id),
"issue_number": issue_number,
"error": str(exc),
},
)
issue, created = await _upsert_issue(
session=session,
repository=repository,
action=action,
issue_data=issue_data,
issue_payload=issue_payload_for_cache,
comments_payload=comments_payload,
timeline_payload=timeline_payload,
reactions_payload=reactions_payload,
)
await _record_issue_activity(
session=session,

View File

@ -37,6 +37,15 @@ class ForgejoIssue(SQLModel, table=True):
forgejo_payload: dict[str, object] | None = Field(
default=None, sa_column=Column(JSON, nullable=True)
)
forgejo_comments_payload: list[dict[str, object]] = Field(
default_factory=list, sa_column=Column(JSON)
)
forgejo_timeline_payload: list[dict[str, object]] = Field(
default_factory=list, sa_column=Column(JSON)
)
forgejo_reactions_payload: list[dict[str, object]] = Field(
default_factory=list, sa_column=Column(JSON)
)
author: str
html_url: str

View File

@ -46,6 +46,9 @@ class ForgejoIssueDetailRead(ForgejoIssueRead):
"""Issue detail payload including full cached Forgejo source payload."""
forgejo_payload: dict[str, Any] | None = None
forgejo_comments_payload: list[dict[str, Any]] = []
forgejo_timeline_payload: list[dict[str, Any]] = []
forgejo_reactions_payload: list[dict[str, Any]] = []
class ForgejoIssueListResponse(SQLModel):

View File

@ -2,16 +2,10 @@
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
import httpx
from app.core.logging import get_logger
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
logger = get_logger(__name__)
@ -137,6 +131,18 @@ class ForgejoAPIClient:
response.raise_for_status()
return response.json()
async def get_issue(
self,
owner: str,
repo: str,
issue_number: int,
) -> dict[str, object]:
"""Get full details for a single issue."""
client = await self._get_client()
response = await client.get(f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}")
response.raise_for_status()
return response.json()
async def get_repository(
self,
owner: str,
@ -183,6 +189,81 @@ class ForgejoAPIClient:
data = response.json()
return list(data) if isinstance(data, list) else []
async def list_issue_comments(
self,
owner: str,
repo: str,
issue_number: int,
limit: int = 50,
) -> list[dict[str, object]]:
"""List all comments for an issue."""
client = await self._get_client()
comments: list[dict[str, object]] = []
page = 1
while True:
response = await client.get(
f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}/comments",
params={"limit": limit, "page": page},
)
response.raise_for_status()
data = response.json()
page_items = data if isinstance(data, list) else []
comments.extend(item for item in page_items if isinstance(item, dict))
if len(page_items) < limit:
break
page += 1
return comments
async def list_issue_timeline(
self,
owner: str,
repo: str,
issue_number: int,
limit: int = 50,
) -> list[dict[str, object]]:
"""List timeline events for an issue."""
client = await self._get_client()
timeline: list[dict[str, object]] = []
page = 1
while True:
response = await client.get(
f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}/timeline",
params={"limit": limit, "page": page},
)
response.raise_for_status()
data = response.json()
page_items = data if isinstance(data, list) else []
timeline.extend(item for item in page_items if isinstance(item, dict))
if len(page_items) < limit:
break
page += 1
return timeline
async def list_issue_reactions(
self,
owner: str,
repo: str,
issue_number: int,
limit: int = 50,
) -> list[dict[str, object]]:
"""List reactions for an issue."""
client = await self._get_client()
reactions: list[dict[str, object]] = []
page = 1
while True:
response = await client.get(
f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}/reactions",
params={"limit": limit, "page": page},
)
response.raise_for_status()
data = response.json()
page_items = data if isinstance(data, list) else []
reactions.extend(item for item in page_items if isinstance(item, dict))
if len(page_items) < limit:
break
page += 1
return reactions
async def list_user_repos(
self,
limit: int = 50,
@ -223,6 +304,7 @@ def get_forgejo_client(
# Remove /api/v1 if present to get base URL
if "/api/v1" in base_url:
import re
match = re.match(r"(https?://[^/]+)", base_url)
if match:
base_url = match.group(1).rstrip("/")

View File

@ -18,6 +18,30 @@ from app.services.forgejo_client import get_forgejo_client
logger = get_logger(__name__)
def _as_dict(value: object) -> dict[str, object] | None:
return value if isinstance(value, dict) else None
def _as_dict_list(value: object) -> list[dict[str, object]]:
if not isinstance(value, list):
return []
return [item for item in value if isinstance(item, dict)]
def _author_login(issue_payload: dict[str, object]) -> str:
user = issue_payload.get("user")
if isinstance(user, dict):
login = user.get("login")
if isinstance(login, str):
return login
return ""
def _html_url(issue_payload: dict[str, object]) -> str:
value = issue_payload.get("html_url")
return value if isinstance(value, str) else ""
class IssueSyncService:
"""Service for syncing Forgejo issues from remote repositories."""
@ -75,12 +99,86 @@ class IssueSyncService:
if issue_data.get("pull_request") is not None:
continue
forgejo_number = issue_data.get("number", 0)
raw_number = issue_data.get("number", 0)
try:
forgejo_number = int(raw_number)
except (TypeError, ValueError):
forgejo_number = 0
state = issue_data.get("state", "open")
issue_payload = dict(issue_data)
comments_payload: list[dict[str, object]] = []
timeline_payload: list[dict[str, object]] = []
reactions_payload: list[dict[str, object]] = []
# Enrich each issue with full detail and exhaustive nested data.
try:
full_issue = await client.get_issue(
owner=repository.owner,
repo=repository.repo,
issue_number=forgejo_number,
)
maybe_full = _as_dict(full_issue)
if maybe_full is not None:
issue_payload = maybe_full
except Exception as exc:
logger.warning(
"issue_detail_sync_failed",
repository_id=str(repository_id),
issue_number=forgejo_number,
error=str(exc),
)
try:
comments_payload = _as_dict_list(
await client.list_issue_comments(
owner=repository.owner,
repo=repository.repo,
issue_number=forgejo_number,
)
)
except Exception as exc:
logger.warning(
"issue_comments_sync_failed",
repository_id=str(repository_id),
issue_number=forgejo_number,
error=str(exc),
)
try:
timeline_payload = _as_dict_list(
await client.list_issue_timeline(
owner=repository.owner,
repo=repository.repo,
issue_number=forgejo_number,
)
)
except Exception as exc:
logger.warning(
"issue_timeline_sync_failed",
repository_id=str(repository_id),
issue_number=forgejo_number,
error=str(exc),
)
try:
reactions_payload = _as_dict_list(
await client.list_issue_reactions(
owner=repository.owner,
repo=repository.repo,
issue_number=forgejo_number,
)
)
except Exception as exc:
logger.warning(
"issue_reactions_sync_failed",
repository_id=str(repository_id),
issue_number=forgejo_number,
error=str(exc),
)
# Parse labels
labels_data = []
for label in issue_data.get("labels") or []:
for label in issue_payload.get("labels") or []:
labels_data.append(
{
"id": label.get("id"),
@ -92,7 +190,7 @@ class IssueSyncService:
# Parse assignees
assignees_data = []
for assignee in issue_data.get("assignees") or []:
for assignee in issue_payload.get("assignees") or []:
assignees_data.append(
{
"login": assignee.get("login", ""),
@ -103,7 +201,7 @@ class IssueSyncService:
# Parse milestone
milestone_data = None
raw_milestone = issue_data.get("milestone")
raw_milestone = issue_payload.get("milestone")
if raw_milestone and isinstance(raw_milestone, dict):
milestone_data = {
"id": raw_milestone.get("id"),
@ -115,14 +213,14 @@ class IssueSyncService:
}
# Full body and preview
raw_body = issue_data.get("body") or ""
raw_body = issue_payload.get("body") or ""
body_full = raw_body if raw_body else None
body_preview = raw_body[:1000] if raw_body else None
# Parse dates — required fields fall back to utcnow(), optional closed_at stays None
created_at = self._parse_iso_date(issue_data.get("created_at")) or utcnow()
updated_at = self._parse_iso_date(issue_data.get("updated_at")) or utcnow()
closed_at = self._parse_iso_date(issue_data.get("closed_at"))
created_at = self._parse_iso_date(issue_payload.get("created_at")) or utcnow()
updated_at = self._parse_iso_date(issue_payload.get("updated_at")) or utcnow()
closed_at = self._parse_iso_date(issue_payload.get("closed_at"))
# Check if issue exists
existing = await self._find_issue(repository_id, forgejo_number)
@ -140,9 +238,12 @@ class IssueSyncService:
labels=labels_data,
assignees=assignees_data,
milestone=milestone_data,
forgejo_payload=dict(issue_data),
author=issue_data.get("user", {}).get("login", ""),
html_url=issue_data.get("html_url", ""),
forgejo_payload=issue_payload,
forgejo_comments_payload=comments_payload,
forgejo_timeline_payload=timeline_payload,
forgejo_reactions_payload=reactions_payload,
author=_author_login(issue_payload),
html_url=_html_url(issue_payload),
forgejo_created_at=created_at,
forgejo_updated_at=updated_at,
forgejo_closed_at=closed_at,
@ -158,9 +259,12 @@ class IssueSyncService:
existing.labels = labels_data
existing.assignees = assignees_data
existing.milestone = milestone_data
existing.forgejo_payload = dict(issue_data)
existing.author = issue_data.get("user", {}).get("login", "")
existing.html_url = issue_data.get("html_url", "")
existing.forgejo_payload = issue_payload
existing.forgejo_comments_payload = comments_payload
existing.forgejo_timeline_payload = timeline_payload
existing.forgejo_reactions_payload = reactions_payload
existing.author = _author_login(issue_payload)
existing.html_url = _html_url(issue_payload)
existing.forgejo_created_at = created_at
existing.forgejo_updated_at = updated_at
existing.forgejo_closed_at = closed_at

View File

@ -22,7 +22,22 @@ def upgrade() -> None:
"forgejo_issues",
sa.Column("forgejo_payload", sa.JSON(), nullable=True),
)
op.add_column(
"forgejo_issues",
sa.Column("forgejo_comments_payload", sa.JSON(), nullable=False, server_default="[]"),
)
op.add_column(
"forgejo_issues",
sa.Column("forgejo_timeline_payload", sa.JSON(), nullable=False, server_default="[]"),
)
op.add_column(
"forgejo_issues",
sa.Column("forgejo_reactions_payload", sa.JSON(), nullable=False, server_default="[]"),
)
def downgrade() -> None:
op.drop_column("forgejo_issues", "forgejo_reactions_payload")
op.drop_column("forgejo_issues", "forgejo_timeline_payload")
op.drop_column("forgejo_issues", "forgejo_comments_payload")
op.drop_column("forgejo_issues", "forgejo_payload")