AI-tools / minicpm_video_caption.py
dx8152's picture
Upload 10 files
35d337f verified
from __future__ import annotations
import argparse
import gc
import math
import os
from pathlib import Path
import numpy as np
import torch
from decord import VideoReader, cpu
from PIL import Image
from scipy.spatial import cKDTree
from transformers import AutoModel, AutoTokenizer
# Default per-pack frame budget: used as the default for
# encode_video(max_num_frames=...) and for the --max-frames CLI option.
DEFAULT_MAX_NUM_FRAMES = 180
# Upper bound on packing_nums in encode_video; the total number of sampled
# frames is capped at max_num_frames * MAX_NUM_PACKING.
MAX_NUM_PACKING = 3
# Step, in seconds, of the timestamp grid that frame times are snapped to
# before being converted into integer temporal ids.
TIME_SCALE = 0.1
def map_to_nearest_scale(values, scale):
    """Snap each entry of ``values`` to the closest entry of ``scale``.

    Both arguments are 1-D sequences of numbers. The result is a NumPy array
    with one element per input value, each element taken from ``scale``.
    """
    grid = np.asarray(scale)
    queries = np.asarray(values)
    # cKDTree works on 2-D point sets, so every scalar becomes a 1-D point.
    lookup = cKDTree(grid[:, np.newaxis])
    nearest = lookup.query(queries[:, np.newaxis])[1]
    return grid[nearest]
def group_array(arr, size):
    """Split ``arr`` into consecutive slices holding at most ``size`` items.

    The final slice is shorter when ``len(arr)`` is not a multiple of ``size``.
    """
    chunks = []
    for start in range(0, len(arr), size):
        stop = start + size
        chunks.append(arr[start:stop])
    return chunks
def uniform_sample(items, n):
    """Pick ``n`` elements spread evenly across ``items``.

    The sequence is divided into ``n`` equal-width bins and the element at the
    centre of each bin is returned, so elements repeat when ``n`` exceeds
    ``len(items)``.

    Args:
        items: Any indexable sequence that supports ``len()``.
        n: Number of samples to draw.

    Returns:
        A list of ``n`` elements, or an empty list when ``n <= 0`` or
        ``items`` is empty.
    """
    total = len(items)
    # An empty sequence used to fall through to items[-1] and raise
    # IndexError; there is nothing to sample, so return nothing.
    if n <= 0 or total == 0:
        return []
    gap = total / n
    # Clamp to the last valid index to guard against float rounding at the end.
    return [items[min(total - 1, int(i * gap + gap / 2))] for i in range(n)]
def build_low_vram_device_map(gpu_layers: int, num_layers: int = 36) -> dict[str, int | str]:
    """Build a module-to-device map that keeps only part of the LLM on GPU 0.

    The vision tower (``vpm``), the resampler, the token embeddings and the
    first ``gpu_layers`` decoder layers are assigned to GPU ``0``; the
    remaining decoder layers, the final norm, the rotary embedding and the LM
    head are assigned to ``"cpu"``.

    Args:
        gpu_layers: Requested number of decoder layers to keep on the GPU.
            Clamped to the range 4..24.
        num_layers: Total number of decoder layers in the checkpoint. Defaults
            to 36, the value this script was written for.

    Returns:
        A dict mapping module names to ``0`` (GPU index) or ``"cpu"``.
    """
    gpu_layers = max(4, min(24, gpu_layers))
    device_map: dict[str, int | str] = {
        "vpm": 0,
        "resampler": 0,
        "llm.model.embed_tokens": 0,
    }
    for layer_idx in range(num_layers):
        device_map[f"llm.model.layers.{layer_idx}"] = 0 if layer_idx < gpu_layers else "cpu"
    device_map["llm.model.norm"] = "cpu"
    device_map["llm.model.rotary_emb"] = "cpu"
    device_map["llm.lm_head"] = "cpu"
    return device_map
def encode_video(
    video_path: Path,
    choose_fps: int = 5,
    max_num_frames: int = DEFAULT_MAX_NUM_FRAMES,
    force_packing=None,
):
    """Decode a video into sampled RGB frames plus grouped temporal ids.

    Frames are sampled uniformly over the whole clip. The sampling budget is
    ``max_num_frames`` per pack, with at most ``MAX_NUM_PACKING`` packs, so
    longer clips are sampled more sparsely instead of growing without bound.

    Args:
        video_path: Path of the video file to decode.
        choose_fps: Target sampling rate in frames per second.
        max_num_frames: Frame budget per pack.
        force_packing: Optional override for the pack size; it is capped at
            ``MAX_NUM_PACKING`` and ignored when falsy.

    Returns:
        A ``(frames, frame_ts_id_group)`` tuple: ``frames`` is a list of RGB
        ``PIL.Image`` objects, and ``frame_ts_id_group`` is the list of int32
        timestamp-id arrays (in units of ``TIME_SCALE`` seconds) split into
        chunks of ``packing_nums`` entries.

    Raises:
        ValueError: If the video has no frames or reports a non-positive
            average frame rate.
    """
    # The bundled decord build may not include CUDA video decoding. Frame extraction
    # is cheap for these short clips, while the MiniCPM model itself still runs on GPU.
    vr = VideoReader(str(video_path), ctx=cpu(0))
    total_frames = len(vr)
    fps = vr.get_avg_fps()
    # Fail with a clear message instead of a ZeroDivisionError (fps == 0) or an
    # obscure downstream error (no frames / empty timestamp grid).
    if total_frames == 0 or not fps > 0:
        raise ValueError(
            f"Cannot sample frames from {video_path}: frames={total_frames}, fps={fps}"
        )
    video_duration = total_frames / fps

    if choose_fps * int(video_duration) <= max_num_frames:
        packing_nums = 1
        # NOTE(review): min() mixes a frame budget with a duration in seconds;
        # kept as-is to preserve the existing sampling behaviour.
        choose_frames = round(min(choose_fps, round(fps)) * min(max_num_frames, video_duration))
    else:
        packing_nums = math.ceil(video_duration * choose_fps / max_num_frames)
        if packing_nums <= MAX_NUM_PACKING:
            choose_frames = round(video_duration * choose_fps)
        else:
            choose_frames = round(max_num_frames * MAX_NUM_PACKING)
            packing_nums = MAX_NUM_PACKING

    # Always take at least one frame and never more than the clip contains.
    choose_frames = max(1, min(total_frames, choose_frames))
    frame_idx = np.array(uniform_sample(range(total_frames), choose_frames))

    if force_packing:
        packing_nums = min(force_packing, MAX_NUM_PACKING)

    frames = vr.get_batch(frame_idx).asnumpy()

    # Snap each frame time (seconds) to the TIME_SCALE grid and express it as
    # an integer tick count.
    frame_idx_ts = frame_idx / fps
    scale = np.arange(0, video_duration, TIME_SCALE)
    frame_ts_id = map_to_nearest_scale(frame_idx_ts, scale) / TIME_SCALE
    # Round before casting: (k * TIME_SCALE) / TIME_SCALE can come out a hair
    # below k in floating point, and a bare int cast would truncate to k - 1.
    frame_ts_id = np.rint(frame_ts_id).astype(np.int32)

    frames = [Image.fromarray(frame.astype("uint8")).convert("RGB") for frame in frames]
    frame_ts_id_group = group_array(frame_ts_id, packing_nums)
    return frames, frame_ts_id_group
def main() -> int:
    """Caption one video with a local MiniCPM checkpoint and save the result.

    Parses the command line, loads the model (optionally split between GPU 0
    and CPU in low-VRAM mode), samples frames from the video, asks the model
    for a single text-to-video style caption, writes it next to the video
    with a ``.txt`` suffix and prints it.

    Returns:
        ``0`` on success; the caller uses it as the process exit status.

    Raises:
        FileNotFoundError: If ``--video`` does not point to an existing file.
        RuntimeError: If a CUDA device was requested but CUDA is unavailable.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--video", required=True)
    parser.add_argument("--model", required=True)
    parser.add_argument("--fps", type=int, default=5)
    parser.add_argument("--device", default="cuda:0")
    parser.add_argument("--max-frames", type=int, default=DEFAULT_MAX_NUM_FRAMES)
    parser.add_argument("--low-vram", action="store_true")
    parser.add_argument("--gpu-memory-gb", type=int, default=8)
    parser.add_argument("--cpu-memory-gb", type=int, default=48)
    parser.add_argument("--gpu-layers", type=int, default=10)
    args = parser.parse_args()

    video_path = Path(args.video)
    model_path = Path(args.model)
    output_path = video_path.with_suffix(".txt")

    # Fail fast: without this check a bad path is only noticed after the
    # (slow, memory-hungry) model load has already completed.
    if not video_path.is_file():
        raise FileNotFoundError(f"Video file not found: {video_path}")

    if args.device.startswith("cuda") and not torch.cuda.is_available():
        raise RuntimeError("请求使用 CUDA,但 torch.cuda.is_available() 为 False")

    # NOTE(review): when a CUDA device is requested this runs after the
    # availability probe above, so it probably no longer affects device
    # visibility in this process - confirm the intent.
    os.environ.setdefault("CUDA_VISIBLE_DEVICES", "0")
    torch.manual_seed(100)

    device = torch.device(args.device if torch.cuda.is_available() else "cpu")
    dtype = torch.float16 if device.type == "cuda" else torch.float32

    print(f"[MiniCPM] torch={torch.__version__}, device={device}, cuda={torch.cuda.is_available()}", flush=True)
    if device.type == "cuda":
        print(f"[MiniCPM] gpu={torch.cuda.get_device_name(device.index or 0)}", flush=True)
    print(
        f"[MiniCPM] low_vram={args.low_vram}, fps={args.fps}, max_frames={args.max_frames}, gpu_mem={args.gpu_memory_gb}GiB, gpu_layers={args.gpu_layers}",
        flush=True,
    )

    load_kwargs = {
        "trust_remote_code": True,
        "attn_implementation": "sdpa",
        "local_files_only": True,
    }
    if device.type == "cuda":
        load_kwargs["torch_dtype"] = torch.float16
        if args.low_vram:
            # Split the model between GPU 0 and CPU, spilling to disk if needed.
            # NOTE(review): the device map and max_memory are keyed to GPU 0
            # regardless of the index given in --device.
            offload_dir = model_path.parent / "_minicpm_offload"
            offload_dir.mkdir(parents=True, exist_ok=True)
            load_kwargs.update(
                {
                    "device_map": build_low_vram_device_map(args.gpu_layers),
                    "max_memory": {0: f"{max(4, args.gpu_memory_gb)}GiB", "cpu": f"{max(16, args.cpu_memory_gb)}GiB"},
                    "low_cpu_mem_usage": True,
                    "offload_folder": str(offload_dir),
                    "offload_state_dict": True,
                    "offload_buffers": True,
                }
            )
        else:
            # Place the whole model on the requested CUDA device.
            load_kwargs["device_map"] = {"": str(device)}
    else:
        load_kwargs["torch_dtype"] = dtype

    model = AutoModel.from_pretrained(str(model_path), **load_kwargs)
    model = model.eval()

    if device.type == "cuda" and not args.low_vram:
        first_param = next(model.parameters())
        print(f"[MiniCPM] model parameter device={first_param.device}, dtype={first_param.dtype}", flush=True)
    elif device.type == "cuda":
        print(f"[MiniCPM] device_map={getattr(model, 'hf_device_map', None)}", flush=True)
    else:
        model = model.to(device=device, dtype=dtype)

    tokenizer = AutoTokenizer.from_pretrained(str(model_path), trust_remote_code=True, local_files_only=True)

    # The frame budget is never allowed to drop below 16.
    frames, temporal_ids = encode_video(
        video_path,
        choose_fps=args.fps,
        max_num_frames=max(16, args.max_frames),
    )

    question = (
        "Describe this video as an English training caption for a text-to-video model. "
        "Focus on subject, action, camera movement, scene, lighting, style, and important visual details. "
        "Return one concise prompt only, no bullet points."
    )
    msgs = [{"role": "user", "content": frames + [question]}]

    with torch.inference_mode():
        answer = model.chat(
            msgs=msgs,
            tokenizer=tokenizer,
            use_image_id=False,
            max_slice_nums=1,
            temporal_ids=temporal_ids,
        )

    text = str(answer).strip()
    output_path.write_text(text, encoding="utf-8")
    print(text)

    # Drop the large objects and release cached GPU memory before returning.
    del frames, temporal_ids, msgs, answer, tokenizer, model
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)