""" ============================================================ PERFORMANCE OPTIMIZER ============================================================ Yeh file performance optimizations handle karti hai: - Response caching - Request rate limiting - Memory management - Batch processing - Connection pooling ============================================================ """ import time import hashlib import logging import asyncio from typing import Optional, Dict, Any from collections import OrderedDict from functools import wraps from datetime import datetime, timedelta logger = logging.getLogger('RuhiOptimizer') class ResponseCache: """ LRU Cache for AI responses. Similar questions ka answer cache se deta hai. Fast response time milta hai repeated questions ke liye. """ def __init__(self, max_size: int = 500, ttl_seconds: int = 3600): """ Args: max_size: Maximum cached responses ttl_seconds: Time to live for each cache entry (1 hour default) """ self.max_size = max_size self.ttl_seconds = ttl_seconds self.cache: OrderedDict = OrderedDict() self.hits = 0 self.misses = 0 self.total_requests = 0 def _make_key(self, message: str) -> str: """Message se cache key banata hai""" # Normalize: lowercase, strip, remove extra spaces normalized = ' '.join(message.lower().strip().split()) return hashlib.md5(normalized.encode()).hexdigest() def get(self, message: str) -> Optional[str]: """ Cache se response dhundhta hai. Args: message: User ka message Returns: Cached response or None """ self.total_requests += 1 key = self._make_key(message) if key in self.cache: entry = self.cache[key] # Check TTL if time.time() - entry['timestamp'] < self.ttl_seconds: # Move to end (most recently used) self.cache.move_to_end(key) self.hits += 1 logger.debug(f"Cache HIT for: {message[:50]}...") return entry['response'] else: # Expired - remove del self.cache[key] self.misses += 1 return None def set(self, message: str, response: str): """ Response ko cache mein store karta hai. Args: message: User ka message response: AI ka response """ key = self._make_key(message) # Remove oldest if full if len(self.cache) >= self.max_size: self.cache.popitem(last=False) self.cache[key] = { 'response': response, 'timestamp': time.time(), 'message': message[:100], } def clear(self): """Cache clear karta hai""" self.cache.clear() logger.info("Response cache cleared") def get_stats(self) -> dict: """Cache statistics return karta hai""" hit_rate = 0 if self.total_requests > 0: hit_rate = (self.hits / self.total_requests) * 100 return { 'size': len(self.cache), 'max_size': self.max_size, 'hits': self.hits, 'misses': self.misses, 'total_requests': self.total_requests, 'hit_rate_percent': round(hit_rate, 2), 'ttl_seconds': self.ttl_seconds, } class RateLimiter: """ Per-user rate limiting. Ek user bahut zyada requests bheje to slow down karta hai. """ def __init__( self, max_requests: int = 20, window_seconds: int = 60, cooldown_seconds: int = 10 ): """ Args: max_requests: Max requests per window window_seconds: Time window in seconds cooldown_seconds: Cooldown after limit hit """ self.max_requests = max_requests self.window_seconds = window_seconds self.cooldown_seconds = cooldown_seconds self.user_requests: Dict[str, list] = {} self.cooldowns: Dict[str, float] = {} def is_allowed(self, user_id: str) -> tuple: """ Check karta hai ki user ko request karne dena chahiye ya nahi. Args: user_id: User identifier Returns: (is_allowed: bool, wait_seconds: int) """ now = time.time() # Check cooldown if user_id in self.cooldowns: cooldown_end = self.cooldowns[user_id] if now < cooldown_end: wait = int(cooldown_end - now) + 1 return False, wait else: del self.cooldowns[user_id] # Initialize user's request list if user_id not in self.user_requests: self.user_requests[user_id] = [] # Remove old requests outside window cutoff = now - self.window_seconds self.user_requests[user_id] = [ t for t in self.user_requests[user_id] if t > cutoff ] # Check limit if len(self.user_requests[user_id]) >= self.max_requests: # Set cooldown self.cooldowns[user_id] = now + self.cooldown_seconds return False, self.cooldown_seconds # Record request self.user_requests[user_id].append(now) return True, 0 def cleanup(self): """Old data cleanup karta hai""" now = time.time() cutoff = now - self.window_seconds * 2 # Clean old requests for user_id in list(self.user_requests.keys()): self.user_requests[user_id] = [ t for t in self.user_requests[user_id] if t > cutoff ] if not self.user_requests[user_id]: del self.user_requests[user_id] # Clean expired cooldowns for user_id in list(self.cooldowns.keys()): if now > self.cooldowns[user_id]: del self.cooldowns[user_id] class RequestQueue: """ Request queue for sequential processing. Model ek time pe ek hi request process kar sakta hai. Yeh queue requests ko orderly process karta hai. """ def __init__(self, max_queue_size: int = 50): self.queue = asyncio.Queue(maxsize=max_queue_size) self.is_processing = False self.processed_count = 0 self.dropped_count = 0 async def add_request( self, request_id: str, process_func, *args, **kwargs ) -> Optional[Any]: """ Request ko queue mein add karta hai aur result wait karta hai. Args: request_id: Unique request identifier process_func: Async function to process *args, **kwargs: Arguments for the function Returns: Function result or None if queue is full """ result_future = asyncio.Future() try: self.queue.put_nowait({ 'id': request_id, 'func': process_func, 'args': args, 'kwargs': kwargs, 'future': result_future, 'timestamp': time.time(), }) logger.debug( f"Request {request_id} queued. " f"Queue size: {self.queue.qsize()}" ) except asyncio.QueueFull: self.dropped_count += 1 logger.warning(f"Queue full! Request {request_id} dropped") return None # Start processing if not already if not self.is_processing: asyncio.create_task(self._process_queue()) # Wait for result with timeout try: result = await asyncio.wait_for(result_future, timeout=120) return result except asyncio.TimeoutError: logger.warning(f"Request {request_id} timed out") return None async def _process_queue(self): """Queue ko sequentially process karta hai""" self.is_processing = True while not self.queue.empty(): try: item = await self.queue.get() request_id = item['id'] func = item['func'] args = item['args'] kwargs = item['kwargs'] future = item['future'] logger.debug(f"Processing request {request_id}") try: if asyncio.iscoroutinefunction(func): result = await func(*args, **kwargs) else: result = func(*args, **kwargs) if not future.done(): future.set_result(result) self.processed_count += 1 except Exception as e: logger.error( f"Error processing request {request_id}: {e}" ) if not future.done(): future.set_exception(e) self.queue.task_done() except Exception as e: logger.error(f"Queue processing error: {e}") self.is_processing = False def get_stats(self) -> dict: """Queue statistics""" return { 'queue_size': self.queue.qsize(), 'max_size': self.queue.maxsize, 'is_processing': self.is_processing, 'processed_count': self.processed_count, 'dropped_count': self.dropped_count, } class PerformanceMonitor: """ Performance metrics track karta hai. Response times, throughput, errors etc. """ def __init__(self): self.response_times = [] self.max_tracked = 1000 # Last 1000 requests track karo self.error_count = 0 self.success_count = 0 self.start_time = time.time() def record_response(self, duration_seconds: float, success: bool = True): """Response time record karta hai""" self.response_times.append({ 'duration': duration_seconds, 'timestamp': time.time(), 'success': success, }) if success: self.success_count += 1 else: self.error_count += 1 # Keep only last N if len(self.response_times) > self.max_tracked: self.response_times = self.response_times[-self.max_tracked:] def get_metrics(self) -> dict: """Performance metrics return karta hai""" if not self.response_times: return { 'avg_response_time': 0, 'min_response_time': 0, 'max_response_time': 0, 'total_requests': 0, 'success_rate': 0, 'uptime_seconds': int(time.time() - self.start_time), } durations = [r['duration'] for r in self.response_times] total = self.success_count + self.error_count return { 'avg_response_time': round(sum(durations) / len(durations), 2), 'min_response_time': round(min(durations), 2), 'max_response_time': round(max(durations), 2), 'median_response_time': round( sorted(durations)[len(durations) // 2], 2 ), 'total_requests': total, 'success_count': self.success_count, 'error_count': self.error_count, 'success_rate': round( (self.success_count / total * 100) if total > 0 else 0, 2 ), 'uptime_seconds': int(time.time() - self.start_time), 'requests_per_minute': round( total / max( (time.time() - self.start_time) / 60, 1 ), 2 ), } def measure_time(func): """Decorator jo function ka execution time measure karta hai""" @wraps(func) async def async_wrapper(*args, **kwargs): start = time.time() try: result = await func(*args, **kwargs) duration = time.time() - start logger.info( f"{func.__name__} completed in {duration:.2f}s" ) return result except Exception as e: duration = time.time() - start logger.error( f"{func.__name__} failed after {duration:.2f}s: {e}" ) raise @wraps(func) def sync_wrapper(*args, **kwargs): start = time.time() try: result = func(*args, **kwargs) duration = time.time() - start logger.info( f"{func.__name__} completed in {duration:.2f}s" ) return result except Exception as e: duration = time.time() - start logger.error( f"{func.__name__} failed after {duration:.2f}s: {e}" ) raise if asyncio.iscoroutinefunction(func): return async_wrapper return sync_wrapper # ==================== GLOBAL INSTANCES ==================== response_cache = ResponseCache(max_size=500, ttl_seconds=3600) rate_limiter = RateLimiter( max_requests=20, window_seconds=60, cooldown_seconds=10 ) request_queue = RequestQueue(max_queue_size=50) performance_monitor = PerformanceMonitor()