import gradio as gr
import torch
import os
import time
import copy
from pathlib import Path
from typing import Optional, Tuple
import spaces
from vibevoice.modular.modeling_vibevoice_streaming_inference import (
VibeVoiceStreamingForConditionalGenerationInference,
)
from vibevoice.processor.vibevoice_streaming_processor import (
VibeVoiceStreamingProcessor,
)
class VoiceMapper:
"""Maps speaker names to voice file paths"""
def __init__(self):
self.setup_voice_presets()
        # Also register short aliases derived from the preset file names,
        # e.g. "en-Wayne_man" -> "Wayne": take the part before the first "_",
        # then the part after the last "-". If two files yield the same alias,
        # the alphabetically last one wins.
new_dict = {}
for name, path in self.voice_presets.items():
if "_" in name:
name = name.split("_")[0]
if "-" in name:
name = name.split("-")[-1]
new_dict[name] = path
self.voice_presets.update(new_dict)
def setup_voice_presets(self):
"""Setup voice presets by scanning the voices directory."""
voices_dir = os.path.join(os.path.dirname(__file__), "demo/voices/streaming_model")
# Check if voices directory exists
if not os.path.exists(voices_dir):
print(f"Warning: Voices directory not found at {voices_dir}")
self.voice_presets = {}
self.available_voices = {}
return
        # Scan the voices directory for cached voice-prompt files (*.pt)
        self.voice_presets = {}
pt_files = [
f
for f in os.listdir(voices_dir)
if f.lower().endswith(".pt") and os.path.isfile(os.path.join(voices_dir, f))
]
# Create dictionary with filename (without extension) as key
for pt_file in pt_files:
# Remove .pt extension to get the name
name = os.path.splitext(pt_file)[0]
# Create full path
full_path = os.path.join(voices_dir, pt_file)
self.voice_presets[name] = full_path
# Sort the voice presets alphabetically by name for better UI
self.voice_presets = dict(sorted(self.voice_presets.items()))
# Filter out voices that don't exist (this is now redundant but kept for safety)
self.available_voices = {
name: path for name, path in self.voice_presets.items() if os.path.exists(path)
}
print(f"Found {len(self.available_voices)} voice files in {voices_dir}")
print(f"Available voices: {', '.join(self.available_voices.keys())}")
def get_voice_path(self, speaker_name: str) -> str:
"""Get voice file path for a given speaker name"""
# First try exact match
if speaker_name in self.voice_presets:
return self.voice_presets[speaker_name]
# Try partial matching (case insensitive)
speaker_lower = speaker_name.lower()
for preset_name, path in self.voice_presets.items():
if preset_name.lower() in speaker_lower or speaker_lower in preset_name.lower():
return path
        # Fall back to the first preset; fail loudly if none exist at all
        if not self.voice_presets:
            raise RuntimeError(
                f"No voice presets available; cannot resolve speaker '{speaker_name}'"
            )
        default_voice = next(iter(self.voice_presets.values()))
        print(
            f"Warning: No voice preset found for '{speaker_name}', using default voice: {default_voice}"
        )
        return default_voice
# Patch _update_model_kwargs_for_generation so generation works whether the
# model's forward pass returns a plain dict or a ModelOutput-style object.
def patched_update_model_kwargs_for_generation(
self,
outputs,
model_kwargs,
is_encoder_decoder=False,
model_inputs=None,
num_new_tokens=1,
):
"""Patched version that handles both dict and object-like outputs"""
# Handle both dict and object-like outputs for cache
cache_name = "past_key_values"
if isinstance(outputs, dict):
# For dict outputs, use .get() method
model_kwargs[cache_name] = outputs.get(cache_name)
else:
# For object outputs, try to get the attribute
model_kwargs[cache_name] = getattr(outputs, cache_name, None)
if getattr(self, "config", None) is not None:
if "token_type_ids" in model_kwargs and model_kwargs["token_type_ids"] is not None:
token_type_ids = model_kwargs["token_type_ids"]
model_kwargs["token_type_ids"] = torch.cat(
[token_type_ids, token_type_ids[:, -1:]], dim=-1
)
if not is_encoder_decoder:
# update attention mask
if "attention_mask" in model_kwargs and model_kwargs["attention_mask"] is not None:
attention_mask = model_kwargs["attention_mask"]
model_kwargs["attention_mask"] = torch.cat(
[attention_mask, attention_mask.new_ones((attention_mask.shape[0], 1))],
dim=-1,
)
else:
# update decoder attention mask
if "decoder_attention_mask" in model_kwargs and model_kwargs["decoder_attention_mask"] is not None:
decoder_attention_mask = model_kwargs["decoder_attention_mask"]
model_kwargs["decoder_attention_mask"] = torch.cat(
[
decoder_attention_mask,
decoder_attention_mask.new_ones((decoder_attention_mask.shape[0], 1)),
],
dim=-1,
)
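    # Advance cache_position past the token(s) just generated, mirroring the
    # stock transformers GenerationMixin implementation.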
if model_inputs is not None and "cache_position" in model_inputs:
model_kwargs["cache_position"] = model_inputs["cache_position"][-1:] + num_new_tokens
return model_kwargs
# Check if CUDA is available
CUDA_AVAILABLE = torch.cuda.is_available()
DEVICE = "cuda" if CUDA_AVAILABLE else "cpu"
DTYPE = torch.float16 if CUDA_AVAILABLE else torch.float32
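# float16 roughly halves GPU memory use and speeds up inference; CPU kernels
# are generally faster and better supported in float32.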
print(f"CUDA available: {CUDA_AVAILABLE}")
print(f"Using device: {DEVICE}")
# Load model and processor directly
print("Loading VibeVoice-Realtime model...")
MODEL_PATH = "microsoft/VibeVoice-Realtime-0.5B"
# Load processor (CPU operation)
PROCESSOR = VibeVoiceStreamingProcessor.from_pretrained(MODEL_PATH)
# Load model - use appropriate dtype based on device
MODEL = VibeVoiceStreamingForConditionalGenerationInference.from_pretrained(
MODEL_PATH,
torch_dtype=DTYPE,
device_map="cpu", # Always start on CPU for ZeroGPU compatibility
attn_implementation="sdpa",
)
# Apply the patch to the model instance
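# (binding via __get__ turns the plain function into a bound method, so `self`
# is supplied automatically when generate() invokes it)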
MODEL._update_model_kwargs_for_generation = patched_update_model_kwargs_for_generation.__get__(MODEL, type(MODEL))
MODEL.eval()
MODEL.set_ddpm_inference_steps(num_steps=5)
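# Fewer DDPM denoising steps make the diffusion-based audio decoding faster at
# some potential cost in quality; 5 is the low-latency setting chosen here.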
# Initialize voice mapper
VOICE_MAPPER = VoiceMapper()
print("Model loaded successfully!")
def move_to_device(obj, device):
"""Recursively move tensors in nested structures to device"""
if torch.is_tensor(obj):
return obj.to(device)
elif isinstance(obj, dict):
return {k: move_to_device(v, device) for k, v in obj.items()}
elif isinstance(obj, list):
return [move_to_device(item, device) for item in obj]
elif isinstance(obj, tuple):
return tuple(move_to_device(item, device) for item in obj)
else:
return obj
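# On ZeroGPU Spaces, a GPU is attached only for the duration of a
# @spaces.GPU-decorated call; that is why the model is loaded on CPU above and
# only moved to CUDA inside generate_speech().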
@spaces.GPU(duration=60) # Request GPU for 60 seconds
def generate_speech(
text: str,
speaker_name: str,
cfg_scale: float = 1.5,
progress=gr.Progress(),
) -> Tuple[Optional[str], str]:
"""
Generate speech from text using VibeVoice-Realtime with ZeroGPU
Args:
text: Input text to convert to speech
speaker_name: Name of the speaker voice to use
cfg_scale: Classifier-Free Guidance scale (higher = more faithful to text)
progress: Gradio progress tracker
Returns:
Tuple of (audio_path, status_message)
"""
if not text or not text.strip():
return None, "❌ Error: Please enter some text to convert to speech."
try:
# Detect actual device inside the decorated function
device = "cuda" if torch.cuda.is_available() else "cpu"
dtype = torch.float16 if device == "cuda" else torch.float32
progress(0, desc="Loading voice preset...")
        # Normalize curly quotes and apostrophes to plain ASCII
        full_script = (
            text.strip()
            .replace("\u2019", "'")  # right single quote -> apostrophe
            .replace("\u201c", '"')  # left double quote
            .replace("\u201d", '"')  # right double quote
        )
# Get voice sample path
voice_sample = VOICE_MAPPER.get_voice_path(speaker_name)
# Load voice sample to CPU first
all_prefilled_outputs = torch.load(
voice_sample, map_location="cpu", weights_only=False
)
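        # weights_only=False allows arbitrary pickled objects (the cached
        # prompt presumably bundles more than raw tensors), so only load
        # trusted .pt files this way.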
# Move model to the appropriate device
MODEL.to(device)
# Move voice sample tensors to device
all_prefilled_outputs = move_to_device(all_prefilled_outputs, device)
progress(0.2, desc="Preparing inputs...")
# Prepare inputs
inputs = PROCESSOR.process_input_with_cached_prompt(
text=full_script,
cached_prompt=all_prefilled_outputs,
padding=True,
return_tensors="pt",
return_attention_mask=True,
)
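        # The cached prompt supplies the precomputed voice conditioning, so the
        # reference audio does not have to be re-encoded per request (inferred
        # from the API name rather than documented behavior).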
# Move input tensors to device
inputs = move_to_device(inputs, device)
progress(0.4, desc=f"Generating speech on {device.upper()}...")
        # Generate audio
        start_time = time.time()
        # Build the generation kwargs once; the CUDA and CPU paths differ
        # only in whether autocast is active.
        generate_kwargs = dict(
            **inputs,
            max_new_tokens=None,
            cfg_scale=cfg_scale,
            tokenizer=PROCESSOR.tokenizer,
            generation_config={"do_sample": False},
            verbose=False,
            # Deep-copy the cached prompt so generation cannot mutate the
            # tensors loaded above.
            all_prefilled_outputs=copy.deepcopy(all_prefilled_outputs)
            if all_prefilled_outputs is not None
            else None,
        )
        if device == "cuda":
            # torch.autocast supersedes the deprecated torch.cuda.amp.autocast
            with torch.autocast(device_type="cuda", dtype=dtype):
                outputs = MODEL.generate(**generate_kwargs)
        else:
            outputs = MODEL.generate(**generate_kwargs)
generation_time = time.time() - start_time
progress(0.8, desc="Saving audio...")
# Calculate metrics
if outputs.speech_outputs and outputs.speech_outputs[0] is not None:
sample_rate = 24000
audio_samples = (
outputs.speech_outputs[0].shape[-1]
if len(outputs.speech_outputs[0].shape) > 0
else len(outputs.speech_outputs[0])
)
audio_duration = audio_samples / sample_rate
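            # Real-time factor: seconds of compute per second of audio;
            # values below 1.0 mean faster than real time.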
rtf = generation_time / audio_duration if audio_duration > 0 else float("inf")
# Save output
output_dir = "./outputs"
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, f"generated_{int(time.time())}.wav")
PROCESSOR.save_audio(
outputs.speech_outputs[0].cpu(), # Move to CPU for saving
output_path=output_path,
)
progress(1.0, desc="Complete!")
# Create status message
device_info = "ZeroGPU (CUDA)" if device == "cuda" else "CPU"
status = f"""✅ **Generation Complete!**
📊 **Metrics:**
- Audio Duration: {audio_duration:.2f}s
- Generation Time: {generation_time:.2f}s
- Real-Time Factor: {rtf:.2f}x
- Speaker: {speaker_name}
- CFG Scale: {cfg_scale}
- Device: {device_info}
"""
# Move model back to CPU to free GPU memory
MODEL.to("cpu")
if device == "cuda":
torch.cuda.empty_cache()
return output_path, status
else:
MODEL.to("cpu")
if device == "cuda":
torch.cuda.empty_cache()
return None, "❌ Error: No audio output generated."
except Exception as e:
import traceback
error_msg = f"❌ Error during generation:\n{str(e)}\n\n{traceback.format_exc()}"
print(error_msg)
# Clean up GPU memory on error
try:
MODEL.to("cpu")
if torch.cuda.is_available():
torch.cuda.empty_cache()
        except Exception:
pass
return None, error_msg
# Create Gradio interface
with gr.Blocks(fill_height=True) as demo:
gr.Markdown(
f"""
# 🎙️ VibeVoice-Realtime Text-to-Speech
Convert text to natural-sounding speech using Microsoft's VibeVoice-Realtime model.
**🚀 Device:** {"ZeroGPU - Efficient GPU allocation for fast inference!" if CUDA_AVAILABLE else "CPU Mode - GPU will be allocated when generating"}
<div style="text-align: center; margin-top: 10px;">
<a href="https://huggingface.co/spaces/akhaliq/anycoder" target="_blank" style="text-decoration: none; color: #4F46E5; font-weight: 600;">
Built with anycoder ✨
</a>
</div>
"""
)
with gr.Row():
with gr.Column(scale=2):
# Input section
text_input = gr.Textbox(
label="Text to Convert",
placeholder="Enter the text you want to convert to speech...",
lines=8,
max_lines=20,
)
with gr.Row():
speaker_dropdown = gr.Dropdown(
choices=list(VOICE_MAPPER.available_voices.keys()),
value=list(VOICE_MAPPER.available_voices.keys())[0]
if VOICE_MAPPER.available_voices
else None,
label="Speaker Voice",
info="Select the voice to use for speech generation",
)
cfg_slider = gr.Slider(
minimum=1.0,
maximum=3.0,
value=1.5,
step=0.1,
label="CFG Scale",
info="Higher values = more faithful to text (1.0-3.0)",
)
generate_btn = gr.Button("🎵 Generate Speech", variant="primary", size="lg")
with gr.Column(scale=1):
# Output section
audio_output = gr.Audio(
label="Generated Speech",
type="filepath",
interactive=False,
)
status_output = gr.Markdown(
"""
**Status:** Ready to generate speech
Enter text and click "Generate Speech" to start.
⚡ GPU will be allocated dynamically for generation
"""
)
# Example inputs
gr.Examples(
examples=[
[
"VibeVoice is a novel framework designed for generating expressive, long-form, multi-speaker conversational audio.",
list(VOICE_MAPPER.available_voices.keys())[0]
if VOICE_MAPPER.available_voices
else "Wayne",
1.5,
],
[
"The quick brown fox jumps over the lazy dog. This is a test of the text-to-speech system.",
list(VOICE_MAPPER.available_voices.keys())[0]
if VOICE_MAPPER.available_voices
else "Wayne",
1.5,
],
],
inputs=[text_input, speaker_dropdown, cfg_slider],
label="Example Inputs",
)
# Event handlers
generate_btn.click(
fn=generate_speech,
inputs=[text_input, speaker_dropdown, cfg_slider],
outputs=[audio_output, status_output],
api_name="generate",
)
# Footer
gr.Markdown(
"""
---
### 📝 Notes:
- **Model**: Microsoft VibeVoice-Realtime-0.5B
- **Sample Rate**: 24kHz
- **Context Length**: 8K tokens
- **Generation Length**: ~10 minutes
- **Infrastructure**: ZeroGPU (Hugging Face Spaces)
### ⚠️ Important:
- The model is designed for English text only
- Very short inputs (< 3 words) may produce unstable results
- Code, formulas, and special symbols are not supported
- Please use responsibly and disclose AI-generated content
- GPU is allocated dynamically - generation may take a few seconds to start
"""
)
# Launch the app with Gradio 6 syntax
if __name__ == "__main__":
demo.launch(
theme=gr.themes.Soft(
primary_hue="blue",
secondary_hue="indigo",
neutral_hue="slate",
),
footer_links=[
{"label": "Built with anycoder", "url": "https://huggingface.co/spaces/akhaliq/anycoder"}
],
)