Source code for graspologic.cluster.divisive_cluster

# Copyright (c) Microsoft Corporation and contributors.
# Licensed under the MIT License.

from typing import Any, Optional, cast

import numpy as np
from anytree import LevelOrderIter, NodeMixin
from sklearn.base import BaseEstimator
from sklearn.mixture import GaussianMixture
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted
from typing_extensions import Literal

from graspologic.types import Dict, Tuple

from .autogmm import AutoGMMCluster
from .kclust import KMeansCluster


def _check_common_inputs(
    min_components: int, max_components: int, cluster_kws: Dict[str, Any]
) -> None:
    if not isinstance(min_components, int):
        raise TypeError("min_components must be an int")
    elif min_components < 1:
        raise ValueError("min_components must be > 0")

    if not isinstance(max_components, int):
        raise TypeError("max_components must be an int")
    elif max_components < 1:
        raise ValueError("max_components must be > 0")
    elif max_components < min_components:
        raise ValueError("max_components must be >= min_components")

    if not isinstance(cluster_kws, dict):
        raise TypeError("cluster_kws must be a dict")


def _check_fcluster(fcluster: bool, level: Optional[int]) -> None:
    if level is not None:
        if not isinstance(level, int):
            raise TypeError("level must be an int")
        elif level < 1:
            raise ValueError("level must be positive")
        elif fcluster is False:
            msg = (
                "level-specific flat clustering is available "
                "only if 'fcluster' is enabled"
            )
            raise ValueError(msg)


class DivisiveCluster(NodeMixin, BaseEstimator):
    """
    Recursively clusters data based on a chosen clustering algorithm.
    This algorithm implements a "divisive" or "top-down" approach.

    Parameters
    ----------
    cluster_method : str {"gmm", "kmeans"}, defaults to "gmm".
        The underlying clustering method to apply. If "gmm", will use
        :class:`~graspologic.cluster.AutoGMMCluster`. If "kmeans", will use
        :class:`~graspologic.cluster.KMeansCluster`.
    min_components : int, defaults to 1.
        The minimum number of mixture components/clusters to consider
        for the first split if "gmm" is selected as ``cluster_method``;
        it is set to 1 for later splits.
        If ``cluster_method`` is "kmeans", it is set to 2 for all splits.
    max_components : int, defaults to 2.
        The maximum number of mixture components/clusters to consider
        at each split.
    min_split : int, defaults to 1.
        The minimum size of a cluster for it to be considered to be split again.
    max_level : int, defaults to 4.
        The maximum number of times to recursively cluster the data.
    delta_criter : float, non-negative, defaults to 0.
        The smallest difference between selection criterion values of a new
        model and the current model that is required to accept the new model.
        Applicable only if ``cluster_method`` is "gmm".
    cluster_kws : dict, defaults to {}
        Keyword arguments (except ``min_components`` and ``max_components``)
        for the chosen clustering method.

    Attributes
    ----------
    model_ : GaussianMixture or KMeans object
        Fitted clustering object based on which ``cluster_method`` was used.

    See Also
    --------
    graspologic.cluster.AutoGMMCluster
    graspologic.cluster.KMeansCluster
    anytree.node.nodemixin.NodeMixin

    Notes
    -----
    This class inherits from :class:`anytree.node.nodemixin.NodeMixin`, a
    lightweight class for doing various simple operations on trees.

    This algorithm was strongly inspired by maggotcluster, a divisive
    clustering algorithm in https://github.com/neurodata/maggot_models, and
    by the algorithm for estimating a hierarchical stochastic block model
    presented in [2]_.

    References
    ----------
    .. [1] Athey, T. L., & Vogelstein, J. T. (2019). AutoGMM: Automatic
        Gaussian Mixture Modeling in Python. arXiv preprint arXiv:1909.02688.
    .. [2] Lyzinski, V., Tang, M., Athreya, A., Park, Y., & Priebe, C. E.
        (2016). Community detection and classification in hierarchical
        stochastic blockmodels. IEEE Transactions on Network Science and
        Engineering, 4(1), 13-26.
    """

    parent: Optional["DivisiveCluster"]
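
    # A minimal usage sketch (hypothetical data; names and shapes are for
    # illustration only, not part of the class):
    #
    #   >>> import numpy as np
    #   >>> from graspologic.cluster import DivisiveCluster
    #   >>> X = np.random.RandomState(8).normal(size=(100, 3))
    #   >>> dc = DivisiveCluster(cluster_method="gmm", max_components=2)
    #   >>> hier_labels = dc.fit_predict(X)                 # (100, n_levels)
    #   >>> flat_labels = dc.fit_predict(X, fcluster=True)  # dendrogram-indexed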

    def __init__(
        self,
        cluster_method: Literal["gmm", "kmeans"] = "gmm",
        min_components: int = 1,
        max_components: int = 2,
        cluster_kws: Dict[str, Any] = {},
        min_split: int = 1,
        max_level: int = 4,
        delta_criter: float = 0,
    ):
        _check_common_inputs(min_components, max_components, cluster_kws)

        if cluster_method not in ["gmm", "kmeans"]:
            msg = "clustering method must be one of {gmm, kmeans}"
            raise ValueError(msg)

        if delta_criter < 0:
            raise ValueError("delta_criter must be non-negative")

        self.parent = None
        self.min_components = min_components
        self.max_components = max_components
        self.cluster_method = cluster_method
        self.cluster_kws = cluster_kws
        self.min_split = min_split
        self.max_level = max_level
        self.delta_criter = delta_criter

    def fit(self, X: np.ndarray) -> "DivisiveCluster":
        """
        Fits clustering models to the data, as well as to the
        resulting clusters

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)

        Returns
        -------
        self : object
            Returns an instance of self.
        """
        self.fit_predict(X)
        return self

    def fit_predict(
        self, X: np.ndarray, fcluster: bool = False, level: Optional[int] = None
    ) -> np.ndarray:
        """
        Fits clustering models to the data, as well as to the resulting
        clusters, and uses the fitted models to predict a hierarchy of labels

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
        fcluster : bool, default=False
            if True, returned labels will be re-numbered so that each column
            of labels represents a flat clustering at the current level, and
            each label corresponds to a cluster indexed the same as the
            corresponding node in the overall clustering dendrogram
        level : int, optional (default=None)
            the level of a single flat clustering to generate;
            only available if ``fcluster`` is True

        Returns
        -------
        labels : array-like, shape (n_samples, n_levels) if no level
            specified; otherwise, shape (n_samples,)
        """
        X = check_array(X, dtype=[np.float64, np.float32], ensure_min_samples=1)
        _check_fcluster(fcluster, level)

        if self.max_components > X.shape[0]:
            msg = "max_components must be <= n_samples, but max_components = "
            msg += "{}, n_samples = {}".format(self.max_components, X.shape[0])
            raise ValueError(msg)

        labels = self._fit(X)

        # delete the last column if predictions at the last level
        # are all zero vectors
        if (labels.shape[1] > 1) and (np.max(labels[:, -1]) == 0):
            labels = labels[:, :-1]

        if level is not None:
            if level > labels.shape[1]:
                msg = "input exceeds max level = {}".format(labels.shape[1])
                raise ValueError(msg)
        if fcluster:
            labels = self._relabel(labels, level)

        return labels
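
    # Sketch of the returned hierarchy (hypothetical values): each row of
    # ``labels`` is one sample's path down the cluster tree, e.g.
    #
    #   >>> labels
    #   array([[0, 0],
    #          [0, 1],
    #          [1, 0]])
    #
    # where column j holds the within-parent assignment at level j + 1; with
    # ``fcluster=True``, ``_relabel`` re-numbers each column into a flat
    # clustering whose labels are node indices in the overall dendrogram.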

    def _cluster_and_decide(self, X: np.ndarray) -> np.ndarray:
        if self.is_root:
            min_components = self.min_components
        else:
            min_components = 1

        if self.cluster_method == "gmm":
            cluster = AutoGMMCluster(
                min_components=min_components,
                max_components=self.max_components,
                **self.cluster_kws,
            )
            cluster.fit(X)
            model = cluster.model_
            criter = cluster.criter_
            k = cluster.n_components_
            pred = cluster.predict(X)

            if self.delta_criter > 0:
                single_cluster = AutoGMMCluster(
                    min_components=1, max_components=1, **self.cluster_kws
                )
                single_cluster.fit(X)
                criter_single_cluster = single_cluster.criter_

                if k > 1:
                    # check whether the difference between the criterion
                    # of "split" and "not split" is greater than
                    # the threshold, delta_criter
                    if criter_single_cluster - criter < self.delta_criter:
                        pred = np.zeros((len(X), 1), dtype=int)

        elif self.cluster_method == "kmeans":
            cluster = KMeansCluster(
                max_clusters=self.max_components, **self.cluster_kws
            )
            cluster.fit(X)
            model = cluster.model_
            pred = cluster.predict(X)

        self.model_ = model
        return pred

    def _fit(self, X: np.ndarray) -> np.ndarray:
        pred = self._cluster_and_decide(X)
        self.children: Tuple["DivisiveCluster"] = cast(
            Tuple["DivisiveCluster"], tuple()
        )

        uni_labels = np.unique(pred)
        labels = pred.reshape((-1, 1)).copy()
        if len(uni_labels) > 1:
            for ul in uni_labels:
                inds = pred == ul
                new_X = X[inds]
                dc = DivisiveCluster(
                    cluster_method=self.cluster_method,
                    max_components=self.max_components,
                    min_split=self.min_split,
                    max_level=self.max_level,
                    cluster_kws=self.cluster_kws,
                    delta_criter=self.delta_criter,
                )
                dc.parent = self
                if (
                    len(new_X) > self.max_components
                    and len(new_X) >= self.min_split
                    and self.depth + 1 < self.max_level
                ):
                    child_labels = dc._fit(new_X)
                    while labels.shape[1] <= child_labels.shape[1]:
                        labels = np.column_stack(
                            (labels, np.zeros((len(X), 1), dtype=int))
                        )
                    labels[inds, 1 : child_labels.shape[1] + 1] = child_labels
                else:
                    # make a "GaussianMixture" model for clusters
                    # that were not fitted
                    if self.cluster_method == "gmm":
                        cluster_idx = len(dc.parent.children) - 1
                        parent_model = dc.parent.model_
                        model = GaussianMixture()
                        model.weights_ = np.array([1])
                        model.means_ = parent_model.means_[cluster_idx].reshape(1, -1)
                        model.covariance_type = parent_model.covariance_type
                        if model.covariance_type == "tied":
                            model.covariances_ = parent_model.covariances_
                            model.precisions_ = parent_model.precisions_
                            model.precisions_cholesky_ = (
                                parent_model.precisions_cholesky_
                            )
                        else:
                            cov_types = ["spherical", "diag", "full"]
                            n_features = model.means_.shape[-1]
                            cov_shapes = [
                                (1,),
                                (1, n_features),
                                (1, n_features, n_features),
                            ]
                            cov_shape_idx = cov_types.index(model.covariance_type)
                            model.covariances_ = parent_model.covariances_[
                                cluster_idx
                            ].reshape(cov_shapes[cov_shape_idx])
                            model.precisions_ = parent_model.precisions_[
                                cluster_idx
                            ].reshape(cov_shapes[cov_shape_idx])
                            model.precisions_cholesky_ = (
                                parent_model.precisions_cholesky_[cluster_idx].reshape(
                                    cov_shapes[cov_shape_idx]
                                )
                            )
                        dc.model_ = model

        return labels
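
    # The ``delta_criter`` rule in ``_cluster_and_decide``, restated with
    # hypothetical criterion values (lower is better, e.g. BIC): if the best
    # multi-cluster fit scores 980 and the forced single-cluster fit scores
    # 1000, the split is kept only when 1000 - 980 = 20 >= delta_criter;
    # otherwise ``pred`` collapses to a single all-zero cluster and the
    # recursion stops along that branch.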

    def predict(
        self, X: np.ndarray, fcluster: bool = False, level: Optional[int] = None
    ) -> np.ndarray:
        """
        Predicts a hierarchy of labels based on fitted models

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
        fcluster : bool, default=False
            if True, returned labels will be re-numbered so that each column
            of labels represents a flat clustering at the current level, and
            each label corresponds to a cluster indexed the same as the
            corresponding node in the overall clustering dendrogram
        level : int, optional (default=None)
            the level of a single flat clustering to generate;
            only available if ``fcluster`` is True

        Returns
        -------
        labels : array-like, shape (n_samples, n_levels) if no level
            specified; otherwise, shape (n_samples,)
        """
        check_is_fitted(self, ["model_"], all_or_any=all)
        X = check_array(X, dtype=[np.float64, np.float32], ensure_min_samples=1)
        _check_fcluster(fcluster, level)

        labels = self._predict_labels(X)
        if (level is not None) and (level > labels.shape[1]):
            msg = "input exceeds max level = {}".format(labels.shape[1])
            raise ValueError(msg)

        if fcluster:
            # convert labels to stacked flat clusterings
            # based on the flat clusterings on fitted data
            inds = [(labels == row).all(1).any() for row in self._labels]
            labels = self._new_labels[inds]
            if level is not None:
                labels = labels[:, level - 1]

        return labels
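
    # Usage note (a sketch, assuming a fitted estimator): the
    # ``fcluster=True`` branch of ``predict`` reads ``self._labels`` and
    # ``self._new_labels``, which are only populated by ``_relabel``, so the
    # model must first be fit via ``fit_predict`` with ``fcluster=True``:
    #
    #   >>> dc = DivisiveCluster()
    #   >>> _ = dc.fit_predict(X_train, fcluster=True)  # builds flat labels
    #   >>> labels = dc.predict(X_test, fcluster=True)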

    def _predict_labels(self, X: np.ndarray) -> np.ndarray:
        if not self.is_leaf:
            pred_labels = np.zeros((len(X), self.height), dtype=int)
            current_pred_labels = self.model_.predict(X)
            pred_labels[:, 0] = current_pred_labels
            for label in np.unique(current_pred_labels):
                current_child = self.children[label]
                if not current_child.is_leaf:
                    child_pred_labels = current_child._predict_labels(
                        X[current_pred_labels == label]
                    )
                    pred_labels[
                        current_pred_labels == label,
                        1 : child_pred_labels.shape[1] + 1,
                    ] = child_pred_labels
        else:
            # only for cases where root is a leaf cluster, i.e.,
            # only 1 cluster predicted at 1st level
            if self.is_root:
                pred_labels = np.zeros((len(X), 1), dtype=int)

        return pred_labels

    def _relabel(self, labels: np.ndarray, level: Optional[int] = None) -> np.ndarray:
        # re-number "labels" so that each cluster at each level receives
        # a unique label = index of corresponding node in overall dendrogram

        # assign each cluster a new label based on its index in the dendrogram
        all_clusters = [node for node in LevelOrderIter(self)][1:]
        for indx in range(len(all_clusters)):
            all_clusters[indx]._label = indx

        new_labels = labels.copy()
        for lvl in range(1, self.height):
            uni_paths, path_inds = np.unique(
                labels[:, : lvl + 1], axis=0, return_index=True
            )
            for p in range(len(uni_paths)):
                label_inds = (labels[:, : lvl + 1] == uni_paths[p]).all(1)
                current_path = labels[path_inds[p], : lvl + 1]
                cluster = self.root
                current_lvl = 0
                # find the cluster corresponding to "current_path"
                while current_lvl <= lvl:
                    if not cluster.is_leaf:
                        cluster = cluster.children[current_path[current_lvl]]
                        current_lvl += 1
                    else:
                        break
                new_labels[label_inds, lvl] = cluster._label

        # stored to do relabeling for non-fitted data
        self._labels = labels
        self._new_labels = new_labels

        if level is not None:
            new_labels = new_labels[:, level - 1]
        return new_labels
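
    # Sketch of the ``_relabel`` numbering (hypothetical tree): if the root
    # splits into clusters A and B, and A splits into A0 and A1, then
    # ``LevelOrderIter`` yields node indices A=0, B=1, A0=2, A1=3, so a
    # sample in A0 maps from the hierarchical path [0, 0] to the flat,
    # dendrogram-indexed labels [0, 2].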