Module rabbit.task.standard

View Source
import asyncio

import logging

import attr

from rabbit.task import Task

logging.getLogger(__name__).addHandler(logging.NullHandler())

@attr.s

class StandardTask(Task):

    async def execute(self, *args, **kwargs):

        logging.debug("Starting StandardExecutor...")

        logging.debug(f"args received: {args}")

        logging.debug(f"kwargs receveid: {kwargs}")

        attr.validate(self)

        if asyncio.iscoroutinefunction(self.job):

            result = await self.job(*args, **kwargs)

            return [result]

        return [self.job(*args, **kwargs)]

Classes

StandardTask

class StandardTask(
    job: Callable = <function echo_job at 0x7f7bc6adc5f0>
)

Helper class that provides a standard way to create an ABC using inheritance.

View Source
class StandardTask(Task):

    async def execute(self, *args, **kwargs):

        logging.debug("Starting StandardExecutor...")

        logging.debug(f"args received: {args}")

        logging.debug(f"kwargs receveid: {kwargs}")

        attr.validate(self)

        if asyncio.iscoroutinefunction(self.job):

            result = await self.job(*args, **kwargs)

            return [result]

        return [self.job(*args, **kwargs)]

Ancestors (in MRO)

  • rabbit.task.abc.Task
  • abc.ABC

Methods

execute
def execute(
    self,
    *args,
    **kwargs
)
View Source
    async def execute(self, *args, **kwargs):

        logging.debug("Starting StandardExecutor...")

        logging.debug(f"args received: {args}")

        logging.debug(f"kwargs receveid: {kwargs}")

        attr.validate(self)

        if asyncio.iscoroutinefunction(self.job):

            result = await self.job(*args, **kwargs)

            return [result]

        return [self.job(*args, **kwargs)]