Source code for menelaus.data_drift.pca_cd

import numpy as np
import pandas as pd
from scipy.spatial.distance import jensenshannon
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import KernelDensity
from menelaus.detector import StreamingDetector
from menelaus.change_detection.page_hinkley import PageHinkley


[docs]class PCACD(StreamingDetector): """Principal Component Analysis Change Detection (PCA-CD) is a drift detection algorithm which checks for change in the distribution of the given data using one of several divergence metrics calculated on the data's principal components. First, principal components are built from the reference window - the initial ``window_size`` samples. New samples from the test window, of the same width, are projected onto these principal components. The divergence metric is calculated on these scores for the reference and test windows; if this metric diverges enough, then we consider drift to have occurred. This threshold is determined dynamically through the use of the Page-Hinkley test. Once drift is detected, the reference window is replaced with the current test window, and the test window is initialized. Ref. :cite:t:`qahtan2015pca` Attributes: step (int): how frequently (by number of samples), to detect drift. This is either 100 samples or ``sample_period * window_size``, whichever is smaller. ph_threshold (float): threshold parameter for the internal Page-Hinkley detector. Takes the value of ``.01 * window_size``. num_pcs (int): the number of principal components being used to meet the specified ``ev_threshold`` parameter. """ input_type = "streaming"
[docs] def __init__( self, window_size, ev_threshold=0.99, delta=0.1, divergence_metric="kl", sample_period=0.05, online_scaling=True, ): """ Args: window_size (int): size of the reference window. Note that ``PCA_CD`` will only try to detect drift periodically, either every 100 observations or 5% of the ``window_size``, whichever is smaller. ev_threshold (float, optional): Threshold for percent explained variance required when selecting number of principal components. Defaults to 0.99. delta (float, optional): Parameter for Page Hinkley test. Minimum amplitude of change in data needed to sound alarm. Defaults to 0.1. divergence_metric (str, optional): divergence metric when comparing the two distributions when detecting drift. Defaults to "kl". * "kl" - Jensen-Shannon distance, a symmetric bounded form of Kullback-Leibler divergence, uses kernel density estimation with Epanechnikov kernel * "intersection" - intersection area under the curves for the estimated density functions, uses histograms to estimate densities of windows. A discontinuous, less accurate estimate that should only be used when efficiency is of concern. sample_period (float, optional): how often to check for drift. This is 100 samples or ``sample_period * window_size``, whichever is smaller. Default .05, or 5% of the window size. online_scaling (bool, optional): whether to standardize the data as it comes in, using the reference window, before applying PCA. Defaults to ``True``. """ super().__init__() self.window_size = window_size self.ev_threshold = ev_threshold self.divergence_metric = divergence_metric self.sample_period = sample_period # Initialize parameters self.step = min(100, round(self.sample_period * window_size)) self.ph_threshold = round(0.01 * window_size) self.bins = int(np.floor(np.sqrt(self.window_size))) self.delta = delta self._drift_detection_monitor = PageHinkley( delta=self.delta, threshold=self.ph_threshold, burn_in=0 ) self.num_pcs = None self.online_scaling = online_scaling if self.online_scaling is True: self._reference_scaler = StandardScaler() self._build_reference_and_test = True self._reference_window = pd.DataFrame() self._test_window = pd.DataFrame() self._pca = None self._reference_pca_projection = pd.DataFrame() self._test_pca_projection = pd.DataFrame() self._density_reference = {} self._change_score = [0]
[docs] def update(self, X, y_true=None, y_pred=None): """Update the detector with a new observation. Args: X (numpy.ndarray): next observation y_true (numpy.ndarray): true label of observation - not used in PCACD y_pred (numpy.ndarray): predicted label of observation - not used in PCACD """ X, _, _ = super()._validate_input(X, None, None) super().update(X, None, None) if self._build_reference_and_test: if self.drift_state is not None: self._reference_window = self._test_window.copy() if self.online_scaling is True: # we'll need to refit the scaler. this occurs when both # reference and test windows are full, so, inverse_transform # first, here self._reference_window = pd.DataFrame( self._reference_scaler.inverse_transform(self._reference_window) ) self._test_window = pd.DataFrame() self.reset() self._drift_detection_monitor.reset() elif len(self._reference_window) < self.window_size: self._reference_window = pd.concat( [self._reference_window, pd.DataFrame(X)] ) elif len(self._test_window) < self.window_size: self._test_window = pd.concat([self._test_window, pd.DataFrame(X)]) if len(self._test_window) == self.window_size: self._build_reference_and_test = False # Fit Reference window onto PCs if self.online_scaling is True: self._reference_window = pd.DataFrame( self._reference_scaler.fit_transform(self._reference_window) ) self._test_window = pd.DataFrame( self._reference_scaler.transform(self._test_window) ) # Compute principal components self._pca = PCA(self.ev_threshold) self._pca.fit(self._reference_window) self.num_pcs = len(self._pca.components_) # Project reference window onto PCs self._reference_pca_projection = pd.DataFrame( self._pca.transform(self._reference_window), ) # Project test window onto PCs self._test_pca_projection = pd.DataFrame( self._pca.transform(self._test_window), ) # Compute reference distribution for i in range(self.num_pcs): if self.divergence_metric == "intersection": # Histograms need the same bin edges so find bounds from # both windows to inform range for reference and test self.lower = min( self._reference_pca_projection.iloc[:, i].min(), self._test_pca_projection.iloc[:, i].min(), ) self.upper = max( self._reference_pca_projection.iloc[:, i].max(), self._test_pca_projection.iloc[:, i].max(), ) self._density_reference[f"PC{i + 1}"] = self._build_histograms( self._reference_pca_projection.iloc[:, i], bins=self.bins, bin_range=(self.lower, self.upper), ) else: self._density_reference[f"PC{i + 1}"] = self._build_kde( self._reference_pca_projection.iloc[:, i] ) else: # Add new obs to test window if self.online_scaling is True: next_obs = pd.DataFrame(self._reference_scaler.transform(X)) self._test_window = pd.concat([self._test_window.iloc[1:, :], next_obs]) # Project new observation onto PCs next_proj = pd.DataFrame( self._pca.transform(np.array(next_obs).reshape(1, -1)), ) # Winsorize incoming data to align with reference and test histograms if self.divergence_metric == "intersection": for i in range(self.num_pcs): if next_proj.iloc[0, i] < self.lower: next_proj.iloc[0, i] = self.lower elif next_proj.iloc[0, i] > self.upper: next_proj.iloc[0, i] = self.upper # Add projection to test projection data self._test_pca_projection = pd.concat( [self._test_pca_projection.iloc[1:, :], next_proj] ) # Compute change score if (((self.total_samples - 1) % self.step) == 0) and ( (self.total_samples - 1) != 0 ): # Compute density distribution for test data self._density_test = {} for i in range(self.num_pcs): if self.divergence_metric == "intersection": self._density_test[f"PC{i + 1}"] = self._build_histograms( self._test_pca_projection.iloc[:, i], bins=self.bins, bin_range=(self.lower, self.upper), ) elif self.divergence_metric == "kl": self._density_test[f"PC{i + 1}"] = self._build_kde( self._test_pca_projection.iloc[:, i] ) # Compute current score change_scores = [] if self.divergence_metric == "kl": for i in range(self.num_pcs): change_scores.append( self._jensen_shannon_distance( self._density_reference[f"PC{i + 1}"], self._density_test[f"PC{i + 1}"], ) ) elif self.divergence_metric == "intersection": for i in range(self.num_pcs): change_scores.append( self._intersection_divergence( self._density_reference[f"PC{i + 1}"], self._density_test[f"PC{i + 1}"], ) ) change_score = max(change_scores) self._change_score.append(change_score) self._drift_detection_monitor.update(X=change_score) if self._drift_detection_monitor.drift_state is not None: self._build_reference_and_test = True self.drift_state = "drift"
[docs] def reset(self): """Initialize the detector's drift state and other relevant attributes. Intended for use after ``drift_state == 'drift'``. """ super().reset()
@classmethod def _build_kde(cls, sample): """Compute the Kernel Density Estimate for a given 1D data stream Args: sample: 1D data for which we desire to estimate its density function Returns: Dict with density estimates for each value and KDE object """ sample_length = len(sample) bandwidth = 1.06 * np.std(sample, ddof=1) * (sample_length ** (-1 / 5)) kde_object = KernelDensity(bandwidth=bandwidth, kernel="epanechnikov").fit( sample.values.reshape(-1, 1) ) # score_samples gives log-likelihood for each point, true density values # should be > 0 so exponentiate density = np.exp(kde_object.score_samples(sample.values.reshape(-1, 1))) return {"density": density, "object": kde_object} @staticmethod def _build_histograms(sample, bins, bin_range): """ Compute the histogram density estimates for a given 1D data stream. Density estimates consist of the value of the pdf in each bin, normalized s.t. integral over the entire range is 1 Args: sample: 1D array in which we desire to estimate its density function bins: number of bins for estimating histograms. Equal to sqrt of cardinality of ref window bin_range: (float, float) lower and upper bound of histogram bins Returns: Dict of bin edges and corresponding density values (normalized s.t. they sum to 1) """ density = np.histogram(sample, bins=bins, range=bin_range, density=True) return { "bin_edges": list(density[1]), "density": list(density[0] / np.sum(density[0])), } @classmethod def _jensen_shannon_distance(cls, density_reference, density_test): """Computes Jensen Shannon between two distributions Args: density_reference (dict): dictionary of density values and object from ref distribution density_test (dict): dictionary of density values and object from test distribution Returns: Change Score """ js = jensenshannon(density_reference["density"], density_test["density"]) return js @staticmethod def _intersection_divergence(density_reference, density_test): """ Computes Intersection Area similarity between two distributions using histogram density estimation method. A value of 0 means the distributions are identical, a value of 1 means they are completely different Args: density_reference (dict): dictionary of density values from reference distribution density_test (dict): dictionary of density values from test distribution Returns: Change score """ intersection = np.sum( np.minimum(density_reference["density"], density_test["density"]) ) divergence = 1 - intersection return divergence