| """ |
| Batch processing module for BackgroundFX Pro. |
| Handles efficient processing of multiple files with optimized resource management. |
| """ |
|
|
| import os |
| import cv2 |
| import numpy as np |
| from pathlib import Path |
| from typing import Dict, List, Optional, Tuple, Union, Callable, Any, Generator |
| from dataclasses import dataclass, field |
| from enum import Enum |
| import time |
| import threading |
| from queue import Queue, PriorityQueue, Empty |
| from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed |
| import multiprocessing as mp |
| import json |
| import hashlib |
| import pickle |
| import shutil |
| import tempfile |
| from datetime import datetime |
| import psutil |
| import mimetypes |
|
|
| from ..utils.logger import setup_logger |
| from ..utils.device import DeviceManager |
| from ..utils import TimeEstimator, MemoryMonitor |
| from .pipeline import ProcessingPipeline, PipelineConfig, PipelineResult, ProcessingMode |
| from .video_processor import VideoProcessorAPI, VideoStats |
|
|
| logger = setup_logger(__name__) |
|
|
|
|
class BatchPriority(Enum):
    """Batch processing priority levels.

    Lower numeric values sort first in a PriorityQueue (via
    BatchItem.__lt__), so URGENT (0) is dequeued before LOW (3).
    """
    LOW = 3
    NORMAL = 2
    HIGH = 1
    URGENT = 0
|
|
|
|
class FileType(Enum):
    """Supported file types, as classified by MIME type or file extension."""
    IMAGE = "image"
    VIDEO = "video"
    UNKNOWN = "unknown"  # neither an image nor a video pattern matched
|
|
|
|
@dataclass
class BatchItem:
    """Individual item in batch processing."""
    id: str  # unique identifier (16-char hash, see BatchProcessor._generate_item_id)
    input_path: str  # source file on disk
    output_path: str  # destination for the processed result
    file_type: FileType  # IMAGE / VIDEO / UNKNOWN
    priority: BatchPriority = BatchPriority.NORMAL  # queue ordering (URGENT first)
    background: Optional[Union[str, np.ndarray]] = None  # replacement background (path or array)
    config_overrides: Dict[str, Any] = field(default_factory=dict)  # per-item pipeline settings
    metadata: Dict[str, Any] = field(default_factory=dict)  # stats attached during processing
    retry_count: int = 0  # attempts made so far
    max_retries: int = 3  # retries before the item is marked failed
    status: str = "pending"  # "pending" -> "completed" / "failed"
    error: Optional[str] = None  # last error message, if any
    result: Optional[Any] = None  # processing result payload
    processing_time: float = 0.0  # wall-clock seconds of the last attempt

    def __lt__(self, other):
        """Compare items by priority for PriorityQueue (lower value wins)."""
        return self.priority.value < other.priority.value
|
|
|
|
@dataclass
class BatchConfig:
    """Configuration for batch processing."""

    # --- Worker pool ---
    max_workers: int = mp.cpu_count()  # default is evaluated once, at import time
    use_multiprocessing: bool = False  # processes instead of threads
    chunk_size: int = 10  # items submitted to the pool per chunk

    # --- Resource limits ---
    max_memory_gb: float = 8.0  # checked by _wait_for_resources
    max_gpu_memory_gb: float = 4.0
    cpu_limit_percent: float = 80.0  # checked by _wait_for_resources

    # --- Directory scanning ---
    input_dir: Optional[str] = None
    output_dir: Optional[str] = None
    recursive: bool = True  # rglob vs glob when collecting files
    file_patterns: List[str] = field(default_factory=lambda: ["*.jpg", "*.png", "*.mp4", "*.avi"])
    preserve_structure: bool = True  # mirror the input directory tree in the output

    # --- Backgrounds ---
    default_background: Optional[Union[str, np.ndarray]] = None
    background_per_file: Dict[str, Union[str, np.ndarray]] = field(default_factory=dict)  # keyed by input path

    # --- Output quality ---
    image_quality: int = 95  # JPEG quality used for .jpg/.jpeg outputs
    video_quality: str = "high"
    maintain_resolution: bool = True

    # --- Caching / dedup ---
    enable_caching: bool = True
    cache_dir: Optional[str] = None  # None -> a temp dir is created (removed on cleanup)
    deduplicate: bool = True  # skip files with identical content (MD5)

    # --- Reporting ---
    progress_callback: Optional[Callable[[float, Dict], None]] = None  # (progress 0..1, info dict)
    save_report: bool = True
    report_path: Optional[str] = None  # None -> timestamped filename in CWD

    # --- Error handling ---
    stop_on_error: bool = False  # abort the whole batch on first failure
    skip_existing: bool = True  # skip inputs whose output file already exists

    # --- Pipeline ---
    pipeline_config: Optional[PipelineConfig] = None  # shared by all items
|
|
|
|
@dataclass
class BatchReport:
    """Batch processing report (one per process_batch run)."""
    start_time: datetime
    end_time: Optional[datetime] = None
    total_items: int = 0  # items submitted to the batch
    processed_items: int = 0  # items whose processing finished (success or failure)
    successful_items: int = 0
    failed_items: int = 0
    skipped_items: int = 0  # duplicates and already-existing outputs
    total_processing_time: float = 0.0  # wall-clock seconds for the whole batch
    avg_processing_time: float = 0.0  # total_processing_time / processed_items
    total_input_size_mb: float = 0.0
    total_output_size_mb: float = 0.0
    compression_ratio: float = 1.0  # total output size / total input size
    errors: List[Dict[str, Any]] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    resource_usage: Dict[str, Any] = field(default_factory=dict)
    quality_metrics: Dict[str, float] = field(default_factory=dict)  # 'scores' list + aggregates
|
|
|
|
| class BatchProcessor: |
| """High-performance batch processing engine.""" |
| |
    def __init__(self, config: Optional[BatchConfig] = None):
        """
        Initialize batch processor.

        Args:
            config: Batch processing configuration (defaults to BatchConfig())
        """
        self.config = config or BatchConfig()
        self.logger = setup_logger(f"{__name__}.BatchProcessor")

        # Shared resource-management helpers
        self.device_manager = DeviceManager()
        self.memory_monitor = MemoryMonitor()
        self.time_estimator = TimeEstimator()

        # Processing backends for images (pipeline) and videos
        self.pipeline = ProcessingPipeline(self.config.pipeline_config)
        self.video_processor = VideoProcessorAPI()

        # Runtime state flags
        self.is_processing = False
        self.should_stop = False
        self.current_item = None

        # Work queues. NOTE(review): none of these queues appears to be
        # consumed anywhere in this module (pending_queue is only written
        # in process_batch) -- verify whether they are still needed.
        self.pending_queue = PriorityQueue()
        self.processing_queue = Queue()
        self.completed_queue = Queue()

        # Worker pool: processes for isolation, threads otherwise
        if self.config.use_multiprocessing:
            self.executor = ProcessPoolExecutor(max_workers=self.config.max_workers)
        else:
            self.executor = ThreadPoolExecutor(max_workers=self.config.max_workers)

        # Result cache directory. NOTE(review): a temp dir is created even
        # when config.enable_caching is False -- confirm this is intended.
        self.cache_dir = Path(self.config.cache_dir or tempfile.mkdtemp(prefix="bgfx_cache_"))
        self.cache_index = {}

        # Report for the current / most recent batch
        self.report = BatchReport(start_time=datetime.now())

        self.logger.info(f"BatchProcessor initialized with {self.config.max_workers} workers")
| |
| def process_directory(self, |
| input_dir: str, |
| output_dir: str, |
| background: Optional[Union[str, np.ndarray]] = None) -> BatchReport: |
| """ |
| Process all supported files in a directory. |
| |
| Args: |
| input_dir: Input directory path |
| output_dir: Output directory path |
| background: Default background for all files |
| |
| Returns: |
| Batch processing report |
| """ |
| input_path = Path(input_dir) |
| output_path = Path(output_dir) |
| |
| if not input_path.exists(): |
| raise ValueError(f"Input directory does not exist: {input_dir}") |
| |
| output_path.mkdir(parents=True, exist_ok=True) |
| |
| |
| items = self._collect_files(input_path, output_path, background) |
| |
| if not items: |
| self.logger.warning("No files found to process") |
| return self.report |
| |
| self.logger.info(f"Found {len(items)} files to process") |
| |
| |
| return self.process_batch(items) |
| |
| def _collect_files(self, |
| input_path: Path, |
| output_path: Path, |
| background: Optional[Union[str, np.ndarray]]) -> List[BatchItem]: |
| """Collect all files to process from directory.""" |
| items = [] |
| |
| |
| if self.config.recursive: |
| file_iterator = input_path.rglob |
| else: |
| file_iterator = input_path.glob |
| |
| |
| for pattern in self.config.file_patterns: |
| for file_path in file_iterator(pattern): |
| if file_path.is_file(): |
| |
| if self.config.preserve_structure: |
| relative_path = file_path.relative_to(input_path) |
| output_file = output_path / relative_path.parent / f"{file_path.stem}_processed{file_path.suffix}" |
| else: |
| output_file = output_path / f"{file_path.stem}_processed{file_path.suffix}" |
| |
| |
| if self.config.skip_existing and output_file.exists(): |
| self.report.skipped_items += 1 |
| continue |
| |
| |
| file_type = self._detect_file_type(str(file_path)) |
| |
| |
| item = BatchItem( |
| id=self._generate_item_id(file_path), |
| input_path=str(file_path), |
| output_path=str(output_file), |
| file_type=file_type, |
| background=self.config.background_per_file.get( |
| str(file_path), |
| background or self.config.default_background |
| ) |
| ) |
| |
| items.append(item) |
| |
| return items |
| |
| def process_batch(self, items: List[BatchItem]) -> BatchReport: |
| """ |
| Process a batch of items. |
| |
| Args: |
| items: List of batch items to process |
| |
| Returns: |
| Batch processing report |
| """ |
| self.is_processing = True |
| self.report = BatchReport(start_time=datetime.now()) |
| self.report.total_items = len(items) |
| |
| try: |
| |
| for item in items: |
| self.pending_queue.put(item) |
| |
| |
| if self.config.deduplicate: |
| items = self._deduplicate_items(items) |
| |
| |
| self._process_items(items) |
| |
| finally: |
| self.is_processing = False |
| self.report.end_time = datetime.now() |
| self.report.total_processing_time = ( |
| self.report.end_time - self.report.start_time |
| ).total_seconds() |
| |
| if self.report.processed_items > 0: |
| self.report.avg_processing_time = ( |
| self.report.total_processing_time / self.report.processed_items |
| ) |
| |
| |
| if self.config.save_report: |
| self._save_report() |
| |
| return self.report |
| |
    def _process_items(self, items: List[BatchItem]):
        """Process all items in the batch.

        Items are handled in chunks of config.chunk_size: each chunk waits
        for system resources, fans out to the executor, then collects the
        results in submission order and reports per-item progress.
        """
        # Split the work into fixed-size chunks.
        chunks = [items[i:i + self.config.chunk_size]
                  for i in range(0, len(items), self.config.chunk_size)]

        for chunk_idx, chunk in enumerate(chunks):  # chunk_idx is currently unused
            if self.should_stop:
                break

            # Throttle until CPU/memory drop below the configured limits.
            self._wait_for_resources()

            # Submit the whole chunk to the worker pool.
            futures = []
            for item in chunk:
                if self.should_stop:
                    break

                future = self.executor.submit(self._process_single_item, item)
                futures.append((future, item))

            # Collect results; blocks up to 5 minutes per item.
            for future, item in futures:
                try:
                    result = future.result(timeout=300)
                    item.result = result
                    item.status = "completed" if result else "failed"

                    if result:
                        self.report.successful_items += 1
                    else:
                        self.report.failed_items += 1

                except Exception as e:
                    self.logger.error(f"Processing failed for {item.id}: {e}")
                    item.status = "failed"
                    item.error = str(e)
                    self.report.failed_items += 1

                    if self.config.stop_on_error:
                        self.should_stop = True
                        # Note: break runs AFTER the finally below, so the
                        # failed item is still counted as processed.
                        break

                finally:
                    self.report.processed_items += 1

                # Per-item progress notification.
                if self.config.progress_callback:
                    progress = self.report.processed_items / self.report.total_items
                    self.config.progress_callback(progress, {
                        'current_item': item.id,
                        'processed': self.report.processed_items,
                        'total': self.report.total_items,
                        'successful': self.report.successful_items,
                        'failed': self.report.failed_items
                    })
| |
    def _process_single_item(self, item: BatchItem) -> bool:
        """
        Process a single batch item.

        Runs inside an executor worker. Serves the result from cache when
        possible, otherwise dispatches by file type, and retries on
        exception (recursively) up to item.max_retries times.

        Args:
            item: Batch item to process

        Returns:
            True if successful
        """
        start_time = time.time()

        try:
            # Cache hit: copy the cached bytes straight to the output path.
            if self.config.enable_caching:
                cached_result = self._check_cache(item)
                if cached_result is not None:
                    self._save_cached_result(item, cached_result)
                    item.processing_time = time.time() - start_time
                    return True

            # Dispatch by detected file type.
            if item.file_type == FileType.IMAGE:
                success = self._process_image(item)
            elif item.file_type == FileType.VIDEO:
                success = self._process_video(item)
            else:
                raise ValueError(f"Unsupported file type: {item.file_type}")

            # Store the fresh output in the cache for future runs.
            if success and self.config.enable_caching:
                self._cache_result(item)

            item.processing_time = time.time() - start_time

            # Accumulate input/output sizes into the report.
            self._update_size_stats(item)

            return success

        except Exception as e:
            self.logger.error(f"Error processing {item.id}: {e}")
            item.error = str(e)

            # Retry via recursion; note processing_time restarts per attempt.
            if item.retry_count < item.max_retries:
                item.retry_count += 1
                self.logger.info(f"Retrying {item.id} (attempt {item.retry_count}/{item.max_retries})")
                return self._process_single_item(item)

            return False
| |
    def _process_image(self, item: BatchItem) -> bool:
        """Process an image file.

        Loads the image, runs the background-replacement pipeline, and
        writes the result (JPEG outputs honor config.image_quality).

        Returns:
            True if the pipeline succeeded and the output was written.

        Raises:
            Exception: Re-raises any processing error after logging it.
        """
        try:
            image = cv2.imread(item.input_path)
            if image is None:
                raise ValueError(f"Cannot load image: {item.input_path}")

            # NOTE(review): when config.pipeline_config is set, this mutates
            # that SHARED object in place, so per-item overrides leak to
            # later items and can race across worker threads; when it is
            # None, the overrides are applied to a throwaway PipelineConfig
            # that is never passed to the pipeline. Confirm intended behavior.
            pipeline_config = self.config.pipeline_config or PipelineConfig()
            for key, value in item.config_overrides.items():
                if hasattr(pipeline_config, key):
                    setattr(pipeline_config, key, value)

            result = self.pipeline.process_image(
                image,
                item.background
            )

            if result.success and result.output_image is not None:
                output_path = Path(item.output_path)
                output_path.parent.mkdir(parents=True, exist_ok=True)

                # JPEGs use the configured quality; other formats use defaults.
                if output_path.suffix.lower() in ['.jpg', '.jpeg']:
                    cv2.imwrite(
                        str(output_path),
                        result.output_image,
                        [cv2.IMWRITE_JPEG_QUALITY, self.config.image_quality]
                    )
                else:
                    cv2.imwrite(str(output_path), result.output_image)

                item.metadata['quality_score'] = result.quality_score
                self._update_quality_metrics(result.quality_score)

                return True

            return False

        except Exception as e:
            self.logger.error(f"Image processing failed for {item.input_path}: {e}")
            raise
| |
| def _process_video(self, item: BatchItem) -> bool: |
| """Process a video file.""" |
| try: |
| |
| output_path = Path(item.output_path) |
| output_path.parent.mkdir(parents=True, exist_ok=True) |
| |
| |
| stats = self.video_processor.process_video( |
| item.input_path, |
| str(output_path), |
| item.background |
| ) |
| |
| |
| item.metadata['video_stats'] = { |
| 'frames_processed': stats.frames_processed, |
| 'frames_dropped': stats.frames_dropped, |
| 'processing_fps': stats.processing_fps, |
| 'avg_quality': stats.avg_quality_score |
| } |
| |
| self._update_quality_metrics(stats.avg_quality_score) |
| |
| return stats.frames_processed > 0 |
| |
| except Exception as e: |
| self.logger.error(f"Video processing failed for {item.input_path}: {e}") |
| raise |
| |
| def _detect_file_type(self, file_path: str) -> FileType: |
| """Detect file type from path.""" |
| mime_type, _ = mimetypes.guess_type(file_path) |
| |
| if mime_type: |
| if mime_type.startswith('image/'): |
| return FileType.IMAGE |
| elif mime_type.startswith('video/'): |
| return FileType.VIDEO |
| |
| |
| ext = Path(file_path).suffix.lower() |
| if ext in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp']: |
| return FileType.IMAGE |
| elif ext in ['.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv']: |
| return FileType.VIDEO |
| |
| return FileType.UNKNOWN |
| |
| def _generate_item_id(self, file_path: Path) -> str: |
| """Generate unique ID for batch item.""" |
| |
| content = f"{file_path}{time.time()}" |
| return hashlib.md5(content.encode()).hexdigest()[:16] |
| |
| def _deduplicate_items(self, items: List[BatchItem]) -> List[BatchItem]: |
| """Remove duplicate items based on file content hash.""" |
| seen_hashes = set() |
| unique_items = [] |
| |
| for item in items: |
| try: |
| file_hash = self._calculate_file_hash(item.input_path) |
| |
| if file_hash not in seen_hashes: |
| seen_hashes.add(file_hash) |
| unique_items.append(item) |
| else: |
| self.logger.info(f"Skipping duplicate: {item.input_path}") |
| self.report.skipped_items += 1 |
| |
| except Exception as e: |
| self.logger.warning(f"Cannot calculate hash for {item.input_path}: {e}") |
| unique_items.append(item) |
| |
| return unique_items |
| |
| def _calculate_file_hash(self, file_path: str, chunk_size: int = 8192) -> str: |
| """Calculate MD5 hash of file.""" |
| hasher = hashlib.md5() |
| |
| with open(file_path, 'rb') as f: |
| while chunk:= f.read(chunk_size): |
| hasher.update(chunk) |
| |
| return hasher.hexdigest() |
| |
| def _check_cache(self, item: BatchItem) -> Optional[Any]: |
| """Check if item result is cached.""" |
| cache_key = self._get_cache_key(item) |
| cache_file = self.cache_dir / f"{cache_key}.pkl" |
| |
| if cache_file.exists(): |
| try: |
| with open(cache_file, 'rb') as f: |
| cached_data = pickle.load(f) |
| |
| |
| if cached_data.get('input_hash') == self._calculate_file_hash(item.input_path): |
| self.logger.info(f"Using cached result for {item.id}") |
| return cached_data['result'] |
| |
| except Exception as e: |
| self.logger.warning(f"Cache read failed: {e}") |
| |
| return None |
| |
| def _cache_result(self, item: BatchItem): |
| """Cache processing result.""" |
| try: |
| cache_key = self._get_cache_key(item) |
| cache_file = self.cache_dir / f"{cache_key}.pkl" |
| |
| |
| with open(item.output_path, 'rb') as f: |
| result_data = f.read() |
| |
| |
| cache_data = { |
| 'input_hash': self._calculate_file_hash(item.input_path), |
| 'result': result_data, |
| 'metadata': item.metadata, |
| 'timestamp': time.time() |
| } |
| |
| with open(cache_file, 'wb') as f: |
| pickle.dump(cache_data, f) |
| |
| except Exception as e: |
| self.logger.warning(f"Cache write failed: {e}") |
| |
| def _save_cached_result(self, item: BatchItem, cached_data: bytes): |
| """Save cached result to output file.""" |
| output_path = Path(item.output_path) |
| output_path.parent.mkdir(parents=True, exist_ok=True) |
| |
| with open(output_path, 'wb') as f: |
| f.write(cached_data) |
| |
| def _get_cache_key(self, item: BatchItem) -> str: |
| """Generate cache key for item.""" |
| |
| key_parts = [ |
| item.input_path, |
| str(item.background) if item.background is not None else "none", |
| json.dumps(item.config_overrides, sort_keys=True) |
| ] |
| |
| key_string = "|".join(key_parts) |
| return hashlib.md5(key_string.encode()).hexdigest() |
| |
| def _wait_for_resources(self): |
| """Wait for sufficient resources before processing.""" |
| while True: |
| |
| cpu_percent = psutil.cpu_percent(interval=1) |
| if cpu_percent > self.config.cpu_limit_percent: |
| self.logger.debug(f"CPU usage high ({cpu_percent}%), waiting...") |
| time.sleep(2) |
| continue |
| |
| |
| memory = psutil.virtual_memory() |
| memory_gb = (memory.total - memory.available) / (1024**3) |
| if memory_gb > self.config.max_memory_gb: |
| self.logger.debug(f"Memory usage high ({memory_gb:.1f}GB), waiting...") |
| time.sleep(2) |
| continue |
| |
| |
| break |
| |
| def _update_size_stats(self, item: BatchItem): |
| """Update file size statistics.""" |
| try: |
| input_size = os.path.getsize(item.input_path) / (1024**2) |
| output_size = os.path.getsize(item.output_path) / (1024**2) |
| |
| self.report.total_input_size_mb += input_size |
| self.report.total_output_size_mb += output_size |
| |
| if self.report.total_input_size_mb > 0: |
| self.report.compression_ratio = ( |
| self.report.total_output_size_mb / self.report.total_input_size_mb |
| ) |
| |
| except Exception as e: |
| self.logger.warning(f"Cannot update size stats: {e}") |
| |
| def _update_quality_metrics(self, quality_score: float): |
| """Update quality metrics in report.""" |
| if 'scores' not in self.report.quality_metrics: |
| self.report.quality_metrics['scores'] = [] |
| |
| self.report.quality_metrics['scores'].append(quality_score) |
| |
| scores = self.report.quality_metrics['scores'] |
| self.report.quality_metrics['avg_quality'] = np.mean(scores) |
| self.report.quality_metrics['min_quality'] = np.min(scores) |
| self.report.quality_metrics['max_quality'] = np.max(scores) |
| self.report.quality_metrics['std_quality'] = np.std(scores) |
| |
| def _save_report(self): |
| """Save processing report to file.""" |
| try: |
| report_path = self.config.report_path |
| if not report_path: |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| report_path = f"batch_report_{timestamp}.json" |
| |
| report_dict = { |
| 'start_time': self.report.start_time.isoformat(), |
| 'end_time': self.report.end_time.isoformat() if self.report.end_time else None, |
| 'total_items': self.report.total_items, |
| 'processed_items': self.report.processed_items, |
| 'successful_items': self.report.successful_items, |
| 'failed_items': self.report.failed_items, |
| 'skipped_items': self.report.skipped_items, |
| 'total_processing_time': self.report.total_processing_time, |
| 'avg_processing_time': self.report.avg_processing_time, |
| 'total_input_size_mb': self.report.total_input_size_mb, |
| 'total_output_size_mb': self.report.total_output_size_mb, |
| 'compression_ratio': self.report.compression_ratio, |
| 'quality_metrics': self.report.quality_metrics, |
| 'errors': self.report.errors, |
| 'warnings': self.report.warnings |
| } |
| |
| with open(report_path, 'w') as f: |
| json.dump(report_dict, f, indent=2) |
| |
| self.logger.info(f"Report saved to {report_path}") |
| |
| except Exception as e: |
| self.logger.error(f"Failed to save report: {e}") |
| |
| def process_with_pattern(self, |
| pattern: str, |
| output_template: str, |
| background: Optional[Union[str, np.ndarray]] = None) -> BatchReport: |
| """ |
| Process files matching a pattern with template-based output. |
| |
| Args: |
| pattern: File pattern (e.g., "images/*.jpg") |
| output_template: Output path template (e.g., "output/{name}_bg.{ext}") |
| background: Background for processing |
| |
| Returns: |
| Batch processing report |
| """ |
| items = [] |
| |
| for file_path in Path().glob(pattern): |
| if file_path.is_file(): |
| |
| output_path = output_template.format( |
| name=file_path.stem, |
| ext=file_path.suffix[1:], |
| dir=file_path.parent, |
| date=datetime.now().strftime("%Y%m%d") |
| ) |
| |
| item = BatchItem( |
| id=self._generate_item_id(file_path), |
| input_path=str(file_path), |
| output_path=output_path, |
| file_type=self._detect_file_type(str(file_path)), |
| background=background |
| ) |
| |
| items.append(item) |
| |
| return self.process_batch(items) |
| |
    def stop_processing(self):
        """Request a graceful stop of batch processing.

        Sets the should_stop flag, which _process_items checks between
        chunks and between item submissions; in-flight items finish.
        """
        self.should_stop = True
        self.logger.info("Stopping batch processing...")
| |
    def cleanup(self):
        """Clean up resources.

        Requests a stop, waits for the worker pool to drain, and removes
        the cache directory -- but only when it was auto-created
        (config.cache_dir is None); user-supplied cache dirs are kept.
        """
        self.stop_processing()
        self.executor.shutdown(wait=True)

        # Only delete the cache we created ourselves (a tempdir).
        if self.config.cache_dir is None:
            shutil.rmtree(self.cache_dir, ignore_errors=True)

        self.logger.info("Batch processor cleanup complete")
| |
| def get_status(self) -> Dict[str, Any]: |
| """Get current processing status.""" |
| return { |
| 'is_processing': self.is_processing, |
| 'total_items': self.report.total_items, |
| 'processed_items': self.report.processed_items, |
| 'successful_items': self.report.successful_items, |
| 'failed_items': self.report.failed_items, |
| 'skipped_items': self.report.skipped_items, |
| 'current_item': self.current_item.id if self.current_item else None, |
| 'progress': (self.report.processed_items / self.report.total_items * 100 |
| if self.report.total_items > 0 else 0), |
| 'estimated_time_remaining': self.time_estimator.estimate_remaining( |
| self.report.processed_items, |
| self.report.total_items |
| ) if self.is_processing else None |
| } |