# Source code for facet.validation._validation

"""
Core implementation of :mod:`facet.validation`.
"""
import warnings
from abc import ABCMeta, abstractmethod
from typing import Any, Generator, Optional, Tuple, Union, cast

import numpy as np
import numpy.typing as npt
import pandas as pd
from sklearn.model_selection import BaseCrossValidator
from sklearn.utils import check_random_state

from pytools.api import AllTracker

# Names exported by this module
__all__ = [
    "BaseBootstrapCV",
    "BootstrapCV",
    "StratifiedBootstrapCV",
    "StationaryBootstrapCV",
]


#
# Ensure all symbols introduced below are included in __all__
#

# The tracker receives the module globals as they are at this point
# (NOTE(review): AllTracker is defined in pytools.api; presumably it snapshots
# these globals so that later definitions can be compared against __all__ --
# confirm against the pytools documentation)
__tracker = AllTracker(globals())


#
# Class definitions
#


class BaseBootstrapCV(
    BaseCrossValidator,  # type: ignore
    metaclass=ABCMeta,
):
    """
    Abstract base for cross-validators that derive their splits by bootstrapping.

    Subclasses provide the sampling strategy by implementing
    :meth:`_select_train_indices`; :meth:`split` uses every sample that was not
    drawn for the training set as the test set.
    """

    def __init__(
        self,
        n_splits: int = 1000,
        random_state: Optional[Union[int, np.random.RandomState]] = None,
    ) -> None:
        """
        :param n_splits: number of splits to generate (default: 1000)
        :param random_state: random state to initialise the random generator with
            (optional)
        :raises ValueError: if ``n_splits`` is less than 1
        """
        if n_splits < 1:
            raise ValueError(f"arg n_splits={n_splits} must be a positive integer")

        self.random_state = random_state
        self.n_splits = n_splits

    # noinspection PyPep8Naming
    def get_n_splits(
        self,
        X: Optional[Union[npt.NDArray[Any], pd.DataFrame]] = None,
        y: Optional[Union[npt.NDArray[Any], pd.Series, pd.DataFrame]] = None,
        groups: Optional[npt.ArrayLike] = None,
    ) -> int:
        """
        Return the number of splits generated by this cross-validator.

        :param X: for compatibility only, not used
        :param y: for compatibility only, not used
        :param groups: for compatibility only, not used
        :return: the number of splits
        """
        return self.n_splits

    # noinspection PyPep8Naming
    def split(
        self,
        X: Union[npt.NDArray[Any], pd.DataFrame],
        y: Union[npt.NDArray[Any], pd.Series, pd.DataFrame, None] = None,
        groups: Union[npt.NDArray[Any], pd.Series, pd.DataFrame, None] = None,
    ) -> Generator[Tuple[npt.NDArray[np.int_], npt.NDArray[np.int_]], None, None]:
        """
        Generate indices to split data into training and test set.

        The arguments are validated when the first split is requested, because
        this method is a generator.

        :param X: features
        :param y: target variable for supervised learning problems; passed on to
            the sampling strategy, which may use it as labels for stratification
        :param groups: ignored; exists for compatibility
        :return: a generator yielding `(train, test)` tuples where train and test
            are numpy arrays with train and test indices, respectively
        :raises ValueError: if ``X`` has fewer than 2 rows, or if ``X`` and ``y``
            differ in length
        """
        n_rows: int = len(X)

        if n_rows < 2:
            raise ValueError("arg X must have at least 2 rows")
        if y is not None and n_rows != len(y):
            raise ValueError("args X and y must have the same length")
        if groups is not None:
            warnings.warn(f"ignoring arg groups={groups!r}", stacklevel=2)

        rng = check_random_state(self.random_state)
        all_indices: npt.NDArray[np.int_] = np.arange(n_rows)

        for _ in range(self.n_splits):
            # Resample until at least one row is left out of the training set.
            # NOTE(review): this only terminates if the sampling strategy can
            # leave a row out; a strategy that always covers every row would
            # make this loop run forever.
            while True:
                train: npt.NDArray[np.int_] = self._select_train_indices(
                    n_samples=n_rows, random_state=rng, y=y
                )

                # every row that was never drawn for training goes to the test set
                left_out: npt.NDArray[np.bool_] = np.ones(n_rows, dtype=bool)
                left_out[train] = False
                test: npt.NDArray[np.int_] = all_indices[left_out]

                if len(test) > 0:
                    break

            yield train, test

    @abstractmethod
    def _select_train_indices(
        self,
        n_samples: int,
        random_state: np.random.RandomState,
        y: Union[npt.NDArray[Any], pd.Series, pd.DataFrame, None],
    ) -> npt.NDArray[np.int_]:
        """
        Sample the indices that make up the training set of one split.

        :param n_samples: number of indices to sample
        :param random_state: random state object to be used for random sampling
        :param y: labels for stratification
        :return: an array of integer indices with shape ``[n_samples]``
        """

    # noinspection PyPep8Naming
    def _iter_test_indices(
        self, X: Any = None, y: Any = None, groups: Any = None
    ) -> None:
        # stub only, so that all abstract methods are implemented;
        # splits are produced by method split()
        return None
class BootstrapCV(BaseBootstrapCV):
    """
    Bootstrapping cross-validation.

    Each split is created by drawing samples at random, with replacement. The
    training set has as many entries as there are samples in total; the test
    set is made up of all samples that were not drawn for the training set.

    Permissible as the ``cv`` argument of
    :class:`~sklearn.model_selection.GridSearchCV` object.
    """

    def _select_train_indices(
        self,
        n_samples: int,
        random_state: np.random.RandomState,
        y: Union[npt.NDArray[Any], pd.Series, pd.DataFrame, None],
    ) -> npt.NDArray[np.int_]:
        # draw n_samples indices from the half-open range [0, n_samples);
        # arg y is not used by this sampling strategy
        drawn: npt.NDArray[np.int_] = random_state.randint(
            0, n_samples, size=n_samples
        )
        return drawn
class StratifiedBootstrapCV(BaseBootstrapCV):
    """
    Stratified bootstrapping cross-validation.

    Each split is created by drawing samples at random, with replacement. The
    training set has as many entries as there are samples in total; the test
    set is made up of all samples that were not drawn for the training set.

    Sampling is stratified based on a series or 1d array of group labels in the
    target vector: every group of equal labels is bootstrapped separately, and
    keeps its original size in the training set.
    """

    def _select_train_indices(
        self,
        n_samples: int,
        random_state: np.random.RandomState,
        y: Union[npt.NDArray[Any], pd.Series, pd.DataFrame, None],
    ) -> npt.NDArray[np.int_]:
        if y is None:
            raise ValueError(
                "no target variable specified in arg y as labels for stratification"
            )

        labels: Any
        if isinstance(y, pd.Series):
            labels = y.values
        elif isinstance(y, np.ndarray) and y.ndim == 1:
            labels = y
        else:
            raise ValueError(
                "target labels must be provided as a Series or a 1d numpy array"
            )

        def _resample(stratum: pd.Series) -> pd.Series:
            # bootstrap one stratum: draw as many positions as it has members
            return stratum.sample(
                n=len(stratum), replace=True, random_state=random_state
            )

        # the series values are the row positions 0 .. len(labels) - 1, so the
        # values of the resampled series are the training indices
        positions = pd.Series(np.arange(len(labels)))
        resampled = positions.groupby(by=labels).apply(_resample)

        return cast(npt.NDArray[np.int_], resampled.values)
class StationaryBootstrapCV(BaseBootstrapCV):
    """
    Bootstrap for stationary time series, based on Politis and Romano (1994).

    Instead of drawing individual random observations, as the regular bootstrap
    does, this approach draws blocks of consecutive observations with
    exponentially distributed sizes.

    Intended for use with time series that satisfy the stationarity requirement.
    """

    def __init__(
        self,
        n_splits: int = 1000,
        mean_block_size: Union[int, float] = 0.5,
        random_state: Optional[Union[int, np.random.RandomState]] = None,
    ) -> None:
        """
        :param n_splits: number of splits to generate (default: 1000)
        :param mean_block_size: mean size of coherent blocks to sample.
            If an ``int``, use this as the absolute mean block size, which must be
            at least 2.
            If a ``float``, must be in the range (0.0, 1.0) and denotes a block
            size relative to the total number samples. (default: 0.5)
        :param random_state: random state to initialise the random generator with
            (optional)
        :raises ValueError: if ``mean_block_size`` is outside the permitted range
        :raises TypeError: if ``mean_block_size`` is neither an ``int`` nor a
            ``float``
        """
        super().__init__(n_splits=n_splits, random_state=random_state)

        if not isinstance(mean_block_size, (int, float)):
            raise TypeError(f"invalid type for arg mean_block_size={mean_block_size}")

        if isinstance(mean_block_size, int):
            # absolute block size
            if mean_block_size < 2:
                raise ValueError(
                    f"arg mean_block_size={mean_block_size} must be at least 2"
                )
        elif mean_block_size <= 0.0 or mean_block_size >= 1.0:
            # block size relative to the number of samples
            raise ValueError(
                f"arg mean_block_size={mean_block_size} must be > 0.0 and < 1.0"
            )

        self.mean_block_size = mean_block_size

    def _select_train_indices(
        self,
        n_samples: int,
        random_state: np.random.RandomState,
        y: Union[npt.NDArray[Any], pd.Series, pd.DataFrame, None],
    ) -> npt.NDArray[np.int_]:
        block_size = self.mean_block_size
        if block_size < 1:
            # the mean block size was given as a fraction of the sample size:
            # convert it to an absolute size
            block_size = n_samples * block_size

        # at every step after the first, a new block starts with this probability
        restart_probability = 1.0 / block_size

        train = np.empty(n_samples, dtype=np.int64)

        for position in range(n_samples):
            # the first position always starts a block; no uniform number is
            # drawn for it
            if position == 0 or random_state.uniform() <= restart_probability:
                current = random_state.randint(n_samples)
            else:
                # continue the current block, wrapping around after the last index
                # noinspection PyUnboundLocalVariable
                successor = current + 1
                current = successor if successor < n_samples else 0
            train[position] = current

        return train
# Final check by the tracker created near the top of this module.
# NOTE(review): presumably verifies that every symbol defined since the tracker
# was created is listed in __all__, as the comment at its creation states --
# confirm against pytools.api.AllTracker.
__tracker.validate()