Files
Apple ef3473db21 snapshot: NODE1 production state 2026-02-09
Complete snapshot of /opt/microdao-daarion/ from NODE1 (144.76.224.179).
This represents the actual running production code that has diverged
significantly from the previous main branch.

Key changes from old main:
- Gateway (http_api.py): expanded from ~40KB to 164KB with full agent support
- Router: new /v1/agents/{id}/infer endpoint with vision + DeepSeek routing
- Behavior Policy: SOWA v2.2 (3-level: FULL/ACK/SILENT)
- Agent Registry: config/agent_registry.yml as single source of truth
- 13 agents configured (was 3)
- Memory service integration
- CrewAI teams and roles

Excluded from snapshot: venv/, .env, data/, backups, .tgz archives

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-09 08:46:46 -08:00

417 lines
14 KiB
Python

"""
Ingest Service
==============
Виділений сервіс для обробки вхідних файлів (фото, відео, аудіо, документи).
Функціонал:
- Прийом файлів з Gateway
- Валідація (розмір, тип, virus scan)
- Збереження в Object Storage
- Публікація події attachment.created в NATS
- Передача для подальшої обробки (STT, Vision, Parser)
"""
import os
import asyncio
import logging
import hashlib
import mimetypes
from datetime import datetime
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
import uuid
from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
import httpx
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Configuration -- every setting can be overridden through the environment.
# Upload size limit in bytes (default 50 MiB).
MAX_FILE_SIZE = int(os.environ.get("MAX_FILE_SIZE", 50 * 1024 * 1024))
# Root directory under which uploaded files are written.
STORAGE_PATH = os.environ.get("STORAGE_PATH", "/data/uploads")
# Address of the NATS server used for event publication.
NATS_URL = os.environ.get("NATS_URL", "nats://nats:4222")
# Base URL of the Swapper service (vision / STT / document endpoints).
SWAPPER_URL = os.environ.get("SWAPPER_URL", "http://swapper-service:8890")
class FileType(str, Enum):
    """Coarse category assigned to an uploaded file.

    Members are also ``str`` instances, so each one compares equal to its
    plain-string value (e.g. ``FileType.IMAGE == "image"``).
    """

    IMAGE = "image"
    VIDEO = "video"
    AUDIO = "audio"
    DOCUMENT = "document"
    # Fallback for anything that matches none of the categories above.
    UNKNOWN = "unknown"
class ProcessingStatus(str, Enum):
    """Lifecycle state of an ingested file.

    Members are also ``str`` instances, so each one compares equal to its
    plain-string value.
    """

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
@dataclass
class IngestResult:
    """Record describing one stored upload and the state of its processing."""

    # --- Set once when the file is ingested ---
    file_id: str        # unique identifier generated for the stored file
    file_type: FileType  # category derived from the MIME type
    original_name: str  # filename supplied by the uploader
    size_bytes: int     # length of the uploaded content
    mime_type: str      # MIME type declared by or guessed for the upload
    storage_path: str   # where the content was written on disk
    content_hash: str   # hex digest of the content

    # --- Updated while the file is being processed ---
    status: ProcessingStatus = ProcessingStatus.PENDING
    processing_result: Optional[Dict] = None  # handler output on success
    error: Optional[str] = None               # failure message, if any
# MIME type mapping: explicitly recognised MIME types grouped by category.
# The comprehension flattens the groups into a {mime: FileType} lookup table.
MIME_TO_TYPE = {
    mime: category
    for category, mimes in (
        (
            FileType.IMAGE,
            ("image/jpeg", "image/png", "image/gif", "image/webp"),
        ),
        (
            FileType.VIDEO,
            ("video/mp4", "video/webm", "video/quicktime"),
        ),
        (
            FileType.AUDIO,
            ("audio/mpeg", "audio/ogg", "audio/wav", "audio/webm"),
        ),
        (
            FileType.DOCUMENT,
            (
                "application/pdf",
                "application/msword",
                "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
                "text/plain",
            ),
        ),
    )
    for mime in mimes
}
def detect_file_type(mime_type: str) -> FileType:
    """Determine the file category for a MIME type.

    The value is normalised before matching: any parameters after ``;``
    (e.g. ``; charset=utf-8``) are dropped, surrounding whitespace is
    stripped and the type is lower-cased, because MIME type names are
    case-insensitive and Content-Type headers may carry parameters.

    Args:
        mime_type: MIME type or raw Content-Type value of the file.

    Returns:
        The category from ``MIME_TO_TYPE`` for an exact match; otherwise the
        category implied by an ``image/``, ``video/`` or ``audio/`` prefix;
        otherwise ``FileType.UNKNOWN`` (also for an empty or missing value).
    """
    if not mime_type:
        return FileType.UNKNOWN

    # "Text/Plain; charset=utf-8" -> "text/plain"
    essence = mime_type.split(";", 1)[0].strip().lower()

    exact = MIME_TO_TYPE.get(essence)
    if exact is not None:
        return exact

    # Fall back to the top-level type for subtypes not listed explicitly.
    prefix_categories = (
        ("image/", FileType.IMAGE),
        ("video/", FileType.VIDEO),
        ("audio/", FileType.AUDIO),
    )
    for prefix, category in prefix_categories:
        if essence.startswith(prefix):
            return category
    return FileType.UNKNOWN
def compute_hash(content: bytes) -> str:
    """Return the SHA-256 digest of *content* as a hexadecimal string."""
    digest = hashlib.sha256()
    digest.update(content)
    return digest.hexdigest()
class IngestService:
    """Core service for handling incoming files.

    Responsibilities visible in this class:
    - validate and store an upload under ``STORAGE_PATH/<agent_id>/``;
    - publish an ``attachment.created`` event to NATS JetStream
      (best-effort: skipped when NATS is unavailable);
    - hand the stored file to the Swapper service for vision, STT or
      document parsing, depending on its type.
    """

    def __init__(self):
        """Create the storage root and the shared HTTP client."""
        self.storage_path = Path(STORAGE_PATH)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.http_client = httpx.AsyncClient(timeout=60.0)
        self.nats_client = None
        self.js = None  # JetStream context; stays None until connect_nats() succeeds

    async def connect_nats(self):
        """Connect to NATS and obtain a JetStream context.

        Any failure (including a missing ``nats`` package) is logged and
        swallowed so that the service can run without event publication.
        """
        try:
            import nats
            self.nats_client = await nats.connect(NATS_URL)
            self.js = self.nats_client.jetstream()
            logger.info("Connected to NATS JetStream")
        except Exception as e:
            logger.warning(f"NATS connection failed: {e}")

    async def close(self):
        """Close the NATS connection (if any) and the HTTP client."""
        try:
            if self.nats_client:
                await self.nats_client.close()
        finally:
            # Release the HTTP client even when closing NATS raises.
            await self.http_client.aclose()

    @staticmethod
    def _validate_agent_id(agent_id: str) -> None:
        """Reject an ``agent_id`` that is not a single, plain path component.

        ``agent_id`` is caller-supplied and becomes a directory name under
        the storage root, so separators, ``.``/``..`` and NUL bytes must not
        get through (they would allow writing outside the storage root).

        Raises:
            HTTPException: 400 when the value is empty or unsafe.
        """
        forbidden = ("/", "\\", "\x00")
        if (
            not agent_id
            or agent_id in (".", "..")
            or any(token in agent_id for token in forbidden)
        ):
            raise HTTPException(400, f"Invalid agent_id: {agent_id!r}")

    async def ingest_file(self,
                          file: UploadFile,
                          user_id: str,
                          chat_id: str,
                          agent_id: str = "helion") -> IngestResult:
        """Validate, store and announce an uploaded file.

        Steps:
        1. Validation (``agent_id`` and size)
        2. Storage under ``<storage root>/<agent_id>/<file_id><ext>``
        3. Event publication

        Args:
            file: The uploaded file.
            user_id: Identifier of the uploading user (forwarded in the event).
            chat_id: Identifier of the chat (forwarded in the event).
            agent_id: Agent the file belongs to; also the storage sub-directory.

        Returns:
            An ``IngestResult`` describing the stored file (status PENDING).

        Raises:
            HTTPException: 400 for an unsafe ``agent_id``; 413 when the
                content exceeds ``MAX_FILE_SIZE``.
        """
        # Reject a bad target directory before doing any work.
        self._validate_agent_id(agent_id)

        # Read content
        content = await file.read()
        size = len(content)

        # Validate size
        if size > MAX_FILE_SIZE:
            raise HTTPException(413, f"File too large: {size} > {MAX_FILE_SIZE}")

        # Detect type; the filename may be absent, and guess_type() does not
        # accept None, so fall back to an empty name.
        filename = file.filename or ""
        mime_type = (
            file.content_type
            or mimetypes.guess_type(filename)[0]
            or "application/octet-stream"
        )
        file_type = detect_file_type(mime_type)

        # Generate ID and hash
        file_id = str(uuid.uuid4())
        content_hash = compute_hash(content)

        # Save to storage, keeping the original extension (if any)
        ext = Path(filename).suffix
        storage_path = self.storage_path / agent_id / f"{file_id}{ext}"
        storage_path.parent.mkdir(parents=True, exist_ok=True)
        # Write in a worker thread so a large upload does not block the event loop.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, storage_path.write_bytes, content)

        # Create result
        result = IngestResult(
            file_id=file_id,
            file_type=file_type,
            original_name=file.filename or "unknown",
            size_bytes=size,
            mime_type=mime_type,
            storage_path=str(storage_path),
            content_hash=content_hash,
        )

        # Publish event to NATS
        await self._publish_event(result, user_id, chat_id, agent_id)
        logger.info(f"Ingested file: {file_id} ({file_type.value}, {size} bytes)")
        return result

    async def _publish_event(self,
                             result: IngestResult,
                             user_id: str,
                             chat_id: str,
                             agent_id: str):
        """Publish an ``attachment.created`` event to NATS.

        Publication is best-effort: it is skipped when JetStream is not
        connected, and a publish error is logged instead of failing an
        upload that has already been written to storage.
        """
        if not self.js:
            logger.warning("NATS not connected, skipping event")
            return

        import json
        event = {
            "event_id": str(uuid.uuid4()),
            "event_type": "attachment.created",
            # Naive UTC timestamp; format kept as-is for existing consumers.
            "timestamp": datetime.utcnow().isoformat(),
            "file_id": result.file_id,
            "file_type": result.file_type.value,
            "mime_type": result.mime_type,
            "size_bytes": result.size_bytes,
            "content_hash": result.content_hash,
            "storage_path": result.storage_path,
            "user_id": user_id,
            "chat_id": chat_id,
            "agent_id": agent_id,
        }
        subject = f"attachment.created.{result.file_type.value}"
        try:
            await self.js.publish(subject, json.dumps(event).encode())
        except Exception as e:
            logger.warning(f"Failed to publish event {subject}: {e}")
            return
        logger.debug(f"Published event: {subject}")

    async def process_file(self, result: IngestResult) -> Dict[str, Any]:
        """Process a stored file according to its type.

        - IMAGE -> Vision (description)
        - AUDIO -> STT (transcription)
        - DOCUMENT -> Parser (text)

        Any other type is skipped. ``result`` is updated in place: its
        ``status`` always ends in a terminal state (COMPLETED or FAILED).

        Returns:
            The handler's result dict, ``{"status": "skipped", ...}`` for
            unsupported types, or ``{"status": "failed", "error": ...}`` when
            the handler raised.
        """
        handlers = {
            FileType.IMAGE: self._process_image,
            FileType.AUDIO: self._process_audio,
            FileType.DOCUMENT: self._process_document,
        }
        handler = handlers.get(result.file_type)
        if handler is None:
            # Nothing to do: finish immediately rather than leaving the
            # record stuck in PROCESSING.
            skipped = {"status": "skipped", "reason": "unknown type"}
            result.status = ProcessingStatus.COMPLETED
            result.processing_result = skipped
            return skipped

        result.status = ProcessingStatus.PROCESSING
        try:
            return await handler(result)
        except Exception as e:
            result.status = ProcessingStatus.FAILED
            result.error = str(e)
            logger.error(f"Processing failed for {result.file_id}: {e}")
            return {"status": "failed", "error": str(e)}

    @staticmethod
    async def _read_base64(path: str) -> str:
        """Read the file at *path* and return its content base64-encoded."""
        import base64
        loop = asyncio.get_running_loop()
        # Disk read happens in a worker thread to keep the event loop free.
        raw = await loop.run_in_executor(None, Path(path).read_bytes)
        return base64.b64encode(raw).decode()

    async def _call_swapper(self,
                            endpoint: str,
                            payload: Dict[str, Any],
                            timeout: float,
                            api_name: str) -> Dict[str, Any]:
        """POST *payload* as JSON to a Swapper endpoint and return the JSON reply.

        Raises:
            RuntimeError: when the response status is not 200.
        """
        resp = await self.http_client.post(
            f"{SWAPPER_URL}{endpoint}",
            json=payload,
            timeout=timeout,
        )
        if resp.status_code != 200:
            raise RuntimeError(f"{api_name} API error: {resp.status_code}")
        return resp.json()

    @staticmethod
    def _complete(result: IngestResult, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Mark *result* COMPLETED and store *payload* plus a ``processed_at`` stamp."""
        result.status = ProcessingStatus.COMPLETED
        result.processing_result = {
            **payload,
            "processed_at": datetime.utcnow().isoformat(),
        }
        return result.processing_result

    async def _process_image(self, result: IngestResult) -> Dict:
        """Describe an image via the Swapper ``/vision`` endpoint."""
        try:
            b64_image = await self._read_base64(result.storage_path)
            data = await self._call_swapper(
                "/vision",
                {
                    "image_base64": b64_image,
                    "prompt": "Опиши це зображення коротко українською"
                },
                30.0,
                "Vision",
            )
            # The reply may carry the text under either key.
            return self._complete(result, {
                "description": data.get("description", data.get("text", "")),
            })
        except Exception as e:
            logger.warning(f"Image processing failed: {e}")
            raise

    async def _process_audio(self, result: IngestResult) -> Dict:
        """Transcribe audio via the Swapper ``/stt`` endpoint."""
        try:
            b64_audio = await self._read_base64(result.storage_path)
            data = await self._call_swapper(
                "/stt",
                {
                    "audio_base64": b64_audio,
                    "language": "uk"
                },
                60.0,
                "STT",
            )
            return self._complete(result, {
                "transcript": data.get("text", ""),
                "language": data.get("language", "uk"),
            })
        except Exception as e:
            logger.warning(f"Audio processing failed: {e}")
            raise

    async def _process_document(self, result: IngestResult) -> Dict:
        """Extract text from a document via the Swapper ``/document`` endpoint.

        Only the storage path is sent, not the content.
        NOTE(review): presumably the Swapper service can read the same
        storage location -- confirm against the deployment.
        """
        try:
            data = await self._call_swapper(
                "/document",
                {
                    "file_path": result.storage_path,
                    "mime_type": result.mime_type
                },
                60.0,
                "Parser",
            )
            return self._complete(result, {
                "text": data.get("text", ""),
                "pages": data.get("pages", 1),
            })
        except Exception as e:
            logger.warning(f"Document processing failed: {e}")
            raise
# ==================== FastAPI App ====================
app = FastAPI(
    title="Ingest Service",
    version="1.0.0",
)

# Single service instance shared by all request handlers in this module.
ingest_service = IngestService()
@app.on_event("startup")
async def startup():
    """Open the NATS connection when the application starts."""
    await ingest_service.connect_nats()
@app.on_event("shutdown")
async def shutdown():
    """Release the service's connections when the application stops."""
    await ingest_service.close()
@app.get("/health")
async def health():
    """Report basic service information.

    ``nats_connected`` only reflects whether a NATS client object exists,
    i.e. whether the initial connection attempt succeeded.
    """
    has_nats_client = ingest_service.nats_client is not None
    report = {
        "status": "healthy",
        "service": "ingest-service",
        "storage_path": str(ingest_service.storage_path),
        "nats_connected": has_nats_client,
    }
    return report
@app.post("/ingest")
async def ingest_file(
    file: UploadFile = File(...),
    user_id: str = "unknown",
    chat_id: str = "unknown",
    agent_id: str = "helion",
    background_tasks: BackgroundTasks = None
):
    """Accept a file for asynchronous processing.

    1. Stores the file
    2. Publishes the event to NATS
    3. Schedules processing as a background task

    Returns a summary of the stored file; the processing outcome is not
    part of this response.
    """
    stored = await ingest_service.ingest_file(file, user_id, chat_id, agent_id)

    # Process in background (only possible when a task queue was provided)
    if background_tasks:
        background_tasks.add_task(ingest_service.process_file, stored)

    summary = {
        "file_id": stored.file_id,
        "file_type": stored.file_type.value,
        "size_bytes": stored.size_bytes,
        "status": stored.status.value,
        "message": "File accepted for processing",
    }
    return summary
@app.post("/ingest/sync")
async def ingest_file_sync(
    file: UploadFile = File(...),
    user_id: str = "unknown",
    chat_id: str = "unknown",
    agent_id: str = "helion"
):
    """Accept a file and process it before responding.

    The response contains both the stored-file summary and the processing
    result.
    """
    stored = await ingest_service.ingest_file(file, user_id, chat_id, agent_id)
    outcome = await ingest_service.process_file(stored)

    # Built after processing so that "status" reflects the final state.
    summary = {
        "file_id": stored.file_id,
        "file_type": stored.file_type.value,
        "size_bytes": stored.size_bytes,
        "status": stored.status.value,
        "result": outcome,
    }
    return summary
@app.get("/files/{file_id}")
async def get_file_status(file_id: str):
    """Return the status of a file.

    Placeholder: no registry exists yet, so the requested id is echoed
    back with the status ``not_implemented``.
    """
    # TODO: Implement file registry lookup
    placeholder = {
        "file_id": file_id,
        "status": "not_implemented",
    }
    return placeholder
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8100,
    )