Source code for menelaus.partitioners.KDQTreePartitioner

import numpy as np
import pandas as pd
import scipy.stats


[docs]class KDQTreePartitioner: """This class encapsulates a KDQ Tree for partitioning data which runs through drift detection algorithms, as described by Dasu et. al. (2006). The general algorithm to build a tree performs until a stop condition: * Identify axis to split upon. * Identify midpoint at axis. * Split into lower/upper halves using midpoint. * Recursively split sub-trees. This class can be used to build a tree, 'fill' a tree (send new data to be forcibly partitioned according to an existing build), access leaf nodes, and perform visualization / printing. There are also convenience functions to calculate distances between a built and filled tree. Attributes: count_ubound (int): upper bound of count of data points to be partitioned into any one cell cutpoint_proportion_lbound (float): min. proportion of all features' range values that any constructed cell size must satisfy node (KDQTreeNode): reference to root node of build tree leaves (list(KDQTreeNode)): list of leaf nodes of build tree """
[docs] def __init__(self, count_ubound=200, cutpoint_proportion_lbound=0.25): """ Args: count_ubound (int, optional): upper bound of count of data points to be partitioned into any one cell. Default ``200``. cutpoint_proportion_lbound (float, optional): min. proportion of all features' range values that any constructed cell size must satisfy. Default ``0.25``. """ self.count_ubound = count_ubound self.cutpoint_proportion_lbound = cutpoint_proportion_lbound self.node = None self.leaves = []
[docs] def build(self, data): """ Creates a new kdqTree by partitioning the data into square nodes in the feature-space. Args: data (numpy.array): data to be partitioned Returns: KDQTreeNode: root node of constructed KDQ-Tree """ if len(data.shape) <= 1: return None _, num_cols = data.shape min_cutpoint_sizes = [ int(self.cutpoint_proportion_lbound * np.ptp(data[:, axis])) for axis in range(num_cols) ] self.node = KDQTreeNode.build( data, self.count_ubound, min_cutpoint_sizes, self.leaves ) return self.node
[docs] def reset(self, value=0, tree_id="build"): """Sets the counts for the given tree_id to the given value. Args: value (int, optional): value to be written for each node. Default ``0``. tree_id (str, optional): identifier for set of counts to be overwritten. Default ``'build'``. """ KDQTreeNode.reset(self.node, value=value, tree_id=tree_id)
[docs] def fill(self, data, tree_id, reset=False): """For a new sample data, partition it according to the pre-existing tree structure. Its counts will be added to those for the tree_id index. Args: data (numpy.array): new data to be partitioned tree_id (str): identifier for new data counts to be stored reset (bool, optional): if ``True``, counts for this ``tree_id`` will be overwritten with ones calculated from current sample, else added. Default ``False`` Returns: KDQTreeNode: root node of KDQ-Tree """ if self.node is None or len(data.shape) <= 1: return None KDQTreeNode.fill(data, self.node, self.count_ubound, tree_id, reset) return self.node
[docs] def leaf_counts(self, tree_id): """Return the counts for each leaf of the tree at ``tree_id``. Args: tree_id (str): identifier of tree for which to return counts Returns: list: list of counts at leaves of KDQ-Tree """ ret = None if self.leaves: ret = [ leaf.num_samples_in_compared_subtrees[tree_id] for leaf in self.leaves ] return ret
[docs] def kl_distance(self, tree_id1, tree_id2): """For the empirical distributions defined by the counts at tree_id1 and tree_id2, calculate the Kullback-Leibler divergence from tree_id2 to tree_id1. Args: tree_id1 (str): identifier for reference tree tree_id2 (str): identifier for comparison tree Returns: float: Kullback-Leibler divergence among trees """ if self.leaves == []: return None counts1 = self.leaf_counts(tree_id1) counts2 = self.leaf_counts(tree_id2) hist1 = KDQTreePartitioner._distn_from_counts(counts1) hist2 = KDQTreePartitioner._distn_from_counts(counts2) distance = scipy.stats.entropy(hist1, hist2) return distance
@staticmethod def _distn_from_counts(counts): """Calculate an empirical distribution across the alphabet defined by the bins (here, indices in the counts array), using Dasu's correction. Args: counts (numpy.array): array of counts Returns: numpy.array: array of frequencies """ total = np.sum(counts) hist = np.array(counts) + 0.5 hist = hist / (total + len(hist) / 2) return hist
[docs] def to_plotly_dataframe( self, tree_id1="build", tree_id2=None, max_depth=None, input_cols=None ): """Generates a dataframe containing information about the kdqTree's structure and some node characteristics, intended for use with plotly. DataFrame columns capture: * ``name``, a label corresponding to which feature this split is on. * ``idx``, a unique ID for the node, to pass to ``plotly.express.treemap``'s ``id`` argument. * ``parent_idx``, the ID of the node's parent. * ``cell_count``, how many samples are in this node in the reference tree. * ``depth``, how deep the node is in the tree. * ``count_diff``, if ``tree_id2`` is specified, the change in counts from the reference tree. * ``kss``, the Kulldorff Spatial Scan Statistic for this node, defined as the KL-divergence for this node between the reference and test trees, using the individual node and all other nodes combined as the bins for the distributions. Args: tree_id1 (str): identifier for reference tree. Default ``'build'`` tree_id2 (str): identifier for test tree. Default ``None``. max_depth (int, optional): tree depth up to which the method recurses. Default ``None``. input_cols (list, optional): list of column names for the input data. Default ``None``. Returns: pandas.DataFrame: where each row represents a node """ arr = [] KDQTreeNode.as_flattened_array( self.node, tree_id1=tree_id1, tree_id2=tree_id2, output=arr, input_cols=input_cols, ) df = pd.DataFrame.from_dict(arr) if max_depth: df = df[df.depth <= max_depth] if tree_id2 is not None: # could be more efficient by avoiding the use of pd.apply? kss_counts = pd.DataFrame( { "node_count_ref": df["cell_count"], "node_count_test": df["count_diff"] + df["cell_count"], } ) test_max = kss_counts["node_count_test"].max() ref_max = kss_counts["node_count_ref"].max() df["kss"] = kss_counts.apply( KDQTreePartitioner._calculate_kss, args=(ref_max, test_max), axis=1 ) return df
@staticmethod def _calculate_kss(df, ref_max, test_max): """For the given node's test counts and reference counts, the Kulldorff Spatial Scan Statistic can be calculated by treating that node and its complement (all the other nodes in the tree, described by ref_max and test_max) as two bins in an empirical distribution, and then calculating the Kullback-Leibler divergence from the test to the reference. Args: df (pandas.DataFrame): single-row dataframe containing test and reference counts ref_max (int): total number of observations in reference sample test_max (int): total number of observations in test sample Returns: float: KSS from test data to reference data """ ref_dist = KDQTreePartitioner._distn_from_counts( np.array([df["node_count_ref"], ref_max - df["node_count_ref"]]) ) test_dist = KDQTreePartitioner._distn_from_counts( np.array([df["node_count_test"], test_max - df["node_count_test"]]) ) return scipy.stats.entropy(ref_dist, test_dist)
[docs]class KDQTreeNode: """ This class represents a node in the KDQ Tree data structure described above. Its static methods provide the engine for recursively building, filling, and displaying trees. Build / fill trees are identified by a tree ID. During initialization, a node typically takes ``num_samples_in_compared_subtrees`` (among other values needed for building), which is a ``dict`` containing counts for each tree ID. Each node stores the number of data points contained within itself and its subtrees as a value in this ``dict``. A ``tree_id`` key identifies which build those counts are associated with (e.g., the first call to ``build()`` stores counts in the ``node.num_samples_in_compared_subtrees['build']`` by default). Subsequent calls to ``fill()`` with new data store additional counts for the node, according to the ID the user passes in. Attributes: num_samples_in_compared_subtrees (dict): number of data points, by tree ID, contained within node and its children axis (int): axis at which to build/fill (for recursive construction) midpoint_at_axis (float): midpoint at provided axis left (KDQTreeNode): left child right (KDQTreeNode): right child """
[docs] def __init__( self, num_samples_in_compared_subtrees, axis, midpoint_at_axis, left, right ): """ Args: num_samples_in_compared_subtrees (dict): number of data points, by tree ID, contained within node and its children axis (int): axis at which to build/fill (for recursive construction) midpoint_at_axis (float): midpoint at provided axis left (KDQTreeNode): left child right (KDQTreeNode): right child """ self.num_samples_in_compared_subtrees = num_samples_in_compared_subtrees self.axis = axis self.midpoint_at_axis = midpoint_at_axis self.left = left self.right = right
[docs] @staticmethod def build(data, count_ubound, min_cutpoint_sizes, leaves, depth=0): """Creates a new kdqTree by partitioning the data into square nodes in the feature-space. Args: data (numpy.array): data to be partitioned count_ubound (int): upper bound of count of points to be put into any 1 cell min_cutpoint_sizes (list): minimum sizes of features that a leaf can have leaves (list): list of leaf nodes depth (int, optional): current depth of tree. Default ``0``. Returns: KDQTreeNode: root node of tree """ n, m = data.shape if n == 0 or m == 0: return None axis = depth % m min_value_at_axis = np.min(data[:, axis]) midpoint_at_axis = min_value_at_axis + (np.ptp(data[:, axis]) / 2) new_cell_size = midpoint_at_axis - min_value_at_axis if ( n <= count_ubound or np.unique(data).size <= count_ubound or new_cell_size <= min_cutpoint_sizes[axis] ): leaf = KDQTreeNode({"build": n}, None, None, None, None) leaves.append(leaf) return leaf upper_data = data[data[:, axis] > midpoint_at_axis] lower_data = data[data[:, axis] <= midpoint_at_axis] total_points = upper_data.shape[0] + lower_data.shape[0] node = KDQTreeNode( {"build": total_points}, axis=axis, midpoint_at_axis=midpoint_at_axis, left=KDQTreeNode.build( lower_data, count_ubound, min_cutpoint_sizes, leaves, depth + 1 ), right=KDQTreeNode.build( upper_data, count_ubound, min_cutpoint_sizes, leaves, depth + 1 ), ) return node
[docs] @staticmethod def fill(data, node, count_ubound, tree_id, reset=False): """ For a new sample, partition it according to the pre-existing subtree rooted at node. Its counts will be added to those for the ``tree_id`` index. Args: data (numpy.array): new data to be partitioned into existing tree node (KDQTreeNode): current node count_ubound (int): upper bound of points in any one cell tree_id (str): identifier for new data counts to be stored reset (bool, optional): if ``True``, counts for this ``tree_id`` will be overwritten with ones calculated from current sample, else added. Default ``False``. """ # case: no more nodes if node is None: return # this line shows as dead to coverage statistics, but that should be a bug, since otherwise this function wouldn't terminate n = data.shape[0] axis = node.axis # basically, matches the return Node(n, None) case above if (node is not None) and node.axis is None: # update by ID if tree_id not in node.num_samples_in_compared_subtrees.keys() or reset: node.num_samples_in_compared_subtrees[tree_id] = n else: node.num_samples_in_compared_subtrees[tree_id] += n return # case: continue parsing midpoint_at_axis = node.midpoint_at_axis upper_data = data[data[:, axis] > midpoint_at_axis] lower_data = data[data[:, axis] <= midpoint_at_axis] total_points = upper_data.shape[0] + lower_data.shape[0] # update by ID if tree_id not in node.num_samples_in_compared_subtrees.keys() or reset: node.num_samples_in_compared_subtrees[tree_id] = total_points else: node.num_samples_in_compared_subtrees[tree_id] += total_points # recurse KDQTreeNode.fill(upper_data, node.right, count_ubound, tree_id, reset) KDQTreeNode.fill(lower_data, node.left, count_ubound, tree_id, reset)
[docs] @staticmethod def reset(node, value, tree_id): """For the subtree rooted at ``node``, set the counts for ``tree_id`` equal to ``value``. Args: node (KDQTreeNode): root of subtree value (int): value to set counts to tree_id (str): identifier for tree """ if node: node.num_samples_in_compared_subtrees[tree_id] = value KDQTreeNode.reset(node.left, value, tree_id) KDQTreeNode.reset(node.right, value, tree_id)
[docs] @staticmethod def as_text(node, tree_id="build"): """Produce a text representation of the tree structure and its counts. Args: node (KDQTreeNode): root node of desired subtree tree_id (str, optional): identifier for desired subtree. Default ``build``. """ # TODO - to avoid a recursive printing problem, this prints rather than storing an ongoing output string if node: print(f"\nsubtree count: {node.num_samples_in_compared_subtrees[tree_id]}") if node.left: print(f"\tleft: {node.left.num_samples_in_compared_subtrees[tree_id]}") if node.right: print( f"\tright: {node.right.num_samples_in_compared_subtrees[tree_id]}" ) KDQTreeNode.as_text(node.left, tree_id) KDQTreeNode.as_text(node.right, tree_id)
[docs] @staticmethod def as_flattened_array( node, tree_id1, tree_id2, output=[], name="kdqTree", parent_idx=None, depth=0, input_cols=None, ): """Generates a list containing dicts with information about each node's structure for the tree rooted at node. Args: node (KDQTreeNode): root node of desired subtree tree_id1 (str): identifier for reference tree tree_id2 (str): identifier for test tree output (list, optional): list of dictionaries containing information about each node. Default ``[]``. name (str): name of root node. Default ``'kdqTree'``. parent_idx (int, optional): unique ID for parent of current node. Default ``None``. depth (int, optional): depth of current subtree. Default ``0``. input_cols (list, optional): list of column names for the input data. Default ``None``. """ if node and (tree_id1 in node.num_samples_in_compared_subtrees.keys()): # basic plotting features current_data = { "name": name, "idx": id(node), "parent_idx": parent_idx, "cell_count": node.num_samples_in_compared_subtrees[tree_id1], "depth": depth, } # advanced plotting features if tree_id2: if tree_id2 in node.num_samples_in_compared_subtrees.keys(): count_diff = ( node.num_samples_in_compared_subtrees[tree_id2] - node.num_samples_in_compared_subtrees[tree_id1] ) else: count_diff = 0 - node.num_samples_in_compared_subtrees[tree_id1] current_data["count_diff"] = count_diff # append and recurse output.append(current_data) if node.left is not None: if input_cols is not None: axis_name = ( f"{input_cols[node.axis]} <= {round(node.midpoint_at_axis, 3)}" ) else: axis_name = f"ax {node.axis} <= {round(node.midpoint_at_axis, 3)}" KDQTreeNode.as_flattened_array( node.left, tree_id1, tree_id2, output, axis_name, id(node), depth + 1, input_cols, ) if node.right is not None: if input_cols is not None: axis_name = ( f"{input_cols[node.axis]} > {round(node.midpoint_at_axis, 3)}" ) else: axis_name = f"ax {node.axis} > {round(node.midpoint_at_axis, 3)}" KDQTreeNode.as_flattened_array( node.right, tree_id1, tree_id2, output, axis_name, id(node), depth + 1, input_cols, )