FlinkTasks

class statefun_tasks.FlinkTasks(default_namespace=None, default_worker_name=None, egress_type_name=None, egress_message_max_size=None, serialiser=None, state_expiration=None, keep_task_state=False, embedded_pipeline_namespace=None, embedded_pipeline_type=None)

Flink Tasks implementation

Parameters:
  • default_namespace (Optional[str]) – namespace to expose functions under. Maps to Flink Statefun function namespace in module.yaml

  • default_worker_name (Optional[str]) – worker name to expose. Maps to Flink Statefun function type in module.yaml

  • egress_type_name (Optional[str]) – egress type name. Maps to Flink Statefun egress in module.yaml

  • egress_message_max_size (optional) – maximum size of an egress message in bytes. If specified attempts to send messages over this size will raise a MessageSizeExceeded exception

  • serialiser (optional) – serialiser to use (will use DefaultSerialiser if not set)

  • state_expiration (optional) – duration after which state will be expired by Flink (expire_after_call)

  • keep_task_state (optional) – whether to keep state (request, result) associated with tasks as well as pipelines (defaults to false)

  • embedded_pipeline_namespace (optional) – namespace of the embedded function that pipelines will be forwarded to

  • embedded_pipeline_type (optional) – type name of the embedded function that pipelines will be forwarded to

__init__(default_namespace=None, default_worker_name=None, egress_type_name=None, egress_message_max_size=None, serialiser=None, state_expiration=None, keep_task_state=False, embedded_pipeline_namespace=None, embedded_pipeline_type=None)

Methods

__init__([default_namespace, ...])

bind([namespace, worker_name, retry_policy, ...])

Decorator to bind a function as a Flink Task

clone_task_request(context)

Clones the TaskRequest associated with this TaskContext

emit_result(context, task_input, task_result)

Emits a result

extend(function[, retry_policy])

Transforms a Python function into a task

fail(context, task_input, ex[, delay, ...])

Sends a failure

get_task(task_type)

Returns the Flink Task instance for a given task type name

register(fun[, wrapper, module_name])

Registers a Python function as a Flink Task.

run_async(context, message)

Runs a Flink Task

send(func, *args, **kwargs)

Returns a PipelineBuilder with this task as the first item in the pipeline

send_result(context, task_request, result[, ...])

Sends a result

unpack_task_request(task_request)

Unpacks a TaskRequest into args, kwargs and state

value_specs()

Value specs to register in Flink Statefun's @functions.bind() attribute

events

EventHandler for this FlinkTasks instance

Attributes

events

EventHandler for this FlinkTasks instance

bind(namespace=None, worker_name=None, retry_policy=None, with_state=False, is_fruitful=True, module_name=None, with_context=False, display_name=None, task_id=None)

Decorator to bind a function as a Flink Task

Parameters:
  • namespace (Optional[str]) – namespace to use in place of the default

  • worker_name (Optional[str]) – worker name to use in place of the default

  • retry_policy (Optional[RetryPolicy]) – retry policy to use should the task throw an exception

  • with_state (bool) – whether to pass a state object as the first (second if with_context is also set) parameter. The return value should be a tuple of state, result (default False)

  • is_fruitful (bool) – whether the function produces a fruitful result or simply returns None (default True)

  • module_name (Optional[str]) – if specified then the task type used in addressing will be module_name.function_name otherwise the Python module containing the function will be used

  • with_context (bool) – whether to pass a Flink context object as the first parameter (default false)

  • display_name (Optional[str]) – optional friendly name for this task

  • task_id (Optional[str]) – optional set the fixed id for this task in order to make it stateful

clone_task_request(context)

Clones the TaskRequest associated with this TaskContext

Parameters:

context (TaskContext) – TaskContext

Return type:

TaskRequest

Returns:

a TaskRequest

async emit_result(context, task_input, task_result, delay=None, cancellation_token=None)

Emits a result

Parameters:
  • context – TaskContext

  • task_input – the incoming TaskRequest or TaskActionRequest

  • task_result – the TaskResult or TaskException to emit

  • delay (optional) – the delay before Flink sends the result

  • cancellation_token (optional) – a cancellation token to associate with this message

property events: EventHandlers

EventHandler for this FlinkTasks instance

static extend(function, retry_policy=None, **params)

Transforms a Python function into a task

fn() is extended with fn.send(), fn.to_task() and fn.defaults() attributes

Parameters:
  • function – the function to wrap

  • retry_policy (Optional[RetryPolicy]) – retry policy to use should the task throw an exception

  • params – any additional parameters to the Flink Task (such as a retry policy)

async fail(context, task_input, ex, delay=None, cancellation_token='')

Sends a failure

Parameters:
  • context – TaskContext

  • task_input – the incoming TaskRequest or TaskActionRequest

  • ex – the exception to return

  • delay (optional) – the delay before Flink sends the result

  • cancellation_token (optional) – a cancellation token to associate with this message

get_task(task_type)

Returns the Flink Task instance for a given task type name

Parameters:

task_type – task type name e.g. examples.multiply

Return type:

FlinkTask

Returns:

the Flink Task

register(fun, wrapper=None, module_name=None, **params)

Registers a Python function as a Flink Task.

Equivalent to decorating a function with @tasks.bind()

Parameters:
  • fun – the python function

  • wrapper (optional) – if wrapping a task function with e.g. functools.wraps then pass the wrapper here

  • module_name (optional) – the module name to register the task under which by default is the Python module name containing the function

  • params – any additional parameters to the Flink Task (such as a retry policy)

async run_async(context, message)

Runs a Flink Task

Parameters:
  • context (Context) – context object provided by Flink

  • message (Message) – the task input protobuf message

static send(func, *args, **kwargs)

Returns a PipelineBuilder with this task as the first item in the pipeline

Parameters:
  • func – a function decorated with @tasks.bind()

  • args – task args

  • kwargs – task kwargs

Return type:

PipelineBuilder

Returns:

a pipeline builder

async send_result(context, task_request, result, state=Ellipsis, delay=None, cancellation_token='')

Sends a result

Parameters:
  • context (TaskContext) – TaskContext

  • task_request (TaskRequest) – the incoming TaskRequest

  • result – the result(s) to return

  • state – the state to include in the result. If not specified it will be copied from the TaskRequest

  • delay (optional) – the delay before Flink sends the result

  • cancellation_token (optional) – a cancellation token to associate with this message

unpack_task_request(task_request)

Unpacks a TaskRequest into args, kwargs and state

Parameters:

task_request (TaskRequest) – TaskRequest

Return type:

tuple

Returns:

args, kwargs and state from this task_request

value_specs()

Value specs to register in Flink Statefun’s @functions.bind() attribute