Deployment Topologies
A basic deployment topology involves an embedded pipeline function connected to an ingress topic plus a number of worker functions.
This code
from statefun_tasks import FlinkTasks()
tasks = FlinkTasks(
default_namespace="example", # default namespace for worker tasks
default_worker_name="generic_worker", # default type for worker tasks
egress_type_name="example/kafka-generic-egress", # egress to use for emitting results
embedded_pipeline_namespace="example", # namespace of the embedded pipeline function
embedded_pipeline_type="embedded_pipeline") # type of the embedded pipeline function
@tasks.bind(worker_name='cpu_worker'):
def a():
# do CPU bound work
pass
@tasks.bind(worker_name='gpu_worker'):
def b():
# do work that requires GPU
pass
@tasks.bind(worker_name='aio_worker'):
async def c():
# do IO bound work
pass
@tasks.bind(worker_name='generic_worker')
def example_workflow():
return a.send().continue_with(b).continue_with(c)
pipeline = example_workflow.send()
result = await client.submit_async(pipeline)
corresponds to the following setup in the Flink module.yaml
kind: io.statefun_tasks.v1/pipeline
spec:
id: example/embedded_pipeline # function namespace/type
stateExpiration: PT1M # state expiration (ISO-8601)
egress: example/kafka-generic-egress # task response egress
eventsEgress: example/kafka-generic-egress # events egress (optional)
eventsTopic: statefun-tasks.events # events topic (optional)
callbackDelayMs: 10 # callback delay (optional)
---
kind: io.statefun.endpoints.v2/http
spec:
functions: example/cpu_worker
urlPathTemplate: http://cpu_worker:8085/statefun
---
kind: io.statefun.endpoints.v2/http
spec:
functions: example/gpu_worker
urlPathTemplate: http://gpu_worker:8085/statefun
---
kind: io.statefun.endpoints.v2/http
spec:
functions: example/aio_worker
urlPathTemplate: http://aio_worker:8085/statefun
---
kind: io.statefun.endpoints.v2/http
spec:
functions: example/generic_worker
urlPathTemplate: http://generic_worker:8085/statefun
---
kind: io.statefun.kafka.v1/ingress
spec:
id: example/worker
address: kafka-broker:9092
consumerGroupId: flink-cluster-id
startupPosition:
type: earliest
topics:
- topic: statefun-tasks.requests
valueType: io.statefun_tasks.types/statefun_tasks.TaskRequest
targets:
- example/embedded_pipeline
- topic: statefun-tasks.actions
valueType: io.statefun_tasks.types/statefun_tasks.TaskActionRequest
targets:
- example/embedded_pipeline
---
kind: io.statefun.kafka.v1/egress
spec:
id: example/kafka-generic-egress
address: kafka-broker:9092
deliverySemantic:
type: exactly-once
transactionTimeout: 15min