PipelineBuilder

class statefun_tasks.PipelineBuilder(pipeline=None)

Builder class for creating pipelines

Parameters:

pipeline (optional) – a list of initial pipeline entries, e.g. from another builder

__init__(pipeline=None)

Methods

__init__([pipeline])

append(task)

Appends a single task onto this pipeline

append_group(pipelines[, max_parallelism, ...])

Appends tasks from other pipeline builders into a new in_parallel group inside this one

append_to(other)

Appends tasks from another pipeline builder into this one

continue_if(condition, continuation, *args, ...)

Adds a continuation to the pipeline if condition is True

continue_with(continuation, *args, **kwargs)

Adds a continuation to the pipeline

exceptionally(exception_task, *args, **kwargs)

Adds an exceptionally (exception-handling) task to the pipeline

finally_do(finally_action, *args, **kwargs)

Adds a finally action to the pipeline

from_proto(pipeline_proto)

Deserialises the pipeline from protobuf

get_destination()

Returns None as the initial destination of the pipeline, i.e. the default ingress is used.

get_tasks()

Returns a list of all task identifiers (namespace, worker name, id) that make up this pipeline

inline([is_inline])

Marks the pipeline as being inline (or not).

is_empty()

Tests whether the pipeline is empty, i.e. contains no tasks

send(fun, *args, **kwargs)

Adds a task entry for the given Flink Task and arguments

set([retry_policy, namespace, worker_name, ...])

Sets task properties on the last entry added to the builder

set_task_defaults(default_namespace, ...)

Sets a default namespace and worker name on task entries that do not already have them

to_proto([serialiser])

Serialises the pipeline to protobuf

to_task_request(serialiser)

Serialises the pipeline as a TaskRequest with a task type of builtin.run_pipeline

validate()

Validates the pipeline, raising a ValueError if it is invalid

wait()

Causes the pipeline to automatically pause at this point

with_initial([args, kwargs, state])

Optionally sets the initial args, kwargs and state to be passed to the initial task(s) in this pipeline

Attributes

has_initial_parameters

Returns True if the pipeline has initial parameters (args, kwargs, state)

id

The ID of this pipeline

is_inline

Returns True if the pipeline is inline

append(task)

Appends a single task onto this pipeline

Parameters:

task (Task) – the task to append

Return type:

PipelineBuilder

Returns:

the builder

append_group(pipelines, max_parallelism=None, return_exceptions=False, ordered=True)

Appends tasks from other pipeline builders into a new in_parallel group inside this one

Parameters:
  • pipelines (Iterable[PipelineBuilder]) – the other pipeline builders

  • max_parallelism (optional) – the maximum number of tasks to invoke in parallel for this group

  • return_exceptions (optional) – if True, tasks that raise exceptions do not cause an aggregated exception to be thrown; their exceptions appear in the results instead

  • ordered (optional) – if False, the results of the group come back unordered. Unordered groups are aggregated more efficiently by Flink.

Return type:

PipelineBuilder

Returns:

the builder
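
The three group options behave much like familiar concurrency controls. As a rough local analogy only, the sketch below uses Python's asyncio rather than statefun_tasks or Flink: a semaphore stands in for max_parallelism, asyncio.gather for an ordered group and asyncio.as_completed for an unordered one. The run_group, square and fail names are hypothetical.

```python
import asyncio
import functools


async def run_group(funcs, max_parallelism=None, return_exceptions=False, ordered=True):
    """Local asyncio analogy of an in_parallel group (not statefun_tasks code)."""
    # A semaphore caps how many members run at once; None means no cap.
    limit = asyncio.Semaphore(max_parallelism) if max_parallelism else None

    async def run_one(func):
        if limit is None:
            return await func()
        async with limit:
            return await func()

    coros = [run_one(func) for func in funcs]
    if ordered:
        # gather() returns results in the order the members were supplied.
        return await asyncio.gather(*coros, return_exceptions=return_exceptions)

    # Unordered: collect each result as soon as its member finishes.
    results = []
    for next_done in asyncio.as_completed(coros):
        try:
            results.append(await next_done)
        except Exception as exc:
            if not return_exceptions:
                raise  # the first error propagates immediately
            results.append(exc)
    return results


# Usage: two members that finish in the opposite order to how they were added.
async def square(n, delay):
    await asyncio.sleep(delay)
    return n * n


async def fail():
    raise ValueError("task failed")


members = [functools.partial(square, 3, 0.05), functools.partial(square, 2, 0.0)]
ordered_results = asyncio.run(run_group(members, max_parallelism=2))
unordered_results = asyncio.run(run_group(members, max_parallelism=2, ordered=False))
mixed_results = asyncio.run(
    run_group([fail, functools.partial(square, 4, 0.0)], return_exceptions=True)
)
print(ordered_results, unordered_results, mixed_results)
```

One difference worth noting: with return_exceptions=False this analogy re-raises the first error, whereas the group described above raises an aggregated exception.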

append_to(other)

Appends tasks from another pipeline builder into this one

Parameters:

other (PipelineBuilder) – the other pipeline builder

Return type:

PipelineBuilder

Returns:

the builder

continue_if(condition, continuation, *args, **kwargs)

Adds a continuation to the pipeline if condition is True

Parameters:
  • condition (bool) – the condition

  • continuation – the Python function, which should be decorated with @tasks.bind()

  • args – the task args

  • kwargs – the task kwargs

Return type:

PipelineBuilder

Returns:

the builder
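
Because every builder method returns the builder, calls can be chained. And because condition is a plain bool, it is evaluated while the pipeline is being built, not when the tasks later run. The sketch below models this with a hypothetical SketchBuilder class and placeholder load, notify and audit functions; it is a simplified illustration, not the library's implementation.

```python
class SketchBuilder:
    """Hypothetical, minimal stand-in for PipelineBuilder (not the library's code)."""

    def __init__(self):
        self.entries = []

    def continue_with(self, fn, *args, **kwargs):
        self.entries.append((fn.__name__, args, kwargs))
        return self  # returning the builder is what allows calls to be chained

    def continue_if(self, condition, fn, *args, **kwargs):
        # condition is a plain bool, so it is checked now, at build time.
        if condition:
            return self.continue_with(fn, *args, **kwargs)
        return self


# Placeholder task functions (hypothetical).
def load(key):
    ...


def notify(channel):
    ...


def audit(kind):
    ...


send_notification = False  # e.g. a flag already known when the pipeline is built

pipeline = (
    SketchBuilder()
    .continue_with(load, "orders")
    .continue_if(send_notification, notify, channel="ops")
    .continue_with(audit, kind="daily")
)
print([name for name, _, _ in pipeline.entries])
```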

continue_with(continuation, *args, **kwargs)

Adds a continuation to the pipeline

Parameters:
  • continuation – the Python function, which should be decorated with @tasks.bind()

  • args – the task args

  • kwargs – the task kwargs

Return type:

PipelineBuilder

Returns:

the builder

exceptionally(exception_task, *args, **kwargs)

Adds an exceptionally (exception-handling) task to the pipeline

Parameters:
  • exception_task – the Python function, which should be decorated with @tasks.bind()

  • args – the task args

  • kwargs – the task kwargs

Return type:

PipelineBuilder

Returns:

the builder

finally_do(finally_action, *args, **kwargs)

Adds a finally action to the pipeline

Parameters:
  • finally_action – the Python function, which should be decorated with @tasks.bind()

  • args – the task args

  • kwargs – the task kwargs

Return type:

PipelineBuilder

Returns:

the builder
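
The reference above does not spell out the runtime behaviour of exceptionally and finally_do. Assuming they mirror Python's own try/except/finally, a purely local, sequential model might look like the following. run_locally and the step functions are hypothetical, and the real tasks run on Flink rather than in-process.

```python
def run_locally(steps, exceptionally=None, finally_do=None, initial=None):
    """Hypothetical sequential runner modelling assumed exceptionally/finally semantics."""
    try:
        result = initial
        for step in steps:
            result = step(result)  # each step receives the previous step's result
        return result
    except Exception as exc:
        if exceptionally is None:
            raise
        # Assumption: the handler receives the error and its return value
        # replaces the failed result.
        return exceptionally(exc)
    finally:
        if finally_do is not None:
            finally_do()  # runs on success and on failure; its return value is ignored


log = []


def parse(text):
    return int(text)


def double(n):
    return n * 2


def fallback(exc):
    return f"recovered from {type(exc).__name__}"


def cleanup():
    log.append("cleanup")


good = run_locally([parse, double], fallback, cleanup, initial="21")
bad = run_locally([parse, double], fallback, cleanup, initial="oops")
print(good, bad, log)
```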

static from_proto(pipeline_proto)

Deserialises the pipeline from protobuf

Parameters:

pipeline_proto (Pipeline) – the pipeline as protobuf

Return type:

PipelineBuilder

Returns:

a PipelineBuilder deserialised from the protobuf message

get_destination()

Returns None as the initial destination of the pipeline, i.e. the default ingress is used

get_tasks()

Returns a list of all task identifiers (namespace, worker name, id) that make up this pipeline

Return type:

list

Returns:

list of tuples of namespace, worker name, id for each task
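
Since get_tasks() returns all tasks that make up the pipeline, tasks nested inside in_parallel groups are presumably included as well. The sketch below shows one way to flatten such a structure into (namespace, worker name, id) tuples; the list-and-tuple layout and the collect_task_ids helper are assumptions made for illustration, not the builder's internal representation.

```python
from typing import List, Tuple

# Hypothetical entry shapes, for illustration only: a task is a
# (namespace, worker_name, id) tuple and a group is a list of member
# pipelines, each of which is itself a list of entries.
TaskId = Tuple[str, str, str]


def collect_task_ids(entries: list) -> List[TaskId]:
    task_ids: List[TaskId] = []
    for entry in entries:
        if isinstance(entry, tuple):
            task_ids.append(entry)
        else:
            # A group: recurse into each member pipeline, in order.
            for member in entry:
                task_ids.extend(collect_task_ids(member))
    return task_ids


pipeline = [
    ("example", "worker", "t1"),
    [  # a parallel group with two member pipelines
        [("example", "worker", "t2")],
        [("example", "other_worker", "t3"), ("example", "worker", "t4")],
    ],
    ("example", "worker", "t5"),
]
print(collect_task_ids(pipeline))
```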

property has_initial_parameters

Returns True if the pipeline has initial parameters (args, kwargs, state)

property id

The ID of this pipeline

inline(is_inline=True)

Marks the pipeline as being inline (or not).

By default, pipelines are not inline. Inline pipelines accept inputs from, and share state with, their parent task.

Parameters:

is_inline (bool) – whether the pipeline should be inline (defaults to True)

Return type:

PipelineBuilder

Returns:

the builder

is_empty()

Tests whether the pipeline is empty, i.e. contains no tasks

Return type:

bool

Returns:

True if empty, False otherwise

property is_inline

Returns True if the pipeline is inline

send(fun, *args, **kwargs)

Adds a task entry for the given Flink Task and arguments

Parameters:
  • fun – the Python function, which should be decorated with @tasks.bind()

  • args – the task args

  • kwargs – the task kwargs

Return type:

PipelineBuilder

Returns:

the builder

set(retry_policy=None, namespace=None, worker_name=None, is_fruitful=None, display_name=None, task_id=None)

Sets task properties on the last entry added to the builder

Parameters:
  • retry_policy (optional) – the task retry policy to use

  • namespace (optional) – the task namespace

  • worker_name (optional) – the task worker name

  • is_fruitful (optional) – set to False to discard the result of this task

  • display_name (optional) – a friendly name for this task

  • task_id (optional) – an explicit id for this task

Return type:

PipelineBuilder

Returns:

the builder

set_task_defaults(default_namespace, default_worker_name)

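Sets a default namespace and worker name on task entries that do not already have them

Parameters:
  • default_namespace – the namespace to use for entries without one

  • default_worker_name – the worker name to use for entries without one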

Return type:

PipelineBuilder

Returns:

the builder
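
set() only touches the most recently added entry, while set_task_defaults() only fills in values that are missing. The following sketch models that interplay with a hypothetical SketchBuilder that stores entries as plain dicts; treating None as "not set" is an assumption based on the None defaults in the signatures above.

```python
class SketchBuilder:
    """Hypothetical stand-in showing set() and set_task_defaults() semantics."""

    def __init__(self):
        self.entries = []

    def send(self, fun_name):
        # Each new entry starts with every property unset (None).
        self.entries.append(
            {"fun": fun_name, "namespace": None, "worker_name": None, "task_id": None}
        )
        return self

    def set(self, namespace=None, worker_name=None, task_id=None):
        # Only the most recently added entry changes; None means "leave as is".
        last = self.entries[-1]
        updates = {"namespace": namespace, "worker_name": worker_name, "task_id": task_id}
        for key, value in updates.items():
            if value is not None:
                last[key] = value
        return self

    def set_task_defaults(self, default_namespace, default_worker_name):
        # Fill gaps only: values set explicitly via set() are never overwritten.
        for entry in self.entries:
            if entry["namespace"] is None:
                entry["namespace"] = default_namespace
            if entry["worker_name"] is None:
                entry["worker_name"] = default_worker_name
        return self


builder = (
    SketchBuilder()
    .send("extract")
    .send("transform").set(worker_name="gpu_worker", task_id="transform-1")
    .send("load")
    .set_task_defaults("etl", "default_worker")
)
for entry in builder.entries:
    print(entry)
```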

to_proto(serialiser=None)

Serialises the pipeline to protobuf

Parameters:

serialiser (optional) – the serialiser to use, such as DefaultSerialiser

Return type:

Pipeline

Returns:

Pipeline protobuf message

to_task_request(serialiser)

Serialises the pipeline as a TaskRequest with a task type of builtin.run_pipeline

Parameters:

serialiser – the serialiser to use, such as DefaultSerialiser

Return type:

TaskRequest

Returns:

TaskRequest protobuf message

validate()

Validates the pipeline, raising a ValueError if it is invalid

Return type:

PipelineBuilder

Returns:

the builder

wait()

Causes the pipeline to automatically pause at this point

Return type:

PipelineBuilder

Returns:

the builder

with_initial(args=Ellipsis, kwargs=Ellipsis, state=Ellipsis)

Optionally sets the initial args, kwargs and state to be passed to the initial task(s) in this pipeline

Parameters:
  • args (optional) – arguments as a tuple or TupleOfAny

  • kwargs (optional) – keyword arguments as a dict or MapStringToAny

  • state (optional) – the initial state

Return type:

PipelineBuilder

Returns:

the builder
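
with_initial() uses Ellipsis rather than None as its default, a common Python idiom for telling "argument omitted" apart from "explicitly passed None". The hypothetical InitialParamsSketch class below illustrates the pattern together with a has_initial_parameters check. Whether the library counts an explicit None as an initial parameter in exactly this way is an assumption.

```python
class InitialParamsSketch:
    """Hypothetical sketch of the Ellipsis-sentinel pattern used by with_initial()."""

    def __init__(self):
        # Ellipsis marks "never supplied", so an explicit None remains a real value.
        self.args = ...
        self.kwargs = ...
        self.state = ...

    def with_initial(self, args=..., kwargs=..., state=...):
        # Only overwrite the values the caller actually passed.
        if args is not ...:
            self.args = args
        if kwargs is not ...:
            self.kwargs = kwargs
        if state is not ...:
            self.state = state
        return self

    @property
    def has_initial_parameters(self):
        return any(value is not ... for value in (self.args, self.kwargs, self.state))


untouched = InitialParamsSketch()
explicit_none = InitialParamsSketch().with_initial(state=None)
with_args = InitialParamsSketch().with_initial(args=(1, 2), kwargs={"scale": 10})
print(
    untouched.has_initial_parameters,
    explicit_none.has_initial_parameters,
    with_args.has_initial_parameters,
)
```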