Source code for ts2net.multivariate.joint_cross

"""
Joint and cross methods for multivariate time series network analysis.

This module provides methods for analyzing relationships between multiple
time series using network-based approaches:
- Joint recurrence networks: edges when multiple series are recurrent simultaneously
- Cross visibility graphs: visibility relationships between different series
- Coupling measures: synchronization and coupling strength metrics
- Multivariate network comparison: metrics for comparing multiple networks
"""

from __future__ import annotations

from typing import Tuple, Optional, List, Dict, Union
import numpy as np
from numpy.typing import NDArray
import networkx as nx
from scipy import sparse
from scipy.sparse import csr_matrix

from ..core.graph import Graph
from ..core.visibility.weights import compute_weight, WeightMode


[docs] def joint_recurrence_network( x1: NDArray[np.float64], x2: NDArray[np.float64], threshold: Optional[float] = None, k: Optional[int] = None, method: str = "epsilon", metric: str = "euclidean", weighted: bool = False, directed: bool = False ) -> Tuple[nx.Graph, NDArray]: """ Construct a joint recurrence network from two time series. A joint recurrence occurs when both series are recurrent at the same time. An edge (i, j) exists if: - Series 1: points i and j are recurrent (within threshold or k-NN) - Series 2: points i and j are recurrent (within threshold or k-NN) Parameters ---------- x1 : array (n,) First time series x2 : array (n,) Second time series (must have same length as x1) threshold : float, optional Distance threshold for epsilon recurrence (required if method="epsilon") k : int, optional Number of nearest neighbors for k-NN recurrence (required if method="knn") method : str, default "epsilon" Recurrence method: "epsilon" (threshold-based) or "knn" (k-nearest neighbors) metric : str, default "euclidean" Distance metric (only used for embedding if needed) weighted : bool, default False If True, weight edges by average distance directed : bool, default False If True, create directed graph Returns ------- G : networkx.Graph or DiGraph Joint recurrence network A : array (n, n) Adjacency matrix Examples -------- >>> import numpy as np >>> x1 = np.random.randn(100) >>> x2 = np.random.randn(100) >>> G, A = joint_recurrence_network(x1, x2, threshold=0.5, method="epsilon") >>> print(f"Joint recurrence network: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges") """ if len(x1) != len(x2): raise ValueError(f"Series must have same length: {len(x1)} != {len(x2)}") n = len(x1) # Build recurrence matrices for each series R1 = _build_recurrence_matrix(x1, threshold=threshold, k=k, method=method) R2 = _build_recurrence_matrix(x2, threshold=threshold, k=k, method=method) # Joint recurrence: both must be recurrent R_joint = R1.multiply(R2) # Element-wise AND # Convert to networkx graph if directed: G = nx.DiGraph() else: G = nx.Graph() G.add_nodes_from(range(n)) # Add edges if weighted: # Use average distance as weight D1 = _pairwise_distances(x1) D2 = _pairwise_distances(x2) D_avg = (D1 + D2) / 2.0 rows, cols = R_joint.nonzero() for i, j in zip(rows, cols): if not directed or i < j: weight = float(D_avg[i, j]) G.add_edge(i, j, weight=weight) else: rows, cols = R_joint.nonzero() for i, j in zip(rows, cols): if not directed or i < j: G.add_edge(i, j) # Convert to dense adjacency matrix for return A = nx.adjacency_matrix(G, nodelist=range(n)).toarray() return G, A
[docs] def cross_visibility_graph( x1: NDArray[np.float64], x2: NDArray[np.float64], method: str = "hvg", weighted: Union[bool, str] = False, weight_mode: Optional[str] = None, limit: Optional[int] = None, directed: bool = False ) -> Tuple[nx.Graph, NDArray]: """ Construct a cross visibility graph between two time series. A cross visibility graph connects points from different series if they are visible to each other. Visibility is determined by the visibility criterion applied across series boundaries. Parameters ---------- x1 : array (n1,) First time series x2 : array (n2,) Second time series (can have different length) method : str, default "hvg" Visibility method: "hvg" (horizontal) or "nvg" (natural) weighted : bool or str, default False If True, use "absdiff" weight mode. If str, use that weight mode. weight_mode : str, optional Explicit weight mode (overrides weighted if provided) limit : int, optional Maximum temporal distance for visibility directed : bool, default False If True, create directed graph Returns ------- G : networkx.Graph or DiGraph Cross visibility graph (bipartite: nodes 0..n1-1 from x1, n1..n1+n2-1 from x2) A : array (n1+n2, n1+n2) Adjacency matrix Examples -------- >>> import numpy as np >>> x1 = np.random.randn(50) >>> x2 = np.random.randn(50) >>> G, A = cross_visibility_graph(x1, x2, method="hvg") >>> print(f"Cross visibility: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges") """ n1, n2 = len(x1), len(x2) n_total = n1 + n2 # Resolve weight mode if weight_mode is not None: w_mode = weight_mode is_weighted = True elif isinstance(weighted, str): w_mode = weighted is_weighted = True elif weighted is True: w_mode = "absdiff" is_weighted = True else: w_mode = None is_weighted = False # Create graph G = nx.DiGraph() if directed else nx.Graph() # Add nodes: 0..n1-1 for x1, n1..n1+n2-1 for x2 G.add_nodes_from(range(n_total)) # Build cross visibility edges visibility_checkers = { "hvg": _cross_hvg_visible, "nvg": _cross_nvg_visible, } checker = visibility_checkers.get(method) if checker is None: raise ValueError(f"Unknown method: {method}. Use 'hvg' or 'nvg'") edges = [] for i in range(n1): for j in range(n1, n_total): j_idx = j - n1 if limit is not None and abs(i - j_idx) > limit: continue if checker(x1, x2, i, j_idx): if is_weighted: weight = _compute_cross_weight(x1, x2, i, j_idx, w_mode) edges.append((i, j, weight)) else: edges.append((i, j)) # Add edges to graph if is_weighted: G.add_weighted_edges_from(edges) else: G.add_edges_from(edges) # Build adjacency matrix A = nx.adjacency_matrix(G, nodelist=range(n_total)).toarray() return G, A
def _compute_cross_weight( x1: NDArray[np.float64], x2: NDArray[np.float64], i: int, j: int, w_mode: str ) -> float: """Compute weight for cross visibility edge.""" weight_functions = { "absdiff": lambda: abs(x1[i] - x2[j]), "time_gap": lambda: abs(i - j), "slope": lambda: (x2[j] - x1[i]) / (j - i + 1), } func = weight_functions.get(w_mode, weight_functions["absdiff"]) return float(func()) def _cross_hvg_visible( x1: NDArray[np.float64], x2: NDArray[np.float64], i: int, j: int ) -> bool: """Check if point i in x1 and point j in x2 are horizontally visible.""" threshold = min(x1[i], x2[j]) return (np.all(x1[i + 1:] < threshold) and np.all(x2[:j] < threshold)) def _cross_nvg_visible( x1: NDArray[np.float64], x2: NDArray[np.float64], i: int, j: int ) -> bool: """Check if point i in x1 and point j in x2 are naturally visible.""" # For natural visibility, the line from (i, x1[i]) to (j, x2[j]) # should not intersect intermediate points # Map to a common coordinate system # Use indices as x-coordinates, values as y-coordinates x_i, y_i = i, x1[i] x_j, y_j = len(x1) + j, x2[j] # x2 starts after x1 # Check intermediate points in x1 (after i) for k in range(i + 1, len(x1)): x_k, y_k = k, x1[k] line_y = y_i + (y_j - y_i) * (x_k - x_i) / (x_j - x_i) if y_k > line_y: return False # Check intermediate points in x2 (before j) for k in range(j): x_k, y_k = len(x1) + k, x2[k] line_y = y_i + (y_j - y_i) * (x_k - x_i) / (x_j - x_i) if y_k > line_y: return False return True
[docs] def coupling_strength( x1: NDArray[np.float64], x2: NDArray[np.float64], method: str = "joint_recurrence", threshold: Optional[float] = None, k: Optional[int] = None ) -> Dict[str, float]: """ Compute coupling strength between two time series. Parameters ---------- x1 : array (n,) First time series x2 : array (n,) Second time series method : str, default "joint_recurrence" Coupling method: "joint_recurrence" or "cross_visibility" threshold : float, optional Threshold for recurrence (if method="joint_recurrence") k : int, optional k for k-NN recurrence (if method="joint_recurrence") Returns ------- metrics : dict Dictionary with coupling metrics: - coupling_strength: Overall coupling strength (0-1) - joint_recurrence_rate: Fraction of joint recurrences - synchronization: Degree of synchronization - asymmetry: Asymmetry in coupling (0 = symmetric) Examples -------- >>> import numpy as np >>> x1 = np.random.randn(100) >>> x2 = x1 + 0.1 * np.random.randn(100) # Coupled series >>> metrics = coupling_strength(x1, x2, method="joint_recurrence", threshold=0.5) >>> print(f"Coupling strength: {metrics['coupling_strength']:.3f}") """ if len(x1) != len(x2): raise ValueError(f"Series must have same length: {len(x1)} != {len(x2)}") n = len(x1) if method == "joint_recurrence": # Build individual recurrence matrices R1 = _build_recurrence_matrix(x1, threshold=threshold, k=k, method="epsilon" if threshold else "knn") R2 = _build_recurrence_matrix(x2, threshold=threshold, k=k, method="epsilon" if threshold else "knn") # Joint recurrence R_joint = R1.multiply(R2) # Individual recurrence rates RR1 = R1.sum() / (n * n) RR2 = R2.sum() / (n * n) RR_joint = R_joint.sum() / (n * n) # Coupling strength: ratio of joint to expected (if independent) expected_joint = RR1 * RR2 if expected_joint > 0: coupling_strength = RR_joint / expected_joint else: coupling_strength = 0.0 # Synchronization: correlation of recurrence patterns R1_vec = R1.toarray().flatten() R2_vec = R2.toarray().flatten() synchronization = float(np.corrcoef(R1_vec, R2_vec)[0, 1]) if np.isnan(synchronization): synchronization = 0.0 # Asymmetry: difference in individual recurrence rates asymmetry = abs(RR1 - RR2) return { "coupling_strength": float(coupling_strength), "joint_recurrence_rate": float(RR_joint), "synchronization": float(synchronization), "asymmetry": float(asymmetry), "recurrence_rate_1": float(RR1), "recurrence_rate_2": float(RR2), } elif method == "cross_visibility": # Build cross visibility graph G, A = cross_visibility_graph(x1, x2, method="hvg") # Coupling strength: density of cross visibility n1, n2 = len(x1), len(x2) max_edges = n1 * n2 actual_edges = G.number_of_edges() coupling_strength = actual_edges / max_edges if max_edges > 0 else 0.0 # Additional metrics from cross visibility # Degree distribution analysis degrees_x1 = [G.degree(i) for i in range(n1)] degrees_x2 = [G.degree(i) for i in range(n1, n1 + n2)] return { "coupling_strength": float(coupling_strength), "cross_visibility_density": float(coupling_strength), "mean_degree_x1": float(np.mean(degrees_x1)), "mean_degree_x2": float(np.mean(degrees_x2)), "synchronization": float(np.corrcoef(degrees_x1, degrees_x2[:n1] if n1 == n2 else [0])[0, 1]) if n1 == n2 else 0.0, "asymmetry": float(abs(np.mean(degrees_x1) - np.mean(degrees_x2))), } else: raise ValueError(f"Unknown method: {method}. Use 'joint_recurrence' or 'cross_visibility'")
[docs] def network_comparison_metrics( networks: List[nx.Graph], names: Optional[List[str]] = None ) -> Dict[str, Union[float, NDArray]]: """ Compute comparison metrics for multiple networks. Parameters ---------- networks : list of networkx.Graph List of networks to compare names : list of str, optional Names for each network Returns ------- metrics : dict Dictionary with comparison metrics: - density_similarity: Pairwise density correlations - degree_correlation: Pairwise degree sequence correlations - edge_overlap: Pairwise edge overlap (Jaccard similarity) - structural_similarity: Overall structural similarity matrix """ n_networks = len(networks) if names is None: names = [f"network_{i}" for i in range(n_networks)] if len(names) != n_networks: raise ValueError(f"Number of names ({len(names)}) must match number of networks ({n_networks})") # Ensure all networks have same nodes all_nodes = set() for G in networks: all_nodes.update(G.nodes()) all_nodes = sorted(all_nodes) # Compute metrics for each network densities = [] degree_sequences = [] edge_sets = [] for G in networks: # Density n = G.number_of_nodes() m = G.number_of_edges() max_edges = n * (n - 1) / 2 if not G.is_directed() else n * (n - 1) density = m / max_edges if max_edges > 0 else 0.0 densities.append(density) # Degree sequence (0 for nodes not in this network) degrees = np.array([G.degree(node) if node in G else 0 for node in all_nodes]) degree_sequences.append(degrees) # Edge set (only edges that exist in this network) edge_sets.append(set(G.edges())) # Pairwise comparisons # Always create n_networks x n_networks matrix density_similarity = np.ones((n_networks, n_networks)) if n_networks > 1: densities_array = np.array(densities) if np.std(densities_array) > 1e-10: # Need variation to compute correlation try: corr_result = np.corrcoef(densities_array) # np.corrcoef always returns n x n for n inputs if corr_result.shape == (n_networks, n_networks): density_similarity = corr_result # If shape is wrong (shouldn't happen), keep identity matrix except (ValueError, np.linalg.LinAlgError): # If correlation fails, use identity (all similar) pass degree_correlation = np.zeros((n_networks, n_networks)) edge_overlap = np.zeros((n_networks, n_networks)) for i in range(n_networks): for j in range(n_networks): # Degree correlation if np.std(degree_sequences[i]) > 0 and np.std(degree_sequences[j]) > 0: corr = np.corrcoef(degree_sequences[i], degree_sequences[j])[0, 1] degree_correlation[i, j] = corr if not np.isnan(corr) else 0.0 else: degree_correlation[i, j] = 1.0 if np.allclose(degree_sequences[i], degree_sequences[j]) else 0.0 # Edge overlap (Jaccard similarity) intersection = len(edge_sets[i] & edge_sets[j]) union = len(edge_sets[i] | edge_sets[j]) edge_overlap[i, j] = intersection / union if union > 0 else 0.0 # Overall structural similarity (average of normalized metrics) structural_similarity = ( (density_similarity + 1) / 2 + # Normalize correlation to [0, 1] (degree_correlation + 1) / 2 + edge_overlap ) / 3.0 return { "density_similarity": density_similarity, "degree_correlation": degree_correlation, "edge_overlap": edge_overlap, "structural_similarity": structural_similarity, "network_names": names, }
# Helper functions def _build_recurrence_matrix( x: NDArray[np.float64], threshold: Optional[float] = None, k: Optional[int] = None, method: str = "epsilon" ) -> csr_matrix: """Build recurrence matrix for a single series.""" n = len(x) if method == "epsilon": if threshold is None: raise ValueError("threshold required for epsilon method") # Build sparse recurrence matrix rows, cols = [], [] for i in range(n): for j in range(i + 1, n): dist = abs(x[i] - x[j]) if dist <= threshold: rows.append(i) cols.append(j) rows.append(j) cols.append(i) data = np.ones(len(rows), dtype=float) R = csr_matrix((data, (rows, cols)), shape=(n, n)) return R elif method == "knn": if k is None: k = 5 # Default k value # Build k-NN recurrence matrix rows, cols = [], [] for i in range(n): # Compute distances distances = [(j, abs(x[i] - x[j])) for j in range(n) if i != j] distances.sort(key=lambda x: x[1]) # Add k nearest neighbors for j, _ in distances[:k]: rows.append(i) cols.append(j) data = np.ones(len(rows), dtype=float) R = csr_matrix((data, (rows, cols)), shape=(n, n)) return R else: raise ValueError(f"Unknown method: {method}") def _pairwise_distances(x: NDArray[np.float64]) -> NDArray[np.float64]: """Compute pairwise distances for a 1D series.""" n = len(x) D = np.zeros((n, n)) for i in range(n): for j in range(n): D[i, j] = abs(x[i] - x[j]) return D