""" Real-time system metrics collector Збирає реальні метрики системи для NODE2 """ import psutil import platform from datetime import datetime from typing import Dict, Any def get_cpu_metrics() -> Dict[str, Any]: """Отримати метрики CPU""" cpu_percent = psutil.cpu_percent(interval=1) cpu_count = psutil.cpu_count() cpu_freq = psutil.cpu_freq() return { "percent": round(cpu_percent, 2), "count": cpu_count, "frequency_mhz": round(cpu_freq.current, 0) if cpu_freq else 0, } def get_memory_metrics() -> Dict[str, Any]: """Отримати метрики пам'яті""" memory = psutil.virtual_memory() return { "total_gb": round(memory.total / (1024**3), 2), "available_gb": round(memory.available / (1024**3), 2), "used_gb": round(memory.used / (1024**3), 2), "percent": round(memory.percent, 2), } def get_disk_metrics() -> Dict[str, Any]: """Отримати метрики диска""" disk = psutil.disk_usage('/') return { "total_gb": round(disk.total / (1024**3), 2), "used_gb": round(disk.used / (1024**3), 2), "free_gb": round(disk.free / (1024**3), 2), "percent": round(disk.percent, 2), } def get_gpu_metrics() -> Dict[str, Any]: """Отримати метрики GPU (для Apple Silicon використовуємо приблизну оцінку)""" # Для M4 Max немає прямого API для GPU metrics через psutil # Використовуємо CPU як проксі (Metal використовує інтегровану графіку) cpu_percent = psutil.cpu_percent(interval=0.5) # Примітивна оцінка: якщо CPU > 50%, то GPU теж активний gpu_estimate = min(cpu_percent * 1.2, 100.0) # GPU зазвичай трохи більше навантажений system_info = platform.processor() is_apple_silicon = 'arm' in platform.machine().lower() return { "available": is_apple_silicon, "model": "M4 Max GPU (40 cores)" if is_apple_silicon else "Unknown", "percent": round(gpu_estimate, 2) if is_apple_silicon else 0, "cores": 40 if is_apple_silicon and "Max" in str(system_info) else 0, } def get_network_metrics() -> Dict[str, Any]: """Отримати метрики мережі""" net_io = psutil.net_io_counters() return { "bytes_sent_mb": round(net_io.bytes_sent / (1024**2), 2), "bytes_recv_mb": round(net_io.bytes_recv / (1024**2), 2), "packets_sent": net_io.packets_sent, "packets_recv": net_io.packets_recv, } def get_system_info() -> Dict[str, Any]: """Отримати загальну інформацію про систему""" boot_time = datetime.fromtimestamp(psutil.boot_time()) uptime_seconds = (datetime.now() - boot_time).total_seconds() return { "platform": platform.system(), "platform_version": platform.version(), "architecture": platform.machine(), "processor": platform.processor(), "hostname": platform.node(), "uptime_seconds": round(uptime_seconds, 0), "boot_time": boot_time.isoformat(), } def get_all_metrics() -> Dict[str, Any]: """Отримати всі метрики системи""" return { "timestamp": datetime.utcnow().isoformat() + "Z", "cpu": get_cpu_metrics(), "memory": get_memory_metrics(), "disk": get_disk_metrics(), "gpu": get_gpu_metrics(), "network": get_network_metrics(), "system": get_system_info(), }