Source code for skdownscale.pointwise_models.quantile

from __future__ import annotations

import collections
import copy
from typing import Any, Literal

import numpy as np
from numpy.typing import NDArray
from sklearn.base import BaseEstimator, RegressorMixin, TransformerMixin
from sklearn.linear_model import LinearRegression
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted, validate_data

from .trend import LinearTrendTransformer
from .utils import check_max_features, default_none_kwargs

SYNTHETIC_MIN = -1e20
SYNTHETIC_MAX = 1e20

Cdf = collections.namedtuple('CDF', ['pp', 'vals'])


def plotting_positions(n: int, alpha: float = 0.4, beta: float = 0.4) -> NDArray[np.floating[Any]]:
    """Returns a monotonic array of plotting positions.

    Parameters
    ----------
    n : int
        Length of plotting positions to return.
    alpha, beta : float
        Plotting positions parameter. Default is 0.4.

    Returns
    -------
    positions : ndarray
        Quantile mapped data with shape from `input_data` and probability
        distribution from `data_to_match`.

    See Also
    --------
    scipy.stats.mstats.plotting_positions
    """
    return (np.arange(1, n + 1) - alpha) / (n + 1.0 - alpha - beta)


[docs] class QuantileMapper(TransformerMixin, BaseEstimator): """Transform features using quantile mapping. Parameters ---------- detrend : boolean, optional If True, detrend the data before quantile mapping and add the trend back after transforming. Default is False. lt_kwargs : dict, optional Dictionary of keyword arguments to pass to the LinearTrendTransformer qm_kwargs : dict, optional Dictionary of keyword arguments to pass to the QuantileMapper Attributes ---------- x_cdf_fit_ : QuantileTransformer QuantileTranform for fit(X) """ _fit_attributes = ['x_cdf_fit_']
[docs] def __init__( self, detrend: bool = False, lt_kwargs: dict[str, Any] | None = None, qt_kwargs: dict[str, Any] | None = None, ) -> None: self.detrend = detrend self.lt_kwargs = lt_kwargs self.qt_kwargs = qt_kwargs
def _validate_data(self, X, y=None, reset=True, **check_params): """Validate input data using sklearn's validate_data.""" return validate_data(self, X=X, y=y, reset=reset, **check_params)
[docs] def fit(self, X, y=None): """Fit the quantile mapping model. Parameters ---------- X : array-like, shape [n_samples, n_features] Training data """ # TO-DO: fix validate data fctn X = self._validate_data(X) qt_kws = default_none_kwargs(self.qt_kwargs, copy=True) # maybe detrend the input datasets if self.detrend: lt_kwargs = default_none_kwargs(self.lt_kwargs) self.x_trend_fit_ = LinearTrendTransformer(**lt_kwargs).fit(X) x_to_cdf = self.x_trend_fit_.transform(X) else: x_to_cdf = X # calculate the cdfs for X qt = CunnaneTransformer(**qt_kws) self.x_cdf_fit_ = qt.fit(x_to_cdf) return self
[docs] def transform(self, X): """Perform the quantile mapping. Parameters ---------- X : array_like, shape [n_samples, n_features] Samples. Returns ------- y : ndarray of shape (n_samples, ) Transformed data """ # validate input data check_is_fitted(self) # TO-DO: fix validate_data fctn X = self._validate_data(X) # maybe detrend the datasets if self.detrend: lt_kwargs = default_none_kwargs(self.lt_kwargs) x_trend = LinearTrendTransformer(**lt_kwargs).fit(X) x_to_cdf = x_trend.transform(X) else: x_to_cdf = X # do the final mapping qt_kws = default_none_kwargs(self.qt_kwargs, copy=True) x_quantiles = CunnaneTransformer(**qt_kws).fit_transform(x_to_cdf) x_qmapped = self.x_cdf_fit_.inverse_transform(x_quantiles) # add the trend back if self.detrend: x_qmapped = x_trend.inverse_transform(x_qmapped) # reset the baseline (remove bias) x_qmapped -= x_trend.lr_model_.intercept_ - self.x_trend_fit_.lr_model_.intercept_ return x_qmapped
def __sklearn_tags__(self): from dataclasses import replace tags = super().__sklearn_tags__() # Skip tests - this is a specialized transformer that only supports 1 feature tags = replace( tags, _skip_test='QuantileMapper only supports 1 feature and has temporal dependencies' ) return tags
[docs] class QuantileMappingReressor(RegressorMixin, BaseEstimator): """Transform features using quantile mapping. Parameters ---------- extrapolate : str, optional How to extend the cdfs at the tails. Valid options include {`'min'`, `'max'`, `'both'`, `'1to1'`, `None`} n_endpoints : int Number of endpoints to include when extrapolating the tails of the cdf Attributes ---------- _X_cdf : Cdf NamedTuple representing the fit's X cdf _y_cdf : Cdf NamedTuple representing the fit's y cdf """ _fit_attributes = ['_X_cdf', '_y_cdf']
[docs] def __init__( self, extrapolate: Literal['min', 'max', 'both', '1to1'] | None = None, n_endpoints: int = 10, ) -> None: self.extrapolate = extrapolate self.n_endpoints = n_endpoints if self.n_endpoints < 2: raise ValueError('Invalid number of n_endpoints, must be >= 2')
def _validate_data(self, X, y=None, reset=True, **check_params): """Validate input data using sklearn's validate_data.""" return validate_data(self, X=X, y=y, reset=reset, **check_params)
[docs] def fit(self, X, y, **kwargs): """Fit the quantile mapping regression model. Parameters ---------- X : array-like, shape [n_samples, 1] Training data. Returns ------- self : object """ X = check_array( X, dtype='numeric', ensure_min_samples=2 * self.n_endpoints + 1, ensure_2d=True ) y = check_array( y, dtype='numeric', ensure_min_samples=2 * self.n_endpoints + 1, ensure_2d=False ) X = check_max_features(X, n=1) self._X_cdf = self._calc_extrapolated_cdf(X, sort=True, extrapolate=self.extrapolate) self._y_cdf = self._calc_extrapolated_cdf(y, sort=True, extrapolate=self.extrapolate) return self
[docs] def predict(self, X, **kwargs): """Predict regression for target X. Parameters ---------- X : array_like, shape [n_samples, 1] Samples. Returns ------- y : ndarray of shape (n_samples, ) Predicted data. """ check_is_fitted(self, self._fit_attributes) X = check_array(X, ensure_2d=True) X = X[:, 0] sort_inds = np.argsort(X) X_cdf = self._calc_extrapolated_cdf(X[sort_inds], sort=False, extrapolate=self.extrapolate) # Fill value for when x < xp[0] or x > xp[-1] (i.e. when X_cdf vals are out of range for self._X_cdf vals) left = -np.inf if self.extrapolate in ['min', 'both'] else None right = np.inf if self.extrapolate in ['max', 'both'] else None # For all values in future X, find the corresponding percentile in historical X X_cdf.pp[:] = np.interp( X_cdf.vals, self._X_cdf.vals, self._X_cdf.pp, left=left, right=right ) # Extrapolate the tails beyond 1.0 to handle "new extremes", only triggered when the new extremes are even more drastic then # the linear extrapolation result from historical X at SYNTHETIC_MIN and SYNTHETIC_MAX if np.isinf(X_cdf.pp).any(): lower_inds = np.nonzero(-np.inf == X_cdf.pp)[0] upper_inds = np.nonzero(np.inf == X_cdf.pp)[0] model = LinearRegression() if len(lower_inds): s = slice(lower_inds[-1] + 1, lower_inds[-1] + 1 + self.n_endpoints) model.fit(X_cdf.pp[s].reshape(-1, 1), X_cdf.vals[s].reshape(-1, 1)) X_cdf.pp[lower_inds] = model.predict(X_cdf.vals[lower_inds].reshape(-1, 1)) if len(upper_inds): s = slice(upper_inds[0] - self.n_endpoints, upper_inds[0]) model.fit(X_cdf.pp[s].reshape(-1, 1), X_cdf.vals[s].reshape(-1, 1)) X_cdf.pp[upper_inds] = model.predict(X_cdf.vals[upper_inds].reshape(-1, 1)) # do the full quantile mapping y_hat = np.full_like(X, np.nan) y_hat[sort_inds] = np.interp(X_cdf.pp, self._y_cdf.pp, self._y_cdf.vals)[1:-1] # If extrapolate is 1to1, apply the offset between X and y to the # tails of y_hat if self.extrapolate == '1to1': y_hat = self._extrapolate_1to1(X, y_hat) return y_hat
def _extrapolate_1to1(self, X, y_hat): X_fit_len = len(self._X_cdf.vals) X_fit_min = self._X_cdf.vals[0] X_fit_max = self._X_cdf.vals[-1] y_fit_len = len(self._y_cdf.vals) y_fit_min = self._y_cdf.vals[0] y_fit_max = self._y_cdf.vals[-1] # adjust values over fit max inds = X > X_fit_max if inds.any(): if X_fit_len == y_fit_len: y_hat[inds] = y_fit_max + (X[inds] - X_fit_max) elif X_fit_len > y_fit_len: X_fit_at_y_fit_max = np.interp(self._y_cdf.pp[-1], self._X_cdf.pp, self._X_cdf.vals) y_hat[inds] = y_fit_max + (X[inds] - X_fit_at_y_fit_max) elif X_fit_len < y_fit_len: y_fit_at_X_fit_max = np.interp(self._X_cdf.pp[-1], self._y_cdf.pp, self._y_cdf.vals) y_hat[inds] = y_fit_at_X_fit_max + (X[inds] - X_fit_max) # adjust values under fit min inds = X < X_fit_min if inds.any(): if X_fit_len == y_fit_len: y_hat[inds] = y_fit_min + (X[inds] - X_fit_min) elif X_fit_len > y_fit_len: X_fit_at_y_fit_min = np.interp(self._y_cdf.pp[0], self._X_cdf.pp, self._X_cdf.vals) y_hat[inds] = X_fit_min + (X[inds] - X_fit_at_y_fit_min) elif X_fit_len < y_fit_len: y_fit_at_X_fit_min = np.interp(self._X_cdf.pp[0], self._y_cdf.pp, self._y_cdf.vals) y_hat[inds] = y_fit_at_X_fit_min + (X[inds] - X_fit_min) return y_hat def _calc_extrapolated_cdf( self, data, sort=True, extrapolate=None, pp_min=SYNTHETIC_MIN, pp_max=SYNTHETIC_MAX ): """Calculate a new extrapolated cdf The goal of this function is to create a CDF with bounds outside the [0, 1] range. This allows for quantile mapping beyond observed data points. Parameters ---------- data : array_like, shape [n_samples, 1] Input data (can be unsorted) sort : bool If true, sort the data before building the CDF extrapolate : str or None How to extend the cdfs at the tails. Valid options include {`'min'`, `'max'`, `'both'`, `'1to1'`, `None`} pp_min, pp_max : float Plotting position min/max values. Returns ------- cdf : Cdf (NamedTuple) """ n = len(data) # plotting positions pp = np.empty(n + 2) pp[1:-1] = plotting_positions(n) # extended data values (sorted) if data.ndim == 2: data = data[:, 0] if sort: data = np.sort(data) vals = np.full(n + 2, np.nan) vals[1:-1] = data vals[0] = data[0] vals[-1] = data[-1] # Add endpoints to the vector of plotting positions if extrapolate in [None, '1to1']: pp[0] = pp[1] pp[-1] = pp[-2] elif extrapolate == 'both': pp[0] = pp_min pp[-1] = pp_max elif extrapolate == 'max': pp[0] = pp[1] pp[-1] = pp_max elif extrapolate == 'min': pp[0] = pp_min pp[-1] = pp[-2] else: raise ValueError(f'unknown value for extrapolate: {extrapolate}') if extrapolate in ['min', 'max', 'both']: model = LinearRegression() # extrapolate lower end point if extrapolate in ['min', 'both']: s = slice(1, self.n_endpoints + 1) # fit linear model to first n_endpoints model.fit(pp[s].reshape(-1, 1), vals[s].reshape(-1, 1)) # calculate the data value pp[0] vals[0] = model.predict(pp[0].reshape(-1, 1)) # extrapolate upper end point if extrapolate in ['max', 'both']: s = slice(-self.n_endpoints - 1, -1) # fit linear model to last n_endpoints model.fit(pp[s].reshape(-1, 1), vals[s].reshape(-1, 1)) # calculate the data value pp[-1] vals[-1] = model.predict(pp[-1].reshape(-1, 1)) return Cdf(pp, vals) def __sklearn_tags__(self): from dataclasses import replace tags = super().__sklearn_tags__() # Skip tests - only supports 1 feature tags = replace(tags, _skip_test='QuantileMappingReressor only supports 1 feature') return tags
class CunnaneTransformer(TransformerMixin, BaseEstimator): """Quantile transform using Cunnane plotting positions with optional extrapolation. Parameters ---------- alpha : float, optional Plotting positions parameter. Default is 0.4. beta : float, optional Plotting positions parameter. Default is 0.4. extrapolate : str, optional How to extend the cdfs at the tails. Valid options include {`'min'`, `'max'`, `'both'`, `'1to1'`, `None`}. Default is None. n_endpoints : int Number of endpoints to include when extrapolating the tails of the cdf. Usused if ``extrapolate`` is None. Default is 10. Attributes ---------- cdf_ : Cdf NamedTuple representing the fit cdf """ _fit_attributes = ['cdf_'] def __init__( self, *, alpha: float = 0.4, beta: float = 0.4, extrapolate: Literal['min', 'max', 'both', '1to1'] | None = 'both', n_endpoints: int = 10, ) -> None: self.alpha = alpha self.beta = beta self.extrapolate = extrapolate self.n_endpoints = n_endpoints def _validate_data(self, X, y=None, reset=True, **check_params): """Validate input data using sklearn's validate_data.""" return validate_data(self, X=X, y=y, reset=reset, **check_params) def fit(self, X, y=None): """Compute CDF and plotting positions for X. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, 1) The data used to scale along the features axis. If a sparse matrix is provided, it will be converted into a sparse ``csc_matrix``. Additionally, the sparse matrix needs to be nonnegative if `ignore_implicit_zeros` is False. y : None Ignored. Returns ------- self : object Fitted transformer. """ X = check_array(X, ensure_2d=True) if X.shape[1] > 1: raise ValueError('CunnaneTransformer.fit() only supports a single feature') X = X[:, 0] self.cdf_ = Cdf(plotting_positions(len(X)), np.sort(X)) return self def transform(self, X): """Perform the quantile transform. Parameters ---------- X : array_like, shape [n_samples, 1] Samples. Returns ------- y : ndarray of shape (n_samples, ) Transformed data """ X = check_array(X, ensure_2d=True) if X.shape[1] > 1: raise ValueError('CunnaneTransformer.transform() only supports a single feature') X = X[:, 0] left = -np.inf if self.extrapolate in ['min', 'both'] else None right = np.inf if self.extrapolate in ['max', 'both'] else None pps = np.interp(X, self.cdf_.vals, self.cdf_.pp, left=left, right=right) if np.isinf(pps).any(): lower_inds = np.nonzero(-np.inf == pps)[0] upper_inds = np.nonzero(np.inf == pps)[0] model = LinearRegression() if len(lower_inds): s = slice(None, self.n_endpoints) model.fit(self.cdf_.vals[s].reshape(-1, 1), self.cdf_.pp[s].reshape(-1, 1)) pps[lower_inds] = model.predict(X[lower_inds].values.reshape(-1, 1)).squeeze() if len(upper_inds): s = slice(-self.n_endpoints, None) model.fit(self.cdf_.vals[s].reshape(-1, 1), self.cdf_.pp[s].reshape(-1, 1)) pps[upper_inds] = model.predict(X[upper_inds].values.reshape(-1, 1)).squeeze() return pps.reshape(-1, 1) def fit_transform(self, X, y=None): """Fit `CunnaneTransform` to `X`, then transform `X`. Parameters ---------- X : array-like of shape (n_samples, n_features) The data used to generate the fit CDF. y : Ignored Not used, present for API consistency by convention. Returns ------- X_new : ndarray of shape (n_samples, n_features) Transformed data. """ return self.fit(X).transform(X) def inverse_transform(self, X): X = check_array(X, ensure_2d=True) X = X[:, 0] left = -np.inf if self.extrapolate in ['min', 'both'] else None right = np.inf if self.extrapolate in ['max', 'both'] else None vals = np.interp(X, self.cdf_.pp, self.cdf_.vals, left=left, right=right) if np.isinf(vals).any(): lower_inds = np.nonzero(-np.inf == vals)[0] upper_inds = np.nonzero(np.inf == vals)[0] model = LinearRegression() if len(lower_inds): s = slice(None, self.n_endpoints) model.fit(self.cdf_.pp[s].reshape(-1, 1), self.cdf_.vals[s].reshape(-1, 1)) vals[lower_inds] = model.predict(X[lower_inds].reshape(-1, 1)).squeeze() if len(upper_inds): s = slice(-self.n_endpoints, None) model.fit(self.cdf_.pp[s].reshape(-1, 1), self.cdf_.vals[s].reshape(-1, 1)) vals[upper_inds] = model.predict(X[upper_inds].reshape(-1, 1)).squeeze() return vals.reshape(-1, 1) def __sklearn_tags__(self): from dataclasses import replace tags = super().__sklearn_tags__() # Skip tests - only supports 1 feature tags = replace(tags, _skip_test='CunnaneTransformer only supports 1 feature') return tags class EquidistantCdfMatcher(QuantileMappingReressor): """Transform features using equidistant CDF matching, a version of quantile mapping that preserves the difference or ratio between X_test and X_train. Parameters ---------- extrapolate : str, optional How to extend the cdfs at the tails. Valid options include {`'min'`, `'max'`, `'both'`, `'1to1'`, `None`} n_endpoints : int Number of endpoints to include when extrapolating the tails of the cdf Attributes ---------- _X_cdf : Cdf NamedTuple representing the fit's X cdf _y_cdf : Cdf NamedTuple representing the fit's y cdf """ _fit_attributes = ['_X_cdf', '_y_cdf'] def __init__( self, kind: Literal['difference', 'ratio'] = 'difference', extrapolate: Literal['min', 'max', 'both', '1to1'] | None = None, n_endpoints: int = 10, max_ratio: float | None = None, ) -> None: if kind not in ['difference', 'ratio']: raise NotImplementedError('kind must be either difference or ratio') self.kind = kind self.extrapolate = extrapolate self.n_endpoints = n_endpoints # MACA seems to have a max ratio for precip at 5.0 self.max_ratio = max_ratio if self.n_endpoints < 2: raise ValueError('Invalid number of n_endpoints, must be >= 2') def predict(self, X, **kwargs): """Predict regression for target X. Parameters ---------- X : array_like, shape [n_samples, 1] Samples. Returns ------- y : ndarray of shape (n_samples, ) Predicted data. """ check_is_fitted(self, self._fit_attributes) X = check_array(X, ensure_2d=True) X = X[:, 0] sort_inds = np.argsort(X) X_cdf = self._calc_extrapolated_cdf(X[sort_inds], sort=False, extrapolate=self.extrapolate) X_train_vals = np.interp(x=X_cdf.pp, xp=self._X_cdf.pp, fp=self._X_cdf.vals) # generate y value as historical y plus/multiply by quantile difference if self.kind == 'difference': diff = X_cdf.vals - X_train_vals sorted_y_hat = np.interp(x=X_cdf.pp, xp=self._y_cdf.pp, fp=self._y_cdf.vals) + diff elif self.kind == 'ratio': ratio = X_cdf.vals / X_train_vals if self.max_ratio is not None: ratio = np.min(ratio, self.max_ratio) sorted_y_hat = np.interp(x=X_cdf.pp, xp=self._y_cdf.pp, fp=self._y_cdf.vals) * ratio # put things into the right order y_hat = np.full_like(X, np.nan) y_hat[sort_inds] = sorted_y_hat[1:-1] # If extrapolate is 1to1, apply the offset between X and y to the # tails of y_hat if self.extrapolate == '1to1': y_hat = self._extrapolate_1to1(X, y_hat) return y_hat
[docs] class TrendAwareQuantileMappingRegressor(RegressorMixin, BaseEstimator): """Experimental meta estimator for performing trend-aware quantile mapping Parameters ---------- qm_estimator : object, default=None Regressor object such as ``QuantileMappingReressor``. """
[docs] def __init__( self, qm_estimator: BaseEstimator | None = None, trend_transformer: LinearTrendTransformer | None = None, ) -> None: self.qm_estimator = qm_estimator if trend_transformer is None: self.trend_transformer = LinearTrendTransformer()
def _validate_data(self, X, y=None, reset=True, **check_params): """Validate input data using sklearn's validate_data.""" return validate_data(self, X=X, y=y, reset=reset, **check_params)
[docs] def fit(self, X, y): """Fit the model. Parameters ---------- X : array-like, shape [n_samples, n_features] Training data. Returns ------- self : object """ self._X_mean_fit = X.mean() self._y_mean_fit = y.mean() y_trend = copy.deepcopy(self.trend_transformer) y_detrend = y_trend.fit_transform(y) X_trend = copy.deepcopy(self.trend_transformer) x_detrend = X_trend.fit_transform(X) self.qm_estimator.fit(x_detrend, y_detrend) return self
[docs] def predict(self, X): """Predict regression for target X. Parameters ---------- X : array_like, shape [n_samples, n_features] Samples. Returns ------- y : ndarray of shape (n_samples, ) Predicted data. """ X_trend = copy.deepcopy(self.trend_transformer) x_detrend = X_trend.fit_transform(X) y_hat = self.qm_estimator.predict(x_detrend).reshape(-1, 1) # add the mean and trend back # delta: X (predict) - X (fit) + y --> projected change + historical obs mean delta = (X.mean().values - self._X_mean_fit.values) + self._y_mean_fit.values # calculate the trendline # TODO: think about how this would need to change if we're using a rolling average trend trendline = X_trend.trendline(X) trendline -= trendline.mean() # center at 0 # apply the trend and delta y_hat += trendline + delta return y_hat