BillTracker/scripts/pipeline-report.py

90 lines
2.7 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""Report the current project/task to the Pipeline dashboard.
Usage:
python report-project.py --project Pipeline --task "fixing bug" --status busy
Required environment variables:
PIPELINE_BOT_KEY API key generated from Settings Bot API keys
PIPELINE_URL Pipeline base URL (default: http://localhost:8001)
Optional environment variables:
PIPELINE_REPORT_TIMEOUT Request timeout in seconds (default: 3)
Exit codes:
0 success or env vars not configured (non-disruptive)
1 HTTP error or unexpected failure
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import urllib.error
import urllib.request
def main() -> int:
parser = argparse.ArgumentParser(
description="Report current project/task to Pipeline dashboard."
)
parser.add_argument("--project", default=None, help="Project name, e.g. Pipeline")
parser.add_argument("--task", default=None, help="Current task description")
parser.add_argument(
"--status",
default="busy",
choices=["busy", "idle", "error"],
help="Agent status (default: busy)",
)
parser.add_argument("--detail", default=None, help="Extra JSON detail string")
args = parser.parse_args()
bot_key = os.environ.get("PIPELINE_BOT_KEY", "").strip()
base_url = os.environ.get("PIPELINE_URL", "http://localhost:8001").strip().rstrip("/")
timeout = float(os.environ.get("PIPELINE_REPORT_TIMEOUT", "3"))
if not bot_key:
# Not configured — exit silently so the agent isn't disrupted.
return 0
payload: dict[str, object] = {
"project": args.project,
"task": args.task,
"status": args.status,
}
if args.detail:
try:
payload["detail"] = json.loads(args.detail)
except json.JSONDecodeError:
payload["detail"] = {"raw": args.detail}
body = json.dumps(payload).encode("utf-8")
url = f"{base_url}/api/v1/bot/report"
req = urllib.request.Request(
url,
data=body,
method="POST",
headers={
"X-Bot-Key": bot_key,
"Content-Type": "application/json",
"User-Agent": "PipelineBotReport/1.0",
},
)
try:
with urllib.request.urlopen(req, timeout=timeout):
pass
except urllib.error.HTTPError as exc:
print(f"report-project: HTTP {exc.code} from {url}", file=sys.stderr)
return 1
except (urllib.error.URLError, OSError) as exc:
print(f"report-project: connection error: {exc}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())