first commit

This commit is contained in:
Carla Floricel
2022-08-02 09:52:52 -04:00
parent 417ea8660b
commit 05e52aa52b
10444 changed files with 2300232 additions and 0 deletions

View File

@@ -0,0 +1,293 @@
from itertools import product
import numpy as np
import math
from numpy.testing import (
assert_almost_equal,
assert_array_almost_equal,
assert_array_equal,
)
import pytest
from sklearn import datasets
from sklearn import manifold
from sklearn import neighbors
from sklearn import pipeline
from sklearn import preprocessing
from sklearn.datasets import make_blobs
from sklearn.metrics.pairwise import pairwise_distances
from sklearn.utils._testing import assert_allclose, assert_allclose_dense_sparse
from scipy.sparse import rand as sparse_rand
eigen_solvers = ["auto", "dense", "arpack"]
path_methods = ["auto", "FW", "D"]
def create_sample_data(n_pts=25, add_noise=False):
# grid of equidistant points in 2D, n_components = n_dim
n_per_side = int(math.sqrt(n_pts))
X = np.array(list(product(range(n_per_side), repeat=2)))
if add_noise:
# add noise in a third dimension
rng = np.random.RandomState(0)
noise = 0.1 * rng.randn(n_pts, 1)
X = np.concatenate((X, noise), 1)
return X
@pytest.mark.parametrize("n_neighbors, radius", [(24, None), (None, np.inf)])
def test_isomap_simple_grid(n_neighbors, radius):
# Isomap should preserve distances when all neighbors are used
n_pts = 25
X = create_sample_data(n_pts=n_pts, add_noise=False)
# distances from each point to all others
if n_neighbors is not None:
G = neighbors.kneighbors_graph(X, n_neighbors, mode="distance")
else:
G = neighbors.radius_neighbors_graph(X, radius, mode="distance")
for eigen_solver in eigen_solvers:
for path_method in path_methods:
clf = manifold.Isomap(
n_neighbors=n_neighbors,
radius=radius,
n_components=2,
eigen_solver=eigen_solver,
path_method=path_method,
)
clf.fit(X)
if n_neighbors is not None:
G_iso = neighbors.kneighbors_graph(
clf.embedding_, n_neighbors, mode="distance"
)
else:
G_iso = neighbors.radius_neighbors_graph(
clf.embedding_, radius, mode="distance"
)
assert_allclose_dense_sparse(G, G_iso)
@pytest.mark.parametrize("n_neighbors, radius", [(24, None), (None, np.inf)])
def test_isomap_reconstruction_error(n_neighbors, radius):
# Same setup as in test_isomap_simple_grid, with an added dimension
n_pts = 25
X = create_sample_data(n_pts=n_pts, add_noise=True)
# compute input kernel
if n_neighbors is not None:
G = neighbors.kneighbors_graph(X, n_neighbors, mode="distance").toarray()
else:
G = neighbors.radius_neighbors_graph(X, radius, mode="distance").toarray()
centerer = preprocessing.KernelCenterer()
K = centerer.fit_transform(-0.5 * G**2)
for eigen_solver in eigen_solvers:
for path_method in path_methods:
clf = manifold.Isomap(
n_neighbors=n_neighbors,
radius=radius,
n_components=2,
eigen_solver=eigen_solver,
path_method=path_method,
)
clf.fit(X)
# compute output kernel
if n_neighbors is not None:
G_iso = neighbors.kneighbors_graph(
clf.embedding_, n_neighbors, mode="distance"
)
else:
G_iso = neighbors.radius_neighbors_graph(
clf.embedding_, radius, mode="distance"
)
G_iso = G_iso.toarray()
K_iso = centerer.fit_transform(-0.5 * G_iso**2)
# make sure error agrees
reconstruction_error = np.linalg.norm(K - K_iso) / n_pts
assert_almost_equal(reconstruction_error, clf.reconstruction_error())
@pytest.mark.parametrize("n_neighbors, radius", [(2, None), (None, 0.5)])
def test_transform(n_neighbors, radius):
n_samples = 200
n_components = 10
noise_scale = 0.01
# Create S-curve dataset
X, y = datasets.make_s_curve(n_samples, random_state=0)
# Compute isomap embedding
iso = manifold.Isomap(
n_components=n_components, n_neighbors=n_neighbors, radius=radius
)
X_iso = iso.fit_transform(X)
# Re-embed a noisy version of the points
rng = np.random.RandomState(0)
noise = noise_scale * rng.randn(*X.shape)
X_iso2 = iso.transform(X + noise)
# Make sure the rms error on re-embedding is comparable to noise_scale
assert np.sqrt(np.mean((X_iso - X_iso2) ** 2)) < 2 * noise_scale
@pytest.mark.parametrize("n_neighbors, radius", [(2, None), (None, 10.0)])
def test_pipeline(n_neighbors, radius):
# check that Isomap works fine as a transformer in a Pipeline
# only checks that no error is raised.
# TODO check that it actually does something useful
X, y = datasets.make_blobs(random_state=0)
clf = pipeline.Pipeline(
[
("isomap", manifold.Isomap(n_neighbors=n_neighbors, radius=radius)),
("clf", neighbors.KNeighborsClassifier()),
]
)
clf.fit(X, y)
assert 0.9 < clf.score(X, y)
def test_pipeline_with_nearest_neighbors_transformer():
# Test chaining NearestNeighborsTransformer and Isomap with
# neighbors_algorithm='precomputed'
algorithm = "auto"
n_neighbors = 10
X, _ = datasets.make_blobs(random_state=0)
X2, _ = datasets.make_blobs(random_state=1)
# compare the chained version and the compact version
est_chain = pipeline.make_pipeline(
neighbors.KNeighborsTransformer(
n_neighbors=n_neighbors, algorithm=algorithm, mode="distance"
),
manifold.Isomap(n_neighbors=n_neighbors, metric="precomputed"),
)
est_compact = manifold.Isomap(
n_neighbors=n_neighbors, neighbors_algorithm=algorithm
)
Xt_chain = est_chain.fit_transform(X)
Xt_compact = est_compact.fit_transform(X)
assert_array_almost_equal(Xt_chain, Xt_compact)
Xt_chain = est_chain.transform(X2)
Xt_compact = est_compact.transform(X2)
assert_array_almost_equal(Xt_chain, Xt_compact)
def test_different_metric():
# Test that the metric parameters work correctly, and default to euclidean
def custom_metric(x1, x2):
return np.sqrt(np.sum(x1**2 + x2**2))
# metric, p, is_euclidean
metrics = [
("euclidean", 2, True),
("manhattan", 1, False),
("minkowski", 1, False),
("minkowski", 2, True),
(custom_metric, 2, False),
]
X, _ = datasets.make_blobs(random_state=0)
reference = manifold.Isomap().fit_transform(X)
for metric, p, is_euclidean in metrics:
embedding = manifold.Isomap(metric=metric, p=p).fit_transform(X)
if is_euclidean:
assert_array_almost_equal(embedding, reference)
else:
with pytest.raises(AssertionError, match="not almost equal"):
assert_array_almost_equal(embedding, reference)
def test_isomap_clone_bug():
# regression test for bug reported in #6062
model = manifold.Isomap()
for n_neighbors in [10, 15, 20]:
model.set_params(n_neighbors=n_neighbors)
model.fit(np.random.rand(50, 2))
assert model.nbrs_.n_neighbors == n_neighbors
def test_sparse_input():
X = sparse_rand(100, 3, density=0.1, format="csr")
# Should not error
for eigen_solver in eigen_solvers:
for path_method in path_methods:
clf = manifold.Isomap(
n_components=2,
eigen_solver=eigen_solver,
path_method=path_method,
n_neighbors=8,
)
clf.fit(X)
def test_isomap_fit_precomputed_radius_graph():
# Isomap.fit_transform must yield similar result when using
# a precomputed distance matrix.
X, y = datasets.make_s_curve(200, random_state=0)
radius = 10
g = neighbors.radius_neighbors_graph(X, radius=radius, mode="distance")
isomap = manifold.Isomap(n_neighbors=None, radius=radius, metric="precomputed")
isomap.fit(g)
precomputed_result = isomap.embedding_
isomap = manifold.Isomap(n_neighbors=None, radius=radius, metric="minkowski")
result = isomap.fit_transform(X)
assert_allclose(precomputed_result, result)
def test_isomap_raise_error_when_neighbor_and_radius_both_set():
# Isomap.fit_transform must raise a ValueError if
# radius and n_neighbors are provided.
X, _ = datasets.load_digits(return_X_y=True)
isomap = manifold.Isomap(n_neighbors=3, radius=5.5)
msg = "Both n_neighbors and radius are provided"
with pytest.raises(ValueError, match=msg):
isomap.fit_transform(X)
def test_multiple_connected_components():
# Test that a warning is raised when the graph has multiple components
X = np.array([0, 1, 2, 5, 6, 7])[:, None]
with pytest.warns(UserWarning, match="number of connected components"):
manifold.Isomap(n_neighbors=2).fit(X)
def test_multiple_connected_components_metric_precomputed():
# Test that an error is raised when the graph has multiple components
# and when X is a precomputed neighbors graph.
X = np.array([0, 1, 2, 5, 6, 7])[:, None]
# works with a precomputed distance matrix (dense)
X_distances = pairwise_distances(X)
with pytest.warns(UserWarning, match="number of connected components"):
manifold.Isomap(n_neighbors=1, metric="precomputed").fit(X_distances)
# does not work with a precomputed neighbors graph (sparse)
X_graph = neighbors.kneighbors_graph(X, n_neighbors=2, mode="distance")
with pytest.raises(RuntimeError, match="number of connected components"):
manifold.Isomap(n_neighbors=1, metric="precomputed").fit(X_graph)
def test_get_feature_names_out():
"""Check get_feature_names_out for Isomap."""
X, y = make_blobs(random_state=0, n_features=4)
n_components = 2
iso = manifold.Isomap(n_components=n_components)
iso.fit_transform(X)
names = iso.get_feature_names_out()
assert_array_equal([f"isomap{i}" for i in range(n_components)], names)

View File

@@ -0,0 +1,186 @@
from itertools import product
import numpy as np
from sklearn.utils._testing import (
assert_allclose,
assert_array_equal,
)
from scipy import linalg
import pytest
from sklearn import neighbors, manifold
from sklearn.datasets import make_blobs
from sklearn.manifold._locally_linear import barycenter_kneighbors_graph
from sklearn.utils._testing import ignore_warnings
eigen_solvers = ["dense", "arpack"]
# ----------------------------------------------------------------------
# Test utility routines
def test_barycenter_kneighbors_graph(global_dtype):
X = np.array([[0, 1], [1.01, 1.0], [2, 0]], dtype=global_dtype)
graph = barycenter_kneighbors_graph(X, 1)
expected_graph = np.array(
[[0.0, 1.0, 0.0], [1.0, 0.0, 0.0], [0.0, 1.0, 0.0]], dtype=global_dtype
)
assert graph.dtype == global_dtype
assert_allclose(graph.toarray(), expected_graph)
graph = barycenter_kneighbors_graph(X, 2)
# check that columns sum to one
assert_allclose(np.sum(graph.toarray(), axis=1), np.ones(3))
pred = np.dot(graph.toarray(), X)
assert linalg.norm(pred - X) / X.shape[0] < 1
# ----------------------------------------------------------------------
# Test LLE by computing the reconstruction error on some manifolds.
def test_lle_simple_grid(global_dtype):
# note: ARPACK is numerically unstable, so this test will fail for
# some random seeds. We choose 42 because the tests pass.
# for arm64 platforms 2 makes the test fail.
# TODO: rewrite this test to make less sensitive to the random seed,
# irrespective of the platform.
rng = np.random.RandomState(42)
# grid of equidistant points in 2D, n_components = n_dim
X = np.array(list(product(range(5), repeat=2)))
X = X + 1e-10 * rng.uniform(size=X.shape)
X = X.astype(global_dtype, copy=False)
n_components = 2
clf = manifold.LocallyLinearEmbedding(
n_neighbors=5, n_components=n_components, random_state=rng
)
tol = 0.1
N = barycenter_kneighbors_graph(X, clf.n_neighbors).toarray()
reconstruction_error = linalg.norm(np.dot(N, X) - X, "fro")
assert reconstruction_error < tol
for solver in eigen_solvers:
clf.set_params(eigen_solver=solver)
clf.fit(X)
assert clf.embedding_.shape[1] == n_components
reconstruction_error = (
linalg.norm(np.dot(N, clf.embedding_) - clf.embedding_, "fro") ** 2
)
assert reconstruction_error < tol
assert_allclose(clf.reconstruction_error_, reconstruction_error, atol=1e-1)
# re-embed a noisy version of X using the transform method
noise = rng.randn(*X.shape).astype(global_dtype, copy=False) / 100
X_reembedded = clf.transform(X + noise)
assert linalg.norm(X_reembedded - clf.embedding_) < tol
@pytest.mark.parametrize("method", ["standard", "hessian", "modified", "ltsa"])
@pytest.mark.parametrize("solver", eigen_solvers)
def test_lle_manifold(global_dtype, method, solver):
rng = np.random.RandomState(0)
# similar test on a slightly more complex manifold
X = np.array(list(product(np.arange(18), repeat=2)))
X = np.c_[X, X[:, 0] ** 2 / 18]
X = X + 1e-10 * rng.uniform(size=X.shape)
X = X.astype(global_dtype, copy=False)
n_components = 2
clf = manifold.LocallyLinearEmbedding(
n_neighbors=6, n_components=n_components, method=method, random_state=0
)
tol = 1.5 if method == "standard" else 3
N = barycenter_kneighbors_graph(X, clf.n_neighbors).toarray()
reconstruction_error = linalg.norm(np.dot(N, X) - X)
assert reconstruction_error < tol
clf.set_params(eigen_solver=solver)
clf.fit(X)
assert clf.embedding_.shape[1] == n_components
reconstruction_error = (
linalg.norm(np.dot(N, clf.embedding_) - clf.embedding_, "fro") ** 2
)
details = "solver: %s, method: %s" % (solver, method)
assert reconstruction_error < tol, details
assert (
np.abs(clf.reconstruction_error_ - reconstruction_error)
< tol * reconstruction_error
), details
# Test the error raised when parameter passed to lle is invalid
def test_lle_init_parameters():
X = np.random.rand(5, 3)
clf = manifold.LocallyLinearEmbedding(eigen_solver="error")
msg = "unrecognized eigen_solver 'error'"
with pytest.raises(ValueError, match=msg):
clf.fit(X)
clf = manifold.LocallyLinearEmbedding(method="error")
msg = "unrecognized method 'error'"
with pytest.raises(ValueError, match=msg):
clf.fit(X)
def test_pipeline():
# check that LocallyLinearEmbedding works fine as a Pipeline
# only checks that no error is raised.
# TODO check that it actually does something useful
from sklearn import pipeline, datasets
X, y = datasets.make_blobs(random_state=0)
clf = pipeline.Pipeline(
[
("filter", manifold.LocallyLinearEmbedding(random_state=0)),
("clf", neighbors.KNeighborsClassifier()),
]
)
clf.fit(X, y)
assert 0.9 < clf.score(X, y)
# Test the error raised when the weight matrix is singular
def test_singular_matrix():
M = np.ones((10, 3))
f = ignore_warnings
with pytest.raises(ValueError):
f(
manifold.locally_linear_embedding(
M,
n_neighbors=2,
n_components=1,
method="standard",
eigen_solver="arpack",
)
)
# regression test for #6033
def test_integer_input():
rand = np.random.RandomState(0)
X = rand.randint(0, 100, size=(20, 3))
for method in ["standard", "hessian", "modified", "ltsa"]:
clf = manifold.LocallyLinearEmbedding(method=method, n_neighbors=10)
clf.fit(X) # this previously raised a TypeError
def test_get_feature_names_out():
"""Check get_feature_names_out for LocallyLinearEmbedding."""
X, y = make_blobs(random_state=0, n_features=4)
n_components = 2
iso = manifold.LocallyLinearEmbedding(n_components=n_components)
iso.fit(X)
names = iso.get_feature_names_out()
assert_array_equal(
[f"locallylinearembedding{i}" for i in range(n_components)], names
)

View File

@@ -0,0 +1,44 @@
import numpy as np
from numpy.testing import assert_array_almost_equal
import pytest
from sklearn.manifold import _mds as mds
def test_smacof():
# test metric smacof using the data of "Modern Multidimensional Scaling",
# Borg & Groenen, p 154
sim = np.array([[0, 5, 3, 4], [5, 0, 2, 2], [3, 2, 0, 1], [4, 2, 1, 0]])
Z = np.array([[-0.266, -0.539], [0.451, 0.252], [0.016, -0.238], [-0.200, 0.524]])
X, _ = mds.smacof(sim, init=Z, n_components=2, max_iter=1, n_init=1)
X_true = np.array(
[[-1.415, -2.471], [1.633, 1.107], [0.249, -0.067], [-0.468, 1.431]]
)
assert_array_almost_equal(X, X_true, decimal=3)
def test_smacof_error():
# Not symmetric similarity matrix:
sim = np.array([[0, 5, 9, 4], [5, 0, 2, 2], [3, 2, 0, 1], [4, 2, 1, 0]])
with pytest.raises(ValueError):
mds.smacof(sim)
# Not squared similarity matrix:
sim = np.array([[0, 5, 9, 4], [5, 0, 2, 2], [4, 2, 1, 0]])
with pytest.raises(ValueError):
mds.smacof(sim)
# init not None and not correct format:
sim = np.array([[0, 5, 3, 4], [5, 0, 2, 2], [3, 2, 0, 1], [4, 2, 1, 0]])
Z = np.array([[-0.266, -0.539], [0.016, -0.238], [-0.200, 0.524]])
with pytest.raises(ValueError):
mds.smacof(sim, init=Z, n_init=1)
def test_MDS():
sim = np.array([[0, 5, 3, 4], [5, 0, 2, 2], [3, 2, 0, 1], [4, 2, 1, 0]])
mds_clf = mds.MDS(metric=False, n_jobs=3, dissimilarity="precomputed")
mds_clf.fit(sim)

View File

@@ -0,0 +1,482 @@
import pytest
import numpy as np
from scipy import sparse
from scipy.sparse import csgraph
from scipy.linalg import eigh
from sklearn.manifold import SpectralEmbedding
from sklearn.manifold._spectral_embedding import _graph_is_connected
from sklearn.manifold._spectral_embedding import _graph_connected_component
from sklearn.manifold import spectral_embedding
from sklearn.metrics.pairwise import rbf_kernel
from sklearn.metrics import normalized_mutual_info_score
from sklearn.neighbors import NearestNeighbors
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs
from sklearn.utils.extmath import _deterministic_vector_sign_flip
from sklearn.utils._testing import assert_array_almost_equal
from sklearn.utils._testing import assert_array_equal
try:
from pyamg import smoothed_aggregation_solver # noqa
pyamg_available = True
except ImportError:
pyamg_available = False
skip_if_no_pyamg = pytest.mark.skipif(
not pyamg_available, reason="PyAMG is required for the tests in this function."
)
# non centered, sparse centers to check the
centers = np.array(
[
[0.0, 5.0, 0.0, 0.0, 0.0],
[0.0, 0.0, 4.0, 0.0, 0.0],
[1.0, 0.0, 0.0, 5.0, 1.0],
]
)
n_samples = 1000
n_clusters, n_features = centers.shape
S, true_labels = make_blobs(
n_samples=n_samples, centers=centers, cluster_std=1.0, random_state=42
)
def _assert_equal_with_sign_flipping(A, B, tol=0.0):
"""Check array A and B are equal with possible sign flipping on
each columns"""
tol_squared = tol**2
for A_col, B_col in zip(A.T, B.T):
assert (
np.max((A_col - B_col) ** 2) <= tol_squared
or np.max((A_col + B_col) ** 2) <= tol_squared
)
def test_sparse_graph_connected_component():
rng = np.random.RandomState(42)
n_samples = 300
boundaries = [0, 42, 121, 200, n_samples]
p = rng.permutation(n_samples)
connections = []
for start, stop in zip(boundaries[:-1], boundaries[1:]):
group = p[start:stop]
# Connect all elements within the group at least once via an
# arbitrary path that spans the group.
for i in range(len(group) - 1):
connections.append((group[i], group[i + 1]))
# Add some more random connections within the group
min_idx, max_idx = 0, len(group) - 1
n_random_connections = 1000
source = rng.randint(min_idx, max_idx, size=n_random_connections)
target = rng.randint(min_idx, max_idx, size=n_random_connections)
connections.extend(zip(group[source], group[target]))
# Build a symmetric affinity matrix
row_idx, column_idx = tuple(np.array(connections).T)
data = rng.uniform(0.1, 42, size=len(connections))
affinity = sparse.coo_matrix((data, (row_idx, column_idx)))
affinity = 0.5 * (affinity + affinity.T)
for start, stop in zip(boundaries[:-1], boundaries[1:]):
component_1 = _graph_connected_component(affinity, p[start])
component_size = stop - start
assert component_1.sum() == component_size
# We should retrieve the same component mask by starting by both ends
# of the group
component_2 = _graph_connected_component(affinity, p[stop - 1])
assert component_2.sum() == component_size
assert_array_equal(component_1, component_2)
@pytest.mark.parametrize(
"eigen_solver",
[
"arpack",
"lobpcg",
pytest.param("amg", marks=skip_if_no_pyamg),
],
)
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
def test_spectral_embedding_two_components(eigen_solver, dtype, seed=36):
# Test spectral embedding with two components
random_state = np.random.RandomState(seed)
n_sample = 100
affinity = np.zeros(shape=[n_sample * 2, n_sample * 2])
# first component
affinity[0:n_sample, 0:n_sample] = (
np.abs(random_state.randn(n_sample, n_sample)) + 2
)
# second component
affinity[n_sample::, n_sample::] = (
np.abs(random_state.randn(n_sample, n_sample)) + 2
)
# Test of internal _graph_connected_component before connection
component = _graph_connected_component(affinity, 0)
assert component[:n_sample].all()
assert not component[n_sample:].any()
component = _graph_connected_component(affinity, -1)
assert not component[:n_sample].any()
assert component[n_sample:].all()
# connection
affinity[0, n_sample + 1] = 1
affinity[n_sample + 1, 0] = 1
affinity.flat[:: 2 * n_sample + 1] = 0
affinity = 0.5 * (affinity + affinity.T)
true_label = np.zeros(shape=2 * n_sample)
true_label[0:n_sample] = 1
se_precomp = SpectralEmbedding(
n_components=1,
affinity="precomputed",
random_state=np.random.RandomState(seed),
eigen_solver=eigen_solver,
)
embedded_coordinate = se_precomp.fit_transform(affinity.astype(dtype))
# thresholding on the first components using 0.
label_ = np.array(embedded_coordinate.ravel() < 0, dtype=np.int64)
assert normalized_mutual_info_score(true_label, label_) == pytest.approx(1.0)
@pytest.mark.parametrize("X", [S, sparse.csr_matrix(S)], ids=["dense", "sparse"])
@pytest.mark.parametrize(
"eigen_solver",
[
"arpack",
"lobpcg",
pytest.param("amg", marks=skip_if_no_pyamg),
],
)
@pytest.mark.parametrize("dtype", (np.float32, np.float64))
def test_spectral_embedding_precomputed_affinity(X, eigen_solver, dtype, seed=36):
# Test spectral embedding with precomputed kernel
gamma = 1.0
se_precomp = SpectralEmbedding(
n_components=2,
affinity="precomputed",
random_state=np.random.RandomState(seed),
eigen_solver=eigen_solver,
)
se_rbf = SpectralEmbedding(
n_components=2,
affinity="rbf",
gamma=gamma,
random_state=np.random.RandomState(seed),
eigen_solver=eigen_solver,
)
embed_precomp = se_precomp.fit_transform(rbf_kernel(X.astype(dtype), gamma=gamma))
embed_rbf = se_rbf.fit_transform(X.astype(dtype))
assert_array_almost_equal(se_precomp.affinity_matrix_, se_rbf.affinity_matrix_)
_assert_equal_with_sign_flipping(embed_precomp, embed_rbf, 0.05)
def test_precomputed_nearest_neighbors_filtering():
# Test precomputed graph filtering when containing too many neighbors
n_neighbors = 2
results = []
for additional_neighbors in [0, 10]:
nn = NearestNeighbors(n_neighbors=n_neighbors + additional_neighbors).fit(S)
graph = nn.kneighbors_graph(S, mode="connectivity")
embedding = (
SpectralEmbedding(
random_state=0,
n_components=2,
affinity="precomputed_nearest_neighbors",
n_neighbors=n_neighbors,
)
.fit(graph)
.embedding_
)
results.append(embedding)
assert_array_equal(results[0], results[1])
@pytest.mark.parametrize("X", [S, sparse.csr_matrix(S)], ids=["dense", "sparse"])
def test_spectral_embedding_callable_affinity(X, seed=36):
# Test spectral embedding with callable affinity
gamma = 0.9
kern = rbf_kernel(S, gamma=gamma)
se_callable = SpectralEmbedding(
n_components=2,
affinity=(lambda x: rbf_kernel(x, gamma=gamma)),
gamma=gamma,
random_state=np.random.RandomState(seed),
)
se_rbf = SpectralEmbedding(
n_components=2,
affinity="rbf",
gamma=gamma,
random_state=np.random.RandomState(seed),
)
embed_rbf = se_rbf.fit_transform(X)
embed_callable = se_callable.fit_transform(X)
assert_array_almost_equal(se_callable.affinity_matrix_, se_rbf.affinity_matrix_)
assert_array_almost_equal(kern, se_rbf.affinity_matrix_)
_assert_equal_with_sign_flipping(embed_rbf, embed_callable, 0.05)
# TODO: Remove when pyamg does replaces sp.rand call with np.random.rand
# https://github.com/scikit-learn/scikit-learn/issues/15913
@pytest.mark.filterwarnings(
"ignore:scipy.rand is deprecated:DeprecationWarning:pyamg.*"
)
# TODO: Remove when pyamg removes the use of np.float
@pytest.mark.filterwarnings(
"ignore:`np.float` is a deprecated alias:DeprecationWarning:pyamg.*"
)
# TODO: Remove when pyamg removes the use of pinv2
@pytest.mark.filterwarnings(
"ignore:scipy.linalg.pinv2 is deprecated:DeprecationWarning:pyamg.*"
)
@pytest.mark.skipif(
not pyamg_available, reason="PyAMG is required for the tests in this function."
)
@pytest.mark.parametrize("dtype", (np.float32, np.float64))
def test_spectral_embedding_amg_solver(dtype, seed=36):
se_amg = SpectralEmbedding(
n_components=2,
affinity="nearest_neighbors",
eigen_solver="amg",
n_neighbors=5,
random_state=np.random.RandomState(seed),
)
se_arpack = SpectralEmbedding(
n_components=2,
affinity="nearest_neighbors",
eigen_solver="arpack",
n_neighbors=5,
random_state=np.random.RandomState(seed),
)
embed_amg = se_amg.fit_transform(S.astype(dtype))
embed_arpack = se_arpack.fit_transform(S.astype(dtype))
_assert_equal_with_sign_flipping(embed_amg, embed_arpack, 1e-5)
# same with special case in which amg is not actually used
# regression test for #10715
# affinity between nodes
row = [0, 0, 1, 2, 3, 3, 4]
col = [1, 2, 2, 3, 4, 5, 5]
val = [100, 100, 100, 1, 100, 100, 100]
affinity = sparse.coo_matrix(
(val + val, (row + col, col + row)), shape=(6, 6)
).toarray()
se_amg.affinity = "precomputed"
se_arpack.affinity = "precomputed"
embed_amg = se_amg.fit_transform(affinity.astype(dtype))
embed_arpack = se_arpack.fit_transform(affinity.astype(dtype))
_assert_equal_with_sign_flipping(embed_amg, embed_arpack, 1e-5)
# TODO: Remove filterwarnings when pyamg does replaces sp.rand call with
# np.random.rand:
# https://github.com/scikit-learn/scikit-learn/issues/15913
@pytest.mark.filterwarnings(
"ignore:scipy.rand is deprecated:DeprecationWarning:pyamg.*"
)
# TODO: Remove when pyamg removes the use of np.float
@pytest.mark.filterwarnings(
"ignore:`np.float` is a deprecated alias:DeprecationWarning:pyamg.*"
)
# TODO: Remove when pyamg removes the use of pinv2
@pytest.mark.filterwarnings(
"ignore:scipy.linalg.pinv2 is deprecated:DeprecationWarning:pyamg.*"
)
@pytest.mark.skipif(
not pyamg_available, reason="PyAMG is required for the tests in this function."
)
@pytest.mark.parametrize("dtype", (np.float32, np.float64))
def test_spectral_embedding_amg_solver_failure(dtype, seed=36):
# Non-regression test for amg solver failure (issue #13393 on github)
num_nodes = 100
X = sparse.rand(num_nodes, num_nodes, density=0.1, random_state=seed)
X = X.astype(dtype)
upper = sparse.triu(X) - sparse.diags(X.diagonal())
sym_matrix = upper + upper.T
embedding = spectral_embedding(
sym_matrix, n_components=10, eigen_solver="amg", random_state=0
)
# Check that the learned embedding is stable w.r.t. random solver init:
for i in range(3):
new_embedding = spectral_embedding(
sym_matrix, n_components=10, eigen_solver="amg", random_state=i + 1
)
_assert_equal_with_sign_flipping(embedding, new_embedding, tol=0.05)
@pytest.mark.filterwarnings("ignore:the behavior of nmi will change in version 0.22")
def test_pipeline_spectral_clustering(seed=36):
# Test using pipeline to do spectral clustering
random_state = np.random.RandomState(seed)
se_rbf = SpectralEmbedding(
n_components=n_clusters, affinity="rbf", random_state=random_state
)
se_knn = SpectralEmbedding(
n_components=n_clusters,
affinity="nearest_neighbors",
n_neighbors=5,
random_state=random_state,
)
for se in [se_rbf, se_knn]:
km = KMeans(n_clusters=n_clusters, random_state=random_state)
km.fit(se.fit_transform(S))
assert_array_almost_equal(
normalized_mutual_info_score(km.labels_, true_labels), 1.0, 2
)
def test_spectral_embedding_unknown_eigensolver(seed=36):
# Test that SpectralClustering fails with an unknown eigensolver
se = SpectralEmbedding(
n_components=1,
affinity="precomputed",
random_state=np.random.RandomState(seed),
eigen_solver="<unknown>",
)
with pytest.raises(ValueError):
se.fit(S)
def test_spectral_embedding_unknown_affinity(seed=36):
# Test that SpectralClustering fails with an unknown affinity type
se = SpectralEmbedding(
n_components=1,
affinity="<unknown>",
random_state=np.random.RandomState(seed),
)
with pytest.raises(ValueError):
se.fit(S)
def test_connectivity(seed=36):
# Test that graph connectivity test works as expected
graph = np.array(
[
[1, 0, 0, 0, 0],
[0, 1, 1, 0, 0],
[0, 1, 1, 1, 0],
[0, 0, 1, 1, 1],
[0, 0, 0, 1, 1],
]
)
assert not _graph_is_connected(graph)
assert not _graph_is_connected(sparse.csr_matrix(graph))
assert not _graph_is_connected(sparse.csc_matrix(graph))
graph = np.array(
[
[1, 1, 0, 0, 0],
[1, 1, 1, 0, 0],
[0, 1, 1, 1, 0],
[0, 0, 1, 1, 1],
[0, 0, 0, 1, 1],
]
)
assert _graph_is_connected(graph)
assert _graph_is_connected(sparse.csr_matrix(graph))
assert _graph_is_connected(sparse.csc_matrix(graph))
def test_spectral_embedding_deterministic():
# Test that Spectral Embedding is deterministic
random_state = np.random.RandomState(36)
data = random_state.randn(10, 30)
sims = rbf_kernel(data)
embedding_1 = spectral_embedding(sims)
embedding_2 = spectral_embedding(sims)
assert_array_almost_equal(embedding_1, embedding_2)
def test_spectral_embedding_unnormalized():
# Test that spectral_embedding is also processing unnormalized laplacian
# correctly
random_state = np.random.RandomState(36)
data = random_state.randn(10, 30)
sims = rbf_kernel(data)
n_components = 8
embedding_1 = spectral_embedding(
sims, norm_laplacian=False, n_components=n_components, drop_first=False
)
# Verify using manual computation with dense eigh
laplacian, dd = csgraph.laplacian(sims, normed=False, return_diag=True)
_, diffusion_map = eigh(laplacian)
embedding_2 = diffusion_map.T[:n_components]
embedding_2 = _deterministic_vector_sign_flip(embedding_2).T
assert_array_almost_equal(embedding_1, embedding_2)
def test_spectral_embedding_first_eigen_vector():
# Test that the first eigenvector of spectral_embedding
# is constant and that the second is not (for a connected graph)
random_state = np.random.RandomState(36)
data = random_state.randn(10, 30)
sims = rbf_kernel(data)
n_components = 2
for seed in range(10):
embedding = spectral_embedding(
sims,
norm_laplacian=False,
n_components=n_components,
drop_first=False,
random_state=seed,
)
assert np.std(embedding[:, 0]) == pytest.approx(0)
assert np.std(embedding[:, 1]) > 1e-3
@pytest.mark.parametrize(
"eigen_solver",
[
"arpack",
"lobpcg",
pytest.param("amg", marks=skip_if_no_pyamg),
],
)
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
def test_spectral_embedding_preserves_dtype(eigen_solver, dtype):
"""Check that `SpectralEmbedding is preserving the dtype of the fitted
attribute and transformed data.
Ideally, this test should be covered by the common test
`check_transformer_preserve_dtypes`. However, this test only run
with transformers implementing `transform` while `SpectralEmbedding`
implements only `fit_transform`.
"""
X = S.astype(dtype)
se = SpectralEmbedding(
n_components=2, affinity="rbf", eigen_solver=eigen_solver, random_state=0
)
X_trans = se.fit_transform(X)
assert X_trans.dtype == dtype
assert se.embedding_.dtype == dtype
assert se.affinity_matrix_.dtype == dtype
@pytest.mark.skipif(
pyamg_available,
reason="PyAMG is installed and we should not test for an error.",
)
def test_error_pyamg_not_available():
se_precomp = SpectralEmbedding(
n_components=2,
affinity="rbf",
eigen_solver="amg",
)
err_msg = "The eigen_solver was set to 'amg', but pyamg is not available."
with pytest.raises(ValueError, match=err_msg):
se_precomp.fit_transform(S)