artkit.flow#

This module provides a functional API for defining and executing complex and asynchronous workflows. The API is inspired by the functional programming paradigm and is designed to be simple, expressive, and composable.

It is based on the fluxus.functional module of the fluxus library. Please refer to the fluxus documentation for more information.

The functions step(), chain(), and parallel() are used to define the steps of a flow. Function step() defines a single step, function chain() composes steps or flows sequentially, and the parallel() composes steps or flows in parallel.

Alternatively, the >> operator can be used to chain steps or flows, and the & operator can be used to compose steps or flows in parallel.

Function passthrough() can be included in a parallel composition to pass the unchanged input through to the next step.

Function run() is used to execute a flow. The flow is executed asynchronously, and all results are returned as a RunResult instance. Run results include the input to the flow as well as all intermediate and final results along all paths through the flow.

Example:

from artkit.flow import step, chain, parallel, passthrough, run
from collections.abc import AsyncIterator
from typing import Any

def add_one(x) -> dict[str, Any]:
    # We can return a single dict
    return dict(x=x + 1)

async def add_two(x) -> dict[str, Any]:
    # We can use asynchronous functions
    return dict(x=x + 2)

async def add_three_or_five(x) -> AsyncIterator[dict[str, Any]]:
    # We can yield rather than return values to generate multiple products
    yield dict(x=x + 3)
    yield dict(x=x + 5)

flow = chain(
    step("add_1", add_one),
    parallel(
        step("add_2", add_two),
        step("add_3_or_5", add_three_or_five),
        passthrough(),
    ),
)

result = run(flow, input=[dict(x=1), dict(x=10)])

print(result)

This will output:

RunResult(
    [
        {'input': {'x': 1}, 'add_1': {'x': 2}, 'add_2': {'x': 4}},
        {'input': {'x': 10}, 'add_1': {'x': 11}, 'add_2': {'x': 13}}
    ],
    [
        {'input': {'x': 1}, 'add_1': {'x': 2}, 'add_3_or_5': {'x': 5}},
        {'input': {'x': 1}, 'add_1': {'x': 2}, 'add_3_or_5': {'x': 7}},
        {'input': {'x': 10}, 'add_1': {'x': 11}, 'add_3_or_5': {'x': 14}},
        {'input': {'x': 10}, 'add_1': {'x': 11}, 'add_3_or_5': {'x': 16}}
    ],
    [
        {'input': {'x': 1}, 'add_1': {'x': 2}},
        {'input': {'x': 10}, 'add_1': {'x': 11}}
    ]
)

The following functions and classes are provided by this module: