import numpy as np
import itertools
from typing import Optional, Union
[docs]
def check_valid_tiling(tiles: list) -> None:
"""
Checks validity of a tiling.
Parameters
----------
tiles : list
``tiles[i]`` is a tuple of two np.arrays of indices defining tile ``i``.
I.e, tile ``i`` of an array is ``data[tile[i][0]][:, tile[i][1]]``.
Raises
------
ValueError: if ``tiles`` is an invalid tiling.
"""
## Check disjointness and support
n_obs = int(np.max([np.max(tile[0]) for tile in tiles]) + 1)
n_subjects = int(np.max([np.max(tile[1]) for tile in tiles]) + 1)
counts = np.zeros((n_obs, n_subjects))
for m, (batch, group) in enumerate(tiles):
if np.any(counts[np.ix_(batch, group)] != 0):
raise ValueError(f"Tile {m} is not disjoint from tiles 0-{m-1}.")
counts[np.ix_(batch, group)] += 1
## Check support
if not np.all(counts == 1):
raise ValueError(f"Tiles are disjoint but do not partition [n_obs] x [n_subjects]")
[docs]
class Tiling(list):
"""
A class to store tilings used in mosaic tests.
Parameters
----------
tiles : list
List of tuples of two integer np.arrays. E.g., tile
``i`` of an array is ``data[tile[i][0]][:, tile[i][1]]``.
check_valid : bool
If True, runs :func:`check_valid_tiling` during initialization.
Raises
------
ValueError : If tiles does not define a partition of the cartesian
product of ``{0, ..., n}`` x ``{0, ..., k}`` for some integers
``n`` and ``k``.
Examples
--------
>>> import numpy as np
>>> import mosaicperm as mp
>>>
>>> # a valid tiling of {0, 1} x {0, 1, 2, 3}
>>> tiles0 = [
... (np.array([0]), np.array([0, 2])),
... (np.array([0]), np.array([1, 3])),
... (np.array([1]), np.array([0, 1])),
... (np.array([1]), np.array([2, 3])),
... ]
>>> tiling = mp.tilings.Tiling(tiles0, check_valid=True)
>>> # an invalid tiling due to repeats
>>> tiles1 = [
... (np.array([0, 2]), np.array([0, 1])),
... (np.array([1]), np.array([0, 1])),
... (np.array([0]), np.array([1])),
... ]
>>> tiling = mp.tilings.Tiling(tiles1, check_valid=True)
Traceback (most recent call last):
...
ValueError: Tile 2 is not disjoint from tiles 0-1.
Notes
-----
This class thinly wraps python ``list``.
"""
def __init__(self, tiles: list, check_valid: bool=False):
self.tiles = tiles
if check_valid:
check_valid_tiling(tiles)
super().__init__(tiles)
def __str__(self, *args, **kwargs):
return "Tiling" + super().__str__(*args, **kwargs)
[docs]
def save(self, filename):
"""
Saves the tiling to ``filename.npy``.
"""
# The save format is an integer numpy array.
# 1. The first element is an integer ``nbreaks``
# 2. The next ``nbreaks`` elements specify the breaks of the (batch, group)
# 3. The rest of the elements are a concatenation of the tiling
# Form concatenation and count breaks
comb = []
breaks = [0]
counter = 0
for batch, group in self.tiles:
for obj in [batch, group]:
comb.append(obj)
counter += len(obj)
breaks.append(counter)
# Concatenate
comb = np.concatenate(comb)
breaks = np.array(breaks)
tosave = np.concatenate([np.array([len(breaks)]), breaks, comb]).astype(np.int64)
np.save(filename, tosave)
return None
[docs]
@classmethod
def load(cls, filename, **kwargs):
"""
Loads a tiling from a .npy file.
Parameters
----------
filename : str
File where the tiling is stored.
kwargs : dict
Other kwargs for ``__init__``.
Notes
-----
The .npy file must have been generated using
:meth:`save` or this method will not work.
"""
raw = np.load(filename)
# 1. The length of the array of breaks
breaksize = raw[0]
# 2. The breaks
breaks = raw[1:(breaksize+1)]
objsraw = raw[(breaksize+1):]
# 3. Loop through and load objects
objs = []
for j in range(breaksize - 1):
objs.append(objsraw[breaks[j]:breaks[j+1]])
# 4. Pair the appropriate batches/groups
tiles = []
for jj in range(int(len(objs) / 2)):
tiles.append((objs[2*jj], objs[2*jj+1]))
return cls(tiles, **kwargs)
def even_random_partition(n, k, shuffle=True):
"""
Partitions {0, ..., n-1} into k subsets.
Parameters
----------
n : int
k : int
shuffle : bool
If True, shuffles before partitioning
to produce a uniformly random partition.
"""
if k > n:
raise ValueError(f"k={k} > n={n}")
groups = np.sort(np.arange(n) % k)
if shuffle:
np.random.shuffle(groups)
return [
np.where(groups == ell)[0] for ell in range(k)
]
def _preprocess_partition(partition):
"""
Preprocesses a partition so that its groups are numbered
0 through k, for some k.
"""
vals = np.unique(partition)
output = np.zeros(len(partition), dtype=int)
for i, v in enumerate(vals):
output[partition == v] = i
return output
def coarsify_partition(
partition: np.array,
k: int,
minsize: int=0,
random: bool=True,
):
"""
Produces a random coarsening of ``partition``.
Parameters
----------
partition : array
n-length integer array, so that ``partition[i] = l``
implies that item i is in group l of the partition.
k : int
Desired number of elements of the coarsened partition.
minsize : int
Minimum number of elements in each set in the coarsened partition.
random : bool
If true, produces a randomized coarsening. Otherwise returns
a deterministic result.
Returns
-------
partition : array
n-length integer array containing values 0 through k-1,
so that ``partition[i] = l`` implies that item i is
in group l of the partition.
"""
# note: this copies partition so the results are never in-place
partition = _preprocess_partition(partition)
# number of groups to start
kstart = np.max(partition) + 1
inds = np.arange(kstart)
# precompute sizes of each partition
sizes = np.array([np.sum(partition==k) for k in np.unique(partition)]).astype(float)
sizes[sizes == 0] = np.inf
for _ in range(kstart-1):
# Compute sizes
# sizes = np.array([np.sum(partition==k) for k in range(kstart)]).astype(float)
# sizes[sizes == 0] = np.inf
# Check stopping condition
if np.all(sizes >= minsize) and len(sizes[sizes < np.inf]) <= k:
break
## Random variant
if random:
# Sampling probs
probs = 1 / sizes
# three cases based on the number of groups below minsize
n_too_small = np.sum(sizes < minsize)
if n_too_small == 1:
# Case 1: merge the smallest with another
i0 = np.where(sizes < minsize)[0].item()
probs[i0] = 0; probs /= probs.sum()
i1 = np.random.choice(inds, p=probs, size=1).item()
else:
# Case 2: only merge groups below minsize
if n_too_small > 1:
probs[sizes >= minsize] = 0
# Case 3: probs are undajusted (no groups below minsize)
probs /= probs.sum()
i0, i1 = np.random.choice(inds, p=probs, size=2, replace=False)
else:
i0, i1 = np.argpartition(sizes, 2)[:2]
# Sample and merge
sizes[i0] += np.sum(partition == i1)
sizes[i1] = np.inf
partition[partition == i1] = i0
return _preprocess_partition(partition)
[docs]
def random_tiles(
n_obs: int,
n_subjects: int,
nbatches: int,
ngroups: int,
seed: int=123,
) -> Tiling:
"""
Partitions outcomes into ``nbatches`` x ``ngroups`` random tiles.
Parameters
----------
n_obs : int
Number of observations/timepoints.
n_subjects : int
Number of subjects/assets.
nbatches : int
Number of batches to split observations into.
ngroups : int
Number of groups to split subjects into.
seed : int
Random seed.
Returns
-------
tiles : mosaicperm.tilings.Tiling
The default tiling as a :class:`.Tiling` object.
"""
state = np.random.get_state()
np.random.seed(seed)
# partition along timepoints
batches = even_random_partition(n=n_obs, k=nbatches, shuffle=False)
# partition along subjects
tiles = []
for batch in batches:
groups = even_random_partition(n=n_subjects, k=ngroups, shuffle=True)
tiles.extend([(batch, group) for group in groups])
# return tiles; also reset random state
np.random.set_state(state)
return Tiling(tiles)
[docs]
def default_factor_tiles(
exposures: np.array,
n_obs: Optional[int]=None,
batches: Optional[list[np.array]]=None,
max_batchsize: Optional[int]=10,
ngroups: Optional[int]=None,
clusters: Optional[np.array]=None,
seed: int=123,
) -> Tiling:
"""
Computes default tiling for factor models.
Parameters
----------
exposures : np.array
(``n_obs``, ``n_subjects``, ``n_factors``) array of factor exposures
OR
(``n_subjects``, ``n_factors``) array of factor exposures if
the exposures do not change with time.
n_obs : int
Number of timepoints. Optional unless exposures is 2D.
batches : int
Optional list of np.arrays partitioning {0, 1, ..., ``n_obs-1``}.
Otherwise will be created automatically (recommended).
max_batchsize : int
Maximum length (in time) of a tile.
ngroups : int
Number of groups to partition the subjects into.
Default is the max integer such that each group contains
5 times as many subjects as there are factors.
clusters : np.array
An ``n_subjects``-length array of cluster_ids, so
``clusters[i] = k`` implies subject i is in the kth cluster.
If clusters is provided, the output will preserve the cluster
structure.
seed: int
Random seed.
Notes
-----
One of exposures or batches must be provided.
Returns
-------
tiles : mosaicperm.tilings.Tiling
The default tiling as a :class:`.Tiling` object.
"""
state = np.random.get_state()
np.random.seed(seed)
# Choose batches
if len(exposures.shape) == 3:
n_obs, n_subjects, n_factors = exposures.shape
if batches is None:
batches = _exposures_to_batches(exposures, max_batchsize=max_batchsize)
elif len(exposures.shape) == 2:
# Defaults for dimensionality
if n_obs is None:
raise ValueError(f"n_obs must be provided if exposures is a 2D array.")
n_subjects, n_factors = exposures.shape
# Create batches
if batches is None:
nbatches = int(np.ceil(n_obs / max_batchsize))
batches = even_random_partition(n=n_obs, k=nbatches, shuffle=False)
else:
raise ValueError(f"exposures should be a 2D or 3D array but has shape {exposures.shape}")
# Partition subjects and construct tiles
if ngroups is None:
ngroups = max(2, int(np.floor(n_subjects / (5*n_factors))))
tiles = []
for batch in batches:
# Create groups
if clusters is None:
groups = even_random_partition(n=n_subjects, k=ngroups, shuffle=True)
else:
coarsened = coarsify_partition(clusters, k=ngroups, minsize=2*n_factors)
groups = [np.where(coarsened == k)[0] for k in np.unique(coarsened)]
# Add to tiles
tiles.extend([(batch.astype(int), group.astype(int)) for group in groups])
# return; also reset state
np.random.set_state(state)
return Tiling(tiles)
def _exposures_to_batches(exposures, max_batchsize=10):
"""
Parameters
----------
exposures : np.array
Array of factor exposures of shape n_obs x n_subects x n_factors.
max_batchsize : int
Maximum batchsize
Returns
-------
batches : list of np.arrays
A list of batches which will define the tiles.
"""
# starts and ends of naive batches
T = exposures.shape[0]
changes = ~np.all(exposures[0:-1] == exposures[1:], axis=-1).all(axis=-1)
ends = np.concatenate([np.where(changes)[0] + 1, [T]])
starts = np.concatenate([[0], ends[:-1]])
lengths = ends - starts
nstarts = len(starts)
# Loop through and create batches
batches = []
i = 0
while i < nstarts:
# Case 1: naive batchsize is 1
if i < nstarts - 1 and lengths[i] == 1:
# Case 1(a): next length is also 1
if lengths[i+1] == 1:
batches.append(np.arange(starts[i], ends[i+1]))
i += 2
# Case 1(b): next length is > 2, so we take one from it
elif lengths[i+1] > 2:
batches.append(np.arange(starts[i], ends[i]+1))
starts[i+1] += 1
i += 1
# Case 1(c): next length is == 2 (we give up and use a batchsize of 1)
else:
batches.append(np.array([starts[i]]))
i += 1
# Case 2: naive batchsize is too large
elif lengths[i] > max_batchsize:
nsplit = int(np.ceil(lengths[i] / max_batchsize))
new_starts = np.around(np.linspace(starts[i], ends[i], nsplit+1))
for j in range(nsplit):
batches.append(np.arange(new_starts[j], new_starts[j+1]))
i += 1
# Case 3: naive batchsize is the right size
else:
batches.append(np.arange(starts[i], ends[i]))
i += 1
return [batch.astype(int) for batch in batches]
def default_panel_tiles(
n_obs: int,
n_subjects: int,
n_cov: int,
clusters: Optional[np.array]=None,
ngroups: Optional[int]=None,
ntiles: Optional[int]=None,
seed: int=123,
):
"""
Creates the default tiling for panel data.
Parameters
----------
n_obs : int
Number of observations per subject / timepoints.
n_subjects : int
Number of subjects in the panel data.
n_cov : int
Number of covariates
clusters : np.array
An ``n_subjects``-length array of cluster_ids, so
``clusters[i] = k`` implies subject i is in the kth cluster.
If clusters is provided, the output will preserve the cluster
structure.
ngroups : int
Number of groups to split the subjects into.
ntiles : int
Total number of tiles to use.
Returns
-------
tiles : mosaicperm.tilings.Tiling
The default tiling as a :class:`.Tiling` object.
"""
state = np.random.get_state()
np.random.seed(seed)
# Goal: total number of tiles
if ntiles is None:
ntiles = min(int(n_obs * n_subjects / (4*n_cov)), 10)
# Create groups
if ngroups is None:
ngroups = min(ntiles, n_subjects)
if clusters is None:
groups = even_random_partition(n=n_subjects, k=ngroups)
else:
coarsened = coarsify_partition(
clusters, k=ngroups, minsize=int(np.ceil(n_obs / (3*n_cov)))
)
groups = [np.where(coarsened == k)[0] for k in np.unique(coarsened)]
# Create batches
nbatches = min(int(np.floor(n_obs / 2)), int(np.ceil(ntiles / len(groups))))
batches = even_random_partition(
n=n_obs, k=nbatches, shuffle=False
)
tiles = list(itertools.product(batches, groups))
# reset state and return
np.random.set_state(state)
return Tiling(tiles)
if __name__ == "__main__":
import doctest
doctest.testmod()