Files
microdao-daarion/scripts/qdrant_smoke_test.py
Apple 0c8bef82f4 feat: Add Alateya, Clan, Eonarch agents + fix gateway-router connection
## Agents Added
- Alateya: R&D, biotech, innovations
- Clan (Spirit): Community spirit agent
- Eonarch: Consciousness evolution agent

## Changes
- docker-compose.node1.yml: Added tokens for all 3 new agents
- gateway-bot/http_api.py: Added configs and webhook endpoints
- gateway-bot/clan_prompt.txt: New prompt file
- gateway-bot/eonarch_prompt.txt: New prompt file

## Fixes
- Fixed ROUTER_URL from :9102 to :8000 (internal container port)
- All 9 Telegram agents now working

## Documentation
- Created PROJECT-MASTER-INDEX.md - single entry point
- Added various status documents and scripts

Tokens configured:
- Helion, NUTRA, Agromatrix (existing)
- Alateya, Clan, Eonarch (new)
- Druid, GreenFood, DAARWIZZ (configured)
2026-01-28 06:40:34 -08:00

308 lines
9.7 KiB
Python

#!/usr/bin/env python3
"""
Qdrant Security Smoke Test
Verifies security invariants for canonical collection filters.
Usage:
python qdrant_smoke_test.py --host dagi-qdrant-node1
"""
import argparse
import os
import sys
from pathlib import Path
# Add parent to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from services.memory.qdrant.filters import (
AccessContext,
FilterSecurityError,
build_qdrant_filter,
build_multi_agent_filter,
build_agent_only_filter,
)
def test_multi_agent_unauthorized_raises():
"""Test: non-admin requesting unauthorized agent_ids → error"""
print("\n[TEST 1] Multi-agent unauthorized access...")
ctx = AccessContext(
tenant_id="t_daarion",
team_id="team_core",
allowed_agent_ids=["agt_helion", "agt_nutra"], # Only these allowed
)
try:
# Try to access agt_druid which is NOT in allowed list
build_multi_agent_filter(ctx, agent_ids=["agt_helion", "agt_druid"])
print(" ❌ FAIL: Should have raised FilterSecurityError")
return False
except FilterSecurityError as e:
if "agt_druid" in str(e) and "Unauthorized" in str(e):
print(f" ✅ PASS: Correctly raised error: {e}")
return True
else:
print(f" ❌ FAIL: Wrong error message: {e}")
return False
def test_multi_agent_requires_allowlist():
"""Test: non-admin without allowed_agent_ids → error"""
print("\n[TEST 2] Multi-agent requires allowlist...")
ctx = AccessContext(
tenant_id="t_daarion",
team_id="team_core",
# No allowed_agent_ids!
)
try:
build_multi_agent_filter(ctx, agent_ids=["agt_helion"])
print(" ❌ FAIL: Should have raised FilterSecurityError")
return False
except FilterSecurityError as e:
if "allowed_agent_ids" in str(e):
print(f" ✅ PASS: Correctly raised error: {e}")
return True
else:
print(f" ❌ FAIL: Wrong error message: {e}")
return False
def test_admin_default_no_private():
"""Test: admin default does NOT include private"""
print("\n[TEST 3] Admin default excludes private...")
ctx = AccessContext(
tenant_id="t_daarion",
is_admin=True,
# No visibility specified, no include_private
)
result = build_qdrant_filter(ctx)
# Check should conditions
if "should" not in result:
print(" ❌ FAIL: No should in result")
return False
should = result["should"]
# Admin should have visibility filter with public+confidential only
visibility_cond = should[0].get("must", [{}])[0]
if visibility_cond.get("key") == "visibility":
vis_values = visibility_cond.get("match", {}).get("any", [])
if "private" in vis_values:
print(f" ❌ FAIL: Admin default includes private: {vis_values}")
return False
elif "public" in vis_values and "confidential" in vis_values:
print(f" ✅ PASS: Admin default is public+confidential: {vis_values}")
return True
print(f" ❌ FAIL: Unexpected filter structure: {should}")
return False
def test_admin_can_request_private():
"""Test: admin with include_private=True gets private"""
print("\n[TEST 4] Admin can explicitly request private...")
ctx = AccessContext(
tenant_id="t_daarion",
is_admin=True,
)
result = build_qdrant_filter(ctx, include_private=True)
should = result.get("should", [])
visibility_cond = should[0].get("must", [{}])[0] if should else {}
if visibility_cond.get("key") == "visibility":
vis_values = visibility_cond.get("match", {}).get("any", [])
if "private" in vis_values:
print(f" ✅ PASS: Admin with include_private gets private: {vis_values}")
return True
print(f" ❌ FAIL: Admin with include_private should see private: {result}")
return False
def test_owner_gets_private():
"""Test: owner with include_private=True gets own private"""
print("\n[TEST 5] Owner can access own private...")
ctx = AccessContext(
tenant_id="t_daarion",
team_id="team_core",
agent_id="agt_helion",
)
result = build_agent_only_filter(ctx, agent_id="agt_helion")
# Check that filter includes private for owner
should = result.get("should", [])
# Find condition that has visibility with private
has_private_for_owner = False
for cond in should:
must = cond.get("must", [])
has_owner_check = any(
c.get("key") == "owner_id" and c.get("match", {}).get("value") == "agt_helion"
for c in must
)
has_private = any(
c.get("key") == "visibility" and "private" in str(c.get("match", {}))
for c in must
)
if has_owner_check and has_private:
has_private_for_owner = True
break
if has_private_for_owner:
print(" ✅ PASS: Owner can access own private content")
return True
else:
print(f" ❌ FAIL: Owner should be able to access private: {should}")
return False
def test_tenant_always_required():
"""Test: tenant_id is always required"""
print("\n[TEST 6] Tenant ID always required...")
ctx = AccessContext(
tenant_id="", # Empty!
team_id="team_core",
)
try:
build_qdrant_filter(ctx)
print(" ❌ FAIL: Should have raised FilterSecurityError for empty tenant_id")
return False
except FilterSecurityError as e:
if "tenant_id" in str(e):
print(f" ✅ PASS: Correctly raised error: {e}")
return True
else:
print(f" ❌ FAIL: Wrong error: {e}")
return False
def test_qdrant_filter_format(host: str, port: int):
"""Test: generated filters work with actual Qdrant"""
print(f"\n[TEST 7] Qdrant filter format smoke test ({host}:{port})...")
try:
from qdrant_client import QdrantClient
except ImportError:
print(" ⚠️ SKIP: qdrant-client not installed")
return None
try:
client = QdrantClient(host=host, port=port, timeout=5)
# Get list of collections
collections = client.get_collections().collections
if not collections:
print(" ⚠️ SKIP: No collections in Qdrant")
return None
# Use first collection for smoke test
collection_name = collections[0].name
print(f" Using collection: {collection_name}")
# Build a filter
ctx = AccessContext(
tenant_id="t_daarion",
team_id="team_core",
allowed_agent_ids=["agt_helion", "agt_nutra"],
)
filter_dict = build_multi_agent_filter(
ctx,
agent_ids=["agt_helion", "agt_nutra"],
scope="docs"
)
# Try to search (we don't care about results, just that filter is valid)
# Create a dummy vector
info = client.get_collection(collection_name)
dim = info.config.params.vectors.size
dummy_vector = [0.0] * dim
from qdrant_client.models import Filter, FieldCondition, MatchValue, MatchAny
# Manual filter conversion for test
results = client.search(
collection_name=collection_name,
query_vector=dummy_vector,
limit=1,
query_filter=Filter(
must=[
FieldCondition(key="tenant_id", match=MatchValue(value="t_daarion")),
]
)
)
print(f" ✅ PASS: Qdrant accepts filter format (returned {len(results)} results)")
return True
except Exception as e:
print(f" ❌ FAIL: Qdrant error: {e}")
return False
def main():
parser = argparse.ArgumentParser(description="Qdrant Security Smoke Test")
parser.add_argument("--host", default=os.getenv("QDRANT_HOST", "localhost"))
parser.add_argument("--port", type=int, default=int(os.getenv("QDRANT_PORT", "6333")))
args = parser.parse_args()
print("=" * 60)
print("QDRANT SECURITY SMOKE TEST")
print("=" * 60)
results = []
# Unit tests (no Qdrant needed)
results.append(("Multi-agent unauthorized", test_multi_agent_unauthorized_raises()))
results.append(("Multi-agent requires allowlist", test_multi_agent_requires_allowlist()))
results.append(("Admin default no private", test_admin_default_no_private()))
results.append(("Admin can request private", test_admin_can_request_private()))
results.append(("Owner gets private", test_owner_gets_private()))
results.append(("Tenant always required", test_tenant_always_required()))
# Integration test (needs Qdrant)
qdrant_result = test_qdrant_filter_format(args.host, args.port)
if qdrant_result is not None:
results.append(("Qdrant filter format", qdrant_result))
# Summary
print("\n" + "=" * 60)
print("SUMMARY")
print("=" * 60)
passed = sum(1 for _, r in results if r is True)
failed = sum(1 for _, r in results if r is False)
skipped = sum(1 for _, r in results if r is None)
for name, result in results:
status = "✅ PASS" if result is True else "❌ FAIL" if result is False else "⚠️ SKIP"
print(f" {status}: {name}")
print(f"\nTotal: {passed} passed, {failed} failed, {skipped} skipped")
if failed > 0:
print("\n❌ SMOKE TEST FAILED")
sys.exit(1)
else:
print("\n✅ SMOKE TEST PASSED - Ready for cutover")
sys.exit(0)
if __name__ == "__main__":
main()