# ruff: noqa: INP001
"""Unit tests for the cron_status service helpers.

Everything here runs as plain Python; no gateway connection is required.
"""

from __future__ import annotations

import pytest

from app.services.openclaw.cron_status import (
    CronJob,
    parse_cron_jobs,
    compute_job_status,
)

# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------

# Two jobs: one enabled with a complete run record, one disabled with every
# run-related field set to None.
FIXTURE_NORMAL = [
    {
        "name": "sync-agents",
        "schedule": "*/5 * * * *",
        "enabled": True,
        "lastRun": "2026-05-21T10:00:00Z",
        "nextRun": "2026-05-21T10:05:00Z",
        "lastDuration": 1234,
        "lastStatus": "success",
    },
    {
        "name": "cleanup-sessions",
        "schedule": "0 3 * * *",
        "enabled": False,
        "lastRun": None,
        "nextRun": None,
        "lastDuration": None,
        "lastStatus": None,
    },
]

# A single enabled job whose last run ended in an error.
FIXTURE_FAILED_JOB = [
    {
        "name": "broken-task",
        "schedule": "*/10 * * * *",
        "enabled": True,
        "lastRun": "2026-05-21T09:50:00Z",
        "nextRun": "2026-05-21T10:00:00Z",
        "lastDuration": 5000,
        "lastStatus": "error",
        "lastError": "connection refused",
    },
]

# Same kind of record, spelled with alternative key names.
FIXTURE_ALT_KEYS = [
    {
        "id": "job-1",
        "title": "my-job",
        "active": True,
        "last_run": "2026-05-21T08:00:00Z",
        "next_run": "2026-05-21T09:00:00Z",
        "duration_ms": 800,
        "result": "ok",
    }
]

# Degenerate payloads.
FIXTURE_EMPTY = []
FIXTURE_GARBAGE = "not a list"
FIXTURE_NONE = None


def _job_at(payload, index):
    """Parse *payload* and return the job found at position *index*."""
    return parse_cron_jobs(payload)[index]


def _ran_job(last_status):
    """Build an enabled job that has a last-run timestamp and *last_status*."""
    return CronJob(
        name="j",
        schedule="* * *",
        enabled=True,
        last_run="2026-05-21T10:00:00Z",
        last_status=last_status,
    )


# ---------------------------------------------------------------------------
# parse_cron_jobs
# ---------------------------------------------------------------------------


class TestParseCronJobs:
    """Checks on what parse_cron_jobs returns for each fixture payload."""

    def test_normal_jobs_parsed(self) -> None:
        """Both entries of the normal fixture are returned."""
        parsed = parse_cron_jobs(FIXTURE_NORMAL)
        assert len(parsed) == 2

    def test_job_name_extracted(self) -> None:
        """The "name" key ends up on the job's name attribute."""
        sync_job = _job_at(FIXTURE_NORMAL, 0)
        assert sync_job.name == "sync-agents"

    def test_schedule_extracted(self) -> None:
        """The "schedule" key ends up on the job's schedule attribute."""
        sync_job = _job_at(FIXTURE_NORMAL, 0)
        assert sync_job.schedule == "*/5 * * * *"

    def test_enabled_flag(self) -> None:
        """The enabled attribute mirrors the boolean in the payload."""
        parsed = parse_cron_jobs(FIXTURE_NORMAL)
        sync_job, cleanup_job = parsed[0], parsed[1]
        assert sync_job.enabled is True
        assert cleanup_job.enabled is False

    def test_last_run_parsed(self) -> None:
        """A present "lastRun" value yields a last_run containing the year."""
        sync_job = _job_at(FIXTURE_NORMAL, 0)
        assert sync_job.last_run is not None
        assert "2026" in sync_job.last_run

    def test_last_run_none_when_absent(self) -> None:
        """A None "lastRun" value yields last_run of None."""
        cleanup_job = _job_at(FIXTURE_NORMAL, 1)
        assert cleanup_job.last_run is None

    def test_duration_ms(self) -> None:
        """The "lastDuration" value is exposed as last_duration_ms."""
        sync_job = _job_at(FIXTURE_NORMAL, 0)
        assert sync_job.last_duration_ms == 1234

    def test_status_success(self) -> None:
        """The "lastStatus" value is exposed as last_status."""
        sync_job = _job_at(FIXTURE_NORMAL, 0)
        assert sync_job.last_status == "success"

    def test_status_none_when_never_run(self) -> None:
        """A None "lastStatus" value yields last_status of None."""
        cleanup_job = _job_at(FIXTURE_NORMAL, 1)
        assert cleanup_job.last_status is None

    def test_failed_job(self) -> None:
        """A failed job reports status "error" and has a last_error set."""
        broken = _job_at(FIXTURE_FAILED_JOB, 0)
        assert broken.last_status == "error"
        assert broken.last_error is not None

    def test_alt_key_names(self) -> None:
        """Alternative key spellings produce the same job attributes."""
        parsed = parse_cron_jobs(FIXTURE_ALT_KEYS)
        assert len(parsed) == 1
        alt_job = parsed[0]
        assert alt_job.name == "my-job"
        assert alt_job.enabled is True
        assert alt_job.last_duration_ms == 800

    def test_empty_list(self) -> None:
        """An empty list parses to an empty list."""
        assert parse_cron_jobs(FIXTURE_EMPTY) == []

    def test_garbage_input_returns_empty(self) -> None:
        """A string instead of a list parses to an empty list."""
        assert parse_cron_jobs(FIXTURE_GARBAGE) == []

    def test_none_input_returns_empty(self) -> None:
        """None parses to an empty list."""
        assert parse_cron_jobs(FIXTURE_NONE) == []

    def test_non_dict_entries_skipped(self) -> None:
        """Only the dict entry of a mixed list is turned into a job."""
        mixed = [FIXTURE_NORMAL[0], "bad", 42, None]
        assert len(parse_cron_jobs(mixed)) == 1

    def test_missing_name_skipped(self) -> None:
        """An entry without any name key yields no job."""
        nameless = {"schedule": "* * * * *", "enabled": True}
        assert len(parse_cron_jobs([nameless])) == 0

    def test_returns_cron_job_instances(self) -> None:
        """Every returned element is a CronJob."""
        for job in parse_cron_jobs(FIXTURE_NORMAL):
            assert isinstance(job, CronJob)


# ---------------------------------------------------------------------------
# compute_job_status
# ---------------------------------------------------------------------------


class TestComputeJobStatus:
    """Checks on the status string compute_job_status derives from a job."""

    def test_disabled_returns_disabled(self) -> None:
        """A job with enabled=False is reported as "disabled"."""
        job = CronJob(name="j", schedule="* * *", enabled=False)
        status = compute_job_status(job)
        assert status == "disabled"

    def test_never_run_returns_pending(self) -> None:
        """An enabled job with last_run=None is reported as "pending"."""
        job = CronJob(name="j", schedule="* * *", enabled=True, last_run=None)
        status = compute_job_status(job)
        assert status == "pending"

    def test_success_returns_ok(self) -> None:
        """A last_status of "success" is reported as "ok"."""
        status = compute_job_status(_ran_job("success"))
        assert status == "ok"

    def test_error_returns_error(self) -> None:
        """A last_status of "error" is reported as "error"."""
        status = compute_job_status(_ran_job("error"))
        assert status == "error"

    def test_unknown_status_returns_unknown(self) -> None:
        """A last_status of "running" maps to one of the accepted results."""
        status = compute_job_status(_ran_job("running"))
        # NOTE(review): three different results are accepted here even though
        # the test name says "unknown" -- confirm the intended mapping and
        # tighten this assertion once it is settled.
        assert status in {"running", "unknown", "ok"}