Files
microdao-daarion/crews/agromatrix_crew/operator_commands.py
Apple 90080c632a fix(fabric): use broadcast subject for NATS capabilities discovery
NATS wildcards (node.*.capabilities.get) only work for subscriptions,
not for publish. Switch to a dedicated broadcast subject
(fabric.capabilities.discover) that all NCS instances subscribe to,
enabling proper scatter-gather discovery across nodes.

Made-with: Cursor
2026-02-27 03:20:13 -08:00

686 lines
24 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Operator commands for AgroMatrix (Stepan). Access control and slash commands.
Access control (env, used by gateway and here):
- AGX_OPERATOR_IDS: comma-separated Telegram user_id list; only these users are operators.
- AGX_OPERATOR_CHAT_ID: optional; if set, operator actions allowed only in this chat_id.
When is_operator(user_id, chat_id) is True, gateway routes any message (not only slash)
to Stepan for human-friendly operator interaction.
"""
import os
import re
import shlex
from agromatrix_tools import tool_dictionary_review as review
# Dictionary categories accepted by the pending-review commands; the routers
# below answer any other category value with 'unknown category'.
CATEGORIES = {"field","crop","operation","material","unit"}
# Only these slash-commands are treated as operator commands.
# Everything else (e.g. /start, /agromatrix) must fall through to the normal agent flow.
# parse_operator_command() returns None for any command name not listed here.
OPERATOR_COMMANDS = {
"whoami",
"pending",
"pending_show",
"approve",
"reject",
"apply_dict",
"pending_stats",
"doc", # v3.5: Doc Focus Gate control (/doc on|off|status)
"farmos", # v4.3: FarmOS healthcheck (/farmos | /farmos status)
"farm", # v4.6: Farm state snapshot (/farm state)
}
def is_operator(user_id: str | None, chat_id: str | None) -> bool:
allowed_ids = [s.strip() for s in os.getenv('AGX_OPERATOR_IDS', '').split(',') if s.strip()]
allowed_chat = os.getenv('AGX_OPERATOR_CHAT_ID', '').strip()
if allowed_chat and chat_id and str(chat_id) != allowed_chat:
return False
if allowed_ids and user_id and str(user_id) in allowed_ids:
return True
return False
def parse_operator_command(text: str):
    """Parse a slash-command line into ``{"cmd": ..., "args": [...]}``.

    Args:
        text: Raw message text, e.g. ``/pending --limit 5``.

    Returns:
        A dict with the command name (leading ``/`` removed) and the remaining
        tokens, or None when the text is empty or the first token is not one
        of OPERATOR_COMMANDS (such messages fall through to the normal flow).
    """
    if not text:
        return None
    try:
        parts = shlex.split(text)
    except ValueError:
        # shlex raises on an unbalanced quote/apostrophe (e.g. "it's wrong");
        # degrade to plain whitespace splitting instead of crashing the router.
        parts = text.split()
    if not parts:
        return None
    cmd = parts[0].lstrip('/')
    if cmd not in OPERATOR_COMMANDS:
        return None
    return {"cmd": cmd, "args": parts[1:]}
def _wrap(summary: str, details: dict | None = None):
return {
"status": "ok",
"summary": summary,
"artifacts": [],
"tool_calls": [],
"next_actions": [],
"details": details or {}
}
def _extract_ref(text: str) -> str | None:
m = re.search(r"pending\.jsonl:\d+", text)
return m.group(0) if m else None
def _extract_index(text: str) -> int | None:
m = re.search(r"(\d{1,3})", text)
if not m:
return None
try:
return int(m.group(1))
except Exception:
return None
def _extract_limit(text: str, default: int = 10) -> int:
m = re.search(r"(?:до|limit)\s*(\d{1,3})", text)
if m:
try:
return int(m.group(1))
except Exception:
return default
m = re.search(r"(\d{1,3})", text)
if m:
try:
return int(m.group(1))
except Exception:
return default
return default
def _extract_category(text: str) -> str | None:
t = text.lower()
if 'один' in t or 'unit' in t:
return 'unit'
if 'операц' in t or 'operation' in t:
return 'operation'
if 'культур' in t or 'crop' in t:
return 'crop'
if 'матеріал' in t or 'material' in t:
return 'material'
if 'поле' in t or 'field' in t:
return 'field'
return None
def _extract_canonical_id(text: str) -> str | None:
m = re.search(r"(?:як|as)\s+([\w\-:.]+)", text, flags=re.IGNORECASE)
if m:
return m.group(1)
return None
def _extract_reason(text: str) -> str:
if ':' in text:
return text.split(':', 1)[1].strip()
m = re.search(r"(?:бо|через)\s+(.+)$", text, flags=re.IGNORECASE)
if m:
return m.group(1).strip()
return ''
def _resolve_ref_by_index(last_list: list | None, idx: int | None) -> str | None:
if not last_list or not idx:
return None
if idx < 1 or idx > len(last_list):
return None
item = last_list[idx - 1]
return item.get('pending_ref') or item.get('ref')
def handle_whoami(user_id: str | None, chat_id: str | None):
    """Echo the caller's user_id and chat_id (empty string when missing)."""
    shown_user = user_id or ''
    shown_chat = chat_id or ''
    return _wrap(f"user_id: {shown_user}\nchat_id: {shown_chat}")
def handle_pending(limit=10, category=None):
    """List pending dictionary entries with their top suggestions.

    Args:
        limit: Maximum number of entries requested from the review tool.
        category: Optional category filter passed to the review tool.

    Returns:
        A response envelope whose summary has one line per entry
        (``ref | category | raw_term | suggestions``) and whose details hold
        the entry count and a structured ``pending_items`` list.
    """
    items = review.list_pending(limit=limit, category=category) or []
    lines = []
    pending_items = []
    for item in items:
        # At most five suggestions per entry; tolerate a missing/None list.
        sug = (item.get('suggestions') or [])[:5]
        rendered = []
        for s in sug:
            # Same tolerant formatting as handle_pending_show: a missing or
            # non-numeric score is shown as "n/a" instead of raising.
            score = s.get('score')
            score_s = f"{score:.2f}" if isinstance(score, (int, float)) else "n/a"
            rendered.append(f"{s.get('id')}({score_s})")
        sug_s = ', '.join(rendered)
        lines.append(
            f"{item.get('pending_ref')} | {item.get('category')} | {item.get('raw_term')} | {sug_s}"
        )
        pending_items.append({
            'pending_ref': item.get('pending_ref'),
            'pending_id': item.get('pending_id'),
            'raw_term': item.get('raw_term'),
            'category': item.get('category'),
            'suggestions': [{"id": s.get('id'), "score": s.get('score')} for s in sug],
        })
    summary = "\n".join(lines) if lines else "Немає pending записів."
    return _wrap(summary, {"count": len(items), "pending_items": pending_items})
def handle_pending_show(ref: str):
    """Show the full detail of one pending entry.

    Args:
        ref: Pending reference passed to the review tool.

    Returns:
        A response envelope with a multi-line summary (core fields, optional
        decision / reason, then the suggestion list) and the raw detail dict
        as details; a "not found" envelope when the tool returns nothing.
    """
    detail = review.get_pending_detail(ref)
    if not detail:
        return _wrap('Не знайдено pending запис')
    out = [
        f"{field}: {detail.get(field)}"
        for field in ('ref', 'category', 'raw_term', 'ts', 'status')
    ]
    # decision and reason are printed only when present and non-empty.
    for optional in ('decision', 'reason'):
        if detail.get(optional):
            out.append(f"{optional}: {detail.get(optional)}")
    out.append('suggestions:')
    suggestions = detail.get('suggestions') or []
    if not suggestions:
        out.append('- (none)')
    for entry in suggestions:
        score = entry.get('score')
        if isinstance(score, (int, float)):
            score_s = f"{score:.2f}"
        else:
            score_s = "n/a"
        out.append(f"- {entry.get('id')} ({score_s})")
    return _wrap("\n".join(out), detail)
def handle_approve(ref, map_to=None, create_new=None, category=None, name=None, id_=None, apply=False):
    """Approve a pending entry and optionally apply resolutions right away.

    Args:
        ref: Pending reference to approve.
        map_to: Canonical id to map the term to (takes precedence).
        create_new: When truthy (and map_to is not given), create a new entry
            from ``id_`` / ``name``.
        category: Accepted for interface compatibility; not used here.
        name: Canonical name for a new entry.
        id_: Canonical id for a new entry.
        apply: When truthy, apply resolutions after approving.

    Returns:
        A response envelope with the review record and the ``applied`` flag.
    """
    if not map_to and create_new:
        action = dict(
            type="create_new_entry",
            canonical_id=id_ or '',
            canonical_name=name or '',
        )
    else:
        # NOTE(review): the "add_synonym" case is reached only when map_to is
        # falsy, so its canonical_id is always empty/None — confirm whether
        # the review tool expects that or a different id should be sent.
        kind = "map_to_existing" if map_to else "add_synonym"
        action = dict(type=kind, canonical_id=map_to)
    record = review.approve_pending(ref, action)
    if apply:
        review.apply_resolutions()
    return _wrap(f"approved {ref}", {"record": record, "applied": apply})
def handle_reject(ref, reason):
    """Reject the pending entry *ref* with *reason* and report the record."""
    record = review.reject_pending(ref, reason)
    details = {"record": record}
    return _wrap("rejected {}".format(ref), details)
def handle_apply():
    """Apply resolutions via the review tool and report its return value."""
    applied = review.apply_resolutions()
    return _wrap("applied {}".format(applied))
def handle_stats():
    """Report open / approved / rejected counters from the review tool.

    The stats mapping must contain the keys 'open', 'approved' and
    'rejected'; it is also returned whole as the envelope details.
    """
    stats = review.stats()
    summary = " ".join(f"{key}={stats[key]}" for key in ('open', 'approved', 'rejected'))
    return _wrap(summary, stats)
def _operator_text_ref(lowered: str, last_pending_list: list | None) -> str | None:
    """Find an explicit pending ref in the text, else resolve a 1-based index
    against the previously shown list."""
    ref = _extract_ref(lowered)
    if ref:
        return ref
    return _resolve_ref_by_index(last_pending_list, _extract_index(lowered))


def route_operator_text(text: str, user_id: str | None, chat_id: str | None, last_pending_list: list | None = None):
    """Route a free-text (non-slash) operator message to a handler.

    Intent is detected by keyword stems (Ukrainian and English). Specific
    intents are tested first and the generic "list pending" intent last,
    because its keywords ('pending', 'покажи') also occur in stats / details
    requests and inside every explicit ``pending.jsonl:N`` reference.

    Args:
        text: Raw operator message.
        user_id: Caller id, checked with is_operator().
        chat_id: Chat id, checked with is_operator().
        last_pending_list: Items of the last listing, used to resolve
            "approve 2"-style positional references.

    Returns:
        A response envelope, an error envelope when the caller is not an
        operator, or None when no intent matched.
    """
    if not is_operator(user_id, chat_id):
        return {
            "status": "error",
            "summary": "Недостатньо прав",
            "artifacts": [],
            "tool_calls": [],
            "next_actions": []
        }
    raw = text.strip()
    t = raw.lower()

    # stats
    if any(k in t for k in ['статист', 'скільки pending', 'pending stats']):
        return handle_stats()

    # apply
    if any(k in t for k in ['застосуй', 'apply', 'застосувати', 'застосуй зміни']):
        return handle_apply()

    # details
    if any(k in t for k in ['деталі', 'detail', 'подробиц', 'покажи деталі']):
        ref = _operator_text_ref(t, last_pending_list)
        if not ref:
            return _wrap('Немає ref або контексту для деталей')
        return handle_pending_show(ref)

    # approve
    if any(k in t for k in ['підтверд', 'approve', 'схвали']):
        ref = _operator_text_ref(t, last_pending_list)
        if not ref:
            return _wrap('Немає ref або контексту для підтвердження')
        # Use the original-case text: the keyword match is case-insensitive
        # already, and lower-casing would alter the id the operator typed.
        canonical_id = _extract_canonical_id(raw)
        if not canonical_id:
            return _wrap('Вкажіть canonical_id після "як"')
        return handle_approve(ref, map_to=canonical_id)

    # reject
    if any(k in t for k in ['відхил', 'reject', 'забракуй']):
        ref = _operator_text_ref(t, last_pending_list)
        if not ref:
            return _wrap('Немає ref або контексту для відхилення')
        reason = _extract_reason(text)
        if not reason:
            return _wrap('Вкажіть причину відхилення')
        return handle_reject(ref, reason)

    # list pending (checked last: its keywords are the least specific)
    if any(k in t for k in ['покажи', 'показати', 'невпізнан', 'непізнан', 'pending', 'невідом']):
        category = _extract_category(t)
        limit = max(1, min(100, _extract_limit(t, default=10)))
        if category and category not in CATEGORIES:
            return _wrap('unknown category')
        return handle_pending(limit=limit, category=category)

    return None
def route_operator_command(text: str, user_id: str | None, chat_id: str | None):
    """Route a slash-command message to its handler.

    Args:
        text: Raw message text (e.g. ``/approve <ref> map_to <id> --apply``).
        user_id: Caller id, checked with is_operator().
        chat_id: Chat id, checked with is_operator() and forwarded to the
            /doc and /farm handlers.

    Returns:
        None when the text is not an operator command (caller continues with
        the normal flow); an error envelope when the caller is not an
        operator; otherwise the handler's response envelope.
    """
    parsed = parse_operator_command(text)
    if not parsed:
        return None
    if not is_operator(user_id, chat_id):
        return {
            "status": "error",
            "summary": "Недостатньо прав",
            "artifacts": [],
            "tool_calls": [],
            "next_actions": []
        }
    cmd = parsed['cmd']
    args = parsed['args']

    if cmd == 'whoami':
        return handle_whoami(user_id, chat_id)

    if cmd == 'pending_show':
        if not args:
            return _wrap('Потрібен ref')
        return handle_pending_show(args[0])

    if cmd == 'pending':
        limit = 10
        category = None
        i = 0
        while i < len(args):
            if args[i] == '--limit' and i + 1 < len(args):
                try:
                    limit = int(args[i + 1])
                except Exception:
                    limit = 10
                i += 2
                continue
            if args[i] == '--category' and i + 1 < len(args):
                category = args[i + 1]
                i += 2
                continue
            i += 1
        # clamp
        limit = max(1, min(100, limit))
        if category and category not in CATEGORIES:
            return _wrap('unknown category')
        return handle_pending(limit=limit, category=category)

    if cmd == 'approve':
        if not args:
            return _wrap('missing ref')
        ref = args[0]
        # Options that take a value in the following token.
        valued = {'map_to': None, 'category': None, 'name': None, 'id': None}
        create_new = False
        apply = False
        i = 1
        while i < len(args):
            token = args[i]
            if token in valued:
                # A trailing option without its value used to raise
                # IndexError; report it to the operator instead.
                if i + 1 >= len(args):
                    return _wrap(f'missing value for {token}')
                valued[token] = args[i + 1]
                i += 2
            elif token == '--apply':
                apply = True
                i += 1
            elif token == 'create_new':
                create_new = True
                i += 1
            else:
                i += 1
        if apply:
            # Applying from /approve is opt-in via environment flags.
            allow_apply = os.getenv('AGX_ALLOW_APPLY', '0') == '1' or os.getenv('AGX_OPS_MODE', '0') == '1'
            if not allow_apply:
                return _wrap('apply_not_allowed')
        return handle_approve(
            ref,
            map_to=valued['map_to'],
            create_new=create_new,
            category=valued['category'],
            name=valued['name'],
            id_=valued['id'],
            apply=apply,
        )

    if cmd == 'reject':
        if len(args) < 2:
            return _wrap('missing ref or reason')
        ref = args[0]
        reason = ' '.join(args[1:])
        return handle_reject(ref, reason)

    if cmd == 'apply_dict':
        return handle_apply()

    if cmd == 'pending_stats':
        return handle_stats()

    # ── /doc [on|off|status] (v3.5: Doc Focus Gate) ─────────────────────────
    if cmd == 'doc':
        sub = args[0].lower() if args else "status"
        from crews.agromatrix_crew.doc_focus import handle_doc_focus as _hdf
        return _hdf(sub, chat_id=chat_id)

    # ── /farmos [status] (v4.3: FarmOS healthcheck) ──────────────────────────
    if cmd == 'farmos':
        return handle_farmos_status(args)

    # ── /farm state (v4.6: FarmOS → Farm State Snapshot) ─────────────────────
    if cmd == 'farm':
        return handle_farm_command(args, chat_id=chat_id)

    return _wrap('unknown command')
def handle_farmos_status(args: list) -> dict:
    """
    /farmos [status|logs [log_type] [limit]] — FarmOS diagnostics.

    Any exception raised while importing or calling the FarmOS tool is turned
    into a readable message instead of propagating. The messages built here
    contain only the exception class name.

    Subcommands:
        (no args) | status       — healthcheck ping
        logs [log_type] [limit]  — latest farmOS log records
                                   (defaults: log_type "activity", limit 10)

    Returns:
        A response envelope whose summary is the tool's text or the error text.
    """
    sub = args[0].lower() if args else "status"

    # ── /farmos logs [log_type] [limit] ──────────────────────────────────────
    if sub == "logs":
        log_type = args[1].lower() if len(args) >= 2 else "activity"
        limit = 10
        if len(args) >= 3:
            try:
                limit = int(args[2])
            except ValueError:
                pass  # non-numeric limit: keep the default
        failed = False
        try:
            from agromatrix_tools.tool_farmos_read import _farmos_read_logs_impl
            result_text = _farmos_read_logs_impl(log_type=log_type, limit=limit)
        except Exception as exc:
            failed = True
            result_text = f"FarmOS logs: внутрішня помилка ({type(exc).__name__})."
        # The internal-error text starts with "FarmOS logs:", not "FarmOS:",
        # so the prefix test alone would record ok=True for a failure.
        ok = not failed and not result_text.startswith("FarmOS:")
        _tlog_farmos_cmd("farmos_logs_cmd", ok=ok, sub="logs", log_type=log_type)
        return _wrap(result_text)

    # ── /farmos or /farmos status ────────────────────────────────────────────
    if sub not in ("status",):
        return _wrap(
            "Команда farmos: підтримується /farmos, /farmos status або /farmos logs [log_type] [limit]."
        )
    try:
        from agromatrix_tools.tool_farmos_read import _farmos_ping_impl
        status_text = _farmos_ping_impl()
    except Exception as exc:
        status_text = f"FarmOS status недоступний: внутрішня помилка виконання ({type(exc).__name__})."
    ok = status_text.startswith("FarmOS доступний")
    _tlog_farmos_cmd("farmos_status_cmd", ok=ok, sub="status")
    return _wrap(status_text)
def _tlog_farmos_cmd(event: str, ok: bool, sub: str = "", log_type: str = "") -> None:
"""PII-safe telemetry для farmos operator commands."""
try:
import logging as _logging
extra = f" sub={sub}" if sub else ""
extra += f" log_type={log_type}" if log_type else ""
_logging.getLogger(__name__).info(
"AGX_STEPAN_METRIC %s ok=%s%s", event, ok, extra,
)
except Exception:
pass
# ── v4.6: /farm state — FarmOS → Farm State Snapshot ─────────────────────────
#
# Smoke checklist (manual):
# /farmos → FarmOS ping still works (regression)
# /farm → "підтримується тільки /farm state"
# /farm state (no env) → "FarmOS не налаштований..."
# /farm state (env ok) → snapshot text, logs show farm_state_cmd_saved ok=true
# /farm foo → unknown subcommand message
# (asset_type, limit) pairs queried, in this order, when building the snapshot.
_FARM_STATE_ASSET_QUERIES: list[tuple[str, int]] = [
("asset_land", 10),
("asset_plant", 10),
("asset_equipment", 5),
]
# Label shown in the snapshot text for each asset type.
_FARM_STATE_LABELS: dict[str, str] = {
"asset_land": "Поля",
"asset_plant": "Культури/рослини",
"asset_equipment": "Техніка",
}
# Maximum length (in characters) of the snapshot text
_FARM_STATE_MAX_CHARS = 900
def handle_farm_command(args: list, chat_id: str | None = None) -> dict:
    """
    /farm state — build a snapshot of FarmOS assets and store it in the
    memory service.

    Only the "state" subcommand is supported; anything else gets a usage
    message. Any exception from the snapshot logic is logged as a warning and
    converted into a readable message, so this function does not raise.

    Smoke checklist (manual):
        /farm                 → only /farm state supported
        /farm state (no env)  → config missing message
        /farm state (env)     → snapshot + saved to memory
    """
    sub = args[0].lower() if args else ""
    if sub == "state":
        try:
            return _handle_farm_state(chat_id=chat_id)
        except Exception as exc:
            import logging as _logging

            _logging.getLogger(__name__).warning(
                "handle_farm_command unexpected error: %s", exc
            )
            return _wrap("Farm state: не вдалося отримати дані (перевір FarmOS / мережу).")
    return _wrap(
        "Команда farm: підтримується тільки /farm state."
    )
def _handle_farm_state(chat_id: str | None) -> dict:
    """Core of /farm state; meant to be called from handle_farm_command.

    Steps: ping FarmOS, query each asset type in _FARM_STATE_ASSET_QUERIES,
    build the snapshot text, then (when a chat_id is given) store it in the
    memory service. A failed save only appends a note to the reply.
    """
    import logging as _logging

    logger = _logging.getLogger(__name__)
    # NOTE(review): the "h:" prefix suggests a hash, but this logs the first
    # six characters of the raw chat_id — confirm that is intended.
    chat_prefix = str(chat_id or "")[:6]
    logger.info("AGX_STEPAN_METRIC farm_state_cmd_started chat_id=h:%s", chat_prefix)

    # ── Step 1: check that FarmOS is reachable ───────────────────────────────
    try:
        from agromatrix_tools.tool_farmos_read import _farmos_ping_impl
        ping_text = _farmos_ping_impl()
    except Exception as exc:
        ping_text = f"FarmOS: помилка перевірки ({type(exc).__name__})."
    if not ping_text.startswith("FarmOS доступний"):
        return _wrap(ping_text)

    # ── Step 2: query assets for each configured type ────────────────────────
    try:
        from agromatrix_tools.tool_farmos_read import _farmos_search_assets_impl
    except Exception:
        return _wrap("Farm state: не вдалося отримати дані (agromatrix_tools недоступні).")
    counts: dict[str, int] = {}
    tops: dict[str, list[str]] = {}
    for asset_type, limit in _FARM_STATE_ASSET_QUERIES:
        try:
            found = _parse_asset_lines(
                _farmos_search_assets_impl(asset_type=asset_type, limit=limit)
            )
        except Exception:
            found = []  # a failing query counts as "no assets of this type"
        counts[asset_type] = len(found)
        # top-3 labels (name only, without UUID)
        tops[asset_type] = [_label_from_asset_line(entry) for entry in found[:3]]

    # ── Step 3: build the snapshot text ──────────────────────────────────────
    snapshot_text = _build_snapshot_text(counts, tops)

    # ── Step 4: store it in the memory service ───────────────────────────────
    save_ok = False
    save_suffix = ""
    if chat_id:
        save_ok = _save_farm_state_snapshot(chat_id, counts, tops, snapshot_text)
        if not save_ok:
            save_suffix = "\n(Не зміг зберегти в пам'ять.)"
    if save_ok:
        reason = "saved"
    elif not chat_id:
        reason = "no_chat_id"
    else:
        reason = "memory_error"
    logger.info(
        "AGX_STEPAN_METRIC farm_state_cmd_saved ok=%s reason=%s "
        "land=%s plant=%s equip=%s",
        save_ok,
        reason,
        counts.get("asset_land", 0),
        counts.get("asset_plant", 0),
        counts.get("asset_equipment", 0),
    )
    return _wrap(snapshot_text + save_suffix)
def _parse_asset_lines(raw: str) -> list[str]:
"""
Парсить рядки виду "- label | type | id=xxxx" або "FarmOS: ...".
Повертає лише рядки що починаються з "- ".
Fail-safe: якщо raw не такого формату — повертає [].
"""
try:
lines = [ln.strip() for ln in str(raw).split("\n") if ln.strip().startswith("- ")]
return lines
except Exception:
return []
def _label_from_asset_line(line: str) -> str:
"""
Витягує label з рядка "- label | type | id=xxxx".
Повертає перший сегмент після "- ", обрізаний.
"""
try:
content = line.lstrip("- ").strip()
return content.split("|")[0].strip()
except Exception:
return line.strip()
def _build_snapshot_text(
counts: dict[str, int],
tops: dict[str, list[str]],
) -> str:
"""Формує human-readable snapshot ≤ _FARM_STATE_MAX_CHARS символів."""
total = sum(counts.values())
if total == 0:
return (
"FarmOS: немає даних по assets "
"(або типи відрізняються у вашій інстанції)."
)
lines = ["Farm state (FarmOS):"]
for asset_type, label in _FARM_STATE_LABELS.items():
n = counts.get(asset_type, 0)
top = tops.get(asset_type, [])
if top:
top_str = ", ".join(top[:3])
lines.append(f"- {label}: {n} (топ: {top_str})")
else:
lines.append(f"- {label}: {n}")
text = "\n".join(lines)
# Детермінований hard cap
if len(text) > _FARM_STATE_MAX_CHARS:
text = text[:_FARM_STATE_MAX_CHARS].rsplit("\n", 1)[0]
return text
def _save_farm_state_snapshot(
    chat_id: str,
    counts: dict[str, int],
    tops: dict[str, list[str]],
    snapshot_text: str,
) -> bool:
    """
    Store the snapshot in the memory service under the key
    farm_state:agromatrix:chat:{chat_id}.

    The base URL comes from AGX_MEMORY_SERVICE_URL, then MEMORY_SERVICE_URL,
    then a built-in default; empty values are treated as unset.

    Returns:
        True when the service answers 200 or 201, False otherwise. Never
        raises: any exception is logged at debug level and reported as False.
    """
    try:
        import os
        from datetime import datetime, timezone

        import httpx

        fact_key = f"farm_state:agromatrix:chat:{chat_id}"
        # synthetic_uid — presumably the same pattern as memory_manager.py
        # (not visible here; confirm).
        synthetic_uid = f"farm:{chat_id}"
        payload = {
            "_version": 1,
            "source": "farmos",
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "counts": counts,
            "top": tops,
            "text": snapshot_text,
        }
        mem_url = (
            os.getenv("AGX_MEMORY_SERVICE_URL")
            or os.getenv("MEMORY_SERVICE_URL")
            or "http://memory-service:8000"
        )
        # A trailing "/" in the configured URL must not yield "//facts/upsert".
        endpoint = f"{mem_url.rstrip('/')}/facts/upsert"
        resp = httpx.post(
            endpoint,
            json={
                "user_id": synthetic_uid,
                "fact_key": fact_key,
                "fact_value_json": payload,
            },
            timeout=3.0,
        )
        return resp.status_code in (200, 201)
    except Exception as exc:
        import logging as _logging

        _logging.getLogger(__name__).debug(
            "farm_state snapshot save failed: %s", exc
        )
        return False