# backend/redis_keep_alive_service.py
# Soumik555's picture
# added upstash
# d93d481
import logging
import os
import time
import uuid
from datetime import datetime, timezone

import redis
import requests
from dotenv import load_dotenv
from pydantic import BaseModel
# Console-only logging: one stream handler, INFO level and above.
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format=_LOG_FORMAT,
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Must run before the os.getenv() lookups that build REDIS_SERVICES below.
load_dotenv()
# Connection settings for every Redis service to ping, keyed by service name.
# Each value comes straight from the environment and is None when the
# corresponding variable is unset.
REDIS_SERVICES = dict(
    # Self-managed Redis (native protocol: host / port / password)
    redis_production=dict(
        host=os.getenv("REDIS_PRODUCTION_HOST"),
        port=os.getenv("REDIS_PRODUCTION_PORT"),
        password=os.getenv("REDIS_PRODUCTION_PASSWORD"),
    ),
    redis_development=dict(
        host=os.getenv("REDIS_DEVELOPMENT_HOST"),
        port=os.getenv("REDIS_DEVELOPMENT_PORT"),
        password=os.getenv("REDIS_DEVELOPMENT_PASSWORD"),
    ),
    # Upstash Redis (REST API: base URL / bearer token)
    upstash_dev=dict(
        rest_url=os.getenv("UPSTASH_REDIS_DEV_REST_URL"),
        rest_token=os.getenv("UPSTASH_REDIS_DEV_REST_TOKEN"),
    ),
    upstash_prod=dict(
        rest_url=os.getenv("UPSTASH_REDIS_PROD_REST_URL"),
        rest_token=os.getenv("UPSTASH_REDIS_PROD_REST_TOKEN"),
    ),
)
# ---------- Pydantic Response Models ----------
class RedisPingResponse(BaseModel):
    """Outcome of health-checking a single Redis service."""

    # Label of the service that was checked, as passed in by the caller.
    service_name: str
    # True when the check completed; False when it failed.
    success: bool
    # Failure description; None on success.
    error: str | None = None
    # UTC timestamp (ISO-8601 string) captured when the check started.
    time: str
    # PING latency in milliseconds; None when the check failed.
    latency_ms: float | None = None
    type: str  # "self-managed" or "upstash"
    # SET / GET request timings in milliseconds; only the Upstash check
    # fills these in.
    set_latency_ms: float | None = None
    get_latency_ms: float | None = None
    # Value read back by the GET probe, when one was obtained.
    value: str | None = None
class RedisPingAllResponse(BaseModel):
    """Combined outcome of health-checking all configured Redis services."""

    # One entry per service that was actually checked, in check order.
    redis_services: list[RedisPingResponse]
# ---------- Self-managed Redis ----------
def ping_redis(
    service_name: str, redis_config: dict, *, timeout: float = 5.0
) -> RedisPingResponse:
    """Health-check a self-managed Redis instance with PING plus a SET/GET probe.

    Args:
        service_name: Label for the service; used in log lines and echoed in
            the response.
        redis_config: Mapping with ``host``, ``port`` and ``password``. An
            optional truthy ``ssl`` entry enables TLS.
        timeout: Seconds allowed for connecting and for each socket
            operation, so an unreachable host cannot hang the check.

    Returns:
        A ``RedisPingResponse`` with ``type="self-managed"``. On success
        ``latency_ms`` covers client creation plus PING and ``value`` is what
        GET returned for the probe key. Every exception is caught, logged and
        reported as ``success=False`` with its message in ``error``; nothing
        is raised to the caller.
    """
    # Naive-UTC ISO string: the same format datetime.utcnow().isoformat()
    # produced, without calling the deprecated utcnow().
    now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    # Probe keys expire on their own so repeated runs do not pile up
    # "healthcheck:*" keys in the keyspace.
    probe_ttl_seconds = 60
    client = None
    try:
        # perf_counter is monotonic; wall-clock deltas can jump on clock sync.
        started = time.perf_counter()
        client = redis.Redis(
            host=redis_config["host"],
            port=int(redis_config["port"]),
            password=redis_config["password"],
            ssl=bool(redis_config.get("ssl")),
            socket_connect_timeout=timeout,
            socket_timeout=timeout,
        )

        # PING
        client.ping()
        latency = (time.perf_counter() - started) * 1000

        # Simple SET/GET check
        key = f"healthcheck:{uuid.uuid4()}"
        client.set(key, "ok", ex=probe_ttl_seconds)
        got = client.get(key)
        # Response model expects str, while the client returns bytes.
        got_value = got.decode() if got else None

        logger.info(
            "%s PING OK, latency %.2fms, value=%s", service_name, latency, got_value
        )
        return RedisPingResponse(
            service_name=service_name,
            success=True,
            error=None,
            time=now,
            latency_ms=latency,
            type="self-managed",
            value=got_value,
        )
    except Exception as e:
        # Boundary of the health check: report the failure, never raise.
        error_msg = str(e)
        logger.error("%s failed: %s", service_name, error_msg)
        return RedisPingResponse(
            service_name=service_name,
            success=False,
            error=error_msg,
            time=now,
            latency_ms=None,
            type="self-managed",
        )
    finally:
        # Release the connection; a cleanup error must not replace the result.
        if client is not None:
            try:
                client.close()
            except Exception as close_err:
                logger.warning(
                    "%s: error while closing Redis client: %s",
                    service_name,
                    close_err,
                )
# ---------- Upstash Redis ----------
def ping_upstash(
    service_name: str, redis_config: dict, *, timeout: float = 10.0
) -> RedisPingResponse:
    """Health-check an Upstash Redis database through its REST API.

    Sends PING, then a SET/GET probe on a throw-away key.

    Args:
        service_name: Label for the service; used in log lines and echoed in
            the response.
        redis_config: Mapping with ``rest_url`` (REST base URL) and
            ``rest_token`` (bearer token).
        timeout: Seconds allowed for each HTTP request. ``requests`` waits
            indefinitely when no timeout is given.

    Returns:
        A ``RedisPingResponse`` with ``type="upstash"``. ``success`` reflects
        the PING step: a non-200 status, a reply other than ``PONG``, missing
        configuration or any exception gives ``success=False`` with the
        reason in ``error``. A non-200 SET or GET does not change
        ``success`` (``value`` is ``None`` when GET fails) but is logged as
        a warning. Nothing is raised to the caller.
    """
    # Naive-UTC ISO string: the same format datetime.utcnow().isoformat()
    # produced, without calling the deprecated utcnow().
    now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    # Probe keys expire on their own so repeated runs do not pile up
    # "healthcheck:*" keys in the database.
    probe_ttl_seconds = 60

    def failure(error_msg: str) -> RedisPingResponse:
        # Single place that logs and builds every failed result.
        logger.error("%s failed: %s", service_name, error_msg)
        return RedisPingResponse(
            service_name=service_name,
            success=False,
            error=error_msg,
            time=now,
            latency_ms=None,
            type="upstash",
        )

    try:
        # Strip a trailing slash so the paths below never contain "//".
        base_url = (redis_config.get("rest_url") or "").rstrip("/")
        token = redis_config.get("rest_token")
        if not base_url or not token:
            return failure("Upstash rest_url and rest_token must both be configured")
        headers = {"Authorization": f"Bearer {token}"}

        # PING (perf_counter is monotonic, unlike wall-clock deltas)
        ping_started = time.perf_counter()
        resp = requests.get(f"{base_url}/ping", headers=headers, timeout=timeout)
        latency = (time.perf_counter() - ping_started) * 1000

        if resp.status_code != 200:
            return failure(f"HTTP {resp.status_code}: {resp.text}")

        try:
            result = resp.json().get("result")
        except (ValueError, AttributeError):
            # Body was not a JSON object; fall back to the raw text.
            result = resp.text.strip()

        if not isinstance(result, str) or result.upper() != "PONG":
            return failure(f"Unexpected response: {resp.status_code} - {resp.text}")

        # SET/GET test
        key = f"healthcheck:{uuid.uuid4()}"
        value = "ok"

        set_started = time.perf_counter()
        set_resp = requests.post(
            f"{base_url}/set/{key}/{value}/EX/{probe_ttl_seconds}",
            headers=headers,
            timeout=timeout,
        )
        set_latency = (time.perf_counter() - set_started) * 1000
        if set_resp.status_code != 200:
            logger.warning(
                "%s SET probe returned HTTP %s: %s",
                service_name,
                set_resp.status_code,
                set_resp.text,
            )

        get_started = time.perf_counter()
        get_resp = requests.get(
            f"{base_url}/get/{key}", headers=headers, timeout=timeout
        )
        get_latency = (time.perf_counter() - get_started) * 1000

        got_value = None
        if get_resp.status_code == 200:
            got_value = get_resp.json().get("result")
        else:
            logger.warning(
                "%s GET probe returned HTTP %s: %s",
                service_name,
                get_resp.status_code,
                get_resp.text,
            )

        logger.info(
            "%s PING OK, latency %.2fms, value=%s", service_name, latency, got_value
        )
        return RedisPingResponse(
            service_name=service_name,
            success=True,
            error=None,
            time=now,
            latency_ms=latency,
            type="upstash",
            set_latency_ms=set_latency,
            get_latency_ms=get_latency,
            value=got_value,
        )
    except Exception as e:
        # Boundary of the health check: report the failure, never raise.
        return failure(str(e))
# ---------- Run All ----------
def ping_all_redis_projects() -> RedisPingAllResponse:
    """Ping every configured entry in REDIS_SERVICES and collect the results.

    Entries whose settings are all empty or unset are skipped. An entry that
    has a ``rest_url`` key is checked with ``ping_upstash``; any other entry
    is checked with ``ping_redis``. Checks run one after another, in the
    order the services are declared.
    """
    logger.info("Starting Redis service health check")

    # Lazily filter out services with no settings at all.
    configured = (
        (name, cfg)
        for name, cfg in REDIS_SERVICES.items()
        if cfg and any(cfg.values())
    )
    results: list[RedisPingResponse] = [
        (ping_upstash if "rest_url" in cfg else ping_redis)(name, cfg)
        for name, cfg in configured
    ]

    logger.info("Completed all Redis service checks")
    return RedisPingAllResponse(redis_services=results)