Files
microdao-daarion/services/config-registry/config_registry.py
Apple ef3473db21 snapshot: NODE1 production state 2026-02-09
Complete snapshot of /opt/microdao-daarion/ from NODE1 (144.76.224.179).
This represents the actual running production code that has diverged
significantly from the previous main branch.

Key changes from old main:
- Gateway (http_api.py): expanded from ~40KB to 164KB with full agent support
- Router: new /v1/agents/{id}/infer endpoint with vision + DeepSeek routing
- Behavior Policy: SOWA v2.2 (3-level: FULL/ACK/SILENT)
- Agent Registry: config/agent_registry.yml as single source of truth
- 13 agents configured (was 3)
- Memory service integration
- CrewAI teams and roles

Excluded from snapshot: venv/, .env, data/, backups, .tgz archives

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-09 08:46:46 -08:00

567 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Config Registry Service
=======================
Версійоване сховище конфігурацій для DAARION platform.
Зберігає:
- Системні промпти (версійовані)
- Routing rules
- Agent configurations
- Feature flags
- Quotas per user/team/agent
"""
import json
import hashlib
import logging
from datetime import datetime
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from pathlib import Path
import yaml
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@dataclass
class ConfigVersion:
    """A single stored version of a configuration payload."""
    version: str  # version identifier, e.g. "v1"
    content: Dict[str, Any]  # payload; prompt versions keep their text under the "prompt" key
    content_hash: str  # short hash of ``content`` (ConfigRegistry._compute_hash produces it)
    created_at: datetime  # creation timestamp
    created_by: str  # author of this version; ConfigRegistry defaults it to "system"
    is_active: bool = False  # True for the version currently selected for use
    description: str = ""  # optional free-form note about the version
@dataclass
class PromptConfig:
    """Versioned system prompt belonging to a single agent."""
    agent_id: str
    current_version: str
    versions: Dict[str, ConfigVersion] = field(default_factory=dict)

    def get_active_prompt(self) -> str:
        """Return the prompt text of ``current_version``.

        Returns:
            The value stored under the ``"prompt"`` key of the current
            version's content, or an empty string when the current version
            is not present in ``versions`` or has no ``"prompt"`` key.
        """
        try:
            active = self.versions[self.current_version]
        except KeyError:
            return ""
        return active.content.get("prompt", "")
@dataclass
class RoutingRule:
    """A routing rule that maps matching conditions to a target agent."""
    rule_id: str  # unique key of the rule inside the registry
    name: str  # human-readable name
    conditions: Dict[str, Any]  # {"intent": "energy", "confidence_min": 0.7}
    target_agent: str  # agent that receives requests matching the rule
    priority: int = 0  # ConfigRegistry.get_routing_rules returns higher values first
    enabled: bool = True  # disabled rules are filtered out when enabled_only is set
@dataclass
class AgentQuota:
    """Usage quota for an agent, user or team."""
    entity_type: str  # "user", "team", "agent"
    entity_id: str  # identifier of the entity the quota applies to
    quota_type: str  # "tokens", "requests", "storage"
    limit: int  # maximum allowed value of ``used``
    used: int = 0  # amount consumed so far
    # NOTE(review): nothing visible in this file acts on reset_period/last_reset;
    # confirm whether the periodic reset is implemented elsewhere.
    reset_period: str = "daily"  # "daily", "weekly", "monthly"
    last_reset: Optional[datetime] = None
class ConfigRegistry:
    """
    Centralized configuration registry backed by YAML files.

    Manages:
    - versioned system prompts (``prompts/<agent_id>.yaml``)
    - routing rules (``routing_rules.yaml``)
    - feature flags (``feature_flags.yaml``)
    - quotas (``quotas.yaml``)

    All configuration is loaded into in-memory dictionaries on construction;
    mutating methods update the cache and rewrite the corresponding file.
    """

    # Written to disk on first start and used as a fallback when the flags
    # file exists but cannot be read or is not a mapping.
    _DEFAULT_FEATURE_FLAGS = {
        "orchestration_enabled": True,
        "soft_handoff_enabled": True,
        "auto_handoff_enabled": False,
        "privacy_gate_enabled": True,
        "audit_logging_enabled": True,
        "multi_agent_workflows": False,
    }

    def __init__(self, config_dir: str = "/opt/microdao-daarion/config"):
        """Create the registry and load every configuration from ``config_dir``.

        Args:
            config_dir: Directory holding the YAML files; created if missing.
        """
        self.config_dir = Path(config_dir)
        self.config_dir.mkdir(parents=True, exist_ok=True)
        # In-memory cache
        self._prompts: Dict[str, PromptConfig] = {}
        self._routing_rules: Dict[str, RoutingRule] = {}
        self._feature_flags: Dict[str, bool] = {}
        self._quotas: Dict[str, AgentQuota] = {}
        # Load existing configs
        self._load_all()

    def _load_all(self):
        """Load prompts, routing rules, feature flags and quotas from disk."""
        self._load_prompts()
        self._load_routing_rules()
        self._load_feature_flags()
        self._load_quotas()

    def _compute_hash(self, content: Any) -> str:
        """Return the first 16 hex characters of the SHA-256 of ``content``.

        The content is serialized as JSON with sorted keys so that equal
        mappings hash identically; non-JSON values are stringified.
        """
        content_str = json.dumps(content, sort_keys=True, default=str)
        return hashlib.sha256(content_str.encode()).hexdigest()[:16]

    @staticmethod
    def _parse_datetime(value: Any) -> Optional[datetime]:
        """Convert a timestamp loaded from YAML into a ``datetime``.

        ``yaml.safe_load`` turns unquoted ISO timestamps into ``datetime``
        (or ``date``) objects, while the files written by this class store
        quoted ISO strings, so both forms have to be accepted.

        Returns:
            The parsed ``datetime``, or ``None`` for a missing/empty value.

        Raises:
            ValueError: If the value is not a valid ISO-8601 timestamp.
        """
        if value is None or value == "":
            return None
        if isinstance(value, datetime):
            return value
        return datetime.fromisoformat(str(value))

    @staticmethod
    def _quota_key(entity_type: str, entity_id: str, quota_type: str) -> str:
        """Build the cache/file key ``entity_type:entity_id:quota_type``."""
        return f"{entity_type}:{entity_id}:{quota_type}"

    # ==================== PROMPTS ====================

    def _load_prompts(self):
        """Load every ``prompts/*.yaml`` file; the file stem is the agent id.

        A file that fails to load is logged and skipped so that one broken
        prompt does not prevent the others from loading.
        """
        prompts_dir = self.config_dir / "prompts"
        if not prompts_dir.exists():
            prompts_dir.mkdir(parents=True, exist_ok=True)
            return
        for prompt_file in prompts_dir.glob("*.yaml"):
            try:
                with open(prompt_file, 'r', encoding='utf-8') as f:
                    data = yaml.safe_load(f)
                if data:
                    agent_id = prompt_file.stem
                    self._prompts[agent_id] = self._parse_prompt_config(agent_id, data)
            except Exception as e:
                logger.error(f"Failed to load prompt {prompt_file}: {e}")

    def _parse_prompt_config(self, agent_id: str, data: Dict) -> PromptConfig:
        """Build a ``PromptConfig`` from the mapping stored in a prompt file.

        The content hash is recomputed from the content instead of being
        trusted from the file. A missing ``created_at`` falls back to the
        current time.
        """
        versions = {}
        # ``versions: null`` in YAML yields None, hence the ``or {}``.
        for ver_id, ver_data in (data.get("versions") or {}).items():
            content = ver_data.get("content", {})
            created_at = self._parse_datetime(ver_data.get("created_at")) or datetime.now()
            versions[ver_id] = ConfigVersion(
                version=ver_id,
                content=content,
                content_hash=self._compute_hash(content),
                created_at=created_at,
                created_by=ver_data.get("created_by", "system"),
                is_active=ver_data.get("is_active", False),
                description=ver_data.get("description", "")
            )
        return PromptConfig(
            agent_id=agent_id,
            current_version=data.get("current_version", "v1"),
            versions=versions
        )

    def get_prompt(self, agent_id: str, version: Optional[str] = None) -> Optional[str]:
        """
        Return the prompt text for an agent.

        Args:
            agent_id: Agent identifier.
            version: Version to read, or None for the agent's current version.

        Returns:
            The prompt text, or None when the agent or version is unknown.
        """
        if agent_id not in self._prompts:
            return None
        config = self._prompts[agent_id]
        target_version = version or config.current_version
        if target_version in config.versions:
            return config.versions[target_version].content.get("prompt", "")
        return None

    def set_prompt(self, agent_id: str, prompt: str,
                   version: Optional[str] = None, created_by: str = "system",
                   description: str = "", activate: bool = True) -> str:
        """
        Create a new prompt version and persist it.

        Args:
            agent_id: Agent identifier; a prompt config is created if missing.
            prompt: Prompt text.
            version: Explicit version id, or None to generate the next ``vN``.
            created_by: Author recorded on the version.
            description: Free-form note recorded on the version.
            activate: When True the new version becomes the current one and
                all other versions are marked inactive.

        Returns:
            The id of the stored version.
        """
        if agent_id not in self._prompts:
            self._prompts[agent_id] = PromptConfig(
                agent_id=agent_id,
                current_version="v1",
                versions={}
            )
        config = self._prompts[agent_id]
        # Generate version if not provided
        if version is None:
            # len + 1 alone can collide with an existing id when ids are not
            # contiguous (e.g. only "v2" exists), which would silently
            # overwrite that version; probe until a free id is found.
            next_index = len(config.versions) + 1
            version = f"v{next_index}"
            while version in config.versions:
                next_index += 1
                version = f"v{next_index}"
        # Create new version
        content = {"prompt": prompt}
        config.versions[version] = ConfigVersion(
            version=version,
            content=content,
            content_hash=self._compute_hash(content),
            created_at=datetime.now(),
            created_by=created_by,
            is_active=activate,
            description=description
        )
        if activate:
            # Deactivate previous
            for v in config.versions.values():
                if v.version != version:
                    v.is_active = False
            config.current_version = version
        # Save to file
        self._save_prompt(agent_id)
        logger.info(f"Created prompt version {version} for {agent_id}")
        return version

    def _save_prompt(self, agent_id: str):
        """Write all versions of an agent's prompt to ``prompts/<agent_id>.yaml``."""
        prompts_dir = self.config_dir / "prompts"
        prompts_dir.mkdir(parents=True, exist_ok=True)
        config = self._prompts[agent_id]
        data = {
            "agent_id": agent_id,
            "current_version": config.current_version,
            "versions": {
                v.version: {
                    "content": v.content,
                    "content_hash": v.content_hash,
                    "created_at": v.created_at.isoformat(),
                    "created_by": v.created_by,
                    "is_active": v.is_active,
                    "description": v.description
                }
                for v in config.versions.values()
            }
        }
        with open(prompts_dir / f"{agent_id}.yaml", 'w', encoding='utf-8') as f:
            yaml.dump(data, f, allow_unicode=True, default_flow_style=False)

    def list_prompt_versions(self, agent_id: str) -> List[Dict]:
        """Return metadata (without prompt text) for every version of an agent's prompt.

        Returns an empty list when the agent is unknown.
        """
        if agent_id not in self._prompts:
            return []
        return [
            {
                "version": v.version,
                "content_hash": v.content_hash,
                "created_at": v.created_at.isoformat(),
                "created_by": v.created_by,
                "is_active": v.is_active,
                "description": v.description
            }
            for v in self._prompts[agent_id].versions.values()
        ]

    def rollback_prompt(self, agent_id: str, version: str) -> bool:
        """Make an existing version the current one and persist the change.

        Returns:
            True on success, False when the agent or the version is unknown.
        """
        if agent_id not in self._prompts:
            return False
        config = self._prompts[agent_id]
        if version not in config.versions:
            return False
        # Deactivate all
        for v in config.versions.values():
            v.is_active = False
        # Activate target
        config.versions[version].is_active = True
        config.current_version = version
        self._save_prompt(agent_id)
        logger.info(f"Rolled back {agent_id} prompt to {version}")
        return True

    # ==================== ROUTING RULES ====================

    def _load_routing_rules(self):
        """Load routing rules from ``routing_rules.yaml`` if the file exists."""
        rules_file = self.config_dir / "routing_rules.yaml"
        if not rules_file.exists():
            return
        try:
            with open(rules_file, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f) or {}
            # ``rules: null`` in YAML yields None, hence the ``or {}``.
            for rule_id, rule_data in (data.get("rules") or {}).items():
                self._routing_rules[rule_id] = RoutingRule(
                    rule_id=rule_id,
                    name=rule_data.get("name", rule_id),
                    conditions=rule_data.get("conditions", {}),
                    target_agent=rule_data.get("target_agent", ""),
                    priority=rule_data.get("priority", 0),
                    enabled=rule_data.get("enabled", True)
                )
        except Exception as e:
            logger.error(f"Failed to load routing rules: {e}")

    def get_routing_rules(self, enabled_only: bool = True) -> List[RoutingRule]:
        """Return routing rules sorted by priority, highest first.

        Args:
            enabled_only: When True, disabled rules are left out.
        """
        rules = list(self._routing_rules.values())
        if enabled_only:
            rules = [r for r in rules if r.enabled]
        return sorted(rules, key=lambda r: r.priority, reverse=True)

    def add_routing_rule(self, rule: RoutingRule):
        """Add a rule (replacing any rule with the same ``rule_id``) and persist."""
        self._routing_rules[rule.rule_id] = rule
        self._save_routing_rules()

    def _save_routing_rules(self):
        """Write all routing rules to ``routing_rules.yaml``."""
        data = {
            "rules": {
                r.rule_id: {
                    "name": r.name,
                    "conditions": r.conditions,
                    "target_agent": r.target_agent,
                    "priority": r.priority,
                    "enabled": r.enabled
                }
                for r in self._routing_rules.values()
            }
        }
        with open(self.config_dir / "routing_rules.yaml", 'w', encoding='utf-8') as f:
            yaml.dump(data, f, allow_unicode=True, default_flow_style=False)

    # ==================== FEATURE FLAGS ====================

    def _load_feature_flags(self):
        """Load feature flags from ``feature_flags.yaml``.

        When the file does not exist the built-in defaults are used and
        written to disk. When the file exists but is unreadable or does not
        contain a mapping, the defaults are used in memory only: leaving the
        flags empty would make every lookup return its caller-supplied
        default and silently turn off flags that default to on here (for
        example ``privacy_gate_enabled``).
        """
        flags_file = self.config_dir / "feature_flags.yaml"
        if not flags_file.exists():
            # Default flags
            self._feature_flags = dict(self._DEFAULT_FEATURE_FLAGS)
            self._save_feature_flags()
            return
        try:
            with open(flags_file, 'r', encoding='utf-8') as f:
                loaded = yaml.safe_load(f) or {}
            if not isinstance(loaded, dict):
                raise ValueError(f"expected a mapping, got {type(loaded).__name__}")
            self._feature_flags = loaded
        except Exception as e:
            logger.error(f"Failed to load feature flags: {e}")
            self._feature_flags = dict(self._DEFAULT_FEATURE_FLAGS)

    def get_feature_flag(self, flag_name: str, default: bool = False) -> bool:
        """Return the value of a feature flag, or ``default`` if it is not set."""
        return self._feature_flags.get(flag_name, default)

    def set_feature_flag(self, flag_name: str, value: bool):
        """Set a feature flag and persist all flags."""
        self._feature_flags[flag_name] = value
        self._save_feature_flags()
        logger.info(f"Feature flag {flag_name} set to {value}")

    def _save_feature_flags(self):
        """Write all feature flags to ``feature_flags.yaml``."""
        with open(self.config_dir / "feature_flags.yaml", 'w', encoding='utf-8') as f:
            yaml.dump(self._feature_flags, f, allow_unicode=True)

    # ==================== QUOTAS ====================

    def _load_quotas(self):
        """Load quotas from ``quotas.yaml`` if the file exists."""
        quotas_file = self.config_dir / "quotas.yaml"
        if not quotas_file.exists():
            return
        try:
            with open(quotas_file, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f) or {}
            # ``quotas: null`` in YAML yields None, hence the ``or {}``.
            for quota_id, quota_data in (data.get("quotas") or {}).items():
                self._quotas[quota_id] = AgentQuota(
                    entity_type=quota_data.get("entity_type", "user"),
                    entity_id=quota_data.get("entity_id", ""),
                    quota_type=quota_data.get("quota_type", "requests"),
                    limit=quota_data.get("limit", 1000),
                    used=quota_data.get("used", 0),
                    reset_period=quota_data.get("reset_period", "daily"),
                    last_reset=self._parse_datetime(quota_data.get("last_reset"))
                )
        except Exception as e:
            logger.error(f"Failed to load quotas: {e}")

    def get_quota(self, entity_type: str, entity_id: str, quota_type: str) -> Optional[AgentQuota]:
        """Return the quota for the given entity and type, or None if none is defined."""
        return self._quotas.get(self._quota_key(entity_type, entity_id, quota_type))

    def check_quota(self, entity_type: str, entity_id: str,
                    quota_type: str, amount: int = 1) -> bool:
        """
        Check whether ``amount`` more units fit into the quota, without consuming.

        Returns:
            True if the quota allows it (or no quota is defined, which is
            treated as unlimited), False if the limit would be exceeded.
        """
        quota = self.get_quota(entity_type, entity_id, quota_type)
        if quota is None:
            return True  # No quota = unlimited
        return (quota.used + amount) <= quota.limit

    def consume_quota(self, entity_type: str, entity_id: str,
                      quota_type: str, amount: int = 1) -> bool:
        """
        Consume ``amount`` units of quota and persist the new usage.

        A quota that does not exist yet is created with a limit of 10000 and
        a daily reset period.

        Returns:
            True on success, False if the limit would be exceeded (in which
            case nothing is consumed or saved).
        """
        # NOTE(review): check_quota treats a missing quota as unlimited while
        # this method creates a 10000-unit default; also nothing in this class
        # resets ``used`` according to reset_period/last_reset. Confirm the
        # intended semantics before relying on either.
        quota_id = self._quota_key(entity_type, entity_id, quota_type)
        quota = self._quotas.get(quota_id)
        if quota is None:
            # Create default quota
            quota = AgentQuota(
                entity_type=entity_type,
                entity_id=entity_id,
                quota_type=quota_type,
                limit=10000,  # Default limit
                used=0,
                reset_period="daily"
            )
            self._quotas[quota_id] = quota
        if (quota.used + amount) > quota.limit:
            return False
        quota.used += amount
        self._save_quotas()
        return True

    def _save_quotas(self):
        """Write all quotas to ``quotas.yaml``, keyed by ``entity_type:entity_id:quota_type``."""
        data = {
            "quotas": {
                self._quota_key(q.entity_type, q.entity_id, q.quota_type): {
                    "entity_type": q.entity_type,
                    "entity_id": q.entity_id,
                    "quota_type": q.quota_type,
                    "limit": q.limit,
                    "used": q.used,
                    "reset_period": q.reset_period,
                    "last_reset": q.last_reset.isoformat() if q.last_reset else None
                }
                for q in self._quotas.values()
            }
        }
        with open(self.config_dir / "quotas.yaml", 'w', encoding='utf-8') as f:
            yaml.dump(data, f, allow_unicode=True, default_flow_style=False)
# Process-wide registry, built on first use.
_config_registry: Optional[ConfigRegistry] = None


def get_config_registry() -> ConfigRegistry:
    """Return the shared ConfigRegistry, constructing it on the first call."""
    global _config_registry
    registry = _config_registry
    if registry is None:
        registry = ConfigRegistry()
        _config_registry = registry
    return registry
# ==================== HTTP API ====================
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
# Router that serves every endpoint defined below under the /config prefix.
router = APIRouter(prefix="/config", tags=["config"])
# Request body for creating a new prompt version (POST /config/prompts/{agent_id}).
class PromptRequest(BaseModel):
    prompt: str  # prompt text to store
    version: Optional[str] = None  # explicit version id; None lets the registry generate one
    description: str = ""  # optional note stored with the version
    activate: bool = True  # whether the new version becomes the current one
# Request body for setting a feature flag (PUT /config/feature-flags/{flag_name}).
class FeatureFlagRequest(BaseModel):
    value: bool  # new value of the flag
@router.get("/prompts/{agent_id}")
async def get_prompt(agent_id: str, version: str = None):
    """Return an agent's prompt text; respond with 404 when none is found.

    ``version`` selects a specific version; when omitted the registry
    falls back to the agent's current version.
    """
    text = get_config_registry().get_prompt(agent_id, version)
    if text is not None:
        return {"agent_id": agent_id, "prompt": text}
    raise HTTPException(404, f"Prompt not found for {agent_id}")
@router.get("/prompts/{agent_id}/versions")
async def list_prompt_versions(agent_id: str):
    """Return the version metadata stored for an agent's prompt."""
    found = get_config_registry().list_prompt_versions(agent_id)
    return {"agent_id": agent_id, "versions": found}
@router.post("/prompts/{agent_id}")
async def set_prompt(agent_id: str, request: PromptRequest):
    """Store a new prompt version for an agent and report the version id."""
    created = get_config_registry().set_prompt(
        agent_id,
        request.prompt,
        version=request.version,
        description=request.description,
        activate=request.activate,
    )
    return {
        "agent_id": agent_id,
        "version": created,
        "activated": request.activate,
    }
@router.post("/prompts/{agent_id}/rollback/{version}")
async def rollback_prompt(agent_id: str, version: str):
    """Switch an agent's prompt back to a stored version.

    Responds with 404 when the registry reports that the rollback failed.
    """
    if get_config_registry().rollback_prompt(agent_id, version):
        return {"agent_id": agent_id, "version": version, "status": "rolled_back"}
    raise HTTPException(404, f"Version {version} not found")
@router.get("/feature-flags")
async def get_feature_flags():
    """Return every feature flag currently held by the registry."""
    # Reads the registry's private mapping directly; the registry exposes
    # no public accessor for the full set of flags.
    all_flags = get_config_registry()._feature_flags
    return {"flags": all_flags}
@router.get("/feature-flags/{flag_name}")
async def get_feature_flag(flag_name: str):
    """Return the value of one feature flag (the registry default applies if unset)."""
    current = get_config_registry().get_feature_flag(flag_name)
    return {"flag": flag_name, "value": current}
@router.put("/feature-flags/{flag_name}")
async def set_feature_flag(flag_name: str, request: FeatureFlagRequest):
    """Set one feature flag to the value supplied in the request body."""
    new_value = request.value
    get_config_registry().set_feature_flag(flag_name, new_value)
    return {"flag": flag_name, "value": request.value}
@router.get("/routing-rules")
async def get_routing_rules(enabled_only: bool = True):
    """Return routing rules as plain dictionaries, in the order the registry yields them."""
    serialized = []
    for rule in get_config_registry().get_routing_rules(enabled_only):
        serialized.append({
            "rule_id": rule.rule_id,
            "name": rule.name,
            "conditions": rule.conditions,
            "target_agent": rule.target_agent,
            "priority": rule.priority,
            "enabled": rule.enabled,
        })
    return {"rules": serialized}
@router.get("/health")
async def health():
    """Report a static status payload for the config-registry service."""
    payload = {"status": "ok", "service": "config-registry"}
    return payload