"""
PyArrow Dataset Generator for ML Inference Service

Generates test datasets for academic challenges and model validation.
Creates 100 PyArrow datasets with various image types and test scenarios.
"""


import base64
import io
import json
import random
from collections import Counter
from pathlib import Path
from typing import Dict, List, Any, Tuple

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from PIL import Image, ImageDraw, ImageFont


class TestDatasetGenerator:
    """Generate synthetic image-classification test datasets as Parquet files.

    Four dataset categories are produced (standard, edge case, performance and
    model comparison).  Each dataset is a list of flat records holding a
    JSON-encoded API request, a JSON-encoded expected response and descriptive
    columns.  Every dataset is written to ``<name>.parquet`` with a sidecar
    ``<name>_metadata.json`` inside ``output_dir``.

    Expected responses are randomly generated placeholders; they are not the
    output of a real model.
    """

    # Despite the ``Test`` prefix this is not a test case; stop pytest from
    # trying to collect it when the module is imported by a test suite.
    __test__ = False

    # Columns every record provides, in the order they are written.
    _REQUIRED_COLUMNS = (
        "dataset_id",
        "image_id",
        "image_type",
        "image_size",
        "format",
        "media_type",
        "api_request",
        "expected_response",
        "test_category",
        "difficulty",
    )

    # Columns only some categories provide, with the value stored when absent.
    _OPTIONAL_COLUMN_DEFAULTS = {
        "batch_size": 1,
        "expected_max_latency_ms": 1000,
        "model_type": "microsoft/resnet-18",
        "comparison_group": 0,
    }

    # Categories written by generate_all_datasets, in generation order.
    _CATEGORY_NAMES = ("standard", "edge_case", "performance", "model_comparison")

    def __init__(self, output_dir: str = "test_datasets"):
        """Create the generator and make sure the output directory exists.

        Args:
            output_dir: Directory that receives the Parquet, metadata and
                summary files.  Missing parent directories are created too.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Class names used for fake predictions.  The position of a name in
        # this list is what gets reported as its numeric ``predicted_label``.
        self.imagenet_labels = [
            "tench", "goldfish", "great_white_shark", "tiger_shark", "hammerhead",
            "electric_ray", "stingray", "cock", "hen", "ostrich", "brambling",
            "goldfinch", "house_finch", "junco", "indigo_bunting", "robin",
            "bulbul", "jay", "magpie", "chickadee", "water_ouzel", "kite",
            "bald_eagle", "vulture", "great_grey_owl", "European_fire_salamander",
            "common_newt", "eft", "spotted_salamander", "axolotl", "bullfrog",
            "tree_frog", "tailed_frog", "loggerhead", "leatherback_turtle",
            "mud_turtle", "terrapin", "box_turtle", "banded_gecko", "common_iguana",
            "American_chameleon", "whiptail", "agama", "frilled_lizard", "alligator_lizard",
            "Gila_monster", "green_lizard", "African_chameleon", "Komodo_dragon",
            "African_crocodile", "American_alligator", "triceratops", "thunder_snake"
        ]

    @staticmethod
    def _gradient_array(width: int, height: int) -> np.ndarray:
        """Return a ``(height, width, 3)`` uint8 gradient.

        Channel 0 ramps with the row index, channel 1 with the column index
        and channel 2 with their sum, each scaled into ``0..254`` using
        integer floor division.
        """
        rows = np.arange(height).reshape(-1, 1)
        cols = np.arange(width).reshape(1, -1)
        array = np.zeros((height, width, 3), dtype=np.uint8)
        # Broadcasting replaces the per-pixel Python loop; values are identical.
        array[..., 0] = rows * 255 // height
        array[..., 1] = cols * 255 // width
        array[..., 2] = (rows + cols) * 255 // (height + width)
        return array

    def create_synthetic_image(self, width: int = 224, height: int = 224,
                               image_type: str = "random") -> "Image.Image":
        """Create a synthetic RGB image for testing.

        Args:
            width: Image width in pixels.
            height: Image height in pixels.
            image_type: ``"random"`` (uniform noise), ``"geometric"`` (3-8
                random filled rectangles/ellipses on white), ``"gradient"``,
                ``"text"`` (a short caption on white).  Any other value
                yields a solid image in a random colour.

        Returns:
            The generated PIL image.
        """
        if image_type == "random":
            array = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
            return Image.fromarray(array)

        if image_type == "geometric":
            img = Image.new('RGB', (width, height), color='white')
            draw = ImageDraw.Draw(img)

            half_w, half_h = width // 2, height // 2
            # Shapes are at least 20 px per side when the image allows it; for
            # images under 40 px the lower bound is clamped so randint() never
            # receives an empty range.
            min_w, min_h = min(20, half_w), min(20, half_h)

            for _ in range(random.randint(3, 8)):
                color = tuple(random.randint(0, 255) for _ in range(3))
                shape_type = random.choice(['rectangle', 'ellipse'])
                x1 = random.randint(0, half_w)
                y1 = random.randint(0, half_h)
                x2 = x1 + random.randint(min_w, half_w)
                y2 = y1 + random.randint(min_h, half_h)

                if shape_type == 'rectangle':
                    draw.rectangle([x1, y1, x2, y2], fill=color)
                else:
                    draw.ellipse([x1, y1, x2, y2], fill=color)

            return img

        if image_type == "gradient":
            return Image.fromarray(self._gradient_array(width, height))

        if image_type == "text":
            img = Image.new('RGB', (width, height), color='white')
            draw = ImageDraw.Draw(img)

            try:
                font = ImageFont.load_default()
            except Exception:
                # Best effort: with font=None the draw.text() call below uses
                # whatever font the drawing context falls back to.
                font = None

            text = f"Test Image {random.randint(1, 1000)}"
            draw.text((width // 4, height // 2), text, fill='black', font=font)
            return img

        # Unknown type (the generators pass "solid"): one random flat colour.
        color = tuple(random.randint(0, 255) for _ in range(3))
        return Image.new('RGB', (width, height), color=color)

    def image_to_base64(self, image: "Image.Image", format: str = "JPEG") -> str:
        """Encode a PIL image in the given file format and return it as base64 text.

        Args:
            image: Image to encode.
            format: Format name handed to ``image.save`` (e.g. ``"JPEG"``,
                ``"PNG"``).

        Returns:
            The encoded file bytes as a base64 ``str``.
        """
        buffer = io.BytesIO()
        image.save(buffer, format=format)
        return base64.b64encode(buffer.getvalue()).decode('utf-8')

    def create_api_request(self, image_b64: str, media_type: str = "image/jpeg") -> Dict[str, Any]:
        """Wrap a base64 image in the request payload structure.

        Args:
            image_b64: Base64-encoded image data.
            media_type: Media type reported for the image.

        Returns:
            ``{"image": {"mediaType": ..., "data": ...}}``.
        """
        return {
            "image": {
                "mediaType": media_type,
                "data": image_b64
            }
        }

    def create_expected_response(self, model_name: str = "microsoft/resnet-18",
                                 media_type: str = "image/jpeg") -> Dict[str, Any]:
        """Create a randomly generated placeholder response.

        The label index is drawn once and the label name is looked up from it,
        so ``prediction`` and ``predicted_label`` always describe the same
        class.

        Args:
            model_name: Value stored under ``"model"``.
            media_type: Value stored under ``"mediaType"``.

        Returns:
            Dict with ``prediction``, ``confidence`` (0.3-0.99, 4 decimals),
            ``predicted_label``, ``model`` and ``mediaType``.
        """
        label_index = random.randrange(len(self.imagenet_labels))
        return {
            "prediction": self.imagenet_labels[label_index],
            "confidence": round(random.uniform(0.3, 0.99), 4),
            "predicted_label": label_index,
            "model": model_name,
            "mediaType": media_type
        }

    def generate_standard_datasets(self, count: int = 25) -> List[Dict[str, Any]]:
        """Generate standard test cases with normal images.

        Each dataset holds 5-20 records with a random image type, size and
        encoding (JPEG or PNG).

        Args:
            count: Number of datasets to build.

        Returns:
            List of dataset dicts (``name``, ``category``, ``description``,
            ``records``).
        """
        image_types = ["random", "geometric", "gradient", "text", "solid"]
        sizes = [(224, 224), (256, 256), (299, 299), (384, 384)]
        formats = [("JPEG", "image/jpeg"), ("PNG", "image/png")]

        datasets = []
        for i in range(count):
            records = []
            for j in range(random.randint(5, 20)):
                img_type = random.choice(image_types)
                width, height = random.choice(sizes)
                image_format, media_type = random.choice(formats)

                image = self.create_synthetic_image(width, height, img_type)
                image_b64 = self.image_to_base64(image, image_format)

                api_request = self.create_api_request(image_b64, media_type)
                # The response must report the media type that was actually sent.
                expected_response = self.create_expected_response(media_type=media_type)

                records.append({
                    "dataset_id": f"standard_{i:03d}",
                    "image_id": f"img_{j:03d}",
                    "image_type": img_type,
                    "image_size": f"{width}x{height}",
                    "format": image_format,
                    "media_type": media_type,
                    "api_request": json.dumps(api_request),
                    "expected_response": json.dumps(expected_response),
                    "test_category": "standard",
                    "difficulty": "normal"
                })

            datasets.append({
                "name": f"standard_test_{i:03d}",
                "category": "standard",
                "description": f"Standard test dataset {i+1} with {len(records)} images",
                "records": records
            })

        return datasets

    def generate_edge_case_datasets(self, count: int = 25) -> List[Dict[str, Any]]:
        """Generate datasets for edge case scenarios.

        Every dataset contains one record per edge case: tiny, huge, extreme
        aspect ratio and single-pixel PNG images, plus a JPEG whose base64
        payload has been deliberately damaged and is expected to fail.

        Args:
            count: Number of datasets to build.

        Returns:
            List of dataset dicts (``name``, ``category``, ``description``,
            ``records``).
        """
        edge_cases = [
            {"type": "tiny", "size": (32, 32), "difficulty": "high"},
            {"type": "huge", "size": (2048, 2048), "difficulty": "high"},
            {"type": "extreme_aspect", "size": (1000, 50), "difficulty": "medium"},
            {"type": "single_pixel", "size": (1, 1), "difficulty": "extreme"},
            {"type": "corrupted_base64", "size": (224, 224), "difficulty": "extreme"}
        ]

        datasets = []
        for i in range(count):
            records = []
            for j, edge_case in enumerate(edge_cases):
                width, height = edge_case["size"]
                image = self.create_synthetic_image(width, height, "random")

                if edge_case["type"] == "corrupted_base64":
                    image_format, media_type = "JPEG", "image/jpeg"
                    image_b64 = self.image_to_base64(image, image_format)
                    # Drop the tail and append text that is not valid base64.
                    payload = image_b64[:-20] + "CORRUPTED_DATA"
                    expected_response = {
                        "error": "Invalid image data",
                        "status": "failed"
                    }
                else:
                    image_format, media_type = "PNG", "image/png"
                    payload = self.image_to_base64(image, image_format)
                    expected_response = self.create_expected_response(media_type=media_type)

                api_request = self.create_api_request(payload, media_type)

                records.append({
                    "dataset_id": f"edge_{i:03d}",
                    "image_id": f"edge_{j:03d}",
                    "image_type": edge_case["type"],
                    "image_size": f"{width}x{height}",
                    # Recorded per case so the columns match the request payload.
                    "format": image_format,
                    "media_type": media_type,
                    "api_request": json.dumps(api_request),
                    "expected_response": json.dumps(expected_response),
                    "test_category": "edge_case",
                    "difficulty": edge_case["difficulty"]
                })

            datasets.append({
                "name": f"edge_case_{i:03d}",
                "category": "edge_case",
                "description": f"Edge case dataset {i+1} with challenging scenarios",
                "records": records
            })

        return datasets

    def generate_performance_datasets(self, count: int = 25) -> List[Dict[str, Any]]:
        """Generate performance benchmark datasets.

        Each dataset is a batch of 224x224 random JPEG images whose size is
        picked from 1, 5, 10, 25, 50 or 100.  Records carry the batch size and
        a latency budget of 100 ms per image in the batch.

        Args:
            count: Number of datasets to build.

        Returns:
            List of dataset dicts (``name``, ``category``, ``description``,
            ``records``).
        """
        batch_sizes = [1, 5, 10, 25, 50, 100]

        datasets = []
        for i in range(count):
            batch_size = random.choice(batch_sizes)

            records = []
            for j in range(batch_size):
                image = self.create_synthetic_image(224, 224, "random")
                image_b64 = self.image_to_base64(image, "JPEG")
                api_request = self.create_api_request(image_b64)
                expected_response = self.create_expected_response()

                records.append({
                    "dataset_id": f"perf_{i:03d}",
                    "image_id": f"batch_{j:03d}",
                    "image_type": "performance_test",
                    "image_size": "224x224",
                    "format": "JPEG",
                    "media_type": "image/jpeg",
                    "api_request": json.dumps(api_request),
                    "expected_response": json.dumps(expected_response),
                    "test_category": "performance",
                    "difficulty": "normal",
                    "batch_size": batch_size,
                    "expected_max_latency_ms": batch_size * 100
                })

            datasets.append({
                "name": f"performance_test_{i:03d}",
                "category": "performance",
                "description": f"Performance dataset {i+1} with batch size {batch_size}",
                "records": records
            })

        return datasets

    def generate_model_comparison_datasets(self, count: int = 25) -> List[Dict[str, Any]]:
        """Generate datasets for comparing different models.

        Each dataset encodes 10 geometric JPEG images once and pairs every
        image with every model name, so records sharing a
        ``comparison_group`` carry the same image.

        Args:
            count: Number of datasets to build.

        Returns:
            List of dataset dicts (``name``, ``category``, ``description``,
            ``records``).
        """
        model_types = [
            "microsoft/resnet-18", "microsoft/resnet-50", "google/vit-base-patch16-224",
            "facebook/convnext-tiny-224", "microsoft/swin-tiny-patch4-window7-224"
        ]

        datasets = []
        for i in range(count):
            base_images = []
            for _ in range(10):
                image = self.create_synthetic_image(224, 224, "geometric")
                base_images.append(self.image_to_base64(image, "JPEG"))

            records = []
            for j, model in enumerate(model_types):
                for k, image_b64 in enumerate(base_images):
                    api_request = self.create_api_request(image_b64)
                    expected_response = self.create_expected_response(model)

                    records.append({
                        "dataset_id": f"comparison_{i:03d}",
                        "image_id": f"img_{k:03d}_model_{j}",
                        "image_type": "comparison_base",
                        "image_size": "224x224",
                        "format": "JPEG",
                        "media_type": "image/jpeg",
                        "api_request": json.dumps(api_request),
                        "expected_response": json.dumps(expected_response),
                        "test_category": "model_comparison",
                        "difficulty": "normal",
                        "model_type": model,
                        "comparison_group": k
                    })

            datasets.append({
                "name": f"model_comparison_{i:03d}",
                "category": "model_comparison",
                "description": f"Model comparison dataset {i+1} testing {len(model_types)} models",
                "records": records
            })

        return datasets

    def save_dataset_to_parquet(self, dataset: Dict[str, Any]):
        """Save a dataset to Parquet and write its metadata sidecar.

        Writes ``<name>.parquet`` and ``<name>_metadata.json`` into
        ``self.output_dir``.  Optional columns missing from a record are
        filled with the defaults in ``_OPTIONAL_COLUMN_DEFAULTS`` so every
        file shares one schema.

        Args:
            dataset: Dict with ``name``, ``category``, ``description`` and
                ``records`` (a list of flat record dicts).
        """
        records = dataset["records"]

        columns = {
            name: [record[name] for record in records]
            for name in self._REQUIRED_COLUMNS
        }
        for name, default in self._OPTIONAL_COLUMN_DEFAULTS.items():
            columns[name] = [record.get(name, default) for record in records]
        table = pa.table(columns)

        output_path = self.output_dir / f"{dataset['name']}.parquet"
        pq.write_table(table, output_path)

        # The size is read back from disk, so this must follow the write above.
        metadata = {
            "name": dataset["name"],
            "category": dataset["category"],
            "description": dataset["description"],
            "record_count": len(records),
            "file_size_mb": round(output_path.stat().st_size / (1024 * 1024), 2),
            "schema": [field.name for field in table.schema]
        }

        metadata_path = self.output_dir / f"{dataset['name']}_metadata.json"
        with open(metadata_path, 'w', encoding="utf-8") as f:
            json.dump(metadata, f, indent=2)

    def generate_all_datasets(self, count_per_category: int = 25):
        """Generate and save every dataset category, then write the summary.

        Args:
            count_per_category: Number of datasets to build for each of the
                four categories.  The default of 25 yields 100 datasets.
        """
        print(" Starting dataset generation...")

        stages = [
            ("📊 Generating standard test datasets", self.generate_standard_datasets),
            ("⚡ Generating edge case datasets", self.generate_edge_case_datasets),
            ("🏁 Generating performance datasets", self.generate_performance_datasets),
            ("🔄 Generating model comparison datasets", self.generate_model_comparison_datasets),
        ]

        total = 0
        for banner, generate in stages:
            print(f"{banner} ({count_per_category})...")
            for dataset in generate(count_per_category):
                self.save_dataset_to_parquet(dataset)
                total += 1

        print(f"✅ Generated {total} datasets in {self.output_dir}/")

        self.generate_summary()

    def generate_summary(self):
        """Write ``datasets_summary.json`` describing the datasets on disk.

        Scans ``self.output_dir`` for ``*.parquet`` files that have a matching
        ``<stem>_metadata.json`` and collects that metadata.  The totals and
        per-category counts are computed from what was found rather than
        assumed.  Parquet files without a metadata sidecar are ignored.
        """
        dataset_info = []
        # sorted() keeps the summary independent of directory listing order.
        for parquet_file in sorted(self.output_dir.glob("*.parquet")):
            metadata_file = self.output_dir / f"{parquet_file.stem}_metadata.json"
            if not metadata_file.exists():
                continue
            with open(metadata_file, 'r', encoding="utf-8") as f:
                dataset_info.append(json.load(f))

        # Known categories are always listed (possibly with 0), in a fixed order.
        categories = {name: 0 for name in self._CATEGORY_NAMES}
        categories.update(
            Counter(info.get("category", "unknown") for info in dataset_info)
        )

        summary = {
            "total_datasets": len(dataset_info),
            "categories": categories,
            "dataset_info": dataset_info,
            "usage_instructions": {
                "loading": "Use pyarrow.parquet.read_table('dataset.parquet')",
                "testing": "Run python scripts/test_datasets.py",
                "api_endpoint": "POST /predict/resnet",
                "request_format": "See api_request column in datasets"
            }
        }

        summary_path = self.output_dir / "datasets_summary.json"
        with open(summary_path, 'w', encoding="utf-8") as f:
            json.dump(summary, f, indent=2)

        print(f"📋 Summary saved to {summary_path}")


# Script entry point: build every dataset category into the default output directory.
if __name__ == "__main__":
    TestDatasetGenerator().generate_all_datasets()