Files
microdao-daarion/services/node-registry/app/system_metrics.py
Apple 3de3c8cb36 feat: Add presence heartbeat for Matrix online status
- matrix-gateway: POST /internal/matrix/presence/online endpoint
- usePresenceHeartbeat hook with activity tracking
- Auto away after 5 min inactivity
- Offline on page close/visibility change
- Integrated in MatrixChatRoom component
2025-11-27 00:19:40 -08:00

108 lines
3.7 KiB
Python

"""
Real-time system metrics collector
Збирає реальні метрики системи для NODE2
"""
import psutil
import platform
from datetime import datetime
from typing import Dict, Any
def get_cpu_metrics() -> Dict[str, Any]:
"""Отримати метрики CPU"""
cpu_percent = psutil.cpu_percent(interval=1)
cpu_count = psutil.cpu_count()
cpu_freq = psutil.cpu_freq()
return {
"percent": round(cpu_percent, 2),
"count": cpu_count,
"frequency_mhz": round(cpu_freq.current, 0) if cpu_freq else 0,
}
def get_memory_metrics() -> Dict[str, Any]:
"""Отримати метрики пам'яті"""
memory = psutil.virtual_memory()
return {
"total_gb": round(memory.total / (1024**3), 2),
"available_gb": round(memory.available / (1024**3), 2),
"used_gb": round(memory.used / (1024**3), 2),
"percent": round(memory.percent, 2),
}
def get_disk_metrics() -> Dict[str, Any]:
"""Отримати метрики диска"""
disk = psutil.disk_usage('/')
return {
"total_gb": round(disk.total / (1024**3), 2),
"used_gb": round(disk.used / (1024**3), 2),
"free_gb": round(disk.free / (1024**3), 2),
"percent": round(disk.percent, 2),
}
def get_gpu_metrics() -> Dict[str, Any]:
"""Отримати метрики GPU (для Apple Silicon використовуємо приблизну оцінку)"""
# Для M4 Max немає прямого API для GPU metrics через psutil
# Використовуємо CPU як проксі (Metal використовує інтегровану графіку)
cpu_percent = psutil.cpu_percent(interval=0.5)
# Примітивна оцінка: якщо CPU > 50%, то GPU теж активний
gpu_estimate = min(cpu_percent * 1.2, 100.0) # GPU зазвичай трохи більше навантажений
system_info = platform.processor()
is_apple_silicon = 'arm' in platform.machine().lower()
return {
"available": is_apple_silicon,
"model": "M4 Max GPU (40 cores)" if is_apple_silicon else "Unknown",
"percent": round(gpu_estimate, 2) if is_apple_silicon else 0,
"cores": 40 if is_apple_silicon and "Max" in str(system_info) else 0,
}
def get_network_metrics() -> Dict[str, Any]:
"""Отримати метрики мережі"""
net_io = psutil.net_io_counters()
return {
"bytes_sent_mb": round(net_io.bytes_sent / (1024**2), 2),
"bytes_recv_mb": round(net_io.bytes_recv / (1024**2), 2),
"packets_sent": net_io.packets_sent,
"packets_recv": net_io.packets_recv,
}
def get_system_info() -> Dict[str, Any]:
"""Отримати загальну інформацію про систему"""
boot_time = datetime.fromtimestamp(psutil.boot_time())
uptime_seconds = (datetime.now() - boot_time).total_seconds()
return {
"platform": platform.system(),
"platform_version": platform.version(),
"architecture": platform.machine(),
"processor": platform.processor(),
"hostname": platform.node(),
"uptime_seconds": round(uptime_seconds, 0),
"boot_time": boot_time.isoformat(),
}
def get_all_metrics() -> Dict[str, Any]:
"""Отримати всі метрики системи"""
return {
"timestamp": datetime.utcnow().isoformat() + "Z",
"cpu": get_cpu_metrics(),
"memory": get_memory_metrics(),
"disk": get_disk_metrics(),
"gpu": get_gpu_metrics(),
"network": get_network_metrics(),
"system": get_system_info(),
}