Source code for lours.dataset.split.balanced_groups

from typing import Any

import numpy as np
import pandas as pd
from ot import dist, emd2, sinkhorn2
from scipy.stats import entropy


def earth_mover_distance(
    left: pd.Series,
    right: pd.Series,
    continuous_weights: pd.Series,
    sinkhorn_lambda: float = 0,
) -> float:
    """Compute the earth mover distance between two histograms.

    Note:
        In the case of ``sinkhorn_lambda`` > 0 this uses the sinkhorn algorithm
        for a faster approximate value. See :func:`ot.sinkhorn2`

    Args:
        left: input Series that represents a histogram (not necessarily
            normalized). Its index represents the histogram bins.
        right: input Series that represents a histogram (not necessarily
            normalized). Its index represents the histogram bins. Note that
            ``left`` and ``right`` don't necessarily share the same bins.
        continuous_weights: Series whose index holds the names of the index
            levels of ``left`` and ``right`` to use as bin coordinates, and
            whose values are the factors each coordinate is multiplied by
            before distances between bins are computed.
        sinkhorn_lambda: regularization weight for sinkhorn algorithm. If 0,
            will use literal earth mover distance without regularization
            (slower but more accurate). Defaults to 0.

    Returns:
        distance between the two histograms. It is 0 when there is no
        continuous dimension or when all populated bins share the same weighted
        coordinates, and infinite when one of the histograms sums to 0.
    """
    if len(continuous_weights) == 0:
        return 0.0

    if left.sum() == 0 or right.sum() == 0:
        return float("inf")

    # Only populated bins take part in the transport problem.
    left = left[left > 0]
    right = right[right > 0]

    left_bins = (
        left.index.to_frame()[continuous_weights.index].astype(float)
        * continuous_weights
    ).to_numpy()
    right_bins = (
        right.index.to_frame()[continuous_weights.index].astype(float)
        * continuous_weights
    ).to_numpy()

    # When every populated bin sits at the same weighted location, all pairwise
    # distances are 0: no mass has to move. Returning here also avoids the
    # 0 / 0 normalization of the cost matrix below, which would otherwise feed
    # NaN costs to the solver.
    all_bins = np.concatenate([left_bins, right_bins])
    if (all_bins == all_bins[:1]).all():
        return 0.0

    distance_matrix = dist(left_bins, right_bins, metric="cityblock")
    # Rescale costs to [0, 1]; the max is > 0 thanks to the guard above.
    distance_matrix = distance_matrix / distance_matrix.max()

    normalized_left = (left / left.sum()).to_numpy()
    normalized_right = (right / right.sum()).to_numpy()

    if sinkhorn_lambda == 0:
        return emd2(
            normalized_left, normalized_right, distance_matrix
        )  # pyright: ignore
    return sinkhorn2(
        normalized_left, normalized_right, distance_matrix, reg=sinkhorn_lambda
    )  # pyright: ignore
[docs] def check_groups( histogram: pd.DataFrame | pd.Series, category_groups: pd.Series, continuous_groups: pd.Series, ) -> None: """Check that histogram and groups are well-formed. Namely: - There should be no overlap between the two groups - histogram must have as many index dimensions as the total number of groups - histogram multi-index names must be unique - there should be a bijection between histogram index names and given category and continuous groups Args: histogram: Series or DataFrame with one or two columns, and a multi index whose names must match the next two groups category_groups: Series whose index are names of category groups, which should be contained in the histogram index continuous_groups: Series whose index are names of continuous groups, which should be contained in the histogram index Raises: AssertionError: raises an error when histogram and groups don't respect aforementioned criteria """ if len(category_groups) == 0 and len(continuous_groups) == 0: raise AssertionError("no group to use pandas' groupby on") elif len(category_groups) == 0: total_groups = continuous_groups.index elif len(continuous_groups) == 0: total_groups = category_groups.index else: total_groups = pd.concat([category_groups, continuous_groups]).index assert ( total_groups.is_unique ), "category and continuous groups must have unique and non overlapping values" histogram_names = set(histogram.index.names) assert len(histogram_names) == len( histogram.index.names ), "histogram multi index must have index with exclusive names" assert histogram_names == set(total_groups), ( "category and continuous group must be a " "perfect partition of the histogram index" ) return
def hist_distance(
    left: pd.Series,
    right: pd.Series,
    category_weights: pd.Series,
    continuous_weights: pd.Series,
    sinkhorn_lambda: float = 0,
) -> float:
    r"""Compute the distance between two distributions described in pandas Series
    representing histograms.

    Both indexes must match and may have categorical data or continuous data.
    Distance between categorical data is made with Kullback–Leibler divergence
    and distance between continuous data is made with Earth mover distance.

    The distance formula is then

    .. math::
        :label: hist_cost

        D = \sum_{0 \le i < p} \alpha_i
        KL\left( P_{cat, C_i}, Q_{cat, C_i} \right) +
        || \beta || \sum_{i \in \Omega_{cat}}
        \left( P_{cat}(i) \times EMD(P^\beta(i), Q^\beta(i)) \right)

    where

    - :math:`p \in \mathbb{N}` and :math:`q \in \mathbb{N}` are respectively the
      number of categorical dimensions and continuous dimensions
    - :math:`\Omega_{cat} \subset \mathbb{N}^p` is the set of all possible
      categories, subdivided into :math:`p` dimensions

      .. math::

          \Omega_{cat} &= \{ c_{0,0}, c_{1,0} \cdots, c_{n_0, 0} \}
          \times \cdots \times \{ c_{0, p}, \cdots, c_{n_p, p} \} \\
          \Omega_{cat} &= C_0 \times \cdots \times C_p

    - :math:`P` is the probability function of the histogram

      .. math::

          P : \begin{array}{lll}
          \Omega_{cat} \times \mathbb{R}^q & \rightarrow & [ 0, 1 ] \\
          (x,y) = (x_0, \cdots, x_p, y_0 \cdots y_q) & \mapsto & P(x,y)
          \end{array}

    - :math:`P_{cat}` is the agglomeration of :math:`P` over continuous
      dimensions.

      .. math::

          P_{cat} : \begin{array}{lll}
          \Omega_{cat} & \rightarrow & [0, 1] \\
          x & \mapsto & \iint_{y \in \mathbb{R}^q} P(x, y) dy
          \end{array}

    - :math:`P_{cat, C_i}` is the agglomeration of :math:`P` over continuous
      dimensions and category dimensions except :math:`C_i`

      .. math::

          P_{cat, C_i} &: C_i \rightarrow [0, 1]

          P_{cat, C_i}(x) &= \sum_{
          x' \in C_0 \times \cdots \times C_{i-1}
          \times C_{i+1} \times \cdots \times C_p
          } \iint_{y \in \mathbb{R}^q}
          P(x'_0, \cdots x'_{i-1}, x, x'_{i+1} \cdots x'_p, y) dy

    - :math:`P(x)` is the probability distribution over continuous dimensions
      for a particular category :math:`x \in \Omega_{cat}`.

      .. math::

          P(x) : \begin{array}{lll}
          \mathbb{R}^q & \rightarrow & [0, 1] \\
          y & \mapsto & P(x, y)
          \end{array}

    - :math:`P^\beta(x)` is the weighted probability distribution over
      continuous dimensions for a particular class :math:`x` and a weight
      vector :math:`\beta`

      .. math::

          P^\beta(x) &: \mathbb{R}^q \rightarrow [0, 1]

          P^\beta(x,y) &= P \left(x, \frac{\beta}{|| \beta ||} \odot y\right)

    - :math:`\alpha \in \mathbb{R}^p` and :math:`\beta \in \mathbb{R}^q` are
      weight vectors associated to importance of each dimensions of
      :math:`\Omega_{cat} \times \mathbb{R}^q`
    - :math:`\odot` is the Hadamard product

      .. math::

          \beta \odot y = (\beta_j y_j)_{0 \le j < q}

    - :math:`KL` is the Kullback–Leibler divergence
    - :math:`EMD` is the Earth Mover distance

    Note:
        This formula is not symmetric, it is more suited to compare a reference
        distribution (the left one) to a candidate distribution (the right one).

    Note:
        In the implementation, the weighted sum of KL divergences is additionally
        multiplied by :math:`|| \alpha ||`, which the formula above does not
        show.

    Args:
        left: pandas Series representing left distribution of probability
            (i.e. the reference)
        right: pandas Series representing right distribution of probability
            (i.e. the candidate)
        category_weights: weights Series vector associated with :math:`\alpha`
            which is applied to the KL divergence (see formula
            :eq:`hist_cost`). Its index must be the names of category groups,
            that represent ``left`` and ``right`` indexes dimensions on which to
            apply KL divergence.
        continuous_weights: weight Series vector associated with :math:`\beta`
            which is applied to the Earth mover's distance (see formula
            :eq:`hist_cost`). Its index must be the names of continuous groups,
            that represent ``left`` and ``right`` indexes dimensions on which to
            apply EMD.
        sinkhorn_lambda: regularization term applied to EMD (see
            :func:`earth_mover_distance`). Defaults to 0

    Returns:
        distance between the two multimodal distributions. Infinite when one of
        the two histograms sums to 0.

    Raises:
        AssertionError: if the histogram index and the weight indexes are not
            consistent (see :func:`check_groups`).
    """
    left_right = pd.concat([left, right], axis="columns", keys=["left", "right"])
    check_groups(left_right, category_weights, continuous_weights)

    # An empty histogram cannot be normalized into a distribution.
    if 0 in left_right.sum().values:
        return float("inf")

    kl_div = 0
    if len(category_weights) > 0:
        by_cat = left_right.groupby(list(category_weights.index), observed=True)
        # Joint counts over all category dimensions, used to weight the EMD of
        # each category combination.
        by_cat_count = by_cat.sum()

        for axis, weight in category_weights.items():
            assert isinstance(axis, str)
            # Marginal counts along this single category dimension. They are
            # kept in their own variable so that the joint counts above are not
            # overwritten before the EMD weighting below.
            by_axis_count = left_right.groupby(axis, observed=True).sum()
            axis_entropy = entropy(
                by_axis_count["left"].values, by_axis_count["right"].values
            )
            kl_div += weight * axis_entropy

        emd_by_cat = by_cat.apply(
            lambda left_right_df: earth_mover_distance(
                left=left_right_df["left"],
                right=left_right_df["right"],
                continuous_weights=continuous_weights,
                sinkhorn_lambda=sinkhorn_lambda,
            )
        )
        # Average of the per-category EMD, weighted by the reference counts.
        emd = (by_cat_count["left"] * emd_by_cat).sum() / by_cat_count["left"].sum()
    else:
        emd = earth_mover_distance(
            left=left_right["left"],
            right=left_right["right"],
            continuous_weights=continuous_weights,
            sinkhorn_lambda=sinkhorn_lambda,
        )

    return (
        np.linalg.norm(category_weights.to_numpy()) * kl_div  # pyright: ignore
        + np.linalg.norm(continuous_weights.to_numpy()) * emd
    )
[docs] def df_to_hist( data: pd.DataFrame, groupby: Any, full_index: pd.Index | pd.MultiIndex | None = None, ) -> pd.Series: """Convert dataframe to histograms by using pandas' `GroupBy <https://pandas.pydata.org/docs/reference/groupby.html>`__ feature Args: data: DataFrame from which the histogram will be computed. Must have the columns specified in groups option. groupby: Same ``by`` option for :meth:`pandas.DataFrame.groupby`, will be passed directly to ``data.groupby`` method. Can be a mapping, a function, a label, or a list of labels. full_index: Optional index to reindex the resulting histogram. Useful when some value have an occurrence count of 0 and thus don't appear in the induced index. Defaults to None. Returns: pandas Series with multiindex corresponding to the count of occurrences for each specified group. """ hist = data.groupby(groupby, observed=False).size() hist.name = "histogram" if full_index is None: if isinstance(hist.index, pd.MultiIndex): full_index = pd.MultiIndex.from_product(hist.index.levels) else: full_index = hist.index return hist.reindex(full_index).fillna(0)
def dataset_share_distance(left_share: pd.Series, right_share: pd.Series) -> float:
    r"""Compute the distance between two dataset share histograms (where bins are
    splits) by using Intersection over Union (IoU).

    We use this distance instead of KL because we don't want an infinite distance
    when one of the splits is empty.

    .. math::

        D = 1 - \frac{\sum_{i=0}^{n_{splits}} min(left(i), right(i))}
        {\sum_{i=0}^{n_{splits}} max(left(i), right(i))}

    Args:
        left_share: Series representing target histogram of split sizes.
        right_share: candidate histogram of split sizes

    Note:
        Both histograms are divided by their own sum before being compared.

    Returns:
        distance computed, i.e. 1 minus the IoU of the normalized shares.

    Raises:
        ValueError: if the union of the normalized shares sums to 0, which
            makes the IoU undefined.
    """
    left_total, right_total = left_share.sum(), right_share.sum()
    left_share = left_share / left_total
    right_share = right_share / right_total

    intersection = np.minimum(left_share, right_share)
    union = np.maximum(left_share, right_share)
    union_total = union.sum()

    if union_total == 0:
        raise ValueError(
            "dataset share distance is undefined: the union of the normalized "
            f"shares sums to 0 (left sum: {left_total}, right sum: {right_total})"
        )
    return 1 - intersection.sum() / union_total