"""
Distance functions for comparing multiple time series.
Implements R ts2net API for distance matrix calculation.
API Design Credit
-----------------
The distance functions and API design are based on the R ts2net package by
Leonardo N. Ferreira:
Ferreira, L.N. (2024). From time series to networks in R with the ts2net
package. Applied Network Science, 9(1), 32.
https://doi.org/10.1007/s41109-024-00642-2
Original R package: https://github.com/lnferreira/ts2net
This Python implementation extends the R API with:
- Numba acceleration for performance
- Parallel processing with joblib
- Additional distance functions
"""
import numpy as np
from numpy.typing import NDArray
from typing import Optional, Callable, Dict
import logging
from scipy.stats import pearsonr
from scipy.signal import correlate
from joblib import Parallel, delayed
logger = logging.getLogger(__name__)
try:
from numba import njit
HAS_NUMBA = True
except ImportError:
HAS_NUMBA = False
def njit(*args, **kwargs):
def decorator(func):
return func
return decorator if not args else decorator(args[0])
# ============================================================================
# Distance Functions
# ============================================================================
def tsdist_cor(x: np.ndarray, y: np.ndarray) -> float:
"""Pearson correlation distance: d = 1 - |ρ|"""
if len(x) != len(y):
raise ValueError("Time series must have same length")
rho = np.corrcoef(x, y)[0, 1]
return 1.0 - abs(rho)
def tsdist_ccf(x: np.ndarray, y: np.ndarray, max_lag: Optional[int] = None) -> float:
"""Cross-correlation distance: d = 1 - max(|CCF|)"""
if len(x) != len(y):
raise ValueError("Time series must have same length")
# Normalize
x_norm = (x - np.mean(x)) / (np.std(x) + 1e-10)
y_norm = (y - np.mean(y)) / (np.std(y) + 1e-10)
# Calculate cross-correlation
ccf = correlate(x_norm, y_norm, mode='full')
ccf /= len(x)
# Restrict to max_lag
if max_lag is not None:
center = len(ccf) // 2
ccf = ccf[center - max_lag:center + max_lag + 1]
return 1.0 - np.max(np.abs(ccf))
def tsdist_dtw(x: np.ndarray, y: np.ndarray, window: Optional[int] = None,
normalize: bool = True) -> float:
"""Dynamic Time Warping distance"""
if HAS_NUMBA:
return _dtw_numba(x, y, window, normalize)
return _dtw_python(x, y, window, normalize)
@njit(cache=True)
def _dtw_numba(x: np.ndarray, y: np.ndarray, window: Optional[int],
normalize: bool) -> float:
"""Numba-accelerated DTW"""
n, m = len(x), len(y)
# Initialize cost matrix with infinity
cost = np.full((n + 1, m + 1), np.inf)
cost[0, 0] = 0.0
# Fill cost matrix
for i in range(1, n + 1):
# Apply Sakoe-Chiba band if window specified
j_start = max(1, i - window) if window is not None else 1
j_end = min(m + 1, i + window + 1) if window is not None else m + 1
for j in range(j_start, j_end):
dist = abs(x[i - 1] - y[j - 1])
cost[i, j] = dist + min(cost[i - 1, j], cost[i, j - 1], cost[i - 1, j - 1])
distance = cost[n, m]
if normalize:
distance /= (n + m)
return distance
def _dtw_python(x: np.ndarray, y: np.ndarray, window: Optional[int],
normalize: bool) -> float:
"""Python fallback for DTW"""
n, m = len(x), len(y)
cost = np.full((n + 1, m + 1), np.inf)
cost[0, 0] = 0.0
for i in range(1, n + 1):
j_start = max(1, i - window) if window is not None else 1
j_end = min(m + 1, i + window + 1) if window is not None else m + 1
for j in range(j_start, j_end):
dist = abs(x[i - 1] - y[j - 1])
cost[i, j] = dist + min(cost[i - 1, j], cost[i, j - 1], cost[i - 1, j - 1])
distance = cost[n, m]
if normalize:
distance /= (n + m)
return distance
def tsdist_nmi(x: np.ndarray, y: np.ndarray, bins: int = 10) -> float:
"""Normalized Mutual Information distance: d = 1 - NMI"""
# Discretize
x_binned = np.digitize(x, np.linspace(x.min(), x.max(), bins + 1)[1:-1])
y_binned = np.digitize(y, np.linspace(y.min(), y.max(), bins + 1)[1:-1])
# Calculate joint histogram
hist_2d, _, _ = np.histogram2d(x_binned, y_binned, bins=bins)
# Probabilities
pxy = hist_2d / len(x)
px = np.sum(pxy, axis=1)
py = np.sum(pxy, axis=0)
# Entropies
px_py = px[:, None] * py[None, :]
# Avoid log(0)
mask = (pxy > 0) & (px_py > 0)
mi = np.sum(pxy[mask] * np.log(pxy[mask] / px_py[mask]))
hx = -np.sum(px[px > 0] * np.log(px[px > 0]))
hy = -np.sum(py[py > 0] * np.log(py[py > 0]))
nmi = 2 * mi / (hx + hy) if (hx + hy) > 0 else 0.0
return 1.0 - nmi
def tsdist_es(x: np.ndarray, y: np.ndarray, threshold: Optional[float] = None,
tau: int = 1) -> float:
"""Event Synchronization distance"""
# Detect events (peaks above threshold)
if threshold is None:
threshold = np.median(x)
events_x = np.where(x > threshold)[0]
events_y = np.where(y > threshold)[0]
if len(events_x) == 0 or len(events_y) == 0:
return 1.0
# Count synchronized events
sync_count = 0
for ex in events_x:
for ey in events_y:
if abs(ex - ey) <= tau:
sync_count += 1
break
# Normalize
sync = 2 * sync_count / (len(events_x) + len(events_y))
return 1.0 - sync
def tsdist_voi(x: np.ndarray, y: np.ndarray, bins: int = 10) -> float:
"""
Variation of Information distance.
VOI = H(X) + H(Y) - 2*MI(X,Y) where H is entropy and MI is mutual information.
Parameters
----------
x, y : array
Time series to compare
bins : int
Number of bins for discretization
Returns
-------
float
VOI distance (0 = identical, higher = more different)
"""
# Discretize
x_binned = np.digitize(x, np.linspace(x.min(), x.max(), bins + 1)[1:-1])
y_binned = np.digitize(y, np.linspace(y.min(), y.max(), bins + 1)[1:-1])
# Calculate joint histogram
hist_2d, _, _ = np.histogram2d(x_binned, y_binned, bins=bins)
# Probabilities
pxy = hist_2d / len(x)
px = np.sum(pxy, axis=1)
py = np.sum(pxy, axis=0)
# Entropies
hx = -np.sum(px[px > 0] * np.log(px[px > 0]))
hy = -np.sum(py[py > 0] * np.log(py[py > 0]))
# Mutual information
px_py = px[:, None] * py[None, :]
mask = (pxy > 0) & (px_py > 0)
mi = np.sum(pxy[mask] * np.log(pxy[mask] / px_py[mask]))
# VOI = H(X) + H(Y) - 2*MI(X,Y)
voi = hx + hy - 2 * mi
return voi
def tsdist_mic(x: np.ndarray, y: np.ndarray, alpha: float = 0.6, c: int = 15) -> float:
"""
Maximal Information Coefficient distance: d = 1 - MIC.
MIC measures general association strength between variables.
Requires the `minepy` package.
Parameters
----------
x, y : array
Time series to compare
alpha : float
MIC parameter controlling search grid resolution
c : int
MIC parameter controlling grid complexity
Returns
-------
float
MIC-based distance (0 = perfect association, 1 = no association)
Notes
-----
Install minepy: pip install minepy
"""
try:
from minepy import MINE
except ImportError:
raise ImportError(
"tsdist_mic requires the 'minepy' package. "
"Install it with: pip install minepy"
)
mine = MINE(alpha=alpha, c=c)
mine.compute_score(x, y)
mic = mine.mic()
return 1.0 - mic
def tsdist_vr(x: np.ndarray, y: np.ndarray, tau: float = 1.0,
threshold: Optional[float] = None) -> float:
"""
van Rossum spike train distance.
Treats time series as spike trains and compares them using
exponentially decaying kernels.
Parameters
----------
x, y : array
Time series to compare
tau : float
Exponential kernel time constant
threshold : float, optional
Spike detection threshold (None = median)
Returns
-------
float
van Rossum distance (0 = identical spike trains)
"""
# Detect spikes (events above threshold)
if threshold is None:
threshold_x = np.median(x)
threshold_y = np.median(y)
else:
threshold_x = threshold_y = threshold
spikes_x = np.where(x > threshold_x)[0]
spikes_y = np.where(y > threshold_y)[0]
if len(spikes_x) == 0 and len(spikes_y) == 0:
return 0.0
if len(spikes_x) == 0 or len(spikes_y) == 0:
return 1.0
# Create convolved spike trains
n = len(x)
t = np.arange(n)
# Convolve with exponential kernel
fx = np.zeros(n)
for spike in spikes_x:
fx += np.exp(-np.abs(t - spike) / tau)
fy = np.zeros(n)
for spike in spikes_y:
fy += np.exp(-np.abs(t - spike) / tau)
# van Rossum distance = integral of squared difference
vr_dist = np.sqrt(np.sum((fx - fy) ** 2))
# Normalize by max possible distance
max_dist = np.sqrt(np.sum(fx ** 2) + np.sum(fy ** 2))
return vr_dist / max_dist if max_dist > 0 else 0.0
# ============================================================================
# Distance Method Registry
# ============================================================================
_DISTANCE_REGISTRY: Dict[str, Callable] = {
'correlation': tsdist_cor,
'cor': tsdist_cor,
'ccf': tsdist_ccf,
'cross_correlation': tsdist_ccf,
'dtw': tsdist_dtw,
'nmi': tsdist_nmi,
'mutual_information': tsdist_nmi,
'voi': tsdist_voi,
'variation_of_information': tsdist_voi,
'es': tsdist_es,
'event_sync': tsdist_es,
'mic': tsdist_mic,
'maximal_information_coefficient': tsdist_mic,
'vr': tsdist_vr,
'van_rossum': tsdist_vr,
}
# ============================================================================
# Master Distance Function
# ============================================================================
[docs]
def ts_dist(X: NDArray[np.float64], method: str = 'correlation',
n_jobs: int = 1, **kwargs) -> NDArray[np.float64]:
"""
Calculate pairwise distance matrix between multiple time series.
Parameters
----------
X : array (n_series, n_timepoints)
Multiple time series to compare
method : str
Distance function: 'correlation', 'ccf', 'dtw', 'nmi', 'es'
n_jobs : int
Number of parallel workers (-1 = all cores)
**kwargs
Distance-specific parameters
Returns
-------
D : array (n_series, n_series)
Distance matrix (symmetric, diagonal = 0)
Examples
--------
>>> import numpy as np
>>> X = np.random.randn(10, 100)
>>> D = ts_dist(X, method='correlation', n_jobs=-1)
>>> D.shape
(10, 10)
"""
if X.ndim != 2:
raise ValueError(f"X must be 2D array, got shape {X.shape}")
n_series, n_points = X.shape
# Get distance function
dist_func = _DISTANCE_REGISTRY.get(method.lower())
if dist_func is None:
raise ValueError(f"Unknown distance method: {method}. "
f"Available: {list(_DISTANCE_REGISTRY.keys())}")
logger.info(f"Computing {method} distance matrix for {n_series} series ({n_points} points each)")
# Calculate distance matrix
if n_jobs == 1:
# Serial computation
D = _compute_distance_matrix_serial(X, dist_func, **kwargs)
else:
# Parallel computation
D = _compute_distance_matrix_parallel(X, dist_func, n_jobs, **kwargs)
return D
def _compute_distance_matrix_serial(X: np.ndarray, dist_func: Callable,
**kwargs) -> np.ndarray:
"""Serial computation of distance matrix"""
n_series = X.shape[0]
D = np.zeros((n_series, n_series))
for i in range(n_series):
for j in range(i + 1, n_series):
d = dist_func(X[i], X[j], **kwargs)
D[i, j] = d
D[j, i] = d
return D
def _compute_distance_matrix_parallel(X: np.ndarray, dist_func: Callable,
n_jobs: int, **kwargs) -> np.ndarray:
"""Parallel computation of distance matrix"""
n_series = X.shape[0]
# Generate pairs
pairs = [(i, j) for i in range(n_series) for j in range(i + 1, n_series)]
# Compute distances in parallel
# Use threading backend to avoid loky multiprocessing segfaults in test environments
distances = Parallel(n_jobs=n_jobs, backend='threading')(
delayed(dist_func)(X[i], X[j], **kwargs) for i, j in pairs
)
# Fill matrix
D = np.zeros((n_series, n_series))
for (i, j), d in zip(pairs, distances):
D[i, j] = d
D[j, i] = d
return D
[docs]
def ts_dist_part(X: NDArray[np.float64], start_idx: int, end_idx: int,
method: str = 'correlation', **kwargs) -> NDArray[np.float64]:
"""
Calculate partial distance matrix for HPC batch processing.
Parameters
----------
X : array (n_series, n_timepoints)
All time series
start_idx : int
Start row index (inclusive)
end_idx : int
End row index (exclusive)
method : str
Distance function name
**kwargs
Distance-specific parameters
Returns
-------
D_part : array (end_idx - start_idx, n_series)
Partial distance matrix (rows start_idx:end_idx)
Examples
--------
>>> # On cluster node 1
>>> D_part = ts_dist_part(X, 0, 100, method='dtw')
>>> np.save('D_part_0_100.npy', D_part)
"""
if X.ndim != 2:
raise ValueError(f"X must be 2D array, got shape {X.shape}")
n_series = X.shape[0]
if start_idx < 0 or end_idx > n_series or start_idx >= end_idx:
raise ValueError(f"Invalid indices: start={start_idx}, end={end_idx}, n={n_series}")
# Get distance function
dist_func = _DISTANCE_REGISTRY.get(method.lower())
if dist_func is None:
raise ValueError(f"Unknown distance method: {method}")
# Calculate partial matrix
n_rows = end_idx - start_idx
D_part = np.zeros((n_rows, n_series))
for i in range(start_idx, end_idx):
for j in range(n_series):
if i == j:
D_part[i - start_idx, j] = 0.0
else:
D_part[i - start_idx, j] = dist_func(X[i], X[j], **kwargs)
logger.info(f"Computed partial distance matrix: rows {start_idx}:{end_idx}")
return D_part