Source code for menelaus.ensemble.election

from abc import ABC, abstractmethod


#######################
# Ensemble Evaluators
#######################


[docs]class Election(ABC): """ Abstract base class for implementations of election schemes used to evaluate drift state of ensembles, by operating on the drift states of constituent detectors. Constructors for sub-classes may differ, but all ``Election`` classes are callable classes, where the call takes only the list of detectors to evaluate. The surrounding ``Ensemble`` class will update its drift state within its ``update`` function, by calling the ``Election`` it is given at initialization-time, upon its detectors. """ @abstractmethod def __call__(self, detectors: list): # pragma: no cover raise NotImplemented
[docs]class SimpleMajorityElection(Election): """ ``Election`` that determines drift for an ensemble, based on whether a simple majority of the ensemble's detectors have voted for drift. Ref :cite:t:`duelectionreference` """ def __call__(self, detectors: list): """ Args: detectors (list): detector objects to examine Returns str: ``"drift"`` if drift is determined, or ``None`` """ simple_majority_threshold = len(detectors) // 2 alarms = [d for d in detectors if d.drift_state == "drift"] num_drift = len(alarms) if num_drift > simple_majority_threshold: return "drift" else: return None
[docs]class MinimumApprovalElection(Election): """ ``Election`` that determines drift based on whether a minimum number of provided detectors have alarmed. This threshold can be 1 to the maximum number of detectors. """
[docs] def __init__(self, approvals_needed: int = 1): """ Args: approvals_needed (int): minimum approvals to alarm """ self.approvals_needed = approvals_needed
def __call__(self, detectors: list): """ Args: detectors (list): detector objects to examine Returns: str: ``"drift_state"`` if drift is determined, or ``None`` """ num_approvals = 0 for d in detectors: if d.drift_state == "drift": num_approvals += 1 if num_approvals >= self.approvals_needed: return "drift" return None
[docs]class OrderedApprovalElection(Election): """ ``Election`` that determines drift based on whether: 1) An initial ``a`` count of detectors alarmed for drift. 2) A subsequent ``c`` count of detectors confirmed drift. Hypothethically, the distinction between this and ``MinimumApprovalElection(a+c)``, is if the detectors were added to a collection in a meaningful order. As such this voting scheme iterates over detectors in preserved order of insertion into the user-defined list, and uses the first ``approvals_needed`` amount for initial detection, and the next ``confirmations_needed`` amount for confirmation of drift. """
[docs] def __init__(self, approvals_needed: int = 1, confirmations_needed: int = 1): """ Args: approvals_needed (int): Minimum number of detectors that must alarm for the ensemble to alarm. confirmations_needed (int): Minimum number of confirmations needed to alarm, after ``approvals_needed`` alarms have been observed. """ self.approvals_needed = approvals_needed self.confirmations_needed = confirmations_needed
def __call__(self, detectors: list): """ Args: detectors (list): detector objects to examine Returns: str: ``"drift_state"`` if drift is determined, or ``None`` """ num_approvals = 0 num_confirmations = 0 for d in detectors: if d.drift_state == "drift": if num_approvals < self.approvals_needed: num_approvals += 1 else: num_confirmations += 1 if ( num_approvals >= self.approvals_needed and num_confirmations >= self.confirmations_needed ): return "drift" return None
[docs]class ConfirmedElection(Election): """ ``Election`` for handling detectors, typically in streaming setting. In this scheme, when a single detector alarms, the ``Election`` will wait for a certain number of samples, until one or more other detectors also alarm, confirming the drift. Derived from the ensemble scheme described in :cite:t:`macielelectionreference`. """
[docs] def __init__(self, sensitivity: int, wait_time: int): """ Args: sensitivity (int): how many combined waiting/new drift alarms should result in ensemble alarm wait_time (int): after how many steps of waiting, should each detector reset its time spent waiting post-drift-alarm """ self.sensitivity = sensitivity self.wait_time = wait_time self.wait_period_counters = None
def __call__(self, detectors: list): """ Args: detectors (list): detector objects to examine Returns: str: ``"drift_state"`` if drift is determined, or ``None`` """ if self.wait_period_counters is None: self.wait_period_counters = [0] * len(detectors) num_drift = 0 num_warning = 0 states = [d.drift_state for d in detectors] for i, state in enumerate(states): if state == "drift" and self.wait_period_counters[i] == 0: num_drift += 1 self.wait_period_counters[i] += 1 elif state == "warning": num_warning += 1 elif self.wait_period_counters[i] != 0: num_drift += 1 self.wait_period_counters[i] += 1 if num_drift >= self.sensitivity: ret = "drift" elif num_warning + num_drift >= self.sensitivity: ret = "warning" else: ret = None for i, count in enumerate(self.wait_period_counters): if count > self.wait_time: self.wait_period_counters[i] = 0 return ret