Spaces:
Running
Running
| import os | |
| import requests | |
| from typing import Dict, List, Optional | |
| import pandas as pd | |
class PipelineHandler:
    """Handler for ViDoRe v3 pipeline evaluation results from GitHub.

    Pipeline folders and their per-dataset result files are listed through the
    GitHub contents API; the result files themselves (and each pipeline's
    ``description.json``) are downloaded from raw.githubusercontent.com.
    Call :meth:`get_pipeline_data` to populate the handler, then
    :meth:`render_df` to build the leaderboard table.
    """

    # Seconds to wait for a single HTTP request. ``requests`` applies no
    # timeout by default, so a stalled connection would otherwise hang forever.
    REQUEST_TIMEOUT = 30

    # Only files named ``vidore_v3*.json`` inside a pipeline folder are loaded;
    # the dataset name is the filename minus the ``vidore_v3_`` prefix and the
    # ``.json`` suffix.
    DATASET_FILE_PREFIX = "vidore_v3"
    DATASET_NAME_PREFIX = "vidore_v3_"
    DATASET_FILE_SUFFIX = ".json"

    # UI metric names -> keys used in the result JSON files. The "@1" UI names
    # have no exact counterpart here and fall back to the "@5" metric.
    METRIC_ALIASES = {
        "ndcg_at_1": "ndcg_cut_5",  # Using cut_5 as approximation
        "ndcg_at_5": "ndcg_cut_5",
        "ndcg_at_10": "ndcg_cut_10",
        "ndcg_at_100": "ndcg_cut_100",
        "recall_at_1": "recall_5",
        "recall_at_5": "recall_5",
        "recall_at_10": "recall_10",
        "recall_at_100": "recall_100",
    }

    def __init__(self):
        self.pipeline_infos = {}  # folder_name -> {dataset_name: results JSON}
        self.pipeline_aliases = {}  # Maps folder_name -> pipeline_alias for display
        self.github_base_url = "https://raw.githubusercontent.com/illuin-tech/vidore-benchmark/main/results/metrics"
        self.github_descriptions_base_url = (
            "https://raw.githubusercontent.com/illuin-tech/vidore-benchmark/main/results/pipeline_descriptions"
        )
        self.available_datasets = []
        self.available_languages = ["english"]  # Default languages available
        # Setup GitHub authentication if token is available (raises the API rate limit)
        self.github_token = os.environ.get("GITHUB_TOKEN")
        self.headers = {}
        if self.github_token:
            self.headers["Authorization"] = f"token {self.github_token}"
            print("GitHub token detected - using authenticated requests")

    def _get_json(self, url: str):
        """GET ``url`` with the handler's auth headers and return the decoded JSON body.

        Network errors, timeouts, non-2xx statuses (via ``raise_for_status``)
        and undecodable bodies propagate as exceptions; callers catch them.
        """
        response = requests.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        return response.json()

    def get_pipeline_folders_from_github(self) -> List[str]:
        """Get the sorted list of pipeline folders from the GitHub API.

        Returns an empty list (after printing the error) if the listing fails.
        """
        api_url = "https://api.github.com/repos/illuin-tech/vidore-benchmark/contents/results/metrics"
        try:
            contents = self._get_json(api_url)
            # Filter for directories only
            folders = [item["name"] for item in contents if item["type"] == "dir"]
            return sorted(folders)
        except Exception as e:
            # Best effort: a failed listing yields no pipelines rather than a crash.
            print(f"Error fetching pipeline folders from GitHub: {e}")
            return []

    def get_dataset_files_from_github(self, pipeline_name: str) -> List[str]:
        """Get the sorted ``vidore_v3*.json`` file names of one pipeline folder.

        Returns an empty list (after printing the error) if the listing fails.
        """
        api_url = f"https://api.github.com/repos/illuin-tech/vidore-benchmark/contents/results/metrics/{pipeline_name}"
        try:
            contents = self._get_json(api_url)
            files = [
                item["name"]
                for item in contents
                if item["type"] == "file"
                and item["name"].startswith(self.DATASET_FILE_PREFIX)
                and item["name"].endswith(self.DATASET_FILE_SUFFIX)
            ]
            return sorted(files)
        except Exception as e:
            print(f"Error fetching dataset files from {pipeline_name}: {e}")
            return []

    def fetch_json_from_github(self, pipeline_name: str, filename: str) -> Optional[Dict]:
        """Fetch one result JSON file from GitHub raw content, or ``None`` on failure."""
        url = f"{self.github_base_url}/{pipeline_name}/{filename}"
        try:
            return self._get_json(url)
        except Exception as e:
            print(f"Error fetching {filename} from {pipeline_name}: {e}")
            return None

    def fetch_pipeline_alias(self, pipeline_name: str) -> Optional[str]:
        """Fetch the ``pipeline_alias`` from a pipeline's description.json.

        Uses raw.githubusercontent.com to avoid API rate limits. Returns
        ``None`` if the file cannot be fetched or has no alias.
        """
        url = f"{self.github_descriptions_base_url}/{pipeline_name}/description.json"
        try:
            description = self._get_json(url)
            return description.get("pipeline_alias")
        except Exception as e:
            print(f"Error fetching description for {pipeline_name}: {e}")
            return None

    @classmethod
    def _dataset_name_from_filename(cls, filename: str) -> str:
        """Strip the leading ``vidore_v3_`` prefix and trailing ``.json`` suffix once each."""
        name = filename
        if name.startswith(cls.DATASET_NAME_PREFIX):
            name = name[len(cls.DATASET_NAME_PREFIX):]
        if name.endswith(cls.DATASET_FILE_SUFFIX):
            name = name[: -len(cls.DATASET_FILE_SUFFIX)]
        return name

    def get_pipeline_data(self):
        """Fetch all pipeline data from GitHub.

        Adds entries to ``pipeline_infos`` and ``pipeline_aliases`` (existing
        entries are not cleared) and replaces ``available_datasets`` and
        ``available_languages`` with the values seen in this run. Pipelines
        with no successfully fetched result file are skipped.
        """
        pipeline_folders = self.get_pipeline_folders_from_github()
        datasets_set = set()
        languages_set = {"overall"}
        for pipeline_name in pipeline_folders:
            dataset_files = self.get_dataset_files_from_github(pipeline_name)
            if not dataset_files:
                continue
            pipeline_data = {}
            for filename in dataset_files:
                results = self.fetch_json_from_github(pipeline_name, filename)
                if not results:
                    continue
                dataset_name = self._dataset_name_from_filename(filename)
                datasets_set.add(dataset_name)
                pipeline_data[dataset_name] = results
                # Collect available languages
                if "aggregated_metrics" in results and "by_language" in results["aggregated_metrics"]:
                    languages_set.update((results["aggregated_metrics"]["by_language"] or {}).keys())
            if pipeline_data:
                self.pipeline_infos[pipeline_name] = pipeline_data
                # Fetch the pipeline alias for display (uses raw URL, not API)
                alias = self.fetch_pipeline_alias(pipeline_name)
                if alias:
                    self.pipeline_aliases[pipeline_name] = alias
        self.available_datasets = sorted(datasets_set)
        self.available_languages = sorted(languages_set)

    def calculate_cost_metric(self, pipeline_datasets: Dict) -> float:
        """Calculate a compute cost metric from the total retrieval time of all datasets.

        Returns cost in arbitrary units (could be refined based on actual compute
        costs), rounded to 4 decimals.
        """
        total_time_s = 0
        for dataset_data in pipeline_datasets.values():
            if "aggregated_metrics" not in dataset_data:
                continue
            timing = dataset_data["aggregated_metrics"].get("timing") or {}
            total_time_s += timing.get("total_retrieval_time_milliseconds", 0) / 1000.0
        # Simple cost model: assume $0.01 per second of compute (adjustable)
        cost = total_time_s * 0.01
        return round(cost, 4)

    def extract_dataset_metrics(
        self, pipeline_datasets: Dict, metric: str = "ndcg_cut_5", language: str = "english"
    ) -> Dict[str, float]:
        """Extract one metric per dataset from the aggregated results.

        Args:
            pipeline_datasets: Dictionary mapping dataset names to their data.
            metric: UI metric name (e.g. 'ndcg_at_5', translated via
                ``METRIC_ALIASES``) or a raw key such as 'ndcg_cut_5'.
            language: 'overall' for the all-language aggregate, otherwise a key
                of ``aggregated_metrics["by_language"]`` (e.g. 'english').

        Returns:
            Dictionary mapping display-formatted dataset names ("finance_en" ->
            "Finance En") to metric values. Datasets with no metrics for the
            language are omitted; a missing metric key yields 0.0.
        """
        actual_metric = self.METRIC_ALIASES.get(metric, metric)
        dataset_metrics = {}
        for dataset_name, dataset_data in pipeline_datasets.items():
            if "aggregated_metrics" not in dataset_data:
                continue
            aggregated = dataset_data["aggregated_metrics"]
            if language == "overall":
                metrics_data = aggregated.get("overall", {})
            else:
                metrics_data = (aggregated.get("by_language") or {}).get(language, {})
            if metrics_data:
                display_name = dataset_name.replace("_", " ").title()
                dataset_metrics[display_name] = metrics_data.get(actual_metric, 0.0)
        return dataset_metrics

    @staticmethod
    def _mean_seconds(total_ms, count) -> Optional[float]:
        """Average of ``count`` millisecond values summing to ``total_ms``, in seconds; None unless positive."""
        return (total_ms / 1000) / count if total_ms > 0 else None

    def _timing_columns(self, pipeline_datasets: Dict) -> Dict[str, Optional[float]]:
        """Build the two latency columns of one leaderboard row.

        Per-dataset throughput values (milliseconds) are averaged over the
        datasets that actually report them and converted to seconds. The row
        gets -1 for both columns when no queries were recorded, 0 when queries
        were recorded but total retrieval time is zero, and ``None`` for a
        latency whose summed value is not positive.
        """
        total_time_ms = 0
        total_queries = 0
        indexing_ms, indexing_count = 0, 0
        search_ms, search_count = 0, 0
        for dataset_data in pipeline_datasets.values():
            if "aggregated_metrics" not in dataset_data:
                continue
            timing = dataset_data["aggregated_metrics"].get("timing") or {}
            total_time_ms += timing.get("total_retrieval_time_milliseconds", 0)
            total_queries += timing.get("num_queries", 0)
            # Count only datasets that report a value, so missing keys don't
            # drag the mean toward zero.
            indexing_value = timing.get("indexing_throughput_ms_per_doc")
            if indexing_value is not None:
                indexing_ms += indexing_value
                indexing_count += 1
            search_value = timing.get("search_throughput_ms_per_query")
            if search_value is not None:
                search_ms += search_value
                search_count += 1

        if total_queries <= 0:
            return {"Indexing latency (s/doc)": -1, "Search latency (s/query)": -1}
        if total_time_ms <= 0:
            return {"Indexing latency (s/doc)": 0, "Search latency (s/query)": 0}
        return {
            "Indexing latency (s/doc)": self._mean_seconds(indexing_ms, indexing_count),
            "Search latency (s/query)": self._mean_seconds(search_ms, search_count),
        }

    def render_df(self, metric: str = "ndcg_at_5", language: str = "overall") -> pd.DataFrame:
        """Render a DataFrame with pipeline results.

        Args:
            metric: The metric to display (e.g., 'ndcg_at_5').
            language: The language to filter by ('overall' for all languages,
                or a specific language).

        Returns:
            DataFrame indexed by pipeline display name (alias, or folder name
            when no alias exists; a colliding name is suffixed with
            " (<folder>)"). Columns: latency columns, "Average Score",
            per-dataset metrics, and "_folder_name". Empty if no pipelines
            are loaded.
        """
        pipeline_res = {}
        for pipeline_name, pipeline_datasets in self.pipeline_infos.items():
            row_data = self._timing_columns(pipeline_datasets)

            dataset_metrics = self.extract_dataset_metrics(pipeline_datasets, metric, language)
            row_data.update(dataset_metrics)
            # Mean over every dataset that has a score for this metric/language.
            if dataset_metrics:
                row_data["Average Score"] = round(sum(dataset_metrics.values()) / len(dataset_metrics), 4)

            # Use pipeline_alias for display if available, otherwise fallback to folder name
            display_name = self.pipeline_aliases.get(pipeline_name, pipeline_name)
            if display_name in pipeline_res:
                # Rows are keyed by display name; disambiguate with the unique
                # folder name instead of silently overwriting the earlier row.
                display_name = f"{display_name} ({pipeline_name})"
            # Store folder name for link generation (will be used in utils.py)
            row_data["_folder_name"] = pipeline_name
            pipeline_res[display_name] = row_data

        if not pipeline_res:
            return pd.DataFrame()

        df = pd.DataFrame(pipeline_res).T
        # Reorder columns to have Average right after timing metrics
        cols = list(df.columns)
        if "Average Score" in cols:
            cols.remove("Average Score")
            search_col = "Search latency (s/query)"
            insert_pos = cols.index(search_col) + 1 if search_col in cols else 2
            cols.insert(insert_pos, "Average Score")
            df = df[cols]
        return df