from abc import ABC, abstractmethod
from pandas import DataFrame
import numpy as np
import copy
[docs]class StreamingDetector(ABC):
"""
Abstract base class for all streaming data-based detectors.
Minimally implements abstract methods common to all stream
based detection algorithms.
"""
[docs] def __init__(self, *args, **kwargs):
self._total_samples = 0
self._samples_since_reset = 0
self._drift_state = None
self._input_cols = None
self._input_col_dim = None
[docs] @abstractmethod
def update(self, X, y_true, y_pred):
"""
Update detector with new sample (data point).
Args:
X (numpy.ndarray): if applicable, one row of features from input data
y_true (numpy.ndarray): if applicable, one true label from input data
y_pred (numpy.ndarray): if applicable, one predicted label from input data
"""
self.total_samples += 1
self.samples_since_reset += 1
[docs] @abstractmethod
def reset(self, *args, **kwargs):
"""
Initialize the detector's drift state and other relevant attributes.
Intended for use after ``drift_state == "drift"``.
"""
self.samples_since_reset = 0
self.drift_state = None
def _validate_X(self, X):
"""Validate that the input only contains one observation, and that its
dimensions/column names match earlier input. If there is no
earlier input, store the dimension/column names.
Args:
X (array-like or numeric): One row from input features.
Raises:
ValueError: if a dataframe has ever been passed, raised if X's
column names don't match
ValueError: if an array has ever been passed, raised if X's number
of columns don't match
ValueError: raised if X contains more than one observation after coercion
"""
if isinstance(X, DataFrame):
# The first update with a dataframe will constrain subsequent input.
if self._input_cols is None:
self._input_cols = X.columns
self._input_col_dim = len(self._input_cols)
elif self._input_cols is not None:
if not X.columns.equals(self._input_cols):
raise ValueError(
"Columns of new data must match with columns of prior data."
)
ary = X.values
else:
ary = copy.copy(X)
ary = np.array(ary)
if len(ary.shape) <= 1:
# only one sample should be passed, so coerce column vectors (e.g. pd.Series) to rows
ary = ary.reshape(1, -1)
if self._input_col_dim is None:
# This allows starting with a dataframe, then later passing bare
# numpy arrays. For now, assume users are not miscreants.
self._input_col_dim = ary.shape[1]
elif self._input_col_dim is not None:
if ary.shape[1] != self._input_col_dim:
raise ValueError(
"Column-dimension of new data must match prior data."
)
if ary.shape[0] != 1:
raise ValueError(
"Input for streaming detectors should contain only one observation."
)
return ary
def _validate_y(self, y):
"""Validate that input contains only one observation.
Args:
y (numeric): the current value for `y_true` or `y_pred`, given to
`update`.
Raises:
ValueError: raised if more than one observation is passed.
"""
ary = np.array(y).ravel()
if ary.shape != (1,):
raise ValueError(
"Input for streaming detectors should contain only one observation."
)
return ary
def _validate_input(self, X, y_true, y_pred):
"""Helper method for `update`. Validates whether the input is appropriate
for a streaming detector. Errors will be raised if the input is more
than one observation, or if X's dimensions don't match prior input.
Args:
X (numpy.ndarray): input data
y_true (numpy.ndarray): if applicable, one true label from input data
y_pred (numpy.ndarray): if applicable, one predicted label from input data
"""
if X is not None:
X = self._validate_X(X)
if y_true is not None:
y_true = self._validate_y(y_true)
if y_pred is not None:
y_pred = self._validate_y(y_pred)
return X, y_true, y_pred
@property
def total_samples(self):
"""Total number of samples the drift detector has been updated with.
Returns:
int
"""
return self._total_samples
@total_samples.setter
def total_samples(self, value):
self._total_samples = value
@property
def samples_since_reset(self):
"""Number of samples since last drift detection.
Returns:
int
"""
return self._samples_since_reset
@samples_since_reset.setter
def samples_since_reset(self, value):
self._samples_since_reset = value
@property
def drift_state(self):
"""Set detector's drift state to ``"drift"``, ``"warning"``, or ``None``."""
return self._drift_state
@drift_state.setter
def drift_state(self, value):
"""Set detector's drift state to ``"drift"``, ``"warning"``, or ``None``.
Args:
value (str): ``"drift"``, ``"warning"``, or ``None``
Raises:
ValueError: raised if disallowed value is given
"""
if value not in ("drift", "warning", None):
raise ValueError("tbd")
else:
self._drift_state = value
[docs]class BatchDetector(ABC):
"""
Abstract base class for all batch data-based detectors.
Minimally implements abstract methods common to all batch
based detection algorithms.
"""
[docs] def __init__(self, *args, **kwargs):
self._total_batches = 0
self._batches_since_reset = 0
self._drift_state = None
self._input_cols = None
self._input_col_dim = None
[docs] @abstractmethod
def update(self, X, y_true, y_pred):
"""
Update detector with new batch of data
Args:
X (numpy.ndarray): input data
y_true (numpy.ndarray): if applicable, true labels of input data
y_pred (numpy.ndarray): if applicable, predicted labels of input data
"""
self.total_batches += 1
self.batches_since_reset += 1
[docs] @abstractmethod
def set_reference(self, X, y_true, y_pred):
"""
Initialize detector with a reference batch.
Args:
X (pandas.DataFrame or numpy.array): baseline dataset
y_true (numpy.array): actual labels of dataset
y_pred (numpy.array): predicted labels of dataset
"""
raise NotImplementedError
[docs] @abstractmethod
def reset(self, *args, **kwargs):
"""
Initialize the detector's drift state and other relevant attributes.
Intended for use after ``drift_state == 'drift'``.
"""
self.batches_since_reset = 0
self.drift_state = None
def _validate_X(self, X):
"""Validate that the input only contains one observation, and that its
dimensions/column names match earlier input. If there is no
earlier input, store the dimension/column names.
Args:
X (array-like or numeric): Input features.
Raises:
ValueError: if a dataframe has ever been passed, raised if X's
column names don't match
ValueError: if an array has ever been passed, raised if X's number
of columns don't match
ValueError: if only one sample has been passed
"""
if isinstance(X, DataFrame):
# The first update with a dataframe will constrain subsequent input.
if self._input_cols is None:
self._input_cols = X.columns
self._input_col_dim = len(self._input_cols)
elif self._input_cols is not None:
if not X.columns.equals(self._input_cols):
raise ValueError(
"Columns of new data must match with columns of prior data."
)
ary = X.values
else:
ary = copy.copy(X)
ary = np.array(ary)
if len(ary.shape) <= 1:
# Batch size of 1 will break downstream - don't allow it.
# Attempts to coerce a row vector into a column vector.
ary = ary.reshape(-1, 1)
if self._input_col_dim is None:
# This allows starting with a dataframe, then later passing bare
# numpy arrays. For now, assume users are not miscreants.
self._input_col_dim = ary.shape[1]
elif self._input_col_dim is not None:
if ary.shape[1] != self._input_col_dim:
raise ValueError(
"Column-dimension of new data must match prior data."
)
if ary.shape[0] <= 1:
raise ValueError(
"Input for batch detectors should contain more than one observation."
)
return ary
def _validate_y(self, y):
"""Validate that input contains only one column.
Args:
y (numeric): the current value for `y_true` or `y_pred`, given to
`update`.
Raises:
ValueError: if an array has been passed that has more than one column
"""
ary = np.array(y)
if len(ary.shape) <= 1:
ary = ary.reshape(1, -1)
if ary.shape[0] == 1:
raise ValueError(
"Input for batch detectors should contain more than one obsevation."
)
if ary.shape[1] != 1:
raise ValueError("y input for detectors should contain only one column.")
return ary
def _validate_input(self, X, y_true, y_pred):
"""Helper method for `update` and `set_reference`. Validates whether the
input is appropriate for a batch detector. Errors will be raised if X's
dimensions don't match prior input, or a y input has more than one
column.
Args:
X (numpy.ndarray): input data
y_true (numpy.ndarray): if applicable, true labels of input data
y_pred (numpy.ndarray): if applicable, predicted labels of input data
"""
if X is not None:
X = self._validate_X(X)
if y_true is not None:
y_true = self._validate_y(y_true)
if y_pred is not None:
y_pred = self._validate_y(y_pred)
return X, y_true, y_pred
@property
def total_batches(self):
"""Total number of batches the drift detector has been updated with.
Returns:
int
"""
return self._total_batches
@total_batches.setter
def total_batches(self, value):
self._total_batches = value
@property
def batches_since_reset(self):
"""Number of batches since last drift detection.
Returns:
int
"""
return self._batches_since_reset
@batches_since_reset.setter
def batches_since_reset(self, value):
self._batches_since_reset = value
@property
def drift_state(self):
"""Set detector's drift state to ``"drift"``, ``"warning"``, or ``None``."""
return self._drift_state
@drift_state.setter
def drift_state(self, value):
"""Set detector's drift state to ``"drift"``, ``"warning"``, or ``None``.
Args:
value (str): ``"drift"``, ``"warning"``, or ``None``
Raises:
ValueError: raised if disallowed value is given
"""
if value not in ("drift", "warning", None):
raise ValueError("tbd")
else:
self._drift_state = value
[docs]class DriftDetector(ABC):
"""
This class is deprecated in 0.2.0+.
Base class for Menelaus drift detectors.
A DriftDetector object implements the ``update`` and ``reset`` methods and
calls the ``super`` methods to initialize and update the attributes below.
Generally, a DriftDetector is instantiated, then repeatedly passed new data
via ``update``. At each ``update`` step, its ``drift_state`` will reflect
whether drift has been detected or almost been detected. After the
detector's state is set to ``"drift"``, ``update`` calls ``reset`` to
re-initialize the relevant attributes.
A "batch" detector will compare a new dataset, passed via ``update``, to a
reference dataset, usually the original reference dataset. A "stream"
detector compares only one new sample at a time, also passed via ``update``.
"""
[docs] def __init__(self, *args, **kwargs):
super().__init__()
self._total_updates = 0
self._updates_since_reset = 0
self._drift_state = None
self._input_type = None
[docs] @abstractmethod
def update(self, X, y_true, y_pred):
"""
Update the detector with a new sample or batch.
Args:
X (numpy.ndarray): input data
y_true (numpy.ndarray): if applicable, true labels of input data
y_pred (numpy.ndarray): if applicable, predicted labels of input data
"""
self.total_updates += 1
self.updates_since_reset += 1
[docs] @abstractmethod
def reset(self, *args, **kwargs):
"""Initialize the detector's drift state and other relevant attributes.
Intended for use after ``drift_state == 'drift'``."""
self.updates_since_reset = (
0 # number of elements the detector has been updated with since last reset
)
self.drift_state = None
@property
def total_updates(self):
"""Number of samples/batches the drift detector has ever been updated
with.
Returns:
int
"""
return self._total_updates
@total_updates.setter
def total_updates(self, value):
self._total_updates = value
@property
def updates_since_reset(self):
"""Number of samples/batches since the last time the drift detector was
reset.
Returns:
int
"""
return self._updates_since_reset
@updates_since_reset.setter
def updates_since_reset(self, value):
self._updates_since_reset = value
@property
def drift_state(self):
"""Detector's current drift state, with values ``"drift"``, ``"warning"``,or
``None``.
"""
return self._drift_state
@drift_state.setter
def drift_state(self, value):
"""Set detector's drift state to ``"drift"``, ``"warning"``, or ``None``.
Args:
value (str): ``"drift"``, ``"warning"``, or ``None``
Raises:
ValueError: raised if disallowed value is given
"""
if value not in ("drift", "warning", None):
raise ValueError(
"""DriftDetector._drift_state must be ``"drift"``,
``"warning"``, or ``None``."""
)
else:
self._drift_state = value
@property
@abstractmethod
def input_type(self):
"""The type of input the detector accepts, either ``"batch"``, with multiple
samples in one call to update(), or ``"stream"``, with one sample per call
to update().
"""
return self._input_type