"""
LifeFlow AI - Planner Service (Refactored for MCP Architecture)

Core business-logic layer: coordinates agent operation, state updates,
and data handling.

- Local toolkits removed
- Global MCP client integrated
- Business logic unchanged
"""
| import json | |
| import os | |
| import time | |
| import uuid | |
| from datetime import datetime | |
| from typing import Generator, Dict, Any, Tuple, Optional, AsyncGenerator | |
| # Core Imports | |
| from core.session import UserSession | |
| from ui.renderers import ( | |
| create_task_card, | |
| create_summary_card, | |
| create_timeline_html_enhanced, | |
| ) | |
| from core.visualizers import create_animated_map | |
| from config import AGENTS_INFO | |
| # Models | |
| from agno.models.google import Gemini | |
| from agno.models.openai import OpenAIChat | |
| from agno.models.groq import Groq | |
| # Agno Framework | |
| from agno.agent import RunEvent | |
| from agno.run.team import TeamRunEvent | |
| from src.agent.base import UserState, Location, get_context | |
| from src.agent.planner import create_planner_agent | |
| from src.agent.core_team import create_core_team | |
| from src.infra.context import set_session_id | |
| from src.infra.poi_repository import poi_repo | |
| from src.infra.logger import get_logger | |
| from src.infra.client_context import client_session_ctx | |
| # ๐ฅ๐ฅ๐ฅ NEW IMPORTS: ๅชไฝฟ็จ MCPTools | |
| from agno.tools.mcp import MCPTools | |
# Disable Gemini's content filtering across all harm categories: ordinary
# itinerary text (medical errands, traffic, etc.) can otherwise trigger
# false-positive blocks mid-stream.
gemini_safety_settings = [
    {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
]

logger = get_logger(__name__)

# Maximum attempts for the core-team planning run (see run_step3_team).
max_retries = 5
class PlannerService:
    """
    PlannerService (MCP Version)

    Session-scoped orchestration layer: wires up LLM models, attaches the
    globally shared MCP toolkits, and drives the planner agent / core team
    through the multi-step planning flow.
    """
    # Active-session cache (holds Session objects only, no global resources).
    _active_sessions: Dict[str, UserSession] = {}
    # Session IDs the user has asked to cancel; polled by long-running steps.
    _cancelled_sessions: set = set()
    # Global toolkit registry (injected by the app at startup).
    _global_toolkits: Dict[str, MCPTools] = {}
    def set_global_tools(self, toolkits_dict: Dict[str, MCPTools]) -> None:
        """Called by app.py to inject the shared MCP client toolkits.

        NOTE(review): this assigns on the instance, shadowing the class-level
        default — other instances keep seeing the class attribute.
        """
        self._global_toolkits = toolkits_dict
        logger.info("๐ MCP Toolkit injected into PlannerService successfully.")
| def _inject_session_id_into_toolkit(self, toolkit: MCPTools, session_id: str): | |
| """ | |
| Monkey Patch 2.0: | |
| ๆๆช Toolkit ็ข็็ Function๏ผไธฆๅจๅท่กๆ่ชๅๆณจๅ ฅ session_id ๅๆธใ | |
| """ | |
| # 1. ๅไปฝๅๅง็ get_tools ๆนๆณ | |
| # ๅ ็บ Agno ๆฏๅผๅซ get_tools() ไพๅๅพๅทฅๅ ทๅ่กจ็ | |
| original_get_tools = toolkit.functions | |
| def get_tools_wrapper(): | |
| # 2. ๅๅพๅๅงๅทฅๅ ทๅ่กจ (List[Function]) | |
| tools = original_get_tools | |
| for tool in tools: | |
| # 3. ๅไปฝๆฏๅๅทฅๅ ท็ๅท่กๅ ฅๅฃ (entrypoint) | |
| original_entrypoint = tool.entrypoint | |
| # 4. ๅฎ็พฉๆฐ็ๅท่กๅ ฅๅฃ (Wrapper) | |
| def entrypoint_wrapper(*args, **kwargs): | |
| # ๐ฅ ๆ ธๅฟ้ญๆณ๏ผๅจ้่ฃกๅทๅทๅกๅ ฅ session_id | |
| # ้ๆจฃ LLM ๆฒๅณ้ๅๅๆธ๏ผไนๆ่ขซ่ชๅ่ฃไธ | |
| kwargs['session_id'] = session_id | |
| # ๅท่กๅๅง MCP ๅผๅซ | |
| return original_entrypoint(*args, **kwargs) | |
| # 5. ๆฟๆๆๅ ฅๅฃ | |
| tool.entrypoint = entrypoint_wrapper | |
| return tools | |
| # 6. ๅฐ Toolkit ็ get_tools ๆๆๆๅ็ Wrapper | |
| # ้ๆฏ Instance Level ็ไฟฎๆน๏ผๅชๆๅฝฑ้ฟ็ถๅ้ๅ Agent ็ Toolkit | |
| toolkit.get_tools = get_tools_wrapper | |
| def cancel_session(self, session_id: str): | |
| if session_id: | |
| logger.info(f"๐ Requesting cancellation for session: {session_id}") | |
| self._cancelled_sessions.add(session_id) | |
| def _get_live_session(self, incoming_session: UserSession) -> UserSession: | |
| sid = incoming_session.session_id | |
| if not sid: return incoming_session | |
| if sid and sid in self._active_sessions: | |
| live_session = self._active_sessions[sid] | |
| live_session.lat = incoming_session.lat | |
| live_session.lng = incoming_session.lng | |
| if incoming_session.custom_settings: | |
| live_session.custom_settings.update(incoming_session.custom_settings) | |
| if len(incoming_session.chat_history) > len(live_session.chat_history): | |
| live_session.chat_history = incoming_session.chat_history | |
| return live_session | |
| self._active_sessions[sid] = incoming_session | |
| return incoming_session | |
| def initialize_agents(self, session: UserSession, lat: float, lng: float) -> UserSession: | |
| if not session.session_id: | |
| session.session_id = str(uuid.uuid4()) | |
| logger.info(f"๐ Generated New Session ID: {session.session_id}") | |
| session = self._get_live_session(session) | |
| session.lat = lat | |
| session.lng = lng | |
| if not session.user_state: | |
| session.user_state = UserState(location=Location(lat=lat, lng=lng)) | |
| else: | |
| session.user_state.location = Location(lat=lat, lng=lng) | |
| if session.planner_agent is not None: | |
| return session | |
| # 1. ่จญๅฎๆจกๅ (Models) | |
| settings = session.custom_settings | |
| provider = settings.get("llm_provider", "Gemini") | |
| main_api_key = settings.get("model_api_key") | |
| selected_model_id = settings.get("model", "gemini-2.5-flash") | |
| helper_model_id = settings.get("groq_fast_model", "openai/gpt-oss-20b") | |
| enable_fast_mode = settings.get("enable_fast_mode", False) | |
| if main_api_key is None: | |
| provider = "Gemini" | |
| main_api_key = os.environ.get("GEMINI_API_KEY") | |
| selected_model_id = "gemini-2.5-flash" | |
| logger.warning("โ ๏ธ Main API key not provided, defaulting to Gemini-2.5-flash with env var.") | |
| groq_api_key = settings["groq_api_key"] if settings.get("groq_api_key") else os.environ.get("GROQ_API_KEY") | |
| # ๅๅงๅ Main Brain | |
| if provider.lower() == "gemini": | |
| main_brain = Gemini(id=selected_model_id, api_key=main_api_key, thinking_budget=1024, | |
| safety_settings=gemini_safety_settings) | |
| elif provider.lower() == "openai": | |
| main_brain = OpenAIChat(id=selected_model_id, api_key=main_api_key) | |
| elif provider.lower() == "groq": | |
| main_brain = Groq(id=selected_model_id, api_key=main_api_key, temperature=0.1) | |
| else: | |
| main_brain = Gemini(id='gemini-2.5-flash', api_key=main_api_key) | |
| # ๅๅงๅ Helper Model | |
| if enable_fast_mode and groq_api_key: | |
| helper_model = Groq(id=helper_model_id, api_key=groq_api_key, temperature=0.1) | |
| else: | |
| if provider.lower() == "gemini": | |
| helper_model = Gemini(id="gemini-2.5-flash-lite", api_key=main_api_key, | |
| safety_settings=gemini_safety_settings) | |
| elif provider.lower() == "openai": | |
| helper_model = OpenAIChat(id="gpt-4o-mini", api_key=main_api_key) | |
| else: | |
| helper_model = main_brain | |
| models_dict = { | |
| "team": main_brain, | |
| "presenter": main_brain, | |
| "scout": helper_model, | |
| "optimizer": helper_model, | |
| "navigator": helper_model, | |
| "weatherman": helper_model | |
| } | |
| # 2. ๆบๅ Tools (๐ฅ MCP ้ๆงๆ ธๅฟ) | |
| if not self._global_toolkits: | |
| logger.warning("โ ๏ธ MCP Toolkit is NOT initialized! Agents will have no tools.") | |
| def get_tool_list(agent_name): | |
| toolkit = self._global_toolkits.get(agent_name) | |
| return [toolkit] if toolkit else [] | |
| tools_dict = { | |
| "scout": get_tool_list("scout"), | |
| "optimizer": get_tool_list("optimizer"), | |
| "navigator": get_tool_list("navigator"), | |
| "weatherman": get_tool_list("weatherman"), | |
| "presenter": get_tool_list("presenter"), | |
| } | |
| planner_kwargs = { | |
| "additional_context": get_context(session.user_state), | |
| "timezone_identifier": session.user_state.utc_offset, | |
| "debug_mode": False, | |
| } | |
| team_kwargs = {"timezone_identifier": session.user_state.utc_offset} | |
| # 3. ๅปบ็ซ Agents | |
| session.planner_agent = create_planner_agent(main_brain, planner_kwargs, session_id=session.session_id) | |
| session.core_team = create_core_team(models_dict, team_kwargs, tools_dict, session_id=session.session_id) | |
| self._active_sessions[session.session_id] = session | |
| logger.info(f"โ Agents initialized (MCP Mode) for session {session.session_id}") | |
| return session | |
| # ================= Step 1: Analyze Tasks ================= | |
| def run_step1_analysis(self, user_input: str, auto_location: bool, | |
| lat: float, lng: float, session: UserSession) -> Generator[Dict[str, Any], None, None]: | |
| if not user_input or len(user_input.strip()) == 0: | |
| yield {"type": "error", "message": "โ ๏ธ Please enter your plans first!", | |
| "stream_text": "Waiting for input...", "block_next_step": True} | |
| return | |
| if auto_location and (lat == 0 or lng == 0): | |
| yield {"type": "error", "message": "โ ๏ธ Location detection failed.", "stream_text": "Location Error...", | |
| "block_next_step": True} | |
| return | |
| if not auto_location and (lat is None or lng is None): | |
| yield {"type": "error", "message": "โ ๏ธ Please enter valid Latitude/Longitude.", | |
| "stream_text": "Location Error...", "block_next_step": True} | |
| return | |
| try: | |
| session = self.initialize_agents(session, lat, lng) | |
| self._add_reasoning(session, "planner", "๐ Starting analysis...") | |
| yield {"type": "stream", "stream_text": "๐ค Analyzing your request with AI...", | |
| "agent_status": ("planner", "working", "Initializing..."), "session": session} | |
| self._add_reasoning(session, "planner", f"Processing: {user_input[:50]}...") | |
| current_text = "๐ค Analyzing your request with AI...\n๐ AI is extracting tasks..." | |
| planner_stream = session.planner_agent.run( | |
| f"help user to update the task_list, user's message: {user_input}", | |
| stream=True, stream_events=True | |
| ) | |
| accumulated_response, displayed_text = "", current_text + "\n\n" | |
| for chunk in planner_stream: | |
| if chunk.event == RunEvent.run_content: | |
| content = chunk.content | |
| accumulated_response += content | |
| if "@@@" not in accumulated_response: | |
| displayed_text += content | |
| formatted_text = displayed_text.replace("\n", "<br/>") | |
| yield {"type": "stream", "stream_text": formatted_text, | |
| "agent_status": ("planner", "working", "Thinking..."), "session": session} | |
| json_data = "{" + accumulated_response.split("{", maxsplit=1)[-1] | |
| json_data = json_data.replace("`", "").replace("@", "").replace("\\", " ").replace("\n", " ") | |
| try: | |
| task_list_data = json.loads(json_data) | |
| if task_list_data["global_info"]["start_location"].lower() == "user location": | |
| task_list_data["global_info"]["start_location"] = {"lat": lat, "lng": lng} | |
| session.planner_agent.update_session_state(session_id=session.session_id, | |
| session_state_updates={"task_list": task_list_data}) | |
| session.task_list = self._convert_task_list_to_ui_format(task_list_data) | |
| except Exception as e: | |
| logger.error(f"Failed to parse task_list: {e}") | |
| session.task_list = [] | |
| if not session.task_list: | |
| err_msg = "โ ๏ธ AI couldn't identify any tasks." | |
| self._add_reasoning(session, "planner", "โ No tasks found") | |
| yield {"type": "error", "message": err_msg, "stream_text": err_msg, "session": session, | |
| "block_next_step": True} | |
| return | |
| if "priority" in session.task_list: | |
| for i in session.task_list: | |
| if not session.task_list[i].get("priority"): | |
| session.task_list[i]["priority"] = "MEDIUM" | |
| else: | |
| session.task_list[i]["priority"] = session.task_list[i]["priority"].upper() | |
| high_priority = sum(1 for t in session.task_list if t.get("priority") == "HIGH") | |
| total_time = sum(int(t.get("duration", "0").split()[0]) for t in session.task_list if t.get("duration")) | |
| yield {"type": "complete", "stream_text": "Analysis complete!", | |
| "start_location": task_list_data["global_info"].get("start_location", "N/A"), | |
| "high_priority": high_priority, "total_time": total_time, | |
| "start_time": task_list_data["global_info"].get("departure_time", "N/A"), "session": session, | |
| "block_next_step": False} | |
| except Exception as e: | |
| logger.error(f"Error: {e}") | |
| yield {"type": "error", "message": str(e), "session": session, "block_next_step": True} | |
| # ================= Task Modification (Chat) ================= | |
| def modify_task_chat(self, user_message: str, session: UserSession) -> Generator[Dict[str, Any], None, None]: | |
| if not user_message or len(user_message.replace(' ', '')) == 0: | |
| yield {"type": "chat_error", "message": "Please enter a message.", "session": session} | |
| return | |
| session = self._get_live_session(session) | |
| session.chat_history.append( | |
| {"role": "user", "message": user_message, "time": datetime.now().strftime("%H:%M:%S")}) | |
| yield {"type": "update_history", "session": session} | |
| try: | |
| if session.planner_agent is None: | |
| if session.lat and session.lng: | |
| session = self.initialize_agents(session, session.lat, session.lng) | |
| else: | |
| yield {"type": "chat_error", "message": "Session lost. Please restart.", "session": session} | |
| return | |
| session.chat_history.append( | |
| {"role": "assistant", "message": "๐ค AI is thinking...", "time": datetime.now().strftime("%H:%M:%S")}) | |
| yield {"type": "update_history", "session": session} | |
| planner_stream = session.planner_agent.run( | |
| f"help user to update the task_list, user's message: {user_message}", | |
| stream=True, stream_events=True | |
| ) | |
| accumulated_response = "" | |
| for chunk in planner_stream: | |
| if chunk.event == RunEvent.run_content: | |
| accumulated_response += chunk.content | |
| json_data = "{" + accumulated_response.split("{", maxsplit=1)[-1] | |
| json_data = json_data.replace("`", "").replace("@", "").replace("\\", " ").replace("\n", " ") | |
| try: | |
| task_list_data = json.loads(json_data) | |
| if isinstance(task_list_data["global_info"]["start_location"], str) and task_list_data["global_info"][ | |
| "start_location"].lower() == "user location": | |
| task_list_data["global_info"]["start_location"] = {"lat": session.lat, "lng": session.lng} | |
| session.planner_agent.update_session_state(session_id=session.session_id, | |
| session_state_updates={"task_list": task_list_data}) | |
| session.task_list = self._convert_task_list_to_ui_format(task_list_data) | |
| except Exception as e: | |
| logger.error(f"Failed to parse modified task_list: {e}") | |
| raise e | |
| high_priority = sum(1 for t in session.task_list if t.get("priority") == "HIGH") | |
| total_time = sum(int(t.get("duration", "0").split()[0]) for t in session.task_list if t.get("duration")) | |
| start_location = task_list_data["global_info"].get("start_location", "N/A") | |
| date = task_list_data["global_info"].get("departure_time", "N/A") | |
| summary_html = create_summary_card(len(session.task_list), high_priority, total_time, start_location, date) | |
| session.chat_history[-1] = {"role": "assistant", "message": "โ Tasks updated.", | |
| "time": datetime.now().strftime("%H:%M:%S")} | |
| self._add_reasoning(session, "planner", f"Updated: {user_message[:30]}...") | |
| yield {"type": "complete", "summary_html": summary_html, "session": session} | |
| except Exception as e: | |
| logger.error(f"Chat error: {e}") | |
| session.chat_history.append( | |
| {"role": "assistant", "message": f"โ Error: {str(e)}", "time": datetime.now().strftime("%H:%M:%S")}) | |
| yield {"type": "update_history", "session": session} | |
    # ================= Step 3: Run Core Team =================
    async def run_step3_team(self, session: UserSession) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Step 3: drive the multi-agent core team over the current task list,
        streaming UI events (agent statuses, report chunks) as they arrive.

        Retries up to `max_retries` times when a stream yields no content.
        NOTE(review): exceptions raised mid-stream propagate to the OUTER
        `except` (the inner try only has a `finally`), so they are logged but
        never actually retried — confirm whether that is intended.
        """
        # Bind this session id into the context var so downstream MCP calls
        # are attributed to the right session; always reset in `finally`.
        token = client_session_ctx.set(session.session_id)
        attempt = 0
        success = False
        start_time = time.perf_counter()
        try:
            session = self._get_live_session(session)
            sid = session.session_id
            # Clear any stale cancellation flag left by a previous run.
            if sid in self._cancelled_sessions: self._cancelled_sessions.remove(sid)
            if not session.task_list:
                yield {"type": "error", "message": "No tasks to plan.", "session": session}
                return
            task_list_input = session.planner_agent.get_session_state()["task_list"]
            task_list_str = json.dumps(task_list_input, indent=2, ensure_ascii=False) if isinstance(task_list_input, (
                dict, list)) else str(task_list_input)
            self._add_reasoning(session, "team", "๐ฏ Multi-agent collaboration started")
            yield {"type": "reasoning_update", "session": session,
                   "agent_status": ("team", "working", "Analyzing tasks...")}
            while attempt < max_retries and not success:
                attempt += 1
                try:
                    active_agents = set()
                    # Key change 1: use arun (async run).
                    # Key change 2: arun itself returns an AsyncGenerator, so
                    # it must be iterated directly (no await on the call).
                    team_stream = session.core_team.arun(
                        task_list_str,
                        stream=True,
                        stream_events=True,
                        session_id=session.session_id
                    )
                    report_content = ""
                    start_time = time.perf_counter()
                    has_content = False
                    # Key change 3: iterate with `async for`.
                    async for event in team_stream:
                        # Any content or tool activity marks this attempt as
                        # productive, which also ends the retry loop.
                        if event.event in [RunEvent.run_content, RunEvent.tool_call_started]:
                            has_content = True
                            success = True
                        # Honor user cancellation between events.
                        if sid in self._cancelled_sessions:
                            logger.warning(f"๐ Execution terminated by user for session {sid}")
                            self._cancelled_sessions.remove(sid)
                            yield {"type": "error", "message": "Plan cancelled by user."}
                            return
                        if event.event == RunEvent.run_started:
                            agent_id = event.agent_id or "team"
                            active_agents.add(agent_id)
                            # The presenter restarts its report from scratch.
                            if agent_id == "presenter": report_content = ""
                            yield {"type": "reasoning_update", "session": session,
                                   "agent_status": (agent_id, "working", "Thinking...")}
                        elif event.event == RunEvent.run_completed:
                            agent_id = event.agent_id or "team"
                            if agent_id == "team":
                                yield {"type": "reasoning_update", "session": session,
                                       "agent_status": ("team", "working", "Processing...")}
                                continue
                            # Member finished: flip it back to idle and show
                            # the team as reviewing its output.
                            if agent_id in active_agents: active_agents.remove(agent_id)
                            yield {"type": "reasoning_update", "session": session,
                                   "agent_status": (agent_id, "idle", "Standby")}
                            yield {"type": "reasoning_update", "session": session,
                                   "agent_status": ("team", "working", "Reviewing results...")}
                        elif event.event == RunEvent.run_content and event.agent_id == "presenter":
                            # Stream the presenter's report incrementally.
                            report_content += event.content
                            yield {"type": "report_stream", "content": report_content, "session": session}
                        elif event.event == TeamRunEvent.tool_call_started:
                            # Team-level tool call (e.g. delegation to a member).
                            tool_name = event.tool.tool_name
                            yield {"type": "reasoning_update", "session": session,
                                   "agent_status": ("team", "working", "Orchestrating...")}
                            if "delegate_task_to_member" in tool_name:
                                member_id = event.tool.tool_args.get("member_id", "unknown")
                                self._add_reasoning(session, "team", f"๐ Delegating to {member_id}...")
                                yield {"type": "reasoning_update", "session": session,
                                       "agent_status": (member_id, "working", "Receiving Task...")}
                            else:
                                self._add_reasoning(session, "team", f"๐ง Tool: {tool_name}")
                        elif event.event == RunEvent.tool_call_started:
                            # Member-level tool call.
                            member_id = event.agent_id
                            tool_name = event.tool.tool_name
                            yield {"type": "reasoning_update", "session": session,
                                   "agent_status": ("team", "working", f"Monitoring {member_id}...")}
                            self._add_reasoning(session, member_id, f"Using tool: {tool_name}...")
                            yield {"type": "reasoning_update", "session": session,
                                   "agent_status": (member_id, "working", f"Running Tool...")}
                        elif event.event == TeamRunEvent.run_completed:
                            self._add_reasoning(session, "team", "๐ Planning process finished")
                            if hasattr(event, 'metrics'):
                                logger.info(f"Total tokens: {event.metrics.total_tokens}")
                                logger.info(f"Input tokens: {event.metrics.input_tokens}")
                                logger.info(f"Output tokens: {event.metrics.output_tokens}")
                    # Empty stream: retry; otherwise leave the retry loop.
                    if not has_content and attempt < max_retries: continue
                    break
                finally:
                    logger.info(f"Run time (s): {time.perf_counter() - start_time}")
            # Reset every member's status for the UI, then emit the report.
            for agent in ["scout", "optimizer", "navigator", "weatherman", "presenter"]:
                yield {"type": "reasoning_update", "session": session, "agent_status": (agent, "idle", "Standby")}
            yield {"type": "reasoning_update", "session": session, "agent_status": ("team", "complete", "All Done!")}
            session.final_report = report_html = f"## ๐ฏ Planning Complete\n\n{report_content}"
            yield {"type": "complete", "report_html": report_html, "session": session,
                   "agent_status": ("team", "complete", "Finished")}
        except GeneratorExit:
            # Consumer stopped iterating; exit quietly.
            return
        except Exception as e:
            logger.error(f"Error in attempt {attempt}: {e}")
            if attempt >= max_retries: yield {"type": "error", "message": str(e), "session": session}
        finally:
            # Always restore the previous context-var value.
            client_session_ctx.reset(token)
    # ================= Step 4: Finalize =================
    def run_step4_finalize(self, session: UserSession) -> Dict[str, Any]:
        """
        Step 4: load the persisted planning result for this session and build
        the final UI payload (summary card, AI stats, timeline, animated map).

        Returns {"type": "success", ...} or {"type": "error", "message": ...}.
        """
        try:
            session = self._get_live_session(session)
            # The core team persists its result in the POI repo; fetch the
            # most recent entry for this session.
            final_ref_id = poi_repo.get_last_id_by_session(session.session_id)
            if not final_ref_id:
                raise ValueError(f"No results found, please Stop & Back to Edit to re-run the planning.")
            structured_data = poi_repo.load(final_ref_id)
            timeline_html = create_timeline_html_enhanced(structured_data.get("timeline", []))
            metrics = structured_data.get("metrics", {})
            traffic = structured_data.get("traffic_summary", {})
            # Direct indexing: missing metric keys surface as the error result.
            task_count = f"{metrics['completed_tasks']} / {metrics['total_tasks']}"
            high_prio = sum(1 for t in session.task_list if t.get("priority") == "HIGH")
            total_time = metrics.get("optimized_duration_min", traffic.get("total_duration_min", 0))
            dist_m = metrics.get("optimized_distance_m", 0)
            total_dist_km = dist_m / 1000.0
            efficiency = metrics.get("route_efficiency_pct", 0)
            saved_dist_m = metrics.get("distance_saved_m", 0)
            saved_time_min = metrics.get("time_saved_min", 0)
            start_location = structured_data.get("global_info", {}).get("start_location", {}).get("name", "N/A")
            date = structured_data.get("global_info", {}).get("departure_time", "N/A")
            summary_card = create_summary_card(task_count, high_prio, int(total_time), start_location, date)
            # Green styling at >= 80% efficiency, amber otherwise.
            eff_color = "#047857" if efficiency >= 80 else "#d97706"
            eff_bg = "#ecfdf5" if efficiency >= 80 else "#fffbeb"
            eff_border = "#a7f3d0" if efficiency >= 80 else "#fde68a"
            ai_stats_html = f"""
<div style="display: flex; gap: 12px; margin-bottom: 20px;">
<div style="flex: 1; background: {eff_bg}; padding: 16px; border-radius: 12px; border: 1px solid {eff_border};">
<div style="font-size: 0.8rem; color: {eff_color}; font-weight: 600; display: flex; align-items: center; gap: 4px;"><span>๐ AI EFFICIENCY</span></div>
<div style="font-size: 1.8rem; font-weight: 800; color: {eff_color}; line-height: 1.2;">{efficiency:.1f}%</div>
<div style="font-size: 0.75rem; color: {eff_color}; opacity: 0.9; margin-top: 4px;">โก Saved {saved_time_min:.0f} mins</div>
</div>
<div style="flex: 1; background: #eff6ff; padding: 16px; border-radius: 12px; border: 1px solid #bfdbfe;">
<div style="font-size: 0.8rem; color: #2563eb; font-weight: 600;">๐ TOTAL DISTANCE</div>
<div style="font-size: 1.8rem; font-weight: 800; color: #1d4ed8; line-height: 1.2;">{total_dist_km:.2f} <span style="font-size: 1rem;">km</span></div>
<div style="font-size: 0.75rem; color: #2563eb; opacity: 0.9; margin-top: 4px;">๐ Reduced {saved_dist_m} m</div>
</div>
</div>"""
            full_summary_html = f"{summary_card}{ai_stats_html}<h3>๐ Itinerary Timeline</h3>{timeline_html}"
            map_fig = create_animated_map(structured_data)
            task_list_html = self.generate_task_list_html(session)
            session.planning_completed = True
            return {"type": "success", "summary_tab_html": full_summary_html, "report_md": session.final_report,
                    "task_list_html": task_list_html, "map_fig": map_fig, "session": session}
        except Exception as e:
            logger.error(f"Finalize error: {e}", exc_info=True)
            return {"type": "error", "message": str(e), "session": session}
| # ================= Helpers ================= | |
| def _add_reasoning(self, session: UserSession, agent: str, message: str): | |
| session.reasoning_messages.append( | |
| {"agent": agent, "message": message, "time": datetime.now().strftime("%H:%M:%S")}) | |
| def _convert_task_list_to_ui_format(self, task_list_data): | |
| ui_tasks = [] | |
| if isinstance(task_list_data, dict): | |
| tasks = task_list_data.get("tasks", []) | |
| elif isinstance(task_list_data, list): | |
| tasks = task_list_data | |
| else: | |
| return [] | |
| for i, task in enumerate(tasks, 1): | |
| ui_tasks.append({ | |
| "id": i, | |
| "title": task.get("description", "Task"), | |
| "priority": task.get("priority", "MEDIUM"), | |
| "time": task.get("time_window", "Anytime"), | |
| "duration": f"{task.get('service_duration_min', 30)} minutes", | |
| "location": task.get("location_hint", "To be determined"), | |
| "icon": self._get_task_icon(task.get("category", "other")) | |
| }) | |
| return ui_tasks | |
| def _get_task_icon(self, category: str) -> str: | |
| icons = {"medical": "๐ฅ", "shopping": "๐", "postal": "๐ฎ", "food": "๐ฝ๏ธ", "entertainment": "๐ญ", | |
| "transportation": "๐", "other": "๐"} | |
| return icons.get(category.lower(), "๐") | |
| def generate_task_list_html(self, session: UserSession) -> str: | |
| if not session.task_list: return "<p>No tasks available</p>" | |
| html = "" | |
| for task in session.task_list: | |
| html += create_task_card(task["id"], task["title"], task["priority"], task["time"], task["duration"], | |
| task["location"], task.get("icon", "๐")) | |
| return html |