| from __future__ import annotations |
|
|
| import argparse |
| import gc |
| import math |
| import os |
| from pathlib import Path |
|
|
| import numpy as np |
| import torch |
| from decord import VideoReader, cpu |
| from PIL import Image |
| from scipy.spatial import cKDTree |
| from transformers import AutoModel, AutoTokenizer |
|
|
|
|
# Default frame budget: used as encode_video's ``max_num_frames`` default and
# as the default of the ``--max-frames`` command-line option.
DEFAULT_MAX_NUM_FRAMES: int = 180

# Largest packing factor encode_video will use; it is also the maximum number
# of temporal ids placed in one group.
MAX_NUM_PACKING: int = 3

# Step, in seconds, of the timestamp grid that sampled frames are snapped to.
TIME_SCALE: float = 0.1
|
|
|
|
def map_to_nearest_scale(values, scale):
    """Snap every entry of *values* to its closest entry in *scale*.

    Both arguments are converted with ``np.asarray`` and treated as 1-D
    sequences of scalars. A KD-tree over the scale points performs the
    nearest-neighbour lookup.

    Returns:
        A NumPy array with one element per input value, each element being
        the member of *scale* nearest to that value.
    """
    grid = np.asarray(scale)
    queries = np.asarray(values)
    # cKDTree works on (n_points, n_dims) data, so lift both to one column.
    lookup = cKDTree(grid[:, np.newaxis])
    nearest = lookup.query(queries[:, np.newaxis])[1]
    return grid[nearest]
|
|
|
|
def group_array(arr, size):
    """Split *arr* into consecutive slices of at most *size* elements.

    The final slice is shorter when ``len(arr)`` is not a multiple of *size*.
    An empty *arr* yields an empty list.
    """
    chunks = []
    for start in range(0, len(arr), size):
        stop = start + size
        chunks.append(arr[start:stop])
    return chunks
|
|
|
|
def uniform_sample(items, n):
    """Pick *n* elements spread evenly across *items*.

    The sequence is divided into *n* equal-width bins and the element at the
    centre of each bin is taken. When *n* exceeds ``len(items)`` elements are
    repeated. A non-positive *n* returns an empty list.
    """
    if n <= 0:
        return []

    last_index = len(items) - 1
    gap = len(items) / n
    picked = []
    for slot in range(n):
        centre = int(slot * gap + gap / 2)
        # Clamp so float rounding can never step past the final element.
        picked.append(items[min(last_index, centre)])
    return picked
|
|
|
|
def build_low_vram_device_map(gpu_layers: int) -> dict[str, int | str]:
    """Build a module-to-device mapping that keeps only part of the model on GPU 0.

    Args:
        gpu_layers: Requested number of leading ``llm.model.layers.*`` entries
            to place on device ``0``. The value is clamped to the range 4..24.

    Returns:
        A dict whose values are ``0`` (GPU 0) or ``"cpu"``. ``vpm``,
        ``resampler`` and ``llm.model.embed_tokens`` are mapped to ``0``; the
        36 ``llm.model.layers.<i>`` entries are mapped to ``0`` below the
        clamped threshold and to ``"cpu"`` from it onwards; ``llm.model.norm``,
        ``llm.model.rotary_emb`` and ``llm.lm_head`` are mapped to ``"cpu"``.
    """
    resident = min(max(gpu_layers, 4), 24)

    placement: dict[str, int | str] = {
        name: 0 for name in ("vpm", "resampler", "llm.model.embed_tokens")
    }
    placement.update(
        (f"llm.model.layers.{idx}", "cpu" if idx >= resident else 0)
        for idx in range(36)
    )
    for tail in ("llm.model.norm", "llm.model.rotary_emb", "llm.lm_head"):
        placement[tail] = "cpu"
    return placement
|
|
|
|
def encode_video(
    video_path: Path,
    choose_fps: int = 5,
    max_num_frames: int = DEFAULT_MAX_NUM_FRAMES,
    force_packing=None,
):
    """Sample frames from a video and compute grouped temporal ids for them.

    The number of sampled frames and the packing factor are derived from the
    video duration, ``choose_fps`` and ``max_num_frames``; the packing factor
    never exceeds ``MAX_NUM_PACKING``. Each sampled frame's timestamp is
    snapped to a grid with ``TIME_SCALE``-second spacing and expressed as the
    integer index of that grid point.

    Args:
        video_path: Video file opened with decord's ``VideoReader``.
        choose_fps: Target sampling rate in frames per second.
        max_num_frames: Frame budget available at packing factor 1.
        force_packing: When truthy, replaces the computed packing factor
            (still capped at ``MAX_NUM_PACKING``).

    Returns:
        ``(frames, frame_ts_id_group)`` where ``frames`` is a list of RGB
        ``PIL.Image`` objects and ``frame_ts_id_group`` is a list of int32
        arrays, each holding the temporal ids of up to ``packing_nums``
        consecutive frames.

    Raises:
        ValueError: If the video reports no frames or a non-positive average
            frame rate, since no duration can be computed from it.
    """
    vr = VideoReader(str(video_path), ctx=cpu(0))
    total_frames = len(vr)
    fps = vr.get_avg_fps()
    # Fail with a clear message instead of a ZeroDivisionError / IndexError
    # further down when the container is empty or reports no frame rate.
    if total_frames == 0 or not fps > 0:
        raise ValueError(
            f"cannot sample frames from {video_path}: "
            f"frame count={total_frames}, average fps={fps}"
        )
    video_duration = total_frames / fps

    if choose_fps * int(video_duration) <= max_num_frames:
        packing_nums = 1
        # NOTE(review): min(max_num_frames, video_duration) compares a frame
        # count with seconds; kept unchanged -- confirm the intent.
        choose_frames = round(min(choose_fps, round(fps)) * min(max_num_frames, video_duration))
    else:
        packing_nums = math.ceil(video_duration * choose_fps / max_num_frames)
        if packing_nums <= MAX_NUM_PACKING:
            choose_frames = round(video_duration * choose_fps)
        else:
            choose_frames = round(max_num_frames * MAX_NUM_PACKING)
            packing_nums = MAX_NUM_PACKING

    # Always take at least one frame and never more than the video contains.
    choose_frames = max(1, min(total_frames, choose_frames))
    frame_idx = np.array(uniform_sample(range(total_frames), choose_frames))

    if force_packing:
        packing_nums = min(force_packing, MAX_NUM_PACKING)

    frames = vr.get_batch(frame_idx).asnumpy()

    # Convert frame indices to seconds and snap them onto the timestamp grid.
    frame_idx_ts = frame_idx / fps
    scale = np.arange(0, video_duration, TIME_SCALE)
    nearest_ts = map_to_nearest_scale(frame_idx_ts, scale)
    # Round before casting: a grid value such as 43 * 0.1 divides to
    # 42.99999999999999, which a bare astype(int32) would truncate to 42.
    frame_ts_id = np.rint(nearest_ts / TIME_SCALE).astype(np.int32)

    frames = [Image.fromarray(frame.astype("uint8")).convert("RGB") for frame in frames]
    frame_ts_id_group = group_array(frame_ts_id, packing_nums)
    return frames, frame_ts_id_group
|
|
|
|
def main() -> int:
    """Caption a single video with a locally stored model and save the text.

    Parses the command line, loads the model and tokenizer from ``--model``
    (local files only), samples frames from ``--video``, asks the model for
    one caption, writes it to a ``.txt`` file next to the video and prints it.
    With ``--low-vram`` on a CUDA device the model is split between GPU 0 and
    the CPU using ``build_low_vram_device_map``.

    Returns:
        ``0`` after the caption has been written.

    Raises:
        RuntimeError: If a CUDA device was requested but CUDA is unavailable.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--video", required=True)
    cli.add_argument("--model", required=True)
    cli.add_argument("--fps", type=int, default=5)
    cli.add_argument("--device", default="cuda:0")
    cli.add_argument("--max-frames", type=int, default=DEFAULT_MAX_NUM_FRAMES)
    cli.add_argument("--low-vram", action="store_true")
    cli.add_argument("--gpu-memory-gb", type=int, default=8)
    cli.add_argument("--cpu-memory-gb", type=int, default=48)
    cli.add_argument("--gpu-layers", type=int, default=10)
    args = cli.parse_args()

    video_path = Path(args.video)
    model_path = Path(args.model)
    # The caption is stored beside the video, same stem, ".txt" suffix.
    caption_path = video_path.with_suffix(".txt")

    wants_cuda = args.device.startswith("cuda")
    if wants_cuda and not torch.cuda.is_available():
        raise RuntimeError("请求使用 CUDA,但 torch.cuda.is_available() 为 False")

    os.environ.setdefault("CUDA_VISIBLE_DEVICES", "0")
    torch.manual_seed(100)
    device = torch.device(args.device if torch.cuda.is_available() else "cpu")
    on_cuda = device.type == "cuda"
    dtype = torch.float16 if on_cuda else torch.float32
    print(f"[MiniCPM] torch={torch.__version__}, device={device}, cuda={torch.cuda.is_available()}", flush=True)
    if on_cuda:
        print(f"[MiniCPM] gpu={torch.cuda.get_device_name(device.index or 0)}", flush=True)
    print(
        f"[MiniCPM] low_vram={args.low_vram}, fps={args.fps}, max_frames={args.max_frames}, gpu_mem={args.gpu_memory_gb}GiB, gpu_layers={args.gpu_layers}",
        flush=True,
    )

    # dtype is float16 on CUDA and float32 on CPU, so one assignment covers
    # both loading paths.
    load_kwargs = dict(
        trust_remote_code=True,
        attn_implementation="sdpa",
        local_files_only=True,
        torch_dtype=dtype,
    )
    if on_cuda and args.low_vram:
        # Split the model across GPU 0 and CPU, with an offload directory
        # created next to the model folder.
        offload_dir = model_path.parent / "_minicpm_offload"
        offload_dir.mkdir(parents=True, exist_ok=True)
        load_kwargs["device_map"] = build_low_vram_device_map(args.gpu_layers)
        load_kwargs["max_memory"] = {
            0: f"{max(4, args.gpu_memory_gb)}GiB",
            "cpu": f"{max(16, args.cpu_memory_gb)}GiB",
        }
        load_kwargs["low_cpu_mem_usage"] = True
        load_kwargs["offload_folder"] = str(offload_dir)
        load_kwargs["offload_state_dict"] = True
        load_kwargs["offload_buffers"] = True
    elif on_cuda:
        # Place the whole model on the requested CUDA device.
        load_kwargs["device_map"] = {"": str(device)}

    model = AutoModel.from_pretrained(str(model_path), **load_kwargs).eval()
    if not on_cuda:
        model = model.to(device=device, dtype=dtype)
    elif args.low_vram:
        print(f"[MiniCPM] device_map={getattr(model, 'hf_device_map', None)}", flush=True)
    else:
        first_param = next(model.parameters())
        print(f"[MiniCPM] model parameter device={first_param.device}, dtype={first_param.dtype}", flush=True)

    tokenizer = AutoTokenizer.from_pretrained(str(model_path), trust_remote_code=True, local_files_only=True)
    frames, temporal_ids = encode_video(
        video_path,
        choose_fps=args.fps,
        max_num_frames=max(16, args.max_frames),
    )

    question = (
        "Describe this video as an English training caption for a text-to-video model. "
        "Focus on subject, action, camera movement, scene, lighting, style, and important visual details. "
        "Return one concise prompt only, no bullet points."
    )
    # One user turn: every sampled frame followed by the instruction text.
    msgs = [{"role": "user", "content": [*frames, question]}]
    with torch.inference_mode():
        answer = model.chat(
            msgs=msgs,
            tokenizer=tokenizer,
            use_image_id=False,
            max_slice_nums=1,
            temporal_ids=temporal_ids,
        )

    text = str(answer).strip()
    caption_path.write_text(text, encoding="utf-8")
    print(text)

    # Drop the large objects, then collect garbage and clear the CUDA cache.
    del frames, temporal_ids, msgs, answer, tokenizer, model
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    return 0
|
|
|
|
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)
|
|