Pipeline/backend/app/api/agent_forgejo.py

604 lines
21 KiB
Python
Raw Normal View History

"""Agent-scoped Forgejo issue read APIs for board-linked repositories."""
from __future__ import annotations
from typing import TYPE_CHECKING
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlmodel import select, func
from sqlalchemy import and_
2026-05-22 01:44:39 -05:00
from app.core.logging import get_logger
2026-05-20 03:49:57 -05:00
from app.api.deps import get_board_for_actor_read
2026-05-22 01:44:39 -05:00
logger = get_logger(__name__)
from app.core.agent_auth import get_agent_auth_context
2026-05-22 01:44:39 -05:00
from app.db import crud
from app.db.session import get_session
2026-05-22 01:44:39 -05:00
from app.models.agents import Agent
2026-05-20 03:49:57 -05:00
from app.models.boards import Board
from app.models.board_repository_links import BoardRepositoryLink
from app.models.forgejo_issues import ForgejoIssue
2026-05-22 01:44:39 -05:00
from app.models.gateways import Gateway
from app.schemas.forgejo_issues import ForgejoIssueRead, ForgejoIssueListResponse, CloseIssueResponse
2026-05-20 03:49:57 -05:00
from app.services.activity_log import record_activity
from app.services.forgejo_issue_close import (
CloseIssueAccessError,
CloseIssueNotFoundError,
CloseIssueRemoteError,
close_issue_by_id,
)
2026-05-22 01:44:39 -05:00
from app.services.forgejo_issue_create import create_issue_on_board_repositories
from app.services.issue_template import render_error_report
from app.services.openclaw.gateway_rpc import GatewayConfig, send_message
2026-05-20 03:49:57 -05:00
from app.services.openclaw.policies import OpenClawAuthorizationPolicy
2026-05-22 01:44:39 -05:00
from sqlmodel import SQLModel
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.agent_auth import AgentAuthContext
router = APIRouter(prefix="/agent/boards", tags=["agent-board-issues"])
SESSION_DEP = Depends(get_session)
BOARD_READ_DEP = Depends(get_board_for_actor_read)
AGENT_CTX_DEP = Depends(get_agent_auth_context)
def _agent_board_openapi_hints(
*,
intent: str,
when_to_use: list[str],
routing_examples: list[dict[str, object]],
required_actor: str = "any_agent",
when_not_to_use: list[str] | None = None,
routing_policy: list[str] | None = None,
) -> dict[str, object]:
"""Generate LLM routing hints for board-scoped agent endpoints."""
return {
"x-llm-intent": intent,
"x-when-to-use": when_to_use,
"x-when-not-to-use": when_not_to_use
or [
"Use a more specific endpoint for direct state mutation or direct messaging.",
],
"x-required-actor": required_actor,
"x-prerequisites": [
"Authenticated agent token",
"Board access is validated before execution",
],
"x-side-effects": [
"Read-only access to issues linked to board repositories",
],
"x-negative-guidance": [
"Avoid this endpoint when a focused sibling endpoint handles the action.",
],
"x-routing-policy": routing_policy
or [
"Use when the request intent matches this board-scoped route.",
"Prefer dedicated mutation/read routes once intent is narrowed.",
],
"x-routing-policy-examples": routing_examples,
}
@router.get(
"/{board_id}/git/issues",
response_model=ForgejoIssueListResponse,
summary="List issues for board's linked repositories",
description=(
"List Forgejo issues from repositories linked to the specified board.\n\n"
"Use this endpoint when an agent needs to discover issues across all "
"repositories associated with its assigned board.\n\n"
"LLM Routing Guidance:\n"
"- Use when you need to list issues for a specific board's repositories\n"
"- Filter by state, search text, or assignee for targeted results\n"
"- Use this instead of generic issue lists when board context matters\n"
"- Exclude pull requests automatically"
),
openapi_extra=_agent_board_openapi_hints(
intent="board_issue_discovery",
when_to_use=[
"Need to list issues for a specific board's repositories",
"Looking for issues to assign or track",
"Discovering work items related to a board's scope",
],
when_not_to_use=[
"Listing all issues across all repositories (use forgejo issues list)",
"Working with issues from unlinked repositories",
"Need repository-agnostic issue search",
],
routing_examples=[
{
"input": {
"intent": "list issues for board xyz",
"board_id": "uuid",
},
"decision": "agent_board_list_issues",
},
{
"input": {
"intent": "find issues assigned to me",
"assignee": "me",
},
"decision": "agent_board_list_issues (with assignee filter)",
},
],
),
)
async def list_board_issues(
board_id: UUID,
session: AsyncSession = SESSION_DEP,
board: ForgejoIssue = BOARD_READ_DEP,
state: str | None = Query(default=None, description="Filter by issue state (open/closed)"),
search: str | None = Query(default=None, description="Search in title/body"),
page: int = Query(default=1, ge=1, description="Page number"),
limit: int = Query(default=30, ge=1, le=100, description="Items per page"),
) -> ForgejoIssueListResponse:
"""List issues for repositories linked to a board."""
# Get linked repositories
link_statement = select(BoardRepositoryLink).where(
BoardRepositoryLink.board_id == board_id
)
links = (await session.exec(link_statement)).all()
if not links:
return ForgejoIssueListResponse(
items=[],
total=0,
page=page,
limit=limit,
)
repository_ids = [link.repository_id for link in links]
# Build base query
statement = select(ForgejoIssue).where(
ForgejoIssue.repository_id.in_(repository_ids)
)
# Apply filters
if state:
statement = statement.where(ForgejoIssue.state == state)
if search:
statement = statement.where(
ForgejoIssue.title.ilike(f"%{search}%") |
ForgejoIssue.body_preview.ilike(f"%{search}%")
)
# Count total
count_statement = select(func.count(ForgejoIssue.id)).where(
ForgejoIssue.repository_id.in_(repository_ids)
)
if state:
count_statement = count_statement.where(ForgejoIssue.state == state)
if search:
count_statement = count_statement.where(
ForgejoIssue.title.ilike(f"%{search}%") |
ForgejoIssue.body_preview.ilike(f"%{search}%")
)
total = await session.scalar(count_statement) or 0
# Apply pagination and execute
offset = (page - 1) * limit
statement = statement.offset(offset).limit(limit)
issues = (await session.exec(statement)).all()
return ForgejoIssueListResponse(
items=[ForgejoIssueRead.model_validate(issue) for issue in issues],
total=total,
page=page,
limit=limit,
)
@router.get(
"/{board_id}/git/issues/{issue_id}",
response_model=ForgejoIssueRead,
summary="Read one issue from board-linked repositories",
description=(
"Read a specific Forgejo issue by id, ensuring it belongs to a repository "
"linked to the specified board.\n\n"
"Use this endpoint when an agent needs to inspect a specific issue "
"within the context of a board's repositories."
),
openapi_extra=_agent_board_openapi_hints(
intent="board_issue_inspection",
when_to_use=[
"Need to read a specific issue's details",
"Verifying issue state before taking action",
"Inspecting issue metadata for board context",
],
when_not_to_use=[
"Listing multiple issues (use list endpoint)",
"Working with issues from unlinked repositories",
],
routing_examples=[
{
"input": {
"intent": "read issue 123 for board xyz",
"board_id": "uuid",
"issue_id": "uuid",
},
"decision": "agent_board_read_issue",
},
],
),
)
async def read_board_issue(
board_id: UUID,
issue_id: UUID,
session: AsyncSession = SESSION_DEP,
board: ForgejoIssue = BOARD_READ_DEP,
) -> ForgejoIssueRead:
"""Read one issue from board-linked repositories."""
# Get linked repositories
link_statement = select(BoardRepositoryLink).where(
BoardRepositoryLink.board_id == board_id
)
links = (await session.exec(link_statement)).all()
if not links:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No repositories linked to this board",
)
repository_ids = [link.repository_id for link in links]
# Find issue
statement = select(ForgejoIssue).where(
and_(
ForgejoIssue.id == issue_id,
ForgejoIssue.repository_id.in_(repository_ids),
)
)
issue = (await session.exec(statement)).first()
if issue is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Issue not found or not linked to this board",
)
return ForgejoIssueRead.model_validate(issue)
@router.post(
"/{board_id}/git/issues/{issue_id}/close",
response_model=CloseIssueResponse,
summary="Close a Forgejo issue (agent)",
description=(
"Close a Forgejo issue by its local ID as a board-lead agent. "
"Only board lead agents can close issues. The issue must belong to a "
"repository linked to the target board."
),
openapi_extra=_agent_board_openapi_hints(
intent="agent_board_close_issue",
when_to_use=[
"Lead agent needs to close a specific issue",
"Finalizing work items after completion",
"Closing issues after deployment or release",
],
when_not_to_use=[
"Worker agents closing issues (use human endpoint or ask lead)",
"Working with issues from unlinked repositories",
],
required_actor="board_lead",
routing_examples=[
{
"input": {
"intent": "close issue 123 for board xyz",
"board_id": "uuid",
"issue_id": "uuid",
},
"decision": "agent_board_close_issue",
},
],
),
responses={
status.HTTP_200_OK: {
"description": "Issue closed successfully",
"content": {
"application/json": {
"example": {
"success": True,
"issue_id": "123e4567-e89b-12d3-a456-426614174000",
"forgejo_issue_number": 42,
"state": "closed",
"forgejo_closed_at": "2026-05-19T03:43:00+00:00",
"last_synced_at": "2026-05-19T03:43:00+00:00",
}
}
},
},
status.HTTP_404_NOT_FOUND: {
"description": "Issue not found or not linked to this board",
},
status.HTTP_403_FORBIDDEN: {
"description": "Caller is not board lead",
},
status.HTTP_409_CONFLICT: {
"description": "Organization mismatch or access denied",
},
status.HTTP_502_BAD_GATEWAY: {
"description": "Forgejo API call failed",
},
},
)
async def close_board_issue(
board_id: UUID,
issue_id: UUID,
session: AsyncSession = SESSION_DEP,
2026-05-20 03:49:57 -05:00
board: Board = BOARD_READ_DEP,
agent_ctx: AgentAuthContext = AGENT_CTX_DEP,
) -> CloseIssueResponse:
"""Close a Forgejo issue as a board-lead agent.
Only board lead agents can close issues. The issue must belong to a repository
linked to the target board.
"""
2026-05-20 03:49:57 -05:00
links = (
await session.exec(
select(BoardRepositoryLink).where(
BoardRepositoryLink.organization_id == board.organization_id,
BoardRepositoryLink.board_id == board_id,
)
)
).all()
repository_ids = [link.repository_id for link in links]
if not repository_ids:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Issue not found or not linked to this board",
)
statement = select(ForgejoIssue).where(
and_(
ForgejoIssue.id == issue_id,
2026-05-20 03:49:57 -05:00
ForgejoIssue.repository_id.in_(repository_ids),
),
)
issue = (await session.exec(statement)).first()
if issue is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Issue not found or not linked to this board",
)
2026-05-20 03:49:57 -05:00
# Verify agent is board lead.
OpenClawAuthorizationPolicy.require_board_lead_actor(
actor_agent=agent_ctx.agent,
detail="Only board leads can close issues",
)
2026-05-20 03:49:57 -05:00
# Close the issue using the service.
try:
result = await close_issue_by_id(
session=session,
issue_id=issue_id,
actor_agent_id=agent_ctx.agent.id,
)
except CloseIssueNotFoundError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
except CloseIssueAccessError as e:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
except CloseIssueRemoteError as e:
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(e))
2026-05-20 03:49:57 -05:00
repository_full_name = str(result.get("repository_full_name") or "unknown/unknown")
record_activity(
session,
event_type="forgejo.issue.closed",
message=(
"Forgejo issue closed by agent "
f"{agent_ctx.agent.id}: {repository_full_name}#{result['forgejo_issue_number']}"
),
board_id=board_id,
agent_id=agent_ctx.agent.id,
)
await session.commit()
return CloseIssueResponse(
success=result["success"],
issue_id=result["issue_id"],
forgejo_issue_number=result["forgejo_issue_number"],
state=result["state"],
forgejo_closed_at=result.get("forgejo_closed_at"),
last_synced_at=result.get("last_synced_at") or "",
)
2026-05-22 01:44:39 -05:00
class AgentErrorReport(SQLModel):
"""Error report payload submitted by an agent."""
title: str
body: str
error_type: str | None = None
stack_trace: str | None = None
task_id: UUID | None = None
class AgentErrorReportResponse(SQLModel):
"""Response after an agent submits an error report."""
success: bool
board_id: UUID
escalation_agent_id: UUID | None
notification_sent: bool
issues_created: int
issue_ids: list[UUID]
@router.post(
"/{board_id}/errors",
response_model=AgentErrorReportResponse,
summary="Report an error to the board escalation agent",
description=(
"Report an error or code failure on the board.\n\n"
"**This endpoint is MANDATORY when `board.error_escalation_enabled` is `true`.**\n"
"Agents MUST call this endpoint whenever they encounter an error, exception, or "
"code failure — not doing so violates the board's error escalation rule.\n\n"
"The backend will:\n"
"1. Notify the designated escalation agent (or the board lead if none is set).\n"
"2. Automatically open a Forgejo issue on all linked repositories if "
"`board.error_auto_open_issue` is enabled."
),
openapi_extra=_agent_board_openapi_hints(
intent="agent_error_escalation",
required_actor="any_agent",
when_to_use=[
"Agent encounters a runtime error, exception, or unexpected failure",
"Code execution fails or produces incorrect results",
"Agent needs to escalate a blocking problem to a supervisor agent",
"Board error_escalation_enabled is true — this call is then MANDATORY",
],
when_not_to_use=[
"Board error_escalation_enabled is false",
"Non-critical warnings that do not block progress",
],
routing_policy=[
"MUST be called on any error when error_escalation_enabled is true on the board",
"Skipping this call when the rule is active violates board policy",
],
routing_examples=[
{
"input": {
"intent": "code execution raised an exception",
"board_rule": "error_escalation_enabled=true",
},
"decision": "call POST /agent/boards/{board_id}/errors immediately",
},
{
"input": {
"intent": "unexpected API failure blocked task completion",
"board_rule": "error_escalation_enabled=true",
},
"decision": "call POST /agent/boards/{board_id}/errors with full stack trace",
},
],
),
responses={
status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Error escalation is not enabled on this board",
},
status.HTTP_502_BAD_GATEWAY: {
"description": "Could not notify escalation agent via gateway",
},
},
)
async def report_board_error(
board_id: UUID,
payload: AgentErrorReport,
session: AsyncSession = SESSION_DEP,
board: Board = BOARD_READ_DEP,
agent_ctx: AgentAuthContext = AGENT_CTX_DEP,
) -> AgentErrorReportResponse:
"""Report an error — mandatory when board.error_escalation_enabled is True."""
if not board.error_escalation_enabled:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Error escalation is not enabled on this board",
)
# Resolve the target escalation agent (explicit setting or fall back to board lead).
escalation_agent: Agent | None = None
if board.error_escalation_agent_id is not None:
escalation_agent = await crud.get_by_id(session, Agent, board.error_escalation_agent_id)
if escalation_agent is None:
escalation_agent = (
await session.exec(
select(Agent).where(
Agent.board_id == board_id,
Agent.is_board_lead.is_(True),
)
)
).first()
notification_sent = False
if escalation_agent is not None and escalation_agent.openclaw_session_id:
gateway = await crud.get_by_id(session, Gateway, escalation_agent.gateway_id)
if gateway is not None:
config = GatewayConfig(
url=gateway.url,
token=gateway.token,
allow_insecure_tls=gateway.allow_insecure_tls,
disable_device_pairing=gateway.disable_device_pairing,
)
parts = [
f"⚠️ ERROR ESCALATION from agent {agent_ctx.agent.name} on board {board.name}",
f"Title: {payload.title}",
f"Body: {payload.body}",
]
if payload.error_type:
parts.append(f"Error type: {payload.error_type}")
if payload.task_id:
parts.append(f"Task ID: {payload.task_id}")
if payload.stack_trace:
parts.append(f"Stack trace:\n{payload.stack_trace}")
message_text = "\n".join(parts)
try:
await send_message(
message_text,
session_key=escalation_agent.openclaw_session_id,
config=config,
deliver=True,
)
notification_sent = True
except Exception as exc:
logger.warning(
"error_escalation.notify_failed",
extra={
"board_id": str(board_id),
"escalation_agent_id": str(escalation_agent.id),
"error": str(exc),
},
)
# Optionally open a Forgejo issue on all linked repositories.
created_issues: list[ForgejoIssue] = []
if board.error_auto_open_issue:
issue_title = f"[Error] {payload.title}"
affected_area = None
if payload.error_type:
affected_area = f"- Error type: {payload.error_type}\n- Board: {board.name}"
issue_body = render_error_report(
summary=payload.title,
problem=payload.body,
affected_area=affected_area,
stack_trace=payload.stack_trace,
reporter=f"agent:{agent_ctx.agent.name}",
)
created_issues = await create_issue_on_board_repositories(
session,
board_id=board_id,
organization_id=board.organization_id,
title=issue_title,
body=issue_body,
)
record_activity(
session,
event_type="agent.error.reported",
message=(
f"Error reported by agent {agent_ctx.agent.name} on board {board.name}: "
f"{payload.title}"
),
board_id=board_id,
agent_id=agent_ctx.agent.id,
)
await session.commit()
return AgentErrorReportResponse(
success=True,
board_id=board_id,
escalation_agent_id=escalation_agent.id if escalation_agent else None,
notification_sent=notification_sent,
issues_created=len(created_issues),
issue_ids=[issue.id for issue in created_issues],
)