Module rabbit.subscribe

View Source
import asyncio

import json

import logging

import os

from contextlib import suppress

from typing import List, Optional

from aioamqp.channel import Channel

from aioamqp.envelope import Envelope

from aioamqp.exceptions import SynchronizationError

from aioamqp.properties import Properties

import attr

from rabbit.client import AioRabbitClient

from rabbit.dlx import DLX

from rabbit.exceptions import AttributeNotInitialized

from rabbit.exchange import Exchange

from rabbit.job import echo_job

from rabbit.publish import Publish

from rabbit.queue import Queue

from rabbit.task import Task

from rabbit.tlog.db import DB

logging.getLogger(__name__).addHandler(logging.NullHandler())

@attr.s(slots=True)
class Subscribe:
    """Consume events from a RabbitMQ queue and hand them to a task executor.

    On configuration the subscriber declares its exchange and queue, wires a
    dead-letter exchange (DLX) for failed events, optionally configures a
    downstream ``Publish``, binds the queue and starts consuming with
    :meth:`callback`.

    NOTE: every ``default=`` below is wrapped in ``attr.Factory`` so each
    ``Subscribe`` instance gets its OWN ``Exchange``/``Queue``/``DLX``.
    Plain instance defaults were previously evaluated once at
    class-definition time and shared by all instances; ``__attrs_post_init__``
    then mutated the shared DLX (``self.dlx.client = ...``), leaking state
    between instances.  A side effect of the fix is that the ``SUBSCRIBE_*``
    environment variables are now read at instantiation time instead of
    import time, which is the generally expected behavior.
    """

    # Required AMQP connection/channel provider.
    client = attr.ib(
        type=AioRabbitClient,
        validator=attr.validators.instance_of(AioRabbitClient)
    )
    # Exchange the subscribe queue is bound to.
    exchange = attr.ib(
        type=Exchange,
        default=attr.Factory(lambda: Exchange(
            name=os.getenv('SUBSCRIBE_EXCHANGE', 'default.in.exchange'),
            exchange_type=os.getenv('SUBSCRIBE_EXCHANGE_TYPE', 'topic'),
            topic=os.getenv('SUBSCRIBE_TOPIC', '#')
        )),
        validator=attr.validators.instance_of(Exchange)
    )
    # Queue events are consumed from.
    queue = attr.ib(
        type=Queue,
        default=attr.Factory(lambda: Queue(
            name=os.getenv('SUBSCRIBE_QUEUE', 'default.subscribe.queue')
        )),
        validator=attr.validators.instance_of(Queue)
    )
    # Dead-letter setup: failed events are re-routed back to the subscribe
    # exchange/topic after the DLX delay.
    dlx = attr.ib(
        type=DLX,
        default=attr.Factory(lambda: DLX(
            dlq_queue=Queue(
                name=os.getenv('SUBSCRIBE_QUEUE', 'default.subscribe.queue'),
                arguments={
                    'x-dead-letter-exchange': os.getenv(
                        'SUBSCRIBE_EXCHANGE', 'default.in.exchange'
                    ),
                    'x-dead-letter-routing-key': os.getenv(
                        'SUBSCRIBE_TOPIC', '#'
                    )
                }
            ),
            routing_key=os.getenv('SUBSCRIBE_QUEUE', 'default.subscribe.queue')
        )),
        validator=attr.validators.instance_of(Task) if False else attr.validators.instance_of(DLX)
    )
    # Task wrapping the user job.  Deliberately NOT wrapped in a Factory:
    # Task owns a process pool (see rabbit.task), so the single shared
    # default avoids spawning one pool per Subscribe instance, and Subscribe
    # never mutates it.  TODO(review): confirm sharing is intended upstream.
    task = attr.ib(
        type=Task,
        default=Task(
            job=echo_job
        ),
        validator=attr.validators.instance_of(Task)
    )
    # 'standard' runs the job in-loop; 'process' uses the process executor.
    task_type = attr.ib(
        default='standard',
        validator=attr.validators.and_(
            attr.validators.in_(['standard', 'process']),
            attr.validators.instance_of(str)
        )
    )
    # Optional downstream publisher for processed results.
    _publish = attr.ib(
        type=Optional[Publish],
        default=None,
        validator=attr.validators.optional(
            validator=attr.validators.instance_of(Publish)
        )
    )
    # Optional DB sink used when no publisher is configured.
    _db = attr.ib(
        type=Optional[DB],
        default=None,
        validator=attr.validators.optional(
            validator=attr.validators.instance_of(DB)
        )
    )

    def __attrs_post_init__(self) -> None:
        """Register for reconnect callbacks and share the client with collaborators."""
        self.client.monitor_connection(self)
        self.dlx.client = self.client
        if self.publish:
            self.publish.client = self.client

    @property
    def publish(self) -> Optional[Publish]:
        """Publisher used to forward processed events, if any."""
        return self._publish

    @publish.setter
    def publish(self, publish: Publish) -> None:
        logging.info('Registering connection monitoring')
        attr.validate(publish)
        self._publish = publish
        self._publish.client = self.client

    async def configure(self) -> None:
        """Declare exchange, queue, DLX and publisher, then start consuming.

        ``AttributeNotInitialized`` (client not yet connected) is logged and
        swallowed so the connection monitor can retry later;
        ``SynchronizationError`` from aioamqp is suppressed for the same
        reason.
        """
        with suppress(SynchronizationError):
            await asyncio.sleep(5)  # give the broker connection time to settle
            try:
                await self._configure_exchange()
                await self._configure_queue()
                await self.dlx.configure()
                await self._configure_publish()
                await self._configure_queue_bind()
            except AttributeNotInitialized:
                logging.debug('Waiting client initialization...SUBSCRIBE')

    async def _configure_publish(self) -> None:
        """Configure the downstream publisher when one is registered."""
        if self.publish:
            await self.publish.configure()

    async def _configure_exchange(self) -> None:
        """Declare the subscribe exchange on the broker."""
        await self.client.channel.exchange_declare(
            exchange_name=self.exchange.name,
            type_name=self.exchange.exchange_type,
            durable=self.exchange.durable
        )
        await asyncio.sleep(5)  # let the declaration propagate before binding

    async def _configure_queue(self) -> None:
        """Declare the subscribe queue on the broker."""
        await self.client.channel.queue_declare(
            queue_name=self.queue.name,
            durable=self.queue.durable
        )

    async def _configure_queue_bind(self) -> None:
        """Bind queue to exchange and begin consuming with :meth:`callback`."""
        await self.client.channel.queue_bind(
            exchange_name=self.exchange.name,
            queue_name=self.queue.name,
            routing_key=self.exchange.topic
        )
        await self.client.channel.basic_consume(
            callback=self.callback,
            queue_name=self.queue.name
        )

    async def callback(self,
                       channel: Channel,
                       body: bytes,
                       envelope: Envelope,
                       properties: Properties) -> Optional[List[bytes]]:
        """Process one delivery: ack, execute the task, then route the result.

        The event is acked BEFORE processing (at-most-once delivery); any
        processing failure is forwarded to the DLX instead of being redelivered.
        Results go to the publisher if configured, else to the DB sink, else
        they are returned to the caller.
        """
        try:
            await self.ack_event(envelope)
            process_result = await self._execute(body)
            if self.publish:
                for result in process_result:
                    await self.publish.send_event(result)
            elif self._db:
                # created_by depends only on the incoming payload, so compute
                # it once rather than per result.
                created_by = self._get_created_by(body)
                for result in process_result:
                    await self._db.save(result, created_by)
            else:
                return process_result
        except Exception as cause:
            await self.dlx.send_event(cause, body, envelope, properties)
        return None

    async def _execute(self, data: bytes) -> List[bytes]:
        """Run the configured task on the raw payload and return its results."""
        logging.info('Initializing event processing...')
        if self.task_type == 'process':
            process_result = await self.task.process_executor(data)
        else:
            process_result = await self.task.std_executor(data)
        logging.info('Event successfully processed.')
        return process_result

    def _get_created_by(self, payload: bytes) -> str:
        """Extract the ``createdBy`` author from a JSON payload, defaulting to 'anonymous'."""
        raw_data = json.loads(payload)
        return str(raw_data.get('createdBy', 'anonymous'))

    async def reject_event(self, envelope: Envelope, requeue: bool = False) -> None:
        """Negatively acknowledge the delivery, optionally requeueing it."""
        await self.client.channel.basic_client_nack(
            delivery_tag=envelope.delivery_tag,
            requeue=requeue
        )

    async def ack_event(self, envelope: Envelope) -> None:
        """Acknowledge the delivery identified by the envelope's delivery tag."""
        await self.client.channel.basic_client_ack(
            delivery_tag=envelope.delivery_tag
        )

Classes

Subscribe

class Subscribe(
    client: rabbit.client.AioRabbitClient,
    exchange: rabbit.exchange.Exchange = Exchange(name='default.in.exchange', exchange_type='topic', topic='#', durable=True),
    queue: rabbit.queue.Queue = Queue(name='default.subscribe.queue', durable=True, arguments={}),
    dlx: rabbit.dlx.DLX = DLX(dlq_queue=Queue(name='default.subscribe.queue', durable=True, arguments={'x-dead-letter-exchange': 'default.in.exchange', 'x-dead-letter-routing-key': '#'}), routing_key='default.subscribe.queue', _client=None, dlx_exchange=Exchange(name='DLX', exchange_type='direct', topic='', durable=True)),
    task: rabbit.task.Task = Task(job=echo_job),
    task_type='standard',
    publish: Union[rabbit.publish.Publish, NoneType] = None,
    db: Union[rabbit.tlog.db.DB, NoneType] = None
)
View Source
class Subscribe:

    client = attr.ib(

        type=AioRabbitClient,

        validator=attr.validators.instance_of(AioRabbitClient)

    )

    exchange = attr.ib(

        type=Exchange,

        default=Exchange(

            name=os.getenv('SUBSCRIBE_EXCHANGE', 'default.in.exchange'),

            exchange_type=os.getenv('SUBSCRIBE_EXCHANGE_TYPE', 'topic'),

            topic=os.getenv('SUBSCRIBE_TOPIC', '#')

        ),

        validator=attr.validators.instance_of(Exchange)

    )

    queue = attr.ib(

        type=Queue,

        default=Queue(

            name=os.getenv('SUBSCRIBE_QUEUE', 'default.subscribe.queue')

        ),

        validator=attr.validators.instance_of(Queue)

    )

    dlx = attr.ib(

        type=DLX,

        default=DLX(

            dlq_queue=Queue(

                name=os.getenv('SUBSCRIBE_QUEUE', 'default.subscribe.queue'),

                arguments={

                    'x-dead-letter-exchange': os.getenv(

                        'SUBSCRIBE_EXCHANGE', 'default.in.exchange'

                    ),

                    'x-dead-letter-routing-key': os.getenv(

                        'SUBSCRIBE_TOPIC', '#'

                    )

                }

            ),

            routing_key=os.getenv('SUBSCRIBE_QUEUE', 'default.subscribe.queue')

        ),

        validator=attr.validators.instance_of(DLX)

    )

    task = attr.ib(

        type=Task,

        default=Task(

            job=echo_job

        ),

        validator=attr.validators.instance_of(Task)

    )

    task_type = attr.ib(

        default='standard',

        validator=attr.validators.and_(

            attr.validators.in_(['standard', 'process']),

            attr.validators.instance_of(str)

        )

    )

    _publish = attr.ib(

        type=Optional[Publish],

        default=None,

        validator=attr.validators.optional(

            validator=attr.validators.instance_of(Publish)

        )

    )

    _db = attr.ib(

        type=Optional[DB],

        default=None,

        validator=attr.validators.optional(

            validator=attr.validators.instance_of(DB)

        )

    )

    def __attrs_post_init__(self) -> None:

        self.client.monitor_connection(self)

        self.dlx.client = self.client

        if self.publish:

            self.publish.client = self.client

    @property

    def publish(self) -> Optional[Publish]:

        return self._publish

    @publish.setter

    def publish(self, publish: Publish) -> None:

        logging.info('Registering connection monitoring')

        attr.validate(publish)

        self._publish = publish

        self._publish.client = self.client

    async def configure(self) -> None:

        with suppress(SynchronizationError):

            await asyncio.sleep(5)

            try:

                await self._configure_exchange()

                await self._configure_queue()

                await self.dlx.configure()

                await self._configure_publish()

                await self._configure_queue_bind()

            except AttributeNotInitialized:

                logging.debug('Waiting client initialization...SUBSCRIBE')

    async def _configure_publish(self) -> None:

        if self.publish:

            await self.publish.configure()

    async def _configure_exchange(self) -> None:

        await self.client.channel.exchange_declare(

            exchange_name=self.exchange.name,

            type_name=self.exchange.exchange_type,

            durable=self.exchange.durable

        )

        await asyncio.sleep(5)

    async def _configure_queue(self) -> None:

        await self.client.channel.queue_declare(

            queue_name=self.queue.name,

            durable=self.queue.durable

        )

    async def _configure_queue_bind(self) -> None:

        await self.client.channel.queue_bind(

            exchange_name=self.exchange.name,

            queue_name=self.queue.name,

            routing_key=self.exchange.topic

        )

        await self.client.channel.basic_consume(

            callback=self.callback,

            queue_name=self.queue.name

        )

    async def callback(self,

                       channel: Channel,

                       body: bytes,

                       envelope: Envelope,

                       properties: Properties):

        process_result = [bytes()]

        try:

            await self.ack_event(envelope)

            process_result = await self._execute(body)

            if self.publish:

                for result in process_result:

                    await self.publish.send_event(result)

            elif self._db:

                for result in process_result:

                    created_by = self._get_created_by(body)

                    await self._db.save(result, created_by)

            else:

                return process_result

        except Exception as cause:

            await self.dlx.send_event(cause, body, envelope, properties)

    async def _execute(self, data: bytes) -> List[bytes]:

        process_result = [bytes()]

        logging.info(f'Initializing event processing...')

        if self.task_type == 'process':

            process_result = await self.task.process_executor(data)

            logging.info(f'Event successfully processed.')

            return process_result

        process_result = await self.task.std_executor(data)

        logging.info(f'Event successfully processed.')

        return process_result

    def _get_created_by(self, payload: bytes) -> str:

        raw_data = json.loads(payload)

        return str(raw_data.get('createdBy', 'anonymous'))

    async def reject_event(self, envelope: Envelope, requeue: bool = False) -> None:

        await self.client.channel.basic_client_nack(

            delivery_tag=envelope.delivery_tag,

            requeue=requeue

        )

    async def ack_event(self, envelope: Envelope) -> None:

        await self.client.channel.basic_client_ack(

            delivery_tag=envelope.delivery_tag

        )

Class variables

client
dlx
exchange
queue
task
task_type

Instance variables

client
dlx
exchange
publish
queue
task
task_type

Methods

ack_event
def ack_event(
    self,
    envelope: aioamqp.envelope.Envelope
) -> None
View Source
    async def ack_event(self, envelope: Envelope) -> None:

        await self.client.channel.basic_client_ack(

            delivery_tag=envelope.delivery_tag

        )
callback
def callback(
    self,
    channel: aioamqp.channel.Channel,
    body: bytes,
    envelope: aioamqp.envelope.Envelope,
    properties: aioamqp.properties.Properties
)
View Source
    async def callback(self,

                       channel: Channel,

                       body: bytes,

                       envelope: Envelope,

                       properties: Properties):

        process_result = [bytes()]

        try:

            await self.ack_event(envelope)

            process_result = await self._execute(body)

            if self.publish:

                for result in process_result:

                    await self.publish.send_event(result)

            elif self._db:

                for result in process_result:

                    created_by = self._get_created_by(body)

                    await self._db.save(result, created_by)

            else:

                return process_result

        except Exception as cause:

            await self.dlx.send_event(cause, body, envelope, properties)
configure
def configure(
    self
) -> None
View Source
    async def configure(self) -> None:

        with suppress(SynchronizationError):

            await asyncio.sleep(5)

            try:

                await self._configure_exchange()

                await self._configure_queue()

                await self.dlx.configure()

                await self._configure_publish()

                await self._configure_queue_bind()

            except AttributeNotInitialized:

                logging.debug('Waiting client initialization...SUBSCRIBE')
reject_event
def reject_event(
    self,
    envelope: aioamqp.envelope.Envelope,
    requeue: bool = False
) -> None
View Source
    async def reject_event(self, envelope: Envelope, requeue: bool = False) -> None:

        await self.client.channel.basic_client_nack(

            delivery_tag=envelope.delivery_tag,

            requeue=requeue

        )