Ruhijii / performance_optimizer.py
Ruhivig65's picture
Upload 24 files
7fd6889 verified
"""
============================================================
PERFORMANCE OPTIMIZER
============================================================
Yeh file performance optimizations handle karti hai:
- Response caching
- Request rate limiting
- Memory management
- Batch processing
- Connection pooling
============================================================
"""
import asyncio
import hashlib
import inspect
import logging
import statistics
import time
from collections import OrderedDict
from datetime import datetime, timedelta
from functools import wraps
from typing import Any, Dict, Optional
# Module-level logger; configure the 'RuhiOptimizer' logger to control this
# module's output.
logger = logging.getLogger(name='RuhiOptimizer')
class ResponseCache:
    """
    LRU cache for AI responses, keyed on a normalized copy of the message.

    Repeated questions (including ones differing only in case or spacing)
    are answered from the cache instead of being recomputed. Entries expire
    after ``ttl_seconds``; once ``max_size`` entries are stored, the
    least-recently-used entry is evicted to make room for a new one.
    """

    def __init__(self, max_size: int = 500, ttl_seconds: int = 3600):
        """
        Args:
            max_size: Maximum number of cached responses. A value <= 0
                disables storing (``set`` becomes a no-op).
            ttl_seconds: Lifetime of each entry in seconds (1 hour default).
        """
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        # Ordering doubles as LRU order: least recently used entry first.
        self.cache: OrderedDict = OrderedDict()
        self.hits = 0
        self.misses = 0
        self.total_requests = 0

    def _make_key(self, message: str) -> str:
        """
        Return the cache key for *message*.

        The message is lowercased, stripped, and has whitespace runs collapsed
        to single spaces before hashing, so trivially different phrasings of
        the same text share a key. MD5 is used only as a compact key here.
        """
        normalized = ' '.join(message.lower().strip().split())
        return hashlib.md5(normalized.encode()).hexdigest()

    def get(self, message: str) -> Optional[str]:
        """
        Look up a cached response.

        A fresh hit is moved to the most-recently-used position. An expired
        entry is removed and counted as a miss.

        Args:
            message: The user's message.

        Returns:
            The cached response, or None on a miss or expired entry.
        """
        self.total_requests += 1
        key = self._make_key(message)
        if key in self.cache:
            entry = self.cache[key]
            if time.time() - entry['timestamp'] < self.ttl_seconds:
                self.cache.move_to_end(key)
                self.hits += 1
                logger.debug(f"Cache HIT for: {message[:50]}...")
                return entry['response']
            # Expired: drop it so it no longer occupies a slot.
            del self.cache[key]
        self.misses += 1
        return None

    def set(self, message: str, response: str):
        """
        Store a response and mark it most-recently-used.

        Overwriting an already-cached key never evicts another entry.
        Inserting a new key evicts least-recently-used entries until there is
        room for it.

        Args:
            message: The user's message.
            response: The AI's response to cache for it.
        """
        if self.max_size <= 0:
            # Caching disabled; popitem() on an empty dict would raise.
            return
        key = self._make_key(message)
        if key in self.cache:
            # Plain assignment keeps an existing key's old position, so move it
            # explicitly; size does not grow, so nothing needs evicting.
            self.cache.move_to_end(key)
        else:
            # `while` (not `if`) also copes with max_size being lowered later.
            while len(self.cache) >= self.max_size:
                self.cache.popitem(last=False)
        self.cache[key] = {
            'response': response,
            'timestamp': time.time(),
            'message': message[:100],
        }

    def clear(self):
        """Remove all cached entries (hit/miss counters are kept)."""
        self.cache.clear()
        logger.info("Response cache cleared")

    def get_stats(self) -> dict:
        """Return cache size, hit/miss counters and the hit rate in percent."""
        hit_rate = 0
        if self.total_requests > 0:
            hit_rate = (self.hits / self.total_requests) * 100
        return {
            'size': len(self.cache),
            'max_size': self.max_size,
            'hits': self.hits,
            'misses': self.misses,
            'total_requests': self.total_requests,
            'hit_rate_percent': round(hit_rate, 2),
            'ttl_seconds': self.ttl_seconds,
        }
class RateLimiter:
    """
    Per-user sliding-window rate limiter with a cooldown penalty.

    A user may make at most ``max_requests`` accepted requests inside any
    trailing ``window_seconds`` period. The request that finds the window
    already full is rejected and starts a cooldown of ``cooldown_seconds``,
    during which every request from that user is rejected.
    """

    def __init__(
        self,
        max_requests: int = 20,
        window_seconds: int = 60,
        cooldown_seconds: int = 10
    ):
        """
        Args:
            max_requests: Accepted requests allowed per window.
            window_seconds: Length of the sliding window, in seconds.
            cooldown_seconds: Lockout duration once the limit is hit.
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.cooldown_seconds = cooldown_seconds
        # user_id -> time.time() stamps of that user's accepted requests
        self.user_requests: Dict[str, list] = {}
        # user_id -> time.time() at which that user's cooldown ends
        self.cooldowns: Dict[str, float] = {}

    def is_allowed(self, user_id: str) -> tuple:
        """
        Decide whether *user_id* may make a request right now.

        An accepted request is recorded against the user's window.

        Args:
            user_id: User identifier.

        Returns:
            ``(True, 0)`` when allowed, otherwise ``(False, seconds_to_wait)``.
        """
        now = time.time()

        lockout_end = self.cooldowns.get(user_id)
        if lockout_end is not None:
            if now < lockout_end:
                return False, int(lockout_end - now) + 1
            # Cooldown is over; forget it and evaluate the request normally.
            del self.cooldowns[user_id]

        window_start = now - self.window_seconds
        recent = [
            stamp
            for stamp in self.user_requests.get(user_id, [])
            if stamp > window_start
        ]
        self.user_requests[user_id] = recent

        if len(recent) < self.max_requests:
            recent.append(now)
            return True, 0

        self.cooldowns[user_id] = now + self.cooldown_seconds
        return False, self.cooldown_seconds

    def cleanup(self):
        """
        Drop stale bookkeeping.

        Request stamps older than two windows are discarded; users left with
        no stamps, and cooldowns that have already ended, are removed.
        """
        now = time.time()
        stale_before = now - self.window_seconds * 2

        for uid, stamps in list(self.user_requests.items()):
            kept = [stamp for stamp in stamps if stamp > stale_before]
            if kept:
                self.user_requests[uid] = kept
            else:
                del self.user_requests[uid]

        finished = [uid for uid, until in self.cooldowns.items() if now > until]
        for uid in finished:
            del self.cooldowns[uid]
class RequestQueue:
    """
    FIFO request queue whose jobs are executed one at a time.

    The model can only handle one request at a time, so ``add_request``
    enqueues a job and a single background worker task runs the queued jobs
    sequentially; each caller awaits the result of its own job.
    """

    def __init__(
        self,
        max_queue_size: int = 50,
        request_timeout: Optional[float] = 120,
    ):
        """
        Args:
            max_queue_size: Maximum number of pending jobs. When the queue is
                full, new jobs are dropped and ``add_request`` returns None.
            request_timeout: Seconds a caller waits for its job's result
                before giving up (``add_request`` then returns None).
                ``None`` waits indefinitely. Defaults to 120 seconds.
        """
        # NOTE(review): on Python < 3.10 asyncio.Queue binds to the event loop
        # that is current at construction time; if this object is created at
        # import time, confirm it is later used on that same loop.
        self.queue = asyncio.Queue(maxsize=max_queue_size)
        self.request_timeout = request_timeout
        self.is_processing = False
        self.processed_count = 0
        self.dropped_count = 0
        # Strong reference to the worker task: the event loop only keeps weak
        # references to tasks, so an unreferenced task may be collected early.
        self._worker_task: Optional[asyncio.Task] = None

    async def add_request(
        self,
        request_id: str,
        process_func,
        *args,
        **kwargs
    ) -> Optional[Any]:
        """
        Enqueue a job and wait for its result.

        Args:
            request_id: Identifier used in log messages.
            process_func: Callable to run. It may be a plain function, an
                ``async def`` function, or any callable returning an
                awaitable (which is then awaited).
            *args, **kwargs: Arguments passed to ``process_func``.

        Returns:
            The job's result, or None if the queue was full or the wait
            exceeded ``request_timeout``.

        Raises:
            Any exception raised by ``process_func`` is re-raised here.
        """
        result_future = asyncio.get_running_loop().create_future()
        try:
            self.queue.put_nowait({
                'id': request_id,
                'func': process_func,
                'args': args,
                'kwargs': kwargs,
                'future': result_future,
                'timestamp': time.time(),
            })
        except asyncio.QueueFull:
            self.dropped_count += 1
            logger.warning(f"Queue full! Request {request_id} dropped")
            return None
        logger.debug(
            f"Request {request_id} queued. "
            f"Queue size: {self.queue.qsize()}"
        )

        if not self.is_processing:
            # Set the flag synchronously, before the worker first runs.
            # Otherwise several callers in the same loop iteration would all
            # see False and start parallel workers, breaking sequential order.
            self.is_processing = True
            self._worker_task = asyncio.create_task(self._process_queue())

        try:
            return await asyncio.wait_for(
                result_future, timeout=self.request_timeout
            )
        except asyncio.TimeoutError:
            logger.warning(f"Request {request_id} timed out")
            return None

    async def _process_queue(self):
        """
        Worker loop: run queued jobs one by one until the queue is empty.

        Each job's result or exception is delivered through its future.
        Jobs whose future is already done (the caller stopped waiting) are
        skipped. ``is_processing`` is always reset on exit, even when the
        worker is cancelled.
        """
        self.is_processing = True
        try:
            while not self.queue.empty():
                try:
                    item = await self.queue.get()
                    request_id = item['id']
                    future = item['future']

                    if future.done():
                        # wait_for() cancels the future when the caller times
                        # out; running the job now would only waste model time.
                        logger.debug(
                            f"Skipping request {request_id}: "
                            f"caller no longer waiting"
                        )
                        self.queue.task_done()
                        continue

                    logger.debug(f"Processing request {request_id}")
                    try:
                        result = item['func'](*item['args'], **item['kwargs'])
                        # Await anything awaitable: covers async def functions
                        # and also partials/callables that return coroutines.
                        if inspect.isawaitable(result):
                            result = await result
                    except Exception as e:
                        logger.error(
                            f"Error processing request {request_id}: {e}"
                        )
                        if not future.done():
                            future.set_exception(e)
                    else:
                        if not future.done():
                            future.set_result(result)
                        self.processed_count += 1
                    self.queue.task_done()
                except Exception as e:
                    logger.error(f"Queue processing error: {e}")
        finally:
            # Without this, a cancellation/BaseException would leave the flag
            # stuck at True and no future add_request would start a worker.
            self.is_processing = False

    def get_stats(self) -> dict:
        """Return queue size, capacity, worker state and job counters."""
        return {
            'queue_size': self.queue.qsize(),
            'max_size': self.queue.maxsize,
            'is_processing': self.is_processing,
            'processed_count': self.processed_count,
            'dropped_count': self.dropped_count,
        }
class PerformanceMonitor:
    """
    Tracks response times and success/error counts.

    Duration statistics (avg/min/max/median) are computed over the most
    recent ``max_tracked`` responses only. ``success_count``, ``error_count``
    and the rates derived from them are lifetime totals since this monitor
    was created.
    """

    def __init__(self):
        # Recent responses, oldest first: {'duration', 'timestamp', 'success'}.
        self.response_times = []
        self.max_tracked = 1000  # track only the last 1000 requests
        self.error_count = 0
        self.success_count = 0
        self.start_time = time.time()

    def record_response(self, duration_seconds: float, success: bool = True):
        """
        Record one finished request.

        Args:
            duration_seconds: How long the request took.
            success: True counts toward ``success_count``, False toward
                ``error_count``.
        """
        self.response_times.append({
            'duration': duration_seconds,
            'timestamp': time.time(),
            'success': success,
        })
        if success:
            self.success_count += 1
        else:
            self.error_count += 1
        # Trim the oldest entries in place rather than re-copying the whole
        # list on every call once it is full.
        overflow = len(self.response_times) - self.max_tracked
        if overflow > 0:
            del self.response_times[:overflow]

    def get_metrics(self) -> dict:
        """
        Return a snapshot of performance metrics.

        The same keys are always returned. Duration fields are 0 when no
        responses have been recorded yet. ``requests_per_minute`` divides by
        at least one minute of uptime, so early readings are not inflated.
        """
        now = time.time()
        uptime = now - self.start_time
        total = self.success_count + self.error_count
        durations = [r['duration'] for r in self.response_times]

        if durations:
            avg_time = round(sum(durations) / len(durations), 2)
            min_time = round(min(durations), 2)
            max_time = round(max(durations), 2)
            # statistics.median averages the two middle values for an even
            # number of samples, giving the true median.
            median_time = round(statistics.median(durations), 2)
        else:
            avg_time = min_time = max_time = median_time = 0

        return {
            'avg_response_time': avg_time,
            'min_response_time': min_time,
            'max_response_time': max_time,
            'median_response_time': median_time,
            'total_requests': total,
            'success_count': self.success_count,
            'error_count': self.error_count,
            'success_rate': (
                round(self.success_count / total * 100, 2) if total > 0 else 0
            ),
            'uptime_seconds': int(uptime),
            'requests_per_minute': round(total / max(uptime / 60, 1), 2),
        }
def measure_time(func):
    """
    Decorator that logs the wall-clock duration of every call to *func*.

    Supports both plain and ``async def`` functions. A normal return is logged
    at INFO level. An exception is logged at ERROR level together with its
    message, then re-raised unchanged.
    """

    def _elapsed_since(started):
        return time.time() - started

    if not asyncio.iscoroutinefunction(func):
        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            started = time.time()
            try:
                value = func(*args, **kwargs)
                took = _elapsed_since(started)
                logger.info(f"{func.__name__} completed in {took:.2f}s")
                return value
            except Exception as exc:
                took = _elapsed_since(started)
                logger.error(f"{func.__name__} failed after {took:.2f}s: {exc}")
                raise

        return sync_wrapper

    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        started = time.time()
        try:
            value = await func(*args, **kwargs)
            took = _elapsed_since(started)
            logger.info(f"{func.__name__} completed in {took:.2f}s")
            return value
        except Exception as exc:
            took = _elapsed_since(started)
            logger.error(f"{func.__name__} failed after {took:.2f}s: {exc}")
            raise

    return async_wrapper
# ==================== GLOBAL INSTANCES ====================
# Module-level shared instances, created once at import time.

# Up to 500 cached responses, each valid for one hour.
response_cache = ResponseCache(ttl_seconds=3600, max_size=500)

# At most 20 requests per user per 60 s; hitting the limit locks the user
# out for 10 s.
rate_limiter = RateLimiter(max_requests=20, window_seconds=60, cooldown_seconds=10)

# At most 50 pending jobs, executed one at a time.
request_queue = RequestQueue(max_queue_size=50)

performance_monitor = PerformanceMonitor()