Module rabbit.cli.producer
None
None
View Source
import asyncio
from tqdm import tqdm
from rabbit.client import AioRabbitClient
from rabbit.publish import Publish
class Producer:
def __init__(
self, payload: bytes, qtd: int, exchange_name: str, routing_key: str, **kwargs
):
self.loop = asyncio.get_event_loop()
self.client = AioRabbitClient()
self.qtd = qtd
self.payload = payload
self.exchange_name = exchange_name
self.routing_key = routing_key
self.loop.run_until_complete(self.client.connect(**kwargs))
def configure_publish(self):
publish = Publish(self.client, self.exchange_name, self.routing_key)
self.loop.run_until_complete(publish.configure())
return publish
def send_event(self):
publish = self.configure_publish()
tasks = []
with tqdm(total=self.qtd, unit="event", desc="events") as pbar:
pbar.set_description("sending events...")
for i in range(0, self.qtd):
task = self.loop.create_task(publish.send_event(self.payload))
tasks.append(task)
pbar.update(1)
self.loop.run_until_complete(asyncio.gather(*tasks))
for task in asyncio.all_tasks(self.loop):
task.cancel()
Classes
Producer
class Producer(
payload: bytes,
qtd: int,
exchange_name: str,
routing_key: str,
**kwargs
)
View Source
class Producer:
def __init__(
self, payload: bytes, qtd: int, exchange_name: str, routing_key: str, **kwargs
):
self.loop = asyncio.get_event_loop()
self.client = AioRabbitClient()
self.qtd = qtd
self.payload = payload
self.exchange_name = exchange_name
self.routing_key = routing_key
self.loop.run_until_complete(self.client.connect(**kwargs))
def configure_publish(self):
publish = Publish(self.client, self.exchange_name, self.routing_key)
self.loop.run_until_complete(publish.configure())
return publish
def send_event(self):
publish = self.configure_publish()
tasks = []
with tqdm(total=self.qtd, unit="event", desc="events") as pbar:
pbar.set_description("sending events...")
for i in range(0, self.qtd):
task = self.loop.create_task(publish.send_event(self.payload))
tasks.append(task)
pbar.update(1)
self.loop.run_until_complete(asyncio.gather(*tasks))
for task in asyncio.all_tasks(self.loop):
task.cancel()
Methods
configure_publish
def configure_publish(
self
)
View Source
def configure_publish(self):
publish = Publish(self.client, self.exchange_name, self.routing_key)
self.loop.run_until_complete(publish.configure())
return publish
send_event
def send_event(
self
)
View Source
def send_event(self):
publish = self.configure_publish()
tasks = []
with tqdm(total=self.qtd, unit="event", desc="events") as pbar:
pbar.set_description("sending events...")
for i in range(0, self.qtd):
task = self.loop.create_task(publish.send_event(self.payload))
tasks.append(task)
pbar.update(1)
self.loop.run_until_complete(asyncio.gather(*tasks))
for task in asyncio.all_tasks(self.loop):
task.cancel()