Sync NODE1 crewai-service runtime files and monitor summary script

This commit is contained in:
Apple
2026-02-18 06:00:19 -08:00
committed by Codex
parent 963813607b
commit 77ab034744
3 changed files with 367 additions and 37 deletions

View File

@@ -0,0 +1,49 @@
#!/usr/bin/env bash
# Run the full canary suite and publish a machine-readable status summary
# (JSON + log) under ops/status, then optionally notify SOFIIA.
set -euo pipefail

ROOT="/opt/microdao-daarion"
STATUS_DIR="$ROOT/ops/status"
STATUS_JSON="$STATUS_DIR/canary_all.latest.json"
STATUS_LOG="$STATUS_DIR/canary_all.latest.log"
TS_START="$(date -u +%Y-%m-%dT%H:%M:%SZ)"

mkdir -p "$STATUS_DIR"

# Run the canary suite; capture combined output and exit code without
# aborting this wrapper on failure.
set +e
out="$(cd "$ROOT" && ./ops/canary_all.sh 2>&1)"
rc=$?
set -e
printf '%s\n' "$out" > "$STATUS_LOG"
TS_END="$(date -u +%Y-%m-%dT%H:%M:%SZ)"

status="ok"
if [[ $rc -ne 0 ]]; then
  status="fail"
fi

# Pass values via the environment and use a *quoted* heredoc delimiter so
# quotes/backslashes in paths or timestamps can never break the generated
# JSON or inject Python source (the unquoted-heredoc interpolation did).
CANARY_STATUS="$status" CANARY_RC="$rc" CANARY_TS_START="$TS_START" \
CANARY_TS_END="$TS_END" CANARY_STATUS_LOG="$STATUS_LOG" \
CANARY_STATUS_JSON="$STATUS_JSON" python3 - <<'PY'
import json
import os
from pathlib import Path

payload = {
    "status": os.environ["CANARY_STATUS"],
    "exit_code": int(os.environ["CANARY_RC"]),
    "started_at": os.environ["CANARY_TS_START"],
    "ended_at": os.environ["CANARY_TS_END"],
    "log_path": os.environ["CANARY_STATUS_LOG"],
}
Path(os.environ["CANARY_STATUS_JSON"]).write_text(
    json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8"
)
print(json.dumps(payload, ensure_ascii=False))
PY

# Optional notify to SOFIIA (non-fatal for canary status).
set +e
notify_out="$("$ROOT/ops/monitor_notify_sofiia.sh" "$STATUS_JSON" 2>&1)"
notify_rc=$?
set -e
printf '%s\n' "$notify_out" >> "$STATUS_LOG"
if [[ $notify_rc -ne 0 ]]; then
  echo "[WARN] sofiia notify failed (rc=$notify_rc)"
fi

# Propagate the canary suite's exit code (notify failures stay non-fatal).
exit $rc

View File

@@ -7,6 +7,7 @@ import json
import time import time
import asyncio import asyncio
import logging import logging
import re
import httpx import httpx
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
from fastapi import FastAPI, HTTPException from fastapi import FastAPI, HTTPException
@@ -28,9 +29,48 @@ from registry_loader import (
app = FastAPI(title="CrewAI Service", version="2.0.0") app = FastAPI(title="CrewAI Service", version="2.0.0")
# Configuration # Configuration
ROUTER_URL = os.getenv("ROUTER_URL", "http://dagi-staging-router:8000") _router_url = os.getenv("ROUTER_URL", "http://router:8000")
# Backward compatibility for older envs injecting unreachable hostname.
ROUTER_URL = _router_url.replace("dagi-staging-router", "router")
DEFAULT_MAX_CONCURRENCY = int(os.getenv("MAX_CONCURRENT_ROLES", "3")) DEFAULT_MAX_CONCURRENCY = int(os.getenv("MAX_CONCURRENT_ROLES", "3"))
LLM_TIMEOUT = int(os.getenv("LLM_TIMEOUT", "120")) LLM_TIMEOUT = int(os.getenv("LLM_TIMEOUT", "120"))
CREWAI_ORCHESTRATOR_LLM_PROFILE = os.getenv("CREWAI_ORCHESTRATOR_LLM_PROFILE", "cloud_deepseek").strip()
CREWAI_WORKER_LLM_PROFILE = os.getenv("CREWAI_WORKER_LLM_PROFILE", "local_qwen3_8b").strip()

# Orchestrators that are allowed to speak in collective ("we") voice.
TEAM_VOICE_ORCHESTRATORS = {"daarwizz"}

# Collective-voice markers in English/Ukrainian/Russian output.
# BUGFIX: the previous pattern contained "\аш" (backslash + Cyrillic "а"),
# an invalid regex escape that raises re.error at import time; the intended
# alternative is the "наш/наша/наші…" stem.
TEAM_VOICE_MARKERS_RE = re.compile(
    r"(\bwe\b|\bour\b|\bour team\b|наша команда|\bми\b|\bмы\b|наш\w*\b)",
    flags=re.IGNORECASE,
)

# Closed vocabularies accepted by the runtime envelope.
VISIBILITY_LEVELS = {"public", "interclan", "incircle", "soulsafe", "sacred"}
CONSENT_STATUSES = {"none", "pending", "confirmed"}


def validate_runtime_envelope(envelope: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Lightweight envelope guard for the /crew/run boundary.

    Returns None when the envelope is valid, otherwise a dict with
    ``stop_code`` (always ``STOP_SCHEMA_ENVELOPE``) and a ``details`` list
    naming the first violated constraint.
    """
    if not isinstance(envelope, dict):
        return {"stop_code": "STOP_SCHEMA_ENVELOPE", "details": ["runtime_envelope_not_object"]}
    required = [
        "request_id",
        "visibility_level_target",
        "consent_status",
        "allowed_actions",
        "expected_output",
        "input_text",
    ]
    missing = [k for k in required if k not in envelope]
    if missing:
        return {"stop_code": "STOP_SCHEMA_ENVELOPE", "details": [f"missing:{m}" for m in missing]}
    if envelope.get("visibility_level_target") not in VISIBILITY_LEVELS:
        return {"stop_code": "STOP_SCHEMA_ENVELOPE", "details": ["invalid:visibility_level_target"]}
    if envelope.get("consent_status") not in CONSENT_STATUSES:
        return {"stop_code": "STOP_SCHEMA_ENVELOPE", "details": ["invalid:consent_status"]}
    if not isinstance(envelope.get("allowed_actions"), list) or len(envelope.get("allowed_actions")) == 0:
        return {"stop_code": "STOP_SCHEMA_ENVELOPE", "details": ["invalid:allowed_actions"]}
    if not isinstance(envelope.get("input_text"), str) or not envelope.get("input_text", "").strip():
        return {"stop_code": "STOP_SCHEMA_ENVELOPE", "details": ["invalid:input_text"]}
    return None
# Request/Response models # Request/Response models
@@ -58,7 +98,9 @@ async def call_internal_llm(
prompt: str, prompt: str,
system_prompt: str = None, system_prompt: str = None,
role_context: str = None, role_context: str = None,
llm_profile: str = "reasoning" llm_profile: str = "reasoning",
max_tokens: int = 1200,
temperature: float = 0.3,
) -> str: ) -> str:
"""Call Router internal LLM endpoint for a single role""" """Call Router internal LLM endpoint for a single role"""
url = f"{ROUTER_URL}/internal/llm/complete" url = f"{ROUTER_URL}/internal/llm/complete"
@@ -66,8 +108,8 @@ async def call_internal_llm(
payload = { payload = {
"prompt": prompt, "prompt": prompt,
"llm_profile": llm_profile, "llm_profile": llm_profile,
"max_tokens": 2048, "max_tokens": max_tokens,
"temperature": 0.3 "temperature": temperature,
} }
if system_prompt: if system_prompt:
payload["system_prompt"] = system_prompt payload["system_prompt"] = system_prompt
@@ -85,6 +127,18 @@ async def call_internal_llm(
raise raise
def resolve_generation_controls(context: Dict[str, Any]) -> Dict[str, Any]:
    """Derive soft generation controls from Gateway metadata (no hard policy lock).

    Precedence: force_concise > is_training_group > defaults.
    """
    meta = {}
    if isinstance(context, dict):
        meta = (context or {}).get("metadata", {})
    if meta.get("force_concise"):
        return {"max_tokens": 220, "temperature": 0.2, "mode": "concise"}
    if meta.get("is_training_group"):
        return {"max_tokens": 520, "temperature": 0.25, "mode": "training"}
    return {"max_tokens": 1200, "temperature": 0.3, "mode": "default"}
async def delegate_to_agent( async def delegate_to_agent(
orchestrator_id: str, orchestrator_id: str,
target_agent_id: str, target_agent_id: str,
@@ -137,13 +191,14 @@ async def execute_role(
"""Execute a single role with rate limiting""" """Execute a single role with rate limiting"""
role_id = role_config.get("id", "unknown") role_id = role_config.get("id", "unknown")
role_context = role_config.get("role_context", role_id) role_context = role_config.get("role_context", role_id)
llm_profile = role_config.get("llm_profile", "reasoning") llm_profile = CREWAI_WORKER_LLM_PROFILE or role_config.get("llm_profile", "reasoning")
system_prompt = role_config.get("system_prompt", "") system_prompt = role_config.get("system_prompt", "")
memory_brief = context.get("memory_brief", {}) memory_brief = context.get("memory_brief", {})
memory_str = json.dumps(memory_brief, ensure_ascii=False)[:500] if memory_brief else "" memory_str = json.dumps(memory_brief, ensure_ascii=False)[:500] if memory_brief else ""
prompt = f"Task: {task}\n\nContext: {memory_str}\n\nYour role: {role_context}\n\nProvide your analysis and recommendations." prompt = f"Task: {task}\n\nContext: {memory_str}\n\nYour role: {role_context}\n\nProvide your analysis and recommendations."
controls = resolve_generation_controls(context)
async with semaphore: async with semaphore:
t0 = time.time() t0 = time.time()
@@ -153,7 +208,9 @@ async def execute_role(
prompt=prompt, prompt=prompt,
system_prompt=system_prompt, system_prompt=system_prompt,
role_context=role_context, role_context=role_context,
llm_profile=llm_profile llm_profile=llm_profile,
max_tokens=controls["max_tokens"],
temperature=controls["temperature"],
) )
elapsed = time.time() - t0 elapsed = time.time() - t0
logger.info(f"ROLE DONE: {role_context} ({elapsed:.1f}s)") logger.info(f"ROLE DONE: {role_context} ({elapsed:.1f}s)")
@@ -240,13 +297,48 @@ async def run_crew_canonical(
# Synthesis # Synthesis
synthesis_prompt = synthesis_config.get("system_prompt", "") synthesis_prompt = synthesis_config.get("system_prompt", "")
synthesis_role = synthesis_config.get("role_context", "Synthesis") synthesis_role = synthesis_config.get("role_context", "Synthesis")
synthesis_llm = synthesis_config.get("llm_profile", "reasoning") synthesis_llm = CREWAI_ORCHESTRATOR_LLM_PROFILE or synthesis_config.get("llm_profile", "reasoning")
controls = resolve_generation_controls(context)
if orchestrator_id in TEAM_VOICE_ORCHESTRATORS:
voice_rule = (
"You may speak as an orchestrator team when appropriate."
)
else:
voice_rule = (
"CRITICAL STYLE: Write only in first-person singular as this single agent "
"(I/me in English; я in Ukrainian/Russian). "
"Do not present yourself as a team, group, council, or collective. "
"Do not use phrases like 'we', 'our team', 'наша команда', 'мы'."
)
if orchestrator_id == "nutra":
voice_rule += (
" CRITICAL GENDER: NUTRA must always use feminine first-person wording "
"in Ukrainian/Russian (e.g., 'я підготувала', 'я готова', 'я зрозуміла'); "
"never masculine forms like 'понял/готов'."
)
if controls["mode"] in ("concise", "training"):
final_prompt = f"""Task: {task} final_prompt = f"""Task: {task}
Team Analysis: Team Analysis:
{synthesis_context} {synthesis_context}
{voice_rule}
Return a concise user-facing answer in the user's language.
Format:
- 2-4 short bullets with key points
- 1 short next step
Avoid long reports and verbose section headers."""
else:
final_prompt = f"""Task: {task}
Team Analysis:
{synthesis_context}
{voice_rule}
Synthesize the above into a coherent, actionable response. Include: Synthesize the above into a coherent, actionable response. Include:
- Key findings - Key findings
- Recommendations - Recommendations
@@ -258,11 +350,37 @@ Synthesize the above into a coherent, actionable response. Include:
prompt=final_prompt, prompt=final_prompt,
system_prompt=synthesis_prompt, system_prompt=synthesis_prompt,
role_context=synthesis_role, role_context=synthesis_role,
llm_profile=synthesis_llm llm_profile=synthesis_llm,
max_tokens=controls["max_tokens"],
temperature=controls["temperature"],
) )
except Exception as e: except Exception as e:
final_result = f"Synthesis failed: {e}\n\nRaw team results:\n{synthesis_context}" final_result = f"Synthesis failed: {e}\n\nRaw team results:\n{synthesis_context}"
# Enforce single-agent voice for non-network orchestrators.
if orchestrator_id not in TEAM_VOICE_ORCHESTRATORS and TEAM_VOICE_MARKERS_RE.search(final_result or ""):
rewrite_prompt = f"""Rewrite the text below in the user's language.
Hard constraints:
- First-person singular only (I/me; я).
- Never use collective/team voice: no "we", "our", "our team", "ми", "мы", "наша команда", "наш*".
- Keep original meaning and structure concise.
{"- For NUTRA: strictly feminine first-person in Ukrainian/Russian; never masculine forms." if orchestrator_id == "nutra" else ""}
Text:
{final_result}"""
try:
final_result = await call_internal_llm(
prompt=rewrite_prompt,
system_prompt="You are a strict style editor for agent voice consistency.",
role_context="Voice Consistency Editor",
llm_profile=CREWAI_ORCHESTRATOR_LLM_PROFILE or "reasoning",
max_tokens=min(600, controls["max_tokens"] + 120),
temperature=0.1,
)
except Exception as e:
logger.warning(f"Voice rewrite skipped due to error: {e}")
elapsed = time.time() - t0 elapsed = time.time() - t0
return CrewRunResponse( return CrewRunResponse(
@@ -326,6 +444,20 @@ async def run_crew(request: CrewRunRequest):
"""Execute multi-role orchestration for an agent""" """Execute multi-role orchestration for an agent"""
orchestrator_id = request.orchestrator_id orchestrator_id = request.orchestrator_id
runtime_envelope = request.context.get("runtime_envelope") if isinstance(request.context, dict) else None
if runtime_envelope is not None:
envelope_error = validate_runtime_envelope(runtime_envelope)
if envelope_error:
return CrewRunResponse(
success=False,
error=envelope_error["stop_code"],
meta={
"stop_code": envelope_error["stop_code"],
"details": envelope_error.get("details", []),
"request_id": runtime_envelope.get("request_id") if isinstance(runtime_envelope, dict) else None,
},
)
if not is_orchestrator(orchestrator_id): if not is_orchestrator(orchestrator_id):
raise HTTPException(status_code=404, detail=f"Agent {orchestrator_id} not found or not an orchestrator") raise HTTPException(status_code=404, detail=f"Agent {orchestrator_id} not found or not an orchestrator")

View File

@@ -1,4 +1,3 @@
""" """
CrewAI Registry Loader - Variant A (Profiles per Agent) CrewAI Registry Loader - Variant A (Profiles per Agent)
Loads team configurations from crewai_teams.yml with profile support. Loads team configurations from crewai_teams.yml with profile support.
@@ -8,18 +7,76 @@ import json
import yaml import yaml
import logging import logging
from pathlib import Path from pathlib import Path
from functools import lru_cache from typing import Dict, Any, List
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
CREWAI_AGENTS_PATH = os.getenv("CREWAI_AGENTS_PATH", "/app/config/crewai_agents.json") CREWAI_AGENTS_PATH = os.getenv("CREWAI_AGENTS_PATH", "/app/config/crewai_agents.json")
CREWAI_TEAMS_PATH = os.getenv("CREWAI_TEAMS_PATH", "/app/config/crewai_teams.yml") CREWAI_TEAMS_PATH = os.getenv("CREWAI_TEAMS_PATH", "/app/config/crewai_teams.yml")
CREWAI_TEAMS_GENERATED_PATH = os.getenv("CREWAI_TEAMS_GENERATED_PATH", "/app/config/crewai_teams.generated.yml")
ROLES_BASE_PATH = os.getenv("ROLES_BASE_PATH", "/app/config/roles") ROLES_BASE_PATH = os.getenv("ROLES_BASE_PATH", "/app/config/roles")
# Example: "agromatrix=agx" means refs under agromatrix/* also try agx/*
ROLE_NAMESPACE_OVERRIDES_RAW = os.getenv("ROLE_NAMESPACE_OVERRIDES", "agromatrix=agx")
# Parse the comma-separated "src=dst" pairs; malformed or empty entries are ignored.
ROLE_NAMESPACE_OVERRIDES = {}
for _entry in ROLE_NAMESPACE_OVERRIDES_RAW.split(","):
    _entry = _entry.strip()
    if "=" not in _entry:
        continue
    _src, _dst = (piece.strip() for piece in _entry.split("=", 1))
    if _src and _dst:
        ROLE_NAMESPACE_OVERRIDES[_src] = _dst
_teams_config = None _teams_config = None
_agents_config = None _agents_config = None
def _normalize_prompt_ref(prompt_ref: str) -> str:
ref = (prompt_ref or "").strip().lstrip("/")
while ref.startswith("roles/"):
ref = ref[len("roles/"):]
return ref
def _build_prompt_candidates(prompt_ref: str) -> List[Path]:
    """Return ordered, de-duplicated candidate paths for a role prompt ref.

    Order: normalized ref, namespace-override alias (if any), then the raw
    ref as a legacy fallback when it differs from the normalized form.
    """
    base = Path(ROLES_BASE_PATH)
    normalized = _normalize_prompt_ref(prompt_ref)

    ordered: List[Path] = []
    if normalized:
        ordered.append(base / normalized)
        head, sep, tail = normalized.partition("/")
        if sep and head in ROLE_NAMESPACE_OVERRIDES:
            ordered.append(base / f"{ROLE_NAMESPACE_OVERRIDES[head]}/{tail}")

    # Legacy fallback (in case a ref is already relative but not under roles/*)
    raw = (prompt_ref or "").strip().lstrip("/")
    if raw and raw != normalized:
        ordered.append(base / raw)

    # De-duplicate while preserving first-seen order.
    unique: List[Path] = []
    seen = set()
    for candidate in ordered:
        key = str(candidate)
        if key not in seen:
            seen.add(key)
            unique.append(candidate)
    return unique
def resolve_prompt_path(prompt_ref: str) -> "Path | None":
    """Resolve *prompt_ref* to the first candidate path that exists on disk.

    Returns None when no candidate file exists; callers must handle the miss.
    (Annotation corrected: the function can return None, not just Path.)
    """
    for candidate in _build_prompt_candidates(prompt_ref):
        if candidate.exists():
            return candidate
    return None
def load_agents_config(): def load_agents_config():
"""Load basic agent config from crewai_agents.json""" """Load basic agent config from crewai_agents.json"""
global _agents_config global _agents_config
@@ -35,14 +92,64 @@ def load_agents_config():
def load_teams_config(): def load_teams_config():
"""Load teams/profiles config from crewai_teams.yml""" """Load teams/profiles config with generated-over-legacy merge."""
global _teams_config global _teams_config
if _teams_config is None: if _teams_config is None:
try: try:
with open(CREWAI_TEAMS_PATH, "r") as f: with open(CREWAI_TEAMS_PATH, "r") as f:
_teams_config = yaml.safe_load(f) legacy = yaml.safe_load(f) or {}
version = _teams_config.get("version", "unknown") merged = dict(legacy)
logger.info(f"Loaded teams config v{version} from {CREWAI_TEAMS_PATH}") version = merged.get("version", "unknown")
logger.info(f"Loaded legacy teams config v{version} from {CREWAI_TEAMS_PATH}")
generated = {}
gen_path = Path(CREWAI_TEAMS_GENERATED_PATH)
if gen_path.exists():
with open(gen_path, "r") as f:
generated = yaml.safe_load(f) or {}
logger.info(f"Loaded generated teams config from {CREWAI_TEAMS_GENERATED_PATH}")
# Merge strategy: generated overrides legacy for same (agent, profile).
# Missing agents/profiles continue to work from legacy file.
if generated:
skip_keys = {"schema_version", "version", "description"}
for key, val in generated.items():
if key in skip_keys:
continue
if not isinstance(val, dict):
merged[key] = val
continue
legacy_agent = merged.get(key, {})
if not isinstance(legacy_agent, dict):
legacy_agent = {}
merged_agent = dict(legacy_agent)
gen_profiles = val.get("profiles", {})
if isinstance(gen_profiles, dict):
legacy_profiles = legacy_agent.get("profiles", {})
if not isinstance(legacy_profiles, dict):
legacy_profiles = {}
combined_profiles = dict(legacy_profiles)
for profile_name, profile_cfg in gen_profiles.items():
if profile_name in combined_profiles:
logger.info(
f"Generated teams override legacy profile: {key}.{profile_name}"
)
combined_profiles[profile_name] = profile_cfg
merged_agent["profiles"] = combined_profiles
if "default_profile" in val:
merged_agent["default_profile"] = val["default_profile"]
if "profile_hints" in val:
merged_agent["profile_hints"] = val["profile_hints"]
merged[key] = merged_agent
_teams_config = merged
merged_version = _teams_config.get("version", "unknown")
logger.info(f"Effective teams config v{merged_version} loaded (legacy+generated merge)")
except Exception as e: except Exception as e:
logger.error(f"Failed to load teams config: {e}") logger.error(f"Failed to load teams config: {e}")
_teams_config = {} _teams_config = {}
@@ -50,22 +157,64 @@ def load_teams_config():
def load_role_prompt(prompt_ref: str) -> str: def load_role_prompt(prompt_ref: str) -> str:
"""Load role prompt from .md file""" """Load role prompt from .md file with normalized path resolution."""
if not prompt_ref: if not prompt_ref:
return "" return ""
prompt_path = Path(ROLES_BASE_PATH) / prompt_ref resolved = resolve_prompt_path(prompt_ref)
try: if not resolved:
if prompt_path.exists(): tried = ", ".join(str(p) for p in _build_prompt_candidates(prompt_ref))
return prompt_path.read_text(encoding="utf-8") logger.warning(f"Role prompt not found: ref={prompt_ref}; tried=[{tried}]")
else:
logger.warning(f"Role prompt not found: {prompt_path}")
return f"# Role: {prompt_ref}\n(prompt file missing)" return f"# Role: {prompt_ref}\n(prompt file missing)"
try:
return resolved.read_text(encoding="utf-8")
except Exception as e: except Exception as e:
logger.error(f"Error loading role prompt {prompt_ref}: {e}") logger.error(f"Error loading role prompt {resolved}: {e}")
return "" return ""
def validate_required_prompts(strict: bool = False) -> Dict[str, Any]:
    """Validate that every team/synthesis prompt ref resolves to a file.

    Returns a summary dict with ``missing_count`` and ``missing``. When
    *strict* is true, raises RuntimeError on any unresolvable ref instead
    of only logging a warning.
    """
    config = load_teams_config()
    skip_keys = {"schema_version", "version", "description"}
    missing = []

    for agent_id, agent_cfg in config.items():
        if agent_id in skip_keys or not isinstance(agent_cfg, dict):
            continue
        profiles = agent_cfg.get("profiles", {})
        if not isinstance(profiles, dict):
            continue
        for profile_name, profile_cfg in profiles.items():
            if not isinstance(profile_cfg, dict):
                continue
            # Synthesis prompt for this profile.
            synth_ref = (profile_cfg.get("synthesis", {}) or {}).get("system_prompt_ref", "")
            if synth_ref and not resolve_prompt_path(synth_ref):
                missing.append(f"{agent_id}.{profile_name}.synthesis -> {synth_ref}")
            # Per-member role prompts.
            for member in profile_cfg.get("team", []) or []:
                member = member or {}
                member_ref = member.get("system_prompt_ref", "")
                if member_ref and not resolve_prompt_path(member_ref):
                    member_id = member.get("id", "unknown")
                    missing.append(f"{agent_id}.{profile_name}.{member_id} -> {member_ref}")

    if missing:
        summary = f"Missing CrewAI role prompts: {len(missing)}"
        examples = "; ".join(missing[:8])
        if strict:
            raise RuntimeError(f"{summary}. Examples: {examples}")
        logger.warning(f"{summary}. Examples: {examples}")

    return {
        "missing_count": len(missing),
        "missing": missing,
    }
def get_agent_profiles(agent_id: str) -> list: def get_agent_profiles(agent_id: str) -> list:
"""Get list of available profiles for an agent""" """Get list of available profiles for an agent"""
config = load_teams_config() config = load_teams_config()