Files
Apple ef3473db21 snapshot: NODE1 production state 2026-02-09
Complete snapshot of /opt/microdao-daarion/ from NODE1 (144.76.224.179).
This represents the actual running production code that has diverged
significantly from the previous main branch.

Key changes from old main:
- Gateway (http_api.py): expanded from ~40KB to 164KB with full agent support
- Router: new /v1/agents/{id}/infer endpoint with vision + DeepSeek routing
- Behavior Policy: SOWA v2.2 (3-level: FULL/ACK/SILENT)
- Agent Registry: config/agent_registry.yml as single source of truth
- 13 agents configured (was 3)
- Memory service integration
- CrewAI teams and roles

Excluded from snapshot: venv/, .env, data/, backups, .tgz archives

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-09 08:46:46 -08:00

203 lines
6.0 KiB
Python

import json
import os
import uuid
from datetime import datetime, timezone
from pathlib import Path

from . import tool_integration_write
DATA_PATH = Path(os.getenv('AGX_OPERATION_PATH', '/opt/microdao-daarion/data/operations/operation_plans.jsonl'))
STATUS_FLOW = {
'planned': ['scheduled', 'cancelled'],
'scheduled': ['in_progress', 'cancelled'],
'in_progress': ['done', 'cancelled'],
'done': ['verified', 'closed'],
'verified': ['closed'],
'closed': [],
'cancelled': []
}
def _now():
return datetime.utcnow().isoformat() + 'Z'
def _append_event(ev: dict):
DATA_PATH.parent.mkdir(parents=True, exist_ok=True)
with DATA_PATH.open('a', encoding='utf-8') as f:
f.write(json.dumps(ev, ensure_ascii=False) + '\n')
def _load_events():
if not DATA_PATH.exists():
return []
events = []
for line in DATA_PATH.read_text(encoding='utf-8').split('\n'):
if not line.strip():
continue
events.append(json.loads(line))
return events
def _project():
plans = {}
for ev in _load_events():
pid = ev.get('plan_id')
if ev['type'] == 'create_plan':
plans[pid] = ev['payload']
elif ev['type'] == 'update_plan' and pid in plans:
plans[pid].update(ev['payload'])
plans[pid]['updated_ts'] = ev['ts']
elif ev['type'] == 'set_status' and pid in plans:
plans[pid]['status'] = ev['payload']['status']
plans[pid]['updated_ts'] = ev['ts']
elif ev['type'] == 'record_fact' and pid in plans:
plans[pid].setdefault('fact_events', []).append(ev['payload'])
plans[pid]['updated_ts'] = ev['ts']
return plans
def _new_id(prefix: str):
return f"{prefix}_{uuid.uuid4().hex[:8]}"
def create_plan(plan_spec: dict, trace_id: str = '', source: str = 'telegram'):
plan_id = f"opplan_{datetime.utcnow().strftime('%Y%m%d')}_{uuid.uuid4().hex[:6]}"
plan = {
'plan_id': plan_id,
'created_ts': _now(),
'updated_ts': _now(),
'trace_id': trace_id,
'source': source,
'status': 'planned',
'scope': plan_spec.get('scope', {}),
'tasks': [],
'fact_events': []
}
for task in plan_spec.get('tasks', []):
t = task.copy()
t['task_id'] = t.get('task_id') or _new_id('task')
plan['tasks'].append(t)
_append_event({
'ts': _now(),
'type': 'create_plan',
'plan_id': plan_id,
'trace_id': trace_id,
'payload': plan
})
return plan_id
def list_plans(filters: dict | None = None):
plans = list(_project().values())
if not filters:
return plans
status = filters.get('status')
if status:
plans = [p for p in plans if p.get('status') == status]
return plans
def get_plan(plan_id: str):
return _project().get(plan_id)
def update_plan(plan_id: str, patch: dict, trace_id: str = ''):
_append_event({
'ts': _now(),
'type': 'update_plan',
'plan_id': plan_id,
'trace_id': trace_id,
'payload': patch
})
return True
def set_status(plan_id: str, status: str, trace_id: str = ''):
plan = get_plan(plan_id)
if not plan:
raise ValueError('plan_not_found')
current = plan.get('status')
if status not in STATUS_FLOW.get(current, []):
raise ValueError('invalid_transition')
_append_event({
'ts': _now(),
'type': 'set_status',
'plan_id': plan_id,
'trace_id': trace_id,
'payload': {'status': status}
})
return True
def record_fact(plan_id: str, fact_event: dict, trace_id: str = ''):
plan = get_plan(plan_id)
if not plan:
raise ValueError('plan_not_found')
fact = fact_event.copy()
fact['fact_id'] = fact.get('fact_id') or _new_id('fact')
fact.setdefault('farmos_write', {'status': 'pending', 'ref': ''})
# write to farmOS via integration service (single-writer)
try:
tool_integration_write.write_tasklog(
{'source': 'operation_plan', 'deviceId': fact.get('field_id', '')},
{
'task': fact.get('operation_id', ''),
'status': 'done',
'ts': int(datetime.utcnow().timestamp() * 1000),
'notes': json.dumps(fact.get('fact', {}), ensure_ascii=False)
}
)
fact['farmos_write'] = {'status': 'ok', 'ref': ''}
except Exception:
fact['farmos_write'] = {'status': 'failed', 'ref': ''}
_append_event({
'ts': _now(),
'type': 'record_fact',
'plan_id': plan_id,
'trace_id': trace_id,
'payload': fact
})
return True
def plan_dashboard(date_range: dict | None = None, filters: dict | None = None):
plans = list_plans(filters)
counts = {'planned': 0, 'in_progress': 0, 'done': 0, 'overdue': 0}
critical_tasks = []
today = datetime.utcnow().date().isoformat()
for p in plans:
status = p.get('status')
if status in counts:
counts[status] += 1
for t in p.get('tasks', []):
planned_date = t.get('planned_date')
if planned_date and planned_date < today and status not in ['done', 'closed', 'verified']:
counts['overdue'] += 1
critical_tasks.append({
'field_id': p.get('scope', {}).get('field_ids', [''])[0],
'operation_id': t.get('operation_id'),
'planned_date': planned_date,
'reason': 'overdue'
})
if t.get('priority') == 'critical':
critical_tasks.append({
'field_id': p.get('scope', {}).get('field_ids', [''])[0],
'operation_id': t.get('operation_id'),
'planned_date': planned_date,
'reason': 'critical'
})
return {
'status': 'ok',
'date_range': date_range or {},
'counts': counts,
'critical_tasks': critical_tasks,
'plan_vs_fact': []
}