Pipelines

Continuations

Tasks can be combined into a pipeline using continuations:

# task results are passed as arguments to continuations
pipeline = multiply.send(3, 2).continue_with(divide, 2)

result = await client.submit_async(pipeline)

Fan out / Fan in

The in_parallel function is used to submit tasks in parallel:

pipeline = in_parallel([
    multiply.send(3, 2),
    multiply.send(4, 5)
])

result = await client.submit_async(pipeline)

Parallel tasks results can be aggregated into a continuation:

@tasks.bind()
def average(values: list):
    return sum(values) / len(values)


pipeline = in_parallel([
    multiply.send(3, 2),
    multiply.send(4, 5)
]).continue_with(average)

Nesting is also permitted:

pipeline = in_parallel([
    multiply.send(3, 2).continue_with(divide, 2),  # continuation within in_parallel
    in_parallel([                                  # nested in_parallel
        multiply.send(4, 5),
        multiply.send(3, 4)
    ]).continue_with(average)
]).continue_with(average)

Parallel pipelines can have their concurrency limited using a semphore like max_parallelism parameter:

pipeline = in_parallel([
    multiply.send(3, 2),
    multiply.send(4, 5),
    ...
], max_parallelism=10)

result = await client.submit_async(pipeline)

Large parallelisms may be limited by the performance of a single pipeline function aggregating the results. In this case it is possible to split up the parallelism into multiple inline pipelines ‘map/reduce’ style using the num_stages parameter:

pipeline = in_parallel([
    multiply.send(3, 2),
    multiply.send(4, 5),
    ...
], num_stages=10)

result = await client.submit_async(pipeline)

Passing State

State can be shared and passed across tasks in a pipeline. Tasks that access state should declare so in @tasks.bind():

@tasks.bind(with_state=True)       # sets initial state
def multiply(state, x, y):
    state = 10
    return state, x * y


@tasks.bind()                      # state is passed across
def subtract(x, y):
    return x - y


@tasks.bind(with_state=True)       # accesses state
def add_state(state, x):
    return state, state + x


pipeline = multiply.send(3, 2) \   # 6
    .continue_with(subtract, 1) \  # 5
    .continue_with(add_state)      # 15

Error Handling

Any task within a pipeline may throw exceptions and if not caught by a retry these will terminate the pipeline. Unhandled exceptions are returned to the client as they are with single tasks.

Exceptions can be caught using exceptionally tasks

@tasks.bind()
def handle_error(ex):
    # handle error either by returning a result
    # or raising a new exception


pipeline = multiply.send(3, 2).exceptionally(handle_error)

It is possible to have more than one exceptionally task in a pipeline

pipeline = a.send().exceptionally(b).continue_with(c).exceptionally(d).finally_do(e)

Pipelines may also include a finally_do task as their final step which will be called regardless of success or failure. This is a good place to put any clean up logic.

The finally_do task is non-fruitful so the result of the pipeline is the result of the previous task (or exception):

@tasks.bind(with_state=True)
def cleanup(state, *args):
    # do cleanup


pipeline = multiply.send(3, 2).finally_do(cleanup)

Setting Initial Parameters

Consider a pipeline that multiplies in parallel the numbers 1 to 10000 by 2.

pipeline = in_parallel([
    multiply.send(2, 1),
    multiply.send(2, 2),
    ...
    multiply.send(2, 10000)
], num_stages=10)

When serialised to protobuf the first parameter to each function is repeated in each serialised task. To reduce message size this parameter can be set on the pipeline using the ‘with_initial’ function on the PipelineBuilder:

pipeline = in_parallel([
    multiply.send(1),
    multiply.send(2),
    ...
    multiply.send(10000)
], num_stages=10).with_initial(args=2)

Initial kwargs and task state may also be set this way.

Orchestrator Tasks

Tasks may also return pipelines allowing for dynamic workflows with features such as:

Composition

@tasks.bind()
def multiply_and_subtract(mult_a, mult_b, sub_c):
    return multiply.send(mult_a, mult_b).continue_with(subtract, sub_c)


pipeline = multiply_and_subtract.send(3, 2, 1).continue_with(...)

Conditional Execution

@tasks.bind()
def add_positive(x, y):
    return add.send(x, y).continue_with(make_positive)


@tasks.bind()
def make_positive(x):
    if x > 0:
        return x                        # either return value
    else:
        return multiply.send(x, -1)     # or another pipeline


pipeline = add_positive.send(-3, 2)

Recursion

@tasks.bind()
def count_to_100(x):
    return add.send(x, 1).continue_with(check_result)


@tasks.bind()
def check_result(x):
    if x == 100:
        return x
    else:
        return count_to_100.send(x)     # recursive


pipeline = count_to_100.send(0)

Inline Pipelines

State is isolated by default between a parent pipeline and any child pipelines that it creates. This is done on the assumption that a pipeline that calls a task that itself creates a pipeline would rather treat that child pipeline as a so called black box implementation and hence the states should be kept independent.

This behaviour can be overriden by declaring the child pipeline as ‘inline’.