import logging import os import re import time from urllib.parse import quote as _urlquote from .audit import audit_tool_call import requests from .common import _auth_headers logger = logging.getLogger(__name__) # Динамічно читаємо при кожному виклику (не кешуємо на рівні модуля), # щоб env-зміни без рестарту підхоплювались у тестах. def _farmos_base_url() -> str: return os.getenv("FARMOS_BASE_URL", "").strip() FARMOS_BASE_URL = os.getenv("FARMOS_BASE_URL", "http://localhost:8080") # ── Shared limits ───────────────────────────────────────────────────────────── _MAX_LIMIT = 20 _MIN_LIMIT = 1 _OUTPUT_MAX_LINES = 12 # ── whitelist для farmos_read_logs ─────────────────────────────────────────── _VALID_LOG_TYPES = {"activity", "observation", "harvest", "input", "seeding"} # ── whitelist для farmos_search_assets ─────────────────────────────────────── _VALID_ASSET_TYPES = { "asset_land", "asset_plant", "asset_equipment", "asset_structure", "asset_animal", } # farmOS 4.x JSON:API URL: /api/asset/ і /api/log/ # Наші whitelist ключі типу "asset_land" → "/api/asset/land" def _asset_type_to_path(asset_type: str) -> str: """Конвертує 'asset_land' → 'asset/land' для URL.""" # asset_land → asset/land, asset_plant → asset/plant, etc. if asset_type.startswith("asset_"): return "asset/" + asset_type[len("asset_"):] return asset_type # fallback (не має траплятися) # ── Нормалізація типу помилки → human-readable мітка ─────────────────────── def _classify_exception(exc: Exception) -> str: """Повертає одну з мітель: timeout / dns / ssl / connect / other.""" name = type(exc).__name__ msg = str(exc).lower() if "timeout" in name.lower() or "timeout" in msg: return "timeout" if "name or service not known" in msg or "nodename nor servname" in msg or "dns" in msg: return "dns" if "ssl" in msg or "ssl" in name.lower() or "certificate" in msg: return "ssl" if "connect" in name.lower() or "connection" in msg: return "connect" return "other" # ── v4.3: farmos_ping — fail-closed CrewAI tool ────────────────────────────── # Підключається до operations_agent. Ніколи не кидає виняток. # Не виводить у відповідь URL або токени. def _make_lc_tool(name: str, description: str, func): """ Обгортає plain function у langchain_core.tools.Tool для сумісності зі старими версіями crewai (які не мають crewai.tools.tool декоратора). Fallback: повертає ту саму plain function без обгортки. """ try: from langchain_core.tools import Tool as _LCTool return _LCTool(name=name, description=description, func=func) except Exception: pass try: from langchain.tools import Tool as _LCTool2 # type: ignore[import] return _LCTool2(name=name, description=description, func=func) except Exception: pass # Fallback: plain function (для середовищ без crewai/langchain взагалі) return func def _farmos_ping_raw(query: str = "") -> str: # noqa: ARG001 return _farmos_ping_impl() farmos_ping = _make_lc_tool( name="farmos_ping", description=( "Перевіряє доступність і авторизацію farmOS. " "Повертає стан: доступний / недоступний / не налаштований. " "Використовуй для швидкої діагностики перед іншими farmOS-операціями." ), func=_farmos_ping_raw, ) def _farmos_ping_impl() -> str: """ Логіка farmos_ping, незалежна від декоратора. Fail-closed: будь-яка помилка → зрозумілий рядок. """ _t = time.time() base_url = _farmos_base_url() if not base_url: _tlog_farmos("farmos_ping", ok=False, reason="no_base_url", ms=int((time.time() - _t) * 1000)) return "FarmOS не налаштований: FARMOS_BASE_URL відсутній." if not _has_auth(): _tlog_farmos("farmos_ping", ok=False, reason="no_auth", ms=int((time.time() - _t) * 1000)) return "FarmOS URL заданий, але немає токена або логіну/паролю." # HTTP healthcheck try: ping_url = base_url.rstrip("/") + "/api" headers = _auth_headers() resp = requests.get(ping_url, headers=headers, timeout=3) ms = int((time.time() - _t) * 1000) if resp.status_code == 200: _tlog_farmos("farmos_ping", ok=True, reason="ok", http=True, status=200, ms=ms) return "FarmOS доступний." if resp.status_code in (401, 403): _tlog_farmos("farmos_ping", ok=False, reason="auth_error", http=True, status=resp.status_code, ms=ms) return f"FarmOS недоступний: помилка авторизації ({resp.status_code})." _tlog_farmos("farmos_ping", ok=False, reason=f"http_{resp.status_code}", http=True, status=resp.status_code, ms=ms) return f"FarmOS недоступний: HTTP {resp.status_code}." except requests.exceptions.Timeout: ms = int((time.time() - _t) * 1000) _tlog_farmos("farmos_ping", ok=False, reason="timeout", err="timeout", ms=ms) return "FarmOS недоступний: timeout (3s)." except Exception as exc: ms = int((time.time() - _t) * 1000) err_kind = _classify_exception(exc) _tlog_farmos("farmos_ping", ok=False, reason=err_kind, err=err_kind, ms=ms) if err_kind == "dns": return "FarmOS недоступний: DNS/host не знайдено." if err_kind == "ssl": return "FarmOS недоступний: TLS/SSL помилка." if err_kind == "connect": return "FarmOS недоступний: з'єднання відхилено." return f"FarmOS недоступний: {type(exc).__name__}." # ── v4.4: farmos_read_logs — read-only logs tool ───────────────────────────── def _farmos_read_logs_raw(log_type: str = "activity", limit: int = 10) -> str: return _farmos_read_logs_impl(log_type=log_type, limit=limit) farmos_read_logs = _make_lc_tool( name="farmos_read_logs", description=( "Читає останні записи farmOS (логи операцій). " "log_type: activity | observation | harvest | input | seeding. " "limit: 1..20. Тільки читання. Fail-closed." ), func=_farmos_read_logs_raw, ) def _farmos_read_logs_impl(log_type: str = "activity", limit: int = 10) -> str: """ Отримує останні N записів farmOS для заданого типу логу. Fail-closed: будь-яка помилка → зрозумілий рядок. """ _t = time.time() # Валідація вхідних параметрів if log_type not in _VALID_LOG_TYPES: valid = ", ".join(sorted(_VALID_LOG_TYPES)) return f"FarmOS: log_type має бути одним з: {valid}." limit = max(_MIN_LIMIT, min(_MAX_LIMIT, int(limit))) base_url = _farmos_base_url() if not base_url: _tlog_farmos("farmos_read_logs", ok=False, reason="no_base_url", log_type=log_type, limit=limit, ms=0) return "FarmOS не налаштований: FARMOS_BASE_URL відсутній." if not _has_auth(): _tlog_farmos("farmos_read_logs", ok=False, reason="no_auth", log_type=log_type, limit=limit, ms=0) return "FarmOS URL заданий, але немає токена або логіну/паролю." headers = _auth_headers() # Пробуємо з sort=-changed, fallback без sort (деякі версії не підтримують) param_sets = [ {"page[limit]": limit, "sort": "-changed"}, {"page[limit]": limit}, ] url = f"{base_url.rstrip('/')}/api/log/{log_type}" try: resp = _try_requests(url, headers, param_sets, timeout=5) ms = int((time.time() - _t) * 1000) if resp is None or resp.status_code == 404: _tlog_farmos("farmos_read_logs", ok=False, reason="not_found", http=True, status=404, log_type=log_type, limit=limit, ms=ms) return "FarmOS: endpoint для логів не знайдено (404)." if resp.status_code in (401, 403): _tlog_farmos("farmos_read_logs", ok=False, reason="auth_error", http=True, status=resp.status_code, log_type=log_type, limit=limit, ms=ms) return f"FarmOS недоступний: помилка авторизації ({resp.status_code})." if resp.status_code != 200: _tlog_farmos("farmos_read_logs", ok=False, reason=f"http_{resp.status_code}", http=True, status=resp.status_code, log_type=log_type, limit=limit, ms=ms) return f"FarmOS: помилка запиту ({resp.status_code})." items, parse_err = _parse_jsonapi_list(resp) if parse_err: _tlog_farmos("farmos_read_logs", ok=False, reason="parse_error", http=True, status=200, log_type=log_type, limit=limit, ms=ms) return parse_err if not items: _tlog_farmos("farmos_read_logs", ok=True, reason="empty", http=True, status=200, log_type=log_type, limit=limit, ms=ms) return "FarmOS: записів не знайдено." lines = [] for item in items: try: attrs = item.get("attributes", {}) if isinstance(item, dict) else {} # Patch 2: розширений маппінг полів name = _extract_name(attrs, fallback=log_type) date_str = _extract_date(attrs) # Patch 2: notes може бути dict {value:...} або description notes_raw = ( attrs.get("notes") or attrs.get("notes_value") or attrs.get("description") or {} ) if isinstance(notes_raw, dict): notes_raw = notes_raw.get("value") or "" # Patch 1: normalize whitespace + word-boundary truncation notes = _safe_notes(str(notes_raw)) lines.append(f"- {name} | {date_str} | {notes or '—'}") except Exception: lines.append("- (помилка читання запису)") if len(lines) > _OUTPUT_MAX_LINES: lines = lines[:_OUTPUT_MAX_LINES] _tlog_farmos("farmos_read_logs", ok=True, reason="ok", http=True, status=200, log_type=log_type, limit=limit, ms=ms) return "\n".join(lines) except requests.exceptions.Timeout: ms = int((time.time() - _t) * 1000) _tlog_farmos("farmos_read_logs", ok=False, reason="timeout", err="timeout", log_type=log_type, limit=limit, ms=ms) return "FarmOS: timeout при отриманні логів (5s)." except Exception as exc: ms = int((time.time() - _t) * 1000) err_kind = _classify_exception(exc) _tlog_farmos("farmos_read_logs", ok=False, reason=err_kind, err=err_kind, log_type=log_type, limit=limit, ms=ms) return "FarmOS: не вдалося отримати логи (внутрішня помилка)." # ── v4.5: farmos_search_assets — read-only asset search ────────────────────── def _farmos_search_assets_raw( asset_type: str = "asset_land", name_contains: str = "", limit: int = 10, ) -> str: return _farmos_search_assets_impl( asset_type=asset_type, name_contains=name_contains, limit=limit, ) farmos_search_assets = _make_lc_tool( name="farmos_search_assets", description=( "Шукає активи farmOS за типом і необов'язковим підрядком назви. " "asset_type: asset_land | asset_plant | asset_equipment | asset_structure | asset_animal. " "Тільки читання. Fail-closed." ), func=_farmos_search_assets_raw, ) def _farmos_search_assets_impl( asset_type: str = "asset_land", name_contains: str = "", limit: int = 10, ) -> str: """ Пошук активів farmOS через JSON:API. Fail-closed. """ _t = time.time() # Валідація if asset_type not in _VALID_ASSET_TYPES: valid = ", ".join(sorted(_VALID_ASSET_TYPES)) return f"FarmOS: asset_type має бути одним з: {valid}." limit = max(_MIN_LIMIT, min(_MAX_LIMIT, int(limit))) name_contains = str(name_contains).strip()[:50] # детерміновано обрізаємо base_url = _farmos_base_url() if not base_url: _tlog_farmos("farmos_search_assets", ok=False, reason="no_base_url", asset_type=asset_type, limit=limit, ms=0) return "FarmOS не налаштований: FARMOS_BASE_URL відсутній." if not _has_auth(): _tlog_farmos("farmos_search_assets", ok=False, reason="no_auth", asset_type=asset_type, limit=limit, ms=0) return "FarmOS URL заданий, але немає токена або логіну/паролю." headers = _auth_headers() url = f"{base_url.rstrip('/')}/api/{_asset_type_to_path(asset_type)}" # Параметри: спробуємо з server-side filter, потім без (client-side fallback) # farmOS 4.x сортує по "name", не "label" (label → 400) base_params = {"page[limit]": limit, "sort": "name"} no_sort_params = {"page[limit]": limit} if name_contains: param_sets = [ {**base_params, "filter[name][value]": name_contains}, base_params, no_sort_params, ] client_filter = True else: param_sets = [base_params, no_sort_params] client_filter = False try: resp = _try_requests(url, headers, param_sets, timeout=5) ms = int((time.time() - _t) * 1000) if resp is None or resp.status_code == 404: _tlog_farmos("farmos_search_assets", ok=False, reason="not_found", http=True, status=404, asset_type=asset_type, limit=limit, ms=ms) return "FarmOS: endpoint для asset не знайдено (404)." if resp.status_code in (401, 403): _tlog_farmos("farmos_search_assets", ok=False, reason="auth_error", http=True, status=resp.status_code, asset_type=asset_type, limit=limit, ms=ms) return f"FarmOS: помилка авторизації ({resp.status_code})." if resp.status_code != 200: _tlog_farmos("farmos_search_assets", ok=False, reason=f"http_{resp.status_code}", http=True, status=resp.status_code, asset_type=asset_type, limit=limit, ms=ms) return f"FarmOS: помилка запиту ({resp.status_code})." items, parse_err = _parse_jsonapi_list(resp) if parse_err: _tlog_farmos("farmos_search_assets", ok=False, reason="parse_error", http=True, status=200, asset_type=asset_type, limit=limit, ms=ms) return parse_err # Client-side substring filter (plain, no regex) if client_filter and name_contains and items: needle = name_contains.lower() items = [ it for it in items if needle in _extract_label(it).lower() ] if not items: _tlog_farmos("farmos_search_assets", ok=True, reason="empty", http=True, status=200, asset_type=asset_type, limit=limit, filtered=bool(name_contains), count=0, ms=ms) return "FarmOS: нічого не знайдено." lines = [] for item in items: try: label = _extract_label(item) item_type = str(item.get("type", asset_type)) item_id = str(item.get("id", ""))[:8] # тільки перші 8 символів UUID line = f"- {label} | {item_type} | id={item_id}" # Patch 1: normalize і обріз рядка до 120 символів line = re.sub(r"\s+", " ", line).strip()[:120] lines.append(line) except Exception: lines.append("- (помилка читання активу)") if len(lines) > _OUTPUT_MAX_LINES: lines = lines[:_OUTPUT_MAX_LINES] _tlog_farmos("farmos_search_assets", ok=True, reason="ok", http=True, status=200, asset_type=asset_type, limit=limit, filtered=bool(name_contains), count=len(lines), ms=ms) return "\n".join(lines) except requests.exceptions.Timeout: ms = int((time.time() - _t) * 1000) _tlog_farmos("farmos_search_assets", ok=False, reason="timeout", err="timeout", asset_type=asset_type, limit=limit, ms=ms) return "FarmOS: timeout (5s)." except Exception as exc: ms = int((time.time() - _t) * 1000) err_kind = _classify_exception(exc) _tlog_farmos("farmos_search_assets", ok=False, reason=err_kind, err=err_kind, asset_type=asset_type, limit=limit, ms=ms) return f"FarmOS: мережна помилка ({err_kind})." # ── Shared helpers ──────────────────────────────────────────────────────────── def _has_auth() -> bool: """Перевіряє наявність будь-якого виду авторизації.""" token = os.getenv("FARMOS_TOKEN", "").strip() farmos_user = os.getenv("FARMOS_USER", "").strip() farmos_pass = os.getenv("FARMOS_PASS", os.getenv("FARMOS_PASSWORD", "")).strip() return bool(token or (farmos_user and farmos_pass)) def _try_requests(url: str, headers: dict, param_sets: list, timeout: float) -> "requests.Response | None": """ Пробує серію наборів параметрів. Повертає першу відповідь, яка не 404, або None якщо всі 404. """ resp = None for params in param_sets: resp = requests.get(url, headers=headers, params=params, timeout=timeout) if resp.status_code != 404: return resp return resp # остання відповідь (404 або None) def _parse_jsonapi_list(resp: "requests.Response") -> "tuple[list, str | None]": """ Парсить JSON:API list відповідь. Повертає (items, error_string). Patch 3: перевіряє що data є list; якщо ні → зрозуміла помилка. """ try: data = resp.json() except Exception: return [], "FarmOS: не вдалося розібрати відповідь (не валідний JSON)." if not isinstance(data, dict): return [], "FarmOS: неочікуваний формат відповіді." raw = data.get("data") if raw is None: return [], None # порожня відповідь — не помилка if not isinstance(raw, list): return [], "FarmOS: неочікуваний формат відповіді (data не є списком)." return raw, None def _safe_notes(raw: str, max_len: int = 80) -> str: """ Patch 1: normalize whitespace (tabs, newlines, multiple spaces → single space), потім обрізає по межі слова (не посеред UTF-8 символу). """ normalized = re.sub(r"\s+", " ", raw).strip() if len(normalized) <= max_len: return normalized truncated = normalized[:max_len] # Обріз по останньому пробілу — не посеред слова boundary = truncated.rfind(" ") if boundary > max_len // 2: return truncated[:boundary] return truncated def _extract_name(attrs: dict, fallback: str = "") -> str: """Patch 2: best-effort name з різних полів JSON:API.""" raw = ( attrs.get("name") or attrs.get("label") or attrs.get("type") or fallback ) return re.sub(r"\s+", " ", str(raw)).strip() def _extract_label(item: dict) -> str: """Отримує label/name активу з JSON:API item.""" attrs = item.get("attributes", {}) if isinstance(item, dict) else {} raw = ( attrs.get("label") or attrs.get("name") or attrs.get("type") or "(no label)" ) return re.sub(r"\s+", " ", str(raw)).strip() def _extract_date(attrs: dict) -> str: """Patch 2: best-effort date з timestamp/changed/created.""" from datetime import datetime, timezone for field in ("timestamp", "changed", "created"): ts = attrs.get(field) if ts is None: continue if isinstance(ts, (int, float)) and ts > 0: return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d") if isinstance(ts, str) and ts: return ts[:10] return "—" # ── Telemetry helper ────────────────────────────────────────────────────────── def _tlog_farmos( event: str, ok: bool, reason: str, status: int = 0, http: bool = False, err: str = "", log_type: str = "", asset_type: str = "", limit: int = 0, filtered: bool = False, count: int = -1, ms: int = 0, ) -> None: """PII-safe telemetry. Без URL, токенів, user_id.""" try: extra = f" http={http} status={status}" if http else "" extra += f" err={err}" if err else "" extra += f" log_type={log_type}" if log_type else "" extra += f" asset_type={asset_type}" if asset_type else "" extra += f" limit={limit}" if limit else "" extra += f" filtered={filtered}" if filtered else "" extra += f" count={count}" if count >= 0 else "" logger.info( "AGX_STEPAN_METRIC %s ok=%s reason=%s%s ms=%s", event, ok, reason, extra, ms, ) audit_tool_call( f"tool_farmos_read.{event}", {"reason": reason, "log_type": log_type, "asset_type": asset_type}, {"ok": ok, "status": status}, ok, ms, ) except Exception: pass def get_asset(asset_id: str): _t = time.time() url = f"{FARMOS_BASE_URL}/jsonapi/asset/asset/{asset_id}" r = requests.get(url, headers=_auth_headers(), timeout=20) r.raise_for_status() out = r.json() audit_tool_call("tool_farmos_read.get_asset", {"asset_id": asset_id}, {"ok": True}, True, int((time.time()-_t)*1000)) return out def search_assets(name_contains: str = "", limit: int = 10): _t = time.time() params = {} if name_contains: params["filter[name][condition][path]"] = "name" params["filter[name][condition][operator]"] = "CONTAINS" params["filter[name][condition][value]"] = name_contains params["page[limit]"] = limit url = f"{FARMOS_BASE_URL}/jsonapi/asset/asset" r = requests.get(url, headers=_auth_headers(), params=params, timeout=20) r.raise_for_status() out = r.json() audit_tool_call("tool_farmos_read.search_assets", {"name_contains": name_contains}, {"ok": True}, True, int((time.time()-_t)*1000)) return out def read_logs(log_type: str = "observation", limit: int = 10): _t = time.time() url = f"{FARMOS_BASE_URL}/jsonapi/log/{log_type}" params = {"page[limit]": limit} r = requests.get(url, headers=_auth_headers(), params=params, timeout=20) r.raise_for_status() out = r.json() audit_tool_call("tool_farmos_read.read_logs", {"log_type": log_type, "limit": limit}, {"ok": True}, True, int((time.time()-_t)*1000)) return out def read_inventory(limit: int = 10): _t = time.time() url = f"{FARMOS_BASE_URL}/jsonapi/log/inventory" params = {"page[limit]": limit} r = requests.get(url, headers=_auth_headers(), params=params, timeout=20) r.raise_for_status() out = r.json() audit_tool_call("tool_farmos_read.read_inventory", {"limit": limit}, {"ok": True}, True, int((time.time()-_t)*1000)) return out