BatchTask
Module to improve Prefect concurrency operations.
BatchTask
Wraps a `Task` to perform `Task.map` in batches, reducing the number of concurrent tasks, easing the rate of requests to the Prefect API, and allowing for faster failure detection.
```python
from prefect import flow, task
from prefecto.concurrency import BatchTask


@task
def add(a, b):
    return a + b


@flow
def my_flow():
    batch_add = BatchTask(add, 2)
    return batch_add.map([1, 2, 3, 4], [2, 3, 4, 5])
```
The `kill_switch` argument can be used to stop the execution of the task after a certain condition is met.
```python
from prefect import flow, task
from prefecto.concurrency import BatchTask, AnyFailedSwitch


@task
def add(a, b):
    return a + b


@flow
def my_flow():
    batch_add = BatchTask(add, 2, kill_switch=AnyFailedSwitch())
    # This will error on the first batch and stop the execution.
    return batch_add.map([1, 2, 3, 4], [1, '2', 3, 4])
```
See kill switches for more information.
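The idea behind a kill switch can be pictured without Prefect. The following is a minimal plain-Python sketch, not prefecto's implementation: `run_batches` and `BatchStopped` are hypothetical names introduced only for illustration, and the stop condition (any failed call in a finished batch) is an assumption modelled on the `AnyFailedSwitch` example above.

```python
from typing import Any, Callable, List, Sequence, Tuple


class BatchStopped(RuntimeError):
    """Hypothetical error raised when a finished batch contains a failure."""


def add(a, b):
    return a + b


def run_batches(
    func: Callable[..., Any], rows: Sequence[Tuple[Any, ...]], size: int
) -> List[Any]:
    """Call func on each row, `size` rows at a time, stopping after a failed batch."""
    results: List[Any] = []
    for start in range(0, len(rows), size):
        failed = 0
        for row in rows[start:start + size]:
            try:
                results.append(func(*row))
            except Exception:
                failed += 1
        # The "switch" is checked once per batch, before the next batch starts.
        if failed:
            raise BatchStopped(f"batch {start // size + 1}: {failed} failed call(s)")
    return results


# Same inputs as the example above: the second call in batch 1 adds 2 + '2'.
rows = list(zip([1, 2, 3, 4], [1, "2", 3, 4]))
message = ""
try:
    run_batches(add, rows, size=2)
except BatchStopped as exc:
    message = str(exc)
```

Because the check happens between batches, the rows in the second batch are never called once the first batch has failed. With unbatched mapping, all four calls would already have been submitted.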
__init__(task, size, kill_switch=None)
Wrap the `task` to be executed in batches of `size`.
Args:
task (Task): The task to wrap.
size (int): The size of the batches to perform `Task.map` on.
kill_switch (KillSwitch, optional): A kill switch to stop the execution of the task
after a certain condition is met.
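To picture what a batch `size` means for the mapped arguments, here is a minimal plain-Python sketch of splitting equal-length argument lists into batches. `batch_arguments` is a hypothetical helper written for this illustration; it is not part of prefecto and makes no claim about how `BatchTask` slices its inputs internally.

```python
from typing import Any, Iterator, List, Sequence, Tuple


def batch_arguments(
    size: int, *columns: Sequence[Any]
) -> Iterator[Tuple[List[Any], ...]]:
    """Yield one tuple of argument slices per batch of at most `size` items."""
    if size < 1:
        raise ValueError("size must be a positive integer")
    lengths = {len(col) for col in columns}
    if len(lengths) > 1:
        raise ValueError("all arguments must have the same length")
    total = lengths.pop() if lengths else 0
    for start in range(0, total, size):
        # Slice every argument list with the same window so items stay aligned.
        yield tuple(list(col[start:start + size]) for col in columns)


batches = list(batch_arguments(2, [1, 2, 3, 4], [2, 3, 4, 5]))
# batches == [([1, 2], [2, 3]), ([3, 4], [4, 5])]
```

Each yielded tuple holds the positional arguments for one smaller mapping call, so four items with a size of 2 produce two batches. The last batch is shorter whenever the input length is not a multiple of `size`.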
map(*args, **kwds)
Perform a `Task.map` operation over the positional and keyword arguments in batches. The arguments must be iterables of equal length.
Args:
*args: Positional arguments to pass to the task.
**kwds: Keyword arguments to pass to the task.
Returns:
Examples:
```python
from prefect import flow, task
from prefecto.concurrency import BatchTask


@task
def add(a, b):
    return a + b


@flow
def my_flow():
    batch_add = BatchTask(add, 2)
    return batch_add.map([1, 2, 3, 4], [2, 3, 4, 5])


print(my_flow())
```
```log
$ python my_flow.py
01:31:51.012 | INFO | prefect.engine - Created flow run 'beryl-moth' for flow 'test'
01:31:52.238 | DEBUG | Flow run 'beryl-moth' - Mapping 'add' batch 1 of 2.
01:31:52.239 | INFO | Flow run 'beryl-moth' - Created task run 'add-0' for task 'add'
01:31:52.240 | INFO | Flow run 'beryl-moth' - Submitted task run 'add-0' for execution.
01:31:52.253 | INFO | Flow run 'beryl-moth' - Created task run 'add-1' for task 'add'
01:31:52.254 | INFO | Flow run 'beryl-moth' - Submitted task run 'add-1' for execution.
01:31:52.259 | DEBUG | Flow run 'beryl-moth' - Mapping 'add' batch 2 of 2.
01:31:52.258 | INFO | Flow run 'beryl-moth' - Created task run 'add-3' for task 'add'
01:31:52.258 | INFO | Flow run 'beryl-moth' - Submitted task run 'add-3' for execution.
01:31:52.260 | INFO | Flow run 'beryl-moth' - Created task run 'add-2' for task 'add'
01:31:52.261 | INFO | Flow run 'beryl-moth' - Submitted task run 'add-2' for execution.
01:31:52.675 | INFO | Task run 'add-1' - Finished in state Completed()
01:31:52.770 | INFO | Task run 'add-0' - Finished in state Completed()
01:31:52.885 | INFO | Task run 'add-2' - Finished in state Completed()
01:31:53.075 | INFO | Task run 'add-3' - Finished in state Completed()
01:31:53.979 | INFO | Flow run 'beryl-moth' - Finished in state Completed()
```
```json
[3, 5, 7, 9]
```