| import os, sys, shutil, types, subprocess |
| import numpy as np |
| import cv2 |
| import gradio as gr |
|
|
| |
| MODEL_DIR = "/tmp/models" |
| WORK_DIR = "/tmp/workspace" |
| os.makedirs(MODEL_DIR, exist_ok=True) |
| os.makedirs(f"{WORK_DIR}/temp", exist_ok=True) |
| os.makedirs(f"{WORK_DIR}/outputs", exist_ok=True) |
|
|
| |
| INSWAPPER_PATH = f"{MODEL_DIR}/inswapper_128.onnx" |
|
|
| def download_models(): |
| from huggingface_hub import hf_hub_download |
| if not os.path.exists(INSWAPPER_PATH): |
| print("Downloading inswapper_128.onnx ...") |
| hf_hub_download( |
| repo_id="ezioruan/inswapper_128.onnx", |
| filename="inswapper_128.onnx", |
| local_dir=MODEL_DIR, |
| ) |
| print("inswapper ready.") |
|
|
| download_models() |
|
|
| |
| import insightface |
| from insightface.app import FaceAnalysis |
| import onnxruntime as ort |
|
|
| PROVIDERS = ( |
| ["CUDAExecutionProvider", "CPUExecutionProvider"] |
| if "CUDAExecutionProvider" in ort.get_available_providers() |
| else ["CPUExecutionProvider"] |
| ) |
| print(f"Using providers: {PROVIDERS}") |
|
|
| face_app = FaceAnalysis(name="buffalo_l", providers=PROVIDERS) |
| face_app.prepare(ctx_id=0, det_size=(640, 640)) |
|
|
| swapper = insightface.model_zoo.get_model(INSWAPPER_PATH, providers=PROVIDERS) |
|
|
| print("Models loaded.") |
|
|
|
|
| def to_h264(src: str, dst: str): |
| subprocess.run( |
| ["ffmpeg", "-y", "-i", src, |
| "-vcodec", "libx264", "-acodec", "aac", "-preset", "fast", |
| dst, "-loglevel", "error"], |
| check=True, |
| ) |
|
|
|
|
|
|
| |
| def process(face_image, video_file, trim_seconds, progress=gr.Progress(track_tqdm=True)): |
| if face_image is None: |
| return None, "Please upload a source face image." |
| if video_file is None: |
| return None, "Please upload a video file." |
|
|
| try: |
| progress(0.0, desc="Detecting source face...") |
|
|
| |
| source_img = cv2.imread(face_image) |
| source_faces = face_app.get(source_img) |
| if not source_faces: |
| source_img_r = cv2.resize(source_img, (640, 640)) |
| source_faces = face_app.get(source_img_r) |
| if not source_faces: |
| return None, "No face detected β use a clear, front-facing photo." |
|
|
| source_face = sorted( |
| source_faces, |
| key=lambda f: (f.bbox[2] - f.bbox[0]) * (f.bbox[3] - f.bbox[1]), |
| reverse=True, |
| )[0] |
| source_face.embedding /= np.linalg.norm(source_face.embedding) |
|
|
| |
| progress(0.05, desc="Preparing video...") |
| raw_video = f"{WORK_DIR}/temp/input.mp4" |
| converted = f"{WORK_DIR}/temp/input_h264.mp4" |
|
|
| shutil.copy(video_file, raw_video) |
| to_h264(raw_video, converted) |
|
|
| |
| cap_check = cv2.VideoCapture(converted) |
| ok, _ = cap_check.read() |
| cap_check.release() |
| if not ok: |
| return None, "Could not read the video β try a different file format." |
|
|
| |
| input_video = converted |
| if trim_seconds and int(trim_seconds) > 0: |
| trimmed = f"{WORK_DIR}/temp/input_trimmed.mp4" |
| subprocess.run( |
| ["ffmpeg", "-y", "-i", converted, |
| "-t", str(int(trim_seconds)), |
| "-c:v", "libx264", "-c:a", "aac", |
| trimmed, "-loglevel", "error"], |
| check=True, |
| ) |
| input_video = trimmed |
|
|
| |
| cap = cv2.VideoCapture(input_video) |
| fps = cap.get(cv2.CAP_PROP_FPS) |
| total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
| w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
| h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
|
| |
| temp_out = f"{WORK_DIR}/temp/no_audio.mp4" |
| final_out = f"{WORK_DIR}/outputs/face_swapped.mp4" |
|
|
| writer = cv2.VideoWriter( |
| temp_out, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h) |
| ) |
|
|
| for i in range(total): |
| ret, frame = cap.read() |
| if not ret: |
| break |
| progress(0.1 + 0.8 * (i / total), desc=f"Frame {i+1}/{total}") |
|
|
| faces = face_app.get(frame) |
| result = frame.copy() |
| for face in faces: |
| result = swapper.get(result, face, source_face, paste_back=True) |
| writer.write(result) |
|
|
| cap.release() |
| writer.release() |
|
|
| |
| progress(0.92, desc="Merging audio...") |
| subprocess.run( |
| ["ffmpeg", "-y", |
| "-i", temp_out, "-i", input_video, |
| "-map", "0:v:0", "-map", "1:a:0", |
| "-c:v", "copy", "-c:a", "aac", "-shortest", |
| final_out, "-loglevel", "error"], |
| ) |
| if not os.path.exists(final_out): |
| shutil.copy(temp_out, final_out) |
|
|
| progress(1.0, desc="Done!") |
| size = os.path.getsize(final_out) / (1024 * 1024) |
| return final_out, f"Done! {total} frames | {size:.1f} MB" |
|
|
| except Exception as e: |
| return None, f"Error: {e}" |
|
|
|
|
| |
| with gr.Blocks(title="Face Fusion") as demo: |
|
|
| gr.Markdown(""" |
| # π Face Fusion β AI Video Face Swap |
| Swap any face into a video using **InsightFace + inswapper_128**. |
| |
| > **Note:** Runs on CPU β ~1β3 min per 10 seconds of video. For GPU speed, run the notebook on Kaggle. |
| """) |
|
|
| with gr.Row(): |
| with gr.Column(): |
| face_input = gr.Image( |
| label="Source Face Photo", |
| type="filepath", |
| height=220, |
| ) |
| gr.Markdown("> β οΈ **YouTube URLs don't work on HF free Spaces** (DNS blocked). Download your video locally first, then upload it below.") |
| video_input = gr.Video(label="Upload Video File") |
| trim_input = gr.Slider( |
| label="Trim to first N seconds (0 = full video)", |
| minimum=0, maximum=60, step=5, value=10, |
| ) |
| run_btn = gr.Button("Run Face Swap", variant="primary", size="lg") |
|
|
| with gr.Column(): |
| status_box = gr.Textbox(label="Status", interactive=False, lines=2) |
| video_out = gr.Video(label="Output Video", height=400) |
|
|
| gr.Markdown(""" |
| --- |
| **Tips for best results** |
| - Clear, front-facing photo β no sunglasses or heavy shadows |
| - Keep video under 15 seconds for reasonable CPU processing time |
| - Single-face videos give the cleanest swap |
| """) |
|
|
| run_btn.click( |
| fn=process, |
| inputs=[face_input, video_input, trim_input], |
| outputs=[video_out, status_box], |
| ) |
|
|
| demo.launch() |
|
|