Source code for menelaus.ensemble.ensemble

from collections import defaultdict

from menelaus.detector import BatchDetector, StreamingDetector


#############
# Ensembles
#############


class Ensemble:
    """
    Parent class for Ensemble detectors.

    Does not inherit from any detector parent class, but offers the similar
    functions ``update`` and ``reset``, each of which is forwarded to every
    detector held by the ensemble. After an update, the ensemble's
    ``drift_state`` is set to whatever ``election`` returns for the list of
    detectors. Any class hoping to implement ensemble functionality should
    inherit from this.
    """

    def __init__(self, detectors: dict, election, column_selectors: dict = None):
        """
        Args:
            detectors (dict): Dictionary of detectors in ensemble, where the
                key is some unique identifier for a detector, and the value
                is the initialized detector object. A shallow copy of the
                dictionary is stored.
            election (callable): Called with the list of detectors after
                every update; its return value becomes ``drift_state``.
            column_selectors (dict, optional): Maps a detector key to a
                function that takes the data ``X`` and returns the part of
                ``X`` that detector should operate on. Detectors without an
                entry receive ``X`` unchanged. Defaults to ``None``, meaning
                no selector is applied to any detector.
        """
        # XXX - Since rigid type-checking is sort of discouraged in Python
        #       it makes the most sense to just treat election as (always)
        #       a function operating on detectors.
        self.detectors = detectors.copy()
        self.election = election

        def default_column_selector():
            # Factory for the defaultdict: a missing key gets a pass-through.
            return lambda data: data

        self.column_selectors = defaultdict(default_column_selector)
        # ``None`` replaces the former shared mutable ``{}`` default.
        if column_selectors is not None:
            self.column_selectors.update(column_selectors)

    def update(self, X, y_true=None, y_pred=None):
        """
        Update each detector in ensemble with new batch of data. Afterwards,
        calls ``self.election`` on the list of detectors and stores the
        result in ``self.drift_state``.

        Args:
            X (numpy.ndarray): input data
            y_true (numpy.ndarray): if applicable, true labels of input data
            y_pred (numpy.ndarray): if applicable, predicted labels of input data
        """
        for det_key, detector in self.detectors.items():
            # XXX - Cannot re-define X = constrain(), else external reference is modified
            #       Need to see why this is happening and where to put e.g. a copy() stmt.
            X_selected = self.column_selectors[det_key](X)
            detector.update(X=X_selected, y_true=y_true, y_pred=y_pred)

        det_list = list(self.detectors.values())
        self.drift_state = self.election(det_list)

    def reset(self):
        """
        Call ``reset`` on each detector in the ensemble. Intended for use
        after ``drift_state == 'drift'``.
        """
        for detector in self.detectors.values():
            detector.reset()
class StreamingEnsemble(StreamingDetector, Ensemble):
    """
    Implements Ensemble class for streaming drift detectors. Inherits from
    ``Ensemble`` and ``StreamingDetector`` (i.e., ``StreamingEnsemble`` IS-A
    ``StreamingDetector``). As such it has the functions of a regular
    detector: ``update``, ``reset``, etc. Internally, these operate not only
    on the ensemble's own attributes, but on the set of detectors given to it.
    """

    def __init__(self, detectors: dict, election, column_selectors: dict = None):
        """
        Args:
            detectors (dict): Dictionary of detectors in ensemble, where the
                key is some unique identifier for a detector, and the value
                is the initialized detector object. For instance,
                ``{'a': ADWIN()}``.
            election (callable): Initialized ``Election`` object for ensemble
                to evaluate drift among constituent detectors. It is called
                with the list of detectors after each update. See
                implemented election schemes in ``menelaus.ensemble``.
            column_selectors (dict, optional): Functions to use for each
                detector. Functions should take data instance X and return
                the columns of X that the corresponding detector should
                operate on. Should match format of ``detectors`` i.e.
                ``{'a': ADWIN()}`` would need an entry ``{'a': function}``
                to use this feature. By default, no column selection
                function is applied to any detector, and they will all use
                the entirety of the attributes in X.
        """
        StreamingDetector.__init__(self)
        # Replace the ``None`` default with a fresh dict so no mutable default
        # object is shared between instances.
        selectors = {} if column_selectors is None else column_selectors
        Ensemble.__init__(self, detectors, election, selectors)

    def update(self, X, y_true=None, y_pred=None):
        """
        Update ensemble itself, and each constituent detector with new data.
        Calls ``Ensemble.update`` and ``StreamingDetector.update`` to do so.

        Args:
            X (numpy.ndarray): input data
            y_true (numpy.ndarray): if applicable, true labels of input data
            y_pred (numpy.ndarray): if applicable, predicted labels of input data
        """
        Ensemble.update(self, X=X, y_true=y_true, y_pred=y_pred)
        StreamingDetector.update(self, X=X, y_true=y_true, y_pred=y_pred)

    def reset(self):
        """
        Reset ensemble itself, and each constituent detector's drift state
        and other relevant attributes. Intended for use after
        ``drift_state == 'drift'``. Calls ``Ensemble.reset`` and
        ``StreamingDetector.reset`` to do so.
        """
        Ensemble.reset(self)
        StreamingDetector.reset(self)
class BatchEnsemble(BatchDetector, Ensemble):
    """
    Implements ``Ensemble`` class for batch-based drift detectors. Inherits
    from ``Ensemble`` and ``BatchDetector`` (i.e., ``BatchEnsemble`` IS-A
    ``BatchDetector``). As such it has the functions of a regular detector,
    ``set_reference``, ``update``, and ``reset``. These functions will operate
    not only on the ensemble's own attributes, but on the set of detectors
    given to it.
    """

    def __init__(self, detectors: dict, election, column_selectors: dict = None):
        """
        Args:
            detectors (dict): Dictionary of detectors in ensemble, where the
                key is some unique identifier for a detector, and the value
                is the initialized detector object. For instance,
                ``{'p': PCA_CD()}``.
            election (callable): Initialized ``Election`` object for ensemble
                to evaluate drift among constituent detectors. It is called
                with the list of detectors after each update. See
                implemented election schemes in ``menelaus.ensemble``.
            column_selectors (dict, optional): Table of functions to use for
                each detector. Functions should take data instance X and
                return the columns of X that the corresponding detector
                should operate on. Should match format of ``detectors`` i.e.
                ``{'p': PCA_CD()}`` would need an entry ``{'p': function}``
                to use this feature. By default, no column selection
                function is applied to any detector, and they will all use
                the entirety of the attributes in X.
        """
        BatchDetector.__init__(self)
        # Replace the ``None`` default with a fresh dict so no mutable default
        # object is shared between instances.
        selectors = {} if column_selectors is None else column_selectors
        Ensemble.__init__(self, detectors, election, selectors)

    def update(self, X, y_true=None, y_pred=None):
        """
        Update ensemble itself, and each constituent detector with new data.
        Calls ``Ensemble.update`` and ``BatchDetector.update`` to do so.

        Args:
            X (numpy.ndarray): input data
            y_true (numpy.ndarray): if applicable, true labels of input data
            y_pred (numpy.ndarray): if applicable, predicted labels of input data
        """
        Ensemble.update(self, X=X, y_true=y_true, y_pred=y_pred)
        BatchDetector.update(self, X=X, y_true=y_true, y_pred=y_pred)

    def reset(self):
        """
        Reset ensemble itself, and each constituent detector's drift state
        and other relevant attributes. Intended for use after
        ``drift_state == 'drift'``. Calls ``Ensemble.reset`` and
        ``BatchDetector.reset`` to do so.
        """
        Ensemble.reset(self)
        BatchDetector.reset(self)

    def set_reference(self, X, y_true=None, y_pred=None):
        """
        Initialize each constituent detector with a reference batch, by
        calling that detector's ``set_reference`` on the part of ``X`` chosen
        by its column selector.

        Args:
            X (pandas.DataFrame or numpy.array): baseline dataset
            y_true (numpy.array): actual labels of dataset
            y_pred (numpy.array): predicted labels of dataset
        """
        for det_key, detector in self.detectors.items():
            # XXX - Cannot re-define X = constrain(), else external reference is modified
            #       Need to see why this is happening and where to put e.g. a copy() stmt.
            X_selected = self.column_selectors[det_key](X)
            detector.set_reference(X=X_selected, y_true=y_true, y_pred=y_pred)