Source code for menelaus.change_detection.cusum

import numpy as np
from menelaus.detector import StreamingDetector


[docs]class CUSUM(StreamingDetector): """CUSUM is a method from the field of statistical process control. This detector tests for changes in the mean of a time series by calculating a moving average over recent observations. CUSUM can be used for tracking a single model performance metric, or could be applied to the mean of a feature variable of interest. Ref. :cite:t:`page1954continuous` """ input_type = "stream"
[docs] def __init__( self, target=None, sd_hat=None, burn_in=30, delta=0.005, threshold=5, direction=None, ): """ Args: target (float, optional): Known mean of stream (e.g. validation accuracy). If ``None``, will be inferred from observations in the burn-in window. Defaults to ``None``. sd_hat (float, optional): Known standard deviation of stream (e.g. SD of validation accuracy). If ``None``, will be inferred from observations in the burn-in window. Defaults to ``None``. burn_in (int, optional): Length of the burn-in period, during which time no alarms will sound. Also determines how many prior samples are used to calculate new estimates for mean and SD after drift occurs. Defaults to 30. delta (float, optional): The amount of "slack" in the CUSUM test statistic. Defaults to 0.005. threshold (int, optional): The threshold at which the CUSUM test statistic is evaluated against. Defaults to 50. direction (str, optional): * If ``'positive'``, drift is only considered when the stream drifts in the positive direction. * If ``'negative'``, drift is only considered when the stream drifts in the negative direction. * If ``None``, alarms to drift in either the positive or negative direction. Defaults to ``None``. """ super().__init__() self.target = target self.sd_hat = sd_hat self.burn_in = burn_in self.delta = delta self.threshold = threshold self.direction = direction self._upper_bound = [0] self._lower_bound = [0] self._stream = []
[docs] def reset(self): """Initialize the detector's drift state and other relevant attributes. Intended for use after ``drift_state == 'drift'``. """ # make last upper and lower bound = 0 super().reset() self._upper_bound = [0] self._lower_bound = [0]
[docs] def update(self, X, y_true=None, y_pred=None): """Update the detector with a new sample. Args: X (numpy.ndarray): one row of features from input data. y_true (numpy.ndarray): one true label from input data. Not used in CUSUM. y_pred (numpy.ndarray): one predicted label from input data. Not used in CUSUM. """ # if the last run resulted in drift, reset everything and use last 30 # obs to estimate stats if self.drift_state == "drift": self.target = np.mean(self._stream[-self.burn_in :]) self.sd_hat = np.std(self._stream[-self.burn_in :]) self.reset() X, _, _ = super()._validate_input(X, None, None) if len(X.shape) > 1 and X.shape[1] != 1: raise ValueError("CUSUM should only be used to monitor 1 variable.") super().update(X, None, None) self._stream.append(X) # cannot compute s_h/s_l so set to 0 if (self.target is None) & (self.samples_since_reset < self.burn_in): s_h = 0 s_l = 0 self._upper_bound.append(s_h) self._lower_bound.append(s_l) # derive mean and sd from first n points if they are not specified if (self.target is None) & (self.samples_since_reset == self.burn_in): self.target = np.mean(self._stream) self.sd_hat = np.std(self._stream) # if sd = 0 then no variance in stream and no drift -- raise error if (self.sd_hat == 0) & (self.samples_since_reset > self.burn_in): raise ValueError( """Standard deviation is 0. Confirm imput is a time series with more than 1 unique value.""" ) # find new upper and lower cusum stats if self.target is not None: s_h = max( 0, self._upper_bound[self.samples_since_reset - 1] + (self._stream[self.samples_since_reset - 1] - self.target) / self.sd_hat - self.delta, ) s_l = max( 0, self._lower_bound[self.samples_since_reset - 1] - self.delta - (self._stream[self.samples_since_reset - 1] - self.target) / self.sd_hat, ) self._upper_bound.append(s_h) self._lower_bound.append(s_l) # check alarm if past burn in if self.samples_since_reset > self.burn_in: if self.direction is None: if (self._upper_bound[self.samples_since_reset] > self.threshold) | ( self._lower_bound[self.samples_since_reset] > self.threshold ): self.drift_state = "drift" elif self.direction == "positive": if self._upper_bound[self.samples_since_reset] > self.threshold: self.drift_state = "drift" elif self.direction == "negative": if self._lower_bound[self.samples_since_reset] > self.threshold: self.drift_state = "drift"