Files
Apple ef3473db21 snapshot: NODE1 production state 2026-02-09
Complete snapshot of /opt/microdao-daarion/ from NODE1 (144.76.224.179).
This represents the actual running production code that has diverged
significantly from the previous main branch.

Key changes from old main:
- Gateway (http_api.py): expanded from ~40KB to 164KB with full agent support
- Router: new /v1/agents/{id}/infer endpoint with vision + DeepSeek routing
- Behavior Policy: SOWA v2.2 (3-level: FULL/ACK/SILENT)
- Agent Registry: config/agent_registry.yml as single source of truth
- 13 agents configured (was 3)
- Memory service integration
- CrewAI teams and roles

Excluded from snapshot: venv/, .env, data/, backups, .tgz archives

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-09 08:46:46 -08:00

192 lines
6.5 KiB
Python

import json
import os
from pathlib import Path
from datetime import datetime
import yaml
try:
from rapidfuzz import process, fuzz
HAS_RAPIDFUZZ = True
except Exception:
HAS_RAPIDFUZZ = False
from .normalize import normalize_unit as _normalize_unit
# Canonical dictionaries file (fields / crops / operations / materials / units),
# overridable via the AGX_DICTIONARY_PATH environment variable.
DATA_PATH = Path(os.getenv('AGX_DICTIONARY_PATH', '/opt/microdao-daarion/data/dictionaries/dictionaries.yaml'))
# Append-only JSONL file where unresolved terms are queued for human review.
PENDING_PATH = Path(os.getenv('AGX_PENDING_PATH', '/opt/microdao-daarion/data/dictionaries/pending.jsonl'))
# Lowercased key names whose values are redacted before audit records are written.
REDACT_KEYS = set(k.strip().lower() for k in os.getenv('AGX_AUDIT_REDACT_KEYS', 'token,secret,password,authorization,cookie,api_key,signature').split(','))
def _load():
    """Read and parse the dictionaries YAML file at DATA_PATH."""
    raw = DATA_PATH.read_text(encoding='utf-8')
    return yaml.safe_load(raw)
def _sanitize(obj):
    """Recursively redact sensitive values in a JSON-like structure.

    Dict values whose key (lowercased) appears in REDACT_KEYS are replaced
    with '***REDACTED***'; nested dicts and lists are walked recursively.
    All other values are returned unchanged.
    """
    if isinstance(obj, list):
        return [_sanitize(element) for element in obj]
    if isinstance(obj, dict):
        return {
            key: '***REDACTED***' if key.lower() in REDACT_KEYS else _sanitize(value)
            for key, value in obj.items()
        }
    return obj
def _suggest(term: str, items: list):
    """Return up to 5 fuzzy-match suggestions for *term* against *items*.

    Every dictionary entry contributes its id, its name and each synonym
    as a match candidate; scores are normalized to the 0..1 range.
    rapidfuzz is used when available, with a difflib fallback otherwise.

    Fixes vs. the previous version:
    - results are deduplicated per item id (best score wins), so a single
      item can no longer occupy all five suggestion slots through its
      synonyms;
    - the rapidfuzz branch casefolds candidate strings, matching the
      case-insensitive behavior of the difflib fallback.
    """
    term_cf = term.casefold().strip()
    candidates = []  # (item_id, canonical_name, candidate_string)
    for item in items:
        # Entries without an id cannot be normalized, so they are skipped.
        if item.get('id'):
            candidates.append((item['id'], item['name'], item['id']))
            candidates.append((item['id'], item['name'], item['name']))
            for s in item.get('synonyms', []):
                candidates.append((item['id'], item['name'], s))
    scores = []
    if HAS_RAPIDFUZZ:
        choices = [c[2].casefold() for c in candidates]
        # limit=None scores every candidate so the per-id dedup below can
        # still surface five distinct items.
        results = process.extract(term_cf, choices, scorer=fuzz.WRatio, limit=None)
        for match, score, idx in results:
            item_id, name, _ = candidates[idx]
            scores.append({"id": item_id, "name": name, "score": score / 100})
    else:
        import difflib
        for item_id, name, cand in candidates:
            score = difflib.SequenceMatcher(None, term_cf, cand.casefold()).ratio()
            scores.append({"id": item_id, "name": name, "score": score})
    # Keep only the best-scoring entry per item id, then take the top 5.
    best = {}
    for entry in scores:
        current = best.get(entry['id'])
        if current is None or entry['score'] > current['score']:
            best[entry['id']] = entry
    return sorted(best.values(), key=lambda x: x['score'], reverse=True)[:5]
def _match(term: str, items: list):
    """Find the dictionary entry matching *term*.

    Returns a (item, confidence, matched_by) triple where matched_by is
    one of 'exact_id', 'exact_name', 'synonym', 'fuzzy' or 'none'.
    Exact comparisons are tried first; a fuzzy candidate from _suggest is
    accepted only when its score is at least 0.85.
    """
    needle = term.casefold().strip()
    for entry in items:
        if needle == entry.get('id', '').casefold():
            return entry, 1.0, 'exact_id'
        if needle == entry.get('name', '').casefold():
            return entry, 0.98, 'exact_name'
        if any(needle == syn.casefold() for syn in entry.get('synonyms', [])):
            return entry, 0.95, 'synonym'
    # No exact hit anywhere — fall back to fuzzy suggestions.
    suggestions = _suggest(term, items)
    if suggestions and suggestions[0]['score'] >= 0.85:
        top = suggestions[0]
        for entry in items:
            if entry.get('id') == top['id']:
                return entry, float(top['score']), 'fuzzy'
    return None, 0.0, 'none'
def _pending(trace_id: str, source: str, category: str, raw_term: str, suggestions: list, note: str = ''):
    """Append an unresolved-term audit record to the pending JSONL file.

    The record is passed through _sanitize (sensitive keys redacted)
    before being written as a single JSON line to PENDING_PATH.

    Args:
        trace_id: correlation id of the originating request.
        source: channel the term came from (e.g. 'telegram').
        category: dictionary category ('field', 'crop', ...).
        raw_term: the term exactly as received.
        suggestions: fuzzy-match suggestions produced for the term.
        note: optional free-form annotation.
    """
    # Local import: the module header only imports `datetime` itself.
    from datetime import timezone
    PENDING_PATH.parent.mkdir(parents=True, exist_ok=True)
    record = {
        # datetime.utcnow() is deprecated since Python 3.12; use an aware
        # UTC timestamp while preserving the original '...Z' wire format.
        "ts": datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
        "trace_id": trace_id,
        "source": source,
        "category": category,
        "raw_term": raw_term,
        "suggestions": suggestions,
        "note": note
    }
    record = _sanitize(record)
    with PENDING_PATH.open('a', encoding='utf-8') as f:
        f.write(json.dumps(record, ensure_ascii=False) + '\n')
def _normalize(term: str, items: list, category: str, trace_id: str, source: str):
    """Normalize *term* against *items*, queuing unresolved terms as pending.

    Returns a dict with status ('ok' or 'pending'), the normalized id and
    canonical name (None when pending), the match confidence, how the
    match was made, and the fuzzy suggestion list.
    """
    matched, confidence, matched_by = _match(term, items)
    suggestions = _suggest(term, items)
    # A match below the 0.85 confidence floor is treated as unresolved.
    resolved = matched is not None and confidence >= 0.85
    if not resolved:
        _pending(trace_id, source, category, term, suggestions)
    return {
        "status": "ok" if resolved else "pending",
        "normalized_id": matched['id'] if resolved else None,
        "canonical_name": matched.get('name') if resolved else None,
        "confidence": confidence,
        "matched_by": matched_by,
        "suggestions": suggestions
    }
def normalize_field(term: str, trace_id: str = '', source: str = 'telegram'):
    """Normalize a field name against the 'fields' dictionary."""
    dictionaries = _load()
    return _normalize(term, dictionaries.get('fields', []), 'field', trace_id, source)
def normalize_crop(term: str, trace_id: str = '', source: str = 'telegram'):
    """Normalize a crop name against the 'crops' dictionary."""
    dictionaries = _load()
    return _normalize(term, dictionaries.get('crops', []), 'crop', trace_id, source)
def normalize_operation(term: str, trace_id: str = '', source: str = 'telegram'):
    """Normalize an operation name against the 'operations' dictionary."""
    dictionaries = _load()
    return _normalize(term, dictionaries.get('operations', []), 'operation', trace_id, source)
def normalize_material(term: str, trace_id: str = '', source: str = 'telegram'):
    """Normalize a material name against the 'materials' dictionary."""
    dictionaries = _load()
    return _normalize(term, dictionaries.get('materials', []), 'material', trace_id, source)
def normalize_unit(term: str, trace_id: str = '', source: str = 'telegram'):
    """Normalize a unit term against the 'units' dictionary.

    Unlike the other normalize_* helpers, exact matching is delegated to
    normalize.normalize_unit; _suggest is only used for the suggestion
    list and for the pending record.

    Fix: the previous version used a bare next(...) to look up the
    resolved id, which raised StopIteration when _normalize_unit returned
    an id that is absent from the dictionary; such ids now fall through
    to the pending path instead.
    """
    data = _load()
    units = data.get('units', [])
    unit_id = _normalize_unit(term, units)
    suggestions = _suggest(term, units)
    item = next((u for u in units if u['id'] == unit_id), None) if unit_id else None
    if item is None:
        _pending(trace_id, source, 'unit', term, suggestions)
        return {
            "status": "pending",
            "normalized_id": None,
            "canonical_name": None,
            "confidence": 0.0,
            "matched_by": 'none',
            "suggestions": suggestions
        }
    return {
        "status": "ok",
        "normalized_id": unit_id,
        # NOTE(review): canonical_name is the unit *id* here, while every
        # other normalizer returns the 'name' field — looks inconsistent;
        # confirm with callers before changing.
        "canonical_name": item.get('id'),
        "confidence": 1.0,
        "matched_by": 'synonym',
        "suggestions": suggestions
    }
def normalize_from_text(text: str, trace_id: str = '', source: str = 'telegram'):
    """Naively split *text* on commas and normalize each term.

    Each term is tried against every category in order; the first
    category with a confident ('ok') match wins.  When no category
    matches, the term is filed as 'pending' under the category whose best
    suggestion scored highest.

    Fix: the previous version broke out of the category loop on the first
    'pending' result as well.  Since every normalizer returns either 'ok'
    or 'pending', the first category ('fields') always terminated the
    loop, so crops/operations/materials/units were never consulted for
    any term that was not a confident field match.  Side effect of the
    fix: an unmatched term may now produce one pending audit record per
    category instead of one.
    """
    parts = [p.strip() for p in text.split(',') if p.strip()]
    results = {"fields": [], "crops": [], "operations": [], "materials": [], "units": []}
    normalizers = [
        ("fields", normalize_field),
        ("crops", normalize_crop),
        ("operations", normalize_operation),
        ("materials", normalize_material),
        ("units", normalize_unit),
    ]
    for part in parts:
        best_pending = None  # (category, result) with the strongest suggestion
        best_score = -1.0
        for category, normalizer in normalizers:
            res = normalizer(part, trace_id=trace_id, source=source)
            if res['status'] == 'ok':
                results[category].append({"term": part, **res})
                best_pending = None
                break
            top_score = res['suggestions'][0]['score'] if res['suggestions'] else 0.0
            if top_score > best_score:
                best_score = top_score
                best_pending = (category, res)
        if best_pending is not None:
            category, res = best_pending
            results[category].append({"term": part, **res})
    return results