feat: add data cleanup scripts and config (Task 027)

This commit is contained in:
Apple
2025-11-28 09:22:57 -08:00
parent d58347a890
commit 4286d64f05
3 changed files with 250 additions and 323 deletions

View File

@@ -3,77 +3,27 @@ nodes:
- node-2-macbook-m4max
microdao:
- slug: daarion
- slug: energy-union
- slug: greenfood
- slug: clan
- slug: soul
- slug: yaromir
- slug: druid
- slug: nutra
- slug: eonarch
- core-operations
- daarion-dao
- developer-hub
- energy-union
- greenfood-coop
- clan-network
- druid-labs
- soul-district
- eonarch-vision
agents:
# Orchestrators on Node 1
- slug: daarwizz
- slug: helion
- slug: greenfood
- slug: druid
- slug: clan
- slug: eonarch
- slug: nutra
- slug: soul
- slug: yaromir
# Node 2 Agents
- slug: faye
- slug: helix
- slug: exor
- slug: solarius
- slug: primesynth
- slug: nexor
- slug: monitor-node2
- slug: strategic-sentinels
- slug: vindex
- slug: aurora
- slug: arbitron
- slug: byteforge
- slug: vector
- slug: chainweaver
- slug: cypher
- slug: canvas
- slug: roxy
- slug: mira
- slug: tempo
- slug: harmony
- slug: storytelling
- slug: financial-analyst
- slug: accountant
- slug: tax-advisor
- slug: smart-contract-dev
- slug: defi-analyst
- slug: tokenomics-expert
- slug: nft-specialist
- slug: dao-governance
- slug: shadelock
- slug: penetration-tester
- slug: security-monitor
- slug: incident-responder
- slug: shadelock-forensics
- slug: exor-forensics
- slug: lumen
- slug: spectra
- slug: video-analyzer
- slug: protomind
- slug: labforge
- slug: testpilot
- slug: modelscout
- slug: breakpoint
- slug: growcell
- slug: somnia
- slug: memory-manager
- slug: knowledge-indexer
- slug: iris
- slug: sofia
- slug: budget-planner
- daarwizz
- helion
- greenfood
- clan
- druid
- eonarch
- soul
- yaromir
- exor
- faye
- helix
- iris
- sofia

View File

@@ -1,165 +1,143 @@
import asyncio
import os
import asyncpg
import yaml
import argparse
from typing import List, Dict, Any, Set
import asyncpg
from datetime import datetime
# Configuration
SYSTEM_CANDIDATE_KINDS = {'infra', 'router', 'monitor', 'system'}
SYSTEM_CANDIDATE_NAMES = {
'daarwizz', 'helion', 'greenfood', 'clan', 'druid', 'eonarch', 'soul', 'yaromir', 'core-monitor',
'sofia', 'exor', 'faye', 'helix', 'iris'
}
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://daarion:daarion@localhost:5432/daarion")
ALLOWLIST_PATH = os.getenv("ALLOWLIST_PATH", "config/data_cleanup_allowlist.yml")
async def get_db_connection():
url = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/daarion")
return await asyncpg.connect(url)
async def fetch_all(conn, query):
records = await conn.fetch(query)
return [dict(r) for r in records]
SYSTEM_CANDIDATE_KINDS = ['infra', 'router', 'monitor', 'system']
def load_allowlist():
# Try relative path first (for local dev)
path = os.path.join(os.path.dirname(__file__), '../../config/data_cleanup_allowlist.yml')
# Resolve relative path if running from root
path = ALLOWLIST_PATH
if not os.path.exists(path):
# Try absolute path (for container)
path = '/app/config/data_cleanup_allowlist.yml'
# Try absolute path assuming docker context
path = os.path.join('/app', ALLOWLIST_PATH)
if not os.path.exists(path):
print(f"Warning: Allowlist file not found at {path}")
return {}
with open(path, 'r') as f:
return yaml.safe_load(f)
async def mark_entities(apply=False):
conn = await get_db_connection()
async def main():
parser = argparse.ArgumentParser(description='Mark test entities')
parser.add_argument('--apply', action='store_true', help='Apply changes to DB')
args = parser.parse_args()
allowlist = load_allowlist()
allowed_microdao_slugs = set(allowlist.get('microdao', []))
allowed_agent_slugs = set(allowlist.get('agents', []))
print(f"Connecting to DB...")
try:
allowlist = load_allowlist()
allowed_agent_slugs = set()
if allowlist.get('agents'):
allowed_agent_slugs = {a['slug'] for a in allowlist['agents']}
allowed_microdao_slugs = set()
if allowlist.get('microdao'):
allowed_microdao_slugs = {m['slug'] for m in allowlist['microdao']}
# 1. Fetch Agents
agents = await fetch_all(conn, """
SELECT id, display_name, slug, public_slug, node_id, kind, is_test, deleted_at
FROM agents
WHERE deleted_at IS NULL
""")
# 2. Fetch MicroDAOs
microdaos = await fetch_all(conn, """
SELECT id, name, slug, is_test, deleted_at
FROM microdaos
WHERE deleted_at IS NULL
""")
conn = await asyncpg.connect(DATABASE_URL)
except Exception as e:
print(f"Failed to connect: {e}")
return
# 3. Fetch Memberships
memberships = await fetch_all(conn, """
SELECT ma.agent_id, m.slug as microdao_slug
FROM microdao_agents ma
JOIN microdaos m ON ma.microdao_id = m.id
""")
try:
# Check table name for microdao
try:
table_name = "microdaos"
await conn.execute(f"SELECT 1 FROM {table_name} LIMIT 1")
except asyncpg.UndefinedTableError:
table_name = "microdao"
print(f"Using microdao table: {table_name}")
# 1. Identify Test Agents
# Use dynamic table name for JOIN
query = f"""
SELECT a.id, a.display_name, a.slug, a.kind, a.is_test,
(SELECT COUNT(*) FROM microdao_agents ma WHERE ma.agent_id = a.id) as microdao_count
FROM agents a
WHERE a.is_test = false AND a.deleted_at IS NULL
"""
agents = await conn.fetch(query)
agent_dao_slugs = {}
for m in memberships:
aid = m['agent_id']
if aid not in agent_dao_slugs:
agent_dao_slugs[aid] = set()
agent_dao_slugs[aid].add(m['microdao_slug'])
# Identify Test Agents
test_agent_ids = []
print("\n--- Agents Analysis ---")
for agent in agents:
aid = agent['id']
slug = agent.get('slug') or agent.get('public_slug') or ''
name = agent['display_name']
kind = agent['kind']
agents_to_mark = []
for a in agents:
is_real = False
is_allowed_slug = slug in allowed_agent_slugs
is_in_allowed_dao = False
for dao_slug in agent_dao_slugs.get(aid, []):
if dao_slug in allowed_microdao_slugs:
is_in_allowed_dao = True
break
is_system = (kind in SYSTEM_CANDIDATE_KINDS) or \
(name.lower() in SYSTEM_CANDIDATE_NAMES) or \
(slug and slug.lower() in SYSTEM_CANDIDATE_NAMES)
is_real = is_allowed_slug or is_in_allowed_dao or is_system
# Check if allowed explicitly
if a['slug'] and a['slug'] in allowed_agent_slugs:
is_real = True
# Check if in allowed microdao
if not is_real:
print(f"Mark as TEST: {aid} ({name}) - Slug: {slug}, Kind: {kind}")
test_agent_ids.append(aid)
# Identify Test MicroDAOs
# Fetch agent counts (only non-test agents count towards realness, but we haven't updated DB yet)
# For this check, we'll consider a MicroDAO real if it's in the allowlist.
# Empty MicroDAOs not in allowlist are test.
microdao_agent_counts = {}
memberships_all = await fetch_all(conn, "SELECT microdao_id FROM microdao_agents")
for m in memberships_all:
mid = m['microdao_id']
microdao_agent_counts[mid] = microdao_agent_counts.get(mid, 0) + 1
memberships = await conn.fetch(f"""
SELECT m.slug FROM microdao_agents ma
JOIN {table_name} m ON ma.microdao_id = m.id
WHERE ma.agent_id = $1
""", a['id'])
for m in memberships:
if m['slug'] in allowed_microdao_slugs:
is_real = True
break
test_microdao_ids = []
print("\n--- MicroDAO Analysis ---")
for md in microdaos:
mid = md['id']
slug = md['slug']
count = microdao_agent_counts.get(mid, 0)
is_allowed = slug in allowed_microdao_slugs
# If it has agents but not in allowlist, it's tricky. But user said: "All others with agents_count = 0 -> candidates"
# Let's stick to: in allowlist = real. Not in allowlist AND count=0 = test.
# What if count > 0 but not in allowlist? User didn't specify. I'll assume test if not allowed.
# Actually user logic: "Real if in allowlist. All others with agents_count=0 -> candidates".
# So if count > 0 and not in allowlist, we should be careful.
is_real = is_allowed
# Check if system
if not is_real:
if count == 0:
print(f"Mark as TEST: {mid} ({md['name']}) - Slug: {slug}, Count: {count}")
test_microdao_ids.append(mid)
else:
print(f"WARNING: MicroDAO {slug} has {count} agents but is not in allowlist. Skipping mark.")
# Apply updates
if apply:
print(f"\nApplying updates...")
if test_agent_ids:
await conn.execute("""
UPDATE agents
SET is_test = true,
deleted_at = NOW()
WHERE id = ANY($1::text[])
""", test_agent_ids)
print(f"Updated {len(test_agent_ids)} agents.")
if a['kind'] in SYSTEM_CANDIDATE_KINDS:
is_real = True
if not is_real:
agents_to_mark.append(a)
print(f"\nAGENTS TO MARK AS TEST: {len(agents_to_mark)}")
for a in agents_to_mark:
print(f" - {a['display_name']} ({a['slug']}) [MicroDAOs: {a['microdao_count']}]")
if test_microdao_ids:
# 2. Identify Test MicroDAOs
microdaos = await conn.fetch(f"""
SELECT m.id, m.slug, m.name, m.is_test,
(SELECT COUNT(*) FROM microdao_agents ma WHERE ma.microdao_id = m.id) as agent_count
FROM {table_name} m
WHERE m.is_test = false AND m.deleted_at IS NULL
""")
microdaos_to_mark = []
for m in microdaos:
is_real = False
if m['slug'] in allowed_microdao_slugs:
is_real = True
if not is_real and m['agent_count'] == 0:
microdaos_to_mark.append(m)
print(f"\nMICRODAOS TO MARK AS TEST: {len(microdaos_to_mark)}")
for m in microdaos_to_mark:
print(f" - {m['name']} ({m['slug']}) [Agents: {m['agent_count']}]")
if args.apply:
print("\nAPPLYING CHANGES...")
if agents_to_mark:
agent_ids = [a['id'] for a in agents_to_mark]
await conn.execute("""
UPDATE microdaos
SET is_test = true,
deleted_at = NOW()
WHERE id = ANY($1::text[])
""", test_microdao_ids)
print(f"Updated {len(test_microdao_ids)} microdaos.")
UPDATE agents
SET is_test = true, deleted_at = NOW()
WHERE id = ANY($1::uuid[])
""", agent_ids)
print(f"Marked {len(agent_ids)} agents as test.")
if microdaos_to_mark:
microdao_ids = [m['id'] for m in microdaos_to_mark]
await conn.execute(f"""
UPDATE {table_name}
SET is_test = true, deleted_at = NOW()
WHERE id = ANY($1::uuid[])
""", microdao_ids)
print(f"Marked {len(microdao_ids)} microdaos as test.")
else:
print("\nDry run. Use --apply to update database.")
print("\nDry run. Use --apply to execute changes.")
finally:
await conn.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--apply", action="store_true", help="Apply changes to database")
args = parser.parse_args()
asyncio.run(mark_entities(args.apply))
asyncio.run(main())

View File

@@ -1,137 +1,136 @@
import asyncio
import os
import json
import asyncpg
from typing import List, Dict, Any, Set
from datetime import datetime
# Configuration
SYSTEM_CANDIDATE_KINDS = {'infra', 'router', 'monitor', 'system'}
SYSTEM_CANDIDATE_NAMES = {
'daarwizz', 'helion', 'greenfood', 'clan', 'druid', 'eonarch', 'soul', 'yaromir', 'core-monitor',
'sofia', 'exor', 'faye', 'helix', 'iris'
}
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://daarion:daarion@localhost:5432/daarion")
REPORT_DIR = "docs/internal/clean"
REPORT_FILE = os.path.join(REPORT_DIR, "DATA_CLEANUP_REPORT.md")
async def get_db_connection():
url = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/daarion")
return await asyncpg.connect(url)
SYSTEM_CANDIDATE_KINDS = ['infra', 'router', 'monitor', 'system']
SYSTEM_CANDIDATE_NAMES = [
'daarwizz', 'helion', 'greenfood', 'clan', 'druid', 'eonarch', 'soul',
'yaromir', 'core-monitor', 'exor', 'faye', 'helix', 'iris', 'sofia'
]
async def fetch_all(conn, query):
records = await conn.fetch(query)
return [dict(r) for r in records]
async def scan_entities():
conn = await get_db_connection()
async def main():
print(f"Connecting to DB...")
# Use provided URL or default local
try:
# 1. Fetch Nodes
# Note: node_registry is usually a separate service/db, but we might have cached nodes in node_cache table
# or we assume known nodes. For this script, let's query node_cache if available or just use hardcoded known nodes for validation
# Actually user said "Source of truth по нодах: node_registry". Assuming we can query node_registry.nodes table if it's in the same DB
# or use node_cache table which we used in previous tasks.
# Let's try to fetch from node_cache table.
conn = await asyncpg.connect(DATABASE_URL)
except Exception as e:
print(f"Failed to connect to DB: {e}")
return
try:
print("Fetching data...")
# 1. Nodes
try:
nodes = await fetch_all(conn, "SELECT node_id, node_name, status FROM node_cache")
nodes = await conn.fetch("SELECT id, name, hostname, roles FROM node_registry.nodes")
except asyncpg.UndefinedTableError:
print("Warning: node_cache table not found. Using empty list for nodes.")
print("Warning: node_registry.nodes table not found. Using empty list.")
nodes = []
known_node_ids = {n['node_id'] for n in nodes}
# Add hardcoded known nodes just in case cache is empty
known_node_ids.update({'node-1-hetzner-gex44', 'node-2-macbook-m4max'})
# 2. Fetch Agents
agents = await fetch_all(conn, """
SELECT id, display_name, slug, node_id, kind, is_test, deleted_at
FROM agents
WHERE deleted_at IS NULL
""")
# 3. Fetch MicroDAOs
microdaos = await fetch_all(conn, """
SELECT id, name, slug, is_test, deleted_at
FROM microdaos
WHERE deleted_at IS NULL
node_ids = {str(n['id']) for n in nodes}
# 2. MicroDAOs
# Check if table is microdao or microdaos
try:
table_name = "microdaos"
await conn.execute(f"SELECT 1 FROM {table_name} LIMIT 1")
except asyncpg.UndefinedTableError:
table_name = "microdao"
print(f"Using microdao table: {table_name}")
microdaos = await conn.fetch(f"""
SELECT m.id, m.slug, m.name, COUNT(ma.agent_id) as agent_count
FROM {table_name} m
LEFT JOIN microdao_agents ma ON ma.microdao_id = m.id
GROUP BY m.id, m.slug, m.name
""")
# 4. Fetch MicroDAO Agents
memberships = await fetch_all(conn, "SELECT agent_id, microdao_id FROM microdao_agents")
# 3. Agents
agents = await conn.fetch("""
SELECT a.id, a.display_name, a.kind, a.node_id, a.slug, a.is_public,
(SELECT COUNT(*) FROM microdao_agents ma WHERE ma.agent_id = a.id) as microdao_count
FROM agents a
""")
# Process Data
agent_microdao_map = {}
for m in memberships:
agent_id = m['agent_id']
if agent_id not in agent_microdao_map:
agent_microdao_map[agent_id] = set()
agent_microdao_map[agent_id].add(m['microdao_id'])
# Analyze Agents
agent_report = []
for a in agents:
agent_id = str(a['id'])
node_id = a['node_id']
microdao_agent_count = {}
for m in memberships:
md_id = m['microdao_id']
microdao_agent_count[md_id] = microdao_agent_count.get(md_id, 0) + 1
# Generate Report
report_lines = []
report_lines.append("# Data Cleanup Report")
report_lines.append(f"Generated at: {asyncio.get_event_loop().time()}") # Placeholder for time
report_lines.append("\n## Nodes Summary")
for node_id in known_node_ids:
report_lines.append(f"- {node_id}")
report_lines.append("\n## Agents Analysis")
report_lines.append("| ID | Name | Node | Kind | MicroDAOs | Is Orphan | System Candidate | Is Test |")
report_lines.append("|---|---|---|---|---|---|---|---|")
orphan_count = 0
for agent in agents:
a_id = agent['id']
name = agent['display_name']
node = agent['node_id']
kind = agent['kind']
slug = agent.get('slug')
has_valid_node = node in known_node_ids
dao_count = len(agent_microdao_map.get(a_id, []))
has_membership = dao_count > 0
is_orphan = not has_valid_node or not has_membership
if is_orphan:
orphan_count += 1
has_valid_node = node_id in node_ids if node_id else False
# If no nodes found at all, we assume valid node check is skipped or failed
if not nodes:
has_valid_node = True
is_system = (kind in SYSTEM_CANDIDATE_KINDS) or \
(name.lower() in SYSTEM_CANDIDATE_NAMES) or \
(slug and slug.lower() in SYSTEM_CANDIDATE_NAMES)
report_lines.append(
f"| {a_id} | {name} | {node} | {kind} | {dao_count} | {is_orphan} | {is_system} | {agent['is_test']} |"
)
report_lines.append("\n## MicroDAO Summary")
report_lines.append("| ID | Name | Slug | Agent Count | Suspicious (0 agents) | Is Test |")
report_lines.append("|---|---|---|---|---|---|")
for md in microdaos:
md_id = md['id']
count = microdao_agent_count.get(md_id, 0)
is_suspicious = count == 0
report_lines.append(
f"| {md_id} | {md['name']} | {md['slug']} | {count} | {is_suspicious} | {md['is_test']} |"
)
# Output
report_content = "\n".join(report_lines)
print(report_content)
os.makedirs("docs/internal/clean", exist_ok=True)
with open("docs/internal/clean/DATA_CLEANUP_REPORT.md", "w") as f:
f.write(report_content)
has_microdao_membership = a['microdao_count'] > 0
is_orphan = not has_valid_node or not has_microdao_membership
print(f"\nReport saved to docs/internal/clean/DATA_CLEANUP_REPORT.md")
print(f"Total agents: {len(agents)}")
print(f"Orphan agents found: {orphan_count}")
display_name = a['display_name'] or ''
slug = a['slug'] or ''
is_system_candidate = (
a['kind'] in SYSTEM_CANDIDATE_KINDS or
any(name in display_name.lower() for name in SYSTEM_CANDIDATE_NAMES) or
any(name in slug.lower() for name in SYSTEM_CANDIDATE_NAMES)
)
agent_report.append({
"id": agent_id,
"name": display_name,
"slug": slug,
"kind": a['kind'],
"node_id": node_id,
"has_valid_node": has_valid_node,
"microdao_count": a['microdao_count'],
"is_orphan": is_orphan,
"is_system_candidate": is_system_candidate
})
# Generate Report
os.makedirs(REPORT_DIR, exist_ok=True)
with open(REPORT_FILE, "w") as f:
f.write(f"# Data Cleanup Report\n")
f.write(f"Generated at: {datetime.now()}\n\n")
f.write("## Nodes Summary\n")
f.write("| ID | Name | Hostname | Roles |\n")
f.write("|---|---|---|---|\n")
for n in nodes:
f.write(f"| {n['id']} | {n['name']} | {n['hostname']} | {n['roles']} |\n")
f.write("\n")
f.write("## MicroDAO Summary\n")
f.write("| Name | Slug | Agents Count | Suspicious (0 agents) |\n")
f.write("|---|---|---|---|\n")
for m in microdaos:
suspicious = "⚠️ YES" if m['agent_count'] == 0 else "NO"
f.write(f"| {m['name']} | {m['slug']} | {m['agent_count']} | {suspicious} |\n")
f.write("\n")
f.write("## Agents by Node\n")
f.write("| ID | Name | Slug | Kind | Node | Valid Node? | MicroDAOs | Orphan? | System? |\n")
f.write("|---|---|---|---|---|---|---|---|---|\n")
for a in agent_report:
orphan_mark = "⚠️ YES" if a['is_orphan'] else "NO"
system_mark = "✅ YES" if a['is_system_candidate'] else "NO"
valid_node_mark = "" if a['has_valid_node'] else ""
f.write(f"| {a['id']} | {a['name']} | {a['slug']} | {a['kind']} | {a['node_id']} | {valid_node_mark} | {a['microdao_count']} | {orphan_mark} | {system_mark} |\n")
print(f"Report generated: {REPORT_FILE}")
finally:
await conn.close()
if __name__ == "__main__":
asyncio.run(scan_entities())
asyncio.run(main())