#!/usr/bin/env python3
"""Match every golden_set sample against the 200 ruler items by cosine similarity
and keep the Top-100 neighbors.

Differences from batch_top5_match.py:
  - --top-k defaults to 100.
  - summary.csv no longer flattens the 100 neighbors (that would explode to
    400 columns); it stores aggregate statistics instead:
      * top1_*: rank / score / sim of the nearest neighbor
      * mean_score_topk / median_score_topk: mean / median ruler score of the
        kept neighbors
      * frac_rank_lt_boundary_topk: fraction of kept neighbors whose rank is
        below --boundary-rank (the voting signal)
      * weighted_score: similarity-weighted average score
      * several 0/1 predictions: top1 / majority / weighted / mean_score
  - The JSONL output still keeps the full neighbor list
    (rank / score / sim / item_id) for later analysis.

Usage:
    python3 batch_top100_match.py --positive-label LABEL --max-length N --batch-size B
    python3 batch_top100_match.py ... --limit 50   # quick small run first
    python3 batch_top100_match.py ... --top-k 100 --boundary-rank 106 --boundary-score 44.72

NOTE(review): part of this file (the tail of ``parse_args`` and the head of
``main``) was lost in extraction and has been reconstructed from the
attributes the surviving code reads. Flags whose original defaults could not
be recovered are currently *required*; restore the real defaults from version
control and drop ``required=True`` where appropriate.
"""

import argparse
import json
import re
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from torch import Tensor

DEFAULT_MODEL = "/mnt/bn/tns-algo-ue-my/biaowu/WorkSpace/Models/Qwen3-Embedding-8B"
DEFAULT_RULER = "/mnt/bn/tns-algo-ue-my/biaowu/aipf_dm_metric/ranking_moderation/data/dm/youth_sexual_and_physical_abuse_aigt_v009/ranking_bucket/ruler_items.json"
DEFAULT_CSV = "/mnt/bn/tns-algo-ue-my/biaowu/aipf_dm_metric/example/yss_ruler_eval/data/aipf_golden_set.csv"

# Names of the four 0/1 prediction columns, in the order they are reported.
PRED_COLUMNS = ("pred_top1", "pred_majority", "pred_weighted", "pred_mean_score")


# ---------- model ----------

def last_token_pool(h: Tensor, attn: Tensor) -> Tensor:
    """Pick, for each sequence, the hidden state of its last attended token.

    Args:
        h: hidden states, indexed as ``h[batch, position]``.
        attn: attention mask, indexed as ``attn[batch, position]``.

    Returns:
        One hidden-state vector per batch row.
    """
    # If every row attends to the final position, the last real token of each
    # sequence sits in the final column, so one slice is enough.
    if attn[:, -1].sum() == attn.shape[0]:
        return h[:, -1]
    # Otherwise locate each row's last attended position from its mask sum.
    lens = attn.sum(dim=1) - 1
    bsz = h.shape[0]
    return h[torch.arange(bsz, device=h.device), lens]


@torch.no_grad()
def encode(texts, tokenizer, model, max_length, batch_size, label):
    """Embed ``texts`` in batches and return L2-normalised vectors.

    Args:
        texts: sequence of strings to embed.
        tokenizer: callable tokenizer returning a batch with ``attention_mask``
            and supporting ``.to(device)``.
        model: model whose output exposes ``last_hidden_state`` and which has
            a ``device`` attribute.
        max_length: truncation length passed to the tokenizer.
        batch_size: number of texts per forward pass.
        label: tag used in the progress lines.

    Returns:
        ``numpy`` float32 array with one unit-norm row per input text.

    Raises:
        ValueError: if ``texts`` is empty (``torch.cat`` would otherwise fail
            with an opaque error on an empty list).
    """
    n = len(texts)
    if n == 0:
        raise ValueError(f"encode({label!r}): no texts to encode")

    embs = []
    t0 = time.time()
    for i in range(0, n, batch_size):
        batch = texts[i:i + batch_size]
        d = tokenizer(batch, padding=True, truncation=True,
                      max_length=max_length, return_tensors="pt").to(model.device)
        out = model(**d)
        e = last_token_pool(out.last_hidden_state, d["attention_mask"])
        e = F.normalize(e, p=2, dim=1)
        embs.append(e.cpu().float())

        # Drop the device tensors before the next batch to keep peak memory low.
        del out, d, e
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        done = min(i + batch_size, n)
        # Report every 10 batches and once at the very end.
        if done % (batch_size * 10) == 0 or done == n:
            elapsed = time.time() - t0
            rate = done / max(elapsed, 1e-3)
            eta = (n - done) / max(rate, 1e-3)
            print(f" [{label}] {done}/{n} | {rate:.1f} ex/s | eta {eta:.0f}s", flush=True)
    return torch.cat(embs, dim=0).numpy()


# ---------- data ----------

def load_ruler_items(path) -> List[Dict[str, Any]]:
    """Load ruler items from a JSON file.

    The file may be a bare list, or a dict holding the list under ``items``,
    ``ruler_items`` or ``data``. The conversation text is read from
    ``item["item"]["conv_text"]`` when ``item["item"]`` is a dict, otherwise
    from ``item["conv_text"]``.

    Returns:
        A list of dicts with keys ``rank`` (int or None), ``score`` (float),
        ``item_id`` (str) and ``text`` (str).
    """
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)

    if isinstance(data, list):
        items = data
    else:
        items = data.get("items") or data.get("ruler_items") or data.get("data") or []

    out = []
    for it in items:
        inner = it.get("item")
        if not isinstance(inner, dict):
            inner = {}
        rank = it.get("rank")
        # An explicit JSON null is treated like a missing score; the previous
        # float(it.get("score", 0.0)) raised TypeError on null.
        score = it.get("score")
        out.append({
            "rank": int(rank) if rank is not None else None,
            "score": float(score) if score is not None else 0.0,
            "item_id": str(it.get("item_id")),
            "text": inner.get("conv_text") or it.get("conv_text") or "",
        })
    return out


# NOTE(review): this pattern is empty in the file as received -- it looks like
# an angle-bracket marker was stripped during extraction. An empty pattern
# always matches at offset 0, so extract_conv() currently returns its input
# unchanged and never reaches the strip() fallback. Restore the real pattern.
_M_PREFIX = re.compile(r"")


def extract_conv(raw):
    """Return ``raw`` starting at the first ``_M_PREFIX`` match.

    Non-string input yields ``""``. When the pattern does not match, the
    stripped input is returned.
    """
    if not isinstance(raw, str):
        return ""
    m = _M_PREFIX.search(raw)
    return raw[m.start():] if m else raw.strip()


def load_csv(path, text_col, id_col, label_col, limit=None) -> List[Dict[str, str]]:
    """Read the evaluation CSV into a list of sample dicts.

    Args:
        path: CSV file to read (empty cells stay empty strings, not NaN).
        text_col: column holding the conversation text; falls back to
            ``conv_text`` when the requested column is absent.
        id_col: column holding the sample id.
        label_col: column holding the label.
        limit: when truthy, only the first ``limit`` rows are used.

    Returns:
        Dicts with ``task_id`` (str), ``label`` (stripped, upper-cased str)
        and ``conv_text`` (result of ``extract_conv``).

    Raises:
        ValueError: if one of the required columns is missing.
    """
    df = pd.read_csv(path, keep_default_na=False)
    if text_col not in df.columns and "conv_text" in df.columns:
        text_col = "conv_text"
    for c in (id_col, label_col, text_col):
        if c not in df.columns:
            raise ValueError(f"missing column: {c}; available: {list(df.columns)}")
    if limit:
        df = df.head(limit)

    columns = zip(df[id_col].tolist(), df[label_col].tolist(), df[text_col].tolist())
    return [
        {
            "task_id": str(task_id),
            "label": str(label).strip().upper(),
            "conv_text": extract_conv(text),
        }
        for task_id, label, text in columns
    ]


# ---------- cache ----------

def encode_with_cache(texts, tokenizer, model, *, max_length, batch_size, cache_dir, name):
    """Encode ``texts``, reusing a ``.npy`` file in ``cache_dir`` when present.

    The cache file is ``{name}_n{len(texts)}_L{max_length}.npy``. With a falsy
    ``cache_dir`` nothing is read or written.

    NOTE(review): the cache key covers only the name, the number of texts and
    max_length. Changing the model or the text content while keeping those
    three will silently reuse stale embeddings -- clear the cache directory
    when either changes.
    """
    p: Optional[Path] = None
    if cache_dir:
        Path(cache_dir).mkdir(parents=True, exist_ok=True)
        p = Path(cache_dir) / f"{name}_n{len(texts)}_L{max_length}.npy"
        if p.exists():
            print(f" [{name}] cache hit: {p}")
            return np.load(p)

    emb = encode(texts, tokenizer, model, max_length, batch_size, label=name)
    if p is not None:
        np.save(p, emb)
        print(f" [{name}] cached: {p}")
    return emb


# ---------- matching / statistics ----------

def top_k_neighbors(sims: np.ndarray, k: int) -> Tuple[np.ndarray, np.ndarray]:
    """Return the ``k`` highest-similarity columns of every row, best first.

    Args:
        sims: 2-D similarity matrix (one row per query, one column per item).
        k: number of neighbors to keep, ``1 <= k <= sims.shape[1]``.

    Returns:
        ``(indices, similarities)``, both of shape ``(rows, k)``, sorted by
        descending similarity within each row.

    Raises:
        ValueError: if ``k`` is outside the valid range.
    """
    if not 1 <= k <= sims.shape[1]:
        raise ValueError(f"k must be in [1, {sims.shape[1]}], got {k}")
    # argpartition selects the top-k set cheaply; only those k are then sorted.
    part_idx = np.argpartition(-sims, k - 1, axis=1)[:, :k]
    part_sims = np.take_along_axis(sims, part_idx, axis=1)
    order = np.argsort(-part_sims, axis=1)
    return (np.take_along_axis(part_idx, order, axis=1),
            np.take_along_axis(part_sims, order, axis=1))


def summarize_neighbors(ranks: np.ndarray, scores: np.ndarray, sims: np.ndarray,
                        boundary_rank: int, boundary_score: float) -> Dict[str, Any]:
    """Aggregate one sample's neighbors (ordered best first) into statistics.

    A neighbor "votes positive" when its rank is below ``boundary_rank``.
    Predictions:
      * pred_top1: the nearest neighbor votes positive
      * pred_majority: strictly more than half of the neighbors vote positive
      * pred_weighted: similarity-weighted mean score >= ``boundary_score``
      * pred_mean_score: plain mean score >= ``boundary_score``

    NOTE(review): the original module docstring described the majority rule
    as ">= 50"; the code has always used a strict "> K / 2". Behavior is kept.

    Returns:
        Dict of plain Python ints/floats, in the column order used by the
        JSONL records and summary.csv.
    """
    k = len(ranks)
    sim_total = float(sims.sum())
    # Non-positive total similarity makes the weighted mean undefined; use 0.
    weighted_score = float((sims * scores).sum() / sim_total) if sim_total > 0 else 0.0
    mean_score = float(scores.mean())
    votes = ranks < boundary_rank
    vote_count = int(votes.sum())
    return {
        "top1_rank": int(ranks[0]),
        "top1_score": float(scores[0]),
        "top1_sim": float(sims[0]),
        "mean_score_topk": mean_score,
        "median_score_topk": float(np.median(scores)),
        "frac_rank_lt_boundary_topk": float(votes.mean()),
        "vote_count_topk": vote_count,
        "weighted_score": weighted_score,
        "pred_top1": int(ranks[0] < boundary_rank),
        "pred_majority": int(vote_count > k / 2),
        "pred_weighted": int(weighted_score >= boundary_score),
        "pred_mean_score": int(mean_score >= boundary_score),
    }


def binary_metrics(pred, truth) -> Dict[str, Any]:
    """Confusion counts and precision / recall / F1 / accuracy for 0/1 vectors.

    Any ratio whose denominator is zero is reported as 0.0.
    """
    tp = int(((pred == 1) & (truth == 1)).sum())
    fp = int(((pred == 1) & (truth == 0)).sum())
    tn = int(((pred == 0) & (truth == 0)).sum())
    fn = int(((pred == 0) & (truth == 1)).sum())
    total = len(pred)
    prec = tp / (tp + fp) if tp + fp else 0.0
    rec = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * prec * rec / (prec + rec) if prec + rec else 0.0
    acc = (tp + tn) / total if total else 0.0
    return {"tp": tp, "fp": fp, "tn": tn, "fn": fn,
            "precision": prec, "recall": rec, "f1": f1, "accuracy": acc}


# ---------- args ----------

def parse_args():
    """Parse the command-line options."""
    p = argparse.ArgumentParser()
    p.add_argument("--csv", default=DEFAULT_CSV)
    p.add_argument("--ruler", default=DEFAULT_RULER)
    p.add_argument("--model", default=DEFAULT_MODEL)
    p.add_argument("--output", default="golden_top100.jsonl")
    p.add_argument("--text-col", default="text")
    p.add_argument("--id-col", default="task_id")
    p.add_argument("--label-col", default="label")
    p.add_argument("--top-k", type=int, default=100)
    p.add_argument("--boundary-rank", type=int, default=106,
                   help="ruler items with rank below this value count as positive votes")

    # NOTE(review): everything below was reconstructed -- the original
    # definitions were lost in extraction. The option names follow the
    # attributes main() reads; restore the original defaults/help texts.
    p.add_argument("--boundary-score", type=float, default=44.72,
                   help="score threshold for pred_weighted / pred_mean_score "
                        "(default taken from the usage example)")
    p.add_argument("--positive-label", required=True,
                   help="label value treated as ground-truth positive "
                        "(compared case-insensitively)")
    p.add_argument("--max-length", type=int, required=True,
                   help="tokenizer truncation length")
    p.add_argument("--batch-size", type=int, required=True,
                   help="texts per forward pass")
    p.add_argument("--cache-dir", default="",
                   help="directory for cached embeddings; empty disables caching")
    p.add_argument("--limit", type=int, default=None,
                   help="only use the first N CSV rows")
    p.add_argument("--cpu", action="store_true", help="force CPU inference")
    p.add_argument("--no-flash-attn", action="store_true",
                   help="do not request FlashAttention-2 on CUDA")
    return p.parse_args()


# ---------- main ----------

def main():
    """Run the full pipeline: load data, embed, match Top-K, write outputs.

    Writes one JSON record per sample to ``--output`` (aggregate statistics
    plus the full neighbor list), a ``.summary.csv`` next to it (aggregate
    statistics only, floats rounded to 4 decimals), and prints
    precision / recall / F1 / accuracy for each prediction rule.

    Raises:
        ValueError: on empty inputs, ``--top-k < 1`` or ruler items without a
            rank. These are checked before the model is loaded so a bad input
            does not cost an encoding run.
    """
    args = parse_args()
    # Labels from load_csv() are stripped and upper-cased; normalise likewise.
    # NOTE(review): reconstructed -- the original definition of pos_label was lost.
    pos_label = args.positive_label.strip().upper()

    # NOTE(review): step [1/4] was reconstructed by analogy with step [2/4].
    print(f"[1/4] load csv: {args.csv}")
    rows = load_csv(args.csv, args.text_col, args.id_col, args.label_col, limit=args.limit)
    print(f" -> {len(rows)} samples")

    print(f"[2/4] load ruler: {args.ruler}")
    ruler = load_ruler_items(args.ruler)
    print(f" -> {len(ruler)} ruler items")
    k = min(args.top_k, len(ruler))
    print(f" keeping top-{k} per sample")

    if not rows:
        raise ValueError(f"no samples loaded from {args.csv}")
    if not ruler:
        raise ValueError(f"no ruler items loaded from {args.ruler}")
    if k < 1:
        raise ValueError(f"--top-k must be >= 1, got {args.top_k}")
    unranked = [it["item_id"] for it in ruler if it["rank"] is None]
    if unranked:
        raise ValueError(f"{len(unranked)} ruler items have no rank, e.g. {unranked[:5]}")

    print(f"[3/4] load model: {args.model}")
    device = "cpu" if args.cpu else ("cuda" if torch.cuda.is_available() else "cpu")
    print(f" device: {device}")
    mk = {}
    if device == "cuda":
        mk["torch_dtype"] = torch.float16
        # FlashAttention-2 is only requested together with CUDA half precision.
        if not args.no_flash_attn:
            mk["attn_implementation"] = "flash_attention_2"

    # Imported here so that --help, argument errors and the data helpers do
    # not require transformers to be importable.
    from transformers import AutoModel, AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained(args.model, padding_side="left")
    model = AutoModel.from_pretrained(args.model, **mk).to(device).eval()

    cd = args.cache_dir or None
    print(f"[4/4] encode (batch_size={args.batch_size}, max_length={args.max_length})")
    csv_emb = encode_with_cache(
        [r["conv_text"] for r in rows], tokenizer, model,
        max_length=args.max_length, batch_size=args.batch_size,
        cache_dir=cd, name=f"csv_{Path(args.csv).stem}")
    ruler_emb = encode_with_cache(
        [it["text"] for it in ruler], tokenizer, model,
        max_length=args.max_length, batch_size=args.batch_size,
        cache_dir=cd, name=f"ruler_{Path(args.ruler).parent.name}")

    # Rows are unit-norm, so the dot product is the cosine similarity.
    sims = csv_emb @ ruler_emb.T  # (N, R)
    top_idx, top_sims = top_k_neighbors(sims, k)

    ruler_ranks = np.array([it["rank"] for it in ruler], dtype=np.int64)
    ruler_scores = np.array([it["score"] for it in ruler], dtype=np.float64)

    # Write the outputs.
    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    summary_rows = []
    print(f"[write] {out_path}")
    with out_path.open("w", encoding="utf-8") as f:
        for i, row in enumerate(rows):
            idxs = top_idx[i]
            sims_arr = top_sims[i].astype(np.float64)

            head = {
                "task_id": row["task_id"],
                "label": row["label"],
                "ground_truth": int(row["label"] == pos_label),
            }
            # Aggregate statistics and the 0/1 predictions.
            stats = summarize_neighbors(ruler_ranks[idxs], ruler_scores[idxs], sims_arr,
                                        args.boundary_rank, args.boundary_score)
            # Full neighbor list, best first.
            topk_full = [
                {
                    "rank": ruler[j]["rank"],
                    "score": ruler[j]["score"],
                    "sim": sim,
                    "item_id": ruler[j]["item_id"],
                }
                for j, sim in zip(idxs.tolist(), sims_arr.tolist())
            ]

            record = {**head, **stats, "topk": topk_full}
            f.write(json.dumps(record, ensure_ascii=False) + "\n")

            rounded = {key: round(val, 4) if isinstance(val, float) else val
                       for key, val in stats.items()}
            summary_rows.append({**head, **rounded})

    sdf = pd.DataFrame(summary_rows)
    summary_csv = out_path.with_suffix(".summary.csv")
    sdf.to_csv(summary_csv, index=False)
    print(f"[write] {summary_csv}")

    # P / R / F1 for each prediction rule.
    print(f"\n[metrics] (positive_label='{pos_label}', boundary_rank={args.boundary_rank}, "
          f"boundary_score={args.boundary_score}, K={k})")
    print(f"{'pred':<22}{'TP':>5}{'FP':>5}{'TN':>5}{'FN':>5} {'P':>7}{'R':>7}{'F1':>7}{'Acc':>7}")
    print("-" * 80)
    for col in PRED_COLUMNS:
        m = binary_metrics(sdf[col], sdf["ground_truth"])
        tp, fp, tn, fn = m["tp"], m["fp"], m["tn"], m["fn"]
        prec, rec, f1, acc = m["precision"], m["recall"], m["f1"], m["accuracy"]
        print(f"{col:<22}{tp:>5}{fp:>5}{tn:>5}{fn:>5} {prec:>7.4f}{rec:>7.4f}{f1:>7.4f}{acc:>7.4f}")


if __name__ == "__main__":
    main()