Module rabbit
None
None
View Source
from ._wait import constant, expo, fibo
from .client import AioRabbitClient
from .dlx import DLX
from .exchange import Exchange
from .publish import Publish
from .queue import Queue
from .subscribe import Subscribe
__version__ = "2.1.1"
__all__ = [
"__version__",
"AioRabbitClient",
"DLX",
"Exchange",
"Queue",
"Publish",
"Subscribe",
"expo",
"constant",
"fibo",
]
Sub-modules
- rabbit.cli
- rabbit.client
- rabbit.dlx
- rabbit.exceptions
- rabbit.exchange
- rabbit.job
- rabbit.logger
- rabbit.publish
- rabbit.queue
- rabbit.subscribe
Variables
__version__
Functions
constant
def constant(
headers,
delay: int = 300000
) -> int
View Source
def constant(headers, delay: int = int(os.getenv("CONSTANT_DELAY", 300000))) -> int:
logger.debug(f"constant delay strategy: [delay={delay}]")
return _set_timeout(headers, delay)
expo
def expo(
headers,
delay: int = 300000,
base: int = 2,
factor: int = 1,
max_delay: Optional[int] = None
) -> int
View Source
def expo(
headers,
delay: int = int(os.getenv("EXPO_DELAY", 300000)),
base: int = int(os.getenv("EXPO_BASE", 2)),
factor: int = int(os.getenv("EXPO_FACTOR", 1)),
max_delay: Optional[int] = os.getenv("EXPO_MAX_DELAY"), # type: ignore
) -> int:
logger.debug(
f"expo delay strategy: [delay={delay}, base={base}, factor={factor}, max_delay={max_delay}]"
)
if max_delay:
max_delay = int(max_delay)
current_delay = _set_timeout(headers, delay)
delay_updated = int(current_delay * (base * factor))
if max_delay is None or delay_updated <= max_delay:
return delay_updated
return int(max_delay)
fibo
def fibo(
headers,
delay: int = 300000,
max_delay: int = 86400000
) -> int
View Source
def fibo(
headers,
delay: int = int(os.getenv("FIBO_DELAY", 300000)),
max_delay: int = int(os.getenv("FIBO_MAX_DELAY", 86400000)),
) -> int:
logger.debug(f"fibo delay strategy: [delay={delay}, max_delay={max_delay}]")
current_delay = _set_timeout(headers, delay)
if current_delay < max_delay:
return int(current_delay + 60000)
return int(max_delay)
Classes
AioRabbitClient
class AioRabbitClient(
)
View Source
@attr.s(slots=True, repr=False)
class AioRabbitClient:
_protocol = attr.ib(type=AmqpProtocol, init=False, default=None)
transport = attr.ib(init=False, default=None)
_event = attr.ib(init=False)
def __attrs_post_init__(self):
self._event = asyncio.Event()
async def watch(self, item):
logger.info("Watch connection enabled.")
self._event.clear()
await self._event.wait()
logger.error("Connection lost.")
if not item.__module__.endswith(".dlx"):
logger.warning("Trying to establish a new connection...")
await item.configure()
@property
def protocol(self):
return self._protocol
@protocol.setter
def protocol(self, value):
self._protocol = value
self._event.set()
async def get_channel(self) -> Channel:
if not self.protocol:
raise AttributeNotInitialized("Connection not initialized.")
channel = await self.protocol.channel()
return channel
async def connect(self, **kwargs) -> None:
self.transport, self.protocol = await aioamqp.connect(**kwargs)
async def persistent_connect(self, **kwargs):
while True:
try:
self.transport, self.protocol = await aioamqp.connect(**kwargs)
await asyncio.sleep(2)
await self.protocol.wait_closed()
self.transport.close()
except (OSError, aioamqp.exceptions.AmqpClosedConnection) as err:
logger.error(f"Error: {err} - Params: {kwargs}")
await asyncio.sleep(5)
await self.persistent_connect(**kwargs)
Class variables
transport
Instance variables
protocol
transport
Methods
connect
def connect(
self,
**kwargs
) -> None
View Source
async def connect(self, **kwargs) -> None:
self.transport, self.protocol = await aioamqp.connect(**kwargs)
get_channel
def get_channel(
self
) -> aioamqp.channel.Channel
View Source
async def get_channel(self) -> Channel:
if not self.protocol:
raise AttributeNotInitialized("Connection not initialized.")
channel = await self.protocol.channel()
return channel
persistent_connect
def persistent_connect(
self,
**kwargs
)
View Source
async def persistent_connect(self, **kwargs):
while True:
try:
self.transport, self.protocol = await aioamqp.connect(**kwargs)
await asyncio.sleep(2)
await self.protocol.wait_closed()
self.transport.close()
except (OSError, aioamqp.exceptions.AmqpClosedConnection) as err:
logger.error(f"Error: {err} - Params: {kwargs}")
await asyncio.sleep(5)
await self.persistent_connect(**kwargs)
watch
def watch(
self,
item
)
View Source
async def watch(self, item):
logger.info("Watch connection enabled.")
self._event.clear()
await self._event.wait()
logger.error("Connection lost.")
if not item.__module__.endswith(".dlx"):
logger.warning("Trying to establish a new connection...")
await item.configure()
DLX
class DLX(
client: rabbit.client.AioRabbitClient,
exchange: rabbit.exchange.Exchange,
dlq_exchange: rabbit.exchange.Exchange,
queue: rabbit.queue.Queue,
delay_strategy: Callable = <function constant at 0x7f3ec5471040>
)
View Source
@attr.s(slots=True, repr=False)
class DLX:
_client = attr.ib(
type=AioRabbitClient,
validator=attr.validators.instance_of(AioRabbitClient),
repr=False,
)
exchange = attr.ib(
type=Exchange,
validator=attr.validators.instance_of(Exchange),
)
dlq_exchange = attr.ib(
type=Exchange,
validator=attr.validators.instance_of(Exchange),
)
queue = attr.ib(
type=Queue,
validator=attr.validators.instance_of(Queue),
)
delay_strategy = attr.ib(
type=Callable, default=constant, validator=attr.validators.is_callable()
)
_channel = attr.ib(init=False, repr=False)
def __repr__(self) -> str:
return f"DLX(queue={self.queue}, delay_strategy={self.delay_strategy.__name__}, exchange={self.exchange}), dlq_exchange={self.dlq_exchange}"
async def configure(self) -> None:
self._channel = await self._client.get_channel()
try:
await self._configure_queue()
await self._configure_exchange()
await self._configure_queue_bind()
except AttributeNotInitialized:
logger.debug("Waiting client initialization...DLX")
async def _configure_exchange(self) -> None:
await asyncio.gather(
self._channel.exchange_declare(
exchange_name=self.exchange.name,
type_name=self.exchange.exchange_type,
durable=self.exchange.durable,
),
self._channel.exchange_declare(
exchange_name=self.dlq_exchange.name,
type_name=self.dlq_exchange.exchange_type,
durable=self.dlq_exchange.durable,
),
)
await asyncio.sleep(2)
async def _configure_queue(self) -> None:
queue_name = await self._ensure_endswith_dlq(self.queue.name)
await self._channel.queue_declare(
queue_name=queue_name,
durable=self.queue.durable,
arguments=self.queue.arguments,
)
async def _configure_queue_bind(self) -> None:
queue_name = await self._ensure_endswith_dlq(self.queue.name)
await asyncio.gather(
self._channel.queue_bind(
exchange_name=self.exchange.name,
queue_name=queue_name,
routing_key=self.queue.name,
),
self._channel.queue_bind(
exchange_name=self.dlq_exchange.name,
queue_name=self.queue.name,
routing_key=self.dlq_exchange.topic,
),
)
async def _ensure_endswith_dlq(self, value: str) -> str:
if not value.endswith(".dlq"):
value = f"{value}.dlq"
return value
async def send_event(
self, cause: Exception, body: bytes, envelope: Envelope, properties: Properties
) -> None:
timeout = self.delay_strategy(properties.headers)
properties = await self._get_properties(timeout, cause, envelope)
logger.debug(
f"Send event to dlq: [exchange: {self.exchange.name}"
f" | routing_key: {self.queue.name} | properties: {properties}]"
)
try:
await self._channel.publish(
body, self.exchange.name, self.queue.name, properties
)
except AttributeError:
raise OperationError("Ensure that instance was connected ")
async def _get_properties(
self, timeout: int, exception_message: Exception, envelope: Envelope
) -> dict:
return {
"expiration": f"{timeout}",
"headers": {
"x-delay": f"{timeout}",
"x-exception-message": f"{exception_message}",
"x-original-exchange": f"{envelope.exchange_name}",
"x-original-routingKey": f"{envelope.routing_key}",
},
}
Class variables
delay_strategy
dlq_exchange
exchange
queue
Instance variables
delay_strategy
dlq_exchange
exchange
queue
Methods
configure
def configure(
self
) -> None
View Source
async def configure(self) -> None:
self._channel = await self._client.get_channel()
try:
await self._configure_queue()
await self._configure_exchange()
await self._configure_queue_bind()
except AttributeNotInitialized:
logger.debug("Waiting client initialization...DLX")
send_event
def send_event(
self,
cause: Exception,
body: bytes,
envelope: aioamqp.envelope.Envelope,
properties: aioamqp.properties.Properties
) -> None
View Source
async def send_event(
self, cause: Exception, body: bytes, envelope: Envelope, properties: Properties
) -> None:
timeout = self.delay_strategy(properties.headers)
properties = await self._get_properties(timeout, cause, envelope)
logger.debug(
f"Send event to dlq: [exchange: {self.exchange.name}"
f" | routing_key: {self.queue.name} | properties: {properties}]"
)
try:
await self._channel.publish(
body, self.exchange.name, self.queue.name, properties
)
except AttributeError:
raise OperationError("Ensure that instance was connected ")
Exchange
class Exchange(
name: str,
exchange_type: str,
topic: str = '#',
durable: bool = True
)
View Source
@attr.s(frozen=True)
class Exchange:
name = attr.ib(type=str, validator=attr.validators.instance_of(str))
exchange_type = attr.ib(type=str, validator=attr.validators.instance_of(str))
topic = attr.ib(type=str, default="#", validator=attr.validators.instance_of(str))
durable = attr.ib(
type=bool, default=True, validator=attr.validators.instance_of(bool)
)
Class variables
durable
exchange_type
name
topic
Publish
class Publish(
client: rabbit.client.AioRabbitClient,
exchange_name: str = 'default.in.exchange',
routing_key: str = '#'
)
View Source
@attr.s(slots=True)
class Publish:
_client = attr.ib(
type=AioRabbitClient,
validator=attr.validators.instance_of(AioRabbitClient),
repr=False,
)
exchange_name = attr.ib(
type=str,
default=os.getenv("PUBLISH_EXCHANGE_NAME", "default.in.exchange"),
validator=attr.validators.instance_of(str),
)
routing_key = attr.ib(
type=str,
default=os.getenv("PUBLISH_ROUTING_KEY", "#"),
validator=attr.validators.instance_of(str),
)
_channel = attr.ib(init=False, repr=False)
async def configure(self) -> None:
await asyncio.sleep(1)
self._channel = await self._client.get_channel()
loop = asyncio.get_running_loop()
# loop.create_task(self._client.watch(self), name="publish_watcher")
loop.create_task(self._client.watch(self))
async def send_event(self, payload: bytes, **kwargs) -> None:
await self._channel.publish(
payload=payload,
exchange_name=self.exchange_name,
routing_key=self.routing_key,
**kwargs,
)
Class variables
exchange_name
routing_key
Instance variables
exchange_name
routing_key
Methods
configure
def configure(
self
) -> None
View Source
async def configure(self) -> None:
await asyncio.sleep(1)
self._channel = await self._client.get_channel()
loop = asyncio.get_running_loop()
# loop.create_task(self._client.watch(self), name="publish_watcher")
loop.create_task(self._client.watch(self))
send_event
def send_event(
self,
payload: bytes,
**kwargs
) -> None
View Source
async def send_event(self, payload: bytes, **kwargs) -> None:
await self._channel.publish(
payload=payload,
exchange_name=self.exchange_name,
routing_key=self.routing_key,
**kwargs,
)
Queue
class Queue(
name: str,
durable: bool = True,
arguments: dict = NOTHING
)
View Source
@attr.s(frozen=True)
class Queue:
name = attr.ib(type=str, validator=attr.validators.instance_of(str))
durable = attr.ib(
type=bool, default=True, validator=attr.validators.instance_of(bool)
)
arguments = attr.ib(
type=dict, factory=dict, validator=attr.validators.instance_of(dict)
)
Class variables
arguments
durable
name
Subscribe
class Subscribe(
client: rabbit.client.AioRabbitClient,
task: Callable,
exchange: rabbit.exchange.Exchange = Exchange(name='default.in.exchange', exchange_type='topic', topic='#', durable=True),
queue: rabbit.queue.Queue = Queue(name='default.subscribe.queue', durable=True, arguments={}),
concurrent: int = 1,
delay_strategy: Callable = <function constant at 0x7f3ec5471040>
)
View Source
@attr.s(slots=True, repr=False)
class Subscribe:
_client = attr.ib(
type=AioRabbitClient,
validator=attr.validators.instance_of(AioRabbitClient),
repr=False,
)
task = attr.ib(type=Callable, validator=attr.validators.is_callable())
exchange = attr.ib(
type=Exchange,
default=Exchange(
name=os.getenv("SUBSCRIBE_EXCHANGE_NAME", "default.in.exchange"),
exchange_type=os.getenv("SUBSCRIBE_EXCHANGE_TYPE", "topic"),
topic=os.getenv("SUBSCRIBE_TOPIC", "#"),
),
validator=attr.validators.instance_of(Exchange),
)
queue = attr.ib(
type=Queue,
default=Queue(
name=os.getenv("SUBSCRIBE_QUEUE_NAME", "default.subscribe.queue")
),
validator=attr.validators.instance_of(Queue),
)
concurrent = attr.ib(
type=int, default=1, validator=attr.validators.instance_of(int)
)
delay_strategy = attr.ib(
type=Callable, default=constant, validator=attr.validators.is_callable()
)
_dlx = attr.ib(type=DLX, validator=attr.validators.instance_of(DLX), init=False)
_job_queue = attr.ib(init=False, repr=False)
_loop = attr.ib(init=False, repr=False)
_channel = attr.ib(init=False, repr=False)
def __repr__(self) -> str:
return f"Subscribe(task={self.task.__name__}, exchange={self.exchange}, queue={self.queue}, concurrent={self.concurrent}, dlx={self._dlx})"
def __attrs_post_init__(self) -> None:
self._dlx = DLX(
client=self._client,
exchange=Exchange(
name=os.getenv("DLX_EXCHANGE_NAME", "DLX"),
exchange_type=os.getenv("DLX_TYPE", "direct"),
),
dlq_exchange=Exchange(
name=os.getenv(
"DLQ_EXCHANGE_NAME", f"dlqReRouter.{self.exchange.name}"
),
exchange_type=os.getenv("DLQ_EXCHANGE_TYPE", "topic"),
topic=os.getenv("SUBSCRIBE_QUEUE", self.queue.name),
),
queue=Queue(
name=self.queue.name,
arguments={
"x-dead-letter-exchange": f"dlqReRouter.{self.exchange.name}",
"x-dead-letter-routing-key": self.queue.name,
},
),
delay_strategy=self.delay_strategy,
)
self._job_queue = asyncio.Queue(maxsize=self.concurrent)
self._loop = asyncio.get_event_loop()
async def configure(self) -> None:
await asyncio.sleep(3)
self._channel = await self._client.get_channel()
await self.qos(prefetch_count=self.concurrent)
# self._loop.create_task(self._client.watch(self), name="subscribe_watcher")
self._loop.create_task(self._client.watch(self))
with suppress(SynchronizationError):
try:
await self._configure_queue()
await self._dlx.configure()
await self._configure_exchange()
await self._configure_queue_bind()
except AttributeNotInitialized:
logger.debug("Waiting client initialization...SUBSCRIBE")
async def _configure_exchange(self) -> None:
await self._channel.exchange_declare(
exchange_name=self.exchange.name,
type_name=self.exchange.exchange_type,
durable=self.exchange.durable,
)
await asyncio.sleep(2)
async def _configure_queue(self) -> None:
await self._channel.queue_declare(
queue_name=self.queue.name, durable=self.queue.durable
)
async def _configure_queue_bind(self) -> None:
await self._channel.queue_bind(
exchange_name=self.exchange.name,
queue_name=self.queue.name,
routing_key=self.exchange.topic,
)
await self._channel.basic_consume(
callback=self.callback, queue_name=self.queue.name
)
async def callback(
self, channel: Channel, body: bytes, envelope: Envelope, properties: Properties
) -> None:
if not self._job_queue.full():
self._job_queue.put_nowait((body, envelope, properties))
self._loop.create_task(self._run())
else:
await self.nack_event(envelope, requeue=True)
await self._job_queue.join()
async def _run(self) -> None:
try:
body, envelope, properties = await self._job_queue.get()
await self.task(body)
self._job_queue.task_done()
await self.ack_event(envelope, multiple=False)
except Exception as cause:
await asyncio.shield(
asyncio.gather(
self.ack_event(envelope, multiple=False),
self._dlx.send_event(cause, body, envelope, properties),
)
)
async def ack_event(self, envelope: Envelope, multiple: bool = False) -> None:
await self._channel.basic_client_ack(
delivery_tag=envelope.delivery_tag, multiple=multiple
)
async def nack_event(
self, envelope: Envelope, multiple: bool = False, requeue: bool = True
) -> None:
await self._channel.basic_client_nack(
delivery_tag=envelope.delivery_tag, multiple=multiple, requeue=requeue
)
async def reject_event(self, envelope: Envelope, requeue: bool = False) -> None:
await self._channel.basic_reject(
delivery_tag=envelope.delivery_tag, requeue=requeue
)
async def qos(
self,
prefetch_size: int = 0,
prefetch_count: int = 0,
connection_global: bool = False,
):
await self._channel.basic_qos(
prefetch_size=prefetch_size,
prefetch_count=prefetch_count,
connection_global=connection_global,
)
Class variables
concurrent
delay_strategy
exchange
queue
task
Instance variables
concurrent
delay_strategy
exchange
queue
task
Methods
ack_event
def ack_event(
self,
envelope: aioamqp.envelope.Envelope,
multiple: bool = False
) -> None
View Source
async def ack_event(self, envelope: Envelope, multiple: bool = False) -> None:
await self._channel.basic_client_ack(
delivery_tag=envelope.delivery_tag, multiple=multiple
)
callback
def callback(
self,
channel: aioamqp.channel.Channel,
body: bytes,
envelope: aioamqp.envelope.Envelope,
properties: aioamqp.properties.Properties
) -> None
View Source
async def callback(
self, channel: Channel, body: bytes, envelope: Envelope, properties: Properties
) -> None:
if not self._job_queue.full():
self._job_queue.put_nowait((body, envelope, properties))
self._loop.create_task(self._run())
else:
await self.nack_event(envelope, requeue=True)
await self._job_queue.join()
configure
def configure(
self
) -> None
View Source
async def configure(self) -> None:
await asyncio.sleep(3)
self._channel = await self._client.get_channel()
await self.qos(prefetch_count=self.concurrent)
# self._loop.create_task(self._client.watch(self), name="subscribe_watcher")
self._loop.create_task(self._client.watch(self))
with suppress(SynchronizationError):
try:
await self._configure_queue()
await self._dlx.configure()
await self._configure_exchange()
await self._configure_queue_bind()
except AttributeNotInitialized:
logger.debug("Waiting client initialization...SUBSCRIBE")
nack_event
def nack_event(
self,
envelope: aioamqp.envelope.Envelope,
multiple: bool = False,
requeue: bool = True
) -> None
View Source
async def nack_event(
self, envelope: Envelope, multiple: bool = False, requeue: bool = True
) -> None:
await self._channel.basic_client_nack(
delivery_tag=envelope.delivery_tag, multiple=multiple, requeue=requeue
)
qos
def qos(
self,
prefetch_size: int = 0,
prefetch_count: int = 0,
connection_global: bool = False
)
View Source
async def qos(
self,
prefetch_size: int = 0,
prefetch_count: int = 0,
connection_global: bool = False,
):
await self._channel.basic_qos(
prefetch_size=prefetch_size,
prefetch_count=prefetch_count,
connection_global=connection_global,
)
reject_event
def reject_event(
self,
envelope: aioamqp.envelope.Envelope,
requeue: bool = False
) -> None
View Source
async def reject_event(self, envelope: Envelope, requeue: bool = False) -> None:
await self._channel.basic_reject(
delivery_tag=envelope.delivery_tag, requeue=requeue
)