# Sound / batch_top5_match.py
# Wendy-Fly's picture
# Upload batch_top5_match.py with huggingface_hub
# 9d30467 verified
#!/usr/bin/env python3
"""Match every row of golden_set.csv (~1000 rows) against the 200 ruler items by cosine similarity.

For each row, find the Top-K nearest ruler items and save the results locally.

Usage:
    # default paths
    python3 batch_top5_match.py
    # custom
    python3 batch_top5_match.py \
        --csv /mnt/.../aipf_golden_set.csv \
        --ruler /mnt/.../ruler_items.json \
        --model /mnt/.../Qwen3-Embedding-8B \
        --output golden_top5.jsonl \
        --top-k 5 \
        --boundary-score 44.72 \
        --cache-dir cache_emb \
        --limit 50    # run a small 50-row sanity check first

Outputs:
    - the --output file (JSONL): one sample per line, with task_id / label /
      Top-K details / weighted_score / prediction
    - <output path with its extension replaced by .summary.csv>: one summary
      row per sample, convenient for filtering in Excel / pandas
    - cache_emb/*.npy: (optional) embedding cache, reused automatically on reruns
"""
import argparse
import json
import re
import sys
import time
from pathlib import Path
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from torch import Tensor
from transformers import AutoTokenizer, AutoModel
# Default locations used when the corresponding CLI flag (--model, --ruler,
# --csv) is omitted; see parse_args.
DEFAULT_MODEL = "/mnt/bn/tns-algo-ue-my/biaowu/WorkSpace/Models/Qwen3-Embedding-8B"
DEFAULT_RULER = "/mnt/bn/tns-algo-ue-my/biaowu/aipf_dm_metric/ranking_moderation/data/dm/youth_sexual_and_physical_abuse_aigt_v009/ranking_bucket/ruler_items.json"
DEFAULT_CSV = "/mnt/bn/tns-algo-ue-my/biaowu/aipf_dm_metric/example/yss_ruler_eval/data/aipf_golden_set.csv"
# ---------- model utils ----------
def last_token_pool(h: Tensor, attn: Tensor) -> Tensor:
    """Pick the hidden state of the last attended token of every sequence.

    Args:
        h: hidden states, indexed as ``h[batch, position]``.
        attn: attention mask, indexed as ``attn[batch, position]``.

    Returns:
        One hidden-state vector per batch row.
    """
    # If the final column of the mask is set for every row, the batch is
    # left-padded and the last position is the last real token everywhere.
    final_column_all_set = attn[:, -1].sum() == attn.shape[0]
    if final_column_all_set:
        return h[:, -1]

    # Otherwise (right padding) the last real token sits at mask-sum - 1.
    last_positions = attn.sum(dim=1) - 1
    batch_rows = torch.arange(h.shape[0], device=h.device)
    return h[batch_rows, last_positions]
@torch.no_grad()
def encode(texts, tokenizer, model, max_length, batch_size, label="encode"):
    """Embed ``texts`` batch by batch and return L2-normalised vectors.

    Args:
        texts: sequence of strings to embed.
        tokenizer: callable tokenizer; called with padding/truncation and
            ``return_tensors="pt"``, and its result is moved to ``model.device``.
        model: model whose output exposes ``last_hidden_state``.
        max_length: truncation length passed to the tokenizer.
        batch_size: number of texts per forward pass.
        label: tag used in the progress lines printed to stdout.

    Returns:
        A float32 NumPy array with one row per input text.
    """
    total = len(texts)
    started = time.time()
    report_every = batch_size * 10
    chunks = []

    for start in range(0, total, batch_size):
        inputs = tokenizer(
            texts[start:start + batch_size],
            padding=True,
            truncation=True,
            max_length=max_length,
            return_tensors="pt",
        ).to(model.device)
        outputs = model(**inputs)
        pooled = last_token_pool(outputs.last_hidden_state, inputs["attention_mask"])
        pooled = F.normalize(pooled, p=2, dim=1)
        chunks.append(pooled.cpu().float())

        # Drop GPU references before the next batch to keep peak memory low.
        del outputs, inputs, pooled
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        done = min(start + batch_size, total)
        # Report every 10 batches and once more at the very end.
        if done == total or done % report_every == 0:
            elapsed = time.time() - started
            rate = done / max(elapsed, 1e-3)
            eta = (total - done) / max(rate, 1e-3)
            print(f" [{label}] {done}/{total} | {rate:.1f} ex/s | eta {eta:.0f}s", flush=True)

    return torch.cat(chunks, dim=0).numpy()
# ---------- data utils ----------
def load_ruler_items(path):
    """Load ruler items from a JSON file into a flat list of dicts.

    The file may be a JSON list, or a JSON object holding the list under
    ``"items"``, ``"ruler_items"`` or ``"data"`` (first non-empty one wins).
    The conversation text is read from ``item["item"]["conv_text"]`` when
    ``"item"`` is a dict, otherwise from the top-level ``"conv_text"``.

    Args:
        path: path of the JSON file.

    Returns:
        A list of dicts with keys ``rank``, ``score`` (float), ``item_id``
        (str) and ``text`` (``""`` when no conversation text is present).
    """
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)

    if isinstance(data, list):
        items = data
    else:
        items = data.get("items") or data.get("ruler_items") or data.get("data") or []

    out = []
    for it in items:
        nested = it.get("item")
        inner = nested if isinstance(nested, dict) else {}
        # A missing score already defaulted to 0.0; treat an explicit JSON
        # null the same way instead of crashing in float(None).
        raw_score = it.get("score")
        out.append({
            "rank": it.get("rank"),
            "score": 0.0 if raw_score is None else float(raw_score),
            "item_id": str(it.get("item_id")),
            "text": inner.get("conv_text") or it.get("conv_text") or "",
        })
    return out
# Matches a message marker such as <m0>, <m1>, ...
_M_PREFIX = re.compile(r"<m\d+>")


def extract_conv(raw):
    """Return the conversation part of a golden-set text cell.

    The text may carry a prefix (e.g. an alias/age dict) before the first
    ``<mN>`` marker; everything from that marker onwards is returned as-is.
    Without a marker the whole string is returned stripped of surrounding
    whitespace. Non-string input yields ``""``.
    """
    if not isinstance(raw, str):
        return ""
    marker = _M_PREFIX.search(raw)
    if marker is None:
        return raw.strip()
    return raw[marker.start():]
def load_csv(path, text_col, id_col, label_col, limit=None):
    """Read the golden-set CSV and return one dict per row.

    Args:
        path: CSV file to read (empty cells are kept as empty strings).
        text_col: column holding the text; falls back to ``"conv_text"``
            when this column does not exist.
        id_col: column holding the sample id.
        label_col: column holding the label.
        limit: when truthy, only the first ``limit`` rows are kept.

    Returns:
        A list of dicts with keys ``task_id``, ``label`` (stripped and
        upper-cased), ``raw_text`` and ``conv_text``.

    Raises:
        ValueError: if the id/label column or any usable text column is missing.
    """
    frame = pd.read_csv(path, keep_default_na=False)
    columns = frame.columns

    absent = [col for col in (id_col, label_col) if col not in columns]
    if absent:
        raise ValueError(f"missing columns: {absent}; available: {list(frame.columns)}")

    if text_col not in columns:
        if "conv_text" not in columns:
            raise ValueError("no text/conv_text column")
        text_col = "conv_text"

    if limit:
        frame = frame.head(limit).copy()

    return [
        {
            "task_id": str(record[id_col]),
            "label": str(record[label_col]).strip().upper(),
            "raw_text": str(record[text_col]),
            "conv_text": extract_conv(record[text_col]),
        }
        for _, record in frame.iterrows()
    ]
# ---------- cache ----------
def cache_path(cache_dir, name, n_items, max_length):
    """Return the ``.npy`` cache file for ``name`` inside ``cache_dir``.

    The file name encodes the item count and the tokenizer max length.
    """
    file_name = f"{name}_n{n_items}_L{max_length}.npy"
    return Path(cache_dir).joinpath(file_name)
def encode_with_cache(texts, tokenizer, model, *, max_length, batch_size,
                      cache_dir, name):
    """Encode ``texts``, reusing an on-disk ``.npy`` cache when possible.

    The cache file is keyed by ``name``, the number of texts, ``max_length``
    and a fingerprint of the model identifier plus the texts themselves, so
    editing the inputs or switching models cannot silently return stale
    embeddings.

    Args:
        texts: sequence of strings to embed.
        tokenizer: tokenizer forwarded to ``encode``.
        model: model forwarded to ``encode``.
        max_length: truncation length forwarded to ``encode``.
        batch_size: batch size forwarded to ``encode``.
        cache_dir: cache directory; a falsy value disables caching.
        name: human-readable cache name, also used as the log tag.

    Returns:
        The embedding matrix as a NumPy array, one row per text.
    """
    import hashlib  # local import: only needed for the cache fingerprint

    cache_file = None
    if cache_dir:
        Path(cache_dir).mkdir(parents=True, exist_ok=True)

        # Fingerprint what the embeddings depend on: the model identifier
        # (when the model exposes one) and every input text.
        fingerprint = hashlib.sha256()
        model_id = str(getattr(model, "name_or_path", ""))
        fingerprint.update(model_id.encode("utf-8", "replace"))
        for text in texts:
            fingerprint.update(b"\x00")  # separator so ["ab"] != ["a", "b"]
            fingerprint.update(str(text).encode("utf-8", "replace"))
        keyed_name = f"{name}_{fingerprint.hexdigest()[:12]}"

        cache_file = cache_path(cache_dir, keyed_name, len(texts), max_length)
        if cache_file.exists():
            cached = np.load(cache_file)
            # Guard against truncated or foreign files under the same name.
            if cached.ndim == 2 and cached.shape[0] == len(texts):
                print(f" [{name}] using cached embeddings: {cache_file}")
                return cached
            print(f" [{name}] ignoring cache with unexpected shape "
                  f"{cached.shape}: {cache_file}")

    emb = encode(texts, tokenizer, model, max_length, batch_size, label=name)
    if cache_file is not None:
        np.save(cache_file, emb)
        print(f" [{name}] saved cache: {cache_file}")
    return emb
# ---------- args ----------
def parse_args():
    """Parse the command-line options of the batch matcher.

    Returns:
        The ``argparse.Namespace`` with input paths, column names, Top-K and
        threshold settings, encoding parameters and device switches.
    """
    # Declared as a table so the full option set can be read at a glance;
    # registration order (and therefore --help order) follows the table.
    option_table = (
        ("--csv", {"default": DEFAULT_CSV}),
        ("--ruler", {"default": DEFAULT_RULER}),
        ("--model", {"default": DEFAULT_MODEL}),
        ("--output", {"default": "golden_top5.jsonl"}),
        ("--text-col", {"default": "text"}),
        ("--id-col", {"default": "task_id"}),
        ("--label-col", {"default": "label"}),
        ("--top-k", {"type": int, "default": 5}),
        ("--boundary-score", {
            "type": float,
            "default": 44.72,
            "help": "预测阈值,weighted_score >= 该值则 pred=1(默认从 pipeline.yaml 抄过来的 youth 类阈值)",
        }),
        ("--max-length", {"type": int, "default": 4096}),
        ("--batch-size", {"type": int, "default": 4}),
        ("--cache-dir", {
            "default": "cache_emb",
            "help": "embedding 缓存目录;设空字符串关闭缓存",
        }),
        ("--limit", {
            "type": int,
            "default": None,
            "help": "只跑前 N 条做 smoke test",
        }),
        ("--cpu", {"action": "store_true"}),
        ("--no-flash-attn", {"action": "store_true"}),
    )

    parser = argparse.ArgumentParser()
    for flag, options in option_table:
        parser.add_argument(flag, **options)
    return parser.parse_args()
def _cache_tag(path):
    """Build a filesystem-safe tag from the last path components of ``path``."""
    p = Path(path)
    # Two parent directories plus the file stem: the direct parent alone is
    # not unique (e.g. every ruler may live in a "ranking_bucket" directory).
    parts = p.parent.parts[-2:] + (p.stem,)
    return "_".join(re.sub(r"[^0-9A-Za-z._-]+", "-", part) for part in parts)


def _rank_top_k(sims, k):
    """Return (indices, similarities) of the ``k`` largest entries per row, best first."""
    # argpartition selects the k largest in O(n); only those k are then sorted.
    part_idx = np.argpartition(-sims, k - 1, axis=1)[:, :k]
    part_sims = np.take_along_axis(sims, part_idx, axis=1)
    order = np.argsort(-part_sims, axis=1)
    return (np.take_along_axis(part_idx, order, axis=1),
            np.take_along_axis(part_sims, order, axis=1))


def _write_results(out_path, rows, ruler, top_idx, top_sims, boundary_score):
    """Write one JSONL record per sample and return the summary rows."""
    summary_rows = []
    with out_path.open("w", encoding="utf-8") as f:
        for row, idx_row, sim_row in zip(rows, top_idx, top_sims):
            topk = [
                {
                    "rank": ruler[int(idx)]["rank"],
                    "score": ruler[int(idx)]["score"],
                    "sim": float(sim),
                    "item_id": ruler[int(idx)]["item_id"],
                }
                for idx, sim in zip(idx_row, sim_row)
            ]
            sims_arr = np.array([t["sim"] for t in topk], dtype=float)
            scores_arr = np.array([t["score"] for t in topk], dtype=float)
            # Similarity-weighted mean of the ruler scores; 0.0 when the
            # weights do not sum to a positive value.
            wsim = float(sims_arr.sum())
            weighted_score = float((sims_arr * scores_arr).sum() / wsim) if wsim > 0 else 0.0
            top1 = topk[0]
            pred = int(weighted_score >= boundary_score)
            gt = int(row["label"] == "Y")
            record = {
                "task_id": row["task_id"],
                "label": row["label"],
                "ground_truth": gt,
                "weighted_score": weighted_score,
                "top1_score": top1["score"],
                "top1_sim": top1["sim"],
                "top1_rank": top1["rank"],
                "pred_by_weighted": pred,
                "topk": topk,
            }
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
            summary_rows.append({
                "task_id": row["task_id"],
                "label": row["label"],
                "ground_truth": gt,
                "weighted_score": round(weighted_score, 4),
                "top1_rank": top1["rank"],
                "top1_score": round(top1["score"], 4),
                "top1_sim": round(top1["sim"], 4),
                "top1_item_id": top1["item_id"],
                "pred_by_weighted": pred,
            })
    return summary_rows


def _print_metrics(sdf, boundary_score):
    """Print the confusion matrix and precision / recall / F1 of the predictions."""
    predicted = sdf.pred_by_weighted == 1
    actual = sdf.ground_truth == 1
    tp = int((predicted & actual).sum())
    fp = int((predicted & ~actual).sum())
    tn = int((~predicted & ~actual).sum())
    fn = int((~predicted & actual).sum())
    prec = tp / (tp + fp) if tp + fp else 0.0
    rec = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * prec * rec / (prec + rec) if prec + rec else 0.0
    print(f"\n[metrics @ weighted_score >= {boundary_score}]")
    print(f" TP={tp} FP={fp} TN={tn} FN={fn}")
    print(f" precision={prec:.4f} recall={rec:.4f} f1={f1:.4f}")


def main():
    """Run the full pipeline: load data, embed, match Top-K, write outputs, report metrics.

    Writes the JSONL file given by ``--output`` and a ``.summary.csv`` next to
    it, and prints precision / recall / F1 at the ``--boundary-score`` threshold.

    Raises:
        SystemExit: if ``--top-k`` is below 1, or the CSV or the ruler file
            yields no items (checked before the model is loaded).
    """
    args = parse_args()
    if args.top_k < 1:
        raise SystemExit(f"--top-k must be >= 1, got {args.top_k}")

    # 1) data
    print(f"[1/4] load csv: {args.csv}")
    rows = load_csv(args.csv, args.text_col, args.id_col, args.label_col, args.limit)
    print(f" -> {len(rows)} samples")
    print(f"[2/4] load ruler: {args.ruler}")
    ruler = load_ruler_items(args.ruler)
    print(f" -> {len(ruler)} ruler items")
    # Fail fast: without data the Top-K step cannot work, and loading the
    # embedding model first would only waste time.
    if not rows:
        raise SystemExit(f"no samples loaded from {args.csv}")
    if not ruler:
        raise SystemExit(f"no ruler items loaded from {args.ruler}")

    # 2) model
    print(f"[3/4] load model: {args.model}")
    device = "cpu" if args.cpu else ("cuda" if torch.cuda.is_available() else "cpu")
    print(f" device: {device}")
    mk = {}
    if device == "cuda":
        mk["torch_dtype"] = torch.float16
        if not args.no_flash_attn:
            mk["attn_implementation"] = "flash_attention_2"
    tokenizer = AutoTokenizer.from_pretrained(args.model, padding_side="left")
    model = AutoModel.from_pretrained(args.model, **mk).to(device).eval()

    # 3) encode (csv and ruler are cached separately). The cache names carry
    #    the input location and the model name so that a different ruler
    #    category or model never reuses another run's embeddings.
    cd = args.cache_dir or None
    model_tag = re.sub(r"[^0-9A-Za-z._-]+", "-", Path(args.model).name)
    print(f"[4/4] encode (batch_size={args.batch_size}, max_length={args.max_length})")
    csv_emb = encode_with_cache([r["conv_text"] for r in rows],
                                tokenizer, model,
                                max_length=args.max_length,
                                batch_size=args.batch_size,
                                cache_dir=cd,
                                name=f"csv_{_cache_tag(args.csv)}_{model_tag}")
    ruler_emb = encode_with_cache([it["text"] for it in ruler],
                                  tokenizer, model,
                                  max_length=args.max_length,
                                  batch_size=args.batch_size,
                                  cache_dir=cd,
                                  name=f"ruler_{_cache_tag(args.ruler)}_{model_tag}")

    # 4) similarity matrix + Top-K (embeddings are unit-norm, so the dot
    #    product is the cosine similarity)
    sims = csv_emb @ ruler_emb.T  # (N_csv, N_ruler)
    K = min(args.top_k, len(ruler))
    top_idx, top_sims = _rank_top_k(sims, K)

    # 5) write JSONL + summary
    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    print(f"[write] {out_path}")
    summary_rows = _write_results(out_path, rows, ruler, top_idx, top_sims,
                                  args.boundary_score)
    sdf = pd.DataFrame(summary_rows)
    summary_csv = out_path.with_suffix(".summary.csv")
    sdf.to_csv(summary_csv, index=False)
    print(f"[write] {summary_csv}")

    # 6) overall metrics
    _print_metrics(sdf, args.boundary_score)
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()