Module rabbit.task.standard
View Source
import asyncio
import logging
import attr
from rabbit.task import Task
logging.getLogger(__name__).addHandler(logging.NullHandler())
@attr.s
class StandardTask(Task):
async def execute(self, *args, **kwargs):
logging.debug("Starting StandardExecutor...")
logging.debug(f"args received: {args}")
logging.debug(f"kwargs receveid: {kwargs}")
attr.validate(self)
if asyncio.iscoroutinefunction(self.job):
result = await self.job(*args, **kwargs)
return [result]
return [self.job(*args, **kwargs)]
Classes
StandardTask
class StandardTask(
job: Callable = <function echo_job at 0x7f7bc6adc5f0>
)
Helper class that provides a standard way to create an ABC using inheritance.
View Source
class StandardTask(Task):
async def execute(self, *args, **kwargs):
logging.debug("Starting StandardExecutor...")
logging.debug(f"args received: {args}")
logging.debug(f"kwargs receveid: {kwargs}")
attr.validate(self)
if asyncio.iscoroutinefunction(self.job):
result = await self.job(*args, **kwargs)
return [result]
return [self.job(*args, **kwargs)]
Ancestors (in MRO)
- rabbit.task.abc.Task
- abc.ABC
Methods
execute
def execute(
self,
*args,
**kwargs
)
View Source
async def execute(self, *args, **kwargs):
logging.debug("Starting StandardExecutor...")
logging.debug(f"args received: {args}")
logging.debug(f"kwargs receveid: {kwargs}")
attr.validate(self)
if asyncio.iscoroutinefunction(self.job):
result = await self.job(*args, **kwargs)
return [result]
return [self.job(*args, **kwargs)]