Files
microdao-daarion/services/living-map-service/repository_history.py
Apple 3de3c8cb36 feat: Add presence heartbeat for Matrix online status
- matrix-gateway: POST /internal/matrix/presence/online endpoint
- usePresenceHeartbeat hook with activity tracking
- Auto away after 5 min inactivity
- Offline on page close/visibility change
- Integrated in MatrixChatRoom component
2025-11-27 00:19:40 -08:00

117 lines
3.5 KiB
Python

"""
History Repository — Database operations for Living Map events
Phase 9: Living Map
"""
import uuid
from typing import List, Optional
from datetime import datetime
import asyncpg
from models import HistoryItem, HistoryQueryParams
class HistoryRepository:
def __init__(self, db_pool: asyncpg.Pool):
self.db = db_pool
async def add_event(
self,
event_type: str,
payload: dict,
source_service: Optional[str] = None,
entity_id: Optional[str] = None,
entity_type: Optional[str] = None
) -> str:
"""Add event to history"""
event_id = uuid.uuid4()
await self.db.execute(
"""
INSERT INTO living_map_history (
id, event_type, payload, source_service, entity_id, entity_type
)
VALUES ($1, $2, $3, $4, $5, $6)
""",
event_id,
event_type,
payload,
source_service,
entity_id,
entity_type
)
return str(event_id)
async def query_history(
self,
params: HistoryQueryParams
) -> tuple[List[HistoryItem], int]:
"""Query history with filters"""
conditions = []
values = []
param_idx = 1
if params.since:
conditions.append(f"timestamp >= ${param_idx}")
values.append(params.since)
param_idx += 1
if params.until:
conditions.append(f"timestamp <= ${param_idx}")
values.append(params.until)
param_idx += 1
if params.event_type:
conditions.append(f"event_type = ${param_idx}")
values.append(params.event_type)
param_idx += 1
if params.entity_id:
conditions.append(f"entity_id = ${param_idx}")
values.append(params.entity_id)
param_idx += 1
where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""
# Get total count
count_query = f"SELECT COUNT(*) FROM living_map_history {where_clause}"
total = await self.db.fetchval(count_query, *values)
# Get paginated results
values.extend([params.limit, params.offset])
query = f"""
SELECT id, timestamp, event_type, payload, source_service, entity_id, entity_type
FROM living_map_history
{where_clause}
ORDER BY timestamp DESC
LIMIT ${param_idx} OFFSET ${param_idx + 1}
"""
rows = await self.db.fetch(query, *values)
items = [
HistoryItem(
id=str(row['id']),
timestamp=row['timestamp'],
event_type=row['event_type'],
payload=row['payload'],
source_service=row['source_service'],
entity_id=row['entity_id'],
entity_type=row['entity_type']
)
for row in rows
]
return items, total or 0
async def cleanup_old_events(self, days: int = 30) -> int:
"""Cleanup events older than N days"""
result = await self.db.execute(
"""
DELETE FROM living_map_history
WHERE timestamp < NOW() - INTERVAL '%s days'
""",
days
)
# Extract count from result string like "DELETE 123"
return int(result.split()[-1]) if result else 0