Skip to content

Module rabbit

None

None

View Source
from ._wait import constant, expo, fibo

from .client import AioRabbitClient

from .dlx import DLX

from .exchange import Exchange

from .publish import Publish

from .queue import Queue

from .subscribe import Subscribe

# Package version string; also exported through ``__all__``.
__version__ = "2.1.1"

# Names exported by ``from rabbit import *``.
__all__ = [
    "__version__",
    "AioRabbitClient",
    "DLX",
    "Exchange",
    "Queue",
    "Publish",
    "Subscribe",
    "expo",
    "constant",
    "fibo",
]

Sub-modules

Variables

__version__

Functions

constant

def constant(
    headers,
    delay: int = 300000
) -> int
View Source
def constant(headers, delay: int = int(os.getenv("CONSTANT_DELAY", 300000))) -> int:
    """Constant retry-delay strategy.

    The default for ``delay`` is computed once, when the function is defined,
    from the ``CONSTANT_DELAY`` environment variable (300000 when unset).

    Args:
        headers: Message headers, forwarded untouched to ``_set_timeout``.
        delay: Delay value forwarded to ``_set_timeout``
            (presumably milliseconds -- confirm against ``_set_timeout``).

    Returns:
        The value produced by ``_set_timeout(headers, delay)``.
    """
    logger.debug(f"constant delay strategy: [delay={delay}]")
    timeout = _set_timeout(headers, delay)
    return timeout

expo

def expo(
    headers,
    delay: int = 300000,
    base: int = 2,
    factor: int = 1,
    max_delay: Optional[int] = None
) -> int
View Source
def expo(
    headers,
    delay: int = int(os.getenv("EXPO_DELAY", 300000)),
    base: int = int(os.getenv("EXPO_BASE", 2)),
    factor: int = int(os.getenv("EXPO_FACTOR", 1)),
    max_delay: Optional[int] = os.getenv("EXPO_MAX_DELAY"),  # type: ignore
) -> int:
    """Multiplicative retry-delay strategy with an optional upper bound.

    The current delay is obtained from ``_set_timeout(headers, delay)`` and
    multiplied by ``base * factor``. All defaults are read from the
    environment once, when the function is defined.

    Args:
        headers: Message headers, forwarded to ``_set_timeout``.
        delay: Initial delay forwarded to ``_set_timeout``.
        base: First multiplier applied to the current delay.
        factor: Second multiplier applied to the current delay.
        max_delay: Upper bound for the returned delay. May arrive as a string
            because its default comes straight from ``EXPO_MAX_DELAY``;
            ``None`` or a blank string means "no upper bound".

    Returns:
        The multiplied delay, or ``max_delay`` when the multiplied delay
        would exceed it.
    """
    logger.debug(
        f"expo delay strategy: [delay={delay}, base={base}, factor={factor}, max_delay={max_delay}]"
    )

    # Normalise the cap once. A blank EXPO_MAX_DELAY used to skip the int()
    # conversion and then crash on the ``int <= str`` comparison below.
    if max_delay is None or (isinstance(max_delay, str) and not max_delay.strip()):
        cap = None
    else:
        cap = int(max_delay)

    current_delay = _set_timeout(headers, delay)
    delay_updated = int(current_delay * (base * factor))

    if cap is None or delay_updated <= cap:
        return delay_updated
    return cap

fibo

def fibo(
    headers,
    delay: int = 300000,
    max_delay: int = 86400000
) -> int
View Source
def fibo(
    headers,
    delay: int = int(os.getenv("FIBO_DELAY", 300000)),
    max_delay: int = int(os.getenv("FIBO_MAX_DELAY", 86400000)),
) -> int:
    """Incremental retry-delay strategy capped at ``max_delay``.

    The current delay is obtained from ``_set_timeout(headers, delay)`` and
    increased by a fixed step of 60000. Defaults are read from the
    environment once, when the function is defined.

    NOTE(review): despite the name, the visible code adds a constant step
    rather than following a Fibonacci sequence -- confirm the intent.

    Args:
        headers: Message headers, forwarded to ``_set_timeout``.
        delay: Initial delay forwarded to ``_set_timeout``.
        max_delay: Upper bound for the returned delay.

    Returns:
        ``current_delay + 60000``, never larger than ``max_delay``.
    """
    logger.debug(f"fibo delay strategy: [delay={delay}, max_delay={max_delay}]")

    current_delay = _set_timeout(headers, delay)

    if current_delay < max_delay:
        # Clamp the increment: a delay just below the cap used to overshoot it
        # by up to 59999.
        return int(min(current_delay + 60000, max_delay))
    return int(max_delay)

Classes

AioRabbitClient

class AioRabbitClient(

)
View Source
@attr.s(slots=True, repr=False)
class AioRabbitClient:
    """Holder for an ``aioamqp`` connection (transport + protocol).

    Assigning ``protocol`` sets an internal ``asyncio.Event``; ``watch`` waits
    on that event to learn that a new protocol object was installed and then
    re-runs ``configure()`` on the watched item.
    """

    # Current AMQP protocol object; ``None`` until a connection is made.
    _protocol = attr.ib(type=AmqpProtocol, init=False, default=None)
    # Transport returned by ``aioamqp.connect``; ``None`` until connected.
    transport = attr.ib(init=False, default=None)
    # Event set every time ``protocol`` is assigned.
    _event = attr.ib(init=False)

    def __attrs_post_init__(self):
        """Create the event used to signal protocol changes."""
        self._event = asyncio.Event()

    async def watch(self, item):
        """Wait for the next ``protocol`` assignment, then reconfigure ``item``.

        Args:
            item: Object exposing an async ``configure()`` method. Items whose
                ``__module__`` ends with ``".dlx"`` are not reconfigured.
        """
        logger.info("Watch connection enabled.")
        # Clear first so that only a *future* protocol assignment wakes us up.
        self._event.clear()
        await self._event.wait()
        logger.error("Connection lost.")
        if not item.__module__.endswith(".dlx"):
            logger.warning("Trying to establish a new connection...")
            await item.configure()

    @property
    def protocol(self):
        """Current protocol object, or ``None`` when not connected."""
        return self._protocol

    @protocol.setter
    def protocol(self, value):
        self._protocol = value
        # Wake up every ``watch`` coroutine waiting for a new protocol.
        self._event.set()

    async def get_channel(self) -> Channel:
        """Open a channel on the current protocol.

        Raises:
            AttributeNotInitialized: If no protocol has been set yet.
        """
        if not self.protocol:
            raise AttributeNotInitialized("Connection not initialized.")
        channel = await self.protocol.channel()
        return channel

    async def connect(self, **kwargs) -> None:
        """Connect once; ``kwargs`` are passed to ``aioamqp.connect``."""
        self.transport, self.protocol = await aioamqp.connect(**kwargs)

    async def persistent_connect(self, **kwargs):
        """Connect and keep reconnecting forever.

        Each iteration connects, waits for the protocol to close, closes the
        transport and connects again. ``OSError`` and
        ``AmqpClosedConnection`` are logged and retried after five seconds.
        ``kwargs`` are passed to ``aioamqp.connect``.
        """
        while True:
            try:
                self.transport, self.protocol = await aioamqp.connect(**kwargs)
                await asyncio.sleep(2)
                await self.protocol.wait_closed()
                self.transport.close()
            except (OSError, aioamqp.exceptions.AmqpClosedConnection) as err:
                # NOTE(review): kwargs may carry credentials passed to
                # aioamqp.connect -- consider redacting before logging.
                logger.error(f"Error: {err} - Params: {kwargs}")
                await asyncio.sleep(5)
                # The enclosing loop performs the retry. The previous
                # recursive self-call nested one more coroutine frame per
                # failed attempt and never returned.

Class variables

transport

Instance variables

protocol
transport

Methods

connect

def connect(
    self,
    **kwargs
) -> None
View Source
    async def connect(self, **kwargs) -> None:
        """Open a single connection; ``kwargs`` go to ``aioamqp.connect``."""
        transport, protocol = await aioamqp.connect(**kwargs)
        self.transport = transport
        self.protocol = protocol

get_channel

def get_channel(
    self
) -> aioamqp.channel.Channel
View Source
    async def get_channel(self) -> Channel:
        """Return a new channel opened on the current protocol.

        Raises:
            AttributeNotInitialized: When ``self.protocol`` is falsy.
        """
        if self.protocol:
            return await self.protocol.channel()
        raise AttributeNotInitialized("Connection not initialized.")

persistent_connect

def persistent_connect(
    self,
    **kwargs
)
View Source
    async def persistent_connect(self, **kwargs):
        """Connect and keep reconnecting forever.

        Each iteration connects, waits for the protocol to close, closes the
        transport and connects again. ``OSError`` and
        ``AmqpClosedConnection`` are logged and retried after five seconds.
        ``kwargs`` are passed to ``aioamqp.connect``.
        """
        while True:
            try:
                self.transport, self.protocol = await aioamqp.connect(**kwargs)
                await asyncio.sleep(2)
                await self.protocol.wait_closed()
                self.transport.close()
            except (OSError, aioamqp.exceptions.AmqpClosedConnection) as err:
                logger.error(f"Error: {err} - Params: {kwargs}")
                await asyncio.sleep(5)
                # The enclosing loop performs the retry. The previous
                # recursive self-call nested one more coroutine frame per
                # failed attempt and never returned.

watch

def watch(
    self,
    item
)
View Source
    async def watch(self, item):
        """Block until ``protocol`` is assigned again, then reconfigure ``item``.

        Items whose ``__module__`` ends with ``".dlx"`` are left alone.
        """
        logger.info("Watch connection enabled.")

        signal = self._event
        # Reset so only an assignment made after this point releases the wait.
        signal.clear()
        await signal.wait()

        logger.error("Connection lost.")

        if item.__module__.endswith(".dlx"):
            return
        logger.warning("Trying to establish a new connection...")
        await item.configure()

DLX

class DLX(
    client: rabbit.client.AioRabbitClient,
    exchange: rabbit.exchange.Exchange,
    dlq_exchange: rabbit.exchange.Exchange,
    queue: rabbit.queue.Queue,
    delay_strategy: Callable = constant
)
View Source
@attr.s(slots=True, repr=False)
class DLX:
    """Dead-letter topology for one queue plus the publisher that feeds it.

    ``configure`` declares a ``<queue>.dlq`` queue and two exchanges and binds
    them; ``send_event`` publishes a failed message to ``exchange`` with an
    expiration computed by ``delay_strategy``.
    """

    # Connection holder used to open the channel.
    _client = attr.ib(
        type=AioRabbitClient,
        validator=attr.validators.instance_of(AioRabbitClient),
        repr=False,
    )
    # Exchange that failed messages are published to.
    exchange = attr.ib(
        type=Exchange,
        validator=attr.validators.instance_of(Exchange),
    )
    # Exchange bound back to the original queue (see _configure_queue_bind).
    dlq_exchange = attr.ib(
        type=Exchange,
        validator=attr.validators.instance_of(Exchange),
    )
    # Original queue; the dead-letter queue is named "<queue.name>.dlq".
    queue = attr.ib(
        type=Queue,
        validator=attr.validators.instance_of(Queue),
    )
    # Callable taking the message headers and returning the delay.
    delay_strategy = attr.ib(
        type=Callable, default=constant, validator=attr.validators.is_callable()
    )
    # Channel opened by ``configure``; unset until then.
    _channel = attr.ib(init=False, repr=False)

    def __repr__(self) -> str:
        # The closing parenthesis used to sit before ``dlq_exchange``, leaving
        # that field outside the "DLX(...)" wrapper.
        return (
            f"DLX(queue={self.queue}, delay_strategy={self.delay_strategy.__name__}, "
            f"exchange={self.exchange}, dlq_exchange={self.dlq_exchange})"
        )

    async def configure(self) -> None:
        """Open a channel and declare/bind the dead-letter queue and exchanges."""
        # NOTE(review): in the code visible here only get_channel() raises
        # AttributeNotInitialized, and it is called outside the try below.
        self._channel = await self._client.get_channel()
        try:
            await self._configure_queue()
            await self._configure_exchange()
            await self._configure_queue_bind()
        except AttributeNotInitialized:
            logger.debug("Waiting client initialization...DLX")

    async def _configure_exchange(self) -> None:
        """Declare ``exchange`` and ``dlq_exchange`` concurrently."""
        await asyncio.gather(
            self._channel.exchange_declare(
                exchange_name=self.exchange.name,
                type_name=self.exchange.exchange_type,
                durable=self.exchange.durable,
            ),
            self._channel.exchange_declare(
                exchange_name=self.dlq_exchange.name,
                type_name=self.dlq_exchange.exchange_type,
                durable=self.dlq_exchange.durable,
            ),
        )
        # NOTE(review): fixed pause after declaring; its purpose is not
        # visible in this file.
        await asyncio.sleep(2)

    async def _configure_queue(self) -> None:
        """Declare the ``.dlq`` queue with the configured arguments."""
        queue_name = await self._ensure_endswith_dlq(self.queue.name)
        await self._channel.queue_declare(
            queue_name=queue_name,
            durable=self.queue.durable,
            arguments=self.queue.arguments,
        )

    async def _configure_queue_bind(self) -> None:
        """Bind the ``.dlq`` queue to ``exchange`` and the original queue to ``dlq_exchange``."""
        queue_name = await self._ensure_endswith_dlq(self.queue.name)
        await asyncio.gather(
            self._channel.queue_bind(
                exchange_name=self.exchange.name,
                queue_name=queue_name,
                routing_key=self.queue.name,
            ),
            self._channel.queue_bind(
                exchange_name=self.dlq_exchange.name,
                queue_name=self.queue.name,
                routing_key=self.dlq_exchange.topic,
            ),
        )

    async def _ensure_endswith_dlq(self, value: str) -> str:
        """Return ``value`` with a ``.dlq`` suffix, adding it only if missing."""
        if not value.endswith(".dlq"):
            value = f"{value}.dlq"
        return value

    async def send_event(
        self, cause: Exception, body: bytes, envelope: Envelope, properties: Properties
    ) -> None:
        """Publish a failed message to the dead-letter exchange.

        Args:
            cause: Exception that made processing fail; its text is stored in
                the ``x-exception-message`` header.
            body: Original message payload, republished unchanged.
            envelope: Envelope of the original delivery (source exchange and
                routing key are copied into headers).
            properties: Properties of the original delivery; only ``headers``
                is read, and it is passed to ``delay_strategy``.

        Raises:
            OperationError: If publishing raises ``AttributeError`` (for
                example when ``configure`` has not set the channel yet).
        """
        timeout = self.delay_strategy(properties.headers)
        properties = await self._get_properties(timeout, cause, envelope)

        logger.debug(
            f"Send event to dlq: [exchange: {self.exchange.name}"
            f" | routing_key: {self.queue.name} | properties: {properties}]"
        )

        try:
            await self._channel.publish(
                body, self.exchange.name, self.queue.name, properties
            )
        except AttributeError as err:
            # Chain explicitly so the original AttributeError stays attached
            # as the cause.
            raise OperationError("Ensure that instance was connected ") from err

    async def _get_properties(
        self, timeout: int, exception_message: Exception, envelope: Envelope
    ) -> dict:
        """Build the publish properties (expiration + diagnostic headers)."""
        return {
            "expiration": f"{timeout}",
            "headers": {
                "x-delay": f"{timeout}",
                "x-exception-message": f"{exception_message}",
                "x-original-exchange": f"{envelope.exchange_name}",
                "x-original-routingKey": f"{envelope.routing_key}",
            },
        }

Class variables

delay_strategy
dlq_exchange
exchange
queue

Instance variables

delay_strategy
dlq_exchange
exchange
queue

Methods

configure

def configure(
    self
) -> None
View Source
    async def configure(self) -> None:
        """Open a channel, then declare the queue, the exchanges and the bindings.

        ``AttributeNotInitialized`` raised by any of the three declaration
        steps is logged at debug level and swallowed.
        """
        self._channel = await self._client.get_channel()

        steps = (
            self._configure_queue,
            self._configure_exchange,
            self._configure_queue_bind,
        )
        try:
            # Run the steps strictly one after another, in this order.
            for step in steps:
                await step()
        except AttributeNotInitialized:
            logger.debug("Waiting client initialization...DLX")

send_event

def send_event(
    self,
    cause: Exception,
    body: bytes,
    envelope: aioamqp.envelope.Envelope,
    properties: aioamqp.properties.Properties
) -> None
View Source
    async def send_event(
        self, cause: Exception, body: bytes, envelope: Envelope, properties: Properties
    ) -> None:
        """Republish a failed message to the dead-letter exchange.

        The delay comes from ``delay_strategy(properties.headers)``; the
        outgoing properties are rebuilt by ``_get_properties``.

        Raises:
            OperationError: If publishing raises ``AttributeError``.
        """
        properties = await self._get_properties(
            self.delay_strategy(properties.headers), cause, envelope
        )

        logger.debug(
            f"Send event to dlq: [exchange: {self.exchange.name}"
            f" | routing_key: {self.queue.name} | properties: {properties}]"
        )

        try:
            await self._channel.publish(
                body,
                self.exchange.name,
                self.queue.name,
                properties,
            )
        except AttributeError:
            raise OperationError("Ensure that instance was connected ")

Exchange

class Exchange(
    name: str,
    exchange_type: str,
    topic: str = '#',
    durable: bool = True
)
View Source
@attr.s(frozen=True)
class Exchange:
    """Immutable, validated description of an exchange."""

    # Exchange name.
    name = attr.ib(validator=attr.validators.instance_of(str), type=str)
    # Exchange type, passed as ``type_name`` when the exchange is declared.
    exchange_type = attr.ib(validator=attr.validators.instance_of(str), type=str)
    # Routing/binding key used with this exchange; defaults to "#".
    topic = attr.ib(
        validator=attr.validators.instance_of(str),
        default="#",
        type=str,
    )
    # Durable flag passed when the exchange is declared.
    durable = attr.ib(
        validator=attr.validators.instance_of(bool),
        default=True,
        type=bool,
    )

Class variables

durable
exchange_type
name
topic

Publish

class Publish(
    client: rabbit.client.AioRabbitClient,
    exchange_name: str = 'default.in.exchange',
    routing_key: str = '#'
)
View Source
@attr.s(slots=True)
class Publish:
    """Publisher bound to one exchange and routing key.

    The defaults for ``exchange_name`` and ``routing_key`` are read from the
    ``PUBLISH_EXCHANGE_NAME`` and ``PUBLISH_ROUTING_KEY`` environment
    variables once, when the class is defined.
    """

    # Connection holder used to open the channel.
    _client = attr.ib(
        validator=attr.validators.instance_of(AioRabbitClient),
        type=AioRabbitClient,
        repr=False,
    )
    # Target exchange for every published payload.
    exchange_name = attr.ib(
        validator=attr.validators.instance_of(str),
        default=os.getenv("PUBLISH_EXCHANGE_NAME", "default.in.exchange"),
        type=str,
    )
    # Routing key for every published payload.
    routing_key = attr.ib(
        validator=attr.validators.instance_of(str),
        default=os.getenv("PUBLISH_ROUTING_KEY", "#"),
        type=str,
    )
    # Channel opened by ``configure``; unset until then.
    _channel = attr.ib(init=False, repr=False)

    async def configure(self) -> None:
        """Open the channel and start a watcher task on the client."""
        await asyncio.sleep(1)
        self._channel = await self._client.get_channel()

        running_loop = asyncio.get_running_loop()
        watcher = self._client.watch(self)
        running_loop.create_task(watcher)

    async def send_event(self, payload: bytes, **kwargs) -> None:
        """Publish ``payload``; extra ``kwargs`` go to ``channel.publish``."""
        publish = self._channel.publish
        await publish(
            payload=payload,
            exchange_name=self.exchange_name,
            routing_key=self.routing_key,
            **kwargs,
        )

Class variables

exchange_name
routing_key

Instance variables

exchange_name
routing_key

Methods

configure

def configure(
    self
) -> None
View Source
    async def configure(self) -> None:
        """Open the channel and start a watcher task on the client."""
        await asyncio.sleep(1)
        self._channel = await self._client.get_channel()

        running_loop = asyncio.get_running_loop()
        watcher = self._client.watch(self)
        running_loop.create_task(watcher)

send_event

def send_event(
    self,
    payload: bytes,
    **kwargs
) -> None
View Source
    async def send_event(self, payload: bytes, **kwargs) -> None:
        """Publish ``payload`` to the configured exchange and routing key.

        Extra ``kwargs`` are forwarded to ``channel.publish``.
        """
        publish = self._channel.publish
        await publish(
            payload=payload,
            exchange_name=self.exchange_name,
            routing_key=self.routing_key,
            **kwargs,
        )

Queue

class Queue(
    name: str,
    durable: bool = True,
    arguments: dict = NOTHING
)
View Source
@attr.s(frozen=True)
class Queue:
    """Immutable, validated description of a queue."""

    # Queue name.
    name = attr.ib(validator=attr.validators.instance_of(str), type=str)
    # Durable flag passed when the queue is declared.
    durable = attr.ib(
        validator=attr.validators.instance_of(bool),
        default=True,
        type=bool,
    )
    # Declaration arguments; a fresh empty dict per instance by default.
    arguments = attr.ib(
        validator=attr.validators.instance_of(dict),
        factory=dict,
        type=dict,
    )

Class variables

arguments
durable
name

Subscribe

class Subscribe(
    client: rabbit.client.AioRabbitClient,
    task: Callable,
    exchange: rabbit.exchange.Exchange = Exchange(name='default.in.exchange', exchange_type='topic', topic='#', durable=True),
    queue: rabbit.queue.Queue = Queue(name='default.subscribe.queue', durable=True, arguments={}),
    concurrent: int = 1,
    delay_strategy: Callable = constant
)
View Source
@attr.s(slots=True, repr=False)
class Subscribe:
    """Consumer that runs ``task`` for every message delivered to ``queue``.

    Messages whose processing raises are acknowledged and republished through
    an internally built ``DLX``. The default ``exchange`` and ``queue`` are
    built from ``SUBSCRIBE_*`` environment variables once, when the class is
    defined.
    """

    # Connection holder used to open the channel.
    _client = attr.ib(
        type=AioRabbitClient,
        validator=attr.validators.instance_of(AioRabbitClient),
        repr=False,
    )
    # Async callable invoked with the message body.
    task = attr.ib(type=Callable, validator=attr.validators.is_callable())
    # Exchange the queue is bound to.
    exchange = attr.ib(
        type=Exchange,
        default=Exchange(
            name=os.getenv("SUBSCRIBE_EXCHANGE_NAME", "default.in.exchange"),
            exchange_type=os.getenv("SUBSCRIBE_EXCHANGE_TYPE", "topic"),
            topic=os.getenv("SUBSCRIBE_TOPIC", "#"),
        ),
        validator=attr.validators.instance_of(Exchange),
    )
    # Queue that is consumed.
    queue = attr.ib(
        type=Queue,
        default=Queue(
            name=os.getenv("SUBSCRIBE_QUEUE_NAME", "default.subscribe.queue")
        ),
        validator=attr.validators.instance_of(Queue),
    )
    # Used both as prefetch count and as the job-queue capacity.
    concurrent = attr.ib(
        type=int, default=1, validator=attr.validators.instance_of(int)
    )
    # Delay strategy handed to the internal DLX.
    delay_strategy = attr.ib(
        type=Callable, default=constant, validator=attr.validators.is_callable()
    )
    # Built in __attrs_post_init__.
    _dlx = attr.ib(type=DLX, validator=attr.validators.instance_of(DLX), init=False)
    _job_queue = attr.ib(init=False, repr=False)
    _loop = attr.ib(init=False, repr=False)
    # Channel opened by ``configure``; unset until then.
    _channel = attr.ib(init=False, repr=False)

    def __repr__(self) -> str:
        return f"Subscribe(task={self.task.__name__}, exchange={self.exchange}, queue={self.queue}, concurrent={self.concurrent}, dlx={self._dlx})"

    def __attrs_post_init__(self) -> None:
        """Build the DLX helper, the bounded job queue and grab the event loop."""
        # Resolve the re-router exchange name once so the exchange that gets
        # declared and the queue's ``x-dead-letter-exchange`` argument always
        # agree. Previously the argument ignored DLQ_EXCHANGE_NAME.
        dlq_exchange_name = os.getenv(
            "DLQ_EXCHANGE_NAME", f"dlqReRouter.{self.exchange.name}"
        )
        self._dlx = DLX(
            client=self._client,
            exchange=Exchange(
                name=os.getenv("DLX_EXCHANGE_NAME", "DLX"),
                exchange_type=os.getenv("DLX_TYPE", "direct"),
            ),
            dlq_exchange=Exchange(
                name=dlq_exchange_name,
                exchange_type=os.getenv("DLQ_EXCHANGE_TYPE", "topic"),
                topic=os.getenv("SUBSCRIBE_QUEUE", self.queue.name),
            ),
            queue=Queue(
                name=self.queue.name,
                arguments={
                    "x-dead-letter-exchange": dlq_exchange_name,
                    "x-dead-letter-routing-key": self.queue.name,
                },
            ),
            delay_strategy=self.delay_strategy,
        )
        self._job_queue = asyncio.Queue(maxsize=self.concurrent)
        self._loop = asyncio.get_event_loop()

    async def configure(self) -> None:
        """Open the channel, set QoS, start the watcher and declare the topology.

        ``SynchronizationError`` from the declaration steps is suppressed;
        ``AttributeNotInitialized`` from them is logged at debug level.
        """
        await asyncio.sleep(3)
        self._channel = await self._client.get_channel()
        await self.qos(prefetch_count=self.concurrent)
        self._loop.create_task(self._client.watch(self))

        with suppress(SynchronizationError):
            try:
                await self._configure_queue()
                await self._dlx.configure()
                await self._configure_exchange()
                await self._configure_queue_bind()
            except AttributeNotInitialized:
                logger.debug("Waiting client initialization...SUBSCRIBE")

    async def _configure_exchange(self) -> None:
        """Declare ``exchange``."""
        await self._channel.exchange_declare(
            exchange_name=self.exchange.name,
            type_name=self.exchange.exchange_type,
            durable=self.exchange.durable,
        )
        # NOTE(review): fixed pause after declaring; its purpose is not
        # visible in this file.
        await asyncio.sleep(2)

    async def _configure_queue(self) -> None:
        """Declare ``queue``."""
        await self._channel.queue_declare(
            queue_name=self.queue.name, durable=self.queue.durable
        )

    async def _configure_queue_bind(self) -> None:
        """Bind ``queue`` to ``exchange`` and start consuming with ``callback``."""
        await self._channel.queue_bind(
            exchange_name=self.exchange.name,
            queue_name=self.queue.name,
            routing_key=self.exchange.topic,
        )
        await self._channel.basic_consume(
            callback=self.callback, queue_name=self.queue.name
        )

    async def callback(
        self, channel: Channel, body: bytes, envelope: Envelope, properties: Properties
    ) -> None:
        """Consumer callback: enqueue the message, or requeue it when saturated.

        When the job queue is full the message is nacked with requeue and the
        callback waits until all queued jobs are marked done.
        """
        if not self._job_queue.full():
            self._job_queue.put_nowait((body, envelope, properties))
            self._loop.create_task(self._run())
        else:
            await self.nack_event(envelope, requeue=True)
            await self._job_queue.join()

    async def _run(self) -> None:
        """Process one queued message.

        On success the message is acked. On any ``Exception`` the message is
        acked and republished through the DLX (shielded from cancellation).
        """
        body, envelope, properties = await self._job_queue.get()
        try:
            await self.task(body)
            await self.ack_event(envelope, multiple=False)
        except Exception as cause:
            await asyncio.shield(
                asyncio.gather(
                    self.ack_event(envelope, multiple=False),
                    self._dlx.send_event(cause, body, envelope, properties),
                )
            )
        finally:
            # Always balance the get() above. Previously a failing task never
            # called task_done(), so callback's ``_job_queue.join()`` would
            # wait forever after the first failure.
            self._job_queue.task_done()

    async def ack_event(self, envelope: Envelope, multiple: bool = False) -> None:
        """Acknowledge the delivery identified by ``envelope.delivery_tag``."""
        await self._channel.basic_client_ack(
            delivery_tag=envelope.delivery_tag, multiple=multiple
        )

    async def nack_event(
        self, envelope: Envelope, multiple: bool = False, requeue: bool = True
    ) -> None:
        """Negatively acknowledge the delivery, optionally requeueing it."""
        await self._channel.basic_client_nack(
            delivery_tag=envelope.delivery_tag, multiple=multiple, requeue=requeue
        )

    async def reject_event(self, envelope: Envelope, requeue: bool = False) -> None:
        """Reject the delivery, optionally requeueing it."""
        await self._channel.basic_reject(
            delivery_tag=envelope.delivery_tag, requeue=requeue
        )

    async def qos(
        self,
        prefetch_size: int = 0,
        prefetch_count: int = 0,
        connection_global: bool = False,
    ):
        """Forward the QoS settings to ``channel.basic_qos``."""
        await self._channel.basic_qos(
            prefetch_size=prefetch_size,
            prefetch_count=prefetch_count,
            connection_global=connection_global,
        )

Class variables

concurrent
delay_strategy
exchange
queue
task

Instance variables

concurrent
delay_strategy
exchange
queue
task

Methods

ack_event

def ack_event(
    self,
    envelope: aioamqp.envelope.Envelope,
    multiple: bool = False
) -> None
View Source
    async def ack_event(self, envelope: Envelope, multiple: bool = False) -> None:
        """Acknowledge the delivery identified by ``envelope.delivery_tag``."""
        acknowledge = self._channel.basic_client_ack
        await acknowledge(delivery_tag=envelope.delivery_tag, multiple=multiple)

callback

def callback(
    self,
    channel: aioamqp.channel.Channel,
    body: bytes,
    envelope: aioamqp.envelope.Envelope,
    properties: aioamqp.properties.Properties
) -> None
View Source
    async def callback(
        self, channel: Channel, body: bytes, envelope: Envelope, properties: Properties
    ) -> None:
        """Consumer callback: enqueue the message, or requeue it when saturated."""
        jobs = self._job_queue

        if jobs.full():
            # No free slot: hand the message back to the broker and wait until
            # every queued job has been marked done.
            await self.nack_event(envelope, requeue=True)
            await jobs.join()
            return

        jobs.put_nowait((body, envelope, properties))
        self._loop.create_task(self._run())

configure

def configure(
    self
) -> None
View Source
    async def configure(self) -> None:
        """Open the channel, set QoS, start the watcher and declare the topology.

        ``SynchronizationError`` from the declaration steps is suppressed;
        ``AttributeNotInitialized`` from them is logged at debug level.
        """
        await asyncio.sleep(3)

        self._channel = await self._client.get_channel()
        await self.qos(prefetch_count=self.concurrent)

        watcher = self._client.watch(self)
        self._loop.create_task(watcher)

        with suppress(SynchronizationError):
            try:
                # Declaration steps, awaited strictly in this order.
                for step in (
                    self._configure_queue,
                    self._dlx.configure,
                    self._configure_exchange,
                    self._configure_queue_bind,
                ):
                    await step()
            except AttributeNotInitialized:
                logger.debug("Waiting client initialization...SUBSCRIBE")

nack_event

def nack_event(
    self,
    envelope: aioamqp.envelope.Envelope,
    multiple: bool = False,
    requeue: bool = True
) -> None
View Source
    async def nack_event(
        self, envelope: Envelope, multiple: bool = False, requeue: bool = True
    ) -> None:
        """Negatively acknowledge the delivery, optionally requeueing it."""
        negative_ack = self._channel.basic_client_nack
        await negative_ack(
            delivery_tag=envelope.delivery_tag,
            multiple=multiple,
            requeue=requeue,
        )

qos

def qos(
    self,
    prefetch_size: int = 0,
    prefetch_count: int = 0,
    connection_global: bool = False
)
View Source
    async def qos(
        self,
        prefetch_size: int = 0,
        prefetch_count: int = 0,
        connection_global: bool = False,
    ):
        """Forward the three QoS settings unchanged to ``channel.basic_qos``."""
        apply_qos = self._channel.basic_qos
        await apply_qos(
            prefetch_size=prefetch_size,
            prefetch_count=prefetch_count,
            connection_global=connection_global,
        )

reject_event

def reject_event(
    self,
    envelope: aioamqp.envelope.Envelope,
    requeue: bool = False
) -> None
View Source
    async def reject_event(self, envelope: Envelope, requeue: bool = False) -> None:
        """Reject the delivery identified by ``envelope.delivery_tag``."""
        reject = self._channel.basic_reject
        await reject(delivery_tag=envelope.delivery_tag, requeue=requeue)