Files
microdao-daarion/tests/test_job_orchestrator_tool.py
Apple 129e4ea1fc feat(platform): add new services, tools, tests and crews modules
New router intelligence modules (26 files): alert_ingest/store, audit_store,
architecture_pressure, backlog_generator/store, cost_analyzer, data_governance,
dependency_scanner, drift_analyzer, incident_* (5 files), llm_enrichment,
platform_priority_digest, provider_budget, release_check_runner, risk_* (6 files),
signature_state_store, sofiia_auto_router, tool_governance

New services:
- sofiia-console: Dockerfile, adapters/, monitor/nodes/ops/voice modules, launchd, react static
- memory-service: integration_endpoints, integrations, voice_endpoints, static UI
- aurora-service: full app suite (analysis, job_store, orchestrator, reporting, schemas, subagents)
- sofiia-supervisor: new supervisor service
- aistalk-bridge-lite: Telegram bridge lite
- calendar-service: CalDAV calendar service with reminders
- mlx-stt-service / mlx-tts-service: Apple Silicon speech services
- binance-bot-monitor: market monitor service
- node-worker: STT/TTS memory providers

New tools (9): agent_email, browser_tool, contract_tool, observability_tool,
oncall_tool, pr_reviewer_tool, repo_tool, safe_code_executor, secure_vault

New crews: agromatrix_crew (12 modules: depth_classifier, doc_facts, doc_focus,
farm_state, light_reply, llm_factory, memory_manager, proactivity, reflection_engine,
session_context, style_adapter, telemetry)

Tests: 85+ test files for all new modules
Made-with: Cursor
2026-03-03 07:14:14 -08:00

302 lines
9.2 KiB
Python

"""
Tests for Job Orchestrator Tool
"""
import pytest
import os
import sys
# Put the parent of this file's directory at the front of sys.path before the
# project import below runs (presumably the repository root, so that the
# `services` package resolves when the tests are run from another directory).
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from services.router.tool_manager import ToolManager, ToolResult
class TestJobOrchestratorTool:
    """Exercise ``ToolManager._job_orchestrator_tool`` one action at a time.

    Every test sends a single request dict with ``action``, ``params`` and
    ``agent_id`` keys to a freshly built ``ToolManager({})`` and then checks
    the ``success`` / ``result`` / ``error`` attributes of what comes back.

    The two helpers are underscore-prefixed so that pytest's default
    ``test_*`` collection rule does not pick them up as tests.
    """

    @staticmethod
    async def _invoke(action, params, agent_id="sofiia"):
        """Send one request to the job orchestrator tool and return its result.

        Args:
            action: Value placed under the request's ``"action"`` key.
            params: Dict placed under the request's ``"params"`` key.
            agent_id: Value placed under ``"agent_id"``; defaults to
                ``"sofiia"``, the agent used by all but one test here.

        Returns:
            Whatever ``ToolManager._job_orchestrator_tool`` returns.
        """
        # One manager per request, exactly as each test did inline before.
        tool_mgr = ToolManager({})
        return await tool_mgr._job_orchestrator_tool({
            "action": action,
            "params": params,
            "agent_id": agent_id,
        })

    @staticmethod
    def _error_text(result):
        """Return ``result.error`` lower-cased, or ``""`` when it is falsy.

        Normalising here means a response without an error message fails the
        calling assertion cleanly instead of raising ``AttributeError`` from
        ``None.lower()``.
        """
        return (result.error or "").lower()

    # ------------------------------------------------------------------
    # list_tasks
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_list_tasks_returns_allowed_tasks(self):
        """list_tasks succeeds and returns ``tasks`` and ``count`` entries."""
        result = await self._invoke("list_tasks", {})

        assert result.success is True
        assert "tasks" in result.result
        # NOTE(review): this only proves ``count`` exists and is not negative;
        # it does not check that the listed tasks are the "allowed" ones.
        # Consider comparing against len(tasks) once confirmed with the tool.
        assert result.result["count"] >= 0

    @pytest.mark.asyncio
    async def test_list_tasks_filter_by_tag(self):
        """Every task returned for the ``smoke`` tag filter carries that tag."""
        result = await self._invoke(
            "list_tasks",
            {"filter": {"tag": "smoke"}},
        )

        assert result.success is True
        # NOTE(review): passes vacuously if the tool returns no tasks.
        for task in result.result["tasks"]:
            assert "smoke" in task.get("tags", [])

    @pytest.mark.asyncio
    async def test_list_tasks_filter_by_service(self):
        """Tasks returned for the ``gateway`` service filter have matching ids."""
        result = await self._invoke(
            "list_tasks",
            {"filter": {"service": "gateway"}},
        )

        assert result.success is True
        # The check is made against the task id (not a service field), and
        # ids containing "smoke" are accepted as well.
        # NOTE(review): passes vacuously if the tool returns no tasks.
        for task in result.result["tasks"]:
            task_id = task.get("id", "")
            assert "gateway" in task_id or "smoke" in task_id

    @pytest.mark.asyncio
    async def test_task_has_required_fields(self):
        """Each listed task exposes ``id``, ``title`` and ``tags`` keys."""
        result = await self._invoke("list_tasks", {})

        assert result.success is True
        for task in result.result["tasks"]:
            assert "id" in task
            assert "title" in task
            assert "tags" in task

    # ------------------------------------------------------------------
    # start_task
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_start_task_requires_task_id(self):
        """start_task without ``task_id`` fails and the error names the field."""
        result = await self._invoke("start_task", {})

        assert result.success is False
        assert "task_id" in self._error_text(result)

    @pytest.mark.asyncio
    async def test_start_task_unknown_task(self):
        """start_task with an unknown ``task_id`` fails with a not-found error."""
        result = await self._invoke(
            "start_task",
            {"task_id": "nonexistent_task"},
        )

        assert result.success is False
        assert "not found" in self._error_text(result)

    @pytest.mark.asyncio
    async def test_start_task_dry_run(self):
        """A dry run returns an execution plan and a job in ``dry_run`` status."""
        result = await self._invoke(
            "start_task",
            {"task_id": "smoke_gateway", "dry_run": True},
        )

        assert result.success is True
        assert "execution_plan" in result.result
        assert result.result["job"]["status"] == "dry_run"
        assert result.result["message"] == "Dry run - no execution performed"

    @pytest.mark.asyncio
    async def test_start_task_with_inputs_validation(self):
        """start_task accepts ``mode: quick`` for ``drift_check_node1``."""
        # NOTE(review): no ``dry_run`` flag is sent here, so this presumably
        # starts the task for real; confirm that is intended.
        result = await self._invoke(
            "start_task",
            {"task_id": "drift_check_node1", "inputs": {"mode": "quick"}},
        )

        assert result.success is True

    @pytest.mark.asyncio
    async def test_start_task_invalid_input(self):
        """start_task rejects an unsupported ``mode`` value."""
        result = await self._invoke(
            "start_task",
            {
                "task_id": "drift_check_node1",
                "inputs": {"mode": "invalid_mode"},
            },
        )

        assert result.success is False
        error_text = self._error_text(result)
        assert "validation" in error_text or "invalid" in error_text

    @pytest.mark.asyncio
    async def test_start_task_missing_required_input(self):
        """start_task rejects empty ``inputs`` for ``drift_check_node1``."""
        result = await self._invoke(
            "start_task",
            {"task_id": "drift_check_node1", "inputs": {}},
        )

        assert result.success is False
        error_text = self._error_text(result)
        assert "missing" in error_text or "required" in error_text

    @pytest.mark.asyncio
    async def test_start_task_returns_job_record(self):
        """A dry-run start_task returns a ``job`` with its core fields."""
        result = await self._invoke(
            "start_task",
            {"task_id": "smoke_gateway", "dry_run": True},
        )

        assert result.success is True
        assert "job" in result.result
        job = result.result["job"]
        assert "id" in job
        assert "task_id" in job
        assert "status" in job
        assert "created_at" in job

    @pytest.mark.asyncio
    async def test_idempotency_key_supported(self):
        """The supplied ``idempotency_key`` is echoed back on the job record."""
        # NOTE(review): no ``dry_run`` flag is sent here, so this presumably
        # starts the task for real; confirm that is intended.
        result = await self._invoke(
            "start_task",
            {"task_id": "smoke_gateway", "idempotency_key": "unique-key-123"},
        )

        assert result.success is True
        assert result.result["job"]["idempotency_key"] == "unique-key-123"

    # ------------------------------------------------------------------
    # get_job / cancel_job
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_get_job_requires_job_id(self):
        """get_job without ``job_id`` fails and the error names the field."""
        result = await self._invoke("get_job", {})

        assert result.success is False
        assert "job_id" in self._error_text(result)

    @pytest.mark.asyncio
    async def test_cancel_job_requires_job_id(self):
        """cancel_job without ``job_id`` fails and the error names the field."""
        result = await self._invoke("cancel_job", {})

        assert result.success is False
        assert "job_id" in self._error_text(result)

    @pytest.mark.asyncio
    async def test_cancel_job_allowed_for_admin(self):
        """Agent ``sofiia`` can cancel a job and is recorded as the canceller."""
        result = await self._invoke(
            "cancel_job",
            {"job_id": "job-abc123", "reason": "Testing cancellation"},
        )

        assert result.success is True
        assert result.result["status"] == "canceled"
        assert result.result["canceled_by"] == "sofiia"

    @pytest.mark.asyncio
    async def test_cancel_job_denied_for_non_admin(self):
        """Agent ``guest`` is refused when it tries to cancel a job."""
        result = await self._invoke(
            "cancel_job",
            {"job_id": "job-abc123", "reason": "Testing"},
            agent_id="guest",
        )

        assert result.success is False
        error_text = self._error_text(result)
        assert "only" in error_text or "admin" in error_text

    # ------------------------------------------------------------------
    # dispatch
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_unknown_action_returns_error(self):
        """An unrecognised ``action`` fails with an "unknown action" error."""
        result = await self._invoke("unknown_action", {})

        assert result.success is False
        assert "unknown action" in self._error_text(result)