import numpy as np
import scipy.stats
from menelaus.detector import StreamingDetector
class STEPD(StreamingDetector):
    """STEPD is a drift detection algorithm based on a binary classifier's
    accuracy, intended for an online classifier.

    Two windows are defined -- "recent" and "past", with corresponding
    accuracies ``p_r`` and ``p_p``. The absolute difference of these
    accuracies, minus a continuity correction and divided by the standard
    error implied by the pooled accuracy of both windows combined, gives a
    test statistic T that is approximately standard normal. The one-sided
    p-value of this statistic, P(T), defines the warning and drift regions.

    If ``p_r`` < ``p_p`` (the classifier's accuracy on recent samples is decreased):

    * and P(T) < ``alpha_warning``, the detector's state is set to ``"warning"``.
    * and P(T) < ``alpha_drift``, the detector's state is set to ``"drift"``.

    The indices of the first and most recent samples of the current
    warning/drift region are stored in ``self._retraining_recs`` (exposed via
    ``retraining_recs``), for retraining the classifier when drift occurs.
    These indices are 0-based positions in the whole stream, computed from
    ``self.total_samples`` (not ``self.samples_since_reset``).

    STEPD is intended for use with an online classifier, which is trained on
    every new sample. That is, with each new sample, the question is not whether
    the classifier will be retrained; it's whether some part of the previous
    training data should be excluded during retraining. The implementation
    depends on whether the classifier involved is able to incrementally retrain
    using only a single data point vs. being required to retrain on the entire
    set.

    Ref. :cite:t:`nishida2007detecting`
    """

    input_type = "stream"

    def __init__(self, window_size=30, alpha_warning=0.05, alpha_drift=0.003):
        """
        Args:
            window_size (int, optional): the size of the "recent" window.
                Must be at least 1. Defaults to 30.
            alpha_warning (float, optional): p-value threshold below which the
                detector enters the warning state. Defaults to 0.05.
            alpha_drift (float, optional): p-value threshold below which the
                detector enters the drift state. Defaults to 0.003.

        Raises:
            ValueError: if ``window_size`` is less than 1. Such a window makes
                the test statistic undefined (division by ``window_size``).
        """
        if window_size < 1:
            raise ValueError(f"window_size must be at least 1, got {window_size}")
        super().__init__()
        self.window_size = window_size
        self.alpha_warning = alpha_warning
        self.alpha_drift = alpha_drift
        # _s: number of correct predictions inside the recent window.
        # _r: number of correct predictions that have left the recent window
        #     (the "past" window) since the last reset.
        self._s, self._r = 0, 0
        # 1/0 classification outcomes for the most recent samples
        # (at most window_size entries).
        self._window = []
        self._test_statistic = None
        self._test_p = None
        self._initialize_retraining_recs()

    def reset(self):
        """Initialize the detector's drift state and other relevant attributes.

        Intended for use after ``drift_state == 'drift'``.
        """
        super().reset()
        self._s, self._r = 0, 0
        self._window = []
        self._test_statistic = None
        self._test_p = None
        self._initialize_retraining_recs()

    def update(self, y_true, y_pred, X=None):
        """Update the detector with a new sample.

        If the previous update left the detector in the ``"drift"`` state, the
        detector is reset before the new sample is processed. The statistical
        test only runs once at least ``2 * window_size`` samples have been seen
        since the last reset, so both windows hold ``window_size`` or more points.

        Args:
            y_true: one true label from input data
            y_pred: one predicted label from input data
            X: one row of features from input data. Not used by STEPD.
        """
        if self.drift_state == "drift":
            self.reset()

        _, y_true, y_pred = super()._validate_input(None, y_true, y_pred)
        # NOTE(review): the base-class update is expected to advance
        # samples_since_reset / total_samples for this sample.
        super().update(None, y_true, y_pred)
        # the arrays should have a single element after validation.
        y_true, y_pred = y_true[0], y_pred[0]

        classifier_result = int(y_pred == y_true)
        self._s += classifier_result
        self._window.append(classifier_result)
        if len(self._window) > self.window_size:
            # The oldest point leaves the recent window; from now on it is
            # only tracked through the running "past" count self._r.
            oldest = self._window.pop(0)
            self._s -= oldest
            self._r += oldest

        if self.samples_since_reset >= 2 * self.window_size:
            self._run_test()

    def _run_test(self):
        """Compute the STEPD statistic and p-value, then set the drift state and
        retraining recommendations accordingly."""
        recent_accuracy = self.recent_accuracy()
        past_accuracy = self.past_accuracy()
        overall_accuracy = self.overall_accuracy()

        # 1/n_past + 1/n_recent is shared by the continuity correction and the
        # standard error of the difference in accuracies.
        n_past = self.samples_since_reset - self.window_size
        inverse_sizes = (1 / n_past) + (1 / self.window_size)
        variance = overall_accuracy * (1 - overall_accuracy) * inverse_sizes

        if variance == 0:
            # Every result since reset is identical (pooled accuracy 0 or 1),
            # so both windows have the same accuracy and the corrected
            # numerator is negative: the statistic is -inf and P(T) is 1.
            # Handled explicitly to avoid a divide-by-zero RuntimeWarning.
            self._test_statistic = -np.inf
            self._test_p = 1.0
        else:
            self._test_statistic = (
                np.absolute(past_accuracy - recent_accuracy) - 0.5 * inverse_sizes
            ) / np.sqrt(variance)
            # one-sided test: upper-tail probability of the standard normal
            self._test_p = 1 - scipy.stats.norm.cdf(self._test_statistic, 0, 1)

        # Only a drop in accuracy (past better than recent) counts as drift.
        accuracy_decreased = past_accuracy > recent_accuracy
        if accuracy_decreased and self._test_p < self.alpha_drift:
            self.drift_state = "drift"
        elif accuracy_decreased and self._test_p < self.alpha_warning:
            self.drift_state = "warning"
        else:
            self.drift_state = None

        if self.drift_state is None:
            self._initialize_retraining_recs()
        else:
            self._increment_retraining_recs()

    def recent_accuracy(self):
        """
        Returns:
            float: the accuracy of the classifier among the last
            ``self.window_size`` samples the detector has seen, or 0 if no
            samples have been seen since the last reset
        """
        if not self._window:
            return 0
        return self._s / len(self._window)

    def past_accuracy(self):
        """
        Returns:
            float: the accuracy of the classifier among the samples the detector
            has seen before its current window, but after the last time the
            detector was reset; 0 if there are no such samples
        """
        n_past = self.samples_since_reset - len(self._window)
        if n_past == 0:
            return 0
        return self._r / n_past

    def overall_accuracy(self):
        """
        Returns:
            float: the accuracy of the classifier among the samples the detector
            has seen since the detector was last reset, or 0 if there are none
        """
        if self.samples_since_reset == 0:
            return 0
        return (self._r + self._s) / self.samples_since_reset

    def _initialize_retraining_recs(self):
        """Sets ``self._retraining_recs`` to ``[None, None]``."""
        self._retraining_recs = np.array([None, None])

    def _increment_retraining_recs(self):
        """Set ``self._retraining_recs`` to the beginning and end of the current
        drift/warning region.

        The start is fixed at the index of the first sample of the region; the
        end always tracks the index of the current sample (``total_samples - 1``).
        """
        current_index = self.total_samples - 1
        if self._retraining_recs[0] is None:
            self._retraining_recs[0] = current_index
        self._retraining_recs[1] = current_index

    @property
    def retraining_recs(self):
        """
        Returns:
            numpy.ndarray: two-element object array ``[start, end]`` holding the
            stream indices of the current warning/drift region, or
            ``[None, None]`` when the detector is in neither state
        """
        return self._retraining_recs