Source code for conditional_kde.gaussian

"""Module containing Gaussian versions of the Conditional KDE."""

import numpy as np
from scipy.special import logsumexp
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KernelDensity

from .util import DataWhitener


[docs] class ConditionalGaussian: """Conditional Gaussian. Makes a simple Gaussian fit to the data, allowing for conditioning. Args: bandwidth (float): allows for the additional smoothing/shrinking of the covariance. In most cases, it should be left as 1. """ def __init__( self, bandwidth=1.0, ): if not isinstance(bandwidth, (int, float)): raise ValueError("Bandwidth should be a number.") self.bandwidth = bandwidth self.features = None # names of features self.dw = None # data whitener
[docs] @staticmethod def _log_prob(X, mean, cov, add_norm=True): """Log probability of a gaussian KDE distribution. Args: X (array): array of samples for which probability is calculated. Of shape `(n, n_features)`. mean (array): mean of a gaussian distribution. cov (float, array): covariance matrix of a gaussian distribution. If float, it is a variance shared for all features. If 1D array, it is a variance for every feature separately. if 2D array, it is a full covariance matrix. add_norm (bool): either to add normalization factor to the calculation or not. Returns: Log probabilities. """ n_features = mean.shape[-1] X = np.atleast_2d(X) if X.shape[-1] != n_features: raise ValueError("`n_features` of samples should be the same as the mean.") if not isinstance(cov, (int, float, np.ndarray)): raise TypeError( f"`cov` should be a number or `numpy.ndarray`, " f"but is {type(cov).__name__}" ) if isinstance(cov, (int, float)): Σ_inv = np.identity(n_features, dtype=np.float32) / cov Σ_det = cov**n_features elif isinstance(cov, np.ndarray): if len(cov.shape) == 1: if len(cov) != n_features: raise ValueError("`cov` should be of length `n_features`.") Σ_inv = np.diag(1 / cov) Σ_det = np.prod(cov) elif len(cov.shape) == 2: if cov.shape != (n_features, n_features): raise ValueError( "`cov` should be of shape `(n_features, n_features)`." ) Σ_inv = np.linalg.inv(cov) Σ_det = np.linalg.det(cov) else: raise ValueError( "Dimensionality of a covariance matrix cannot be larger than 2." ) delta = X - mean log_prob = -0.5 * np.einsum("ij,jk,ik->i", delta, Σ_inv, delta) if add_norm: norm = 0.5 * n_features * np.log(2 * np.pi) + 0.5 * np.log(Σ_det) return log_prob - norm else: return log_prob
[docs] @staticmethod def _covariance_decomposition(cov, cond_mask, cond_only=False): """Decomposing covariance matrix into the unconditional, conditional and cross terms. Args: cov (array): covariance matrix. cond_mask (array): boolean array defining conditional dimensions. cond_only (bool): to return only conditional matrix or all decompositions. Returns: If `cond_only is True`, only conditional part of the covariance, otherwise: conditional, unconditional and cross parts, respectively. """ if len(cov) != len(cond_mask): raise ValueError( "Dimensionality of `cov` and `cond_mask` should be the same." ) if len(cov.shape) != 2 or cov.shape[0] != cov.shape[-1]: raise ValueError("`cov` should be 2D square matrix.") mask_cond = np.outer(cond_mask, cond_mask) shape_cond = (sum(cond_mask), sum(cond_mask)) if cond_only: return cov[mask_cond].reshape(shape_cond) else: mask_uncond = np.outer(~cond_mask, ~cond_mask) shape_uncond = (sum(~cond_mask), sum(~cond_mask)) mask_cross = np.outer(cond_mask, ~cond_mask) shape_cross = (sum(cond_mask), sum(~cond_mask)) return ( cov[mask_cond].reshape(shape_cond), cov[mask_uncond].reshape(shape_uncond), cov[mask_cross].reshape(shape_cross), )
[docs] def fit( self, X, weights=None, features=None, ): """Fitting the Conditional Kernel Density. Args: X (array): data of shape `(n_samples, n_features)`. weights (array): weights of every sample, of shape `(n_samples)`. features (list): optional, list defining names for every feature. It's used for referencing conditional dimensions. Defaults to `[0, 1, ..., n_features - 1]`. Returns: An instance of itself. """ n_samples, n_features = X.shape if features is None: self.features = list(range(n_features)) else: if not isinstance(features, list): raise TypeError("`features` should be a `list`.") elif n_features != len(features): raise ValueError( f"`n_features` ({n_features}) should be equal to " f"the length of `features` ({len(features)})." ) elif len(features) != len(set(features)): raise ValueError("All `features` should be unique.") self.features = features self.dw = DataWhitener("ZCA") self.dw.fit(X, weights, save_data=False) return self
[docs] def score_samples(self, X, conditional_features=None): """Compute the (un)conditional log-probability of each sample under the model. Args: X (array): data of shape `(n, n_features)`. Last dimension should match dimension of training data `(n_features)`. conditional_features (list): subset of `self.features`, which dimensions of data to condition upon. Defaults to `None`, meaning unconditional log-probability. Returns: Conditional log probability for each sample in `X`. """ if conditional_features is None: X = np.atleast_2d(X) return self._log_prob(X, self.dw.μ, self.dw.Σ * self.bandwidth**2) else: if not all(k in self.features for k in conditional_features): raise ValueError( "All conditional_features should be in features. If you haven't " "specified features, pick integers from `[0, 1, ..., n_features - 1]`." ) if len(conditional_features) == len(self.features): raise ValueError( "Doesn't make much sense to condition on all features. " "Probability of that is 1." ) cond_variables = [ True if f in conditional_features else False for f in self.features ] cond_variables = np.array(cond_variables, dtype=bool) X = np.atleast_2d(X) p_full = self._log_prob(X, self.dw.μ, self.dw.Σ * self.bandwidth**2) Σ_marginal = self._covariance_decomposition( self.dw.Σ * self.bandwidth**2, cond_variables, cond_only=True ) p_marginal = self._log_prob( X[:, cond_variables], self.dw.μ[:, cond_variables], Σ_marginal, ) return p_full - p_marginal
def _check_conditionals(self, conditionals, n_samples): if not isinstance(conditionals, dict): raise TypeError( "`conditional_features` should be dictionary, but is " f"{type(conditionals).__name__}." ) if not all(k in self.features for k in conditionals.keys()): raise ValueError( "All conditionals should be in features. If you haven't " "specified features, pick integers from `[0, 1, ..., n_features - 1]`." ) if len(conditionals) == len(self.features): raise ValueError("One cannot condition on all features.") if any(not isinstance(v, (float, int)) for v in conditionals.values()): if not all(isinstance(v, np.ndarray) for v in conditionals.values()): raise ValueError( "For vectorized conditionals, all should be `np.ndarray`." ) if not all(v.ndim == 1 for v in conditionals.values()): raise ValueError("For vectorized conditionals, all should be 1D.") lengths = [len(v) for v in conditionals.values()] if not all(length == lengths[0] for length in lengths): raise ValueError( "For vectorized conditionals, all should have the same length." ) vectorized_conditionals = True n_samples = lengths[0] else: vectorized_conditionals = False return vectorized_conditionals, n_samples
[docs] def sample( self, conditionals=None, n_samples=1, random_state=None, keep_dims=False, ): """Generate random samples from the conditional model. There are two modes of sampling: (1) specify conditionals as scalar values and sample `n_samples` out of distribution. (2) specify conditionals as an array, where the number of samples will be the length of an array. Args: conditionals (dict): desired variables (features) to condition upon. Dictionary keys should be only feature names from `features`. For example, if `self.features == ["a", "b", "c"]` and one would like to condition on "a" and "c", then `conditionals = {"a": cond_val_a, "c": cond_val_c}`. Conditioned values can be either `float` or `array`, where in the case of the latter, all conditioned arrays have to be of the same size. Defaults to `None`, i.e. normal KDE. n_samples (int): number of samples to generate. Ignored in the case conditional arrays have been passed in `conditionals`. Defaults to 1. random_state (np.random.RandomState, int): seed or `RandomState` instance, optional. Determines random number generation used to generate random samples. See `Glossary <random_state>`. keep_dims (bool): whether to return non-conditioned dimensions only or keep given conditional values. Defaults to `False`. Returns: Array of samples, of shape `(n_samples, n_features)` if `conditional_variables is None`, or `(n_samples, n_features - len(conditionals))` otherwise. """ if isinstance(random_state, np.random.RandomState): rs = random_state elif random_state is None or isinstance(random_state, int): rs = np.random.RandomState(seed=random_state) else: raise TypeError("`random_state` should be `int` or `RandomState`.") if conditionals is None: return rs.multivariate_normal( self.dw.μ.flatten(), self.dw.Σ * self.bandwidth**2, n_samples ) else: vectorized_conditionals, n_samples = self._check_conditionals( conditionals, n_samples ) # helping variables for conditionals cond_variables = [ True if f in conditionals.keys() else False for f in self.features ] cond_values = [ conditionals[f] for f in self.features if f in conditionals.keys() ] cond_variables = np.array(cond_variables, dtype=bool) if vectorized_conditionals: cond_values = np.stack(cond_values, axis=-1).astype(np.float32) else: cond_values = np.array(cond_values, dtype=np.float32) # decomposing the covarianve Σ_cond, Σ_uncond, Σ_cross = self._covariance_decomposition( self.dw.Σ * self.bandwidth**2, cond_variables ) # distribution is defined from corrected unconditional part Σ_cond_inv = np.linalg.inv(Σ_cond) corr_Σ = Σ_uncond - Σ_cross.T @ Σ_cond_inv @ Σ_cross corr_μ = ( self.dw.μ[:, ~cond_variables] + (cond_values - self.dw.μ[:, cond_variables]) @ Σ_cond_inv @ Σ_cross ) sample = np.empty((n_samples, len(self.features))) sample[:, ~cond_variables] = ( rs.multivariate_normal(np.zeros(len(corr_Σ)), corr_Σ, n_samples) + corr_μ ) if vectorized_conditionals: sample[:, cond_variables] = cond_values else: sample[:, cond_variables] = np.broadcast_to( cond_values, (n_samples, len(cond_values)) ) if keep_dims: return sample else: return sample[:, ~cond_variables]
[docs] class ConditionalGaussianKernelDensity: """Conditional Kernel Density estimator. Args: whitening_algorithm (str): data whitening algorithm, either `None`, "rescale" or "ZCA". See `util.DataWhitener` for more details. "rescale" by default. bandwidth (str, float): the width of the Gaussian centered around every point. It can be either:\n (1) "scott", using Scott's parameter,\n (2) "optimized", which minimizes cross entropy to find the optimal bandwidth, or\n (3) `float`, specifying the actual value.\n By default, it uses Scott's parameter. **kwargs: additional kwargs used in the case of "optimized" bandwidth. steps (int): how many steps to use in optimization, 10 by default.\n cv_fold (int): cross validation fold, 5 by default.\n n_jobs (int): number of jobs to run cross validation in parallel, -1 by default, i.e. using all available processors.\n verbose (int): verbosity of the cross validation run, for more details see `sklearn.model_selection.GridSearchCV`. """ def __init__( self, whitening_algorithm="rescale", bandwidth="scott", **kwargs, ): if whitening_algorithm not in [None, "rescale", "ZCA"]: raise ValueError( f"Whitening lgorithm should be None, rescale or ZCA, but is {whitening_algorithm}." ) self.algorithm = whitening_algorithm if not isinstance(bandwidth, (int, float)): if bandwidth not in ["scott", "optimized"]: raise ValueError( f"""Bandwidth should be a number, "scott" or "optimized", but has value {bandwidth} and type {type(bandwidth).__name__}.""" ) self.bandwidth = bandwidth if self.bandwidth == "optimized": self.bandwidth_kwargs = { "steps": kwargs.get("steps", 10), "cv_fold": kwargs.get("cv_fold", 5), "n_jobs": kwargs.get("n_jobs", -1), "verbose": kwargs.get("verbose", 0), } else: self.bandwidth_kwargs = {} self.features = None # names of features self.dw = None # data whitener
[docs] @staticmethod def log_scott(n_samples, n_features): """Scott's parameter.""" return -1 / (n_features + 4) * np.log10(n_samples)
[docs] @staticmethod def _log_prob(X, data, cov, add_norm=True): """Log probability of a gaussian KDE distribution. Args: X (array): array of samples for which probability is calculated. Of shape `(n, n_features)`. data (array): KDE data, of shape `(n_samples, n_features)`. cov (float, array): covariance matrix of a gaussian distribution. If float, it is a variance shared for all features. If 1D array, it is a variance for every feature separately. if 2D array, it is a full covariance matrix. add_norm (bool): either to add normalization factor to the calculation or not. Returns: Log probabilities. """ n_samples, n_features = data.shape X = np.atleast_2d(X) if X.shape[-1] != n_features: raise ValueError("`n_features` of both arrays should be the same.") if not isinstance(cov, (int, float, np.ndarray)): raise TypeError( f"`cov` should be a number or `numpy.ndarray`, " f"but is {type(cov).__name__}" ) if isinstance(cov, (int, float)): Σ_inv = np.identity(n_features, dtype=np.float32) / cov Σ_det = cov**n_features elif isinstance(cov, np.ndarray): if len(cov.shape) == 1: if len(cov) != n_features: raise ValueError("`cov` should be of length `n_features`.") Σ_inv = np.diag(1 / cov) Σ_det = np.prod(cov) elif len(cov.shape) == 2: if cov.shape != (n_features, n_features): raise ValueError( "`cov` should be of shape `(n_features, n_features)`." ) Σ_inv = np.linalg.inv(cov) Σ_det = np.linalg.det(cov) else: raise ValueError( "Dimensionality of a covariance matrix cannot be larger than 2." ) def calculate_log_prob(x): delta = x - data res = -0.5 * np.einsum("ij,jk,ik->i", delta, Σ_inv, delta) return logsumexp(res) log_prob = np.apply_along_axis(calculate_log_prob, 1, X) if add_norm: norm = ( np.log(n_samples) + 0.5 * n_features * np.log(2 * np.pi) + 0.5 * np.log(Σ_det) ) return log_prob - norm else: return log_prob
[docs] @staticmethod def _covariance_decomposition(cov, cond_mask, cond_only=False): """Decomposing covariance matrix into the unconditional, conditional and cross terms. Args: cov (array): covariance matrix. cond_mask (array): boolean array defining conditional dimensions. cond_only (bool): to return only conditional matrix or all decompositions. Returns: If `cond_only is True`, only conditional part of the covariance, otherwise: conditional, unconditional and cross parts, respectively. """ if len(cov) != len(cond_mask): raise ValueError( "Dimensionality of `cov` and `cond_mask` should be the same." ) if len(cov.shape) != 2 or cov.shape[0] != cov.shape[-1]: raise ValueError("`cov` should be 2D square matrix.") mask_cond = np.outer(cond_mask, cond_mask) shape_cond = (sum(cond_mask), sum(cond_mask)) if cond_only: return cov[mask_cond].reshape(shape_cond) else: mask_uncond = np.outer(~cond_mask, ~cond_mask) shape_uncond = (sum(~cond_mask), sum(~cond_mask)) mask_cross = np.outer(cond_mask, ~cond_mask) shape_cross = (sum(cond_mask), sum(~cond_mask)) return ( cov[mask_cond].reshape(shape_cond), cov[mask_uncond].reshape(shape_uncond), cov[mask_cross].reshape(shape_cross), )
[docs] @staticmethod def _conditional_weights( conditional_values, conditional_data, cov, optimize_memory=False ): """Weights for the sampling from the conditional distribution. They amount to the conditioned part of the gaussian for every data point. Args: conditional_values (array): of length `n_conditionals`. cond_data (array): of shape `(n_samples, n_conditionals)`. Here non-conditional dimensions are already removed. cov (float, array): covariance matrix. If float, it is a variance shared for all features. If 1D array, it is a variance for every feature separately. if 2D array, it is a full covariance matrix. optimize_memory (bool): only for the vectorized conditionals, it makes an effort to minimize memory footprint, and enlarges computational time. Returns: Normalized weights. """ if conditional_values.ndim == 1: delta = conditional_values - conditional_data if isinstance(cov, float): log_weights = -0.5 / cov * np.einsum("ij,ij->i", delta, delta) elif isinstance(cov, np.ndarray) and len(cov.shape) == 1: log_weights = -0.5 * np.einsum("ij,j,ij->i", delta, 1 / cov, delta) elif isinstance(cov, np.ndarray) and len(cov.shape) == 2: cov_inv = np.linalg.inv(cov) log_weights = -0.5 * np.einsum("ij,jk,ik->i", delta, cov_inv, delta) else: raise ValueError("`cov` cannot be more than 2D.") # calculate exp(weights) in a more stable way log_weights_sum = logsumexp(log_weights) else: if optimize_memory: if isinstance(cov, float): log_weights = ( -0.5 / cov * ( np.einsum( "ij,ij->i", conditional_values, conditional_values )[:, np.newaxis] + np.einsum("ij,ij->i", conditional_data, conditional_data)[ np.newaxis, : ] - 2 * np.einsum( "ij,kj->ik", conditional_values, conditional_data ) ) ) elif isinstance(cov, np.ndarray) and len(cov.shape) == 1: log_weights = ( -0.5 * np.einsum( "ij,j,ij->i", conditional_values, 1 / cov, conditional_values, )[:, np.newaxis] - 0.5 * np.einsum( "ij,j,ij->i", conditional_data, 1 / cov, conditional_data )[np.newaxis, :] + np.einsum( "ij,j,kj->ik", conditional_values, 1 / cov, conditional_data ) ) elif isinstance(cov, np.ndarray) and len(cov.shape) == 2: cov_inv = np.linalg.inv(cov) log_weights = ( -0.5 * np.einsum( "ij,jk,ik->i", conditional_values, cov_inv, conditional_values, )[:, np.newaxis] - 0.5 * np.einsum( "ij,jk,ik->i", conditional_data, cov_inv, conditional_data )[np.newaxis, :] + np.einsum( "ij,jk,lk->il", conditional_values, cov_inv, conditional_data, ) ) else: raise ValueError("`cov` cannot be more than 2D.") else: delta = ( conditional_values[:, np.newaxis, :] - conditional_data[np.newaxis, ...] ) if isinstance(cov, float): log_weights = -0.5 / cov * np.einsum("ijk,ijk->ij", delta, delta) elif isinstance(cov, np.ndarray) and len(cov.shape) == 1: log_weights = -0.5 * np.einsum( "ijk,k,ijk->ij", delta, 1 / cov, delta ) elif isinstance(cov, np.ndarray) and len(cov.shape) == 2: cov_inv = np.linalg.inv(cov) log_weights = -0.5 * np.einsum( "ijk,kl,ijl->ij", delta, cov_inv, delta ) else: raise ValueError("`cov` cannot be more than 2D.") log_weights_sum = logsumexp(log_weights, axis=1, keepdims=True) log_weights -= log_weights_sum mask = log_weights < -22 weights = np.exp(log_weights) weights[mask] = 0.0 if conditional_values.ndim == 1: return weights / np.sum(weights) else: return weights / np.sum(weights, axis=1, keepdims=True)
[docs] def fit( self, X, features=None, ): """Fitting the Conditional Kernel Density. Args: X (array): data of shape `(n_samples, n_features)`. features (list): optional, list defining names for every feature. It's used for referencing conditional dimensions. Defaults to `[0, 1, ..., n_features - 1]`. Returns: An instance of itself. """ n_samples, n_features = X.shape if features is None: self.features = list(range(n_features)) else: if not isinstance(features, list): raise TypeError("`features` should be a `list`.") elif n_features != len(features): raise ValueError( f"`n_features` ({n_features}) should be equal to " f"the length of `features` ({len(features)})." ) elif len(features) != len(set(features)): raise ValueError("All `features` should be unique.") self.features = features self.dw = DataWhitener(self.algorithm) self.dw.fit(X, save_data=True) if self.bandwidth == "scott": self.bandwidth = 10 ** self.log_scott(n_samples, n_features) elif self.bandwidth == "optimized": log_scott = self.log_scott(n_samples, n_features) model = GridSearchCV( KernelDensity(), { "bandwidth": np.logspace( log_scott - 1, log_scott + 1, num=self.bandwidth_kwargs["steps"] ) }, cv=self.bandwidth_kwargs["cv_fold"], n_jobs=self.bandwidth_kwargs["n_jobs"], verbose=self.bandwidth_kwargs["verbose"], ) model.fit(self.dw.whitened_data) self.bandwidth = model.best_params_["bandwidth"] return self
[docs] def score_samples(self, X, conditional_features=None): """Compute the (un)conditional log-probability of each sample under the model. Args: X (array): data of shape `(n, n_features)`. Last dimension should match dimension of training data `(n_features)`. conditional_features (list): subset of `self.features`, which dimensions of data to condition upon. Defaults to `None`, meaning unconditional log-probability. Returns: Conditional log probability for each sample in `X`. """ if conditional_features is None: X = np.atleast_2d(X) return self._log_prob(X, self.dw.data, self.dw.Σ * self.bandwidth**2) else: if not all(k in self.features for k in conditional_features): raise ValueError( "All conditional_features should be in features. If you haven't " "specified features, pick integers from `[0, 1, ..., n_features - 1]`." ) if len(conditional_features) == len(self.features): raise ValueError( "Doesn't make much sense to condition on all features. " "Probability of that is 1." ) cond_variables = [ True if f in conditional_features else False for f in self.features ] cond_variables = np.array(cond_variables, dtype=bool) X = np.atleast_2d(X) p_full = self._log_prob(X, self.dw.data, self.dw.Σ * self.bandwidth**2) Σ_marginal = self._covariance_decomposition( self.dw.Σ * self.bandwidth**2, cond_variables, cond_only=True ) p_marginal = self._log_prob( X[:, cond_variables], self.dw.data[:, cond_variables], Σ_marginal, ) return p_full - p_marginal
def _check_conditionals(self, conditionals, n_samples): if not isinstance(conditionals, dict): raise TypeError( "`conditional_features` should be dictionary, but is " f"{type(conditionals).__name__}." ) if not all(k in self.features for k in conditionals.keys()): raise ValueError( "All conditionals should be in features. If you haven't " "specified features, pick integers from `[0, 1, ..., n_features - 1]`." ) if len(conditionals) == len(self.features): raise ValueError("One cannot condition on all features.") if any(not isinstance(v, (float, int)) for v in conditionals.values()): if not all(isinstance(v, np.ndarray) for v in conditionals.values()): raise ValueError( "For vectorized conditionals, all should be `np.ndarray`." ) if not all(v.ndim == 1 for v in conditionals.values()): raise ValueError("For vectorized conditionals, all should be 1D.") lengths = [len(v) for v in conditionals.values()] if not all(length == lengths[0] for length in lengths): raise ValueError( "For vectorized conditionals, all should have the same length." ) vectorized_conditionals = True n_samples = lengths[0] else: vectorized_conditionals = False return vectorized_conditionals, n_samples
[docs] def _sample( self, conditionals=None, n_samples=1, random_state=None, keep_dims=False, ): """Generate random samples from the conditional model. Here there is an assumption that all dimensions have not been distorted, but only rescaled. In other words, it works for `None` and "rescale" whitening algorithms, but not for "ZCA". """ data = self.dw.whitened_data if isinstance(random_state, np.random.RandomState): rs = random_state elif random_state is None or isinstance(random_state, int): rs = np.random.RandomState(seed=random_state) else: raise TypeError("`random_state` should be `int` or `RandomState`.") if conditionals is None: idx = rs.choice(data.shape[0], n_samples) sample = rs.normal(np.atleast_2d(data[idx]), self.bandwidth) return self.dw.unwhiten(sample) else: vectorized_conditionals, n_samples = self._check_conditionals( conditionals, n_samples ) # scaling conditional variables cond_variables = [ True if f in conditionals.keys() else False for f in self.features ] cond_variables = np.array(cond_variables, dtype=bool) if vectorized_conditionals: cond_values = np.zeros( (n_samples, len(self.features)), dtype=np.float32 ) for i, f in enumerate(self.features): if f in conditionals.keys(): cond_values[:, i] = conditionals[f] cond_values = self.dw.whiten(cond_values)[:, cond_variables] else: cond_values = np.zeros(len(self.features), dtype=np.float32) for i, f in enumerate(self.features): if f in conditionals.keys(): cond_values[i] = conditionals[f] cond_values = self.dw.whiten(cond_values)[cond_variables] weights = self._conditional_weights( cond_values, data[:, cond_variables], self.bandwidth**2 ) if vectorized_conditionals: # idx = np.apply_along_axis( # lambda x: rs.choice(data.shape[0], p=x), # 1, # weights, # ) unit_sample = rs.uniform(size=n_samples)[:, np.newaxis] idx = np.argmax(np.cumsum(weights, axis=1) > unit_sample, axis=1) else: idx = rs.choice(data.shape[0], n_samples, p=weights) # pulling the samples sample = np.empty((n_samples, len(self.features))) sample[:, ~cond_variables] = rs.normal( np.atleast_2d(data[idx])[:, ~cond_variables], self.bandwidth ) if vectorized_conditionals: sample[:, cond_variables] = cond_values else: sample[:, cond_variables] = np.broadcast_to( cond_values, (n_samples, len(cond_values)) ) sample = self.dw.unwhiten(sample) if keep_dims: return sample else: return sample[:, ~cond_variables]
[docs] def _sample_general( self, conditionals=None, n_samples=1, random_state=None, keep_dims=False, ): """Generate random samples from the conditional model. This function is the most general sampler, without any assumptions. It should be used for ZCA. """ if isinstance(random_state, np.random.RandomState): rs = random_state elif random_state is None or isinstance(random_state, int): rs = np.random.RandomState(seed=random_state) else: raise TypeError("`random_state` should be `int` or `RandomState`.") if conditionals is None: idx = rs.choice(self.dw.data.shape[0], n_samples) return rs.multivariate_normal( np.zeros(len(self.dw.Σ)), self.dw.Σ * self.bandwidth**2, n_samples ) + np.atleast_2d(self.dw.data[idx]) else: vectorized_conditionals, n_samples = self._check_conditionals( conditionals, n_samples ) # helping variables for conditionals cond_variables = [ True if f in conditionals.keys() else False for f in self.features ] cond_values = [ conditionals[f] for f in self.features if f in conditionals.keys() ] cond_variables = np.array(cond_variables, dtype=bool) if vectorized_conditionals: cond_values = np.stack(cond_values, axis=-1).astype(np.float32) else: cond_values = np.array(cond_values, dtype=np.float32) # decomposing the covarianve Σ_cond, Σ_uncond, Σ_cross = self._covariance_decomposition( self.dw.Σ * self.bandwidth**2, cond_variables ) # weights are defined from conditional part weights = self._conditional_weights( cond_values, self.dw.data[:, cond_variables], Σ_cond, ) if vectorized_conditionals: # idx = np.apply_along_axis( # lambda x: rs.choice(self.dw.data.shape[0], p=x), # 1, # weights, # ) unit_sample = rs.uniform(size=n_samples)[:, np.newaxis] idx = np.argmax(np.cumsum(weights, axis=1) > unit_sample, axis=1) else: idx = rs.choice(self.dw.data.shape[0], n_samples, p=weights) selected_data = np.atleast_2d(self.dw.data[idx]) # distribution is defined from corrected unconditional part Σ_cond_inv = np.linalg.inv(Σ_cond) corr_Σ = Σ_uncond - Σ_cross.T @ Σ_cond_inv @ Σ_cross corr_data = ( selected_data[:, ~cond_variables] + (cond_values - selected_data[:, cond_variables])
[docs] @ Σ_cond_inv @ Σ_cross ) sample = np.empty((n_samples, len(self.features))) sample[:, ~cond_variables] = ( rs.multivariate_normal(np.zeros(len(corr_Σ)), corr_Σ, n_samples) + corr_data ) if vectorized_conditionals: sample[:, cond_variables] = cond_values else: sample[:, cond_variables] = np.broadcast_to( cond_values, (n_samples, len(cond_values)) ) if keep_dims: return sample else: return sample[:, ~cond_variables]
def sample( self, conditionals=None, n_samples=1, random_state=None, keep_dims=False, ): """Generate random samples from the conditional model. There are two modes of sampling: (1) specify conditionals as scalar values and sample `n_samples` out of distribution. (2) specify conditionals as an array, where the number of samples will be the length of an array. Args: conditionals (dict): desired variables (features) to condition upon. Dictionary keys should be only feature names from `features`. For example, if `self.features == ["a", "b", "c"]` and one would like to condition on "a" and "c", then `conditionals = {"a": cond_val_a, "c": cond_val_c}`. Conditioned values can be either `float` or `array`, where in the case of the latter, all conditioned arrays have to be of the same size. Defaults to `None`, i.e. normal KDE. n_samples (int): number of samples to generate. Ignored in the case conditional arrays have been passed in `conditionals`. Defaults to 1. random_state (np.random.RandomState, int): seed or `RandomState` instance, optional. Determines random number generation used to generate random samples. See `Glossary <random_state>`. keep_dims (bool): whether to return non-conditioned dimensions only or keep given conditional values. Defaults to `False`. Returns: Array of samples, of shape `(n_samples, n_features)` if `conditional_variables is None`, or `(n_samples, n_features - len(conditionals))` otherwise. """ if self.algorithm == "ZCA": return self._sample_general( conditionals=conditionals, n_samples=n_samples, random_state=random_state, keep_dims=keep_dims, ) else: return self._sample( conditionals=conditionals, n_samples=n_samples, random_state=random_state, keep_dims=keep_dims, )