Module rabbit.task.process
View Source
import asyncio
import logging
from concurrent.futures import ProcessPoolExecutor
from functools import partial
import attr
from rabbit import loop
from rabbit.task import Task
# Attach a no-op NullHandler to this module's logger (named via __name__).
logging.getLogger(__name__).addHandler(logging.NullHandler())
@attr.s
class ProcessTask(Task):
    """Task that runs ``self.job`` in a ``ProcessPoolExecutor``.

    The job is submitted through the event loop's ``run_in_executor`` so
    that awaiting :meth:`execute` does not block the loop.
    """

    # Executor the job is submitted to; validated to be a ProcessPoolExecutor.
    # NOTE(review): the default executor is created once, when the class body
    # is evaluated, so every instance that does not pass ``process`` shares
    # it -- confirm that this sharing is intended.
    process = attr.ib(
        type=ProcessPoolExecutor,
        default=ProcessPoolExecutor(max_workers=1),
        validator=attr.validators.instance_of(ProcessPoolExecutor),
    )
    # Optional event loop; when unset, the ``loop`` property falls back to
    # ``rabbit.loop()``.
    _loop = attr.ib(default=None)

    async def execute(self, *args, **kwargs):
        """Run ``self.job(*args, **kwargs)`` in ``self.process`` and await it.

        Args:
            *args: Positional arguments forwarded to ``self.job``.
            **kwargs: Keyword arguments forwarded to ``self.job``.

        Returns:
            A generator yielding ``result()`` of each completed future.
            Because it is lazy, an exception raised by the job surfaces
            when the generator is iterated, not when ``execute`` returns.

        Raises:
            Whatever the attribute validators raise when ``attr.validate``
            rejects the current attribute values.
        """
        # Log through this module's logger rather than the root-logger
        # helpers (``logging.debug``): those implicitly call
        # ``logging.basicConfig()`` when the root logger has no handlers.
        log = logging.getLogger(__name__)
        log.debug("Starting ProcessPoolExecutor...")
        log.debug("args received: %s", args)
        log.debug("kwargs receveid: %s", kwargs)
        # Re-run the validators in case attributes were reassigned after init.
        attr.validate(self)
        task = [
            self.loop.run_in_executor(self.process, partial(self.job, *args, **kwargs))
        ]
        # asyncio.wait returns (done, pending); only the done set is used.
        completed, *_ = await asyncio.wait(task)
        return (t.result() for t in completed)

    @property
    def loop(self):
        """Return the configured loop, or ``rabbit.loop()`` when none is set."""
        if not self._loop:
            return loop()
        return self._loop
Classes
ProcessTask
class ProcessTask(
job: Callable = <function echo_job at 0x7f7bc6adc5f0>,
process: concurrent.futures.process.ProcessPoolExecutor = <concurrent.futures.process.ProcessPoolExecutor object at 0x7f7bc6156c10>,
loop=None
)
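Task that runs its job in a ProcessPoolExecutor through the event loop's run_in_executor. (Docstring inherited from abc.ABC: "Helper class that provides a standard way to create an ABC using inheritance.")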
View Source
class ProcessTask(Task):
    """Task that runs ``self.job`` in a ``ProcessPoolExecutor``.

    The job is submitted through the event loop's ``run_in_executor`` so
    that awaiting :meth:`execute` does not block the loop.
    """

    # Executor the job is submitted to; validated to be a ProcessPoolExecutor.
    # NOTE(review): the default executor is created once, when the class body
    # is evaluated, so every instance that does not pass ``process`` shares
    # it -- confirm that this sharing is intended.
    process = attr.ib(
        type=ProcessPoolExecutor,
        default=ProcessPoolExecutor(max_workers=1),
        validator=attr.validators.instance_of(ProcessPoolExecutor),
    )
    # Optional event loop; when unset, the ``loop`` property falls back to
    # ``rabbit.loop()``.
    _loop = attr.ib(default=None)

    async def execute(self, *args, **kwargs):
        """Run ``self.job(*args, **kwargs)`` in ``self.process`` and await it.

        Args:
            *args: Positional arguments forwarded to ``self.job``.
            **kwargs: Keyword arguments forwarded to ``self.job``.

        Returns:
            A generator yielding ``result()`` of each completed future.
            Because it is lazy, an exception raised by the job surfaces
            when the generator is iterated, not when ``execute`` returns.

        Raises:
            Whatever the attribute validators raise when ``attr.validate``
            rejects the current attribute values.
        """
        # Log through this module's logger rather than the root-logger
        # helpers (``logging.debug``): those implicitly call
        # ``logging.basicConfig()`` when the root logger has no handlers.
        log = logging.getLogger(__name__)
        log.debug("Starting ProcessPoolExecutor...")
        log.debug("args received: %s", args)
        log.debug("kwargs receveid: %s", kwargs)
        # Re-run the validators in case attributes were reassigned after init.
        attr.validate(self)
        task = [
            self.loop.run_in_executor(self.process, partial(self.job, *args, **kwargs))
        ]
        # asyncio.wait returns (done, pending); only the done set is used.
        completed, *_ = await asyncio.wait(task)
        return (t.result() for t in completed)

    @property
    def loop(self):
        """Return the configured loop, or ``rabbit.loop()`` when none is set."""
        if not self._loop:
            return loop()
        return self._loop
Ancestors (in MRO)
- rabbit.task.abc.Task
- abc.ABC
Class variables
process
Instance variables
loop
Methods
execute
def execute(
self,
*args,
**kwargs
)
View Source
async def execute(self, *args, **kwargs):
    """Run ``self.job(*args, **kwargs)`` in ``self.process`` and await it.

    Args:
        *args: Positional arguments forwarded to ``self.job``.
        **kwargs: Keyword arguments forwarded to ``self.job``.

    Returns:
        A generator yielding ``result()`` of each completed future.
        Because it is lazy, an exception raised by the job surfaces when
        the generator is iterated, not when ``execute`` returns.

    Raises:
        Whatever the attribute validators raise when ``attr.validate``
        rejects the current attribute values.
    """
    # Log through this module's logger rather than the root-logger helpers
    # (``logging.debug``): those implicitly call ``logging.basicConfig()``
    # when the root logger has no handlers.
    log = logging.getLogger(__name__)
    log.debug("Starting ProcessPoolExecutor...")
    log.debug("args received: %s", args)
    log.debug("kwargs receveid: %s", kwargs)
    # Re-run the validators in case attributes were reassigned after init.
    attr.validate(self)
    task = [
        self.loop.run_in_executor(self.process, partial(self.job, *args, **kwargs))
    ]
    # asyncio.wait returns (done, pending); only the done set is used.
    completed, *_ = await asyncio.wait(task)
    return (t.result() for t in completed)