# Source code for menelaus.concept_drift.ddm

import numpy as np
from menelaus.detector import StreamingDetector


class DDM(StreamingDetector):
    """DDM is a drift detection algorithm which uses a binary classifier's
    error rate, which is binomially distributed.

    For the i-th sample since the last reset, the detector tracks the running
    error rate ``p_i`` and the standard deviation of that estimate,
    ``s_i = sqrt(p_i * (1 - p_i) / i)``. Once ``n_threshold`` samples have
    been seen, it remembers the pair with the smallest ``p_i + s_i`` as
    (``p_min``, ``s_min``). If the running estimates exceed a threshold
    derived from those minimums, we assume that the distribution of the error
    rate is no longer stationary (drift has occurred).

    If ``p_i + s_i > p_min + self.warning_scale * s_min`` the detector's state
    is set to ``"warning"``. If ``p_i + s_i > p_min + self.drift_scale *
    s_min``, the detector's state is set to ``"drift"``. The comparisons are
    strict so that a detector whose error rate has stayed at exactly 0
    (``p_min == s_min == 0``) does not report drift without seeing an error.

    ``self.retraining_recs`` holds the indices, counted over
    ``self.total_samples``, of the first sample of the current warning/drift
    region and of the sample that triggered drift.

    Ref. :cite:t:`gama2004learning`
    """

    input_type = "stream"

    def __init__(self, n_threshold=30, warning_scale=2, drift_scale=3):
        """
        Args:
            n_threshold (int, optional): the minimum number of samples required
                to test whether drift has occurred. Defaults to 30.
            warning_scale (int, optional): multiplier of ``s_min`` defining the
                threshold over which to enter the warning state. Defaults to 2.
            drift_scale (int, optional): multiplier of ``s_min`` defining the
                threshold over which to enter the drift state. Defaults to 3.
        """
        super().__init__()
        self.n_threshold = n_threshold
        self.warning_scale = warning_scale
        self.drift_scale = drift_scale
        self._error_rate = 0
        self._error_std = 0
        self._error_rate_min = float("inf")
        self._error_std_min = float("inf")
        self._initialize_retraining_recs()

    def reset(self):
        """Initialize the detector's drift state and other relevant attributes.
        Intended for use after ``drift_state == 'drift'``.
        """
        super().reset()
        self._error_rate = 0
        self._error_std = 0
        self._error_rate_min = float("inf")
        self._error_std_min = float("inf")
        self._initialize_retraining_recs()

    # XXX - Order of y_true, y_pred, X differs from abstractmethod signature
    # for update(). This is done for convenience, so users can call e.g.
    # DDM.update(1,1) without misinterpretation, but exposes them to a
    # potential issue where DDM.update(X, y, y) would assign arguments
    # incorrectly.
    def update(self, y_true, y_pred, X=None):
        """Update the detector with a new sample.

        Args:
            y_true: one true label from input data.
            y_pred: one predicted label from input data.
            X: one row of features from input data. Not used in DDM.
        """
        if self.drift_state == "drift":
            self.reset()

        _, y_true, y_pred = super()._validate_input(None, y_true, y_pred)
        super().update(None, y_true, y_pred)
        # the arrays should have a single element after validation.
        y_true, y_pred = y_true[0], y_pred[0]
        classifier_result = int(y_pred != y_true)

        n = self.samples_since_reset
        # Incremental mean of the 0/1 error indicator.
        self._error_rate += (classifier_result - self._error_rate) / n
        # Std of the error-rate estimate of a binomial proportion
        # (Gama et al., 2004). It is closed-form in p_i and n, so no running
        # sum-of-squares accumulator is needed (storing the std back into an
        # accumulator would corrupt it on the next step).
        self._error_std = np.sqrt(self._error_rate * (1 - self._error_rate) / n)

        # it's unclear whether the 'burn-in' period should be updating the
        # minimums - seems like a bad idea though.
        if n < self.n_threshold:
            return

        score = self._error_rate + self._error_std
        if score <= self._error_rate_min + self._error_std_min:
            self._error_rate_min = self._error_rate
            self._error_std_min = self._error_std

        # Thresholds scale the *minimum* std (s_min). Strict '>' prevents a
        # detector with p_min == s_min == 0 from flagging drift on 0 >= 0.
        if score > self._error_rate_min + self.drift_scale * self._error_std_min:
            self.drift_state = "drift"
        elif score > self._error_rate_min + self.warning_scale * self._error_std_min:
            self.drift_state = "warning"
        else:
            self.drift_state = None

        if self.drift_state is None:
            # Any warning region ended without drift (false alarm), so its
            # start index no longer describes the current region.
            self._initialize_retraining_recs()
        else:
            self._increment_retraining_recs()

    def _initialize_retraining_recs(self):
        """Sets ``self._retraining_recs`` to ``[None, None]``."""
        self._retraining_recs = [None, None]

    def _increment_retraining_recs(self):
        """Set ``self._retraining_recs`` to the beginning and end of the current
        drift/warning region.
        """
        if self.drift_state == "warning" and self._retraining_recs[0] is None:
            self._retraining_recs[0] = self.total_samples - 1

        if self.drift_state == "drift":
            self._retraining_recs[1] = self.total_samples - 1
            # No preceding warning: the region starts at the drift sample.
            if self._retraining_recs[0] is None:
                self._retraining_recs[0] = self.total_samples - 1

    @property
    def retraining_recs(self):
        """Indices of the first and last recommended training samples. A list
        of length 2, containing ``[warning index, drift index]``. If no warning
        occurs, this will instead be ``[drift index, drift index]``. The latter
        should cause caution, as it indicates an abrupt change. Indices are
        counted over ``self.total_samples``.

        Resets to ``[None, None]`` on the update following a detected drift,
        and whenever a warning clears without drift.

        Returns:
            list: the current retraining recommendations
        """
        return self._retraining_recs