Source code for forecopy.cov

"""
This module provides **functions for approximating forecast error covariance
matrices**, with support for both cross-sectional and temporal frameworks.
These covariance approximations are a key component of optimal combination
forecast reconciliation methods, since the choice of covariance structure
determines the weighting in least-squares adjustments.

Two main toolsets are included:

- :func:`cscov() <forecopy.cov.cscov>`: 
  For hierarchically, grouped, or otherwise linearly constrained series
  observed at the same frequency.

- :func:`tecov() <forecopy.cov.tecov>`: 
  For temporal hierarchies, where the same time series can be aggregated or
  linearly combined at multiple frequencies (e.g., monthly, quarterly,
  yearly).
"""
import jax.numpy as jnp
from forecopy.tools import cstools, tetools
from forecopy.fun import shrink_estim, sample_estim, vec2hmat, is_PD

[docs] class cscov: """ Cross-sectional covariances --------------------------- Approximate the cross-sectional base forecast error covariance matrix under several reconciliation assumptions (Wickramasuriya et al., 2019; Di Fonzo & Girolimetto, 2023). Parameters ---------- ``params``: cstools A :class:`cstools <forecopy.tools.cstools>` object. ``res``: ndarray An :math:`(N \\times n)` optional numeric matrix containing the residuals. This matrix is used to compute some covariance matrices. ``cov_mat``: jnp.ndarray If given, this :math:`(n \\times n)` matrix is returned by ``fit()`` regardless of `comb`. Use this to plug in a fully custom covariance. Attributes ---------- ``params``: cstools Cached params. ``n``: int Number of series. ``res``: jnp.ndarray Cached residual matrix. ``cov_mat``: jnp.ndarray Custom covariance to return as-is. References ---------- * Di Fonzo, T. and Girolimetto, D. (2023), Cross-temporal forecast reconciliation: Optimal combination method and heuristic alternatives, `International Journal of Forecasting`, 39, 1, 39-57. `DOI:10.1016/j.ijforecast.2021.08.004 <https://doi.org/10.1016/j.ijforecast.2021.08.004>`_ * Wickramasuriya, S.L., Athanasopoulos, G. and Hyndman, R.J. (2019), Optimal forecast reconciliation for hierarchical and grouped time series through trace minimization, `Journal of the American Statistical Association`, 114, 526, 804-819. `DOI:10.1080/01621459.2018.1448825 <https://doi.org/10.1080/01621459.2018.1448825>`_ See Also -------- :func:`shrink_estim <forecopy.fun.shrink_estim>` :func:`cstools <forecopy.tools.cstools>` :func:`csrec <forecopy.lsrec.csrec>` """ def __init__(self, params: cstools = None, res: jnp.ndarray = None): if res is not None: assert res.shape[1] == params.dim[0], "The size of the residuals does not match." self.params = params self.n = params.dim[0] self.res = res self.cov_mat = None
[docs] def fit(self, comb: str | jnp.ndarray = 'ols', return_vector = False, shr_fun = shrink_estim, mse = True): """ Estimate a cross-sectional covariance approximation. Parameters ---------- ``comb``: str or ndarray, default 'ols' The approximation/reconciliation assumption to use: - `ols`: identity error covariance matrix. - `str`: structural variances. - `wls`: series-wise variances from residuals `res`. - `shr`: shrunk covariance of `res` (Wickramasuriya et al., 2019). - `sam` : sample covariance of `res`. - A custom :math:`(n \\times n)` covariance matrix. ``return_vector``: bool, default `False` If True, return the diagonal of the matrix, only for {'ols','str', 'wls'}. Otherwise return an :math:`(n \\times n)` matrix. ``shr_fun``: callable, default :func:`shrink_estim() <forecopy.fun.shrink_estim>` Shrinkage estimator used when `comb='shr'`. It must accept residuals `res` and `mse` and return a mapping with at least the key 'cov' containing the shrunk covariance :math:`(n \\times n)`. ``mse``: bool, default True When `True`, residual moments are not mean-corrected (i.e., MSE rather than unbiased variance). When `False`, apply mean correction. Returns ------- A :math:`(n \\times n)` symmetric positive (semi-)definite matrix. """ if not isinstance(comb, str): self.cov_mat = jnp.array(comb) assert self.cov_mat.shape[0] == self.cov_mat.shape[1] == self.n, "custom matrix should be square." assert is_PD(self.cov_mat), "custom matrix should be positive definite." return self.cov_mat elif comb == "ols": self.cov_mat = _cscov_ols( n = self.n, return_vector = return_vector ) elif comb == "str": self.cov_mat = _cscov_str( strc_mat = self.params.strc_mat(), return_vector = return_vector ) elif comb == "wls": self.cov_mat =_cscov_wls( res = self.res, mse = mse, return_vector = return_vector ) elif comb == "shr": out = _cscov_shr( res = self.res, mse = mse, shr_fun = shr_fun ) self.lmb = out.get('lambda') self.cov_mat = out.get('cov') return out.get('cov') elif comb == "sam": self.cov_mat =_cscov_sam(res = self.res, mse = mse) else: raise Exception("Error cscov") return self.cov_mat
[docs] class tecov: """ Temporal covariances --------------------------- Approximate the temporal base forecast error covariance matrix under several reconciliation assumptions (Di Fonzo & Girolimetto, 2023). Parameters ---------- ``params``: tetools A :class:`tetools <forecopy.tools.tetools>` object. ``res``: ndarray A :math:`[N(k^\\ast+m) \\times 1]` optional numeric vector containing the residuals ordered from the lowest frequency to the highest frequency. This vector is used to compute some covariance matrices. ``cov_mat``: jnp.ndarray If given, this :math:`[(k^\\ast+m) \\times (k^\\ast+m)]` matrix is returned by ``fit()`` regardless of `comb`. Use this to plug in a fully custom covariance. Attributes ---------- ``params``: cstools Cached params. ``kt``: int Total sum of factors. ``res``: jnp.ndarray Cached residual matrix. ``cov_mat``: jnp.ndarray Custom covariance to return as-is. References ---------- * Di Fonzo, T. and Girolimetto, D. (2023), Cross-temporal forecast reconciliation: Optimal combination method and heuristic alternatives, `International Journal of Forecasting`, 39, 1, 39-57. `DOI:10.1016/j.ijforecast.2021.08.004 <https://doi.org/10.1016/j.ijforecast.2021.08.004>`_ See Also -------- :func:`shrink_estim <forecopy.fun.shrink_estim>` :func:`tetools <forecopy.tools.tetools>` :func:`terec <forecopy.lsrec.terec>` """ def __init__(self, params: tetools = None, res: jnp.ndarray = None): if res is not None: assert len(res.shape) == 1, "residual should be a vector" if res.shape[0] % params.kt != 0: raise ValueError("Incorrect residual length.") self.params = params self.kt = params.kt self.res = res self.cov_mat = None
[docs] def fit(self, comb = 'ols', return_vector = False, shr_fun = shrink_estim, mse = True): """ Estimate a cross-sectional covariance approximation. Parameters ---------- ``comb``: str | ndarray, default 'ols' The approximation/reconciliation assumption to use: - `ols`: identity error covariance matrix. - `str`: structural variances. - `wlsv`: series-wise variances from residuals `res`. - `shr`: shrunk covariance of `res`. - `sam` : sample covariance of `res`. - A custom :math:`[(k^\\ast+m) \\times (k^\\ast+m)]` covariance matrix. ``return_vector``: bool, default `False` If True, return the diagonal of the matrix, only for {'ols','str','wlsv'}. Otherwise return an :math:`[(k^\\ast+m) \\times (k^\\ast+m)]` matrix. ``shr_fun``: callable, default :func:`shrink_estim() <forecopy.fun.shrink_estim>` Shrinkage estimator used when `comb='shr'`. It must accept residuals `res` and `mse` and return a mapping with at least the key 'cov' containing the shrunk covariance :math:`[(k^\\ast+m) \\times (k^\\ast+m)]`. ``mse``: bool, default True When `True`, residual moments are not mean-corrected (i.e., MSE rather than unbiased variance). When `False`, apply mean correction. Returns ------- A :math:`[(k^\\ast+m) \\times (k^\\ast+m)]` symmetric positive (semi-)definite matrix. """ if not isinstance(comb, str): self.cov_mat = jnp.array(comb) assert self.cov_mat.shape[0] == self.cov_mat.shape[1] == self.kt, "custom matrix should be square." assert is_PD(self.cov_mat), "custom matrix should be positive definite." return self.cov_mat elif comb == "ols": return _cscov_ols( n = self.kt, return_vector = return_vector ) elif comb == "str": return _cscov_str( strc_mat = self.params.strc_mat(), return_vector = return_vector ) elif comb == "wlsv": return _tecov_wlsv( kset = self.params.kset, res = self.res, mse = mse, return_vector = return_vector ) elif comb == "shr": out = _tecov_shr( kset = self.params.kset, res = self.res, mse = mse, shr_fun = shr_fun ) self.lmb = out.get('lambda') return out.get('cov') elif comb == "sam": return _tecov_sam( kset = self.params.kset, res = self.res, mse = mse ) else: raise Exception("Error tecov")
def _cscov_ols(n, return_vector = False): if return_vector: return jnp.ones(n) else: return jnp.eye(n) def _cscov_str(strc_mat, return_vector = False): if return_vector: return strc_mat.sum(1) else: return jnp.diag(strc_mat.sum(1)) def _cscov_wls(res, mse = True, return_vector = False): assert res is not None, "Missing required argument: 'res'" if not mse: res -= jnp.nanmean(res, 0) var = jnp.nanmean((res**2), 0) if return_vector: return var else: return jnp.diag(var) def _cscov_shr(res, mse=True, shr_fun = shrink_estim): assert res is not None, "Missing required argument: 'res'" return shr_fun(x=res, mse=mse) def _cscov_sam(res, mse=True): assert res is not None, "Missing required argument: 'res'" return sample_estim(x=res, mse=mse) def _tecov_wlsv(kset, res, mse: bool = True, return_vector = False): assert res is not None, "Missing required argument: 'res'" kset = jnp.array(kset) m = kset.max() div = jnp.array([m//k for k in kset]) N = int(res.shape[0] // div.sum()) idk = jnp.repeat(kset, div*N) var_freq = jnp.array([sample_estim(x=res[idk==k], mse=mse).tolist() for k in kset]) out = jnp.repeat(var_freq, div) if return_vector: return out else: return jnp.diag(out) def _tecov_shr(kset, res, mse: bool = True, shr_fun = shrink_estim): assert res is not None, "Missing required argument: 'res'" m = max(kset) N = res.shape[0] // sum([m/k for k in kset]) res_mat = vec2hmat(vec=res, h=int(N), kset=kset) return shr_fun(x=res_mat, mse=mse) def _tecov_sam(kset, res, mse: bool = True): assert res is not None, "Missing required argument: 'res'" m = max(kset) N = res.shape[0] // sum([m/k for k in kset]) res_mat = vec2hmat(vec=res, h=int(N), kset=kset) return sample_estim(x=res_mat, mse=mse)