aastha-malik's picture
Upload-only: remove YouTube (DNS blocked on HF free tier), show clear message
337462b
Raw
History Blame Contribute Delete
7.41 kB
import os, sys, shutil, types, subprocess
import numpy as np
import cv2
import gradio as gr
# ── Paths ────────────────────────────────────────────────────────────
# Fixed scratch locations: downloaded model weights live in MODEL_DIR, while
# intermediate and final videos are written below WORK_DIR.
MODEL_DIR = "/tmp/models"
WORK_DIR = "/tmp/workspace"

# Create every directory the app writes into; already-existing ones are fine.
for _path in (MODEL_DIR, f"{WORK_DIR}/temp", f"{WORK_DIR}/outputs"):
    os.makedirs(_path, exist_ok=True)
# ── Model download ───────────────────────────────────────────────────
# Location of the face-swap ONNX weights once they have been fetched.
INSWAPPER_PATH = f"{MODEL_DIR}/inswapper_128.onnx"


def download_models():
    """Fetch the inswapper ONNX weights into MODEL_DIR unless already present.

    The Hugging Face client is imported lazily, inside the function. The
    download is skipped when a file already exists at INSWAPPER_PATH.
    """
    from huggingface_hub import hf_hub_download

    weights_missing = not os.path.exists(INSWAPPER_PATH)
    if weights_missing:
        print("Downloading inswapper_128.onnx ...")
        hf_hub_download(
            repo_id="ezioruan/inswapper_128.onnx",
            filename="inswapper_128.onnx",
            local_dir=MODEL_DIR,
        )
    print("inswapper ready.")


# Runs at import time so the weights exist before the models are loaded.
download_models()
# ── Load models ──────────────────────────────────────────────────────
import insightface
from insightface.app import FaceAnalysis
import onnxruntime as ort
# Use CUDA when onnxruntime reports it as available, always keeping the CPU
# provider in the list; otherwise run on the CPU provider alone.
if "CUDAExecutionProvider" in ort.get_available_providers():
    PROVIDERS = ["CUDAExecutionProvider", "CPUExecutionProvider"]
else:
    PROVIDERS = ["CPUExecutionProvider"]
print(f"Using providers: {PROVIDERS}")

# Face analysis pipeline, used for both the source photo and video frames.
face_app = FaceAnalysis(name="buffalo_l", providers=PROVIDERS)
face_app.prepare(ctx_id=0, det_size=(640, 640))

# Face-swap model loaded from the weights fetched by download_models().
swapper = insightface.model_zoo.get_model(INSWAPPER_PATH, providers=PROVIDERS)
print("Models loaded.")
def to_h264(src: str, dst: str):
    """Transcode ``src`` into ``dst`` with ffmpeg (libx264 video, AAC audio).

    ``dst`` is overwritten if it exists (``-y``). ffmpeg output is limited to
    errors, and a non-zero exit status raises
    ``subprocess.CalledProcessError``.
    """
    command = ["ffmpeg", "-y", "-i", src]
    command += ["-vcodec", "libx264", "-acodec", "aac", "-preset", "fast"]
    command += [dst, "-loglevel", "error"]
    subprocess.run(command, check=True)
# ── Core processing ──────────────────────────────────────────────────
def process(face_image, video_file, trim_seconds, progress=gr.Progress(track_tqdm=True)):
    """Swap the face from ``face_image`` onto every face detected in ``video_file``.

    Args:
        face_image: Path to the source face photo, or ``None`` if not uploaded.
        video_file: Path to the uploaded video, or ``None`` if not uploaded.
        trim_seconds: Keep only the first N seconds of the video; a falsy or
            non-positive value keeps the whole video.
        progress: Gradio progress reporter, called with a fraction and a
            description.

    Returns:
        ``(output_path, status_message)``. ``output_path`` is ``None`` whenever
        processing could not be completed; the message then explains why.

    Note:
        Intermediate and final files use fixed names under ``WORK_DIR``, so
        overlapping calls would overwrite each other's files.
    """
    if face_image is None:
        return None, "Please upload a source face image."
    if video_file is None:
        return None, "Please upload a video file."

    cap = None
    writer = None
    try:
        progress(0.0, desc="Detecting source face...")

        # Source face
        source_img = cv2.imread(face_image)
        if source_img is None:
            # imread signals an unreadable/unsupported file by returning None.
            return None, "Could not read the source image — try a different file."
        source_faces = face_app.get(source_img)
        if not source_faces:
            # Second attempt on a copy resized to the detector's input size.
            source_faces = face_app.get(cv2.resize(source_img, (640, 640)))
        if not source_faces:
            return None, "No face detected — use a clear, front-facing photo."
        # Use the face with the largest bounding-box area as the source.
        source_face = max(
            source_faces,
            key=lambda f: (f.bbox[2] - f.bbox[0]) * (f.bbox[3] - f.bbox[1]),
        )
        source_face.embedding /= np.linalg.norm(source_face.embedding)

        # Prepare video
        progress(0.05, desc="Preparing video...")
        raw_video = f"{WORK_DIR}/temp/input.mp4"
        converted = f"{WORK_DIR}/temp/input_h264.mp4"
        shutil.copy(video_file, raw_video)
        to_h264(raw_video, converted)

        # Verify that OpenCV can decode at least one frame of the conversion.
        cap_check = cv2.VideoCapture(converted)
        try:
            ok, _ = cap_check.read()
        finally:
            cap_check.release()
        if not ok:
            return None, "Could not read the video — try a different file format."

        # Trim
        input_video = converted
        if trim_seconds and int(trim_seconds) > 0:
            trimmed = f"{WORK_DIR}/temp/input_trimmed.mp4"
            subprocess.run(
                ["ffmpeg", "-y", "-i", converted,
                 "-t", str(int(trim_seconds)),
                 "-c:v", "libx264", "-c:a", "aac",
                 trimmed, "-loglevel", "error"],
                check=True,
            )
            input_video = trimmed

        # Video info
        cap = cv2.VideoCapture(input_video)
        fps = cap.get(cv2.CAP_PROP_FPS)
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Frame pipeline
        temp_out = f"{WORK_DIR}/temp/no_audio.mp4"
        final_out = f"{WORK_DIR}/outputs/face_swapped.mp4"
        writer = cv2.VideoWriter(
            temp_out, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h)
        )

        # Read until the stream ends instead of trusting the container's frame
        # count, which is only metadata and may not match the decodable frames.
        frames_done = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            expected = max(total, frames_done + 1)
            progress(
                0.1 + 0.8 * (frames_done / expected),
                desc=f"Frame {frames_done + 1}/{expected}",
            )
            faces = face_app.get(frame)
            result = frame.copy()
            for face in faces:
                result = swapper.get(result, face, source_face, paste_back=True)
            writer.write(result)
            frames_done += 1

        # Release before ffmpeg reads temp_out so the file is finalized.
        cap.release()
        cap = None
        writer.release()
        writer = None

        if frames_done == 0:
            return None, "Could not read the video — try a different file format."

        # Merge audio
        progress(0.92, desc="Merging audio...")
        # Drop any output left by a previous run so a failed merge can never
        # hand back a stale video.
        if os.path.exists(final_out):
            os.remove(final_out)
        merge = subprocess.run(
            ["ffmpeg", "-y",
             "-i", temp_out, "-i", input_video,
             "-map", "0:v:0", "-map", "1:a:0",
             "-c:v", "copy", "-c:a", "aac", "-shortest",
             final_out, "-loglevel", "error"],
        )
        # Best effort: if the merge failed (e.g. the input has no audio
        # stream), fall back to the silent video.
        if merge.returncode != 0 or not os.path.exists(final_out):
            shutil.copy(temp_out, final_out)

        progress(1.0, desc="Done!")
        size = os.path.getsize(final_out) / (1024 * 1024)
        return final_out, f"Done! {frames_done} frames | {size:.1f} MB"

    except Exception as e:
        # UI boundary: report the failure in the status box instead of raising.
        return None, f"Error: {e}"
    finally:
        # Make sure the capture and writer are closed on every exit path.
        if cap is not None:
            cap.release()
        if writer is not None:
            writer.release()
# ── Gradio UI ────────────────────────────────────────────────────────
# Two-column layout: inputs and the run button on the left, status and the
# resulting video on the right. Markdown bodies stay flush-left on purpose so
# that no leading whitespace ends up inside the rendered text.
with gr.Blocks(title="Face Fusion") as demo:
    gr.Markdown("""
# 🎭 Face Fusion — AI Video Face Swap
Swap any face into a video using **InsightFace + inswapper_128**.
> **Note:** Runs on CPU — ~1–3 min per 10 seconds of video. For GPU speed, run the notebook on Kaggle.
""")
    with gr.Row():
        with gr.Column():
            # type="filepath" hands process() a path on disk for the image.
            face_input = gr.Image(
                label="Source Face Photo",
                type="filepath",
                height=220,
            )
            gr.Markdown("> ⚠️ **YouTube URLs don't work on HF free Spaces** (DNS blocked). Download your video locally first, then upload it below.")
            video_input = gr.Video(label="Upload Video File")
            trim_input = gr.Slider(
                label="Trim to first N seconds (0 = full video)",
                minimum=0, maximum=60, step=5, value=10,
            )
            run_btn = gr.Button("Run Face Swap", variant="primary", size="lg")
        with gr.Column():
            status_box = gr.Textbox(label="Status", interactive=False, lines=2)
            video_out = gr.Video(label="Output Video", height=400)
    gr.Markdown("""
---
**Tips for best results**
- Clear, front-facing photo — no sunglasses or heavy shadows
- Keep video under 15 seconds for reasonable CPU processing time
- Single-face videos give the cleanest swap
""")
    # process() returns (video path or None, status text), in output order.
    run_btn.click(
        fn=process,
        inputs=[face_input, video_input, trim_input],
        outputs=[video_out, status_box],
    )

demo.launch()