import copy
import numpy as np
import pandas as pd
import scipy.stats
from menelaus.detector import BatchDetector
from scipy.spatial.distance import jensenshannon
class HistogramDensityMethod(BatchDetector):
    """
    The Histogram Density Method (HDM) is the base class for both HDDDM and
    CDBD. HDDDM differs from CDBD by relying upon the Hellinger distance measure
    while CDBD uses KL divergence.

    This method relies upon three statistics:

        * Distance metric:

            * Hellinger distance (default if called via HDDDM): the square
              root of the sum, over bins, of the squared differences between
              the square roots of the normalized frequency counts of the
              reference and test datasets, averaged across all features.

            * KL divergence (default if called via CDBD): Jensen-Shannon
              distance, a symmetric and bounded measure based upon
              Kullback-Leibler divergence.

            * Optional user-defined distance metric.

        * Epsilon: the absolute difference between the distances computed on
          consecutive sets of reference and test batches.

        * Beta: the adaptive threshold, updated at each time step. It is based
          on the mean of Epsilon plus the scaled standard deviation of Epsilon.
          The scale applied to the standard deviation is determined by the
          ``statistic`` parameter. It is either the number of standard
          deviations deemed significant (``"stdev"``) or the t-statistic
          (``"tstat"``).

    HDM operates by:

        #. Estimating density functions of reference and test data using
           histograms. The number of bins in each histogram equals the floor
           of the square root of the number of samples in the reference
           window. Bins are aligned by computing the minimum and maximum value
           for each feature across both the test and reference windows.

        #. Computing the distance between reference and test distributions. In
           HDDDM, the Hellinger distance is first calculated between each
           feature in the reference and test batches. Then, the final Hellinger
           statistic used is the average of each feature's distance. In CDBD,
           the KL divergence metric is used to calculate the distance between
           univariate histograms.

        #. Computing Epsilon.

        #. Computing the adaptive threshold Beta.

        #. Comparing current Epsilon to Beta. If Epsilon > Beta, drift is
           detected. The new reference batch is now the test batch on which
           drift was detected. All statistics are reset. If Epsilon <= Beta,
           drift is not detected. The reference batch is updated to include this
           most recent test batch. All statistics are maintained.

    Two key modifications were added to Ditzler and Polikar's presentation:

        * For HDDDM, to answer the question of "where is drift occurring?", it
          stores the distance values and Epsilon values for each feature. These
          statistics can be used to identify and visualize the features
          containing the most significant drifts.

            * The Hellinger distance values are calculated for each feature in
              the test batch. These values can be accessed when drift occurs
              using the ``self.feature_info`` dictionary.

            * The Epsilon values for each feature are stored, for each set of
              reference and test batches. For each feature, these values
              represent the difference between the Hellinger distance of the
              test and reference batches at time *t* and the Hellinger
              distance of the test and reference batches at time *t-1*. These
              can be accessed with each update call using the
              ``self.feature_epsilons`` variable. They also can be accessed
              when drift occurs using the ``self.feature_info`` dictionary.

        * The original algorithm cannot detect drift until it is updated with
          the third test batch after either a) initialization or b) reset upon
          drift, because the threshold for drift detection is defined from the
          *difference* Epsilon. To have sufficient values to define this
          threshold, then, three batches are needed. The ``detect_batch``
          parameter can be set such that bootstrapping is used to define this
          threshold earlier than the third test batch.

            * if ``detect_batch == 3``, HDM will operate as described in
              :cite:t:`ditzler2011hellinger`.

            * if ``detect_batch == 2``, HDM will detect drift on the second test
              batch. On the second test batch, HDM uses bootstrapped samples
              from the reference batch to estimate the mean and standard
              deviation of Epsilon; this is used to calculate the necessary
              threshold. On the third test batch, this bootstrapped estimate
              is removed from all subsequent estimates of the mean and
              standard deviation of Epsilon.

            * if ``detect_batch == 1``, HDM will detect drift on the first test
              batch. The initial reference batch is split randomly into two
              halves. The first half will serve as the original reference batch.
              The second half will serve as a proxy for the first test batch,
              allowing us to calculate the distance statistic. When HDM is
              updated with the first actual test batch, HDM will perform the
              method for bootstrapping Epsilon, as described in the above bullet
              for ``detect_batch == 2``. This will allow a Beta threshold to be
              calculated using the first test batch, allowing for detection of
              drift on this batch.

    Ref. :cite:t:`lindstrom2013drift` and :cite:t:`ditzler2011hellinger`

    Attributes:
        epsilon (list): stores Epsilon values since the last drift detection.
        reference_n (int): number of samples in reference batch.
        total_epsilon (int): stores running sum of Epsilon values until drift is
            detected, initialized to 0.
        distances (dict): For each batch seen (key), stores the distance between
            test and reference batch (value). Useful for visualizing drift
            detection statistics.
        epsilon_values (dict): For each batch seen (key), stores the Epsilon
            value between the current and previous test and reference batches
            (value). Useful for visualizing drift detection statistics. Does not
            store the bootstrapped estimate of Epsilon, if used.
        thresholds (dict): For each batch seen (key), stores the Beta thresholds
            between test and reference batch (value). Useful for visualizing
            drift detection statistics.
    """

    input_type = "batch"

    def __init__(
        self,
        divergence,
        detect_batch,
        statistic,
        significance,
        subsets,
    ):
        """
        Args:
            divergence (str or function): divergence measure used to compute
                distance between histograms. Default is "H".

                * "H" - Hellinger distance, original use is for HDDDM

                * "KL" - Kullback-Leibler Divergence, original use is for CDBD

                * User can pass in custom divergence function. Input is two
                  two-dimensional arrays containing univariate histogram
                  estimates of density, one for reference, one for test. It must
                  return the distance value between histograms. To be a valid
                  distance metric, it must satisfy the following properties:
                  non-negativity, identity, symmetry, and triangle inequality,
                  e.g. that in examples/cdbd_example.py or
                  examples/hdddm_example.py.

            detect_batch (int): the test batch on which drift will be detected.
                See class docstrings for more information on this modification.
                Defaults to 1.

                * if ``detect_batch == 1`` - HDM can detect drift on the first
                  test batch passed to the update method

                * if ``detect_batch == 2`` - HDM can detect drift on the second
                  test batch passed to the update method

                * if ``detect_batch == 3`` - HDM can detect drift on the third
                  test batch passed to the update method

            statistic (str): statistical method used to compute adaptive
                threshold. Defaults to ``"tstat"``.

                * ``"tstat"`` - t-statistic with desired significance level and
                  degrees of freedom = ``reference_n + test_n - 2``, as in a
                  hypothesis test on two populations

                * ``"stdev"`` - uses number of standard deviations deemed
                  significant to compute threshold

            significance (float): statistical significance used to identify
                adaptive threshold. Defaults to 0.05.

                * if statistic = ``"tstat"`` - statistical significance of
                  t-statistic, e.g. 0.05 for a 95% confidence level

                * if statistic = ``"stdev"`` - number of standard deviations of
                  change around the mean accepted

            subsets (int): the number of subsets of reference data to take to
                compute initial estimate of Epsilon.

                * if too small - initial Epsilon value will be too small.
                  Increases risk of missing drift

                * if too high - initial Epsilon value will be too large.
                  Increases risk of false alarms.
        """
        super().__init__()

        # Initialize parameters
        self.detect_batch = detect_batch
        self.statistic = statistic
        self.significance = significance
        self.subsets = subsets

        if divergence == "H":
            self.distance_function = self._hellinger_distance
        elif divergence == "KL":
            self.distance_function = self._KL_divergence
        else:
            self.distance_function = divergence

        self._lambda = 0  # batch number on which last drift was detected.

        # For visualizations
        self.distances = {}
        self.epsilon_values = {}
        self.thresholds = {}

    def set_reference(self, X, y_true=None, y_pred=None):
        """
        Initialize detector with a reference batch. After drift, reference batch
        is automatically set to most recent test batch. Option for user to
        specify alternative reference batch using this method.

        Args:
            X (pandas.DataFrame): initial baseline dataset
            y_true (numpy.array): true labels for dataset - not used by HDM
            y_pred (numpy.array): predicted labels for dataset - not used by HDM
        """
        X, _, _ = super()._validate_input(X, None, None)
        X = pd.DataFrame(
            X, columns=self._input_cols
        )  # TODO: subsequent operations expect dataframes, not numpy arrays

        # Initialize attributes
        self.reference = copy.deepcopy(X)
        self.reset()

    def update(self, X, y_true=None, y_pred=None):
        """
        Update the detector with a new test batch. If drift is detected, new
        reference batch becomes most recent test batch. If drift is not
        detected, reference batch is updated to include most recent test batch.

        Args:
            X (DataFrame): next batch of data to detect drift on.
            y_true (numpy.ndarray): true labels of next batch - not used in HDM
            y_pred (numpy.ndarray): predicted labels of next batch - not used in HDM
        """
        if self._drift_state == "drift":
            self.reset()

        X, _, _ = super()._validate_input(X, None, None)
        X = pd.DataFrame(
            X, columns=self._input_cols
        )  # TODO: subsequent operations expect dataframes, not numpy arrays
        super().update(X, None, None)
        test_n = X.shape[0]

        # Estimate reference and test histograms
        mins = []
        maxes = []
        for f in range(self._input_col_dim):
            reference_variable = self.reference.iloc[:, f]
            test_variable = X.iloc[:, f]
            mins.append(np.concatenate((reference_variable, test_variable)).min())
            maxes.append(np.concatenate((reference_variable, test_variable)).max())
        self._reference_density = self._build_histograms(self.reference, mins, maxes)
        test_density = self._build_histograms(X, mins, maxes)

        # Divergence metric
        total_distance = 0
        feature_distances = []
        for f in range(self._input_col_dim):
            f_distance = self.distance_function(
                self._reference_density[f], test_density[f]
            )
            total_distance += f_distance
            feature_distances.append(f_distance)
        self.current_distance = (1 / self._input_col_dim) * total_distance
        self.distances[self.total_batches] = self.current_distance

        # For each feature, calculate Epsilon, difference in distances
        if self.total_batches > 1:
            self.feature_epsilons = [
                a_i - b_i
                for a_i, b_i in zip(feature_distances, self._prev_feature_distances)
            ]

        # Compute Epsilon and Beta
        if self.batches_since_reset >= 2:
            if self.batches_since_reset == 2 and self.detect_batch != 3:
                initial_epsilon = self._estimate_initial_epsilon(
                    self.reference, self.subsets, mins, maxes
                )
                self.epsilon.append(initial_epsilon)

            current_epsilon = abs(self.current_distance - self._prev_distance) * 1.0
            self.epsilon.append(current_epsilon)
            self.epsilon_values[self.total_batches] = current_epsilon

            condition1 = bool(self.batches_since_reset >= 2 and self.detect_batch != 3)
            condition2 = bool(self.batches_since_reset >= 3 and self.detect_batch == 3)
            if condition1 or condition2:
                self.beta = self._adaptive_threshold(self.statistic, test_n)
                self.thresholds[self.total_batches] = self.beta

                # Detect drift
                if current_epsilon > self.beta:
                    # Feature information
                    if self._input_col_dim > 1:
                        self.feature_info = {
                            "Epsilons": self.feature_epsilons,
                            "Feature_Distances": feature_distances,
                            "Significant_drift_in_variable ": self.feature_epsilons.index(
                                max(self.feature_epsilons)
                            ),
                        }

                    self._drift_state = "drift"
                    self.reference = X
                    self._lambda = self.total_batches

        if self._drift_state != "drift":
            self._prev_distance = self.current_distance
            self._prev_feature_distances = feature_distances
            self.reference = pd.concat([self.reference, X])
            self.reference_n = self.reference.shape[0]
            # number of bins for histogram, from reference batch
            self._bins = int(np.floor(np.sqrt(self.reference_n)))

    def reset(self):
        """
        Initialize relevant attributes to original values, to ensure information
        only stored from batches_since_reset (lambda) onwards. Intended for use
        after ``drift_state == 'drift'``.
        """
        super().reset()

        if self.detect_batch == 1:
            # The reference and test data will be (re-)concatenated by the later
            # call to update(), since drift cannot be detected on the first
            # batch, in this case.
            test_proxy = self.reference.iloc[
                int(len(self.reference) / 2) :,
            ]
            self.reference = self.reference.iloc[
                0 : int(len(self.reference) / 2),
            ]

        self.reference_n = self.reference.shape[0]
        self._bins = int(np.floor(np.sqrt(self.reference_n)))
        self.epsilon = []
        self.total_epsilon = 0

        if self.detect_batch == 1:
            self.update(test_proxy)

    def _build_histograms(self, dataset, min_values, max_values):
        """
        Computes histogram for each feature in dataset. Bins are equidistantly
        spaced from minimum value to maximum value to ensure exact alignment of
        bins between test and reference data sets.

        Args:
            dataset (DataFrame): DataFrame on which to estimate density using
                histograms.
            min_values (list): List of the minimum value for each feature.
            max_values (list): List of the maximum value for each feature.

        Returns:
            List of histograms for each feature. Histograms stored as list of
            frequency count of data in each bin.
        """
        histograms = [
            np.histogram(
                dataset.iloc[:, f],
                bins=self._bins,
                range=(min_values[f], max_values[f]),
            )[0]
            for f in range(self._input_col_dim)
        ]
        return histograms

    def _hellinger_distance(self, reference_density, test_density):
        """
        Computes Hellinger distance between reference and test histograms.

        Args:
            reference_density (list): Univariate output of _build_histograms
                from reference batch.
            test_density (list): Univariate output of _build_histograms from
                test batch.

        Returns:
            Hellinger distance between univariate reference and test density
        """
        f_distance = 0
        r_length = sum(reference_density)
        t_length = sum(test_density)
        for b in range(self._bins):
            f_distance += (
                np.sqrt(test_density[b] / t_length)
                - np.sqrt(reference_density[b] / r_length)
            ) ** 2
        return np.sqrt(f_distance)

    def _adaptive_threshold(self, stat, test_n):
        """
        Computes adaptive threshold. If computing threshold for third test
        batch, removes our estimate of initial Epsilon from future estimates of
        epsilon_hat and std.

        Args:
            stat (string): Desired statistical method for computing threshold.
            test_n (integer): Number of samples in test batch.

        Returns:
            Adaptive threshold Beta.
        """
        if self.batches_since_reset == 3 and self.detect_batch != 3:
            self.total_epsilon -= self.epsilon[0]
            self.epsilon = self.epsilon[1:]

        # update scale for denominator (t - lambda - 1), accounting for our initial Epsilon estimate
        if self.batches_since_reset == 2 and self.detect_batch != 3:
            d_scale = 1
        else:
            d_scale = self.total_batches - self._lambda - 1

        # Increment running mean of epsilon from batches_since_reset (lambda) -> t-1
        self.total_epsilon += self.epsilon[-2]  # was -2 before total samples change...
        epsilon_hat = (1 / d_scale) * self.total_epsilon

        # Compute standard deviation for batches_since_reset (lambda) -> t-1
        total_stdev = sum(
            (self.epsilon[i] - epsilon_hat) ** 2 for i in range(len(self.epsilon) - 1)
        )
        stdev = np.sqrt(total_stdev / (d_scale))

        if stat == "tstat":
            t_stat = scipy.stats.t.ppf(
                1 - (self.significance / 2), self.reference_n + test_n - 2
            )
            beta = epsilon_hat + t_stat * (stdev / np.sqrt(d_scale))
        else:
            beta = epsilon_hat + self.significance * stdev

        return beta

    def _estimate_initial_epsilon(
        self, reference, num_subsets, histogram_mins, histogram_maxes
    ):
        """Computes a bootstrapped initial estimate of Epsilon on 2nd test
        batch, allowing HDM to detect drift on the 2nd batch.

        1. Subsets reference data with replacement.
        2. Computes distance between each pair of subsets.
        3. Computes Epsilon: difference in distances.
        4. Averages Epsilon estimates.

        Args:
            reference (DataFrame): DataFrame consisting of reference batch and
                first test batch.
            num_subsets (int): desired number of subsets to be sampled from
                reference data.
            histogram_mins (list): List of minimum values for each feature,
                used to align histogram bins.
            histogram_maxes (list): List of maximum values for each feature,
                used to align histogram bins.

        Returns:
            Bootstrapped estimate of initial Epsilon value.
        """
        # Resampling data
        bootstraps = []
        size = int((1 - (1 / num_subsets)) * self.reference_n)
        for i in range(num_subsets):
            subset = reference.sample(n=size, replace=True)
            bootstraps.append(
                self._build_histograms(subset, histogram_mins, histogram_maxes)
            )

        # Distance between each subset
        distances = []
        for df_indx in range(len(bootstraps)):
            j = df_indx + 1
            while j < len(bootstraps):
                subset1 = bootstraps[df_indx]
                subset2 = bootstraps[j]

                # Divergence metric
                total_distance = 0
                for f in range(self._input_col_dim):
                    f_distance = self.distance_function(subset1[f], subset2[f])
                    total_distance += f_distance
                distances.append(total_distance)

                j += 1

        # Epsilons between each distance
        epsilon = 0
        for delta_indx in range(len(distances)):
            j = delta_indx + 1
            while j < len(distances):
                epsilon += abs(distances[delta_indx] - distances[j]) * 1.0
                j += 1

        epsilon0 = epsilon / num_subsets

        return epsilon0

    def _KL_divergence(self, reference_density, test_density):
        """
        Computes Jensen-Shannon (JS) distance between reference and test
        histograms. The JS distance is the square root of the JS divergence,
        a bounded, symmetric form of KL divergence.

        Args:
            reference_density (list): Univariate output of _build_histograms
                from reference batch.
            test_density (list): Univariate output of _build_histograms from
                test batch.

        Returns:
            JS distance between univariate reference and test density
        """
        return jensenshannon(reference_density, test_density)
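
The histogram alignment and Hellinger distance steps described in the class docstring can be illustrated outside the detector. The following is a minimal standalone sketch using only NumPy; the helper names `aligned_histograms` and `hellinger` are hypothetical and are not part of the class above, and the vectorized formula is assumed to match the per-bin loop in `_hellinger_distance`.

```python
import numpy as np


def aligned_histograms(reference, test, bins=None):
    """Histogram two 1-D samples on a shared, equally spaced set of bins.

    Hypothetical helper for illustration only; not part of the detector.
    """
    reference = np.asarray(reference, dtype=float)
    test = np.asarray(test, dtype=float)
    if bins is None:
        # Same rule as the docstring: floor(sqrt(number of reference samples)).
        bins = int(np.floor(np.sqrt(reference.size)))
    # Shared range so that bin edges line up exactly between the two samples.
    lo = min(reference.min(), test.min())
    hi = max(reference.max(), test.max())
    ref_counts, _ = np.histogram(reference, bins=bins, range=(lo, hi))
    test_counts, _ = np.histogram(test, bins=bins, range=(lo, hi))
    return ref_counts, test_counts


def hellinger(ref_counts, test_counts):
    """Hellinger distance between two histograms given as bin counts."""
    p = ref_counts / ref_counts.sum()  # normalize counts to frequencies
    q = test_counts / test_counts.sum()
    return float(np.sqrt(np.sum((np.sqrt(q) - np.sqrt(p)) ** 2)))


# Synthetic data (assumed for illustration): one similar and one shifted batch.
rng = np.random.default_rng(0)
reference = rng.normal(0.0, 1.0, size=400)
same = rng.normal(0.0, 1.0, size=400)
shifted = rng.normal(3.0, 1.0, size=400)

d_same = hellinger(*aligned_histograms(reference, same))
d_shifted = hellinger(*aligned_histograms(reference, shifted))
print(f"similar batch: {d_same:.3f}, shifted batch: {d_shifted:.3f}")
```

With this unscaled form the distance ranges from 0 for identical histograms to the square root of 2 for histograms with no overlapping bins, so the shifted batch should score noticeably higher than the similar one.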