Sound / batch_top100_match.py
Wendy-Fly's picture
Upload batch_top100_match.py with huggingface_hub
c01a125 verified
#!/usr/bin/env python3
"""每条 golden_set 样本和 ruler 200 条做 cosine 相似度,保留 Top-100 邻居。
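Differences from batch_top5_match.py:
  - --top-k defaults to 100.
  - summary.csv no longer flattens the 100 neighbours (that would explode to
    ~400 columns); it holds aggregate statistics instead:
      * top1_rank / top1_score / top1_sim: rank, score and similarity of the
        nearest neighbour
      * mean_score_topk / median_score_topk: mean / median ruler score of the
        kept neighbours
      * frac_rank_lt_boundary_topk / vote_count_topk: fraction / count of kept
        neighbours whose rank is below --boundary-rank (default 106); this is
        the voting signal
      * weighted_score: similarity-weighted mean of the ruler scores
      * several 0/1 predictions: pred_top1 / pred_majority (strictly more than
        half of the kept neighbours vote severe) / pred_weighted /
        pred_mean_score
  - The JSONL output still keeps the full neighbour list
    (rank/score/sim/item_id) so it can be re-analysed later.

Usage:
    python3 batch_top100_match.py
    python3 batch_top100_match.py --limit 50    # small trial run first
    python3 batch_top100_match.py --top-k 100 --boundary-rank 106 --boundary-score 44.72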
"""
import argparse
import json
import re
import time
from pathlib import Path
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from torch import Tensor
from transformers import AutoTokenizer, AutoModel
# Defaults for the --model, --ruler and --csv options declared in parse_args().
# NOTE(review): these are absolute paths on a specific mount; pass the options
# explicitly when running anywhere else.
DEFAULT_MODEL = "/mnt/bn/tns-algo-ue-my/biaowu/WorkSpace/Models/Qwen3-Embedding-8B"
DEFAULT_RULER = "/mnt/bn/tns-algo-ue-my/biaowu/aipf_dm_metric/ranking_moderation/data/dm/youth_sexual_and_physical_abuse_aigt_v009/ranking_bucket/ruler_items.json"
DEFAULT_CSV = "/mnt/bn/tns-algo-ue-my/biaowu/aipf_dm_metric/example/yss_ruler_eval/data/aipf_golden_set.csv"
# ---------- model ----------
def last_token_pool(h: Tensor, attn: Tensor) -> Tensor:
    """Select, for every sequence, the hidden state of its last attended token.

    Args:
        h: hidden states, indexed as ``h[batch, position]``.
        attn: attention mask, indexed as ``attn[batch, position]``.

    Returns:
        ``h[:, -1]`` when the final mask column sums to the batch size (no
        sequence ends in padding); otherwise, per row, the hidden state at
        index ``attn.sum(dim=1) - 1``.
    """
    n_rows = attn.shape[0]
    ends_without_padding = attn[:, -1].sum() == n_rows
    if not ends_without_padding:
        # Some row ends in padding: locate each row's last real token by
        # counting its attended positions.
        last_positions = attn.sum(dim=1) - 1
        row_ids = torch.arange(h.shape[0], device=h.device)
        return h[row_ids, last_positions]
    return h[:, -1]
@torch.no_grad()
def encode(texts, tokenizer, model, max_length, batch_size, label):
    """Embed ``texts`` batch by batch and return the stacked embeddings.

    Each batch is tokenized (padded, truncated to ``max_length``), moved to
    ``model.device``, run through ``model``, pooled with ``last_token_pool``
    and L2-normalised along dim 1. The per-batch results are moved to CPU as
    float32 and concatenated.

    Args:
        texts: sequence of input strings.
        tokenizer: callable used to tokenize a batch of strings.
        model: callable returning an object with ``last_hidden_state``.
        max_length: truncation length passed to the tokenizer.
        batch_size: number of texts per forward pass.
        label: tag printed in the progress lines.

    Returns:
        A numpy array with one row per input text.
    """
    total = len(texts)
    started = time.time()
    report_every = batch_size * 10
    chunks = []
    for start in range(0, total, batch_size):
        inputs = tokenizer(
            texts[start:start + batch_size],
            padding=True,
            truncation=True,
            max_length=max_length,
            return_tensors="pt",
        ).to(model.device)
        outputs = model(**inputs)
        pooled = last_token_pool(outputs.last_hidden_state, inputs["attention_mask"])
        chunks.append(F.normalize(pooled, p=2, dim=1).cpu().float())

        # Drop the device tensors before the next batch to keep peak memory low.
        del outputs, inputs, pooled
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        done = min(start + batch_size, total)
        if done == total or done % report_every == 0:
            elapsed = time.time() - started
            rate = done / max(elapsed, 1e-3)
            eta = (total - done) / max(rate, 1e-3)
            print(f" [{label}] {done}/{total} | {rate:.1f} ex/s | eta {eta:.0f}s", flush=True)
    return torch.cat(chunks, dim=0).numpy()
# ---------- data ----------
def load_ruler_items(path):
    """Read the ruler JSON file and flatten every entry to a small dict.

    The file may hold either a JSON list of items, or a JSON object that
    carries the list under ``items``, ``ruler_items`` or ``data`` (the first
    non-empty one is used; none present yields an empty result).

    Args:
        path: location of the JSON file (read as UTF-8).

    Returns:
        A list of dicts with keys:
          * ``rank``: ``int`` of the item's rank, or ``None`` when absent/null.
          * ``score``: ``float`` of the item's score; ``0.0`` when absent or null.
          * ``item_id``: ``str()`` of the item's ``item_id``.
          * ``text``: ``conv_text`` of the nested ``item`` dict if non-empty,
            else the top-level ``conv_text``, else ``""``.
    """
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)

    if isinstance(data, list):
        items = data
    else:
        items = data.get("items") or data.get("ruler_items") or data.get("data") or []

    out = []
    for it in items:
        nested = it.get("item")
        inner = nested if isinstance(nested, dict) else {}
        rank = it.get("rank")
        score = it.get("score")
        out.append({
            "rank": int(rank) if rank is not None else None,
            # A JSON null arrives as None; dict.get's default only covers a
            # missing key, so handle None explicitly instead of float(None).
            "score": float(score) if score is not None else 0.0,
            "item_id": str(it.get("item_id")),
            "text": inner.get("conv_text") or it.get("conv_text") or "",
        })
    return out
# Matches a message marker such as "<m1>" or "<m12>".
_M_PREFIX = re.compile(r"<m\d+>")


def extract_conv(raw):
    """Return the conversation part of ``raw``.

    Non-string input yields ``""``. When a ``<mN>`` marker is present, the
    text from the first marker onwards is returned unchanged; otherwise the
    whole string is returned with surrounding whitespace stripped.
    """
    if not isinstance(raw, str):
        return ""
    first_marker = _M_PREFIX.search(raw)
    if first_marker is None:
        return raw.strip()
    return raw[first_marker.start():]
def load_csv(path, text_col, id_col, label_col, limit=None):
    """Load the sample CSV and return one dict per row.

    Args:
        path: CSV file to read (empty cells are kept as empty strings rather
            than converted to NaN).
        text_col: column holding the conversation text; when it is absent and
            a ``conv_text`` column exists, ``conv_text`` is used instead.
        id_col: column holding the sample id.
        label_col: column holding the label.
        limit: when truthy, only the first ``limit`` rows are kept.

    Returns:
        A list of ``{"task_id", "label", "conv_text"}`` dicts: the id as a
        string, the label stripped and upper-cased, and the text passed
        through ``extract_conv``.

    Raises:
        ValueError: if any of the three columns is missing from the file.
    """
    frame = pd.read_csv(path, keep_default_na=False)
    available = frame.columns

    if text_col not in available and "conv_text" in available:
        text_col = "conv_text"

    for required in (id_col, label_col, text_col):
        if required in available:
            continue
        raise ValueError(f"missing column: {required}; available: {list(frame.columns)}")

    if limit:
        frame = frame.head(limit).copy()

    return [
        {
            "task_id": str(record[id_col]),
            "label": str(record[label_col]).strip().upper(),
            "conv_text": extract_conv(record[text_col]),
        }
        for _, record in frame.iterrows()
    ]
# ---------- cache ----------
def encode_with_cache(texts, tokenizer, model, *, max_length, batch_size,
                      cache_dir, name):
    """Encode ``texts`` with ``encode``, reusing an on-disk ``.npy`` cache.

    The cache file name contains ``name``, the number of texts, ``max_length``
    and a fingerprint of the texts themselves plus the model identifier.
    Keying on the count alone would silently return stale embeddings whenever
    the inputs (or the model) change but the number of texts does not.

    Args:
        texts: sequence of strings to embed.
        tokenizer: forwarded to ``encode``.
        model: forwarded to ``encode``.
        max_length: forwarded to ``encode``; also part of the cache key.
        batch_size: forwarded to ``encode``.
        cache_dir: directory for cache files; a falsy value disables caching.
        name: label for progress output and prefix of the cache file name.

    Returns:
        The embedding array, either loaded from the cache or freshly computed.
    """
    if not cache_dir:
        return encode(texts, tokenizer, model, max_length, batch_size, label=name)

    # Function-scope import so this block needs no file-level import change.
    import hashlib

    fingerprint = hashlib.sha256()
    # NOTE(review): ``name_or_path`` is presumably set on Hugging Face models;
    # confirm. Objects without it contribute an empty identifier.
    fingerprint.update(str(getattr(model, "name_or_path", "")).encode("utf-8", "replace"))
    for text in texts:
        # The separator keeps ["ab", "c"] and ["a", "bc"] distinct.
        fingerprint.update(b"\x00")
        fingerprint.update(str(text).encode("utf-8", "replace"))

    cache_root = Path(cache_dir)
    cache_root.mkdir(parents=True, exist_ok=True)
    p = cache_root / (
        f"{name}_n{len(texts)}_L{max_length}_{fingerprint.hexdigest()[:16]}.npy"
    )
    if p.exists():
        print(f" [{name}] cache hit: {p}")
        return np.load(p)

    emb = encode(texts, tokenizer, model, max_length, batch_size, label=name)
    np.save(p, emb)
    print(f" [{name}] cached: {p}")
    return emb
# ---------- args ----------
def parse_args():
    """Declare the command-line options and parse ``sys.argv``.

    Options cover the input files (``--csv``, ``--ruler``), the model path,
    the output file, the CSV column names, the neighbour count, the two
    decision boundaries, the positive label, tokenizer/batch settings, the
    cache directory, an optional row limit, and the ``--cpu`` /
    ``--no-flash-attn`` switches.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser()
    # (flag, add_argument keyword arguments), in the order they are registered.
    option_specs = (
        ("--csv", {"default": DEFAULT_CSV}),
        ("--ruler", {"default": DEFAULT_RULER}),
        ("--model", {"default": DEFAULT_MODEL}),
        ("--output", {"default": "golden_top100.jsonl"}),
        ("--text-col", {"default": "text"}),
        ("--id-col", {"default": "task_id"}),
        ("--label-col", {"default": "label"}),
        ("--top-k", {"type": int, "default": 100}),
        # Help: rank < X counts as severe; used by the vote / top1 predictions.
        ("--boundary-rank", {"type": int, "default": 106,
                             "help": "rank<X 当严重;用于投票/top1 预测"}),
        # Help: weighted_score / mean_score >= X counts as severe.
        ("--boundary-score", {"type": float, "default": 44.72,
                              "help": "weighted_score / mean_score >= X 当严重"}),
        # Help: label string treated as positive (case-insensitive).
        ("--positive-label", {"default": "Y",
                              "help": "csv label 列里算正样本的字符串(大小写不敏感)"}),
        ("--max-length", {"type": int, "default": 4096}),
        ("--batch-size", {"type": int, "default": 4}),
        ("--cache-dir", {"default": "cache_emb"}),
        ("--limit", {"type": int, "default": None}),
        ("--cpu", {"action": "store_true"}),
        ("--no-flash-attn", {"action": "store_true"}),
    )
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser.parse_args()
def _validate_inputs(rows, ruler, top_k):
    """Reject inputs that can never yield a neighbour list, before model load."""
    if not rows:
        raise ValueError("no samples loaded from the csv; nothing to match")
    if not ruler:
        raise ValueError("no ruler items loaded; nothing to match against")
    if top_k < 1:
        raise ValueError(f"--top-k must be >= 1, got {top_k}")


def _top_k_neighbors(sims, k):
    """Return (indices, similarities) of the k largest entries per row, descending."""
    # argpartition selects the k best without a full sort; only those k are sorted.
    part_idx = np.argpartition(-sims, k - 1, axis=1)[:, :k]
    part_sims = np.take_along_axis(sims, part_idx, axis=1)
    order = np.argsort(-part_sims, axis=1)
    return (np.take_along_axis(part_idx, order, axis=1),
            np.take_along_axis(part_sims, order, axis=1))


def _print_metrics(sdf, pos_label, boundary_rank, boundary_score, k):
    """Print the confusion counts and P/R/F1/Acc of every prediction column."""
    print(f"\n[metrics] (positive_label='{pos_label}', boundary_rank={boundary_rank}, "
          f"boundary_score={boundary_score}, K={k})")
    print(f"{'pred':<22}{'TP':>5}{'FP':>5}{'TN':>5}{'FN':>5} {'P':>7}{'R':>7}{'F1':>7}{'Acc':>7}")
    print("-" * 80)
    truth = sdf.ground_truth
    for col in ("pred_top1", "pred_majority", "pred_weighted", "pred_mean_score"):
        tp = int(((sdf[col] == 1) & (truth == 1)).sum())
        fp = int(((sdf[col] == 1) & (truth == 0)).sum())
        tn = int(((sdf[col] == 0) & (truth == 0)).sum())
        fn = int(((sdf[col] == 0) & (truth == 1)).sum())
        prec = tp / (tp + fp) if tp + fp else 0.0
        rec = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * prec * rec / (prec + rec) if prec + rec else 0.0
        acc = (tp + tn) / len(sdf)
        print(f"{col:<22}{tp:>5}{fp:>5}{tn:>5}{fn:>5} {prec:>7.4f}{rec:>7.4f}{f1:>7.4f}{acc:>7.4f}")


def main():
    """Run the pipeline: load data, embed, find top-K neighbours, write results.

    Steps:
      1. Load the CSV samples and the ruler items.
      2. Load the tokenizer and model (fp16 on CUDA, optionally with
         flash-attention 2).
      3. Embed both text sets (through the on-disk cache) and compute the
         similarity matrix as a dot product of the embeddings.
      4. For every sample write one JSONL record holding aggregate
         statistics, four 0/1 predictions and the full neighbour list, plus a
         row of a ``<output>.summary.csv`` file with rounded aggregates.
      5. Print TP/FP/TN/FN and P/R/F1/Acc for every prediction column.

    Raises:
        ValueError: when there are no samples, no ruler items, ``--top-k`` is
            below 1, or a selected neighbour has no rank.
    """
    args = parse_args()
    pos_label = args.positive_label.strip().upper()

    print(f"[1/4] load csv: {args.csv}")
    rows = load_csv(args.csv, args.text_col, args.id_col, args.label_col, args.limit)
    print(f" -> {len(rows)} samples")

    print(f"[2/4] load ruler: {args.ruler}")
    ruler = load_ruler_items(args.ruler)
    print(f" -> {len(ruler)} ruler items")

    # Fail before the model is loaded; these inputs would otherwise only
    # crash later with an opaque error.
    _validate_inputs(rows, ruler, args.top_k)
    K = min(args.top_k, len(ruler))
    print(f" keeping top-{K} per sample")

    print(f"[3/4] load model: {args.model}")
    device = "cpu" if args.cpu else ("cuda" if torch.cuda.is_available() else "cpu")
    print(f" device: {device}")
    mk = {}
    if device == "cuda":
        mk["torch_dtype"] = torch.float16
        if not args.no_flash_attn:
            mk["attn_implementation"] = "flash_attention_2"
    tokenizer = AutoTokenizer.from_pretrained(args.model, padding_side="left")
    model = AutoModel.from_pretrained(args.model, **mk).to(device).eval()

    cd = args.cache_dir or None
    print(f"[4/4] encode (batch_size={args.batch_size}, max_length={args.max_length})")
    csv_emb = encode_with_cache([r["conv_text"] for r in rows],
                                tokenizer, model,
                                max_length=args.max_length,
                                batch_size=args.batch_size,
                                cache_dir=cd, name=f"csv_{Path(args.csv).stem}")
    ruler_emb = encode_with_cache([it["text"] for it in ruler],
                                  tokenizer, model,
                                  max_length=args.max_length,
                                  batch_size=args.batch_size,
                                  cache_dir=cd, name=f"ruler_{Path(args.ruler).parent.name}")

    sims = csv_emb @ ruler_emb.T  # (N, R)
    top_idx, top_sims = _top_k_neighbors(sims, K)

    # Write outputs.
    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    summary_rows = []
    print(f"[write] {out_path}")
    with out_path.open("w", encoding="utf-8") as f:
        for i, row in enumerate(rows):
            neighbors = [ruler[j] for j in top_idx[i].tolist()]
            unranked = [nb["item_id"] for nb in neighbors if nb["rank"] is None]
            if unranked:
                # load_ruler_items allows rank=None, but every statistic
                # below needs an integer rank.
                raise ValueError(
                    f"task {row['task_id']}: neighbours without a rank: {unranked[:5]}")

            ranks_arr = np.array([nb["rank"] for nb in neighbors], dtype=np.int64)
            scores_arr = np.array([nb["score"] for nb in neighbors], dtype=np.float64)
            sims_arr = top_sims[i].astype(np.float64)
            topk_full = [
                {"rank": nb["rank"], "score": nb["score"], "sim": sim,
                 "item_id": nb["item_id"]}
                for nb, sim in zip(neighbors, sims_arr.tolist())
            ]

            gt = int(row["label"] == pos_label)
            wsim = float(sims_arr.sum())
            weighted_score = float((sims_arr * scores_arr).sum() / wsim) if wsim > 0 else 0.0
            mean_score = float(scores_arr.mean())
            median_score = float(np.median(scores_arr))
            severe = ranks_arr < args.boundary_rank
            frac_severe = float(severe.mean())
            vote_count = int(severe.sum())

            pred_top1 = int(ranks_arr[0] < args.boundary_rank)
            pred_majority = int(vote_count > K / 2)
            pred_weighted = int(weighted_score >= args.boundary_score)
            pred_mean = int(mean_score >= args.boundary_score)

            top1_rank = int(ranks_arr[0])
            top1_score = float(scores_arr[0])
            top1_sim = float(sims_arr[0])

            record = {
                "task_id": row["task_id"],
                "label": row["label"],
                "ground_truth": gt,
                # Aggregate statistics
                "top1_rank": top1_rank,
                "top1_score": top1_score,
                "top1_sim": top1_sim,
                "mean_score_topk": mean_score,
                "median_score_topk": median_score,
                "frac_rank_lt_boundary_topk": frac_severe,
                "vote_count_topk": vote_count,
                "weighted_score": weighted_score,
                # The 0/1 predictions
                "pred_top1": pred_top1,
                "pred_majority": pred_majority,
                "pred_weighted": pred_weighted,
                "pred_mean_score": pred_mean,
                # Full neighbour list
                "topk": topk_full,
            }
            f.write(json.dumps(record, ensure_ascii=False) + "\n")

            summary_rows.append({
                "task_id": row["task_id"],
                "label": row["label"],
                "ground_truth": gt,
                "top1_rank": top1_rank,
                "top1_score": round(top1_score, 4),
                "top1_sim": round(top1_sim, 4),
                "mean_score_topk": round(mean_score, 4),
                "median_score_topk": round(median_score, 4),
                "frac_rank_lt_boundary_topk": round(frac_severe, 4),
                "vote_count_topk": vote_count,
                "weighted_score": round(weighted_score, 4),
                "pred_top1": pred_top1,
                "pred_majority": pred_majority,
                "pred_weighted": pred_weighted,
                "pred_mean_score": pred_mean,
            })

    # Build the summary frame once; it feeds both the CSV and the metrics.
    sdf = pd.DataFrame(summary_rows)
    summary_csv = out_path.with_suffix(".summary.csv")
    sdf.to_csv(summary_csv, index=False)
    print(f"[write] {summary_csv}")

    # P/R/F1 for every prediction rule.
    _print_metrics(sdf, pos_label, args.boundary_rank, args.boundary_score, K)


if __name__ == "__main__":
    main()