Source code for menelaus.change_detection.adwin

# most of these get called within loops (or an outer loop on the detector),
# so this is more efficient
from numpy import (
    power,
    log,
    sqrt,
    absolute,
    empty_like,
    zeros,
)

from menelaus.detector import StreamingDetector


[docs]class ADWIN(StreamingDetector): """ADWIN (ADaptive WINdowing) is a change detection algorithm which uses a sliding window to estimate the running mean and variance of a given real-valued number. As each sample is added, ADWIN stores a running estimate (mean and variance) for a given statistic, calculated over a sliding window which will grow to the right until drift is detected. The condition for drift is defined over pairs of subwindows at certain cutpoints within the current window. If, for any such pair, the difference between the running estimates of the statistic is over a certain threshold (controlled by delta), we identify drift, and remove the oldest elements of the window until all differences are again below the threshold. The running estimates in each subwindow are maintained by storing summaries of the elements in "buckets," which, in this implementation, are themselves stored in the ``bucket_row_list`` attribute, whose total size scales with the ``max_buckets`` parameter. When drift occurs, the index of the element at the beginning of ADWIN's new window is stored in ``self.retraining_recs``. Ref. :cite:t:`bifet2007learning` """
[docs] def __init__( self, delta=0.002, max_buckets=5, new_sample_thresh=32, window_size_thresh=10, subwindow_size_thresh=5, conservative_bound=False, ): """ Args: delta (float, optional): confidence value on on 0 to 1. ADWIN will incorrectly detect drift with at most probability ``delta``, and correctly detect drift with at least probability ``1 - delta``. Defaults to 0.002. max_buckets (int, optional): the maximum number of buckets to maintain in each BucketRow. Corresponds to the "M" parameter in Bifet 2006. Defaults to 5. new_sample_thresh (int, optional): the drift detection procedure will run every ``new_sample_thresh samples``, not in between. Defaults to 32. window_size_thresh (int, optional): the minimum number of samples in the window required to check for drift. Defaults to 10. subwindow_size_thresh (int, optional): the minimum number of samples in each subwindow reqired to check it for drift. Defaults to 5. conservative_bound (bool, optional): whether to assume a 'large enough' sample when constructing drift cutoff. Defaults to ``False``. Raises: ValueError: If ``ADWIN.delta`` is not on the range 0 to 1. """ super().__init__() self.delta = delta # Although sklearn's standard for estimators is that parameter # sanitization occurs with .fit, a detector might be instantiated and # then not use a given parameter for quite a while in a pipeline. May # need to re-examine this later if user expectations differ from # behavior. if not 0 <= self.delta <= 1: raise ValueError("ADWIN.delta must take values on the range 0 to 1.") self.max_buckets = max_buckets self.new_sample_thresh = new_sample_thresh self.window_size_thresh = window_size_thresh self.subwindow_size_thresh = subwindow_size_thresh self.conservative_bound = conservative_bound self._bucket_row_list = _BucketRowList(self.max_buckets) self._curr_total = ( 0 # this attribute is not scaled by the window size; use .mean ) self._curr_variance = ( 0 # this attribute is not scaled by the window size; use .variance ) self._window_size = 0 self._retraining_recs = [None, None]
[docs] def update(self, X, y_true=None, y_pred=None): """Update the detector with a new sample. Args: X: one row of features from input data. y_true: one true label from input data. Not used by ADWIN. y_pred: one predicted label from input data. Not used by ADWIN. """ if self.drift_state is not None: # note that the other attributes should *not* be initialized after drift self.reset() X, _, _ = super()._validate_input(X, None, None) if len(X.shape) > 1 and X.shape[1] != 1: raise ValueError("ADWIN should only be used to monitor 1 variable.") super().update(X, None, None) # the array should have a single element after validation. X = X[0][0] # add new sample to the head of the window self._window_size += 1 self._add_sample(X) self._shrink_window()
[docs] def reset(self): """Initialize the detector's drift state and other relevant attributes. Intended for use after ``drift_state == 'drift'``. """ super().reset() self._initialize_retraining_recs()
def _initialize_retraining_recs(self): """Sets ``self._retraining_recs`` to ``[None, None]``.""" self._retraining_recs = [None, None] @property def retraining_recs(self): """Recommended indices for retraining. If drift is detected, set to ``[beginning of ADWIN's new window, end of ADWIN's new window]``. If these are e.g. the 5th and 13th sample that ADWIN has been updated with, the values will be ``[4, 12]``. Returns: list: the current retraining recommendations """ return self._retraining_recs def _add_sample(self, new_value): """Make a new bucket containing a single new sample and add it to the ``BucketList``. Compress any ``BucketRows`` which have reached maximum size. Args: new_value: new value to be added to the ``BucketList``. """ # this new bucket should have only one member, so 0 variance. # compress_buckets will maintain proper estimates. self._bucket_row_list.head.add_bucket(new_value, 0) # update running estimates if self._window_size > 1: self._curr_variance += ( (self._window_size - 1) * (new_value - self._curr_total / (self._window_size - 1)) * (new_value - self._curr_total / (self._window_size - 1)) / self._window_size ) self._curr_total += new_value self._compress_buckets() def _compress_buckets(self): """Traverse ``ADWIN.bucket_row_list``, merging the oldest buckets when we find that a given ``BucketRow`` is full.""" curr_bucket_row = self._bucket_row_list.head list_position = 0 while curr_bucket_row is not None: # BucketRow is full if curr_bucket_row.bucket_count == self.max_buckets + 1: next_bucket_row = curr_bucket_row.next_bucket if next_bucket_row is None: self._bucket_row_list.append_tail() next_bucket_row = curr_bucket_row.next_bucket # take the first two buckets in this row and combine them the # number of elements stored in a bucket is 2^(position in the # list) n_elements = power(2, list_position) mean1 = curr_bucket_row.bucket_totals[0] / n_elements mean2 = curr_bucket_row.bucket_totals[1] / n_elements new_total = ( curr_bucket_row.bucket_totals[0] + curr_bucket_row.bucket_totals[1] ) # Chan, et. al variance calculation doi.org/10.1007/978-3-642-51461-6_3 new_variance = ( curr_bucket_row.bucket_variances[0] + curr_bucket_row.bucket_variances[1] + n_elements * (mean1 - mean2) * (mean1 - mean2) / 2 # equivalent: n_elements * n_elements * (mean1 - mean2) * \ # (mean1 - mean2) / (n_elements + n_elements) ) next_bucket_row.add_bucket(new_total, new_variance) # remove the compressed buckets from the current row curr_bucket_row.remove_buckets(2) # if the next bucket isn't full, neither can any of the remaining be if next_bucket_row.bucket_count <= self.max_buckets: break else: # similarly, if the first bucket isn't full, neither are any others break curr_bucket_row = curr_bucket_row.next_bucket list_position += 1 def _shrink_window(self): """Check whether all subwindows (the empty set vs. the whole window, then the oldest element vs all others, then the oldest two vs all others, then ...) satisfy the drift threshold. If not, reduce the window size and set the ``drift_state`` to ``"drift"``. """ if ( self.total_samples % self.new_sample_thresh == 0 and self._window_size > self.window_size_thresh ): # either we reduced the window and must restart to check the new # subwindows, or this is the initial loop start_from_empty_subwindow = True while start_from_empty_subwindow: start_from_empty_subwindow = False exit_shrink = False # window0 begins empty, window1 begins with the full set n_elements0, n_elements1 = 0, self._window_size total0, total1 = 0, self._curr_total # traverse the BucketRowList from tail to head curr_bucket_row = self._bucket_row_list.tail list_pos = ( self._bucket_row_list.size - 1 ) # note that list position begins at 0 # window0 adds oldest elements first: tail of the BucketRowList, # front of the BucketRow arrays while (not exit_shrink) and (curr_bucket_row is not None): n_increment = power(2, list_pos) for bucket_index in range(curr_bucket_row.bucket_count): n_elements0 += n_increment n_elements1 -= n_increment total0 += curr_bucket_row.bucket_totals[bucket_index] total1 -= curr_bucket_row.bucket_totals[bucket_index] # reached the youngest element before finding drift; # remember, the buckets have an extra element at the end # to avoid overflow before compression if (list_pos == 0) and ( bucket_index == curr_bucket_row.bucket_count - 1 ): exit_shrink = True break # check whether to drop elements if ( (n_elements0 >= self.subwindow_size_thresh) and (n_elements1 >= self.subwindow_size_thresh) and self._check_epsilon( n_elements0, total0, n_elements1, total1 ) ): start_from_empty_subwindow = True self.drift_state = "drift" if self._window_size > 0: n_elements0 -= self._remove_last() self._retraining_recs = ( self.total_samples - self._window_size, self.total_samples - 1, ) exit_shrink = True break curr_bucket_row = curr_bucket_row.prev_bucket list_pos -= 1 def _check_epsilon(self, n_elements0, total0, n_elements1, total1): """Calculate ``epsilon_cut`` given the size and totals of two windows (equation 3.1 from Bifet 2006). If the difference between the estimated mean of the two windows (defined by ``n_elements*``, ``total*``) is greater than the calculated threshold, it indicates that drift has occurred. Args: n_elements0 (int): number of elements in the first window total0 (float): running total for elements in the first window n_elements1 (int): number of elements in the second window total1 (float): running total for elements in the second window Returns: bool: whether or not the difference between the two windows' means exceeds the current threshold defined by ADWIN's variance estimate and the chosen delta value. """ window_diff = 1.0 * ((total0 / n_elements0) - (total1 / n_elements1)) variance = self.variance() n_elements = self._window_size # note that this is defined as its reciprocal in Bifet n_harmonic = 1 / (n_elements0 - self.subwindow_size_thresh + 1) + 1 / ( n_elements1 - self.subwindow_size_thresh + 1 ) if not self.conservative_bound: # form below is under normality assumption for 'large' window sizes delta_prime_den = log( 2 * log(n_elements) / self.delta ) # noted in Bifet as sufficient in practice vs. delta/n eps_cut = ( sqrt((2 * n_harmonic) * variance * delta_prime_den) + 1.0 * (2 / 3) * n_harmonic * delta_prime_den ) else: delta_prime_den = log(4 * log(n_elements) / self.delta) eps_cut = sqrt( (0.5 * n_harmonic) * delta_prime_den ) # for "totally rigorous performance guarantees" return absolute(window_diff) > eps_cut def _remove_last(self): """Drop the oldest bucket from the tail of ``bucket_row_list``. Returns: int: the number of elements removed after discarding the bucket """ curr_bucket_row = self._bucket_row_list.tail n_curr = power(2, (self._bucket_row_list.size - 1)) self._window_size -= n_curr self._curr_total -= curr_bucket_row.bucket_totals[0] mean_curr = curr_bucket_row.bucket_totals[0] / n_curr self._curr_variance -= curr_bucket_row.bucket_variances[ 0 ] + n_curr * self._window_size * ( mean_curr - self._curr_total / self._window_size ) * ( mean_curr - self._curr_total / self._window_size ) / ( n_curr + self._window_size ) curr_bucket_row.remove_buckets(1) if curr_bucket_row.bucket_count == 0: self._bucket_row_list.remove_tail() return n_curr
[docs] def mean(self): """ Returns: float: the estimated average of the passed stream, using the current window """ if self._window_size == 0: out = 0 else: out = self._curr_total / self._window_size return out
[docs] def variance(self): """ Returns: float: the estimated variance of the passed stream, using the current window """ if self._window_size == 0: out = 0 else: out = self._curr_variance / self._window_size return out
class _BucketRowList: """Doubly-linked list for use by ADWIN. ``max_buckets`` corresponds to the "M" parameter from Bifet 2006. At each update step, if the ``BucketRows`` are at overflow, their oldest buckets will be moved into the next largest ``BucketRow`` by ``ADWIN._compress_buckets``. So, the tail of the ``bucket_row_list`` will be the estimates corresponding to the oldest elements. Note that each ``BucketRow`` only stores estimates related to 2^i elements in each bucket: so, each position in ``BucketRowList.head``'s arrays corresponds to 2^0 = 1 elements; those for the next correspond to 2^1 = 2 elements; etc. """ def __init__(self, max_buckets): super().__init__() self.max_buckets = max_buckets self.head = None self.tail = None self.size = 0 self.append_head() def append_head(self): """Add an empty ``BucketRow`` to the head of the list.""" new_head = _BucketRow(self.max_buckets, next_bucket=self.head) if self.head is not None: self.head.prev_bucket = new_head self.head = new_head if self.tail is None: self.tail = self.head self.size += 1 def append_tail(self): """Add an empty ``BucketRow`` to the tail of the list.""" self.tail = _BucketRow(self.max_buckets, prev_bucket=self.tail) if self.head is None: # somehow we've removed all the other buckets self.head = self.tail self.size += 1 def remove_tail(self): """Remove the last BucketRow from the tail of the BucketRowList.""" self.tail = self.tail.prev_bucket if self.tail is None: self.head = None else: self.tail.next_bucket = None self.size -= 1 class _BucketRow: """Helper class for ADWIN. A given BucketRow is a single node in ADWIN's bucket_row_list doubly linked list. It stores all the buckets of a particular size in reverse order: the oldest bucket is at index 0. """ def __init__(self, max_buckets, prev_bucket=None, next_bucket=None): """ Args: max_buckets (int): maximum allowed buckets in this row prev_bucket (_BucketRow, optional): prior BucketRow in the BucketRowList. Defaults to None. next_bucket (_BucketRow, optional): next BucketRow in the BucketRowList. Defaults to None. Attributes: bucket_count (int): the number of buckets currently in this row bucket_totals (numpy.array): estimated total for each bucket bucket_variances (numpy.array): estimated variance for each bucket """ super().__init__() self.bucket_count = 0 self.max_buckets = max_buckets self.bucket_totals = zeros(self.max_buckets + 1, dtype=float) self.bucket_variances = zeros(self.max_buckets + 1, dtype=float) # maintain the linked list self.prev_bucket = prev_bucket self.next_bucket = next_bucket if next_bucket is not None: next_bucket.prev_bucket = self if self.prev_bucket is not None: prev_bucket.next_bucket = self def add_bucket(self, total, variance): """Add a new bucket to the end of this BucketRow with total and variance. Args: total: variance: """ self.bucket_totals[self.bucket_count] = total self.bucket_variances[self.bucket_count] = variance self.bucket_count += 1 def remove_buckets(self, num_buckets): """Remove num_buckets (oldest) buckets from the front of the rows's array. Args: num_buckets: the number of buckets to remove from the front of the row """ self.bucket_totals = self.shift(self.bucket_totals, num_buckets) self.bucket_variances = self.shift(self.bucket_variances, num_buckets) self.bucket_count -= num_buckets @staticmethod def shift(arr, num, fill_value=0): """Shift arr toward the 0 index by num indices; the last num indices will be filled with fill_value. Args: arr: the array whose elements should be shifted num: The number of indices to shift the array forward fill_value: The fill value for the now-empty indices, default 0 Returns: """ num = num * -1 result = empty_like(arr) result[num:] = fill_value result[:num] = arr[-num:] return result