Files
microdao-daarion/services/ai-security-agent/security_agent.py
Apple 744c149300
Some checks failed
Build and Deploy Docs / build-and-deploy (push) Has been cancelled
Add automated session logging system
- Created logs/ structure (sessions, operations, incidents)
- Added session-start/log/end scripts
- Installed Git hooks for auto-logging commits/pushes
- Added shell integration for zsh
- Created CHANGELOG.md
- Documented today's session (2026-01-10)
2026-01-10 04:53:17 -08:00

405 lines
17 KiB
Python

#!/usr/bin/env python3
"""
AI Security Agent - NODE1 Crypto Miner Detection
Uses local LLM (Ollama qwen3:8b) for intelligent threat detection
"""
import os
import json
import time
import subprocess
import psutil
import requests
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any
# Configuration
# All settings are environment-overridable; defaults target a local Ollama install.
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")  # local Ollama REST endpoint
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3:8b")  # model used for threat analysis
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "300")) # 5 minutes
LOG_FILE = "/var/log/ai-security-agent.log"  # requires root/write access; logging falls back to stdout otherwise
ALERT_THRESHOLD = float(os.getenv("ALERT_THRESHOLD", "0.7")) # 70% confidence
# Telegram Configuration
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "") # Admin chat ID
# Alerts are only sent when both credentials are present.
TELEGRAM_ENABLED = bool(TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID)
# Known miner signatures from previous incidents
# NOTE(review): "mysql" here will match a *legitimate* MySQL server process,
# and execute_mitigation() kill -9's matches on high confidence — confirm
# this host is never expected to run a real mysqld.
KNOWN_MINER_SIGNATURES = [
"cpioshuf", "ipcalcpg_recvlogical", "mysql", "softirq", "vrarhpb",
"bzip2egrep", "flockresize", "catcal", "G4NQXBp"
]
# NOTE(review): SUSPICIOUS_PATHS is not referenced anywhere in the visible
# code (the LLM prompt hard-codes its own path list) — dead config, or used
# by another module; verify.
SUSPICIOUS_PATHS = [
"/tmp/.perf.c/", "/tmp/*perf*", "/tmp/.*/"
]
class AISecurityAgent:
    """Crypto-miner watchdog for NODE1.

    Periodically collects process/filesystem/network metrics, asks a local
    Ollama model to classify them, and auto-kills suspected miner processes
    when confidence reaches ALERT_THRESHOLD. Alerts are mirrored to Telegram
    when credentials are configured.
    """

    def __init__(self):
        """Initialize the agent and announce its alerting configuration."""
        self.log(f"🤖 AI Security Agent started (model: {OLLAMA_MODEL})")
        self.incident_count = 0  # running counter used to number alerts
        if TELEGRAM_ENABLED:
            self.log(f"📱 Telegram alerts enabled (chat_id: {TELEGRAM_CHAT_ID})")
        else:
            self.log("⚠️ Telegram alerts disabled (no token/chat_id)")

    def log(self, message: str, level: str = "INFO"):
        """Log message to file and stdout.

        File logging is best-effort: if LOG_FILE is unwritable (e.g. not
        running as root), the entry still goes to stdout.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_entry = f"[{timestamp}] [{level}] {message}"
        print(log_entry)
        try:
            with open(LOG_FILE, "a") as f:
                f.write(log_entry + "\n")
        except Exception as e:
            print(f"Failed to write to log file: {e}")

    def send_telegram_alert(self, message: str):
        """Send an alert to the configured Telegram chat (no-op if disabled).

        Failures are logged as warnings and never raised — alerting must not
        take down the monitoring loop.
        """
        if not TELEGRAM_ENABLED:
            return
        try:
            url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
            data = {
                "chat_id": TELEGRAM_CHAT_ID,
                "text": f"🚨 *AI Security Agent Alert*\n\n{message}",
                "parse_mode": "Markdown"
            }
            response = requests.post(url, json=data, timeout=10)
            if response.status_code != 200:
                self.log(f"Failed to send Telegram alert: {response.text}", "WARNING")
        except Exception as e:
            self.log(f"Failed to send Telegram alert: {e}", "WARNING")

    def collect_system_metrics(self) -> Dict[str, Any]:
        """Collect system metrics for analysis.

        Returns a dict with overall CPU/memory stats plus four indicator
        lists: high-CPU processes, processes matching known miner names,
        fresh executables in /tmp, and connections to common mining-pool
        ports.
        """
        metrics = {
            "timestamp": datetime.now().isoformat(),
            "cpu": {
                "load_avg": os.getloadavg(),
                "percent": psutil.cpu_percent(interval=1),
                "count": psutil.cpu_count()
            },
            "memory": {
                "percent": psutil.virtual_memory().percent,
                "available_gb": round(psutil.virtual_memory().available / (1024**3), 2)
            },
            "high_cpu_processes": [],
            "suspicious_processes": [],
            "tmp_executables": [],
            "network_connections": []
        }
        # Find high CPU processes.
        # NOTE(review): psutil reports 0.0 for a process's first cpu_percent
        # sample, so a brand-new miner may only be caught on the *next* scan
        # cycle — confirm that is acceptable.
        for proc in psutil.process_iter(['pid', 'name', 'username', 'cpu_percent', 'cmdline']):
            try:
                info = proc.info
                if info['cpu_percent'] and info['cpu_percent'] > 50:
                    metrics["high_cpu_processes"].append({
                        "pid": info['pid'],
                        "name": info['name'],
                        "user": info['username'],
                        "cpu": info['cpu_percent'],
                        "cmdline": ' '.join(info['cmdline'] or [])[:200]
                    })
                # Check for known miner signatures (exact process-name match)
                if info['name'] in KNOWN_MINER_SIGNATURES:
                    metrics["suspicious_processes"].append({
                        "pid": info['pid'],
                        "name": info['name'],
                        "reason": "Known miner signature",
                        "cmdline": ' '.join(info['cmdline'] or [])[:200]
                    })
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        # Check /tmp for executables modified in the last day; the agent's own
        # helper script is whitelisted.
        try:
            result = subprocess.run(
                ["find", "/tmp", "-type", "f", "-executable", "-mtime", "-1"],
                capture_output=True, text=True, timeout=10
            )
            if result.returncode == 0:
                tmp_files = result.stdout.strip().split('\n')
                metrics["tmp_executables"] = [f for f in tmp_files if f and f != "/tmp/fix_healthcheck.sh"]
        except Exception as e:
            self.log(f"Failed to scan /tmp: {e}", "WARNING")
        # Check for suspicious network connections
        try:
            for conn in psutil.net_connections(kind='inet'):
                if conn.status == 'ESTABLISHED' and conn.raddr:
                    # Check for connections to mining pools (common ports)
                    if conn.raddr.port in [3333, 4444, 5555, 7777, 8888, 9999, 14444]:
                        # Fix: psutil.Process(None) resolves to the *current*
                        # process, so a connection with an unknown owner would
                        # have been misattributed to this agent. Skip it.
                        if conn.pid is None:
                            continue
                        try:
                            proc = psutil.Process(conn.pid)
                            metrics["network_connections"].append({
                                "pid": conn.pid,
                                "process": proc.name(),
                                "remote": f"{conn.raddr.ip}:{conn.raddr.port}",
                                "reason": "Suspicious port (common mining pool)"
                            })
                        except (psutil.NoSuchProcess, psutil.AccessDenied):
                            # Fix: was a bare `except:` — narrowed to the
                            # process-lookup failures this can actually raise.
                            pass
        except Exception as e:
            self.log(f"Failed to check network connections: {e}", "WARNING")
        return metrics

    def analyze_with_llm(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Use LLM to analyze metrics and detect threats.

        Sends the collected metrics to Ollama and parses the JSON verdict
        from the response. Falls back to rule-based analysis on any API,
        connection, or parse failure.
        """
        # Prepare prompt for LLM
        prompt = f"""You are a cybersecurity expert analyzing a Linux server for cryptocurrency mining malware.
SYSTEM METRICS:
- Load Average: {metrics['cpu']['load_avg']}
- CPU Usage: {metrics['cpu']['percent']}%
- Memory Usage: {metrics['memory']['percent']}%
HIGH CPU PROCESSES ({len(metrics['high_cpu_processes'])}):
{json.dumps(metrics['high_cpu_processes'], indent=2)}
SUSPICIOUS PROCESSES ({len(metrics['suspicious_processes'])}):
{json.dumps(metrics['suspicious_processes'], indent=2)}
SUSPICIOUS FILES IN /tmp ({len(metrics['tmp_executables'])}):
{json.dumps(metrics['tmp_executables'], indent=2)}
SUSPICIOUS NETWORK CONNECTIONS ({len(metrics['network_connections'])}):
{json.dumps(metrics['network_connections'], indent=2)}
KNOWN MINER PATTERNS:
- Process names: {', '.join(KNOWN_MINER_SIGNATURES)}
- Common paths: /tmp/.perf.c/, /tmp/.*/ (hidden dirs)
- Behavior: High CPU (>1000%), disguised as system processes (postgres, mysql, etc.)
ANALYZE:
1. Is there evidence of cryptocurrency mining?
2. What is the confidence level (0.0-1.0)?
3. What specific indicators support your conclusion?
4. What immediate actions should be taken?
Respond in JSON format:
{{
"threat_detected": true/false,
"confidence": 0.0-1.0,
"threat_type": "crypto_miner|suspicious_activity|false_positive|unknown",
"indicators": ["list", "of", "specific", "findings"],
"recommended_actions": ["action1", "action2"],
"summary": "brief explanation"
}}
Respond ONLY with valid JSON, no additional text."""
        try:
            response = requests.post(
                f"{OLLAMA_BASE_URL}/api/generate",
                json={
                    "model": OLLAMA_MODEL,
                    "prompt": prompt,
                    "stream": False,
                    # Fix: Ollama's /api/generate only honors sampling
                    # parameters inside "options"; the previous top-level
                    # "temperature" key was silently ignored.
                    "options": {
                        "temperature": 0.3,  # lower temperature for more deterministic analysis
                        "num_predict": 512
                    }
                },
                timeout=60
            )
            if response.status_code == 200:
                result = response.json()
                llm_response = result.get("response", "")
                # Try to parse JSON from response
                try:
                    # Find JSON in response (might have extra text around it)
                    start = llm_response.find('{')
                    end = llm_response.rfind('}') + 1
                    if start >= 0 and end > start:
                        json_str = llm_response[start:end]
                        analysis = json.loads(json_str)
                        return analysis
                    else:
                        self.log(f"No JSON found in LLM response: {llm_response[:200]}", "WARNING")
                        return self._fallback_analysis(metrics)
                except json.JSONDecodeError as e:
                    self.log(f"Failed to parse LLM JSON: {e}\nResponse: {llm_response[:200]}", "WARNING")
                    return self._fallback_analysis(metrics)
            else:
                self.log(f"Ollama API error: {response.status_code}", "ERROR")
                return self._fallback_analysis(metrics)
        except requests.exceptions.RequestException as e:
            self.log(f"Failed to connect to Ollama: {e}", "ERROR")
            return self._fallback_analysis(metrics)

    def _fallback_analysis(self, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Fallback analysis using simple rules if LLM fails.

        Each indicator category adds a fixed weight to the confidence score
        (capped at 1.0); >0.6 total is classified as a crypto_miner.
        """
        threat_detected = False
        confidence = 0.0
        indicators = []
        # Check load average (1-minute)
        if metrics['cpu']['load_avg'][0] > 10:
            threat_detected = True
            confidence += 0.3
            indicators.append(f"High load average: {metrics['cpu']['load_avg'][0]}")
        # Check high CPU processes
        if metrics['high_cpu_processes']:
            threat_detected = True
            confidence += 0.3
            for proc in metrics['high_cpu_processes']:
                indicators.append(f"High CPU process: {proc['name']} (PID {proc['pid']}, {proc['cpu']}%)")
        # Check suspicious processes (heaviest weight: exact signature match)
        if metrics['suspicious_processes']:
            threat_detected = True
            confidence += 0.4
            for proc in metrics['suspicious_processes']:
                indicators.append(f"Known miner signature: {proc['name']} (PID {proc['pid']})")
        # Check /tmp executables
        if metrics['tmp_executables']:
            threat_detected = True
            confidence += 0.2
            indicators.append(f"Suspicious executables in /tmp: {len(metrics['tmp_executables'])}")
        # Check network connections
        if metrics['network_connections']:
            threat_detected = True
            confidence += 0.3
            indicators.append(f"Suspicious network connections: {len(metrics['network_connections'])}")
        confidence = min(confidence, 1.0)
        return {
            "threat_detected": threat_detected,
            "confidence": confidence,
            "threat_type": "crypto_miner" if confidence > 0.6 else "suspicious_activity",
            "indicators": indicators,
            "recommended_actions": [
                "Kill suspicious processes",
                "Remove /tmp executables",
                "Block network connections"
            ] if threat_detected else [],
            "summary": f"Fallback analysis: {len(indicators)} indicators detected" if threat_detected else "No threats detected"
        }

    def execute_mitigation(self, analysis: Dict[str, Any], metrics: Dict[str, Any]):
        """Execute mitigation actions for detected threats.

        Logs the incident and sends a Telegram alert. If confidence meets
        ALERT_THRESHOLD, kills flagged processes and removes /tmp payloads;
        otherwise only recommends manual review.
        """
        if not analysis.get("threat_detected"):
            return
        self.incident_count += 1
        self.log(f"🚨 THREAT DETECTED (Incident #{self.incident_count})", "ALERT")
        self.log(f" Confidence: {analysis['confidence']:.2%}", "ALERT")
        self.log(f" Type: {analysis['threat_type']}", "ALERT")
        self.log(f" Summary: {analysis['summary']}", "ALERT")
        # Prepare Telegram message
        telegram_msg = f"*NODE1 Security Incident #{self.incident_count}*\n\n"
        telegram_msg += f"⚠️ *Confidence:* {analysis['confidence']:.0%}\n"
        telegram_msg += f"🔍 *Type:* {analysis['threat_type']}\n"
        telegram_msg += f"📝 *Summary:* {analysis['summary']}\n\n"
        telegram_msg += "*Indicators:*\n"
        for indicator in analysis['indicators']:
            self.log(f" 📍 {indicator}", "ALERT")
            telegram_msg += f"{indicator}\n"
        # AUTO-MITIGATION (only if high confidence)
        if analysis['confidence'] >= ALERT_THRESHOLD:
            self.log("⚡ EXECUTING AUTO-MITIGATION", "ALERT")
            # Kill high CPU processes
            for proc in metrics['high_cpu_processes']:
                try:
                    self.log(f" Killing PID {proc['pid']} ({proc['name']})", "ACTION")
                    subprocess.run(["kill", "-9", str(proc['pid'])], check=False)
                except Exception as e:
                    self.log(f" Failed to kill PID {proc['pid']}: {e}", "ERROR")
            # Kill known miner processes
            for proc in metrics['suspicious_processes']:
                try:
                    self.log(f" Killing known miner PID {proc['pid']} ({proc['name']})", "ACTION")
                    subprocess.run(["kill", "-9", str(proc['pid'])], check=False)
                except Exception as e:
                    self.log(f" Failed to kill PID {proc['pid']}: {e}", "ERROR")
            # Remove /tmp executables (paths come from our own find scan)
            for filepath in metrics['tmp_executables']:
                try:
                    self.log(f" Removing {filepath}", "ACTION")
                    subprocess.run(["rm", "-rf", filepath], check=False)
                except Exception as e:
                    self.log(f" Failed to remove {filepath}: {e}", "ERROR")
            # Clean the known miner drop directory from previous incidents
            try:
                self.log(" Cleaning /tmp/.perf.c/", "ACTION")
                subprocess.run(["rm", "-rf", "/tmp/.perf.c"], check=False)
            except Exception as e:
                self.log(f" Failed to clean /tmp/.perf.c: {e}", "ERROR")
            self.log("✅ AUTO-MITIGATION COMPLETED", "ALERT")
            telegram_msg += "\n✅ *Auto-mitigation executed*"
        else:
            self.log(f"⚠️ Confidence {analysis['confidence']:.2%} below threshold {ALERT_THRESHOLD:.2%}, manual review recommended", "ALERT")
            telegram_msg += f"\n⚠️ Manual review recommended (below {ALERT_THRESHOLD:.0%} threshold)"
        # Send Telegram alert
        self.send_telegram_alert(telegram_msg)

    def run(self):
        """Main monitoring loop.

        Collects metrics every CHECK_INTERVAL seconds; only invokes the LLM
        when the cheap rule-based pre-check finds something suspicious.
        Stops on KeyboardInterrupt; other errors are logged and retried
        after a 60s back-off.
        """
        self.log(f"Starting monitoring loop (interval: {CHECK_INTERVAL}s)")
        while True:
            try:
                self.log("🔍 Starting security scan...")
                # Collect metrics
                metrics = self.collect_system_metrics()
                # Quick check: if nothing suspicious, skip LLM analysis
                if (not metrics['high_cpu_processes'] and
                        not metrics['suspicious_processes'] and
                        not metrics['tmp_executables'] and
                        not metrics['network_connections'] and
                        metrics['cpu']['load_avg'][0] < 5):
                    self.log("✅ System clean (quick check)")
                else:
                    self.log("🧠 Analyzing with AI (suspicious activity detected)...")
                    analysis = self.analyze_with_llm(metrics)
                    # Fix: the LLM's JSON is untrusted and may omit keys —
                    # use .get() (as execute_mitigation already does) instead
                    # of indexing, which could KeyError and trip the 60s
                    # error back-off every cycle.
                    self.log(f" Analysis complete: threat={analysis.get('threat_detected')}, confidence={analysis.get('confidence', 0):.2%}")
                    if analysis.get('threat_detected'):
                        self.execute_mitigation(analysis, metrics)
                    else:
                        self.log("✅ No threats detected")
                time.sleep(CHECK_INTERVAL)
            except KeyboardInterrupt:
                self.log("Received shutdown signal", "INFO")
                break
            except Exception as e:
                self.log(f"Error in monitoring loop: {e}", "ERROR")
                time.sleep(60) # Wait before retry
if __name__ == "__main__":
    # Entry point: build the agent and block forever in its monitoring loop.
    AISecurityAgent().run()