import numpy as np
import pandas as pd
import scipy.linalg
from typing import Optional, Union
from . import core, utilities, tilings
import sklearn.linear_model as lm
import scipy.sparse
import itertools
def _construct_swapinds(t):
"""
Swaps elements t-1 and t of an array of length t.
"""
swapinds = np.arange(t)
swapinds[1::2] -= 1
swapinds[::2] += 1
return np.minimum(swapinds, t-1).astype(int)
def construct_tile_transformation(
subjects: np.array,
times: np.array,
covariates: Optional[Union[np.array, scipy.sparse.sparray]]=None,
invariance='local_exch',
):
"""
Constructs a transformation function for a tile of n observations.
Parameters
----------
subjects : np.array
n-length integer array specifying the subject
for each observation.
times : np.array
n-length integer array specifying the times
for each observation.
covariates : None | np.array | sparse.sparray
Optional n x p (sparse) array of covariates.
invariance : str
A string specifying the type of transformation. Options:
- 'local_exch': swaps time t and t+1 for all even t.
- 'local_exch_cts': within each subject, swaps neighboring timepoints.
- 'local_exch_space': swaps subjects i and i+1 for all odd i.
- 'time_reverse': reverses order of each time series
- 'wild_bs': multiplies all residuals by -1
Returns
-------
transformation : callable
transformation(residuals) returns the desired
transformation of the residuals
"""
n = len(subjects)
if len(subjects) != len(times):
raise ValueError(
f"different lengths for subjects ({len(subjects)}) and times ({len(times)})"
)
# Continuous vs discrete variance
if invariance == 'local_exch' and np.any(times != times.astype(int)):
invariance = 'local_exch_cts'
invariance = str(invariance).lower()
if invariance in ['local_exch', 'local_exch_cts', 'time_reverse']:
permutation = np.zeros(n, dtype=int)
# Loop through subjects
for subject in np.unique(subjects):
# time series for this subject
inds = np.where(subjects == subject)[0]
if len(inds) == 1:
permutation[inds] = inds
continue
# argsort and swap neighboring observations
argsort_times = inds[np.argsort(times[inds])]
rev_inds = np.argsort(np.argsort(times[inds]))
# Different permutations
if invariance == 'local_exch_cts':
subject_swapinds = argsort_times[_construct_swapinds(len(inds))][rev_inds]
elif invariance == 'local_exch':
# custom swaps which only swap times t and t+1 for even t;
# if time 2 is present but not time 3, then time 2 is not swapped.
diffs = times[argsort_times][1:]- times[argsort_times][:-1]
swapinds = np.arange(len(inds))
even = times[argsort_times][:-1] % 2 == 0
swapinds[:-1][(diffs == 1) & even] += 1
swapinds[1:][(diffs == 1) & even] -= 1
# then construct
subject_swapinds = argsort_times[swapinds][rev_inds]
else:
subject_swapinds = np.flip(argsort_times)[rev_inds]
# save
permutation[inds] = subject_swapinds
return lambda x: x[permutation]
if invariance == 'local_exch_space':
return construct_tile_transformation(
subjects=times, times=subjects, invariance='local_exch'
)
if invariance == 'wild_bs':
return lambda x: -x
else:
raise ValueError(f"Unrecognized invariance={invariance}")
[docs]
def ols_residuals_panel(
outcomes: np.array,
covariates: np.array,
):
"""
Computes residuals via OLS (NOT cross-sectional OLS).
Parameters
----------
outcomes : np.array
n-length array of outcome data
covariates : np.array
(n,p)-shaped array of covariates.
Returns
-------
residuals : np.array
n-length array of OLS residuals.
"""
n_cov = covariates.shape[-1]
# Case 1: dense.
if isinstance(covariates, np.ndarray):
# Flatten for QR decomposition
hatbeta = np.linalg.lstsq(a=covariates, b=outcomes, rcond=None)[0]
#A = np.linalg.pinv(covariates.T @ covariates)
#hatbeta = A @ covariates.T @ outcomes
resid = outcomes - covariates @ hatbeta
# Case 2: sparse
elif isinstance(covariates, scipy.sparse.csr_matrix):
ols = lm.LinearRegression(fit_intercept=False)
ols.fit(y=outcomes, X=covariates)
resid = outcomes - ols.predict(covariates)
return resid.reshape(*outcomes.shape, order='C')
def _convert_pd_to_numpy(x):
if isinstance(x, pd.Series) or isinstance(x, pd.DataFrame):
return x.values
return x
[docs]
class MosaicPanelTest(core.MosaicPermutationTest):
"""
Initialize a MosaicPanelTest for panel data analysis.
Parameters
----------
outcomes : np.array or pd.Series
``n``-length array of outcome data for the panel analysis.
test_stat : callable
A function mapping a ``n``-length array of residuals to either:
- A single statistic measuring evidence against the null.
- Alternatively, a 1D array of many statistics, in which
case the p-value will adaptively aggregate evidence across
all test statistics.
subjects : np.array or pd.Series
``n``-length array specifying the subject identifier
for each observation.
times : np.array or pd.Series
``n``-length array specifying the time identifier
for each observation.
clusters : np.array, pd.Series, or scipy.sparse.csr_matrix, optional
``n``-length array where ``clusters[i] = k`` signals that
observation i is in the kth cluster. If not provided,
defalts to using subjects as clusters.
cts_covariates : pd.DataFrame or np.array, optional
``(n, p)``-shaped array of continuous covariates to control for
in the analysis. If not provided, defaults to an intercept-only model.
discrete_covariates : pd.DataFrame, optional
DataFrame of discrete/categorical covariates to be converted
to dummy variables and included in the model.
sparse : bool, default True
If True, uses sparse representation of discrete_covariates.
Ignored if discrete_covariates is not supplied.
tstat_kwargs : dict, optional
Optional kwargs to be passed to ``test_stat``.
invariance : str, default 'local_exch'
A string specifying the type of transformation for permutations. Options:
- 'local_exch': swaps time t and t+1 for all even t.
- 'local_exch_cts': within each subject, swaps neighboring timepoints.
- 'time_reverse': reverses order of each time series.
- 'wild_bs': multiplies all residuals by -1.
ntiles : int, default 10
Number of tiles to use for the mosaic permutation test.
tile_ids : np.array, optional
``n``-length array where ``tile_ids[i] = k`` implies that observation
i is in tile k. If not provided, tiles are constructed automatically.
"""
def __init__(
self,
outcomes: Union[np.array, pd.Series],
test_stat: callable,
subjects: Union[np.array, pd.Series],
times: Union[np.array, pd.Series],
# Optional inputs
clusters: Optional[Union[np.array, pd.Series, scipy.sparse.csr_matrix]]=None,
cts_covariates: Optional[Union[pd.DataFrame, np.array]]=None,
discrete_covariates: Optional[pd.DataFrame]=None,
sparse: bool=True,
# tstat kwargs
tstat_kwargs: Optional[dict]=None,
# tile / permutation kwargs
invariance='local_exch',
ntiles: int=10,
tile_ids: Optional[np.array]=None,
):
# Outcomes, subjects, times, and clusters
self.outcomes = _convert_pd_to_numpy(outcomes)
self.n = len(self.outcomes)
self.orig_subjects = _convert_pd_to_numpy(subjects)
self.subjects = tilings._preprocess_partition(self.orig_subjects)
self.times = _convert_pd_to_numpy(times)
self.invariance = invariance
self.ntiles = ntiles
if clusters is not None:
self.clusters = tilings._preprocess_partition(_convert_pd_to_numpy(clusters))
else:
self.clusters = self.subjects
# Construct covariates
if cts_covariates is not None:
self.covariates = _convert_pd_to_numpy(cts_covariates)
else:
self.covariates = np.ones((self.n, 1))
# Possibly add discrete covariates
if discrete_covariates is not None:
dummies = utilities.get_dummies_sparse(discrete_covariates)
if sparse:
self.covariates = scipy.sparse.hstack(
(scipy.sparse.csr_matrix(self.covariates), dummies)
)
else:
self.covariates = np.concatenate([self.covariates, dummies.toarray()], axis=1)
self.n_cov = self.covariates.shape[1]
self.test_stat = test_stat
self.tstat_kwargs = dict() if tstat_kwargs is None else tstat_kwargs
self._create_tiles(invariance=invariance, ntiles=ntiles, tile_ids=tile_ids)
def _create_tiles(self, invariance: str, ntiles: int, tile_ids: Optional[np.array]=None):
"""
Creates tiles and tile transformations.
"""
# Create tiles
if tile_ids is None:
ntiles = int(min(ntiles, len(np.unique(self.clusters))))
self.tile_ids = tilings.coarsify_partition(self.clusters, k=ntiles, random=False)
else:
self.tile_ids = tile_ids
self.ntiles = len(np.unique(self.tile_ids))
self.tile_inds = [
np.where(self.tile_ids == tile_id)[0] for tile_id in range(self.ntiles)
]
# Create tile transformations
self.invariance = invariance
self.tile_transforms = [
construct_tile_transformation(
subjects=self.subjects[self.tile_inds[tile_id]],
times=self.times[self.tile_inds[tile_id]],
covariates=self.covariates[self.tile_inds[tile_id]],
invariance=self.invariance,
) for tile_id in range(self.ntiles)
]
[docs]
def compute_mosaic_residuals(self):
self.residuals = np.zeros(self.n)
for tile_id, inds in enumerate(self.tile_inds):
transform = self.tile_transforms[tile_id]
# augmented covariates: two cases
if isinstance(self.covariates, np.ndarray):
aug_cov = np.concatenate([self.covariates[inds], transform(self.covariates[inds])], axis=1)
else:
aug_cov = scipy.sparse.hstack((self.covariates[inds], transform(self.covariates[inds])))
# residuals
self.residuals[inds] = ols_residuals_panel(
outcomes=self.outcomes[inds], # this is flat, still fine with this function call
covariates=aug_cov,
)
self._rtilde = self.residuals.copy()
[docs]
def permute_residuals(self, _permute_all=False):
# Note: _permute_all is an internal flag which does all the flips
# uniforms
U = np.random.uniform(size=len(self.tile_inds))
# Loop through and possibly permute
for tile_id, inds in enumerate(self.tile_inds):
if U[tile_id] <= 0.5 or _permute_all:
self._rtilde[inds] = self.tile_transforms[tile_id](self.residuals[inds])
else:
self._rtilde[inds] = self.residuals[inds]
[docs]
class QuadraticMosaicPanelTest(MosaicPanelTest):
"""
Mosaic permutation test for panel data using quadratic test statistics.
This class extends MosaicPanelTest to perform tests based on quadratic forms
of residual covariances or correlations across tiles. It uses dynamic programming
to compute the permutation distribution extremely efficiently.
Parameters
----------
*args
Positional arguments passed to the parent MosaicPanelTest class.
See :class:`MosaicPanelTest` for details on required arguments
(outcomes, test_stat, subjects, times, etc.).
method : str, default 'cov'
The method for computing the quadratic test statistic. Options:
- 'cov': Uses covariances between tile-aggregated residuals.
- 'abscov': Uses absolute values of covariances between tile-aggregated residuals.
- 'corr': Uses correlations between tile-aggregated residuals.
- 'abscorr': Uses absolute values of correlations between tile-aggregated residuals.
weights : np.array or pd.Series, optional
``n_subjects``-length array mapping each subject to its weight
in the aggregation. Weights may be negative. If provided as a
pd.Series, the index should correspond to subject identifiers.
If not provided, defaults to equal weights of 1 for all subjects.
**kwargs
Additional keyword arguments passed to the parent MosaicPanelTest class.
"""
def __init__(self, *args, method: str='cov', weights: Optional[np.array]=None, **kwargs):
super().__init__(*args, **kwargs, test_stat=None)
self.method = method
self.weights = weights
if isinstance(self.weights, pd.Series):
# Ensure index is compatible with processed subjects
orig2new = {self.orig_subjects[x]:self.subjects[x] for x in range(len(self.subjects))}
self.weights.index = self.weights.index.map(orig2new)
if self.weights is None:
self.weights = np.ones(len(np.unique(self.subjects)))
def _compute_p_value(self, nrand: int, verbose: bool) -> float:
### Preprocessing
self.permute_residuals(_permute_all=True)
# Step 1: pivots
rdf = pd.DataFrame(
np.stack([self.residuals, self.times, self.subjects], axis=1),
columns=['residuals', 'times', 'subjects']
).pivot(
index='times',
columns='subjects',
values='residuals'
)
self.rdf = rdf
rtildedf = pd.DataFrame(
np.stack([self._rtilde, self.times, self.subjects], axis=1),
columns=['residuals', 'times', 'subjects']
).pivot(
index='times',
columns='subjects',
values='residuals'
)
tdf = pd.DataFrame(
np.stack([self.tile_ids, self.times, self.subjects], axis=1),
columns=['tiles', 'times', 'subjects']
).pivot(
index='times',
columns='subjects',
values='tiles'
)
self.tdf = tdf
# Step 2: create aggregate tile-specific residuals
weights = pd.Series(self.weights, index=np.sort(np.unique(self.subjects)))
acr = []
acrtilde = []
tilenums = np.unique(self.tile_ids); ntiles = len(tilenums)
for tilenum in tilenums:
for l, df in zip([acr, acrtilde], [rdf, rtildedf]):
l.append((df * weights * (tdf == tilenum)).sum(axis=1))
acr = pd.DataFrame(acr, index=tilenums).T # T x ntiles
acrtilde = pd.DataFrame(acrtilde, index=tilenums).T
self.acr = acr
self.acrtilde = acrtilde
# Step 3: Create covariances
self.all_covs = np.zeros((2, 2, ntiles, ntiles))
if self.method in ['cov', 'abscov']:
self.all_covs = np.stack(
[np.stack([acr.values.T @ acr.values, acr.values.T @ acrtilde.values], axis=0),
np.stack([acrtilde.values.T @ acr.values, acrtilde.values.T @ acrtilde.values], axis=0)],
axis=0
)
elif self.method in ['corr', 'abscorr']:
for z0, acr1 in zip([0, 1], [acr.values, acrtilde.values]):
for z1, acr2 in zip([0, 1], [acr.values, acrtilde.values]):
self.all_covs[z0, z1] = np.corrcoef(
acr1.T, acr2.T
)[0:ntiles][:, ntiles]
if self.method in ['abscov', 'abscorr']:
self.all_covs = np.abs(self.all_covs)
# get rid of diagonals
for z0, z1 in itertools.product([0, 1], [0,1]):
self.all_covs[z0, z1] -= np.diag(np.diag(self.all_covs[z0, z1]))
# Step 4: create p-value
self.statistic = np.mean(self.all_covs[0, 0])
self.null_statistics = np.zeros(nrand).reshape(-1, 1)
Z = np.random.binomial(1, 0.5, size=(nrand, len(tilenums)))
for r in utilities.vrange(nrand, verbose=verbose):
for z0, z1 in itertools.product([0,1], [0,1]):
self.null_statistics[r] += np.mean(self.all_covs[z0, z1] * np.outer(Z[r]==z0, Z[r]==z1).astype(float))
self.pval = (1+np.sum(self.statistic <= self.null_statistics)) / (nrand + 1)
# Compute approximate z-statistic
comb = np.concatenate([[self.statistic], self.null_statistics.flatten()])
# Handle when SD == 0
if comb.std() > 0:
self.apprx_zstat = (self.statistic - comb.mean()) / comb.std()
else:
self.apprx_zstat = 0
return self.pval
def _precompute_lfo_qr(X, transform, lmda=1e-8):
"""
Computes QR of augmented design with regularization to ensure
numerical stability for low-rank designs.
"""
# Normalize
Xaug = np.concatenate([X, transform(X)], axis=1)
ses = Xaug.std(axis=0)
ses[ses == 0] = 1
Xaug = Xaug / ses
# Add
Xaug = np.concatenate([Xaug, lmda * np.eye(Xaug.shape[1])], axis=0)
# QR
return np.linalg.qr(Xaug)
def _compute_lfo_resid(
fullQ: np.array,
fullR: np.array,
outcomes: np.array,
feature: int,
n_cov: int,
Z: Optional[np.array]=None
):
"""
Uses precomputed Q, R of full design to compute leave-feature-out
residuals.
Parameters
----------
fullQ : np.array
(n + 2*n_cov) x (2* n_cov) Q from QR decomp of full augmented matrix.
fullR : np.array
(2*n_cov x 2*n_cov) R from QR decomp of full augmented matrix
outcomes : np.array
array of outcomes
feature : int
Integer between 0 and n_cov - 1 specifying which feature is of interest.
n_cov : int
Number of covariates
Z : np.array
n-length array of values taken by feature of interest, optional.
Returns
-------
resid : np.array
Residuals of ``outcomes" after projecting out the augmented design matrix
without the ``feature``.
Zresid : np.array
Residuals of ``Z``, assuming ``Z`` is provided.
"""
# Update
Q = fullQ.copy()
R = fullR.copy()
for k in [feature + n_cov, feature]:
Q, R = scipy.linalg.qr_delete(Q, R, k=k, p=1, which='col', check_finite=True)
# Get rid of last 2*n_cov rows of Q which are for numerical stability
Qsub = Q[0:len(outcomes)]
resid = outcomes - Qsub @ (Qsub.T @ outcomes)
if Z is None:
return resid
else:
Zresid = Z - Qsub @ (Qsub.T @ Z)
return resid, Zresid
[docs]
class MosaicPanelInference(MosaicPanelTest):
"""
Mosaic permutation-based inference for linear models in panel data.
Parameters
----------
*args
Positional arguments passed to the parent MosaicPanelTest class.
See :class:`MosaicPanelTest` for details on required arguments
(outcomes, subjects, times, etc.). Note that ``test_stat`` is
automatically set to None as it is not used in this inference class.
**kwargs
Additional keyword arguments passed to the parent MosaicPanelTest class.
Common arguments include ``cts_covariates``, ``discrete_covariates``,
``clusters``, ``invariance``, ``ntiles``, etc.
Notes
-----
For dense covariate matrices, the class uses efficient QR decomposition
updates to avoid recomputing regressions for each feature. For sparse
matrices, it falls back to recomputing regressions as needed.
Examples
--------
>>> import numpy as np
>>> import pandas as pd
>>> import mosaicperm as mp
>>>
>>> # Generate synthetic panel data
>>> n_subjects, n_times = 50, 20
>>> n_obs = n_subjects * n_times
>>> subjects = np.repeat(np.arange(n_subjects), n_times)
>>> times = np.tile(np.arange(n_times), n_subjects)
>>>
>>> # Generate covariates and outcomes
>>> X = np.random.randn(n_obs, 3)
>>> beta_true = np.array([1.0, -0.5, 0.2])
>>> outcomes = X @ beta_true + np.random.randn(n_obs) * 0.5
>>>
>>> # Fit mosaic panel inference
>>> mpi = mp.panel.MosaicPanelInference(
... outcomes=outcomes,
... subjects=subjects,
... times=times,
... cts_covariates=X,
... ntiles=8
... )
>>> mpi.fit(nrand=1000, alpha=0.05)
>>> print(mpi.summary)
The summary will show estimates, standard errors, confidence intervals,
and p-values for each coefficient.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs, test_stat=None)
[docs]
def compute_mosaic_residuals(self, verbose: bool=True):
"""
Computes full mosaic residuals and precomputes useful quantities.
"""
if isinstance(self.covariates, np.ndarray):
# Loop through tiles
self.residuals = np.zeros(self.n)
self._tile_QRs = dict() # maps tile_num --> (Q, R) of augmented, regularized design
for tile_id in utilities.vrange(len(self.tile_inds), verbose=verbose):
inds = self.tile_inds[tile_id]
# Compute Q, R decomp of augmented design
Q, R = _precompute_lfo_qr(self.covariates[inds], transform=self.tile_transforms[tile_id])
self._tile_QRs[tile_id] = Q, R
# Compute final residuals
Qsub = Q[0:len(inds)]
self.residuals[inds] = self.outcomes[inds] - Qsub @ (Qsub.T @ self.outcomes[inds])
# Initialize rtilde correctly
self._rtilde = self.residuals.copy()
return self.residuals
else:
super().compute_mosaic_residuals()
def _downdate_mosaic_residuals(self, feature: int) -> np.array:
"""
Efficiently computes mosaic residuals leaving
out feature j.
Parameters
----------
feature : int
Integer in {0, ..., n_cov - 1}.
Returns
-------
lfo_resids : np.array
Resids leaving out ``feature``
Notes
-----
Also projects out the influence of the other covariates on
``self.covariates[:, :, feature]`` and stores the result in
``self.orthog_feature``.
"""
# Stores "leave feature out" residuals
self.lfo_resids = np.zeros(self.residuals.shape)
# Stores the feature
self.orthog_feature = np.zeros(self.residuals.shape)
# If Z is the feature and A is the orthogonalized feature,
# stores np.sum(Z * A) and np.sum(Z * transform(A)), etc.
self.Zresid_sums = np.zeros((self.ntiles, 2))
self.ZA_sums = np.zeros((self.ntiles, 2))
for tile_id, inds in enumerate(self.tile_inds):
Z = self.covariates[inds, feature]
transform = self.tile_transforms[tile_id]
# In the non-sparse case, use efficient leave-one-out updates
if isinstance(self.covariates, np.ndarray):
# Downdate QR and obtain residuals
fullQ, fullR = self._tile_QRs[tile_id]
resid, A = _compute_lfo_resid(
fullQ=fullQ,
fullR=fullR,
outcomes=self.outcomes[inds],
feature=feature,
n_cov=self.n_cov,
Z=Z
)
# In the sparse case, just redo each regression
else:
Z = Z.toarray().flatten()
neg_feature = np.array([j for j in range(self.n_cov) if j != feature]).astype(int)
# augmented covariates
Xtile = self.covariates[inds][:, neg_feature]
aug_cov = scipy.sparse.hstack([Xtile, transform(Xtile)])
# residuals
resid = ols_residuals_panel(
outcomes=self.outcomes[inds],
covariates=aug_cov,
)
A = ols_residuals_panel(
outcomes=Z,
covariates=aug_cov,
)
# Save
self.lfo_resids[inds] = resid
self.orthog_feature[inds] = A
# Precompute sums used to compute slopes/intercepts within tiles
# Note that A @ resid = Z @ resid and Z @ A = A @ A
# and using A instead of Z is slightly more stable
self.Zresid_sums[tile_id, 0] = np.sum(A * resid)
self.Zresid_sums[tile_id, 1] = np.sum(A * transform(resid))
self.ZA_sums[tile_id, 0] = np.sum(A * A)
self.ZA_sums[tile_id, 1] = np.sum(A * transform(A))
# return
return self.lfo_resids
def _precompute_confidence_intervals(
self,
nrand: int,
features: Optional[np.array]=None,
verbose: bool=True,
use_centered_stat: bool=False,
):
"""
Computes underlying confidence intervals for :meth:`fit`.
Parameters
----------
alpha : int
Desired nominal Type I error rate.
nrand : int
Number of randomizations.
features : np.array
The list of features to compute CIs for.
Defaults to all features.
verbose : bool
If True, show progress bars.
"""
# process features
if features is None:
features = np.arange(self.n_cov).astype(int)
if not utilities.haslength(features):
features = np.array([features]).astype(int)
self.features = features
# Save
self.estimates = np.zeros(self.n_cov); self.estimates[:] = np.nan
self.ses = self.estimates.copy()
self.lcis = self.ses.copy()
self.ucis = self.ses.copy()
self.pvals = self.ses.copy()
# Save the slopes and statistics
self._stats = np.zeros(self.n_cov); self._stats[:] = np.nan
self._slopes = self._stats.copy()
# save null slopes/statistics
self._null_stats = np.zeros((self.n_cov, nrand))
self._null_stats[:] = np.nan
self._null_slopes = self._null_stats.copy()
# Randomization (common to all features)
flips = np.random.binomial(1, 0.5, size=(nrand, self.ntiles)).astype(int)
# Loop through and compute
xinds = np.stack([np.arange(self.ntiles) for _ in range(nrand)])
for j in utilities.vrange(len(features), verbose=verbose):
feature = features[j]
self._downdate_mosaic_residuals(feature=feature)
if use_centered_stat:
# Observed test statistic + slope as beta changes
stat = self.Zresid_sums[:, 0].sum() - self.Zresid_sums[:, 1].sum()
slope = self.ZA_sums[:, 0].sum() - self.ZA_sums[:, 1].sum()
# Compute null test statistics + slopes as beta changes
null_stats = (self.Zresid_sums[(xinds, flips)] - self.Zresid_sums[(xinds, 1-flips)]).sum(axis=1)
null_slopes = (self.Zresid_sums[(xinds, flips)] - self.Zresid_sums[(xinds, 1-flips)]).sum(axis=1)
else:
# Observe test statistic + slope as beta changes
stat = self.Zresid_sums[:, 0].sum()
slope = self.ZA_sums[:, 0].sum()
# Compute null test statistics + slopes as beta changes
null_stats = self.Zresid_sums[(xinds, flips)].sum(axis=1)
null_slopes = self.ZA_sums[(xinds, flips)].sum(axis=1)
# Save
self._null_stats[j] = null_stats
self._null_slopes[j] = null_slopes
self._stats[j] = stat
self._slopes[j] = slope
# Compute p-value
self.pvals[j] = (1 + np.sum(np.abs(stat) <= np.abs(null_stats))) / (1 + nrand)
# # Estimate
self.estimates[j] = stat / slope
# # null estimators under the null where beta_j = estimate
null_estimates = (null_stats - self.estimates[j] * null_slopes) / slope
self.ses[j] = np.sqrt(np.mean(null_estimates**2))
[docs]
def compute_cis(self, alpha=0.05):
for j in self.features:
self.lcis[j], self.ucis[j] = invert_linear_statistic(
stat=self._stats[j],
slope=self._slopes[j],
null_stats=self._null_stats[j],
null_slopes=self._null_slopes[j],
alpha=alpha
)
# To save the confidence intervals
columns = ['Estimate', 'SE', 'Lower', 'Upper', 'p-value']
self._summary = pd.DataFrame(
np.stack(
[self.estimates, self.ses, self.lcis, self.ucis, self.pvals], axis=1
),
index=np.arange(self.n_cov),
columns=columns,
)
self._summary = self._summary.loc[self._summary['Estimate'].notnull()]
[docs]
def fit(
self,
nrand: int=10000,
alpha: int=0.05,
features: Optional[np.array]=None,
verbose: bool=True,
use_centered_stat: bool=False,
):
"""
Fits the linear model and computes confidence interva
Parameters
----------
nrand : int
Number of randomizations.
alpha : int
Desired nominal Type I error rate.
features : np.array
The list of features to compute CIs for.
Defaults to all features.
verbose : bool
If True, show progress bars.
use_centered_stat : bool
If True, uses a randomization-centered statistic.
This (perhaps pardoxically) leads to slightly higher
standard errors but narrower confidence intervals.
Returns
-------
self : object
"""
if verbose:
print("Computing mosaic residuals.")
#self.compute_mosaic_residuals(verbose=verbose)
self.compute_mosaic_residuals(verbose=verbose)
if verbose:
print("Precomputing confidence intervals.")
self._precompute_confidence_intervals(
nrand=nrand,
features=features,
verbose=verbose,
use_centered_stat=use_centered_stat
)
if verbose:
print("Computing CIs.")
self.compute_cis(alpha=alpha)
return self
@property
def summary(self):
"""
Produces a summary of key inferential results.
"""
return self._summary
### Confidence intervals
def invert_linear_statistic(
stat: float,
slope: float,
null_stats: np.array,
null_slopes: np.array,
alpha: float,
tol: float=1e-8
):
"""
Finds the set of beta such that
|stat - beta slope| >= Q_{alpha}(|null_stats - beta * null_slopes|)
under the assumption that |null_slopes| <= |slope|.
"""
nrand = len(null_stats)
if slope < -tol:
raise ValueError(f"slope={slope} <= 0")
elif np.abs(slope) < tol:
return -np.inf, np.inf
# Adjust absolute values to account for precision errors
to_adjust = np.abs(null_slopes) > slope
null_slopes[to_adjust] = slope * np.sign(null_slopes[to_adjust])
# Equi-tailed threshold
rthresh = int(np.floor(alpha * (nrand + 1))) - 1
if rthresh < 0:
return -np.inf, np.inf
# First set of inflections
inflections0 = np.zeros(nrand) - np.inf
denoms0 = slope - null_slopes
flags0 = denoms0 != 0
inflections0[flags0] = (stat - null_stats)[flags0] / denoms0[flags0]
# Second set of inflections
inflections1 = np.zeros(nrand) + np.inf
denoms1 = slope + null_slopes
flags1 = denoms1 != 0
inflections1[flags1] = (stat + null_stats)[flags1] / denoms1[flags1]
# Combine and sort
inflections = np.sort(np.stack([inflections0, inflections1], axis=1), axis=1)
# Confidence bounds
lci = np.sort(inflections[:, 0])[rthresh]
uci = np.sort(inflections[:, 1])[-(rthresh+1)]
return lci, uci