"""Caption a single video with a locally stored MiniCPM model.

The script samples frames from the video, asks the model for one concise
English text-to-video training caption, prints it, and writes it next to the
video as ``<video name>.txt``.
"""

from __future__ import annotations

import argparse
import gc
import math
import os
from pathlib import Path

import numpy as np
import torch
from decord import VideoReader, cpu
from PIL import Image
from scipy.spatial import cKDTree
from transformers import AutoModel, AutoTokenizer

# Default cap on the number of frames handed to the model per packing slot.
DEFAULT_MAX_NUM_FRAMES = 180
# Upper bound on how many temporal ids are grouped ("packed") together.
MAX_NUM_PACKING = 3
# Step of the timestamp grid that frame times are snapped to
# (frame index / fps, so presumably seconds).
TIME_SCALE = 0.1


def map_to_nearest_scale(values, scale):
    """Snap every entry of *values* to the closest entry of *scale*.

    Args:
        values: 1-D array-like of numbers to snap.
        scale: 1-D array-like of candidate grid values.

    Returns:
        A NumPy array with one element of *scale* per element of *values*.
    """
    grid = np.asarray(scale)
    # A 1-D KD-tree gives the nearest grid point for all values in one query.
    tree = cKDTree(grid[:, None])
    _, nearest = tree.query(np.asarray(values)[:, None])
    return grid[nearest]


def group_array(arr, size):
    """Split *arr* into consecutive slices of at most *size* elements.

    Args:
        arr: Any sliceable sequence that supports ``len``.
        size: Positive chunk length.

    Returns:
        A list of slices of *arr*; the last one may be shorter than *size*.

    Raises:
        ValueError: If *size* is not positive. A negative size used to return
            an empty list and silently drop all data.
    """
    if size <= 0:
        raise ValueError(f"size must be a positive integer, got {size!r}")
    return [arr[start : start + size] for start in range(0, len(arr), size)]


def uniform_sample(items, n):
    """Pick *n* entries of *items* at evenly spaced positions.

    Each pick is taken from the middle of one of *n* equal-width buckets, and
    the index is clamped to the last valid position.

    Args:
        items: Indexable sequence that supports ``len``.
        n: Number of samples requested.

    Returns:
        A list of *n* items, or an empty list when *n* is not positive or
        *items* is empty.
    """
    # ``len(...) == 0`` rather than truthiness so NumPy arrays are accepted.
    if n <= 0 or len(items) == 0:
        return []
    gap = len(items) / n
    last = len(items) - 1
    return [items[min(last, int(i * gap + gap / 2))] for i in range(n)]


def build_low_vram_device_map(gpu_layers: int, *, num_layers: int = 36) -> dict[str, int | str]:
    """Build a module-to-device map that keeps only part of the LLM on GPU 0.

    The ``vpm``, ``resampler`` and token-embedding modules are always placed on
    GPU index 0. The first *gpu_layers* transformer layers go to GPU 0 and the
    rest, plus the final norm, rotary embedding and LM head, go to ``"cpu"``.

    Args:
        gpu_layers: Requested number of LLM layers on the GPU; clamped to the
            range 4..24.
        num_layers: Total number of ``llm.model.layers.*`` entries to emit.
            Defaults to 36, the value that was previously hard-coded.

    Returns:
        A dict mapping module names to ``0`` (GPU index) or ``"cpu"``.
    """
    gpu_layers = max(4, min(24, gpu_layers))
    device_map: dict[str, int | str] = {
        "vpm": 0,
        "resampler": 0,
        "llm.model.embed_tokens": 0,
    }
    for layer_idx in range(num_layers):
        device_map[f"llm.model.layers.{layer_idx}"] = 0 if layer_idx < gpu_layers else "cpu"
    device_map["llm.model.norm"] = "cpu"
    device_map["llm.model.rotary_emb"] = "cpu"
    device_map["llm.lm_head"] = "cpu"
    return device_map


def encode_video(
    video_path: Path,
    choose_fps: int = 5,
    max_num_frames: int = DEFAULT_MAX_NUM_FRAMES,
    force_packing=None,
):
    """Sample frames from a video and compute their grouped temporal ids.

    Args:
        video_path: Path of the video file to decode.
        choose_fps: Target sampling rate in frames per second.
        max_num_frames: Frame budget used to decide how many frames to sample
            and how many temporal ids to pack per group. Expected to be
            positive.
        force_packing: Optional override for the packing group size; capped at
            ``MAX_NUM_PACKING`` and floored at 1. Ignored when falsy.

    Returns:
        A tuple ``(frames, temporal_id_groups)`` where ``frames`` is a list of
        RGB ``PIL.Image`` objects and ``temporal_id_groups`` is a list of
        ``int32`` arrays, each holding up to ``packing_nums`` timestamp ids
        (timestamps snapped to a ``TIME_SCALE`` grid, expressed in grid steps).

    Raises:
        ValueError: If the video has no frames or reports a non-positive
            frame rate.
    """
    # The bundled decord build may not include CUDA video decoding. Frame extraction
    # is cheap for these short clips, while the MiniCPM model itself still runs on GPU.
    vr = VideoReader(str(video_path), ctx=cpu(0))
    total_frames = len(vr)
    fps = vr.get_avg_fps()

    # Fail with a clear message instead of an IndexError / ZeroDivisionError
    # further down. ``not fps > 0`` also rejects NaN.
    if total_frames <= 0:
        raise ValueError(f"video contains no frames: {video_path}")
    if not fps > 0:
        raise ValueError(f"video reports an invalid frame rate ({fps!r}): {video_path}")

    video_duration = total_frames / fps

    if choose_fps * int(video_duration) <= max_num_frames:
        # Everything fits into the budget without packing.
        packing_nums = 1
        choose_frames = round(min(choose_fps, round(fps)) * min(max_num_frames, video_duration))
    else:
        packing_nums = math.ceil(video_duration * choose_fps / max_num_frames)
        if packing_nums <= MAX_NUM_PACKING:
            choose_frames = round(video_duration * choose_fps)
        else:
            # Too long even with maximum packing: cap the frame count.
            choose_frames = round(max_num_frames * MAX_NUM_PACKING)
            packing_nums = MAX_NUM_PACKING

    choose_frames = max(1, min(total_frames, choose_frames))
    frame_idx = np.array(uniform_sample(range(total_frames), choose_frames))

    if force_packing:
        packing_nums = min(force_packing, MAX_NUM_PACKING)
    # A non-positive group size would lose the temporal ids.
    packing_nums = max(1, packing_nums)

    frames = vr.get_batch(frame_idx).asnumpy()

    frame_idx_ts = frame_idx / fps
    scale = np.arange(0, video_duration, TIME_SCALE)
    frame_ts_id = map_to_nearest_scale(frame_idx_ts, scale) / TIME_SCALE
    # Grid values are k * TIME_SCALE, so the quotient is k up to floating-point
    # error; round before casting so a value just below k does not become k - 1.
    frame_ts_id = np.rint(frame_ts_id).astype(np.int32)

    frames = [Image.fromarray(frame.astype("uint8")).convert("RGB") for frame in frames]
    frame_ts_id_group = group_array(frame_ts_id, packing_nums)
    return frames, frame_ts_id_group


def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the captioning script."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--video", required=True)
    parser.add_argument("--model", required=True)
    parser.add_argument("--fps", type=int, default=5)
    parser.add_argument("--device", default="cuda:0")
    parser.add_argument("--max-frames", type=int, default=DEFAULT_MAX_NUM_FRAMES)
    parser.add_argument("--low-vram", action="store_true")
    parser.add_argument("--gpu-memory-gb", type=int, default=8)
    parser.add_argument("--cpu-memory-gb", type=int, default=48)
    parser.add_argument("--gpu-layers", type=int, default=10)
    return parser


def _build_load_kwargs(args, device, dtype, model_path: Path) -> dict:
    """Assemble the keyword arguments passed to ``AutoModel.from_pretrained``.

    On CUDA with ``--low-vram`` this also creates the ``_minicpm_offload``
    directory next to the model directory.
    """
    load_kwargs = {
        "trust_remote_code": True,
        "attn_implementation": "sdpa",
        "local_files_only": True,
        "torch_dtype": dtype,
    }
    if device.type != "cuda":
        return load_kwargs

    if not args.low_vram:
        # Place the whole model on the requested CUDA device.
        load_kwargs["device_map"] = {"": str(device)}
        return load_kwargs

    offload_dir = model_path.parent / "_minicpm_offload"
    offload_dir.mkdir(parents=True, exist_ok=True)
    # NOTE(review): the low-VRAM device map and max_memory both target GPU
    # index 0 regardless of --device; confirm that is intended.
    load_kwargs.update(
        {
            "device_map": build_low_vram_device_map(args.gpu_layers),
            # Memory budgets are floored at 4 GiB (GPU) and 16 GiB (CPU).
            "max_memory": {
                0: f"{max(4, args.gpu_memory_gb)}GiB",
                "cpu": f"{max(16, args.cpu_memory_gb)}GiB",
            },
            "low_cpu_mem_usage": True,
            "offload_folder": str(offload_dir),
            "offload_state_dict": True,
            "offload_buffers": True,
        }
    )
    return load_kwargs


def main() -> int:
    """Run the captioning CLI.

    Parses arguments, loads the model and tokenizer from ``--model``, samples
    frames from ``--video``, asks the model for a caption, prints it and writes
    it to the video path with its suffix replaced by ``.txt``.

    Returns:
        ``0`` on success.

    Raises:
        RuntimeError: If a CUDA device was requested but CUDA is unavailable.
    """
    args = _build_arg_parser().parse_args()
    video_path = Path(args.video)
    model_path = Path(args.model)
    output_path = video_path.with_suffix(".txt")

    cuda_available = torch.cuda.is_available()
    if args.device.startswith("cuda") and not cuda_available:
        raise RuntimeError("请求使用 CUDA,但 torch.cuda.is_available() 为 False")

    # NOTE(review): this is set after torch.cuda.is_available() has been
    # called; if that call already initialised CUDA, the variable may no longer
    # affect device visibility in this process — confirm the intended ordering.
    os.environ.setdefault("CUDA_VISIBLE_DEVICES", "0")
    torch.manual_seed(100)

    device = torch.device(args.device if cuda_available else "cpu")
    on_cuda = device.type == "cuda"
    dtype = torch.float16 if on_cuda else torch.float32

    print(
        f"[MiniCPM] torch={torch.__version__}, device={device}, cuda={cuda_available}",
        flush=True,
    )
    if on_cuda:
        print(f"[MiniCPM] gpu={torch.cuda.get_device_name(device.index or 0)}", flush=True)
    print(
        f"[MiniCPM] low_vram={args.low_vram}, fps={args.fps}, max_frames={args.max_frames}, "
        f"gpu_mem={args.gpu_memory_gb}GiB, gpu_layers={args.gpu_layers}",
        flush=True,
    )

    load_kwargs = _build_load_kwargs(args, device, dtype, model_path)
    model = AutoModel.from_pretrained(str(model_path), **load_kwargs)
    model = model.eval()

    if not on_cuda:
        model = model.to(device=device, dtype=dtype)
    elif args.low_vram:
        print(f"[MiniCPM] device_map={getattr(model, 'hf_device_map', None)}", flush=True)
    else:
        first_param = next(model.parameters())
        print(
            f"[MiniCPM] model parameter device={first_param.device}, dtype={first_param.dtype}",
            flush=True,
        )

    tokenizer = AutoTokenizer.from_pretrained(
        str(model_path), trust_remote_code=True, local_files_only=True
    )

    frames, temporal_ids = encode_video(
        video_path,
        choose_fps=args.fps,
        # Never let the frame budget drop below 16.
        max_num_frames=max(16, args.max_frames),
    )

    question = (
        "Describe this video as an English training caption for a text-to-video model. "
        "Focus on subject, action, camera movement, scene, lighting, style, and important visual details. "
        "Return one concise prompt only, no bullet points."
    )
    msgs = [{"role": "user", "content": frames + [question]}]

    with torch.inference_mode():
        answer = model.chat(
            msgs=msgs,
            tokenizer=tokenizer,
            use_image_id=False,
            max_slice_nums=1,
            temporal_ids=temporal_ids,
        )

    text = str(answer).strip()
    output_path.write_text(text, encoding="utf-8")
    print(text)

    # Drop every reference to the large objects before asking CUDA to release
    # its cached memory.
    del frames, temporal_ids, msgs, answer, tokenizer, model
    gc.collect()
    if cuda_available:
        torch.cuda.empty_cache()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())