"""SQLite DB for focus sessions and user settings.""" from __future__ import annotations import asyncio import json from datetime import datetime import aiosqlite def get_db_path() -> str: """Database file path from config or default.""" try: from config import get return get("app.db_path") or "focus_guard.db" except Exception: return "focus_guard.db" async def init_database(db_path: str | None = None) -> None: """Create focus_sessions, focus_events, user_settings tables if missing.""" path = db_path or get_db_path() async with aiosqlite.connect(path) as db: await db.execute(""" CREATE TABLE IF NOT EXISTS focus_sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, start_time TIMESTAMP NOT NULL, end_time TIMESTAMP, duration_seconds INTEGER DEFAULT 0, focus_score REAL DEFAULT 0.0, total_frames INTEGER DEFAULT 0, focused_frames INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) await db.execute(""" CREATE TABLE IF NOT EXISTS focus_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id INTEGER NOT NULL, timestamp TIMESTAMP NOT NULL, is_focused BOOLEAN NOT NULL, confidence REAL NOT NULL, detection_data TEXT, FOREIGN KEY (session_id) REFERENCES focus_sessions (id) ) """) await db.execute(""" CREATE TABLE IF NOT EXISTS user_settings ( id INTEGER PRIMARY KEY CHECK (id = 1), model_name TEXT DEFAULT 'mlp' ) """) await db.execute(""" INSERT OR IGNORE INTO user_settings (id, model_name) VALUES (1, 'mlp') """) await db.commit() async def create_session(db_path: str | None = None) -> int: """Insert a new focus session. Returns session id.""" path = db_path or get_db_path() async with aiosqlite.connect(path) as db: cursor = await db.execute( "INSERT INTO focus_sessions (start_time) VALUES (?)", (datetime.now().isoformat(),), ) await db.commit() return cursor.lastrowid async def end_session(session_id: int, db_path: str | None = None) -> dict | None: """Close session and return summary (duration, focus_score, etc.).""" path = db_path or get_db_path() async with aiosqlite.connect(path) as db: cursor = await db.execute( "SELECT start_time, total_frames, focused_frames FROM focus_sessions WHERE id = ?", (session_id,), ) row = await cursor.fetchone() if not row: return None start_time_str, total_frames, focused_frames = row start_time = datetime.fromisoformat(start_time_str) end_time = datetime.now() duration = (end_time - start_time).total_seconds() focus_score = focused_frames / total_frames if total_frames > 0 else 0.0 async with aiosqlite.connect(path) as db: await db.execute(""" UPDATE focus_sessions SET end_time = ?, duration_seconds = ?, focus_score = ? WHERE id = ? """, (end_time.isoformat(), int(duration), focus_score, session_id)) await db.commit() return { "session_id": session_id, "start_time": start_time_str, "end_time": end_time.isoformat(), "duration_seconds": int(duration), "focus_score": round(focus_score, 3), "total_frames": total_frames, "focused_frames": focused_frames, } async def store_focus_event( session_id: int, is_focused: bool, confidence: float, metadata: dict, db_path: str | None = None, ) -> None: """Append one focus event and update session counters.""" path = db_path or get_db_path() async with aiosqlite.connect(path) as db: await db.execute(""" INSERT INTO focus_events (session_id, timestamp, is_focused, confidence, detection_data) VALUES (?, ?, ?, ?, ?) """, (session_id, datetime.now().isoformat(), is_focused, confidence, json.dumps(metadata))) await db.execute(""" UPDATE focus_sessions SET total_frames = total_frames + 1, focused_frames = focused_frames + ? WHERE id = ? """, (1 if is_focused else 0, session_id)) await db.commit() class EventBuffer: """Buffer focus events and flush to DB in batches to avoid per-frame writes.""" def __init__(self, db_path: str | None = None, flush_interval: float = 2.0): self._db_path = db_path or get_db_path() self._flush_interval = flush_interval self._buf: list = [] self._lock = asyncio.Lock() self._task: asyncio.Task | None = None self._total_frames = 0 self._focused_frames = 0 def start(self) -> None: if self._task is None: self._task = asyncio.create_task(self._flush_loop()) async def stop(self) -> None: if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass self._task = None await self._flush() def add(self, session_id: int, is_focused: bool, confidence: float, metadata: dict) -> None: self._buf.append(( session_id, datetime.now().isoformat(), is_focused, confidence, json.dumps(metadata), )) self._total_frames += 1 if is_focused: self._focused_frames += 1 async def _flush_loop(self) -> None: while True: await asyncio.sleep(self._flush_interval) await self._flush() async def _flush(self) -> None: async with self._lock: if not self._buf: return batch = self._buf[:] total = self._total_frames focused = self._focused_frames self._buf.clear() self._total_frames = 0 self._focused_frames = 0 if not batch: return session_id = batch[0][0] try: async with aiosqlite.connect(self._db_path) as db: await db.executemany(""" INSERT INTO focus_events (session_id, timestamp, is_focused, confidence, detection_data) VALUES (?, ?, ?, ?, ?) """, batch) await db.execute(""" UPDATE focus_sessions SET total_frames = total_frames + ?, focused_frames = focused_frames + ? WHERE id = ? """, (total, focused, session_id)) await db.commit() except Exception as e: import logging logging.getLogger(__name__).warning("DB flush error: %s", e)