import sqlite3 import json import uuid import os from typing import Any, Dict, Optional from src.infra.context import get_session_id from src.infra.logger import get_logger logger = get_logger(__name__) class PoiRepository: # ========================================================================= # 1. 設定絕對路徑 (Class Attributes) - 確保在 __init__ 之前就存在 # ========================================================================= # 抓出這個檔案 (poi_repository.py) 的絕對路徑: .../LifeFlow-AI/src/infra BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 往上推兩層回到專案根目錄: .../LifeFlow-AI # (src/infra -> src -> LifeFlow-AI) PROJECT_ROOT = os.path.dirname(os.path.dirname(BASE_DIR)) # 設定 DB 存放目錄: .../LifeFlow-AI/storage DB_DIR = os.path.join(PROJECT_ROOT, "storage") # 設定 DB 完整路徑: .../LifeFlow-AI/storage/lifeflow_payloads.db DB_FILE = "lifeflow_payloads.db" DB_PATH = os.path.join(DB_DIR, DB_FILE) # ========================================================================= # 2. 初始化邏輯 # ========================================================================= def __init__(self): # 1. 先確保資料夾存在 (這時候 self.DB_DIR 已經讀得到了) os.makedirs(self.DB_DIR, exist_ok=True) # 2. 再初始化 DB (這時候 self.DB_PATH 已經讀得到了) self._init_db() def _init_db(self): # 使用 self.DB_PATH with sqlite3.connect(self.DB_PATH) as conn: # 1. 既有的資料表 conn.execute(""" CREATE TABLE IF NOT EXISTS offloaded_data ( ref_id TEXT PRIMARY KEY, data_type TEXT, payload JSON, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # 2. Session 狀態表 conn.execute(""" CREATE TABLE IF NOT EXISTS session_state ( session_id TEXT PRIMARY KEY, last_ref_id TEXT, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # ========================================================================= # 3. 讀寫操作 # ========================================================================= def save(self, data: Any, data_type: str = "generic") -> str: ref_id = f"{data_type}_{uuid.uuid4().hex[:8]}" with sqlite3.connect(self.DB_PATH) as conn: # 1. 寫入 Payload conn.execute( "INSERT INTO offloaded_data (ref_id, data_type, payload) VALUES (?, ?, ?)", (ref_id, data_type, json.dumps(data, default=str)) ) # 2. 更新 Session 的最後 ID current_session = get_session_id() if current_session: conn.execute(""" INSERT OR REPLACE INTO session_state (session_id, last_ref_id) VALUES (?, ?) """, (current_session, ref_id)) logger.info(f"💾 [Repo] Saved {ref_id} for Session: {current_session}") else: # 這裡改用 debug level,以免 log 太多雜訊 logger.debug(f"⚠️ [Repo] No session context found, 'last_id' not tracked.") return ref_id def load(self, ref_id: str) -> Optional[Any]: if not ref_id: return None # 容錯:如果路徑還沒建立 (理論上 init 已建立,但防呆) if not os.path.exists(self.DB_PATH): return None with sqlite3.connect(self.DB_PATH) as conn: cursor = conn.execute("SELECT payload FROM offloaded_data WHERE ref_id = ?", (ref_id,)) row = cursor.fetchone() if row: return json.loads(row[0]) return None def get_last_id_by_session(self, session_id: str) -> Optional[str]: if not os.path.exists(self.DB_PATH): return None with sqlite3.connect(self.DB_PATH) as conn: cursor = conn.execute("SELECT last_ref_id FROM session_state WHERE session_id = ?", (session_id,)) row = cursor.fetchone() if row: return row[0] return None # 全域單例 poi_repo = PoiRepository()