"""
ts2net public API — four graph builders and a factory function.
Primary interface: ``builder.build(x)`` — builds and returns self.
Sklearn interface: ``builder.fit_transform(x)`` — returns NetworkX graph.
Split interface: ``builder.fit(x).transform()`` — deprecated; prefer build().
All builders share the same output methods after build():
.n_nodes int
.n_edges int
.degree_sequence() NDArray[int64]
.adjacency_matrix() sparse CSR (default) or dense
.stats() dict of summary metrics
.as_networkx() nx.Graph (optional; large graphs refused unless force=True)
"""
import numpy as np
from numpy.typing import NDArray
from typing import Optional, Tuple, Union, Literal, List
import networkx as nx
from .core.graph import Graph
from .core.visibility.weights import compute_weight, WeightMode
# Import existing implementations from ts2net.core
from .core.visibility import HVG as _HVG_Old
from .core.visibility import NVG as _NVG_Old
from .core.recurrence import RecurrenceNetwork as _RN_Old
from .core.transition import TransitionNetwork as _TN_Old
def _validate_and_clean_series(x, method_name: str = "ts2net") -> NDArray[np.float64]:
    """
    Validate and clean a time series, returning a finite 1-D float64 array.

    Non-finite entries (NaN, +inf, -inf) are dropped. For non-numeric input
    (object/string dtype), values that cannot be parsed as numbers are
    dropped as well. The result may therefore be shorter than the input.

    Parameters
    ----------
    x : array-like
        Input time series (may contain non-numeric values).
    method_name : str
        Name used as a prefix in error and warning messages.

    Returns
    -------
    x : NDArray[float64]
        Cleaned one-dimensional array containing only finite values.

    Raises
    ------
    ValueError
        If the input is not one-dimensional, or if no valid numeric value
        remains after cleaning.

    Warns
    -----
    UserWarning
        For a constant series, fewer than 3 points, more than 100 000
        points, or absolute values above 1e10.
    """
    import warnings

    if not isinstance(x, np.ndarray):
        x = np.asarray(x)

    # Check dimensionality BEFORE filtering: boolean-mask indexing flattens
    # N-D arrays (and promotes 0-D ones), so checking afterwards would let
    # matrices and scalars through as if they were ordinary series.
    if x.ndim != 1:
        raise ValueError(
            f"{method_name}: Input must be 1D array, got shape {x.shape}"
        )

    if x.dtype.kind in ('f', 'i', 'u'):
        # Already numeric: cast once and keep only finite entries.
        x = x.astype(np.float64)
        x = x[np.isfinite(x)]
    else:
        # Non-numeric or object dtype: let pandas coerce what it can.
        # Imported lazily so purely numeric callers do not pay for pandas.
        import pandas as pd

        s = pd.to_numeric(pd.Series(x), errors='coerce')
        s = s.replace([np.inf, -np.inf], np.nan).dropna()
        x = s.values.astype(np.float64)

    if len(x) == 0:
        raise ValueError(
            f"{method_name}: No valid numeric values in input series. "
            f"Check for non-numeric data, infinities, or all-null values."
        )

    # Soft checks: these inputs are legal but frequently indicate a problem.
    if np.std(x) == 0:
        warnings.warn(
            f"{method_name}: Constant series detected (std=0). "
            f"Results may be degenerate.",
            UserWarning
        )
    if len(x) < 3:
        warnings.warn(
            f"{method_name}: Very short series (n={len(x)}). "
            f"Network may be trivial or degenerate. Consider using longer series.",
            UserWarning
        )
    if len(x) > 100_000:
        warnings.warn(
            f"{method_name}: Very long series (n={len(x)}). "
            f"This may be slow. Consider using limit parameter or resampling.",
            UserWarning
        )
    if np.any(np.abs(x) > 1e10):
        warnings.warn(
            f"{method_name}: Series contains very large values (max={np.max(np.abs(x)):.2e}). "
            f"This may cause numerical issues.",
            UserWarning
        )
    return x
[docs]
class HVG:
    """
    Horizontal Visibility Graph (HVG).

    Two nodes i < j are connected if every intermediate value is strictly
    below the lower of the two endpoints::

        x[k] < min(x[i], x[j])  for all i < k < j

    Complexity: O(n) time and space (monotone stack algorithm).

    Key invariants (random i.i.d. series):

    - Mean degree → 4 as n → ∞ (Luque et al. 2009)
    - Degree distribution follows P(k) ~ (1/3)(2/3)^(k-2) for k ≥ 2

    Parameters
    ----------
    weighted : bool or str, default False
        If truthy, use the "absdiff" weight mode (``abs(y_i - y_j)``). If a
        string, use it as the weight mode.
        Valid modes: "absdiff", "time_gap", "slope", "min_clearance"
    limit : int, optional
        Maximum temporal distance between connected nodes. ``None`` means
        unconstrained (default). Useful for very long series where distant
        connections are not meaningful.
    only_degrees : bool, default False
        DEPRECATED: use ``output="degrees"`` instead. When True it
        overrides ``output``.
    output : {"edges", "degrees", "stats"}, default "edges"
        ``"edges"`` stores the full edge list; ``"degrees"`` stores only the
        degree sequence (fast, low memory); ``"stats"`` stores only summary
        statistics (lowest memory).
    directed : bool, default False
        Produce a directed graph with edges pointing forward in time
        (i → j for i < j). Enables irreversibility analysis.
    weight_mode : str, optional
        Explicit weight mode; overrides ``weighted`` when provided.

    References
    ----------
    Luque, B., Lacasa, L., Ballesteros, F., & Luque, J. (2009). Horizontal
    visibility graphs: Exact results for random time series. *Physical Review
    E*, 80(4), 046103. https://doi.org/10.1103/PhysRevE.80.046103

    Examples
    --------
    >>> import numpy as np
    >>> from ts2net import HVG
    >>> x = np.random.randn(1000)
    >>> hvg = HVG().build(x)  # build() returns self for chaining
    >>> print(hvg.n_nodes, hvg.n_edges)
    >>> degrees = hvg.degree_sequence()
    >>> A = hvg.adjacency_matrix()  # sparse CSR by default
    >>> G_nx = hvg.as_networkx()  # Optional
    """

    def __init__(self, weighted: Union[bool, str] = False, limit: Optional[int] = None,
                 only_degrees: bool = False, output: str = "edges", directed: bool = False,
                 weight_mode: Optional[str] = None):
        """Store the configuration and create the underlying implementation.

        See the class docstring for the meaning of each parameter.
        """
        # Precedence: explicit weight_mode, then a string ``weighted``, then
        # the boolean shortcut. A truthiness test (not ``is True``) is used so
        # that NumPy booleans and similar flags are honoured as well.
        if weight_mode is not None:
            self.weight_mode = weight_mode
        elif isinstance(weighted, str):
            self.weight_mode = weighted
        elif weighted:
            self.weight_mode = "absdiff"  # Default mode
        else:
            self.weight_mode = None
        self.weighted = self.weight_mode is not None

        self.limit = limit
        self.directed = directed
        # Backward compatibility: the deprecated flag wins over ``output``.
        self.output = "degrees" if only_degrees else output

        # The legacy implementation only understands a boolean "absdiff"
        # flag; weights for every mode are recomputed in _build_edges_graph.
        self._impl = _HVG_Old(weighted=(self.weighted and self.weight_mode == "absdiff"),
                              limit=limit, directed=directed)
        self._graph = None
        self._x = None  # Cleaned series, kept for weights and surrogate tests
        self._fitted = False

    def fit(self, x: NDArray[np.float64]) -> 'HVG':
        """
        Fit the HVG to the time series (scikit-learn style alias of build()).

        Parameters
        ----------
        x : array-like
            Input time series (1D array)

        Returns
        -------
        self : HVG
            Returns self for method chaining

        Examples
        --------
        >>> from ts2net import HVG
        >>> import numpy as np
        >>> x = np.random.randn(100)
        >>> hvg = HVG().fit(x)
        >>> print(hvg.n_edges)
        """
        # Delegating guarantees that the graph exists after fit(); merely
        # storing the series would leave every accessor raising.
        return self.build(x)

    def _require_graph(self) -> "Graph":
        """Return the built graph, raising ValueError if build() was not called."""
        if self._graph is None:
            raise ValueError("Call build() first")
        return self._graph

    def _compute_degrees(self, G_nx: "nx.Graph") -> Tuple[NDArray, Optional[NDArray], Optional[NDArray]]:
        """Return ``(degrees, in_degrees, out_degrees)`` as int64 arrays.

        For directed graphs ``degrees`` is the out-degree sequence; for
        undirected graphs the in/out entries are ``None``. Values follow the
        iteration order of the NetworkX degree views.
        """
        if self.directed:
            out_degrees = np.array([d for _, d in G_nx.out_degree()], dtype=np.int64)
            in_degrees = np.array([d for _, d in G_nx.in_degree()], dtype=np.int64)
            return out_degrees, in_degrees, out_degrees
        degrees = np.array([d for _, d in G_nx.degree()], dtype=np.int64)
        return degrees, None, None

    def _build_degrees_graph(self, G_nx: "nx.Graph", x: NDArray) -> "Graph":
        """Build an edge-less Graph carrying only the degree sequences."""
        degrees, in_degrees, out_degrees = self._compute_degrees(G_nx)
        return Graph(
            edges=[],
            n_nodes=len(x),
            directed=self.directed,
            weighted=self.weighted,
            _adjacency=None,
            _degrees=degrees,
            _in_degrees=in_degrees,
            _out_degrees=out_degrees
        )

    def _build_stats_graph(self, G_nx: "nx.Graph", x: NDArray) -> "Graph":
        """Build a degrees-only Graph that also remembers the edge count."""
        graph = self._build_degrees_graph(G_nx, x)
        graph._n_edges_cached = G_nx.number_of_edges()
        return graph

    def _build_edges_graph(self, G_nx: "nx.Graph", x: NDArray) -> "Graph":
        """Build a Graph holding the full (optionally weighted) edge list."""
        if self.weighted and self.weight_mode:
            # Always compute weights fresh: the underlying legacy
            # implementations do not store edge weights reliably.
            edges = [
                (u, v, compute_weight(x, u, v, self.weight_mode))
                for u, v in G_nx.edges()
            ]
        else:
            edges = list(G_nx.edges())
        return Graph(
            edges=edges,
            n_nodes=len(x),
            directed=self.directed,
            weighted=self.weighted,
            _adjacency=None
        )

    def build(self, x: NDArray[np.float64]) -> "HVG":
        """
        Build HVG from time series.

        Parameters
        ----------
        x : array-like of shape (n,)
            Input time series. It is validated and cleaned first, so
            non-finite entries are dropped before the graph is built.

        Returns
        -------
        self : HVG

        Raises
        ------
        ValueError
            If ``output`` is not one of "edges", "degrees", "stats", or if
            the input series is invalid.
        """
        handlers = {
            "degrees": self._build_degrees_graph,
            "stats": self._build_stats_graph,
            "edges": self._build_edges_graph,
        }
        # Fail fast: reject a bad output mode before doing any heavy work.
        handler = handlers.get(self.output)
        if handler is None:
            raise ValueError(f"Unknown output mode: {self.output}")

        x = _validate_and_clean_series(x, "HVG")
        self._x = x.copy()  # Store for weight recomputation
        G_nx, _ = self._impl.fit_transform(x)
        self._graph = handler(G_nx, x)
        self._fitted = True
        return self

    @property
    def edges(self):
        """Edge list (None if output='degrees' or 'stats')"""
        graph = self._require_graph()
        if self.output in ("degrees", "stats"):
            return None
        return graph.edges

    @property
    def n_nodes(self):
        """Number of nodes"""
        return self._require_graph().n_nodes

    @property
    def n_edges(self):
        """Number of edges"""
        return self._require_graph().n_edges

    def degree_sequence(self) -> NDArray[np.int64]:
        """
        Node degree sequence.

        Returns
        -------
        d : NDArray[int64] of shape (n_nodes,)
            Out-degree for directed graphs; total degree for undirected.
        """
        return self._require_graph().degree_sequence()

    def in_degree_sequence(self):
        """In-degree sequence (only valid for directed graphs)"""
        graph = self._require_graph()
        if not self.directed:
            raise ValueError("in_degree_sequence() only valid for directed graphs")
        return graph.in_degree_sequence()

    def out_degree_sequence(self):
        """Out-degree sequence (only valid for directed graphs)"""
        graph = self._require_graph()
        if not self.directed:
            raise ValueError("out_degree_sequence() only valid for directed graphs")
        return graph.out_degree_sequence()

    def stats(self, include_triangles: bool = False) -> dict:
        """Summary statistics (memory efficient, no dense matrix)"""
        return self._require_graph().summary(include_triangles=include_triangles)

    def network_metrics(
        self,
        include: Optional[List[str]] = None,
        sample_size: Optional[int] = None,
        **kwargs
    ) -> dict:
        """
        Compute advanced network metrics (clustering, path lengths, modularity).

        Parameters
        ----------
        include : list, optional
            Metrics to include: ["clustering", "path_lengths", "modularity"]
            If None, includes all metrics
        sample_size : int, optional
            For large graphs, sample nodes/pairs for expensive computations
        **kwargs
            Additional arguments passed to metric functions
            (e.g., method, weight, resolution, seed)

        Returns
        -------
        dict
            Dictionary with network metrics:
            - Clustering: avg_clustering, transitivity
            - Path lengths: avg_path_length, diameter, radius
            - Modularity: modularity, n_communities

        Examples
        --------
        >>> from ts2net import HVG
        >>> import numpy as np
        >>> x = np.random.randn(100)
        >>> hvg = HVG().build(x)
        >>> metrics = hvg.network_metrics()
        >>> print(f"Clustering: {metrics['avg_clustering']:.3f}")
        >>> print(f"Avg path length: {metrics['avg_path_length']:.3f}")
        >>> print(f"Modularity: {metrics['modularity']:.3f}")
        """
        return self._require_graph().network_metrics(
            include=include, sample_size=sample_size, **kwargs
        )

    def test_significance(
        self,
        metric: str = "density",
        method: str = "shuffle",
        n_surrogates: int = 200,
        alpha: float = 0.05,
        rng: Optional[np.random.Generator] = None,
        **kwargs
    ):
        """
        Test significance of a network metric against null distribution.

        Parameters
        ----------
        metric : str, default "density"
            Metric to test. Options: "density", "deg_mean", "deg_std",
            "avg_clustering", "assortativity", or any key from stats()
        method : str, default "shuffle"
            Surrogate generation method: "shuffle", "phase", "circular",
            "iaaft", "block_bootstrap"
        n_surrogates : int, default 200
            Number of surrogate series to generate
        alpha : float, default 0.05
            Significance level (two-tailed)
        rng : np.random.Generator, optional
            Random number generator
        **kwargs
            Additional arguments for surrogate generation
            (e.g., block_size for block_bootstrap)

        Returns
        -------
        result : NetworkSignificanceResult
            Significance test result

        Raises
        ------
        ValueError
            If build() has not been called, the series was not stored, or
            ``metric`` is not a key of stats().

        Examples
        --------
        >>> from ts2net import HVG
        >>> import numpy as np
        >>> x = np.random.randn(100)
        >>> hvg = HVG().build(x)
        >>> result = hvg.test_significance(metric="density", method="phase", n_surrogates=100)
        >>> print(result)
        """
        self._require_graph()
        if self._x is None:
            raise ValueError("Cannot test significance: original time series not stored")

        from .stats.null_models import compute_network_metric_significance

        def metric_fn(ts):
            # Rebuild with this builder's configuration on each surrogate.
            surrogate = HVG(
                weighted=self.weighted,
                weight_mode=self.weight_mode,
                limit=self.limit,
                directed=self.directed,
                output=self.output
            ).build(ts)
            summary = surrogate.stats()
            if metric not in summary:
                raise ValueError(f"Unknown metric: {metric}. Available: {list(summary.keys())}")
            return float(summary[metric])

        return compute_network_metric_significance(
            self._x,
            metric_fn,
            method=method,
            n_surrogates=n_surrogates,
            alpha=alpha,
            metric_name=metric,
            rng=rng,
            **kwargs
        )

    def adjacency_matrix(
        self, format: str = "sparse"
    ) -> "Union[csr_matrix, coo_matrix, NDArray[np.float64]]":
        """
        Adjacency matrix.

        Parameters
        ----------
        format : {"sparse", "coo", "dense"}, default "sparse"
            ``"sparse"`` returns a SciPy CSR matrix.
            ``"coo"`` returns a SciPy COO matrix.
            ``"dense"`` returns a NumPy array; **refused** for n > 50 000
            nodes (would require ≥ 20 GB RAM).

        Returns
        -------
        A : csr_matrix | coo_matrix | ndarray
            Adjacency matrix of shape (n_nodes, n_nodes).
        """
        return self._require_graph().adjacency_matrix(format=format)

    def edges_coo(
        self,
    ) -> "Tuple[NDArray[np.int64], NDArray[np.int64], Optional[NDArray[np.float64]]]":
        """
        Edge list in COO (coordinate) format.

        Returns
        -------
        rows : NDArray[int64] — source node indices
        cols : NDArray[int64] — target node indices
        weights : NDArray[float64] or None — edge weights (None if unweighted)
        """
        return self._require_graph().edges_coo()

    def as_networkx(self, force: bool = False) -> "nx.Graph":
        """
        Convert to a NetworkX graph.

        Parameters
        ----------
        force : bool, default False
            If False, raises for n > 200 000 nodes to prevent accidental
            allocation of very large objects.

        Returns
        -------
        G : nx.Graph or nx.DiGraph
        """
        return self._require_graph().as_networkx(force=force)
[docs]
class NVG:
    """
    Natural Visibility Graph (NVG).

    Two nodes i < j are connected if the straight line between (i, x[i]) and
    (j, x[j]) lies strictly above all intermediate data points::

        x[k] < x[i] + (x[j] - x[i]) * (k - i) / (j - i)  for all i < k < j

    NVG is a superset of HVG: every HVG edge is also an NVG edge.

    Complexity: O(n log n) average (sweep-line algorithm).

    Parameters
    ----------
    weighted : bool or str, default False
        If truthy, use the "absdiff" weight mode. If a string, use it as the
        weight mode.
        Valid modes: "absdiff", "time_gap", "slope", "min_clearance"
    limit : int, optional
        Horizon limit — maximum temporal distance between connected nodes.
        **Recommended for series > 10 000 points** (2 000–5 000 suggested).
    only_degrees : bool, default False
        DEPRECATED: use ``output="degrees"`` instead. When True it
        overrides ``output``.
    output : {"edges", "degrees", "stats"}, default "edges"
        Controls what is stored after build().
    max_edges : int, optional
        Maximum total edges (safety cap)
    max_edges_per_node : int, optional
        Maximum edges per node (additional scale control)
    max_memory_mb : float, optional
        Maximum memory in MB (converted to max_edges estimate)
    weight_mode : str, optional
        Explicit weight mode; overrides ``weighted`` when provided.

    References
    ----------
    Lacasa, L., Luque, B., Ballesteros, F., Luque, J., & Nuño, J. C. (2008).
    From time series to complex networks: The visibility graph. *PNAS*,
    105(13), 4972–4975. https://doi.org/10.1073/pnas.0709247105

    Examples
    --------
    >>> import numpy as np
    >>> from ts2net import NVG
    >>> x = np.random.randn(500)
    >>> nvg = NVG(limit=500).build(x)
    >>> print(nvg.n_nodes, nvg.n_edges)
    """

    def __init__(self, weighted: Union[bool, str] = False, limit: Optional[int] = None,
                 only_degrees: bool = False, output: str = "edges",
                 max_edges: Optional[int] = None, max_edges_per_node: Optional[int] = None,
                 max_memory_mb: Optional[float] = None, weight_mode: Optional[str] = None):
        """Store the configuration and create the underlying implementation.

        See the class docstring for the meaning of each parameter.
        """
        # Precedence: explicit weight_mode, then a string ``weighted``, then
        # the boolean shortcut. A truthiness test (not ``is True``) is used so
        # that NumPy booleans and similar flags are honoured as well.
        if weight_mode is not None:
            self.weight_mode = weight_mode
        elif isinstance(weighted, str):
            self.weight_mode = weighted
        elif weighted:
            self.weight_mode = "absdiff"  # Default mode
        else:
            self.weight_mode = None
        self.weighted = self.weight_mode is not None

        self.limit = limit
        # Kept on the builder so test_significance() can recreate an
        # identically capped builder without inspecting the legacy object.
        self.max_edges = max_edges
        self.max_edges_per_node = max_edges_per_node
        self.max_memory_mb = max_memory_mb
        # Backward compatibility: the deprecated flag wins over ``output``.
        self.output = "degrees" if only_degrees else output

        # The legacy implementation only understands a boolean "absdiff"
        # flag; weights for every mode are recomputed in _build_edges_graph.
        self._impl = _NVG_Old(
            weighted=(self.weighted and self.weight_mode == "absdiff"), limit=limit,
            max_edges=max_edges, max_edges_per_node=max_edges_per_node,
            max_memory_mb=max_memory_mb
        )
        self._graph = None
        self._x = None  # Cleaned series, kept for weights and surrogate tests
        self._fitted = False

    def fit(self, x: NDArray[np.float64]) -> 'NVG':
        """
        Fit the NVG to the time series (scikit-learn style alias of build()).

        Parameters
        ----------
        x : array-like
            Input time series (1D array)

        Returns
        -------
        self : NVG
            Returns self for method chaining

        Examples
        --------
        >>> from ts2net import NVG
        >>> import numpy as np
        >>> x = np.random.randn(100)
        >>> nvg = NVG().fit(x)
        >>> print(nvg.n_edges)
        """
        # Delegating guarantees that the graph exists after fit(); merely
        # storing the series would leave every accessor raising.
        return self.build(x)

    def _require_graph(self) -> "Graph":
        """Return the built graph, raising ValueError if build() was not called."""
        if self._graph is None:
            raise ValueError("Call build() first")
        return self._graph

    def build(self, x: NDArray[np.float64]) -> "NVG":
        """
        Build NVG from time series.

        Parameters
        ----------
        x : array-like of shape (n,)
            Input time series. It is validated and cleaned first, so
            non-finite entries are dropped before the graph is built.

        Returns
        -------
        self : NVG

        Raises
        ------
        ValueError
            If ``output`` is not one of "edges", "degrees", "stats", or if
            the input series is invalid.
        """
        handlers = {
            "degrees": self._build_degrees_graph,
            "stats": self._build_stats_graph,
            "edges": self._build_edges_graph,
        }
        # Fail fast: reject a bad output mode before doing any heavy work.
        handler = handlers.get(self.output)
        if handler is None:
            raise ValueError(f"Unknown output mode: {self.output}")

        x = _validate_and_clean_series(x, "NVG")
        self._x = x.copy()  # Store for weight recomputation
        G_nx, _ = self._impl.fit_transform(x)
        self._graph = handler(G_nx, x)
        self._fitted = True
        return self

    def _build_degrees_graph(self, G_nx: "nx.Graph", x: NDArray) -> "Graph":
        """Build an edge-less Graph carrying only the degree sequence."""
        degrees = np.array([d for _, d in G_nx.degree()], dtype=np.int64)
        return Graph(
            edges=[],
            n_nodes=len(x),
            directed=False,
            weighted=self.weighted,
            _adjacency=None,
            _degrees=degrees
        )

    def _build_stats_graph(self, G_nx: "nx.Graph", x: NDArray) -> "Graph":
        """Build a degrees-only Graph that also remembers the edge count."""
        graph = self._build_degrees_graph(G_nx, x)
        graph._n_edges_cached = G_nx.number_of_edges()
        return graph

    def _build_edges_graph(self, G_nx: "nx.Graph", x: NDArray) -> "Graph":
        """Build a Graph holding the full (optionally weighted) edge list."""
        if self.weighted and self.weight_mode:
            # Always compute weights fresh: the underlying legacy
            # implementations do not store edge weights reliably.
            edges = [
                (u, v, compute_weight(x, u, v, self.weight_mode))
                for u, v in G_nx.edges()
            ]
        else:
            edges = list(G_nx.edges())
        return Graph(
            edges=edges,
            n_nodes=len(x),
            directed=False,
            weighted=self.weighted,
            _adjacency=None
        )

    @property
    def edges(self):
        """Edge list (None if output='degrees' or 'stats')."""
        graph = self._require_graph()
        if self.output in ("degrees", "stats"):
            return None
        return graph.edges

    @property
    def n_nodes(self) -> int:
        """Number of nodes (length of the cleaned input series)."""
        return self._require_graph().n_nodes

    @property
    def n_edges(self) -> int:
        """Number of edges."""
        return self._require_graph().n_edges

    def degree_sequence(self) -> NDArray[np.int64]:
        """Node degree sequence — shape (n_nodes,)."""
        return self._require_graph().degree_sequence()

    def stats(self, include_triangles: bool = False) -> dict:
        """Summary statistics (no dense matrix required)."""
        return self._require_graph().summary(include_triangles=include_triangles)

    def network_metrics(
        self,
        include: Optional[List[str]] = None,
        sample_size: Optional[int] = None,
        **kwargs
    ) -> dict:
        """
        Compute advanced network metrics (clustering, path lengths, modularity).

        Parameters
        ----------
        include : list, optional
            Metrics to include: ["clustering", "path_lengths", "modularity"]
            If None, includes all metrics
        sample_size : int, optional
            For large graphs, sample nodes/pairs for expensive computations
        **kwargs
            Additional arguments passed to metric functions
            (e.g., method, weight, resolution, seed)

        Returns
        -------
        dict
            Dictionary with network metrics:
            - Clustering: avg_clustering, transitivity
            - Path lengths: avg_path_length, diameter, radius
            - Modularity: modularity, n_communities

        Examples
        --------
        >>> from ts2net import NVG
        >>> import numpy as np
        >>> x = np.random.randn(100)
        >>> nvg = NVG().build(x)
        >>> metrics = nvg.network_metrics()
        >>> print(f"Clustering: {metrics['avg_clustering']:.3f}")
        >>> print(f"Avg path length: {metrics['avg_path_length']:.3f}")
        >>> print(f"Modularity: {metrics['modularity']:.3f}")
        """
        return self._require_graph().network_metrics(
            include=include, sample_size=sample_size, **kwargs
        )

    def test_significance(
        self,
        metric: str = "density",
        method: str = "shuffle",
        n_surrogates: int = 200,
        alpha: float = 0.05,
        rng: Optional[np.random.Generator] = None,
        **kwargs
    ):
        """
        Test significance of a network metric against null distribution.

        Parameters
        ----------
        metric : str, default "density"
            Metric to test. Options: "density", "deg_mean", "deg_std",
            "avg_clustering", "assortativity", or any key from stats()
        method : str, default "shuffle"
            Surrogate generation method: "shuffle", "phase", "circular",
            "iaaft", "block_bootstrap"
        n_surrogates : int, default 200
            Number of surrogate series to generate
        alpha : float, default 0.05
            Significance level (two-tailed)
        rng : np.random.Generator, optional
            Random number generator
        **kwargs
            Additional arguments for surrogate generation
            (e.g., block_size for block_bootstrap)

        Returns
        -------
        result : NetworkSignificanceResult
            Significance test result

        Raises
        ------
        ValueError
            If build() has not been called, the series was not stored, or
            ``metric`` is not a key of stats().

        Examples
        --------
        >>> from ts2net import NVG
        >>> import numpy as np
        >>> x = np.random.randn(100)
        >>> nvg = NVG().build(x)
        >>> result = nvg.test_significance(metric="density", method="phase", n_surrogates=100)
        >>> print(result)
        """
        self._require_graph()
        if self._x is None:
            raise ValueError("Cannot test significance: original time series not stored")

        from .stats.null_models import compute_network_metric_significance

        def metric_fn(ts):
            # Rebuild with this builder's configuration, including the edge
            # and memory caps, on each surrogate.
            surrogate = NVG(
                weighted=self.weighted,
                weight_mode=self.weight_mode,
                limit=self.limit,
                max_edges=self.max_edges,
                max_edges_per_node=self.max_edges_per_node,
                max_memory_mb=self.max_memory_mb,
                output=self.output
            ).build(ts)
            summary = surrogate.stats()
            if metric not in summary:
                raise ValueError(f"Unknown metric: {metric}. Available: {list(summary.keys())}")
            return float(summary[metric])

        return compute_network_metric_significance(
            self._x,
            metric_fn,
            method=method,
            n_surrogates=n_surrogates,
            alpha=alpha,
            metric_name=metric,
            rng=rng,
            **kwargs
        )

    def adjacency_matrix(
        self, format: str = "sparse"
    ) -> "Union[csr_matrix, coo_matrix, NDArray[np.float64]]":
        """
        Adjacency matrix.

        Parameters
        ----------
        format : {"sparse", "coo", "dense"}, default "sparse"
            ``"sparse"`` returns a SciPy CSR matrix.
            ``"coo"`` returns a SciPy COO matrix.
            ``"dense"`` returns a NumPy array; **refused** for n > 50 000
            nodes (would require ≥ 20 GB RAM).

        Returns
        -------
        A : csr_matrix | coo_matrix | ndarray
            Adjacency matrix of shape (n_nodes, n_nodes).
        """
        return self._require_graph().adjacency_matrix(format=format)

    def edges_coo(
        self,
    ) -> "Tuple[NDArray[np.int64], NDArray[np.int64], Optional[NDArray[np.float64]]]":
        """
        Edge list in COO (coordinate) format.

        Returns
        -------
        rows : NDArray[int64] — source node indices
        cols : NDArray[int64] — target node indices
        weights : NDArray[float64] or None — edge weights (None if unweighted)
        """
        return self._require_graph().edges_coo()

    def as_networkx(self, force: bool = False) -> "nx.Graph":
        """
        Convert to a NetworkX graph.

        Parameters
        ----------
        force : bool, default False
            If False, raises for n > 200 000 nodes to prevent accidental
            allocation of very large objects.

        Returns
        -------
        G : nx.Graph
            The graph is always built as undirected by this class.
        """
        return self._require_graph().as_networkx(force=force)
[docs]
class RecurrenceNetwork:
"""
Recurrence Network.
A recurrence network is derived from a recurrence plot. After
delay-embedding the series into an m-dimensional phase space, two
states i, j are connected if their distance falls below a threshold
(``rule="epsilon"``) or if one is among the other's k nearest neighbours
(``rule="knn"``).
Parameters
----------
m : int, optional
Embedding dimension. ``None`` means no embedding (1-D, i.e., the
raw series is used as the phase-space trajectory).
tau : int, default 1
Time delay for Takens embedding.
rule : {"knn", "epsilon"}, default "knn"
``"knn"`` connects each point to its *k* nearest neighbours.
``"epsilon"`` connects all pairs closer than *epsilon*.
k : int, default 5
Number of nearest neighbours (``rule="knn"`` only).
epsilon : float, default 0.1
Recurrence threshold (``rule="epsilon"`` only).
metric : str, default "euclidean"
Distance metric for phase-space proximity.
output : {"edges", "degrees", "stats"}, default "edges"
Controls what is stored after build().
References
----------
Marwan, N., Donges, J. F., Zou, Y., Donner, R. V., & Kurths, J. (2009).
Complex network approach for recurrence analysis of time series.
*Physics Letters A*, 373(46), 4246–4254.
https://doi.org/10.1016/j.physleta.2009.09.042
Examples
--------
>>> import numpy as np
>>> from ts2net import RecurrenceNetwork
>>> x = np.sin(np.linspace(0, 8*np.pi, 300)) + 0.1*np.random.randn(300)
>>> rn = RecurrenceNetwork(rule="epsilon", epsilon=0.3).build(x)
>>> print(rn.n_nodes, rn.n_edges)
"""
def __init__(self, m: Optional[int] = None, tau: int = 1, rule: str = 'knn',
k: int = 5, epsilon: float = 0.1, metric: str = 'euclidean',
only_degrees: bool = False, output: str = "edges"):
"""
Parameters
----------
m : int, optional
Embedding dimension
tau : int
Time delay
rule : str
'knn', 'epsilon', 'radius'
k : int
Neighbors for k-NN
epsilon : float
Threshold for epsilon-recurrence
metric : str
Distance metric
only_degrees : bool
DEPRECATED: Use output="degrees" instead
output : str, default "edges"
Output mode: "edges", "degrees", or "stats"
"""
self.m = m
self.tau = tau
self.rule = rule
self.k = k
self.epsilon = epsilon
self.metric = metric
if only_degrees:
self.output = "degrees"
else:
self.output = output
# Map parameters to old implementation (uses threshold instead of epsilon)
# For k-NN rule, use k parameter; for epsilon rule, use threshold
threshold = epsilon if rule == 'epsilon' else None
self._impl = _RN_Old(m=m, tau=tau, rule=rule, k=k, metric=metric, threshold=threshold)
self._graph = None
[docs]
def build(self, x: NDArray[np.float64]) -> "RecurrenceNetwork":
"""
Build recurrence network from time series.
Parameters
----------
x : array-like of shape (n,)
Input time series.
Returns
-------
self : RecurrenceNetwork
"""
# Validate and clean input (handles dtype contamination)
x = _validate_and_clean_series(x, "RecurrenceNetwork")
G_nx, A = self._impl.fit_transform(x)
# Convert based on output mode
if self.output == "degrees":
degrees = np.array([d for _, d in G_nx.degree()])
n = G_nx.number_of_nodes()
self._graph = Graph(
edges=[],
n_nodes=n,
directed=False,
weighted=False,
_adjacency=None,
_degrees=degrees
)
elif self.output == "stats":
degrees = np.array([d for _, d in G_nx.degree()])
n = G_nx.number_of_nodes()
n_edges = G_nx.number_of_edges()
self._graph = Graph(
edges=[],
n_nodes=n,
directed=False,
weighted=False,
_adjacency=None,
_degrees=degrees
)
self._graph._n_edges_cached = n_edges
else: # output == "edges"
edges = list(G_nx.edges())
self._graph = Graph(
edges=edges,
n_nodes=G_nx.number_of_nodes(),
directed=False,
weighted=False,
_adjacency=None
)
return self
@property
def edges(self):
if self._graph is None:
raise ValueError("Call build() first")
if self.output in ("degrees", "stats"):
return None
return self._graph.edges
@property
def n_nodes(self) -> int:
"""Number of nodes (equals length of input series)."""
if self._graph is None:
raise ValueError("Call build() first")
return self._graph.n_nodes
@property
def n_edges(self) -> int:
"""Number of edges."""
if self._graph is None:
raise ValueError("Call build() first")
return self._graph.n_edges
[docs]
def degree_sequence(self) -> NDArray[np.int64]:
"""Node degree sequence — shape (n_nodes,)."""
if self._graph is None:
raise ValueError("Call build() first")
return self._graph.degree_sequence()
[docs]
def stats(self, include_triangles: bool = False) -> dict:
"""Summary statistics (no dense matrix required)."""
if self._graph is None:
raise ValueError("Call build() first")
return self._graph.summary(include_triangles=include_triangles)
[docs]
def adjacency_matrix(
    self, format: str = "sparse"
) -> "Union[csr_matrix, coo_matrix, NDArray[np.float64]]":
    """
    Return the adjacency matrix of the built graph.

    Parameters
    ----------
    format : {"sparse", "coo", "dense"}, default "sparse"
        Requested layout, forwarded unchanged to the underlying graph.
        ``"sparse"`` returns a SciPy CSR matrix.
        ``"coo"`` returns a SciPy COO matrix.
        ``"dense"`` returns a NumPy array; **refused** for n > 50 000
        nodes (would require ≥ 20 GB RAM).

    Returns
    -------
    A : csr_matrix | coo_matrix | ndarray
        Symmetric adjacency matrix of shape (n_nodes, n_nodes).

    Raises
    ------
    ValueError
        If build() has not been called yet.
    """
    graph = self._graph
    if graph is None:
        raise ValueError("Call build() first")
    matrix = graph.adjacency_matrix(format=format)
    return matrix
[docs]
def edges_coo(
    self,
) -> "Tuple[NDArray[np.int64], NDArray[np.int64], Optional[NDArray[np.float64]]]":
    """
    Edge list in COO (coordinate) format.

    Delegates to the underlying graph object.

    Returns
    -------
    rows : NDArray[int64] — source node indices
    cols : NDArray[int64] — target node indices
    weights : NDArray[float64] or None — edge weights (None if unweighted)

    Raises
    ------
    ValueError
        If build() has not been called yet.
    """
    graph = self._graph
    if graph is None:
        raise ValueError("Call build() first")
    return graph.edges_coo()
[docs]
def as_networkx(self, force: bool = False) -> nx.Graph:
    """
    Convert the built graph to a NetworkX graph.

    Parameters
    ----------
    force : bool, default False
        Forwarded unchanged to the underlying graph. If False, raises
        for n > 200 000 nodes to prevent accidental allocation of very
        large objects.

    Returns
    -------
    G : nx.Graph or nx.DiGraph

    Raises
    ------
    ValueError
        If build() has not been called yet.
    """
    graph = self._graph
    if graph is None:
        raise ValueError("Call build() first")
    converted = graph.as_networkx(force=force)
    return converted
[docs]
class TransitionNetwork:
    """
    Transition Network (Ordinal Partition Network).

    The time series is symbolised — each subsequence is mapped to a
    symbolic state (for the ``"ordinal"`` symbolizer, its rank-order
    pattern). A directed graph is then built where nodes are distinct
    patterns and an edge (i → j) records that pattern i is immediately
    followed by pattern j.

    The resulting graph captures the Markov-like transition dynamics of the
    series and is sensitive to nonlinear structure invisible to linear
    methods.

    Graph construction is delegated to the wrapped core implementation;
    this class stores the result as a directed graph. Note that
    ``build()`` stores plain ``(source, target)`` pairs with
    ``weighted=False``, so transition counts are not carried over into
    the stored graph.

    Parameters
    ----------
    symbolizer : {"ordinal", "equal_width", "equal_freq", "kmeans"}
        How to discretise the series into symbolic states.
        ``"ordinal"`` (default) uses rank-order patterns — invariant to
        monotone transformations and well-studied analytically.
    order : int, default 3
        Order of the patterns, forwarded to the core implementation.
        Larger values capture longer-range dependencies at the cost of
        sparsity (and needing longer series).
        NOTE(review): earlier documentation stated both "pattern length =
        order + 1" and "order=3 gives 3! = 6 possible patterns"; these
        cannot both hold — confirm against the core implementation.
    delay : int, default 1
        Sampling delay for pattern extraction.
    tie_rule : str, default "stable"
        How to handle ties; forwarded to the core implementation.
    bins : int, default 5
        Number of bins; forwarded to the core implementation.
    normalize : bool, default True
        Stored on the instance only; it is not forwarded to the core
        implementation.
    sparse : bool, default False
        DEPRECATED: adjacency matrices are always sparse now. Stored on
        the instance only.
    only_degrees : bool, default False
        DEPRECATED: use ``output="degrees"`` instead. When true it takes
        precedence over ``output``.
    output : {"edges", "degrees", "stats"}, default "edges"
        Controls what is stored after build().

    References
    ----------
    Small, M. (2013). Complex networks from time series: Capturing
    nonlinear dynamics. *Chaos*, 23(3), 033127.
    https://doi.org/10.1063/1.4818261

    Examples
    --------
    >>> import numpy as np
    >>> from ts2net import TransitionNetwork
    >>> x = np.random.randn(500)
    >>> tn = TransitionNetwork(order=3).build(x)
    >>> print(tn.n_nodes, tn.n_edges)  # nodes = distinct patterns, directed
    """

    def __init__(self, symbolizer: str = 'ordinal', order: int = 3, delay: int = 1,
                 tie_rule: str = 'stable', bins: int = 5, normalize: bool = True,
                 sparse: bool = False, only_degrees: bool = False, output: str = "edges"):
        """
        Store the configuration and create the wrapped implementation.

        Parameters
        ----------
        symbolizer : str
            Symbolization method
        order : int
            Order of patterns
        delay : int
            Time delay
        tie_rule : str
            How to handle ties
        bins : int
            Number of bins
        normalize : bool
            Stored only; not forwarded to the wrapped implementation
        sparse : bool
            DEPRECATED: Adjacency matrices are always sparse now
        only_degrees : bool
            DEPRECATED: Use output="degrees" instead
        output : str, default "edges"
            Output mode: "edges", "degrees", or "stats"
        """
        self.symbolizer = symbolizer
        self.order = order
        self.delay = delay
        self.tie_rule = tie_rule
        self.bins = bins
        self.normalize = normalize
        self.sparse = sparse
        # The deprecated flag wins over ``output`` for backward compatibility.
        self.output = "degrees" if only_degrees else output
        # Map parameters to old implementation (doesn't accept normalize or sparse)
        self._impl = _TN_Old(
            symbolizer=symbolizer, order=order, delay=delay,
            tie_rule=tie_rule, bins=bins
        )
        # Populated by build(); None means "not built yet".
        self._graph = None

    def _require_graph(self) -> "Graph":
        """Return the built graph, or raise ValueError if build() was not called."""
        if self._graph is None:
            raise ValueError("Call build() first")
        return self._graph

    def build(self, x: NDArray[np.float64]) -> "TransitionNetwork":
        """
        Build transition network from time series.

        The series is validated and cleaned, handed to the wrapped
        implementation, and the resulting graph is stored according to
        ``self.output``:

        * ``"degrees"`` -- only the degree sequence is kept (empty edge list).
        * ``"stats"`` -- degree sequence plus the cached edge count.
        * any other value -- handled as ``"edges"``; the edge list is kept.

        Parameters
        ----------
        x : array-like of shape (n,)
            Input time series.

        Returns
        -------
        self : TransitionNetwork
        """
        # Validate and clean input (handles dtype contamination)
        x = _validate_and_clean_series(x, "TransitionNetwork")

        # The second value returned by the implementation is not used here.
        G_nx, _ = self._impl.fit_transform(x)
        n = G_nx.number_of_nodes()

        if self.output in ("degrees", "stats"):
            # Explicit int64: np.array() on a Python list infers the dtype
            # (float64 for an empty graph, platform-dependent int otherwise),
            # while degree_sequence() is annotated as NDArray[np.int64].
            degrees = np.fromiter(
                (deg for _, deg in G_nx.degree()), dtype=np.int64, count=n
            )
            self._graph = Graph(
                edges=[],
                n_nodes=n,
                directed=True,  # Transition networks are directed
                weighted=False,
                _adjacency=None,
                _degrees=degrees,
            )
            if self.output == "stats":
                # No edge list is stored in this mode, so remember the count.
                self._graph._n_edges_cached = G_nx.number_of_edges()
        else:  # output == "edges"
            # NOTE(review): only (source, target) pairs are kept and the
            # graph is marked unweighted, so any transition counts held by
            # G_nx are dropped here — confirm this is intended.
            self._graph = Graph(
                edges=list(G_nx.edges()),
                n_nodes=n,
                directed=True,
                weighted=False,
                _adjacency=None,
            )
        return self

    @property
    def edges(self):
        """
        Edge list of the built graph.

        Returns ``None`` in ``"degrees"`` and ``"stats"`` output modes,
        because no edge list is stored in those modes.

        Raises
        ------
        ValueError
            If build() has not been called yet.
        """
        graph = self._require_graph()
        if self.output in ("degrees", "stats"):
            return None
        return graph.edges

    @property
    def n_nodes(self) -> int:
        """Number of nodes in the built graph (one per distinct pattern)."""
        return self._require_graph().n_nodes

    @property
    def n_edges(self) -> int:
        """Number of edges, as reported by the underlying graph object."""
        return self._require_graph().n_edges

    def degree_sequence(self) -> NDArray[np.int64]:
        """Node degree sequence — shape (n_nodes,)."""
        return self._require_graph().degree_sequence()

    def stats(self, include_triangles: bool = False) -> dict:
        """Summary statistics (no dense matrix required)."""
        return self._require_graph().summary(include_triangles=include_triangles)

    def adjacency_matrix(
        self, format: str = "sparse"
    ) -> "Union[csr_matrix, coo_matrix, NDArray[np.float64]]":
        """
        Adjacency matrix.

        Parameters
        ----------
        format : {"sparse", "coo", "dense"}, default "sparse"
            ``"sparse"`` returns a SciPy CSR matrix.
            ``"coo"`` returns a SciPy COO matrix.
            ``"dense"`` returns a NumPy array; **refused** for n > 50 000
            nodes (would require ≥ 20 GB RAM).

        Returns
        -------
        A : csr_matrix | coo_matrix | ndarray
            Adjacency matrix of shape (n_nodes, n_nodes). The graph is
            stored as directed, so the matrix need not be symmetric.

        Raises
        ------
        ValueError
            If build() has not been called yet.
        """
        return self._require_graph().adjacency_matrix(format=format)

    def edges_coo(
        self,
    ) -> "Tuple[NDArray[np.int64], NDArray[np.int64], Optional[NDArray[np.float64]]]":
        """
        Edge list in COO (coordinate) format.

        Returns
        -------
        rows : NDArray[int64] — source node indices
        cols : NDArray[int64] — target node indices
        weights : NDArray[float64] or None — edge weights (None if unweighted)

        Raises
        ------
        ValueError
            If build() has not been called yet.
        """
        return self._require_graph().edges_coo()

    def as_networkx(self, force: bool = False) -> nx.Graph:
        """
        Convert to a NetworkX graph.

        Parameters
        ----------
        force : bool, default False
            If False, raises for n > 200 000 nodes to prevent accidental
            allocation of very large objects.

        Returns
        -------
        G : nx.Graph or nx.DiGraph

        Raises
        ------
        ValueError
            If build() has not been called yet.
        """
        return self._require_graph().as_networkx(force=force)
# Factory function
[docs]
def build_network(x: NDArray[np.float64], method: str, **kwargs):
    """
    Build network from time series (factory pattern).

    Looks up the builder class for ``method`` (case-insensitive),
    instantiates it with ``**kwargs`` and returns the result of its
    ``build(x)`` call.

    Parameters
    ----------
    x : array
        Time series
    method : str
        'hvg', 'nvg', 'recurrence', 'transition'
    **kwargs
        Method-specific parameters, passed to the builder's constructor

    Returns
    -------
    graph : Graph-like object
        Built network with .edges, .degree_sequence(), etc.

    Raises
    ------
    ValueError
        If ``method`` is not one of the supported names.

    Examples
    --------
    >>> x = np.random.randn(1000)
    >>> hvg = build_network(x, 'hvg')
    >>> rn = build_network(x, 'recurrence', m=3, rule='knn', k=5)
    """
    builders = dict(
        hvg=HVG,
        nvg=NVG,
        recurrence=RecurrenceNetwork,
        transition=TransitionNetwork,
    )

    # Method names are matched case-insensitively.
    method = method.lower()
    builder_cls = builders.get(method)
    if builder_cls is None:
        raise ValueError(f"Unknown method: {method}. Choose from {list(builders.keys())}")

    return builder_cls(**kwargs).build(x)