microdao-daarion/create_stream.py
Apple 4601c6fca8 feat: add Vision Encoder service + Vision RAG implementation
- Vision Encoder Service (OpenCLIP ViT-L/14, GPU-accelerated)
  - FastAPI app with text/image embedding endpoints (768-dim)
  - Docker support with NVIDIA GPU runtime
  - Port 8001, health checks, model info API

- Qdrant Vector Database integration
  - Port 6333/6334 (HTTP/gRPC)
  - Image embeddings storage (768-dim, Cosine distance)
  - Auto collection creation

- Vision RAG implementation
  - VisionEncoderClient (Python client for API)
  - Image Search module (text-to-image, image-to-image)
  - Vision RAG routing in DAGI Router (mode: image_search)
  - VisionEncoderProvider integration

- Documentation (5000+ lines)
  - SYSTEM-INVENTORY.md - Complete system inventory
  - VISION-ENCODER-STATUS.md - Service status
  - VISION-RAG-IMPLEMENTATION.md - Implementation details
  - vision_encoder_deployment_task.md - Deployment checklist
  - services/vision-encoder/README.md - Deployment guide
  - Updated WARP.md, INFRASTRUCTURE.md, Jupyter Notebook

- Testing
  - test-vision-encoder.sh - Smoke tests (6 tests)
  - Unit tests for client, image search, routing

- Services: 17 total (added Vision Encoder + Qdrant)
- AI Models: 3 (qwen3:8b, OpenCLIP ViT-L/14, BAAI/bge-m3)
- GPU Services: 2 (Vision Encoder, Ollama)
- VRAM Usage: ~10 GB (concurrent)

Status: Production Ready 
2025-11-17 05:24:36 -08:00


import asyncio

import nats
from nats.js.api import RetentionPolicy, StorageType, StreamConfig

STREAM_NAME = "STREAM_RAG"


async def main():
    # Connect to NATS
    nc = await nats.connect("nats://localhost:4222")
    print("Connected to NATS")

    try:
        # Get JetStream context
        js = nc.jetstream()
        print("Got JetStream context")

        # Create STREAM_RAG
        stream_config = StreamConfig(
            name=STREAM_NAME,
            description="Stream for RAG ingestion events",
            subjects=[
                "parser.document.parsed",
                "rag.document.ingested",
                "rag.document.indexed",
            ],
            retention=RetentionPolicy.WORK_QUEUE,
            storage=StorageType.FILE,
            num_replicas=3,  # needs a JetStream cluster with at least 3 servers
            max_bytes=-1,  # no size limit
            max_age=0,  # messages never expire
            max_msgs=-1,  # no message-count limit
        )
        await js.add_stream(config=stream_config)
        print("STREAM_RAG created successfully")

        # Verify stream exists
        streams = await js.streams_info()
        for stream in streams:
            if stream.config.name == STREAM_NAME:
                print(f"Verified STREAM_RAG: {stream.config.name}")
                print(f"Subjects: {stream.config.subjects}")
                return
        print("STREAM_RAG created but not verified")
    except Exception as e:
        print(f"Error creating stream: {e}")
    finally:
        # Close connection (also runs on the early return above)
        await nc.close()


if __name__ == "__main__":
    asyncio.run(main())
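
Once STREAM_RAG exists, a quick way to check that it captures messages is to publish an event on one of its subjects and inspect the acknowledgement. The following is a minimal sketch, not part of the original script: the `build_event` and `publish_parsed_event` helpers and the `document_id` payload field are hypothetical, and the server URL is assumed to match the one used above.

```python
import asyncio
import json


def build_event(document_id: str) -> bytes:
    """Encode a hypothetical ingestion event as JSON bytes."""
    # The payload schema is an assumption made for illustration only.
    return json.dumps({"document_id": document_id}).encode("utf-8")


async def publish_parsed_event(document_id: str) -> None:
    """Publish one event to a STREAM_RAG subject and print the ack."""
    # Imported here so build_event can be used without nats-py installed.
    import nats

    nc = await nats.connect("nats://localhost:4222")
    try:
        js = nc.jetstream()
        # JetStream acknowledges the publish once the message is stored.
        ack = await js.publish("parser.document.parsed", build_event(document_id))
        print(f"Stored in {ack.stream} at sequence {ack.seq}")
    finally:
        await nc.close()
```

With a NATS server running and the stream created, calling `asyncio.run(publish_parsed_event("doc-123"))` should print the stream name and the sequence number assigned to the message. If no stream covers the subject, the publish fails with an error rather than returning an ack.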