# Spaces: Running  (appears to be page-status text captured along with the file; not part of the module)
import logging
import os
import time
import uuid
from datetime import datetime, timezone

import redis
import requests
from dotenv import load_dotenv
from pydantic import BaseModel
# Logging (console only): one stream handler, INFO and above,
# records formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger(__name__)

# Must run before REDIS_SERVICES is built, because that mapping reads its
# values with os.getenv() at import time.
load_dotenv()
# Connection settings per service, read from the environment at import time.
# A value is None when its environment variable is unset.
REDIS_SERVICES = {
    # Self-managed Redis.
    # Env vars: REDIS_PRODUCTION_HOST / _PORT / _PASSWORD and
    #           REDIS_DEVELOPMENT_HOST / _PORT / _PASSWORD.
    **{
        f"redis_{env}": {
            field: os.getenv(f"REDIS_{env.upper()}_{field.upper()}")
            for field in ("host", "port", "password")
        }
        for env in ("production", "development")
    },
    # Upstash Redis (REST API).
    # Env vars: UPSTASH_REDIS_DEV_REST_URL / _REST_TOKEN and
    #           UPSTASH_REDIS_PROD_REST_URL / _REST_TOKEN.
    **{
        f"upstash_{env}": {
            field: os.getenv(f"UPSTASH_REDIS_{env.upper()}_{field.upper()}")
            for field in ("rest_url", "rest_token")
        }
        for env in ("dev", "prod")
    },
}
# ---------- Pydantic Response Models ----------
class RedisPingResponse(BaseModel):
    """Outcome of a health check against a single Redis service."""

    # Name of the checked service, as passed to the ping function.
    service_name: str
    # True when the check completed without error.
    success: bool
    # Failure description; None on success.
    error: str | None = None
    # ISO-8601 timestamp (UTC, no offset suffix) taken when the check started.
    time: str
    # Duration of the PING step in milliseconds; None when the check failed.
    latency_ms: float | None = None
    # Backend kind: "self-managed" or "upstash".
    type: str
    # Duration of the SET / GET probe steps in milliseconds, when measured.
    set_latency_ms: float | None = None
    get_latency_ms: float | None = None
    # Value read back by the GET probe, if any.
    value: str | None = None
class RedisPingAllResponse(BaseModel):
    """Aggregated health-check results, one entry per checked service."""

    # Results in the order the services were checked.
    redis_services: list[RedisPingResponse]
# ---------- Self-managed Redis ----------
def ping_redis(service_name: str, redis_config: dict) -> RedisPingResponse:
    """Health-check a self-managed Redis server: PING, then a SET/GET round-trip.

    Args:
        service_name: Label echoed in the response and in log lines.
        redis_config: Mapping with ``host``, ``port`` and ``password``.
            Optional keys: ``ssl`` (truthy enables TLS) and ``timeout``
            (socket timeout in seconds, default 5).

    Returns:
        A ``RedisPingResponse`` with ``type="self-managed"``. Exceptions raised
        during the check (bad config, connection errors, timeouts, a failed
        round-trip) are caught and reported as ``success=False`` with the
        message in ``error``.
    """
    # Same naive-UTC ISO string that ``datetime.utcnow()`` produced, without
    # relying on that deprecated call.
    now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    probe_value = "ok"
    # Probe keys expire on their own so repeated checks do not pile up keys.
    ttl_seconds = 60

    try:
        missing = [name for name in ("host", "port") if not redis_config.get(name)]
        if missing:
            raise ValueError(f"Missing Redis config value(s): {', '.join(missing)}")
        timeout = float(redis_config.get("timeout") or 5)

        # perf_counter is monotonic; wall-clock deltas can jump on clock changes.
        started = time.perf_counter()
        # The context manager closes the client (and its pool) on every path.
        with redis.Redis(
            host=redis_config["host"],
            port=int(redis_config["port"]),
            password=redis_config["password"],
            ssl=bool(redis_config.get("ssl")),
            # Without timeouts an unreachable host can block the check forever.
            socket_connect_timeout=timeout,
            socket_timeout=timeout,
        ) as client:
            # PING (the connection is opened lazily, so this includes connect time)
            client.ping()
            latency = (time.perf_counter() - started) * 1000

            # Simple SET/GET check
            key = f"healthcheck:{uuid.uuid4()}"
            started = time.perf_counter()
            client.set(key, probe_value, ex=ttl_seconds)
            set_latency = (time.perf_counter() - started) * 1000

            started = time.perf_counter()
            raw = client.get(key)
            get_latency = (time.perf_counter() - started) * 1000

        # Ensure a plain str (the client returns bytes by default).
        got_value = raw.decode() if raw else None
        if got_value != probe_value:
            raise RuntimeError(
                f"SET/GET round-trip mismatch: wrote {probe_value!r}, read {got_value!r}"
            )

        logger.info(
            "%s PING OK, latency %.2fms, value=%s", service_name, latency, got_value
        )
        return RedisPingResponse(
            service_name=service_name,
            success=True,
            error=None,
            time=now,
            latency_ms=latency,
            type="self-managed",
            set_latency_ms=set_latency,
            get_latency_ms=get_latency,
            value=got_value,
        )
    except Exception as e:
        # Top-level boundary of the health check: report, never propagate.
        error_msg = str(e)
        logger.error("%s failed: %s", service_name, error_msg)
        return RedisPingResponse(
            service_name=service_name,
            success=False,
            error=error_msg,
            time=now,
            latency_ms=None,
            type="self-managed",
        )
# ---------- Upstash Redis ----------
def ping_upstash(service_name: str, redis_config: dict) -> RedisPingResponse:
    """Health-check an Upstash Redis database over its REST API.

    Sends PING, then a SET/GET round-trip on a short-lived key, timing each
    request separately.

    Args:
        service_name: Label echoed in the response and in log lines.
        redis_config: Mapping with ``rest_url`` and ``rest_token``. Optional
            key: ``timeout`` (per-request timeout in seconds, default 5).

    Returns:
        A ``RedisPingResponse`` with ``type="upstash"``. Any failure (missing
        config, non-200 status, unexpected PING reply, failed round-trip,
        network error, timeout) is reported as ``success=False`` with the
        reason in ``error``; exceptions are caught, not propagated.
    """
    # Same naive-UTC ISO string that ``datetime.utcnow()`` produced, without
    # relying on that deprecated call.
    now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    value = "ok"
    # Probe keys expire on their own so repeated checks do not pile up keys.
    ttl_seconds = 60

    def failed(message: str) -> RedisPingResponse:
        """Log the failure and build the matching response."""
        logger.error("%s failed: %s", service_name, message)
        return RedisPingResponse(
            service_name=service_name,
            success=False,
            error=message,
            time=now,
            latency_ms=None,
            type="upstash",
        )

    try:
        rest_url = redis_config.get("rest_url")
        rest_token = redis_config.get("rest_token")
        if not rest_url or not rest_token:
            return failed("Missing Upstash config: rest_url and rest_token are required")

        # Avoid "//ping" when the configured URL ends with a slash.
        base_url = rest_url.rstrip("/")
        headers = {"Authorization": f"Bearer {rest_token}"}
        # requests has no default timeout; without one a stalled server
        # blocks the health check indefinitely.
        timeout = float(redis_config.get("timeout") or 5)

        # PING
        started = time.perf_counter()
        resp = requests.get(f"{base_url}/ping", headers=headers, timeout=timeout)
        latency = (time.perf_counter() - started) * 1000

        if resp.status_code != 200:
            return failed(f"HTTP {resp.status_code}: {resp.text}")

        try:
            result = resp.json().get("result")
        except (ValueError, AttributeError):
            # Body is not JSON, or is JSON but not an object: use the raw text.
            result = resp.text.strip()

        if not isinstance(result, str) or result.upper() != "PONG":
            return failed(f"Unexpected response: {resp.status_code} - {resp.text}")

        # SET/GET test
        key = f"healthcheck:{uuid.uuid4()}"

        started = time.perf_counter()
        set_resp = requests.post(
            f"{base_url}/set/{key}/{value}/EX/{ttl_seconds}",
            headers=headers,
            timeout=timeout,
        )
        set_latency = (time.perf_counter() - started) * 1000
        if set_resp.status_code != 200:
            return failed(f"SET failed: HTTP {set_resp.status_code}: {set_resp.text}")

        started = time.perf_counter()
        get_resp = requests.get(
            f"{base_url}/get/{key}", headers=headers, timeout=timeout
        )
        get_latency = (time.perf_counter() - started) * 1000
        if get_resp.status_code != 200:
            return failed(f"GET failed: HTTP {get_resp.status_code}: {get_resp.text}")

        got_value = get_resp.json().get("result")
        if got_value != value:
            return failed(
                f"SET/GET round-trip mismatch: wrote {value!r}, read {got_value!r}"
            )

        logger.info(
            "%s PING OK, latency %.2fms, value=%s", service_name, latency, got_value
        )
        return RedisPingResponse(
            service_name=service_name,
            success=True,
            error=None,
            time=now,
            latency_ms=latency,
            type="upstash",
            set_latency_ms=set_latency,
            get_latency_ms=get_latency,
            value=got_value,
        )
    except Exception as e:
        # Top-level boundary of the health check: report, never propagate.
        return failed(str(e))
# ---------- Run All ----------
def ping_all_redis_projects() -> RedisPingAllResponse:
    """Run the health check for every configured entry in ``REDIS_SERVICES``.

    Entries whose settings are empty or contain only falsy values are skipped.
    An entry with a ``rest_url`` key is checked through the Upstash REST
    path; every other entry is checked as a self-managed Redis server.

    Returns:
        A ``RedisPingAllResponse`` holding one result per checked service, in
        ``REDIS_SERVICES`` order.
    """
    logger.info("Starting Redis service health check")

    outcomes: list[RedisPingResponse] = []
    for name, settings in REDIS_SERVICES.items():
        is_configured = bool(settings) and any(settings.values())
        if is_configured:
            # The presence of "rest_url" marks an Upstash entry.
            check = ping_upstash if "rest_url" in settings else ping_redis
            outcomes.append(check(name, settings))

    logger.info("Completed all Redis service checks")
    return RedisPingAllResponse(redis_services=outcomes)