"""Important utilities."""
import itertools
import numpy as np
from scipy.interpolate import RegularGridInterpolator
# Removed import of private scipy function _ndim_coords_from_arrays
[docs]
class DataWhitener:
"""Whitening of the data.
Implements several algorithms, depending on the desired whitening properties.
Args:
algorithm (str): either `None`, `"center"`, `"rescale"`, `"PCA"` or `"ZCA"`.
**None** - leaves the data as is. **center** - calculates mean in
every dimension and removes it from the data. **rescale** - calculates
mean and standard deviation in each dimension and rescales it to zero-mean,
unit-variance. In the absence of high correlations between dimensions, this is often sufficient.
**PCA** - data is transformed into its PCA space and divided by the
standard deviation of each dimension. **ZCA** - equivalent to the `"PCA"`,
with additional step of rotating back to original space, i.e. the
orientation of space is preserved.
"""
def __init__(self, algorithm="rescale"):
if algorithm not in [None, "center", "rescale", "PCA", "ZCA"]:
raise ValueError("algorithm should be None, center, rescale, PCA or ZCA.")
self.algorithm = algorithm
self.μ = None # mean
self.Σ = None # covariance matrix
self.W = None # whitening matrix
self.WI = None # unwhitening matrix
[docs]
def fit(self, X, weights=None, save_data=False):
"""Fitting the whitener on the data X.
Args:
X (array): of shape `(n_samples, n_dim)`.
weights (array): of shape `(n_samples,)`, weights of each sample in `X`.
save_data (bool): if `True`, saves the data and whitened data as
`self.data`, `self.whitened_data`.
Returns:
Whitened array.
"""
if weights is None:
weights = np.ones((len(X), 1), dtype=X.dtype)
else:
if len(weights) != len(X):
raise ValueError("Weights and X should be of the same length.")
weights = weights / np.sum(weights) * len(weights)
weights = weights.reshape(-1, 1)
if self.algorithm is None:
self.μ = np.zeros((1, X.shape[-1]), dtype=X.dtype)
self.Σ = np.identity(X.shape[-1], dtype=X.dtype)
else:
# self.μ = np.mean(X, axis=0, keepdims=True)
self.μ = np.sum(weights * X, axis=0, keepdims=True) / np.sum(weights)
dX = X - self.μ
self.Σ = np.einsum("ji,jk->ik", weights * dX, dX) / (np.sum(weights) - 1)
if self.algorithm == "rescale":
# self.Σ = np.diag(np.var(X, axis=0))
self.Σ = np.diag(np.diag(self.Σ))
elif self.algorithm in ["PCA", "ZCA"]:
# self.Σ = np.cov(X.T)
evals, evecs = np.linalg.eigh(self.Σ)
if self.algorithm is None:
self.W = np.identity(X.shape[-1], dtype=X.dtype)
self.WI = np.identity(X.shape[-1], dtype=X.dtype)
elif self.algorithm == "rescale":
self.W = np.diag(np.diag(self.Σ) ** (-1 / 2))
self.WI = np.diag(np.diag(self.Σ) ** (1 / 2))
elif self.algorithm == "PCA":
self.W = np.einsum("ij,kj->ik", np.diag(evals ** (-1 / 2)), evecs)
self.WI = np.einsum("ij,jk->ik", evecs, np.diag(evals ** (1 / 2)))
elif self.algorithm == "ZCA":
self.W = np.einsum("ij,jk,lk->il", evecs, np.diag(evals ** (-1 / 2)), evecs)
self.WI = np.einsum("ij,jk,lk->il", evecs, np.diag(evals ** (1 / 2)), evecs)
if save_data:
self.data = X
self.whitened_data = self.whiten(X)
[docs]
def whiten(self, X):
"""Whiten the data by making it unit covariance.
Args:
X (array): of shape `(n_samples, n_dims)`.
Data to whiten. `n_dims` has to be the same as self.data.
Returns:
Whitened data, of shape `(n_samples, n_dims)`.
"""
if self.algorithm is None:
return X
if len(X.shape) == 1:
X = np.expand_dims(X, axis=0)
squeeze = True
else:
squeeze = False
if self.algorithm == "center":
X_whitened = X - self.μ
else:
X_whitened = np.einsum("ij,kj->ki", self.W, X - self.μ)
return np.squeeze(X_whitened) if squeeze else X_whitened
[docs]
def unwhiten(self, X_whitened):
"""Un-whiten the sample with whitening parameters from the data.
Args:
X_whitened (array): array_like, shape `(n_samples, n_dims)`
Sample of the data to un-whiten.
`n_dims` has to be the same as `self.data`.
Returns:
Unwhitened data, of shape `(n_samples, n_dims)`.
"""
if self.algorithm is None:
return X_whitened
if len(X_whitened.shape) == 1:
X_whitened = np.expand_dims(X_whitened, axis=0)
squeeze = True
else:
squeeze = False
if self.algorithm == "center":
X = X_whitened + self.μ
else:
X = np.einsum("ij,kj->ki", self.WI, X_whitened) + self.μ
return np.squeeze(X) if squeeze else X
[docs]
class Interpolator(RegularGridInterpolator):
"""Regular grid interpolator.
Inherits from `scipy.interpolate.RegularGridInterpolator`.
The difference with respect to the original class is to make
weights and edges explicitly visible, for the more general usage case.
Args:
points (list): list of lists, where every sub-list defines the grid points.
values (array): array of function values to interpolate.
If not given, `Interpolator` won't return any values, only weights and edges.
method (str): either "linear" or "nearest", defining the interpolation method.
As the name says, "linear" returns linearly interpolated value
and "nearest" only the nearest value on the grid.
bounds_error (bool): either to raise an error if the point for which
interpolation is requested is out of the grid bounds.
fill_value (float): value to be filled for out of the grid points.
"""
def __init__(
self, points, values=None, method="linear", bounds_error=True, fill_value=np.nan
):
self.interpolate_values = False if values is None else True
if values is None:
values = np.zeros(tuple(len(p) for p in points), dtype=float)
super().__init__(
points,
values,
method=method,
bounds_error=bounds_error,
fill_value=fill_value,
)
[docs]
def __call__(self, xi, method=None, return_aux=True):
"""Interpolation at coordinates.
If values were not passed during initialization, it doesn't interpolate
but returns grid coordinates and weights only.
Args:
xi (array): the coordinates to sample the gridded data at, of shape `(..., ndim)`.
method (str): The method of interpolation to perform.
Supported are "linear" and "nearest".
return_aux (bool): If `True`, return includes grid coordinates and weights.
Returns:
If function values were set during initialization, returns an array of interpolated values.
If `return_aux is True` it will further return grid coordinates for every item in `xi`.
In the case method is "nearest", this will be only one relevant coordinate per `xi` sample,
or multiple ones for "linear" method. For the latter, also weights of every coordinate will be returned.
"""
if self.interpolate_values is False and return_aux is False:
raise ValueError(
"Please either define `values` or set `return_aux` to `True`. "
"Otherwise there's nothing to compute."
)
method = self.method if method is None else method
if method not in ["linear", "nearest"]:
raise ValueError(f"Method {method} is not defined")
ndim = len(self.grid)
# Replace _ndim_coords_from_arrays with explicit handling
if isinstance(xi, np.ndarray):
if xi.ndim == 1:
# 1D array: interpret as single point with ndim coordinates
xi = xi.reshape(1, -1)
else:
# xi is a tuple/list of coordinate arrays
xi = np.column_stack([np.asarray(x).ravel() for x in xi])
if xi.shape[-1] != len(self.grid):
raise ValueError(
f"The requested sample points xi have dimension "
f"{xi.shape[1]}, but this RegularGridInterpolator has "
f"dimension {ndim}"
)
xi_shape = xi.shape
xi = xi.reshape(-1, xi_shape[-1])
if self.bounds_error:
for i, p in enumerate(xi.T):
if not np.logical_and(
np.all(self.grid[i][0] <= p), np.all(p <= self.grid[i][-1])
):
raise ValueError(
f"One of the requested xi is out of bounds in dimension {i}"
)
# In newer scipy versions, _find_indices only returns 2 values
find_indices_result = self._find_indices(xi.T)
if len(find_indices_result) == 2:
indices, norm_distances = find_indices_result
# Calculate out_of_bounds manually
out_of_bounds = np.zeros(xi.shape[0], dtype=bool)
for i, (p, grid) in enumerate(zip(xi.T, self.grid)):
out_of_bounds |= (p < grid[0]) | (p > grid[-1])
else:
indices, norm_distances, out_of_bounds = find_indices_result
if method == "linear":
result, edges, weights = self._evaluate_linear(
indices, norm_distances, out_of_bounds
)
elif method == "nearest":
result, edges = self._evaluate_nearest(
indices, norm_distances, out_of_bounds
)
if not self.bounds_error and self.fill_value is not None:
result[out_of_bounds] = self.fill_value
result = result.reshape(xi_shape[:-1] + self.values.shape[ndim:])
out = ()
if self.interpolate_values:
out = out + (result,)
if return_aux:
out = out + (edges,)
if method == "linear":
out = out + (weights,)
return out[0] if len(out) == 1 else out
def _evaluate_linear(self, indices, norm_distances, out_of_bounds):
# slice for broadcasting over trailing dimensions in self.values
vslice = (slice(None),) + (None,) * (self.values.ndim - len(indices))
# find relevant values
# each i and i+1 represents a edge
edges = list(itertools.product(*[[i, i + 1] for i in indices]))
weights = []
values = 0.0
for edge_indices in edges:
weight = 1.0
for ei, i, yi in zip(edge_indices, indices, norm_distances):
weight *= np.where(ei == i, 1 - yi, yi)
values += np.asarray(self.values[edge_indices]) * weight[vslice]
weights.append(weight[vslice])
return values, edges, weights
def _evaluate_nearest(self, indices, norm_distances, out_of_bounds):
idx_res = [
np.where(yi <= 0.5, i, i + 1) for i, yi in zip(indices, norm_distances)
]
edges = tuple(idx_res)
return self.values[edges], edges