Source code for ts2net.multivariate.builders

"""
Network construction from distance matrices.

Implements R ts2net API for network builders: k-NN, ε-NN, weighted.

API Design Credit
-----------------
The network construction methods and API design are based on the R ts2net 
package by Leonardo N. Ferreira:

    Ferreira, L.N. (2024). From time series to networks in R with the ts2net 
    package. Applied Network Science, 9(1), 32.
    https://doi.org/10.1007/s41109-024-00642-2
    
Original R package: https://github.com/lnferreira/ts2net

This Python implementation extends the R API with:
- Approximate nearest neighbors for large datasets (pynndescent)
- Directed/undirected graph options
- NetworkX integration
"""

import numpy as np
from numpy.typing import NDArray
from typing import Tuple, Optional
import networkx as nx
from scipy import sparse
import logging

logger = logging.getLogger(__name__)

# Optional: pynndescent for approximate nearest neighbors
try:
    from pynndescent import NNDescent
    HAS_PYNNDESCENT = True
except ImportError:
    HAS_PYNNDESCENT = False


[docs] def net_knn(D: NDArray[np.float64], k: int, mutual: bool = False, weighted: bool = False, directed: bool = False) -> Tuple[nx.Graph, np.ndarray]: """ k-Nearest Neighbors network from distance matrix. Each node is connected to its k nearest neighbors. Parameters ---------- D : array (n, n) Distance matrix (smaller = more similar) k : int Number of nearest neighbors per node mutual : bool If True, require mutual k-NN (i in kNN(j) AND j in kNN(i)) weighted : bool If True, edge weights = distances directed : bool If True, create directed graph (i → j if j in kNN(i)) Returns ------- G : networkx.Graph or DiGraph k-NN network A : array (n, n) Adjacency matrix (weighted if weighted=True) Examples -------- >>> D = np.random.rand(10, 10) >>> D = (D + D.T) / 2 # Make symmetric >>> np.fill_diagonal(D, 0) >>> G, A = net_knn(D, k=3, mutual=False, weighted=True) >>> G.number_of_edges() 30 """ n = D.shape[0] if D.shape != (n, n): raise ValueError(f"D must be square, got shape {D.shape}") if k <= 0 or k >= n: raise ValueError(f"k must be in range [1, {n-1}], got {k}") # Create adjacency matrix A = np.zeros((n, n)) for i in range(n): # Find k nearest neighbors (excluding self) distances = D[i].copy() distances[i] = np.inf # Exclude self neighbors = np.argpartition(distances, k - 1)[:k] for j in neighbors: weight = D[i, j] if weighted else 1.0 A[i, j] = weight # Apply mutual k-NN constraint if mutual: if directed: A = A * A.T # Both i→j and j→i must exist else: A = np.minimum(A, A.T) # Symmetric minimum # For undirected non-mutual, symmetrize if not directed and not mutual: A = np.maximum(A, A.T) # Create graph if directed: G = nx.DiGraph() else: G = nx.Graph() G.add_nodes_from(range(n)) # Add edges edges = [] if directed: for i in range(n): for j in range(n): if A[i, j] > 0: if weighted: edges.append((i, j, {'weight': A[i, j]})) else: edges.append((i, j)) else: # Undirected: only add upper triangle to avoid duplicate edges for i in range(n): for j in range(i + 1, n): if A[i, j] > 0: if weighted: edges.append((i, j, {'weight': A[i, j]})) else: edges.append((i, j)) G.add_edges_from(edges) logger.info(f"Built k-NN network: k={k}, nodes={G.number_of_nodes()}, " f"edges={G.number_of_edges()}, mutual={mutual}, weighted={weighted}") return G, A
[docs] def net_enn(D: NDArray[np.float64], epsilon: Optional[float] = None, percentile: Optional[float] = None, weighted: bool = False, directed: bool = False) -> Tuple[nx.Graph, np.ndarray]: """ ε-Nearest Neighbors network from distance matrix. Nodes are connected if distance < ε. Parameters ---------- D : array (n, n) Distance matrix epsilon : float, optional Distance threshold (connect if D[i,j] < epsilon) percentile : float, optional Use percentile of distances as epsilon (0-100) If both epsilon and percentile given, epsilon takes precedence weighted : bool If True, edge weights = distances directed : bool If True, create directed graph Returns ------- G : networkx.Graph or DiGraph ε-NN network A : array (n, n) Adjacency matrix Examples -------- >>> D = np.random.rand(10, 10) >>> # Connect top 30% shortest distances >>> G, A = net_enn(D, percentile=30, weighted=False) """ n = D.shape[0] if D.shape != (n, n): raise ValueError(f"D must be square, got shape {D.shape}") # Determine epsilon if epsilon is None and percentile is None: raise ValueError("Must specify either epsilon or percentile") if epsilon is None: # Extract upper triangle (excluding diagonal) upper_tri = D[np.triu_indices(n, k=1)] epsilon = np.percentile(upper_tri, percentile) logger.info(f"Using {percentile}th percentile: ε={epsilon:.4f}") # Create adjacency matrix A = np.zeros((n, n)) for i in range(n): for j in range(n): if i != j and D[i, j] < epsilon: weight = D[i, j] if weighted else 1.0 A[i, j] = weight if not directed: A[j, i] = weight # Create graph if directed: G = nx.DiGraph() else: G = nx.Graph() G.add_nodes_from(range(n)) edges = [] for i in range(n): for j in range(n): if A[i, j] > 0 and (directed or j > i): if weighted: edges.append((i, j, {'weight': A[i, j]})) else: edges.append((i, j)) G.add_edges_from(edges) logger.info(f"Built ε-NN network: ε={epsilon:.4f}, nodes={G.number_of_nodes()}, " f"edges={G.number_of_edges()}, weighted={weighted}") return G, A
[docs] def net_weighted(D: NDArray[np.float64], threshold: Optional[float] = None, directed: bool = False) -> Tuple[nx.Graph, np.ndarray]: """ Complete weighted network from distance matrix. All pairs connected with edge weight = distance. Parameters ---------- D : array (n, n) Distance matrix threshold : float, optional Remove edges with distance > threshold directed : bool If True, create directed graph Returns ------- G : networkx.Graph or DiGraph Weighted network A : array (n, n) Adjacency matrix (weighted) Examples -------- >>> D = np.random.rand(10, 10) >>> G, A = net_weighted(D, threshold=0.5) """ n = D.shape[0] if D.shape != (n, n): raise ValueError(f"D must be square, got shape {D.shape}") # Create adjacency matrix A = D.copy() # Apply threshold if threshold is not None: A[A > threshold] = 0.0 # Remove diagonal np.fill_diagonal(A, 0.0) # Create graph if directed: G = nx.DiGraph() else: G = nx.Graph() G.add_nodes_from(range(n)) edges = [] for i in range(n): for j in range(n): if A[i, j] > 0 and (directed or j > i): edges.append((i, j, {'weight': A[i, j]})) G.add_edges_from(edges) logger.info(f"Built weighted network: threshold={threshold}, " f"nodes={G.number_of_nodes()}, edges={G.number_of_edges()}") return G, A
[docs] def net_knn_approx(D: NDArray[np.float64], k: int, metric: str = 'precomputed', n_neighbors: int = 15, weighted: bool = False, directed: bool = False) -> Tuple[nx.Graph, np.ndarray]: """ Approximate k-NN network using PyNNDescent. Much faster than exact k-NN for large datasets (>1000 nodes), but may miss some nearest neighbors. Parameters ---------- D : array (n, n) Distance matrix (if metric='precomputed') OR raw feature matrix (if metric='euclidean', etc.) k : int Number of nearest neighbors metric : str 'precomputed' (use D as distance matrix) OR 'euclidean', 'cosine', 'manhattan', etc. (compute on the fly) n_neighbors : int Number of neighbors for approximation (>= k, larger = more accurate) weighted : bool If True, edge weights = distances directed : bool If True, create directed graph Returns ------- G : networkx.Graph or DiGraph Approximate k-NN network A : array (n, n) Adjacency matrix Notes ----- Requires: pip install pynndescent Speed comparison (n=10,000 nodes): - Exact k-NN: ~2 minutes - Approximate k-NN: ~3 seconds (40x faster) Examples -------- >>> # For very large datasets >>> X = np.random.randn(10000, 1000) # 10k series, 1k points each >>> G, A = net_knn_approx(X, k=10, metric='euclidean') >>> # Or with precomputed distances >>> D = ts_dist(X, method='correlation', n_jobs=-1) >>> G, A = net_knn_approx(D, k=10, metric='precomputed') """ if not HAS_PYNNDESCENT: raise ImportError( "net_knn_approx requires pynndescent. " "Install with: pip install pynndescent" ) n = D.shape[0] if k <= 0 or k >= n: raise ValueError(f"k must be in range [1, {n-1}], got {k}") if n_neighbors < k: logger.warning(f"n_neighbors ({n_neighbors}) < k ({k}), setting n_neighbors={k}") n_neighbors = k logger.info(f"Building approximate k-NN with pynndescent (n={n}, k={k})...") # Build approximate nearest neighbor index index = NNDescent( D, metric=metric, n_neighbors=min(n_neighbors, n - 1), random_state=42 ) # Query k nearest neighbors for each point indices, distances = index.query(D, k=k + 1) # +1 to exclude self # Build adjacency matrix A = np.zeros((n, n)) for i in range(n): for j_idx in range(1, k + 1): # Skip first (self) j = indices[i, j_idx] dist = distances[i, j_idx] weight = dist if weighted else 1.0 A[i, j] = weight # Symmetrize for undirected if not directed: A = np.maximum(A, A.T) # Create graph if directed: G = nx.DiGraph() else: G = nx.Graph() G.add_nodes_from(range(n)) # Add edges edges = [] if directed: for i in range(n): for j in range(n): if A[i, j] > 0: if weighted: edges.append((i, j, {'weight': A[i, j]})) else: edges.append((i, j)) else: for i in range(n): for j in range(i + 1, n): if A[i, j] > 0: if weighted: edges.append((i, j, {'weight': A[i, j]})) else: edges.append((i, j)) G.add_edges_from(edges) logger.info(f"Built approximate k-NN: k={k}, nodes={G.number_of_nodes()}, " f"edges={G.number_of_edges()} (approximate)") return G, A
[docs] def net_enn_approx(D: NDArray[np.float64], epsilon: Optional[float] = None, percentile: Optional[float] = None, metric: str = 'precomputed', n_neighbors: int = 50, weighted: bool = False, directed: bool = False) -> Tuple[nx.Graph, np.ndarray]: """ Approximate ε-NN network using PyNNDescent. Faster than exact ε-NN for large datasets, but may miss some edges. Parameters ---------- D : array (n, n) Distance matrix or feature matrix epsilon : float, optional Distance threshold percentile : float, optional Use percentile of distances (if epsilon is None) metric : str 'precomputed' or distance metric name n_neighbors : int Number of neighbors to search (larger = more accurate) weighted : bool If True, edge weights = distances directed : bool If True, create directed graph Returns ------- G : networkx.Graph or DiGraph Approximate ε-NN network A : array (n, n) Adjacency matrix Notes ----- Requires: pip install pynndescent """ if not HAS_PYNNDESCENT: raise ImportError( "net_enn_approx requires pynndescent. " "Install with: pip install pynndescent" ) n = D.shape[0] # Determine epsilon if epsilon is None and percentile is None: raise ValueError("Must specify either epsilon or percentile") if epsilon is None: # If metric is 'precomputed', D is a distance matrix # Otherwise, D is a feature matrix and we need to compute distances first if metric == 'precomputed': upper_tri = D[np.triu_indices(n, k=1)] epsilon = np.percentile(upper_tri, percentile) else: # For feature matrices, compute pairwise distances to determine epsilon from scipy.spatial.distance import pdist, squareform dists = pdist(D, metric=metric) epsilon = np.percentile(dists, percentile) logger.info(f"Using {percentile}th percentile: ε={epsilon:.4f}") logger.info(f"Building approximate ε-NN with pynndescent (n={n}, ε={epsilon:.4f})...") # Build approximate nearest neighbor index index = NNDescent( D, metric=metric, n_neighbors=min(n_neighbors, n - 1), random_state=42 ) # Query neighbors within epsilon k_query = min(n_neighbors, n - 1) indices, distances = index.query(D, k=k_query) # Build adjacency matrix A = np.zeros((n, n)) # Get actual number of neighbors returned # indices and distances are 2D arrays: (n, k_actual) where k_actual <= k_query max_neighbors = indices.shape[1] for i in range(n): # Iterate from 1 to max_neighbors-1 (skip first which is self) # Use min to ensure we never exceed bounds for j_idx in range(1, min(max_neighbors, k_query)): if j_idx >= max_neighbors: break j = int(indices[i, j_idx]) dist = float(distances[i, j_idx]) if dist < epsilon: weight = dist if weighted else 1.0 A[i, j] = weight # Symmetrize for undirected if not directed: A = np.maximum(A, A.T) # Create graph if directed: G = nx.DiGraph() else: G = nx.Graph() G.add_nodes_from(range(n)) # Add edges edges = [] if directed: for i in range(n): for j in range(n): if A[i, j] > 0: if weighted: edges.append((i, j, {'weight': A[i, j]})) else: edges.append((i, j)) else: for i in range(n): for j in range(i + 1, n): if A[i, j] > 0: if weighted: edges.append((i, j, {'weight': A[i, j]})) else: edges.append((i, j)) G.add_edges_from(edges) logger.info(f"Built approximate ε-NN: ε={epsilon:.4f}, nodes={G.number_of_nodes()}, " f"edges={G.number_of_edges()} (approximate)") return G, A