Module rabbit.task.abc

View Source
import abc
from typing import Callable

import attr

from rabbit import echo_job


@attr.s
class Task(abc.ABC):
    job = attr.ib(
        type=Callable, default=echo_job, validator=attr.validators.is_callable()
    )

    async def execute(self, *args, **kwargs):
        raise NotImplementedError

Classes

Task

class Task(
    job: Callable = echo_job
)

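Abstract base class for tasks. It holds a callable job (default: echo_job, checked with attr.validators.is_callable()) and declares the asynchronous execute method, which raises NotImplementedError in this base class. Subclasses such as ProcessTask and StandardTask are expected to override execute.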

View Source
@attr.s
class Task(abc.ABC):
    job = attr.ib(
        type=Callable, default=echo_job, validator=attr.validators.is_callable()
    )

    async def execute(self, *args, **kwargs):
        raise NotImplementedError
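
The subclassing pattern that Task implies can be sketched as follows. This is an illustration only: to stay runnable without the rabbit and attrs packages it uses a plain-Python stand-in for Task, and the names InlineTask, double, and the body of echo_job are hypothetical. The real echo_job is not shown on this page, and the real validator raises an attrs-specific error that this sketch approximates with TypeError.

```python
import abc
import asyncio
from typing import Callable


def echo_job(*args, **kwargs):
    """Hypothetical placeholder for rabbit.echo_job (real behavior not shown here)."""
    return args, kwargs


class Task(abc.ABC):
    """Stand-in that mirrors the documented Task without the attrs dependency."""

    def __init__(self, job: Callable = echo_job):
        # Approximates attr.validators.is_callable(): reject non-callable jobs.
        if not callable(job):
            raise TypeError("job must be callable")
        self.job = job

    async def execute(self, *args, **kwargs):
        raise NotImplementedError


class InlineTask(Task):
    """Hypothetical subclass: calls the job directly inside the coroutine."""

    async def execute(self, *args, **kwargs):
        return self.job(*args, **kwargs)


def double(x):
    return x * 2


# Run the overridden coroutine to completion and collect the job's result.
result = asyncio.run(InlineTask(job=double).execute(21))
```

With the real package, the equivalent subclass would presumably inherit from rabbit.task.abc.Task and be decorated with @attr.s, as the listed descendants suggest, but that is an assumption rather than something this page confirms.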

Ancestors (in MRO)

  • abc.ABC

Descendants

  • rabbit.task.process.ProcessTask
  • rabbit.task.standard.StandardTask

Class variables

job

Methods

execute
async def execute(
    self,
    *args,
    **kwargs
)
View Source
    async def execute(self, *args, **kwargs):
        raise NotImplementedError
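
Because execute is a coroutine function, calling it on the base class does not raise immediately; NotImplementedError surfaces only when the returned coroutine is awaited. A minimal sketch of that behavior, using a hypothetical stand-in class named Base rather than the real Task:

```python
import asyncio


class Base:
    """Hypothetical stand-in with the same execute signature as Task."""

    async def execute(self, *args, **kwargs):
        raise NotImplementedError


async def main():
    coro = Base().execute()  # creates the coroutine object; nothing has run yet
    try:
        await coro  # the method body runs here and raises
    except NotImplementedError:
        return "not implemented"
    return "ran"


outcome = asyncio.run(main())
```

Callers that dispatch tasks generically may therefore want to handle NotImplementedError at the await site rather than at the call site.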