## Agents Added
- Alateya: R&D, biotech, innovations
- Clan (Spirit): Community spirit agent
- Eonarch: Consciousness evolution agent

## Changes
- docker-compose.node1.yml: Added tokens for all 3 new agents
- gateway-bot/http_api.py: Added configs and webhook endpoints
- gateway-bot/clan_prompt.txt: New prompt file
- gateway-bot/eonarch_prompt.txt: New prompt file

## Fixes
- Fixed ROUTER_URL from :9102 to :8000 (internal container port)
- All 9 Telegram agents now working

## Documentation
- Created PROJECT-MASTER-INDEX.md - single entry point
- Added various status documents and scripts

Tokens configured:
- Helion, NUTRA, Agromatrix (existing)
- Alateya, Clan, Eonarch (new)
- Druid, GreenFood, DAARWIZZ (configured)
308 lines
9.7 KiB
Python
308 lines
9.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Qdrant Security Smoke Test
|
|
|
|
Verifies security invariants for canonical collection filters.
|
|
|
|
Usage:
|
|
python qdrant_smoke_test.py --host dagi-qdrant-node1
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Add parent to path
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from services.memory.qdrant.filters import (
|
|
AccessContext,
|
|
FilterSecurityError,
|
|
build_qdrant_filter,
|
|
build_multi_agent_filter,
|
|
build_agent_only_filter,
|
|
)
|
|
|
|
|
|
def test_multi_agent_unauthorized_raises():
    """Check that a multi-agent filter rejects agent ids outside the allowlist.

    Builds a non-admin context allowed to see ``agt_helion`` and ``agt_nutra``
    and then asks for ``agt_druid`` as well.

    Returns:
        True when ``FilterSecurityError`` is raised and its message contains
        both ``agt_druid`` and ``Unauthorized``; False otherwise.
    """
    print("\n[TEST 1] Multi-agent unauthorized access...")

    # Only agt_helion and agt_nutra are permitted for this caller.
    ctx = AccessContext(
        tenant_id="t_daarion",
        team_id="team_core",
        allowed_agent_ids=["agt_helion", "agt_nutra"],
    )

    try:
        # agt_druid is deliberately outside the allowlist.
        build_multi_agent_filter(ctx, agent_ids=["agt_helion", "agt_druid"])
    except FilterSecurityError as exc:
        message = str(exc)
        if "agt_druid" not in message or "Unauthorized" not in message:
            print(f" ❌ FAIL: Wrong error message: {exc}")
            return False
        print(f" ✅ PASS: Correctly raised error: {exc}")
        return True
    else:
        # Reaching this branch means the builder accepted the request.
        print(" ❌ FAIL: Should have raised FilterSecurityError")
        return False
|
|
|
|
|
|
def test_multi_agent_requires_allowlist():
    """Check that a multi-agent filter is refused when no allowlist is set.

    The context is built without ``allowed_agent_ids`` and without
    ``is_admin``, then a single agent id is requested.

    Returns:
        True when ``FilterSecurityError`` is raised and its message mentions
        ``allowed_agent_ids``; False otherwise.
    """
    print("\n[TEST 2] Multi-agent requires allowlist...")

    # allowed_agent_ids is intentionally left out.
    ctx = AccessContext(tenant_id="t_daarion", team_id="team_core")

    try:
        build_multi_agent_filter(ctx, agent_ids=["agt_helion"])
    except FilterSecurityError as exc:
        if "allowed_agent_ids" not in str(exc):
            print(f" ❌ FAIL: Wrong error message: {exc}")
            return False
        print(f" ✅ PASS: Correctly raised error: {exc}")
        return True
    else:
        # The builder returned normally, which is the insecure outcome.
        print(" ❌ FAIL: Should have raised FilterSecurityError")
        return False
|
|
|
|
|
|
def test_admin_default_no_private():
    """Check that the default admin filter does not expose private content.

    Builds a filter for an admin context with no explicit visibility and no
    ``include_private`` flag, then inspects the first ``must`` condition of
    the first ``should`` branch.

    Returns:
        True when that condition is a ``visibility`` match whose ``any`` list
        contains ``public`` and ``confidential`` but not ``private``; False
        for any other outcome, including an empty or unexpected filter shape.
    """
    print("\n[TEST 3] Admin default excludes private...")

    # No visibility specified, no include_private.
    ctx = AccessContext(
        tenant_id="t_daarion",
        is_admin=True,
    )

    result = build_qdrant_filter(ctx)

    if "should" not in result:
        print(" ❌ FAIL: No should in result")
        return False

    should = result["should"]

    # An empty "should" or "must" list must be reported as a test failure,
    # not crash the whole smoke run with an IndexError.
    first_must = should[0].get("must") if should else None
    visibility_cond = first_must[0] if first_must else {}

    # Admin should have a visibility filter with public+confidential only.
    if visibility_cond.get("key") == "visibility":
        vis_values = visibility_cond.get("match", {}).get("any", [])
        if "private" in vis_values:
            print(f" ❌ FAIL: Admin default includes private: {vis_values}")
            return False
        if "public" in vis_values and "confidential" in vis_values:
            print(f" ✅ PASS: Admin default is public+confidential: {vis_values}")
            return True

    print(f" ❌ FAIL: Unexpected filter structure: {should}")
    return False
|
|
|
|
|
|
def test_admin_can_request_private():
    """Check that an admin who passes ``include_private=True`` sees private.

    Looks at the first ``must`` condition of the first ``should`` branch of
    the generated filter.

    Returns:
        True when that condition is a ``visibility`` match whose ``any`` list
        contains ``private``; False otherwise.
    """
    print("\n[TEST 4] Admin can explicitly request private...")

    ctx = AccessContext(tenant_id="t_daarion", is_admin=True)
    result = build_qdrant_filter(ctx, include_private=True)

    branches = result.get("should", [])
    # With no branches there is nothing to inspect; use an empty condition.
    leading_must = branches[0].get("must", [{}]) if branches else [{}]
    visibility_cond = leading_must[0]

    if visibility_cond.get("key") == "visibility":
        vis_values = visibility_cond.get("match", {}).get("any", [])
        if "private" in vis_values:
            print(f" ✅ PASS: Admin with include_private gets private: {vis_values}")
            return True

    print(f" ❌ FAIL: Admin with include_private should see private: {result}")
    return False
|
|
|
|
|
|
def test_owner_gets_private():
    """Check that an agent-only filter lets the owner reach private content.

    Builds the agent-only filter for ``agt_helion`` and scans every
    ``should`` branch for one whose ``must`` list holds both an ``owner_id``
    match on ``agt_helion`` and a ``visibility`` condition whose match
    mentions ``private``.

    Returns:
        True when such a branch exists; False otherwise.
    """
    print("\n[TEST 5] Owner can access own private...")

    ctx = AccessContext(
        tenant_id="t_daarion",
        team_id="team_core",
        agent_id="agt_helion",
    )
    result = build_agent_only_filter(ctx, agent_id="agt_helion")
    should = result.get("should", [])

    for branch in should:
        clauses = branch.get("must", [])

        # Does this branch pin the owner to agt_helion?
        owner_matched = any(
            clause.get("key") == "owner_id"
            and clause.get("match", {}).get("value") == "agt_helion"
            for clause in clauses
        )
        # Does this branch carry a visibility condition mentioning private?
        private_matched = any(
            clause.get("key") == "visibility"
            and "private" in str(clause.get("match", {}))
            for clause in clauses
        )

        if owner_matched and private_matched:
            print(" ✅ PASS: Owner can access own private content")
            return True

    print(f" ❌ FAIL: Owner should be able to access private: {should}")
    return False
|
|
|
|
|
|
def test_tenant_always_required():
    """Check that building a filter with an empty ``tenant_id`` is refused.

    Returns:
        True when ``FilterSecurityError`` is raised and its message mentions
        ``tenant_id``; False otherwise.
    """
    print("\n[TEST 6] Tenant ID always required...")

    # tenant_id is deliberately the empty string.
    ctx = AccessContext(tenant_id="", team_id="team_core")

    try:
        build_qdrant_filter(ctx)
    except FilterSecurityError as exc:
        if "tenant_id" not in str(exc):
            print(f" ❌ FAIL: Wrong error: {exc}")
            return False
        print(f" ✅ PASS: Correctly raised error: {exc}")
        return True
    else:
        # No exception means the empty tenant slipped through.
        print(" ❌ FAIL: Should have raised FilterSecurityError for empty tenant_id")
        return False
|
|
|
|
|
|
def test_qdrant_filter_format(host: str, port: int):
    """Run one filtered search against a live Qdrant instance.

    Connects to ``host:port``, picks the first collection returned by the
    server, and issues a single search with a zero vector and a
    ``tenant_id`` filter to confirm the server accepts the request.

    Args:
        host: Qdrant host name.
        port: Qdrant port.

    Returns:
        True when the search succeeds, False when any step raises, and None
        when the test cannot run (``qdrant-client`` is not installed, the
        server has no collections, or the chosen collection exposes no dense
        vector configuration).
    """
    print(f"\n[TEST 7] Qdrant filter format smoke test ({host}:{port})...")

    try:
        from qdrant_client import QdrantClient
    except ImportError:
        print(" ⚠️ SKIP: qdrant-client not installed")
        return None

    try:
        client = QdrantClient(host=host, port=port, timeout=5)

        # Get list of collections
        collections = client.get_collections().collections
        if not collections:
            print(" ⚠️ SKIP: No collections in Qdrant")
            return None

        # Use first collection for smoke test
        collection_name = collections[0].name
        print(f" Using collection: {collection_name}")

        ctx = AccessContext(
            tenant_id="t_daarion",
            team_id="team_core",
            allowed_agent_ids=["agt_helion", "agt_nutra"],
        )

        # Exercise the project filter builder so that an exception from it is
        # reported as a FAIL of this test.
        # NOTE(review): the dict it returns is not sent to Qdrant; the search
        # below uses a hand-written tenant filter. Confirm whether the
        # generated filter should be converted and used instead.
        build_multi_agent_filter(
            ctx,
            agent_ids=["agt_helion", "agt_nutra"],
            scope="docs",
        )

        # We don't care about results, only that the request is accepted, so
        # a zero vector of the right dimension is enough.
        info = client.get_collection(collection_name)
        vectors = info.config.params.vectors
        if not vectors:
            # None or an empty mapping: nothing dense to search against.
            print(" ⚠️ SKIP: Collection has no dense vector config")
            return None
        if isinstance(vectors, dict):
            # Named-vector collection: search must say which vector to use.
            vector_name, vector_params = next(iter(vectors.items()))
            query_vector = (vector_name, [0.0] * vector_params.size)
        else:
            query_vector = [0.0] * vectors.size

        from qdrant_client.models import Filter, FieldCondition, MatchValue

        # Manual filter conversion for test
        results = client.search(
            collection_name=collection_name,
            query_vector=query_vector,
            limit=1,
            query_filter=Filter(
                must=[
                    FieldCondition(key="tenant_id", match=MatchValue(value="t_daarion")),
                ]
            ),
        )

        print(f" ✅ PASS: Qdrant accepts filter format (returned {len(results)} results)")
        return True

    except Exception as e:
        # Top-level boundary of the smoke test: any failure becomes a FAIL
        # line instead of a traceback.
        print(f" ❌ FAIL: Qdrant error: {e}")
        return False
|
|
|
|
|
|
def main():
    """Run every smoke test, print a summary and exit with a status code.

    ``--host`` and ``--port`` default to the ``QDRANT_HOST`` and
    ``QDRANT_PORT`` environment variables, falling back to ``localhost`` and
    ``6333``. The process exits with status 1 when at least one test returned
    False and with status 0 otherwise; skipped tests do not affect the exit
    status.
    """
    parser = argparse.ArgumentParser(description="Qdrant Security Smoke Test")
    parser.add_argument("--host", default=os.getenv("QDRANT_HOST", "localhost"))
    parser.add_argument("--port", type=int, default=int(os.getenv("QDRANT_PORT", "6333")))
    args = parser.parse_args()

    print("=" * 60)
    print("QDRANT SECURITY SMOKE TEST")
    print("=" * 60)

    # Unit tests (no Qdrant needed); list items are evaluated in order.
    results = [
        ("Multi-agent unauthorized", test_multi_agent_unauthorized_raises()),
        ("Multi-agent requires allowlist", test_multi_agent_requires_allowlist()),
        ("Admin default no private", test_admin_default_no_private()),
        ("Admin can request private", test_admin_can_request_private()),
        ("Owner gets private", test_owner_gets_private()),
        ("Tenant always required", test_tenant_always_required()),
    ]

    # Integration test (needs Qdrant). A None result means it was skipped;
    # record it anyway so the summary and the skipped counter reflect it.
    results.append(("Qdrant filter format", test_qdrant_filter_format(args.host, args.port)))

    # Summary
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)

    passed = sum(1 for _, r in results if r is True)
    failed = sum(1 for _, r in results if r is False)
    skipped = sum(1 for _, r in results if r is None)

    for name, result in results:
        status = "✅ PASS" if result is True else "❌ FAIL" if result is False else "⚠️ SKIP"
        print(f" {status}: {name}")

    print(f"\nTotal: {passed} passed, {failed} failed, {skipped} skipped")

    if failed > 0:
        print("\n❌ SMOKE TEST FAILED")
        sys.exit(1)
    else:
        print("\n✅ SMOKE TEST PASSED - Ready for cutover")
        sys.exit(0)
|
|
|
|
|
|
# Run the smoke test only when this file is executed as a script.
if __name__ == "__main__":
    main()
|