| | |
| | """ |
| | Dataset Tester for ML Inference Service |
| | |
| | Tests the generated PyArrow datasets against the running ML inference service. |
| | Validates API requests/responses and measures performance metrics. |
| | """ |
| |
|
| | import json |
| | import time |
| | import asyncio |
| | import statistics |
| | from pathlib import Path |
| | from typing import Dict, List, Any, Optional |
| | import argparse |
| |
|
| | import pyarrow.parquet as pq |
| | import requests |
| | import pandas as pd |
| |
|
| |
|
class DatasetTester:
    """Runs generated PyArrow (Parquet) test datasets against a live ML inference service.

    Each dataset row is assumed to carry ``api_request`` (JSON string),
    ``expected_response`` (JSON string), ``image_id`` and ``test_category``
    columns — confirm against the dataset generator. Requests are POSTed to
    ``<base_url>/predict``; per-request latency and structural validation
    results are aggregated into per-dataset and overall summaries.
    """

    def __init__(self, base_url: str = "http://127.0.0.1:8000", datasets_dir: str = "test_datasets"):
        """
        Args:
            base_url: Root URL of the inference service; a trailing slash is stripped.
            datasets_dir: Directory containing ``*.parquet`` test datasets.
        """
        self.base_url = base_url.rstrip('/')
        self.datasets_dir = Path(datasets_dir)
        self.endpoint = f"{self.base_url}/predict"
        self.results = []  # reserved accumulator; not currently populated by the class

    def load_dataset(self, dataset_path: Path) -> pd.DataFrame:
        """Load a Parquet dataset into a pandas DataFrame."""
        table = pq.read_table(dataset_path)
        return table.to_pandas()

    def test_api_connection(self) -> bool:
        """Return True if the service answers 200 on its /docs page (FastAPI convention)."""
        try:
            response = requests.get(f"{self.base_url}/docs", timeout=5)
            return response.status_code == 200
        except requests.RequestException:
            return False

    def send_prediction_request(self, api_request_json: str) -> Dict[str, Any]:
        """Send a single prediction request to the API and time it.

        Args:
            api_request_json: JSON-encoded request body.

        Returns:
            Dict with keys ``success``, ``status_code``, ``response``,
            ``latency_ms`` and ``error``. Failures never raise; they are
            reported through the ``error`` field.
        """
        try:
            request_data = json.loads(api_request_json)
            start_time = time.time()

            response = requests.post(
                self.endpoint,
                json=request_data,
                headers={"Content-Type": "application/json"},
                timeout=30
            )

            latency_ms = (time.time() - start_time) * 1000

            return {
                "success": response.status_code == 200,
                "status_code": response.status_code,
                # Non-200 bodies are kept as raw text for error reporting.
                "response": response.json() if response.status_code == 200 else response.text,
                "latency_ms": round(latency_ms, 2),
                "error": None
            }

        except requests.RequestException as e:
            return {
                "success": False,
                "status_code": None,
                "response": None,
                "latency_ms": None,
                "error": str(e)
            }
        except json.JSONDecodeError as e:
            # Raised either by malformed api_request_json or by a 200 response
            # whose body is not valid JSON (requests' JSONDecodeError subclasses it).
            return {
                "success": False,
                "status_code": None,
                "response": None,
                "latency_ms": None,
                "error": f"JSON decode error: {str(e)}"
            }

    def validate_response(self, actual_response: Dict[str, Any],
                          expected_response_json: str) -> Dict[str, Any]:
        """Structurally validate an API response.

        NOTE: ``expected_response_json`` is only parsed to confirm it is valid
        JSON; its values are not currently compared against the response.

        Returns:
            Dict with ``structure_valid`` (False only when required fields are
            missing or the expected JSON is invalid) and ``field_errors``
            (missing-field, type and range problems).
        """
        try:
            # Dead-variable fix: the parsed value was never used, so parse
            # purely as a syntax check on the expected payload.
            json.loads(expected_response_json)

            validation = {
                "structure_valid": True,
                "field_errors": []
            }

            # Missing required fields invalidate the structure outright.
            required_fields = ["prediction", "confidence", "predicted_label", "model", "mediaType"]
            for field in required_fields:
                if field not in actual_response:
                    validation["structure_valid"] = False
                    validation["field_errors"].append(f"Missing field: {field}")

            # Type/range problems are recorded but do not flip structure_valid.
            if "confidence" in actual_response:
                if not isinstance(actual_response["confidence"], (int, float)):
                    validation["field_errors"].append("confidence must be numeric")
                elif not (0 <= actual_response["confidence"] <= 1):
                    validation["field_errors"].append("confidence must be between 0 and 1")

            if "predicted_label" in actual_response:
                if not isinstance(actual_response["predicted_label"], int):
                    validation["field_errors"].append("predicted_label must be integer")

            return validation

        except json.JSONDecodeError:
            return {
                "structure_valid": False,
                "field_errors": ["Invalid expected response JSON"]
            }

    def test_dataset(self, dataset_path: Path, max_samples: Optional[int] = None) -> Dict[str, Any]:
        """Test every row of one dataset and return aggregate metrics.

        Args:
            dataset_path: Path to a ``.parquet`` dataset file.
            max_samples: If truthy, only the first N rows are tested.

        Returns:
            Per-dataset metrics dict; on any loading/iteration failure a
            minimal dict with ``error`` and ``success_rate: 0`` is returned.
        """
        print(f"📊 Testing dataset: {dataset_path.name}")

        try:
            df = self.load_dataset(dataset_path)
            if max_samples:
                df = df.head(max_samples)

            results = {
                "dataset_name": dataset_path.stem,
                "total_samples": len(df),
                "tested_samples": 0,
                "successful_requests": 0,
                "failed_requests": 0,
                "validation_errors": 0,
                "latencies_ms": [],
                "errors": [],
                # Assumes a constant per-dataset 'test_category' column — TODO confirm.
                "category": df['test_category'].iloc[0] if not df.empty else "unknown"
            }

            # BUG FIX: the original printed `idx + 1` where idx is the
            # DataFrame *index label*, which is wrong whenever the index is
            # not a 0-based RangeIndex; enumerate gives the true position.
            for position, (_, row) in enumerate(df.iterrows(), start=1):
                print(f" Testing sample {position}/{len(df)}", end="\r")

                api_result = self.send_prediction_request(row['api_request'])
                results["tested_samples"] += 1

                if api_result["success"]:
                    results["successful_requests"] += 1
                    results["latencies_ms"].append(api_result["latency_ms"])

                    validation = self.validate_response(
                        api_result["response"],
                        row['expected_response']
                    )

                    if not validation["structure_valid"]:
                        results["validation_errors"] += 1
                        results["errors"].append({
                            "sample_id": row['image_id'],
                            "type": "validation_error",
                            "details": validation["field_errors"]
                        })

                else:
                    results["failed_requests"] += 1
                    results["errors"].append({
                        "sample_id": row['image_id'],
                        "type": "request_failed",
                        "status_code": api_result["status_code"],
                        "error": api_result["error"]
                    })

            # Latency aggregates are None when every request failed.
            if results["latencies_ms"]:
                results["avg_latency_ms"] = round(statistics.mean(results["latencies_ms"]), 2)
                results["min_latency_ms"] = round(min(results["latencies_ms"]), 2)
                results["max_latency_ms"] = round(max(results["latencies_ms"]), 2)
                results["median_latency_ms"] = round(statistics.median(results["latencies_ms"]), 2)
            else:
                results.update({
                    "avg_latency_ms": None,
                    "min_latency_ms": None,
                    "max_latency_ms": None,
                    "median_latency_ms": None
                })

            results["success_rate"] = round(
                results["successful_requests"] / results["tested_samples"] * 100, 2
            ) if results["tested_samples"] > 0 else 0

            print(f"\n ✅ Completed: {results['success_rate']}% success rate")
            return results

        except Exception as e:
            # Broad catch is deliberate: one bad dataset must not abort the run.
            print(f"\n ❌ Failed to test dataset: {str(e)}")
            return {
                "dataset_name": dataset_path.stem,
                "error": str(e),
                "success_rate": 0
            }

    def test_all_datasets(self, max_samples_per_dataset: Optional[int] = None,
                          category_filter: Optional[str] = None) -> Dict[str, Any]:
        """Test all datasets in ``datasets_dir``, optionally filtered by category.

        Args:
            max_samples_per_dataset: Row cap forwarded to :meth:`test_dataset`.
            category_filter: Substring match against the dataset file name.

        Returns:
            Summary dict (see :meth:`generate_summary`) or ``{"error": ...}``
            when the API is unreachable or no datasets are found.
        """
        if not self.test_api_connection():
            print("❌ API is not accessible. Please start the service first:")
            print(" uvicorn main:app --reload")
            return {"error": "API not accessible"}

        print(f" Starting dataset testing against {self.endpoint}")

        parquet_files = list(self.datasets_dir.glob("*.parquet"))
        if not parquet_files:
            print(f"❌ No datasets found in {self.datasets_dir}")
            return {"error": "No datasets found"}

        if category_filter:
            parquet_files = [f for f in parquet_files if category_filter in f.name]

        print(f" Found {len(parquet_files)} datasets to test")

        all_results = []
        start_time = time.time()

        for dataset_file in parquet_files:
            result = self.test_dataset(dataset_file, max_samples_per_dataset)
            all_results.append(result)

        total_time = time.time() - start_time

        summary = self.generate_summary(all_results, total_time)
        self.save_results(summary, all_results)

        return summary

    def generate_summary(self, results: List[Dict[str, Any]], total_time: float) -> Dict[str, Any]:
        """Aggregate per-dataset results into one summary dict.

        Args:
            results: Per-dataset dicts from :meth:`test_dataset`.
            total_time: Wall-clock seconds spent testing.
        """
        successful_datasets = [r for r in results if r.get("success_rate", 0) > 0]
        failed_datasets = [r for r in results if r.get("error") or r.get("success_rate", 0) == 0]

        total_samples = sum(r.get("tested_samples", 0) for r in results)
        total_successful = sum(r.get("successful_requests", 0) for r in results)
        total_failed = sum(r.get("failed_requests", 0) for r in results)

        all_latencies = []
        for r in results:
            all_latencies.extend(r.get("latencies_ms", []))

        summary = {
            "test_summary": {
                "total_datasets": len(results),
                "successful_datasets": len(successful_datasets),
                "failed_datasets": len(failed_datasets),
                "total_samples_tested": total_samples,
                "total_successful_requests": total_successful,
                "total_failed_requests": total_failed,
                "overall_success_rate": round(
                    total_successful / total_samples * 100, 2
                ) if total_samples > 0 else 0,
                "total_test_time_seconds": round(total_time, 2)
            },
            "performance_metrics": {
                "avg_latency_ms": round(statistics.mean(all_latencies), 2) if all_latencies else None,
                "median_latency_ms": round(statistics.median(all_latencies), 2) if all_latencies else None,
                "min_latency_ms": round(min(all_latencies), 2) if all_latencies else None,
                "max_latency_ms": round(max(all_latencies), 2) if all_latencies else None,
                "requests_per_second": round(
                    total_successful / total_time, 2
                ) if total_time > 0 else 0
            },
            "category_breakdown": {},
            "failed_datasets": [r["dataset_name"] for r in failed_datasets]
        }

        # Per-category success-rate averages.
        categories = {}
        for result in results:
            category = result.get("category", "unknown")
            if category not in categories:
                categories[category] = {
                    "count": 0,
                    "success_rates": [],
                    "avg_success_rate": 0
                }
            categories[category]["count"] += 1
            categories[category]["success_rates"].append(result.get("success_rate", 0))

        for category, data in categories.items():
            data["avg_success_rate"] = round(
                statistics.mean(data["success_rates"]), 2
            ) if data["success_rates"] else 0

        summary["category_breakdown"] = categories

        return summary

    def save_results(self, summary: Dict[str, Any], detailed_results: List[Dict[str, Any]]):
        """Write summary and detailed results to timestamped JSON files under test_results/."""
        results_dir = Path("test_results")
        results_dir.mkdir(exist_ok=True)

        timestamp = int(time.time())

        summary_path = results_dir / f"test_summary_{timestamp}.json"
        with open(summary_path, 'w') as f:
            json.dump(summary, f, indent=2)

        detailed_path = results_dir / f"test_detailed_{timestamp}.json"
        with open(detailed_path, 'w') as f:
            json.dump(detailed_results, f, indent=2)

        print(f" Results saved:")
        print(f" Summary: {summary_path}")
        print(f" Details: {detailed_path}")

    def print_summary(self, summary: Dict[str, Any]):
        """Pretty-print a summary produced by :meth:`generate_summary`."""
        print("\n" + "="*60)
        print("🏁 DATASET TESTING SUMMARY")
        print("="*60)

        ts = summary["test_summary"]
        print(f"Datasets tested: {ts['total_datasets']}")
        print(f"Successful datasets: {ts['successful_datasets']}")
        print(f"Failed datasets: {ts['failed_datasets']}")
        print(f"Total samples: {ts['total_samples_tested']}")
        print(f"Overall success rate: {ts['overall_success_rate']}%")
        print(f"Test duration: {ts['total_test_time_seconds']}s")

        pm = summary["performance_metrics"]
        # BUG FIX: truthiness (`if pm["avg_latency_ms"]:`) also skipped a
        # legitimate 0.0 average; test explicitly against None instead.
        if pm["avg_latency_ms"] is not None:
            print(f"\nPerformance:")
            print(f" Avg latency: {pm['avg_latency_ms']}ms")
            print(f" Median latency: {pm['median_latency_ms']}ms")
            print(f" Min latency: {pm['min_latency_ms']}ms")
            print(f" Max latency: {pm['max_latency_ms']}ms")
            print(f" Requests/sec: {pm['requests_per_second']}")

        print(f"\nCategory breakdown:")
        for category, data in summary["category_breakdown"].items():
            print(f" {category}: {data['count']} datasets, {data['avg_success_rate']}% avg success")

        if summary["failed_datasets"]:
            print(f"\nFailed datasets: {', '.join(summary['failed_datasets'])}")
|
| |
|
def main():
    """CLI entry point: parse arguments, run the suite, and print a verdict."""
    parser = argparse.ArgumentParser(description="Test PyArrow datasets against ML inference service")
    parser.add_argument("--base-url", default="http://127.0.0.1:8000", help="Base URL of the API")
    parser.add_argument("--datasets-dir", default="test_datasets", help="Directory containing datasets")
    parser.add_argument("--max-samples", type=int, help="Max samples per dataset to test")
    parser.add_argument("--category", help="Filter datasets by category (standard, edge_case, performance, model_comparison)")
    parser.add_argument("--quick", action="store_true", help="Quick test with max 5 samples per dataset")
    args = parser.parse_args()

    # --quick overrides any explicit --max-samples value.
    sample_cap = 5 if args.quick else args.max_samples

    tester = DatasetTester(args.base_url, args.datasets_dir)
    results = tester.test_all_datasets(sample_cap, args.category)

    if "error" in results:
        return

    tester.print_summary(results)

    rate = results["test_summary"]["overall_success_rate"]
    if rate > 90:
        print("\n🎉 Excellent! API is working great with the datasets!")
    elif rate > 70:
        print("\n👍 Good! API works well, minor issues detected.")
    else:
        print("\n⚠️ Warning: Several issues detected. Check the detailed results.")
|
| |
|
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()