"""
This module provides **functions for approximating forecast error covariance
matrices**, with support for both cross-sectional and temporal frameworks.
These covariance approximations are a key component of optimal combination
forecast reconciliation methods, since the choice of covariance structure
determines the weighting in least-squares adjustments.
Two main toolsets are included:
- :func:`cscov() <forecopy.cov.cscov>`:
For hierarchically, grouped, or otherwise linearly constrained series
observed at the same frequency.
- :func:`tecov() <forecopy.cov.tecov>`:
For temporal hierarchies, where the same time series can be aggregated or
linearly combined at multiple frequencies (e.g., monthly, quarterly,
yearly).
"""
import jax.numpy as jnp
from forecopy.tools import cstools, tetools
from forecopy.fun import shrink_estim, sample_estim, factors, vec2hmat
[docs]
class cscov:
"""
Cross-sectional covariances
---------------------------
Approximate the cross-sectional base forecast error covariance matrix
under several reconciliation assumptions (Wickramasuriya et al., 2019;
Di Fonzo & Girolimetto, 2023).
Parameters
----------
``params``: cstools
A :class:`cstools <forecopy.tools.cstools>` object.
``res``: ndarray
An :math:`(N \\times n)` optional numeric matrix containing the
residuals. This matrix is used to compute some covariance matrices.
``cov_mat``: jnp.ndarray
If given, this :math:`(n \\times n)` matrix is returned by ``fit()``
regardless of `comb`. Use this to plug in a fully custom covariance.
Attributes
----------
``params``: cstools
Cached params.
``n``: int
Number of series.
``res``: jnp.ndarray
Cached residual matrix.
``cov_mat``: jnp.ndarray
Custom covariance to return as-is.
References
----------
* Di Fonzo, T. and Girolimetto, D. (2023), Cross-temporal forecast
reconciliation: Optimal combination method and heuristic alternatives,
`International Journal of Forecasting`, 39, 1, 39-57.
`DOI:10.1016/j.ijforecast.2021.08.004 <https://doi.org/10.1016/j.ijforecast.2021.08.004>`_
* Wickramasuriya, S.L., Athanasopoulos, G. and Hyndman, R.J. (2019),
Optimal forecast reconciliation for hierarchical and grouped time series
through trace minimization, `Journal of the American Statistical
Association`, 114, 526, 804-819.
`DOI:10.1080/01621459.2018.1448825 <https://doi.org/10.1080/01621459.2018.1448825>`_
See Also
--------
:func:`shrink_estim <forecopy.fun.shrink_estim>`
:func:`cstools <forecopy.tools.cstools>`
:func:`csrec <forecopy.lsrec.csrec>`
"""
def __init__(self, params: cstools = None, res: jnp.ndarray = None,
cov_mat = None):
self.params = params
self.n = params.dim[0]
self.res = res
self.cov_mat = cov_mat
[docs]
def fit(self, comb = 'ols', return_vector = False, shr_fun = shrink_estim,
mse = True):
"""
Estimate a cross-sectional covariance approximation.
Parameters
----------
``comb``: str, default 'ols'
The approximation/reconciliation assumption to use:
- `ols`: identity error covariance matrix.
- `str`: structural variances.
- `wls`: series-wise variances from residuals `res`.
- `shr`: shrunk covariance of `res` (Wickramasuriya et al., 2019).
- `sam` : sample covariance of `res`.
``return_vector``: bool, default `False`
If True, return the diagonal of the matrix, only for
{'ols','str', 'wls'}. Otherwise return an :math:`(n \\times n)`
matrix.
``shr_fun``: callable, default :func:`shrink_estim() <forecopy.fun.shrink_estim>`
Shrinkage estimator used when `comb='shr'`. It must accept residuals
`res` and `mse` and return a mapping with at least the key 'cov'
containing the shrunk covariance :math:`(n \\times n)`.
``mse``: bool, default True
When `True`, residual moments are not mean-corrected (i.e., MSE rather
than unbiased variance). When `False`, apply mean correction.
Returns
-------
A :math:`(n \\times n)` symmetric positive (semi-)definite matrix.
"""
if self.cov_mat is not None:
return self.cov_mat
elif comb == "ols":
return _cscov_ols(
n = self.n,
return_vector = return_vector
)
elif comb == "str":
return _cscov_str(
strc_mat = self.params.strc_mat(),
return_vector = return_vector
)
elif comb == "wls":
return _cscov_wls(
res = self.res,
mse = mse,
return_vector = return_vector
)
elif comb == "shr":
out = _cscov_shr(
res = self.res,
mse = mse,
shr_fun = shr_fun
)
self.lmb = out.get('lambda')
return out.get('cov')
elif comb == "sam":
return _cscov_sam(res = self.res, mse = mse)
else:
raise Exception("Error cscov")
[docs]
class tecov:
"""
Temporal covariances
---------------------------
Approximate the temporal base forecast error covariance matrix
under several reconciliation assumptions (Di Fonzo & Girolimetto, 2023).
Parameters
----------
``params``: tetools
A :class:`tetools <forecopy.tools.tetools>` object.
``res``: ndarray
A :math:`[N(k^\\ast+m) \\times 1]` optional numeric vector containing
the residuals ordered from the lowest frequency to the highest
frequency. This vector is used to compute some covariance matrices.
``cov_mat``: jnp.ndarray
If given, this :math:`[(k^\\ast+m) \\times (k^\\ast+m)]` matrix is
returned by ``fit()`` regardless of `comb`.
Use this to plug in a fully custom covariance.
Attributes
----------
``params``: cstools
Cached params.
``kt``: int
Total sum of factors.
``res``: jnp.ndarray
Cached residual matrix.
``cov_mat``: jnp.ndarray
Custom covariance to return as-is.
References
----------
* Di Fonzo, T. and Girolimetto, D. (2023), Cross-temporal forecast
reconciliation: Optimal combination method and heuristic alternatives,
`International Journal of Forecasting`, 39, 1, 39-57.
`DOI:10.1016/j.ijforecast.2021.08.004 <https://doi.org/10.1016/j.ijforecast.2021.08.004>`_
See Also
--------
:func:`shrink_estim <forecopy.fun.shrink_estim>`
:func:`tetools <forecopy.tools.tetools>`
:func:`terec <forecopy.lsrec.terec>`
"""
def __init__(self, params: tetools = None, res: jnp.ndarray = None, cov_mat = None):
self.params = params
self.kt = params.kt
self.res = res
self.cov_mat = cov_mat
[docs]
def fit(self, comb = 'ols', return_vector = False, shr_fun = shrink_estim, mse = True):
"""
Estimate a cross-sectional covariance approximation.
Parameters
----------
``comb``: str, default 'ols'
The approximation/reconciliation assumption to use:
- `ols`: identity error covariance matrix.
- `str`: structural variances.
- `wlsv`: series-wise variances from residuals `res`.
- `shr`: shrunk covariance of `res`.
- `sam` : sample covariance of `res`.
``return_vector``: bool, default `False`
If True, return the diagonal of the matrix, only for
{'ols','str','wlsv'}. Otherwise return an
:math:`[(k^\\ast+m) \\times (k^\\ast+m)]` matrix.
``shr_fun``: callable, default :func:`shrink_estim() <forecopy.fun.shrink_estim>`
Shrinkage estimator used when `comb='shr'`. It must accept residuals
`res` and `mse` and return a mapping with at least the key 'cov'
containing the shrunk covariance
:math:`[(k^\\ast+m) \\times (k^\\ast+m)]`.
``mse``: bool, default True
When `True`, residual moments are not mean-corrected (i.e., MSE
rather than unbiased variance). When `False`, apply mean correction.
Returns
-------
A :math:`[(k^\\ast+m) \\times (k^\\ast+m)]` symmetric positive
(semi-)definite matrix.
"""
if self.cov_mat is not None:
return self.cov_mat
elif comb == "ols":
return _cscov_ols(
n = self.kt,
return_vector = return_vector
)
elif comb == "str":
return _cscov_str(
strc_mat = self.params.strc_mat(),
return_vector = return_vector
)
elif comb == "wlsv":
return _tecov_wlsv(
agg_order = self.params.kset,
res = self.res,
mse = mse,
return_vector = return_vector
)
elif comb == "shr":
out = _tecov_shr(
agg_order = self.params.kset,
res = self.res,
mse = mse,
shr_fun = shr_fun
)
self.lmb = out.get('lambda')
return out.get('cov')
elif comb == "sam":
return _tecov_sam(
agg_order = self.params.kset,
res = self.res,
mse = mse
)
else:
raise Exception("Error tecov")
def _cscov_ols(n, return_vector = False):
if n is None:
raise TypeError("Missing required argument: 'n'")
if return_vector:
return jnp.ones(n)
else:
return jnp.eye(n)
def _cscov_str(strc_mat, return_vector = False):
if strc_mat is None:
raise TypeError("Missing required argument: 'strc_mat'")
if return_vector:
return strc_mat.sum(1)
else:
return jnp.diag(strc_mat.sum(1))
def _cscov_wls(res, mse = True, return_vector = False):
if res is None:
raise TypeError("Missing required argument: 'res'")
if not mse:
res -= jnp.nanmean(res, 0)
var = jnp.nanmean((res**2), 0)
if return_vector:
return var
else:
return jnp.diag(var)
def _cscov_shr(res, mse=True, shr_fun = shrink_estim):
if res is None:
raise TypeError("Missing required argument: 'res'")
cov = shr_fun(x=res, mse=mse)
return cov
def _cscov_sam(res, mse=True):
if res is None:
raise TypeError("Missing required argument: 'res'")
return sample_estim(x=res, mse=mse)
def _tecov_wlsv(agg_order, res, mse: bool = True, return_vector = False):
if agg_order is None:
raise ValueError("Missing required argument: 'agg_order'")
if res is None:
raise ValueError("Missing required argument: 'res'")
if isinstance(agg_order, list):
kset = [int(x) for x in agg_order]
kset = sorted(kset, reverse=True)
else:
kset = factors(int(agg_order))
if len(res.shape) != 1:
if res.shape[0] != 1:
raise ValueError("'res' is not a vector.")
res = res[0,:]
m = max(kset)
div = [int(m/k) for k in kset]
N = int(res.shape[0]/sum([m/k for k in kset]))
idk = jnp.repeat(jnp.array(kset), jnp.array(div)*N)
var_freq = [sample_estim(x=res[idk==k], mse=mse).tolist() for k in kset]
out = jnp.repeat(jnp.array(var_freq), jnp.array(div))
if return_vector:
return out
else:
return jnp.diag(out)
def _tecov_shr(agg_order, res, mse: bool = True, shr_fun = shrink_estim):
if agg_order is None:
raise ValueError("Missing required argument: 'agg_order'")
if res is None:
raise ValueError("Missing required argument: 'res'")
if isinstance(agg_order, list):
kset = [int(x) for x in agg_order]
kset = sorted(kset, reverse=True)
else:
kset = factors(int(agg_order))
if len(res.shape) != 1:
if res.shape[0] != 1:
raise ValueError("'res' is not a vector.")
res = res[0,:]
m = max(kset)
N = res.shape[0]/sum([m/k for k in kset])
res_mat = vec2hmat(vec=res, h=int(N), kset=kset)
return shr_fun(x=res_mat, mse=mse)
def _tecov_sam(agg_order, res, mse: bool = True):
if agg_order is None:
raise ValueError("Missing required argument: 'agg_order'")
if res is None:
raise ValueError("Missing required argument: 'res'")
if isinstance(agg_order, list):
kset = [int(x) for x in agg_order]
kset = sorted(kset, reverse=True)
else:
kset = factors(int(agg_order))
if len(res.shape) != 1:
if res.shape[0] != 1:
raise ValueError("'res' is not a vector.")
res = res[0,:]
m = max(kset)
N = res.shape[0]/sum([m/k for k in kset])
res_mat = vec2hmat(vec=res, h=int(N), kset=kset)
return sample_estim(x=res_mat, mse=mse)