artkit.flow#
This module provides a functional API for defining and executing complex and asynchronous workflows. The API is inspired by the functional programming paradigm and is designed to be simple, expressive, and composable.
It is based on the fluxus.functional
module of the fluxus library.
Please refer to the fluxus documentation for more information.
The functions step()
, chain()
, and parallel()
are used to
define the steps of a flow. Function step()
defines a single step, function
chain()
composes steps or flows sequentially, and the parallel()
composes
steps or flows in parallel.
Alternatively, the >>
operator can be used to chain steps or flows, and the
&
operator can be used to compose steps or flows in parallel.
Function passthrough()
can be included in a parallel composition to pass the
unchanged input through to the next step.
Function run()
is used to execute a flow. The flow is executed asynchronously,
and all results are returned as a RunResult
instance. Run results include
the input to the flow as well as all intermediate and final results along all paths
through the flow.
Example:
from artkit.flow import step, chain, parallel, passthrough, run
from collections.abc import AsyncIterator
from typing import Any
def add_one(x) -> dict[str, Any]:
# We can return a single dict
return dict(x=x + 1)
async def add_two(x) -> dict[str, Any]:
# We can use asynchronous functions
return dict(x=x + 2)
async def add_three_or_five(x) -> AsyncIterator[dict[str, Any]]:
# We can yield rather than return values to generate multiple products
yield dict(x=x + 3)
yield dict(x=x + 5)
flow = chain(
step("add_1", add_one),
parallel(
step("add_2", add_two),
step("add_3_or_5", add_three_or_five),
passthrough(),
),
)
result = run(flow, input=[dict(x=1), dict(x=10)])
print(result)
This will output:
RunResult(
[
{'input': {'x': 1}, 'add_1': {'x': 2}, 'add_2': {'x': 4}},
{'input': {'x': 10}, 'add_1': {'x': 11}, 'add_2': {'x': 13}}
],
[
{'input': {'x': 1}, 'add_1': {'x': 2}, 'add_3_or_5': {'x': 5}},
{'input': {'x': 1}, 'add_1': {'x': 2}, 'add_3_or_5': {'x': 7}},
{'input': {'x': 10}, 'add_1': {'x': 11}, 'add_3_or_5': {'x': 14}},
{'input': {'x': 10}, 'add_1': {'x': 11}, 'add_3_or_5': {'x': 16}}
],
[
{'input': {'x': 1}, 'add_1': {'x': 2}},
{'input': {'x': 10}, 'add_1': {'x': 11}}
]
)
The following functions and classes are provided by this module: