Skip to content

Module rabbit.client

None

None

View Source
import asyncio

import aioamqp

import attr

from aioamqp.channel import Channel

from aioamqp.protocol import AmqpProtocol

from .exceptions import AttributeNotInitialized

from .logger import logger

@attr.s(slots=True, repr=False)

class AioRabbitClient:

    _protocol = attr.ib(type=AmqpProtocol, init=False, default=None)

    transport = attr.ib(init=False, default=None)

    _event = attr.ib(init=False)

    def __attrs_post_init__(self):

        self._event = asyncio.Event()

    async def watch(self, item):

        logger.info("Watch connection enabled.")

        self._event.clear()

        await self._event.wait()

        logger.error("Connection lost.")

        if not item.__module__.endswith(".dlx"):

            logger.warning("Trying to establish a new connection...")

            await item.configure()

    @property

    def protocol(self):

        return self._protocol

    @protocol.setter

    def protocol(self, value):

        self._protocol = value

        self._event.set()

    async def get_channel(self) -> Channel:

        if not self.protocol:

            raise AttributeNotInitialized("Connection not initialized.")

        channel = await self.protocol.channel()

        return channel

    async def connect(self, **kwargs) -> None:

        self.transport, self.protocol = await aioamqp.connect(**kwargs)

    async def persistent_connect(self, **kwargs):

        while True:

            try:

                self.transport, self.protocol = await aioamqp.connect(**kwargs)

                await asyncio.sleep(2)

                await self.protocol.wait_closed()

                self.transport.close()

            except (OSError, aioamqp.exceptions.AmqpClosedConnection) as err:

                logger.error(f"Error: {err} - Params: {kwargs}")

                await asyncio.sleep(5)

                await self.persistent_connect(**kwargs)

Classes

AioRabbitClient

class AioRabbitClient(

)
View Source
@attr.s(slots=True, repr=False)

class AioRabbitClient:

    _protocol = attr.ib(type=AmqpProtocol, init=False, default=None)

    transport = attr.ib(init=False, default=None)

    _event = attr.ib(init=False)

    def __attrs_post_init__(self):

        self._event = asyncio.Event()

    async def watch(self, item):

        logger.info("Watch connection enabled.")

        self._event.clear()

        await self._event.wait()

        logger.error("Connection lost.")

        if not item.__module__.endswith(".dlx"):

            logger.warning("Trying to establish a new connection...")

            await item.configure()

    @property

    def protocol(self):

        return self._protocol

    @protocol.setter

    def protocol(self, value):

        self._protocol = value

        self._event.set()

    async def get_channel(self) -> Channel:

        if not self.protocol:

            raise AttributeNotInitialized("Connection not initialized.")

        channel = await self.protocol.channel()

        return channel

    async def connect(self, **kwargs) -> None:

        self.transport, self.protocol = await aioamqp.connect(**kwargs)

    async def persistent_connect(self, **kwargs):

        while True:

            try:

                self.transport, self.protocol = await aioamqp.connect(**kwargs)

                await asyncio.sleep(2)

                await self.protocol.wait_closed()

                self.transport.close()

            except (OSError, aioamqp.exceptions.AmqpClosedConnection) as err:

                logger.error(f"Error: {err} - Params: {kwargs}")

                await asyncio.sleep(5)

                await self.persistent_connect(**kwargs)

Class variables

transport

Instance variables

protocol
transport

Methods

connect

def connect(
    self,
    **kwargs
) -> None
View Source
    async def connect(self, **kwargs) -> None:

        self.transport, self.protocol = await aioamqp.connect(**kwargs)

get_channel

def get_channel(
    self
) -> aioamqp.channel.Channel
View Source
    async def get_channel(self) -> Channel:

        if not self.protocol:

            raise AttributeNotInitialized("Connection not initialized.")

        channel = await self.protocol.channel()

        return channel

persistent_connect

def persistent_connect(
    self,
    **kwargs
)
View Source
    async def persistent_connect(self, **kwargs):

        while True:

            try:

                self.transport, self.protocol = await aioamqp.connect(**kwargs)

                await asyncio.sleep(2)

                await self.protocol.wait_closed()

                self.transport.close()

            except (OSError, aioamqp.exceptions.AmqpClosedConnection) as err:

                logger.error(f"Error: {err} - Params: {kwargs}")

                await asyncio.sleep(5)

                await self.persistent_connect(**kwargs)

watch

def watch(
    self,
    item
)
View Source
    async def watch(self, item):

        logger.info("Watch connection enabled.")

        self._event.clear()

        await self._event.wait()

        logger.error("Connection lost.")

        if not item.__module__.endswith(".dlx"):

            logger.warning("Trying to establish a new connection...")

            await item.configure()