Publish/Subscribe

rabbit-client-workflow

Usage Example

Consumer

import asyncio
import logging
import os

from aiohttp import web

from rabbit.client import AioRabbitClient
from rabbit.exchange import Exchange
from rabbit.publish import Publish
from rabbit.queue import Queue
from rabbit.subscribe import Subscribe
from rabbit.tlog.event_persist import EventPersist


logging.getLogger().setLevel(logging.DEBUG)
loop = asyncio.get_event_loop()

subscribe_client = AioRabbitClient(loop)
loop.create_task(subscribe_client.persistent_connect())

# publisher
publish = Publish(
    subscribe_client,
    exchange=Exchange(
        name=os.getenv('PUBLISH_EXCHANGE', 'default.out.exchange'),
        exchange_type=os.getenv('PUBLISH_EXCHANGE_TYPE', 'topic'),
        topic=os.getenv('PUBLISH_TOPIC', '#')
    ),
    queue=Queue(
        name=os.getenv('PUBLISH_QUEUE', 'default.publish.queue')
    )
)

# consumer
subscribe = Subscribe(
    client=subscribe_client,
    publish=publish
)


def configure_consumer(app, publish, subscribe):
    app.loop.run_until_complete(publish.configure())
    app.loop.run_until_complete(subscribe.configure())


print(
    "[>] Starting consumer example..."
)
app = web.Application(loop=loop)
configure_consumer(app, publish, subscribe)
web.run_app(app, host='0.0.0.0', port=5000)

Producer

import asyncio
import json
import os

from rabbit.client import AioRabbitClient
from rabbit.exchange import Exchange
from rabbit.publish import Publish
from rabbit.queue import Queue


loop = asyncio.get_event_loop()

client = AioRabbitClient(loop)
loop.run_until_complete(client.connect())

publish = Publish(
    client,
    exchange=Exchange(
        name=os.getenv('SUBSCRIBE_EXCHANGE', 'default.in.exchange'),
        exchange_type=os.getenv('SUBSCRIBE_EXCHANGE_TYPE', 'topic'),
        topic=os.getenv('SUBSCRIBE_TOPIC', '#')
    ),
    queue=Queue(
        name=os.getenv('SUBSCRIBE_QUEUE', 'default.subscribe.queue')
    )
)
loop.run_until_complete(publish.configure())
print(
    "[>] Event sent to: "
    f"[exchange: {os.getenv('SUBSCRIBE_EXCHANGE', 'default.in.exchange')}"
    f" | topic: {os.getenv('SUBSCRIBE_TOPIC', '#')} | "
    f"subscribe: {os.getenv('SUBSCRIBE_QUEUE', 'default.subscribe.queue')}]"
)

payload = {
    'document': 1,
    'description': '123',
    'pages': [
        {'body': 'abc 123'},
        {'body': 'def 456'},
        {'body': 'ghi 789'}
    ]
}

loop.run_until_complete(
    publish.send_event(
        bytes(json.dumps(payload), 'utf-8')
        # properties={'headers': {'x-delay': 5000}}
    )
)