From 74756feb958bf8cb8556c4d4113f1d0a06183498 Mon Sep 17 00:00:00 2001 From: null Date: Fri, 22 May 2026 00:12:15 -0500 Subject: [PATCH] feat(scripts): tasking agents --- backend/app/api/forgejo_issues.py | 409 ++++++++++++++++++ backend/app/models/__init__.py | 2 + .../app/models/forgejo_issue_task_links.py | 39 ++ backend/app/schemas/forgejo_issues.py | 26 +- ...e5f6a7b8c9_add_forgejo_issue_task_links.py | 84 ++++ .../tests/test_forgejo_issue_tasking_api.py | 379 ++++++++++++++++ 6 files changed, 938 insertions(+), 1 deletion(-) create mode 100644 backend/app/models/forgejo_issue_task_links.py create mode 100644 backend/migrations/versions/d4e5f6a7b8c9_add_forgejo_issue_task_links.py create mode 100644 backend/tests/test_forgejo_issue_tasking_api.py diff --git a/backend/app/api/forgejo_issues.py b/backend/app/api/forgejo_issues.py index bb5a130..98c7504 100644 --- a/backend/app/api/forgejo_issues.py +++ b/backend/app/api/forgejo_issues.py @@ -11,10 +11,16 @@ from sqlmodel import func, select from app.api.deps import require_org_member from app.core.auth import AuthContext, get_auth_context +from app.core.time import utcnow from app.db import crud from app.db.session import get_session +from app.models.agents import Agent from app.models.board_repository_links import BoardRepositoryLink +from app.models.boards import Board +from app.models.forgejo_issue_task_links import ForgejoIssueTaskLink from app.models.forgejo_issues import ForgejoIssue +from app.models.forgejo_repositories import ForgejoRepository +from app.models.tasks import Task from app.schemas.forgejo_issues import ( CloseIssueResponse, EditIssueRequest, @@ -22,6 +28,8 @@ from app.schemas.forgejo_issues import ( ForgejoIssueDetailRead, ForgejoIssueListResponse, ForgejoIssueRead, + ForgejoIssueTaskRequest, + ForgejoIssueTaskResponse, PostCommentRequest, PostCommentResponse, ) @@ -54,6 +62,243 @@ AUTH_DEP = Depends(get_auth_context) ORG_MEMBER_DEP = Depends(require_org_member) +async def _linked_issue_board_ids( + session: AsyncSession, + *, + issue: ForgejoIssue, + organization_id: UUID, +) -> list[UUID]: + rows = ( + await session.exec( + select(BoardRepositoryLink.board_id).where( + BoardRepositoryLink.organization_id == organization_id, + BoardRepositoryLink.repository_id == issue.repository_id, + ) + ) + ).all() + return list(rows) + + +async def _resolve_issue_task_board( + session: AsyncSession, + *, + issue: ForgejoIssue, + requested_board_id: UUID | None, + ctx: OrganizationContext, +) -> Board: + linked_board_ids = await _linked_issue_board_ids( + session, + issue=issue, + organization_id=ctx.organization.id, + ) + if not linked_board_ids: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Issue repository is not linked to any board", + ) + + allowed_board_ids = set(await list_accessible_board_ids(session, member=ctx.member, write=True)) + authorized_board_ids = [ + board_id for board_id in linked_board_ids if board_id in allowed_board_ids + ] + if not authorized_board_ids: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Board access denied", + ) + + if requested_board_id is None: + if len(authorized_board_ids) > 1: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, + detail="board_id is required because this issue is linked to multiple writable boards", + ) + board_id = authorized_board_ids[0] + else: + if requested_board_id not in linked_board_ids: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Issue repository is not linked to the requested board", + ) + if requested_board_id not in allowed_board_ids: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Board access denied", + ) + board_id = requested_board_id + + board = await crud.get_by_id(session, Board, board_id) + if board is None or board.organization_id != ctx.organization.id: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Board not found") + return board + + +async def _validate_issue_task_agent( + session: AsyncSession, + *, + board: Board, + assigned_agent_id: UUID | None, +) -> Agent | None: + if assigned_agent_id is None: + return None + agent = await crud.get_by_id(session, Agent, assigned_agent_id) + if agent is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent not found") + if agent.board_id != board.id: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, + detail="Agent must belong to the selected board", + ) + if board.gateway_id is not None and agent.gateway_id not in {None, board.gateway_id}: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, + detail="Agent gateway does not match the selected board", + ) + return agent + + +async def _issue_repository_name( + session: AsyncSession, + *, + issue: ForgejoIssue, +) -> str: + repository = await crud.get_by_id(session, ForgejoRepository, issue.repository_id) + if repository is None: + return str(issue.repository_id) + return repository.display_name or f"{repository.owner}/{repository.repo}" + + +def _task_status_for_issue_task( + payload: ForgejoIssueTaskRequest, + *, + assigned_agent_id: UUID | None, +) -> str: + if payload.status is not None: + return payload.status + if assigned_agent_id is not None and payload.start_immediately: + return "in_progress" + return "inbox" + + +def _issue_task_title(issue: ForgejoIssue) -> str: + return f"Git issue #{issue.forgejo_issue_number}: {issue.title}" + + +def _label_names(issue: ForgejoIssue) -> str: + names = [ + str(label.get("name")) + for label in issue.labels + if isinstance(label, dict) and label.get("name") + ] + return ", ".join(names) + + +def _issue_task_description( + *, + issue: ForgejoIssue, + repository_name: str, + instructions: str | None, +) -> str: + parts = [ + "Source: Forgejo issue", + f"Repository: {repository_name}", + f"Issue: #{issue.forgejo_issue_number}", + f"URL: {issue.html_url}", + f"State: {issue.state}", + ] + labels = _label_names(issue) + if labels: + parts.append(f"Labels: {labels}") + if issue.author: + parts.append(f"Author: {issue.author}") + if instructions and instructions.strip(): + parts.append(f"\nInstructions:\n{instructions.strip()}") + if issue.body: + parts.append(f"\nIssue body:\n{issue.body.strip()}") + elif issue.body_preview: + parts.append(f"\nIssue preview:\n{issue.body_preview.strip()}") + return "\n".join(parts) + + +async def _existing_issue_task( + session: AsyncSession, + *, + issue_id: UUID, + board_id: UUID, +) -> tuple[ForgejoIssueTaskLink, Task] | None: + link = ( + await session.exec( + select(ForgejoIssueTaskLink).where( + ForgejoIssueTaskLink.issue_id == issue_id, + ForgejoIssueTaskLink.board_id == board_id, + ) + ) + ).first() + if link is None: + return None + task = await crud.get_by_id(session, Task, link.task_id) + if task is None: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Issue task link points to a missing task", + ) + return link, task + + +async def _notify_issue_task_assignee( + *, + session: AsyncSession, + board: Board, + task: Task, + agent: Agent | None, +) -> None: + if agent is None: + return + from app.api.tasks import notify_agent_on_task_assign + + await notify_agent_on_task_assign( + session=session, + board=board, + task=task, + agent=agent, + ) + + +def _normalize_issue_task_priority(priority: str) -> str: + normalized = priority.strip().lower() + return normalized or "medium" + + +def _set_issue_task_status_fields(task: Task, status_value: str) -> None: + previous_status = task.status + task.status = status_value + if status_value == "in_progress": + if task.in_progress_at is None or previous_status != "in_progress": + task.in_progress_at = utcnow() + return + if status_value == "inbox": + task.in_progress_at = None + + +def _issue_task_response( + *, + created: bool, + issue: ForgejoIssue, + task: Task, + board_id: UUID, +) -> ForgejoIssueTaskResponse: + return ForgejoIssueTaskResponse( + success=True, + created=created, + issue_id=issue.id, + task_id=task.id, + board_id=board_id, + assigned_agent_id=task.assigned_agent_id, + status=task.status, + title=task.title, + ) + + @router.get("", response_model=ForgejoIssueListResponse) async def list_issues( session: AsyncSession = SESSION_DEP, @@ -214,6 +459,170 @@ async def get_issue( return ForgejoIssueDetailRead.model_validate(issue) +@router.post( + "/{issue_id}/task", + response_model=ForgejoIssueTaskResponse, + summary="Create or update a Pipeline task for a Forgejo issue", + responses={ + status.HTTP_200_OK: {"description": "Task created or existing task updated"}, + status.HTTP_401_UNAUTHORIZED: {"description": "User authentication required"}, + status.HTTP_403_FORBIDDEN: {"description": "User lacks board write access"}, + status.HTTP_404_NOT_FOUND: {"description": "Issue, board, or agent not found"}, + status.HTTP_409_CONFLICT: {"description": "Existing link points to a missing task"}, + status.HTTP_422_UNPROCESSABLE_CONTENT: { + "description": "Invalid issue, board, agent, or tasking request" + }, + }, +) +async def task_issue( + issue_id: str, + payload: ForgejoIssueTaskRequest | None = None, + session: AsyncSession = SESSION_DEP, + auth: AuthContext = AUTH_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> ForgejoIssueTaskResponse: + """Create or update a board task from a cached Forgejo issue.""" + payload = payload or ForgejoIssueTaskRequest() + try: + uuid = UUID(issue_id) + except ValueError: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, + detail="Invalid issue_id format", + ) + + issue = await crud.get_by_id(session, ForgejoIssue, uuid) + if issue is None or issue.organization_id != ctx.organization.id: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Issue not found") + if issue.is_pull_request: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, + detail="Pull requests cannot be tasked through the issue tasking endpoint", + ) + if auth.user is None: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) + + board = await _resolve_issue_task_board( + session, + issue=issue, + requested_board_id=payload.board_id, + ctx=ctx, + ) + agent_requested = "assigned_agent_id" in payload.model_fields_set + status_requested = "status" in payload.model_fields_set + instructions_requested = "instructions" in payload.model_fields_set + priority_requested = "priority" in payload.model_fields_set + agent = await _validate_issue_task_agent( + session, + board=board, + assigned_agent_id=payload.assigned_agent_id if agent_requested else None, + ) + + existing = await _existing_issue_task(session, issue_id=issue.id, board_id=board.id) + if existing is not None: + _link, task = existing + previous_assignee_id = task.assigned_agent_id + previous_status = task.status + if agent_requested: + task.assigned_agent_id = agent.id if agent is not None else None + if status_requested: + _set_issue_task_status_fields(task, payload.status or "inbox") + elif agent_requested and agent is not None and payload.start_immediately: + _set_issue_task_status_fields(task, "in_progress") + elif agent_requested and agent is None: + _set_issue_task_status_fields(task, "inbox") + if priority_requested: + task.priority = _normalize_issue_task_priority(payload.priority) + if instructions_requested: + repository_name = await _issue_repository_name(session, issue=issue) + task.description = _issue_task_description( + issue=issue, + repository_name=repository_name, + instructions=payload.instructions, + ) + task.updated_at = utcnow() + session.add(task) + record_activity( + session, + event_type="forgejo.issue.task.updated", + task_id=task.id, + board_id=board.id, + message=f"Forgejo issue task updated: #{issue.forgejo_issue_number}.", + ) + await session.commit() + await session.refresh(task) + notify_agent = agent + if notify_agent is None and task.assigned_agent_id and task.status == "in_progress": + current_agent = await crud.get_by_id(session, Agent, task.assigned_agent_id) + if current_agent is not None and current_agent.board_id == board.id: + notify_agent = current_agent + should_notify = ( + notify_agent is not None + and task.assigned_agent_id is not None + and ( + task.assigned_agent_id != previous_assignee_id + or (previous_status != "in_progress" and task.status == "in_progress") + ) + ) + if notify_agent is not None and should_notify: + await _notify_issue_task_assignee( + session=session, + board=board, + task=task, + agent=notify_agent, + ) + return _issue_task_response(created=False, issue=issue, task=task, board_id=board.id) + + status_value = _task_status_for_issue_task( + payload, + assigned_agent_id=agent.id if agent is not None else None, + ) + repository_name = await _issue_repository_name(session, issue=issue) + task = Task( + board_id=board.id, + title=_issue_task_title(issue), + description=_issue_task_description( + issue=issue, + repository_name=repository_name, + instructions=payload.instructions, + ), + status=status_value, + priority=_normalize_issue_task_priority(payload.priority), + assigned_agent_id=agent.id if agent is not None else None, + created_by_user_id=auth.user.id, + auto_created=True, + auto_reason=f"forgejo_issue:{issue.id}", + in_progress_at=utcnow() if status_value == "in_progress" else None, + ) + session.add(task) + await session.flush() + session.add( + ForgejoIssueTaskLink( + organization_id=ctx.organization.id, + board_id=board.id, + issue_id=issue.id, + task_id=task.id, + created_by_user_id=auth.user.id, + ) + ) + record_activity( + session, + event_type="forgejo.issue.task.created", + task_id=task.id, + board_id=board.id, + message=f"Forgejo issue task created: #{issue.forgejo_issue_number}.", + ) + await session.commit() + await session.refresh(task) + await _notify_issue_task_assignee( + session=session, + board=board, + task=task, + agent=agent, + ) + return _issue_task_response(created=True, issue=issue, task=task, board_id=board.id) + + @router.post( "/{issue_id}/close", response_model=CloseIssueResponse, diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 85f34ec..1e0894a 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -13,6 +13,7 @@ from app.models.board_webhook_payloads import BoardWebhookPayload from app.models.board_webhooks import BoardWebhook from app.models.boards import Board from app.models.forgejo_connections import ForgejoConnection +from app.models.forgejo_issue_task_links import ForgejoIssueTaskLink from app.models.forgejo_issues import ForgejoIssue from app.models.forgejo_repositories import ForgejoRepository from app.models.gateways import Gateway @@ -49,6 +50,7 @@ __all__ = [ "Board", "BoardRepositoryLink", "ForgejoConnection", + "ForgejoIssueTaskLink", "ForgejoIssue", "ForgejoRepository", "Gateway", diff --git a/backend/app/models/forgejo_issue_task_links.py b/backend/app/models/forgejo_issue_task_links.py new file mode 100644 index 0000000..8f57e0e --- /dev/null +++ b/backend/app/models/forgejo_issue_task_links.py @@ -0,0 +1,39 @@ +"""Links between cached Forgejo issues and Pipeline tasks.""" + +from __future__ import annotations + +from datetime import datetime +from uuid import UUID, uuid4 + +from sqlalchemy import UniqueConstraint +from sqlmodel import Field + +from app.core.time import utcnow +from app.models.tenancy import TenantScoped + +RUNTIME_ANNOTATION_TYPES = (datetime,) + + +class ForgejoIssueTaskLink(TenantScoped, table=True): + """Board-scoped Pipeline task created from a cached Forgejo issue.""" + + __tablename__ = "forgejo_issue_task_links" # pyright: ignore[reportAssignmentType] + __table_args__ = ( + UniqueConstraint( + "board_id", + "issue_id", + name="uq_forgejo_issue_task_links_board_issue", + ), + UniqueConstraint( + "task_id", + name="uq_forgejo_issue_task_links_task", + ), + ) + + id: UUID = Field(default_factory=uuid4, primary_key=True) + organization_id: UUID = Field(foreign_key="organizations.id", index=True) + board_id: UUID = Field(foreign_key="boards.id", index=True) + issue_id: UUID = Field(foreign_key="forgejo_issues.id", index=True) + task_id: UUID = Field(foreign_key="tasks.id", index=True) + created_by_user_id: UUID | None = Field(default=None, foreign_key="users.id", index=True) + created_at: datetime = Field(default_factory=utcnow) diff --git a/backend/app/schemas/forgejo_issues.py b/backend/app/schemas/forgejo_issues.py index 1701874..b2bd87c 100644 --- a/backend/app/schemas/forgejo_issues.py +++ b/backend/app/schemas/forgejo_issues.py @@ -3,7 +3,7 @@ from __future__ import annotations from datetime import datetime -from typing import Any +from typing import Any, Literal from uuid import UUID from sqlmodel import SQLModel @@ -115,3 +115,27 @@ class EditIssueResponse(SQLModel): body: str | None = None state: str forgejo_updated_at: str + + +class ForgejoIssueTaskRequest(SQLModel): + """Request to create or update a Pipeline task for a Forgejo issue.""" + + board_id: UUID | None = None + assigned_agent_id: UUID | None = None + priority: str = "medium" + status: Literal["inbox", "in_progress"] | None = None + start_immediately: bool = True + instructions: str | None = None + + +class ForgejoIssueTaskResponse(SQLModel): + """Response for creating or finding a Pipeline task from a Forgejo issue.""" + + success: bool + created: bool + issue_id: UUID + task_id: UUID + board_id: UUID + assigned_agent_id: UUID | None = None + status: str + title: str diff --git a/backend/migrations/versions/d4e5f6a7b8c9_add_forgejo_issue_task_links.py b/backend/migrations/versions/d4e5f6a7b8c9_add_forgejo_issue_task_links.py new file mode 100644 index 0000000..c1d047f --- /dev/null +++ b/backend/migrations/versions/d4e5f6a7b8c9_add_forgejo_issue_task_links.py @@ -0,0 +1,84 @@ +"""add Forgejo issue to task links + +Revision ID: d4e5f6a7b8c9 +Revises: d3f4a5b6c7d8 +Create Date: 2026-05-22 00:00:00.000000 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +revision = "d4e5f6a7b8c9" +down_revision = "d3f4a5b6c7d8" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "forgejo_issue_task_links", + sa.Column("id", sa.Uuid(), nullable=False), + sa.Column("organization_id", sa.Uuid(), nullable=False), + sa.Column("board_id", sa.Uuid(), nullable=False), + sa.Column("issue_id", sa.Uuid(), nullable=False), + sa.Column("task_id", sa.Uuid(), nullable=False), + sa.Column("created_by_user_id", sa.Uuid(), nullable=True), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint(["board_id"], ["boards.id"]), + sa.ForeignKeyConstraint(["created_by_user_id"], ["users.id"]), + sa.ForeignKeyConstraint(["issue_id"], ["forgejo_issues.id"]), + sa.ForeignKeyConstraint(["organization_id"], ["organizations.id"]), + sa.ForeignKeyConstraint(["task_id"], ["tasks.id"]), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("board_id", "issue_id", name="uq_forgejo_issue_task_links_board_issue"), + sa.UniqueConstraint("task_id", name="uq_forgejo_issue_task_links_task"), + ) + op.create_index( + op.f("ix_forgejo_issue_task_links_board_id"), + "forgejo_issue_task_links", + ["board_id"], + unique=False, + ) + op.create_index( + op.f("ix_forgejo_issue_task_links_created_by_user_id"), + "forgejo_issue_task_links", + ["created_by_user_id"], + unique=False, + ) + op.create_index( + op.f("ix_forgejo_issue_task_links_issue_id"), + "forgejo_issue_task_links", + ["issue_id"], + unique=False, + ) + op.create_index( + op.f("ix_forgejo_issue_task_links_organization_id"), + "forgejo_issue_task_links", + ["organization_id"], + unique=False, + ) + op.create_index( + op.f("ix_forgejo_issue_task_links_task_id"), + "forgejo_issue_task_links", + ["task_id"], + unique=False, + ) + + +def downgrade() -> None: + op.drop_index(op.f("ix_forgejo_issue_task_links_task_id"), table_name="forgejo_issue_task_links") + op.drop_index( + op.f("ix_forgejo_issue_task_links_organization_id"), + table_name="forgejo_issue_task_links", + ) + op.drop_index(op.f("ix_forgejo_issue_task_links_issue_id"), table_name="forgejo_issue_task_links") + op.drop_index( + op.f("ix_forgejo_issue_task_links_created_by_user_id"), + table_name="forgejo_issue_task_links", + ) + op.drop_index(op.f("ix_forgejo_issue_task_links_board_id"), table_name="forgejo_issue_task_links") + op.drop_table("forgejo_issue_task_links") + diff --git a/backend/tests/test_forgejo_issue_tasking_api.py b/backend/tests/test_forgejo_issue_tasking_api.py new file mode 100644 index 0000000..ef1e2b3 --- /dev/null +++ b/backend/tests/test_forgejo_issue_tasking_api.py @@ -0,0 +1,379 @@ +# ruff: noqa: INP001 +"""Integration tests for tasking Forgejo issues to Pipeline agents.""" + +from __future__ import annotations + +from datetime import datetime +from types import SimpleNamespace +from uuid import UUID, uuid4 + +import pytest +from fastapi import APIRouter, FastAPI +from httpx import ASGITransport, AsyncClient +from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine +from sqlmodel import SQLModel, select +from sqlmodel.ext.asyncio.session import AsyncSession + +from app import models as _models # noqa: F401 +from app.api.deps import require_org_member +from app.api.forgejo_issues import router as forgejo_issues_router +from app.core.auth import AuthContext, get_auth_context +from app.db.session import get_session +from app.models.activity_events import ActivityEvent +from app.models.agents import Agent +from app.models.board_repository_links import BoardRepositoryLink +from app.models.boards import Board +from app.models.forgejo_connections import ForgejoConnection +from app.models.forgejo_issue_task_links import ForgejoIssueTaskLink +from app.models.forgejo_issues import ForgejoIssue +from app.models.forgejo_repositories import ForgejoRepository +from app.models.gateways import Gateway +from app.models.organization_board_access import OrganizationBoardAccess +from app.models.organization_members import OrganizationMember +from app.models.organizations import Organization +from app.models.tasks import Task +from app.models.users import User +from app.services.organizations import OrganizationContext + + +async def _make_engine() -> AsyncEngine: + engine = create_async_engine("sqlite+aiosqlite:///:memory:") + async with engine.begin() as conn: + await conn.run_sync(SQLModel.metadata.create_all) + return engine + + +def _build_test_app( + session_maker: async_sessionmaker[AsyncSession], + *, + org_ctx: OrganizationContext, + auth_ctx: AuthContext, +) -> FastAPI: + app = FastAPI() + api_v1 = APIRouter(prefix="/api/v1") + api_v1.include_router(forgejo_issues_router) + app.include_router(api_v1) + + async def _override_get_session() -> AsyncSession: + async with session_maker() as session: + yield session + + async def _override_require_org_member() -> OrganizationContext: + return org_ctx + + async def _override_get_auth_context() -> AuthContext: + return auth_ctx + + app.dependency_overrides[get_session] = _override_get_session + app.dependency_overrides[require_org_member] = _override_require_org_member + app.dependency_overrides[get_auth_context] = _override_get_auth_context + return app + + +async def _seed(session: AsyncSession) -> SimpleNamespace: + org = Organization(id=uuid4(), name="Pipeline Org") + user = User( + id=uuid4(), + clerk_user_id="user_123", + email="user@example.com", + name="User", + active_organization_id=org.id, + ) + member = OrganizationMember( + id=uuid4(), + organization_id=org.id, + user_id=user.id, + role="member", + all_boards_read=False, + all_boards_write=False, + ) + connection = ForgejoConnection( + id=uuid4(), + organization_id=org.id, + name="Dream Forgejo", + base_url="https://forgejo.example.local", + token="temp-token", + token_last_eight="p-token", + ) + repository = ForgejoRepository( + id=uuid4(), + organization_id=org.id, + connection_id=connection.id, + owner="openclaw", + repo="pipeline", + display_name="Pipeline", + ) + board = Board( + id=uuid4(), + organization_id=org.id, + name="Pipeline Board", + slug="pipeline-board", + ) + board_access = OrganizationBoardAccess( + id=uuid4(), + organization_member_id=member.id, + board_id=board.id, + can_read=True, + can_write=True, + ) + link = BoardRepositoryLink( + id=uuid4(), + organization_id=org.id, + board_id=board.id, + repository_id=repository.id, + ) + issue = ForgejoIssue( + id=uuid4(), + organization_id=org.id, + repository_id=repository.id, + forgejo_issue_number=42, + title="Implement agent tasking", + body="Wire the backend so agents can work this issue.", + body_preview="Wire the backend", + state="open", + is_pull_request=False, + labels=[{"name": "backend"}, {"name": "agents"}], + assignees=[], + forgejo_payload={"number": 42, "state": "open"}, + author="kaspa", + html_url="https://forgejo.example.local/openclaw/pipeline/issues/42", + forgejo_created_at=datetime(2026, 5, 19, 12, 0, 0), + forgejo_updated_at=datetime(2026, 5, 19, 12, 5, 0), + ) + gateway = Gateway( + id=uuid4(), + organization_id=org.id, + name="Gateway", + url="https://gateway.example.local", + workspace_root="/workspace", + ) + worker_agent = Agent( + id=uuid4(), + board_id=board.id, + gateway_id=gateway.id, + name="Worker Agent", + is_board_lead=False, + ) + + session.add(org) + session.add(user) + session.add(member) + session.add(connection) + session.add(repository) + session.add(board) + session.add(board_access) + session.add(link) + session.add(issue) + session.add(gateway) + session.add(worker_agent) + await session.commit() + return SimpleNamespace( + org=org, + user=user, + member=member, + repository=repository, + board=board, + issue=issue, + gateway=gateway, + worker_agent=worker_agent, + ) + + +def _client_for( + session_maker: async_sessionmaker[AsyncSession], + seeded: SimpleNamespace, +) -> AsyncClient: + org_ctx = OrganizationContext(organization=seeded.org, member=seeded.member) + auth_ctx = AuthContext(actor_type="user", user=seeded.user) + app = _build_test_app(session_maker, org_ctx=org_ctx, auth_ctx=auth_ctx) + return AsyncClient(transport=ASGITransport(app=app), base_url="http://testserver") + + +@pytest.mark.asyncio +async def test_human_writer_can_create_task_from_issue_and_assign_agent() -> None: + engine = await _make_engine() + session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + try: + async with session_maker() as session: + seeded = await _seed(session) + + async with _client_for(session_maker, seeded) as client: + response = await client.post( + f"/api/v1/forgejo/issues/{seeded.issue.id}/task", + json={ + "board_id": str(seeded.board.id), + "assigned_agent_id": str(seeded.worker_agent.id), + "instructions": "Start by reproducing the issue locally.", + }, + ) + + assert response.status_code == 200 + payload = response.json() + assert payload["success"] is True + assert payload["created"] is True + assert payload["issue_id"] == str(seeded.issue.id) + assert payload["board_id"] == str(seeded.board.id) + assert payload["assigned_agent_id"] == str(seeded.worker_agent.id) + assert payload["status"] == "in_progress" + assert payload["title"] == "Git issue #42: Implement agent tasking" + + async with session_maker() as session: + task = await session.get(Task, UUID(payload["task_id"])) + assert task is not None + assert task.board_id == seeded.board.id + assert task.assigned_agent_id == seeded.worker_agent.id + assert task.auto_created is True + assert task.auto_reason == f"forgejo_issue:{seeded.issue.id}" + assert task.in_progress_at is not None + assert task.description is not None + assert "Repository: Pipeline" in task.description + assert "Labels: backend, agents" in task.description + assert "Start by reproducing the issue locally." in task.description + + links = (await session.exec(select(ForgejoIssueTaskLink))).all() + assert len(links) == 1 + assert links[0].issue_id == seeded.issue.id + assert links[0].task_id == task.id + + events = (await session.exec(select(ActivityEvent))).all() + assert len(events) == 1 + assert events[0].event_type == "forgejo.issue.task.created" + assert events[0].board_id == seeded.board.id + assert events[0].task_id == task.id + finally: + await engine.dispose() + + +@pytest.mark.asyncio +async def test_tasking_same_issue_reuses_existing_task() -> None: + engine = await _make_engine() + session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + try: + async with session_maker() as session: + seeded = await _seed(session) + + async with _client_for(session_maker, seeded) as client: + first = await client.post( + f"/api/v1/forgejo/issues/{seeded.issue.id}/task", + json={"board_id": str(seeded.board.id)}, + ) + second = await client.post( + f"/api/v1/forgejo/issues/{seeded.issue.id}/task", + json={ + "board_id": str(seeded.board.id), + "assigned_agent_id": str(seeded.worker_agent.id), + "priority": "high", + }, + ) + + assert first.status_code == 200 + assert second.status_code == 200 + first_payload = first.json() + second_payload = second.json() + assert first_payload["created"] is True + assert second_payload["created"] is False + assert second_payload["task_id"] == first_payload["task_id"] + assert second_payload["assigned_agent_id"] == str(seeded.worker_agent.id) + assert second_payload["status"] == "in_progress" + + async with session_maker() as session: + tasks = (await session.exec(select(Task))).all() + links = (await session.exec(select(ForgejoIssueTaskLink))).all() + events = (await session.exec(select(ActivityEvent))).all() + assert len(tasks) == 1 + assert len(links) == 1 + assert tasks[0].priority == "high" + assert len(events) == 2 + assert [event.event_type for event in events] == [ + "forgejo.issue.task.created", + "forgejo.issue.task.updated", + ] + finally: + await engine.dispose() + + +@pytest.mark.asyncio +async def test_tasking_issue_rejects_agent_from_another_board() -> None: + engine = await _make_engine() + session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + try: + async with session_maker() as session: + seeded = await _seed(session) + other_board = Board( + id=uuid4(), + organization_id=seeded.org.id, + name="Other Board", + slug="other-board", + ) + other_agent = Agent( + id=uuid4(), + board_id=other_board.id, + gateway_id=seeded.gateway.id, + name="Other Agent", + ) + session.add(other_board) + session.add(other_agent) + await session.commit() + + async with _client_for(session_maker, seeded) as client: + response = await client.post( + f"/api/v1/forgejo/issues/{seeded.issue.id}/task", + json={ + "board_id": str(seeded.board.id), + "assigned_agent_id": str(other_agent.id), + }, + ) + + assert response.status_code == 422 + assert response.json()["detail"] == "Agent must belong to the selected board" + finally: + await engine.dispose() + + +@pytest.mark.asyncio +async def test_tasking_issue_requires_board_when_multiple_linked_boards_are_writable() -> None: + engine = await _make_engine() + session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + try: + async with session_maker() as session: + seeded = await _seed(session) + second_board = Board( + id=uuid4(), + organization_id=seeded.org.id, + name="Second Board", + slug="second-board", + ) + session.add(second_board) + session.add( + OrganizationBoardAccess( + id=uuid4(), + organization_member_id=seeded.member.id, + board_id=second_board.id, + can_read=True, + can_write=True, + ) + ) + session.add( + BoardRepositoryLink( + id=uuid4(), + organization_id=seeded.org.id, + board_id=second_board.id, + repository_id=seeded.repository.id, + ) + ) + await session.commit() + + async with _client_for(session_maker, seeded) as client: + response = await client.post(f"/api/v1/forgejo/issues/{seeded.issue.id}/task") + + assert response.status_code == 422 + assert ( + response.json()["detail"] + == "board_id is required because this issue is linked to multiple writable boards" + ) + finally: + await engine.dispose()