TaskContext

class statefun_tasks.TaskContext(context, egress_type_name, egress_message_max_size=None, serialiser=None)

Task context wrapper around Flink context

Parameters:
  • context (Context) – Flink context

  • egress_type_name (str) – egress type name to use when calling send_egress_message()

  • egress_message_max_size (optional) – maximum size of an egress message in bytes. If specified attempts to send messages over this size will raise a MessageSizeExceeded exception

  • serialiser (optional) – serialiser to use (will use DefaultSerialiser if not set)

__init__(context, egress_type_name, egress_message_max_size=None, serialiser=None)

Methods

__init__(context, egress_type_name[, ...])

cancel_message(cancellation_token)

Attempts to cancel a message associated with this cancellation token

contextualise_from(task_request)

Sets additional context properties from the supplied task request

get_address()

Own address in the form of context.address.typename

get_caller_address()

Caller address in the form of context.caller.typename

get_caller_id()

Caller task Id in the form of context.caller.id

get_display_name()

get_namespace()

Own namespace in the form of context.address.typename

get_original_caller_address()

Original caller address irrespective of whether this task is a retry (caller is self) or not

get_original_caller_id()

Original caller id irrespective of whether this task is a retry (caller is self) or not

get_parent_pipeline_address()

Address of the parent pipeline if this task is called as part of a pipeline else None

get_parent_pipeline_id()

ID of the parent pipeline if this task is called as part of a pipeline else None

get_parent_task_address()

ID of the parent task address if this task has one else None

get_parent_task_id()

ID of the parent task if this task has one else None

get_pipeline_address()

Address of the pipeline if this task is called as part of a pipeline else None

get_pipeline_id()

ID of the pipeline if this task is called as part of a pipeline else None

get_root_pipeline_address()

Address of the top most pipeline if this task is called as part of a pipeline else None.

get_root_pipeline_id()

ID of the top most pipeline if this task is called as part of a pipeline else None.

get_state([default])

get_task_id()

Own task Id in the form of context.address.id

get_worker_name()

Own worker_name in the form of context.address.typename

safe_send_egress_message(topic, value, ...)

Attempts to send a message to an egress topic

send_egress_message(topic, value)

Sends a message to an egress topic

send_message(destination, target_id, value)

Sends a message to another Flink Task worker

send_message_after(delay, destination, ...)

Sends a message to another Flink Task worker after some delay

set_state(obj)

to_address_and_id(address)

Converts SdkAddress into a string representation

storage

task_state

task_name

The name of this task

task_uid

The unique ID of this task.

Attributes

storage

task_state

task_name

The name of this task

task_uid

The unique ID of this task.

cancel_message(cancellation_token)

Attempts to cancel a message associated with this cancellation token

Parameters:

cancellation_token (str) – the cancellation token

contextualise_from(task_request)

Sets additional context properties from the supplied task request

Parameters:

task_request (TaskRequest) – the task request

get_address()

Own address in the form of context.address.typename

Returns:

address

get_caller_address()

Caller address in the form of context.caller.typename

Returns:

address

get_caller_id()

Caller task Id in the form of context.caller.id

Returns:

task Id

get_namespace()

Own namespace in the form of context.address.typename

Returns:

address

get_original_caller_address()

Original caller address irrespective of whether this task is a retry (caller is self) or not

Returns:

address

get_original_caller_id()

Original caller id irrespective of whether this task is a retry (caller is self) or not

Returns:

address

get_parent_pipeline_address()

Address of the parent pipeline if this task is called as part of a pipeline else None

Returns:

pipeline address

get_parent_pipeline_id()

ID of the parent pipeline if this task is called as part of a pipeline else None

Returns:

pipeline ID

get_parent_task_address()

ID of the parent task address if this task has one else None

Returns:

parent task address

get_parent_task_id()

ID of the parent task if this task has one else None

Returns:

parent task ID

get_pipeline_address()

Address of the pipeline if this task is called as part of a pipeline else None

Returns:

pipeline address

get_pipeline_id()

ID of the pipeline if this task is called as part of a pipeline else None

Returns:

pipeline ID

get_root_pipeline_address()

Address of the top most pipeline if this task is called as part of a pipeline else None.

Returns:

root pipeline address

get_root_pipeline_id()

ID of the top most pipeline if this task is called as part of a pipeline else None.

This will be different from get_pipeline_id() if the pipeline is nested

Returns:

root pipeline ID

get_task_id()

Own task Id in the form of context.address.id

Returns:

task Id

get_worker_name()

Own worker_name in the form of context.address.typename

Returns:

address

safe_send_egress_message(topic, value, error_function)

Attempts to send a message to an egress topic

If this fails due to a MessageSizeExceeded exception sends the value created by the error error_function instead

Parameters:
  • topic – the topic name

  • value – the message to send

  • error_function (Callable[[Exception], Any]) – a callable to create the error if a MessageSizeExceeded exception was raised

send_egress_message(topic, value)

Sends a message to an egress topic

Parameters:
  • topic – the topic name

  • value – the message to send

send_message(destination, target_id, value, delay=None, cancellation_token='')

Sends a message to another Flink Task worker

Parameters:
  • destination – the destination to send the message to (e.g. example/worker)

  • target_id – the target Id

  • value – the message to send

  • delay (optional) – the delay (if any then same behaviour send_message_after())

  • cancellation_token (optional) – a cancellation token to associate with this message

send_message_after(delay, destination, target_id, value, cancellation_token='')

Sends a message to another Flink Task worker after some delay

Parameters:
  • delay (timedelta) – the delay

  • destination – the destination to send the message to (e.g. example/worker)

  • target_id – the target Id

  • value – the message to send

  • cancellation_token (optional) – a cancellation token to associate with this message

property task_name

The name of this task

property task_uid

The unique ID of this task.

Not to be confused with context.get_task_id() which returns the Flink statefun identity of this task

static to_address_and_id(address)

Converts SdkAddress into a string representation

Parameters:

address (SdkAddress) – SDK address

Returns:

address and id in the format namespace/type/id