feat(scripts): tasking agents

This commit is contained in:
null 2026-05-22 00:12:15 -05:00
parent a985af3f4a
commit 74756feb95
6 changed files with 938 additions and 1 deletions

View File

@ -11,10 +11,16 @@ from sqlmodel import func, select
from app.api.deps import require_org_member from app.api.deps import require_org_member
from app.core.auth import AuthContext, get_auth_context from app.core.auth import AuthContext, get_auth_context
from app.core.time import utcnow
from app.db import crud from app.db import crud
from app.db.session import get_session from app.db.session import get_session
from app.models.agents import Agent
from app.models.board_repository_links import BoardRepositoryLink from app.models.board_repository_links import BoardRepositoryLink
from app.models.boards import Board
from app.models.forgejo_issue_task_links import ForgejoIssueTaskLink
from app.models.forgejo_issues import ForgejoIssue from app.models.forgejo_issues import ForgejoIssue
from app.models.forgejo_repositories import ForgejoRepository
from app.models.tasks import Task
from app.schemas.forgejo_issues import ( from app.schemas.forgejo_issues import (
CloseIssueResponse, CloseIssueResponse,
EditIssueRequest, EditIssueRequest,
@ -22,6 +28,8 @@ from app.schemas.forgejo_issues import (
ForgejoIssueDetailRead, ForgejoIssueDetailRead,
ForgejoIssueListResponse, ForgejoIssueListResponse,
ForgejoIssueRead, ForgejoIssueRead,
ForgejoIssueTaskRequest,
ForgejoIssueTaskResponse,
PostCommentRequest, PostCommentRequest,
PostCommentResponse, PostCommentResponse,
) )
@ -54,6 +62,243 @@ AUTH_DEP = Depends(get_auth_context)
ORG_MEMBER_DEP = Depends(require_org_member) ORG_MEMBER_DEP = Depends(require_org_member)
async def _linked_issue_board_ids(
session: AsyncSession,
*,
issue: ForgejoIssue,
organization_id: UUID,
) -> list[UUID]:
rows = (
await session.exec(
select(BoardRepositoryLink.board_id).where(
BoardRepositoryLink.organization_id == organization_id,
BoardRepositoryLink.repository_id == issue.repository_id,
)
)
).all()
return list(rows)
async def _resolve_issue_task_board(
session: AsyncSession,
*,
issue: ForgejoIssue,
requested_board_id: UUID | None,
ctx: OrganizationContext,
) -> Board:
linked_board_ids = await _linked_issue_board_ids(
session,
issue=issue,
organization_id=ctx.organization.id,
)
if not linked_board_ids:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Issue repository is not linked to any board",
)
allowed_board_ids = set(await list_accessible_board_ids(session, member=ctx.member, write=True))
authorized_board_ids = [
board_id for board_id in linked_board_ids if board_id in allowed_board_ids
]
if not authorized_board_ids:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Board access denied",
)
if requested_board_id is None:
if len(authorized_board_ids) > 1:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="board_id is required because this issue is linked to multiple writable boards",
)
board_id = authorized_board_ids[0]
else:
if requested_board_id not in linked_board_ids:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Issue repository is not linked to the requested board",
)
if requested_board_id not in allowed_board_ids:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Board access denied",
)
board_id = requested_board_id
board = await crud.get_by_id(session, Board, board_id)
if board is None or board.organization_id != ctx.organization.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Board not found")
return board
async def _validate_issue_task_agent(
session: AsyncSession,
*,
board: Board,
assigned_agent_id: UUID | None,
) -> Agent | None:
if assigned_agent_id is None:
return None
agent = await crud.get_by_id(session, Agent, assigned_agent_id)
if agent is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent not found")
if agent.board_id != board.id:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Agent must belong to the selected board",
)
if board.gateway_id is not None and agent.gateway_id not in {None, board.gateway_id}:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Agent gateway does not match the selected board",
)
return agent
async def _issue_repository_name(
session: AsyncSession,
*,
issue: ForgejoIssue,
) -> str:
repository = await crud.get_by_id(session, ForgejoRepository, issue.repository_id)
if repository is None:
return str(issue.repository_id)
return repository.display_name or f"{repository.owner}/{repository.repo}"
def _task_status_for_issue_task(
payload: ForgejoIssueTaskRequest,
*,
assigned_agent_id: UUID | None,
) -> str:
if payload.status is not None:
return payload.status
if assigned_agent_id is not None and payload.start_immediately:
return "in_progress"
return "inbox"
def _issue_task_title(issue: ForgejoIssue) -> str:
return f"Git issue #{issue.forgejo_issue_number}: {issue.title}"
def _label_names(issue: ForgejoIssue) -> str:
names = [
str(label.get("name"))
for label in issue.labels
if isinstance(label, dict) and label.get("name")
]
return ", ".join(names)
def _issue_task_description(
*,
issue: ForgejoIssue,
repository_name: str,
instructions: str | None,
) -> str:
parts = [
"Source: Forgejo issue",
f"Repository: {repository_name}",
f"Issue: #{issue.forgejo_issue_number}",
f"URL: {issue.html_url}",
f"State: {issue.state}",
]
labels = _label_names(issue)
if labels:
parts.append(f"Labels: {labels}")
if issue.author:
parts.append(f"Author: {issue.author}")
if instructions and instructions.strip():
parts.append(f"\nInstructions:\n{instructions.strip()}")
if issue.body:
parts.append(f"\nIssue body:\n{issue.body.strip()}")
elif issue.body_preview:
parts.append(f"\nIssue preview:\n{issue.body_preview.strip()}")
return "\n".join(parts)
async def _existing_issue_task(
session: AsyncSession,
*,
issue_id: UUID,
board_id: UUID,
) -> tuple[ForgejoIssueTaskLink, Task] | None:
link = (
await session.exec(
select(ForgejoIssueTaskLink).where(
ForgejoIssueTaskLink.issue_id == issue_id,
ForgejoIssueTaskLink.board_id == board_id,
)
)
).first()
if link is None:
return None
task = await crud.get_by_id(session, Task, link.task_id)
if task is None:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Issue task link points to a missing task",
)
return link, task
async def _notify_issue_task_assignee(
*,
session: AsyncSession,
board: Board,
task: Task,
agent: Agent | None,
) -> None:
if agent is None:
return
from app.api.tasks import notify_agent_on_task_assign
await notify_agent_on_task_assign(
session=session,
board=board,
task=task,
agent=agent,
)
def _normalize_issue_task_priority(priority: str) -> str:
normalized = priority.strip().lower()
return normalized or "medium"
def _set_issue_task_status_fields(task: Task, status_value: str) -> None:
previous_status = task.status
task.status = status_value
if status_value == "in_progress":
if task.in_progress_at is None or previous_status != "in_progress":
task.in_progress_at = utcnow()
return
if status_value == "inbox":
task.in_progress_at = None
def _issue_task_response(
*,
created: bool,
issue: ForgejoIssue,
task: Task,
board_id: UUID,
) -> ForgejoIssueTaskResponse:
return ForgejoIssueTaskResponse(
success=True,
created=created,
issue_id=issue.id,
task_id=task.id,
board_id=board_id,
assigned_agent_id=task.assigned_agent_id,
status=task.status,
title=task.title,
)
@router.get("", response_model=ForgejoIssueListResponse) @router.get("", response_model=ForgejoIssueListResponse)
async def list_issues( async def list_issues(
session: AsyncSession = SESSION_DEP, session: AsyncSession = SESSION_DEP,
@ -214,6 +459,170 @@ async def get_issue(
return ForgejoIssueDetailRead.model_validate(issue) return ForgejoIssueDetailRead.model_validate(issue)
@router.post(
"/{issue_id}/task",
response_model=ForgejoIssueTaskResponse,
summary="Create or update a Pipeline task for a Forgejo issue",
responses={
status.HTTP_200_OK: {"description": "Task created or existing task updated"},
status.HTTP_401_UNAUTHORIZED: {"description": "User authentication required"},
status.HTTP_403_FORBIDDEN: {"description": "User lacks board write access"},
status.HTTP_404_NOT_FOUND: {"description": "Issue, board, or agent not found"},
status.HTTP_409_CONFLICT: {"description": "Existing link points to a missing task"},
status.HTTP_422_UNPROCESSABLE_CONTENT: {
"description": "Invalid issue, board, agent, or tasking request"
},
},
)
async def task_issue(
issue_id: str,
payload: ForgejoIssueTaskRequest | None = None,
session: AsyncSession = SESSION_DEP,
auth: AuthContext = AUTH_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> ForgejoIssueTaskResponse:
"""Create or update a board task from a cached Forgejo issue."""
payload = payload or ForgejoIssueTaskRequest()
try:
uuid = UUID(issue_id)
except ValueError:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Invalid issue_id format",
)
issue = await crud.get_by_id(session, ForgejoIssue, uuid)
if issue is None or issue.organization_id != ctx.organization.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Issue not found")
if issue.is_pull_request:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Pull requests cannot be tasked through the issue tasking endpoint",
)
if auth.user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
board = await _resolve_issue_task_board(
session,
issue=issue,
requested_board_id=payload.board_id,
ctx=ctx,
)
agent_requested = "assigned_agent_id" in payload.model_fields_set
status_requested = "status" in payload.model_fields_set
instructions_requested = "instructions" in payload.model_fields_set
priority_requested = "priority" in payload.model_fields_set
agent = await _validate_issue_task_agent(
session,
board=board,
assigned_agent_id=payload.assigned_agent_id if agent_requested else None,
)
existing = await _existing_issue_task(session, issue_id=issue.id, board_id=board.id)
if existing is not None:
_link, task = existing
previous_assignee_id = task.assigned_agent_id
previous_status = task.status
if agent_requested:
task.assigned_agent_id = agent.id if agent is not None else None
if status_requested:
_set_issue_task_status_fields(task, payload.status or "inbox")
elif agent_requested and agent is not None and payload.start_immediately:
_set_issue_task_status_fields(task, "in_progress")
elif agent_requested and agent is None:
_set_issue_task_status_fields(task, "inbox")
if priority_requested:
task.priority = _normalize_issue_task_priority(payload.priority)
if instructions_requested:
repository_name = await _issue_repository_name(session, issue=issue)
task.description = _issue_task_description(
issue=issue,
repository_name=repository_name,
instructions=payload.instructions,
)
task.updated_at = utcnow()
session.add(task)
record_activity(
session,
event_type="forgejo.issue.task.updated",
task_id=task.id,
board_id=board.id,
message=f"Forgejo issue task updated: #{issue.forgejo_issue_number}.",
)
await session.commit()
await session.refresh(task)
notify_agent = agent
if notify_agent is None and task.assigned_agent_id and task.status == "in_progress":
current_agent = await crud.get_by_id(session, Agent, task.assigned_agent_id)
if current_agent is not None and current_agent.board_id == board.id:
notify_agent = current_agent
should_notify = (
notify_agent is not None
and task.assigned_agent_id is not None
and (
task.assigned_agent_id != previous_assignee_id
or (previous_status != "in_progress" and task.status == "in_progress")
)
)
if notify_agent is not None and should_notify:
await _notify_issue_task_assignee(
session=session,
board=board,
task=task,
agent=notify_agent,
)
return _issue_task_response(created=False, issue=issue, task=task, board_id=board.id)
status_value = _task_status_for_issue_task(
payload,
assigned_agent_id=agent.id if agent is not None else None,
)
repository_name = await _issue_repository_name(session, issue=issue)
task = Task(
board_id=board.id,
title=_issue_task_title(issue),
description=_issue_task_description(
issue=issue,
repository_name=repository_name,
instructions=payload.instructions,
),
status=status_value,
priority=_normalize_issue_task_priority(payload.priority),
assigned_agent_id=agent.id if agent is not None else None,
created_by_user_id=auth.user.id,
auto_created=True,
auto_reason=f"forgejo_issue:{issue.id}",
in_progress_at=utcnow() if status_value == "in_progress" else None,
)
session.add(task)
await session.flush()
session.add(
ForgejoIssueTaskLink(
organization_id=ctx.organization.id,
board_id=board.id,
issue_id=issue.id,
task_id=task.id,
created_by_user_id=auth.user.id,
)
)
record_activity(
session,
event_type="forgejo.issue.task.created",
task_id=task.id,
board_id=board.id,
message=f"Forgejo issue task created: #{issue.forgejo_issue_number}.",
)
await session.commit()
await session.refresh(task)
await _notify_issue_task_assignee(
session=session,
board=board,
task=task,
agent=agent,
)
return _issue_task_response(created=True, issue=issue, task=task, board_id=board.id)
@router.post( @router.post(
"/{issue_id}/close", "/{issue_id}/close",
response_model=CloseIssueResponse, response_model=CloseIssueResponse,

View File

@ -13,6 +13,7 @@ from app.models.board_webhook_payloads import BoardWebhookPayload
from app.models.board_webhooks import BoardWebhook from app.models.board_webhooks import BoardWebhook
from app.models.boards import Board from app.models.boards import Board
from app.models.forgejo_connections import ForgejoConnection from app.models.forgejo_connections import ForgejoConnection
from app.models.forgejo_issue_task_links import ForgejoIssueTaskLink
from app.models.forgejo_issues import ForgejoIssue from app.models.forgejo_issues import ForgejoIssue
from app.models.forgejo_repositories import ForgejoRepository from app.models.forgejo_repositories import ForgejoRepository
from app.models.gateways import Gateway from app.models.gateways import Gateway
@ -49,6 +50,7 @@ __all__ = [
"Board", "Board",
"BoardRepositoryLink", "BoardRepositoryLink",
"ForgejoConnection", "ForgejoConnection",
"ForgejoIssueTaskLink",
"ForgejoIssue", "ForgejoIssue",
"ForgejoRepository", "ForgejoRepository",
"Gateway", "Gateway",

View File

@ -0,0 +1,39 @@
"""Links between cached Forgejo issues and Pipeline tasks."""
from __future__ import annotations
from datetime import datetime
from uuid import UUID, uuid4
from sqlalchemy import UniqueConstraint
from sqlmodel import Field
from app.core.time import utcnow
from app.models.tenancy import TenantScoped
RUNTIME_ANNOTATION_TYPES = (datetime,)
class ForgejoIssueTaskLink(TenantScoped, table=True):
"""Board-scoped Pipeline task created from a cached Forgejo issue."""
__tablename__ = "forgejo_issue_task_links" # pyright: ignore[reportAssignmentType]
__table_args__ = (
UniqueConstraint(
"board_id",
"issue_id",
name="uq_forgejo_issue_task_links_board_issue",
),
UniqueConstraint(
"task_id",
name="uq_forgejo_issue_task_links_task",
),
)
id: UUID = Field(default_factory=uuid4, primary_key=True)
organization_id: UUID = Field(foreign_key="organizations.id", index=True)
board_id: UUID = Field(foreign_key="boards.id", index=True)
issue_id: UUID = Field(foreign_key="forgejo_issues.id", index=True)
task_id: UUID = Field(foreign_key="tasks.id", index=True)
created_by_user_id: UUID | None = Field(default=None, foreign_key="users.id", index=True)
created_at: datetime = Field(default_factory=utcnow)

View File

@ -3,7 +3,7 @@
from __future__ import annotations from __future__ import annotations
from datetime import datetime from datetime import datetime
from typing import Any from typing import Any, Literal
from uuid import UUID from uuid import UUID
from sqlmodel import SQLModel from sqlmodel import SQLModel
@ -115,3 +115,27 @@ class EditIssueResponse(SQLModel):
body: str | None = None body: str | None = None
state: str state: str
forgejo_updated_at: str forgejo_updated_at: str
class ForgejoIssueTaskRequest(SQLModel):
"""Request to create or update a Pipeline task for a Forgejo issue."""
board_id: UUID | None = None
assigned_agent_id: UUID | None = None
priority: str = "medium"
status: Literal["inbox", "in_progress"] | None = None
start_immediately: bool = True
instructions: str | None = None
class ForgejoIssueTaskResponse(SQLModel):
"""Response for creating or finding a Pipeline task from a Forgejo issue."""
success: bool
created: bool
issue_id: UUID
task_id: UUID
board_id: UUID
assigned_agent_id: UUID | None = None
status: str
title: str

View File

@ -0,0 +1,84 @@
"""add Forgejo issue to task links
Revision ID: d4e5f6a7b8c9
Revises: d3f4a5b6c7d8
Create Date: 2026-05-22 00:00:00.000000
"""
from __future__ import annotations
import sqlalchemy as sa
from alembic import op
revision = "d4e5f6a7b8c9"
down_revision = "d3f4a5b6c7d8"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_table(
"forgejo_issue_task_links",
sa.Column("id", sa.Uuid(), nullable=False),
sa.Column("organization_id", sa.Uuid(), nullable=False),
sa.Column("board_id", sa.Uuid(), nullable=False),
sa.Column("issue_id", sa.Uuid(), nullable=False),
sa.Column("task_id", sa.Uuid(), nullable=False),
sa.Column("created_by_user_id", sa.Uuid(), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(["board_id"], ["boards.id"]),
sa.ForeignKeyConstraint(["created_by_user_id"], ["users.id"]),
sa.ForeignKeyConstraint(["issue_id"], ["forgejo_issues.id"]),
sa.ForeignKeyConstraint(["organization_id"], ["organizations.id"]),
sa.ForeignKeyConstraint(["task_id"], ["tasks.id"]),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("board_id", "issue_id", name="uq_forgejo_issue_task_links_board_issue"),
sa.UniqueConstraint("task_id", name="uq_forgejo_issue_task_links_task"),
)
op.create_index(
op.f("ix_forgejo_issue_task_links_board_id"),
"forgejo_issue_task_links",
["board_id"],
unique=False,
)
op.create_index(
op.f("ix_forgejo_issue_task_links_created_by_user_id"),
"forgejo_issue_task_links",
["created_by_user_id"],
unique=False,
)
op.create_index(
op.f("ix_forgejo_issue_task_links_issue_id"),
"forgejo_issue_task_links",
["issue_id"],
unique=False,
)
op.create_index(
op.f("ix_forgejo_issue_task_links_organization_id"),
"forgejo_issue_task_links",
["organization_id"],
unique=False,
)
op.create_index(
op.f("ix_forgejo_issue_task_links_task_id"),
"forgejo_issue_task_links",
["task_id"],
unique=False,
)
def downgrade() -> None:
op.drop_index(op.f("ix_forgejo_issue_task_links_task_id"), table_name="forgejo_issue_task_links")
op.drop_index(
op.f("ix_forgejo_issue_task_links_organization_id"),
table_name="forgejo_issue_task_links",
)
op.drop_index(op.f("ix_forgejo_issue_task_links_issue_id"), table_name="forgejo_issue_task_links")
op.drop_index(
op.f("ix_forgejo_issue_task_links_created_by_user_id"),
table_name="forgejo_issue_task_links",
)
op.drop_index(op.f("ix_forgejo_issue_task_links_board_id"), table_name="forgejo_issue_task_links")
op.drop_table("forgejo_issue_task_links")

View File

@ -0,0 +1,379 @@
# ruff: noqa: INP001
"""Integration tests for tasking Forgejo issues to Pipeline agents."""
from __future__ import annotations
from datetime import datetime
from types import SimpleNamespace
from uuid import UUID, uuid4
import pytest
from fastapi import APIRouter, FastAPI
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from sqlmodel import SQLModel, select
from sqlmodel.ext.asyncio.session import AsyncSession
from app import models as _models # noqa: F401
from app.api.deps import require_org_member
from app.api.forgejo_issues import router as forgejo_issues_router
from app.core.auth import AuthContext, get_auth_context
from app.db.session import get_session
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.board_repository_links import BoardRepositoryLink
from app.models.boards import Board
from app.models.forgejo_connections import ForgejoConnection
from app.models.forgejo_issue_task_links import ForgejoIssueTaskLink
from app.models.forgejo_issues import ForgejoIssue
from app.models.forgejo_repositories import ForgejoRepository
from app.models.gateways import Gateway
from app.models.organization_board_access import OrganizationBoardAccess
from app.models.organization_members import OrganizationMember
from app.models.organizations import Organization
from app.models.tasks import Task
from app.models.users import User
from app.services.organizations import OrganizationContext
async def _make_engine() -> AsyncEngine:
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
return engine
def _build_test_app(
session_maker: async_sessionmaker[AsyncSession],
*,
org_ctx: OrganizationContext,
auth_ctx: AuthContext,
) -> FastAPI:
app = FastAPI()
api_v1 = APIRouter(prefix="/api/v1")
api_v1.include_router(forgejo_issues_router)
app.include_router(api_v1)
async def _override_get_session() -> AsyncSession:
async with session_maker() as session:
yield session
async def _override_require_org_member() -> OrganizationContext:
return org_ctx
async def _override_get_auth_context() -> AuthContext:
return auth_ctx
app.dependency_overrides[get_session] = _override_get_session
app.dependency_overrides[require_org_member] = _override_require_org_member
app.dependency_overrides[get_auth_context] = _override_get_auth_context
return app
async def _seed(session: AsyncSession) -> SimpleNamespace:
org = Organization(id=uuid4(), name="Pipeline Org")
user = User(
id=uuid4(),
clerk_user_id="user_123",
email="user@example.com",
name="User",
active_organization_id=org.id,
)
member = OrganizationMember(
id=uuid4(),
organization_id=org.id,
user_id=user.id,
role="member",
all_boards_read=False,
all_boards_write=False,
)
connection = ForgejoConnection(
id=uuid4(),
organization_id=org.id,
name="Dream Forgejo",
base_url="https://forgejo.example.local",
token="temp-token",
token_last_eight="p-token",
)
repository = ForgejoRepository(
id=uuid4(),
organization_id=org.id,
connection_id=connection.id,
owner="openclaw",
repo="pipeline",
display_name="Pipeline",
)
board = Board(
id=uuid4(),
organization_id=org.id,
name="Pipeline Board",
slug="pipeline-board",
)
board_access = OrganizationBoardAccess(
id=uuid4(),
organization_member_id=member.id,
board_id=board.id,
can_read=True,
can_write=True,
)
link = BoardRepositoryLink(
id=uuid4(),
organization_id=org.id,
board_id=board.id,
repository_id=repository.id,
)
issue = ForgejoIssue(
id=uuid4(),
organization_id=org.id,
repository_id=repository.id,
forgejo_issue_number=42,
title="Implement agent tasking",
body="Wire the backend so agents can work this issue.",
body_preview="Wire the backend",
state="open",
is_pull_request=False,
labels=[{"name": "backend"}, {"name": "agents"}],
assignees=[],
forgejo_payload={"number": 42, "state": "open"},
author="kaspa",
html_url="https://forgejo.example.local/openclaw/pipeline/issues/42",
forgejo_created_at=datetime(2026, 5, 19, 12, 0, 0),
forgejo_updated_at=datetime(2026, 5, 19, 12, 5, 0),
)
gateway = Gateway(
id=uuid4(),
organization_id=org.id,
name="Gateway",
url="https://gateway.example.local",
workspace_root="/workspace",
)
worker_agent = Agent(
id=uuid4(),
board_id=board.id,
gateway_id=gateway.id,
name="Worker Agent",
is_board_lead=False,
)
session.add(org)
session.add(user)
session.add(member)
session.add(connection)
session.add(repository)
session.add(board)
session.add(board_access)
session.add(link)
session.add(issue)
session.add(gateway)
session.add(worker_agent)
await session.commit()
return SimpleNamespace(
org=org,
user=user,
member=member,
repository=repository,
board=board,
issue=issue,
gateway=gateway,
worker_agent=worker_agent,
)
def _client_for(
session_maker: async_sessionmaker[AsyncSession],
seeded: SimpleNamespace,
) -> AsyncClient:
org_ctx = OrganizationContext(organization=seeded.org, member=seeded.member)
auth_ctx = AuthContext(actor_type="user", user=seeded.user)
app = _build_test_app(session_maker, org_ctx=org_ctx, auth_ctx=auth_ctx)
return AsyncClient(transport=ASGITransport(app=app), base_url="http://testserver")
@pytest.mark.asyncio
async def test_human_writer_can_create_task_from_issue_and_assign_agent() -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
try:
async with session_maker() as session:
seeded = await _seed(session)
async with _client_for(session_maker, seeded) as client:
response = await client.post(
f"/api/v1/forgejo/issues/{seeded.issue.id}/task",
json={
"board_id": str(seeded.board.id),
"assigned_agent_id": str(seeded.worker_agent.id),
"instructions": "Start by reproducing the issue locally.",
},
)
assert response.status_code == 200
payload = response.json()
assert payload["success"] is True
assert payload["created"] is True
assert payload["issue_id"] == str(seeded.issue.id)
assert payload["board_id"] == str(seeded.board.id)
assert payload["assigned_agent_id"] == str(seeded.worker_agent.id)
assert payload["status"] == "in_progress"
assert payload["title"] == "Git issue #42: Implement agent tasking"
async with session_maker() as session:
task = await session.get(Task, UUID(payload["task_id"]))
assert task is not None
assert task.board_id == seeded.board.id
assert task.assigned_agent_id == seeded.worker_agent.id
assert task.auto_created is True
assert task.auto_reason == f"forgejo_issue:{seeded.issue.id}"
assert task.in_progress_at is not None
assert task.description is not None
assert "Repository: Pipeline" in task.description
assert "Labels: backend, agents" in task.description
assert "Start by reproducing the issue locally." in task.description
links = (await session.exec(select(ForgejoIssueTaskLink))).all()
assert len(links) == 1
assert links[0].issue_id == seeded.issue.id
assert links[0].task_id == task.id
events = (await session.exec(select(ActivityEvent))).all()
assert len(events) == 1
assert events[0].event_type == "forgejo.issue.task.created"
assert events[0].board_id == seeded.board.id
assert events[0].task_id == task.id
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_tasking_same_issue_reuses_existing_task() -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
try:
async with session_maker() as session:
seeded = await _seed(session)
async with _client_for(session_maker, seeded) as client:
first = await client.post(
f"/api/v1/forgejo/issues/{seeded.issue.id}/task",
json={"board_id": str(seeded.board.id)},
)
second = await client.post(
f"/api/v1/forgejo/issues/{seeded.issue.id}/task",
json={
"board_id": str(seeded.board.id),
"assigned_agent_id": str(seeded.worker_agent.id),
"priority": "high",
},
)
assert first.status_code == 200
assert second.status_code == 200
first_payload = first.json()
second_payload = second.json()
assert first_payload["created"] is True
assert second_payload["created"] is False
assert second_payload["task_id"] == first_payload["task_id"]
assert second_payload["assigned_agent_id"] == str(seeded.worker_agent.id)
assert second_payload["status"] == "in_progress"
async with session_maker() as session:
tasks = (await session.exec(select(Task))).all()
links = (await session.exec(select(ForgejoIssueTaskLink))).all()
events = (await session.exec(select(ActivityEvent))).all()
assert len(tasks) == 1
assert len(links) == 1
assert tasks[0].priority == "high"
assert len(events) == 2
assert [event.event_type for event in events] == [
"forgejo.issue.task.created",
"forgejo.issue.task.updated",
]
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_tasking_issue_rejects_agent_from_another_board() -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
try:
async with session_maker() as session:
seeded = await _seed(session)
other_board = Board(
id=uuid4(),
organization_id=seeded.org.id,
name="Other Board",
slug="other-board",
)
other_agent = Agent(
id=uuid4(),
board_id=other_board.id,
gateway_id=seeded.gateway.id,
name="Other Agent",
)
session.add(other_board)
session.add(other_agent)
await session.commit()
async with _client_for(session_maker, seeded) as client:
response = await client.post(
f"/api/v1/forgejo/issues/{seeded.issue.id}/task",
json={
"board_id": str(seeded.board.id),
"assigned_agent_id": str(other_agent.id),
},
)
assert response.status_code == 422
assert response.json()["detail"] == "Agent must belong to the selected board"
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_tasking_issue_requires_board_when_multiple_linked_boards_are_writable() -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
try:
async with session_maker() as session:
seeded = await _seed(session)
second_board = Board(
id=uuid4(),
organization_id=seeded.org.id,
name="Second Board",
slug="second-board",
)
session.add(second_board)
session.add(
OrganizationBoardAccess(
id=uuid4(),
organization_member_id=seeded.member.id,
board_id=second_board.id,
can_read=True,
can_write=True,
)
)
session.add(
BoardRepositoryLink(
id=uuid4(),
organization_id=seeded.org.id,
board_id=second_board.id,
repository_id=seeded.repository.id,
)
)
await session.commit()
async with _client_for(session_maker, seeded) as client:
response = await client.post(f"/api/v1/forgejo/issues/{seeded.issue.id}/task")
assert response.status_code == 422
assert (
response.json()["detail"]
== "board_id is required because this issue is linked to multiple writable boards"
)
finally:
await engine.dispose()