Polling Publisher
Pattern
https://microservices.io/patterns/data/polling-publisher.html
Usage Example
Subscribe & Polling Publisher
import asyncio import logging import os from aiohttp import web from rabbit.client import AioRabbitClient from rabbit.exchange import Exchange from rabbit.polling import PollingPublisher from rabbit.publish import Publish from rabbit.queue import Queue from rabbit.subscribe import Subscribe from rabbit.tlog.event_persist import EventPersist logging.getLogger().setLevel(logging.DEBUG) loop = asyncio.get_event_loop() # consumer subscribe_client = AioRabbitClient(loop) loop.create_task(subscribe_client.persistent_connect()) subscribe = Subscribe( client=subscribe_client, persist=EventPersist() ) # polling-publisher polling_client = AioRabbitClient(loop) loop.create_task(polling_client.persistent_connect()) publish = Publish( polling_client, exchange=Exchange( name=os.getenv('PUBLISH_EXCHANGE', 'default.out.exchange'), exchange_type=os.getenv('PUBLISH_EXCHANGE_TYPE', 'topic'), topic=os.getenv('PUBLISH_TOPIC', '#') ), queue=Queue( name=os.getenv('PUBLISH_QUEUE', 'default.publish.queue') ) ) polling = PollingPublisher(publish) def configure_polling_publisher(app, polling, publish, subscribe): app.loop.run_until_complete(publish.configure()) app.loop.run_until_complete(subscribe.configure()) app.loop.create_task(asyncio.sleep(60)) app.loop.create_task(polling.run()) print( "[>] Starting polling job..." ) app = web.Application(loop=loop) configure_polling_publisher(app, polling, publish, subscribe) web.run_app(app, host='0.0.0.0', port=5000)
Publisher
import asyncio import json import os from rabbit.client import AioRabbitClient from rabbit.exchange import Exchange from rabbit.publish import Publish from rabbit.queue import Queue loop = asyncio.get_event_loop() client = AioRabbitClient(loop) loop.run_until_complete(client.connect()) publish = Publish( client, exchange=Exchange( name=os.getenv('SUBSCRIBE_EXCHANGE', 'default.in.exchange'), exchange_type=os.getenv('SUBSCRIBE_EXCHANGE_TYPE', 'topic'), topic=os.getenv('SUBSCRIBE_TOPIC', '#') ), queue=Queue( name=os.getenv('SUBSCRIBE_QUEUE', 'default.subscribe.queue') ) ) loop.run_until_complete(publish.configure()) print( "[>] Event sent to: " f"[exchange: {os.getenv('SUBSCRIBE_EXCHANGE', 'default.in.exchange')}" f" | topic: {os.getenv('SUBSCRIBE_TOPIC', '#')} | " f"subscribe: {os.getenv('SUBSCRIBE_QUEUE', 'default.subscribe.queue')}]" ) payload = { 'document': 1, 'description': '123', 'pages': [ {'body': 'abc 123'}, {'body': 'def 456'}, {'body': 'ghi 789'} ] } loop.run_until_complete( publish.send_event( bytes(json.dumps(payload), 'utf-8') # properties={'headers': {'x-delay': 5000}} ) )
Polling Publisher
import asyncio import logging import os from rabbit.client import AioRabbitClient from rabbit.exchange import Exchange from rabbit.polling import PollingPublisher from rabbit.publish import Publish from rabbit.queue import Queue logging.getLogger().setLevel(logging.DEBUG) loop = asyncio.get_event_loop() publish = Publish( AioRabbitClient(loop), exchange=Exchange( name=os.getenv('PUBLISH_EXCHANGE', 'default.out.exchange'), exchange_type=os.getenv('PUBLISH_EXCHANGE_TYPE', 'topic'), topic=os.getenv('PUBLISH_TOPIC', '#') ), queue=Queue( name=os.getenv('PUBLISH_QUEUE', 'default.publish.queue') ) ) loop.run_until_complete(publish.configure()) polling = PollingPublisher(publish) # loop.run_until_complete(polling.configure()) print( "[>] Starting polling job..." ) while True: loop.run_until_complete(asyncio.sleep(5)) loop.run_until_complete( polling.run() )