Polling Publisher

rabbit-client-workflow-polling

Pattern

https://microservices.io/patterns/data/polling-publisher.html

Usage Example

Subscribe & Polling Publisher

import asyncio
import logging
import os

from aiohttp import web

from rabbit.client import AioRabbitClient
from rabbit.exchange import Exchange
from rabbit.polling import PollingPublisher
from rabbit.publish import Publish
from rabbit.queue import Queue
from rabbit.subscribe import Subscribe
from rabbit.tlog.event_persist import EventPersist


# Lower the root logger's threshold to DEBUG.
logging.getLogger().setLevel(logging.DEBUG)

# Create the event loop explicitly and install it as the current loop.
# asyncio.get_event_loop() is deprecated when no loop is running, so relying
# on it to create one implicitly is not portable to newer Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# consumer
# The subscriber gets its own client. persistent_connect() is only scheduled
# here; the task starts executing once the loop is running.
subscribe_client = AioRabbitClient(loop)
loop.create_task(subscribe_client.persistent_connect())
subscribe = Subscribe(
    client=subscribe_client,
    persist=EventPersist()
)

# polling-publisher
# A second, separate client is used for publishing; its persistent_connect()
# is scheduled the same way as the subscriber's.
polling_client = AioRabbitClient(loop)
loop.create_task(polling_client.persistent_connect())

# Exchange and queue settings come from the environment, with defaults.
publish = Publish(
    polling_client,
    exchange=Exchange(
        name=os.getenv('PUBLISH_EXCHANGE', 'default.out.exchange'),
        exchange_type=os.getenv('PUBLISH_EXCHANGE_TYPE', 'topic'),
        topic=os.getenv('PUBLISH_TOPIC', '#')
    ),
    queue=Queue(
        name=os.getenv('PUBLISH_QUEUE', 'default.publish.queue')
    )
)
polling = PollingPublisher(publish)


def configure_polling_publisher(app, polling, publish, subscribe):
    """Configure the publisher and subscriber, then schedule the polling job.

    Args:
        app: Object exposing the event loop as ``app.loop``.
        polling: Object whose ``run()`` result is scheduled as a task.
        publish: Object whose ``configure()`` is run to completion first.
        subscribe: Object whose ``configure()`` is run to completion second.

    Returns:
        None. The scheduled tasks only execute once the loop runs again.
    """
    event_loop = app.loop

    # Blocking setup: each configure() finishes before the next step begins.
    for component in (publish, subscribe):
        event_loop.run_until_complete(component.configure())

    # NOTE(review): this sleep is an independent task that nothing awaits, so
    # it does not delay polling.run() below - confirm whether a start-up delay
    # was intended.
    event_loop.create_task(asyncio.sleep(60))
    event_loop.create_task(polling.run())


# Announce start-up before any configuration work happens.
print("[>] Starting polling job...")

# Build the aiohttp application on the loop created above.
# NOTE(review): the ``loop=`` argument of web.Application is deprecated in
# aiohttp - confirm it is still accepted by the pinned aiohttp version.
app = web.Application(loop=loop)

# Run both configure() steps and schedule the polling task before serving.
configure_polling_publisher(app, polling, publish, subscribe)

# Serve on all interfaces, port 5000.
web.run_app(app, host='0.0.0.0', port=5000)

Publisher (sends a sample event to the subscribe exchange)

import asyncio
import json
import os

from rabbit.client import AioRabbitClient
from rabbit.exchange import Exchange
from rabbit.publish import Publish
from rabbit.queue import Queue


# Create the event loop explicitly and install it as the current loop.
# asyncio.get_event_loop() is deprecated when no loop is running, so relying
# on it to create one implicitly is not portable to newer Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Read each setting once so the Exchange/Queue definitions and the message
# printed at the end cannot drift apart.
exchange_name = os.getenv('SUBSCRIBE_EXCHANGE', 'default.in.exchange')
exchange_type = os.getenv('SUBSCRIBE_EXCHANGE_TYPE', 'topic')
topic = os.getenv('SUBSCRIBE_TOPIC', '#')
queue_name = os.getenv('SUBSCRIBE_QUEUE', 'default.subscribe.queue')

# Connect synchronously before anything is configured on the client.
client = AioRabbitClient(loop)
loop.run_until_complete(client.connect())

publish = Publish(
    client,
    exchange=Exchange(
        name=exchange_name,
        exchange_type=exchange_type,
        topic=topic
    ),
    queue=Queue(
        name=queue_name
    )
)
loop.run_until_complete(publish.configure())

# Sample event body; it is serialized to JSON and sent as UTF-8 bytes.
payload = {
    'document': 1,
    'description': '123',
    'pages': [
        {'body': 'abc 123'},
        {'body': 'def 456'},
        {'body': 'ghi 789'}
    ]
}

loop.run_until_complete(
    publish.send_event(
        json.dumps(payload).encode('utf-8')
        # Optional extra argument, left disabled in this example:
        # properties={'headers': {'x-delay': 5000}}
    )
)

# Report only after send_event() has completed, so the message is never
# printed for an event that failed to send.
print(
    "[>] Event sent to: "
    f"[exchange: {exchange_name}"
    f" | topic: {topic} | "
    f"subscribe: {queue_name}]"
)

Polling Publisher (standalone, without the subscriber)

import asyncio
import logging
import os

from rabbit.client import AioRabbitClient
from rabbit.exchange import Exchange
from rabbit.polling import PollingPublisher
from rabbit.publish import Publish
from rabbit.queue import Queue


# Lower the root logger's threshold to DEBUG.
logging.getLogger().setLevel(logging.DEBUG)

# Create the event loop explicitly and install it as the current loop.
# asyncio.get_event_loop() is deprecated when no loop is running, so relying
# on it to create one implicitly is not portable to newer Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# NOTE(review): unlike the other examples in this document, this client is
# never explicitly connected (no connect() / persistent_connect() call) before
# publish.configure() runs - confirm whether a connect step is required here.
publish = Publish(
    AioRabbitClient(loop),
    exchange=Exchange(
        name=os.getenv('PUBLISH_EXCHANGE', 'default.out.exchange'),
        exchange_type=os.getenv('PUBLISH_EXCHANGE_TYPE', 'topic'),
        topic=os.getenv('PUBLISH_TOPIC', '#')
    ),
    queue=Queue(
        name=os.getenv('PUBLISH_QUEUE', 'default.publish.queue')
    )
)
loop.run_until_complete(publish.configure())
polling = PollingPublisher(publish)
# loop.run_until_complete(polling.configure())
print(
    "[>] Starting polling job..."
)

# Poll forever: wait five seconds, then run one polling.run() call to
# completion before waiting again.
while True:
    loop.run_until_complete(asyncio.sleep(5))
    loop.run_until_complete(
        polling.run()
    )