# mcp_servers.py (FIXED: Added OpenAI & Nebius Support to get_llm_response) import asyncio import json import re import google.generativeai as genai from anthropic import AsyncAnthropic from openai import AsyncOpenAI from typing import Dict, Optional, Tuple, List, Any import config from utils import load_prompt from personas import PERSONAS_DATA EVALUATION_PROMPT_TEMPLATE = load_prompt(config.PROMPT_FILES["evaluator"]) # Schema definition EVALUATION_SCHEMA = { "type": "OBJECT", "properties": { "Novelty": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]}, "Usefulness_Feasibility": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]}, "Flexibility": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]}, "Elaboration": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]}, "Cultural_Appropriateness": {"type": "OBJECT", "properties": {"score": {"type": "INTEGER"}, "justification": {"type": "STRING"}}, "required": ["score", "justification"]} }, "required": ["Novelty", "Usefulness_Feasibility", "Flexibility", "Elaboration", "Cultural_Appropriateness"] } def extract_json(text: str) -> dict: try: clean_text = text.strip() if "```json" in clean_text: clean_text = clean_text.split("```json")[1].split("```")[0].strip() elif "```" in clean_text: clean_text = clean_text.split("```")[1].split("```")[0].strip() return json.loads(clean_text) except (json.JSONDecodeError, IndexError): try: match = re.search(r'(\{[\s\S]*\})', text) if match: return json.loads(match.group(1)) except: pass raise ValueError(f"Could not extract JSON from response: {text[:100]}...") class BusinessSolutionEvaluator: def __init__(self, gemini_client: Optional[genai.GenerativeModel]): if not gemini_client: raise ValueError("BusinessSolutionEvaluator requires a Google/Gemini client.") self.gemini_model = gemini_client async def evaluate(self, problem: str, solution_text: str) -> Tuple[dict, dict]: print(f"Evaluating solution (live): {solution_text[:50]}...") base_prompt = EVALUATION_PROMPT_TEMPLATE.format(problem=problem, solution_text=solution_text) schema_instruction = """ [IMPORTANT SYSTEM INSTRUCTION] Ignore any previous examples of JSON formatting in this prompt. You MUST strictly follow the Output Schema provided below. For EACH of the 5 metrics, you must provide an object with TWO fields: "score" (integer) and "justification" (string). Do not output a list. Return a single JSON object. """ final_prompt = base_prompt + schema_instruction usage = {"model": "Gemini", "input": 0, "output": 0} try: response = await self.gemini_model.generate_content_async( final_prompt, generation_config=genai.types.GenerationConfig( response_mime_type="application/json", response_schema=EVALUATION_SCHEMA ) ) if hasattr(response, "usage_metadata"): usage["input"] = response.usage_metadata.prompt_token_count usage["output"] = response.usage_metadata.candidates_token_count v_fitness = extract_json(response.text) if not isinstance(v_fitness, (dict, list)): raise ValueError(f"Judge returned invalid type: {type(v_fitness)}") return v_fitness, usage except Exception as e: print(f"ERROR: BusinessSolutionEvaluator failed: {e}") return {"Novelty": {"score": 1, "justification": f"Error: {str(e)}"}}, usage class AgentCalibrator: def __init__(self, api_clients: dict, evaluator: BusinessSolutionEvaluator): self.evaluator = evaluator self.api_clients = {name: client for name, client in api_clients.items() if client} self.sponsor_llms = list(self.api_clients.keys()) print(f"AgentCalibrator initialized with enabled clients: {self.sponsor_llms}") async def calibrate_team(self, problem: str) -> Tuple[Dict[str, Any], List[str], List[Dict[str, Any]], List[Dict[str, Any]]]: print(f"Running LIVE calibration test for specialist team on {self.sponsor_llms}...") error_log = [] detailed_results = [] all_usage_stats = [] if not self.sponsor_llms: raise Exception("AgentCalibrator cannot run: No LLM clients are configured.") if len(self.sponsor_llms) == 1: default_llm = self.sponsor_llms[0] print("Only one LLM available. Skipping calibration.") plan = { "Plant": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Plant"], "llm": default_llm}, "Implementer": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Implementer"], "llm": default_llm}, "Monitor": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Monitor"], "llm": default_llm} } return plan, error_log, [], [] roles_to_test = {role: PERSONAS_DATA[key]["description"] for role, key in config.CALIBRATION_CONFIG["roles_to_test"].items()} test_problem = f"For the business problem '{problem}', generate a single, brief, one-paragraph concept-level solution." tasks = [] for role, persona in roles_to_test.items(): for llm_name in self.sponsor_llms: tasks.append(self.run_calibration_test(problem, role, llm_name, persona, test_problem)) results = await asyncio.gather(*tasks) detailed_results = results for res in results: if "usage_gen" in res: all_usage_stats.append(res["usage_gen"]) if "usage_eval" in res: all_usage_stats.append(res["usage_eval"]) best_llms = {} role_metrics = config.CALIBRATION_CONFIG["role_metrics"] for role in roles_to_test.keys(): best_score = -1 best_llm = self.sponsor_llms[0] for res in results: if res["role"] == role: if res.get("error"): error_log.append(f"Calibration failed for {res['llm']} (as {role}): {res['error']}") continue metric = role_metrics[role] raw_score_data = res.get("score", {}) if not isinstance(raw_score_data, (dict, list)): raw_score_data = {} if isinstance(raw_score_data, list): raw_score_data = raw_score_data[0] if len(raw_score_data) > 0 else {} metric_data = raw_score_data.get(metric, {}) if not isinstance(metric_data, (dict, list)): metric_data = {} if isinstance(metric_data, list): metric_data = metric_data[0] if len(metric_data) > 0 else {} score = metric_data.get("score", 0) if score > best_score: best_score = score best_llm = res["llm"] best_llms[role] = best_llm team_plan = { "Plant": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Plant"], "llm": best_llms["Plant"]}, "Implementer": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Implementer"], "llm": best_llms["Implementer"]}, "Monitor": {"persona": config.CALIBRATION_CONFIG["roles_to_test"]["Monitor"], "llm": best_llms["Monitor"]} } print(f"Calibration complete (live). Team plan: {team_plan}") return team_plan, error_log, detailed_results, all_usage_stats async def run_calibration_test(self, problem, role, llm_name, persona, test_problem): client = self.api_clients[llm_name] solution, gen_usage = await get_llm_response(llm_name, client, persona, test_problem) if "Error generating response" in solution: return {"role": role, "llm": llm_name, "error": solution, "output": solution, "usage_gen": gen_usage} score, eval_usage = await self.evaluator.evaluate(problem, solution) return { "role": role, "llm": llm_name, "score": score, "output": solution, "usage_gen": gen_usage, "usage_eval": eval_usage } # --- UPDATED: Handles OpenAI and Nebius --- async def get_llm_response(client_name: str, client, system_prompt: str, user_prompt: str) -> Tuple[str, dict]: """Returns (text_response, usage_dict)""" usage = {"model": client_name, "input": 0, "output": 0} try: if client_name == "Gemini": model = client full_prompt = [{'role': 'user', 'parts': [system_prompt]}, {'role': 'model', 'parts': ["Understood."]}, {'role': 'user', 'parts': [user_prompt]}] response = await model.generate_content_async(full_prompt) if hasattr(response, "usage_metadata"): usage["input"] = response.usage_metadata.prompt_token_count usage["output"] = response.usage_metadata.candidates_token_count return response.text, usage elif client_name == "Anthropic": response = await client.messages.create( model=config.MODELS["Anthropic"]["default"], max_tokens=8192, system=system_prompt, messages=[{"role": "user", "content": user_prompt}] ) if hasattr(response, "usage"): usage["input"] = response.usage.input_tokens usage["output"] = response.usage.output_tokens return response.content[0].text, usage # --- THIS IS THE PART THAT WAS MISSING OR INCOMPLETE --- elif client_name in ["SambaNova", "OpenAI", "Nebius"]: # Dynamically get the correct model ID from config.py model_id = config.MODELS.get(client_name, {}).get("default", "gpt-4o-mini") completion = await client.chat.completions.create( model=model_id, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ] ) if hasattr(completion, "usage"): usage["input"] = completion.usage.prompt_tokens usage["output"] = completion.usage.completion_tokens return completion.choices[0].message.content, usage except Exception as e: error_message = f"Error generating response from {client_name}: {str(e)}" print(f"ERROR: API call to {client_name} failed: {e}") return error_message, usage