# Source code for ts2net.io_polars

"""
Polars-based Parquet ingestion for time series data.

This module provides efficient lazy-loading of time series from Parquet files
using Polars, converting to NumPy arrays for use with ts2net core algorithms.
"""

from __future__ import annotations

from typing import Optional, Union, Dict, Tuple
import numpy as np

try:
    import polars as pl
    HAS_POLARS = True
except ImportError:
    HAS_POLARS = False
    pl = None


# Aggregations accepted by ``agg``; each name is also the Polars ``Expr`` method used.
_AGGREGATIONS = ("mean", "sum", "min", "max", "median", "first", "last")

_NO_VALID_VALUES_MSG = (
    "No valid numeric values after cleaning. "
    "Check for non-numeric data, infinities, or all-null series."
)


def load_series_from_parquet_polars(
    path: str,
    time_col: str,
    value_col: str,
    id_col: Optional[str] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    freq: Optional[str] = None,
    agg: str = "mean",
    tz: Optional[str] = None,
    columns_extra: Optional[list[str]] = None,
) -> Union[Dict[str, np.ndarray], Tuple[np.ndarray, np.ndarray]]:
    """
    Load time series from a Parquet file using Polars (lazy evaluation).

    The query (column selection, time filters, aggregation, sorting) is built
    on a ``LazyFrame`` and only materialized once at the end. The result is
    converted to NumPy arrays for use with the ts2net core algorithms.

    Parameters
    ----------
    path : str
        Path to a Parquet file or a glob/directory accepted by
        ``polars.scan_parquet``.
    time_col : str
        Column name for timestamps.
    value_col : str
        Column name for values.
    id_col : str, optional
        Column name for the series identifier (e.g. meter_id, region).
        If None, a single series is returned as a ``(times, values)`` tuple.
    start : str, optional
        Inclusive lower bound on ``time_col``. Strings are parsed with
        ``datetime.fromisoformat`` (a trailing ``Z`` means UTC), falling back
        to ``'YYYY-MM-DD HH:MM:SS'`` and ``'YYYY-MM-DD'``. Non-string values
        are compared as-is.
    end : str, optional
        Inclusive upper bound on ``time_col``, parsed like ``start``. A
        date-only string means midnight at the start of that day.
    freq : str, optional
        Bucket width for time-based aggregation (e.g. ``'1h'``, ``'1d'``,
        ``'15m'``), computed with ``group_by_dynamic`` using left-closed,
        left-labelled windows.
    agg : str, default 'mean'
        Aggregation applied per bucket (with ``freq``) or per duplicated
        ``(id, time)`` pair (with ``id_col`` and no ``freq``). One of
        'mean', 'sum', 'min', 'max', 'median', 'first', 'last'
        (case-insensitive).
    tz : str, optional
        Time zone for ``time_col`` (e.g. 'UTC', 'Europe/Madrid'). Naive
        datetimes are localized to ``tz``. Tz-aware datetimes are converted
        to ``tz``. String columns are parsed to datetimes first. Naive
        ``start``/``end`` bounds are interpreted in ``tz``.
    columns_extra : list[str], optional
        Additional columns to read. They are not part of the returned arrays,
        and nulls in them do not cause rows to be dropped.

    Returns
    -------
    dict[str, np.ndarray] or tuple[np.ndarray, np.ndarray]
        If ``id_col`` is given: a dict mapping ``str(id)`` to a float64 value
        array sorted by time. Keys are inserted in ascending order of the
        original id values.
        If ``id_col`` is None: a ``(times, values)`` tuple of equal length,
        sorted by time.
        Rows with null time/id/value are dropped. Values that are NaN,
        infinite or non-numeric are dropped together with their timestamps.

    Raises
    ------
    ImportError
        If Polars is not installed.
    ValueError
        If ``agg`` is not supported or a ``start``/``end`` string cannot be
        parsed. Also raised if the single series, or any one id's series, has
        no valid numeric values left.

    Examples
    --------
    >>> # Single series
    >>> times, values = load_series_from_parquet_polars(  # doctest: +SKIP
    ...     'data.parquet', time_col='timestamp', value_col='consumption'
    ... )
    >>> # Multiple series by meter_id, hourly means for 2024
    >>> series = load_series_from_parquet_polars(  # doctest: +SKIP
    ...     'data.parquet',
    ...     time_col='timestamp',
    ...     value_col='consumption',
    ...     id_col='meter_id',
    ...     freq='1h',
    ...     start='2024-01-01',
    ...     end='2024-12-31 23:59:59',
    ... )
    >>> # series = {'meter_1': np.array([...]), 'meter_2': np.array([...]), ...}
    """
    if not HAS_POLARS:
        raise ImportError(
            "Polars is required for load_series_from_parquet_polars. "
            "Install with: pip install ts2net[polars]"
        )

    # Fail loudly on a typo instead of silently computing a mean.
    agg_name = agg.lower()
    if agg_name not in _AGGREGATIONS:
        raise ValueError(
            f"Unsupported agg {agg!r}; expected one of {', '.join(_AGGREGATIONS)}."
        )

    key_cols = [time_col, value_col] if id_col is None else [time_col, value_col, id_col]
    # dict.fromkeys de-duplicates while keeping order: Polars rejects a
    # selection that names the same column twice.
    select_cols = list(dict.fromkeys(key_cols + list(columns_extra or [])))

    # Lazy scan: only metadata is read until .collect() below.
    lf = pl.scan_parquet(path).select(select_cols)

    if tz is not None:
        lf = lf.with_columns(_localized_time_expr(lf, time_col, tz))

    # Time filters are pushed down into the scan by the lazy optimizer.
    if start is not None:
        lf = lf.filter(pl.col(time_col) >= _time_bound(start, tz))
    if end is not None:
        lf = lf.filter(pl.col(time_col) <= _time_bound(end, tz))

    sort_cols = [time_col] if id_col is None else [id_col, time_col]
    agg_expr = getattr(pl.col(value_col), agg_name)().alias(value_col)
    if freq is not None:
        # group_by_dynamic requires the index column sorted (within each group).
        lf = _group_by_dynamic(lf.sort(sort_cols), time_col, freq, id_col).agg(agg_expr)
    elif id_col is not None:
        # No bucketing: collapse duplicate (id, time) rows with the same agg.
        lf = lf.group_by([id_col, time_col]).agg(agg_expr)

    # Only the key columns decide which rows are unusable; columns_extra
    # must not cause rows to disappear.
    frame = lf.sort(sort_cols).collect().drop_nulls(subset=key_cols)

    if id_col is None:
        times = frame[time_col].to_numpy()
        values = _to_float64(frame[value_col].to_numpy())
        # Mask times and values together so the returned arrays stay aligned.
        keep = np.isfinite(values)
        if not keep.any():
            raise ValueError(_NO_VALID_VALUES_MSG)
        return times[keep], values[keep]

    # Ids become string keys. Rows are already sorted by (id, time), and
    # partition_by with maintain_order=True keeps both orders within/among groups.
    frame = frame.with_columns(pl.col(id_col).cast(pl.Utf8))
    result: Dict[str, np.ndarray] = {}
    for part in frame.partition_by(id_col, maintain_order=True):
        values = _to_float64(part[value_col].to_numpy())
        values = values[np.isfinite(values)]
        if values.size == 0:
            raise ValueError(_NO_VALID_VALUES_MSG)
        result[str(part[id_col][0])] = values
    return result


def _column_dtype(lf, name):
    """Return the dtype of column *name* of LazyFrame *lf* without collecting data."""
    # collect_schema() is the non-deprecated API in Polars >= 1.0; older
    # releases only provide the ``schema`` property.
    schema = lf.collect_schema() if hasattr(lf, "collect_schema") else lf.schema
    return schema[name]


def _localized_time_expr(lf, time_col, tz):
    """Return an expression that makes ``time_col`` a datetime in time zone *tz*.

    The decision is made from the schema because a LazyFrame does not raise
    until it is collected, so a try/except around ``with_columns`` cannot
    detect a wrong column type.
    """
    dtype = _column_dtype(lf, time_col)
    col = pl.col(time_col)
    if dtype == pl.Utf8:
        # String timestamps: parse first (format inferred by Polars), then localize.
        return col.str.strptime(pl.Datetime).dt.replace_time_zone(tz).alias(time_col)
    if getattr(dtype, "time_zone", None) is not None:
        # Already tz-aware: convert the instants instead of relabelling them.
        return col.dt.convert_time_zone(tz).alias(time_col)
    return col.dt.replace_time_zone(tz).alias(time_col)


def _parse_timestamp(value):
    """Parse a string bound into a ``datetime``; return non-strings unchanged.

    String forms are tried in order: ISO 8601 via ``datetime.fromisoformat``
    (a trailing ``Z`` is treated as ``+00:00``), ``'YYYY-MM-DD HH:MM:SS'``,
    then ``'YYYY-MM-DD'``.

    Raises
    ------
    ValueError
        If a string matches none of the accepted forms.
    """
    from datetime import datetime

    if not isinstance(value, str):
        return value
    try:
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        pass
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise ValueError(
        f"Could not parse timestamp {value!r}; expected ISO 8601, "
        "'YYYY-MM-DD HH:MM:SS' or 'YYYY-MM-DD'."
    )


def _time_bound(value, tz):
    """Build the right-hand side of a start/end comparison against the time column.

    When *tz* is set, the time column is tz-aware, so a naive ``datetime``
    bound is localized to *tz* rather than compared naive-vs-aware.
    """
    from datetime import datetime

    bound = _parse_timestamp(value)
    if tz is not None and isinstance(bound, datetime) and bound.tzinfo is None:
        return pl.lit(bound).dt.replace_time_zone(tz)
    return bound


def _group_by_dynamic(lf, time_col, freq, id_col):
    """Start a left-closed, left-labelled ``group_by_dynamic`` over *lf*, optionally per *id_col*."""
    window = dict(every=freq, closed="left", label="left")
    if id_col is None:
        return lf.group_by_dynamic(time_col, **window)
    try:
        return lf.group_by_dynamic(time_col, group_by=id_col, **window)
    except TypeError:
        # Older Polars releases name this parameter ``by`` (renamed to
        # ``group_by`` and later removed in Polars 1.x).
        return lf.group_by_dynamic(time_col, by=id_col, **window)


def _to_float64(arr):
    """Return *arr* as float64; entries that cannot be parsed as numbers become NaN."""
    arr = np.asarray(arr)
    if arr.dtype.kind in "fiu":
        return arr.astype(np.float64)
    import pandas as pd

    coerced = pd.to_numeric(pd.Series(arr), errors="coerce")
    return coerced.to_numpy(dtype=np.float64, na_value=np.nan)
def _clean_numeric_array(arr: np.ndarray) -> np.ndarray: """ Clean and validate numeric array, handling dtype contamination. Parameters ---------- arr : array Input array (may contain non-numeric values) Returns ------- arr : array (float64) Clean numeric array Raises ------ ValueError If no valid numeric values remain after cleaning """ import pandas as pd # Convert to pandas Series for robust numeric coercion if isinstance(arr, np.ndarray): if arr.dtype == object or arr.dtype.kind not in ['f', 'i', 'u']: # Has non-numeric types - use pandas coercion s = pd.Series(arr) s = pd.to_numeric(s, errors='coerce') else: s = pd.Series(arr) else: s = pd.Series(arr) s = pd.to_numeric(s, errors='coerce') # Replace infinities with NaN s = s.replace([np.inf, -np.inf], np.nan) # Drop nulls s = s.dropna() if len(s) == 0: raise ValueError( "No valid numeric values after cleaning. " "Check for non-numeric data, infinities, or all-null series." ) # Convert to float64 numpy array return s.values.astype(np.float64)