Source code for menelaus.partitioners.NNSpacePartitioner

import numpy as np
from sklearn.neighbors import NearestNeighbors


[docs]class NNSpacePartitioner: """ This class encodes the Nearest Neighbors Space Partitioning scheme (NNPS) for use in the Nearest Neigbors Density Variation Identification (NN-DVI) drift detection algorithm, both of which are introduced in Liu et al. (2018). Broadly, NNSP combines two input data samples, finds the adjacency matrix using Nearest Neighbor search, and transforms the data points into a set of shared subspaces for estimating density and changes in density between the two samples, via a distance function. Attributes: k (int): the 'k' in k-Nearest-Neighbor (k-NN) search D (numpy.array): combined data from two samples v1 (numpy.array): indices of first sample data within ``D`` v2 (numpy.array): indices of second sample data within ``D`` nnps_matrix (numpy.array): NNSP shared-subspace representation of ``D`` and its adjacency matrix adjacency_matrix (numpy.array): result of k-NN search on ``D`` """
[docs] def __init__(self, k: int): """ Args: k (int): the 'k' in k-NN search, describing size of searched neighborhood """ self.k = k self.D = None self.v1 = None self.v2 = None self.nnps_matrix = None self.adjacency_matrix = None
[docs] def build(self, sample1: np.array, sample2: np.array): """ Builds an NNSP representation matrix given two samples of data. Internally stores computed union set, adjacency matrix, index arrays for the two samples, and NNSP representation matrix. Args: sample1 (numpy.array): first (possibly the 'reference') sample set sample2 (numpy.array): second (possibly the 'test') sample set """ data = np.vstack((sample1, sample2)) D, inverted_indices = np.unique(data, axis=0, return_inverse=True) self.D = D v1, v2 = np.array_split(inverted_indices, 2) v1_onehot = np.zeros(D.shape[0]) v2_onehot = np.zeros(D.shape[0]) # XXX - Alternatively, v1_onehot = np.identity(adjacency_matrix.shape[0])[v1] - Anmol v1_onehot[v1] = 1.0 v2_onehot[v2] = 1.0 self.v1 = v1_onehot self.v2 = v2_onehot nn = NearestNeighbors(n_neighbors=self.k).fit(D) # TODO: maybe we can gain performance by performing operations using the returned # scipy.sparse array, as opposed to converting this way. M_adj = nn.kneighbors_graph(D).toarray() self.adjacency_matrix = M_adj # XXX - NearestNeighbors already adds the self-neighbors # TODO - check about order preservation P_nnps = M_adj weight_array = np.sum(P_nnps, axis=1).astype(int) Q = np.lcm.reduce(weight_array) m = Q / weight_array m = m * np.identity(len(m)) self.nnps_matrix = np.matmul(m, P_nnps)
[docs] @staticmethod def compute_nnps_distance(nnps_matrix, v1, v2): """ Breaks NNSP reprsentation matrix into NNPS matrices for two samples using indices, computes difference in densities of shared subspaces, between samples. Args: nnps_matrix (numpy.array): NNSP representation matrix v1 (numpy.array): indices of first sample in ``D``, ``nnps_matrix``, in one-hot encoding format v2 (numpy.array): indices of second sample in ``D``, ``nnps_matrix``, in one-hot encoding format Returns: float: distance value between samples, representing difference in shared subspace densities """ M_s1 = np.dot(v1, nnps_matrix) M_s2 = np.dot(v2, nnps_matrix) # These commented lines would only be relevant if there were overlap # between the two vectors, which there never should be for our use case. # Otherwise, this is always going to be the number of elements. # membership = np.sum(np.array([v1, v2]), axis=0) # membership = membership >= 1 # in case of overlap # denom = sum(membership) denom = len(v1) d_nnps = np.sum(np.abs(M_s1 - M_s2) / (M_s1 + M_s2)) d_nnps /= denom return d_nnps