Source code for graspologic.inference.group_connection_test

import warnings
from collections import namedtuple
from typing import Union

import numpy as np
import pandas as pd
from beartype import beartype
from scipy.stats import combine_pvalues
from statsmodels.stats.multitest import multipletests

from ..types import AdjacencyMatrix, List
from ..utils import import_graph, is_loopless, is_symmetric, is_unweighted, remove_loops
from .binomial import BinomialTestMethod, binom_2samp
from .utils import compute_density

Labels = Union[np.ndarray, List]

SBMResult = namedtuple(
    "SBMResult", ["probabilities", "observed", "possible", "group_counts"]
)

GroupTestResult = namedtuple("GroupTestResult", ["stat", "pvalue", "misc"])


def fit_sbm(A: AdjacencyMatrix, labels: Labels, loops: bool = False) -> SBMResult:
    """
    Fits a stochastic block model to data for a given network with known group
    identities. Required inputs are the adjacency matrix for the
    network and the group label for each node of the network. The number of labels must
    equal the number of nodes of the network.

    For each possible group-to-group connection, e.g. group 1-to-group 1,
    group 1-to-group 2, etc., this function computes the total number
    of possible edges between the groups, the actual number of edges connecting the
    groups, and the estimated probability of an edge
    connecting each pair of groups (i.e. B_hat). The function also calculates and
    returns the total number of nodes corresponding to each
    provided label.

    Parameters
    ----------
    A: np.array, int shape(n1,n1)
        The adjacency matrix for the network at issue. Entries are either 1
        (edge present) or 0 (edge absent). This is a square matrix with
        side length equal to the number of nodes in the network.

    labels: array-like, int shape(n1,1)
        The group labels corresponding to each node in the network. This is a
        one-dimensional array with a number of entries equal to the
        number of nodes in the network.

    loops: boolean
        This parameter instructs the function to either include or exclude self-loops
        (i.e. connections between a node and itself) when
        fitting the SBM model. This parameter is optional; default is false, meaning
        self-loops will be excluded.

    Returns
    -------
    SBMResult: namedtuple
        This function returns a namedtuple with four key/value pairs:
            ("probabilities",B_hat):
                B_hat: array-like, float shape((number of unique labels)^2,1)
                    This variable stores the computed edge probabilities for all
                    possible group-to-group connections, computed as the ratio
                    of number of actual edges to number of possible edges.
            ("observed",n_observed):
                n_observed: dataframe
                    This variable stores the number of observed edges for each
                    group-to-group connection. Data is indexed as the number
                    of edges between each source group and each target group.
            ("possibe",n_possible):
                n_possible: dataframe
                    This variable stores the total number of possible edges for each
                    group-to-group connection. Indexing is identical to
                    the above. Network density for each group-to-group connection can
                    easily be determined as n_observed/n_possible.
            ("group_counts",counts_labels):
                counts_labels: pd.series
                    This variable stores the number of nodes belonging to each group
                    label.

    """

    if not loops:
        A = remove_loops(A)

    n = A.shape[0]

    node_to_comm_map = dict(zip(np.arange(n), labels))

    # map edges to their incident communities
    source_inds, target_inds = np.nonzero(A)
    comm_mapper = np.vectorize(node_to_comm_map.get)
    source_comm = comm_mapper(source_inds)
    target_comm = comm_mapper(target_inds)

    # get the total number of possible edges for each community -> community cell
    unique_labels, counts_labels = np.unique(labels, return_counts=True)
    K = len(unique_labels)

    n_observed = (
        pd.crosstab(
            source_comm,
            target_comm,
            dropna=False,
            rownames=["source"],
            colnames=["target"],
        )
        .reindex(index=unique_labels, columns=unique_labels)
        .fillna(0.0)
    )

    num_possible = np.outer(counts_labels, counts_labels)

    if not loops:
        # then there would have been n fewer possible edges
        num_possible[np.arange(K), np.arange(K)] = (
            num_possible[np.arange(K), np.arange(K)] - counts_labels
        )

    n_possible = pd.DataFrame(
        data=num_possible, index=unique_labels, columns=unique_labels
    )
    n_possible.index.name = "source"
    n_possible.columns.name = "target"

    B_hat = np.divide(n_observed, n_possible)

    counts_labels = pd.Series(index=unique_labels, data=counts_labels)

    return SBMResult(B_hat, n_observed, n_possible, counts_labels)


def _make_adjacency_dataframe(data: AdjacencyMatrix, index: Labels) -> pd.DataFrame:
    """
    Helper function to convert data with a given index into a dataframe data structure.
    """
    df = pd.DataFrame(data=data, index=index.copy(), columns=index.copy())
    df.index.name = "source"
    df.columns.name = "target"
    return df


[docs] @beartype def group_connection_test( A1: AdjacencyMatrix, A2: AdjacencyMatrix, labels1: Labels, labels2: Labels, density_adjustment: Union[bool, float] = False, method: BinomialTestMethod = "score", combine_method: str = "tippett", correct_method: str = "bonferroni", alpha: float = 0.05, ) -> GroupTestResult: r""" Compares two networks by testing whether edge probabilities between groups are significantly different for the two networks under a stochastic block model assumption. This function requires the group labels in both networks to be known and to have the same categories; although the exact number of nodes belonging to each group does not need to be identical. Note that using group labels inferred from the data may yield an invalid test. This function also permits the user to test whether one network's group connection probabilities are a constant multiple of the other's (see ``density_adjustment`` parameter). Parameters ---------- A1: np.array, shape(n1,n1) The adjacency matrix for network 1. Will be treated as a binary network, regardless of whether it was weighted. A2 np.array, shape(n2,n2) The adjacency matrix for network 2. Will be treated as a binary network, regardless of whether it was weighted. labels1: array-like, shape (n1,) The group labels for each node in network 1. labels2: array-like, shape (n2,) The group labels for each node in network 2. density_adjustment: boolean, optional Whether to perform a density adjustment procedure. If ``True``, will test the null hypothesis that the group-to-group connection probabilities of one network are a constant multiple of those of the other network. Otherwise, no density adjustment will be performed. method: str, optional Specifies the statistical test to be performed to compare each of the group-to-group connection probabilities. By default, this performs the score test (essentially equivalent to chi-squared test when ``density_adjustment=False``), but the user may also enter "chi2" to perform the chi-squared test, or "fisher" for Fisher's exact test. combine_method: str, optional Specifies the method for combining p-values (see Notes and [1]_ for more details). Default is "tippett" for Tippett's method (recommended), but the user can also enter any other method supported by :func:`scipy.stats.combine_pvalues`. correct_method: str, optional Specifies the method for correcting for multiple comparisons. Default value is "holm" to use the Holm-Bonferroni correction method, but many others are possible (see :func:`statsmodels.stats.multitest.multipletests` for more details and options). alpha: float, optional The significance threshold. By default, this is the conventional value of 0.05 but any value on the interval :math:`[0,1]` can be entered. This only affects the results in ``misc['rejections']``. Returns ------- GroupTestResult: namedtuple A tuple containing the following data: stat: float The statistic computed by the method chosen for combining p-values (see ``combine_method``). pvalue: float The p-value for the overall network-to-network comparison using under a stochastic block model assumption. Note that this is the p-value for the comparison of the entire group-to-group connection matrices (i.e., :math:`B_1` and :math:`B_2`). misc: dict A dictionary containing a number of statistics relating to the individual group-to-group connection comparisons. "uncorrected_pvalues", pd.DataFrame The p-values for each group-to-group connection comparison, before correction for multiple comparisons. "stats", pd.DataFrame The test statistics for each of the group-to-group comparisons, depending on ``method``. "probabilities1", pd.DataFrame This contains the B_hat values computed in fit_sbm above for network 1, i.e. the hypothesized group connection density for each group-to-group connection for network 1. "probabilities2", pd.DataFrame Same as above, but for network 2. "observed1", pd.DataFrame The total number of observed group-to-group edge connections for network 1. "observed2", pd.DataFrame Same as above, but for network 2. "possible1", pd.DataFrame The total number of possible edges for each group-to-group pair in network 1. "possible2", pd.DataFrame Same as above, but for network 2. "group_counts1", pd.Series Contains total number of nodes corresponding to each group label for network 1. "group_counts2", pd.Series Same as above, for network 2 "null_ratio", float If the "density adjustment" parameter is set to "true", this variable contains the null hypothesis for the quotient of odds ratios for the group-to-group connection densities for the two networks. In other words, it contains the hypothesized factor by which network 1 is "more dense" or "less dense" than network 2. If "density adjustment" is set to "false", this simply returns a value of 1.0. "n_tests", int This variable contains the number of group-to-group comparisons performed by the function. "rejections", pd.DataFrame Contains a square matrix of boolean variables. The side length of the matrix is equal to the number of distinct group labels. An entry in the matrix is "true" if the null hypothesis, i.e. that the group-to-group connection density corresponding to the row and column of the matrix is equal for both networks (with or without a density adjustment factor), is rejected. In simpler terms, an entry is only "true" if the group-to-group density is statistically different between the two networks for the connection from the group corresponding to the row of the matrix to the group corresponding to the column of the matrix. "corrected_pvalues", pd.DataFrame Contains the p-values for the group-to-group connection densities after correction using the chosen correction_method. Notes ----- Under a stochastic block model assumption, the probability of observing an edge from any node in group :math:`i` to any node in group :math:`j` is given by :math:`B_{ij}`, where :math:`B` is a :math:`K \times K` matrix of connection probabilities if there are :math:`K` groups. This test assumes that both networks came from a stochastic block model with the same number of groups, and a fixed assignment of nodes to groups. The null hypothesis is that the group-to-group connection probabilities are the same .. math:: H_0: B_1 = B_2 The alternative hypothesis is that they are not the same .. math:: H_A: B_1 \neq B_2 Note that this alternative includes the case where even just one of these group-to-group connection probabilities are different between the two networks. The test is conducted by first comparing each group-to-group connection via its own test, i.e., .. math:: H_0: {B_{1}}_{ij} = {B_{2}}_{ij} .. math:: H_A: {B_{1}}_{ij} \neq {B_{2}}_{ij} The p-values for each of these individual comparisons are stored in ``misc['uncorrected_pvalues']``, and after multiple comparisons correction, in ``misc['corrected_pvalues']``. The test statistic and p-value returned by this test are for the overall comparison of the entire group-to-group connection matrices. These are computed by appropriately combining the p-values for each of the individual comparisons. For more details, see [1]_. When ``density_adjustment`` is set to ``True``, the null hypothesis is adjusted to account for the fact that the group-to-group connection densities may be different only up to a multiplicative factor which sets the densities of the two networks the same in expectation. In other words, the null and alternative hypotheses are adjusted to be .. math:: H_0: B_1 = c B_2 .. math:: H_A: B_1 \neq c B_2 where :math:`c` is a constant which sets the densities of the two networks the same. Note that in cases where one of the networks has no edges in a particular group-to-group connection, it is nonsensical to run a statistical test for that particular connection. In these cases, the p-values for that individual comparison are set to ``np.nan``, and that test is not included in the overall test statistic or multiple comparison correction. This test makes several assumptions about the data and test (which could easily be loosened in future versions): We assume that the networks are directed. If the networks are undirected (and the adjacency matrices are thus symmetric), then edges would be counted twice, which would lead to an incorrect calculation of the edge probability. We believe passing in the upper or lower triangle of the adjacency matrix would solve this, but this has not been tested. We assume that the networks are loopless, that is we do not consider the probability of an edge existing between a node and itself. This can be weakened and made an option in future versions. We only implement the alternative hypothesis of "not equals" (two-sided); future versions could implement the one-sided alternative hypotheses. References ---------- .. [1] Pedigo, B.D., Powell, M., Bridgeford, E.W., Winding, M., Priebe, C.E., Vogelstein, J.T. "Generative network modeling reveals quantitative definitions of bilateral symmetry exhibited by a whole insect brain connectome," eLife (2023): e83739. """ A1 = import_graph(A1) A2 = import_graph(A2) if is_symmetric(A1) or is_symmetric(A2): msg = ( "This test assumes that the networks are directed, " "but one or both adjacency matrices are symmetric." ) warnings.warn(msg) if (not is_unweighted(A1)) or (not is_unweighted(A2)): msg = ( "This test assumes that the networks are unweighted, " "but one or both adjacency matrices are weighted." "Test will be run on the binarized version of these adjacency matrices." ) warnings.warn(msg) if (not is_loopless(A1)) or (not is_loopless(A2)): msg = ( "This test assumes that the networks are loopless, " "but one or both adjacency matrices have self-loops." "Test will be run on the loopless version of these adjacency matrices." ) warnings.warn(msg) B1, n_observed1, n_possible1, group_counts1 = fit_sbm(A1, labels1) B2, n_observed2, n_possible2, group_counts2 = fit_sbm(A2, labels2) if not n_observed1.index.equals(n_observed2.index): raise ValueError elif not n_observed1.columns.equals(n_observed2.columns): raise ValueError elif not n_possible1.index.equals(n_possible2.index): raise ValueError elif not n_observed1.columns.equals(n_observed2.columns): raise ValueError index = n_observed1.index.copy() if n_observed1.shape[0] != n_observed2.shape[0]: raise ValueError K = n_observed1.shape[0] uncorrected_pvalues_temp = np.empty( (K, K), dtype=float ) # had to make a new variable to keep mypy happy uncorrected_pvalues = _make_adjacency_dataframe(uncorrected_pvalues_temp, index) stats_temp = np.empty((K, K), dtype=float) stats = _make_adjacency_dataframe(stats_temp, index) if density_adjustment != False: # cause could be float if density_adjustment == True: density1 = compute_density(A1) density2 = compute_density(A2) adjustment_factor = density1 / density2 else: adjustment_factor = density_adjustment else: adjustment_factor = 1.0 for i in index: for j in index: curr_stat, curr_pvalue = binom_2samp( n_observed1.loc[i, j], n_possible1.loc[i, j], n_observed2.loc[i, j], n_possible2.loc[i, j], method=method, null_ratio=adjustment_factor, ) uncorrected_pvalues.loc[i, j] = curr_pvalue stats.loc[i, j] = curr_stat misc = {} misc["uncorrected_pvalues"] = uncorrected_pvalues misc["stats"] = stats misc["probabilities1"] = B1 misc["probabilities2"] = B2 misc["observed1"] = n_observed1 misc["observed2"] = n_observed2 misc["possible1"] = n_possible1 misc["possible2"] = n_possible2 misc["group_counts1"] = group_counts1 misc["group_counts2"] = group_counts2 misc["null_ratio"] = adjustment_factor run_pvalues = uncorrected_pvalues.values indices = np.nonzero(~np.isnan(run_pvalues)) run_pvalues = run_pvalues[indices] n_tests = len(run_pvalues) misc["n_tests"] = n_tests # correct for multiple comparisons rejections_flat, corrected_pvalues_flat, _, _ = multipletests( run_pvalues, alpha, method=correct_method, is_sorted=False, returnsorted=False, ) rejections = np.full((K, K), False, dtype=bool) rejections[indices] = rejections_flat rejections = _make_adjacency_dataframe(rejections, index) misc["rejections"] = rejections corrected_pvalues = np.full((K, K), np.nan, dtype=float) corrected_pvalues[indices] = corrected_pvalues_flat corrected_pvalues = _make_adjacency_dataframe(corrected_pvalues, index) misc["corrected_pvalues"] = corrected_pvalues # combine p-values (on the UNcorrected p-values) if run_pvalues.min() == 0.0: # TODO consider raising a new warning here stat = np.inf pvalue = 0.0 else: stat, pvalue = combine_pvalues(run_pvalues, method=combine_method) return GroupTestResult(stat, pvalue, misc)