Source code for fluxus.functional.product._product
# -----------------------------------------------------------------------------
# © 2024 Boston Consulting Group. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
"""
Implementation of the functional API for the flow module.
"""
from __future__ import annotations
import logging
from collections.abc import Iterator, Mapping
from typing import Any
from pytools.api import inheritdoc
from pytools.repr import HasDictRepr
from ...lineage import HasLineage
log = logging.getLogger(__name__)
__all__ = [
"DictProduct",
]
[docs]
@inheritdoc(match="""[see superclass]""")
class DictProduct(HasLineage["DictProduct | None"], Mapping[str, Any], HasDictRepr):
"""
A flow product that consists of an attribute-value mapping.
The product name is the name of the step that created this product.
The attribute names must be valid Python identifiers.
"""
#: The product attribute name for the start time of the step that created this
#: product.
KEY_START_TIME = "[start]"
#: The product attribute name for the end time of the step that created this
#: product.
KEY_END_TIME = "[end]"
#: The name of this product.
name: str
#: The attributes that were changed or added by the step that created this product.
_product_attributes: Mapping[str, Any]
#: The complete set of attributes of this product, including both input and output
#: attributes; output attributes take precedence over input attributes of the same
#: name.
attributes: Mapping[str, Any]
#: The start CPU time of the step that created this product, in seconds since an
#: arbitrary starting point.
start_time: float
#: The end CPU time of the step that created this product, in seconds since an
#: arbitrary starting point.
end_time: float
#: The precursor of this product.
_precursor: DictProduct | None
def __init__(
self,
name: str,
product_attributes: Mapping[str, Any],
*,
precursor: DictProduct | None = None,
start_time: float,
end_time: float,
) -> None:
"""
:param name: the name of this product
:param product_attributes: the attributes that were changed or added by the
step that created this product, comprising both fixed attributes and
dynamically generated attributes
:param precursor: the precursor of this product (optional)
:param start_time: the start CPU time of the step that created this product, in
seconds since an arbitrary starting point
:param end_time: the end CPU time of the step that created this product, in
seconds since an arbitrary starting point
:raises TypeError: if the product attributes are not a mapping
:raises ValueError: if the attribute names are not valid Python identifiers
"""
# Validate that the attributes are a mapping with valid identifiers as keys
_validate_product_attributes(product_attributes, step_name=name)
self.name = name
self.start_time = start_time
self.end_time = end_time
self._product_attributes = product_attributes
self._precursor = precursor
# We calculate the complete set of attributes by combining the input and output,
# with product attributes (the output) taking precedence over input attributes
# of the same name.
self.attributes = (
{**precursor.attributes, **product_attributes}
if precursor
else product_attributes
)
@property
def product_name(self) -> str:
return self.name
@property
def product_attributes(self) -> Mapping[str, Any]:
"""[see superclass]"""
return self._product_attributes
@property
def precursor(self) -> DictProduct | None:
"""[see superclass]"""
return self._precursor
def __getitem__(self, __key: str) -> Any:
return self._product_attributes[__key]
def __len__(self) -> int:
return len(self._product_attributes)
def __iter__(self) -> Iterator[str]:
return iter(self._product_attributes)
def _validate_product_attributes(
attributes: Mapping[str, Any], *, step_name: str
) -> None:
"""
Validate that the output of a step is a :class:`Mapping`, and that its attribute
names are valid Python identifiers.
:param attributes: the step output to validate
:param step_name: the name of the step producing the mapping
:raises TypeError: if the step output is not a mapping
:raises ValueError: if the attribute names are invalid
"""
if not isinstance(attributes, Mapping):
raise TypeError(
f"Expected step {step_name!r} to produce mappings, but got: {attributes!r}"
)
invalid_names = [
name
for name in attributes
if not (isinstance(name, str) and name.isidentifier())
]
if invalid_names:
raise ValueError(
f"Attribute names in output of step {step_name!r} must be valid Python "
f"identifiers, but included invalid names: "
+ ", ".join(map(repr, invalid_names))
)