LifeFlow-AI / services /planner_service.py
Marco310's picture
doc: Optimization error description
de31bb3
"""
LifeFlow AI - Planner Service (Refactored for MCP Architecture)
ๆ ธๅฟƒๆฅญๅ‹™้‚่ผฏๅฑค๏ผš่ฒ ่ฒฌๅ”่ชฟ Agent ้‹ไฝœใ€็‹€ๆ…‹ๆ›ดๆ–ฐ่ˆ‡่ณ‡ๆ–™่™•็†ใ€‚
โœ… ็งป้™คๆœฌๅœฐ Toolkits
โœ… ๆ•ดๅˆๅ…จๅŸŸ MCP Client
โœ… ไฟๆŒๆฅญๅ‹™้‚่ผฏไธ่ฎŠ
"""
import json
import os
import time
import uuid
from datetime import datetime
from typing import Generator, Dict, Any, Tuple, Optional, AsyncGenerator
# Core Imports
from core.session import UserSession
from ui.renderers import (
create_task_card,
create_summary_card,
create_timeline_html_enhanced,
)
from core.visualizers import create_animated_map
from config import AGENTS_INFO
# Models
from agno.models.google import Gemini
from agno.models.openai import OpenAIChat
from agno.models.groq import Groq
# Agno Framework
from agno.agent import RunEvent
from agno.run.team import TeamRunEvent
from src.agent.base import UserState, Location, get_context
from src.agent.planner import create_planner_agent
from src.agent.core_team import create_core_team
from src.infra.context import set_session_id
from src.infra.poi_repository import poi_repo
from src.infra.logger import get_logger
from src.infra.client_context import client_session_ctx
# ๐Ÿ”ฅ๐Ÿ”ฅ๐Ÿ”ฅ NEW IMPORTS: ๅชไฝฟ็”จ MCPTools
from agno.tools.mcp import MCPTools
gemini_safety_settings = [
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
]
logger = get_logger(__name__)
max_retries = 5
class PlannerService:
"""
PlannerService (MCP Version)
"""
# Active Sessions ๅฟซๅ– (ๅƒ…ๅญ˜ Session ็‰ฉไปถ๏ผŒไธๅซๅ…จๅŸŸ่ณ‡ๆบ)
_active_sessions: Dict[str, UserSession] = {}
_cancelled_sessions: set = set()
# ๐Ÿ”ฅ ๅ…จๅŸŸๅทฅๅ…ทๅƒ็…ง (็”ฑ App ๆณจๅ…ฅ)
_global_toolkits: Dict[str, MCPTools] = {}
def set_global_tools(self, toolkits_dict: Dict[str, MCPTools]):
"""็”ฑ app.py ๅ‘ผๅซ๏ผŒๆณจๅ…ฅ MCP Client"""
self._global_toolkits = toolkits_dict
logger.info("๐Ÿ’‰ MCP Toolkit injected into PlannerService successfully.")
def _inject_session_id_into_toolkit(self, toolkit: MCPTools, session_id: str):
"""
Monkey Patch 2.0:
ๆ””ๆˆช Toolkit ็”ข็”Ÿ็š„ Function๏ผŒไธฆๅœจๅŸท่กŒๆ™‚่‡ชๅ‹•ๆณจๅ…ฅ session_id ๅƒๆ•ธใ€‚
"""
# 1. ๅ‚™ไปฝๅŽŸๅง‹็š„ get_tools ๆ–นๆณ•
# ๅ› ็‚บ Agno ๆ˜ฏๅ‘ผๅซ get_tools() ไพ†ๅ–ๅพ—ๅทฅๅ…ทๅˆ—่กจ็š„
original_get_tools = toolkit.functions
def get_tools_wrapper():
# 2. ๅ–ๅพ—ๅŽŸๅง‹ๅทฅๅ…ทๅˆ—่กจ (List[Function])
tools = original_get_tools
for tool in tools:
# 3. ๅ‚™ไปฝๆฏๅ€‹ๅทฅๅ…ท็š„ๅŸท่กŒๅ…ฅๅฃ (entrypoint)
original_entrypoint = tool.entrypoint
# 4. ๅฎš็พฉๆ–ฐ็š„ๅŸท่กŒๅ…ฅๅฃ (Wrapper)
def entrypoint_wrapper(*args, **kwargs):
# ๐Ÿ”ฅ ๆ ธๅฟƒ้ญ”ๆณ•๏ผšๅœจ้€™่ฃกๅทๅทๅกžๅ…ฅ session_id
# ้€™ๆจฃ LLM ๆฒ’ๅ‚ณ้€™ๅ€‹ๅƒๆ•ธ๏ผŒไนŸๆœƒ่ขซ่‡ชๅ‹•่ฃœไธŠ
kwargs['session_id'] = session_id
# ๅŸท่กŒๅŽŸๅง‹ MCP ๅ‘ผๅซ
return original_entrypoint(*args, **kwargs)
# 5. ๆ›ฟๆ›ๆމๅ…ฅๅฃ
tool.entrypoint = entrypoint_wrapper
return tools
# 6. ๅฐ‡ Toolkit ็š„ get_tools ๆ›ๆˆๆˆ‘ๅ€‘็š„ Wrapper
# ้€™ๆ˜ฏ Instance Level ็š„ไฟฎๆ”น๏ผŒๅชๆœƒๅฝฑ้Ÿฟ็•ถๅ‰้€™ๅ€‹ Agent ็š„ Toolkit
toolkit.get_tools = get_tools_wrapper
def cancel_session(self, session_id: str):
if session_id:
logger.info(f"๐Ÿ›‘ Requesting cancellation for session: {session_id}")
self._cancelled_sessions.add(session_id)
def _get_live_session(self, incoming_session: UserSession) -> UserSession:
sid = incoming_session.session_id
if not sid: return incoming_session
if sid and sid in self._active_sessions:
live_session = self._active_sessions[sid]
live_session.lat = incoming_session.lat
live_session.lng = incoming_session.lng
if incoming_session.custom_settings:
live_session.custom_settings.update(incoming_session.custom_settings)
if len(incoming_session.chat_history) > len(live_session.chat_history):
live_session.chat_history = incoming_session.chat_history
return live_session
self._active_sessions[sid] = incoming_session
return incoming_session
def initialize_agents(self, session: UserSession, lat: float, lng: float) -> UserSession:
if not session.session_id:
session.session_id = str(uuid.uuid4())
logger.info(f"๐Ÿ†” Generated New Session ID: {session.session_id}")
session = self._get_live_session(session)
session.lat = lat
session.lng = lng
if not session.user_state:
session.user_state = UserState(location=Location(lat=lat, lng=lng))
else:
session.user_state.location = Location(lat=lat, lng=lng)
if session.planner_agent is not None:
return session
# 1. ่จญๅฎšๆจกๅž‹ (Models)
settings = session.custom_settings
provider = settings.get("llm_provider", "Gemini")
main_api_key = settings.get("model_api_key")
selected_model_id = settings.get("model", "gemini-2.5-flash")
helper_model_id = settings.get("groq_fast_model", "openai/gpt-oss-20b")
enable_fast_mode = settings.get("enable_fast_mode", False)
if main_api_key is None:
provider = "Gemini"
main_api_key = os.environ.get("GEMINI_API_KEY")
selected_model_id = "gemini-2.5-flash"
logger.warning("โš ๏ธ Main API key not provided, defaulting to Gemini-2.5-flash with env var.")
groq_api_key = settings["groq_api_key"] if settings.get("groq_api_key") else os.environ.get("GROQ_API_KEY")
# ๅˆๅง‹ๅŒ– Main Brain
if provider.lower() == "gemini":
main_brain = Gemini(id=selected_model_id, api_key=main_api_key, thinking_budget=1024,
safety_settings=gemini_safety_settings)
elif provider.lower() == "openai":
main_brain = OpenAIChat(id=selected_model_id, api_key=main_api_key)
elif provider.lower() == "groq":
main_brain = Groq(id=selected_model_id, api_key=main_api_key, temperature=0.1)
else:
main_brain = Gemini(id='gemini-2.5-flash', api_key=main_api_key)
# ๅˆๅง‹ๅŒ– Helper Model
if enable_fast_mode and groq_api_key:
helper_model = Groq(id=helper_model_id, api_key=groq_api_key, temperature=0.1)
else:
if provider.lower() == "gemini":
helper_model = Gemini(id="gemini-2.5-flash-lite", api_key=main_api_key,
safety_settings=gemini_safety_settings)
elif provider.lower() == "openai":
helper_model = OpenAIChat(id="gpt-4o-mini", api_key=main_api_key)
else:
helper_model = main_brain
models_dict = {
"team": main_brain,
"presenter": main_brain,
"scout": helper_model,
"optimizer": helper_model,
"navigator": helper_model,
"weatherman": helper_model
}
# 2. ๆบ–ๅ‚™ Tools (๐Ÿ”ฅ MCP ้‡ๆง‹ๆ ธๅฟƒ)
if not self._global_toolkits:
logger.warning("โš ๏ธ MCP Toolkit is NOT initialized! Agents will have no tools.")
def get_tool_list(agent_name):
toolkit = self._global_toolkits.get(agent_name)
return [toolkit] if toolkit else []
tools_dict = {
"scout": get_tool_list("scout"),
"optimizer": get_tool_list("optimizer"),
"navigator": get_tool_list("navigator"),
"weatherman": get_tool_list("weatherman"),
"presenter": get_tool_list("presenter"),
}
planner_kwargs = {
"additional_context": get_context(session.user_state),
"timezone_identifier": session.user_state.utc_offset,
"debug_mode": False,
}
team_kwargs = {"timezone_identifier": session.user_state.utc_offset}
# 3. ๅปบ็ซ‹ Agents
session.planner_agent = create_planner_agent(main_brain, planner_kwargs, session_id=session.session_id)
session.core_team = create_core_team(models_dict, team_kwargs, tools_dict, session_id=session.session_id)
self._active_sessions[session.session_id] = session
logger.info(f"โœ… Agents initialized (MCP Mode) for session {session.session_id}")
return session
# ================= Step 1: Analyze Tasks =================
def run_step1_analysis(self, user_input: str, auto_location: bool,
lat: float, lng: float, session: UserSession) -> Generator[Dict[str, Any], None, None]:
if not user_input or len(user_input.strip()) == 0:
yield {"type": "error", "message": "โš ๏ธ Please enter your plans first!",
"stream_text": "Waiting for input...", "block_next_step": True}
return
if auto_location and (lat == 0 or lng == 0):
yield {"type": "error", "message": "โš ๏ธ Location detection failed.", "stream_text": "Location Error...",
"block_next_step": True}
return
if not auto_location and (lat is None or lng is None):
yield {"type": "error", "message": "โš ๏ธ Please enter valid Latitude/Longitude.",
"stream_text": "Location Error...", "block_next_step": True}
return
try:
session = self.initialize_agents(session, lat, lng)
self._add_reasoning(session, "planner", "๐Ÿš€ Starting analysis...")
yield {"type": "stream", "stream_text": "๐Ÿค” Analyzing your request with AI...",
"agent_status": ("planner", "working", "Initializing..."), "session": session}
self._add_reasoning(session, "planner", f"Processing: {user_input[:50]}...")
current_text = "๐Ÿค” Analyzing your request with AI...\n๐Ÿ“‹ AI is extracting tasks..."
planner_stream = session.planner_agent.run(
f"help user to update the task_list, user's message: {user_input}",
stream=True, stream_events=True
)
accumulated_response, displayed_text = "", current_text + "\n\n"
for chunk in planner_stream:
if chunk.event == RunEvent.run_content:
content = chunk.content
accumulated_response += content
if "@@@" not in accumulated_response:
displayed_text += content
formatted_text = displayed_text.replace("\n", "<br/>")
yield {"type": "stream", "stream_text": formatted_text,
"agent_status": ("planner", "working", "Thinking..."), "session": session}
json_data = "{" + accumulated_response.split("{", maxsplit=1)[-1]
json_data = json_data.replace("`", "").replace("@", "").replace("\\", " ").replace("\n", " ")
try:
task_list_data = json.loads(json_data)
if task_list_data["global_info"]["start_location"].lower() == "user location":
task_list_data["global_info"]["start_location"] = {"lat": lat, "lng": lng}
session.planner_agent.update_session_state(session_id=session.session_id,
session_state_updates={"task_list": task_list_data})
session.task_list = self._convert_task_list_to_ui_format(task_list_data)
except Exception as e:
logger.error(f"Failed to parse task_list: {e}")
session.task_list = []
if not session.task_list:
err_msg = "โš ๏ธ AI couldn't identify any tasks."
self._add_reasoning(session, "planner", "โŒ No tasks found")
yield {"type": "error", "message": err_msg, "stream_text": err_msg, "session": session,
"block_next_step": True}
return
if "priority" in session.task_list:
for i in session.task_list:
if not session.task_list[i].get("priority"):
session.task_list[i]["priority"] = "MEDIUM"
else:
session.task_list[i]["priority"] = session.task_list[i]["priority"].upper()
high_priority = sum(1 for t in session.task_list if t.get("priority") == "HIGH")
total_time = sum(int(t.get("duration", "0").split()[0]) for t in session.task_list if t.get("duration"))
yield {"type": "complete", "stream_text": "Analysis complete!",
"start_location": task_list_data["global_info"].get("start_location", "N/A"),
"high_priority": high_priority, "total_time": total_time,
"start_time": task_list_data["global_info"].get("departure_time", "N/A"), "session": session,
"block_next_step": False}
except Exception as e:
logger.error(f"Error: {e}")
yield {"type": "error", "message": str(e), "session": session, "block_next_step": True}
# ================= Task Modification (Chat) =================
def modify_task_chat(self, user_message: str, session: UserSession) -> Generator[Dict[str, Any], None, None]:
if not user_message or len(user_message.replace(' ', '')) == 0:
yield {"type": "chat_error", "message": "Please enter a message.", "session": session}
return
session = self._get_live_session(session)
session.chat_history.append(
{"role": "user", "message": user_message, "time": datetime.now().strftime("%H:%M:%S")})
yield {"type": "update_history", "session": session}
try:
if session.planner_agent is None:
if session.lat and session.lng:
session = self.initialize_agents(session, session.lat, session.lng)
else:
yield {"type": "chat_error", "message": "Session lost. Please restart.", "session": session}
return
session.chat_history.append(
{"role": "assistant", "message": "๐Ÿค” AI is thinking...", "time": datetime.now().strftime("%H:%M:%S")})
yield {"type": "update_history", "session": session}
planner_stream = session.planner_agent.run(
f"help user to update the task_list, user's message: {user_message}",
stream=True, stream_events=True
)
accumulated_response = ""
for chunk in planner_stream:
if chunk.event == RunEvent.run_content:
accumulated_response += chunk.content
json_data = "{" + accumulated_response.split("{", maxsplit=1)[-1]
json_data = json_data.replace("`", "").replace("@", "").replace("\\", " ").replace("\n", " ")
try:
task_list_data = json.loads(json_data)
if isinstance(task_list_data["global_info"]["start_location"], str) and task_list_data["global_info"][
"start_location"].lower() == "user location":
task_list_data["global_info"]["start_location"] = {"lat": session.lat, "lng": session.lng}
session.planner_agent.update_session_state(session_id=session.session_id,
session_state_updates={"task_list": task_list_data})
session.task_list = self._convert_task_list_to_ui_format(task_list_data)
except Exception as e:
logger.error(f"Failed to parse modified task_list: {e}")
raise e
high_priority = sum(1 for t in session.task_list if t.get("priority") == "HIGH")
total_time = sum(int(t.get("duration", "0").split()[0]) for t in session.task_list if t.get("duration"))
start_location = task_list_data["global_info"].get("start_location", "N/A")
date = task_list_data["global_info"].get("departure_time", "N/A")
summary_html = create_summary_card(len(session.task_list), high_priority, total_time, start_location, date)
session.chat_history[-1] = {"role": "assistant", "message": "โœ… Tasks updated.",
"time": datetime.now().strftime("%H:%M:%S")}
self._add_reasoning(session, "planner", f"Updated: {user_message[:30]}...")
yield {"type": "complete", "summary_html": summary_html, "session": session}
except Exception as e:
logger.error(f"Chat error: {e}")
session.chat_history.append(
{"role": "assistant", "message": f"โŒ Error: {str(e)}", "time": datetime.now().strftime("%H:%M:%S")})
yield {"type": "update_history", "session": session}
# ================= Step 3: Run Core Team =================
async def run_step3_team(self, session: UserSession) -> AsyncGenerator[Dict[str, Any], None]:
token = client_session_ctx.set(session.session_id)
attempt = 0
success = False
start_time = time.perf_counter()
try:
session = self._get_live_session(session)
sid = session.session_id
if sid in self._cancelled_sessions: self._cancelled_sessions.remove(sid)
if not session.task_list:
yield {"type": "error", "message": "No tasks to plan.", "session": session}
return
task_list_input = session.planner_agent.get_session_state()["task_list"]
task_list_str = json.dumps(task_list_input, indent=2, ensure_ascii=False) if isinstance(task_list_input, (
dict, list)) else str(task_list_input)
self._add_reasoning(session, "team", "๐ŸŽฏ Multi-agent collaboration started")
yield {"type": "reasoning_update", "session": session,
"agent_status": ("team", "working", "Analyzing tasks...")}
while attempt < max_retries and not success:
attempt += 1
try:
active_agents = set()
# ๐Ÿ”ฅ ้‡้ปžไฟฎๆ”น 1: ไฝฟ็”จ arun (Async Run)
# ๐Ÿ”ฅ ้‡้ปžไฟฎๆ”น 2: ้€™ๅ€‹ๆ–นๆณ•ๆœฌ่บซๅ›žๅ‚ณ็š„ๆ˜ฏไธ€ๅ€‹ AsyncGenerator๏ผŒๆ‰€ไปฅ่ฆ็›ดๆŽฅ iterate
team_stream = session.core_team.arun(
task_list_str,
stream=True,
stream_events=True,
session_id=session.session_id
)
report_content = ""
start_time = time.perf_counter()
has_content = False
# ๐Ÿ”ฅ ้‡้ปžไฟฎๆ”น 3: ไฝฟ็”จ async for ไพ†่ฟญไปฃ
async for event in team_stream:
if event.event in [RunEvent.run_content, RunEvent.tool_call_started]:
has_content = True
success = True
if sid in self._cancelled_sessions:
logger.warning(f"๐Ÿ›‘ Execution terminated by user for session {sid}")
self._cancelled_sessions.remove(sid)
yield {"type": "error", "message": "Plan cancelled by user."}
return
if event.event == RunEvent.run_started:
agent_id = event.agent_id or "team"
active_agents.add(agent_id)
if agent_id == "presenter": report_content = ""
yield {"type": "reasoning_update", "session": session,
"agent_status": (agent_id, "working", "Thinking...")}
elif event.event == RunEvent.run_completed:
agent_id = event.agent_id or "team"
if agent_id == "team":
yield {"type": "reasoning_update", "session": session,
"agent_status": ("team", "working", "Processing...")}
continue
if agent_id in active_agents: active_agents.remove(agent_id)
yield {"type": "reasoning_update", "session": session,
"agent_status": (agent_id, "idle", "Standby")}
yield {"type": "reasoning_update", "session": session,
"agent_status": ("team", "working", "Reviewing results...")}
elif event.event == RunEvent.run_content and event.agent_id == "presenter":
report_content += event.content
yield {"type": "report_stream", "content": report_content, "session": session}
elif event.event == TeamRunEvent.tool_call_started:
tool_name = event.tool.tool_name
yield {"type": "reasoning_update", "session": session,
"agent_status": ("team", "working", "Orchestrating...")}
if "delegate_task_to_member" in tool_name:
member_id = event.tool.tool_args.get("member_id", "unknown")
self._add_reasoning(session, "team", f"๐Ÿ‘‰ Delegating to {member_id}...")
yield {"type": "reasoning_update", "session": session,
"agent_status": (member_id, "working", "Receiving Task...")}
else:
self._add_reasoning(session, "team", f"๐Ÿ”ง Tool: {tool_name}")
elif event.event == RunEvent.tool_call_started:
member_id = event.agent_id
tool_name = event.tool.tool_name
yield {"type": "reasoning_update", "session": session,
"agent_status": ("team", "working", f"Monitoring {member_id}...")}
self._add_reasoning(session, member_id, f"Using tool: {tool_name}...")
yield {"type": "reasoning_update", "session": session,
"agent_status": (member_id, "working", f"Running Tool...")}
elif event.event == TeamRunEvent.run_completed:
self._add_reasoning(session, "team", "๐ŸŽ‰ Planning process finished")
if hasattr(event, 'metrics'):
logger.info(f"Total tokens: {event.metrics.total_tokens}")
logger.info(f"Input tokens: {event.metrics.input_tokens}")
logger.info(f"Output tokens: {event.metrics.output_tokens}")
if not has_content and attempt < max_retries: continue
break
finally:
logger.info(f"Run time (s): {time.perf_counter() - start_time}")
for agent in ["scout", "optimizer", "navigator", "weatherman", "presenter"]:
yield {"type": "reasoning_update", "session": session, "agent_status": (agent, "idle", "Standby")}
yield {"type": "reasoning_update", "session": session, "agent_status": ("team", "complete", "All Done!")}
session.final_report = report_html = f"## ๐ŸŽฏ Planning Complete\n\n{report_content}"
yield {"type": "complete", "report_html": report_html, "session": session,
"agent_status": ("team", "complete", "Finished")}
except GeneratorExit:
return
except Exception as e:
logger.error(f"Error in attempt {attempt}: {e}")
if attempt >= max_retries: yield {"type": "error", "message": str(e), "session": session}
finally:
client_session_ctx.reset(token)
# ================= Step 4: Finalize =================
def run_step4_finalize(self, session: UserSession) -> Dict[str, Any]:
try:
session = self._get_live_session(session)
final_ref_id = poi_repo.get_last_id_by_session(session.session_id)
if not final_ref_id:
raise ValueError(f"No results found, please Stop & Back to Edit to re-run the planning.")
structured_data = poi_repo.load(final_ref_id)
timeline_html = create_timeline_html_enhanced(structured_data.get("timeline", []))
metrics = structured_data.get("metrics", {})
traffic = structured_data.get("traffic_summary", {})
task_count = f"{metrics['completed_tasks']} / {metrics['total_tasks']}"
high_prio = sum(1 for t in session.task_list if t.get("priority") == "HIGH")
total_time = metrics.get("optimized_duration_min", traffic.get("total_duration_min", 0))
dist_m = metrics.get("optimized_distance_m", 0)
total_dist_km = dist_m / 1000.0
efficiency = metrics.get("route_efficiency_pct", 0)
saved_dist_m = metrics.get("distance_saved_m", 0)
saved_time_min = metrics.get("time_saved_min", 0)
start_location = structured_data.get("global_info", {}).get("start_location", {}).get("name", "N/A")
date = structured_data.get("global_info", {}).get("departure_time", "N/A")
summary_card = create_summary_card(task_count, high_prio, int(total_time), start_location, date)
eff_color = "#047857" if efficiency >= 80 else "#d97706"
eff_bg = "#ecfdf5" if efficiency >= 80 else "#fffbeb"
eff_border = "#a7f3d0" if efficiency >= 80 else "#fde68a"
ai_stats_html = f"""
<div style="display: flex; gap: 12px; margin-bottom: 20px;">
<div style="flex: 1; background: {eff_bg}; padding: 16px; border-radius: 12px; border: 1px solid {eff_border};">
<div style="font-size: 0.8rem; color: {eff_color}; font-weight: 600; display: flex; align-items: center; gap: 4px;"><span>๐Ÿš€ AI EFFICIENCY</span></div>
<div style="font-size: 1.8rem; font-weight: 800; color: {eff_color}; line-height: 1.2;">{efficiency:.1f}%</div>
<div style="font-size: 0.75rem; color: {eff_color}; opacity: 0.9; margin-top: 4px;">โšก Saved {saved_time_min:.0f} mins</div>
</div>
<div style="flex: 1; background: #eff6ff; padding: 16px; border-radius: 12px; border: 1px solid #bfdbfe;">
<div style="font-size: 0.8rem; color: #2563eb; font-weight: 600;">๐Ÿš— TOTAL DISTANCE</div>
<div style="font-size: 1.8rem; font-weight: 800; color: #1d4ed8; line-height: 1.2;">{total_dist_km:.2f} <span style="font-size: 1rem;">km</span></div>
<div style="font-size: 0.75rem; color: #2563eb; opacity: 0.9; margin-top: 4px;">๐Ÿ“‰ Reduced {saved_dist_m} m</div>
</div>
</div>"""
full_summary_html = f"{summary_card}{ai_stats_html}<h3>๐Ÿ“ Itinerary Timeline</h3>{timeline_html}"
map_fig = create_animated_map(structured_data)
task_list_html = self.generate_task_list_html(session)
session.planning_completed = True
return {"type": "success", "summary_tab_html": full_summary_html, "report_md": session.final_report,
"task_list_html": task_list_html, "map_fig": map_fig, "session": session}
except Exception as e:
logger.error(f"Finalize error: {e}", exc_info=True)
return {"type": "error", "message": str(e), "session": session}
# ================= Helpers =================
def _add_reasoning(self, session: UserSession, agent: str, message: str):
session.reasoning_messages.append(
{"agent": agent, "message": message, "time": datetime.now().strftime("%H:%M:%S")})
def _convert_task_list_to_ui_format(self, task_list_data):
ui_tasks = []
if isinstance(task_list_data, dict):
tasks = task_list_data.get("tasks", [])
elif isinstance(task_list_data, list):
tasks = task_list_data
else:
return []
for i, task in enumerate(tasks, 1):
ui_tasks.append({
"id": i,
"title": task.get("description", "Task"),
"priority": task.get("priority", "MEDIUM"),
"time": task.get("time_window", "Anytime"),
"duration": f"{task.get('service_duration_min', 30)} minutes",
"location": task.get("location_hint", "To be determined"),
"icon": self._get_task_icon(task.get("category", "other"))
})
return ui_tasks
def _get_task_icon(self, category: str) -> str:
icons = {"medical": "๐Ÿฅ", "shopping": "๐Ÿ›’", "postal": "๐Ÿ“ฎ", "food": "๐Ÿฝ๏ธ", "entertainment": "๐ŸŽญ",
"transportation": "๐Ÿš—", "other": "๐Ÿ“‹"}
return icons.get(category.lower(), "๐Ÿ“‹")
def generate_task_list_html(self, session: UserSession) -> str:
if not session.task_list: return "<p>No tasks available</p>"
html = ""
for task in session.task_list:
html += create_task_card(task["id"], task["title"], task["priority"], task["time"], task["duration"],
task["location"], task.get("icon", "๐Ÿ“‹"))
return html