"""Class to perform under-sampling using balace cascade."""
# Authors: Guillaume Lemaitre <g.lemaitre58@gmail.com>
# Christos Aridas
# License: MIT
import warnings
from collections import Counter
import numpy as np
from sklearn.base import ClassifierMixin
from sklearn.neighbors import KNeighborsClassifier
from sklearn.utils import check_random_state
from sklearn.externals.six import string_types
from sklearn.model_selection import cross_val_predict
from .base import BaseEnsembleSampler
from ..utils import check_ratio
class BalanceCascade(BaseEnsembleSampler):
"""Create an ensemble of balanced sets by iteratively under-sampling the
imbalanced dataset using an estimator.
This method iteratively select subset and make an ensemble of the
different sets. The selection is performed using a specific classifier.
Parameters
----------
ratio : str, dict, or callable, optional (default='auto')
Ratio to use for resampling the data set.
- If ``str``, has to be one of: (i) ``'minority'``: resample the
minority class; (ii) ``'majority'``: resample the majority class;
(iii) ``'not minority'``: resample all classes apart from the minority
class; (iv) ``'all'``: resample all classes; and (v) ``'auto'``:
corresponds to ``'all'`` for over-sampling methods and ``'not
minority'`` for under-sampling methods. The targeted classes will be
over-sampled or under-sampled to achieve an equal number of samples
with the majority or minority class.
- If ``dict``, the keys correspond to the targeted classes and the
values to the desired number of samples (see the Examples section
below for a sketch).
- If callable, a function taking ``y`` and returning a ``dict``. The
keys correspond to the targeted classes and the values to the desired
number of samples.
return_indices : bool, optional (default=False)
Whether or not to return the indices of the samples selected in
each subset.
random_state : int, RandomState instance or None, optional (default=None)
If int, ``random_state`` is the seed used by the random number
generator; If ``RandomState`` instance, random_state is the random
number generator; If ``None``, the random number generator is the
``RandomState`` instance used by ``np.random``.
n_max_subset : int or None, optional (default=None)
Maximum number of subsets to generate. By default, all the data from
the training set are used, which can lead to a large number of
subsets; in practice this number is best chosen empirically.
classifier : str, optional (default=None)
The classifier whose predictions are compared with the true labels to
decide which samples to remove. The choices are the following: ``'knn'``,
``'decision-tree'``, ``'random-forest'``, ``'adaboost'``,
``'gradient-boosting'``, and ``'linear-svm'``.
.. deprecated:: 0.2
``classifier`` is deprecated in 0.2 and will be removed in 0.4.
Use ``estimator`` instead.
estimator : object, optional (default=KNeighborsClassifier())
An estimator inheriting from :class:`sklearn.base.ClassifierMixin` and
implementing a :func:`predict` method.
**kwargs : keywords
The parameters associated with the classifier provided.
.. deprecated:: 0.2
``**kwargs`` is deprecated in 0.2 and will be removed in 0.4. Pass the
parameters associated with an estimator through an ``estimator`` object
instead.
Notes
-----
The method is described in [1]_.
Supports multi-class resampling.
Examples
--------
>>> from collections import Counter
>>> from sklearn.datasets import make_classification
>>> from imblearn.ensemble import \
BalanceCascade # doctest: +NORMALIZE_WHITESPACE
>>> X, y = make_classification(n_classes=2, class_sep=2,
... weights=[0.1, 0.9], n_informative=3, n_redundant=1, flip_y=0,
... n_features=20, n_clusters_per_class=1, n_samples=1000, random_state=10)
>>> print('Original dataset shape {}'.format(Counter(y)))
Original dataset shape Counter({1: 900, 0: 100})
>>> bc = BalanceCascade(random_state=42)
>>> X_res, y_res = bc.fit_sample(X, y)
>>> print('Resampled dataset shape {}'.format(Counter(y_res[0]))) \
# doctest: +ELLIPSIS
Resampled dataset shape Counter({...})
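
A ``dict`` ratio can pin the number of samples kept per class; the
targets below are illustrative values, not defaults:

>>> bc = BalanceCascade(ratio={0: 100, 1: 100}, random_state=42)
>>> X_res, y_res = bc.fit_sample(X, y)
>>> print('Resampled dataset shape {}'.format(Counter(y_res[0]))) \
# doctest: +ELLIPSIS
Resampled dataset shape Counter({...})

Any classifier implementing ``predict`` can drive the selection through
``estimator``; a minimal sketch with a decision tree:

>>> from sklearn.tree import DecisionTreeClassifier
>>> bc = BalanceCascade(random_state=42,
...                     estimator=DecisionTreeClassifier(random_state=42))
>>> X_res, y_res = bc.fit_sample(X, y)
>>> print('Resampled dataset shape {}'.format(Counter(y_res[0]))) \
# doctest: +ELLIPSIS
Resampled dataset shape Counter({...})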
References
----------
.. [1] X. Y. Liu, J. Wu and Z. H. Zhou, "Exploratory Undersampling for
Class-Imbalance Learning," in IEEE Transactions on Systems, Man, and
Cybernetics, Part B (Cybernetics), vol. 39, no. 2, pp. 539-550,
April 2009.
"""
def __init__(self,
ratio='auto',
return_indices=False,
random_state=None,
n_max_subset=None,
classifier=None,
estimator=None,
**kwargs):
super(BalanceCascade, self).__init__(ratio=ratio,
random_state=random_state)
self.return_indices = return_indices
self.classifier = classifier
self.estimator = estimator
self.n_max_subset = n_max_subset
self.kwargs = kwargs
def fit(self, X, y):
"""Find the classes statistics before to perform sampling.
Parameters
----------
X : ndarray, shape (n_samples, n_features)
Matrix containing the data which have to be sampled.
y : ndarray, shape (n_samples, )
Corresponding label for each sample in X.
Returns
-------
self : object
Return self.
"""
super(BalanceCascade, self).fit(X, y)
self.ratio_ = check_ratio(self.ratio, y, 'under-sampling')
return self
def _validate_estimator(self):
"""Private function to create the classifier"""
if self.classifier is not None:
warnings.warn('`classifier` is deprecated in 0.2 and will be removed'
' in 0.4. Use an `estimator` instead.',
DeprecationWarning)
self.estimator = self.classifier
if (self.estimator is not None and
isinstance(self.estimator, ClassifierMixin) and
hasattr(self.estimator, 'predict')):
self.estimator_ = self.estimator
elif self.estimator is None:
self.estimator_ = KNeighborsClassifier()
# To be removed in 0.4
elif (self.estimator is not None and
isinstance(self.estimator, string_types)):
warnings.warn('Passing a string to `estimator` is deprecated and'
' will be removed in 0.4. Use a classifier object'
' instead.',
DeprecationWarning)
# Define the classifier to use
if self.estimator == 'knn':
self.estimator_ = KNeighborsClassifier(**self.kwargs)
elif self.estimator == 'decision-tree':
from sklearn.tree import DecisionTreeClassifier
self.estimator_ = DecisionTreeClassifier(
random_state=self.random_state, **self.kwargs)
elif self.estimator == 'random-forest':
from sklearn.ensemble import RandomForestClassifier
self.estimator_ = RandomForestClassifier(
random_state=self.random_state, **self.kwargs)
elif self.estimator == 'adaboost':
from sklearn.ensemble import AdaBoostClassifier
self.estimator_ = AdaBoostClassifier(
random_state=self.random_state, **self.kwargs)
elif self.estimator == 'gradient-boosting':
from sklearn.ensemble import GradientBoostingClassifier
self.estimator_ = GradientBoostingClassifier(
random_state=self.random_state, **self.kwargs)
elif self.estimator == 'linear-svm':
from sklearn.svm import LinearSVC
self.estimator_ = LinearSVC(
random_state=self.random_state, **self.kwargs)
else:
raise NotImplementedError(
'Unknown classifier string {!r}.'.format(self.estimator))
else:
raise ValueError('Invalid parameter `estimator`. Got {}.'.format(
type(self.estimator)))
self.logger.debug(self.estimator_)
def _sample(self, X, y):
"""Resample the dataset.
Parameters
----------
X : ndarray, shape (n_samples, n_features)
Matrix containing the data which have to be sampled.
y : ndarray, shape (n_samples, )
Corresponding label for each sample in X.
Returns
-------
X_resampled : ndarray, shape (n_subset, n_samples_new, n_features)
The array containing the resampled data.
y_resampled : ndarray, shape (n_subset, n_samples_new)
The corresponding label of `X_resampled`
idx_under : ndarray, shape (n_subset, n_samples_new)
If `return_indices` is `True`, an array containing the indices of
the samples selected for each subset is returned.
"""
self._validate_estimator()
random_state = check_random_state(self.random_state)
# boolean mask tracking which samples are still available for selection
samples_mask = np.ones(y.shape, dtype=bool)
# where the different sets will be stored
X_resampled = []
y_resampled = []
idx_under = []
n_subsets = 0
b_subset_search = True
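# Balance cascade (see [1]_): each pass draws a balanced subset from the
# still-available samples, gets out-of-fold predictions for it from the
# classifier, and drops the correctly classified samples from the pool so
# that later subsets focus on the harder examples.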
while b_subset_search:
target_stats = Counter(y[samples_mask])
# build the data set to be classified
X_subset = np.empty((0, X.shape[1]), dtype=X.dtype)
y_subset = np.empty((0, ), dtype=y.dtype)
# store the indices of the data to under-sample
index_under_sample = np.empty((0, ), dtype=int)
# samples from the classes which are not targeted by the ratio;
# they are carried over unchanged into every subset
X_constant = np.empty((0, X.shape[1]), dtype=X.dtype)
y_constant = np.empty((0, ), dtype=y.dtype)
index_constant = np.empty((0, ), dtype=int)
for target_class in target_stats.keys():
if target_class in self.ratio_.keys():
n_samples = self.ratio_[target_class]
# extract the data of interest for this round from the
# current class
index_class = np.flatnonzero(y == target_class)
index_class_interest = index_class[samples_mask[
y == target_class]]
X_class = X[index_class_interest]
y_class = y[index_class_interest]
# randomly select the desired number of samples without replacement
index_target_class = random_state.choice(
range(y_class.size), size=n_samples, replace=False)
X_subset = np.concatenate((X_subset,
X_class[index_target_class]),
axis=0)
y_subset = np.concatenate((y_subset,
y_class[index_target_class]),
axis=0)
# index of the data
index_under_sample = np.concatenate(
(index_under_sample,
index_class_interest[index_target_class]),
axis=0)
else:
X_constant = np.concatenate((X_constant,
X[y == target_class]),
axis=0)
y_constant = np.concatenate((y_constant,
y[y == target_class]),
axis=0)
index_constant = np.concatenate(
(index_constant,
np.flatnonzero(y == target_class)),
axis=0)
# store the set created
n_subsets += 1
X_resampled.append(np.concatenate((X_subset, X_constant),
axis=0))
y_resampled.append(np.concatenate((y_subset, y_constant),
axis=0))
idx_under.append(np.concatenate((index_under_sample,
index_constant),
axis=0))
# fit and predict using cross validation
pred = cross_val_predict(self.estimator_,
np.concatenate((X_subset, X_constant),
axis=0),
np.concatenate((y_subset, y_constant),
axis=0))
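# cross-validation gives every sample an out-of-fold prediction, so a
# sample is never judged by a model that saw it during training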
# extract the predictions for the targeted classes only
pred_target = pred[:y_subset.size]
index_classified = index_under_sample[pred_target == y_subset]
samples_mask[index_classified] = False
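# the misclassified samples stay in the pool: they are the hard
# examples that the next subsets will concentrate on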
# check the stopping criterion
if self.n_max_subset is not None:
if n_subsets == self.n_max_subset:
b_subset_search = False
# check that there are enough samples for another round
target_stats = Counter(y[samples_mask])
for target_class in self.ratio_.keys():
if target_stats[target_class] < self.ratio_[target_class]:
b_subset_search = False
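# stopping here is required: drawing without replacement from a
# pool smaller than the requested number of samples would fail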
if self.return_indices:
return (np.array(X_resampled), np.array(y_resampled),
np.array(idx_under))
else:
return np.array(X_resampled), np.array(y_resampled)
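
# A minimal usage sketch (illustrative, not part of the class): it builds a
# toy imbalanced dataset and shows the shapes returned by ``fit_sample``
# when ``return_indices=True``.
if __name__ == '__main__':
    from sklearn.datasets import make_classification

    X_demo, y_demo = make_classification(n_classes=2, weights=[0.1, 0.9],
                                         n_samples=1000, random_state=0)
    bc = BalanceCascade(return_indices=True, n_max_subset=3, random_state=0)
    X_res, y_res, idx_res = bc.fit_sample(X_demo, y_demo)
    # one entry per generated subset; each subset is balanced w.r.t. `ratio`
    print(len(X_res), X_res[0].shape, y_res[0].shape, idx_res[0].shape)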