Skip to content

Module rabbit.dlx

None

None

View Source
import asyncio

from typing import Callable

import attr

from aioamqp.envelope import Envelope

from aioamqp.properties import Properties

from ._wait import constant

from .client import AioRabbitClient

from .exceptions import AttributeNotInitialized, OperationError

from .exchange import Exchange

from .logger import logger

from .queue import Queue

@attr.s(slots=True, repr=False)

class DLX:

    _client = attr.ib(

        type=AioRabbitClient,

        validator=attr.validators.instance_of(AioRabbitClient),

        repr=False,

    )

    exchange = attr.ib(

        type=Exchange,

        validator=attr.validators.instance_of(Exchange),

    )

    dlq_exchange = attr.ib(

        type=Exchange,

        validator=attr.validators.instance_of(Exchange),

    )

    queue = attr.ib(

        type=Queue,

        validator=attr.validators.instance_of(Queue),

    )

    delay_strategy = attr.ib(

        type=Callable, default=constant, validator=attr.validators.is_callable()

    )

    _channel = attr.ib(init=False, repr=False)

    def __repr__(self) -> str:

        return f"DLX(queue={self.queue}, delay_strategy={self.delay_strategy.__name__}, exchange={self.exchange}), dlq_exchange={self.dlq_exchange}"

    async def configure(self) -> None:

        self._channel = await self._client.get_channel()

        try:

            await self._configure_queue()

            await self._configure_exchange()

            await self._configure_queue_bind()

        except AttributeNotInitialized:

            logger.debug("Waiting client initialization...DLX")

    async def _configure_exchange(self) -> None:

        await asyncio.gather(

            self._channel.exchange_declare(

                exchange_name=self.exchange.name,

                type_name=self.exchange.exchange_type,

                durable=self.exchange.durable,

            ),

            self._channel.exchange_declare(

                exchange_name=self.dlq_exchange.name,

                type_name=self.dlq_exchange.exchange_type,

                durable=self.dlq_exchange.durable,

            ),

        )

        await asyncio.sleep(2)

    async def _configure_queue(self) -> None:

        queue_name = await self._ensure_endswith_dlq(self.queue.name)

        await self._channel.queue_declare(

            queue_name=queue_name,

            durable=self.queue.durable,

            arguments=self.queue.arguments,

        )

    async def _configure_queue_bind(self) -> None:

        queue_name = await self._ensure_endswith_dlq(self.queue.name)

        await asyncio.gather(

            self._channel.queue_bind(

                exchange_name=self.exchange.name,

                queue_name=queue_name,

                routing_key=self.queue.name,

            ),

            self._channel.queue_bind(

                exchange_name=self.dlq_exchange.name,

                queue_name=self.queue.name,

                routing_key=self.dlq_exchange.topic,

            ),

        )

    async def _ensure_endswith_dlq(self, value: str) -> str:

        if not value.endswith(".dlq"):

            value = f"{value}.dlq"

        return value

    async def send_event(

        self, cause: Exception, body: bytes, envelope: Envelope, properties: Properties

    ) -> None:

        timeout = self.delay_strategy(properties.headers)

        properties = await self._get_properties(timeout, cause, envelope)

        logger.debug(

            f"Send event to dlq: [exchange: {self.exchange.name}"

            f" | routing_key: {self.queue.name} | properties: {properties}]"

        )

        try:

            await self._channel.publish(

                body, self.exchange.name, self.queue.name, properties

            )

        except AttributeError:

            raise OperationError("Ensure that instance was connected ")

    async def _get_properties(

        self, timeout: int, exception_message: Exception, envelope: Envelope

    ) -> dict:

        return {

            "expiration": f"{timeout}",

            "headers": {

                "x-delay": f"{timeout}",

                "x-exception-message": f"{exception_message}",

                "x-original-exchange": f"{envelope.exchange_name}",

                "x-original-routingKey": f"{envelope.routing_key}",

            },

        }

Classes

DLX

class DLX(
    client: rabbit.client.AioRabbitClient,
    exchange: rabbit.exchange.Exchange,
    dlq_exchange: rabbit.exchange.Exchange,
    queue: rabbit.queue.Queue,
    delay_strategy: Callable = <function constant at 0x7f3ec5471040>
)
View Source
@attr.s(slots=True, repr=False)

class DLX:

    _client = attr.ib(

        type=AioRabbitClient,

        validator=attr.validators.instance_of(AioRabbitClient),

        repr=False,

    )

    exchange = attr.ib(

        type=Exchange,

        validator=attr.validators.instance_of(Exchange),

    )

    dlq_exchange = attr.ib(

        type=Exchange,

        validator=attr.validators.instance_of(Exchange),

    )

    queue = attr.ib(

        type=Queue,

        validator=attr.validators.instance_of(Queue),

    )

    delay_strategy = attr.ib(

        type=Callable, default=constant, validator=attr.validators.is_callable()

    )

    _channel = attr.ib(init=False, repr=False)

    def __repr__(self) -> str:

        return f"DLX(queue={self.queue}, delay_strategy={self.delay_strategy.__name__}, exchange={self.exchange}), dlq_exchange={self.dlq_exchange}"

    async def configure(self) -> None:

        self._channel = await self._client.get_channel()

        try:

            await self._configure_queue()

            await self._configure_exchange()

            await self._configure_queue_bind()

        except AttributeNotInitialized:

            logger.debug("Waiting client initialization...DLX")

    async def _configure_exchange(self) -> None:

        await asyncio.gather(

            self._channel.exchange_declare(

                exchange_name=self.exchange.name,

                type_name=self.exchange.exchange_type,

                durable=self.exchange.durable,

            ),

            self._channel.exchange_declare(

                exchange_name=self.dlq_exchange.name,

                type_name=self.dlq_exchange.exchange_type,

                durable=self.dlq_exchange.durable,

            ),

        )

        await asyncio.sleep(2)

    async def _configure_queue(self) -> None:

        queue_name = await self._ensure_endswith_dlq(self.queue.name)

        await self._channel.queue_declare(

            queue_name=queue_name,

            durable=self.queue.durable,

            arguments=self.queue.arguments,

        )

    async def _configure_queue_bind(self) -> None:

        queue_name = await self._ensure_endswith_dlq(self.queue.name)

        await asyncio.gather(

            self._channel.queue_bind(

                exchange_name=self.exchange.name,

                queue_name=queue_name,

                routing_key=self.queue.name,

            ),

            self._channel.queue_bind(

                exchange_name=self.dlq_exchange.name,

                queue_name=self.queue.name,

                routing_key=self.dlq_exchange.topic,

            ),

        )

    async def _ensure_endswith_dlq(self, value: str) -> str:

        if not value.endswith(".dlq"):

            value = f"{value}.dlq"

        return value

    async def send_event(

        self, cause: Exception, body: bytes, envelope: Envelope, properties: Properties

    ) -> None:

        timeout = self.delay_strategy(properties.headers)

        properties = await self._get_properties(timeout, cause, envelope)

        logger.debug(

            f"Send event to dlq: [exchange: {self.exchange.name}"

            f" | routing_key: {self.queue.name} | properties: {properties}]"

        )

        try:

            await self._channel.publish(

                body, self.exchange.name, self.queue.name, properties

            )

        except AttributeError:

            raise OperationError("Ensure that instance was connected ")

    async def _get_properties(

        self, timeout: int, exception_message: Exception, envelope: Envelope

    ) -> dict:

        return {

            "expiration": f"{timeout}",

            "headers": {

                "x-delay": f"{timeout}",

                "x-exception-message": f"{exception_message}",

                "x-original-exchange": f"{envelope.exchange_name}",

                "x-original-routingKey": f"{envelope.routing_key}",

            },

        }

Class variables

delay_strategy
dlq_exchange
exchange
queue

Instance variables

delay_strategy
dlq_exchange
exchange
queue

Methods

configure

def configure(
    self
) -> None
View Source
    async def configure(self) -> None:

        self._channel = await self._client.get_channel()

        try:

            await self._configure_queue()

            await self._configure_exchange()

            await self._configure_queue_bind()

        except AttributeNotInitialized:

            logger.debug("Waiting client initialization...DLX")

send_event

def send_event(
    self,
    cause: Exception,
    body: bytes,
    envelope: aioamqp.envelope.Envelope,
    properties: aioamqp.properties.Properties
) -> None
View Source
    async def send_event(

        self, cause: Exception, body: bytes, envelope: Envelope, properties: Properties

    ) -> None:

        timeout = self.delay_strategy(properties.headers)

        properties = await self._get_properties(timeout, cause, envelope)

        logger.debug(

            f"Send event to dlq: [exchange: {self.exchange.name}"

            f" | routing_key: {self.queue.name} | properties: {properties}]"

        )

        try:

            await self._channel.publish(

                body, self.exchange.name, self.queue.name, properties

            )

        except AttributeError:

            raise OperationError("Ensure that instance was connected ")