"""Forgejo issue tracking metrics endpoints.""" from __future__ import annotations from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlmodel import select, func from sqlalchemy import and_ from app.api.deps import ORG_MEMBER_DEP, OrganizationContext from app.core.agent_auth import get_agent_auth_context, AgentAuthContext from app.db.session import get_session from app.models.board_repository_links import BoardRepositoryLink from app.models.forgejo_issues import ForgejoIssue from app.models.forgejo_repositories import ForgejoRepository from app.schemas.metrics import MetricsResponse from app.core.agent_auth import get_agent_auth_context, AgentAuthContext if TYPE_CHECKING: from sqlmodel.ext.asyncio.session import AsyncSession router = APIRouter(prefix="/forgejo", tags=["forgejo-metrics"]) SESSION_DEP = Depends(get_session) # Use ORG_MEMBER_DEP directly, not wrapped in Depends again @router.get( "/metrics", response_model=MetricsResponse, summary="Forgejo issue tracking metrics", description=( "Get aggregated metrics for Forgejo issues across linked repositories. " "Supports filtering by organization_id, board_id, or repository_id. " "Empty scope returns zeroed metrics." ), responses={ status.HTTP_200_OK: { "description": "Metrics retrieved successfully", "content": { "application/json": { "example": { "open_issues": 25, "closed_issues": 150, "closed_last_7_days": 12, "closed_last_30_days": 35, "stale_open_issues": 5, "repositories_synced": 3, "last_sync_timestamps": { "repo_1": "2026-05-19T03:00:00+00:00", "repo_2": "2026-05-19T02:30:00+00:00", "repo_3": "2026-05-19T01:00:00+00:00", }, "sync_error_counts": { "repo_1": 0, "repo_2": 2, "repo_3": 0, }, } } }, }, status.HTTP_403_FORBIDDEN: { "description": "User lacks access to the board", }, }, ) async def get_forgejo_metrics( organization_id: str | None = Query( None, description="Filter by organization ID", ), board_id: str | None = Query( None, description="Filter by board ID (via linked repositories)", ), repository_id: str | None = Query( None, description="Filter by specific repository ID", ), session: AsyncSession = SESSION_DEP, ctx: OrganizationContext = ORG_MEMBER_DEP, ) -> MetricsResponse: """Get Forgejo issue tracking metrics. Filters: - organization_id: All boards/repositories in organization - board_id: All repositories linked to board - repository_id: Single repository Empty scope (no filters) returns zeroed metrics. """ # Determine scope if repository_id: # Single repository repo_ids = [repository_id] if board_id or organization_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot combine repository_id with board_id or organization_id", ) elif board_id: # Board-scoped: get linked repositories link_statement = select(BoardRepositoryLink).where( BoardRepositoryLink.board_id == board_id ) links = (await session.exec(link_statement)).all() repo_ids = [link.repository_id for link in links] if not repo_ids: return _zeroed_metrics() elif organization_id: # Organization-scoped: all repositories in org repo_statement = select(ForgejoRepository.id).where( ForgejoRepository.organization_id == organization_id ) repos = (await session.exec(repo_statement)).all() repo_ids = [r.id for r in repos] if not repo_ids: return _zeroed_metrics() else: # No filters - return zeroed metrics return _zeroed_metrics() # Calculate metrics # 1. Open issues count open_statement = select(func.count(ForgejoIssue.id)).where( and_( ForgejoIssue.repository_id.in_(repo_ids), ForgejoIssue.state == "open", ForgejoIssue.is_pull_request.is_(False), ) ) open_count = await session.exec(open_statement) open_issues = open_count.one_or_none() or 0 # 2. Closed issues count closed_statement = select(func.count(ForgejoIssue.id)).where( and_( ForgejoIssue.repository_id.in_(repo_ids), ForgejoIssue.state == "closed", ForgejoIssue.is_pull_request.is_(False), ) ) closed_count = await session.exec(closed_statement) closed_issues = closed_count.one_or_none() or 0 # 3. Closed in last 7 days now = datetime.now(timezone.utc) seven_days_ago = now - timedelta(days=7) closed_7_statement = select(func.count(ForgejoIssue.id)).where( and_( ForgejoIssue.repository_id.in_(repo_ids), ForgejoIssue.state == "closed", ForgejoIssue.is_pull_request.is_(False), ForgejoIssue.updated_at >= seven_days_ago, ) ) closed_7_count = await session.exec(closed_7_statement) closed_last_7_days = closed_7_count.one_or_none() or 0 # 4. Closed in last 30 days thirty_days_ago = now - timedelta(days=30) closed_30_statement = select(func.count(ForgejoIssue.id)).where( and_( ForgejoIssue.repository_id.in_(repo_ids), ForgejoIssue.state == "closed", ForgejoIssue.is_pull_request.is_(False), ForgejoIssue.updated_at >= thirty_days_ago, ) ) closed_30_count = await session.exec(closed_30_statement) closed_last_30_days = closed_30_count.one_or_none() or 0 # 5. Stale open issues (open > 14 days with no update) fourteen_days_ago = now - timedelta(days=14) stale_statement = select(func.count(ForgejoIssue.id)).where( and_( ForgejoIssue.repository_id.in_(repo_ids), ForgejoIssue.state == "open", ForgejoIssue.is_pull_request.is_(False), ForgejoIssue.updated_at < fourteen_days_ago, ) ) stale_count = await session.exec(stale_statement) stale_open_issues = stale_count.one_or_none() or 0 # 6. Get sync status per repository last_sync_timestamps: dict[str, str] = {} sync_error_counts: dict[str, int] = {} for repo_id in repo_ids: # Get repository sync info repo_statement = select(ForgejoRepository).where( ForgejoRepository.id == repo_id ) repo = (await session.exec(repo_statement)).first() if repo: last_sync_timestamps[repo_id] = ( repo.last_sync_at.isoformat() if repo.last_sync_at else "" ) sync_error_counts[repo_id] = 1 if repo.last_sync_error else 0 repositories_synced = len(repo_ids) return MetricsResponse( open_issues=open_issues, closed_issues=closed_issues, closed_last_7_days=closed_last_7_days, closed_last_30_days=closed_last_30_days, stale_open_issues=stale_open_issues, repositories_synced=repositories_synced, last_sync_timestamps=last_sync_timestamps, sync_error_counts=sync_error_counts, ) def _zeroed_metrics() -> MetricsResponse: """Return zeroed metrics for empty scopes.""" return MetricsResponse( open_issues=0, closed_issues=0, closed_last_7_days=0, closed_last_30_days=0, stale_open_issues=0, repositories_synced=0, last_sync_timestamps={}, sync_error_counts={}, )