Skip to content

Module rabbit.cli

None

None

View Source
from rabbit.cli.consumer import Consumer

from rabbit.cli.producer import Producer

__all__ = ["Consumer", "Producer"]

Sub-modules

Classes

Consumer

class Consumer(
    exchange_name: str,
    exchange_type: str,
    exchange_topic: str,
    queue_name: str,
    concurrent: int
)
View Source
class Consumer:

    def __init__(

        self,

        exchange_name: str,

        exchange_type: str,

        exchange_topic: str,

        queue_name: str,

        concurrent: int,

    ):

        self.loop = asyncio.get_event_loop()

        self.subscribe_client = AioRabbitClient()

        self.loop.create_task(self.subscribe_client.persistent_connect())

        self.exchange_type = exchange_type

        self.exchange_topic = exchange_topic

        self.exchange_name = exchange_name

        self.queue_name = queue_name

        self.concurrent = concurrent

    async def init(self, task):

        await asyncio.sleep(1)

        logger.info(f"Using {task.__doc__}")

        subscribe = Subscribe(

            client=self.subscribe_client,

            task=task,

            exchange=Exchange(

                self.exchange_name, self.exchange_type, self.exchange_topic

            ),

            queue=Queue(name=self.queue_name),

            concurrent=self.concurrent,

        )

        await subscribe.configure()

    def run(self, chaos_mode: bool = False):

        task = async_echo_job

        if chaos_mode:

            task = async_chaos_job

        self.loop.run_until_complete(self.init(task))

        self.loop.run_forever()

Methods

init

def init(
    self,
    task
)
View Source
    async def init(self, task):

        await asyncio.sleep(1)

        logger.info(f"Using {task.__doc__}")

        subscribe = Subscribe(

            client=self.subscribe_client,

            task=task,

            exchange=Exchange(

                self.exchange_name, self.exchange_type, self.exchange_topic

            ),

            queue=Queue(name=self.queue_name),

            concurrent=self.concurrent,

        )

        await subscribe.configure()

run

def run(
    self,
    chaos_mode: bool = False
)
View Source
    def run(self, chaos_mode: bool = False):

        task = async_echo_job

        if chaos_mode:

            task = async_chaos_job

        self.loop.run_until_complete(self.init(task))

        self.loop.run_forever()

Producer

class Producer(
    payload: bytes,
    qtd: int,
    exchange_name: str,
    routing_key: str,
    **kwargs
)
View Source
class Producer:

    def __init__(

        self, payload: bytes, qtd: int, exchange_name: str, routing_key: str, **kwargs

    ):

        self.loop = asyncio.get_event_loop()

        self.client = AioRabbitClient()

        self.qtd = qtd

        self.payload = payload

        self.exchange_name = exchange_name

        self.routing_key = routing_key

        self.loop.run_until_complete(self.client.connect(**kwargs))

    def configure_publish(self):

        publish = Publish(self.client, self.exchange_name, self.routing_key)

        self.loop.run_until_complete(publish.configure())

        return publish

    def send_event(self):

        publish = self.configure_publish()

        tasks = []

        with tqdm(total=self.qtd, unit="event", desc="events") as pbar:

            pbar.set_description("sending events...")

            for i in range(0, self.qtd):

                task = self.loop.create_task(publish.send_event(self.payload))

                tasks.append(task)

                pbar.update(1)

            self.loop.run_until_complete(asyncio.gather(*tasks))

        for task in asyncio.all_tasks(self.loop):

            task.cancel()

Methods

configure_publish

def configure_publish(
    self
)
View Source
    def configure_publish(self):

        publish = Publish(self.client, self.exchange_name, self.routing_key)

        self.loop.run_until_complete(publish.configure())

        return publish

send_event

def send_event(
    self
)
View Source
    def send_event(self):

        publish = self.configure_publish()

        tasks = []

        with tqdm(total=self.qtd, unit="event", desc="events") as pbar:

            pbar.set_description("sending events...")

            for i in range(0, self.qtd):

                task = self.loop.create_task(publish.send_event(self.payload))

                tasks.append(task)

                pbar.update(1)

            self.loop.run_until_complete(asyncio.gather(*tasks))

        for task in asyncio.all_tasks(self.loop):

            task.cancel()