Spaces:
Running
Running
| import os | |
| import json | |
| import subprocess | |
| from pathlib import Path | |
| from urllib.parse import urlparse, parse_qs | |
| import requests | |
| import yt_dlp | |
# ====== CONFIG ====== #
# API token for the Hugging Face endpoint, read from the environment
# (set it in the Space settings). None when the variable is missing.
HF_API_KEY = os.getenv("HF_API_KEY")
# Vision-language model to query. `or` (rather than a getenv default) also
# covers a variable that is set but empty, which would otherwise yield a URL
# ending in "/models/".
HF_VLM_MODEL = os.getenv("HF_VLM_MODEL") or "Qwen/Qwen2.5-VL-3B-Instruct"
# Endpoint the VLM request is POSTed to.
# NOTE(review): the "huggingface.co/proxy/" prefix is unusual for the
# Inference API host -- confirm this URL against the deployed Space.
HF_API_URL = f"https://huggingface.co/proxy/api-inference.huggingface.co/models/{HF_VLM_MODEL}"
| # ====== YOUTUBE HELPERS ====== # | |
| def get_youtube_video_id(url: str) -> str | None: | |
| try: | |
| p = urlparse(url) | |
| if p.netloc in ("youtu.be", "www.youtu.be"): | |
| return p.path.lstrip("/") | |
| if "youtube.com" in p.netloc: | |
| qs = parse_qs(p.query) | |
| return qs.get("v", [None])[0] | |
| except Exception: | |
| return None | |
| return None | |
def download_video(url: str, out_path: str = "video.mp4") -> tuple[str, float]:
    """Download a YouTube video as MP4 and report its duration.

    Args:
        url: URL of the video to download.
        out_path: Destination file path (used as the yt-dlp output template).

    Returns:
        ``(out_path, duration_seconds)``. The duration falls back to 60.0
        when the extractor reports none (or a non-positive / non-numeric
        value).
    """
    fallback_duration = 60.0
    ydl_opts = {
        "format": "mp4",
        "outtmpl": out_path,
        "quiet": True,
        # The output path is fixed by the caller, so replace any file left
        # there by an earlier download instead of silently reusing it.
        "overwrites": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)

    # The "duration" key may be present but None; a plain
    # .get(key, default) would then hand None to float().
    raw_duration = (info or {}).get("duration")
    try:
        duration = float(raw_duration)
    except (TypeError, ValueError):
        duration = fallback_duration
    if not duration > 0:  # also catches NaN
        duration = fallback_duration
    return out_path, duration
| # ====== VLM CALL ====== # | |
def _extract_json_object(text: str) -> dict | None:
    """Return the first decodable JSON object embedded in *text*, else None."""
    decoder = json.JSONDecoder()
    pos = text.find("{")
    while pos != -1:
        try:
            candidate, _ = decoder.raw_decode(text, pos)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict):
            return candidate
        pos = text.find("{", pos + 1)
    return None


def call_vlm_with_thumbnail(thumb_url: str, product_text: str) -> dict:
    """Ask the VLM where the product could be placed in the thumbnail.

    Args:
        thumb_url: URL of the video thumbnail shown to the model.
        product_text: Free-text product / brand description.

    Returns:
        The JSON object produced by the model (expected to carry a "bbox"
        entry, per the prompt), or the raw response dict when the endpoint
        answers with a dict that has no "generated_text" field.

    Raises:
        RuntimeError: If HF_API_KEY is unset, the endpoint answers with an
            HTTP status >= 400 or a non-JSON body, or no JSON object can be
            found in the model output.
    """
    if not HF_API_KEY:
        raise RuntimeError("HF_API_KEY is not set in Space variables.")

    system_prompt = """
You are an AI video ad editor.
You see a single video thumbnail and a product/brand description.
Choose where in the image the product could naturally appear.
Return ONLY JSON with this schema:
{
  "naturalness_score": float,
  "reason": "short reason",
  "bbox": {
    "x": float, // center x between 0 and 1
    "y": float, // center y between 0 and 1
    "w": float, // width between 0 and 1
    "h": float // height between 0 and 1
  },
  "object_description": "short description",
  "new_subtitle": "optional subtitle text or empty string"
}
"""
    user_prompt = f"Product / brand description:\n{product_text}\n\nImage URL: {thumb_url}"

    payload = {
        "inputs": {
            "prompt": system_prompt + "\n\n" + user_prompt,
            "image": thumb_url,
        },
        "parameters": {
            "max_new_tokens": 320,
        },
        "options": {
            "wait_for_model": True,
        },
    }
    headers = {"Authorization": f"Bearer {HF_API_KEY}"}

    response = requests.post(HF_API_URL, headers=headers, json=payload, timeout=180)
    if response.status_code >= 400:
        raise RuntimeError(
            f"Hugging Face error {response.status_code}: {response.text[:200]}"
        )

    try:
        data = response.json()
    except ValueError as err:  # body was not JSON (e.g. an HTML error page)
        raise RuntimeError(
            f"Hugging Face returned a non-JSON response: {response.text[:200]}"
        ) from err

    # Many HF endpoints return [{"generated_text": "..."}]
    if (
        isinstance(data, list)
        and len(data) > 0
        and isinstance(data[0], dict)
        and "generated_text" in data[0]
    ):
        text = data[0]["generated_text"]
    elif isinstance(data, dict) and "generated_text" in data:
        text = data["generated_text"]
    elif isinstance(data, dict):
        return data
    else:
        text = str(data)

    if not isinstance(text, str):
        text = str(text)

    # Fast path: the whole output is the JSON object.
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        parsed = None

    # Otherwise scan for an embedded object. Decoding from each "{" in turn
    # (instead of slicing first-"{" to last-"}") copes with surrounding
    # prose and with an echoed prompt, whose schema is not valid JSON.
    if not isinstance(parsed, dict):
        parsed = _extract_json_object(text)

    if parsed is None:
        raise RuntimeError(f"VLM did not return valid JSON: {text[:200]}")
    return parsed
| # ====== FFMPEG OVERLAY (simple box) ====== # | |
def overlay_product_box(
    input_video: str,
    output_video: str,
    bbox: dict,
    t_start: float,
    t_end: float,
    color: str = "red",
    opacity: float = 0.4,
) -> str:
    """Draw a semi-transparent filled box on a video for a time window.

    The box marks where the product should appear. It is rendered by
    re-encoding the video through ffmpeg's ``drawbox`` filter; the audio
    stream is copied unchanged.

    Args:
        input_video: Path of the source video.
        output_video: Path of the video to write (overwritten if present).
        bbox: Mapping with normalized ``x``/``y`` (box CENTER) and ``w``/``h``
            (box size), each in [0, 1]. Missing, non-numeric or non-finite
            entries fall back to a centered box of size 0.2.
        t_start: Time in seconds at which the box appears.
        t_end: Time in seconds at which the box disappears.
        color: ffmpeg color name or hex code for the box.
        opacity: Box opacity, clamped to [0, 1].

    Returns:
        ``output_video``.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    import math

    # drawbox interprets a width/height of 0 as "whole frame", so sizes
    # below this fraction are treated as invalid rather than passed through.
    min_size = 0.01
    default_size = 0.2

    def _fraction(key: str, default: float) -> float:
        """Read bbox[key] as a float clamped to [0, 1], else *default*."""
        raw = bbox.get(key) if isinstance(bbox, dict) else None
        try:
            value = float(raw)
        except (TypeError, ValueError):
            return default
        if not math.isfinite(value):
            return default
        return min(max(value, 0.0), 1.0)

    width = _fraction("w", default_size)
    height = _fraction("h", default_size)
    if width < min_size:
        width = default_size
    if height < min_size:
        height = default_size

    # Convert center coordinates to the top-left corner drawbox expects,
    # shifting the box when needed so it stays entirely inside the frame.
    left = min(max(_fraction("x", 0.5) - width / 2.0, 0.0), 1.0 - width)
    top = min(max(_fraction("y", 0.5) - height / 2.0, 0.0), 1.0 - height)

    alpha = min(max(float(opacity), 0.0), 1.0)

    # iw/ih are the input frame width/height inside ffmpeg expressions.
    # Fixed-point formatting keeps exponent notation out of the filter string.
    drawbox = (
        f"drawbox=x=iw*{left:.6f}:y=ih*{top:.6f}:"
        f"w=iw*{width:.6f}:h=ih*{height:.6f}:"
        f"color={color}@{alpha:.2f}:t=fill:"
        f"enable='between(t,{float(t_start):.3f},{float(t_end):.3f})'"
    )

    cmd = [
        "ffmpeg",
        "-y",
        "-i", input_video,
        "-vf", drawbox,
        "-c:a", "copy",
        output_video,
    ]
    subprocess.run(cmd, check=True)
    return output_video
| # ====== MAIN PIPELINE ====== # | |
def run_halftime_pipeline(youtube_url: str, product_text: str) -> str:
    """Run the full placement pipeline for one YouTube video.

    Steps:
      1. Parse the YouTube video ID and build its thumbnail URL.
      2. Ask the VLM for a placement bounding box.
      3. Download the video to ``input.mp4``.
      4. Draw a colored box at the suggested position for up to 3 seconds
         around the middle of the video.
      5. Return the path of the edited video.

    Args:
        youtube_url: URL of the YouTube video to edit.
        product_text: Free-text product / brand description for the VLM.

    Returns:
        Path of the edited video (``edited.mp4``).

    Raises:
        RuntimeError: If no video ID can be parsed from ``youtube_url``.
            Errors raised by the VLM call, the download or ffmpeg propagate.
    """
    from urllib.parse import quote

    vid = get_youtube_video_id(youtube_url)
    if not vid:
        raise RuntimeError("Could not parse YouTube video ID.")

    # Percent-encode the ID so an unexpected value cannot alter the URL path.
    # NOTE(review): maxresdefault.jpg may not exist for every video --
    # confirm whether a lower-resolution fallback is needed.
    thumb_url = f"https://img.youtube.com/vi/{quote(vid, safe='')}/maxresdefault.jpg"

    placement = call_vlm_with_thumbnail(thumb_url, product_text)

    # The model may omit "bbox", set it to null, or return a non-object;
    # fall back to a centered box in all of those cases.
    bbox = placement.get("bbox") if isinstance(placement, dict) else None
    if not isinstance(bbox, dict) or not bbox:
        bbox = {"x": 0.5, "y": 0.5, "w": 0.2, "h": 0.2}

    input_path = "input.mp4"
    # Remove any leftover from a previous run so a stale file cannot be
    # picked up in place of the video requested now.
    Path(input_path).unlink(missing_ok=True)
    video_path, duration = download_video(youtube_url, input_path)

    # Choose middle of the video as the 'ad moment'
    center = duration / 2.0
    t_start = max(center - 1.5, 0.0)
    t_end = min(center + 1.5, duration)

    output_path = "edited.mp4"
    overlay_product_box(video_path, output_path, bbox, t_start, t_end)
    return output_path