Files
microdao-daarion/packages/agromatrix-tools/agromatrix_tools/tool_farmos_read.py
Apple 90080c632a fix(fabric): use broadcast subject for NATS capabilities discovery
NATS wildcards (node.*.capabilities.get) only work for subscriptions,
not for publish. Switch to a dedicated broadcast subject
(fabric.capabilities.discover) that all NCS instances subscribe to,
enabling proper scatter-gather discovery across nodes.

Made-with: Cursor
2026-02-27 03:20:13 -08:00

587 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import os
import re
import time
from urllib.parse import quote as _urlquote
from .audit import audit_tool_call
import requests
from .common import _auth_headers
logger = logging.getLogger(__name__)
# Динамічно читаємо при кожному виклику (не кешуємо на рівні модуля),
# щоб env-зміни без рестарту підхоплювались у тестах.
def _farmos_base_url() -> str:
return os.getenv("FARMOS_BASE_URL", "").strip()
FARMOS_BASE_URL = os.getenv("FARMOS_BASE_URL", "http://localhost:8080")
# ── Shared limits ─────────────────────────────────────────────────────────────
_MAX_LIMIT = 20
_MIN_LIMIT = 1
_OUTPUT_MAX_LINES = 12
# ── whitelist для farmos_read_logs ───────────────────────────────────────────
_VALID_LOG_TYPES = {"activity", "observation", "harvest", "input", "seeding"}
# ── whitelist для farmos_search_assets ───────────────────────────────────────
_VALID_ASSET_TYPES = {
"asset_land", "asset_plant", "asset_equipment",
"asset_structure", "asset_animal",
}
# farmOS 4.x JSON:API URL: /api/asset/<type> і /api/log/<type>
# Наші whitelist ключі типу "asset_land" → "/api/asset/land"
def _asset_type_to_path(asset_type: str) -> str:
"""Конвертує 'asset_land''asset/land' для URL."""
# asset_land → asset/land, asset_plant → asset/plant, etc.
if asset_type.startswith("asset_"):
return "asset/" + asset_type[len("asset_"):]
return asset_type # fallback (не має траплятися)
# ── Нормалізація типу помилки → human-readable мітка ───────────────────────
def _classify_exception(exc: Exception) -> str:
"""Повертає одну з мітель: timeout / dns / ssl / connect / other."""
name = type(exc).__name__
msg = str(exc).lower()
if "timeout" in name.lower() or "timeout" in msg:
return "timeout"
if "name or service not known" in msg or "nodename nor servname" in msg or "dns" in msg:
return "dns"
if "ssl" in msg or "ssl" in name.lower() or "certificate" in msg:
return "ssl"
if "connect" in name.lower() or "connection" in msg:
return "connect"
return "other"
# ── v4.3: farmos_ping — fail-closed CrewAI tool ──────────────────────────────
# Підключається до operations_agent. Ніколи не кидає виняток.
# Не виводить у відповідь URL або токени.
def _make_lc_tool(name: str, description: str, func):
"""
Обгортає plain function у langchain_core.tools.Tool для сумісності
зі старими версіями crewai (які не мають crewai.tools.tool декоратора).
Fallback: повертає ту саму plain function без обгортки.
"""
try:
from langchain_core.tools import Tool as _LCTool
return _LCTool(name=name, description=description, func=func)
except Exception:
pass
try:
from langchain.tools import Tool as _LCTool2 # type: ignore[import]
return _LCTool2(name=name, description=description, func=func)
except Exception:
pass
# Fallback: plain function (для середовищ без crewai/langchain взагалі)
return func
def _farmos_ping_raw(query: str = "") -> str: # noqa: ARG001
return _farmos_ping_impl()
farmos_ping = _make_lc_tool(
name="farmos_ping",
description=(
"Перевіряє доступність і авторизацію farmOS. "
"Повертає стан: доступний / недоступний / не налаштований. "
"Використовуй для швидкої діагностики перед іншими farmOS-операціями."
),
func=_farmos_ping_raw,
)
def _farmos_ping_impl() -> str:
"""
Логіка farmos_ping, незалежна від декоратора.
Fail-closed: будь-яка помилка → зрозумілий рядок.
"""
_t = time.time()
base_url = _farmos_base_url()
if not base_url:
_tlog_farmos("farmos_ping", ok=False, reason="no_base_url", ms=int((time.time() - _t) * 1000))
return "FarmOS не налаштований: FARMOS_BASE_URL відсутній."
if not _has_auth():
_tlog_farmos("farmos_ping", ok=False, reason="no_auth", ms=int((time.time() - _t) * 1000))
return "FarmOS URL заданий, але немає токена або логіну/паролю."
# HTTP healthcheck
try:
ping_url = base_url.rstrip("/") + "/api"
headers = _auth_headers()
resp = requests.get(ping_url, headers=headers, timeout=3)
ms = int((time.time() - _t) * 1000)
if resp.status_code == 200:
_tlog_farmos("farmos_ping", ok=True, reason="ok", http=True, status=200, ms=ms)
return "FarmOS доступний."
if resp.status_code in (401, 403):
_tlog_farmos("farmos_ping", ok=False, reason="auth_error", http=True, status=resp.status_code, ms=ms)
return f"FarmOS недоступний: помилка авторизації ({resp.status_code})."
_tlog_farmos("farmos_ping", ok=False, reason=f"http_{resp.status_code}", http=True, status=resp.status_code, ms=ms)
return f"FarmOS недоступний: HTTP {resp.status_code}."
except requests.exceptions.Timeout:
ms = int((time.time() - _t) * 1000)
_tlog_farmos("farmos_ping", ok=False, reason="timeout", err="timeout", ms=ms)
return "FarmOS недоступний: timeout (3s)."
except Exception as exc:
ms = int((time.time() - _t) * 1000)
err_kind = _classify_exception(exc)
_tlog_farmos("farmos_ping", ok=False, reason=err_kind, err=err_kind, ms=ms)
if err_kind == "dns":
return "FarmOS недоступний: DNS/host не знайдено."
if err_kind == "ssl":
return "FarmOS недоступний: TLS/SSL помилка."
if err_kind == "connect":
return "FarmOS недоступний: з'єднання відхилено."
return f"FarmOS недоступний: {type(exc).__name__}."
# ── v4.4: farmos_read_logs — read-only logs tool ─────────────────────────────
def _farmos_read_logs_raw(log_type: str = "activity", limit: int = 10) -> str:
return _farmos_read_logs_impl(log_type=log_type, limit=limit)
farmos_read_logs = _make_lc_tool(
name="farmos_read_logs",
description=(
"Читає останні записи farmOS (логи операцій). "
"log_type: activity | observation | harvest | input | seeding. "
"limit: 1..20. Тільки читання. Fail-closed."
),
func=_farmos_read_logs_raw,
)
def _farmos_read_logs_impl(log_type: str = "activity", limit: int = 10) -> str:
"""
Отримує останні N записів farmOS для заданого типу логу.
Fail-closed: будь-яка помилка → зрозумілий рядок.
"""
_t = time.time()
# Валідація вхідних параметрів
if log_type not in _VALID_LOG_TYPES:
valid = ", ".join(sorted(_VALID_LOG_TYPES))
return f"FarmOS: log_type має бути одним з: {valid}."
limit = max(_MIN_LIMIT, min(_MAX_LIMIT, int(limit)))
base_url = _farmos_base_url()
if not base_url:
_tlog_farmos("farmos_read_logs", ok=False, reason="no_base_url",
log_type=log_type, limit=limit, ms=0)
return "FarmOS не налаштований: FARMOS_BASE_URL відсутній."
if not _has_auth():
_tlog_farmos("farmos_read_logs", ok=False, reason="no_auth",
log_type=log_type, limit=limit, ms=0)
return "FarmOS URL заданий, але немає токена або логіну/паролю."
headers = _auth_headers()
# Пробуємо з sort=-changed, fallback без sort (деякі версії не підтримують)
param_sets = [
{"page[limit]": limit, "sort": "-changed"},
{"page[limit]": limit},
]
url = f"{base_url.rstrip('/')}/api/log/{log_type}"
try:
resp = _try_requests(url, headers, param_sets, timeout=5)
ms = int((time.time() - _t) * 1000)
if resp is None or resp.status_code == 404:
_tlog_farmos("farmos_read_logs", ok=False, reason="not_found", http=True,
status=404, log_type=log_type, limit=limit, ms=ms)
return "FarmOS: endpoint для логів не знайдено (404)."
if resp.status_code in (401, 403):
_tlog_farmos("farmos_read_logs", ok=False, reason="auth_error", http=True,
status=resp.status_code, log_type=log_type, limit=limit, ms=ms)
return f"FarmOS недоступний: помилка авторизації ({resp.status_code})."
if resp.status_code != 200:
_tlog_farmos("farmos_read_logs", ok=False, reason=f"http_{resp.status_code}",
http=True, status=resp.status_code, log_type=log_type, limit=limit, ms=ms)
return f"FarmOS: помилка запиту ({resp.status_code})."
items, parse_err = _parse_jsonapi_list(resp)
if parse_err:
_tlog_farmos("farmos_read_logs", ok=False, reason="parse_error", http=True,
status=200, log_type=log_type, limit=limit, ms=ms)
return parse_err
if not items:
_tlog_farmos("farmos_read_logs", ok=True, reason="empty", http=True,
status=200, log_type=log_type, limit=limit, ms=ms)
return "FarmOS: записів не знайдено."
lines = []
for item in items:
try:
attrs = item.get("attributes", {}) if isinstance(item, dict) else {}
# Patch 2: розширений маппінг полів
name = _extract_name(attrs, fallback=log_type)
date_str = _extract_date(attrs)
# Patch 2: notes може бути dict {value:...} або description
notes_raw = (
attrs.get("notes")
or attrs.get("notes_value")
or attrs.get("description")
or {}
)
if isinstance(notes_raw, dict):
notes_raw = notes_raw.get("value") or ""
# Patch 1: normalize whitespace + word-boundary truncation
notes = _safe_notes(str(notes_raw))
lines.append(f"- {name} | {date_str} | {notes or ''}")
except Exception:
lines.append("- (помилка читання запису)")
if len(lines) > _OUTPUT_MAX_LINES:
lines = lines[:_OUTPUT_MAX_LINES]
_tlog_farmos("farmos_read_logs", ok=True, reason="ok", http=True,
status=200, log_type=log_type, limit=limit, ms=ms)
return "\n".join(lines)
except requests.exceptions.Timeout:
ms = int((time.time() - _t) * 1000)
_tlog_farmos("farmos_read_logs", ok=False, reason="timeout", err="timeout",
log_type=log_type, limit=limit, ms=ms)
return "FarmOS: timeout при отриманні логів (5s)."
except Exception as exc:
ms = int((time.time() - _t) * 1000)
err_kind = _classify_exception(exc)
_tlog_farmos("farmos_read_logs", ok=False, reason=err_kind, err=err_kind,
log_type=log_type, limit=limit, ms=ms)
return "FarmOS: не вдалося отримати логи (внутрішня помилка)."
# ── v4.5: farmos_search_assets — read-only asset search ──────────────────────
def _farmos_search_assets_raw(
asset_type: str = "asset_land",
name_contains: str = "",
limit: int = 10,
) -> str:
return _farmos_search_assets_impl(
asset_type=asset_type,
name_contains=name_contains,
limit=limit,
)
farmos_search_assets = _make_lc_tool(
name="farmos_search_assets",
description=(
"Шукає активи farmOS за типом і необов'язковим підрядком назви. "
"asset_type: asset_land | asset_plant | asset_equipment | asset_structure | asset_animal. "
"Тільки читання. Fail-closed."
),
func=_farmos_search_assets_raw,
)
def _farmos_search_assets_impl(
asset_type: str = "asset_land",
name_contains: str = "",
limit: int = 10,
) -> str:
"""
Пошук активів farmOS через JSON:API. Fail-closed.
"""
_t = time.time()
# Валідація
if asset_type not in _VALID_ASSET_TYPES:
valid = ", ".join(sorted(_VALID_ASSET_TYPES))
return f"FarmOS: asset_type має бути одним з: {valid}."
limit = max(_MIN_LIMIT, min(_MAX_LIMIT, int(limit)))
name_contains = str(name_contains).strip()[:50] # детерміновано обрізаємо
base_url = _farmos_base_url()
if not base_url:
_tlog_farmos("farmos_search_assets", ok=False, reason="no_base_url",
asset_type=asset_type, limit=limit, ms=0)
return "FarmOS не налаштований: FARMOS_BASE_URL відсутній."
if not _has_auth():
_tlog_farmos("farmos_search_assets", ok=False, reason="no_auth",
asset_type=asset_type, limit=limit, ms=0)
return "FarmOS URL заданий, але немає токена або логіну/паролю."
headers = _auth_headers()
url = f"{base_url.rstrip('/')}/api/{_asset_type_to_path(asset_type)}"
# Параметри: спробуємо з server-side filter, потім без (client-side fallback)
# farmOS 4.x сортує по "name", не "label" (label → 400)
base_params = {"page[limit]": limit, "sort": "name"}
no_sort_params = {"page[limit]": limit}
if name_contains:
param_sets = [
{**base_params, "filter[name][value]": name_contains},
base_params,
no_sort_params,
]
client_filter = True
else:
param_sets = [base_params, no_sort_params]
client_filter = False
try:
resp = _try_requests(url, headers, param_sets, timeout=5)
ms = int((time.time() - _t) * 1000)
if resp is None or resp.status_code == 404:
_tlog_farmos("farmos_search_assets", ok=False, reason="not_found", http=True,
status=404, asset_type=asset_type, limit=limit, ms=ms)
return "FarmOS: endpoint для asset не знайдено (404)."
if resp.status_code in (401, 403):
_tlog_farmos("farmos_search_assets", ok=False, reason="auth_error", http=True,
status=resp.status_code, asset_type=asset_type, limit=limit, ms=ms)
return f"FarmOS: помилка авторизації ({resp.status_code})."
if resp.status_code != 200:
_tlog_farmos("farmos_search_assets", ok=False, reason=f"http_{resp.status_code}",
http=True, status=resp.status_code, asset_type=asset_type, limit=limit, ms=ms)
return f"FarmOS: помилка запиту ({resp.status_code})."
items, parse_err = _parse_jsonapi_list(resp)
if parse_err:
_tlog_farmos("farmos_search_assets", ok=False, reason="parse_error", http=True,
status=200, asset_type=asset_type, limit=limit, ms=ms)
return parse_err
# Client-side substring filter (plain, no regex)
if client_filter and name_contains and items:
needle = name_contains.lower()
items = [
it for it in items
if needle in _extract_label(it).lower()
]
if not items:
_tlog_farmos("farmos_search_assets", ok=True, reason="empty", http=True,
status=200, asset_type=asset_type, limit=limit,
filtered=bool(name_contains), count=0, ms=ms)
return "FarmOS: нічого не знайдено."
lines = []
for item in items:
try:
label = _extract_label(item)
item_type = str(item.get("type", asset_type))
item_id = str(item.get("id", ""))[:8] # тільки перші 8 символів UUID
line = f"- {label} | {item_type} | id={item_id}"
# Patch 1: normalize і обріз рядка до 120 символів
line = re.sub(r"\s+", " ", line).strip()[:120]
lines.append(line)
except Exception:
lines.append("- (помилка читання активу)")
if len(lines) > _OUTPUT_MAX_LINES:
lines = lines[:_OUTPUT_MAX_LINES]
_tlog_farmos("farmos_search_assets", ok=True, reason="ok", http=True,
status=200, asset_type=asset_type, limit=limit,
filtered=bool(name_contains), count=len(lines), ms=ms)
return "\n".join(lines)
except requests.exceptions.Timeout:
ms = int((time.time() - _t) * 1000)
_tlog_farmos("farmos_search_assets", ok=False, reason="timeout", err="timeout",
asset_type=asset_type, limit=limit, ms=ms)
return "FarmOS: timeout (5s)."
except Exception as exc:
ms = int((time.time() - _t) * 1000)
err_kind = _classify_exception(exc)
_tlog_farmos("farmos_search_assets", ok=False, reason=err_kind, err=err_kind,
asset_type=asset_type, limit=limit, ms=ms)
return f"FarmOS: мережна помилка ({err_kind})."
# ── Shared helpers ────────────────────────────────────────────────────────────
def _has_auth() -> bool:
"""Перевіряє наявність будь-якого виду авторизації."""
token = os.getenv("FARMOS_TOKEN", "").strip()
farmos_user = os.getenv("FARMOS_USER", "").strip()
farmos_pass = os.getenv("FARMOS_PASS", os.getenv("FARMOS_PASSWORD", "")).strip()
return bool(token or (farmos_user and farmos_pass))
def _try_requests(url: str, headers: dict, param_sets: list, timeout: float) -> "requests.Response | None":
"""
Пробує серію наборів параметрів. Повертає першу відповідь, яка не 404,
або None якщо всі 404.
"""
resp = None
for params in param_sets:
resp = requests.get(url, headers=headers, params=params, timeout=timeout)
if resp.status_code != 404:
return resp
return resp # остання відповідь (404 або None)
def _parse_jsonapi_list(resp: "requests.Response") -> "tuple[list, str | None]":
"""
Парсить JSON:API list відповідь. Повертає (items, error_string).
Patch 3: перевіряє що data є list; якщо ні → зрозуміла помилка.
"""
try:
data = resp.json()
except Exception:
return [], "FarmOS: не вдалося розібрати відповідь (не валідний JSON)."
if not isinstance(data, dict):
return [], "FarmOS: неочікуваний формат відповіді."
raw = data.get("data")
if raw is None:
return [], None # порожня відповідь — не помилка
if not isinstance(raw, list):
return [], "FarmOS: неочікуваний формат відповіді (data не є списком)."
return raw, None
def _safe_notes(raw: str, max_len: int = 80) -> str:
"""
Patch 1: normalize whitespace (tabs, newlines, multiple spaces → single space),
потім обрізає по межі слова (не посеред UTF-8 символу).
"""
normalized = re.sub(r"\s+", " ", raw).strip()
if len(normalized) <= max_len:
return normalized
truncated = normalized[:max_len]
# Обріз по останньому пробілу — не посеред слова
boundary = truncated.rfind(" ")
if boundary > max_len // 2:
return truncated[:boundary]
return truncated
def _extract_name(attrs: dict, fallback: str = "") -> str:
"""Patch 2: best-effort name з різних полів JSON:API."""
raw = (
attrs.get("name")
or attrs.get("label")
or attrs.get("type")
or fallback
)
return re.sub(r"\s+", " ", str(raw)).strip()
def _extract_label(item: dict) -> str:
"""Отримує label/name активу з JSON:API item."""
attrs = item.get("attributes", {}) if isinstance(item, dict) else {}
raw = (
attrs.get("label")
or attrs.get("name")
or attrs.get("type")
or "(no label)"
)
return re.sub(r"\s+", " ", str(raw)).strip()
def _extract_date(attrs: dict) -> str:
"""Patch 2: best-effort date з timestamp/changed/created."""
from datetime import datetime, timezone
for field in ("timestamp", "changed", "created"):
ts = attrs.get(field)
if ts is None:
continue
if isinstance(ts, (int, float)) and ts > 0:
return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d")
if isinstance(ts, str) and ts:
return ts[:10]
return ""
# ── Telemetry helper ──────────────────────────────────────────────────────────
def _tlog_farmos(
event: str,
ok: bool,
reason: str,
status: int = 0,
http: bool = False,
err: str = "",
log_type: str = "",
asset_type: str = "",
limit: int = 0,
filtered: bool = False,
count: int = -1,
ms: int = 0,
) -> None:
"""PII-safe telemetry. Без URL, токенів, user_id."""
try:
extra = f" http={http} status={status}" if http else ""
extra += f" err={err}" if err else ""
extra += f" log_type={log_type}" if log_type else ""
extra += f" asset_type={asset_type}" if asset_type else ""
extra += f" limit={limit}" if limit else ""
extra += f" filtered={filtered}" if filtered else ""
extra += f" count={count}" if count >= 0 else ""
logger.info(
"AGX_STEPAN_METRIC %s ok=%s reason=%s%s ms=%s",
event, ok, reason, extra, ms,
)
audit_tool_call(
f"tool_farmos_read.{event}",
{"reason": reason, "log_type": log_type, "asset_type": asset_type},
{"ok": ok, "status": status},
ok,
ms,
)
except Exception:
pass
def get_asset(asset_id: str):
_t = time.time()
url = f"{FARMOS_BASE_URL}/jsonapi/asset/asset/{asset_id}"
r = requests.get(url, headers=_auth_headers(), timeout=20)
r.raise_for_status()
out = r.json()
audit_tool_call("tool_farmos_read.get_asset", {"asset_id": asset_id}, {"ok": True}, True, int((time.time()-_t)*1000))
return out
def search_assets(name_contains: str = "", limit: int = 10):
_t = time.time()
params = {}
if name_contains:
params["filter[name][condition][path]"] = "name"
params["filter[name][condition][operator]"] = "CONTAINS"
params["filter[name][condition][value]"] = name_contains
params["page[limit]"] = limit
url = f"{FARMOS_BASE_URL}/jsonapi/asset/asset"
r = requests.get(url, headers=_auth_headers(), params=params, timeout=20)
r.raise_for_status()
out = r.json()
audit_tool_call("tool_farmos_read.search_assets", {"name_contains": name_contains}, {"ok": True}, True, int((time.time()-_t)*1000))
return out
def read_logs(log_type: str = "observation", limit: int = 10):
_t = time.time()
url = f"{FARMOS_BASE_URL}/jsonapi/log/{log_type}"
params = {"page[limit]": limit}
r = requests.get(url, headers=_auth_headers(), params=params, timeout=20)
r.raise_for_status()
out = r.json()
audit_tool_call("tool_farmos_read.read_logs", {"log_type": log_type, "limit": limit}, {"ok": True}, True, int((time.time()-_t)*1000))
return out
def read_inventory(limit: int = 10):
_t = time.time()
url = f"{FARMOS_BASE_URL}/jsonapi/log/inventory"
params = {"page[limit]": limit}
r = requests.get(url, headers=_auth_headers(), params=params, timeout=20)
r.raise_for_status()
out = r.json()
audit_tool_call("tool_farmos_read.read_inventory", {"limit": limit}, {"ok": True}, True, int((time.time()-_t)*1000))
return out