PipelineBuilder
- class statefun_tasks.PipelineBuilder(pipeline=None)
Builder class for creating pipelines
- Parameters:
pipeline (optional) – list of initial pipeline entries e.g. from another builder
- __init__(pipeline=None)
Methods
__init__([pipeline])
append(task) – Appends a single task onto this pipeline
append_group(pipelines[, max_parallelism, ...]) – Appends tasks from other pipeline builders as a new in_parallel group inside this one
append_to(other) – Appends tasks from another pipeline builder into this one
continue_if(condition, continuation, *args, ...) – Conditionally adds a continuation to the pipeline
continue_with(continuation, *args, **kwargs) – Adds a continuation to the pipeline
exceptionally(exception_task, *args, **kwargs) – Adds exceptionally to the pipeline
finally_do(finally_action, *args, **kwargs) – Adds finally to the pipeline
from_proto(pipeline_proto) – Deserialises the pipeline from protobuf
get_destination() – Returns the initial destination of the pipeline as None, i.e. use the default ingress
get_tasks() – Returns a list of all task identifiers (namespace, worker name, id) that make up this pipeline
inline([is_inline]) – Marks the pipeline as being inline (or not)
is_empty() – Tests if the pipeline contains any tasks
send(fun, *args, **kwargs) – Adds a task entry for the given Flink Task and arguments
set([retry_policy, namespace, worker_name, ...]) – Sets task properties on the last entry added to the builder
set_task_defaults(default_namespace, ...) – Sets defaults on task entries if they are not set
to_proto([serialiser]) – Serialises the pipeline to protobuf
to_task_request(serialiser) – Serialises the pipeline as a TaskRequest with a task type of builtin.run_pipeline
validate() – Validates the pipeline, raising a ValueError if the pipeline is invalid
wait() – Causes the pipeline to automatically pause at this point
with_initial([args, kwargs, state]) – Optionally sets the initial args, kwargs and state to be passed to the initial task(s) in this pipeline
Attributes
has_initial_parameters – Returns true if the pipeline has initial parameters (args, kwargs, state)
id – The ID of this pipeline
is_inline – Returns true if the pipeline is inline, false otherwise
- append(task)
Appends a single task onto this pipeline
- Parameters:
task (Task) – the task to append
- Return type:
PipelineBuilder
- Returns:
the builder
- append_group(pipelines, max_parallelism=None, return_exceptions=False, ordered=True)
Appends tasks from other pipeline builders as a new in_parallel group inside this one
- Parameters:
pipelines (Iterable[PipelineBuilder]) – the other pipeline builders
max_parallelism (optional) – the maximum number of tasks to invoke in parallel for this group
return_exceptions (optional) – if True, tasks that raise exceptions do not cause an aggregated exception to be thrown; the exceptions appear in the results instead
ordered (optional) – if False, the results of the group come back unordered. Unordered groups are aggregated more efficiently by Flink.
- Return type:
PipelineBuilder
- Returns:
the builder
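The three options resemble controls found in Python's own asyncio module, which makes them easy to try out locally. The sketch below is an analogy only and says nothing about how statefun_tasks or Flink executes a group: run_group and work are hypothetical names, a semaphore stands in for max_parallelism, and asyncio.gather's return_exceptions flag shows a failure appearing in the results instead of being raised.

```python
import asyncio


async def run_group(coros, max_parallelism=None, return_exceptions=False):
    """Hypothetical stand-in for a parallel group; not the statefun_tasks API."""
    # A semaphore caps how many coroutines run at once (None means no cap).
    limit = asyncio.Semaphore(max_parallelism) if max_parallelism else None

    async def guarded(coro):
        if limit is None:
            return await coro
        async with limit:
            return await coro

    # gather returns results in input order, comparable to an ordered group.
    return await asyncio.gather(
        *(guarded(c) for c in coros), return_exceptions=return_exceptions
    )


async def work(n):
    await asyncio.sleep(0)
    if n == 2:
        raise ValueError(f"task {n} failed")
    return n * 10


results = asyncio.run(
    run_group([work(1), work(2), work(3)], max_parallelism=2, return_exceptions=True)
)
```

With return_exceptions=True the failing call occupies its own slot in results, so the caller can inspect each outcome individually.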
- append_to(other)
Appends tasks from another pipeline builder into this one
- Parameters:
other (PipelineBuilder) – the other pipeline builder
- Return type:
PipelineBuilder
- Returns:
the builder
- continue_if(condition, continuation, *args, **kwargs)
Adds the continuation to the pipeline only if the condition is true
- Parameters:
condition (bool) – the condition
continuation – the Python function, which should be decorated with @tasks.bind()
args – the task args
kwargs – the task kwargs
- Return type:
PipelineBuilder
- Returns:
the builder
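Because condition is a plain bool, its value is already known when the builder call is made, so the choice is made while the pipeline is being assembled. The toy class below sketches that reading under the assumption that continue_if amounts to guarding continue_with with an if statement. ToyBuilder, load, audit and save are hypothetical names and the class is not the statefun_tasks implementation.

```python
class ToyBuilder:
    """Hypothetical miniature builder; not the statefun_tasks implementation."""

    def __init__(self):
        self.steps = []

    def continue_with(self, continuation, *args, **kwargs):
        self.steps.append((continuation.__name__, args, kwargs))
        return self  # returning self is what makes chaining possible

    def continue_if(self, condition, continuation, *args, **kwargs):
        # Assumed behaviour: the step is recorded only when condition is true.
        if condition:
            self.continue_with(continuation, *args, **kwargs)
        return self


def load():
    pass


def audit():
    pass


def save():
    pass


verbose = False
builder = (
    ToyBuilder()
    .continue_with(load)
    .continue_if(verbose, audit)  # skipped because verbose is False
    .continue_with(save)
)
step_names = [name for name, _, _ in builder.steps]
```

The chain stays unbroken either way because both methods return the builder, which is the reason every method listed here documents "the builder" as its return value.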
- continue_with(continuation, *args, **kwargs)
Adds a continuation to the pipeline
- Parameters:
continuation – the Python function, which should be decorated with @tasks.bind()
args – the task args
kwargs – the task kwargs
- Return type:
PipelineBuilder
- Returns:
the builder
- exceptionally(exception_task, *args, **kwargs)
Adds an exceptionally task to the pipeline
- Parameters:
exception_task – the Python function, which should be decorated with @tasks.bind()
args – the task args
kwargs – the task kwargs
- Return type:
PipelineBuilder
- Returns:
the builder
- finally_do(finally_action, *args, **kwargs)
Adds a finally action to the pipeline
- Parameters:
finally_action – the Python function, which should be decorated with @tasks.bind()
args – the task args
kwargs – the task kwargs
- Return type:
PipelineBuilder
- Returns:
the builder
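The names continue_with, exceptionally and finally_do suggest the familiar try / except / finally shape. The local runner below is a rough sketch under that assumption: run_steps, start and fail are hypothetical, nothing here touches Flink, and the exact semantics in statefun_tasks (for example what the exception task receives) may differ.

```python
def run_steps(steps, handler=None, cleanup=None):
    """Hypothetical local runner; models the assumed control flow only."""
    result = None
    try:
        # Each step receives the previous result, like a chain of continuations.
        for step in steps:
            result = step(result)
    except Exception as error:
        # Stand-in for an "exceptionally" task: turn the failure into a result.
        if handler is None:
            raise
        result = handler(error)
    finally:
        # Stand-in for a "finally" action: runs on success and on failure.
        if cleanup is not None:
            cleanup()
    return result


def start(_):
    return 1


def fail(_):
    raise RuntimeError("boom")


log = []
outcome = run_steps(
    [start, fail],
    handler=lambda error: f"recovered from {error}",
    cleanup=lambda: log.append("cleanup"),
)
```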
- static from_proto(pipeline_proto)
Deserialises the pipeline from protobuf
- Parameters:
pipeline_proto (Pipeline) – the pipeline as protobuf
- Return type:
PipelineBuilder
- Returns:
a pipeline builder deserialised from the protobuf message
- get_destination()
Returns the initial destination of the pipeline, which is None (i.e. use the default ingress)
- get_tasks()
Returns a list of all task identifiers (namespace, worker name, id) that make up this pipeline
- Return type:
list
- Returns:
list of tuples of namespace, worker name, id for each task
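Since each element is a (namespace, worker name, id) tuple, the list can be unpacked directly in a loop. The sketch below groups task ids by worker; the identifiers are made up for illustration and the list is written by hand, not produced by a real get_tasks() call.

```python
from collections import defaultdict

# Made-up identifiers in the documented (namespace, worker name, id) shape.
tasks = [
    ("examples", "worker", "task-1"),
    ("examples", "worker", "task-2"),
    ("reports", "batch_worker", "task-3"),
]

# Group the task ids under the (namespace, worker name) pair they belong to.
ids_by_worker = defaultdict(list)
for namespace, worker_name, task_id in tasks:
    ids_by_worker[(namespace, worker_name)].append(task_id)
```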
- property has_initial_parameters
Returns true if the pipeline has initial parameters (args, kwargs, state)
- property id
The ID of this pipeline
- inline(is_inline=True)
Marks the pipeline as being inline (or not).
By default pipelines are not inline. Inline pipelines accept inputs from and share state with their parent task.
- Return type:
PipelineBuilder
- Returns:
the builder
- is_empty()
Tests if the pipeline contains any tasks
- Returns:
true if empty, false otherwise
- property is_inline
Returns true if the pipeline is inline, false otherwise
- send(fun, *args, **kwargs)
Adds a task entry for the given Flink Task and arguments
- Parameters:
fun – the Python function, which should be decorated with @tasks.bind()
args – the task args
kwargs – the task kwargs
- Return type:
PipelineBuilder
- Returns:
the builder
- set(retry_policy=None, namespace=None, worker_name=None, is_fruitful=None, display_name=None, task_id=None)
Sets task properties on the last entry added to the builder
- Parameters:
retry_policy (optional) – the task retry policy to use
namespace (optional) – the task namespace
worker_name (optional) – the task worker_name
is_fruitful (optional) – set to false to drop the results of tasks
display_name (optional) – friendly name for this task
task_id (optional) – task id for this task
- Return type:
PipelineBuilder
- Returns:
the builder
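The phrase "the last entry added" matters in a chained call: set affects only the entry that immediately precedes it. The toy class below sketches that behaviour, with the added assumption (suggested by the None defaults, not confirmed by this page) that arguments left as None leave existing values untouched. ToyBuilder and its dict-based entries are hypothetical.

```python
class ToyBuilder:
    """Hypothetical miniature builder; not the statefun_tasks implementation."""

    def __init__(self):
        self.entries = []

    def send(self, name):
        self.entries.append({"name": name, "namespace": None, "worker_name": None})
        return self

    def set(self, namespace=None, worker_name=None):
        last = self.entries[-1]  # only the most recently added entry changes
        # Assumption: None means "leave the current value as it is".
        if namespace is not None:
            last["namespace"] = namespace
        if worker_name is not None:
            last["worker_name"] = worker_name
        return self


builder = ToyBuilder().send("extract").send("report").set(namespace="reports")
```

Here only the "report" entry receives the namespace; "extract" is left as it was.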
- set_task_defaults(default_namespace, default_worker_name)
Sets defaults on task entries if they are not set
- Return type:
PipelineBuilder
- Returns:
the builder
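"If they are not set" implies a fill-in-the-gaps rule: values already present on an entry win over the defaults. A minimal sketch of that rule using plain dicts follows; apply_defaults and the entry layout are hypothetical and do not reflect how the library stores its entries.

```python
def apply_defaults(entries, default_namespace, default_worker_name):
    """Hypothetical helper: fill in missing values, keep explicit ones."""
    for entry in entries:
        if entry.get("namespace") is None:
            entry["namespace"] = default_namespace
        if entry.get("worker_name") is None:
            entry["worker_name"] = default_worker_name
    return entries


entries = [
    {"namespace": "reports", "worker_name": None},  # namespace set explicitly
    {"namespace": None, "worker_name": None},       # nothing set
]
apply_defaults(entries, "examples", "worker")
```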
- to_proto(serialiser=None)
Serialises the pipeline to protobuf
- Parameters:
serialiser – the serialiser to use such as DefaultSerialiser
- Return type:
Pipeline
- Returns:
Pipeline protobuf message
- to_task_request(serialiser)
Serialises the pipeline as a TaskRequest with a task type of builtin.run_pipeline
- Parameters:
serialiser – the serialiser to use such as DefaultSerialiser
- Return type:
TaskRequest
- Returns:
TaskRequest protobuf message
- validate()
Validates the pipeline, raising a ValueError if it is invalid
- Return type:
PipelineBuilder
- Returns:
the builder
- wait()
Causes the pipeline to automatically pause at this point
- Return type:
PipelineBuilder
- Returns:
the builder
- with_initial(args=Ellipsis, kwargs=Ellipsis, state=Ellipsis)
Optionally sets the initial args, kwargs and state to be passed to the initial task(s) in this pipeline
- Parameters:
args (optional) – arguments as tuple or TupleOfAny
kwargs (optional) – keyword arguments as dict or MapStringToAny
state (optional) – state
- Return type:
PipelineBuilder
- Returns:
the builder
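The Ellipsis defaults in the signature are a common Python sentinel idiom: they let a function tell "argument not supplied" apart from an explicit None. The function below illustrates the idiom in isolation. It only mirrors the parameter names above, and the reading that None is therefore a legal initial value is an inference from the signature, not something this page states.

```python
def with_initial(args=..., kwargs=..., state=...):
    """Hypothetical function mirroring the signature above."""
    provided = {}
    for name, value in (("args", args), ("kwargs", kwargs), ("state", state)):
        # Ellipsis marks "not supplied"; None counts as a real value.
        if value is not ...:
            provided[name] = value
    return provided


only_state = with_initial(state=None)
both = with_initial(args=(1, 2), kwargs={"scale": 3})
nothing = with_initial()
```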