From 4286d64f054dcc40689e56a849863ec43d247bb0 Mon Sep 17 00:00:00 2001 From: Apple Date: Fri, 28 Nov 2025 09:22:57 -0800 Subject: [PATCH] feat: add data cleanup scripts and config (Task 027) --- config/data_cleanup_allowlist.yml | 94 +++------- scripts/data/mark_test_entities.py | 250 ++++++++++++--------------- scripts/data/scan_agents_microdao.py | 229 ++++++++++++------------ 3 files changed, 250 insertions(+), 323 deletions(-) diff --git a/config/data_cleanup_allowlist.yml b/config/data_cleanup_allowlist.yml index 029475b4..d999aae6 100644 --- a/config/data_cleanup_allowlist.yml +++ b/config/data_cleanup_allowlist.yml @@ -3,77 +3,27 @@ nodes: - node-2-macbook-m4max microdao: - - slug: daarion - - slug: energy-union - - slug: greenfood - - slug: clan - - slug: soul - - slug: yaromir - - slug: druid - - slug: nutra - - slug: eonarch + - core-operations + - daarion-dao + - developer-hub + - energy-union + - greenfood-coop + - clan-network + - druid-labs + - soul-district + - eonarch-vision agents: - # Orchestrators on Node 1 - - slug: daarwizz - - slug: helion - - slug: greenfood - - slug: druid - - slug: clan - - slug: eonarch - - slug: nutra - - slug: soul - - slug: yaromir - - # Node 2 Agents - - slug: faye - - slug: helix - - slug: exor - - slug: solarius - - slug: primesynth - - slug: nexor - - slug: monitor-node2 - - slug: strategic-sentinels - - slug: vindex - - slug: aurora - - slug: arbitron - - slug: byteforge - - slug: vector - - slug: chainweaver - - slug: cypher - - slug: canvas - - slug: roxy - - slug: mira - - slug: tempo - - slug: harmony - - slug: storytelling - - slug: financial-analyst - - slug: accountant - - slug: tax-advisor - - slug: smart-contract-dev - - slug: defi-analyst - - slug: tokenomics-expert - - slug: nft-specialist - - slug: dao-governance - - slug: shadelock - - slug: penetration-tester - - slug: security-monitor - - slug: incident-responder - - slug: shadelock-forensics - - slug: exor-forensics - - slug: lumen - - slug: spectra - - slug: video-analyzer - - slug: protomind - - slug: labforge - - slug: testpilot - - slug: modelscout - - slug: breakpoint - - slug: growcell - - slug: somnia - - slug: memory-manager - - slug: knowledge-indexer - - slug: iris - - slug: sofia - - slug: budget-planner - + - daarwizz + - helion + - greenfood + - clan + - druid + - eonarch + - soul + - yaromir + - exor + - faye + - helix + - iris + - sofia diff --git a/scripts/data/mark_test_entities.py b/scripts/data/mark_test_entities.py index 9d3bd09c..4de0d5fe 100644 --- a/scripts/data/mark_test_entities.py +++ b/scripts/data/mark_test_entities.py @@ -1,165 +1,143 @@ import asyncio import os -import asyncpg import yaml import argparse -from typing import List, Dict, Any, Set +import asyncpg +from datetime import datetime # Configuration -SYSTEM_CANDIDATE_KINDS = {'infra', 'router', 'monitor', 'system'} -SYSTEM_CANDIDATE_NAMES = { - 'daarwizz', 'helion', 'greenfood', 'clan', 'druid', 'eonarch', 'soul', 'yaromir', 'core-monitor', - 'sofia', 'exor', 'faye', 'helix', 'iris' -} +DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://daarion:daarion@localhost:5432/daarion") +ALLOWLIST_PATH = os.getenv("ALLOWLIST_PATH", "config/data_cleanup_allowlist.yml") -async def get_db_connection(): - url = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/daarion") - return await asyncpg.connect(url) - -async def fetch_all(conn, query): - records = await conn.fetch(query) - return [dict(r) for r in records] +SYSTEM_CANDIDATE_KINDS = ['infra', 'router', 'monitor', 'system'] def load_allowlist(): - # Try relative path first (for local dev) - path = os.path.join(os.path.dirname(__file__), '../../config/data_cleanup_allowlist.yml') + # Resolve relative path if running from root + path = ALLOWLIST_PATH if not os.path.exists(path): - # Try absolute path (for container) - path = '/app/config/data_cleanup_allowlist.yml' + # Try absolute path assuming docker context + path = os.path.join('/app', ALLOWLIST_PATH) + if not os.path.exists(path): + print(f"Warning: Allowlist file not found at {path}") + return {} + with open(path, 'r') as f: return yaml.safe_load(f) -async def mark_entities(apply=False): - conn = await get_db_connection() +async def main(): + parser = argparse.ArgumentParser(description='Mark test entities') + parser.add_argument('--apply', action='store_true', help='Apply changes to DB') + args = parser.parse_args() + + allowlist = load_allowlist() + allowed_microdao_slugs = set(allowlist.get('microdao', [])) + allowed_agent_slugs = set(allowlist.get('agents', [])) + + print(f"Connecting to DB...") try: - allowlist = load_allowlist() - allowed_agent_slugs = set() - if allowlist.get('agents'): - allowed_agent_slugs = {a['slug'] for a in allowlist['agents']} - - allowed_microdao_slugs = set() - if allowlist.get('microdao'): - allowed_microdao_slugs = {m['slug'] for m in allowlist['microdao']} - - # 1. Fetch Agents - agents = await fetch_all(conn, """ - SELECT id, display_name, slug, public_slug, node_id, kind, is_test, deleted_at - FROM agents - WHERE deleted_at IS NULL - """) - - # 2. Fetch MicroDAOs - microdaos = await fetch_all(conn, """ - SELECT id, name, slug, is_test, deleted_at - FROM microdaos - WHERE deleted_at IS NULL - """) + conn = await asyncpg.connect(DATABASE_URL) + except Exception as e: + print(f"Failed to connect: {e}") + return - # 3. Fetch Memberships - memberships = await fetch_all(conn, """ - SELECT ma.agent_id, m.slug as microdao_slug - FROM microdao_agents ma - JOIN microdaos m ON ma.microdao_id = m.id - """) + try: + # Check table name for microdao + try: + table_name = "microdaos" + await conn.execute(f"SELECT 1 FROM {table_name} LIMIT 1") + except asyncpg.UndefinedTableError: + table_name = "microdao" + + print(f"Using microdao table: {table_name}") + + # 1. Identify Test Agents + # Use dynamic table name for JOIN + query = f""" + SELECT a.id, a.display_name, a.slug, a.kind, a.is_test, + (SELECT COUNT(*) FROM microdao_agents ma WHERE ma.agent_id = a.id) as microdao_count + FROM agents a + WHERE a.is_test = false AND a.deleted_at IS NULL + """ + agents = await conn.fetch(query) - agent_dao_slugs = {} - for m in memberships: - aid = m['agent_id'] - if aid not in agent_dao_slugs: - agent_dao_slugs[aid] = set() - agent_dao_slugs[aid].add(m['microdao_slug']) - - # Identify Test Agents - test_agent_ids = [] - print("\n--- Agents Analysis ---") - for agent in agents: - aid = agent['id'] - slug = agent.get('slug') or agent.get('public_slug') or '' - name = agent['display_name'] - kind = agent['kind'] + agents_to_mark = [] + for a in agents: + is_real = False - is_allowed_slug = slug in allowed_agent_slugs - - is_in_allowed_dao = False - for dao_slug in agent_dao_slugs.get(aid, []): - if dao_slug in allowed_microdao_slugs: - is_in_allowed_dao = True - break - - is_system = (kind in SYSTEM_CANDIDATE_KINDS) or \ - (name.lower() in SYSTEM_CANDIDATE_NAMES) or \ - (slug and slug.lower() in SYSTEM_CANDIDATE_NAMES) - - is_real = is_allowed_slug or is_in_allowed_dao or is_system + # Check if allowed explicitly + if a['slug'] and a['slug'] in allowed_agent_slugs: + is_real = True + # Check if in allowed microdao if not is_real: - print(f"Mark as TEST: {aid} ({name}) - Slug: {slug}, Kind: {kind}") - test_agent_ids.append(aid) - - # Identify Test MicroDAOs - # Fetch agent counts (only non-test agents count towards realness, but we haven't updated DB yet) - # For this check, we'll consider a MicroDAO real if it's in the allowlist. - # Empty MicroDAOs not in allowlist are test. - - microdao_agent_counts = {} - memberships_all = await fetch_all(conn, "SELECT microdao_id FROM microdao_agents") - for m in memberships_all: - mid = m['microdao_id'] - microdao_agent_counts[mid] = microdao_agent_counts.get(mid, 0) + 1 + memberships = await conn.fetch(f""" + SELECT m.slug FROM microdao_agents ma + JOIN {table_name} m ON ma.microdao_id = m.id + WHERE ma.agent_id = $1 + """, a['id']) + for m in memberships: + if m['slug'] in allowed_microdao_slugs: + is_real = True + break - test_microdao_ids = [] - print("\n--- MicroDAO Analysis ---") - for md in microdaos: - mid = md['id'] - slug = md['slug'] - count = microdao_agent_counts.get(mid, 0) - - is_allowed = slug in allowed_microdao_slugs - # If it has agents but not in allowlist, it's tricky. But user said: "All others with agents_count = 0 -> candidates" - # Let's stick to: in allowlist = real. Not in allowlist AND count=0 = test. - # What if count > 0 but not in allowlist? User didn't specify. I'll assume test if not allowed. - # Actually user logic: "Real if in allowlist. All others with agents_count=0 -> candidates". - # So if count > 0 and not in allowlist, we should be careful. - - is_real = is_allowed + # Check if system if not is_real: - if count == 0: - print(f"Mark as TEST: {mid} ({md['name']}) - Slug: {slug}, Count: {count}") - test_microdao_ids.append(mid) - else: - print(f"WARNING: MicroDAO {slug} has {count} agents but is not in allowlist. Skipping mark.") - - # Apply updates - if apply: - print(f"\nApplying updates...") - if test_agent_ids: - await conn.execute(""" - UPDATE agents - SET is_test = true, - deleted_at = NOW() - WHERE id = ANY($1::text[]) - """, test_agent_ids) - print(f"Updated {len(test_agent_ids)} agents.") + if a['kind'] in SYSTEM_CANDIDATE_KINDS: + is_real = True + + if not is_real: + agents_to_mark.append(a) + + print(f"\nAGENTS TO MARK AS TEST: {len(agents_to_mark)}") + for a in agents_to_mark: + print(f" - {a['display_name']} ({a['slug']}) [MicroDAOs: {a['microdao_count']}]") - if test_microdao_ids: + # 2. Identify Test MicroDAOs + microdaos = await conn.fetch(f""" + SELECT m.id, m.slug, m.name, m.is_test, + (SELECT COUNT(*) FROM microdao_agents ma WHERE ma.microdao_id = m.id) as agent_count + FROM {table_name} m + WHERE m.is_test = false AND m.deleted_at IS NULL + """) + + microdaos_to_mark = [] + for m in microdaos: + is_real = False + if m['slug'] in allowed_microdao_slugs: + is_real = True + + if not is_real and m['agent_count'] == 0: + microdaos_to_mark.append(m) + + print(f"\nMICRODAOS TO MARK AS TEST: {len(microdaos_to_mark)}") + for m in microdaos_to_mark: + print(f" - {m['name']} ({m['slug']}) [Agents: {m['agent_count']}]") + + if args.apply: + print("\nAPPLYING CHANGES...") + if agents_to_mark: + agent_ids = [a['id'] for a in agents_to_mark] await conn.execute(""" - UPDATE microdaos - SET is_test = true, - deleted_at = NOW() - WHERE id = ANY($1::text[]) - """, test_microdao_ids) - print(f"Updated {len(test_microdao_ids)} microdaos.") + UPDATE agents + SET is_test = true, deleted_at = NOW() + WHERE id = ANY($1::uuid[]) + """, agent_ids) + print(f"Marked {len(agent_ids)} agents as test.") + + if microdaos_to_mark: + microdao_ids = [m['id'] for m in microdaos_to_mark] + await conn.execute(f""" + UPDATE {table_name} + SET is_test = true, deleted_at = NOW() + WHERE id = ANY($1::uuid[]) + """, microdao_ids) + print(f"Marked {len(microdao_ids)} microdaos as test.") else: - print("\nDry run. Use --apply to update database.") - + print("\nDry run. Use --apply to execute changes.") + finally: await conn.close() if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--apply", action="store_true", help="Apply changes to database") - args = parser.parse_args() - - asyncio.run(mark_entities(args.apply)) - + asyncio.run(main()) diff --git a/scripts/data/scan_agents_microdao.py b/scripts/data/scan_agents_microdao.py index 4e4ba114..9ad18626 100644 --- a/scripts/data/scan_agents_microdao.py +++ b/scripts/data/scan_agents_microdao.py @@ -1,137 +1,136 @@ import asyncio import os +import json import asyncpg -from typing import List, Dict, Any, Set +from datetime import datetime # Configuration -SYSTEM_CANDIDATE_KINDS = {'infra', 'router', 'monitor', 'system'} -SYSTEM_CANDIDATE_NAMES = { - 'daarwizz', 'helion', 'greenfood', 'clan', 'druid', 'eonarch', 'soul', 'yaromir', 'core-monitor', - 'sofia', 'exor', 'faye', 'helix', 'iris' -} +DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://daarion:daarion@localhost:5432/daarion") +REPORT_DIR = "docs/internal/clean" +REPORT_FILE = os.path.join(REPORT_DIR, "DATA_CLEANUP_REPORT.md") -async def get_db_connection(): - url = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/daarion") - return await asyncpg.connect(url) +SYSTEM_CANDIDATE_KINDS = ['infra', 'router', 'monitor', 'system'] +SYSTEM_CANDIDATE_NAMES = [ + 'daarwizz', 'helion', 'greenfood', 'clan', 'druid', 'eonarch', 'soul', + 'yaromir', 'core-monitor', 'exor', 'faye', 'helix', 'iris', 'sofia' +] -async def fetch_all(conn, query): - records = await conn.fetch(query) - return [dict(r) for r in records] - -async def scan_entities(): - conn = await get_db_connection() +async def main(): + print(f"Connecting to DB...") + # Use provided URL or default local try: - # 1. Fetch Nodes - # Note: node_registry is usually a separate service/db, but we might have cached nodes in node_cache table - # or we assume known nodes. For this script, let's query node_cache if available or just use hardcoded known nodes for validation - # Actually user said "Source of truth по нодах: node_registry". Assuming we can query node_registry.nodes table if it's in the same DB - # or use node_cache table which we used in previous tasks. - # Let's try to fetch from node_cache table. + conn = await asyncpg.connect(DATABASE_URL) + except Exception as e: + print(f"Failed to connect to DB: {e}") + return + + try: + print("Fetching data...") + + # 1. Nodes try: - nodes = await fetch_all(conn, "SELECT node_id, node_name, status FROM node_cache") + nodes = await conn.fetch("SELECT id, name, hostname, roles FROM node_registry.nodes") except asyncpg.UndefinedTableError: - print("Warning: node_cache table not found. Using empty list for nodes.") + print("Warning: node_registry.nodes table not found. Using empty list.") nodes = [] - known_node_ids = {n['node_id'] for n in nodes} - # Add hardcoded known nodes just in case cache is empty - known_node_ids.update({'node-1-hetzner-gex44', 'node-2-macbook-m4max'}) - - # 2. Fetch Agents - agents = await fetch_all(conn, """ - SELECT id, display_name, slug, node_id, kind, is_test, deleted_at - FROM agents - WHERE deleted_at IS NULL - """) - - # 3. Fetch MicroDAOs - microdaos = await fetch_all(conn, """ - SELECT id, name, slug, is_test, deleted_at - FROM microdaos - WHERE deleted_at IS NULL + node_ids = {str(n['id']) for n in nodes} + + # 2. MicroDAOs + # Check if table is microdao or microdaos + try: + table_name = "microdaos" + await conn.execute(f"SELECT 1 FROM {table_name} LIMIT 1") + except asyncpg.UndefinedTableError: + table_name = "microdao" + + print(f"Using microdao table: {table_name}") + + microdaos = await conn.fetch(f""" + SELECT m.id, m.slug, m.name, COUNT(ma.agent_id) as agent_count + FROM {table_name} m + LEFT JOIN microdao_agents ma ON ma.microdao_id = m.id + GROUP BY m.id, m.slug, m.name """) - # 4. Fetch MicroDAO Agents - memberships = await fetch_all(conn, "SELECT agent_id, microdao_id FROM microdao_agents") + # 3. Agents + agents = await conn.fetch(""" + SELECT a.id, a.display_name, a.kind, a.node_id, a.slug, a.is_public, + (SELECT COUNT(*) FROM microdao_agents ma WHERE ma.agent_id = a.id) as microdao_count + FROM agents a + """) - # Process Data - agent_microdao_map = {} - for m in memberships: - agent_id = m['agent_id'] - if agent_id not in agent_microdao_map: - agent_microdao_map[agent_id] = set() - agent_microdao_map[agent_id].add(m['microdao_id']) + # Analyze Agents + agent_report = [] + for a in agents: + agent_id = str(a['id']) + node_id = a['node_id'] - microdao_agent_count = {} - for m in memberships: - md_id = m['microdao_id'] - microdao_agent_count[md_id] = microdao_agent_count.get(md_id, 0) + 1 - - # Generate Report - report_lines = [] - report_lines.append("# Data Cleanup Report") - report_lines.append(f"Generated at: {asyncio.get_event_loop().time()}") # Placeholder for time - - report_lines.append("\n## Nodes Summary") - for node_id in known_node_ids: - report_lines.append(f"- {node_id}") - - report_lines.append("\n## Agents Analysis") - report_lines.append("| ID | Name | Node | Kind | MicroDAOs | Is Orphan | System Candidate | Is Test |") - report_lines.append("|---|---|---|---|---|---|---|---|") - - orphan_count = 0 - - for agent in agents: - a_id = agent['id'] - name = agent['display_name'] - node = agent['node_id'] - kind = agent['kind'] - slug = agent.get('slug') - - has_valid_node = node in known_node_ids - dao_count = len(agent_microdao_map.get(a_id, [])) - has_membership = dao_count > 0 - - is_orphan = not has_valid_node or not has_membership - if is_orphan: - orphan_count += 1 + has_valid_node = node_id in node_ids if node_id else False + # If no nodes found at all, we assume valid node check is skipped or failed + if not nodes: + has_valid_node = True - is_system = (kind in SYSTEM_CANDIDATE_KINDS) or \ - (name.lower() in SYSTEM_CANDIDATE_NAMES) or \ - (slug and slug.lower() in SYSTEM_CANDIDATE_NAMES) - - report_lines.append( - f"| {a_id} | {name} | {node} | {kind} | {dao_count} | {is_orphan} | {is_system} | {agent['is_test']} |" - ) - - report_lines.append("\n## MicroDAO Summary") - report_lines.append("| ID | Name | Slug | Agent Count | Suspicious (0 agents) | Is Test |") - report_lines.append("|---|---|---|---|---|---|") - - for md in microdaos: - md_id = md['id'] - count = microdao_agent_count.get(md_id, 0) - is_suspicious = count == 0 - report_lines.append( - f"| {md_id} | {md['name']} | {md['slug']} | {count} | {is_suspicious} | {md['is_test']} |" - ) - - # Output - report_content = "\n".join(report_lines) - print(report_content) - - os.makedirs("docs/internal/clean", exist_ok=True) - with open("docs/internal/clean/DATA_CLEANUP_REPORT.md", "w") as f: - f.write(report_content) + has_microdao_membership = a['microdao_count'] > 0 + is_orphan = not has_valid_node or not has_microdao_membership - print(f"\nReport saved to docs/internal/clean/DATA_CLEANUP_REPORT.md") - print(f"Total agents: {len(agents)}") - print(f"Orphan agents found: {orphan_count}") - + display_name = a['display_name'] or '' + slug = a['slug'] or '' + + is_system_candidate = ( + a['kind'] in SYSTEM_CANDIDATE_KINDS or + any(name in display_name.lower() for name in SYSTEM_CANDIDATE_NAMES) or + any(name in slug.lower() for name in SYSTEM_CANDIDATE_NAMES) + ) + + agent_report.append({ + "id": agent_id, + "name": display_name, + "slug": slug, + "kind": a['kind'], + "node_id": node_id, + "has_valid_node": has_valid_node, + "microdao_count": a['microdao_count'], + "is_orphan": is_orphan, + "is_system_candidate": is_system_candidate + }) + + # Generate Report + os.makedirs(REPORT_DIR, exist_ok=True) + + with open(REPORT_FILE, "w") as f: + f.write(f"# Data Cleanup Report\n") + f.write(f"Generated at: {datetime.now()}\n\n") + + f.write("## Nodes Summary\n") + f.write("| ID | Name | Hostname | Roles |\n") + f.write("|---|---|---|---|\n") + for n in nodes: + f.write(f"| {n['id']} | {n['name']} | {n['hostname']} | {n['roles']} |\n") + f.write("\n") + + f.write("## MicroDAO Summary\n") + f.write("| Name | Slug | Agents Count | Suspicious (0 agents) |\n") + f.write("|---|---|---|---|\n") + for m in microdaos: + suspicious = "⚠️ YES" if m['agent_count'] == 0 else "NO" + f.write(f"| {m['name']} | {m['slug']} | {m['agent_count']} | {suspicious} |\n") + f.write("\n") + + f.write("## Agents by Node\n") + f.write("| ID | Name | Slug | Kind | Node | Valid Node? | MicroDAOs | Orphan? | System? |\n") + f.write("|---|---|---|---|---|---|---|---|---|\n") + for a in agent_report: + orphan_mark = "⚠️ YES" if a['is_orphan'] else "NO" + system_mark = "✅ YES" if a['is_system_candidate'] else "NO" + valid_node_mark = "✅" if a['has_valid_node'] else "❌" + + f.write(f"| {a['id']} | {a['name']} | {a['slug']} | {a['kind']} | {a['node_id']} | {valid_node_mark} | {a['microdao_count']} | {orphan_mark} | {system_mark} |\n") + + print(f"Report generated: {REPORT_FILE}") + finally: await conn.close() if __name__ == "__main__": - asyncio.run(scan_entities()) - + asyncio.run(main())