| """ |
| ============================================================ |
| PERFORMANCE OPTIMIZER |
| ============================================================ |
| This module handles the following performance optimizations: |
| - Response caching |
| - Request rate limiting |
| - Memory management |
| - Batch processing |
| - Connection pooling |
| ============================================================ |
| """ |
|
|
| import time |
| import hashlib |
| import logging |
| import asyncio |
| from typing import Optional, Dict, Any |
| from collections import OrderedDict |
| from functools import wraps |
| from datetime import datetime, timedelta |
|
|
# Module-wide logger shared by every helper in this file; handlers and levels
# are left to the application's logging configuration.
logger = logging.getLogger('RuhiOptimizer')
|
|
|
|
class ResponseCache:
    """
    LRU cache with a per-entry time-to-live for AI responses.

    Messages are normalised (lower-cased, whitespace collapsed) before being
    hashed into a key, so trivially different spellings of the same question
    share one entry. Repeated questions are answered from the cache instead
    of being recomputed.
    """

    def __init__(self, max_size: int = 500, ttl_seconds: int = 3600):
        """
        Args:
            max_size: Maximum number of cached responses. A value <= 0
                disables storing altogether.
            ttl_seconds: Time to live for each cache entry (1 hour default).
        """
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        # Ordered oldest -> most recently used; the front is evicted first.
        self.cache: OrderedDict = OrderedDict()
        self.hits = 0
        self.misses = 0
        self.total_requests = 0

    def _make_key(self, message: str) -> str:
        """Build the cache key for a message.

        The message is lower-cased and runs of whitespace are collapsed to a
        single space before hashing. MD5 is used purely as a compact
        fingerprint here, not for anything security-sensitive.
        """
        normalized = ' '.join(message.lower().strip().split())
        return hashlib.md5(normalized.encode()).hexdigest()

    def get(self, message: str) -> Optional[str]:
        """
        Look up a cached response.

        Every call counts towards ``total_requests``. A fresh entry is a hit
        and is marked as most recently used; an expired entry is deleted and
        counted as a miss.

        Args:
            message: The user's message.

        Returns:
            The cached response, or None when there is no fresh entry.
        """
        self.total_requests += 1
        key = self._make_key(message)

        entry = self.cache.get(key)
        if entry is not None:
            if time.time() - entry['timestamp'] < self.ttl_seconds:
                # Refresh the LRU position on every hit.
                self.cache.move_to_end(key)
                self.hits += 1
                logger.debug(f"Cache HIT for: {message[:50]}...")
                return entry['response']

            # Entry outlived its TTL: drop it and fall through to a miss.
            del self.cache[key]

        self.misses += 1
        return None

    def set(self, message: str, response: str):
        """
        Store a response in the cache.

        Updating a key that is already cached replaces its value, resets its
        timestamp and marks it as most recently used without evicting any
        other entry. Only inserting a new key into a full cache evicts the
        least recently used entry.

        Args:
            message: The user's message.
            response: The AI's response.
        """
        if self.max_size <= 0:
            # Nothing can be stored; avoids popping from an empty dict.
            return

        key = self._make_key(message)

        if key in self.cache:
            # Plain assignment would keep the old slot, so move it explicitly.
            self.cache.move_to_end(key)
        else:
            # Loop rather than a single pop so the cache also shrinks if
            # max_size was lowered after entries were added.
            while len(self.cache) >= self.max_size:
                self.cache.popitem(last=False)

        self.cache[key] = {
            'response': response,
            'timestamp': time.time(),
            'message': message[:100],  # truncated copy kept for inspection
        }

    def clear(self):
        """Remove every cached entry (hit/miss counters are left untouched)."""
        self.cache.clear()
        logger.info("Response cache cleared")

    def get_stats(self) -> dict:
        """Return cache statistics, including the hit rate as a percentage."""
        hit_rate = 0
        if self.total_requests > 0:
            hit_rate = (self.hits / self.total_requests) * 100

        return {
            'size': len(self.cache),
            'max_size': self.max_size,
            'hits': self.hits,
            'misses': self.misses,
            'total_requests': self.total_requests,
            'hit_rate_percent': round(hit_rate, 2),
            'ttl_seconds': self.ttl_seconds,
        }
|
|
|
|
class RateLimiter:
    """
    Sliding-window rate limiter keyed by user.

    A user who reaches the request budget inside the window is placed in a
    cooldown and rejected until it expires.
    """

    def __init__(
        self,
        max_requests: int = 20,
        window_seconds: int = 60,
        cooldown_seconds: int = 10
    ):
        """
        Args:
            max_requests: Requests allowed per window.
            window_seconds: Length of the window, in seconds.
            cooldown_seconds: Penalty applied once the limit is reached.
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.cooldown_seconds = cooldown_seconds
        # user_id -> timestamps (time.time()) of accepted requests
        self.user_requests: Dict[str, list] = {}
        # user_id -> timestamp at which the cooldown ends
        self.cooldowns: Dict[str, float] = {}

    def is_allowed(self, user_id: str) -> tuple:
        """
        Decide whether a user may issue a request right now.

        An accepted request is recorded against the user's window. Requests
        rejected during a cooldown are not recorded.

        Args:
            user_id: User identifier.

        Returns:
            ``(is_allowed, wait_seconds)``; ``wait_seconds`` is 0 when the
            request is accepted.
        """
        current = time.time()

        # An active cooldown wins over everything else.
        blocked_until = self.cooldowns.get(user_id)
        if blocked_until is not None:
            if current < blocked_until:
                return False, int(blocked_until - current) + 1
            del self.cooldowns[user_id]

        # Keep only the timestamps that still fall inside the window.
        window_start = current - self.window_seconds
        recent = [
            stamp
            for stamp in self.user_requests.get(user_id, [])
            if stamp > window_start
        ]
        self.user_requests[user_id] = recent

        # Budget exhausted: start a cooldown and reject.
        if len(recent) >= self.max_requests:
            self.cooldowns[user_id] = current + self.cooldown_seconds
            return False, self.cooldown_seconds

        recent.append(current)
        return True, 0

    def cleanup(self):
        """Drop request history older than two windows and expired cooldowns."""
        current = time.time()
        oldest_kept = current - self.window_seconds * 2

        # Snapshot the items so entries can be deleted while looping.
        for uid, stamps in list(self.user_requests.items()):
            fresh = [stamp for stamp in stamps if stamp > oldest_kept]
            if fresh:
                self.user_requests[uid] = fresh
            else:
                del self.user_requests[uid]

        expired = [
            uid for uid, until in self.cooldowns.items() if current > until
        ]
        for uid in expired:
            del self.cooldowns[uid]
|
|
|
|
class RequestQueue:
    """
    Request queue for strictly sequential processing.

    The model can only handle one request at a time, so callers enqueue
    their work here and a single background worker runs the items in FIFO
    order.
    """

    def __init__(self, max_queue_size: int = 50, request_timeout: float = 120):
        """
        Args:
            max_queue_size: Maximum number of requests waiting in the queue.
            request_timeout: Seconds a caller waits for its result before
                giving up (default 120).
        """
        self.queue = asyncio.Queue(maxsize=max_queue_size)
        self.request_timeout = request_timeout
        self.is_processing = False
        self.processed_count = 0
        self.dropped_count = 0
        # Strong reference to the worker task: the event loop only keeps weak
        # references, so an unreferenced task may be garbage-collected.
        self._worker_task = None

    async def add_request(
        self,
        request_id: str,
        process_func,
        *args,
        **kwargs
    ) -> Optional[Any]:
        """
        Enqueue a request and wait for its result.

        Args:
            request_id: Unique request identifier (used for logging).
            process_func: Callable to run; may be sync or async.
            *args, **kwargs: Arguments forwarded to ``process_func``.

        Returns:
            The callable's result, or None if the queue is full or the
            request timed out.

        Raises:
            Exception: Whatever ``process_func`` raised is re-raised here.
        """
        result_future = asyncio.get_running_loop().create_future()

        try:
            self.queue.put_nowait({
                'id': request_id,
                'func': process_func,
                'args': args,
                'kwargs': kwargs,
                'future': result_future,
                'timestamp': time.time(),
            })
        except asyncio.QueueFull:
            self.dropped_count += 1
            logger.warning(f"Queue full! Request {request_id} dropped")
            return None

        logger.debug(
            f"Request {request_id} queued. "
            f"Queue size: {self.queue.qsize()}"
        )

        if not self.is_processing:
            # Claim the worker slot *before* yielding to the event loop.
            # Otherwise several add_request calls in the same loop tick each
            # see is_processing == False and start competing workers.
            self.is_processing = True
            self._worker_task = asyncio.create_task(self._process_queue())

        try:
            return await asyncio.wait_for(
                result_future, timeout=self.request_timeout
            )
        except asyncio.TimeoutError:
            logger.warning(f"Request {request_id} timed out")
            return None

    async def _process_queue(self):
        """Drain the queue, running one item at a time until it is empty."""
        self.is_processing = True
        try:
            while not self.queue.empty():
                try:
                    # Safe without awaiting: the queue was just seen to be
                    # non-empty and this worker is the only consumer.
                    item = self.queue.get_nowait()
                    try:
                        await self._run_item(item)
                    finally:
                        self.queue.task_done()
                except Exception as e:
                    logger.error(f"Queue processing error: {e}")
        finally:
            # Always release the slot, even if the worker is cancelled, so a
            # later add_request can start a fresh worker.
            self.is_processing = False

    async def _run_item(self, item: dict):
        """Run one queued item and deliver its outcome to the waiting caller."""
        request_id = item['id']
        future = item['future']

        if future.done():
            # The caller already timed out or was cancelled; running the job
            # would only delay the requests behind it.
            logger.debug(
                f"Skipping request {request_id}: caller is no longer waiting"
            )
            return

        logger.debug(f"Processing request {request_id}")

        try:
            result = item['func'](*item['args'], **item['kwargs'])
            # Covers coroutine functions as well as wrappers that merely
            # return a coroutine object.
            if asyncio.iscoroutine(result):
                result = await result
        except Exception as e:
            logger.error(
                f"Error processing request {request_id}: {e}"
            )
            if not future.done():
                future.set_exception(e)
            return

        # The caller may have timed out while the job was running.
        if not future.done():
            future.set_result(result)

        self.processed_count += 1

    def get_stats(self) -> dict:
        """Return queue statistics."""
        return {
            'queue_size': self.queue.qsize(),
            'max_size': self.queue.maxsize,
            'is_processing': self.is_processing,
            'processed_count': self.processed_count,
            'dropped_count': self.dropped_count,
        }
|
|
|
|
class PerformanceMonitor:
    """
    Tracks performance metrics: response times, throughput and errors.

    Only the most recent ``max_tracked`` samples are kept for the timing
    statistics; the success/error counters cover the whole lifetime of the
    monitor.
    """

    def __init__(self, max_tracked: int = 1000):
        """
        Args:
            max_tracked: Number of most recent response samples to retain.
        """
        self.response_times = []
        self.max_tracked = max_tracked
        self.error_count = 0
        self.success_count = 0
        self.start_time = time.time()

    def record_response(self, duration_seconds: float, success: bool = True):
        """Record one response's duration and whether it succeeded."""
        self.response_times.append({
            'duration': duration_seconds,
            'timestamp': time.time(),
            'success': success,
        })

        if success:
            self.success_count += 1
        else:
            self.error_count += 1

        # Trim from the front. Counting the overflow explicitly (instead of a
        # negative slice) also behaves correctly when max_tracked is 0.
        overflow = len(self.response_times) - self.max_tracked
        if overflow > 0:
            del self.response_times[:overflow]

    def get_metrics(self) -> dict:
        """
        Return the current performance metrics.

        The same keys are returned whether or not any samples have been
        recorded; timing statistics are 0 when there are no samples.
        ``requests_per_minute`` treats the first minute of uptime as a full
        minute.
        """
        uptime = time.time() - self.start_time
        total = self.success_count + self.error_count

        metrics = {
            'avg_response_time': 0,
            'min_response_time': 0,
            'max_response_time': 0,
            'median_response_time': 0,
            'total_requests': total,
            'success_count': self.success_count,
            'error_count': self.error_count,
            'success_rate': round(
                (self.success_count / total * 100) if total > 0 else 0, 2
            ),
            'uptime_seconds': int(uptime),
            'requests_per_minute': round(total / max(uptime / 60, 1), 2),
        }

        if not self.response_times:
            return metrics

        # One sort serves min, max and median.
        ordered = sorted(r['duration'] for r in self.response_times)
        count = len(ordered)
        middle = count // 2
        if count % 2:
            median = ordered[middle]
        else:
            # Even sample count: the median is the mean of the two middle
            # values, not the upper one.
            median = (ordered[middle - 1] + ordered[middle]) / 2

        metrics.update({
            'avg_response_time': round(sum(ordered) / count, 2),
            'min_response_time': round(ordered[0], 2),
            'max_response_time': round(ordered[-1], 2),
            'median_response_time': round(median, 2),
        })
        return metrics
|
|
|
|
def measure_time(func):
    """Decorator that logs how long a function takes to run.

    Works for both plain and ``async def`` functions. Successful calls are
    logged at INFO level; a call that raises is logged at ERROR level and
    the exception is re-raised unchanged.

    Args:
        func: The function (sync or coroutine function) to wrap.

    Returns:
        A wrapper of the same kind (sync or async) as ``func``.
    """

    def _log_success(duration):
        logger.info(
            f"{func.__name__} completed in {duration:.2f}s"
        )

    def _log_failure(duration, e):
        logger.error(
            f"{func.__name__} failed after {duration:.2f}s: {e}"
        )

    # perf_counter is monotonic and high-resolution, so measured durations
    # are not distorted by system clock adjustments the way time.time() is.
    if asyncio.iscoroutinefunction(func):

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                _log_failure(time.perf_counter() - start, e)
                raise
            _log_success(time.perf_counter() - start)
            return result

        return async_wrapper

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            _log_failure(time.perf_counter() - start, e)
            raise
        _log_success(time.perf_counter() - start)
        return result

    return sync_wrapper
|
|
|
|
| |
|
|
# Shared module-level instances, created once at import time with the
# default tuning used across the application.
response_cache = ResponseCache(max_size=500, ttl_seconds=3600)
rate_limiter = RateLimiter(max_requests=20, window_seconds=60, cooldown_seconds=10)
request_queue = RequestQueue(max_queue_size=50)
performance_monitor = PerformanceMonitor()