| |
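"""Match every golden_set sample against the 200 ruler items by cosine similarity and keep the Top-100 neighbours.

Differences from batch_top5_match.py:
- --top-k defaults to 100.
- summary.csv no longer flattens the 100 neighbours (that would blow up to 400 columns);
  it holds a few aggregate statistics instead:
  * top1_*: rank / score / sim of the nearest neighbour
  * mean_score_topk / median_score_topk: mean / median ruler score of the K neighbours
  * frac_rank_lt_boundary_topk (with vote_count_topk): fraction (and count) of the K neighbours
    whose rank is below --boundary-rank (default 106); this is the voting signal
  * weighted_score: similarity-weighted mean of the ruler scores
  * several 0/1 predictions: top1 / majority (more than half of the K neighbours vote severe)
    / weighted / mean_score
- The JSONL still keeps all K neighbours (rank/score/sim/item_id) so they can be re-analysed later.

Usage:
    python3 batch_top100_match.py
    python3 batch_top100_match.py --limit 50   # small trial run first
    python3 batch_top100_match.py --top-k 100 --boundary-rank 106 --boundary-score 44.72
"""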
| import argparse |
| import json |
| import re |
| import time |
| from pathlib import Path |
|
|
| import numpy as np |
| import pandas as pd |
| import torch |
| import torch.nn.functional as F |
| from torch import Tensor |
| from transformers import AutoTokenizer, AutoModel |
|
|
|
|
# Default value of --model: path handed to AutoTokenizer / AutoModel.
DEFAULT_MODEL = (
    "/mnt/bn/tns-algo-ue-my/biaowu/WorkSpace/Models/"
    "Qwen3-Embedding-8B"
)

# Default value of --ruler: JSON file holding the ruler items.
DEFAULT_RULER = (
    "/mnt/bn/tns-algo-ue-my/biaowu/aipf_dm_metric/ranking_moderation/data/dm/"
    "youth_sexual_and_physical_abuse_aigt_v009/ranking_bucket/ruler_items.json"
)

# Default value of --csv: the golden-set samples to be matched.
DEFAULT_CSV = (
    "/mnt/bn/tns-algo-ue-my/biaowu/aipf_dm_metric/example/yss_ruler_eval/data/"
    "aipf_golden_set.csv"
)
|
|
|
|
| |
def last_token_pool(h: Tensor, attn: Tensor) -> Tensor:
    """Select, for every sequence, the hidden state of its last attended token.

    Args:
        h: hidden states, indexed as ``h[batch, position]``.
        attn: attention mask, indexed as ``attn[batch, position]``.

    Returns:
        One hidden-state entry per batch row.
    """
    # When the last mask column sums to the batch size (assuming a 0/1 mask,
    # e.g. left padding), every row ends on a real token and the final
    # position can be used for the whole batch.
    all_rows_end_on_token = attn[:, -1].sum() == attn.shape[0]
    if all_rows_end_on_token:
        return h[:, -1]
    # Otherwise pick, per row, the position "number of attended tokens - 1".
    last_positions = attn.sum(dim=1) - 1
    row_index = torch.arange(h.shape[0], device=h.device)
    return h[row_index, last_positions]
|
|
|
|
@torch.no_grad()
def encode(texts, tokenizer, model, max_length, batch_size, label):
    """Embed ``texts`` batch by batch and return the stacked embeddings.

    Each batch is tokenized (padded, truncated to ``max_length``), run through
    ``model``, pooled with ``last_token_pool``, L2-normalized along dim 1 and
    moved to the CPU as float32.

    Args:
        texts: sliceable sequence of strings to embed.
        tokenizer: callable tokenizer returning a batch with an
            ``attention_mask`` entry and a ``.to(device)`` method.
        model: model exposing ``.device`` and returning ``last_hidden_state``.
        max_length: truncation length handed to the tokenizer.
        batch_size: number of texts per forward pass.
        label: tag used in the progress lines.

    Returns:
        A NumPy array with one embedding row per input text.
    """
    total = len(texts)
    report_every = batch_size * 10
    started = time.time()
    chunks = []
    for start in range(0, total, batch_size):
        inputs = tokenizer(
            texts[start:start + batch_size],
            padding=True,
            truncation=True,
            max_length=max_length,
            return_tensors="pt",
        ).to(model.device)
        outputs = model(**inputs)
        pooled = last_token_pool(outputs.last_hidden_state, inputs["attention_mask"])
        pooled = F.normalize(pooled, p=2, dim=1)
        chunks.append(pooled.cpu().float())
        # Drop the references to the device tensors before the next batch.
        del outputs, inputs, pooled
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        done = min(start + batch_size, total)
        # Progress line every 10 batches and once more at the very end.
        if done == total or done % report_every == 0:
            elapsed = time.time() - started
            rate = done / max(elapsed, 1e-3)
            eta = (total - done) / max(rate, 1e-3)
            print(f" [{label}] {done}/{total} | {rate:.1f} ex/s | eta {eta:.0f}s", flush=True)
    return torch.cat(chunks, dim=0).numpy()
|
|
|
|
| |
def load_ruler_items(path):
    """Load the ruler items from a JSON file into a flat list of dicts.

    The payload may be a bare list, or a dict holding the list under
    ``items``, ``ruler_items`` or ``data`` (the first non-empty one is used).
    The conversation text is read from ``item["item"]["conv_text"]`` when the
    nested ``item`` is a dict, otherwise from ``item["conv_text"]``.

    Args:
        path: path of the JSON file.

    Returns:
        A list of dicts with the keys ``rank`` (int or None), ``score``
        (float, 0.0 when missing or null), ``item_id`` (str) and ``text`` (str).
    """
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    if isinstance(data, list):
        items = data
    else:
        items = data.get("items") or data.get("ruler_items") or data.get("data") or []

    out = []
    for it in items:
        nested = it.get("item")
        inner = nested if isinstance(nested, dict) else {}
        rank = it.get("rank")
        # An explicit JSON null is treated like a missing score instead of
        # crashing in float(None); this mirrors how a null rank is tolerated.
        score = it.get("score")
        out.append({
            "rank": int(rank) if rank is not None else None,
            "score": float(score) if score is not None else 0.0,
            "item_id": str(it.get("item_id")),
            "text": inner.get("conv_text") or it.get("conv_text") or "",
        })
    return out
|
|
|
|
# Matches a message marker such as "<m1>" or "<m12>".
_M_PREFIX = re.compile(r"<m\d+>")


def extract_conv(raw):
    """Return the conversation part of a raw text cell.

    Non-string input yields ``""``. When a ``<mN>`` marker is present, the
    text from the first marker onward is returned unchanged; otherwise the
    whole string is returned with surrounding whitespace stripped.
    """
    if isinstance(raw, str):
        first_marker = _M_PREFIX.search(raw)
        if first_marker is None:
            return raw.strip()
        return raw[first_marker.start():]
    return ""
|
|
|
|
def load_csv(path, text_col, id_col, label_col, limit=None):
    """Read the sample CSV and return one normalized dict per row.

    Args:
        path: CSV file to read (empty cells stay empty strings, they are not
            converted to NaN).
        text_col: column holding the raw text; when it is absent and a
            ``conv_text`` column exists, ``conv_text`` is used instead.
        id_col: column holding the sample id.
        label_col: column holding the label.
        limit: keep only the first ``limit`` rows; ``None`` or ``0`` keeps all.

    Returns:
        A list of dicts with ``task_id`` (str), ``label`` (stripped,
        upper-cased str) and ``conv_text`` (result of ``extract_conv``).

    Raises:
        ValueError: if one of the required columns is missing.
    """
    df = pd.read_csv(path, keep_default_na=False)
    if text_col not in df.columns and "conv_text" in df.columns:
        text_col = "conv_text"
    for c in (id_col, label_col, text_col):
        if c not in df.columns:
            raise ValueError(f"missing column: {c}; available: {list(df.columns)}")
    if limit:
        df = df.head(limit)
    # Iterate the three columns directly: iterrows() would build a Series per
    # row and does not preserve the per-column dtypes.
    return [
        {
            "task_id": str(task_id),
            "label": str(label).strip().upper(),
            "conv_text": extract_conv(text),
        }
        for task_id, label, text in zip(df[id_col], df[label_col], df[text_col])
    ]
|
|
|
|
| |
def encode_with_cache(texts, tokenizer, model, *, max_length, batch_size,
                      cache_dir, name):
    """Encode ``texts``, reusing an on-disk ``.npy`` cache when one matches.

    The cache file name contains ``name``, the number of texts, ``max_length``
    and a digest of the texts themselves, so a different text column or an
    edited input with the same row count is not served stale embeddings.

    Args:
        texts: strings to embed.
        tokenizer, model: forwarded to ``encode``.
        max_length: truncation length, forwarded to ``encode``.
        batch_size: batch size, forwarded to ``encode``.
        cache_dir: directory for cache files; a falsy value disables caching.
        name: cache-file prefix, also used as the progress label.

    Returns:
        The embeddings as a NumPy array (loaded from cache or freshly encoded).
    """
    import hashlib

    cache_path = None
    if cache_dir:
        digest = hashlib.sha1()
        for text in texts:
            digest.update(str(text).encode("utf-8", errors="replace"))
            # Separator keeps ["ab", "c"] and ["a", "bc"] apart.
            digest.update(b"\0")
        cache_root = Path(cache_dir)
        cache_root.mkdir(parents=True, exist_ok=True)
        cache_path = cache_root / (
            f"{name}_n{len(texts)}_L{max_length}_{digest.hexdigest()[:16]}.npy"
        )
        if cache_path.exists():
            print(f" [{name}] cache hit: {cache_path}")
            return np.load(cache_path)

    emb = encode(texts, tokenizer, model, max_length, batch_size, label=name)
    if cache_path is not None:
        np.save(cache_path, emb)
        print(f" [{name}] cached: {cache_path}")
    return emb
|
|
|
|
| |
def parse_args():
    """Parse the command-line options of the script.

    Returns:
        The ``argparse.Namespace`` built from ``sys.argv``.
    """
    # (flag, add_argument keyword arguments), listed in registration order.
    option_table = (
        ("--csv", dict(default=DEFAULT_CSV)),
        ("--ruler", dict(default=DEFAULT_RULER)),
        ("--model", dict(default=DEFAULT_MODEL)),
        ("--output", dict(default="golden_top100.jsonl")),
        ("--text-col", dict(default="text")),
        ("--id-col", dict(default="task_id")),
        ("--label-col", dict(default="label")),
        ("--top-k", dict(type=int, default=100)),
        ("--boundary-rank", dict(type=int, default=106,
                                 help="rank<X 当严重;用于投票/top1 预测")),
        ("--boundary-score", dict(type=float, default=44.72,
                                  help="weighted_score / mean_score >= X 当严重")),
        ("--positive-label", dict(default="Y",
                                  help="csv label 列里算正样本的字符串(大小写不敏感)")),
        ("--max-length", dict(type=int, default=4096)),
        ("--batch-size", dict(type=int, default=4)),
        ("--cache-dir", dict(default="cache_emb")),
        ("--limit", dict(type=int, default=None)),
        ("--cpu", dict(action="store_true")),
        ("--no-flash-attn", dict(action="store_true")),
    )
    parser = argparse.ArgumentParser()
    for flag, options in option_table:
        parser.add_argument(flag, **options)
    return parser.parse_args()
|
|
|
|
def _topk_neighbors(sims, k):
    """Return ``(indices, similarities)`` of the ``k`` largest entries per row, best first."""
    # argpartition isolates the k best columns per row; only that slice is sorted.
    cand = np.argpartition(-sims, k - 1, axis=1)[:, :k]
    cand_sims = np.take_along_axis(sims, cand, axis=1)
    order = np.argsort(-cand_sims, axis=1)
    return (np.take_along_axis(cand, order, axis=1),
            np.take_along_axis(cand_sims, order, axis=1))


def _print_metrics(sdf, pred_cols):
    """Print confusion counts and P / R / F1 / accuracy for each 0/1 prediction column."""
    print(f"{'pred':<22}{'TP':>5}{'FP':>5}{'TN':>5}{'FN':>5} {'P':>7}{'R':>7}{'F1':>7}{'Acc':>7}")
    print("-" * 80)
    truth = sdf["ground_truth"] == 1
    for col in pred_cols:
        pred = sdf[col] == 1
        tp = int((pred & truth).sum())
        fp = int((pred & ~truth).sum())
        tn = int((~pred & ~truth).sum())
        fn = int((~pred & truth).sum())
        prec = tp / (tp + fp) if tp + fp else 0.0
        rec = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * prec * rec / (prec + rec) if prec + rec else 0.0
        acc = (tp + tn) / len(sdf) if len(sdf) else 0.0
        print(f"{col:<22}{tp:>5}{fp:>5}{tn:>5}{fn:>5} {prec:>7.4f}{rec:>7.4f}{f1:>7.4f}{acc:>7.4f}")


def main():
    """Run the pipeline: load data, embed, find top-K ruler neighbours, write outputs.

    Steps:
      1. load the samples (CSV) and the ruler items (JSON);
      2. load the embedding model and embed both sets (with on-disk cache);
      3. compute the similarity matrix as a dot product of the embeddings and
         keep the K most similar ruler items per sample;
      4. write one JSONL record per sample (aggregates + full neighbour list)
         and a ``<output>.summary.csv`` with the aggregates only;
      5. print confusion-matrix metrics for the four 0/1 predictions.

    Raises:
        ValueError: if ``--top-k`` is below 1, if no samples or no ruler items
            were loaded, or if a ruler item selected as a neighbour has no rank.
    """
    args = parse_args()
    pos_label = args.positive_label.strip().upper()
    # Fail fast, before the model is loaded, instead of crashing later with an
    # opaque indexing error.
    if args.top_k < 1:
        raise ValueError(f"--top-k must be >= 1, got {args.top_k}")

    print(f"[1/4] load csv: {args.csv}")
    rows = load_csv(args.csv, args.text_col, args.id_col, args.label_col, args.limit)
    print(f" -> {len(rows)} samples")
    if not rows:
        raise ValueError(f"no samples loaded from {args.csv}")

    print(f"[2/4] load ruler: {args.ruler}")
    ruler = load_ruler_items(args.ruler)
    print(f" -> {len(ruler)} ruler items")
    if not ruler:
        raise ValueError(f"no ruler items loaded from {args.ruler}")
    K = min(args.top_k, len(ruler))
    print(f" keeping top-{K} per sample")

    print(f"[3/4] load model: {args.model}")
    device = "cpu" if args.cpu else ("cuda" if torch.cuda.is_available() else "cpu")
    print(f" device: {device}")
    mk = {}
    if device == "cuda":
        mk["torch_dtype"] = torch.float16
        if not args.no_flash_attn:
            mk["attn_implementation"] = "flash_attention_2"
    tokenizer = AutoTokenizer.from_pretrained(args.model, padding_side="left")
    model = AutoModel.from_pretrained(args.model, **mk).to(device).eval()

    cd = args.cache_dir or None
    # The model name is part of the cache name so that switching --model does
    # not silently reuse embeddings produced by another model.
    model_tag = Path(args.model).name
    print(f"[4/4] encode (batch_size={args.batch_size}, max_length={args.max_length})")
    csv_emb = encode_with_cache([r["conv_text"] for r in rows],
                                tokenizer, model,
                                max_length=args.max_length,
                                batch_size=args.batch_size,
                                cache_dir=cd,
                                name=f"csv_{Path(args.csv).stem}_{model_tag}")
    ruler_emb = encode_with_cache([it["text"] for it in ruler],
                                  tokenizer, model,
                                  max_length=args.max_length,
                                  batch_size=args.batch_size,
                                  cache_dir=cd,
                                  name=f"ruler_{Path(args.ruler).parent.name}_{model_tag}")

    sims = csv_emb @ ruler_emb.T
    top_idx, top_sims = _topk_neighbors(sims, K)

    # A neighbour without a rank cannot be scored; report it before any output
    # file is touched rather than failing halfway through the JSONL.
    unranked = sorted({ruler[j]["item_id"]
                       for j in np.unique(top_idx).tolist()
                       if ruler[j]["rank"] is None})
    if unranked:
        raise ValueError(
            f"{len(unranked)} ruler item(s) used as neighbours have no rank, "
            f"e.g. item_id {unranked[:10]}")

    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # Float fields that are rounded to 4 decimals in the summary CSV.
    rounded = ("top1_score", "top1_sim", "mean_score_topk", "median_score_topk",
               "frac_rank_lt_boundary_topk", "weighted_score")
    summary_rows = []
    print(f"[write] {out_path}")
    with out_path.open("w", encoding="utf-8") as f:
        for row, idxs, sims_row in zip(rows, top_idx, top_sims):
            neighbors = [ruler[j] for j in idxs.tolist()]
            ranks_arr = np.array([nb["rank"] for nb in neighbors], dtype=np.int64)
            scores_arr = np.array([nb["score"] for nb in neighbors], dtype=np.float64)
            sims_arr = sims_row.astype(np.float64)

            in_severe = ranks_arr < args.boundary_rank
            vote_count = int(in_severe.sum())
            wsim = float(sims_arr.sum())
            weighted_score = float((sims_arr * scores_arr).sum() / wsim) if wsim > 0 else 0.0
            mean_score = float(scores_arr.mean())

            record = {
                "task_id": row["task_id"],
                "label": row["label"],
                "ground_truth": int(row["label"] == pos_label),
                "top1_rank": int(ranks_arr[0]),
                "top1_score": float(scores_arr[0]),
                "top1_sim": float(sims_arr[0]),
                "mean_score_topk": mean_score,
                "median_score_topk": float(np.median(scores_arr)),
                "frac_rank_lt_boundary_topk": float(in_severe.mean()),
                "vote_count_topk": vote_count,
                "weighted_score": weighted_score,
                "pred_top1": int(in_severe[0]),
                "pred_majority": int(vote_count > K / 2),
                "pred_weighted": int(weighted_score >= args.boundary_score),
                "pred_mean_score": int(mean_score >= args.boundary_score),
            }
            # The summary row is the record without the neighbour list, with
            # the float aggregates rounded.
            summary_rows.append({key: round(val, 4) if key in rounded else val
                                 for key, val in record.items()})
            record["topk"] = [
                {"rank": nb["rank"], "score": nb["score"], "sim": sim,
                 "item_id": nb["item_id"]}
                for nb, sim in zip(neighbors, sims_arr.tolist())
            ]
            f.write(json.dumps(record, ensure_ascii=False) + "\n")

    sdf = pd.DataFrame(summary_rows)
    summary_csv = out_path.with_suffix(".summary.csv")
    sdf.to_csv(summary_csv, index=False)
    print(f"[write] {summary_csv}")

    print(f"\n[metrics] (positive_label='{pos_label}', boundary_rank={args.boundary_rank}, "
          f"boundary_score={args.boundary_score}, K={K})")
    _print_metrics(sdf, ("pred_top1", "pred_majority", "pred_weighted", "pred_mean_score"))
|
|
|
|
# Script entry point: run the pipeline only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|