"""Entry point that routes a user message through the AgroMatrix crew.

The module can be imported (``handle_message``) or run as a script
(``python run.py [--trace <id>] "<request>"``).
"""
import sys
import os
import json
import subprocess
from pathlib import Path

from crewai import Crew, Task

from crews.agromatrix_crew.agents.stepan_orchestrator import build_stepan
from crews.agromatrix_crew.agents.operations_agent import build_operations
from crews.agromatrix_crew.agents.iot_agent import build_iot
from crews.agromatrix_crew.agents.platform_agent import build_platform
from crews.agromatrix_crew.agents.spreadsheet_agent import build_spreadsheet
from crews.agromatrix_crew.agents.sustainability_agent import build_sustainability
from crews.agromatrix_crew.audit import audit_event, new_trace
from agromatrix_tools import tool_dictionary
from agromatrix_tools import tool_operation_plan
from crews.agromatrix_crew.operator_commands import route_operator_command, route_operator_text


_USAGE = 'Usage: python run.py [--trace <id>] "<request>"'

# Keys every agent payload must carry, in the order they are checked.
_REQUIRED_PAYLOAD_KEYS = ('status', 'summary', 'artifacts', 'tool_calls', 'next_actions')
_LIST_PAYLOAD_KEYS = ('artifacts', 'tool_calls', 'next_actions')


def farmos_ui_hint():
    """Return a hint line about the local farmOS UI, or an empty string.

    The hint is produced only when ``docker ps`` lists a container whose name
    contains ``farmos_ui_proxy``. The port comes from ``FARMOS_UI_PORT``
    (default ``18080``). Any failure (docker missing, non-zero exit, ...)
    yields an empty string: the hint is strictly best-effort.
    """
    port = os.getenv('FARMOS_UI_PORT', '18080')
    try:
        out = subprocess.check_output(
            ['docker', 'ps', '--format', '{{.Names}}'], text=True
        )
    except Exception:
        # Deliberately broad: the hint must never break the main response.
        return ""
    if 'farmos_ui_proxy' in out:
        return f"\n[UI] farmOS доступний локально: http://127.0.0.1:{port} (basic auth)"
    return ""


def detect_intent(text: str) -> str:
    """Classify *text* into an intent label using keyword matching.

    Matching is case-insensitive and order-sensitive: the first rule that
    matches wins. Returns one of ``plan_week``, ``plan_day``,
    ``show_critical_tomorrow``, ``plan_vs_fact``, ``close_plan`` or
    ``general``.
    """
    t = text.lower()
    if 'сплануй' in t and 'тиж' in t:
        return 'plan_week'
    if 'сплануй' in t:
        return 'plan_day'
    if 'критично' in t or 'на завтра' in t:
        return 'show_critical_tomorrow'
    if 'план/факт' in t or 'план факт' in t:
        return 'plan_vs_fact'
    if 'закрий план' in t:
        return 'close_plan'
    return 'general'


def validate_payload(obj: dict):
    """Check that *obj* has the structure expected from an agent.

    Returns a ``(ok, reason)`` tuple. ``reason`` is ``'ok'`` on success,
    otherwise a short machine-readable code (``not_object``,
    ``missing:<key>``, ``bad_status``, ``summary_not_string``,
    ``<key>_not_list``).
    """
    # Valid JSON may still be a number, string or list; reject it explicitly
    # instead of failing on subscripting or matching keys as substrings.
    if not isinstance(obj, dict):
        return False, 'not_object'
    for key in _REQUIRED_PAYLOAD_KEYS:
        if key not in obj:
            return False, f'missing:{key}'
    if obj['status'] not in ['ok', 'error']:
        return False, 'bad_status'
    if not isinstance(obj['summary'], str):
        return False, 'summary_not_string'
    for key in _LIST_PAYLOAD_KEYS:
        if not isinstance(obj[key], list):
            return False, f'{key}_not_list'
    return True, 'ok'


def run_task_with_retry(agent, description: str, trace_id: str, max_retries: int = 2):
    """Run a single-agent crew task until it yields a valid JSON payload.

    The task is attempted up to ``max_retries + 1`` times. From the second
    attempt on, an explicit "return only JSON" instruction is appended to the
    description. Every failed attempt is recorded through ``audit_event``.

    Returns the parsed payload on success, otherwise an error payload with
    ``status == 'error'`` describing the last validation failure.
    """
    instruction = "Return ONLY valid JSON matching schema in crews/agromatrix_crew/schema.json. No extra text."
    last_error = ''
    for attempt in range(max_retries + 1):
        desc = description if attempt == 0 else (description + "\n\n" + instruction)
        task = Task(
            description=desc,
            expected_output="JSON strictly matching schema.json",
            agent=agent
        )
        crew = Crew(agents=[agent], tasks=[task], verbose=True)
        result = crew.kickoff()
        raw = str(result)
        try:
            data = json.loads(raw)
            ok, reason = validate_payload(data)
            if ok:
                return data
            last_error = reason
        except Exception as e:
            # Model output is untrusted text; any parse/validation failure
            # is treated as a retryable error rather than a crash.
            last_error = f'json_error:{e}'
        audit_event({
            'trace_id': trace_id,
            'agent': agent.role,
            'action': 'json_validation_failed',
            'error': last_error
        })
    return {
        'status': 'error',
        'summary': f'JSON validation failed: {last_error}',
        'artifacts': [],
        'tool_calls': [],
        'next_actions': []
    }


def _ok_response(summary, next_actions, pending_items):
    """Serialize a successful response payload to a JSON string."""
    return json.dumps({
        'status': 'ok',
        'summary': summary,
        'artifacts': [],
        'tool_calls': [],
        'next_actions': next_actions,
        'pending_dictionary_items': pending_items
    }, ensure_ascii=False)


def _normalized_ids(norm, category):
    """Collect ``normalized_id`` of entries in *category* whose status is ``ok``."""
    return [i.get('normalized_id') for i in norm.get(category, []) if i.get('status') == 'ok']


def handle_message(text: str, user_id: str = '', chat_id: str = '', trace_id: str = '', ops_mode: bool = False, last_pending_list: list | None = None) -> str:
    """Process one user message and return the reply text.

    Args:
        text: The user's message.
        user_id: Exported (stringified) as ``AGX_USER_ID`` and passed to the
            operator routers.
        chat_id: Exported (stringified) as ``AGX_CHAT_ID`` and passed to the
            operator routers.
        trace_id: When non-empty, replaces the trace id created by
            ``new_trace``.
        ops_mode: Enables free-text operator routing and the pending
            dictionary counter in responses.
        last_pending_list: Forwarded to ``route_operator_text``.

    Returns:
        A JSON string for operator results and plan/dashboard intents, a
        plain-text prompt when dictionary terms are pending, or the final
        crew answer (plus the farmOS UI hint) for the general flow.

    Side effects:
        Sets ``AGX_TRACE_ID``, ``AGX_USER_ID``, ``AGX_CHAT_ID`` and
        ``AGX_OPS_MODE`` in ``os.environ``.
    """
    trace = new_trace(text)
    if trace_id:
        trace['trace_id'] = trace_id

    os.environ['AGX_TRACE_ID'] = trace['trace_id']
    os.environ['AGX_USER_ID'] = str(user_id)
    os.environ['AGX_CHAT_ID'] = str(chat_id)
    os.environ['AGX_OPS_MODE'] = '1' if ops_mode else '0'

    # operator commands
    if text.strip().startswith('/'):
        op_res = route_operator_command(text, str(user_id), str(chat_id))
        if op_res:
            return json.dumps(op_res, ensure_ascii=False)
    elif ops_mode:
        op_res = route_operator_text(text, str(user_id), str(chat_id), last_pending_list=last_pending_list)
        if op_res:
            return json.dumps(op_res, ensure_ascii=False)

    stepan = build_stepan()
    ops = build_operations()
    iot = build_iot()
    platform = build_platform()
    spreadsheet = build_spreadsheet()
    sustainability = build_sustainability()

    audit_event({**trace, 'agent': 'stepan', 'action': 'intake'})

    # Preflight normalization
    norm = tool_dictionary.normalize_from_text(text, trace_id=trace['trace_id'], source='telegram')
    pending = [item for cat in norm.values() for item in cat if item.get('status') == 'pending']
    if pending:
        lines = ["=== PENDING TERMS (Stepan) ==="]
        for item in pending:
            suggestions = item.get('suggestions', [])
            if suggestions is None:
                # An explicit null would otherwise fail on slicing.
                suggestions = []
            lines.append(f"- {item.get('term')}: {suggestions[:3]}")
        lines.append("\nБудь ласка, уточніть невідомі терміни. Після підтвердження я продовжу.")
        return "\n".join(lines)

    intent = detect_intent(text)

    pending_count = 0
    if ops_mode:
        try:
            from agromatrix_tools import tool_dictionary_review as review
            pending_count = review.stats().get('open', 0)
        except Exception:
            # The counter is informational only; fall back to zero.
            pending_count = 0
    pending_items = pending_count if ops_mode else None

    if intent in ['plan_week', 'plan_day']:
        plan_id = tool_operation_plan.create_plan({
            'scope': {
                'field_ids': _normalized_ids(norm, 'fields'),
                'crop_ids': _normalized_ids(norm, 'crops'),
                'date_window': {'start': '', 'end': ''}
            },
            'tasks': []
        }, trace_id=trace['trace_id'], source='telegram')
        return _ok_response(f'План створено: {plan_id}', ['уточнити дати та операції'], pending_items)

    if intent == 'show_critical_tomorrow':
        # The dashboard result is not used in the response.
        _ = tool_operation_plan.plan_dashboard({}, {})
        return _ok_response('Критичні задачі на завтра', [], pending_items)

    if intent == 'plan_vs_fact':
        # The dashboard result is not used in the response.
        _ = tool_operation_plan.plan_dashboard({}, {})
        return _ok_response('План/факт зведення', [], pending_items)

    # general crew flow
    ops_out = run_task_with_retry(ops, "Оціни чи потрібні операційні записи або читання farmOS", trace['trace_id'])
    iot_out = run_task_with_retry(iot, "Оціни чи є потреба в даних ThingsBoard або NATS", trace['trace_id'])
    platform_out = run_task_with_retry(platform, "Перевір базовий статус сервісів/інтеграцій", trace['trace_id'])
    sheet_out = run_task_with_retry(spreadsheet, "Якщо запит стосується таблиць — підготуй артефакти", trace['trace_id'])
    sustainability_out = run_task_with_retry(sustainability, "Якщо потрібні агрегації — дай read-only підсумки", trace['trace_id'])

    audit_event({**trace, 'agent': 'stepan', 'action': 'delegate', 'targets': ['ops', 'iot', 'platform', 'spreadsheet', 'sustainability']})

    summary = {
        'ops': ops_out,
        'iot': iot_out,
        'platform': platform_out,
        'spreadsheet': sheet_out,
        'sustainability': sustainability_out
    }

    final_task = Task(
        description=f"Сформуй фінальну коротку відповідь користувачу. Вхідні дані (JSON): {json.dumps(summary, ensure_ascii=False)}",
        expected_output="Коротка консолідована відповідь для користувача українською.",
        agent=stepan
    )

    crew = Crew(agents=[stepan], tasks=[final_task], verbose=True)
    result = crew.kickoff()
    return str(result) + farmos_ui_hint()


def main():
    """Command-line entry point: ``run.py [--trace <id>] "<request>"``.

    Prints the usage line and exits with status 1 when the request, or the
    value for ``--trace``, is missing. ``AGX_USER_ID``, ``AGX_CHAT_ID`` and
    ``AGX_OPS_MODE`` are read from the environment.
    """
    args = sys.argv[1:]
    trace_override = None
    if args and args[0] == '--trace':
        if len(args) < 2:
            # ``--trace`` given without its value.
            print(_USAGE)
            sys.exit(1)
        trace_override = args[1]
        args = args[2:]

    if not args:
        # No request text at all, or nothing left after ``--trace <id>``.
        print(_USAGE)
        sys.exit(1)

    user_request = args[0]
    output = handle_message(
        user_request,
        user_id=os.getenv('AGX_USER_ID', ''),
        chat_id=os.getenv('AGX_CHAT_ID', ''),
        trace_id=trace_override or '',
        ops_mode=os.getenv('AGX_OPS_MODE', '0') == '1'
    )
    print(output)


if __name__ == '__main__':
    main()