Stateful Tasks
Since functions on Flink are inherently stateful, tasks may also be written statefully. Each task entry in the pipeline has a namespace, worker_name and task_id corresponding to the namespace, address and id of a Flink Stateful Function.
By default, when composing pipelines using the PipelineBuilder, each entry is given a unique task id and therefore its own isolated state. By either entering a fixed task_id in the tasks.bind() decorator or using the .set() function when constructing a pipeline, task entries can be added to a pipeline that share the same task_id and therefore state.
@tasks.bind(with_context=True, task_id='memoised_multiply')
def memoised_multiply(context, x, y):
state = context.get_state() or {}
key = f'{x},{y}'
if not key in state:
state[key] = x * y
context.set_state(state)
return state[key]
pipeline = in_parallel([
memoised_multiply.send(3, 2), # calculates and memoises result
memoised_multiply.send(3, 2) # returns memoised result
memoised_multiply.send(3, 2).set(task_id=str(uuid4())) # has a different task id and therefore independent state
}
result = await client.submit_async(pipeline)