Skip to content

Module rabbit.subscribe

None

None

View Source
import asyncio

import os

from contextlib import suppress

from typing import Callable

import attr

from aioamqp.channel import Channel

from aioamqp.envelope import Envelope

from aioamqp.exceptions import SynchronizationError

from aioamqp.properties import Properties

from ._wait import constant

from .client import AioRabbitClient

from .dlx import DLX

from .exceptions import AttributeNotInitialized

from .exchange import Exchange

from .logger import logger

from .queue import Queue

@attr.s(slots=True, repr=False)

class Subscribe:

    _client = attr.ib(

        type=AioRabbitClient,

        validator=attr.validators.instance_of(AioRabbitClient),

        repr=False,

    )

    task = attr.ib(type=Callable, validator=attr.validators.is_callable())

    exchange = attr.ib(

        type=Exchange,

        default=Exchange(

            name=os.getenv("SUBSCRIBE_EXCHANGE_NAME", "default.in.exchange"),

            exchange_type=os.getenv("SUBSCRIBE_EXCHANGE_TYPE", "topic"),

            topic=os.getenv("SUBSCRIBE_TOPIC", "#"),

        ),

        validator=attr.validators.instance_of(Exchange),

    )

    queue = attr.ib(

        type=Queue,

        default=Queue(

            name=os.getenv("SUBSCRIBE_QUEUE_NAME", "default.subscribe.queue")

        ),

        validator=attr.validators.instance_of(Queue),

    )

    concurrent = attr.ib(

        type=int, default=1, validator=attr.validators.instance_of(int)

    )

    delay_strategy = attr.ib(

        type=Callable, default=constant, validator=attr.validators.is_callable()

    )

    _dlx = attr.ib(type=DLX, validator=attr.validators.instance_of(DLX), init=False)

    _job_queue = attr.ib(init=False, repr=False)

    _loop = attr.ib(init=False, repr=False)

    _channel = attr.ib(init=False, repr=False)

    def __repr__(self) -> str:

        return f"Subscribe(task={self.task.__name__}, exchange={self.exchange}, queue={self.queue}, concurrent={self.concurrent}, dlx={self._dlx})"

    def __attrs_post_init__(self) -> None:

        self._dlx = DLX(

            client=self._client,

            exchange=Exchange(

                name=os.getenv("DLX_EXCHANGE_NAME", "DLX"),

                exchange_type=os.getenv("DLX_TYPE", "direct"),

            ),

            dlq_exchange=Exchange(

                name=os.getenv(

                    "DLQ_EXCHANGE_NAME", f"dlqReRouter.{self.exchange.name}"

                ),

                exchange_type=os.getenv("DLQ_EXCHANGE_TYPE", "topic"),

                topic=os.getenv("SUBSCRIBE_QUEUE", self.queue.name),

            ),

            queue=Queue(

                name=self.queue.name,

                arguments={

                    "x-dead-letter-exchange": f"dlqReRouter.{self.exchange.name}",

                    "x-dead-letter-routing-key": self.queue.name,

                },

            ),

            delay_strategy=self.delay_strategy,

        )

        self._job_queue = asyncio.Queue(maxsize=self.concurrent)

        self._loop = asyncio.get_event_loop()

    async def configure(self) -> None:

        await asyncio.sleep(3)

        self._channel = await self._client.get_channel()

        await self.qos(prefetch_count=self.concurrent)

        # self._loop.create_task(self._client.watch(self), name="subscribe_watcher")

        self._loop.create_task(self._client.watch(self))

        with suppress(SynchronizationError):

            try:

                await self._configure_queue()

                await self._dlx.configure()

                await self._configure_exchange()

                await self._configure_queue_bind()

            except AttributeNotInitialized:

                logger.debug("Waiting client initialization...SUBSCRIBE")

    async def _configure_exchange(self) -> None:

        await self._channel.exchange_declare(

            exchange_name=self.exchange.name,

            type_name=self.exchange.exchange_type,

            durable=self.exchange.durable,

        )

        await asyncio.sleep(2)

    async def _configure_queue(self) -> None:

        await self._channel.queue_declare(

            queue_name=self.queue.name, durable=self.queue.durable

        )

    async def _configure_queue_bind(self) -> None:

        await self._channel.queue_bind(

            exchange_name=self.exchange.name,

            queue_name=self.queue.name,

            routing_key=self.exchange.topic,

        )

        await self._channel.basic_consume(

            callback=self.callback, queue_name=self.queue.name

        )

    async def callback(

        self, channel: Channel, body: bytes, envelope: Envelope, properties: Properties

    ) -> None:

        if not self._job_queue.full():

            self._job_queue.put_nowait((body, envelope, properties))

            self._loop.create_task(self._run())

        else:

            await self.nack_event(envelope, requeue=True)

            await self._job_queue.join()

    async def _run(self) -> None:

        try:

            body, envelope, properties = await self._job_queue.get()

            await self.task(body)

            self._job_queue.task_done()

            await self.ack_event(envelope, multiple=False)

        except Exception as cause:

            await asyncio.shield(

                asyncio.gather(

                    self.ack_event(envelope, multiple=False),

                    self._dlx.send_event(cause, body, envelope, properties),

                )

            )

    async def ack_event(self, envelope: Envelope, multiple: bool = False) -> None:

        await self._channel.basic_client_ack(

            delivery_tag=envelope.delivery_tag, multiple=multiple

        )

    async def nack_event(

        self, envelope: Envelope, multiple: bool = False, requeue: bool = True

    ) -> None:

        await self._channel.basic_client_nack(

            delivery_tag=envelope.delivery_tag, multiple=multiple, requeue=requeue

        )

    async def reject_event(self, envelope: Envelope, requeue: bool = False) -> None:

        await self._channel.basic_reject(

            delivery_tag=envelope.delivery_tag, requeue=requeue

        )

    async def qos(

        self,

        prefetch_size: int = 0,

        prefetch_count: int = 0,

        connection_global: bool = False,

    ):

        await self._channel.basic_qos(

            prefetch_size=prefetch_size,

            prefetch_count=prefetch_count,

            connection_global=connection_global,

        )

Classes

Subscribe

class Subscribe(
    client: rabbit.client.AioRabbitClient,
    task: Callable,
    exchange: rabbit.exchange.Exchange = Exchange(name='default.in.exchange', exchange_type='topic', topic='#', durable=True),
    queue: rabbit.queue.Queue = Queue(name='default.subscribe.queue', durable=True, arguments={}),
    concurrent: int = 1,
    delay_strategy: Callable = <function constant at 0x7f3ec5471040>
)
View Source
@attr.s(slots=True, repr=False)

class Subscribe:

    _client = attr.ib(

        type=AioRabbitClient,

        validator=attr.validators.instance_of(AioRabbitClient),

        repr=False,

    )

    task = attr.ib(type=Callable, validator=attr.validators.is_callable())

    exchange = attr.ib(

        type=Exchange,

        default=Exchange(

            name=os.getenv("SUBSCRIBE_EXCHANGE_NAME", "default.in.exchange"),

            exchange_type=os.getenv("SUBSCRIBE_EXCHANGE_TYPE", "topic"),

            topic=os.getenv("SUBSCRIBE_TOPIC", "#"),

        ),

        validator=attr.validators.instance_of(Exchange),

    )

    queue = attr.ib(

        type=Queue,

        default=Queue(

            name=os.getenv("SUBSCRIBE_QUEUE_NAME", "default.subscribe.queue")

        ),

        validator=attr.validators.instance_of(Queue),

    )

    concurrent = attr.ib(

        type=int, default=1, validator=attr.validators.instance_of(int)

    )

    delay_strategy = attr.ib(

        type=Callable, default=constant, validator=attr.validators.is_callable()

    )

    _dlx = attr.ib(type=DLX, validator=attr.validators.instance_of(DLX), init=False)

    _job_queue = attr.ib(init=False, repr=False)

    _loop = attr.ib(init=False, repr=False)

    _channel = attr.ib(init=False, repr=False)

    def __repr__(self) -> str:

        return f"Subscribe(task={self.task.__name__}, exchange={self.exchange}, queue={self.queue}, concurrent={self.concurrent}, dlx={self._dlx})"

    def __attrs_post_init__(self) -> None:

        self._dlx = DLX(

            client=self._client,

            exchange=Exchange(

                name=os.getenv("DLX_EXCHANGE_NAME", "DLX"),

                exchange_type=os.getenv("DLX_TYPE", "direct"),

            ),

            dlq_exchange=Exchange(

                name=os.getenv(

                    "DLQ_EXCHANGE_NAME", f"dlqReRouter.{self.exchange.name}"

                ),

                exchange_type=os.getenv("DLQ_EXCHANGE_TYPE", "topic"),

                topic=os.getenv("SUBSCRIBE_QUEUE", self.queue.name),

            ),

            queue=Queue(

                name=self.queue.name,

                arguments={

                    "x-dead-letter-exchange": f"dlqReRouter.{self.exchange.name}",

                    "x-dead-letter-routing-key": self.queue.name,

                },

            ),

            delay_strategy=self.delay_strategy,

        )

        self._job_queue = asyncio.Queue(maxsize=self.concurrent)

        self._loop = asyncio.get_event_loop()

    async def configure(self) -> None:

        await asyncio.sleep(3)

        self._channel = await self._client.get_channel()

        await self.qos(prefetch_count=self.concurrent)

        # self._loop.create_task(self._client.watch(self), name="subscribe_watcher")

        self._loop.create_task(self._client.watch(self))

        with suppress(SynchronizationError):

            try:

                await self._configure_queue()

                await self._dlx.configure()

                await self._configure_exchange()

                await self._configure_queue_bind()

            except AttributeNotInitialized:

                logger.debug("Waiting client initialization...SUBSCRIBE")

    async def _configure_exchange(self) -> None:

        await self._channel.exchange_declare(

            exchange_name=self.exchange.name,

            type_name=self.exchange.exchange_type,

            durable=self.exchange.durable,

        )

        await asyncio.sleep(2)

    async def _configure_queue(self) -> None:

        await self._channel.queue_declare(

            queue_name=self.queue.name, durable=self.queue.durable

        )

    async def _configure_queue_bind(self) -> None:

        await self._channel.queue_bind(

            exchange_name=self.exchange.name,

            queue_name=self.queue.name,

            routing_key=self.exchange.topic,

        )

        await self._channel.basic_consume(

            callback=self.callback, queue_name=self.queue.name

        )

    async def callback(

        self, channel: Channel, body: bytes, envelope: Envelope, properties: Properties

    ) -> None:

        if not self._job_queue.full():

            self._job_queue.put_nowait((body, envelope, properties))

            self._loop.create_task(self._run())

        else:

            await self.nack_event(envelope, requeue=True)

            await self._job_queue.join()

    async def _run(self) -> None:

        try:

            body, envelope, properties = await self._job_queue.get()

            await self.task(body)

            self._job_queue.task_done()

            await self.ack_event(envelope, multiple=False)

        except Exception as cause:

            await asyncio.shield(

                asyncio.gather(

                    self.ack_event(envelope, multiple=False),

                    self._dlx.send_event(cause, body, envelope, properties),

                )

            )

    async def ack_event(self, envelope: Envelope, multiple: bool = False) -> None:

        await self._channel.basic_client_ack(

            delivery_tag=envelope.delivery_tag, multiple=multiple

        )

    async def nack_event(

        self, envelope: Envelope, multiple: bool = False, requeue: bool = True

    ) -> None:

        await self._channel.basic_client_nack(

            delivery_tag=envelope.delivery_tag, multiple=multiple, requeue=requeue

        )

    async def reject_event(self, envelope: Envelope, requeue: bool = False) -> None:

        await self._channel.basic_reject(

            delivery_tag=envelope.delivery_tag, requeue=requeue

        )

    async def qos(

        self,

        prefetch_size: int = 0,

        prefetch_count: int = 0,

        connection_global: bool = False,

    ):

        await self._channel.basic_qos(

            prefetch_size=prefetch_size,

            prefetch_count=prefetch_count,

            connection_global=connection_global,

        )

Class variables

concurrent
delay_strategy
exchange
queue
task

Instance variables

concurrent
delay_strategy
exchange
queue
task

Methods

ack_event

def ack_event(
    self,
    envelope: aioamqp.envelope.Envelope,
    multiple: bool = False
) -> None
View Source
    async def ack_event(self, envelope: Envelope, multiple: bool = False) -> None:

        await self._channel.basic_client_ack(

            delivery_tag=envelope.delivery_tag, multiple=multiple

        )

callback

def callback(
    self,
    channel: aioamqp.channel.Channel,
    body: bytes,
    envelope: aioamqp.envelope.Envelope,
    properties: aioamqp.properties.Properties
) -> None
View Source
    async def callback(

        self, channel: Channel, body: bytes, envelope: Envelope, properties: Properties

    ) -> None:

        if not self._job_queue.full():

            self._job_queue.put_nowait((body, envelope, properties))

            self._loop.create_task(self._run())

        else:

            await self.nack_event(envelope, requeue=True)

            await self._job_queue.join()

configure

def configure(
    self
) -> None
View Source
    async def configure(self) -> None:

        await asyncio.sleep(3)

        self._channel = await self._client.get_channel()

        await self.qos(prefetch_count=self.concurrent)

        # self._loop.create_task(self._client.watch(self), name="subscribe_watcher")

        self._loop.create_task(self._client.watch(self))

        with suppress(SynchronizationError):

            try:

                await self._configure_queue()

                await self._dlx.configure()

                await self._configure_exchange()

                await self._configure_queue_bind()

            except AttributeNotInitialized:

                logger.debug("Waiting client initialization...SUBSCRIBE")

nack_event

def nack_event(
    self,
    envelope: aioamqp.envelope.Envelope,
    multiple: bool = False,
    requeue: bool = True
) -> None
View Source
    async def nack_event(

        self, envelope: Envelope, multiple: bool = False, requeue: bool = True

    ) -> None:

        await self._channel.basic_client_nack(

            delivery_tag=envelope.delivery_tag, multiple=multiple, requeue=requeue

        )

qos

def qos(
    self,
    prefetch_size: int = 0,
    prefetch_count: int = 0,
    connection_global: bool = False
)
View Source
    async def qos(

        self,

        prefetch_size: int = 0,

        prefetch_count: int = 0,

        connection_global: bool = False,

    ):

        await self._channel.basic_qos(

            prefetch_size=prefetch_size,

            prefetch_count=prefetch_count,

            connection_global=connection_global,

        )

reject_event

def reject_event(
    self,
    envelope: aioamqp.envelope.Envelope,
    requeue: bool = False
) -> None
View Source
    async def reject_event(self, envelope: Envelope, requeue: bool = False) -> None:

        await self._channel.basic_reject(

            delivery_tag=envelope.delivery_tag, requeue=requeue

        )