import abc
import numpy as np
import pandas as pd
from scipy import stats
from . import utilities
from typing import Optional
[docs]
def compute_adaptive_pval(
statistic: np.array,
null_statistics: np.array,
) -> tuple:
"""
Computes an adaptive p-value based on one or more test statistics.
Parameters
----------
statistic : float or np.array
Either a float (one statistic) or 1D array (many test statistics).
null_statistics : np.array
(``nrand``, ``d``)-shaped array of null statistics computed based on
null permutations, where ``d`` is the dimension of ``statistic``.
Returns
-------
pval : float
The adaptive p-value.
adapt_statistic : float
The adaptive statistic value.
null_adapt_statistics : np.array
nrand-shaped array of null adaptive statistic values.
"""
# Dimensions
nrand, d = null_statistics.shape
if not utilities.haslength(statistic):
statistic = np.array([statistic])
# concatenate and take mean/sd
all_stats = np.concatenate(
[statistic.reshape(1, d), null_statistics], axis=0
)
mu0, sd0 = all_stats.mean(axis=0), all_stats.std(axis=0)
sd0[sd0 == 0] = 1
# t-statistic and take maximum
tstats = (all_stats - mu0) / sd0
adapt_stats = np.max(tstats, axis=1)
# return pval
pval = np.sum(adapt_stats[0] <= adapt_stats) / (nrand + 1)
return pval, adapt_stats[0], adapt_stats[1:]
def _preprocess_data(outcomes: np.array, covariates: np.array) -> tuple:
"""
Parameters
----------
outcomes : np.array
n_obs x n_subjects array of outcome data
covariates : np.array
Either n_obs x n_subjects x n_cov array of covariates/exposures
or a 2D n_subjects x n_cov array if the values do not change with time.
Returns
-------
outcomes : np.array
processed outcome data
covariates : np.array
processed covariate data
missing_pattern : np.array
boolean array showing where outcomes were previously missing.
"""
### 1. Convert from pandas to numpy
if isinstance(outcomes, pd.DataFrame):
outcomes = outcomes.values
if isinstance(covariates, pd.DataFrame):
covariates = covariates.values
### 2. Remove nans
missing_pattern = np.isnan(outcomes)
outcomes = outcomes.copy()
covariates = covariates.copy()
n_obs = len(outcomes)
if np.any(np.isnan(outcomes)):
# in this case, must make exposures 3-dimensional since the nan
# pattern will causes the exposures to change with time
if len(covariates.shape) == 2:
covariates = np.stack(
[covariates for _ in range(n_obs)], axis=0
)
# fill with zeros (preserves validity)
covariates[missing_pattern] = 0
outcomes[missing_pattern] = 0
# Fill missing additional covariates with zero
covariates[np.isnan(covariates)] = 0
### 3. remove exposures/covariates which are all zero
to_keep = np.any(covariates != 0, axis=tuple(range(covariates.ndim-1)))
covariates = covariates[..., to_keep]
return outcomes, covariates, missing_pattern
def _create_tseries_plot(
xvals: np.array, # x-axix
ystat: np.array, # statistic values
pvals: np.array, # p-values
zapprx: np.array, # approximate Z-statistics
ynull_mean: Optional[np.array]=None, # null means
yquant: Optional[np.array]=None, # null quantiles
alpha: float=0.05, # threshold
show_plot: bool=True,
**figure_kwargs
):
"""
Private function which is used to create tseries plots.
"""
import matplotlib.pyplot as plt
figure_kwargs['figsize'] = figure_kwargs.get("figsize", (9, 7)) # default
# Create subplots
fig = plt.figure(**figure_kwargs)
ax0 = plt.subplot(2, 1, 1)
ax1 = plt.subplot(2, 2, 3)
ax2 = plt.subplot(2, 2, 4)
axes = [ax0, ax1, ax2]
## First plot: statistic value
# Plot statistic value
axes[0].plot(xvals, ystat, color='cornflowerblue', label='Mosaic test statistic')
axes[0].scatter(xvals, ystat, color='cornflowerblue')
# Possibly add null mean
if ynull_mean is not None:
axes[0].plot(xvals, ynull_mean, color='orangered', label='Null mean')
axes[0].scatter(xvals, ynull_mean, color='orangered')
# Possibly add quantile
if yquant is not None:
axes[0].plot(
xvals,
yquant,
color='orangered',
label=rf'Null quantile, $\alpha$={alpha}',
linestyle='dotted',
)
axes[0].scatter(xvals, yquant, color='orangered')
axes[0].set(title="Statistic value and threshold")
else:
axes[0].set(title='Statistic value')
axes[0].set(xlabel='Time', ylabel='Statistic value')
axes[0].legend()
## Subplot 1: p-value
zvals = stats.norm.ppf(1-pvals)
if np.any(zvals != -np.inf):
zvals[zvals == - np.inf] = zvals[zvals > - np.inf].min()
axes[1].plot(xvals, zvals, color='blue', label='Observed')
axes[1].scatter(xvals, zvals, color='blue')
axes[1].axhline(
stats.norm.ppf(1-alpha),
color='black',
linestyle='dotted',
label=rf'Threshold ($\alpha$={alpha})'
)
axes[1].set(xlabel='Time', ylabel=r'Z-statistic: $\Phi(1-p)$')
axes[1].set(title="Exact z-statistic")
axes[1].legend()
axes[1].set_ylim(min(0, zvals.min()-zvals.std()/20))
## Subplot 2: approximate z-statistic
axes[2].plot(xvals, zapprx, color='blue', label='Observed')
axes[2].scatter(xvals, zapprx, color='blue')
axes[2].axhline(
stats.norm.ppf(1-alpha),
color='black',
linestyle='dotted',
label=rf'Threshold ($\alpha$={alpha})'
)
axes[2].set(xlabel='Time', ylabel=r'(S - E[S]) / sd(S)')
axes[2].set(title="Approximate z-statistic")
axes[2].set_ylim(min(0, zapprx.min()-zapprx.std()/20))
axes[2].legend()
# Adjust
plt.subplots_adjust(hspace=0.2)
#return
if show_plot:
plt.show()
else:
return fig, axes
[docs]
class MosaicPermutationTest(abc.ABC):
"""
Generic class meant for subclassing.
"""
def __init__(self):
self._precompute_permutation_helpers()
def _precompute_permutation_helpers(self):
"""
Precomputes some matrices to ensure faster tile permutations.
"""
# maps i, j to the tile number
self._tilenums = np.empty(self.outcomes.shape[0:2], dtype=int)
# sum of batch lengths
self._batchlen_sum = np.sum([len(batch) for batch, _ in self.tiles])
# self._batchinds[i, j] maps coordinate (i, j) to a unique coordinate
# from 0 to self.batchlen_sum depending only on i and the tile of i,j
self._batchinds = np.zeros(self.outcomes.shape[0:2], dtype=int)
# loop through and fill
counter = 0
for i, (batch, group) in enumerate(self.tiles):
br = batch.reshape(-1, 1)
gr = group.reshape(1, -1)
self._tilenums[br, gr] = i
self._batchinds[br, gr] = np.arange(counter, counter+len(batch)).reshape(-1, 1)
counter += len(batch)
[docs]
@abc.abstractmethod
def compute_mosaic_residuals(self):
"""
Computes mosaic-style residual estimates.
Returns
-------
residuals : np.array
(``n_obs``, ``n_subjects``)-shaped array of residual estimates.
"""
raise NotImplementedError()
[docs]
def permute_residuals(
self,
method: str='argsort'
) -> None:
"""
Permutes residuals within tiles. Result is stored in ``self._rtilde``.
Parameters
----------
method : str
- 'ix' is naive and has complexity O(``n_subjects`` * ``n_obs``) but uses
a for loop and can be slow in practice.
- 'argsort' has complexity O(``n_obs`` log(``n_obs``) * ``n_subjects``) but
is often faster in practice.
Returns
-------
None : NoneType
"""
# Compute null variant
if method == 'argsort':
self._rtilde = np.take_along_axis(
self.residuals,
np.argsort(
self._tilenums + np.random.uniform(size=self._batchlen_sum)[self._batchinds],
axis=0,
),
axis=0
)
else:
for (batch, group) in self.tiles:
# Permute within tile
# this is faster than using np.ix_
br_shuffle = np.random.permutation(batch).reshape(-1, 1)
br = batch.reshape(-1, 1)
gr = group.reshape(1, -1)
self._rtilde[br, gr] = self.residuals[br_shuffle, gr]
def _compute_p_value(self, nrand: int, verbose: bool) -> float:
"""
Produces the underlying mosaic p-value in :meth:`fit`.
Parameters
----------
nrand : int
Number of randomizations to perform.
verbose : bool
If True (default), displays a progress bar.
Returns
-------
pval : float
"""
# Compute statistic and infer its dimension (for adaptive statistics)
self.statistic = self.test_stat(self.residuals, **self.tstat_kwargs)
if utilities.haslength(self.statistic):
d = len(self.statistic)
else:
d = 1
# compute null statistics
self.null_statistics = np.zeros((nrand, d))
for r in utilities.vrange(nrand, verbose=verbose):
# Compute null statistic
self.permute_residuals()
self.null_statistics[r] = self.test_stat(self._rtilde, **self.tstat_kwargs)
# compute p-value
out = compute_adaptive_pval(
self.statistic, self.null_statistics
)
self.pval, self.adapt_stat, self.null_adapt_stats = out
# Compute approximate z-statistic
# Use adaptive vs. original statistic
if d == 1:
nstats = self.null_statistics
stat = self.statistic
else:
stat = self.adapt_stat
nstats = self.null_adapt_stats
comb = np.concatenate([[stat], nstats.flatten()])
# Handle when SD == 0
if comb.std() > 0:
self.apprx_zstat = (stat - comb.mean()) / comb.std()
else:
self.apprx_zstat = 0
# return p-value
return self.pval
[docs]
def fit(
self,
nrand: int=500,
verbose: bool=True,
):
"""
Runs the mosaic permutation test.
Parameters
----------
nrand : int
Number of randomizations to perform.
verbose : bool
If True (default), displays a progress bar.
Returns
-------
self : object
"""
self.compute_mosaic_residuals()
self._compute_p_value(nrand=nrand, verbose=verbose)
return self
[docs]
def summary(self, coordinate_index=None):
"""
Produces an output summarizing the test.
Parameters
----------
coordinate_index : pd.Index or np.array
Array of index information for the dimensions of the test statistic,
when using a multidimensional test statistic. Otherwise ignored.
Returns
-------
summary : pd.DataFrame
dataframe of summary information.
"""
fields = ['statistic', 'null_statistic_mean', 'p_value']
d = self.null_statistics.shape[1]
if d == 1:
return pd.Series(
[self.statistic, self.null_statistics.mean(), self.pval],
index=fields
)
else:
# Index
if coordinate_index is None:
coordinate_index = [f"Coordinate {i}" for i in range(d)]
# Marginal p-values
marg_pvals = 1 + np.sum(self.statistic <= self.null_statistics, axis=0).astype(float)
marg_pvals /= float(1 + self.null_statistics.shape[0])
# Construct dfs
marg_out = pd.DataFrame(
np.stack(
[self.statistic, self.null_statistics.mean(axis=0), marg_pvals],
axis=1
),
index=coordinate_index,
columns=fields
)
adapt_out = pd.DataFrame(
[[self.adapt_stat, self.null_adapt_stats.mean(), self.pval]],
index=['Adaptive z-stat'],
columns=fields,
)
out = pd.concat([adapt_out, marg_out], axis='index')
out.index.name = 'Statistic type'
return out
[docs]
def summary_plot(self, show_plot=False, **subplots_kwargs):
"""
Produces a plot summarizing the results of the test.
Parameters
----------
show_plot : bool
If true, shows the plot.
**subplots_kwargs : dict
kwargs for ``plt.subplots()``, e.g., ``figsize``.
Returns
-------
fig : :class:`matplotlib.Figure`
The figure from the ``plt.subplots()`` call.
ax: array of :class:`matplotlib.Axes`
Axes from the ``plt.subplots()`` call.
"""
import matplotlib.pyplot as plt
import seaborn as sns
fig, ax = plt.subplots(**subplots_kwargs)
d = self.null_statistics.shape[1]
if d == 1:
nstats = self.null_statistics
stat = self.statistic
else:
stat = self.adapt_stat
nstats = self.null_adapt_stats
sns.histplot(
nstats,
label='Null statistics',
color='cornflowerblue',
alpha=0.5,
ax=ax,
)
zstat = str(np.around(self.apprx_zstat, 2))
if self.pval < 1e-3:
pval = "{:e}".format(self.pval)
else:
pval = str(np.around(self.pval, 3))
# formatting for p-value
ax.axvline(
stat,
color='red',
label=f'Observed statistic\n(Apprx. Z={zstat}, pval={pval})',
)
plt.legend()
if show_plot:
plt.show()
return fig, ax
def _compute_apprx_zstat_tseries(self):
"""
Minor helper function used within _compute_p_value_tseries.
"""
### Compute z-statistics
if self.stats_tseries.shape[1] == 1:
ystat = self.stats_tseries[:, 0]
ynulls = self.null_tseries[:, :, 0]
else:
ystat = self.adapt_stats_tseries
ynulls = self.null_adapt_tseries
# Concatenate for exact validity
ycomb = np.concatenate([ystat.reshape(-1, 1), ynulls], axis=1)
ses = ycomb.std(axis=1)
# prevent zero div errors; note if ses == 0, the zstat == 0
zero_flags = ses == 0
ses[zero_flags] = 1
# Form z-stats
self.apprx_zstat_tseries = (ystat - ycomb.mean(axis=1)) / ses
self.apprx_zstat_tseries[zero_flags] = 0
def _compute_p_value_tseries(
self, nrand: int, verbose: bool, n_timepoints: int, window: Optional[int],
) -> None:
"""
Computes the underlying p-values in :meth:`fit_tseries`.
Parameters
----------
nrand : int
Number of randomizations to perform.
verbose : bool
If True (default), displays a progress bar.
n_timepoints : int
Computes p-values at ``n_timepoints`` evenly spaced timepoints.
window : int
Window size.
Returns
-------
None : NoneType
"""
n_obs = self.outcomes.shape[0]
nvals = min(n_obs, n_timepoints)
# starts/ends of windows to compute test statistic
if window is None:
self.ends = np.round(np.linspace(0, n_obs, nvals+1)[1:]).astype(int)
self.ends = np.sort(np.unique(self.ends[self.ends > 0]))
self.starts = np.zeros(len(self.ends)).astype(int)
else:
self.ends = np.round(np.linspace(window, n_obs, nvals)).astype(int)
self.ends = np.sort(np.unique(self.ends[self.ends > 0]))
self.starts = np.maximum(0, self.ends - window).astype(int)
nvals = len(self.ends)
# compute tseries statistic
self.stats_tseries = np.stack(
[
self.test_stat(self.residuals[start:end], **self.tstat_kwargs)
for start, end in zip(self.starts, self.ends)
],
axis=0
).reshape(nvals, -1)
d = self.stats_tseries.shape[1]
# compute null statistics
self.null_tseries = np.zeros((nvals, nrand, d))
for r in utilities.vrange(nrand, verbose=verbose):
self.permute_residuals()
self.null_tseries[:, r] = np.stack(
[self.test_stat(self._rtilde[start:end], **self.tstat_kwargs)
for start, end in zip(self.starts, self.ends)],
axis=0
).reshape(nvals, -1)
# Compute p-values
self.pval_tseries = np.zeros(nvals)
self.adapt_stats_tseries = np.zeros(nvals)
self.null_adapt_tseries = np.zeros((nvals, nrand))
for i in range(nvals):
out = compute_adaptive_pval(
self.stats_tseries[i], self.null_tseries[i]
)
self.pval_tseries[i] = out[0]
self.adapt_stats_tseries[i] = out[1]
self.null_adapt_tseries[i] = out[2]
### Compute z-statistics
self._compute_apprx_zstat_tseries()
[docs]
def fit_tseries(
self,
nrand: int=500,
verbose: bool=True,
n_timepoints: int=20,
window: Optional[int]=None,
**kwargs,
):
"""
Runs mosaic permutation tests for various windows of the data,
producing a time series of p-values.
Parameters
----------
nrand : int
Number of randomizations to perform.
verbose : bool
If True (default), displays a progress bar.
n_timepoints : int
Computes p-values at ``n_timepoints`` evenly spaced
timepoints. Default: 20.
window : int
Window size. Default: None (use all available data).
kwargs : dict
Optional kwargs for private _compute_p_value_tseries() method.
Returns
-------
self : object
"""
self.compute_mosaic_residuals()
self._compute_p_value_tseries(
nrand=nrand, verbose=verbose, n_timepoints=n_timepoints, window=window, **kwargs
)
return self
[docs]
def plot_tseries(
self,
time_index=None,
alpha: float=0.05,
show_plot: bool=True,
**figure_kwargs
) -> None:
"""
Plots the results of :meth:`fit_tseries`.
Parameters
----------
time_index : np.array or pd.Index
n_obs-length index information (e.g. datetimes) for each observation.
alpha : float
Nominal level.
show_plot : bool
If True, runs ``matplotlib.pyplot.show()``.
**figure_kwargs : dict
kwargs for ``plt.figure()``, e.g., ``figsize``.
Returns
-------
None : NoneType
Returns None if show_plot=True. Else returns the following.
fig : :class:`matplotlib.Figure`
The figure from the ``plt.subplots()`` call.
ax: array of :class:`matplotlib.Axes`
Axes from the ``plt.subplots()`` call.
"""
# Create x-values
if time_index is None:
xvals = self.ends-1
else:
xvals = time_index[self.ends-1]
# statistic value, null mean, and quantile
if self.stats_tseries.shape[1] == 1:
ystat = self.stats_tseries[:, 0]
ynulls = self.null_tseries[:, :, 0]
else:
ystat = self.adapt_stats_tseries
ynulls = self.null_adapt_tseries
# Compute quantile
nrand = ynulls.shape[1]
ynull_mean = ynulls.mean(axis=1)
yquant = np.concatenate([ystat.reshape(-1, 1), ynulls], axis=1) # shape: nvals x (nrand + 1)
yquant = np.sort(yquant, axis=1)
rank = int(nrand + 1 - np.floor(alpha * (nrand + 1)))
yquant = yquant[:, rank-1] # the -1 accounts for zero indexing
return _create_tseries_plot(
xvals=xvals,
ystat=ystat,
pvals=self.pval_tseries,
zapprx=self.apprx_zstat_tseries,
ynull_mean=ynull_mean,
yquant=yquant,
alpha=alpha,
show_plot=show_plot,
**figure_kwargs
)
### Methods of aggregation
def _combine_pvals(pvals: np.array, how_combine: str='mean'):
how_combine = str(how_combine).lower()
if how_combine in ['mean', 'average']:
return np.minimum(1, 2 * np.mean(pvals, axis=0))
elif how_combine == 'median':
return np.minimum(1, 2 * np.median(pvals, axis=0))
elif how_combine == 'min':
return np.minimum(1, len(pvals) * np.min(pvals, axis=0))
else:
raise ValueError(f"Unrecognized how_combine={how_combine}.")
[docs]
def combine_mosaic_tests(
mosaic_objects: list[MosaicPermutationTest],
how_combine_pvals='median',
):
"""
Combines results across :class:`MosaicPermutationTest` objects.
Parameters
----------
mosaic_objects : list
List of objects inheriting from :class:`MosaicPermutationTest`,
after calling ``.fit()`` for each object.
how_combine_pvals : str
How to combine the p-values. One of several options:
- 'median' : twice the median
- 'mean' : twice the mean
- 'min' : len(mosaic_objects) times the minimum (Bonferroni correction).
Returns
-------
summary : pd.Series
Series of key inferential results.
Notes
-----
This function combines results when using the same set
of test statistics and different tilings. To combine results
using the same tiling and different test statistics,
just use a vector-valued test statistic.
"""
mpts = mosaic_objects
try:
pval = _combine_pvals(
pvals=np.array([mpt.pval for mpt in mpts]),
how_combine=how_combine_pvals,
)
apprx_zstat = np.mean([mpt.apprx_zstat for mpt in mpts])
# decide whether or not to use the adaptive variant
d = mpts[0].null_statistics.shape[1]
if d == 1:
stat = np.mean([mpt.statistic for mpt in mpts])
null_stat = np.mean([np.mean(mpt.null_statistics) for mpt in mpts])
else:
stat = np.mean([mpt.adapt_stat for mpt in mpts])
null_stat = np.mean([np.mean(mpt.null_adapt_stats) for mpt in mpts])
# Return summary
return pd.Series(
np.array([stat, null_stat, apprx_zstat, pval]),
index=['statistic', 'null_statistic_mean', 'apprx_zstat', 'p-value'],
)
except AttributeError as e:
raise AttributeError(f"combine_mosaic_tests raised AttributeError {e}---try calling .fit()?")
[docs]
def combine_mosaic_tests_tseries(
mosaic_objects,
how_combine_pvals='median',
plot: bool=True,
alpha: float=0.05,
**figure_kwargs,
):
"""
Combines time-series results across :class:`MosaicPermutationTest` objects.
Parameters
----------
mosaic_objects : list
List of objects inheriting from :class:`MosaicPermutationTest`,
after calling ``.fit_tseries()`` for each object.
how_combine_pvals : str
How to combine the p-values. One of several options:
- 'median' : twice the median
- 'mean' : twice the mean
- 'min' : len(mosaic_objects) times the minimum (Bonferroni correction).
plot : bool
If True, displays a plot of results similar to the
``MosaicPermutationTest.plot_tseries()`` method.
alpha : float
Nominal level for error control.
**figure_kwargs : dict
kwargs for ``plt.figure()``, e.g., ``figsize``.
Returns
-------
df : pd.DataFrame
DataFrame of key inferential results.
Notes
-----
This function combines results when using the same set
of test statistics and different tilings. To combine results
using the same tiling and different test statistics,
just use a vector-valued test statistic.
"""
mpts = mosaic_objects
try:
# Combine p-values and approximate z-statistics
pvals = _combine_pvals(
pvals=np.stack([mpt.pval_tseries for mpt in mpts], axis=0),
how_combine=how_combine_pvals,
)
apprx_zstats = np.stack(
[mpt.apprx_zstat_tseries for mpt in mpts], axis=0
).mean(axis=0)
# Combine statistics and null statistics
if mpts[0].stats_tseries.shape[1] == 1:
stat_tseries = np.stack(
[mpt.stats_tseries.flatten() for mpt in mpts], axis=0
).mean(axis=0)
null_means = np.stack(
[mpt.null_tseries[:, :, 0] for mpt in mpts], axis=0
).mean(axis=0).mean(axis=1)
else:
stat_tseries = np.stack(
[mpt.adapt_stats_tseries for mpt in mpts], axis=0
).mean(axis=0)
null_means = np.stack(
[mpt.null_adapt_tseries for mpt in mpts], axis=0
).mean(axis=0).mean(axis=1)
# Dataframe to return
df = pd.DataFrame(
np.stack(
[
mpts[0].ends, stat_tseries, null_means, apprx_zstats, pvals
],
axis=1
),
columns=['Window end', 'statistic', 'null_statistic_mean', 'apprx_zstat', 'p-value'],
)
# Possibly plot
if plot:
show_plot = figure_kwargs.pop("show_plot", True)
_create_tseries_plot(
xvals=mpts[0].ends,
ystat=stat_tseries,
pvals=pvals,
zapprx=apprx_zstats,
ynull_mean=null_means,
alpha=alpha,
show_plot=show_plot,
**figure_kwargs,
)
# Return
return df
except AttributeError as e:
raise AttributeError(
"combine_mosaic_tests_tseries raised AttributeError---try calling .fit_tseries()?"
)