"""
Core module for time series to network conversion.
This module provides implementations of various time series to network conversion
algorithms including Recurrence Networks, Visibility Graphs, and Transition Networks.
"""
from __future__ import annotations
# Standard library imports
import math
import random
from dataclasses import dataclass, fields
from itertools import combinations
from typing import Any, Dict, Iterable, List, Literal, Optional, Sequence, Tuple, Union, Callable
# Third-party imports
import numpy as np
import networkx as nx
from scipy import sparse
from scipy.sparse import csr_matrix
# Local imports
from .recurrence import RecurrenceNetwork
from .transition import TransitionNetwork
from .visibility import HVG, NVG
# Removed circular import - these classes are defined in this file
try:
from numba import njit
except ImportError:
[docs]
def njit(*args, **kwargs):
"""Dummy decorator when numba is not available."""
def deco(f):
return f
return deco
# Third-party imports
try:
from scipy.sparse import csr_matrix
except ImportError:
csr_matrix = None
try:
from networkx import from_scipy_sparse_array as _nx_from_csr
except ImportError:
_nx_from_csr = None
try:
from networkx import from_scipy_sparse_matrix as _nx_from_csr_legacy
except ImportError:
_nx_from_csr_legacy = None
# Local application imports - temporarily disable Rust imports due to build issues
try:
from .core_rust import (
hvg_graph as _hvg_graph_rs,
knn as _knn_rs,
nvg_graph as _nvg_graph_rs,
radius as _radius_rs,
rn_adj_epsilon as _rn_adj_eps_rs,
)
_rust_available = True
except ImportError:
# Rust extension not available, use Python fallbacks
_hvg_graph_rs = None
_knn_rs = None
_nvg_graph_rs = None
_radius_rs = None
_rn_adj_eps_rs = None
_rust_available = False
from .stats_summary import GraphSummaryResult
# SKMixin is a simple mixin class for scikit-learn compatibility
[docs]
class SKMixin:
"""Mixin class for scikit-learn compatibility."""
[docs]
def get_params(self, deep=True):
"""Get parameters for this estimator."""
return {k: v for k, v in self.__dict__.items() if not k.endswith('_')}
[docs]
def set_params(self, **params):
"""Set parameters for this estimator."""
for key, value in params.items():
setattr(self, key, value)
return self
# HVG and NVG implementations have been moved to their respective modules
# in the visibility/ subpackage
# ---- Weighted visibility helpers ----
def _vis_weights(y: np.ndarray, E: np.ndarray, mode: str) -> np.ndarray:
_WEIGHT_MODES = {
"absdiff": lambda y, E: np.abs(y[E[:, 0]] - y[E[:, 1]]).astype(float),
"inv_dist": lambda y, E: 1.0 / (np.abs(E[:, 1] - E[:, 0]).astype(float)),
"slope": lambda y, E: (y[E[:, 1]] - y[E[:, 0]]) / (E[:, 1] - E[:, 0]),
"distance": lambda y, E: np.abs(E[:, 1] - E[:, 0]).astype(float),
"product": lambda y, E: (y[E[:, 0]] * y[E[:, 1]]).astype(float),
"mean": lambda y, E: ((y[E[:, 0]] + y[E[:, 1]]) / 2.0).astype(float),
}
weight_fn = _WEIGHT_MODES.get(mode)
if weight_fn is None:
raise ValueError(f"Unknown mode: {mode}")
return weight_fn(y, E)
def _graph_from_edges_weighted(
n: int, E: np.ndarray, w: np.ndarray, sparse: bool
) -> Tuple[nx.Graph, object]:
if sparse and csr_matrix is not None:
rows = E[:, 0].astype(np.int64)
cols = E[:, 1].astype(np.int64)
A = csr_matrix((w.astype(float), (rows, cols)), shape=(n, n))
A = A.maximum(A.T)
A.setdiag(0.0)
A.eliminate_zeros()
G = nx.from_scipy_sparse_array(A)
return G, A
A = np.zeros((n, n), float)
for (i, j), ww in zip(E, w):
A[i, j] = ww
A[j, i] = ww
np.fill_diagonal(A, 0.0)
G = nx.from_numpy_array(A)
return G, A
def _ordinal_patterns(x: np.ndarray, order: int) -> np.ndarray:
n = x.size
if order < 2:
raise ValueError("order must be >= 2")
L = n - order + 1
if L <= 0:
raise ValueError("Series too short for chosen order.")
pats = np.empty(L, dtype=np.int64)
base = np.arange(order)
for i in range(L):
w = x[i : i + order]
idx = np.lexsort((base, w))
code = 0
used = np.zeros(order, dtype=np.int64)
for j in range(order):
r = idx[j]
code += (r - used[:r].sum()) * math.factorial(order - j - 1)
used[r] = 1
pats[i] = code
return pats
def _adj_to_graph(A: np.ndarray, directed: bool = False) -> nx.Graph:
G = nx.DiGraph() if directed else nx.Graph()
n = A.shape[0]
G.add_nodes_from(range(n))
rows, cols = np.nonzero(A)
for i, j in zip(rows, cols):
if i == j:
continue
if directed:
G.add_edge(i, j, weight=float(A[i, j]))
else:
if i < j:
G.add_edge(i, j, weight=float(A[i, j]))
return G
@njit(cache=True)
def _hvg_edges(y: np.ndarray) -> np.ndarray:
n = y.size
ei = []
ej = []
stack = []
for j in range(n):
while stack and y[stack[-1]] < y[j]:
i = stack.pop()
ei.append(i)
ej.append(j)
if stack:
i = stack[-1]
ei.append(i)
ej.append(j)
stack.append(j)
m = len(ei)
E = np.empty((m, 2), dtype=np.int64)
for k in range(m):
E[k, 0] = ei[k]
E[k, 1] = ej[k]
return E
[docs]
def triangle_count(G: nx.Graph) -> int:
H = G.to_undirected() if G.is_directed() else G
tri_dict = nx.triangles(H)
return int(sum(tri_dict.values()) // 3)
[docs]
def wedge_count(G: nx.Graph) -> int:
H = G.to_undirected() if G.is_directed() else G
deg = np.array([d for _, d in H.degree()], dtype=np.int64)
return int(np.sum(deg * (deg - 1) // 2))
[docs]
def motif_summary(G: Union[nx.Graph, nx.DiGraph]) -> dict:
und = G.to_undirected() if G.is_directed() else G
T = triangle_count(und)
W = wedge_count(und)
n = und.number_of_nodes()
N3 = n * (n - 1) * (n - 2) // 6
S = max(N3 - (T + W), 0)
return {"triangles": T, "wedges": W, "other_3_node": S, "N3": N3}
def _giant_component(H: nx.Graph) -> nx.Graph:
if H.number_of_nodes() == 0:
return H
comps = sorted(nx.connected_components(H), key=len, reverse=True)
return H.subgraph(next(iter([c for c in comps]))).copy()
[docs]
def small_world_summary(G: Union[nx.Graph, nx.DiGraph]) -> dict:
H = G.to_undirected() if G.is_directed() else G
GCC = _giant_component(H)
n = GCC.number_of_nodes()
m = GCC.number_of_edges()
if n <= 1 or m == 0:
return {
"C": np.nan,
"L": np.nan,
"C_er": np.nan,
"L_er": np.nan,
"sigma": np.nan,
}
k_bar = 2.0 * m / n
C = nx.average_clustering(GCC)
try:
L = nx.average_shortest_path_length(GCC)
except nx.NetworkXError:
L = np.nan
p = (2.0 * m) / (n * (n - 1))
C_er = p
if k_bar > 1.0:
L_er = math.log(n) / math.log(k_bar)
else:
L_er = np.nan
num = (C / C_er) if C_er > 0 else np.nan
den = (L / L_er) if (L_er is not None and not np.isnan(L_er)) else np.nan
sigma = (
num / den if (not np.isnan(num) and not np.isnan(den) and den != 0) else np.nan
)
return {"C": C, "L": L, "C_er": C_er, "L_er": L_er, "sigma": sigma}
def _sampled_combinations(nodes, r, max_samples=None, seed=3363):
nodes = list(nodes)
if max_samples is None:
for combo in combinations(nodes, r):
yield combo
return
rng = random.Random(seed)
n = len(nodes)
seen = set()
attempts = 0
cap = min(max_samples, 1_000_000)
while len(seen) < cap and attempts < cap * 10:
combo = tuple(sorted(rng.sample(nodes, r)))
if combo not in seen:
seen.add(combo)
yield combo
attempts += 1
[docs]
def directed_3node_motifs(
G: nx.DiGraph, max_samples: Optional[int] = None, seed: int = 3363
) -> dict:
if not G.is_directed():
raise ValueError("Graph must be directed.")
counts = {}
total = 0
for trio in _sampled_combinations(G.nodes(), 3, max_samples, seed):
sub = G.subgraph(trio).copy()
if not nx.is_weakly_connected(sub):
continue
mapping = dict(zip(sub.nodes(), range(3)))
sub = nx.relabel_nodes(sub, mapping)
bits = [
int(sub.has_edge(0, 1)),
int(sub.has_edge(0, 2)),
int(sub.has_edge(1, 0)),
int(sub.has_edge(1, 2)),
int(sub.has_edge(2, 0)),
int(sub.has_edge(2, 1)),
]
code = "".join(map(str, bits))
counts[code] = counts.get(code, 0) + 1
total += 1
if total == 0:
return {}
return {k: {"count": v, "freq": v / total} for k, v in counts.items()}
def _deg_seq(sub: nx.Graph) -> List[int]:
return sorted([d for _, d in sub.degree()])
[docs]
def undirected_4node_motifs(
G: nx.Graph, max_samples: Optional[int] = None, seed: int = 3363
) -> dict:
if G.is_directed():
raise ValueError("Graph must be undirected.")
out = {"K4": 0, "C4": 0, "triangle_tail": 0, "P4": 0, "K1,3": 0, "other": 0}
total = 0
for quad in _sampled_combinations(G.nodes(), 4, max_samples, seed):
sub = G.subgraph(quad)
if not nx.is_connected(sub):
continue
m = sub.number_of_edges()
d = _deg_seq(sub)
if m == 6:
out["K4"] += 1
elif m == 4 and d == [2, 2, 2, 2]:
out["C4"] += 1
elif m == 4 and d == [1, 2, 2, 3]:
out["triangle_tail"] += 1
elif m == 3 and d == [1, 1, 2, 2]:
out["P4"] += 1
elif m == 3 and d == [1, 1, 1, 3]:
out["K1,3"] += 1
else:
out["other"] += 1
total += 1
if total == 0:
return {}
return {k: {"count": v, "freq": v / total} for k, v in out.items()}
[docs]
def motif_counts(
G: Union[nx.Graph, nx.DiGraph],
motif_type: str = '3node',
max_samples: Optional[int] = None,
seed: int = 3363,
) -> Dict[str, int]:
"""
Count motifs in a graph.
Parameters
----------
G : nx.Graph or nx.DiGraph
Input graph
motif_type : str, default '3node'
Type of motifs to count ('3node' or '4node')
max_samples : int, optional
Maximum number of samples for motif counting
seed : int, default 3363
Random seed for sampling
Returns
-------
Dict[str, int]
Dictionary of motif counts
"""
if motif_type == '3node' and isinstance(G, nx.DiGraph):
return directed_3node_motifs(G, max_samples, seed)
elif motif_type == '4node' and isinstance(G, nx.Graph):
return undirected_4node_motifs(G, max_samples, seed)
else:
return {}
[docs]
def graph_summary(
G: Union[nx.Graph, nx.DiGraph],
motifs: Optional[str] = None,
motif_samples: Optional[int] = None,
seed: int = 3363,
) -> dict:
"""
Compute a comprehensive summary of graph properties.
Parameters
----------
G : nx.Graph or nx.DiGraph
Input graph
motifs : str, optional
Type of motifs to compute ('3node', '4node', or None)
motif_samples : int, optional
Maximum number of samples for motif counting
seed : int, default 3363
Random seed for sampling
Returns
-------
dict
Dictionary of graph properties
"""
und = G.to_undirected() if G.is_directed() else G
deg = dict(und.degree())
deg_vals = np.array(list(deg.values()), dtype=float)
out = {
"n": und.number_of_nodes(),
"m": und.number_of_edges(),
"density": nx.density(und),
"deg_mean": float(deg_vals.mean()) if deg_vals.size else 0.0,
"deg_std": float(deg_vals.std(ddof=1)) if deg_vals.size > 1 else 0.0,
"avg_degree": float(deg_vals.mean()) if deg_vals.size else 0.0,
"assortativity": (
nx.degree_assortativity_coefficient(und)
if und.number_of_edges()
else np.nan
),
"avg_clustering": (
nx.average_clustering(und) if und.number_of_nodes() else np.nan
),
}
out.update(small_world_summary(und))
out.update(motif_summary(und))
if motifs in ("directed3", "all") and G.is_directed():
out["motifs_directed_3"] = directed_3node_motifs(
G, max_samples=motif_samples, seed=seed
)
if motifs in ("undirected4", "all"):
base = G.to_undirected() if G.is_directed() else G
out["motifs_undirected_4"] = undirected_4node_motifs(
base, max_samples=motif_samples, seed=seed
)
return out
def _as_1d(x: Union[np.ndarray, Iterable[float]]) -> np.ndarray:
a = np.asarray(x, dtype=float)
if a.ndim != 1:
raise ValueError("Expected a 1-D array.")
return a
[docs]
def embed(x: np.ndarray, m: int, tau: int) -> np.ndarray:
"""
Time-delay embedding of a time series.
Parameters
----------
x : np.ndarray
1D time series
m : int
Embedding dimension
tau : int
Time delay
Returns
-------
np.ndarray
2D array of shape (L, m) where L = len(x) - (m-1)*tau
"""
x = _as_1d(x)
n = len(x)
L = n - (m - 1) * tau
if L <= 0:
raise ValueError(f"Series too short: n={n} < required={(m-1)*tau+1} for m={m}, tau={tau}")
E = np.empty((L, m), dtype=float)
for i in range(m):
E[:, i] = x[i * tau : i * tau + L]
return E
def _run_single(series: np.ndarray, builder: str, kwargs: dict):
_BUILDERS = {
"HVG": lambda s, kw: HVG().fit_transform(s),
"NVG": lambda s, kw: NVG().fit_transform(s),
"RN": lambda s, kw: RecurrenceNetwork(**kw).fit_transform(s),
"TN": lambda s, kw: TransitionNetwork(**kw).fit_transform(s),
}
builder_fn = _BUILDERS.get(builder)
if builder_fn is None:
raise ValueError(f"Unknown builder: {builder}")
return builder_fn(series, kwargs)
# Builder registry for batch processing
_BUILDERS = {
'hvg': lambda x, **kwargs: HVG(**kwargs).fit_transform(x)[0],
'nvg': lambda x, **kwargs: NVG(**kwargs).fit_transform(x)[0],
'recurrence': lambda x, **kwargs: RecurrenceNetwork(**kwargs).fit_transform(x)[0],
'transition': lambda x, **kwargs: TransitionNetwork(**kwargs).fit_transform(x)[0],
}