Module rabbit.publish
None
None
View Source
import asyncio
import os
import attr
from .client import AioRabbitClient
@attr.s(slots=True)
class Publish:
_client = attr.ib(
type=AioRabbitClient,
validator=attr.validators.instance_of(AioRabbitClient),
repr=False,
)
exchange_name = attr.ib(
type=str,
default=os.getenv("PUBLISH_EXCHANGE_NAME", "default.in.exchange"),
validator=attr.validators.instance_of(str),
)
routing_key = attr.ib(
type=str,
default=os.getenv("PUBLISH_ROUTING_KEY", "#"),
validator=attr.validators.instance_of(str),
)
_channel = attr.ib(init=False, repr=False)
async def configure(self) -> None:
await asyncio.sleep(1)
self._channel = await self._client.get_channel()
loop = asyncio.get_running_loop()
# loop.create_task(self._client.watch(self), name="publish_watcher")
loop.create_task(self._client.watch(self))
async def send_event(self, payload: bytes, **kwargs) -> None:
await self._channel.publish(
payload=payload,
exchange_name=self.exchange_name,
routing_key=self.routing_key,
**kwargs,
)
Classes
Publish
class Publish(
client: rabbit.client.AioRabbitClient,
exchange_name: str = 'default.in.exchange',
routing_key: str = '#'
)
View Source
@attr.s(slots=True)
class Publish:
_client = attr.ib(
type=AioRabbitClient,
validator=attr.validators.instance_of(AioRabbitClient),
repr=False,
)
exchange_name = attr.ib(
type=str,
default=os.getenv("PUBLISH_EXCHANGE_NAME", "default.in.exchange"),
validator=attr.validators.instance_of(str),
)
routing_key = attr.ib(
type=str,
default=os.getenv("PUBLISH_ROUTING_KEY", "#"),
validator=attr.validators.instance_of(str),
)
_channel = attr.ib(init=False, repr=False)
async def configure(self) -> None:
await asyncio.sleep(1)
self._channel = await self._client.get_channel()
loop = asyncio.get_running_loop()
# loop.create_task(self._client.watch(self), name="publish_watcher")
loop.create_task(self._client.watch(self))
async def send_event(self, payload: bytes, **kwargs) -> None:
await self._channel.publish(
payload=payload,
exchange_name=self.exchange_name,
routing_key=self.routing_key,
**kwargs,
)
Class variables
exchange_name
routing_key
Instance variables
exchange_name
routing_key
Methods
configure
def configure(
self
) -> None
View Source
async def configure(self) -> None:
await asyncio.sleep(1)
self._channel = await self._client.get_channel()
loop = asyncio.get_running_loop()
# loop.create_task(self._client.watch(self), name="publish_watcher")
loop.create_task(self._client.watch(self))
send_event
def send_event(
self,
payload: bytes,
**kwargs
) -> None
View Source
async def send_event(self, payload: bytes, **kwargs) -> None:
await self._channel.publish(
payload=payload,
exchange_name=self.exchange_name,
routing_key=self.routing_key,
**kwargs,
)