Source code for mosaicperm.factor

import numpy as np
import pandas as pd
from . import tilings, utilities, core, statistics
from typing import Optional

[docs] def ols_residuals( outcomes: np.array, exposures: np.array, ): """ Computes residuals via cross-sectional OLS. Parameters ---------- outcomes : np.array (``n_obs``, ``n_subjects``) array of outcomes, e.g., asset returns. ``outcomes`` may contain nans to indicate missing values. exposures : np.array (``n_obs``, ``n_subjects``, ``n_factors``) array of factor exposures OR (``n_subjects``, ``n_factors``) array of factor exposures if the exposures do not change with time. Returns ------- residuals : np.array (n_obs, n_subjects) array of residuals from OLS cross-sectional regressions. """ # Case 1: exposures do not change with time if len(exposures.shape) == 2: A = np.linalg.pinv(exposures.T @ exposures) hatbeta = A @ exposures.T @ outcomes.T return outcomes - (exposures @ hatbeta).T # Case 2: exposures change with time elif len(exposures.shape) == 3: residuals = np.zeros(outcomes.shape) for i in range(len(residuals)): A = np.linalg.pinv(exposures[i].T @ exposures[i]) hatbeta = (A @ exposures[i].T) @ outcomes[i] residuals[i] = outcomes[i] - exposures[i] @ hatbeta return residuals else: raise ValueError(f"Exposures must be a 2D or 3D array, but has shape {exposures.shape}")
_mosaic_factor_data_doc = """ outcomes : np.array (``n_obs``, ``n_subjects``) array of outcomes, e.g., asset returns. ``outcomes`` may contain nans to indicate missing values. exposures : np.array (``n_obs``, ``n_subjects``, ``n_factors``) array of factor exposures OR (``n_subjects``, ``n_factors``) array of factor exposures if the exposures do not change with time. """
[docs] class MosaicFactorTest(core.MosaicPermutationTest): __doc__ = """ Mosaic test for factor models with known exposures. Parameters ---------- {data_doc} test_stat : function A function mapping a (``n_obs``, ``n_subjects``)-size array of residuals to either: - A single statistic measuring evidence against the null. - Alternatively, a 1D array of many statistics, in which case the p-value will adaptively aggregate evidence across all test statistics. test_stat_kwargs : dict Optional kwargs to be passed to ``test_stat``. tiles : mosaicperm.tilings.Tiling An optional :class:`.Tiling` to use as the tiling. clusters : np.array An optional ``n_subject``-length array where ``clusters[i] = k`` signals that subject i is in the kth cluster. If ``tiles`` is not provided, this argument allows one to test the null that the residuals are independent between clusters but possibly dependent within clusters. If ``tiles`` is provided, this argument is ignored. impute_zero : bool If True, missing outcomes are represented as exact zeros in the residual matrix. Else, they are represented as np.nan. Note both methods yield provably valid p-values even if the zero imputation is highly inaccurate; the only difference is convenience. **kwargs : dict Optional kwargs to :func:`.default_factor_tiles`. Ignored if ``tiles`` is provided. Examples -------- We run a mosaic permutation test on synthetic data: >>> import numpy as np >>> import mosaicperm as mp >>> >>> # synthetic outcomes and exposures >>> n_obs, n_subjects, n_factors = 100, 200, 20 >>> outcomes = np.random.randn(n_obs, n_subjects) >>> exposures = np.random.randn(n_obs, n_subjects, n_factors) >>> >>> # example of missing data >>> outcomes[0:10][:, 0:5] = np.nan >>> exposures[0:10][:, 0:5] = np.nan >>> >>> # fit mosaic permutation test >>> mpt = mp.factor.MosaicFactorTest( ... outcomes=outcomes, ... exposures=exposures, ... test_stat=mp.statistics.mean_maxcorr_stat, >>> ) >>> mpt.fit().summary() We can also produce a time series plot of this analysis: >>> mpt.fit_tseries(nrand=100, n_timepoints=20) >>> fig, ax = mpt.plot_tseries() """.format(data_doc=_mosaic_factor_data_doc) def __init__( self, outcomes: np.array, exposures: np.array, test_stat: callable, test_stat_kwargs: Optional[dict]=None, tiles: Optional[tilings.Tiling]=None, clusters: Optional[np.array]=None, impute_zero: bool=True, **kwargs, ): # Process data self.outcomes, self.exposures, self.missing_pattern = core._preprocess_data( outcomes=outcomes, covariates=exposures ) self.n_obs, self.n_subjects = outcomes.shape if np.any(self.missing_pattern): # in this case, must make exposures 3-dimensional since the nan # pattern will causes the exposures to change with time if len(self.exposures.shape) == 2: self.exposures = np.stack( [self.exposures for _ in range(self.n_obs)], axis=0 ) # fill with zeros (provably preserving validity) self.exposures[self.missing_pattern] = 0 self.outcomes[self.missing_pattern] = 0 # fill additional missing exposures with zero self.exposures[np.isnan(self.exposures)] = 0 # Remove factors with all zero exposures if len(self.exposures.shape) == 3: to_keep = np.any(self.exposures != 0, axis=(0,1)) else: to_keep = np.any(self.exposures != 0, axis=0) self.exposures = self.exposures[..., to_keep] # Test statistic self.test_stat = test_stat self.tstat_kwargs = test_stat_kwargs if self.tstat_kwargs is None: self.tstat_kwargs = {} # Tiles self.clusters = clusters self.tiles = tiles if self.tiles is None: self.tiles = tilings.default_factor_tiles( exposures=self.exposures, n_obs=len(self.outcomes), clusters=self.clusters, **kwargs ) # Readjust outcomes to ensure that missing pattern # does not yield to local exchangeability violations if np.any(self.missing_pattern): for (batch, group) in self.tiles: missing_subjects = np.any(self.missing_pattern[np.ix_(batch, group)], axis=0) self.missing_pattern[np.ix_(batch, group[missing_subjects])] = True self.outcomes[self.missing_pattern] = 0 self.exposures[self.missing_pattern] = 0 # Save for later self.impute_zero = impute_zero # initialize super().__init__()
[docs] def compute_mosaic_residuals(self) -> np.array: self.residuals = np.zeros(self.outcomes.shape) for tile in self.tiles: batch, group = tile # tile-specific loadings # Case 1: loadings do not change with time if len(self.exposures.shape) == 2: Ltile = self.exposures[group] # Case 2: loadings do change with time else: # Check exposures are constant within tile. Else, adjust constant = np.all([ np.all(self.exposures[j, group] == self.exposures[batch[0], group]) for j in batch ]) if constant: Ltile = self.exposures[batch[0], group] else: Ltile = np.concatenate( [self.exposures[j, group] for j in batch], axis=1 ) # for computational efficiency, get rid of duplicate columns Ltile = np.unique(Ltile, axis=1) # Compute residuals Ytile = self.outcomes[np.ix_(batch, group)] # len(batch) x len(group) ## This is faster but appears to be numerically unstable in edge cases? #Q, _ = np.linalg.qr(Ltile) # Q is len(group) x n_factors #self.residuals[np.ix_(batch, group)] = (Ytile.T - Q @ (Q.T @ Ytile.T)).T ## This is 2x as expensive but stable A = np.linalg.pinv(Ltile.T @ Ltile) @ Ltile.T hatbeta = (A @ Ytile.T) self.residuals[np.ix_(batch, group)] = Ytile - (Ltile @ hatbeta).T # internally, missing data are represented as zeros. # externally, can show that they are nans if not self.impute_zero: self.residuals[self.missing_pattern] = np.nan # initialize null permutation object self._rtilde = self.residuals.copy() return self.residuals
def _window_sum(x: np.array, window: Optional[int], mode: str='valid'): """ Returns np.cumsum(x) if window is None. Else, returns np.convolve(x, np.ones(window), 'valid'). """ if window is None: return np.cumsum(x) else: return np.convolve(x, np.ones(window), mode)
[docs] class MosaicBCV(MosaicFactorTest): __doc__ = """ Mosaic factor test based on :func:`mosaicperm.statistics.adaptive_mosaic_bcv_stat` Parameters ---------- {data_doc} new_exposures : np.array or list (``n_models``, ``n_subject``)-shaped array such that ``new_exposures[i]`` is an array of new exposures. tiles : mosaicperm.tilings.Tiling An optional :class:`.Tiling` to use as the tiling. **kwargs : dict Optional kwargs to :func:`.default_factor_tiles`. Ignored if ``tiles`` is provided. Examples -------- >>> import numpy as np >>> import mosaicperm as mp >>> >>> # synthetic outcomes and exposures >>> n_obs, n_subjects, n_factors = 100, 200, 20 >>> outcomes = np.random.randn(n_obs, n_subjects) >>> exposures = np.random.randn(n_obs, n_subjects, n_factors) >>> >>> # estimate candidate exposures on the first half of the data >>> n0 = int(0.5 * n_obs) >>> resid0 = mp.factor.ols_residuals(outcomes[0:n0], exposures[0:n0]) >>> new_exposures = mp.statistics.approximate_sparse_pcas(np.corrcoef(resid0.T)) >>> >>> # fit mosaic permutation test >>> mpt_bcv = mp.factor.MosaicFactorTest( ... outcomes=outcomes[n0:], ... exposures=exposures[n0:], ... new_exposures=new_exposures, >>> ) >>> mpt_bcv.fit().summary() """.format( data_doc=_mosaic_factor_data_doc, ) def __init__( self, new_exposures: np.array, *args, **kwargs, ): super().__init__( *args, **kwargs, test_stat=None, ) # Ensure a consistent input format if len(new_exposures.shape) == 1: new_exposures = new_exposures.reshape(1, -1) self.new_exposures = new_exposures # test statistic and kwargs self.test_stat = statistics.adaptive_mosaic_bcv_stat self.tstat_kwargs['tiles'] = self.tiles self.tstat_kwargs['new_exposures'] = self.new_exposures
[docs] def summary(self, *args, **kwargs): if 'coordinate_index' not in kwargs: kwargs['coordinate_index'] = [ fr"$R^2$ for candidate model {i}" for i in range(len(self.new_exposures)) ] return super().summary(*args, **kwargs)
def _compute_p_value_tseries( self, nrand: int, verbose: bool, n_timepoints: int, window: Optional[int], convolution_mode: str='valid', ): if convolution_mode not in ['valid', 'full']: raise ValueError(f"convolution_mode={convolution_mode} is not supported.") # Note: # The docs/signature are inherited from the core MosaicFactorTest class. # oos_r2s new_exposures = self.tstat_kwargs['new_exposures'] tiles = self.tstat_kwargs['tiles'] mus = self.tstat_kwargs.get("mus", None) if mus is None: mus = self.residuals.mean(axis=0) n_models = len(new_exposures) active = np.any(new_exposures != 0, axis=0) # actuve features baseline = _window_sum( np.sum(self.residuals[:, active]**2, axis=1), window=window, mode=convolution_mode, ) _stats = np.zeros((len(baseline), nrand+1, n_models)) # Initialize so _stats[:, 0] is the true statistic. self._rtilde = self.residuals.copy() for r in utilities.vrange(nrand+1, verbose=verbose): for i in range(n_models): oos_resids = statistics._bcv_oos_resids( self._rtilde, new_exposure=new_exposures[i], tiles=tiles, mus=mus ) oos_l2s = np.sum(oos_resids[:, active]**2, axis=1) # n_obs length array oos_errors = _window_sum(oos_l2s, window=window, mode=convolution_mode) _stats[:, r, i] = 1 - oos_errors / baseline # Permute self.permute_residuals() # compute adaptive p-value and put everything in the right place self.null_tseries = _stats[:, 1:] self.stats_tseries = _stats[:, 0] self.adapt_stats_tseries = np.zeros(len(baseline)) self.null_adapt_tseries = np.zeros((len(baseline), nrand)) self.pval_tseries = np.zeros(len(baseline)) for i in range(len(baseline)): out = core.compute_adaptive_pval( self.stats_tseries[i], self.null_tseries[i] ) self.pval_tseries[i] = out[0] self.adapt_stats_tseries[i] = out[1] self.null_adapt_tseries[i] = out[2] # z-statistics self._compute_apprx_zstat_tseries() # create self.starts/self.ends to signal indices of output if window is not None: if convolution_mode == 'valid': self.starts = np.arange(0, self.n_obs - window + 1) self.ends = np.arange(window, self.n_obs + 1) elif convolution_mode == 'full': self.starts = np.concatenate( [np.zeros(window - 1), np.arange(0, self.n_obs)] ).astype(int) self.ends = np.concatenate( [np.arange(1, self.n_obs+1), self.n_obs * np.ones(window-1)] ).astype(int) else: self.starts = np.zeros(self.n_obs) self.ends = np.arange(1, self.n_obs+1)