"""
Control Plane Service
=====================

Centralized management of policy, config, and prompts.

Endpoints:
- /policy/{agent_id}  — RBAC/entitlements
- /config/{key}       — routing rules, feature flags
- /prompts/{agent_id} — versioned system prompts
- /quotas/{user_id}   — rate limits, budgets

Consumers are expected to cache responses and refresh them on NATS events.
NOTE(review): NATS_URL is read below, but nothing in this module publishes to
NATS yet; the server-side caches here are only cleared via POST /cache/invalidate.
"""
import os
import json
import logging
from datetime import datetime
from typing import Dict, Any, Optional, List
from pathlib import Path
import hashlib

from fastapi import FastAPI, HTTPException, Depends, Query
from pydantic import BaseModel
import yaml

from service_auth import require_service_auth, verify_service

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

app = FastAPI(title="Control Plane", version="1.0.0")

# Configuration paths
PROMPTS_PATH = Path(os.getenv("PROMPTS_PATH", "/data/prompts"))
CONFIG_PATH = Path(os.getenv("CONFIG_PATH", "/data/config"))
NATS_URL = os.getenv("NATS_URL", "nats://nats:4222")

# In-memory, per-process response caches, one dict per resource type.
# Entries live until POST /cache/invalidate clears them.
_cache = {
    "prompts": {},
    "config": {},
    "policy": {},
    "quotas": {}
}


# ==================== Models ====================

class PromptVersion(BaseModel):
    agent_id: str
    version: str
    content: str
    hash: str  # first 8 hex chars of the md5 of the prompt text
    updated_at: str  # ISO timestamp of the prompt file's mtime
    tags: List[str] = []


class PolicyRule(BaseModel):
    agent_id: str
    allowed_modes: List[str] = ["public", "team", "private"]
    allowed_tools: List[str] = ["*"]
    max_tokens: int = 8000
    rate_limit_per_minute: int = 60
    confidential_allowed: bool = False


class ConfigItem(BaseModel):
    key: str
    value: Any
    version: str
    updated_at: str


class QuotaStatus(BaseModel):
    user_id: str
    requests_today: int = 0
    tokens_today: int = 0
    limit_requests: int = 1000
    limit_tokens: int = 100000
    reset_at: str = ""  # ISO timestamp (naive UTC midnight) when the current counters started


# ==================== Prompts API ====================

@app.get("/prompts/{agent_id}", response_model=PromptVersion)
async def get_prompt(agent_id: str, version: Optional[str] = None, service=Depends(verify_service)):
    """
    Get the system prompt for an agent.

    The response carries a short content hash so callers can validate their
    own caches. Results are memoized in ``_cache["prompts"]`` until
    POST /cache/invalidate.

    NOTE(review): ``version`` only affects the cache key and the echoed
    ``version`` field; the same ``{agent_id}_prompt.txt`` file is read for
    every version.

    Raises:
        HTTPException(400): agent_id is not a plain file-name component.
        HTTPException(404): no prompt file exists for agent_id.
    """
    # agent_id comes from the URL and is spliced into a file path: refuse
    # anything that is not a single plain path component.
    if Path(agent_id).name != agent_id:
        raise HTTPException(400, f"Invalid agent_id: {agent_id}")

    cache_key = f"{agent_id}:{version or 'latest'}"
    cached = _cache["prompts"].get(cache_key)
    if cached is not None:
        return cached

    prompt_file = PROMPTS_PATH / f"{agent_id}_prompt.txt"
    if not prompt_file.exists():
        raise HTTPException(404, f"Prompt not found: {agent_id}")

    # Decode explicitly as UTF-8 rather than the platform-dependent default,
    # so non-ASCII prompt text is read the same everywhere.
    content = prompt_file.read_text(encoding="utf-8")
    # md5 is only a cheap change-detection fingerprint here, not a security hash.
    content_hash = hashlib.md5(content.encode("utf-8")).hexdigest()[:8]

    result = PromptVersion(
        agent_id=agent_id,
        version=version or "latest",
        content=content,
        hash=content_hash,
        updated_at=datetime.fromtimestamp(prompt_file.stat().st_mtime).isoformat()
    )
    _cache["prompts"][cache_key] = result
    return result


@app.get("/prompts/{agent_id}/hash")
async def get_prompt_hash(agent_id: str):
    """Quick hash check for cache validation (served from the prompt cache when warm)."""
    prompt = await get_prompt(agent_id)
    return {"agent_id": agent_id, "hash": prompt.hash}


@app.get("/prompts")
async def list_prompts():
    """List all prompt files (``*_prompt.txt``) found in PROMPTS_PATH, sorted by file name."""
    suffix = "_prompt"
    prompts = []
    if PROMPTS_PATH.exists():
        for f in sorted(PROMPTS_PATH.glob("*_prompt.txt")):
            prompts.append({
                # Strip only the trailing "_prompt" so ids that themselves
                # contain "_prompt" are not mangled.
                "agent_id": f.stem[: -len(suffix)],
                "file": f.name,
                "size": f.stat().st_size
            })
    return {"prompts": prompts}


# ==================== Policy API ====================

# Default policies
DEFAULT_POLICIES = {
    "helion": PolicyRule(
        agent_id="helion",
        allowed_modes=["public", "team", "private"],
        allowed_tools=["web_search", "graph_query", "memory_search", "image_gen"],
        max_tokens=8000,
        rate_limit_per_minute=60,
        confidential_allowed=True
    ),
    "nutra": PolicyRule(
        agent_id="nutra",
        allowed_modes=["public", "team", "private"],
        allowed_tools=["memory_search", "web_search"],
        max_tokens=4000,
        rate_limit_per_minute=30,
        confidential_allowed=False
    ),
    "druid": PolicyRule(
        agent_id="druid",
        allowed_modes=["public", "team"],
        allowed_tools=["memory_search"],
        max_tokens=4000,
        rate_limit_per_minute=30,
        confidential_allowed=False
    ),
    "daarwizz": PolicyRule(
        agent_id="daarwizz",
        allowed_modes=["public", "team", "private"],
        allowed_tools=["*"],
        max_tokens=16000,
        rate_limit_per_minute=100,
        confidential_allowed=True
    )
}


@app.get("/policy/{agent_id}", response_model=PolicyRule)
async def get_policy(agent_id: str):
    """
    Get RBAC/entitlements for an agent.

    Agents without an entry in DEFAULT_POLICIES get a restrictive fallback
    (public mode only, memory_search only, 2000 tokens, 10 requests/minute).
    """
    if agent_id in _cache["policy"]:
        return _cache["policy"][agent_id]

    policy = DEFAULT_POLICIES.get(agent_id, PolicyRule(
        agent_id=agent_id,
        allowed_modes=["public"],
        allowed_tools=["memory_search"],
        max_tokens=2000,
        rate_limit_per_minute=10
    ))
    _cache["policy"][agent_id] = policy
    return policy


@app.get("/policy")
async def list_policies():
    """List agent ids that have an explicit (non-fallback) policy."""
    return {"policies": list(DEFAULT_POLICIES.keys())}


# ==================== Config API ====================

# Default configs
DEFAULT_CONFIG = {
    "routing.default_model": "deepseek-chat",
    "routing.fallback_model": "qwen3:8b",
    "routing.temperature": 0.2,
    "routing.max_retries": 3,
    "features.web_search": True,
    "features.image_gen": True,
    "features.vision": True,
    "features.stt": True,
    "features.tts": True,
    "limits.max_file_size_mb": 50,
    "limits.max_context_messages": 20,
    "privacy.default_mode": "team",
    "privacy.pii_detection": True
}


# NOTE(review): require_service_auth is applied *outside* @app.get. FastAPI's
# route decorator registers the function it receives and returns it unchanged,
# so this wrapper affects only the module-level name, not the served route.
# Whatever auth the route has comes from Depends(verify_service). Confirm
# require_service_auth's contract before moving it below @app.get.
@require_service_auth(allowed_roles=['router', 'gateway'])
@app.get("/config/{key}", response_model=ConfigItem)
async def get_config(key: str, service=Depends(verify_service)):
    """
    Get a single configuration value by its dotted key (e.g. ``routing.default_model``).

    Raises:
        HTTPException(404): the key is not in DEFAULT_CONFIG.
    """
    if key in _cache["config"]:
        return _cache["config"][key]

    if key in DEFAULT_CONFIG:
        result = ConfigItem(
            key=key,
            value=DEFAULT_CONFIG[key],
            version="1.0",
            updated_at=datetime.utcnow().isoformat()
        )
        _cache["config"][key] = result
        return result

    raise HTTPException(404, f"Config not found: {key}")


@app.get("/config")
async def list_config():
    """List all configuration keys with their default values."""
    return {"config": DEFAULT_CONFIG}


# ==================== Quotas API ====================

# In-memory quota tracking (should be Redis in production).
# State is per-process and is lost on restart.
_quotas: Dict[str, QuotaStatus] = {}


def _utc_day_start() -> datetime:
    """Return midnight (naive UTC) of the current UTC day."""
    return datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)


@app.get("/quotas/{user_id}", response_model=QuotaStatus)
async def get_quota(user_id: str, service=Depends(verify_service)):
    """
    Get quota status for a user, creating it on first access.

    Counters are per UTC day: when the stored ``reset_at`` is not today's
    midnight, the daily counters are zeroed before returning.
    """
    period_start = _utc_day_start().isoformat()
    quota = _quotas.get(user_id)
    if quota is None:
        quota = QuotaStatus(user_id=user_id, reset_at=period_start)
        _quotas[user_id] = quota
    elif quota.reset_at != period_start:
        # A new UTC day began since these counters started: roll them over,
        # otherwise the "*_today" totals would accumulate forever.
        quota.requests_today = 0
        quota.tokens_today = 0
        quota.reset_at = period_start
    return quota


@app.post("/quotas/{user_id}/consume")
async def consume_quota(user_id: str, requests: int = 1, tokens: int = 0):
    """
    Record quota consumption and report which limits are now exceeded.

    Consumption is always recorded; exceeding a limit is reported, not refused.

    Raises:
        HTTPException(400): requests or tokens is negative (would refund usage).
    """
    if requests < 0 or tokens < 0:
        raise HTTPException(400, "requests and tokens must be non-negative")

    quota = await get_quota(user_id)
    quota.requests_today += requests
    quota.tokens_today += tokens

    # Check limits
    exceeded = []
    if quota.requests_today > quota.limit_requests:
        exceeded.append("requests")
    if quota.tokens_today > quota.limit_tokens:
        exceeded.append("tokens")

    return {
        "user_id": user_id,
        "consumed": {"requests": requests, "tokens": tokens},
        "exceeded": exceeded,
        "remaining": {
            "requests": max(0, quota.limit_requests - quota.requests_today),
            "tokens": max(0, quota.limit_tokens - quota.tokens_today)
        }
    }


# ==================== Cache Management ====================

@app.post("/cache/invalidate")
async def invalidate_cache(cache_type: str = "all"):
    """
    Invalidate one response cache, or all of them, so the next read reloads.

    Raises:
        HTTPException(400): cache_type is neither "all" nor a known cache name.
    """
    if cache_type == "all":
        for key in _cache:
            _cache[key] = {}
    elif cache_type in _cache:
        _cache[cache_type] = {}
    else:
        raise HTTPException(400, f"Unknown cache type: {cache_type}")

    return {"status": "invalidated", "type": cache_type}


# ==================== Health ====================

@app.get("/health")
async def health():
    """Liveness check with cache sizes and prompt-directory availability."""
    return {
        "status": "healthy",
        "service": "control-plane",
        "cache_stats": {k: len(v) for k, v in _cache.items()},
        "prompts_path": str(PROMPTS_PATH),
        "prompts_available": PROMPTS_PATH.exists()
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=9200)