File size: 4,205 Bytes
0bf22fe bf292d9 66c4f69 2c8368f 531b571 0bf22fe 66c4f69 2c8368f bbfbcdd 2c8368f bbfbcdd ba71442 66c4f69 32b704b 2c8368f 66c4f69 bbfbcdd bf292d9 0bfda05 bf292d9 3692feb 66c4f69 3692feb 66c4f69 3692feb bf292d9 531b571 66c4f69 531b571 66c4f69 531b571 66c4f69 0bf22fe 2c8368f 0bf22fe bbfbcdd 66c4f69 2c8368f 0bf22fe 2c8368f 0bf22fe bf292d9 2c8368f 0bf22fe 2c8368f bbfbcdd 66c4f69 bf292d9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | # rabbit_repo.py
import uuid
import asyncio
import logging
from typing import Any, Optional
from urllib.parse import urlsplit, unquote
import aiormq
import aio_pika
from config import settings
from models import CloudEvent
from rabbit_base import RabbitBase
from utils import to_json, json_compress_str
logger = logging.getLogger(__name__)
class RabbitRepo(RabbitBase):
def __init__(self, external_source: str):
super().__init__(exchange_type_resolver=self._resolve_type)
self._source = external_source
def _resolve_type(self, exch: str) -> str:
if exch.lower().startswith("oa."):
return "direct"
if hasattr(settings, 'EXCHANGE_TYPES') and settings.EXCHANGE_TYPES:
matches = [k for k in settings.EXCHANGE_TYPES.keys()
if exch.lower().startswith(k.lower())]
if matches:
return settings.EXCHANGE_TYPES[max(matches, key=len)]
return "fanout"
def _publisher_user_id(self) -> Optional[str]:
user = getattr(settings, "RABBIT_USER_NAME", None)
if isinstance(user, str) and user.strip():
return user.strip()
amqp_url = getattr(settings, "AMQP_URL", None)
if isinstance(amqp_url, str) and amqp_url.strip():
try:
parsed = urlsplit(amqp_url)
if parsed.username:
return unquote(parsed.username)
except Exception:
pass
return None
async def _publish_with_retry(self, exchange: str, body: bytes, routing_key: str = "") -> None:
attempts, delay = 0, 0.5
publisher_user_id = self._publisher_user_id()
while True:
try:
ex = await self.ensure_exchange(exchange)
msg = aio_pika.Message(
body=body,
delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
user_id=publisher_user_id,
)
await ex.publish(msg, routing_key=routing_key)
return
except (asyncio.CancelledError,
aiormq.exceptions.ChannelInvalidStateError,
aiormq.exceptions.ConnectionClosed,
aio_pika.exceptions.AMQPError,
RuntimeError) as e:
attempts += 1
logger.warning("publish failed attempt=%d exchange=%s rk=%s err=%r",
attempts, exchange, routing_key, e)
try:
await self.close()
except Exception:
pass
if attempts >= 5:
logger.exception("publish giving up after %d attempts", attempts)
raise
await asyncio.sleep(delay)
delay = min(delay * 2, 5.0)
async def publish(self, exchange: str, obj: Any, routing_key: str = "") -> None:
payload = obj if not hasattr(obj, "model_dump") else obj.model_dump(by_alias=True)
evt = CloudEvent.wrap(
event_id=str(uuid.uuid4()),
event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
source=self._source,
data=payload,
)
body = evt.model_dump_json(exclude_none=True).encode("utf-8")
await self._publish_with_retry(exchange, body, routing_key)
async def publish_jsonz(
self,
exchange: str,
obj: Any,
routing_key: str = "",
with_id: Optional[str] = None,
) -> str:
payload = obj if not hasattr(obj, "model_dump") else obj.model_dump(by_alias=True)
datajson = to_json(payload)
datajsonZ = json_compress_str(datajson)
wrapped: Any = (datajsonZ, with_id) if with_id else datajsonZ
evt = CloudEvent.wrap(
event_id=str(uuid.uuid4()),
event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
source=self._source,
data=wrapped,
)
body = evt.model_dump_json(exclude_none=True).encode("utf-8")
await self._publish_with_retry(exchange, body, routing_key)
return datajsonZ
|