"""Forgejo issue tracking metrics endpoints.""" from __future__ import annotations import asyncio import time as _time from datetime import timedelta from typing import TYPE_CHECKING from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy import Date as SADate from sqlalchemy import and_ from sqlalchemy import cast as sa_cast from sqlmodel import func, select from app.api.deps import ORG_MEMBER_DEP, OrganizationContext from app.core.time import utcnow from app.db.session import async_session_maker, get_session from app.models.board_repository_links import BoardRepositoryLink from app.models.forgejo_commit_activity import ForgejoCommitDay from app.models.forgejo_connections import ForgejoConnection from app.models.forgejo_issues import ForgejoIssue from app.models.forgejo_repositories import ForgejoRepository from app.schemas.metrics import ( CommitActivityDay, HeatmapDay, HeatmapResponse, LastPushRead, MetricsResponse, ) from app.services.forgejo_client import ForgejoAPIClient if TYPE_CHECKING: from sqlmodel.ext.asyncio.session import AsyncSession # --------------------------------------------------------------------------- # Line-stats background cache # --------------------------------------------------------------------------- # Key: org_id string → (fetched_at, total_additions, total_deletions, has_data, day_counts) # day_counts maps "YYYY-MM-DD" → commit count for the heatmap grid. # Populated by a fire-and-forget asyncio task so the heatmap endpoint never # blocks waiting for Forgejo's 202 "still computing" response. 
# ---------------------------------------------------------------------------
_line_stats_cache: dict[str, tuple[float, int, int, bool, dict[str, int]]] = {}
_line_stats_fetching: set[str] = set()
_LINE_STATS_TTL_HIT = 300  # 5 min — re-fetch cadence once real data is cached
_LINE_STATS_TTL_MISS = 30  # 30 s — retry cadence while Forgejo is still computing

# Strong references to in-flight fire-and-forget tasks.  The event loop only
# keeps weak references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_background_tasks: set[asyncio.Task[None]] = set()

# Open issues whose last update is older than this many days count as stale.
_STALE_AFTER_DAYS = 14


def _get_module_logger():
    """Return this module's logger (imported lazily, at the point of use)."""
    from app.core.logging import get_logger

    return get_logger(__name__)


def _keep_alive(task: asyncio.Task[None]) -> None:
    """Pin *task* in ``_background_tasks`` until it completes."""
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


def _normalise_base_url(raw: str | None) -> str:
    """Strip trailing slashes and reduce ``…/api/v1…`` URLs to ``scheme://host``.

    ``None`` and the empty string both yield ``""``.  A URL that contains
    ``/api/v1`` but does not start with ``http(s)://host`` is returned with
    only the trailing slashes removed.
    """
    import re

    base = (raw or "").rstrip("/")
    if "/api/v1" in base:
        host_root = re.match(r"(https?://[^/]+)", base)
        if host_root:
            base = host_root.group(1).rstrip("/")
    return base


async def _persist_commit_days(
    repos: list[tuple[str, str, str, str | None, UUID, UUID]],
    results: list[tuple[int, int, dict[str, int]] | None],
) -> None:
    """Upsert per-repository, per-day commit counts into ``ForgejoCommitDay``.

    ``repos`` and ``results`` are aligned by position.  Repositories whose
    fetch failed (``None`` result) and day keys that are not ISO dates are
    skipped.  Existing rows are loaded with one query per repository rather
    than one query per day.
    """
    from datetime import date

    async with async_session_maker() as session:
        for (_, _, _, _, repo_id, org_id), result in zip(repos, results):
            if result is None:
                continue
            wanted: dict[date, int] = {}
            for day_str, cnt in result[2].items():
                try:
                    wanted[date.fromisoformat(day_str)] = cnt
                except ValueError:
                    continue
            if not wanted:
                continue

            existing_rows = (
                await session.exec(
                    select(ForgejoCommitDay).where(
                        ForgejoCommitDay.repository_id == repo_id,
                        ForgejoCommitDay.date >= min(wanted),
                        ForgejoCommitDay.date <= max(wanted),
                    )
                )
            ).all()
            by_day = {row.date: row for row in existing_rows}

            for day_obj, cnt in wanted.items():
                row = by_day.get(day_obj)
                if row is None:
                    session.add(
                        ForgejoCommitDay(
                            organization_id=org_id,
                            repository_id=repo_id,
                            date=day_obj,
                            commit_count=cnt,
                        )
                    )
                elif row.commit_count != cnt:
                    row.commit_count = cnt
                    row.updated_at = utcnow()
        await session.commit()


async def _bg_fetch_line_stats(
    cache_key: str,
    # (owner, repo, base_url, token, repository_id, organization_id)
    repos: list[tuple[str, str, str, str | None, UUID, UUID]],
    since_iso: str,
) -> None:
    """Background task: sum commit line stats, persist per-repo day counts to DB.

    Writes ``(fetched_at, additions, deletions, has_data, day_counts)`` into
    ``_line_stats_cache[cache_key]``.  ``has_data`` is True only when at least
    one repository fetch completed without raising, so a total failure is
    retried on the short ``_LINE_STATS_TTL_MISS`` cadence instead of being
    served as real zeroes.  ``cache_key`` is always removed from
    ``_line_stats_fetching`` on exit.
    """

    async def _one(
        owner: str, repo: str, base_url: str, token: str | None
    ) -> tuple[int, int, dict[str, int]] | None:
        try:
            async with ForgejoAPIClient(base_url=base_url, token=token) as client:
                return await client.get_commit_line_stats_since(owner, repo, since_iso)
        except Exception as exc:
            # Best effort: one failing repository must not abort the others.
            _get_module_logger().warning(
                "commit_line_stats_fetch_failed",
                extra={"owner": owner, "repo": repo, "error": str(exc)},
            )
            return None

    try:
        results = await asyncio.gather(
            *(_one(o, r, bu, tok) for o, r, bu, tok, _, _ in repos)
        )

        # Persist per-repo per-day counts to DB (upsert).  A DB failure must
        # not prevent the in-memory cache from being refreshed below.
        try:
            await _persist_commit_days(repos, results)
        except Exception as db_exc:
            _get_module_logger().warning(
                "commit_activity_db_write_failed", extra={"error": str(db_exc)}
            )

        succeeded = [res for res in results if res is not None]
        total_adds = sum(adds for adds, _, _ in succeeded)
        total_dels = sum(dels for _, dels, _ in succeeded)
        merged_days: dict[str, int] = {}
        for _, _, day_counts in succeeded:
            for day, cnt in day_counts.items():
                merged_days[day] = merged_days.get(day, 0) + cnt

        _line_stats_cache[cache_key] = (
            _time.monotonic(),
            total_adds,
            total_dels,
            bool(succeeded),
            merged_days,
        )
    finally:
        _line_stats_fetching.discard(cache_key)


# ---------------------------------------------------------------------------
# Last-push background cache
# ---------------------------------------------------------------------------
_last_push_cache: dict[str, tuple[float, LastPushRead | None]] = {}
_last_push_fetching: set[str] = set()
_LAST_PUSH_TTL = 60  # seconds


def _commit_date_str(commit: dict) -> str:
    """Return the commit's author date, else its ``created`` field, else ``""``."""
    author_obj = (commit.get("commit") or {}).get("author") or {}
    return author_obj.get("date") or commit.get("created") or ""


def _commit_sort_key(raw: str) -> float | None:
    """Convert an ISO-8601 timestamp to epoch seconds for ordering.

    Returns ``None`` for an empty value (such commits are never selected) and
    ``-inf`` for a non-empty value that cannot be parsed, so it still loses to
    every parseable timestamp.  Timestamps without an offset are treated as
    UTC.
    """
    from datetime import datetime, timezone

    text = str(raw).strip()
    if not text:
        return None
    if text[-1] in "zZ":
        # ``fromisoformat`` only accepts a trailing "Z" from Python 3.11 on.
        text = f"{text[:-1]}+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return float("-inf")
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.timestamp()


def _build_last_push(
    owner: str, repo_name: str, commit: dict, date_str: str
) -> LastPushRead:
    """Map a commit payload (a dict with optional ``commit``/``author`` sub-dicts) to ``LastPushRead``."""
    commit_obj = commit.get("commit") or {}
    full_msg: str = commit_obj.get("message") or ""
    sha_full: str = commit.get("sha") or ""
    author_name: str = (
        (commit.get("author") or {}).get("login")
        or (commit_obj.get("author") or {}).get("name")
        or "unknown"
    )
    # Use the payload's "branch" field if present and non-blank, otherwise "—".
    branch = (commit.get("branch") or "").strip() or "—"
    return LastPushRead(
        sha=sha_full[:7],
        message=next(iter(full_msg.splitlines()), ""),
        author=author_name,
        repo=f"{owner}/{repo_name}",
        branch=branch,
        pushed_at=date_str,
    )


async def _bg_fetch_last_push(
    cache_key: str,
    repos: list[tuple[str, str, str, str | None]],  # (owner, repo, base_url, token)
) -> None:
    """Background task: find the most-recent commit across all tracked repos.

    Stores ``(fetched_at, LastPushRead | None)`` in ``_last_push_cache`` and
    always removes ``cache_key`` from ``_last_push_fetching`` on exit.  Commits
    are ordered by their parsed timestamp, not by the raw string, so values
    carrying different UTC offsets compare correctly.
    """

    async def _one(
        owner: str, repo: str, base_url: str, token: str | None
    ) -> tuple[str, str, dict] | None:
        try:
            async with ForgejoAPIClient(base_url=base_url, token=token) as client:
                commit = await client.get_last_commit(owner, repo)
        except Exception as exc:
            # Best effort: an unreachable repository is simply left out.
            _get_module_logger().warning(
                "last_push_fetch_failed",
                extra={"owner": owner, "repo": repo, "error": str(exc)},
            )
            return None
        return (owner, repo, commit) if commit else None

    try:
        results = await asyncio.gather(
            *(_one(o, r, bu, tok) for o, r, bu, tok in repos)
        )

        best: tuple[str, str, dict, str] | None = None
        best_key: float | None = None
        for item in results:
            if item is None:
                continue
            owner, repo_name, commit = item
            date_str = _commit_date_str(commit)
            key = _commit_sort_key(date_str)
            if key is None:
                continue
            # Strict ">" keeps the first of several equally-recent commits.
            if best_key is None or key > best_key:
                best_key = key
                best = (owner, repo_name, commit, date_str)

        push = _build_last_push(*best) if best else None
        _last_push_cache[cache_key] = (_time.monotonic(), push)
    finally:
        _last_push_fetching.discard(cache_key)


router = APIRouter(prefix="/forgejo", tags=["forgejo-metrics"])
SESSION_DEP = Depends(get_session)
# Use ORG_MEMBER_DEP directly, not wrapped in Depends again


async def _scoped_repos_with_connections(
    session: AsyncSession,
    org_id: UUID,
    repository_id: UUID | None,
) -> list:
    """Return ``(ForgejoRepository, ForgejoConnection)`` pairs for the organisation.

    When ``repository_id`` is given the result is narrowed to that repository.
    """
    statement = (
        select(ForgejoRepository, ForgejoConnection)
        .join(
            ForgejoConnection, ForgejoRepository.connection_id == ForgejoConnection.id
        )
        .where(ForgejoRepository.organization_id == org_id)
    )
    if repository_id:
        statement = statement.where(ForgejoRepository.id == repository_id)
    return list((await session.exec(statement)).all())


async def _resolve_metrics_repo_ids(
    session: AsyncSession,
    org_id: UUID,
    *,
    organization_id: UUID | None,
    board_id: UUID | None,
    repository_id: UUID | None,
) -> list[UUID]:
    """Translate the metrics filters into repository IDs (empty list = empty scope).

    Precedence is repository, then board, then organisation.

    Raises:
        HTTPException: 400 when ``repository_id`` is combined with another filter.
    """
    if repository_id:
        if board_id or organization_id:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Cannot combine repository_id with board_id or organization_id",
            )
        repo = (
            await session.exec(
                select(ForgejoRepository).where(
                    ForgejoRepository.id == repository_id,
                    ForgejoRepository.organization_id == org_id,
                )
            )
        ).first()
        return [] if repo is None else [repo.id]

    if board_id:
        links = (
            await session.exec(
                select(BoardRepositoryLink).where(
                    BoardRepositoryLink.board_id == board_id,
                    BoardRepositoryLink.organization_id == org_id,
                )
            )
        ).all()
        return [link.repository_id for link in links]

    if organization_id:
        rows = (
            await session.exec(
                select(ForgejoRepository.id).where(
                    ForgejoRepository.organization_id == org_id,
                )
            )
        ).all()
        return list(rows)

    return []


async def _count_issues(
    session: AsyncSession,
    repo_ids: list[UUID],
    state: str,
    *extra_filters,
) -> int:
    """Count non-pull-request issues in ``repo_ids`` with ``state`` and any extra filters."""
    statement = select(func.count(ForgejoIssue.id)).where(
        and_(
            ForgejoIssue.repository_id.in_(repo_ids),
            ForgejoIssue.state == state,
            ForgejoIssue.is_pull_request.is_(False),
            *extra_filters,
        )
    )
    return (await session.exec(statement)).one_or_none() or 0


@router.get(
    "/metrics",
    response_model=MetricsResponse,
    summary="Forgejo issue tracking metrics",
    description=(
        "Get aggregated metrics for Forgejo issues across linked repositories. "
        "Supports filtering by organization_id, board_id, or repository_id. "
        "Empty scope returns zeroed metrics."
    ),
    responses={
        status.HTTP_200_OK: {
            "description": "Metrics retrieved successfully",
            "content": {
                "application/json": {
                    "example": {
                        "open_issues": 25,
                        "closed_issues": 150,
                        "closed_in_selected_range": 12,
                        "selected_range_days": 7,
                        "closed_last_7_days": 12,
                        "closed_last_30_days": 35,
                        "stale_open_issues": 5,
                        "repositories_synced": 3,
                        "repository_sync_error_count": 1,
                        "last_sync_timestamps": {
                            "repo_1": "2026-05-19T03:00:00+00:00",
                            "repo_2": "2026-05-19T02:30:00+00:00",
                            "repo_3": "2026-05-19T01:00:00+00:00",
                        },
                        "sync_error_counts": {
                            "repo_1": 0,
                            "repo_2": 2,
                            "repo_3": 0,
                        },
                    }
                }
            },
        },
        status.HTTP_403_FORBIDDEN: {
            "description": "User lacks access to the board",
        },
    },
)
async def get_forgejo_metrics(
    organization_id: UUID | None = Query(
        None,
        description="Filter by organization ID",
    ),
    board_id: UUID | None = Query(
        None,
        description="Filter by board ID (via linked repositories)",
    ),
    repository_id: UUID | None = Query(
        None,
        description="Filter by specific repository ID",
    ),
    closed_range_days: int = Query(
        7,
        ge=1,
        le=365,
        description="Window (in days) for closed_in_selected_range",
    ),
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> MetricsResponse:
    """Get Forgejo issue tracking metrics.

    Filters:
    - organization_id: All boards/repositories in organization
    - board_id: All repositories linked to board
    - repository_id: Single repository

    Empty scope (no filters, or filters matching no repository) returns zeroed
    metrics that still echo the requested ``closed_range_days``.

    Raises:
        HTTPException: 403 when ``organization_id`` is not the caller's
            organisation; 400 when ``repository_id`` is combined with another
            filter.
    """
    if organization_id and organization_id != ctx.organization.id:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)

    repo_ids = await _resolve_metrics_repo_ids(
        session,
        ctx.organization.id,
        organization_id=organization_id,
        board_id=board_id,
        repository_id=repository_id,
    )
    if not repo_ids:
        return _zeroed_metrics(closed_range_days)

    now = utcnow()

    def closed_within(days: int):
        """Filter: issue was closed no more than ``days`` days ago."""
        return ForgejoIssue.forgejo_closed_at >= now - timedelta(days=days)

    # 1. Open issues count
    open_issues = await _count_issues(session, repo_ids, "open")
    # 2. Closed issues count
    closed_issues = await _count_issues(session, repo_ids, "closed")
    # 3. Closed in selected range
    closed_in_selected_range = await _count_issues(
        session, repo_ids, "closed", closed_within(closed_range_days)
    )
    # 4. Closed in last 7 days
    closed_last_7_days = await _count_issues(
        session, repo_ids, "closed", closed_within(7)
    )
    # 5. Closed in last 30 days
    closed_last_30_days = await _count_issues(
        session, repo_ids, "closed", closed_within(30)
    )
    # 6. Stale open issues (open, with no update for _STALE_AFTER_DAYS days)
    stale_open_issues = await _count_issues(
        session,
        repo_ids,
        "open",
        ForgejoIssue.forgejo_updated_at < now - timedelta(days=_STALE_AFTER_DAYS),
    )

    # 7. Sync status per repository, loaded in one query instead of one per repo
    synced_repos = (
        await session.exec(
            select(ForgejoRepository).where(ForgejoRepository.id.in_(repo_ids))
        )
    ).all()
    last_sync_timestamps: dict[str, str] = {}
    sync_error_counts: dict[str, int] = {}
    for repo in synced_repos:
        repo_key = str(repo.id)
        last_sync_timestamps[repo_key] = (
            repo.last_sync_at.isoformat() if repo.last_sync_at else ""
        )
        sync_error_counts[repo_key] = 1 if repo.last_sync_error else 0

    return MetricsResponse(
        open_issues=open_issues,
        closed_issues=closed_issues,
        closed_in_selected_range=closed_in_selected_range,
        selected_range_days=closed_range_days,
        closed_last_7_days=closed_last_7_days,
        closed_last_30_days=closed_last_30_days,
        stale_open_issues=stale_open_issues,
        repositories_synced=len(repo_ids),
        repository_sync_error_count=sum(sync_error_counts.values()),
        last_sync_timestamps=last_sync_timestamps,
        sync_error_counts=sync_error_counts,
    )


async def _issue_events_per_day(
    session: AsyncSession,
    repo_ids: list[UUID],
    timestamp_column,
    since,
) -> list:
    """Return ``(day, count)`` rows of non-PR issues grouped by the date of ``timestamp_column``."""
    day_expr = sa_cast(timestamp_column, SADate)
    statement = (
        select(day_expr.label("day"), func.count().label("cnt"))
        .where(
            ForgejoIssue.repository_id.in_(repo_ids),
            ForgejoIssue.is_pull_request.is_(False),
            timestamp_column.is_not(None),
            timestamp_column >= since,
        )
        .group_by(day_expr)
    )
    return list((await session.exec(statement)).all())


@router.get(
    "/heatmap",
    response_model=HeatmapResponse,
    summary="Forgejo issue activity heatmap",
    description="Daily issue open+close event counts for the last year, scoped to the caller's organisation.",
)
async def get_forgejo_heatmap(
    organization_id: UUID | None = Query(None, description="Filter by organisation ID"),
    repository_id: UUID | None = Query(
        None, description="Filter by a single repository ID"
    ),
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> HeatmapResponse:
    """Return per-day activity counts plus total line contributions.

    The day grid comes from the first available source, in order:
    1. commit-per-day rows in the DB (last 90 days),
    2. the in-memory line-stats cache,
    3. issue created+closed events from the DB (last 365 days).

    Line totals are served from the in-memory cache only.  A stale or missing
    entry starts a background refresh and the request never waits for it.

    Raises:
        HTTPException: 403 when ``organization_id`` is not the caller's
            organisation.
    """
    if organization_id and organization_id != ctx.organization.id:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)

    now_utc = utcnow()
    since = now_utc - timedelta(days=365)
    # Line-stats fetch uses a shorter window — querying a full year of commits
    # with stat=true paginates hundreds of pages and takes 30+ seconds on active
    # repos. 90 days is enough context for the dashboard summary numbers.
    line_stats_since = now_utc - timedelta(days=90)

    repos_with_conns = await _scoped_repos_with_connections(
        session, ctx.organization.id, repository_id
    )
    if not repos_with_conns:
        return HeatmapResponse(days=[], max_count=0)
    repo_ids = [repo.id for repo, _ in repos_with_conns]

    # Line stats — served from background cache; fire refresh if stale.
    cache_key = (
        f"repo:{repository_id}" if repository_id else f"org:{ctx.organization.id}"
    )
    cached = _line_stats_cache.get(cache_key)
    ttl = _LINE_STATS_TTL_HIT if (cached and cached[3]) else _LINE_STATS_TTL_MISS
    is_stale = cached is None or _time.monotonic() - cached[0] > ttl
    if is_stale and cache_key not in _line_stats_fetching:
        # Copy everything the task needs out of the ORM rows now, so the
        # background task never touches a potentially-closed session.
        repo_tuples: list[tuple[str, str, str, str | None, UUID, UUID]] = [
            (
                repo.owner,
                repo.repo,
                _normalise_base_url(conn.base_url),
                getattr(conn, "token", None),
                repo.id,
                repo.organization_id,
            )
            for repo, conn in repos_with_conns
        ]
        since_iso = line_stats_since.strftime("%Y-%m-%dT%H:%M:%SZ")
        _line_stats_fetching.add(cache_key)
        _keep_alive(
            asyncio.create_task(
                _bg_fetch_line_stats(cache_key, repo_tuples, since_iso),
                name=f"line-stats-{cache_key}",
            )
        )

    if cached is not None:
        _, total_additions, total_deletions, has_line_stats, commit_day_counts = cached
    else:
        total_additions = total_deletions = 0
        has_line_stats = False
        commit_day_counts = {}

    # Heatmap grid: prefer commit-per-day counts from DB (persistent across
    # restarts); fall back to in-memory cache, then to issue-event counts while
    # the first background fetch is in progress.
    commit_db_rows = (
        await session.exec(
            select(
                ForgejoCommitDay.date,
                func.sum(ForgejoCommitDay.commit_count).label("cnt"),
            )
            .where(
                ForgejoCommitDay.repository_id.in_(repo_ids),
                ForgejoCommitDay.date >= line_stats_since.date(),
            )
            .group_by(ForgejoCommitDay.date)
            .order_by(ForgejoCommitDay.date)
        )
    ).all()

    if commit_db_rows:
        days_list = [
            HeatmapDay(date=str(day), count=int(cnt)) for day, cnt in commit_db_rows
        ]
    elif has_line_stats and commit_day_counts:
        # In-memory cache hit but DB write hasn't landed yet
        days_list = [
            HeatmapDay(date=day, count=cnt)
            for day, cnt in sorted(commit_day_counts.items())
        ]
    else:
        # No commit data yet — use issue created+closed events from the DB.
        counts: dict[str, int] = {}
        for column in (ForgejoIssue.forgejo_created_at, ForgejoIssue.forgejo_closed_at):
            for day, cnt in await _issue_events_per_day(
                session, repo_ids, column, since
            ):
                if day:
                    key = str(day)
                    counts[key] = counts.get(key, 0) + int(cnt)
        days_list = [
            HeatmapDay(date=day, count=cnt) for day, cnt in sorted(counts.items())
        ]

    return HeatmapResponse(
        days=days_list,
        max_count=max((d.count for d in days_list), default=0),
        total_additions=total_additions,
        total_deletions=total_deletions,
        has_line_stats=has_line_stats,
    )


@router.get(
    "/last-push",
    response_model=LastPushRead | None,
    summary="Most-recent commit across all tracked repositories",
)
async def get_last_push(
    organization_id: UUID | None = Query(None),
    repository_id: UUID | None = Query(None),
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> LastPushRead | None:
    """Return the most recent commit across all tracked repos, served from cache.

    Returns ``None`` when the scope has no repositories or nothing is cached
    yet.  An entry older than ``_LAST_PUSH_TTL`` seconds (or a missing one)
    starts a background refresh; the current cached value is returned without
    waiting for it.

    Raises:
        HTTPException: 403 when ``organization_id`` is not the caller's
            organisation.
    """
    if organization_id and organization_id != ctx.organization.id:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)

    repos_with_conns = await _scoped_repos_with_connections(
        session, ctx.organization.id, repository_id
    )
    if not repos_with_conns:
        return None

    cache_key = (
        f"last-push:repo:{repository_id}"
        if repository_id
        else f"last-push:org:{ctx.organization.id}"
    )
    cached = _last_push_cache.get(cache_key)
    is_stale = cached is None or _time.monotonic() - cached[0] > _LAST_PUSH_TTL
    if is_stale and cache_key not in _last_push_fetching:
        repo_tuples: list[tuple[str, str, str, str | None]] = [
            (
                repo.owner,
                repo.repo,
                _normalise_base_url(conn.base_url),
                getattr(conn, "token", None),
            )
            for repo, conn in repos_with_conns
        ]
        _last_push_fetching.add(cache_key)
        _keep_alive(
            asyncio.create_task(
                _bg_fetch_last_push(cache_key, repo_tuples),
                name=f"last-push-{cache_key}",
            )
        )

    return cached[1] if cached is not None else None


@router.get(
    "/commit-activity",
    response_model=list[CommitActivityDay],
    summary="Per-day commit counts from persistent DB store",
    description=(
        "Returns commit counts per calendar day from the forgejo_commit_activity table. "
        "Populated by the background line-stats fetch; accurate across restarts. "
        "Scope by organization_id or repository_id; defaults to the caller's full org."
    ),
)
async def get_commit_activity(
    organization_id: UUID | None = Query(None, description="Filter by organisation ID"),
    repository_id: UUID | None = Query(
        None, description="Filter by a single repository ID"
    ),
    days: int = Query(
        default=90, ge=1, le=365, description="How many days back to return"
    ),
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> list[CommitActivityDay]:
    """Return commit-per-day counts from the DB, ordered by date.

    With ``repository_id`` the rows of that single repository are returned;
    otherwise counts are summed per day across every repository of the
    caller's organisation.

    Raises:
        HTTPException: 403 when ``organization_id`` is not the caller's
            organisation; 404 when ``repository_id`` is not a repository of
            that organisation.
    """
    if organization_id and organization_id != ctx.organization.id:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)

    since = (utcnow() - timedelta(days=days)).date()

    if repository_id:
        # Verify repo belongs to this org
        owned_repo = (
            await session.exec(
                select(ForgejoRepository).where(
                    ForgejoRepository.id == repository_id,
                    ForgejoRepository.organization_id == ctx.organization.id,
                )
            )
        ).first()
        if owned_repo is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
        statement = (
            select(ForgejoCommitDay.date, ForgejoCommitDay.commit_count)
            .where(
                ForgejoCommitDay.repository_id == repository_id,
                ForgejoCommitDay.date >= since,
            )
            .order_by(ForgejoCommitDay.date)
        )
    else:
        # Org-wide: aggregate across all repos
        repo_ids = list(
            (
                await session.exec(
                    select(ForgejoRepository.id).where(
                        ForgejoRepository.organization_id == ctx.organization.id
                    )
                )
            ).all()
        )
        if not repo_ids:
            return []
        statement = (
            select(
                ForgejoCommitDay.date,
                func.sum(ForgejoCommitDay.commit_count).label("commit_count"),
            )
            .where(
                ForgejoCommitDay.repository_id.in_(repo_ids),
                ForgejoCommitDay.date >= since,
            )
            .group_by(ForgejoCommitDay.date)
            .order_by(ForgejoCommitDay.date)
        )

    rows = (await session.exec(statement)).all()
    return [
        CommitActivityDay(date=str(day), commit_count=int(cnt)) for day, cnt in rows
    ]


def _zeroed_metrics(selected_range_days: int = 7) -> MetricsResponse:
    """Return zeroed metrics for empty scopes.

    ``selected_range_days`` is echoed back so the response reports the window
    the caller asked for; it defaults to 7.
    """
    return MetricsResponse(
        open_issues=0,
        closed_issues=0,
        closed_in_selected_range=0,
        selected_range_days=selected_range_days,
        closed_last_7_days=0,
        closed_last_30_days=0,
        stale_open_issues=0,
        repositories_synced=0,
        repository_sync_error_count=0,
        last_sync_timestamps={},
        sync_error_counts={},
    )