r"""把 golden_set.csv(约 1000 条)逐条与 ruler items(约 200 条)计算 cosine 相似度,
为每条样本找出 Top-K 个最相似的 ruler items,并把结果保存到本地。

用法:
    # 使用默认路径
    python3 batch_top5_match.py

    # 自定义参数
    python3 batch_top5_match.py \
        --csv /mnt/.../aipf_golden_set.csv \
        --ruler /mnt/.../ruler_items.json \
        --model /mnt/.../Qwen3-Embedding-8B \
        --output golden_top5.jsonl \
        --top-k 5 \
        --boundary-score 44.72 \
        --cache-dir cache_emb \
        --limit 50    # 先只跑前 50 条做 sanity check

输出:
    - {output}                       JSONL,每行一条样本,含 task_id / label / Top-K 详情 /
                                     weighted_score / 预测结果
    - {output 去掉扩展名}.summary.csv   按样本逐行汇总,便于在 Excel / pandas 里筛选
                                     (例如 golden_top5.jsonl -> golden_top5.summary.csv)
    - {cache-dir}/*.npy              (可选)embedding 缓存,重跑时自动复用;
                                     传 --cache-dir "" 可关闭缓存
"""
import argparse
import hashlib
import json
import re
import sys
import time
from pathlib import Path

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from torch import Tensor
from transformers import AutoModel, AutoTokenizer
|
|
|
|
# Default input paths; each one can be overridden on the command line
# (--model / --ruler / --csv).
_DEFAULT_ROOT = "/mnt/bn/tns-algo-ue-my/biaowu"
DEFAULT_MODEL = f"{_DEFAULT_ROOT}/WorkSpace/Models/Qwen3-Embedding-8B"
DEFAULT_RULER = (
    f"{_DEFAULT_ROOT}/aipf_dm_metric/ranking_moderation/data/dm/"
    "youth_sexual_and_physical_abuse_aigt_v009/ranking_bucket/ruler_items.json"
)
DEFAULT_CSV = f"{_DEFAULT_ROOT}/aipf_dm_metric/example/yss_ruler_eval/data/aipf_golden_set.csv"
|
|
|
|
| |
def last_token_pool(h: Tensor, attn: Tensor) -> Tensor:
    """Pool each sequence by taking the hidden state of its last real token.

    The position of the last 1 in every attention-mask row is located
    independently, so both left- and right-padded batches work, and one odd
    row (e.g. a fully padded, empty input) cannot change how the other rows
    in the batch are pooled.

    Args:
        h: hidden states indexed as ``h[batch, position, ...]``.
        attn: attention mask of shape ``(batch, seq_len)``; 1 marks real tokens.

    Returns:
        ``h[b, p_b]`` for every row ``b``, where ``p_b`` is the last position
        whose mask value is 1. A row with no real token falls back to the
        final position ``seq_len - 1``.
    """
    seq_len = attn.shape[1]
    # argmax returns the FIRST maximal index, so on the reversed mask it gives
    # the distance from the end of the row to its last real token.
    from_end = attn.flip(dims=[1]).to(torch.int64).argmax(dim=1)
    last_pos = (seq_len - 1) - from_end
    rows = torch.arange(h.shape[0], device=h.device)
    return h[rows, last_pos.to(h.device)]
|
|
|
|
@torch.no_grad()
def encode(texts, tokenizer, model, max_length, batch_size, label="encode"):
    """Embed ``texts`` in mini-batches and return L2-normalised vectors.

    Each batch is tokenised (padded, truncated to ``max_length``), run through
    ``model``, pooled with ``last_token_pool`` and L2-normalised, so the dot
    product of two returned rows is their cosine similarity.

    Args:
        texts: sliceable sequence of strings to embed.
        tokenizer: tokenizer called with HF-style keyword arguments; its
            output is moved to ``model.device``.
        model: model whose output exposes ``last_hidden_state``.
        max_length: truncation length in tokens.
        batch_size: texts per forward pass; must be >= 1.
        label: tag printed in the progress lines.

    Returns:
        float32 ``np.ndarray`` with one row per text. For empty ``texts`` an
        empty ``(0, dim)`` array is returned, where ``dim`` is
        ``model.config.hidden_size`` when available and 0 otherwise.

    Raises:
        ValueError: if ``batch_size`` < 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    embs = []
    n = len(texts)
    t0 = time.time()
    for i in range(0, n, batch_size):
        batch = texts[i:i + batch_size]
        d = tokenizer(batch, padding=True, truncation=True,
                      max_length=max_length, return_tensors="pt").to(model.device)
        out = model(**d)
        e = last_token_pool(out.last_hidden_state, d["attention_mask"])
        # Unit-length rows: a plain dot product downstream equals cosine similarity.
        e = F.normalize(e, p=2, dim=1)
        embs.append(e.cpu().float())
        # Release this batch's tensors before the next forward pass.
        del out, d, e
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        done = min(i + batch_size, n)
        # Report every 10 batches and on the final batch.
        if done % (batch_size * 10) == 0 or done == n:
            elapsed = time.time() - t0
            rate = done / max(elapsed, 1e-3)
            eta = (n - done) / max(rate, 1e-3)
            print(f" [{label}] {done}/{n} | {rate:.1f} ex/s | eta {eta:.0f}s", flush=True)
    if not embs:
        # torch.cat([]) raises; return a well-shaped empty matrix instead.
        dim = getattr(getattr(model, "config", None), "hidden_size", 0) or 0
        return np.zeros((0, dim), dtype=np.float32)
    return torch.cat(embs, dim=0).numpy()
|
|
|
|
| |
def load_ruler_items(path):
    """Load ruler items from a JSON file into flat dicts.

    The file may hold a bare list of items, or an object whose first truthy
    value among the keys ``"items"``, ``"ruler_items"`` and ``"data"`` is the
    item list (no such key -> empty list). An item's text is taken from
    ``item["item"]["conv_text"]`` when ``item["item"]`` is a dict, otherwise
    from ``item["conv_text"]``, otherwise ``""``.

    Returns:
        list of ``{"rank", "score", "item_id", "text"}`` dicts; ``score`` is
        cast to ``float`` (missing -> 0.0) and ``item_id`` to ``str``
        (missing -> ``"None"``).
    """
    with open(path, "r", encoding="utf-8") as fh:
        payload = json.load(fh)

    if isinstance(payload, list):
        entries = payload
    else:
        entries = next(
            (payload.get(key) for key in ("items", "ruler_items", "data") if payload.get(key)),
            [],
        )

    def _flatten(entry):
        nested = entry.get("item")
        if not isinstance(nested, dict):
            nested = {}
        return {
            "rank": entry.get("rank"),
            "score": float(entry.get("score", 0.0)),
            "item_id": str(entry.get("item_id")),
            "text": nested.get("conv_text") or entry.get("conv_text") or "",
        }

    return [_flatten(entry) for entry in entries]
|
|
|
|
# Inline tags of the form <m0>, <m1>, ...; extract_conv keeps text from the first one.
_M_PREFIX = re.compile(r"<m\d+>")


def extract_conv(raw):
    """Drop any header that precedes the first ``<mN>`` tag in a golden-set text.

    golden_set ``text`` cells may carry an alias-age dict prefix; only the
    part starting at the first ``<mN>`` tag is kept (returned unstripped).
    Text without such a tag is returned whitespace-stripped, and non-string
    input yields ``""``.
    """
    if not isinstance(raw, str):
        return ""
    match = _M_PREFIX.search(raw)
    return raw[match.start():] if match else raw.strip()


def load_csv(path, text_col, id_col, label_col, limit=None):
    """Read the golden-set CSV and return one dict per sample.

    Args:
        path: CSV file path. Cells are read with ``keep_default_na=False`` so
            empty cells stay ``""`` instead of becoming NaN.
        text_col: column holding the raw text; when it is absent, the
            ``"conv_text"`` column is used instead if present.
        id_col: column copied (as ``str``) into ``task_id``.
        label_col: column copied into ``label`` after ``str().strip().upper()``.
        limit: if truthy, only the first ``limit`` rows are kept.

    Returns:
        list of ``{"task_id", "label", "raw_text", "conv_text"}`` dicts, where
        ``conv_text`` is ``extract_conv`` applied to the raw text cell.

    Raises:
        ValueError: if ``id_col``/``label_col`` is missing, or neither
            ``text_col`` nor ``"conv_text"`` exists.
    """
    df = pd.read_csv(path, keep_default_na=False)
    missing = [c for c in (id_col, label_col) if c not in df.columns]
    if missing:
        raise ValueError(f"missing columns: {missing}; available: {list(df.columns)}")
    if text_col not in df.columns:
        if "conv_text" not in df.columns:
            raise ValueError(
                f"no text column: neither {text_col!r} nor 'conv_text' in {list(df.columns)}")
        text_col = "conv_text"
    if limit:
        df = df.head(limit)
    # Iterate the columns directly: DataFrame.iterrows() builds one Series per
    # row and upcasts all-numeric rows to float (task_id 101 -> "101.0").
    return [
        {
            "task_id": str(task_id),
            "label": str(label).strip().upper(),
            "raw_text": str(text),
            "conv_text": extract_conv(text),
        }
        for task_id, label, text in zip(df[id_col], df[label_col], df[text_col])
    ]
|
|
|
|
| |
def cache_path(cache_dir, name, n_items, max_length):
    """Return the ``.npy`` path ``{cache_dir}/{name}_n{n_items}_L{max_length}.npy``."""
    return Path(cache_dir) / f"{name}_n{n_items}_L{max_length}.npy"


def _texts_fingerprint(texts, model):
    """Return a short hex digest identifying ``model`` and the exact ``texts``.

    The model part uses ``model.config._name_or_path`` and ``model.dtype``
    when present (empty string otherwise). Each text is length-prefixed
    before hashing so different splits of the same characters differ.
    """
    digest = hashlib.sha256()
    model_id = getattr(getattr(model, "config", None), "_name_or_path", "")
    digest.update(f"{model_id}|{getattr(model, 'dtype', '')}".encode("utf-8"))
    for text in texts:
        data = str(text).encode("utf-8")
        digest.update(len(data).to_bytes(8, "little"))
        digest.update(data)
    return digest.hexdigest()[:16]


def encode_with_cache(texts, tokenizer, model, *, max_length, batch_size,
                      cache_dir, name):
    """Embed ``texts`` with ``encode``, reusing an on-disk ``.npy`` cache.

    Args:
        texts, tokenizer, model, max_length, batch_size: forwarded to ``encode``.
        cache_dir: cache directory (created if needed); falsy disables caching.
        name: readable prefix of the cache file name, also used as the
            progress label.

    The cache key combines ``name`` with a fingerprint of the model identity
    and the exact texts, plus the text count and ``max_length``. The
    fingerprint matters because callers derive ``name`` from a file stem or a
    parent-directory name (any ``.../ranking_bucket/ruler_items.json`` maps
    to the same name), so without it a different ruler, an edited CSV or
    another model would silently reuse stale embeddings.

    Returns:
        The embedding matrix produced by ``encode`` or loaded from the cache.
    """
    p = None
    if cache_dir:
        Path(cache_dir).mkdir(parents=True, exist_ok=True)
        key = f"{name}_{_texts_fingerprint(texts, model)}"
        p = cache_path(cache_dir, key, len(texts), max_length)
        if p.exists():
            print(f" [{name}] using cached embeddings: {p}")
            return np.load(p)
    emb = encode(texts, tokenizer, model, max_length, batch_size, label=name)
    if p is not None:
        # Save to a temp file and rename, so an interrupted run never leaves a
        # truncated .npy behind that a later run would try to load.
        tmp = p.with_name(p.stem + ".tmp.npy")
        np.save(tmp, emb)
        tmp.replace(p)
        print(f" [{name}] saved cache: {p}")
    return emb
|
|
|
|
| |
def parse_args():
    """Parse the command-line options of this script.

    On top of argparse's own checks, this rejects (via ``parser.error``, exit
    status 2) a ``--top-k``, ``--batch-size`` or ``--max-length`` below 1 and
    a negative ``--limit``. Those values would otherwise only fail, or silently
    misbehave, after the embedding model has been loaded.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--csv", default=DEFAULT_CSV)
    p.add_argument("--ruler", default=DEFAULT_RULER)
    p.add_argument("--model", default=DEFAULT_MODEL)
    p.add_argument("--output", default="golden_top5.jsonl")
    p.add_argument("--text-col", default="text")
    p.add_argument("--id-col", default="task_id")
    p.add_argument("--label-col", default="label")
    p.add_argument("--top-k", type=int, default=5)
    p.add_argument("--boundary-score", type=float, default=44.72,
                   help="预测阈值,weighted_score >= 该值则 pred=1(默认从 pipeline.yaml 抄过来的 youth 类阈值)")
    p.add_argument("--max-length", type=int, default=4096)
    p.add_argument("--batch-size", type=int, default=4)
    p.add_argument("--cache-dir", default="cache_emb",
                   help="embedding 缓存目录;设空字符串关闭缓存")
    p.add_argument("--limit", type=int, default=None,
                   help="只跑前 N 条做 smoke test")
    p.add_argument("--cpu", action="store_true")
    p.add_argument("--no-flash-attn", action="store_true")
    args = p.parse_args()
    for flag, value in (("--top-k", args.top_k),
                        ("--batch-size", args.batch_size),
                        ("--max-length", args.max_length)):
        if value < 1:
            p.error(f"{flag} must be >= 1, got {value}")
    # DataFrame.head(-n) drops the last n rows instead of limiting to n rows.
    if args.limit is not None and args.limit < 0:
        p.error(f"--limit must be >= 0, got {args.limit}")
    return args
|
|
|
|
def _top_k_neighbors(sims, k):
    """Return the ``k`` most similar columns of every row of ``sims``, best first.

    Args:
        sims: 2-D similarity matrix, one row per sample and one column per
            ruler item.
        k: number of neighbours to keep, ``1 <= k <= sims.shape[1]``.

    Returns:
        ``(idx, vals)``: column indices and their similarities, both shaped
        ``(sims.shape[0], k)`` and sorted by similarity, descending.
    """
    # argpartition isolates the k best columns without sorting the full row;
    # only those k survivors are then sorted.
    part_idx = np.argpartition(-sims, k - 1, axis=1)[:, :k]
    part_vals = np.take_along_axis(sims, part_idx, axis=1)
    order = np.argsort(-part_vals, axis=1)
    return (np.take_along_axis(part_idx, order, axis=1),
            np.take_along_axis(part_vals, order, axis=1))


def _weighted_score(sims_arr, scores_arr):
    """Similarity-weighted mean of ``scores_arr``; 0.0 when the weights sum to <= 0.

    NOTE(review): cosine similarities can be negative, in which case this is a
    signed weighting rather than a convex average -- confirm that is intended.
    """
    total = float(sims_arr.sum())
    return float((sims_arr * scores_arr).sum() / total) if total > 0 else 0.0


def _make_records(row, ruler, idx_row, sim_row, boundary_score):
    """Build the JSONL record and the summary-CSV row for one sample.

    ``idx_row`` / ``sim_row`` hold this sample's neighbour indices into
    ``ruler`` and their similarities, best first. ``pred_by_weighted`` is 1
    when the weighted score reaches ``boundary_score``; ``ground_truth`` is 1
    only for the label ``"Y"`` (every other label counts as negative).
    """
    topk = []
    for j, sim in zip(idx_row, sim_row):
        item = ruler[int(j)]
        topk.append({
            "rank": item["rank"],
            "score": item["score"],
            "sim": float(sim),
            "item_id": item["item_id"],
        })
    weighted_score = _weighted_score(
        np.array([t["sim"] for t in topk], dtype=float),
        np.array([t["score"] for t in topk], dtype=float),
    )
    top1 = topk[0]
    pred = int(weighted_score >= boundary_score)
    gt = int(row["label"] == "Y")
    record = {
        "task_id": row["task_id"],
        "label": row["label"],
        "ground_truth": gt,
        "weighted_score": weighted_score,
        "top1_score": top1["score"],
        "top1_sim": top1["sim"],
        "top1_rank": top1["rank"],
        "pred_by_weighted": pred,
        "topk": topk,
    }
    summary = {
        "task_id": row["task_id"],
        "label": row["label"],
        "ground_truth": gt,
        "weighted_score": round(weighted_score, 4),
        "top1_rank": top1["rank"],
        "top1_score": round(top1["score"], 4),
        "top1_sim": round(top1["sim"], 4),
        "top1_item_id": top1["item_id"],
        "pred_by_weighted": pred,
    }
    return record, summary


def _print_metrics(sdf, boundary_score):
    """Print TP/FP/TN/FN and precision/recall/F1 of ``pred_by_weighted`` vs ``ground_truth``."""
    if "ground_truth" not in sdf.columns or not len(sdf):
        return
    pred, gt = sdf.pred_by_weighted, sdf.ground_truth
    tp = int(((pred == 1) & (gt == 1)).sum())
    fp = int(((pred == 1) & (gt == 0)).sum())
    tn = int(((pred == 0) & (gt == 0)).sum())
    fn = int(((pred == 0) & (gt == 1)).sum())
    prec = tp / (tp + fp) if tp + fp else 0.0
    rec = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * prec * rec / (prec + rec) if prec + rec else 0.0
    print(f"\n[metrics @ weighted_score >= {boundary_score}]")
    print(f" TP={tp} FP={fp} TN={tn} FN={fn}")
    print(f" precision={prec:.4f} recall={rec:.4f} f1={f1:.4f}")


def main():
    """Load the CSV and ruler, embed both, match Top-K and write the outputs.

    The steps are: load samples and ruler items, load the embedding model,
    encode both sides (with optional caching), take each sample's Top-K most
    similar ruler items, write ``{output}`` (JSONL) and
    ``{output stem}.summary.csv``, then print metrics at ``--boundary-score``.

    Exits early (``SystemExit`` with a message) when ``--top-k`` is below 1
    or when the CSV or ruler yields no entries. Without this check the run
    would only fail after the model has been loaded.
    """
    args = parse_args()
    if args.top_k < 1:
        raise SystemExit(f"--top-k must be >= 1, got {args.top_k}")

    print(f"[1/4] load csv: {args.csv}")
    rows = load_csv(args.csv, args.text_col, args.id_col, args.label_col, args.limit)
    print(f" -> {len(rows)} samples")
    if not rows:
        raise SystemExit(f"no samples loaded from {args.csv}")

    print(f"[2/4] load ruler: {args.ruler}")
    ruler = load_ruler_items(args.ruler)
    print(f" -> {len(ruler)} ruler items")
    if not ruler:
        raise SystemExit(f"no ruler items found in {args.ruler}")

    print(f"[3/4] load model: {args.model}")
    device = "cpu" if args.cpu else ("cuda" if torch.cuda.is_available() else "cpu")
    print(f" device: {device}")
    mk = {}
    if device == "cuda":
        mk["torch_dtype"] = torch.float16
        if not args.no_flash_attn:
            mk["attn_implementation"] = "flash_attention_2"
    tokenizer = AutoTokenizer.from_pretrained(args.model, padding_side="left")
    model = AutoModel.from_pretrained(args.model, **mk).to(device).eval()

    cd = args.cache_dir or None
    print(f"[4/4] encode (batch_size={args.batch_size}, max_length={args.max_length})")
    csv_emb = encode_with_cache([r["conv_text"] for r in rows],
                                tokenizer, model,
                                max_length=args.max_length,
                                batch_size=args.batch_size,
                                cache_dir=cd, name=f"csv_{Path(args.csv).stem}")
    ruler_emb = encode_with_cache([it["text"] for it in ruler],
                                  tokenizer, model,
                                  max_length=args.max_length,
                                  batch_size=args.batch_size,
                                  cache_dir=cd, name=f"ruler_{Path(args.ruler).parent.name}")

    # encode() L2-normalises its rows, so this product is cosine similarity.
    sims = csv_emb @ ruler_emb.T
    k = min(args.top_k, len(ruler))
    top_idx, top_sims = _top_k_neighbors(sims, k)

    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    summary_rows = []
    print(f"[write] {out_path}")
    with out_path.open("w", encoding="utf-8") as f:
        for row, idx_row, sim_row in zip(rows, top_idx, top_sims):
            record, summary = _make_records(row, ruler, idx_row, sim_row, args.boundary_score)
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
            summary_rows.append(summary)

    sdf = pd.DataFrame(summary_rows)
    summary_csv = out_path.with_suffix(".summary.csv")
    sdf.to_csv(summary_csv, index=False)
    print(f"[write] {summary_csv}")

    _print_metrics(sdf, args.boundary_score)
|
|
if __name__ == "__main__":
    # Run as a script: main() returns None, so the process exits with status 0
    # unless main() itself raises (e.g. SystemExit from argument checks).
    raise SystemExit(main())
|
|