Creating a Flow with custom classes for Producers, Transformers, and Consumers#

Introduction#

This notebook introduces the object-oriented API of fluxus. You will learn how to:

  • create your own classes for producers, transformers, and consumers

  • compose these conduits using the >> operator for chaining and the & operator for concurrent execution

  • build flows for asynchronous execution using the asynchronous versions of the classes

Overview#

Flows consist of building blocks that create, transform, or collect data items. We call these data items products, and the building blocks conduits.

There are three types of conduits:

  • Producers: These generate products.

  • Transformers: These modify or process the products.

  • Consumers: These collect the final products.

Creating Conduit classes#

To build your own flow using the object-oriented API, you first need to define classes for the conduits in your flow (for an easier start with fluxus, consider first experimenting with the functional API where all products are dictionaries, and no custom classes are required).

All of these classes must be a subclass of one of Producer, Transformer, or Consumer. All of these classes are subclasses of Conduit.

We will show examples for defining producers, transformers and consumers in the following sections, and will then use these to build and run a complete flow.

Producers#

When creating a Producer subclass, you need to specify the type of products it generates using a type hint. Implement the produce method to yield products of this type.

Imagine a scenario where the Producer generates a sequence of numbers:

[1]:
from collections.abc import Iterator
from fluxus import Producer
from fluxus.core.transformer._transformer_base import (
    T_SourceProduct_arg,
    T_TransformedProduct_ret,
)


class NumberProducer(Producer[int]):

    def __init__(self, n: int) -> None:
        self.n = n

    def produce(self) -> Iterator[int]:
        yield from range(1, self.n + 1)

Note that the type hint for the Producer parent class matches the type hint for the iterator.

Producers are iterable so we can easily test them by iterating over them:

[2]:
list(NumberProducer(5))
[2]:
[1, 2, 3, 4, 5]

Transformers#

When creating a Transformer subclass, you need to specify the input and output (product) types using type hints. Implement the transform method to process input products and yield transformed products of the specified output type.

In our example, we will use Transformer classes to perform mathematical operations. We will create one transformer that doubles each number and another that calculates the square.

[3]:
from fluxus import Transformer


class DoublingTransformer(Transformer[int, int]):
    def transform(self, x: int) -> Iterator[int]:
        yield x * 2


class SquareTransformer(Transformer[int, int]):
    def transform(self, x: int) -> Iterator[int]:
        yield x * x

Once more, note that the type hints used for the Transformer base class match the input and output types of the transform method.

Consumers#

When creating a Consumer subclass, you need to specify the type of products it consumes and the type of output it generates using type hints (use None as the output type if there is no output). Implement the consume method to handle the consumed products.

The following Consumer collects all processed numbers into a list. Note that the consumer method receives products as an iterator of tuples, where the first element is the index of the path taken through the flow and the second element is the product itself. The index is useful when the flow branches into multiple paths, and you want to distinguish between the products generated by each path. We will explore this in more detail further down.

[4]:
from fluxus import Consumer


class NumberConsumer(Consumer[int, list[int]]):
    def consume(self, products: Iterator[tuple[int, int]]) -> list[int]:
        # ignore the path index and return the numbers
        return list(number for _, number in products)

Building a flow#

Now that we have defined the classes for the conduits, we can build a flow by chaining them together using the >> operator.

In this example, we will generate numbers from 1 to 5, double then square them, and then collect the results in a list.

The flow is constructed as follows:

[5]:
flow = (
    NumberProducer(n=5)
    >> DoublingTransformer()
    >> SquareTransformer()
    >> NumberConsumer()
)

The >> operator chains the conduits together, passing the product of one conduit as the input to the next.

If Graphviz is installed, we can visualize the flow:

[6]:
flow
[6]:
Flow NumberProducer_4949763216 NumberProducer n=5 DoublingTransformer_4949758352 DoublingTransformer NumberProducer_4949763216->DoublingTransformer_4949758352 SquareTransformer_4949873680 SquareTransformer NumberConsumer_4949878224 NumberConsumer SquareTransformer_4949873680->NumberConsumer_4949878224 DoublingTransformer_4949758352->SquareTransformer_4949873680

Note that the graph includes the n property of the NumberProducer. All __init__ parameters of a conduit that have a matching instance attribute will be included in representations of the flow, as is the case with the n parameter of the NumberProducer.

More styles are available via the style argument of the draw method:

[7]:
flow.draw(style="graph_dark")
../../_images/user_guide_introduction_to_fluxus_building_a_flow_18_0.svg

We can now run the flow:

[8]:
flow.run()
[8]:
[4, 16, 36, 64, 100]

Parallel flows#

In many cases, you will want to run multiple conduits concurrently. This can be achieved using the & operator.

In the following example, we will generate numbers from 1 to 5. For each of these numbers, we will once calculate the square and once the doubled value, in parallel. We will then collect the results in a list.

The flow is constructed as follows:

[9]:
flow = (
    NumberProducer(n=5)
    >> (DoublingTransformer() & SquareTransformer())
    >> NumberConsumer()
)

The flow looks like this:

[10]:
flow
[10]:
Flow SquareTransformer_4970669328 SquareTransformer NumberConsumer_4968685840 NumberConsumer SquareTransformer_4970669328->NumberConsumer_4968685840 NumberProducer_4970669648 NumberProducer n=5 NumberProducer_4970669648->SquareTransformer_4970669328 DoublingTransformer_4970669584 DoublingTransformer NumberProducer_4970669648->DoublingTransformer_4970669584 DoublingTransformer_4970669584->NumberConsumer_4968685840

We run the flow as before:

[11]:
flow.run()
[11]:
[2, 4, 6, 8, 10, 1, 4, 9, 16, 25]

The Passthrough transformer#

In the previous example, we used the & operator to run two transformers in parallel. A special Passthrough transformer can be included in a parallel composition of conduits to pass the input product through without modification.

Applying the Passthrough transformer to the previous example, we get:

[12]:
from fluxus import Passthrough

flow_passthrough = (
    NumberProducer(n=5)
    >> (DoublingTransformer() & SquareTransformer() & Passthrough())
    >> NumberConsumer()
)

The flow looks like this:

[13]:
flow_passthrough
[13]:
Flow NumberConsumer_4959518928 NumberConsumer DoublingTransformer_4963709264 DoublingTransformer DoublingTransformer_4963709264->NumberConsumer_4959518928 SquareTransformer_4963698128 SquareTransformer SquareTransformer_4963698128->NumberConsumer_4959518928 NumberProducer_4959444688 NumberProducer n=5 NumberProducer_4959444688->NumberConsumer_4959518928 NumberProducer_4959444688->DoublingTransformer_4963709264 NumberProducer_4959444688->SquareTransformer_4963698128

Running the flow:

[14]:
flow_passthrough.run()
[14]:
[2, 4, 6, 8, 10, 1, 4, 9, 16, 25, 1, 2, 3, 4, 5]

Asynchronous Flows#

The real power of the object-oriented API is that it allows you to run conduits concurrently using Python’s asynchronous capabilities. To do this, you need to use the async versions of the producer, transformer, and consumer classes: AsyncProducer, AsyncTransformer, and AsyncConsumer.

Instead of implementing the produce, transform, and consume methods, you need to implement the aproduce, atransform, and aconsume methods, respectively. Calls to the original methods will create an event loop and will defer the execution of the asynchronous methods.

In the following example, we will run the same flow as before, but this time we will use the asynchronous versions of the classes.

[15]:
import random
import asyncio
from collections.abc import AsyncIterator
from fluxus import AsyncProducer, AsyncTransformer, AsyncConsumer


class AsyncNumberProducer(AsyncProducer[int]):

    def __init__(self, n: int) -> None:
        self.n = n

    async def aproduce(self) -> AsyncIterator[int]:
        for i in range(1, self.n + 1):
            yield i


class AsyncDoublingTransformer(AsyncTransformer[int, int]):

    async def atransform(self, x: int) -> AsyncIterator[int]:
        yield x * 2


class AsyncSquareTransformer(AsyncTransformer[int, int]):

    async def atransform(self, x: int) -> AsyncIterator[int]:
        yield x * x


class AsyncNumberConsumer(AsyncConsumer[int, list[int]]):

    async def aconsume(self, products: AsyncIterator[tuple[int, int]]) -> list[int]:
        return [number async for _, number in products]


flow_async = (
    AsyncNumberProducer(n=5)
    >> AsyncDoublingTransformer()
    >> AsyncSquareTransformer()
    >> AsyncNumberConsumer()
)

If you are already inside an event loop (as is the case in Jupyter notebooks), you can run the asynchronous flow using the asynchronous arun method:

[16]:
await flow_async.arun()
[16]:
[4, 16, 36, 64, 100]

Alternatively, calling the synchronous run method on an asynchronous flow will create a new event loop behind the scenes and run the asynchronous methods from there:

[17]:
flow_async.run()
[17]:
[4, 16, 36, 64, 100]

While the above is a toy example which does not benefit from the asynchronous capabilities, asynchronous flows will bring dramatic performance improvements when dealing with I/O-bound tasks, such as calls to remote APIs or database queries.

Concluding remarks#

In this notebook, we have demonstrated how to create custom classes for producers, transformers, and consumers. We have built a flow by chaining these conduits together using the >> operator. We have also shown how to run multiple conduits concurrently using the & operator, and have introduced the Passthrough transformer, which can be used to pass the input product through without modification. Finally, we have demonstrated how to create and run asynchronous flows using the asynchronous versions of the classes.