import os
import json
import subprocess
from pathlib import Path
from urllib.parse import urlparse, parse_qs
import requests
import yt_dlp

# ====== CONFIG ====== #
HF_API_KEY = os.getenv("HF_API_KEY") # you will set this in Space Settings
HF_VLM_MODEL = os.getenv("HF_VLM_MODEL", "Qwen/Qwen2.5-VL-3B-Instruct")
HF_API_URL = f"https://huggingface.co/proxy/api-inference.huggingface.co/models/{HF_VLM_MODEL}"

# ====== YOUTUBE HELPERS ====== #

def get_youtube_video_id(url: str) -> str | None:
    """Extract the video ID from a youtu.be or youtube.com watch URL, or return None."""
    try:
        p = urlparse(url)
        if p.netloc in ("youtu.be", "www.youtu.be"):
            return p.path.lstrip("/")
        if "youtube.com" in p.netloc:
            qs = parse_qs(p.query)
            return qs.get("v", [None])[0]
    except Exception:
        return None
    return None
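# Illustrative examples (hypothetical video ID) of the URL shapes handled above:
#   get_youtube_video_id("https://youtu.be/abc123XYZ")                 -> "abc123XYZ"
#   get_youtube_video_id("https://www.youtube.com/watch?v=abc123XYZ")  -> "abc123XYZ"
#   get_youtube_video_id("not a url")                                  -> None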

def download_video(url: str, out_path: str = "video.mp4") -> tuple[str, float]:
    """
    Download the YouTube video to out_path and return (path, duration_seconds).
    """
    ydl_opts = {
        "format": "mp4",       # prefer an mp4 container so ffmpeg can read it directly
        "outtmpl": out_path,
        "quiet": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
    # yt-dlp may report duration as None; fall back to 60 s so the pipeline can still run
    duration = float(info.get("duration") or 60.0)
    return out_path, duration

# ====== VLM CALL ====== #

def call_vlm_with_thumbnail(thumb_url: str, product_text: str) -> dict:
    """
    Ask the VLM where to put the product in the frame.
    Returns a dict with bbox etc.
    """
    if not HF_API_KEY:
        raise RuntimeError("HF_API_KEY is not set in Space variables.")

    system_prompt = """
You are an AI video ad editor.
You see a single video thumbnail and a product/brand description.
Choose where in the image the product could naturally appear.
Return ONLY JSON with this schema:
{
  "naturalness_score": float,
  "reason": "short reason",
  "bbox": {
    "x": float,  // center x between 0 and 1
    "y": float,  // center y between 0 and 1
    "w": float,  // width between 0 and 1
    "h": float   // height between 0 and 1
  },
  "object_description": "short description",
  "new_subtitle": "optional subtitle text or empty string"
}
"""
    user_prompt = f"Product / brand description:\n{product_text}\n\nImage URL: {thumb_url}"

    payload = {
        "inputs": {
            "prompt": system_prompt + "\n\n" + user_prompt,
            "image": thumb_url,
        },
        "parameters": {
            "max_new_tokens": 320,
        },
        "options": {
            "wait_for_model": True,
        },
    }
    headers = {"Authorization": f"Bearer {HF_API_KEY}"}
    r = requests.post(HF_API_URL, headers=headers, json=payload, timeout=180)
    if r.status_code >= 400:
        raise RuntimeError(f"Hugging Face error {r.status_code}: {r.text[:200]}")
    data = r.json()

    # Many HF endpoints return [{"generated_text": "..."}]
    if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict) and "generated_text" in data[0]:
        text = data[0]["generated_text"]
    elif isinstance(data, dict) and "generated_text" in data:
        text = data["generated_text"]
    elif isinstance(data, dict):
        return data
    else:
        text = str(data)

    # Try to parse JSON inside the text
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        start = text.find("{")
        end = text.rfind("}")
        if start != -1 and end != -1 and end > start:
            return json.loads(text[start:end + 1])
        raise RuntimeError(f"VLM did not return valid JSON: {text[:200]}")

# ====== FFMPEG OVERLAY (simple box) ====== #

def overlay_product_box(
    input_video: str,
    output_video: str,
    bbox: dict,
    t_start: float,
    t_end: float,
) -> str:
    """
    Draw a semi-transparent colored box where the product should appear
    between t_start and t_end seconds.
    bbox coords are normalized [0,1] and treated as CENTER.
    """
    x = float(bbox.get("x", 0.5))
    y = float(bbox.get("y", 0.5))
    w = float(bbox.get("w", 0.2))
    h = float(bbox.get("h", 0.2))

    # ffmpeg expression: use iw/ih (input width/height).
    # Center-based box; the color/opacity here are placeholder values (semi-transparent red).
    drawbox = (
        f"drawbox=x=iw*({x}-{w}/2):y=ih*({y}-{h}/2):"
        f"w=iw*{w}:h=ih*{h}:color=red@0.5:t=fill:"
        f"enable='between(t,{t_start},{t_end})'"
    )
    cmd = [
        "ffmpeg",
        "-y",
        "-i", input_video,
        "-vf", drawbox,
        "-c:a", "copy",
        output_video,
    ]
    subprocess.run(cmd, check=True)
    return output_video
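
# Worked example (hypothetical numbers): for a 1920x1080 frame and
# bbox = {"x": 0.5, "y": 0.5, "w": 0.2, "h": 0.2}, the filter expands to
#   x = 1920 * (0.5 - 0.2/2) = 768,   y = 1080 * (0.5 - 0.2/2) = 432,
#   w = 1920 * 0.2 = 384,             h = 1080 * 0.2 = 216,
# i.e. a 384x216 box whose center sits at the middle of the frame.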

# ====== MAIN PIPELINE ====== #

def run_halftime_pipeline(youtube_url: str, product_text: str) -> str:
    """
    1. Figure out YouTube ID and thumbnail URL.
    2. Ask VLM for bbox.
    3. Download video.
    4. Draw colored box where product should appear for 3 seconds.
    5. Return path to edited video.
    """
    vid = get_youtube_video_id(youtube_url)
    if not vid:
        raise RuntimeError("Could not parse YouTube video ID.")
    thumb_url = f"https://img.youtube.com/vi/{vid}/maxresdefault.jpg"
    placement = call_vlm_with_thumbnail(thumb_url, product_text)
    bbox = placement.get("bbox", {"x": 0.5, "y": 0.5, "w": 0.2, "h": 0.2})
    video_path, duration = download_video(youtube_url, "input.mp4")

    # Choose middle of the video as the 'ad moment'
    center = duration / 2.0
    t_start = max(center - 1.5, 0.0)
    t_end = min(center + 1.5, duration)

    output_path = "edited.mp4"
    overlay_product_box(video_path, output_path, bbox, t_start, t_end)
    return output_path
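
# Minimal usage sketch, assuming HF_API_KEY is set and ffmpeg is on PATH.
# The URL and product description below are placeholders, not real inputs.
if __name__ == "__main__":
    demo_url = "https://www.youtube.com/watch?v=VIDEO_ID"        # placeholder
    demo_product = "Energy drink can with a bright green logo"   # placeholder
    result_path = run_halftime_pipeline(demo_url, demo_product)
    print(f"Edited video written to: {result_path}")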