Tasks

Registering a Task

Any Python function can be decorated with @tasks.bind() to register it as a Flink Task. A single Stateful Function can invoke multiple tasks:

@tasks.bind()
def multiply(x, y):
    return x * y


@tasks.bind()
def subtract(x, y):
    return x - y


@functions.bind("example/worker", specs=tasks.value_specs())
async def worker(context, message):
    await tasks.run_async(context, message)

Error Handling

Exceptions raised by a task can either be passed back to the caller or trigger a retry. Retry parameters are set using a RetryPolicy:

@tasks.bind(retry_policy=RetryPolicy(
    retry_for=[ValueError], max_retries=2,
    delay=timedelta(seconds=5), exponential_back_off=True))
def unreliable_task():
    ...
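
The following sketch illustrates what such retry parameters could mean in practice. It is not the library's retry logic: `retry_delays()` and `run_with_retries()` are hypothetical helpers, and doubling the delay on each attempt is an assumption about what exponential back-off does here.

```python
from datetime import timedelta


def retry_delays(max_retries, delay, exponential_back_off):
    """Delay before each retry; doubling per attempt is an assumption."""
    return [delay * (2 ** attempt if exponential_back_off else 1)
            for attempt in range(max_retries)]


def run_with_retries(fn, retry_for, max_retries):
    """Call fn, retrying only on the listed exception types.

    Sleeping between attempts is left out to keep the sketch short.
    """
    attempts = 0
    while True:
        try:
            return fn()
        except tuple(retry_for):
            if attempts == max_retries:
                raise  # retries exhausted: pass the exception back
            attempts += 1


calls = {"n": 0}


def flaky():
    """Fail twice with ValueError, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient failure")
    return "ok"


result = run_with_retries(flaky, [ValueError], max_retries=2)
delays = retry_delays(2, timedelta(seconds=5), True)
print(result, calls["n"])                    # ok 3
print([d.total_seconds() for d in delays])   # [5.0, 10.0]
```

An exception type that is not listed in `retry_for` is not caught by the loop, so it goes straight back to the caller.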

Calling a Task

A task can be called directly as an ordinary Python function, which is useful for testing, or submitted to Flink using the FlinkTasksClient:

result = multiply(3, 2)             # direct function call for testing

task = multiply.send(3, 2)
result = await client.submit_async(task)  # indirect invocation as a Flink Task
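
The difference between the two call styles can be sketched with a toy wrapper. The `Task` class and the dict returned by `send()` are hypothetical; the real library's task objects and wire format are not shown in this document.

```python
class Task:
    """Hypothetical wrapper: callable directly, or turned into a request."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args):
        # A direct call runs the function in the current process.
        return self.fn(*args)

    def send(self, *args):
        # Nothing runs here; the call is only described so that it can be
        # submitted and executed elsewhere later.
        return {"task": self.fn.__name__, "args": args}


@Task
def multiply(x, y):
    return x * y


print(multiply(3, 2))       # 6
print(multiply.send(3, 2))  # {'task': 'multiply', 'args': (3, 2)}
```

In this sketch `send()` only builds a description of the call, which mirrors why the result of `multiply.send(3, 2)` above has to be handed to the client before a value comes back.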

Accessing the Context

A task can access a wrapper around the Flink Statefun context by setting with_context=True in @tasks.bind(); the context is then passed to the task as an argument:

@tasks.bind(with_context=True)
def task_using_context(context):
    caller = context.get_caller_id()
    return f'{caller}, you called me'
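
One way to picture the with_context flag, and to exercise a context-aware task without a running Flink cluster, is the toy below. `FakeContext`, this local `bind()`, and the `invoke` attribute are assumptions for illustration only and do not come from the library.

```python
class FakeContext:
    """Hypothetical stand-in for the context wrapper."""

    def __init__(self, caller_id):
        self._caller_id = caller_id

    def get_caller_id(self):
        return self._caller_id


def bind(with_context=False):
    """Toy decorator that remembers whether the task wants the context."""
    def decorator(fn):
        def invoke(context, *args):
            # Pass the context through only when the task asked for it.
            return fn(context, *args) if with_context else fn(*args)
        fn.invoke = invoke
        return fn
    return decorator


@bind(with_context=True)
def task_using_context(context):
    caller = context.get_caller_id()
    return f'{caller}, you called me'


@bind()
def multiply(x, y):
    return x * y


ctx = FakeContext("example/worker/1")
print(task_using_context.invoke(ctx))  # example/worker/1, you called me
print(multiply.invoke(ctx, 3, 2))      # 6
```

Since the decorated function is still a plain function in this sketch, a test can also call `task_using_context(FakeContext("tester"))` directly.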