Source code for ts2net.bsts.features

"""
Extract features from time series with optional BSTS decomposition.

Combines structural decomposition with network analysis of residuals.
"""

from dataclasses import dataclass
from typing import Optional, List, Dict, Any
import numpy as np
from numpy.typing import NDArray

from .decompose import BSTSSpec, decompose, DecompositionResult
from ..api import HVG, NVG, TransitionNetwork


[docs] @dataclass class FeaturesResult: """Result of feature extraction with BSTS decomposition. Attributes ---------- raw_stats : dict Basic statistics of raw series (mean, std, min, max, etc.) structural_stats : dict Structural component statistics (variances, seasonal strength, etc.) residual_network_stats : dict Network statistics computed on residual (HVG, NVG, transition) """ raw_stats: Dict[str, Any] structural_stats: Dict[str, Any] residual_network_stats: Dict[str, Any]
def _compute_raw_stats(series: NDArray[np.float64]) -> Dict[str, float]: """Compute basic statistics of raw series.""" return { 'mean': float(np.mean(series)), 'std': float(np.std(series)), 'min': float(np.min(series)), 'max': float(np.max(series)), 'median': float(np.median(series)), 'q25': float(np.percentile(series, 25)), 'q75': float(np.percentile(series, 75)), 'n_points': len(series) } def _compute_structural_stats(decomp: DecompositionResult) -> Dict[str, Any]: """Compute statistics of structural components.""" stats = { 'level_variance': decomp.level_variance, 'error_variance': decomp.error_variance, 'residual_std': float(np.std(decomp.residual)), 'residual_mean': float(np.mean(decomp.residual)), } # Lag-1 autocorrelation of residual (whiteness proxy) if len(decomp.residual) > 1: lag1_corr = np.corrcoef(decomp.residual[:-1], decomp.residual[1:])[0, 1] stats['residual_lag1_autocorr'] = float(lag1_corr) if not np.isnan(lag1_corr) else 0.0 else: stats['residual_lag1_autocorr'] = 0.0 if decomp.trend_variance is not None: stats['trend_variance'] = decomp.trend_variance if decomp.seasonal_variance: stats['seasonal_variance'] = decomp.seasonal_variance # Compute seasonal strength (variance ratio) total_variance = decomp.level_variance + decomp.error_variance if decomp.trend_variance: total_variance += decomp.trend_variance seasonal_var_sum = sum(decomp.seasonal_variance.values()) if total_variance > 0: stats['seasonal_strength'] = float(seasonal_var_sum / total_variance) else: stats['seasonal_strength'] = 0.0 return stats def _compute_network_stats( residual: NDArray[np.float64], methods: List[str], nvg_limit: Optional[int] = None ) -> Dict[str, Any]: """Compute network statistics on residual.""" stats = {} if 'hvg' in methods: try: hvg = HVG(output='stats') hvg.build(residual) hvg_stats = hvg.stats() stats['hvg'] = { 'n_nodes': hvg_stats['n_nodes'], 'n_edges': hvg_stats['n_edges'], 'avg_degree': hvg_stats['avg_degree'], 'density': hvg_stats['density'] } except Exception as e: stats['hvg_error'] = str(e) if 'nvg' in methods: try: nvg = NVG(output='stats', limit=nvg_limit or 3000) nvg.build(residual) nvg_stats = nvg.stats() stats['nvg'] = { 'n_nodes': nvg_stats['n_nodes'], 'n_edges': nvg_stats['n_edges'], 'avg_degree': nvg_stats['avg_degree'], 'density': nvg_stats['density'] } except Exception as e: stats['nvg_error'] = str(e) if 'transition' in methods: try: tn = TransitionNetwork(symbolizer='ordinal', order=3, output='stats') tn.build(residual) tn_stats = tn.stats() stats['transition'] = { 'n_nodes': tn_stats['n_nodes'], 'n_edges': tn_stats['n_edges'], 'avg_degree': tn_stats['avg_degree'] } except Exception as e: stats['transition_error'] = str(e) return stats
[docs] def features( series: NDArray[np.float64], methods: List[str] = None, bsts: Optional[BSTSSpec] = None, window: Optional[int] = None, nvg_limit: Optional[int] = None ) -> FeaturesResult: """ Extract features from time series with optional BSTS decomposition. If BSTS is enabled, decomposes series and analyzes residual with network methods. If BSTS is disabled, analyzes raw series. Parameters ---------- series : array Input time series methods : list of str, optional Network methods to apply: 'hvg', 'nvg', 'transition' Default: ['hvg', 'transition'] bsts : BSTSSpec, optional BSTS decomposition specification. If None, analyzes raw series. window : int, optional Window size for windowed analysis. If None, analyzes full series. nvg_limit : int, optional Horizon limit for NVG (default: 3000) Returns ------- FeaturesResult Three blocks: raw_stats, structural_stats, residual_network_stats Examples -------- >>> from ts2net.bsts import features, BSTSSpec >>> spec = BSTSSpec(level=True, seasonal_periods=[24, 168]) >>> result = features(x, methods=['hvg', 'transition'], bsts=spec) >>> print(result.structural_stats['seasonal_strength']) >>> print(result.residual_network_stats['hvg']['avg_degree']) """ if methods is None: methods = ['hvg', 'transition'] # Compute raw statistics raw_stats = _compute_raw_stats(series) # If no BSTS, analyze raw series if bsts is None: network_stats = _compute_network_stats(series, methods, nvg_limit) return FeaturesResult( raw_stats=raw_stats, structural_stats={}, residual_network_stats=network_stats ) # Decompose with BSTS try: decomp = decompose(series, bsts) except Exception as e: # If decomposition fails, return raw stats and error return FeaturesResult( raw_stats=raw_stats, structural_stats={'decomposition_error': str(e)}, residual_network_stats={} ) # Compute structural statistics structural_stats = _compute_structural_stats(decomp) # Analyze residual with network methods if window is None: # Analyze full residual residual_network_stats = _compute_network_stats(decomp.residual, methods, nvg_limit) else: # Windowed analysis - aggregate across windows window_stats = [] for i in range(0, len(decomp.residual) - window + 1, window): window_residual = decomp.residual[i:i + window] if np.std(window_residual) > 0: # Skip constant windows window_stat = _compute_network_stats(window_residual, methods, nvg_limit) window_stats.append(window_stat) # Aggregate window statistics (median, IQR) if window_stats: residual_network_stats = {} for method in methods: if method in window_stats[0]: method_stats = [ws[method] for ws in window_stats if method in ws] if method_stats: # Aggregate key metrics avg_degrees = [ms['avg_degree'] for ms in method_stats] n_edges = [ms['n_edges'] for ms in method_stats] residual_network_stats[method] = { 'avg_degree_median': float(np.median(avg_degrees)), 'avg_degree_q25': float(np.percentile(avg_degrees, 25)), 'avg_degree_q75': float(np.percentile(avg_degrees, 75)), 'n_edges_median': float(np.median(n_edges)), 'n_windows': len(method_stats) } else: residual_network_stats = {} return FeaturesResult( raw_stats=raw_stats, structural_stats=structural_stats, residual_network_stats=residual_network_stats )