# vidore-leaderboard / data / pipeline_handler.py
# QuentinJG - Pipeline leaderboard (#17) - 342cf1f
import os
import requests
from typing import Dict, List, Optional
import pandas as pd
class PipelineHandler:
    """Handler for ViDoRe v3 pipeline evaluation results from GitHub.

    The handler lists pipeline folders and their per-dataset JSON result files
    through the GitHub contents API, downloads the JSON payloads from
    raw.githubusercontent.com, and renders the collected results as a pandas
    DataFrame (one row per pipeline).

    All network helpers are best-effort: on any failure they print a message
    and return an empty value instead of raising.
    """

    # Seconds to wait on a single HTTP request before giving up; without a
    # timeout a stalled connection would block the caller indefinitely.
    REQUEST_TIMEOUT_S = 30

    # Naming convention of the per-dataset result files.
    _DATASET_FILE_PREFIX = "vidore_v3_"
    _DATASET_FILE_SUFFIX = ".json"

    # Maps metric names from UI format to the keys used in the result files.
    # Names not listed here are passed through unchanged.
    _METRIC_ALIASES = {
        "ndcg_at_1": "ndcg_cut_5",  # Using cut_5 as approximation
        "ndcg_at_5": "ndcg_cut_5",
        "ndcg_at_10": "ndcg_cut_10",
        "ndcg_at_100": "ndcg_cut_100",
        "recall_at_1": "recall_5",  # recall_5 is used in place of recall@1
        "recall_at_5": "recall_5",
        "recall_at_10": "recall_10",
        "recall_at_100": "recall_100",
    }

    def __init__(self):
        # folder_name -> {dataset_name -> parsed result JSON}
        self.pipeline_infos = {}
        self.pipeline_aliases = {}  # Maps folder_name -> pipeline_alias for display
        self.github_base_url = "https://raw.githubusercontent.com/illuin-tech/vidore-benchmark/main/results/metrics"
        self.github_descriptions_base_url = (
            "https://raw.githubusercontent.com/illuin-tech/vidore-benchmark/main/results/pipeline_descriptions"
        )
        self.available_datasets = []
        self.available_languages = ["english"]  # Default languages available

        # Setup GitHub authentication if token is available
        self.github_token = os.environ.get("GITHUB_TOKEN")
        self.headers = {}
        if self.github_token:
            self.headers["Authorization"] = f"token {self.github_token}"
            print("GitHub token detected - using authenticated requests")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _get_json(self, url: str):
        """GET ``url`` with the configured headers and return the parsed JSON body.

        Raises whatever ``requests`` raises on connection errors, timeouts,
        non-2xx statuses, or an invalid JSON body; callers decide how to
        report the failure.
        """
        response = requests.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT_S)
        response.raise_for_status()
        return response.json()

    @classmethod
    def _dataset_name_from_filename(cls, filename: str) -> str:
        """Strip the leading ``vidore_v3_`` and trailing ``.json`` from a filename.

        Only the leading prefix and the trailing suffix are removed, so a
        dataset name that itself contains either token is left intact.
        """
        name = filename
        if name.startswith(cls._DATASET_FILE_PREFIX):
            name = name[len(cls._DATASET_FILE_PREFIX):]
        if name.endswith(cls._DATASET_FILE_SUFFIX):
            name = name[: -len(cls._DATASET_FILE_SUFFIX)]
        return name

    @staticmethod
    def _timing_of(dataset_data: Dict) -> Optional[Dict]:
        """Return the timing dict of one dataset result.

        Returns ``None`` when the result has no ``aggregated_metrics`` entry
        (such datasets are skipped by callers) and an empty dict when the
        ``timing`` entry is missing or null.
        """
        if "aggregated_metrics" not in dataset_data:
            return None
        return dataset_data["aggregated_metrics"].get("timing") or {}

    # ------------------------------------------------------------------
    # GitHub access
    # ------------------------------------------------------------------

    def get_pipeline_folders_from_github(self) -> List[str]:
        """Get list of pipeline folders from GitHub API.

        Returns:
            Sorted names of the directories under ``results/metrics``, or an
            empty list if the request or the parsing fails.
        """
        api_url = "https://api.github.com/repos/illuin-tech/vidore-benchmark/contents/results/metrics"
        try:
            contents = self._get_json(api_url)
            # Filter for directories only
            return sorted(item["name"] for item in contents if item["type"] == "dir")
        except Exception as e:  # best-effort boundary: report and degrade to "no folders"
            print(f"Error fetching pipeline folders from GitHub: {e}")
            return []

    def get_dataset_files_from_github(self, pipeline_name: str) -> List[str]:
        """Get list of dataset JSON files for a specific pipeline from GitHub API.

        Args:
            pipeline_name: Folder name of the pipeline under ``results/metrics``.

        Returns:
            Sorted names of the files starting with ``vidore_v3`` and ending in
            ``.json``, or an empty list if the request or the parsing fails.
        """
        api_url = f"https://api.github.com/repos/illuin-tech/vidore-benchmark/contents/results/metrics/{pipeline_name}"
        try:
            contents = self._get_json(api_url)
            # Filter for JSON files that start with 'vidore_v3'
            return sorted(
                item["name"]
                for item in contents
                if item["type"] == "file" and item["name"].startswith("vidore_v3") and item["name"].endswith(".json")
            )
        except Exception as e:  # best-effort boundary: report and degrade to "no files"
            print(f"Error fetching dataset files from {pipeline_name}: {e}")
            return []

    def fetch_json_from_github(self, pipeline_name: str, filename: str) -> Optional[Dict]:
        """Fetch a JSON file from GitHub raw content.

        Args:
            pipeline_name: Folder name of the pipeline.
            filename: Name of the JSON file inside that folder.

        Returns:
            The parsed JSON payload, or ``None`` if the download or parsing fails.
        """
        url = f"{self.github_base_url}/{pipeline_name}/{filename}"
        try:
            return self._get_json(url)
        except Exception as e:  # best-effort boundary: report and return None
            print(f"Error fetching {filename} from {pipeline_name}: {e}")
            return None

    def fetch_pipeline_alias(self, pipeline_name: str) -> Optional[str]:
        """Fetch the pipeline_alias from description.json for a pipeline.

        Uses raw.githubusercontent.com to avoid API rate limits.

        Returns:
            The value of the ``pipeline_alias`` key, or ``None`` if the key is
            absent or the description cannot be fetched.
        """
        url = f"{self.github_descriptions_base_url}/{pipeline_name}/description.json"
        try:
            description = self._get_json(url)
            return description.get("pipeline_alias")
        except Exception as e:  # best-effort boundary: report and return None
            print(f"Error fetching description for {pipeline_name}: {e}")
            return None

    def get_pipeline_data(self):
        """Fetch all pipeline data from GitHub.

        Populates ``pipeline_infos`` and ``pipeline_aliases`` for every pipeline
        that yielded at least one result file, then replaces
        ``available_datasets`` and ``available_languages`` with the sorted
        names seen during this call ("overall" is always included).
        """
        pipeline_folders = self.get_pipeline_folders_from_github()
        datasets_set = set()
        languages_set = {"overall"}

        for pipeline_name in pipeline_folders:
            # Get all dataset files for this pipeline
            dataset_files = self.get_dataset_files_from_github(pipeline_name)
            if not dataset_files:
                continue

            pipeline_data = {}
            for filename in dataset_files:
                results = self.fetch_json_from_github(pipeline_name, filename)
                if not results:
                    continue

                dataset_name = self._dataset_name_from_filename(filename)
                datasets_set.add(dataset_name)
                pipeline_data[dataset_name] = results

                # Collect available languages (a null "by_language" counts as empty)
                if "aggregated_metrics" in results and "by_language" in results["aggregated_metrics"]:
                    languages_set.update(results["aggregated_metrics"]["by_language"] or {})

            if pipeline_data:
                self.pipeline_infos[pipeline_name] = pipeline_data
                # Fetch the pipeline alias for display (uses raw URL, not API)
                alias = self.fetch_pipeline_alias(pipeline_name)
                if alias:
                    self.pipeline_aliases[pipeline_name] = alias

        self.available_datasets = sorted(datasets_set)
        self.available_languages = sorted(languages_set)

    # ------------------------------------------------------------------
    # Metric extraction and rendering
    # ------------------------------------------------------------------

    def calculate_cost_metric(self, pipeline_datasets: Dict) -> float:
        """
        Calculate a compute cost metric based on retrieval time across all datasets.

        Datasets without ``aggregated_metrics`` are ignored; a missing or null
        timing value counts as zero.

        Returns cost in arbitrary units (could be refined based on actual compute costs).
        """
        total_time_s = 0
        for dataset_data in pipeline_datasets.values():
            timing = self._timing_of(dataset_data)
            if timing is None:
                continue
            total_time_ms = timing.get("total_retrieval_time_milliseconds") or 0
            total_time_s += total_time_ms / 1000.0

        # Simple cost model: assume $0.01 per second of compute (adjustable)
        cost = total_time_s * 0.01
        return round(cost, 4)

    def extract_dataset_metrics(
        self, pipeline_datasets: Dict, metric: str = "ndcg_cut_5", language: str = "english"
    ) -> Dict[str, float]:
        """
        Extract metrics for individual datasets from the aggregated results.

        Args:
            pipeline_datasets: Dictionary mapping dataset names to their data
            metric: The metric to extract, either in UI format (e.g., 'ndcg_at_5')
                or already in result-file format (e.g., 'ndcg_cut_5')
            language: 'overall' to read the overall block, otherwise the key of
                the per-language block to read (e.g., 'english')

        Returns:
            Dictionary mapping display names (underscores replaced by spaces,
            title-cased) to metric values. Datasets with no data for the
            requested language are omitted; a missing metric is reported as 0.0.
        """
        actual_metric = self._METRIC_ALIASES.get(metric, metric)

        dataset_metrics = {}
        for dataset_name, dataset_data in pipeline_datasets.items():
            if "aggregated_metrics" not in dataset_data:
                continue
            aggregated = dataset_data["aggregated_metrics"]

            # Get metrics for the specified language (null blocks count as empty)
            if language == "overall":
                metrics_data = aggregated.get("overall") or {}
            else:
                metrics_data = (aggregated.get("by_language") or {}).get(language) or {}

            if metrics_data:
                # Format dataset name for display
                display_name = dataset_name.replace("_", " ").title()
                dataset_metrics[display_name] = metrics_data.get(actual_metric, 0.0)

        return dataset_metrics

    def render_df(self, metric: str = "ndcg_at_5", language: str = "overall") -> pd.DataFrame:
        """
        Render a DataFrame with pipeline results.

        Latency columns are the per-dataset throughput values (milliseconds)
        summed, converted to seconds and divided by the number of datasets that
        have ``aggregated_metrics``. Sentinels: -1 when no queries were
        recorded, 0 when queries were recorded but the total retrieval time is
        zero, and None when the corresponding throughput sum is zero.

        Args:
            metric: The metric to display (e.g., 'ndcg_at_5')
            language: The language to filter by ('overall' for all languages, or specific language)

        Returns:
            DataFrame indexed by pipeline display name with the latency columns,
            'Average Score' (when any dataset metric exists), one column per
            dataset and '_folder_name'. Empty DataFrame if no pipeline is loaded.
        """
        pipeline_res = {}

        for pipeline_name, pipeline_datasets in self.pipeline_infos.items():
            row_data = {}

            # Aggregate time metrics across all datasets
            total_time_ms = 0
            total_queries = 0
            indexing_time_ms = 0
            search_time_ms = 0
            num_datasets = 0

            for dataset_data in pipeline_datasets.values():
                timing = self._timing_of(dataset_data)
                if timing is None:
                    continue
                total_time_ms += timing.get("total_retrieval_time_milliseconds") or 0
                total_queries += timing.get("num_queries") or 0
                indexing_time_ms += timing.get("indexing_throughput_ms_per_doc") or 0
                search_time_ms += timing.get("search_throughput_ms_per_query") or 0
                num_datasets += 1

            if total_queries > 0:
                if total_time_ms > 0:
                    # num_datasets >= 1 here: queries can only come from a counted dataset
                    row_data["Indexing latency (s/doc)"] = (
                        (indexing_time_ms / 1000) / num_datasets if indexing_time_ms > 0 else None
                    )
                    row_data["Search latency (s/query)"] = (
                        (search_time_ms / 1000) / num_datasets if search_time_ms > 0 else None
                    )
                else:
                    row_data["Indexing latency (s/doc)"] = 0
                    row_data["Search latency (s/query)"] = 0
            else:
                row_data["Indexing latency (s/doc)"] = -1
                row_data["Search latency (s/query)"] = -1

            # Add dataset metrics
            dataset_metrics = self.extract_dataset_metrics(pipeline_datasets, metric, language)
            row_data.update(dataset_metrics)

            # Calculate average across datasets if there are multiple
            if dataset_metrics:
                row_data["Average Score"] = round(sum(dataset_metrics.values()) / len(dataset_metrics), 4)

            # Use pipeline_alias for display if available, otherwise fallback to folder name
            display_name = self.pipeline_aliases.get(pipeline_name, pipeline_name)
            if display_name in pipeline_res:
                # Two folders share a display name: keep both rows instead of
                # letting the later one silently overwrite the earlier one.
                display_name = f"{display_name} ({pipeline_name})"

            # Store folder name for link generation (will be used in utils.py)
            row_data["_folder_name"] = pipeline_name
            pipeline_res[display_name] = row_data

        if not pipeline_res:
            return pd.DataFrame()

        df = pd.DataFrame(pipeline_res).T

        # Reorder columns to have Average right after timing metrics
        cols = list(df.columns)
        if "Average Score" in cols:
            cols.remove("Average Score")
            # Insert Average after Search latency (s/query)
            insert_pos = cols.index("Search latency (s/query)") + 1 if "Search latency (s/query)" in cols else 2
            cols.insert(insert_pos, "Average Score")
            df = df[cols]

        return df