final_v2 / api /db.py
k22056537
feat: sync integration updates across app and ML pipeline
eb4abb8
"""SQLite DB for focus sessions and user settings."""
from __future__ import annotations
import asyncio
import json
from datetime import datetime
import aiosqlite
def get_db_path() -> str:
"""Database file path from config or default."""
try:
from config import get
return get("app.db_path") or "focus_guard.db"
except Exception:
return "focus_guard.db"
async def init_database(db_path: str | None = None) -> None:
"""Create focus_sessions, focus_events, user_settings tables if missing."""
path = db_path or get_db_path()
async with aiosqlite.connect(path) as db:
await db.execute("""
CREATE TABLE IF NOT EXISTS focus_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
start_time TIMESTAMP NOT NULL,
end_time TIMESTAMP,
duration_seconds INTEGER DEFAULT 0,
focus_score REAL DEFAULT 0.0,
total_frames INTEGER DEFAULT 0,
focused_frames INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
await db.execute("""
CREATE TABLE IF NOT EXISTS focus_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER NOT NULL,
timestamp TIMESTAMP NOT NULL,
is_focused BOOLEAN NOT NULL,
confidence REAL NOT NULL,
detection_data TEXT,
FOREIGN KEY (session_id) REFERENCES focus_sessions (id)
)
""")
await db.execute("""
CREATE TABLE IF NOT EXISTS user_settings (
id INTEGER PRIMARY KEY CHECK (id = 1),
model_name TEXT DEFAULT 'mlp'
)
""")
await db.execute("""
INSERT OR IGNORE INTO user_settings (id, model_name)
VALUES (1, 'mlp')
""")
await db.commit()
async def create_session(db_path: str | None = None) -> int:
"""Insert a new focus session. Returns session id."""
path = db_path or get_db_path()
async with aiosqlite.connect(path) as db:
cursor = await db.execute(
"INSERT INTO focus_sessions (start_time) VALUES (?)",
(datetime.now().isoformat(),),
)
await db.commit()
return cursor.lastrowid
async def end_session(session_id: int, db_path: str | None = None) -> dict | None:
"""Close session and return summary (duration, focus_score, etc.)."""
path = db_path or get_db_path()
async with aiosqlite.connect(path) as db:
cursor = await db.execute(
"SELECT start_time, total_frames, focused_frames FROM focus_sessions WHERE id = ?",
(session_id,),
)
row = await cursor.fetchone()
if not row:
return None
start_time_str, total_frames, focused_frames = row
start_time = datetime.fromisoformat(start_time_str)
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
focus_score = focused_frames / total_frames if total_frames > 0 else 0.0
async with aiosqlite.connect(path) as db:
await db.execute("""
UPDATE focus_sessions
SET end_time = ?, duration_seconds = ?, focus_score = ?
WHERE id = ?
""", (end_time.isoformat(), int(duration), focus_score, session_id))
await db.commit()
return {
"session_id": session_id,
"start_time": start_time_str,
"end_time": end_time.isoformat(),
"duration_seconds": int(duration),
"focus_score": round(focus_score, 3),
"total_frames": total_frames,
"focused_frames": focused_frames,
}
async def store_focus_event(
session_id: int,
is_focused: bool,
confidence: float,
metadata: dict,
db_path: str | None = None,
) -> None:
"""Append one focus event and update session counters."""
path = db_path or get_db_path()
async with aiosqlite.connect(path) as db:
await db.execute("""
INSERT INTO focus_events (session_id, timestamp, is_focused, confidence, detection_data)
VALUES (?, ?, ?, ?, ?)
""", (session_id, datetime.now().isoformat(), is_focused, confidence, json.dumps(metadata)))
await db.execute("""
UPDATE focus_sessions
SET total_frames = total_frames + 1,
focused_frames = focused_frames + ?
WHERE id = ?
""", (1 if is_focused else 0, session_id))
await db.commit()
class EventBuffer:
"""Buffer focus events and flush to DB in batches to avoid per-frame writes."""
def __init__(self, db_path: str | None = None, flush_interval: float = 2.0):
self._db_path = db_path or get_db_path()
self._flush_interval = flush_interval
self._buf: list = []
self._lock = asyncio.Lock()
self._task: asyncio.Task | None = None
self._total_frames = 0
self._focused_frames = 0
def start(self) -> None:
if self._task is None:
self._task = asyncio.create_task(self._flush_loop())
async def stop(self) -> None:
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
await self._flush()
def add(self, session_id: int, is_focused: bool, confidence: float, metadata: dict) -> None:
self._buf.append((
session_id,
datetime.now().isoformat(),
is_focused,
confidence,
json.dumps(metadata),
))
self._total_frames += 1
if is_focused:
self._focused_frames += 1
async def _flush_loop(self) -> None:
while True:
await asyncio.sleep(self._flush_interval)
await self._flush()
async def _flush(self) -> None:
async with self._lock:
if not self._buf:
return
batch = self._buf[:]
total = self._total_frames
focused = self._focused_frames
self._buf.clear()
self._total_frames = 0
self._focused_frames = 0
if not batch:
return
session_id = batch[0][0]
try:
async with aiosqlite.connect(self._db_path) as db:
await db.executemany("""
INSERT INTO focus_events (session_id, timestamp, is_focused, confidence, detection_data)
VALUES (?, ?, ?, ?, ?)
""", batch)
await db.execute("""
UPDATE focus_sessions
SET total_frames = total_frames + ?,
focused_frames = focused_frames + ?
WHERE id = ?
""", (total, focused, session_id))
await db.commit()
except Exception as e:
import logging
logging.getLogger(__name__).warning("DB flush error: %s", e)