Source code for eli5.lime.samplers

# -*- coding: utf-8 -*-
from __future__ import absolute_import
import abc
from functools import partial
from typing import List, Tuple, Any, Union, Dict, Optional
import six

import numpy as np
from scipy.stats import itemfreq
from sklearn.base import BaseEstimator, clone
from sklearn.neighbors import KernelDensity
from sklearn.metrics import pairwise_distances
from sklearn.model_selection import GridSearchCV, KFold
from sklearn.utils import check_random_state

from eli5.utils import vstack
from eli5.lime.utils import rbf
from .textutils import generate_samples, DEFAULT_TOKEN_PATTERN, TokenizedText


@six.add_metaclass(abc.ABCMeta)
class BaseSampler(BaseEstimator):
    """
    Base sampler class.
    Sampler is an object which generates examples similar to a given example.
    """
    @abc.abstractmethod
    def sample_near(self, doc, n_samples=1):
        """
        Return (examples, similarity) tuple with generated documents
        similar to a given document and a vector of similarity values.
        """
        raise NotImplementedError()

    def fit(self, X=None, y=None):
        return self
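A minimal sketch of a custom sampler built on the interface above: subclasses only need to implement ``sample_near``. The class name ``GaussianNoiseSampler`` and its ``scale`` parameter are hypothetical illustrations, not part of eli5.

    import numpy as np
    from sklearn.utils import check_random_state

    class GaussianNoiseSampler(BaseSampler):
        """Hypothetical sampler: perturb a dense vector with Gaussian noise."""
        def __init__(self, scale=0.1, random_state=None):
            self.scale = scale
            self.random_state = random_state
            self.rng_ = check_random_state(random_state)

        def sample_near(self, doc, n_samples=1):
            doc = np.asarray(doc, dtype=float)
            noise = self.rng_.normal(scale=self.scale,
                                     size=(n_samples, doc.shape[0]))
            samples = doc + noise
            # closer samples get similarity values nearer to 1
            similarity = 1.0 / (1.0 + np.linalg.norm(noise, axis=1))
            return samples, similarity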
class MaskingTextSampler(BaseSampler):
    """
    Sampler for text data. It randomly removes or replaces tokens from text.

    Parameters
    ----------
    token_pattern : str, optional
        Regexp for token matching
    bow : bool, optional
        Sampler could either replace all instances of a given token
        (bow=True, bag of words sampling) or replace just a single token
        (bow=False).
    random_state : integer or numpy.random.RandomState, optional
        random state
    replacement : str
        Default value is '' - by default tokens are removed. If you want to
        preserve the total token count set ``replacement`` to a non-empty
        string, e.g. 'UNKN'.
    min_replace : int or float
        A minimum number of tokens to replace. Default is 1, meaning 1 token.
        If this value is float in range [0.0, 1.0], it is used as a ratio.
        More than min_replace tokens could be replaced if group_size > 1.
    max_replace : int or float
        A maximum number of tokens to replace. Default is 1.0, meaning
        all tokens can be replaced. If this value is float in range
        [0.0, 1.0], it is used as a ratio.
    group_size : int
        When group_size > 1, groups of nearby tokens are replaced all
        at once (each token is still replaced with a replacement).
        Default is 1, meaning individual tokens are replaced.
    """
    def __init__(self,
                 token_pattern=None,  # type: Optional[str]
                 bow=True,            # type: bool
                 random_state=None,
                 replacement='',      # type: str
                 min_replace=1,       # type: Union[int, float]
                 max_replace=1.0,     # type: Union[int, float]
                 group_size=1,        # type: int
                 ):
        # type: (...) -> None
        self.token_pattern = token_pattern or DEFAULT_TOKEN_PATTERN
        self.bow = bow
        self.random_state = random_state
        self.replacement = replacement
        self.min_replace = min_replace
        self.max_replace = max_replace
        self.group_size = group_size
        self.rng_ = check_random_state(self.random_state)
    def sample_near(self, doc, n_samples=1):
        # type: (str, int) -> Tuple[List[str], np.ndarray]
        docs, similarities, mask, text = self.sample_near_with_mask(
            doc=doc, n_samples=n_samples)
        return docs, similarities

    def sample_near_with_mask(self,
                              doc,          # type: Union[TokenizedText, str]
                              n_samples=1   # type: int
                              ):
        # type: (...) -> Tuple[List[str], np.ndarray, np.ndarray, TokenizedText]
        if not isinstance(doc, TokenizedText):
            doc = TokenizedText(doc, token_pattern=self.token_pattern)

        gen_samples = partial(generate_samples, doc,
                              n_samples=n_samples,
                              replacement=self.replacement,
                              min_replace=self.min_replace,
                              max_replace=self.max_replace,
                              group_size=self.group_size,
                              random_state=self.rng_)
        docs, similarity, mask = gen_samples(bow=self.bow)
        return docs, similarity, mask, doc
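A usage sketch for ``MaskingTextSampler``, assuming eli5 is installed; the input sentence and parameter values are arbitrary examples.

    from eli5.lime.samplers import MaskingTextSampler

    sampler = MaskingTextSampler(replacement='UNKN', max_replace=0.5,
                                 random_state=42)
    docs, similarity = sampler.sample_near('this movie was great fun',
                                           n_samples=3)
    # docs is a list of 3 variants with some tokens replaced by 'UNKN';
    # similarity is a numpy array with one score per generated document.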
class MaskingTextSamplers(BaseSampler):
    """
    Union of MaskingText samplers, with weights.
    :meth:`sample_near` or :meth:`sample_near_with_mask` generate
    a requested number of samples using all samplers; the probability of
    using a sampler is proportional to its weight.

    All samplers must use the same token_pattern in order for
    :meth:`sample_near_with_mask` to work.

    Create it with a list of {param: value} dicts with
    :class:`MaskingTextSampler` parameters.
    """
    def __init__(self,
                 sampler_params,      # type: List[Dict[str, Any]]
                 token_pattern=None,  # type: Optional[str]
                 random_state=None,
                 weights=None,        # type: Union[np.ndarray, List[float]]
                 ):
        # type: (...) -> None
        self.random_state = random_state
        self.rng_ = check_random_state(random_state)
        self.token_pattern = token_pattern
        self.samplers = list(map(self._create_sampler, sampler_params))
        if weights is None:
            self.weights = np.ones(len(self.samplers))
        else:
            self.weights = np.array(weights)
        self.weights /= self.weights.sum()

    def _create_sampler(self, extra):
        # type: (Dict) -> MaskingTextSampler
        params = dict(
            token_pattern=self.token_pattern,
            random_state=self.rng_,
        )  # type: Dict[str, Any]
        params.update(extra)
        return MaskingTextSampler(**params)
    def sample_near(self, doc, n_samples=1):
        # type: (str, int) -> Tuple[List[str], np.ndarray]
        assert n_samples >= 1
        all_docs = []  # type: List[str]
        similarities = []
        for sampler, freq in self._sampler_n_samples(n_samples):
            docs, sims = sampler.sample_near(doc, n_samples=freq)
            all_docs.extend(docs)
            similarities.append(sims)
        return all_docs, np.hstack(similarities)

    def sample_near_with_mask(self,
                              doc,          # type: str
                              n_samples=1   # type: int
                              ):
        # type: (...) -> Tuple[List[str], np.ndarray, np.ndarray, TokenizedText]
        assert n_samples >= 1
        assert self.token_pattern is not None
        text = TokenizedText(doc, token_pattern=self.token_pattern)
        all_docs = []  # type: List[str]
        similarities = []
        masks = []
        for sampler, freq in self._sampler_n_samples(n_samples):
            docs, sims, mask, _text = sampler.sample_near_with_mask(text, freq)
            all_docs.extend(docs)
            similarities.append(sims)
            masks.append(mask)
        return all_docs, np.hstack(similarities), vstack(masks), text
    def _sampler_n_samples(self, n_samples):
        """ Return (sampler, n_samples) tuples """
        sampler_indices = self.rng_.choice(range(len(self.samplers)),
                                           size=n_samples,
                                           replace=True,
                                           p=self.weights)
        return [
            (self.samplers[idx], freq)
            for idx, freq in itemfreq(sampler_indices)
        ]
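A sketch of combining several maskers with weights via ``MaskingTextSamplers``; the parameter dicts and weights below are illustrative choices, not defaults.

    from eli5.lime.samplers import MaskingTextSamplers

    union = MaskingTextSamplers(
        sampler_params=[
            {'bow': True, 'max_replace': 0.3},   # bag-of-words masking
            {'bow': False, 'max_replace': 0.3},  # per-occurrence masking
        ],
        weights=[0.7, 0.3],
        random_state=42,
    )
    docs, similarity = union.sample_near('this movie was great fun',
                                         n_samples=10)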
_BANDWIDTHS = np.hstack([
    [1e-6],                  # for discrete features
    np.logspace(-3, 4, 15),  # general-purpose (0.001 ... 10000) range
])


class _BaseKernelDensitySampler(BaseSampler):
    def __init__(self, kde=None, metric='euclidean', fit_bandwidth=True,
                 bandwidths=_BANDWIDTHS, sigma='bandwidth',
                 n_jobs=1, random_state=None):
        if kde is None:
            kde = KernelDensity(rtol=1e-7, atol=1e-7)
        self.kde = kde
        self.fit_bandwidth = fit_bandwidth
        self.bandwidths = bandwidths
        self.metric = metric
        self.n_jobs = n_jobs
        if not isinstance(sigma, (int, float)):
            allowed = {'bandwidth'}
            if sigma not in allowed:
                raise ValueError("sigma must be either "
                                 "a number or one of {}".format(allowed))
        self.sigma = sigma
        self.random_state = random_state
        self.rng_ = check_random_state(self.random_state)

    def _get_grid(self):
        param_grid = {'bandwidth': self.bandwidths}
        cv = KFold(n_splits=3, shuffle=True, random_state=self.rng_)
        return GridSearchCV(self.kde, param_grid=param_grid,
                            n_jobs=self.n_jobs, cv=cv)

    def _fit_kde(self, kde, X):
        # type: (KernelDensity, np.ndarray) -> Tuple[Optional[GridSearchCV], KernelDensity]
        if self.fit_bandwidth:
            grid = self._get_grid()
            grid.fit(X)
            return grid, grid.best_estimator_
        else:
            return None, clone(kde).fit(X)

    def _similarity(self, doc, samples):
        distance = _distances(doc, samples, metric=self.metric)
        return rbf(distance, sigma=self.sigma_)

    def _set_sigma(self, bandwidth):
        if self.sigma == 'bandwidth':
            # Sigma estimation using optimal bandwidth found by KDE.
            self.sigma_ = bandwidth
        else:
            self.sigma_ = self.sigma
class MultivariateKernelDensitySampler(_BaseKernelDensitySampler):
    """
    General-purpose sampler for dense continuous data, based on multivariate
    kernel density estimation.

    The limitation is that a single bandwidth value is used for all
    dimensions, i.e. the bandwidth matrix is a positive scalar times the
    identity matrix. This is a problem e.g. when features have different
    variances (e.g. some of them are one-hot encoded and others are
    continuous).
    """
    def fit(self, X, y=None):
        self.grid_, self.kde_ = self._fit_kde(self.kde, X)
        self._set_sigma(self.kde_.bandwidth)
        return self

    def sample_near(self, doc, n_samples=1):
        # XXX: it doesn't sample only near the given document,
        # it samples everywhere
        doc = np.asarray(doc)
        samples = self.kde_.sample(n_samples, random_state=self.rng_)
        return samples, self._similarity(doc, samples)
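A sketch of fitting the multivariate KDE sampler on dense data and drawing weighted samples; the data is synthetic and the values are illustrative.

    import numpy as np
    from eli5.lime.samplers import MultivariateKernelDensitySampler

    rng = np.random.RandomState(0)
    X = rng.normal(size=(200, 3))      # dense continuous training data
    sampler = MultivariateKernelDensitySampler(random_state=0)
    sampler.fit(X)
    samples, similarity = sampler.sample_near(X[0], n_samples=5)
    # samples are drawn from the fitted KDE (not strictly "near" X[0],
    # see the XXX note above); similarity weights them by RBF distance
    # to X[0].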
class UnivariateKernelDensitySampler(_BaseKernelDensitySampler):
    """
    General-purpose sampler for dense continuous data, based on univariate
    kernel density estimation. It estimates a separate probability
    distribution for each input dimension.

    The limitation is that variable interactions are not taken into account.

    Unlike MultivariateKernelDensitySampler it uses different bandwidths
    for different dimensions; because of that it can handle one-hot encoded
    features to some extent (make sure to at least tune the default
    ``sigma`` parameter). Also, at sampling time it replaces only random
    subsets of the features instead of generating totally new examples.
    """
    def fit(self, X, y=None):
        self.kdes_ = []   # type: List[KernelDensity]
        self.grids_ = []  # type: List[GridSearchCV]
        num_features = X.shape[-1]
        for i in range(num_features):
            grid, kde = self._fit_kde(self.kde, X[:, i].reshape(-1, 1))
            self.grids_.append(grid)
            self.kdes_.append(kde)
        self._set_sigma(bandwidth=max(kde.bandwidth for kde in self.kdes_))
        return self
    def sample_near(self, doc, n_samples=1):
        """
        Sample near the document by replacing some of its features
        with values sampled from the distributions found by KDE.
        """
        doc = np.asarray(doc)
        num_features = len(self.kdes_)
        sizes = self.rng_.randint(low=1, high=num_features + 1,
                                  size=n_samples)
        samples = []
        for size in sizes:
            to_change = self.rng_.choice(num_features, size, replace=False)
            new_doc = doc.copy()
            for i in to_change:
                kde = self.kdes_[i]
                new_doc[i] = kde.sample(random_state=self.rng_).ravel()
            samples.append(new_doc)
        samples = np.asarray(samples)
        return samples, self._similarity(doc, samples)
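A sketch of the univariate sampler on data that mixes a continuous feature with a binary one; the explicit ``sigma=1.0`` is an illustrative value, set because the docstring above recommends tuning it.

    import numpy as np
    from eli5.lime.samplers import UnivariateKernelDensitySampler

    rng = np.random.RandomState(0)
    X = np.column_stack([rng.normal(size=300),
                         rng.randint(0, 2, size=300)])
    sampler = UnivariateKernelDensitySampler(sigma=1.0, random_state=0)
    sampler.fit(X)
    samples, similarity = sampler.sample_near(X[0], n_samples=5)
    # each sample keeps some features of X[0] and redraws the rest from
    # the per-feature KDEs fitted above.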
def _distances(doc, samples, metric):
    doc = doc.reshape(1, -1)
    return pairwise_distances(samples, doc, metric=metric).ravel()