import abc
from functools import partial
from typing import Any, Union, Optional
import numpy as np
from sklearn.base import BaseEstimator, clone
from sklearn.neighbors import KernelDensity
from sklearn.metrics import pairwise_distances
from sklearn.model_selection import GridSearchCV, KFold
from sklearn.utils import check_random_state
from eli5.utils import vstack
from eli5.lime.utils import rbf
from .textutils import generate_samples, DEFAULT_TOKEN_PATTERN, TokenizedText
class BaseSampler(BaseEstimator, metaclass=abc.ABCMeta):
"""
Base sampler class.
A sampler is an object that generates examples similar to a given example.
"""
@abc.abstractmethod
def sample_near(self, doc, n_samples=1):
"""
Return (examples, similarity) tuple with generated documents
similar to a given document and a vector of similarity values.
"""
raise NotImplementedError()
def fit(self, X=None, y=None):
return self
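# A minimal sketch of a custom sampler: a subclass only has to implement
# ``sample_near``. The Gaussian-noise strategy, class name and ``scale``
# parameter below are illustrative assumptions, not part of eli5.
class _GaussianNoiseSamplerExample(BaseSampler):
    def __init__(self, scale=0.1, sigma=1.0, random_state=None):
        self.scale = scale
        self.sigma = sigma
        self.random_state = random_state
        self.rng_ = check_random_state(random_state)

    def sample_near(self, doc, n_samples=1):
        doc = np.asarray(doc, dtype=float)
        # perturb the document with small Gaussian noise
        samples = doc + self.rng_.normal(
            scale=self.scale, size=(n_samples,) + doc.shape)
        # similarity decays with distance, using the same rbf helper
        # the KDE samplers below rely on
        distances = pairwise_distances(samples, doc.reshape(1, -1)).ravel()
        return samples, rbf(distances, sigma=self.sigma)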
class MaskingTextSampler(BaseSampler):
"""
Sampler for text data. It randomly removes or replaces tokens from text.
Parameters
----------
token_pattern : str, optional
Regexp for token matching
bow : bool, optional
The sampler can either replace all instances of a given token
(bow=True, bag of words sampling) or replace just a single token
(bow=False).
random_state : integer or numpy.random.RandomState, optional
random state
replacement : str
Default value is '' - by default tokens are removed. If you want to
preserve the total token count set ``replacement`` to a non-empty
string, e.g. 'UNKN'.
min_replace : int or float
A minimum number of tokens to replace. Default is 1, meaning 1 token.
If this value is a float in the range [0.0, 1.0], it is used as a ratio.
More than min_replace tokens could be replaced if group_size > 1.
max_replace : int or float
A maximum number of tokens to replace. Default is 1.0, meaning
all tokens can be replaced. If this value is a float in the range
[0.0, 1.0], it is used as a ratio.
group_size : int
When group_size > 1, groups of nearby tokens are replaced all at once
(each token in the group is still replaced with the replacement).
Default is 1, meaning individual tokens are replaced.
"""
def __init__(self,
token_pattern: Optional[str] = None,
bow: bool = True,
random_state=None,
replacement: str = '',
min_replace: Union[int, float] = 1,
max_replace: Union[int, float] = 1.0,
group_size: int = 1,
):
self.token_pattern = token_pattern or DEFAULT_TOKEN_PATTERN
self.bow = bow
self.random_state = random_state
self.replacement = replacement
self.min_replace = min_replace
self.max_replace = max_replace
self.group_size = group_size
self.rng_ = check_random_state(self.random_state)
def sample_near(self, doc: str, n_samples: int = 1) -> tuple[list[str], np.ndarray]:
docs, similarities, mask, text = self.sample_near_with_mask(
doc=doc, n_samples=n_samples
)
return docs, similarities
def sample_near_with_mask(
self,
doc: Union[TokenizedText, str],
n_samples: int = 1,
) -> tuple[list[str], np.ndarray, np.ndarray, TokenizedText]:
if not isinstance(doc, TokenizedText):
doc = TokenizedText(doc, token_pattern=self.token_pattern)
gen_samples = partial(generate_samples, doc,
n_samples=n_samples,
replacement=self.replacement,
min_replace=self.min_replace,
max_replace=self.max_replace,
group_size=self.group_size,
random_state=self.rng_)
docs, similarity, mask = gen_samples(bow=self.bow)
return docs, similarity, mask, doc
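# Usage sketch for MaskingTextSampler; the input text and parameter values
# are illustrative assumptions. ``docs`` holds variants of the input with
# some tokens removed, ``similarities`` holds one value per variant.
def _masking_text_sampler_example():
    sampler = MaskingTextSampler(bow=True, max_replace=0.5, random_state=42)
    docs, similarities = sampler.sample_near('this movie was great', n_samples=3)
    return docs, similarities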
class MaskingTextSamplers(BaseSampler):
"""
Union of MaskingText samplers, with weights.
:meth:`sample_near` or :meth:`sample_near_with_mask` generate
a requested number of samples using all samplers; the probability of
using a sampler is proportional to its weight.
All samplers must use the same token_pattern in order for
:meth:`sample_near_with_mask` to work.
Create it with a list of {param: value} dicts
with :class:`MaskingTextSampler` parameters.
"""
def __init__(self,
sampler_params: list[dict[str, Any]],
token_pattern: Optional[str] = None,
random_state=None,
weights: Optional[Union[np.ndarray, list[float]]] = None,
):
self.random_state = random_state
self.rng_ = check_random_state(random_state)
self.token_pattern = token_pattern
self.samplers = list(map(self._create_sampler, sampler_params))
self.weights: np.ndarray
if weights is None:
self.weights = np.ones(len(self.samplers))
else:
self.weights = np.array(weights)
self.weights /= self.weights.sum()
def _create_sampler(self, extra: dict) -> MaskingTextSampler:
params: dict[str, Any] = dict(
token_pattern=self.token_pattern,
random_state=self.rng_,
)
params.update(extra)
return MaskingTextSampler(**params)
def sample_near(self, doc: str, n_samples: int = 1) -> tuple[list[str], np.ndarray]:
assert n_samples >= 1
all_docs: list[str] = []
similarities = []
for sampler, freq in self._sampler_n_samples(n_samples):
docs, sims = sampler.sample_near(doc, n_samples=freq)
all_docs.extend(docs)
similarities.append(sims)
return all_docs, np.hstack(similarities)
def sample_near_with_mask(
self, doc: str, n_samples: int = 1,
) -> tuple[list[str], np.ndarray, np.ndarray, TokenizedText]:
assert n_samples >= 1
assert self.token_pattern is not None
text = TokenizedText(doc, token_pattern=self.token_pattern)
all_docs: list[str] = []
similarities = []
masks = []
for sampler, freq in self._sampler_n_samples(n_samples):
docs, sims, mask, _text = sampler.sample_near_with_mask(text, freq)
all_docs.extend(docs)
similarities.append(sims)
masks.append(mask)
return all_docs, np.hstack(similarities), vstack(masks), text
def _sampler_n_samples(self, n_samples):
""" Return (sampler, n_samplers) tuples """
sampler_indices = self.rng_.choice(range(len(self.samplers)),
size=n_samples,
replace=True,
p=self.weights)
return [
(self.samplers[idx], freq)
for idx, freq in zip(*np.unique(sampler_indices, return_counts=True))
]
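# Usage sketch for MaskingTextSamplers: combine a bag-of-words sampler with
# a positional (bow=False) sampler and draw from them in a 7:3 ratio.
# The parameter values are illustrative assumptions.
def _masking_text_samplers_example():
    sampler = MaskingTextSamplers(
        sampler_params=[{'bow': True}, {'bow': False}],
        weights=[0.7, 0.3],
        token_pattern=DEFAULT_TOKEN_PATTERN,
        random_state=42,
    )
    docs, similarities = sampler.sample_near('this movie was great', n_samples=10)
    return docs, similarities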
_BANDWIDTHS = np.hstack([
[1e-6], # for discrete features
np.logspace(-3, 4, 15) # general-purpose (0.001 ... 10000) range
])
class _BaseKernelDensitySampler(BaseSampler):
def __init__(self, kde=None, metric='euclidean', fit_bandwidth=True,
bandwidths=_BANDWIDTHS, sigma='bandwidth', n_jobs=1,
random_state=None):
if kde is None:
kde = KernelDensity(rtol=1e-7, atol=1e-7)
self.kde = kde
self.fit_bandwidth = fit_bandwidth
self.bandwidths = bandwidths
self.metric = metric
self.n_jobs = n_jobs
if not isinstance(sigma, (int, float)):
allowed = {'bandwidth'}
if sigma not in allowed:
raise ValueError("sigma must be either "
"a number or one of {}".format(allowed))
self.sigma = sigma
self.random_state = random_state
self.rng_ = check_random_state(self.random_state)
def _get_grid(self):
param_grid = {'bandwidth': self.bandwidths}
cv = KFold(n_splits=3, shuffle=True, random_state=self.rng_)
return GridSearchCV(self.kde, param_grid=param_grid, n_jobs=self.n_jobs,
cv=cv)
def _fit_kde(self, kde: KernelDensity, X: np.ndarray) -> tuple[Optional[GridSearchCV], KernelDensity]:
if self.fit_bandwidth:
grid = self._get_grid()
grid.fit(X)
return grid, grid.best_estimator_
else:
return None, clone(kde).fit(X)
def _similarity(self, doc, samples):
distance = _distances(doc, samples, metric=self.metric)
return rbf(distance, sigma=self.sigma_)
def _set_sigma(self, bandwidth):
if self.sigma == 'bandwidth':
# Sigma estimation using optimal bandwidth found by KDE.
self.sigma_ = bandwidth
else:
self.sigma_ = self.sigma
class MultivariateKernelDensitySampler(_BaseKernelDensitySampler):
"""
General-purpose sampler for dense continuous data, based on multivariate
kernel density estimation.
The limitation is that a single bandwidth value is used for all dimensions,
i.e. the bandwidth matrix is a positive scalar times the identity matrix.
This is a problem e.g. when features have different variances
(e.g. some of them are one-hot encoded and others are continuous).
"""
def fit(self, X=None, y=None):
self.grid_, self.kde_ = self._fit_kde(self.kde, X)
self._set_sigma(self.kde_.bandwidth)
return self
def sample_near(self, doc, n_samples=1):
# XXX: it doesn't sample only near the given document, it
# samples everywhere
doc = np.asarray(doc)
samples = self.kde_.sample(n_samples, random_state=self.rng_)
return samples, self._similarity(doc, samples)
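# Usage sketch for MultivariateKernelDensitySampler: fit a single KDE on a
# toy dense matrix, then draw new rows and their similarity to one document.
# The data below is an illustrative assumption.
def _multivariate_kde_sampler_example():
    rng = np.random.RandomState(0)
    X = rng.normal(size=(100, 3))
    sampler = MultivariateKernelDensitySampler(random_state=0).fit(X)
    samples, similarity = sampler.sample_near(X[0], n_samples=5)
    return samples, similarity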
class UnivariateKernelDensitySampler(_BaseKernelDensitySampler):
"""
General-purpose sampler for dense continuous data, based on univariate
kernel density estimation. It estimates a separate probability
distribution for each input dimension.
The limitation is that variable interactions are not taken into account.
Unlike :class:`MultivariateKernelDensitySampler` it uses different bandwidths
for different dimensions; because of that it can handle one-hot encoded
features to some extent (make sure to at least tune the default ``sigma`` parameter).
Also, at sampling time it replaces only random subsets
of the features instead of generating totally new examples.
"""
def fit(self, X=None, y=None):
self.kdes_: list[KernelDensity] = []
self.grids_: list[GridSearchCV] = []
num_features = X.shape[-1]
for i in range(num_features):
grid, kde = self._fit_kde(self.kde, X[:, i].reshape(-1, 1))
self.grids_.append(grid)
self.kdes_.append(kde)
self._set_sigma(bandwidth=max(kde.bandwidth for kde in self.kdes_))
return self
def sample_near(self, doc, n_samples=1):
"""
Sample near the document by replacing some of its features
with values sampled from distribution found by KDE.
"""
doc = np.asarray(doc)
num_features = len(self.kdes_)
sizes = self.rng_.randint(low=1, high=num_features + 1, size=n_samples)
samples = []
for size in sizes:
to_change = self.rng_.choice(num_features, size, replace=False)
new_doc = doc.copy()
for i in to_change:
kde = self.kdes_[i]
new_doc[i] = kde.sample(random_state=self.rng_).ravel()
samples.append(new_doc)
samples_array = np.asarray(samples)
return samples_array, self._similarity(doc, samples_array)
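# Usage sketch for UnivariateKernelDensitySampler: per-feature KDEs are fit
# on a toy matrix that mixes a continuous column with a binary one; sampling
# then replaces a random subset of features of the given row. The data is
# an illustrative assumption.
def _univariate_kde_sampler_example():
    rng = np.random.RandomState(0)
    X = np.column_stack([rng.normal(size=100), rng.randint(0, 2, size=100)])
    sampler = UnivariateKernelDensitySampler(random_state=0).fit(X)
    samples, similarity = sampler.sample_near(X[0], n_samples=5)
    return samples, similarity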
def _distances(doc, samples, metric):
doc = doc.reshape(1, -1)
return pairwise_distances(samples, doc, metric=metric).ravel()
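# Sketch of the distance-to-similarity mapping used by the KDE samplers:
# distances to the original document are squashed through an RBF kernel.
# The explicit ``np.exp`` form below is an assumption about what
# eli5.lime.utils.rbf computes; the samplers themselves only call ``rbf``.
def _similarity_example(sigma=1.0):
    doc = np.array([0.0, 0.0])
    samples = np.array([[0.0, 0.0], [1.0, 0.0], [3.0, 4.0]])
    distances = _distances(doc, samples, metric='euclidean')  # [0., 1., 5.]
    return np.exp(-distances ** 2 / (2 * sigma ** 2))  # assumed RBF form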