# Copyright (c) Microsoft Corporation and contributors.
# Licensed under the MIT License.
from typing import Any, Callable, NamedTuple, Optional, Union
import numpy as np
from hyppo.ksample import KSample
from joblib import Parallel, delayed
from scipy import stats
from sklearn.metrics.pairwise import PAIRED_DISTANCES, PAIRWISE_KERNEL_FUNCTIONS
from sklearn.utils import check_array, check_random_state
from typing_extensions import Literal
from graspologic.types import Dict, Tuple
from ..align import SeedlessProcrustes, SignFlips
from ..embed import AdjacencySpectralEmbed, select_dimension
from ..types import AdjacencyMatrix, GraphRepresentation
from ..utils import fit_plug_in_variance_estimator, import_graph
_VALID_DISTANCES = list(PAIRED_DISTANCES.keys())
_VALID_KERNELS = list(PAIRWISE_KERNEL_FUNCTIONS.keys())
_VALID_KERNELS.append("gaussian")  # can also use hyppo's median gaussian kernel
_VALID_METRICS = _VALID_DISTANCES + _VALID_KERNELS
_VALID_TESTS = ["cca", "dcorr", "hhg", "rv", "hsic", "mgc"]
LdtTestType = Literal["cca", "dcorr", "hhg", "rv", "hsic", "mgc"]
class ldt_result(NamedTuple):
stat: float
pvalue: float
misc_dict: Dict[str, Any]
def latent_distribution_test(
A1: GraphRepresentation,
A2: GraphRepresentation,
test: LdtTestType = "dcorr",
metric: Union[str, Callable] = "euclidean",
n_components: Optional[int] = None,
n_bootstraps: int = 500,
random_state: Optional[
Union[int, np.random.RandomState, np.random.Generator]
] = None,
workers: Optional[int] = None,
size_correction: bool = True,
pooled: bool = False,
align_type: Optional[Literal["sign_flips", "seedless_procrustes"]] = "sign_flips",
align_kws: Dict[str, Any] = {},
input_graph: bool = True,
) -> ldt_result:
"""Two-sample hypothesis test for the problem of determining whether two random
dot product graphs have the same distributions of latent positions.
This test can operate on two graphs where there is no known matching
between the vertices of the two graphs, or even when the number of vertices
is different. Currently, testing is only supported for undirected graphs.
Read more in the `Latent Distribution Two-Graph Testing Tutorial
<https://microsoft.github.io/graspologic/tutorials/inference/latent_distribution_test.html>`_
Parameters
----------
A1, A2 : variable (see description of 'input_graph')
The two graphs, or their embeddings, to run a hypothesis test on.
The expected variable type and shape depend on the ``input_graph`` argument.
test : str (default="dcorr")
Backend hypothesis test to use, one of ["cca", "dcorr", "hhg", "rv", "hsic", "mgc"].
These tests are typically used for independence testing, but here they
are used for a two-sample hypothesis test on the latent positions of
two graphs. See :class:`hyppo.ksample.KSample` for more information.
metric : str or function (default="euclidean")
Distance or a kernel metric to use, either a callable or a valid string.
Kernel metrics (e.g. ``"gaussian"``) must be used with the kernel-based
HSIC test and distances (e.g. ``"euclidean"``) with all other tests. If a callable,
then it should behave similarly to either
:func:`sklearn.metrics.pairwise_distances` or to
:func:`sklearn.metrics.pairwise.pairwise_kernels`.
Valid strings for distance ``metric`` are, as defined in
:func:`sklearn.metrics.pairwise_distances`,
- From scikit-learn: [``"euclidean"``, ``"cityblock"``, ``"cosine"``,
``"l1"``, ``"l2"``, ``"manhattan"``].
- From scipy.spatial.distance: [``"braycurtis"``, ``"canberra"``,
``"chebyshev"``, ``"correlation"``, ``"dice"``, ``"hamming"``,
``"jaccard"``, ``"kulsinski"``, ``"mahalanobis"``, ``"minkowski"``,
``"rogerstanimoto"``, ``"russellrao"``, ``"seuclidean"``,
``"sokalmichener"``, ``"sokalsneath"``, ``"sqeuclidean"``,
``"yule"``]. See the documentation for :mod:`scipy.spatial.distance` for
details on these metrics.
Valid strings for kernel ``metric`` are, as defined in
:func:`sklearn.metrics.pairwise.pairwise_kernels`,
[``"additive_chi2"``, ``"chi2"``, ``"linear"``, ``"poly"``,
``"polynomial"``, ``"rbf"``,
``"laplacian"``, ``"sigmoid"``, ``"cosine"``]
Note ``"rbf"`` and ``"gaussian"`` are the same metric, which will use
an adaptively selected bandwidth.
n_components : int or None (default=None)
Number of embedding dimensions. If None, the optimal embedding
dimensions are found by the Zhu and Ghodsi algorithm.
See :func:`~graspologic.embed.select_svd` for more information.
This argument is ignored if ``input_graph`` is False.
n_bootstraps : int (default=500)
Number of bootstrap iterations for the backend hypothesis test.
See :class:`hyppo.ksample.KSample` for more information.
random_state : {None, int, `~np.random.RandomState`, `~np.random.Generator`}
This parameter defines the object to use for drawing random
variates.
If `random_state` is ``None`` the `~np.random.RandomState` singleton is
used.
If `random_state` is an int, a new ``RandomState`` instance is used,
seeded with `random_state`.
If `random_state` is already a ``RandomState`` or ``Generator``
instance, then that object is used.
Default is None.
workers : int or None (default=None)
Number of workers to use. If more than 1, parallelizes the code.
Supply -1 to use all cores available. None is a marker for
'unset' that will be interpreted as ``workers=1`` (sequential execution) unless
the call is performed under a Joblib parallel_backend context manager that sets
another value for ``workers``. See :class:`joblib.Parallel` for more details.
size_correction : bool (default=True)
Ignored when the two graphs have the same number of vertices. The test
degrades in validity as the numbers of vertices of the two graphs
diverge from each other, unless a correction is performed.
- True
Whenever the two graphs have different numbers of vertices,
estimates the plug-in estimator for the variance and uses it to
correct the embedding of the larger graph.
- False
Does not perform any modifications (not recommended).
pooled : bool (default=False)
Ignored whenever the two graphs have the same number of vertices or
``size_correction`` is set to False. In order to correct the adjacency
spectral embedding used in the test, the variance of each latent
position estimate in the larger graph must be estimated, which requires
computing certain sample moments. These moments can be computed either
over the larger graph only (False), or over both graphs (True). Setting
it to True should not affect the behavior of the test under the null
hypothesis, but it is unclear under which alternatives it gains or
loses power. Generally not recommended, as it is untested and included
for experimental purposes.
align_type : str, {'sign_flips' (default), 'seedless_procrustes'} or None
Random dot product graphs have an inherent non-identifiability,
associated with their latent positions. Thus, two embeddings of
different graphs may not be orthogonally aligned. Without this accounted
for, two embeddings of different graphs may appear different, even
if the distributions of the true latent positions are the same.
There are several options in terms of how this can be addressed:
- 'sign_flips'
A simple heuristic that flips the signs of one of the embeddings,
if the medians of the two embeddings in that dimension differ from
each other. See :class:`graspologic.align.SignFlips` for more
information on this procedure. In the limit, this is guaranteed to
lead to a valid test, as long as the matrix :math:`X^T X`, where
:math:`X` is the matrix of latent positions, does not have repeated
non-zero eigenvalues. This may, however, result in an invalid test in
the finite-sample case if some eigenvalues are equal or close.
- 'seedless_procrustes'
An algorithm that learns an orthogonal alignment matrix. This
procedure is slower than sign flips, but is guaranteed to yield a
valid test in the limit, and also makes the test more valid in some
finite-sample cases in which the eigenvalues are very close to each
other. See :class:`graspologic.align.SeedlessProcrustes` for more
information on the procedure.
- None
Do not use any alignment technique. This is strongly discouraged,
as it may often result in a test that is not valid.
align_kws : dict
Keyword arguments for the aligner of choice, either
:class:`graspologic.align.SignFlips` or
:class:`graspologic.align.SeedlessProcrustes`, depending on the ``align_type``.
See respective classes for more information.
input_graph : bool (default=True)
Flag indicating whether to expect two full graphs or their embeddings.
- True
This function expects graphs, either as NetworkX graph objects
or as adjacency matrices, provided as ndarrays of size (n, n) and
(m, m). They will be embedded using adjacency spectral embeddings.
- False
This function expects adjacency spectral embeddings of the graphs.
They must be ndarrays of size (n, d) and (m, d), where d is the same
for both. The ``n_components`` argument is ignored in this case.
Returns
-------
stat : float
The observed difference between the embedded latent positions of the
two input graphs.
pvalue : float
The overall p value from the test.
misc_dict : dictionary
A collection of other statistics obtained from the latent distribution test
- null_distribution : ndarray, shape (n_bootstraps,)
The distribution of test statistics generated under the null hypothesis.
- n_components : int
Number of embedding dimensions.
- Q : array, size (d, d)
Final orthogonal matrix, used to modify ``X``.
References
----------
.. [1] Tang, M., Athreya, A., Sussman, D. L., Lyzinski, V., & Priebe, C. E. (2017).
"A nonparametric two-sample hypothesis testing problem for random graphs."
Bernoulli, 23(3), 1599-1630.
.. [2] Panda, S., Palaniappan, S., Xiong, J., Bridgeford, E., Mehta, R., Shen, C., & Vogelstein, J. (2019).
"hyppo: A Comprehensive Multivariate Hypothesis Testing Python Package."
arXiv:1907.02088.
.. [3] Alyakin, A. A., Agterberg, J., Helm, H. S., Priebe, C. E. (2020).
"Correcting a Nonparametric Two-sample Graph Hypothesis Test for Graphs with Different Numbers of Vertices"
arXiv:2008.09434
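Examples
--------
A minimal usage sketch. The stochastic block model sampler
:func:`~graspologic.simulations.sbm` is used here only to generate two
illustrative undirected graphs of different orders; any pair of graphs
(or their embeddings, with ``input_graph=False``) can be passed.

>>> import numpy as np
>>> from graspologic.simulations import sbm
>>> np.random.seed(8888)
>>> A1 = sbm([50, 50], [[0.5, 0.2], [0.2, 0.5]])
>>> A2 = sbm([60, 60], [[0.5, 0.2], [0.2, 0.5]])
>>> stat, pvalue, misc = latent_distribution_test(
...     A1, A2, test="dcorr", metric="euclidean", n_bootstraps=200
... )  # doctest: +SKIP

A kernel-based test pairs ``"hsic"`` with a kernel metric such as
``"gaussian"``:

>>> stat, pvalue, misc = latent_distribution_test(
...     A1, A2, test="hsic", metric="gaussian", n_bootstraps=200
... )  # doctest: +SKIP

Precomputed embeddings of matching dimension can be passed directly:

>>> from graspologic.embed import AdjacencySpectralEmbed
>>> X1 = AdjacencySpectralEmbed(n_components=2).fit_transform(A1)
>>> X2 = AdjacencySpectralEmbed(n_components=2).fit_transform(A2)
>>> stat, pvalue, misc = latent_distribution_test(
...     X1, X2, input_graph=False, n_bootstraps=200
... )  # doctest: +SKIP
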
"""
# check test argument
if not isinstance(test, str):
msg = "test must be a str, not {}".format(type(test))
raise TypeError(msg)
elif test not in _VALID_TESTS:
msg = "Unknown test {}. Valid tests are {}".format(test, _VALID_TESTS)
raise ValueError(msg)
# check metric argument
if not isinstance(metric, str) and not callable(metric):
msg = "Metric must be str or callable, not {}".format(type(metric))
raise TypeError(msg)
elif metric not in _VALID_METRICS and not callable(metric):
msg = "Unknown metric {}. Valid metrics are {}, or a callable".format(
metric, _VALID_METRICS
)
raise ValueError(msg)
if metric in _VALID_DISTANCES:
if test == "hsic":
msg = (
f"{test} is a kernel-based test, but {metric} "
"is a distance. Use a different test or one of "
f"the kernels: {_VALID_KERNELS} as a metric."
)
raise ValueError(msg)
elif metric in _VALID_KERNELS:
if test != "hsic":
msg = (
f"{test} is a distance-based test, but {metric} "
"is a kernel. Use either a HSIC as the test or one of "
f"the distances: {_VALID_DISTANCES} as a metric."
)
raise ValueError(msg)
# check n_components argument
if n_components is not None:
if not isinstance(n_components, int):
msg = "n_components must be an int, not {}.".format(type(n_components))
raise TypeError(msg)
# check n_bootstraps argument
if not isinstance(n_bootstraps, int):
msg = "n_bootstraps must be an int, not {}".format(type(n_bootstraps))
raise TypeError(msg)
elif n_bootstraps < 0:
msg = "{} is an invalid number of bootstraps, must be non-negative"
raise ValueError(msg.format(n_bootstraps))
# check workers argument
if workers is not None and not isinstance(workers, (int, np.integer)):
msg = "workers must be an int or None, not {}".format(type(workers))
raise TypeError(msg)
# check size_correction argument
if not isinstance(size_correction, bool):
msg = "size_correction must be a bool, not {}".format(type(size_correction))
raise TypeError(msg)
# check pooled argument
if not isinstance(pooled, bool):
msg = "pooled must be a bool, not {}".format(type(pooled))
raise TypeError(msg)
# check align_type argument
if (not isinstance(align_type, str)) and (align_type is not None):
msg = "align_type must be a string or None, not {}".format(type(align_type))
raise TypeError(msg)
align_types_supported = ["sign_flips", "seedless_procrustes", None]
if align_type not in align_types_supported:
msg = "supported align types are {}".format(align_types_supported)
raise ValueError(msg)
# check align_kws argument
if not isinstance(align_kws, dict):
msg = "align_kws must be a dictionary of keyword arguments, not {}".format(
type(align_kws)
)
raise TypeError(msg)
# check input_graph argument
if not isinstance(input_graph, bool):
msg = "input_graph must be a bool, not {}".format(type(input_graph))
raise TypeError(msg)
if input_graph:
A1 = import_graph(A1)
A2 = import_graph(A2)
X1_hat, X2_hat = _embed(A1, A2, n_components)
else:
# check for nx objects, since they are castable to arrays,
# but we don't want that
if not isinstance(A1, np.ndarray):
msg = (
f"Embedding of the first graph is of type {type(A1)}, not "
"np.ndarray. If input_graph is False, the inputs need to be "
"adjacency spectral embeddings, with shapes (n, d) and "
"(m, d), passed as np.ndarrays."
)
raise TypeError(msg)
if not isinstance(A2, np.ndarray):
msg = (
f"Embedding of the second graph is of type {type(A2)}, not an "
"array. If input_graph is False, the inputs need to be "
"adjacency spectral embeddings, with shapes (n, d) and "
"(m, d), passed as np.ndarrays."
)
raise TypeError(msg)
if A1.ndim != 2:
msg = (
"Embedding array of the first graph does not have two dimensions. "
"If input_graph is False, the inputs need to be adjacency "
"spectral embeddings, with shapes (n, d) and (m, d)"
)
raise ValueError(msg)
if A2.ndim != 2:
msg = (
"Embedding array of the second graph does not have two dimensions. "
"If input_graph is False, the inputs need to be adjacency "
"spectral embeddings, with shapes (n, d) and (m, d)"
)
raise ValueError(msg)
if A1.shape[1] != A2.shape[1]:
msg = (
"Two input embeddings have different number of components. "
"If input_graph is False, the inputs need to be adjacency "
"spectral embeddings, with shapes (n, d) and (m, d)"
)
raise ValueError(msg)
# checking for inf values
X1_hat = check_array(A1)
X2_hat = check_array(A2)
if align_type == "sign_flips":
aligner = SignFlips(**align_kws)
X1_hat = aligner.fit_transform(X1_hat, X2_hat)
Q = aligner.Q_
elif align_type == "seedless_procrustes":
aligner = SeedlessProcrustes(**align_kws)
X1_hat = aligner.fit_transform(X1_hat, X2_hat)
Q = aligner.Q_
else:
# no alignment applied; Q is the identity on the embedding dimension
Q = np.identity(X1_hat.shape[1])
if size_correction:
X1_hat, X2_hat = _sample_modified_ase(
X1_hat, X2_hat, workers=workers, random_state=random_state, pooled=pooled
)
test_obj = KSample(test, compute_distkern=metric)
data = test_obj.test(
X1_hat,
X2_hat,
reps=n_bootstraps,
workers=workers,
auto=False,
random_state=random_state,
)
null_distribution = test_obj.indep_test.null_dist
misc_dict = {
"null_distribution": null_distribution,
"n_components": n_components,
"Q": Q,
}
stat = data[0]
pvalue = data[1]
return ldt_result(stat, pvalue, misc_dict)
def _embed(
A1: AdjacencyMatrix, A2: AdjacencyMatrix, n_components: Optional[int]
) -> Tuple[np.ndarray, np.ndarray]:
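"""
Embed both graphs with adjacency spectral embedding into a common number
of dimensions. If ``n_components`` is None, the last elbow returned by
:func:`~graspologic.embed.select_dimension` is computed for each graph
and the larger of the two is used. Directed graphs yield (out, in) latent
position tuples, which are concatenated along the last axis; mixing a
directed and an undirected graph raises a ValueError.
"""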
if n_components is None:
num_dims1 = select_dimension(A1)[0][-1]
num_dims2 = select_dimension(A2)[0][-1]
n_components = max(num_dims1, num_dims2)
ase = AdjacencySpectralEmbed(n_components=n_components)
X1_hat = ase.fit_transform(A1)
X2_hat = ase.fit_transform(A2)
if isinstance(X1_hat, tuple) and isinstance(X2_hat, tuple):
X1_hat = np.concatenate(X1_hat, axis=-1)
X2_hat = np.concatenate(X2_hat, axis=-1)
elif isinstance(X1_hat, tuple) ^ isinstance(X2_hat, tuple):
msg = (
"Input graphs do not have the same directedness. "
"Consider symmetrizing the directed graph."
)
raise ValueError(msg)
return X1_hat, X2_hat # type: ignore
def _sample_modified_ase(X, Y, workers, random_state, pooled=False): # type: ignore
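"""
Size correction (see [3] in ``latent_distribution_test``): when the two
graphs have different orders, the latent position estimates of the larger
graph are less noisy than those of the smaller graph. To compensate, each
row of the larger embedding is re-sampled from a multivariate normal
centered at that estimate, with covariance given by the plug-in variance
estimator scaled by (N - M) / (N * M).
"""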
N, M = len(X), len(Y)
# return if the graphs have the same order; otherwise ensure X is the larger graph
if N == M:
return X, Y
elif M > N:
reverse_order = True
X, Y = Y, X
N, M = M, N
else:
reverse_order = False
# estimate the central limit theorem variance
if pooled:
two_samples = np.concatenate([X, Y], axis=0)
get_sigma = fit_plug_in_variance_estimator(two_samples)
else:
get_sigma = fit_plug_in_variance_estimator(X)
X_sigmas = get_sigma(X) * (N - M) / (N * M)
# increase the variance of X by sampling from the asymptotic distribution
if random_state is not None:
rng = check_random_state(random_state)
random_state = rng.randint(np.iinfo(np.int32).max, size=N)
else:
random_state = np.random.randint(np.iinfo(np.int32).max, size=N)
X_sampled = np.asarray(
Parallel(n_jobs=workers)(
delayed(add_variance)(X[i, :], X_sigmas[i], r)
for i, r in zip(range(N), random_state)
)
)
# return the embeddings in the appropriate order
return (Y, X_sampled) if reverse_order else (X_sampled, Y)
def add_variance(X_orig, X_sigma, random_state): # type: ignore
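"""Draw a single sample from a multivariate normal centered at the
estimated latent position ``X_orig`` with covariance ``X_sigma``."""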
rng = check_random_state(random_state)
return rng.multivariate_normal(mean=X_orig, cov=X_sigma)