"""
Network construction from distance matrices.
Implements R ts2net API for network builders: k-NN, ε-NN, weighted.
API Design Credit
-----------------
The network construction methods and API design are based on the R ts2net
package by Leonardo N. Ferreira:
Ferreira, L.N. (2024). From time series to networks in R with the ts2net
package. Applied Network Science, 9(1), 32.
https://doi.org/10.1007/s41109-024-00642-2
Original R package: https://github.com/lnferreira/ts2net
This Python implementation extends the R API with:
- Approximate nearest neighbors for large datasets (pynndescent)
- Directed/undirected graph options
- NetworkX integration
"""
import numpy as np
from numpy.typing import NDArray
from typing import Tuple, Optional
import networkx as nx
from scipy import sparse
import logging
logger = logging.getLogger(__name__)
# Optional: pynndescent for approximate nearest neighbors
try:
from pynndescent import NNDescent
HAS_PYNNDESCENT = True
except ImportError:
HAS_PYNNDESCENT = False
[docs]
def net_knn(D: NDArray[np.float64], k: int, mutual: bool = False,
            weighted: bool = False, directed: bool = False) -> Tuple[nx.Graph, np.ndarray]:
    """
    k-Nearest Neighbors network from distance matrix.

    Each node is connected to its k nearest neighbors.

    Parameters
    ----------
    D : array (n, n)
        Distance matrix (smaller = more similar). Integer matrices are
        accepted; the diagonal is ignored.
    k : int
        Number of nearest neighbors per node, 1 <= k <= n - 1
    mutual : bool
        If True, require mutual k-NN (i in kNN(j) AND j in kNN(i))
    weighted : bool
        If True, edge weights = distances
    directed : bool
        If True, create directed graph (i → j if j in kNN(i))

    Returns
    -------
    G : networkx.Graph or DiGraph
        k-NN network
    A : array (n, n)
        Adjacency matrix (weighted if weighted=True)

    Raises
    ------
    ValueError
        If D is not a square 2-D array or k is outside [1, n - 1].

    Notes
    -----
    With ``weighted=True`` an edge of distance 0 (identical series) is kept
    in ``G`` with weight 0.0; a dense matrix cannot tell such an edge apart
    from "no edge", so ``A`` shows 0.0 there.

    For an undirected graph built from an asymmetric ``D``, the non-mutual
    variant stores the larger of the two directional distances and the
    mutual variant the smaller one.

    Examples
    --------
    >>> D = np.random.rand(10, 10)
    >>> D = (D + D.T) / 2  # Make symmetric
    >>> np.fill_diagonal(D, 0)
    >>> G, A = net_knn(D, k=3, directed=True)
    >>> G.number_of_edges()  # every node has exactly k out-edges
    30
    """
    D = np.asarray(D)
    if D.ndim != 2 or D.shape[0] != D.shape[1]:
        raise ValueError(f"D must be square, got shape {D.shape}")
    n = D.shape[0]
    if k <= 0 or k >= n:
        raise ValueError(f"k must be in range [1, {n-1}], got {k}")

    # Float copy: lets us mask the diagonal with +inf even for integer input,
    # and leaves the caller's matrix untouched.
    dist = np.array(D, dtype=float)
    np.fill_diagonal(dist, np.inf)  # a node is never its own neighbour

    # Row-wise partial sort: the first k columns hold the k smallest distances.
    nearest = np.argpartition(dist, k - 1, axis=1)[:, :k]

    # selected[i, j] is True when j is one of i's k nearest neighbours.
    selected = np.zeros((n, n), dtype=bool)
    selected[np.arange(n)[:, None], nearest] = True

    # Edge structure is tracked as a boolean mask so that zero-distance
    # neighbours are not mistaken for missing edges.
    if mutual:
        edge_mask = selected & selected.T
    elif directed:
        edge_mask = selected
    else:
        edge_mask = selected | selected.T

    if weighted:
        chosen = np.where(selected, dist, 0.0)
        if directed:
            # Keep the actual distance D[i, j]; multiplying A by A.T would
            # store the product of the two directional distances instead.
            A = np.where(edge_mask, chosen, 0.0)
        elif mutual:
            A = np.minimum(chosen, chosen.T)
        else:
            A = np.maximum(chosen, chosen.T)
    else:
        A = edge_mask.astype(float)

    G = nx.DiGraph() if directed else nx.Graph()
    G.add_nodes_from(range(n))

    # Undirected graphs only need the upper triangle (each pair once).
    rows, cols = np.nonzero(edge_mask if directed else np.triu(edge_mask, k=1))
    if weighted:
        G.add_weighted_edges_from(
            zip(rows.tolist(), cols.tolist(), A[rows, cols].tolist())
        )
    else:
        G.add_edges_from(zip(rows.tolist(), cols.tolist()))

    logger.info(f"Built k-NN network: k={k}, nodes={G.number_of_nodes()}, "
                f"edges={G.number_of_edges()}, mutual={mutual}, weighted={weighted}")
    return G, A
[docs]
def net_enn(D: NDArray[np.float64], epsilon: Optional[float] = None,
            percentile: Optional[float] = None, weighted: bool = False,
            directed: bool = False) -> Tuple[nx.Graph, np.ndarray]:
    """
    ε-Nearest Neighbors network from distance matrix.

    Nodes are connected if distance < ε.

    Parameters
    ----------
    D : array (n, n)
        Distance matrix; the diagonal is ignored.
    epsilon : float, optional
        Distance threshold (connect if D[i,j] < epsilon)
    percentile : float, optional
        Use percentile of the upper-triangle distances as epsilon (0-100).
        If both epsilon and percentile given, epsilon takes precedence
    weighted : bool
        If True, edge weights = distances
    directed : bool
        If True, create directed graph (i → j if D[i,j] < epsilon)

    Returns
    -------
    G : networkx.Graph or DiGraph
        ε-NN network
    A : array (n, n)
        Adjacency matrix

    Raises
    ------
    ValueError
        If D is not a square 2-D array, or neither epsilon nor percentile
        is given.

    Notes
    -----
    In the undirected case a pair is linked when either D[i,j] or D[j,i] is
    below epsilon. With ``weighted=True`` and an asymmetric ``D`` the larger
    of the qualifying distances is stored, independent of iteration order.

    With ``weighted=True`` an edge of distance 0 is kept in ``G`` with
    weight 0.0; ``A`` shows 0.0 there because a dense matrix cannot tell it
    apart from "no edge".

    Examples
    --------
    >>> D = np.random.rand(10, 10)
    >>> # Connect top 30% shortest distances
    >>> G, A = net_enn(D, percentile=30, weighted=False)
    """
    D = np.asarray(D)
    if D.ndim != 2 or D.shape[0] != D.shape[1]:
        raise ValueError(f"D must be square, got shape {D.shape}")
    n = D.shape[0]

    if epsilon is None and percentile is None:
        raise ValueError("Must specify either epsilon or percentile")
    if epsilon is None:
        # Percentile over each unordered pair once (upper triangle, no diagonal).
        upper_tri = D[np.triu_indices(n, k=1)]
        epsilon = np.percentile(upper_tri, percentile)
        logger.info(f"Using {percentile}th percentile: ε={epsilon:.4f}")

    # below[i, j] is True when the directed pair (i, j) passes the threshold.
    below = D < epsilon
    np.fill_diagonal(below, False)  # no self-loops
    edge_mask = below if directed else (below | below.T)

    if weighted:
        A = np.where(below, D, 0.0).astype(float)
        if not directed:
            A = np.maximum(A, A.T)
    else:
        A = edge_mask.astype(float)

    G = nx.DiGraph() if directed else nx.Graph()
    G.add_nodes_from(range(n))

    # Edges come from the boolean mask, not from A > 0, so zero-distance
    # pairs survive in weighted mode. Undirected: upper triangle only.
    rows, cols = np.nonzero(edge_mask if directed else np.triu(edge_mask, k=1))
    if weighted:
        G.add_weighted_edges_from(
            zip(rows.tolist(), cols.tolist(), A[rows, cols].tolist())
        )
    else:
        G.add_edges_from(zip(rows.tolist(), cols.tolist()))

    logger.info(f"Built ε-NN network: ε={epsilon:.4f}, nodes={G.number_of_nodes()}, "
                f"edges={G.number_of_edges()}, weighted={weighted}")
    return G, A
[docs]
def net_weighted(D: NDArray[np.float64], threshold: Optional[float] = None,
                 directed: bool = False) -> Tuple[nx.Graph, np.ndarray]:
    """
    Complete weighted network from distance matrix.

    All pairs connected with edge weight = distance.

    Parameters
    ----------
    D : array (n, n)
        Distance matrix; the diagonal is ignored.
    threshold : float, optional
        Remove edges with distance > threshold
    directed : bool
        If True, create directed graph

    Returns
    -------
    G : networkx.Graph or DiGraph
        Weighted network
    A : array (n, n)
        Adjacency matrix (weighted): a copy of D with the diagonal and all
        entries above the threshold set to 0.

    Raises
    ------
    ValueError
        If D is not a square 2-D array.

    Notes
    -----
    Pairs at distance 0 are linked in ``G`` with weight 0.0; ``A`` shows 0.0
    there because a dense matrix cannot tell such an edge apart from "no
    edge". Negative and NaN entries never produce an edge.

    For an undirected graph only the upper triangle of ``D`` supplies the
    edge weights; ``A`` itself is not symmetrised.

    Examples
    --------
    >>> D = np.random.rand(10, 10)
    >>> G, A = net_weighted(D, threshold=0.5)
    """
    D = np.asarray(D)
    if D.ndim != 2 or D.shape[0] != D.shape[1]:
        raise ValueError(f"D must be square, got shape {D.shape}")
    n = D.shape[0]

    # Adjacency matrix: thresholded copy of D without the diagonal.
    A = D.copy()
    if threshold is not None:
        A[A > threshold] = 0.0
    np.fill_diagonal(A, 0.0)

    # Edge structure is decided on D itself so that a genuine distance of 0
    # is not confused with an entry that was zeroed out above.
    linked = D >= 0
    if threshold is not None:
        linked &= ~(D > threshold)
    np.fill_diagonal(linked, False)  # no self-loops

    G = nx.DiGraph() if directed else nx.Graph()
    G.add_nodes_from(range(n))

    # Undirected graphs take each pair once, from the upper triangle.
    rows, cols = np.nonzero(linked if directed else np.triu(linked, k=1))
    G.add_weighted_edges_from(
        zip(rows.tolist(), cols.tolist(), A[rows, cols].tolist())
    )

    logger.info(f"Built weighted network: threshold={threshold}, "
                f"nodes={G.number_of_nodes()}, edges={G.number_of_edges()}")
    return G, A
[docs]
def net_knn_approx(D: NDArray[np.float64], k: int, metric: str = 'precomputed',
                   n_neighbors: int = 15, weighted: bool = False,
                   directed: bool = False) -> Tuple[nx.Graph, np.ndarray]:
    """
    Approximate k-NN network using PyNNDescent.

    Much faster than exact k-NN for large datasets (>1000 nodes),
    but may miss some nearest neighbors.

    Parameters
    ----------
    D : array (n, n)
        Distance matrix (if metric='precomputed')
        OR raw feature matrix (if metric='euclidean', etc.)
    k : int
        Number of nearest neighbors, 1 <= k <= n - 1
    metric : str
        'precomputed' (use D as distance matrix)
        OR 'euclidean', 'cosine', 'manhattan', etc. (compute on the fly)
    n_neighbors : int
        Number of neighbors for approximation (>= k, larger = more accurate).
        Not used when metric='precomputed'.
    weighted : bool
        If True, edge weights = distances
    directed : bool
        If True, create directed graph

    Returns
    -------
    G : networkx.Graph or DiGraph
        Approximate k-NN network
    A : array (n, n)
        Adjacency matrix

    Raises
    ------
    ImportError
        If pynndescent is not installed.
    ValueError
        If k is outside [1, n - 1], or metric='precomputed' and D is not a
        square 2-D array.

    Notes
    -----
    Requires: pip install pynndescent

    With metric='precomputed' the neighbours are read directly from ``D``
    (all distances are already known), so the result is exact in that case.

    With ``weighted=True`` an edge of distance 0 is kept in ``G`` with
    weight 0.0; ``A`` shows 0.0 there because a dense matrix cannot tell it
    apart from "no edge".

    Examples
    --------
    >>> # For very large datasets
    >>> X = np.random.randn(10000, 1000)  # 10k series, 1k points each
    >>> G, A = net_knn_approx(X, k=10, metric='euclidean')
    >>> # Or with precomputed distances
    >>> D = ts_dist(X, method='correlation', n_jobs=-1)
    >>> G, A = net_knn_approx(D, k=10, metric='precomputed')
    """
    if not HAS_PYNNDESCENT:
        raise ImportError(
            "net_knn_approx requires pynndescent. "
            "Install with: pip install pynndescent"
        )
    n = D.shape[0]
    if k <= 0 or k >= n:
        raise ValueError(f"k must be in range [1, {n-1}], got {k}")
    if n_neighbors < k:
        logger.warning(f"n_neighbors ({n_neighbors}) < k ({k}), setting n_neighbors={k}")
        n_neighbors = k
    logger.info(f"Building approximate k-NN with pynndescent (n={n}, k={k})...")

    if metric == 'precomputed':
        # NOTE(review): pynndescent's metric registry does not appear to accept
        # 'precomputed' - confirm. D already holds every pairwise distance,
        # so the k smallest entries per row are taken from it directly.
        dist = np.array(D, dtype=float)
        if dist.ndim != 2 or dist.shape[1] != n:
            raise ValueError(f"D must be square, got shape {dist.shape}")
        np.fill_diagonal(dist, np.inf)  # a node is never its own neighbour
        nearest = np.argpartition(dist, k - 1, axis=1)[:, :k]
        src = np.repeat(np.arange(n), k)
        dst = nearest.ravel()
        found = np.take_along_axis(dist, nearest, axis=1).ravel()
    else:
        # Build approximate nearest neighbor index
        index = NNDescent(
            D,
            metric=metric,
            n_neighbors=min(n_neighbors, n - 1),
            random_state=42
        )
        # One extra column because a node usually finds itself as a neighbour.
        indices, distances = index.query(D, k=k + 1)
        indices = np.asarray(indices)
        distances = np.asarray(distances)

        node_ids = np.arange(n)[:, None]
        # Drop self by identity rather than by column position (self is not
        # guaranteed to come first), and drop negative indices, which would
        # otherwise wrap around to the last node when used as array indices.
        usable = (indices != node_ids) & (indices >= 0)
        # Keep at most the first k usable hits of every row.
        usable &= np.cumsum(usable, axis=1) <= k
        src = np.broadcast_to(node_ids, indices.shape)[usable]
        dst = indices[usable]
        found = distances[usable]

    # Boolean edge structure, so zero-distance neighbours are not lost.
    selected = np.zeros((n, n), dtype=bool)
    selected[src, dst] = True
    edge_mask = selected if directed else (selected | selected.T)

    if weighted:
        A = np.zeros((n, n))
        A[src, dst] = found
        if not directed:
            A = np.maximum(A, A.T)  # symmetrize for undirected
    else:
        A = edge_mask.astype(float)

    G = nx.DiGraph() if directed else nx.Graph()
    G.add_nodes_from(range(n))

    # Undirected graphs only need the upper triangle (each pair once).
    rows, cols = np.nonzero(edge_mask if directed else np.triu(edge_mask, k=1))
    if weighted:
        G.add_weighted_edges_from(
            zip(rows.tolist(), cols.tolist(), A[rows, cols].tolist())
        )
    else:
        G.add_edges_from(zip(rows.tolist(), cols.tolist()))

    logger.info(f"Built approximate k-NN: k={k}, nodes={G.number_of_nodes()}, "
                f"edges={G.number_of_edges()} (approximate)")
    return G, A
[docs]
def net_enn_approx(D: NDArray[np.float64], epsilon: Optional[float] = None,
                   percentile: Optional[float] = None, metric: str = 'precomputed',
                   n_neighbors: int = 50, weighted: bool = False,
                   directed: bool = False) -> Tuple[nx.Graph, np.ndarray]:
    """
    Approximate ε-NN network using PyNNDescent.

    Faster than exact ε-NN for large datasets, but may miss some edges.

    Parameters
    ----------
    D : array (n, n)
        Distance matrix (if metric='precomputed') or feature matrix
    epsilon : float, optional
        Distance threshold (connect if distance < epsilon)
    percentile : float, optional
        Use percentile of distances (if epsilon is None)
    metric : str
        'precomputed' or distance metric name
    n_neighbors : int
        Number of neighbors to search (larger = more accurate).
        Not used when metric='precomputed'.
    weighted : bool
        If True, edge weights = distances
    directed : bool
        If True, create directed graph

    Returns
    -------
    G : networkx.Graph or DiGraph
        Approximate ε-NN network
    A : array (n, n)
        Adjacency matrix

    Raises
    ------
    ImportError
        If pynndescent is not installed.
    ValueError
        If neither epsilon nor percentile is given, or metric='precomputed'
        and D is not a square 2-D array.

    Notes
    -----
    Requires: pip install pynndescent

    With metric='precomputed' the threshold is applied directly to ``D``
    (all distances are already known), so the result is exact in that case.

    Deriving epsilon from ``percentile`` on a feature matrix computes every
    pairwise distance with ``scipy.spatial.distance.pdist``.

    With ``weighted=True`` an edge of distance 0 is kept in ``G`` with
    weight 0.0; ``A`` shows 0.0 there because a dense matrix cannot tell it
    apart from "no edge".
    """
    if not HAS_PYNNDESCENT:
        raise ImportError(
            "net_enn_approx requires pynndescent. "
            "Install with: pip install pynndescent"
        )
    n = D.shape[0]
    precomputed = metric == 'precomputed'

    if epsilon is None and percentile is None:
        raise ValueError("Must specify either epsilon or percentile")

    if precomputed:
        dist = np.asarray(D)
        if dist.ndim != 2 or dist.shape[1] != n:
            raise ValueError(f"D must be square, got shape {dist.shape}")

    if epsilon is None:
        if precomputed:
            # D is a distance matrix: use each unordered pair once.
            upper_tri = dist[np.triu_indices(n, k=1)]
            epsilon = np.percentile(upper_tri, percentile)
        else:
            # D is a feature matrix: pairwise distances must be computed first.
            from scipy.spatial.distance import pdist
            epsilon = np.percentile(pdist(D, metric=metric), percentile)
        logger.info(f"Using {percentile}th percentile: ε={epsilon:.4f}")
    logger.info(f"Building approximate ε-NN with pynndescent (n={n}, ε={epsilon:.4f})...")

    if precomputed:
        # NOTE(review): pynndescent's metric registry does not appear to accept
        # 'precomputed' - confirm. Every distance is already available, so
        # the threshold is applied to D directly.
        below = dist < epsilon
        np.fill_diagonal(below, False)  # no self-loops
        src, dst = np.nonzero(below)
        found = dist[src, dst]
    else:
        k_query = min(n_neighbors, n - 1)
        # Build approximate nearest neighbor index
        index = NNDescent(
            D,
            metric=metric,
            n_neighbors=k_query,
            random_state=42
        )
        indices, distances = index.query(D, k=k_query)
        indices = np.asarray(indices)
        distances = np.asarray(distances)

        node_ids = np.arange(n)[:, None]
        # Drop self by identity rather than by column position (self is not
        # guaranteed to come first), drop negative indices (they would wrap
        # around to the last node), and keep only hits within epsilon.
        usable = (indices != node_ids) & (indices >= 0) & (distances < epsilon)
        src = np.broadcast_to(node_ids, indices.shape)[usable]
        dst = indices[usable]
        found = distances[usable]

    # Boolean edge structure, so zero-distance neighbours are not lost.
    selected = np.zeros((n, n), dtype=bool)
    selected[src, dst] = True
    edge_mask = selected if directed else (selected | selected.T)

    if weighted:
        A = np.zeros((n, n))
        A[src, dst] = found
        if not directed:
            A = np.maximum(A, A.T)  # symmetrize for undirected
    else:
        A = edge_mask.astype(float)

    G = nx.DiGraph() if directed else nx.Graph()
    G.add_nodes_from(range(n))

    # Undirected graphs only need the upper triangle (each pair once).
    rows, cols = np.nonzero(edge_mask if directed else np.triu(edge_mask, k=1))
    if weighted:
        G.add_weighted_edges_from(
            zip(rows.tolist(), cols.tolist(), A[rows, cols].tolist())
        )
    else:
        G.add_edges_from(zip(rows.tolist(), cols.tolist()))

    logger.info(f"Built approximate ε-NN: ε={epsilon:.4f}, nodes={G.number_of_nodes()}, "
                f"edges={G.number_of_edges()} (approximate)")
    return G, A