BatchTask

Module to improve Prefect concurrency operations.

BatchTask

Wraps a `Task` to perform `Task.map` in batches. This reduces the number of concurrently running tasks, eases the load on the Prefect API, and lets failures surface sooner.

```python
from prefect import flow, task
from prefecto.concurrency import BatchTask

@task
def add(a, b):
    return a + b

@flow
def my_flow():
    batch_add = BatchTask(add, 2)
    return batch_add.map([1,2,3,4], [2,3,4,5])
```
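The batching idea can be pictured without Prefect. The snippet below is a minimal sketch, not prefecto's implementation: the hypothetical helper `batched_map` submits one batch of calls to a thread pool, waits for that batch to finish, and only then submits the next, so at most `size` calls are in flight at once. The thread pool and the in-flight counter are assumptions made purely for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait


def batched_map(fn, size, *iterables):
    """Hypothetical helper: apply fn to the zipped iterables, one batch at a time."""
    # zip stops at the shortest iterable; this sketch does not validate lengths.
    calls = list(zip(*iterables))
    results = []
    # The pool is deliberately larger than `size`: the batch boundary, not the
    # pool, is what caps how many calls run at once.
    with ThreadPoolExecutor(max_workers=8) as pool:
        for start in range(0, len(calls), size):
            batch = calls[start:start + size]
            # Submit one batch, then block until every call in it has finished
            # before the next batch is submitted.
            futures = [pool.submit(fn, *call) for call in batch]
            wait(futures)
            results.extend(f.result() for f in futures)
    return results


# Instrumentation: count how many calls are running at the same time.
lock = threading.Lock()
in_flight = 0
peak = 0


def add(a, b):
    global in_flight, peak
    with lock:
        in_flight += 1
        peak = max(peak, in_flight)
    try:
        return a + b
    finally:
        with lock:
            in_flight -= 1


print(batched_map(add, 2, [1, 2, 3, 4], [2, 3, 4, 5]))  # [3, 5, 7, 9]
print(peak <= 2)  # True
```

The trade-off is that a slow call holds up the start of the next batch, which is the price paid for the bounded concurrency.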

The `kill_switch` argument stops the execution of the wrapped task once a given condition is met. In the example below, `AnyFailedSwitch` halts the map when a task run in the first batch fails.

```python
from prefect import flow, task
from prefecto.concurrency import BatchTask, AnyFailedSwitch

@task
def add(a, b):
    return a + b

@flow
def my_flow():
    batch_add = BatchTask(add, 2, kill_switch=AnyFailedSwitch())
    # add(2, '2') raises a TypeError in the first batch,
    # so the kill switch stops the execution.
    return batch_add.map([1,2,3,4], [1,'2',3,4])
```

See kill switches for more information.
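A kill switch can be pictured as a check that runs between batches. The sketch below is hypothetical and does not use prefecto's `KillSwitch` or `AnyFailedSwitch` classes: the helper `batched_map_with_switch` and the `BatchAborted` error are invented for illustration. It runs each batch sequentially for simplicity, records failures instead of raising immediately, and refuses to start another batch once any call in the finished batch has failed.

```python
class BatchAborted(RuntimeError):
    """Hypothetical error raised when the stop condition trips."""


def batched_map_with_switch(fn, size, *iterables):
    """Hypothetical helper: run calls batch by batch, aborting after a failed batch."""
    calls = list(zip(*iterables))
    results = []
    submitted = 0
    for start in range(0, len(calls), size):
        batch = calls[start:start + size]
        outcomes = []
        for call in batch:
            submitted += 1
            try:
                outcomes.append(fn(*call))
            except Exception as exc:  # record the failure and finish the batch
                outcomes.append(exc)
        results.extend(outcomes)
        # Stop condition, checked after each batch: any failure aborts the rest.
        failures = [o for o in outcomes if isinstance(o, Exception)]
        if failures:
            raise BatchAborted(
                f"batch {start // size + 1} had {len(failures)} failure(s); "
                f"{len(calls) - submitted} call(s) skipped"
            ) from failures[0]
    return results


def add(a, b):
    return a + b


caught = None
try:
    batched_map_with_switch(add, 2, [1, 2, 3, 4], [1, "2", 3, 4])
except BatchAborted as err:
    caught = err
    print(err)  # batch 1 had 1 failure(s); 2 call(s) skipped
```

The second batch, `(3, 3)` and `(4, 4)`, is never started, which is the "faster failure detection" benefit in miniature.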

__init__(task, size, kill_switch=None)

Wrap `task` so that `Task.map` runs in batches of `size`.

Args:

    task (Task): The task to wrap.
    size (int): The size of the batches to perform `Task.map` on.
    kill_switch (KillSwitch, optional): A kill switch to stop the execution of the task
        after a certain condition is met.

map(*args, **kwds)

Perform a `Task.map` operation in batches over the positional and keyword arguments. All arguments must be iterables of equal length.

Args:

    *args: Positional arguments to pass to the task.
    **kwds: Keyword arguments to pass to the task.

Returns:

    list[PrefectFuture]: The futures of all mapped task runs, collected across every batch.
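Because every argument must be an iterable of the same length, the inputs can be cut into aligned slices. The hypothetical `split_batches` helper below is one way to picture this, not prefecto's code: it checks the lengths, then yields, for each batch, the positional slices and keyword slices that would be handed to a single `Task.map` call. It assumes batches are consecutive slices of `size` items, with the last one possibly shorter.

```python
def split_batches(size, *args, **kwds):
    """Hypothetical helper: slice equal-length iterables into aligned batches."""
    if size < 1:
        raise ValueError("size must be a positive integer")
    pos = [list(a) for a in args]
    kw = {k: list(v) for k, v in kwds.items()}
    # All positional and keyword iterables must share one length.
    lengths = {len(v) for v in [*pos, *kw.values()]}
    if len(lengths) > 1:
        raise ValueError(f"arguments must have equal lengths, got {sorted(lengths)}")
    n = lengths.pop() if lengths else 0
    for start in range(0, n, size):
        stop = start + size
        yield (
            [a[start:stop] for a in pos],
            {k: v[start:stop] for k, v in kw.items()},
        )


for batch_args, batch_kwds in split_batches(2, [1, 2, 3, 4], b=[2, 3, 4, 5]):
    print(batch_args, batch_kwds)
# [[1, 2]] {'b': [2, 3]}
# [[3, 4]] {'b': [4, 5]}
```

With four items and a batch size of 2 this yields two batches, consistent with the "batch 1 of 2" and "batch 2 of 2" lines in the log below.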

Examples:

```python
from prefect import flow, task
from prefecto.concurrency import BatchTask

@task
def add(a, b):
    return a + b

@flow
def my_flow():
    batch_add = BatchTask(add, 2)
    return batch_add.map([1,2,3,4], [2,3,4,5])

print(my_flow())
```

```log
$ python my_flow.py
01:31:51.012 | INFO    | prefect.engine - Created flow run 'beryl-moth' for flow 'test'
01:31:52.238 | DEBUG   | Flow run 'beryl-moth' - Mapping 'add' batch 1 of 2.
01:31:52.239 | INFO    | Flow run 'beryl-moth' - Created task run 'add-0' for task 'add'
01:31:52.240 | INFO    | Flow run 'beryl-moth' - Submitted task run 'add-0' for execution.
01:31:52.253 | INFO    | Flow run 'beryl-moth' - Created task run 'add-1' for task 'add'
01:31:52.254 | INFO    | Flow run 'beryl-moth' - Submitted task run 'add-1' for execution.
01:31:52.259 | DEBUG   | Flow run 'beryl-moth' - Mapping 'add' batch 2 of 2.
01:31:52.258 | INFO    | Flow run 'beryl-moth' - Created task run 'add-3' for task 'add'
01:31:52.258 | INFO    | Flow run 'beryl-moth' - Submitted task run 'add-3' for execution.
01:31:52.260 | INFO    | Flow run 'beryl-moth' - Created task run 'add-2' for task 'add'
01:31:52.261 | INFO    | Flow run 'beryl-moth' - Submitted task run 'add-2' for execution.
01:31:52.675 | INFO    | Task run 'add-1' - Finished in state Completed()
01:31:52.770 | INFO    | Task run 'add-0' - Finished in state Completed()
01:31:52.885 | INFO    | Task run 'add-2' - Finished in state Completed()
01:31:53.075 | INFO    | Task run 'add-3' - Finished in state Completed()
01:31:53.979 | INFO    | Flow run 'beryl-moth' - Finished in state Completed()
```

```json
[3, 5, 7, 9]
```