diff --git a/backend/app/api/forgejo_webhooks.py b/backend/app/api/forgejo_webhooks.py index 1d31cf9..41eb083 100644 --- a/backend/app/api/forgejo_webhooks.py +++ b/backend/app/api/forgejo_webhooks.py @@ -18,10 +18,12 @@ from app.core.time import utcnow from app.db import crud from app.db.session import get_session from app.models.board_repository_links import BoardRepositoryLink +from app.models.forgejo_connections import ForgejoConnection from app.models.forgejo_issues import ForgejoIssue from app.models.forgejo_repositories import ForgejoRepository from app.schemas.forgejo_webhooks import ForgejoWebhookIngestResponse from app.services.activity_log import record_activity +from app.services.forgejo_client import get_forgejo_client if TYPE_CHECKING: from sqlmodel.ext.asyncio.session import AsyncSession @@ -289,7 +291,12 @@ async def _upsert_issue( repository: ForgejoRepository, action: str, issue_data: dict[str, Any], + issue_payload: dict[str, Any] | None = None, + comments_payload: list[dict[str, object]] | None = None, + timeline_payload: list[dict[str, object]] | None = None, + reactions_payload: list[dict[str, object]] | None = None, ) -> tuple[ForgejoIssue, bool]: + payload = issue_payload if issue_payload is not None else issue_data number = _issue_number(issue_data) existing = ( await session.exec( @@ -306,9 +313,9 @@ async def _upsert_issue( else _parse_optional_iso_datetime(issue_data.get("closed_at")) or utcnow() ) - raw_body = issue_data.get("body") or "" + raw_body = payload.get("body") or "" body_text = str(raw_body) if raw_body else None - milestone = _milestone(issue_data) + milestone = _milestone(payload) if existing is None: issue = ForgejoIssue( @@ -320,14 +327,17 @@ async def _upsert_issue( body_preview=str(raw_body)[:1000] if raw_body else None, state=state, is_pull_request=False, - labels=_labels(issue_data), - assignees=_assignees(issue_data), + labels=_labels(payload), + assignees=_assignees(payload), milestone=milestone, - forgejo_payload=dict(issue_data), - author=_author(issue_data), - html_url=str(issue_data.get("html_url") or ""), - forgejo_created_at=_parse_iso_datetime(issue_data.get("created_at")), - forgejo_updated_at=_parse_iso_datetime(issue_data.get("updated_at")), + forgejo_payload=dict(payload), + forgejo_comments_payload=comments_payload or [], + forgejo_timeline_payload=timeline_payload or [], + forgejo_reactions_payload=reactions_payload or [], + author=_author(payload), + html_url=str(payload.get("html_url") or ""), + forgejo_created_at=_parse_iso_datetime(payload.get("created_at")), + forgejo_updated_at=_parse_iso_datetime(payload.get("updated_at")), forgejo_closed_at=closed_at, ) session.add(issue) @@ -340,13 +350,19 @@ async def _upsert_issue( existing.milestone = milestone existing.state = state existing.is_pull_request = False - existing.labels = _labels(issue_data) - existing.assignees = _assignees(issue_data) - existing.forgejo_payload = dict(issue_data) - existing.author = _author(issue_data) - existing.html_url = str(issue_data.get("html_url") or "") - existing.forgejo_created_at = _parse_iso_datetime(issue_data.get("created_at")) - existing.forgejo_updated_at = _parse_iso_datetime(issue_data.get("updated_at")) + existing.labels = _labels(payload) + existing.assignees = _assignees(payload) + existing.forgejo_payload = dict(payload) + if comments_payload is not None: + existing.forgejo_comments_payload = comments_payload + if timeline_payload is not None: + existing.forgejo_timeline_payload = timeline_payload + if reactions_payload is not None: + existing.forgejo_reactions_payload = reactions_payload + existing.author = _author(payload) + existing.html_url = str(payload.get("html_url") or "") + existing.forgejo_created_at = _parse_iso_datetime(payload.get("created_at")) + existing.forgejo_updated_at = _parse_iso_datetime(payload.get("updated_at")) existing.forgejo_closed_at = closed_at existing.last_synced_at = utcnow() existing.updated_at = utcnow() @@ -458,11 +474,60 @@ async def ingest_forgejo_webhook( reason="pull_request_ignored", ) + issue_number = _issue_number(issue_data) + issue_payload_for_cache: dict[str, Any] = dict(issue_data) + comments_payload: list[dict[str, object]] | None = None + timeline_payload: list[dict[str, object]] | None = None + reactions_payload: list[dict[str, object]] | None = None + + connection = await crud.get_by_id(session, ForgejoConnection, repository.connection_id) + if connection is not None and connection.token: + try: + async with get_forgejo_client(connection) as client: + remote_issue = await client.get_issue( + owner=repository.owner, + repo=repository.repo, + issue_number=issue_number, + ) + if isinstance(remote_issue, dict): + issue_payload_for_cache = dict(remote_issue) + raw_comments = await client.list_issue_comments( + owner=repository.owner, + repo=repository.repo, + issue_number=issue_number, + ) + raw_timeline = await client.list_issue_timeline( + owner=repository.owner, + repo=repository.repo, + issue_number=issue_number, + ) + raw_reactions = await client.list_issue_reactions( + owner=repository.owner, + repo=repository.repo, + issue_number=issue_number, + ) + comments_payload = [item for item in raw_comments if isinstance(item, dict)] + timeline_payload = [item for item in raw_timeline if isinstance(item, dict)] + reactions_payload = [item for item in raw_reactions if isinstance(item, dict)] + except Exception as exc: + logger.warning( + "forgejo.webhook.issue.enrichment_failed", + extra={ + "repository_id": str(repository.id), + "issue_number": issue_number, + "error": str(exc), + }, + ) + issue, created = await _upsert_issue( session=session, repository=repository, action=action, issue_data=issue_data, + issue_payload=issue_payload_for_cache, + comments_payload=comments_payload, + timeline_payload=timeline_payload, + reactions_payload=reactions_payload, ) await _record_issue_activity( session=session, diff --git a/backend/app/models/forgejo_issues.py b/backend/app/models/forgejo_issues.py index 391b2e1..3b51afa 100644 --- a/backend/app/models/forgejo_issues.py +++ b/backend/app/models/forgejo_issues.py @@ -37,6 +37,15 @@ class ForgejoIssue(SQLModel, table=True): forgejo_payload: dict[str, object] | None = Field( default=None, sa_column=Column(JSON, nullable=True) ) + forgejo_comments_payload: list[dict[str, object]] = Field( + default_factory=list, sa_column=Column(JSON) + ) + forgejo_timeline_payload: list[dict[str, object]] = Field( + default_factory=list, sa_column=Column(JSON) + ) + forgejo_reactions_payload: list[dict[str, object]] = Field( + default_factory=list, sa_column=Column(JSON) + ) author: str html_url: str diff --git a/backend/app/schemas/forgejo_issues.py b/backend/app/schemas/forgejo_issues.py index 37576eb..6294252 100644 --- a/backend/app/schemas/forgejo_issues.py +++ b/backend/app/schemas/forgejo_issues.py @@ -46,6 +46,9 @@ class ForgejoIssueDetailRead(ForgejoIssueRead): """Issue detail payload including full cached Forgejo source payload.""" forgejo_payload: dict[str, Any] | None = None + forgejo_comments_payload: list[dict[str, Any]] = [] + forgejo_timeline_payload: list[dict[str, Any]] = [] + forgejo_reactions_payload: list[dict[str, Any]] = [] class ForgejoIssueListResponse(SQLModel): diff --git a/backend/app/services/forgejo_client.py b/backend/app/services/forgejo_client.py index 88d56d3..0f939bb 100644 --- a/backend/app/services/forgejo_client.py +++ b/backend/app/services/forgejo_client.py @@ -2,16 +2,10 @@ from __future__ import annotations -import asyncio -from typing import TYPE_CHECKING - import httpx from app.core.logging import get_logger -if TYPE_CHECKING: - from collections.abc import AsyncGenerator - logger = get_logger(__name__) @@ -137,6 +131,18 @@ class ForgejoAPIClient: response.raise_for_status() return response.json() + async def get_issue( + self, + owner: str, + repo: str, + issue_number: int, + ) -> dict[str, object]: + """Get full details for a single issue.""" + client = await self._get_client() + response = await client.get(f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}") + response.raise_for_status() + return response.json() + async def get_repository( self, owner: str, @@ -183,6 +189,81 @@ class ForgejoAPIClient: data = response.json() return list(data) if isinstance(data, list) else [] + async def list_issue_comments( + self, + owner: str, + repo: str, + issue_number: int, + limit: int = 50, + ) -> list[dict[str, object]]: + """List all comments for an issue.""" + client = await self._get_client() + comments: list[dict[str, object]] = [] + page = 1 + while True: + response = await client.get( + f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}/comments", + params={"limit": limit, "page": page}, + ) + response.raise_for_status() + data = response.json() + page_items = data if isinstance(data, list) else [] + comments.extend(item for item in page_items if isinstance(item, dict)) + if len(page_items) < limit: + break + page += 1 + return comments + + async def list_issue_timeline( + self, + owner: str, + repo: str, + issue_number: int, + limit: int = 50, + ) -> list[dict[str, object]]: + """List timeline events for an issue.""" + client = await self._get_client() + timeline: list[dict[str, object]] = [] + page = 1 + while True: + response = await client.get( + f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}/timeline", + params={"limit": limit, "page": page}, + ) + response.raise_for_status() + data = response.json() + page_items = data if isinstance(data, list) else [] + timeline.extend(item for item in page_items if isinstance(item, dict)) + if len(page_items) < limit: + break + page += 1 + return timeline + + async def list_issue_reactions( + self, + owner: str, + repo: str, + issue_number: int, + limit: int = 50, + ) -> list[dict[str, object]]: + """List reactions for an issue.""" + client = await self._get_client() + reactions: list[dict[str, object]] = [] + page = 1 + while True: + response = await client.get( + f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}/reactions", + params={"limit": limit, "page": page}, + ) + response.raise_for_status() + data = response.json() + page_items = data if isinstance(data, list) else [] + reactions.extend(item for item in page_items if isinstance(item, dict)) + if len(page_items) < limit: + break + page += 1 + return reactions + async def list_user_repos( self, limit: int = 50, @@ -223,6 +304,7 @@ def get_forgejo_client( # Remove /api/v1 if present to get base URL if "/api/v1" in base_url: import re + match = re.match(r"(https?://[^/]+)", base_url) if match: base_url = match.group(1).rstrip("/") diff --git a/backend/app/services/forgejo_issue_sync.py b/backend/app/services/forgejo_issue_sync.py index 64f3dc4..e7ef840 100644 --- a/backend/app/services/forgejo_issue_sync.py +++ b/backend/app/services/forgejo_issue_sync.py @@ -18,6 +18,30 @@ from app.services.forgejo_client import get_forgejo_client logger = get_logger(__name__) +def _as_dict(value: object) -> dict[str, object] | None: + return value if isinstance(value, dict) else None + + +def _as_dict_list(value: object) -> list[dict[str, object]]: + if not isinstance(value, list): + return [] + return [item for item in value if isinstance(item, dict)] + + +def _author_login(issue_payload: dict[str, object]) -> str: + user = issue_payload.get("user") + if isinstance(user, dict): + login = user.get("login") + if isinstance(login, str): + return login + return "" + + +def _html_url(issue_payload: dict[str, object]) -> str: + value = issue_payload.get("html_url") + return value if isinstance(value, str) else "" + + class IssueSyncService: """Service for syncing Forgejo issues from remote repositories.""" @@ -75,12 +99,86 @@ class IssueSyncService: if issue_data.get("pull_request") is not None: continue - forgejo_number = issue_data.get("number", 0) + raw_number = issue_data.get("number", 0) + try: + forgejo_number = int(raw_number) + except (TypeError, ValueError): + forgejo_number = 0 state = issue_data.get("state", "open") + issue_payload = dict(issue_data) + comments_payload: list[dict[str, object]] = [] + timeline_payload: list[dict[str, object]] = [] + reactions_payload: list[dict[str, object]] = [] + + # Enrich each issue with full detail and exhaustive nested data. + try: + full_issue = await client.get_issue( + owner=repository.owner, + repo=repository.repo, + issue_number=forgejo_number, + ) + maybe_full = _as_dict(full_issue) + if maybe_full is not None: + issue_payload = maybe_full + except Exception as exc: + logger.warning( + "issue_detail_sync_failed", + repository_id=str(repository_id), + issue_number=forgejo_number, + error=str(exc), + ) + + try: + comments_payload = _as_dict_list( + await client.list_issue_comments( + owner=repository.owner, + repo=repository.repo, + issue_number=forgejo_number, + ) + ) + except Exception as exc: + logger.warning( + "issue_comments_sync_failed", + repository_id=str(repository_id), + issue_number=forgejo_number, + error=str(exc), + ) + + try: + timeline_payload = _as_dict_list( + await client.list_issue_timeline( + owner=repository.owner, + repo=repository.repo, + issue_number=forgejo_number, + ) + ) + except Exception as exc: + logger.warning( + "issue_timeline_sync_failed", + repository_id=str(repository_id), + issue_number=forgejo_number, + error=str(exc), + ) + + try: + reactions_payload = _as_dict_list( + await client.list_issue_reactions( + owner=repository.owner, + repo=repository.repo, + issue_number=forgejo_number, + ) + ) + except Exception as exc: + logger.warning( + "issue_reactions_sync_failed", + repository_id=str(repository_id), + issue_number=forgejo_number, + error=str(exc), + ) # Parse labels labels_data = [] - for label in issue_data.get("labels") or []: + for label in issue_payload.get("labels") or []: labels_data.append( { "id": label.get("id"), @@ -92,7 +190,7 @@ class IssueSyncService: # Parse assignees assignees_data = [] - for assignee in issue_data.get("assignees") or []: + for assignee in issue_payload.get("assignees") or []: assignees_data.append( { "login": assignee.get("login", ""), @@ -103,7 +201,7 @@ class IssueSyncService: # Parse milestone milestone_data = None - raw_milestone = issue_data.get("milestone") + raw_milestone = issue_payload.get("milestone") if raw_milestone and isinstance(raw_milestone, dict): milestone_data = { "id": raw_milestone.get("id"), @@ -115,14 +213,14 @@ class IssueSyncService: } # Full body and preview - raw_body = issue_data.get("body") or "" + raw_body = issue_payload.get("body") or "" body_full = raw_body if raw_body else None body_preview = raw_body[:1000] if raw_body else None # Parse dates — required fields fall back to utcnow(), optional closed_at stays None - created_at = self._parse_iso_date(issue_data.get("created_at")) or utcnow() - updated_at = self._parse_iso_date(issue_data.get("updated_at")) or utcnow() - closed_at = self._parse_iso_date(issue_data.get("closed_at")) + created_at = self._parse_iso_date(issue_payload.get("created_at")) or utcnow() + updated_at = self._parse_iso_date(issue_payload.get("updated_at")) or utcnow() + closed_at = self._parse_iso_date(issue_payload.get("closed_at")) # Check if issue exists existing = await self._find_issue(repository_id, forgejo_number) @@ -140,9 +238,12 @@ class IssueSyncService: labels=labels_data, assignees=assignees_data, milestone=milestone_data, - forgejo_payload=dict(issue_data), - author=issue_data.get("user", {}).get("login", ""), - html_url=issue_data.get("html_url", ""), + forgejo_payload=issue_payload, + forgejo_comments_payload=comments_payload, + forgejo_timeline_payload=timeline_payload, + forgejo_reactions_payload=reactions_payload, + author=_author_login(issue_payload), + html_url=_html_url(issue_payload), forgejo_created_at=created_at, forgejo_updated_at=updated_at, forgejo_closed_at=closed_at, @@ -158,9 +259,12 @@ class IssueSyncService: existing.labels = labels_data existing.assignees = assignees_data existing.milestone = milestone_data - existing.forgejo_payload = dict(issue_data) - existing.author = issue_data.get("user", {}).get("login", "") - existing.html_url = issue_data.get("html_url", "") + existing.forgejo_payload = issue_payload + existing.forgejo_comments_payload = comments_payload + existing.forgejo_timeline_payload = timeline_payload + existing.forgejo_reactions_payload = reactions_payload + existing.author = _author_login(issue_payload) + existing.html_url = _html_url(issue_payload) existing.forgejo_created_at = created_at existing.forgejo_updated_at = updated_at existing.forgejo_closed_at = closed_at diff --git a/backend/migrations/versions/d3f4a5b6c7d8_add_forgejo_issue_payload_cache.py b/backend/migrations/versions/d3f4a5b6c7d8_add_forgejo_issue_payload_cache.py index 0485148..269969d 100644 --- a/backend/migrations/versions/d3f4a5b6c7d8_add_forgejo_issue_payload_cache.py +++ b/backend/migrations/versions/d3f4a5b6c7d8_add_forgejo_issue_payload_cache.py @@ -22,7 +22,22 @@ def upgrade() -> None: "forgejo_issues", sa.Column("forgejo_payload", sa.JSON(), nullable=True), ) + op.add_column( + "forgejo_issues", + sa.Column("forgejo_comments_payload", sa.JSON(), nullable=False, server_default="[]"), + ) + op.add_column( + "forgejo_issues", + sa.Column("forgejo_timeline_payload", sa.JSON(), nullable=False, server_default="[]"), + ) + op.add_column( + "forgejo_issues", + sa.Column("forgejo_reactions_payload", sa.JSON(), nullable=False, server_default="[]"), + ) def downgrade() -> None: + op.drop_column("forgejo_issues", "forgejo_reactions_payload") + op.drop_column("forgejo_issues", "forgejo_timeline_payload") + op.drop_column("forgejo_issues", "forgejo_comments_payload") op.drop_column("forgejo_issues", "forgejo_payload")