Source code for ts2net.multivariate.feature_comparison

"""
Feature-wise network comparisons for multiple time series.

This module provides functions to compute network statistics for multiple
time series and compare them, enabling analysis of how different series
behave in terms of their network properties.
"""

from __future__ import annotations

from typing import Optional, List, Dict, Union, Literal
import warnings
import numpy as np
from numpy.typing import NDArray
import pandas as pd
import networkx as nx
from scipy.sparse import csr_matrix

from ..api import HVG, NVG
from ..core.recurrence import RecurrenceNetwork
from ..core.transition import TransitionNetwork


def _extract_graph_stats(G: Union[nx.Graph, csr_matrix]) -> Dict[str, float]:
    """Extract statistics from graph (NetworkX or sparse matrix)."""
    if isinstance(G, nx.Graph):
        n_nodes = G.number_of_nodes()
        n_edges = G.number_of_edges()
        degrees = np.array([d for _, d in G.degree()])
        return {
            "n_nodes": n_nodes,
            "n_edges": n_edges,
            "density": nx.density(G),
            "avg_degree": float(np.mean(degrees)) if len(degrees) > 0 else 0.0,
            "std_degree": float(np.std(degrees)) if len(degrees) > 1 else 0.0,
            "min_degree": float(np.min(degrees)) if len(degrees) > 0 else 0.0,
            "max_degree": float(np.max(degrees)) if len(degrees) > 0 else 0.0,
        }
    n_nodes = G.shape[0]
    n_edges = G.nnz // 2
    density = n_edges / (n_nodes * (n_nodes - 1)) if n_nodes > 1 else 0.0
    return {
        "n_nodes": n_nodes,
        "n_edges": n_edges,
        "density": density,
    }


def _build_network_stats(method: str, x: NDArray[np.float64], **kwargs) -> Dict[str, float]:
    """Build network and extract statistics using method dispatch."""
    method_builders = {
        "hvg": lambda: HVG(**kwargs),
        "nvg": lambda: NVG(**kwargs),
    }
    
    method_transformers = {
        "recurrence": lambda: RecurrenceNetwork(**kwargs),
        "transition": lambda: TransitionNetwork(**kwargs),
    }
    
    if method in method_builders:
        builder = method_builders[method]()
        builder.build(x)
        return builder.stats()
    
    if method in method_transformers:
        builder = method_transformers[method]()
        G = builder.fit(x).transform()
        return _extract_graph_stats(G)
    
    raise ValueError(f"Unknown method: {method}. Use 'hvg', 'nvg', 'recurrence', or 'transition'")


def compute_network_features(
    X: Union[List[NDArray[np.float64]], NDArray[np.float64]],
    method: Literal["hvg", "nvg", "recurrence", "transition"] = "hvg",
    series_names: Optional[List[str]] = None,
    **kwargs
) -> pd.DataFrame:
    """
    Compute network features for multiple time series.

    For each time series, builds a network and extracts summary statistics.
    Returns a DataFrame with one row per series and columns for each feature.

    Parameters
    ----------
    X : list of arrays or array (n_series, n_points)
        Multiple time series to analyze. A 1D array is treated as a single
        series. List elements are converted with ``np.asarray`` and may have
        different lengths.
    method : str, default "hvg"
        Network construction method: "hvg", "nvg", "recurrence", or "transition"
    series_names : list of str, optional
        Names for each series (default: "Series_0", "Series_1", ...)
    **kwargs
        Additional arguments passed to network builder
        (e.g., weighted, k, threshold)

    Returns
    -------
    df : pandas.DataFrame
        One row per series, indexed by series name. The columns are whatever
        the selected builder reports, typically:

        - n_nodes: Number of nodes
        - n_edges: Number of edges
        - density: Edge density
        - avg_degree: Average degree
        - std_degree: Standard deviation of degree
        - min_degree: Minimum degree
        - max_degree: Maximum degree
        - (and method-specific features)

        A series whose network could not be built gets NaN for the seven
        features listed above. Empty input yields an empty DataFrame.

    Raises
    ------
    ValueError
        If ``X`` is not 1D/2D, if any series is not one-dimensional, if
        ``series_names`` has the wrong length, or if the builder itself raises
        ``ValueError`` (for example for an unknown ``method``).

    Examples
    --------
    >>> import numpy as np
    >>> from ts2net.multivariate import compute_network_features
    >>>
    >>> # Create multiple time series
    >>> X = [np.random.randn(100) for _ in range(5)]
    >>>
    >>> # Compute HVG features for all series
    >>> features = compute_network_features(X, method="hvg")
    >>> print(features)
    >>>
    >>> # Compare series
    >>> print(features.describe())
    """
    # Normalize input to a list of arrays, one per series.
    if isinstance(X, np.ndarray):
        if X.ndim not in (1, 2):
            raise ValueError(f"X must be 1D or 2D array, got shape {X.shape}")
        series = [X] if X.ndim == 1 else list(X)
    else:
        series = [np.asarray(x) for x in X]

    # Check every series, not only the first, so a malformed entry is reported
    # up front instead of surfacing later as a builder failure.
    for i, x in enumerate(series):
        if x.ndim > 1:
            raise ValueError(
                f"X must be 1D or 2D array, got shape {x.shape} for series {i}"
            )

    n_series = len(series)
    if series_names is None or len(series_names) == 0:
        series_names = [f"Series_{i}" for i in range(n_series)]
    if len(series_names) != n_series:
        raise ValueError(
            f"series_names length ({len(series_names)}) must match number of series ({n_series})"
        )

    # Placeholder row for a series whose network could not be built.
    default_stats = {
        "n_nodes": np.nan,
        "n_edges": np.nan,
        "density": np.nan,
        "avg_degree": np.nan,
        "std_degree": np.nan,
        "min_degree": np.nan,
        "max_degree": np.nan,
    }

    # Build networks and extract features
    all_features = []
    for i, (name, x) in enumerate(zip(series_names, series)):
        try:
            # Copy so the builder's own dict is never mutated below.
            stats = dict(_build_network_stats(method, x, **kwargs))
        except ValueError:
            # Configuration errors (e.g. unknown method) would repeat for
            # every series, so they abort the whole call.
            raise
        except Exception as e:
            # Best effort: keep the remaining series and flag this one.
            warnings.warn(
                f"Failed to process series {i} ({name}): {e}",
                stacklevel=2,
            )
            stats = default_stats.copy()
        stats["series_name"] = name
        all_features.append(stats)

    df = pd.DataFrame(all_features)
    # With no series there are no rows and therefore no "series_name" column.
    if "series_name" in df.columns:
        df = df.set_index("series_name")
    return df
def compare_network_features(
    features_df: pd.DataFrame,
    metric: Optional[str] = None
) -> Dict[str, Union[float, Dict[str, float]]]:
    """
    Summarize how network features vary across series.

    Parameters
    ----------
    features_df : pandas.DataFrame
        DataFrame from `compute_network_features()` with one row per series
    metric : str, optional
        Restrict the comparison to this numeric column
        (if None, every numeric column is summarized)

    Returns
    -------
    comparison : dict
        One entry per summarized column, each a dict with:

        - "mean": Mean value across series
        - "std": Standard deviation across series (pandas default, sample std)
        - "min": Minimum value
        - "max": Maximum value
        - "range": Range (max - min)
        - "cv": Coefficient of variation (std / mean), NaN when the mean is 0

        NaN entries are dropped before summarizing; a column with no values
        left reports NaN for every statistic. When more than one column is
        summarized, "similarity_matrix" holds the pairwise correlation between
        series (rows of `features_df`) computed over those columns, as a
        nested dict. If `features_df` has no numeric columns the result is
        ``{"error": "No numeric features found"}``.

    Raises
    ------
    ValueError
        If `metric` is given but is not a numeric column of `features_df`.

    Examples
    --------
    >>> features = compute_network_features(X, method="hvg")
    >>> comparison = compare_network_features(features)
    >>> print(f"Avg density: {comparison['density']['mean']:.3f}")
    >>> print(f"Density CV: {comparison['density']['cv']:.3f}")
    """
    columns = features_df.select_dtypes(include=[np.number]).columns
    if len(columns) == 0:
        return {"error": "No numeric features found"}

    if metric is not None:
        if metric not in columns:
            raise ValueError(f"Metric '{metric}' not found. Available: {list(columns)}")
        columns = [metric]

    stat_keys = ("mean", "std", "min", "max", "range", "cv")
    comparison = {}
    for name in columns:
        observed = features_df[name].dropna()
        if observed.empty:
            comparison[name] = dict.fromkeys(stat_keys, np.nan)
            continue

        center = float(observed.mean())
        spread = float(observed.std())
        lowest = float(observed.min())
        highest = float(observed.max())
        if center != 0:
            variation = spread / center
        else:
            variation = np.nan
        comparison[name] = dict(
            zip(stat_keys, (center, spread, lowest, highest, highest - lowest, variation))
        )

    if len(columns) > 1:
        # Transposing puts series in the columns, so corr() compares series.
        by_series = features_df[columns].T
        comparison["similarity_matrix"] = by_series.corr().to_dict()

    return comparison
def cluster_series_by_features(
    features_df: pd.DataFrame,
    n_clusters: Optional[int] = None,
    method: str = "kmeans"
) -> Dict[str, int]:
    """
    Cluster time series based on their network features.

    Numeric columns are NaN-filled with 0, standardized, and then grouped
    with scikit-learn.

    Parameters
    ----------
    features_df : pandas.DataFrame
        DataFrame from `compute_network_features()` with network features
    n_clusters : int, optional
        Number of clusters. If omitted (or 0), the heuristic
        ``max(2, int(sqrt(n_series / 2)))`` is used, capped at the number
        of series.
    method : str, default "kmeans"
        Clustering method: "kmeans" or "hierarchical"

    Returns
    -------
    clusters : dict
        Dictionary mapping series name (the DataFrame index) to cluster ID.
        An empty DataFrame yields an empty dict.

    Raises
    ------
    ValueError
        If `method` is unknown, if there are no numeric feature columns, or
        if `n_clusters` is negative or exceeds the number of series.

    Examples
    --------
    >>> features = compute_network_features(X, method="hvg")
    >>> clusters = cluster_series_by_features(features, n_clusters=3)
    >>> print(f"Series grouped into {len(set(clusters.values()))} clusters")
    """
    if method not in ("kmeans", "hierarchical"):
        raise ValueError(f"Unknown method: {method}")

    n_series = len(features_df)
    if n_series == 0:
        return {}

    numeric_cols = features_df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) == 0:
        raise ValueError("No numeric features found to cluster on")

    if not n_clusters:
        # Cap the heuristic so one series does not request two clusters.
        n_clusters = min(n_series, max(2, int(np.sqrt(n_series / 2))))
    elif n_clusters < 1 or n_clusters > n_series:
        raise ValueError(
            f"n_clusters ({n_clusters}) must be between 1 and the number of series ({n_series})"
        )

    if n_clusters == 1:
        # One cluster needs no fitting; this also covers a single series,
        # which the hierarchical estimator cannot fit.
        return {name: 0 for name in features_df.index}

    # Imported here so scikit-learn is only required when a fit is needed.
    from sklearn.preprocessing import StandardScaler
    from sklearn.cluster import KMeans, AgglomerativeClustering

    X = features_df[numeric_cols].fillna(0).values
    X_scaled = StandardScaler().fit_transform(X)

    if method == "kmeans":
        # Fixed seed keeps the labels reproducible between runs.
        clusterer = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    else:
        clusterer = AgglomerativeClustering(n_clusters=n_clusters)

    labels = clusterer.fit_predict(X_scaled)
    return {name: int(label) for name, label in zip(features_df.index, labels)}