Files
microdao-daarion/crews/agromatrix_crew/run.py
Apple 90080c632a fix(fabric): use broadcast subject for NATS capabilities discovery
NATS wildcards (node.*.capabilities.get) only work for subscriptions,
not for publish. Switch to a dedicated broadcast subject
(fabric.capabilities.discover) that all NCS instances subscribe to,
enabling proper scatter-gather discovery across nodes.

Made-with: Cursor
2026-02-27 03:20:13 -08:00

945 lines
47 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sys
import os
import json
import logging
import re
import subprocess
from pathlib import Path
from crewai import Crew, Task
from crews.agromatrix_crew.agents.stepan_orchestrator import build_stepan
from crews.agromatrix_crew.agents.operations_agent import build_operations
from crews.agromatrix_crew.agents.iot_agent import build_iot
from crews.agromatrix_crew.agents.platform_agent import build_platform
from crews.agromatrix_crew.agents.spreadsheet_agent import build_spreadsheet
from crews.agromatrix_crew.agents.sustainability_agent import build_sustainability
from crews.agromatrix_crew.audit import audit_event, new_trace
from agromatrix_tools import tool_dictionary
from agromatrix_tools import tool_operation_plan
from crews.agromatrix_crew.operator_commands import route_operator_command, route_operator_text
from crews.agromatrix_crew.memory_manager import (
load_user_profile,
save_user_profile,
load_farm_profile,
save_farm_profile,
update_profile_if_needed,
)
from crews.agromatrix_crew.style_adapter import adapt_response_style, build_style_prefix
from crews.agromatrix_crew.reflection_engine import reflect_on_response
from crews.agromatrix_crew.light_reply import build_light_reply, classify_light_event
from crews.agromatrix_crew.depth_classifier import classify_depth
from crews.agromatrix_crew.telemetry import tlog
from crews.agromatrix_crew.session_context import (
load_session,
update_session,
is_doc_focus_active,
is_doc_focus_cooldown_active,
DOC_FOCUS_TTL,
DOC_FOCUS_COOLDOWN_S,
)
from crews.agromatrix_crew.proactivity import maybe_add_proactivity
from crews.agromatrix_crew.doc_facts import (
extract_doc_facts,
merge_doc_facts,
can_answer_from_facts,
compute_scenario,
format_facts_as_text,
extract_fact_claims,
build_self_correction,
)
logger = logging.getLogger(__name__)
# ── Doc Focus Gate helpers (v3.5 / v3.6 / v3.7) ────────────────────────────
# Логіка винесена в doc_focus.py (без залежностей від crewai/agromatrix_tools)
from crews.agromatrix_crew.doc_focus import ( # noqa: E402
_is_doc_question,
_detect_domain,
detect_context_signals,
build_mode_clarifier,
)
from crews.agromatrix_crew.farm_state import ( # noqa: E402
detect_farm_state_updates,
update_farm_state,
build_farm_state_prefix,
)
# ── v4.2: Vision → Agronomy Bridge ──────────────────────────────────────────
# Fail-safe lazy import: vision_guard живе в gateway-bot/, не в crews/.
# Якщо модуль недоступний (наприклад, юніт-тести без gateway-bot) — мовчки
# пропускаємо; вся логіка нижче захищена try/except.
def _vb_get_vision_lock(agent_id: str, chat_id: str) -> dict:
try:
from vision_guard import get_vision_lock as _gvl
return _gvl(agent_id, chat_id) or {}
except Exception:
return {}
# ── v4.7: FarmOS Farm State Bridge ───────────────────────────────────────────
# Читаємо збережений /farm state snapshot з memory-service.
# Fail-closed: timeout 2s, не кидає виняток, не блокує відповідь.
# Максимальний вік snapshot: 24h (86400s).
_FARM_STATE_SNAPSHOT_TTL_S = 86400.0
def _load_farm_state_snapshot(chat_id: str) -> str | None:
"""
Завантажує snapshot тексту з memory-service (ключ farm_state:agromatrix:chat:{chat_id}).
Повертає текст якщо snapshot є і не старший за 24h, інакше None.
Fail-closed: будь-яка помилка → None.
"""
try:
import httpx
import json as _json
from datetime import datetime, timezone
mem_url = os.getenv(
"AGX_MEMORY_SERVICE_URL",
os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000"),
)
fact_key = f"farm_state:agromatrix:chat:{chat_id}"
synthetic_uid = f"farm:{chat_id}"
# memory-service API: GET /facts/{fact_key}?user_id=...
# (не /facts/get — той endpoint не існує)
resp = httpx.get(
f"{mem_url}/facts/{fact_key}",
params={"user_id": synthetic_uid},
timeout=2.0,
)
if resp.status_code != 200:
return None
data = resp.json()
# fact_value_json може повертатися як рядок або dict
val = data.get("fact_value_json") or data.get("fact_value")
if isinstance(val, str):
try:
val = _json.loads(val)
except Exception:
return None
if not isinstance(val, dict):
return None
# Перевірка TTL: generated_at має бути не старішим за 24h
generated_at = val.get("generated_at", "")
if generated_at:
try:
ts = datetime.fromisoformat(generated_at.replace("Z", "+00:00"))
age_s = (datetime.now(timezone.utc) - ts).total_seconds()
if age_s > _FARM_STATE_SNAPSHOT_TTL_S:
return None
except Exception:
pass # якщо не можемо перевірити вік — все одно повертаємо snapshot
text = val.get("text", "").strip()
return text if text else None
except Exception:
return None
# ---------------------------------------------------------------------------
# Light / Deep activation layer — реалізація у depth_classifier.py
# ---------------------------------------------------------------------------
def _stepan_light_response(text: str, stepan, trace: dict, user_profile: dict | None) -> str:
"""
Відповідь Степана у Light mode.
Спочатку намагається побудувати детерміновану (seeded) відповідь без LLM —
для чітких greeting/thanks/ack/short_followup подій.
Якщо без-LLM відповідь є — повертає її одразу (нульова затримка).
Інакше — LLM одним агентом без делегування.
Логує crew_launch=false.
"""
logger.info("crew_launch=false depth=light")
# Fast path: no LLM needed for clear social events
fast_reply = build_light_reply(text, user_profile)
if fast_reply:
audit_event({**trace, 'agent': 'stepan', 'action': 'light_nollm'})
return fast_reply
# LLM path: single Stepan task, no sub-agents
task = Task(
description=(
f"Повідомлення від користувача: {text}\n\n"
"Дай коротку, природну, людську відповідь. "
"Не запускай жодних операційних інструментів. "
"Не форматуй як JSON. "
"Не починай з 'Звісно', 'Чудово', 'Дозвольте'. "
"Одне питання максимум, якщо потрібно."
),
expected_output="Коротка розмовна відповідь українською, 14 речення.",
agent=stepan,
)
crew = Crew(agents=[stepan], tasks=[task], verbose=False)
result = crew.kickoff()
audit_event({**trace, 'agent': 'stepan', 'action': 'light_response'})
return adapt_response_style(str(result), user_profile)
# ---------------------------------------------------------------------------
def farmos_ui_hint():
port = os.getenv('FARMOS_UI_PORT', '18080')
try:
out = subprocess.check_output(['docker','ps','--format','{{.Names}}'], text=True)
if 'farmos_ui_proxy' in out:
return "\n[UI] farmOS доступний локально: http://127.0.0.1:{} (basic auth)".format(port)
except Exception:
return ""
return ""
def detect_intent(text: str) -> str:
t = text.lower()
if 'сплануй' in t and 'тиж' in t:
return 'plan_week'
if 'сплануй' in t:
return 'plan_day'
if 'критично' in t or 'на завтра' in t:
return 'show_critical_tomorrow'
if 'план/факт' in t or 'план факт' in t:
return 'plan_vs_fact'
if 'закрий план' in t:
return 'close_plan'
return 'general'
def validate_payload(obj: dict):
required = ['status', 'summary', 'artifacts', 'tool_calls', 'next_actions']
for k in required:
if k not in obj:
return False, f'missing:{k}'
if obj['status'] not in ['ok', 'error']:
return False, 'bad_status'
if not isinstance(obj['summary'], str):
return False, 'summary_not_string'
if not isinstance(obj['artifacts'], list):
return False, 'artifacts_not_list'
if not isinstance(obj['tool_calls'], list):
return False, 'tool_calls_not_list'
if not isinstance(obj['next_actions'], list):
return False, 'next_actions_not_list'
return True, 'ok'
def run_task_with_retry(agent, description: str, trace_id: str, max_retries: int = 2):
instruction = "Return ONLY valid JSON matching schema in crews/agromatrix_crew/schema.json. No extra text."
last_error = ''
for attempt in range(max_retries + 1):
desc = description if attempt == 0 else (description + "\n\n" + instruction)
task = Task(
description=desc,
expected_output="JSON strictly matching schema.json",
agent=agent
)
crew = Crew(agents=[agent], tasks=[task], verbose=True)
result = crew.kickoff()
raw = str(result)
try:
data = json.loads(raw)
ok, reason = validate_payload(data)
if ok:
return data
last_error = reason
except Exception as e:
last_error = f'json_error:{e}'
audit_event({
'trace_id': trace_id,
'agent': agent.role,
'action': 'json_validation_failed',
'error': last_error
})
return {
'status': 'error',
'summary': f'JSON validation failed: {last_error}',
'artifacts': [],
'tool_calls': [],
'next_actions': []
}
def handle_message(
text: str,
user_id: str = '',
chat_id: str = '',
trace_id: str = '',
ops_mode: bool = False,
last_pending_list: list | None = None,
# injected by memory layer (Промт 3); None until that layer is wired
user_profile: dict | None = None,
farm_profile: dict | None = None,
has_doc_context: bool = False,
# Doc Bridge (v3.1): короткий контекст активного документа з gateway
doc_context: dict | None = None,
# Chat History Bridge (v3.2): текст переписки з memory-service (до 40 повідомлень)
chat_history: str = '',
) -> str:
trace = new_trace(text)
if trace_id:
trace['trace_id'] = trace_id
os.environ['AGX_TRACE_ID'] = trace['trace_id']
os.environ['AGX_USER_ID'] = str(user_id)
os.environ['AGX_CHAT_ID'] = str(chat_id)
os.environ['AGX_OPS_MODE'] = '1' if ops_mode else '0'
# Load profiles (fail-safe: always returns a valid dict)
if user_profile is None and user_id:
user_profile = load_user_profile(str(user_id))
if farm_profile is None and chat_id:
# v2.8: pass user_id for lazy legacy migration
farm_profile = load_farm_profile(str(chat_id), user_id=str(user_id) if user_id else None)
# operator commands (unchanged routing)
if text.strip().startswith('/'):
op_res = route_operator_command(text, str(user_id), str(chat_id))
if op_res:
return json.dumps(op_res, ensure_ascii=False)
elif ops_mode:
op_res = route_operator_text(text, str(user_id), str(chat_id), last_pending_list=last_pending_list)
if op_res:
return json.dumps(op_res, ensure_ascii=False)
# Load session context (v3: TTL 15 min, in-memory)
session = load_session(str(chat_id))
# ── v4: FARM STATE UPDATE ────────────────────────────────────────────────
# Оновлюємо farm_state до будь-якої іншої логіки (ізольовано, fail-safe).
# Не впливає на doc_mode, depth classifier, memory_manager.
_farm_updates = detect_farm_state_updates(text or "")
if _farm_updates:
update_farm_state(session, _farm_updates)
tlog(logger, "farm_state_updated",
chat_id=str(chat_id),
fields=",".join(_farm_updates.keys()))
# ── v4.2: VISION → AGRONOMY BRIDGE ──────────────────────────────────────
# Читаємо vision lock (per agent_id:chat_id) і зберігаємо label у session.
# Тільки якщо lock свіжий (TTL перевіряє get_vision_lock всередині).
# User override ("це соняшник") вже записаний у lock.user_label.
# Fail-safe: будь-яка помилка → пропускаємо.
_agent_id_str = os.getenv("AGX_AGENT_ID", "agromatrix")
try:
_vb_lock = _vb_get_vision_lock(_agent_id_str, str(chat_id))
if _vb_lock:
_vb_label = (_vb_lock.get("user_label") or _vb_lock.get("label") or "").strip()
if _vb_label:
session["vision_last_label"] = _vb_label
tlog(logger, "vision_bridge_label_loaded",
chat_id=str(chat_id), label=_vb_label)
except Exception:
pass
# ── v4.2: USER TEXT OVERRIDE for vision_last_label ───────────────────────
# Якщо юзер явно змінює культуру текстом ("тепер це кукурудза") →
# перезаписуємо vision_last_label безпосередньо.
# Використовуємо той самий detect_user_override з vision_guard (fail-safe).
if text:
try:
from vision_guard import detect_user_override as _vb_detect_override
_vb_text_label = _vb_detect_override(text)
if _vb_text_label:
session["vision_last_label"] = _vb_text_label
tlog(logger, "vision_bridge_text_override",
chat_id=str(chat_id), label=_vb_text_label)
except Exception:
pass
# ── DOC CONTEXT FALLBACK (v3.3) ──────────────────────────────────────────
# Якщо gateway не передав doc_context — пробуємо підтягнути chat-scoped з memory.
# Це дозволяє Stepan бачити документ навіть якщо gateway не синхронізований.
if not doc_context and chat_id:
try:
import httpx as _httpx
import os as _os
_mem_url = _os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8000")
_agent_id_env = _os.getenv("AGX_AGENT_ID", "agromatrix")
_aid = _agent_id_env.lower()
_fact_user = f"chat:{_aid}:{chat_id}"
_fact_key = f"doc_context_chat:{_aid}:{chat_id}"
with _httpx.Client(timeout=3.0) as _hc:
_r = _hc.post(
f"{_mem_url}/facts/get",
json={"user_id": _fact_user, "fact_key": _fact_key},
)
if _r.status_code == 200:
_fdata = _r.json()
_fval = _fdata.get("fact_value_json") or {}
if isinstance(_fval, str):
import json as _json
_fval = _json.loads(_fval)
if _fval.get("doc_id") or _fval.get("file_unique_id"):
doc_context = _fval
has_doc_context = True
tlog(logger, "doc_context_fallback_loaded",
chat_id=str(chat_id),
doc_id=str(_fval.get("doc_id", ""))[:16])
except Exception as _fbe:
logger.debug("Doc context fallback failed (non-blocking): %s", _fbe)
# ── DOC ANCHOR RESET (v3.3) ──────────────────────────────────────────────
# Якщо doc_id змінився — скидаємо doc_facts і fact_claims попереднього документу.
# Fix D: active_doc_id пріоритетний — він явно фіксується при upload (навіть без тексту),
# тому перший text-запит після upload одразу має правильний anchor.
_current_doc_id: str | None = None
if doc_context:
_current_doc_id = (
doc_context.get("active_doc_id")
or doc_context.get("doc_id")
or doc_context.get("file_unique_id")
or None
)
if _current_doc_id:
_prev_doc_id = session.get("active_doc_id")
if _prev_doc_id and _prev_doc_id != _current_doc_id:
session["doc_facts"] = {}
session["fact_claims"] = []
tlog(logger, "doc_anchor_reset",
old=str(_prev_doc_id)[:16], new=str(_current_doc_id)[:16])
# ── DOC FOCUS GATE (v3.5 / v3.6) ────────────────────────────────────────
import time as _time_mod
_now_ts = _time_mod.time()
# TTL auto-expire (самозцілювальний)
if session.get("doc_focus") and not is_doc_focus_active(session, _now_ts):
_expired_age = round(_now_ts - (session.get("doc_focus_ts") or 0.0))
session["doc_focus"] = False
session["doc_focus_ts"] = 0.0
tlog(logger, "doc_focus_expired", chat_id=str(chat_id),
ttl_s=_expired_age, last_doc=str(session.get("active_doc_id", ""))[:16])
_signals = detect_context_signals(text)
_domain = _detect_domain(text, logger=logger)
_focus_active = is_doc_focus_active(session, _now_ts)
_cooldown_active = is_doc_focus_cooldown_active(session, _now_ts)
# Auto-clear doc_focus + встановити cooldown при зміні домену (web/vision)
_df_cooldown_until_update: float | None = None
if _focus_active and _domain in ("vision", "web"):
session["doc_focus"] = False
session["doc_focus_ts"] = 0.0
_focus_active = False
_cooldown_until = _now_ts + DOC_FOCUS_COOLDOWN_S
session["doc_focus_cooldown_until"] = _cooldown_until
_cooldown_active = True
_df_cooldown_until_update = _cooldown_until
tlog(logger, "doc_focus_cleared", chat_id=str(chat_id), reason=_domain)
tlog(logger, "doc_focus_cooldown_set", chat_id=str(chat_id),
seconds=int(DOC_FOCUS_COOLDOWN_S), reason=_domain)
# Визначаємо context_mode з gating-правилами (v3.6)
_context_mode: str
_doc_denied_reason: str | None = None
if _domain in ("doc",):
_is_explicit = _signals["has_explicit_doc_token"]
# Rule 1: Cooldown блокує implicit doc (не explicit)
if _cooldown_active and not _is_explicit:
_context_mode = "general"
_doc_denied_reason = "cooldown"
_ttl_left = int((session.get("doc_focus_cooldown_until") or 0.0) - _now_ts)
tlog(logger, "doc_mode_denied", chat_id=str(chat_id),
reason="cooldown", ttl_left=_ttl_left)
# Rule 2: Без explicit — дозволити doc тільки якщо є fact-сигнал або факти покривають
elif not _is_explicit:
_session_facts_gate = session.get("doc_facts") or {}
try:
from crews.agromatrix_crew.doc_facts import can_answer_from_facts as _cafg
_can_use_facts, _ = _cafg(text, _session_facts_gate)
except Exception:
_can_use_facts = False
if _signals["has_fact_signal"] or _can_use_facts:
# Дозволяємо doc-mode через факт-сигнал
_context_mode = "doc"
else:
_context_mode = "general"
_doc_denied_reason = "no_fact_signal"
tlog(logger, "doc_mode_denied", chat_id=str(chat_id),
reason="no_fact_signal",
has_facts=bool(_session_facts_gate))
# Rule 3: Explicit doc-токен завжди дозволяє doc (навіть при cooldown)
else:
_context_mode = "doc"
elif _domain == "general" and (_focus_active or _current_doc_id) and _signals["has_fact_signal"]:
# Факт-сигнал без doc-домену + active doc → дозволяємо doc якщо фокус живий
_context_mode = "doc" if _focus_active else "general"
else:
_context_mode = "general"
# Активуємо/продовжуємо doc_focus при успішному doc-mode
if _context_mode == "doc" and _current_doc_id:
if not _focus_active:
session["doc_focus"] = True
session["doc_focus_ts"] = _now_ts
_focus_active = True
tlog(logger, "doc_focus_set", chat_id=str(chat_id),
reason="doc_question_reactivated", doc_id=str(_current_doc_id)[:16])
tlog(logger, "context_mode", chat_id=str(chat_id),
mode=_context_mode, domain=_domain, focus=_focus_active,
cooldown=_cooldown_active)
# v3.6: Якщо doc-mode заблокований — повернути clarifier одразу (без LLM)
if _doc_denied_reason in ("cooldown", "no_fact_signal") and _domain == "doc":
_clarifier = build_mode_clarifier(text)
update_session(str(chat_id), text, depth="light", agents=[], last_question=None)
return _clarifier
# ── PHOTO CONTEXT GATE (v3.5 fix) ───────────────────────────────────────
# Якщо щойно було фото (< 120с) і запит короткий (<=3 слів) і без explicit doc —
# відповідаємо коротким уточненням замість light/general відповіді без контексту.
_last_photo_ts = float(session.get("last_photo_ts") or 0.0)
_photo_ctx_ttl = 120.0
_photo_just_sent = (_now_ts - _last_photo_ts) < _photo_ctx_ttl and _last_photo_ts > 0
if _photo_just_sent and len(text.split()) <= 3 and not _signals.get("has_explicit_doc_token"):
_photo_age = int(_now_ts - _last_photo_ts)
tlog(logger, "photo_context_gate", chat_id=str(chat_id),
age_s=_photo_age, words=len(text.split()))
update_session(str(chat_id), text, depth="light", agents=[], last_question=None)
return "Що саме хочеш дізнатися про фото?"
# ── CONFIRMATION GATE (v3.1) ────────────────────────────────────────────
# Якщо є pending_action у сесії і повідомлення — підтвердження,
# підставляємо контекст попереднього кроку і йдемо в deep.
_CONFIRMATION_WORDS = re.compile(
r"^(так|зроби|ок|ok|окей|погоджуюсь|погодився|роби|давай|підтверджую|yes|go|sure)[\W]*$",
re.IGNORECASE | re.UNICODE,
)
pending_action = session.get("pending_action")
if pending_action and _CONFIRMATION_WORDS.match(text.strip()):
tlog(logger, "confirmation_consumed", chat_id=str(chat_id),
intent=pending_action.get("intent"))
# Розширюємо контекст: використовуємо збережений intent і what_to_do_next
text = pending_action.get("what_to_do_next") or text
has_doc_context = has_doc_context or bool(pending_action.get("doc_context"))
# ── DOC BRIDGE (v3.1/v3.2 / v3.5) ──────────────────────────────────────
# PROMPT B: doc_context підмішуємо в промпт ТІЛЬКИ якщо context_mode == "doc".
# Якщо "general" — документ є але мовчимо про нього (не нав'язуємо "у цьому звіті").
_doc_summary_snippet: str = ""
if doc_context and _context_mode == "doc":
doc_id = doc_context.get("doc_id") or doc_context.get("id", "")
doc_title = doc_context.get("title") or doc_context.get("filename", "")
doc_summary = doc_context.get("extracted_summary") or doc_context.get("summary", "")
has_doc_context = True
tlog(logger, "doc_context_used", chat_id=str(chat_id), doc_id=doc_id,
has_content=bool(doc_summary))
elif doc_context and _context_mode == "general":
# Документ є, але поточний запит не про нього — не підмішуємо
doc_id = doc_context.get("doc_id") or ""
doc_title = ""
doc_summary = ""
tlog(logger, "doc_context_suppressed", chat_id=str(chat_id),
reason="context_mode_general", doc_id=doc_id[:16] if doc_id else "")
if doc_context and _context_mode == "doc":
# Будуємо snippet для deep-mode промпту (тільки в doc mode)
parts = []
if doc_title:
parts.append(f"=== ДОКУМЕНТ: «{doc_title}» ===")
if doc_summary:
# До 3000 символів — достатньо для xlsx з кількома листами
parts.append(f"ЗМІСТ ДОКУМЕНТА:\n{doc_summary[:3000]}")
parts.append("=== КІНЕЦЬ ДОКУМЕНТА ===")
elif doc_title:
# Fix E: є документ але summary порожній.
# Якщо file_id відомий — витяг можливий; НЕ просимо "надіслати ще раз".
_doc_file_id = doc_context.get("file_id") or doc_context.get("file_unique_id") or ""
if _doc_file_id:
parts.append(
f"(Вміст «{doc_title}» ще не витягнутий у цій сесії — "
f"але файл є. Перевір ІСТОРІЮ ДІАЛОГУ нижче: там можуть бути "
f"попередні відповіді про цей документ. Якщо в history нічого — "
f"відповідай: 'Зараз витягую дані — дай секунду.' і запропонуй "
f"1 конкретне уточнюючи питання.)"
)
else:
# file_id невідомий — тоді можна попросити надіслати знову
parts.append(
f"(Вміст «{doc_title}» недоступний. "
f"Перевір ІСТОРІЮ ДІАЛОГУ — там можуть бути попередні відповіді. "
f"Якщо в history нічого — попроси надіслати файл ще раз, одним реченням.)"
)
_doc_summary_snippet = "\n".join(parts)
# Light / Deep classification
last_topic = (user_profile or {}).get('last_topic')
depth = classify_depth(
text,
has_doc_context=has_doc_context,
last_topic=last_topic,
user_profile=user_profile,
session=session,
)
audit_event({**trace, 'agent': 'stepan', 'action': 'intake', 'depth': depth})
style_prefix = build_style_prefix(user_profile)
stepan = build_stepan(style_prefix=style_prefix)
# ── LIGHT MODE ────────────────────────────────────────────────────────────
if depth == "light":
tlog(logger, "crew_launch", launched=False, depth="light")
response = _stepan_light_response(text, stepan, trace, user_profile)
update_profile_if_needed(str(user_id), str(chat_id), text, response, intent=None, depth="light")
update_session(str(chat_id), text, depth="light", agents=[], last_question=None)
return response
# ── DEEP MODE ─────────────────────────────────────────────────────────────
tlog(logger, "crew_launch", launched=True, depth="deep", agents=["stepan"])
audit_event({**trace, 'agent': 'stepan', 'action': 'deep_single_agent'})
# ── FACT LOCK (v3.2) ────────────────────────────────────────────────────
# Якщо в сесії є зафіксовані числові факти — відповідаємо з кешу без RAG.
_session_facts: dict = session.get("doc_facts") or {}
if _session_facts:
_can_reuse, _reuse_keys = can_answer_from_facts(text, _session_facts)
if _can_reuse:
tlog(logger, "fact_reused", chat_id=str(chat_id),
keys=",".join(_reuse_keys))
# Спочатку перевіряємо сценарний розрахунок
_scen_ok, _scen_text = compute_scenario(text, _session_facts)
if _scen_ok:
_self_corr = build_self_correction(text, _session_facts, session, current_doc_id=_current_doc_id)
final = (_self_corr + _scen_text) if _self_corr else _scen_text
final = adapt_response_style(final, user_profile)
_new_claims = extract_fact_claims(final)
update_session(str(chat_id), text, depth="deep", agents=[],
last_question=None, doc_facts=_session_facts,
fact_claims=_new_claims, active_doc_id=_current_doc_id,
doc_focus=True, doc_focus_ts=_now_ts)
return final
# Без сценарію — форматуємо відомі факти
_facts_text = format_facts_as_text(
{k: _session_facts[k] for k in _reuse_keys if k in _session_facts}
)
_self_corr = build_self_correction(text, _session_facts, session, current_doc_id=_current_doc_id)
final = (_self_corr + _facts_text) if _self_corr else _facts_text
final = adapt_response_style(final, user_profile)
_new_claims = extract_fact_claims(final)
update_session(str(chat_id), text, depth="deep", agents=[],
last_question=None, doc_facts=_session_facts,
fact_claims=_new_claims, active_doc_id=_current_doc_id,
doc_focus=True, doc_focus_ts=_now_ts)
return final
# Preflight normalization — лише збагачення контексту, НЕ блокатор.
norm = {}
try:
norm = tool_dictionary.normalize_from_text(text, trace_id=trace['trace_id'], source='telegram')
except Exception as _ne:
logger.debug("normalize_from_text error (non-blocking): %s", _ne)
_all_pending = [item for cat in norm.values() for item in cat if item.get('status') == 'pending']
if _all_pending:
tlog(logger, "pending_terms_info", chat_id=str(chat_id), count=len(_all_pending))
intent = detect_intent(text)
# Специфічні intent-и що потребують tool-виклику
if intent in ['plan_week', 'plan_day']:
try:
plan_id = tool_operation_plan.create_plan({
'scope': {
'field_ids': [i.get('normalized_id') for i in norm.get('fields', []) if i.get('status')=='ok'],
'crop_ids': [i.get('normalized_id') for i in norm.get('crops', []) if i.get('status')=='ok'],
'date_window': {'start': '', 'end': ''}
},
'tasks': []
}, trace_id=trace['trace_id'], source='telegram')
return f"План створено: {plan_id}. Уточни дати та операції."
except Exception:
pass
# Будуємо промпт для Степана — з doc_context і chat_history
task_parts = []
# 0. v4: Farm State prefix (тільки не в doc і не web режимі)
if _context_mode != "doc" and _domain not in ("web",):
_farm_prefix = build_farm_state_prefix(session)
if _farm_prefix:
task_parts.append(_farm_prefix)
tlog(logger, "farm_state_injected", chat_id=str(chat_id),
crop=str((session.get("farm_state") or {}).get("current_crop", ""))[:20])
# 0b. v4.2: Vision → Agronomy Bridge prefix
# Додаємо контекст культури з останнього фото, якщо:
# - НЕ doc mode (документ має пріоритет)
# - НЕ web mode
# - vision_last_label є і не дублює farm_state (щоб не плутати Степана)
if _context_mode != "doc" and _domain not in ("web",):
_vb_label_now = (session.get("vision_last_label") or "").strip()
_farm_crop = str((session.get("farm_state") or {}).get("current_crop", "")).strip()
if _vb_label_now and _vb_label_now != _farm_crop:
task_parts.append(
f"SYSTEM NOTE (не виводь це в відповідь): "
f"Культура з останнього фото — {_vb_label_now}. "
"Використовуй як контекст для агрономічних питань."
)
tlog(logger, "vision_bridge_injected",
chat_id=str(chat_id), label=_vb_label_now)
# 0c. v4.7: FarmOS Farm State Bridge
# Читаємо збережений /farm state snapshot з memory-service.
# Умови injection (аналогічно vision bridge):
# - НЕ doc mode
# - НЕ web domain
# - snapshot не старший за 24h
if _context_mode != "doc" and _domain not in ("web",):
_fs_text = _load_farm_state_snapshot(str(chat_id))
if _fs_text:
task_parts.append(
f"SYSTEM NOTE (не виводь це в відповідь): "
f"Farm state snapshot (FarmOS, актуально):\n{_fs_text}"
)
tlog(logger, "farm_state_snapshot_loaded",
chat_id=str(chat_id), found=True,
preview=_fs_text[:40])
else:
tlog(logger, "farm_state_snapshot_loaded",
chat_id=str(chat_id), found=False)
# 1. Документ (якщо є вміст)
if _doc_summary_snippet:
task_parts.append(_doc_summary_snippet)
# 2. Контекст переписки (chat history з memory-service)
if chat_history:
# Беремо останні 3000 символів — достатньо для контексту
_history_snippet = chat_history[-3000:] if len(chat_history) > 3000 else chat_history
task_parts.append(
f"=== ІСТОРІЯ ДІАЛОГУ (до 40 повідомлень) ===\n"
f"{_history_snippet}\n"
f"=== КІНЕЦЬ ІСТОРІЇ ==="
)
# 3. Контекст профілю якщо є
if farm_profile:
fields = farm_profile.get("fields", [])
if fields:
task_parts.append(f"Поля господарства: {', '.join(str(f) for f in fields[:5])}")
task_parts.append(f"Поточний запит: {text}")
if _doc_summary_snippet:
task_parts.append(
"ІНСТРУКЦІЯ: У тебе є вміст документа вище. "
"Відповідай ТІЛЬКИ на основі цього документа. "
"Якщо є числа — цитуй їх точно. "
"Якщо потрібного числа немає — скажи в якому рядку/колонці шукати (1 речення)."
)
elif chat_history:
task_parts.append(
"ІНСТРУКЦІЯ: Використовуй ІСТОРІЮ ДІАЛОГУ вище для відповіді. "
"Якщо в history є згадка документа або дані — спирайся на них. "
"Відповідай коротко і конкретно українською."
)
elif doc_context and (doc_context.get("file_id") or doc_context.get("file_unique_id")):
# Fix E: файл є, але summary порожній і history немає → НЕ казати "немає даних"
_f_name = doc_context.get("file_name") or "документ"
task_parts.append(
f"ІНСТРУКЦІЯ: Файл «{_f_name}» отримано, але вміст ще не витягнутий. "
f"НЕ кажи 'немає даних' або 'надішли ще раз'. "
f"Відповідай: 'Зараз опрацьовую — дай хвилину' і постав 1 уточнюючи питання "
f"про те, що саме потрібно знайти у файлі."
)
else:
task_parts.append(
"Дай коротку, конкретну відповідь українською. "
"Якщо даних недостатньо — скажи що саме потрібно уточнити (1 питання)."
)
tlog(logger, "deep_context_ready", chat_id=str(chat_id),
has_doc=bool(_doc_summary_snippet), has_history=bool(chat_history),
history_len=len(chat_history))
final_task = Task(
description="\n\n".join(task_parts),
expected_output="Коротка відповідь для користувача українською мовою.",
agent=stepan
)
crew = Crew(agents=[stepan], tasks=[final_task], verbose=False)
result = crew.kickoff()
raw_response = str(result) + farmos_ui_hint()
styled_response = adapt_response_style(raw_response, user_profile)
# Reflection (Deep mode only, never recursive)
reflection = reflect_on_response(text, styled_response, user_profile, farm_profile)
if reflection.get("style_shift") and user_profile:
user_profile["style"] = reflection["style_shift"]
if reflection.get("clarifying_question"):
styled_response = styled_response.rstrip() + "\n\n" + reflection["clarifying_question"]
if reflection.get("new_facts") and user_id:
u = user_profile or {}
for k, v in reflection["new_facts"].items():
if k == "new_crops":
# Handled by update_profile_if_needed / FarmProfile
pass
elif k in ("name", "role"):
u[k] = v
if user_id:
save_user_profile(str(user_id), u)
audit_event({**trace, 'agent': 'stepan', 'action': 'reflection',
'confidence': reflection.get("confidence"), 'new_facts': list(reflection.get("new_facts", {}).keys())})
# Soft proactivity (v3: 1 речення max, за умовами)
clarifying_q = reflection.get("clarifying_question") if reflection else None
styled_response, _ = maybe_add_proactivity(
styled_response, user_profile or {}, depth="deep", reflection=reflection
)
update_profile_if_needed(str(user_id), str(chat_id), text, styled_response, intent=intent, depth="deep")
# ── v3.7: STATE-AWARE DOC ACK ────────────────────────────────────────────
# Якщо doc_focus щойно встановлений (focus_active раніше не був) і context_mode==doc,
# додаємо короткий префікс (max 60 символів), щоб уникнути "Так, пам'ятаю".
_doc_just_activated = (
_context_mode == "doc"
and _current_doc_id
and not _focus_active # _focus_active = стан ДО активації у цьому запиті
)
if _doc_just_activated:
_ack = "По звіту дивлюсь." if _signals.get("has_explicit_doc_token") else "Працюємо зі звітом."
# Тільки якщо відповідь не починається з нашого ack
if not styled_response.startswith(_ack):
styled_response = f"{_ack}\n{styled_response}"
tlog(logger, "doc_focus_acknowledged", chat_id=str(chat_id),
ack=_ack[:20], explicit=_signals.get("has_explicit_doc_token", False))
# ── FACT LOCK: витягуємо факти з відповіді і зберігаємо в session ──────
_new_facts = extract_doc_facts(styled_response)
_merged_facts: dict | None = None
if _new_facts:
_merged_facts = merge_doc_facts(_session_facts, _new_facts)
_conflicts = _merged_facts.get("conflicts", {})
tlog(logger, "fact_locked", chat_id=str(chat_id),
keys=",".join(k for k in _new_facts if k not in ("conflicts","needs_recheck")),
conflicts=bool(_conflicts))
# Якщо конфлікт — додаємо 1 речення до відповіді
if _conflicts:
conflict_key = next(iter(_conflicts))
tlog(logger, "fact_conflict", chat_id=str(chat_id), key=conflict_key)
styled_response = styled_response.rstrip() + (
f"\n\nБачу розбіжність по \"{conflict_key.replace('_uah','').replace('_ha','')}\". "
"Підтверди, яке значення правильне."
)
# ── SELF-CORRECTION: prefix якщо нова відповідь суперечить попередній ──
_self_corr_prefix = build_self_correction(styled_response, _session_facts, session, current_doc_id=_current_doc_id)
if _self_corr_prefix:
styled_response = _self_corr_prefix + styled_response
tlog(logger, "self_corrected", chat_id=str(chat_id))
# ── pending_action ───────────────────────────────────────────────────────
_pending_action: dict | None = None
if clarifying_q and intent:
_pending_action = {
"intent": intent,
"what_to_do_next": text,
"doc_context": {"doc_id": doc_context.get("doc_id")} if doc_context else None,
}
_new_claims = extract_fact_claims(styled_response)
# Якщо відповідь в doc-режимі і факти знайдено — вмикаємо/продовжуємо doc_focus
_df_update: bool | None = None
_df_ts_update: float | None = None
if _context_mode == "doc" and _current_doc_id:
_df_update = True
_df_ts_update = _now_ts
if not _focus_active:
tlog(logger, "doc_focus_set", chat_id=str(chat_id),
reason="deep_doc_answer", doc_id=str(_current_doc_id)[:16])
elif _context_mode == "general" and session.get("doc_focus"):
# Відповідь в general-режимі — скидаємо фокус (не продовжуємо TTL)
_df_update = False
_df_ts_update = 0.0
update_session(
str(chat_id), text, depth="deep",
agents=["stepan"],
last_question=clarifying_q,
pending_action=_pending_action,
doc_facts=_merged_facts if _merged_facts is not None else (_session_facts or None),
fact_claims=_new_claims,
active_doc_id=_current_doc_id,
doc_focus=_df_update,
doc_focus_ts=_df_ts_update,
doc_focus_cooldown_until=_df_cooldown_until_update,
)
# ── CONTEXT BLEED GUARD (v3.5 / v3.6 / v3.7) ────────────────────────────
# Якщо відповідь містить doc-фрази але context_mode == "general" → замінити.
# Це блокує "витік" шаблонних фраз навіть коли doc_context не підмішувався.
if _context_mode == "general":
_BLEED_RE = re.compile(
r"у\s+(?:цьому|наданому|даному)\s+документі"
r"\s+(?:цьому|наданому|даному)\s+документі"
r"|у\s+(?:цьому\s+)?звіті|в\s+(?:цьому\s+)?звіті",
re.IGNORECASE | re.UNICODE,
)
_bleed_match = _BLEED_RE.search(styled_response)
if _bleed_match:
_bleed_phrase = _bleed_match.group(0)
tlog(logger, "doc_phrase_suppressed", chat_id=str(chat_id),
phrase=_bleed_phrase[:40], mode="general")
# v3.6: використовуємо контекстний clarifier замість фіксованої фрази
styled_response = build_mode_clarifier(text)
# ── UX-PHRASE GUARD (v3.7) ────────────────────────────────────────────────
# Заміна шаблонних фраз "Так, пам'ятаю" / "Не бачу його перед собою" тощо.
_DOC_AWARENESS_RE = re.compile(
r"(так,\s*пам['\u2019]ятаю|не\s+бачу\s+його|не\s+бачу\s+перед\s+собою"
r"|мені\s+(?:не\s+)?доступний\s+документ)",
re.IGNORECASE | re.UNICODE,
)
if _DOC_AWARENESS_RE.search(styled_response):
tlog(logger, "doc_ux_phrase_suppressed", chat_id=str(chat_id))
styled_response = re.sub(
_DOC_AWARENESS_RE,
lambda m: build_mode_clarifier(text),
styled_response,
count=1,
)
# v3.7: Заміна "у цьому документі" у general при будь-якому тексті
if _context_mode == "general":
_DOC_MENTION_RE = re.compile(
r"\bзвіт\b",
re.IGNORECASE | re.UNICODE,
)
if _DOC_MENTION_RE.search(styled_response) and "doc_facts" not in str(styled_response[:100]):
tlog(logger, "doc_mention_blocked_in_general", chat_id=str(chat_id))
return styled_response
def main():
if len(sys.argv) < 2:
print('Usage: python run.py [--trace <id>] "<user request>"')
sys.exit(1)
args = sys.argv[1:]
trace_override = None
if args and args[0] == '--trace':
trace_override = args[1]
args = args[2:]
user_request = args[0]
output = handle_message(user_request, user_id=os.getenv('AGX_USER_ID',''), chat_id=os.getenv('AGX_CHAT_ID',''), trace_id=trace_override or '', ops_mode=os.getenv('AGX_OPS_MODE','0')=='1')
print(output)
if __name__ == '__main__':
main()