"""
Repository: thin async wrapper over SQLAlchemy for read/write.
"""

from __future__ import annotations

import json
from typing import Optional

from sqlalchemy import desc, select

from app.db.schema import (
    BookSnapshotRecord,
    QuoteRecord,
    TradeRecord,
    async_session,
)
from app.domain.events import BookL2Event, QuoteEvent, TradeEvent


def _iso(ts):
    """Return ``ts.isoformat()``, or ``None`` when *ts* is falsy (e.g. ``None``)."""
    return ts.isoformat() if ts else None


def _latest_stmt(model, symbol: str):
    """Build a SELECT for the newest row of *model* (by ``ts_recv``) for *symbol*.

    The symbol is upper-cased before it is compared with ``model.symbol``.
    """
    # NOTE(review): the save_* functions store ``event.symbol`` as-is, while
    # this lookup matches on ``symbol.upper()`` -- confirm that events always
    # carry upper-case symbols, otherwise such rows will never be found here.
    return (
        select(model)
        .where(model.symbol == symbol.upper())
        .order_by(desc(model.ts_recv))
        .limit(1)
    )


async def save_trade(event: TradeEvent) -> None:
    """Insert *event* as a ``TradeRecord`` row and commit.

    Each call uses its own ``async_session()`` context.
    """
    async with async_session() as session:
        record = TradeRecord(
            provider=event.provider,
            symbol=event.symbol,
            price=event.price,
            size=event.size,
            side=event.side,
            trade_id=event.trade_id,
            ts_exchange=event.ts_exchange,
            ts_recv=event.ts_recv,
        )
        session.add(record)
        await session.commit()


async def save_quote(event: QuoteEvent) -> None:
    """Insert *event* as a ``QuoteRecord`` row and commit.

    Each call uses its own ``async_session()`` context.
    """
    async with async_session() as session:
        record = QuoteRecord(
            provider=event.provider,
            symbol=event.symbol,
            bid=event.bid,
            ask=event.ask,
            bid_size=event.bid_size,
            ask_size=event.ask_size,
            ts_exchange=event.ts_exchange,
            ts_recv=event.ts_recv,
        )
        session.add(record)
        await session.commit()


async def save_book_snapshot(event: BookL2Event) -> None:
    """Insert *event* as a ``BookSnapshotRecord`` row and commit.

    Bids and asks are each serialised to a JSON array of
    ``{"price": ..., "size": ...}`` objects. ``depth`` is the length of the
    longer of the two sides.
    """
    bids = [{"price": level.price, "size": level.size} for level in event.bids]
    asks = [{"price": level.price, "size": level.size} for level in event.asks]

    async with async_session() as session:
        record = BookSnapshotRecord(
            provider=event.provider,
            symbol=event.symbol,
            bids_json=json.dumps(bids),
            asks_json=json.dumps(asks),
            depth=max(len(bids), len(asks)),
            ts_exchange=event.ts_exchange,
            ts_recv=event.ts_recv,
        )
        session.add(record)
        await session.commit()


async def get_latest_trade(symbol: str) -> Optional[dict]:
    """Return the most recently received trade for *symbol* as a dict.

    The lookup upper-cases *symbol* and picks the row with the greatest
    ``ts_recv``. Returns ``None`` when no row matches. Timestamps are
    rendered with ``isoformat()`` (or ``None`` when unset).
    """
    async with async_session() as session:
        result = await session.execute(_latest_stmt(TradeRecord, symbol))
        row = result.scalar_one_or_none()
        if row is None:
            return None
        # Read attributes while the session is still open.
        return {
            "symbol": row.symbol,
            "price": row.price,
            "size": row.size,
            "side": row.side,
            "provider": row.provider,
            "ts_recv": _iso(row.ts_recv),
            "ts_exchange": _iso(row.ts_exchange),
        }


async def get_latest_quote(symbol: str) -> Optional[dict]:
    """Return the most recently received quote for *symbol* as a dict.

    The lookup upper-cases *symbol* and picks the row with the greatest
    ``ts_recv``. Returns ``None`` when no row matches. Timestamps are
    rendered with ``isoformat()`` (or ``None`` when unset).
    """
    async with async_session() as session:
        result = await session.execute(_latest_stmt(QuoteRecord, symbol))
        row = result.scalar_one_or_none()
        if row is None:
            return None
        # Read attributes while the session is still open.
        return {
            "symbol": row.symbol,
            "bid": row.bid,
            "ask": row.ask,
            "bid_size": row.bid_size,
            "ask_size": row.ask_size,
            "provider": row.provider,
            "ts_recv": _iso(row.ts_recv),
            # Added for parity with get_latest_trade: save_quote persists this
            # column, so expose it to readers as well.
            "ts_exchange": _iso(row.ts_exchange),
        }