LifeFlow-AI / src /infra /poi_repository.py
Marco310's picture
feat: 🚀 Evolve to Stateful MCP Architecture with Context Injection Middleware
529a8bd
import sqlite3
import json
import uuid
import os
from typing import Any, Dict, Optional
from src.infra.context import get_session_id
from src.infra.logger import get_logger
logger = get_logger(__name__)
class PoiRepository:
# =========================================================================
# 1. 設定絕對路徑 (Class Attributes) - 確保在 __init__ 之前就存在
# =========================================================================
# 抓出這個檔案 (poi_repository.py) 的絕對路徑: .../LifeFlow-AI/src/infra
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 往上推兩層回到專案根目錄: .../LifeFlow-AI
# (src/infra -> src -> LifeFlow-AI)
PROJECT_ROOT = os.path.dirname(os.path.dirname(BASE_DIR))
# 設定 DB 存放目錄: .../LifeFlow-AI/storage
DB_DIR = os.path.join(PROJECT_ROOT, "storage")
# 設定 DB 完整路徑: .../LifeFlow-AI/storage/lifeflow_payloads.db
DB_FILE = "lifeflow_payloads.db"
DB_PATH = os.path.join(DB_DIR, DB_FILE)
# =========================================================================
# 2. 初始化邏輯
# =========================================================================
def __init__(self):
# 1. 先確保資料夾存在 (這時候 self.DB_DIR 已經讀得到了)
os.makedirs(self.DB_DIR, exist_ok=True)
# 2. 再初始化 DB (這時候 self.DB_PATH 已經讀得到了)
self._init_db()
def _init_db(self):
# 使用 self.DB_PATH
with sqlite3.connect(self.DB_PATH) as conn:
# 1. 既有的資料表
conn.execute("""
CREATE TABLE IF NOT EXISTS offloaded_data (
ref_id TEXT PRIMARY KEY,
data_type TEXT,
payload JSON,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 2. Session 狀態表
conn.execute("""
CREATE TABLE IF NOT EXISTS session_state (
session_id TEXT PRIMARY KEY,
last_ref_id TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# =========================================================================
# 3. 讀寫操作
# =========================================================================
def save(self, data: Any, data_type: str = "generic") -> str:
ref_id = f"{data_type}_{uuid.uuid4().hex[:8]}"
with sqlite3.connect(self.DB_PATH) as conn:
# 1. 寫入 Payload
conn.execute(
"INSERT INTO offloaded_data (ref_id, data_type, payload) VALUES (?, ?, ?)",
(ref_id, data_type, json.dumps(data, default=str))
)
# 2. 更新 Session 的最後 ID
current_session = get_session_id()
if current_session:
conn.execute("""
INSERT OR REPLACE INTO session_state (session_id, last_ref_id)
VALUES (?, ?)
""", (current_session, ref_id))
logger.info(f"💾 [Repo] Saved {ref_id} for Session: {current_session}")
else:
# 這裡改用 debug level,以免 log 太多雜訊
logger.debug(f"⚠️ [Repo] No session context found, 'last_id' not tracked.")
return ref_id
def load(self, ref_id: str) -> Optional[Any]:
if not ref_id: return None
# 容錯:如果路徑還沒建立 (理論上 init 已建立,但防呆)
if not os.path.exists(self.DB_PATH):
return None
with sqlite3.connect(self.DB_PATH) as conn:
cursor = conn.execute("SELECT payload FROM offloaded_data WHERE ref_id = ?", (ref_id,))
row = cursor.fetchone()
if row:
return json.loads(row[0])
return None
def get_last_id_by_session(self, session_id: str) -> Optional[str]:
if not os.path.exists(self.DB_PATH):
return None
with sqlite3.connect(self.DB_PATH) as conn:
cursor = conn.execute("SELECT last_ref_id FROM session_state WHERE session_id = ?", (session_id,))
row = cursor.fetchone()
if row:
return row[0]
return None
# 全域單例
poi_repo = PoiRepository()