Spaces:
Running
Running
| import sqlite3 | |
| import json | |
| import uuid | |
| import os | |
| from typing import Any, Dict, Optional | |
| from src.infra.context import get_session_id | |
| from src.infra.logger import get_logger | |
| logger = get_logger(__name__) | |
| class PoiRepository: | |
| # ========================================================================= | |
| # 1. 設定絕對路徑 (Class Attributes) - 確保在 __init__ 之前就存在 | |
| # ========================================================================= | |
| # 抓出這個檔案 (poi_repository.py) 的絕對路徑: .../LifeFlow-AI/src/infra | |
| BASE_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| # 往上推兩層回到專案根目錄: .../LifeFlow-AI | |
| # (src/infra -> src -> LifeFlow-AI) | |
| PROJECT_ROOT = os.path.dirname(os.path.dirname(BASE_DIR)) | |
| # 設定 DB 存放目錄: .../LifeFlow-AI/storage | |
| DB_DIR = os.path.join(PROJECT_ROOT, "storage") | |
| # 設定 DB 完整路徑: .../LifeFlow-AI/storage/lifeflow_payloads.db | |
| DB_FILE = "lifeflow_payloads.db" | |
| DB_PATH = os.path.join(DB_DIR, DB_FILE) | |
| # ========================================================================= | |
| # 2. 初始化邏輯 | |
| # ========================================================================= | |
| def __init__(self): | |
| # 1. 先確保資料夾存在 (這時候 self.DB_DIR 已經讀得到了) | |
| os.makedirs(self.DB_DIR, exist_ok=True) | |
| # 2. 再初始化 DB (這時候 self.DB_PATH 已經讀得到了) | |
| self._init_db() | |
| def _init_db(self): | |
| # 使用 self.DB_PATH | |
| with sqlite3.connect(self.DB_PATH) as conn: | |
| # 1. 既有的資料表 | |
| conn.execute(""" | |
| CREATE TABLE IF NOT EXISTS offloaded_data ( | |
| ref_id TEXT PRIMARY KEY, | |
| data_type TEXT, | |
| payload JSON, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| """) | |
| # 2. Session 狀態表 | |
| conn.execute(""" | |
| CREATE TABLE IF NOT EXISTS session_state ( | |
| session_id TEXT PRIMARY KEY, | |
| last_ref_id TEXT, | |
| updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| """) | |
| # ========================================================================= | |
| # 3. 讀寫操作 | |
| # ========================================================================= | |
| def save(self, data: Any, data_type: str = "generic") -> str: | |
| ref_id = f"{data_type}_{uuid.uuid4().hex[:8]}" | |
| with sqlite3.connect(self.DB_PATH) as conn: | |
| # 1. 寫入 Payload | |
| conn.execute( | |
| "INSERT INTO offloaded_data (ref_id, data_type, payload) VALUES (?, ?, ?)", | |
| (ref_id, data_type, json.dumps(data, default=str)) | |
| ) | |
| # 2. 更新 Session 的最後 ID | |
| current_session = get_session_id() | |
| if current_session: | |
| conn.execute(""" | |
| INSERT OR REPLACE INTO session_state (session_id, last_ref_id) | |
| VALUES (?, ?) | |
| """, (current_session, ref_id)) | |
| logger.info(f"💾 [Repo] Saved {ref_id} for Session: {current_session}") | |
| else: | |
| # 這裡改用 debug level,以免 log 太多雜訊 | |
| logger.debug(f"⚠️ [Repo] No session context found, 'last_id' not tracked.") | |
| return ref_id | |
| def load(self, ref_id: str) -> Optional[Any]: | |
| if not ref_id: return None | |
| # 容錯:如果路徑還沒建立 (理論上 init 已建立,但防呆) | |
| if not os.path.exists(self.DB_PATH): | |
| return None | |
| with sqlite3.connect(self.DB_PATH) as conn: | |
| cursor = conn.execute("SELECT payload FROM offloaded_data WHERE ref_id = ?", (ref_id,)) | |
| row = cursor.fetchone() | |
| if row: | |
| return json.loads(row[0]) | |
| return None | |
| def get_last_id_by_session(self, session_id: str) -> Optional[str]: | |
| if not os.path.exists(self.DB_PATH): | |
| return None | |
| with sqlite3.connect(self.DB_PATH) as conn: | |
| cursor = conn.execute("SELECT last_ref_id FROM session_state WHERE session_id = ?", (session_id,)) | |
| row = cursor.fetchone() | |
| if row: | |
| return row[0] | |
| return None | |
| # 全域單例 | |
| poi_repo = PoiRepository() | |