FlinkTasks
- class statefun_tasks.FlinkTasks(default_namespace=None, default_worker_name=None, egress_type_name=None, egress_message_max_size=None, serialiser=None, state_expiration=None, keep_task_state=False, embedded_pipeline_namespace=None, embedded_pipeline_type=None)
Flink Tasks implementation
- Parameters:
default_namespace (Optional[str]) – namespace to expose functions under. Maps to Flink Statefun function namespace in module.yaml
default_worker_name (Optional[str]) – worker name to expose. Maps to Flink Statefun function type in module.yaml
egress_type_name (Optional[str]) – egress type name. Maps to Flink Statefun egress in module.yaml
egress_message_max_size (optional) – maximum size of an egress message in bytes. If specified, attempts to send messages over this size will raise a MessageSizeExceeded exception
serialiser (optional) – serialiser to use (will use DefaultSerialiser if not set)
state_expiration (optional) – duration after which state will be expired by Flink (expire_after_call)
keep_task_state (optional) – whether to keep state (request, result) associated with tasks as well as pipelines (defaults to false)
embedded_pipeline_namespace (optional) – namespace of the embedded function that pipelines will be forwarded to
embedded_pipeline_type (optional) – type name of the embedded function that pipelines will be forwarded to
- __init__(default_namespace=None, default_worker_name=None, egress_type_name=None, egress_message_max_size=None, serialiser=None, state_expiration=None, keep_task_state=False, embedded_pipeline_namespace=None, embedded_pipeline_type=None)
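The egress_message_max_size behaviour described in the parameter list can be pictured with a small stand-alone sketch. It does not import statefun_tasks: SketchMessageSizeExceeded and check_egress_size are hypothetical names written for illustration, and the library's own MessageSizeExceeded exception and internal check may differ in detail.

```python
from typing import Optional


class SketchMessageSizeExceeded(Exception):
    """Local stand-in for illustration; not the library's own exception."""


def check_egress_size(payload: bytes, max_size: Optional[int]) -> bytes:
    """Return payload unchanged, or raise if it is larger than max_size bytes.

    A max_size of None models the documented default of no configured limit.
    """
    if max_size is not None and len(payload) > max_size:
        raise SketchMessageSizeExceeded(
            f"message is {len(payload)} bytes, limit is {max_size} bytes"
        )
    return payload
```

A payload exactly at the limit passes in this sketch, since the parameter description speaks of messages over the configured size.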
Methods
__init__([default_namespace, ...])
bind([namespace, worker_name, retry_policy, ...]) – Decorator to bind a function as a Flink Task
clone_task_request(context) – Clones the TaskRequest associated with this TaskContext
emit_result(context, task_input, task_result) – Emits a result
extend(function[, retry_policy]) – Transforms a Python function into a task
fail(context, task_input, ex[, delay, ...]) – Sends a failure
get_task(task_type) – Returns the Flink Task instance for a given task type name
register(fun[, wrapper, module_name]) – Registers a Python function as a Flink Task.
run_async(context, message) – Runs a Flink Task
send(func, *args, **kwargs) – Returns a PipelineBuilder with this task as the first item in the pipeline
send_result(context, task_request, result[, ...]) – Sends a result
unpack_task_request(task_request) – Unpacks a TaskRequest into args, kwargs and state
value_specs() – Value specs to register in Flink Statefun's @functions.bind() attribute
Attributes
events – EventHandler for this FlinkTasks instance
- bind(namespace=None, worker_name=None, retry_policy=None, with_state=False, is_fruitful=True, module_name=None, with_context=False, display_name=None, task_id=None)
Decorator to bind a function as a Flink Task
- Parameters:
namespace (Optional[str]) – namespace to use in place of the default
worker_name (Optional[str]) – worker name to use in place of the default
retry_policy (Optional[RetryPolicy]) – retry policy to use should the task throw an exception
with_state (bool) – whether to pass a state object as the first (second if with_context is also set) parameter. The return value should be a tuple of state, result (default False)
is_fruitful (bool) – whether the function produces a fruitful result or simply returns None (default True)
module_name (Optional[str]) – if specified, the task type used in addressing will be module_name.function_name; otherwise the Python module containing the function will be used
with_context (bool) – whether to pass a Flink context object as the first parameter (default False)
display_name (Optional[str]) – optional friendly name for this task
task_id (Optional[str]) – optionally sets a fixed id for this task in order to make it stateful
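The with_state, with_context and is_fruitful options change the shape of the function being bound. The plain functions below are written in those shapes, following the parameter descriptions; the decorators appear only in comments so that the snippet runs without statefun_tasks installed. The function names and the `tasks` instance are hypothetical, and treating the initial state as None is an assumption.

```python
# @tasks.bind()
def multiply(x, y):
    # Default shape: task arguments in, result out.
    return x * y


# @tasks.bind(with_state=True)
def running_total(state, amount):
    # State arrives as the first parameter; the return value is (state, result).
    # Assumption: state may be None the first time the task runs.
    state = (state or 0) + amount
    return state, state


# @tasks.bind(with_context=True, with_state=True)
def audited_total(context, state, amount):
    # The context takes first place, so state moves to second.
    state = (state or 0) + amount
    return state, f"total={state}"


# @tasks.bind(is_fruitful=False)
def log_only(message):
    # Not fruitful: the function simply returns None.
    print(message)
```

Keeping the bound functions free of library imports, as here, also makes them easy to unit test by calling them directly.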
- clone_task_request(context)
Clones the TaskRequest associated with this TaskContext
- Parameters:
context (TaskContext) – TaskContext
- Return type:
TaskRequest
- Returns:
a TaskRequest
- async emit_result(context, task_input, task_result, delay=None, cancellation_token=None)
Emits a result
- Parameters:
context – TaskContext
task_input – the incoming TaskRequest or TaskActionRequest
task_result – the TaskResult or TaskException to emit
delay (optional) – the delay before Flink sends the result
cancellation_token (optional) – a cancellation token to associate with this message
- property events: EventHandlers
EventHandler for this FlinkTasks instance
- static extend(function, retry_policy=None, **params)
Transforms a Python function into a task
fn() is extended with fn.send(), fn.to_task() and fn.defaults() attributes
- Parameters:
function – the function to wrap
retry_policy (Optional[RetryPolicy]) – retry policy to use should the task throw an exception
params – any additional parameters to the Flink Task (such as a retry policy)
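Python functions are objects, so attributes can be attached to them; this is the general mechanism by which a function can gain something like fn.send(). The sketch below shows that mechanism only. extend_sketch and the list standing in for a pipeline are hypothetical, not the library's implementation, and fn.to_task() and fn.defaults() are left out because their behaviour is not described here.

```python
def extend_sketch(function):
    """Attach a send() attribute to function and return the same function."""

    def send(*args, **kwargs):
        # Stand-in for a pipeline builder: a list whose first entry is this task.
        return [(function.__name__, args, kwargs)]

    function.send = send
    return function


@extend_sketch
def multiply(x, y):
    return x * y


# The function is still directly callable, and now also has .send().
direct = multiply(3, 4)
pipeline = multiply.send(3, 4)
```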
- async fail(context, task_input, ex, delay=None, cancellation_token='')
Sends a failure
- Parameters:
context – TaskContext
task_input – the incoming TaskRequest or TaskActionRequest
ex – the exception to return
delay (optional) – the delay before Flink sends the result
cancellation_token (optional) – a cancellation token to associate with this message
- get_task(task_type)
Returns the Flink Task instance for a given task type name
- Parameters:
task_type – task type name e.g. examples.multiply
- Return type:
FlinkTask
- Returns:
the Flink Task
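A task type name such as examples.multiply follows the module_name.function_name pattern described under bind(). The sketch below models that naming and a lookup by name using a plain dictionary. All helper names are hypothetical, and the real get_task returns a FlinkTask instance rather than the bare function stored here.

```python
from typing import Callable, Dict, Optional

# Hypothetical registry: task type name -> function.
registry: Dict[str, Callable] = {}


def task_type_for(fun: Callable, module_name: Optional[str] = None) -> str:
    """Build 'module_name.function_name', defaulting to the function's own module."""
    module = module_name if module_name is not None else fun.__module__
    return f"{module}.{fun.__name__}"


def register_sketch(fun: Callable, module_name: Optional[str] = None) -> Callable:
    """Store fun under its task type name and return it unchanged."""
    registry[task_type_for(fun, module_name)] = fun
    return fun


def get_task_sketch(task_type: str) -> Callable:
    """Look up a previously registered function by task type name."""
    return registry[task_type]


def multiply(x, y):
    return x * y


register_sketch(multiply, module_name="examples")
```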
- register(fun, wrapper=None, module_name=None, **params)
Registers a Python function as a Flink Task.
Equivalent to decorating a function with @tasks.bind()
- Parameters:
fun – the python function
wrapper (optional) – if wrapping a task function with e.g. functools.wraps then pass the wrapper here
module_name (optional) – the module name to register the task under which by default is the Python module name containing the function
params – any additional parameters to the Flink Task (such as a retry policy)
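The wrapper parameter concerns functions wrapped with functools.wraps. The sketch below uses only the standard library to show what such a wrapper looks like and what metadata it carries. The passthrough decorator is hypothetical, and the register call appears only as a comment because no FlinkTasks instance is created here; the argument order in that comment is read from the signature above.

```python
import functools


def passthrough(fun):
    """Hypothetical decorator that wraps a task function without changing it."""

    @functools.wraps(fun)
    def wrapper(*args, **kwargs):
        return fun(*args, **kwargs)

    return wrapper


def multiply(x, y):
    return x * y


wrapped = passthrough(multiply)

# functools.wraps copies identifying metadata such as __name__ onto the
# wrapper and records the original function as __wrapped__.
same_name = wrapped.__name__ == multiply.__name__
original = wrapped.__wrapped__

# With a FlinkTasks instance named `tasks` (an assumption, not created here),
# registration would then take the documented shape:
#     tasks.register(multiply, wrapper=wrapped)
```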
- async run_async(context, message)
Runs a Flink Task
- Parameters:
context (Context) – context object provided by Flink
message (Message) – the task input protobuf message
- static send(func, *args, **kwargs)
Returns a PipelineBuilder with this task as the first item in the pipeline
- Parameters:
func – a function decorated with @tasks.bind()
args – task args
kwargs – task kwargs
- Return type:
PipelineBuilder
- Returns:
a pipeline builder
- async send_result(context, task_request, result, state=Ellipsis, delay=None, cancellation_token='')
Sends a result
- Parameters:
context (TaskContext) – TaskContext
task_request (TaskRequest) – the incoming TaskRequest
result – the result(s) to return
state – the state to include in the result. If not specified it will be copied from the TaskRequest
delay (optional) – the delay before Flink sends the result
cancellation_token (optional) – a cancellation token to associate with this message
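The state=Ellipsis default in the signature is consistent with the common Python idiom of using Ellipsis as a "not specified" sentinel, which leaves None free to be passed as a real value. The sketch below illustrates the idiom only. resolve_state is a hypothetical helper, and whether the library treats an explicit None exactly this way is an assumption.

```python
def resolve_state(request_state, state=...):
    """Pick the state to include in a result.

    Ellipsis (the default) means "not specified", so the request's state is
    reused. Any other value, including None, is taken as an explicit choice.
    """
    if state is ...:
        return request_state
    return state
```

With a None default instead, a caller would have no way to say "clear the state" as distinct from "keep whatever the request carried".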
- unpack_task_request(task_request)
Unpacks a TaskRequest into args, kwargs and state
- Parameters:
task_request (TaskRequest) – TaskRequest
- Return type:
tuple
- Returns:
args, kwargs and state from this task_request
- value_specs()
Value specs to register in Flink Statefun’s @functions.bind() attribute