NATS wildcards (node.*.capabilities.get) only work for subscriptions, not for publish. Switch to a dedicated broadcast subject (fabric.capabilities.discover) that all NCS instances subscribe to, enabling proper scatter-gather discovery across nodes. Made-with: Cursor
686 lines
24 KiB
Python
686 lines
24 KiB
Python
"""
|
||
Operator commands for AgroMatrix (Stepan). Access control and slash commands.
|
||
|
||
Access control (env, used by gateway and here):
|
||
- AGX_OPERATOR_IDS: comma-separated Telegram user_id list; only these users are operators.
|
||
- AGX_OPERATOR_CHAT_ID: optional; if set, operator actions allowed only in this chat_id.
|
||
|
||
When is_operator(user_id, chat_id) is True, gateway routes any message (not only slash)
|
||
to Stepan for human-friendly operator interaction.
|
||
"""
|
||
import os
|
||
import re
|
||
import shlex
|
||
|
||
from agromatrix_tools import tool_dictionary_review as review
|
||
|
||
# Dictionary categories that pending entries can be filtered by.
CATEGORIES = {
    "unit",
    "material",
    "operation",
    "crop",
    "field",
}

# Only these slash-commands are treated as operator commands.
# Everything else (e.g. /start, /agromatrix) must fall through to the
# normal agent flow.
OPERATOR_COMMANDS = {
    # identity / dictionary review
    "whoami",
    "pending",
    "pending_show",
    "pending_stats",
    "approve",
    "reject",
    "apply_dict",
    # v3.5: Doc Focus Gate control (/doc on|off|status)
    "doc",
    # v4.3: FarmOS healthcheck (/farmos | /farmos status)
    "farmos",
    # v4.6: Farm state snapshot (/farm state)
    "farm",
}
|
||
|
||
|
||
def is_operator(user_id: str | None, chat_id: str | None) -> bool:
|
||
allowed_ids = [s.strip() for s in os.getenv('AGX_OPERATOR_IDS', '').split(',') if s.strip()]
|
||
allowed_chat = os.getenv('AGX_OPERATOR_CHAT_ID', '').strip()
|
||
if allowed_chat and chat_id and str(chat_id) != allowed_chat:
|
||
return False
|
||
if allowed_ids and user_id and str(user_id) in allowed_ids:
|
||
return True
|
||
return False
|
||
|
||
|
||
def parse_operator_command(text: str):
    """Parse a message into an operator command, if it is one.

    Args:
        text: Raw message text, e.g. ``/pending --limit 5``.

    Returns:
        ``{"cmd": <name>, "args": [<tokens>...]}`` when the first token
        (with leading slashes removed) is in OPERATOR_COMMANDS,
        otherwise None.
    """
    if not text:
        # shlex.split(None) would try to read from stdin; treat as "no command".
        return None
    try:
        parts = shlex.split(text)
    except ValueError:
        # Unbalanced quotes/apostrophes in ordinary chat text (e.g. "it's")
        # must not crash routing; fall back to plain whitespace splitting.
        parts = text.split()
    if not parts:
        return None

    cmd = parts[0].lstrip('/')
    if cmd not in OPERATOR_COMMANDS:
        return None
    return {"cmd": cmd, "args": parts[1:]}
|
||
|
||
|
||
def _wrap(summary: str, details: dict | None = None):
|
||
return {
|
||
"status": "ok",
|
||
"summary": summary,
|
||
"artifacts": [],
|
||
"tool_calls": [],
|
||
"next_actions": [],
|
||
"details": details or {}
|
||
}
|
||
|
||
|
||
def _extract_ref(text: str) -> str | None:
|
||
m = re.search(r"pending\.jsonl:\d+", text)
|
||
return m.group(0) if m else None
|
||
|
||
|
||
def _extract_index(text: str) -> int | None:
|
||
m = re.search(r"(\d{1,3})", text)
|
||
if not m:
|
||
return None
|
||
try:
|
||
return int(m.group(1))
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _extract_limit(text: str, default: int = 10) -> int:
|
||
m = re.search(r"(?:до|limit)\s*(\d{1,3})", text)
|
||
if m:
|
||
try:
|
||
return int(m.group(1))
|
||
except Exception:
|
||
return default
|
||
m = re.search(r"(\d{1,3})", text)
|
||
if m:
|
||
try:
|
||
return int(m.group(1))
|
||
except Exception:
|
||
return default
|
||
return default
|
||
|
||
|
||
def _extract_category(text: str) -> str | None:
|
||
t = text.lower()
|
||
if 'один' in t or 'unit' in t:
|
||
return 'unit'
|
||
if 'операц' in t or 'operation' in t:
|
||
return 'operation'
|
||
if 'культур' in t or 'crop' in t:
|
||
return 'crop'
|
||
if 'матеріал' in t or 'material' in t:
|
||
return 'material'
|
||
if 'поле' in t or 'field' in t:
|
||
return 'field'
|
||
return None
|
||
|
||
|
||
def _extract_canonical_id(text: str) -> str | None:
|
||
m = re.search(r"(?:як|as)\s+([\w\-:.]+)", text, flags=re.IGNORECASE)
|
||
if m:
|
||
return m.group(1)
|
||
return None
|
||
|
||
|
||
def _extract_reason(text: str) -> str:
|
||
if ':' in text:
|
||
return text.split(':', 1)[1].strip()
|
||
m = re.search(r"(?:бо|через)\s+(.+)$", text, flags=re.IGNORECASE)
|
||
if m:
|
||
return m.group(1).strip()
|
||
return ''
|
||
|
||
|
||
def _resolve_ref_by_index(last_list: list | None, idx: int | None) -> str | None:
|
||
if not last_list or not idx:
|
||
return None
|
||
if idx < 1 or idx > len(last_list):
|
||
return None
|
||
item = last_list[idx - 1]
|
||
return item.get('pending_ref') or item.get('ref')
|
||
|
||
|
||
def handle_whoami(user_id: str | None, chat_id: str | None):
    """Echo the caller's user id and chat id (empty when missing)."""
    shown_user = user_id or ''
    shown_chat = chat_id or ''
    return _wrap(f"user_id: {shown_user}\nchat_id: {shown_chat}")
|
||
|
||
|
||
def handle_pending(limit=10, category=None):
    """List pending dictionary entries as one text line per entry.

    Args:
        limit: Maximum number of entries requested from the review tool.
        category: Optional category filter passed to the review tool.

    Returns:
        Response envelope whose summary has one
        ``ref | category | raw_term | suggestions`` line per entry and
        whose details carry ``count`` and a structured ``pending_items``
        list (up to five suggestions each).
    """
    items = review.list_pending(limit=limit, category=category)
    lines = []
    pending_items = []
    for item in items:
        # Tolerate a missing or null suggestions field.
        top = (item.get('suggestions') or [])[:5]

        rendered = []
        for sug in top:
            score = sug.get('score')
            # Same tolerance as handle_pending_show: a missing or
            # non-numeric score is shown as "n/a" instead of raising.
            score_s = f"{score:.2f}" if isinstance(score, (int, float)) else "n/a"
            rendered.append(f"{sug.get('id')}({score_s})")
        sug_s = ', '.join(rendered)

        lines.append(
            f"{item.get('pending_ref')} | {item.get('category')} | "
            f"{item.get('raw_term')} | {sug_s}"
        )
        pending_items.append({
            'pending_ref': item.get('pending_ref'),
            'pending_id': item.get('pending_id'),
            'raw_term': item.get('raw_term'),
            'category': item.get('category'),
            'suggestions': [{"id": s.get('id'), "score": s.get('score')} for s in top],
        })

    summary = "\n".join(lines) if lines else "Немає pending записів."
    return _wrap(summary, {"count": len(items), "pending_items": pending_items})
|
||
|
||
|
||
def handle_pending_show(ref: str):
    """Show the details of one pending entry identified by *ref*.

    Returns a response envelope whose summary lists the entry's fields
    and suggestions, with the raw detail record as ``details``. When the
    review tool returns nothing, the summary is a "not found" message.
    """
    detail = review.get_pending_detail(ref)
    if not detail:
        return _wrap('Не знайдено pending запис')

    # Fields that are always printed, in this order.
    lines = [
        f"{field}: {detail.get(field)}"
        for field in ('ref', 'category', 'raw_term', 'ts', 'status')
    ]
    # Fields printed only when they carry a value.
    for optional in ('decision', 'reason'):
        if detail.get(optional):
            lines.append(f"{optional}: {detail.get(optional)}")

    lines.append('suggestions:')
    suggestions = detail.get('suggestions') or []
    if not suggestions:
        lines.append('- (none)')
    for sug in suggestions:
        score = sug.get('score')
        if isinstance(score, (int, float)):
            score_s = f"{score:.2f}"
        else:
            score_s = "n/a"
        lines.append(f"- {sug.get('id')} ({score_s})")

    return _wrap("\n".join(lines), detail)
|
||
|
||
def handle_approve(ref, map_to=None, create_new=None, category=None, name=None, id_=None, apply=False):
    """Approve a pending entry and optionally apply resolutions right away.

    The action sent to the review tool is chosen as follows:

    - *map_to* given      -> "map_to_existing" with that canonical id;
    - else *create_new*   -> "create_new_entry" with *id_* / *name*;
    - otherwise           -> "add_synonym".

    *category* is accepted for interface compatibility but is not used here.

    Returns a response envelope with the approval record and the
    ``applied`` flag in ``details``.
    """
    if not map_to and create_new:
        action = dict(
            type="create_new_entry",
            canonical_id=id_ or '',
            canonical_name=name or '',
        )
    else:
        # NOTE(review): when neither map_to nor create_new is given this
        # sends "add_synonym" with a falsy canonical_id (normally None).
        # Confirm that review.approve_pending handles that as intended.
        action = dict(
            type="map_to_existing" if map_to else "add_synonym",
            canonical_id=map_to,
        )

    record = review.approve_pending(ref, action)
    if apply:
        review.apply_resolutions()
    return _wrap(f"approved {ref}", {"record": record, "applied": apply})
|
||
|
||
|
||
def handle_reject(ref, reason):
    """Reject the pending entry *ref* with *reason* via the review tool."""
    record = review.reject_pending(ref, reason)
    details = {"record": record}
    return _wrap(f"rejected {ref}", details)
|
||
|
||
|
||
def handle_apply():
    """Apply all recorded resolutions and report the value the review tool returns."""
    applied = review.apply_resolutions()
    return _wrap("applied {}".format(applied))
|
||
|
||
|
||
def handle_stats():
    """Report open/approved/rejected counters from the review tool."""
    stats = review.stats()
    summary = " ".join(
        f"{key}={stats[key]}" for key in ("open", "approved", "rejected")
    )
    return _wrap(summary, stats)
|
||
|
||
|
||
def route_operator_text(text: str, user_id: str | None, chat_id: str | None, last_pending_list: list | None = None):
    """Route a free-text (non-slash) operator message to a handler.

    Intent is detected by Ukrainian/English keyword substrings in the
    lowercased message. Intents are checked from most to least specific:
    reject, approve, details, stats, apply, list. The broad list keywords
    ('pending', 'покажи', ...) are checked last because they also occur
    in other intents' phrases and in every ``pending.jsonl:<n>`` ref.

    Args:
        text: Raw message text.
        user_id: Caller's user id, checked with is_operator.
        chat_id: Chat id, checked with is_operator.
        last_pending_list: Items from the most recent pending listing,
            used to resolve "number 3"-style references.

    Returns:
        A response envelope, an error envelope when the caller is not an
        operator, or None when no intent matched.
    """
    if not is_operator(user_id, chat_id):
        return {
            "status": "error",
            "summary": "Недостатньо прав",
            "artifacts": [],
            "tool_calls": [],
            "next_actions": []
        }

    t = text.strip().lower()

    def mentions(*keywords: str) -> bool:
        """True when any keyword occurs in the lowercased message."""
        return any(k in t for k in keywords)

    def resolve_ref() -> str | None:
        """Explicit pending.jsonl ref, else a position in the last list."""
        explicit = _extract_ref(t)
        if explicit:
            return explicit
        return _resolve_ref_by_index(last_pending_list, _extract_index(t))

    # reject — checked first: its free-text reason may contain any other keyword
    if mentions('відхил', 'reject', 'забракуй'):
        ref = resolve_ref()
        if not ref:
            return _wrap('Немає ref або контексту для відхилення')
        reason = _extract_reason(text)
        if not reason:
            return _wrap('Вкажіть причину відхилення')
        return handle_reject(ref, reason)

    # approve
    if mentions('підтверд', 'approve', 'схвали'):
        ref = resolve_ref()
        if not ref:
            return _wrap('Немає ref або контексту для підтвердження')
        # Use the original text so the canonical id keeps its letter case.
        canonical_id = _extract_canonical_id(text)
        if not canonical_id:
            return _wrap('Вкажіть canonical_id після "як"')
        return handle_approve(ref, map_to=canonical_id)

    # details
    if mentions('деталі', 'detail', 'подробиц', 'покажи деталі'):
        ref = resolve_ref()
        if not ref:
            return _wrap('Немає ref або контексту для деталей')
        return handle_pending_show(ref)

    # stats
    if mentions('статист', 'скільки pending', 'pending stats'):
        return handle_stats()

    # apply
    if mentions('застосуй', 'apply', 'застосувати', 'застосуй зміни'):
        return handle_apply()

    # list pending
    if mentions('покажи', 'показати', 'невпізнан', 'непізнан', 'pending', 'невідом'):
        category = _extract_category(t)
        # Clamp the requested size to 1..100.
        limit = max(1, min(100, _extract_limit(t, default=10)))
        if category and category not in CATEGORIES:
            return _wrap('unknown category')
        return handle_pending(limit=limit, category=category)

    return None
|
||
|
||
|
||
def route_operator_command(text: str, user_id: str | None, chat_id: str | None):
    """Parse and execute an operator slash-command.

    Args:
        text: Raw message text, e.g. ``/approve pending.jsonl:3 map_to crop:wheat``.
        user_id: Caller's user id, checked with is_operator.
        chat_id: Chat id, checked with is_operator.

    Returns:
        None when *text* is not an operator command (so the caller can
        fall through to the normal flow), an error envelope when the
        caller is not an operator, otherwise the handler's response
        envelope.
    """
    parsed = parse_operator_command(text)
    if not parsed:
        return None
    if not is_operator(user_id, chat_id):
        return {
            "status": "error",
            "summary": "Недостатньо прав",
            "artifacts": [],
            "tool_calls": [],
            "next_actions": []
        }

    cmd = parsed['cmd']
    args = parsed['args']

    if cmd == 'whoami':
        return handle_whoami(user_id, chat_id)

    if cmd == 'pending_show':
        if not args:
            return _wrap('Потрібен ref')
        return handle_pending_show(args[0])

    if cmd == 'pending':
        limit = 10
        category = None
        i = 0
        while i < len(args):
            has_value = i + 1 < len(args)
            if args[i] == '--limit' and has_value:
                try:
                    limit = int(args[i + 1])
                except Exception:
                    limit = 10
                i += 2
                continue
            if args[i] == '--category' and has_value:
                category = args[i + 1]
                i += 2
                continue
            i += 1
        # clamp to 1..100
        limit = max(1, min(100, limit))
        if category and category not in CATEGORIES:
            return _wrap('unknown category')
        return handle_pending(limit=limit, category=category)

    if cmd == 'approve':
        if not args:
            return _wrap('missing ref')
        ref = args[0]
        # Keywords that consume the following token as their value.
        values = {'map_to': None, 'category': None, 'name': None, 'id': None}
        create_new = False
        apply = False
        i = 1
        while i < len(args):
            token = args[i]
            if token in values:
                if i + 1 >= len(args):
                    # A trailing keyword without a value used to raise
                    # IndexError; report it to the operator instead.
                    return _wrap(f'missing value for {token}')
                values[token] = args[i + 1]
                i += 2
                continue
            if token == '--apply':
                apply = True
            elif token == 'create_new':
                create_new = True
            # Unknown tokens are ignored.
            i += 1

        if apply:
            # Immediate apply is allowed only when enabled via environment.
            allow_apply = os.getenv('AGX_ALLOW_APPLY', '0') == '1' or os.getenv('AGX_OPS_MODE', '0') == '1'
            if not allow_apply:
                return _wrap('apply_not_allowed')
        return handle_approve(
            ref,
            map_to=values['map_to'],
            create_new=create_new,
            category=values['category'],
            name=values['name'],
            id_=values['id'],
            apply=apply,
        )

    if cmd == 'reject':
        if len(args) < 2:
            return _wrap('missing ref or reason')
        ref = args[0]
        reason = ' '.join(args[1:])
        return handle_reject(ref, reason)

    if cmd == 'apply_dict':
        return handle_apply()

    if cmd == 'pending_stats':
        return handle_stats()

    # ── /doc [on|off|status] (v3.5: Doc Focus Gate) ─────────────────────────
    if cmd == 'doc':
        sub = args[0].lower() if args else "status"
        # Imported lazily, only when the /doc command is actually used.
        from crews.agromatrix_crew.doc_focus import handle_doc_focus as _hdf
        return _hdf(sub, chat_id=chat_id)

    # ── /farmos [status] (v4.3: FarmOS healthcheck) ──────────────────────────
    if cmd == 'farmos':
        return handle_farmos_status(args)

    # ── /farm state (v4.6: FarmOS → Farm State Snapshot) ─────────────────────
    if cmd == 'farm':
        return handle_farm_command(args, chat_id=chat_id)

    return _wrap('unknown command')
|
||
|
||
|
||
def handle_farmos_status(args: list) -> dict:
    """
    /farmos [status|logs [log_type] [limit]] — FarmOS diagnostics.

    Exceptions raised by the FarmOS tools are caught and turned into a
    readable message that contains only the exception type name.

    Subcommands:
      (no args) | status          — healthcheck ping
      logs [log_type] [limit]     — latest farmOS log records
                                    (defaults: log_type="activity", limit=10)

    Args:
        args: Command tokens after ``/farmos``.

    Returns:
        Response envelope whose summary is the tool's text or an error text.
    """
    sub = args[0].lower() if args else "status"

    # ── /farmos logs [log_type] [limit] ──────────────────────────────────────
    if sub == "logs":
        log_type = args[1].lower() if len(args) >= 2 else "activity"
        limit = 10
        if len(args) >= 3:
            try:
                limit = int(args[2])
            except ValueError:
                pass  # keep the default on a non-numeric limit

        failed = False
        try:
            from agromatrix_tools.tool_farmos_read import _farmos_read_logs_impl
            result_text = _farmos_read_logs_impl(log_type=log_type, limit=limit)
        except Exception as exc:
            failed = True
            result_text = f"FarmOS logs: внутрішня помилка ({type(exc).__name__})."

        # The internal-error text starts with "FarmOS logs:", not "FarmOS:",
        # so the prefix check alone would report ok=True for a crash.
        ok = not failed and not result_text.startswith("FarmOS:")
        _tlog_farmos_cmd("farmos_logs_cmd", ok=ok, sub="logs", log_type=log_type)
        return _wrap(result_text)

    # ── /farmos or /farmos status ────────────────────────────────────────────
    if sub not in ("status",):
        return _wrap(
            "Команда farmos: підтримується /farmos, /farmos status або /farmos logs [log_type] [limit]."
        )

    try:
        from agromatrix_tools.tool_farmos_read import _farmos_ping_impl
        status_text = _farmos_ping_impl()
    except Exception as exc:
        status_text = f"FarmOS status недоступний: внутрішня помилка виконання ({type(exc).__name__})."

    ok = status_text.startswith("FarmOS доступний")
    _tlog_farmos_cmd("farmos_status_cmd", ok=ok, sub="status")
    return _wrap(status_text)
|
||
|
||
|
||
def _tlog_farmos_cmd(event: str, ok: bool, sub: str = "", log_type: str = "") -> None:
|
||
"""PII-safe telemetry для farmos operator commands."""
|
||
try:
|
||
import logging as _logging
|
||
extra = f" sub={sub}" if sub else ""
|
||
extra += f" log_type={log_type}" if log_type else ""
|
||
_logging.getLogger(__name__).info(
|
||
"AGX_STEPAN_METRIC %s ok=%s%s", event, ok, extra,
|
||
)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
# ── v4.6: /farm state — FarmOS → Farm State Snapshot ─────────────────────────
|
||
#
|
||
# Smoke checklist (manual):
|
||
# /farmos → FarmOS ping still works (regression)
|
||
# /farm → "підтримується тільки /farm state"
|
||
# /farm state (no env) → "FarmOS не налаштований..."
|
||
# /farm state (env ok) → snapshot text, logs show farm_state_cmd_saved ok=true
|
||
# /farm foo → unknown subcommand message
|
||
|
||
_FARM_STATE_ASSET_QUERIES: list[tuple[str, int]] = [
|
||
("asset_land", 10),
|
||
("asset_plant", 10),
|
||
("asset_equipment", 5),
|
||
]
|
||
|
||
_FARM_STATE_LABELS: dict[str, str] = {
|
||
"asset_land": "Поля",
|
||
"asset_plant": "Культури/рослини",
|
||
"asset_equipment": "Техніка",
|
||
}
|
||
|
||
# Максимальна довжина snapshot-тексту
|
||
_FARM_STATE_MAX_CHARS = 900
|
||
|
||
|
||
def handle_farm_command(args: list, chat_id: str | None = None) -> dict:
    """
    /farm state — delegate to _handle_farm_state to build the snapshot.

    Any exception from the snapshot logic is logged as a warning and
    turned into a readable message; this function does not propagate it.

    Smoke checklist (manual):
      /farm                   → only /farm state supported
      /farm state (no env)    → config missing message
      /farm state (env)       → snapshot + saved to memory
    """
    sub = args[0].lower() if args else ""

    if sub == "state":
        try:
            return _handle_farm_state(chat_id=chat_id)
        except Exception as exc:
            import logging as _logging

            logger = _logging.getLogger(__name__)
            logger.warning("handle_farm_command unexpected error: %s", exc)
            return _wrap("Farm state: не вдалося отримати дані (перевір FarmOS / мережу).")

    # Every other subcommand (including none) is unsupported.
    return _wrap(
        "Команда farm: підтримується тільки /farm state."
    )
|
||
|
||
|
||
def _handle_farm_state(chat_id: str | None) -> dict:
    """Core logic of /farm state.

    Steps:
      1. Ping FarmOS; if it is not reported as available, return the
         ping text as the response.
      2. Query each asset type listed in _FARM_STATE_ASSET_QUERIES.
      3. Build the snapshot text.
      4. If *chat_id* is given, save the snapshot via
         _save_farm_state_snapshot; a failed save adds a note to the reply.

    Args:
        chat_id: Chat the snapshot is stored for; when falsy nothing is saved.

    Returns:
        Response envelope with the snapshot (or an error text) as summary.
    """
    import logging as _logging
    _log = _logging.getLogger(__name__)

    # Only the first 6 characters of the chat id are logged.
    _log.info("AGX_STEPAN_METRIC farm_state_cmd_started chat_id=h:%s",
              str(chat_id or "")[:6])

    # ── Step 1: check FarmOS availability ────────────────────────────────────
    try:
        from agromatrix_tools.tool_farmos_read import _farmos_ping_impl
        ping_result = _farmos_ping_impl()
    except Exception as exc:
        ping_result = f"FarmOS: помилка перевірки ({type(exc).__name__})."

    if not ping_result.startswith("FarmOS доступний"):
        return _wrap(ping_result)

    # ── Step 2: query assets for each configured type ────────────────────────
    counts: dict[str, int] = {}
    tops: dict[str, list[str]] = {}

    try:
        from agromatrix_tools.tool_farmos_read import _farmos_search_assets_impl
    except Exception:
        return _wrap("Farm state: не вдалося отримати дані (agromatrix_tools недоступні).")

    for asset_type, limit in _FARM_STATE_ASSET_QUERIES:
        try:
            raw = _farmos_search_assets_impl(asset_type=asset_type, limit=limit)
            items = _parse_asset_lines(raw)
        except Exception as exc:
            # Keep going with an empty result, but leave a trace so a failed
            # query can be told apart from a farm with no assets.
            _log.warning(
                "AGX_STEPAN_METRIC farm_state_asset_query_failed type=%s error=%s",
                asset_type, type(exc).__name__,
            )
            items = []
        counts[asset_type] = len(items)
        # top-3 labels (name only, without the id part of the line)
        tops[asset_type] = [_label_from_asset_line(ln) for ln in items[:3]]

    # ── Step 3: build the snapshot text ──────────────────────────────────────
    snapshot_text = _build_snapshot_text(counts, tops)

    # ── Step 4: save to memory-service ───────────────────────────────────────
    save_ok = False
    save_suffix = ""
    if chat_id:
        save_ok = _save_farm_state_snapshot(chat_id, counts, tops, snapshot_text)
        if not save_ok:
            save_suffix = "\n(Не зміг зберегти в пам'ять.)"

    _log.info(
        "AGX_STEPAN_METRIC farm_state_cmd_saved ok=%s reason=%s "
        "land=%s plant=%s equip=%s",
        save_ok,
        "saved" if save_ok else ("no_chat_id" if not chat_id else "memory_error"),
        counts.get("asset_land", 0),
        counts.get("asset_plant", 0),
        counts.get("asset_equipment", 0),
    )

    return _wrap(snapshot_text + save_suffix)
|
||
|
||
|
||
def _parse_asset_lines(raw: str) -> list[str]:
|
||
"""
|
||
Парсить рядки виду "- label | type | id=xxxx" або "FarmOS: ...".
|
||
Повертає лише рядки що починаються з "- ".
|
||
Fail-safe: якщо raw не такого формату — повертає [].
|
||
"""
|
||
try:
|
||
lines = [ln.strip() for ln in str(raw).split("\n") if ln.strip().startswith("- ")]
|
||
return lines
|
||
except Exception:
|
||
return []
|
||
|
||
|
||
def _label_from_asset_line(line: str) -> str:
|
||
"""
|
||
Витягує label з рядка "- label | type | id=xxxx".
|
||
Повертає перший сегмент після "- ", обрізаний.
|
||
"""
|
||
try:
|
||
content = line.lstrip("- ").strip()
|
||
return content.split("|")[0].strip()
|
||
except Exception:
|
||
return line.strip()
|
||
|
||
|
||
def _build_snapshot_text(
    counts: dict[str, int],
    tops: dict[str, list[str]],
) -> str:
    """Build the human-readable snapshot, at most _FARM_STATE_MAX_CHARS long.

    Args:
        counts: Number of items found per asset type.
        tops: Up to three item labels per asset type.

    Returns:
        A "no data" message when all counts are zero; otherwise a header
        line plus one line per entry of _FARM_STATE_LABELS. Text over the
        cap is cut at the cap and the trailing partial line is dropped.
    """
    if sum(counts.values()) == 0:
        return (
            "FarmOS: немає даних по assets "
            "(або типи відрізняються у вашій інстанції)."
        )

    rows = ["Farm state (FarmOS):"]
    for asset_type in _FARM_STATE_LABELS:
        title = _FARM_STATE_LABELS[asset_type]
        amount = counts.get(asset_type, 0)
        names = tops.get(asset_type, [])
        suffix = f" (топ: {', '.join(names[:3])})" if names else ""
        rows.append(f"- {title}: {amount}{suffix}")

    text = "\n".join(rows)
    if len(text) <= _FARM_STATE_MAX_CHARS:
        return text
    # Deterministic hard cap: cut, then drop the last (partial) line.
    return text[:_FARM_STATE_MAX_CHARS].rsplit("\n", 1)[0]
|
||
|
||
|
||
def _save_farm_state_snapshot(
    chat_id: str,
    counts: dict[str, int],
    tops: dict[str, list[str]],
    snapshot_text: str,
) -> bool:
    """
    Save the snapshot to memory-service under the fact key
    farm_state:agromatrix:chat:{chat_id}.

    The service base URL is read from AGX_MEMORY_SERVICE_URL, then
    MEMORY_SERVICE_URL, defaulting to http://memory-service:8000.

    Args:
        chat_id: Chat the snapshot belongs to.
        counts: Number of items found per asset type.
        tops: Up to three item labels per asset type.
        snapshot_text: Human-readable snapshot text.

    Returns:
        True when the service answers 200 or 201. False on any other
        status or on any exception; this function does not raise.
    """
    import logging as _logging
    _log = _logging.getLogger(__name__)

    try:
        from datetime import datetime, timezone
        import os
        import httpx

        fact_key = f"farm_state:agromatrix:chat:{chat_id}"
        # NOTE(review): the original comment says this synthetic uid follows
        # the pattern used in memory_manager.py — not visible here, confirm.
        synthetic_uid = f"farm:{chat_id}"

        payload = {
            "_version": 1,
            "source": "farmos",
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "counts": counts,
            "top": tops,
            "text": snapshot_text,
        }

        mem_url = os.getenv(
            "AGX_MEMORY_SERVICE_URL",
            os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000"),
        ).rstrip("/")  # avoid "//facts/upsert" when the URL ends with "/"

        resp = httpx.post(
            f"{mem_url}/facts/upsert",
            json={
                "user_id": synthetic_uid,
                "fact_key": fact_key,
                "fact_value_json": payload,
            },
            timeout=3.0,
        )
        if resp.status_code in (200, 201):
            return True
        _log.debug("farm_state snapshot save rejected: status=%s", resp.status_code)
        return False
    except Exception as exc:
        _log.debug("farm_state snapshot save failed: %s", exc)
        return False
|