Source code for forecopy.tools

"""
This module provides **utility classes for forecast reconciliation**, with
support for both cross-sectional and temporal frameworks. The tools in
**FoRecoPy** are built around structured matrices that represent aggregation
relationships and linear constraints, which serve as the foundation for
different reconciliation methods. Two complementary toolsets are available:

- :func:`cstools() <forecopy.tools.cstools>`:
  For hierarchically, grouped, or otherwise linearly constrained series
  observed at the same frequency.

- :func:`tetools() <forecopy.tools.tetools>`:
  For temporal hierarchies, where the same time series can be aggregated
  or linearly combined at multiple frequencies (e.g., monthly, quarterly,
  yearly). This allows one to reconcile forecasts across frequencies so that,
  for example, the sum of 12 monthly forecasts matches the corresponding
  annual forecast.

"""

import numpy as np
import jax.numpy as jnp
from forecopy.fun import factors


[docs] class cstools: """ Cross-sectional reconciliation tools ------------------------------------ Utilities for working with linearly constrained (hierarchical/grouped) multivariate time series in a cross-sectional framework. This class encapsulates standard matrices used in forecast reconciliation: the aggregation matrix :math:`\\mathbf{A}`, the structural matrix :math:`\\mathbf{S}`, and the zero-constraints matrix :math:`\\mathbf{C}`. Given either :math:`\\mathbf{A}` or :math:`\\mathbf{C}` at construction, the remaining matrices and dimension metadata are derived. Parameters ---------- ``agg_mat``: ndarray A (:math:`n_a \\times n_b`) numeric matrix representing the cross-sectional aggregation matrix. It maps the :math:`n_b` bottom-level (free) variables into the :math:`n_a` upper (constrained) variables. ``cons_mat``: ndarray A (:math:`n_a \\times n`) numeric matrix representing the cross-sectional zero constraints. It spans the null space for the reconciled forecasts. Attributes ---------- ``agg_mat``: ndarray The aggregation matrix :math:`\\mathbf{A}` (may be inferred from :math:`\\mathbf{C}`). ``dim``: tuple Dimension summary: * if :math:`\\mathbf{A}` given: :math:`(n, n_a, n_b)` with :math:`n = n_a + n_b`. * if :math:`\\mathbf{C}` given: :math:`(n, r, n - r)` with :math:`r = \\text{ rank/rows of } \\mathbf{C}`. ``_cons_mat``: ndarray The constraints matrix :math:`\\mathbf{C}`, built on demand if :math:`\\mathbf{A}` is provided. ``_strc_mat``: ndarray The structural matrix :math:`\\mathbf{S} = [\\mathbf{A}'\\quad\\mathbf{I}_{n_b}]'`. Methods ------- ``strc_mat()`` -> jnp.ndarray Returns the temporal structural matrix :math:`\\mathbf{S}`. Requires `agg_mat` to be available. ``cons_mat()`` -> jnp.ndarray Returns the temporal constraints matrix :math:`\\mathbf{C}`. Notes ----- - Shapes: * :math:`\\mathbf{A}` has shape :math:`(n_a, n_b)`. * :math:`\\mathbf{S}` has shape :math:`(n_a + n_b, n_b) = (n, n_b)`. * :math:`\\mathbf{C}` has shape :math:`(n_a, n) = (n_a, n_a + n_b)` when built from :math:`\\mathbf{A}`. - When initialized with `cons_mat`, :math:`\\mathbf{A}` is only inferred if the leading :math:`(r\\times r)` block equals the identity; specifically, if :math:`\\mathbf{C} = [\\mathbf{I}_{r}\\quad-\\mathbf{A}]`. See Also -------- :func:`cscov <forecopy.cov.cscov>` :func:`csrec <forecopy.lsrec.csrec>` Examples -------- >>> import jax.numpy as jnp >>> import forecopy as rpy >>> # One-level hierarchy: Z = X + Y >>> A = jnp.array([[1., 1.]]) # shape (n_a=1, n_b=2) >>> tools = rpy.cstools(agg_mat=A) >>> tools.dim (3, 1, 2) >>> S = tools.strc_mat() >>> S.shape # [A' ; I_2]' (3, 2) >>> C = tools.cons_mat() >>> C # [I_1 -A] Array([[ 1., -1., -1.]], dtype=float32) >>> >>> # Start from constraints: C = [1, -1, -1] >>> C = jnp.array([[1., -1., -1.]]) # r=1, n=3 >>> tools2 = rpy.cstools(cons_mat=C) >>> tools2.dim (3, 1, 2) >>> tools2.agg_mat # inferred because C = [I | -A] Array([[1., 1.]], dtype=float32) >>> tools2.strc_mat().shape (3, 2) """ def __init__(self, agg_mat: jnp.ndarray = None, cons_mat: jnp.ndarray = None): self.agg_mat = agg_mat self._cons_mat = cons_mat self._strc_mat = None self.dim = () if agg_mat is not None: n = sum(agg_mat.shape) self.dim = (n,) + agg_mat.shape elif cons_mat is not None: r = cons_mat.shape[0] self.dim = (cons_mat.shape[1], r, cons_mat.shape[1] - r) if jnp.all(cons_mat[0:r, 0:r] == jnp.eye(r)): self.agg_mat = -cons_mat[0:r, r:] + 0 else: raise TypeError("Missing required argument: 'agg_mat' or 'cons_mat'") def strc_mat(self): if self.agg_mat is not None: if self._strc_mat is None: self._strc_mat = jnp.vstack( (self.agg_mat, jnp.eye(self.agg_mat.shape[1])) ) return self._strc_mat else: raise TypeError( "A structural representation is not available for this constraints matrix" ) def cons_mat(self): if self._cons_mat is None: self._cons_mat = jnp.hstack((jnp.eye(self.agg_mat.shape[0]), -self.agg_mat)) return self._cons_mat
[docs] class tetools: """ Temporal reconciliation tools ----------------------------- Utilities for forecast reconciliation through temporal hierarchies. Given a set of temporal aggregation orders, this class constructs the temporal aggregation matrix (linear-combination matrix) and provides the corresponding structural and zero-constraint matrices used in temporal reconciliation. Parameters ---------- ``agg_order``: list | int Highest available sampling frequency per seasonal cycle (max. order of temporal aggregation, :math:`m`), or a list representing a subset of :math:`p` factors of :math:`m`. ``tew``: str, default `sum` Temporal aggregation weighting scheme applied within each high-to-low block when building the aggregation matrix: * `sum`: sum over the block. * `avg`: arithmetic average over the block. * `last`: take the last element in the block. * `first`: take the first element in the block. ``fh``: int, default 1 Forecast horizon for the lowest frequency (most temporally aggregated) series. Attributes ---------- ``m``: int Maximum aggregation order, `m = max(agg_order)`. ``kset``: list[int] Decreasing set of temporal aggregation orders that divide `m`. If a single integer was supplied to `agg_order`, this equals the full list of positive factors of `m`. ``p``: int Number of elements in `kset`. ``ks``: int Partial sum of factors :math:`k^\\ast`, defined as `sum(m / kset[:-1])`. ``kt``: int Total sum of factors :math:`k_t`, defined as `sum(m / kset)`. ``tew``: str The requested temporal weighting flag (see parameter description). ``_agg_mat``: jnp.ndarray Temporal aggregation matrix :math:`\\mathbf{A}`. ``_strc_mat``: jnp.ndarray or None Temporal structural matrix, :math:`\\mathbf{S} = [\\mathbf{A}'\\quad\\mathbf{I}_{m}]'`. ``_cons_mat``: jnp.ndarray or None Temporal zero-constraints matrix, :math:`\\mathbf{C} = [\\mathbf{I}_{k^\\ast}\\quad-\\mathbf{A}]`. Methods ------- ``strc_mat()`` -> jnp.ndarray Return the temporal structural matrix :math:`\\mathbf{S}`. ``cons_mat()`` -> jnp.ndarray Return the temporal constraints matrix :math:`\\mathbf{C}``. Notes ----- - Shapes: * :math:`\\mathbf{A}` has shape :math:`(k^\\ast, m)`. * :math:`\\mathbf{S}` has shape :math:`(k^\\ast + m, m) = (k_t, m)`. * :math:`\\mathbf{C}` has shape :math:`(k^\\ast, k_t)`. See Also -------- :func:`tecov <forecopy.cov.tecov>` :func:`terec <forecopy.lsrec.terec>` Examples -------- >>> import jax.numpy as jnp >>> import forecopy as rpy >>> >>> # Quarterly over monthly (m = 4), full factor set {4,2,1} >>> obj = rpy.tetools(agg_order=4, tew='sum', fh=1) >>> obj.kset [4, 2, 1] >>> A = obj._agg_mat >>> S = obj.strc_mat() >>> C = obj.cons_mat() >>> A.shape, S.shape, C.shape ((3, 4), (7, 4), (3, 7)) >>> >>> # Custom set (divisors of m=12 that you care about): {12, 6, 3, 1} >>> obj2 = rpy.tetools(agg_order=[12, 6, 3, 1], fh=2) >>> obj2.kset [12, 6, 3, 1] >>> obj2._agg_mat.shape # i.e., (2+4+24, 12) = (30, 12) ( (12/6)*2 + (12/3)*2 + (12/1)*2, 12 ) """ def __init__(self, agg_order: list | int, tew: str = "sum", fh: int = 1): self.m = np.max(agg_order) kset_full = factors(self.m) if isinstance(agg_order, int): kset = kset_full else: kset = sorted([i for i in agg_order if i in kset_full], reverse=True) if min(kset) != 1: self.kset = self.kset + [1] self.kset = [int(i) for i in kset] self.p = len(self.kset) self.ks = int(sum(self.m / jnp.array(self.kset[0:-1]))) self.kt = int(sum(self.m / jnp.array(self.kset))) self.tew = tew if self.tew == "sum": weights = [jnp.repeat(1, i) for i in self.kset[0:-1]] elif self.tew == "avg": weights = [jnp.repeat(1 / i, i) for i in self.kset[0:-1]] elif self.tew == "last": weights = [jnp.hstack([jnp.repeat(0, i - 1), i]) for i in self.kset[0:-1]] elif self.tew == "first": weights = [jnp.hstack([i, jnp.repeat(0, i - 1)]) for i in self.kset[0:-1]] else: raise ValueError("tew") freq = self.m / np.array(self.kset) agg_mat = [ jnp.kron(jnp.eye(int(freq[i]) * fh), weights[i]) for i in range((len(freq) - 1)) ] self._agg_mat = jnp.vstack(agg_mat) self._cons_mat = None self._strc_mat = None def strc_mat(self): if self._strc_mat is None: self._strc_mat = jnp.vstack( (self._agg_mat, jnp.eye(self._agg_mat.shape[1])) ) return self._strc_mat def cons_mat(self): if self._cons_mat is None: self._cons_mat = jnp.hstack( (jnp.eye(self._agg_mat.shape[0]), -self._agg_mat) ) return self._cons_mat