Source code for fluxus.functional.conduit._consumer
# -----------------------------------------------------------------------------
# © 2024 Boston Consulting Group. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
"""
Implementation of ``DictConsumer``.
"""
from __future__ import annotations
import logging
import time
from collections import defaultdict
from collections.abc import AsyncIterable
from typing import Any, cast
from pytools.api import inheritdoc
from pytools.expression.repr import ListWithExpressionRepr
from ... import AsyncConsumer
from .._result import RunResult
from ..product import DictProduct
from ._conduit import DictConduit
log = logging.getLogger(__name__)
__all__ = [
"DictConsumer",
]
#
# Classes
#
[docs]
@inheritdoc(match="""[see superclass]""")
class DictConsumer(DictConduit, AsyncConsumer[DictProduct, RunResult]):
"""
A consumer of dictionary products.
Combines all dictionaries into one or more lists of dictionaries.
If the ingoing flow is sequential, the output will be a list of dictionaries.
If the ingoing flow is concurrent, the output will be a list of lists of
dictionaries, where each list corresponds to one distinct path through the
flow.
"""
#: If ``True``, include start and end timestamps for each step in the lineage
#: attributes; if ``False``, do not include timestamps.
timestamps: bool
def __init__(self, *, name: str = "output", timestamps: bool = False) -> None:
"""
:param name: the name of this consumer
:param timestamps: if ``True``, include start and end timestamps for each step
in the lineage attributes; if ``False``, do not include timestamps
"""
super().__init__(name=name)
self.timestamps = timestamps
[docs]
async def aconsume(
self, products: AsyncIterable[tuple[int, DictProduct]]
) -> RunResult:
"""[see superclass]"""
lineage_by_group: dict[int, list[dict[str, dict[str, Any]]]] = defaultdict(
ListWithExpressionRepr
)
# Get the timestamps flag
timestamps = self.timestamps
# Initialize summary statistics
run_start = time.perf_counter()
cumulative_time = 0.0
async for group, end_product in products:
# Get the lineage attributes for the end product, which is a dictionary
# of dictionaries. The outer dictionary maps product names to their
# attributes, and the inner dictionaries map attribute names to their
# values.
attributes = end_product.get_lineage_attributes()
# Iterate over products and their attribute dicts
for product, product_attributes in zip(
cast(list[DictProduct], end_product.get_lineage()), attributes.values()
):
# Get the time stamps from the product
start_time = product.start_time
end_time = product.end_time
# Update summary statistics
cumulative_time += end_time - start_time
if timestamps:
# Add the time stamps to the attributes
product_attributes[DictProduct.KEY_START_TIME] = (
start_time - run_start
)
product_attributes[DictProduct.KEY_END_TIME] = end_time - run_start
lineage_by_group[group].append(attributes)
# Log a summary message
run_duration = time.perf_counter() - run_start
summary_message = (
f"Run took {run_duration :g} seconds, with {cumulative_time:g} seconds "
"of total wait time."
)
if run_duration > 0:
speedup = cumulative_time / run_duration
summary_message += (
f" Concurrent execution achieved a speedup factor of {speedup:g} "
f"over sequential execution."
)
log.info(summary_message)
# Return the lineage by group
n_groups = max(lineage_by_group.keys()) + 1
if n_groups == 1:
# If there is only one group, return the lineage for that group as a list
return RunResult(lineage_by_group[0])
else:
# If there are multiple groups, return the lineage for each group in a
# separate list
return RunResult(*(lineage_by_group[group] for group in range(n_groups)))