Source code for getdist.gaussian_mixtures

import copy

import numpy as np

from getdist.densities import Density1D, Density2D
from getdist.mcsamples import MCSamples
from getdist.paramnames import ParamNames


def make_2D_Cov(sigmax, sigmay, corr):
    return np.array([[sigmax**2, sigmax * sigmay * corr], [sigmax * sigmay * corr, sigmay**2]])


[docs] class MixtureND: """ Gaussian mixture model with optional boundary ranges. Includes functions for generating samples and projecting. MixtureND instances can be used to plot theoretical smooth contours instead of samples (e.g. for Fisher forecasts). For a simple Gaussian, can use GaussianND inherited class. """ def __init__(self, means, covs, weights=None, lims=None, names=None, label="", labels=None): """ :param means: list of y for each Gaussian in the mixture :param covs: list of covariances for the Gaussians in the mixture :param weights: optional weight for each component (defaults to equal weight) :param lims: optional list of hard limits for each parameter, [[x1min,x1max], [x2min,x2max]]; use None for no limit :param names: list of names (strings) for each parameter. If not set, set to "param1", "param2"... :param label: name for labelling this mixture :param labels: list of latex labels for each parameter. If not set, defaults to p_{1}, p_{2}... """ self.means = np.asarray(means) self.dim = self.means.shape[1] self.covs = [np.array(cov) for cov in covs] self.invcovs = [np.linalg.inv(cov) for cov in self.covs] if weights is None: weights = [1.0 / len(means)] * len(means) self.weights = np.array(weights, dtype=np.float64) if np.sum(self.weights) <= 0: raise ValueError("Weight <= 0 in MixtureND") self.weights /= np.sum(weights) self.norms = (2 * np.pi) ** (0.5 * self.dim) * np.array([np.sqrt(np.linalg.det(cov)) for cov in self.covs]) self.lims = lims self.paramNames = ParamNames(names=names, default=self.dim, labels=labels) self.names = self.paramNames.list() self.label = label self.total_mean = np.atleast_1d(np.dot(self.weights, self.means)) self.total_cov = np.zeros((self.dim, self.dim)) for mean, cov, weight, totmean in zip(self.means, self.covs, self.weights, self.total_mean): self.total_cov += weight * (cov + np.outer(mean - totmean, mean - totmean))
[docs] def sim(self, size, random_state=None): """ Generate an array of independent samples :param size: number of samples :param random_state: random number Generator or seed :return: 2D array of sample values """ tot = 0 res = [] block = None random_state = np.random.default_rng(random_state) while True: for num, mean, cov in zip(random_state.multinomial(block or size, self.weights), self.means, self.covs): if num > 0: v = random_state.multivariate_normal(mean, cov, size=num) if self.lims is not None: for i, (mn, mx) in enumerate(self.lims): if mn is not None: v = v[v[:, i] >= mn] if mx is not None: v = v[v[:, i] <= mx] tot += v.shape[0] res.append(v) if tot >= size: break if block is None: block = min(max(size, 100000), int(1.1 * (size * (size - tot))) // max(tot, 1) + 1) samples = np.vstack(res) if len(res) > 1: samples = random_state.permutation(samples) if tot != size: samples = samples[: -(tot - size), :] return samples
[docs] def MCSamples(self, size, names=None, logLikes=False, random_state=None, **kwargs): """ Gets a set of independent samples from the mixture as a :class:`.mcsamples.MCSamples` object ready for plotting etc. :param size: number of samples :param names: set to override existing names :param logLikes: if True set the sample -log(posterior) values from the pdf, if false, don't store loglikes :param random_state: random seed or Generator :return: a new :class:`.mcsamples.MCSamples` instance """ samples = self.sim(size, random_state=random_state) if logLikes: loglikes = -np.log(self.pdf(samples)) else: loglikes = None return MCSamples( samples=samples, loglikes=loglikes, paramNamesFile=copy.deepcopy(self.paramNames), names=names, ranges=self.lims, **kwargs, )
def autoRanges(self, sigma_max=4, lims=None): res = [] if lims is None: lims = self.lims if lims is None: lims = [(None, None) for _ in range(self.dim)] for i, (mn, mx) in enumerate(lims): covmin = None covmax = None if mn is None or mx is None: for mean, cov in zip(self.means, self.covs): sigma = np.sqrt(cov[i, i]) xmin, xmax = mean[i] - sigma_max * sigma, mean[i] + sigma_max * sigma if mn is not None: xmax = max(xmax, mn + sigma_max * sigma) if mx is not None: xmin = min(xmin, mx - sigma_max * sigma) covmin = min(xmin, covmin) if covmin is not None else xmin covmax = max(xmax, covmax) if covmax is not None else xmax res.append((covmin if mn is None else mn, covmax if mx is None else mx)) return res
[docs] def pdf(self, x): """ Calculate the PDF. Note this assumes x is within the boundaries (does not return zero outside) Result is also only normalized if no boundaries. :param x: array of parameter values to evaluate at :return: pdf at x """ tot = None x = np.asarray(x) for i, (mean, icov, weight, norm) in enumerate(zip(self.means, self.invcovs, self.weights, self.norms)): dx = x - mean if len(x.shape) == 1: res = np.exp(-icov.dot(dx).dot(dx) / 2) / norm else: res = np.exp(-np.einsum("ik,km,im->i", dx, icov, dx) / 2) / norm if not i: tot = res * weight else: tot += res * weight return tot
[docs] def pdf_marged(self, index, x, no_limit_marge=False): """ Calculate the 1D marginalized PDF. Only works if no other parameter limits are marginalized :param index: index or name of parameter :param x: value to evaluate PDF at :param no_limit_marge: if true don't raise an error if mixture has limits :return: marginalized 1D pdf at x """ if isinstance(index, str): index = self.names.index(index) if not no_limit_marge: self.checkNoLimits([index]) tot = None for i, (mean, cov, weight) in enumerate(zip(self.means, self.covs, self.weights)): dx = x - mean[index] var = cov[index, index] res = np.exp(-(dx**2) / var / 2) / np.sqrt(2 * np.pi * var) if not i: tot = res * weight else: tot += res * weight return tot
[docs] def density1D(self, index=0, num_points=1024, sigma_max=4, no_limit_marge=False): """ Get 1D marginalized density. Only works if no hard limits in other parameters. :param index: parameter name or index :param num_points: number of grid points to evaluate PDF :param sigma_max: maximum number of standard deviations away from y to include in computed range :param no_limit_marge: if true don't raise error if limits on other parameters :return: :class:`~.densities.Density1D` instance """ if isinstance(index, str): index = self.names.index(index) if not no_limit_marge: self.checkNoLimits([index]) mn, mx = self.autoRanges(sigma_max)[index] x = np.linspace(mn, mx, num_points) like = self.pdf_marged(index, x) return Density1D(x, like)
[docs] def density2D(self, params=None, num_points=1024, xmin=None, xmax=None, ymin=None, ymax=None, sigma_max=5): """ Get 2D marginalized density for a pair of parameters. :param params: list of two parameter names or indices to use. If already 2D, can be None. :param num_points: number of grid points for evaluation :param xmin: optional lower value for first parameter :param xmax: optional upper value for first parameter :param ymin: optional lower value for second parameter :param ymax: optional upper value for second parameter :param sigma_max: maximum number of standard deviations away from mean to include in calculated range :return: :class:`~.densities.Density2D` instance """ if self.dim > 2 or params is not None or not isinstance(self, Mixture2D): mixture = self.marginalizedMixture(params=params) elif self.dim != 2: raise Exception("density2D requires at least two dimensions") else: mixture = self # noinspection PyProtectedMember return mixture._density2D( num_points=num_points, xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax, sigma_max=sigma_max )
def _params_to_indices(self, params): indices = [] if params is None: params = self.names for p in params: if isinstance(p, str): indices.append(self.names.index(p)) elif hasattr(p, "name"): indices.append(self.names.index(p.name)) else: indices.append(p) return indices
[docs] def marginalizedMixture(self, params, label=None, no_limit_marge=False) -> "MixtureND": """ Calculates a reduced mixture model by marginalization over unwanted parameters :param params: array of parameter names or indices to retain. If none, will simply return a copy of this mixture. :param label: optional label for the marginalized mixture :param no_limit_marge: if true don't raise an error if mixture has limits. :return: a new marginalized :class:`MixtureND` instance """ indices = self._params_to_indices(params) if not no_limit_marge: self.checkNoLimits(indices) indices = np.array(indices) if self.names is not None: names = [self.names[i] for i in indices] else: names = None if self.lims is not None: lims = [self.lims[i] for i in indices] else: lims = None if label is None: label = self.label covs = [cov[np.ix_(indices, indices)] for cov in self.covs] means = [mean[indices] for mean in self.means] if len(indices) == 2: tp = Mixture2D else: tp = MixtureND mixture = tp(means, covs, self.weights, lims=lims, names=names, label=label) mixture.paramNames.setLabelsAndDerivedFromParamNames(self.paramNames) return mixture
[docs] def conditionalMixture(self, fixed_params, fixed_param_values, label=None): """ Returns a reduced conditional mixture model for the distribution when certainly parameters are fixed. :param fixed_params: list of names or numbers of parameters to fix :param fixed_param_values: list of values for the fixed parameters :param label: optional label for the new mixture :return: A new :class:`MixtureND` instance with cov_i = Projection(Cov_i^{-1})^{-1} and shifted conditional y """ fixed_params = self._params_to_indices(fixed_params) self.checkNoLimits(fixed_params) keep_params = [i for i in range(self.dim) if i not in fixed_params] if not len(keep_params): raise ValueError("conditionalMixture must leave at least one non-fixed parameter") new_means = [] new_covs = [] new_weights = [] for mean, cov, invcov, weight in zip(self.means, self.covs, self.invcovs, self.weights): deltas = np.asarray(fixed_param_values) - mean[fixed_params] new_cov = np.linalg.inv(invcov[np.ix_(keep_params, keep_params)]) new_mean = mean[keep_params] - new_cov.dot(invcov[np.ix_(keep_params, fixed_params)].dot(deltas)) if len(self.weights) == 1 and False: logw = 0 else: logw = invcov[np.ix_(fixed_params, fixed_params)].dot(deltas).dot(deltas) + np.log( np.linalg.det( cov[np.ix_(fixed_params, fixed_params)] - cov[np.ix_(fixed_params, keep_params)].dot( np.linalg.inv(cov[np.ix_(keep_params, keep_params)]).dot( cov[np.ix_(keep_params, fixed_params)] ) ) ) ) new_weights.append(logw) new_means.append(new_mean) new_covs.append(new_cov) new_weights = np.exp(-(np.asarray(new_weights) - min(new_weights)) / 2) if self.names is not None: names = [self.names[i] for i in keep_params] else: names = None mixture = MixtureND(new_means, new_covs, new_weights, names=names, label=label) mixture.paramNames.setLabelsAndDerivedFromParamNames(self.paramNames) return mixture
def checkNoLimits(self, keep_params): if self.lims is None: return for i, lim in enumerate(self.lims): if i not in keep_params and (lim[0] is not None or lim[1] is not None): raise Exception( "In general can only marginalize analytically if no hard boundary limits: " + self.label ) def getUpper(self, name): if self.lims is None: return None return self.lims[self.names.index(name)][1] def getLower(self, name): if self.lims is None: return None return self.lims[self.names.index(name)][1]
[docs] class Mixture2D(MixtureND): """ Gaussian mixture model in 2D with optional boundaries for fixed x and y ranges """ def __init__( self, means, covs, weights=None, lims=None, names=("x", "y"), xmin=None, xmax=None, ymin=None, ymax=None, **kwargs, ): """ :param means: list of y for each Gaussian in the mixture :param covs: list of covariances for the Gaussians in the mixture. Instead of 2x2 arrays, each cov can also be a list of [sigma_x, sigma_y, correlation] parameters :param weights: optional weight for each component (defaults to equal weight) :param lims: optional list of hard limits for each parameter, [[x1min,x1max], [x2min,x2max]]; use None for no limit :param names: list of names (strings) for each parameter. If not set, set to x, y :param xmin: optional lower hard bound for x :param xmax: optional upper hard bound for x :param ymin: optional lower hard bound for y :param ymax: optional upper hard bound for y :param kwargs: arguments passed to :class:`MixtureND` """ if lims is not None: limits = self._updateLimits(lims, xmin, xmax, ymin, ymax) else: limits = [(xmin, xmax), (ymin, ymax)] mats = [] for cov in covs: if isinstance(cov, (list, tuple)) and len(cov) == 3 and not isinstance(cov[0], (list, tuple)): mats.append(make_2D_Cov(*cov)) else: mats.append(cov) super().__init__(means, mats, weights, limits, names=names, **kwargs) def _updateLimits(self, lims, xmin=None, xmax=None, ymin=None, ymax=None): xmin = xmin if xmin is not None else lims[0][0] xmax = xmax if xmax is not None else lims[0][1] ymin = ymin if ymin is not None else lims[1][0] ymax = ymax if ymax is not None else lims[1][1] return [(xmin, xmax), (ymin, ymax)] def _density2D(self, num_points=1024, xmin=None, xmax=None, ymin=None, ymax=None, sigma_max=5): lims = self._updateLimits(self.lims, xmin, xmax, ymin, ymax) (xmin, xmax), (ymin, ymax) = self.autoRanges(sigma_max, lims=lims) x = np.linspace(xmin, xmax, num_points) y = np.linspace(ymin, ymax, num_points) xx, yy = np.meshgrid(x, y) like = self.pdf(xx, yy) return Density2D(x, y, like)
[docs] def pdf(self, x, y=None): """ Calculate the PDF. Note this assumes x and y are within the boundaries (does not return zero outside) Result is also only normalized if no boundaries :param x: value of x to evaluate pdf :param y: optional value of y to evaluate pdf. If not specified, returns 1D marginalized value for x. :return: value of pdf at x or x,y """ if y is None: return super().pdf(x) tot = None for i, (mean, icov, weight, norm) in enumerate(zip(self.means, self.invcovs, self.weights, self.norms)): dx = x - mean[0] dy = y - mean[1] res = np.exp(-(dx**2 * icov[0, 0] + 2 * dx * dy * icov[0, 1] + dy**2 * icov[1, 1]) / 2) / norm if not i: tot = res * weight else: tot += res * weight return tot
[docs] class Gaussian2D(Mixture2D): """ Simple special case of a 2D Gaussian mixture model with only one Gaussian component """ def __init__(self, mean, cov, **kwargs): """ :param mean: 2 element array with mean :param cov: 2x2 array of covariance, or list of [sigma_x, sigma_y, correlation] values :param kwargs: arguments passed to :class:`Mixture2D` """ super().__init__([mean], [cov], **kwargs)
[docs] class GaussianND(MixtureND): """ Simple special case of a Gaussian mixture model with only one Gaussian component """ def __init__(self, mean, cov, is_inv_cov=False, **kwargs): """ :param mean: array specifying y of parameters :param cov: covariance matrix (or filename of text file with covariance matrix) :param is_inv_cov: set True if cov is actually an inverse covariance :param kwargs: arguments passed to :class:`MixtureND` """ if isinstance(mean, str): mean = np.loadtxt(mean) if isinstance(cov, str): cov = np.loadtxt(cov) if is_inv_cov: cov = np.linalg.inv(cov) super().__init__([mean], [cov], **kwargs)
[docs] class Mixture1D(MixtureND): """ Gaussian mixture model in 1D with optional boundaries for fixed ranges """ def __init__(self, means, sigmas, weights=None, lims=None, name="x", xmin=None, xmax=None, **kwargs): """ :param means: array of y for each component :param sigmas: array of standard deviations for each component :param weights: weights for each component (defaults to equal weight) :param lims: optional array limits for each component :param name: parameter name (default 'x') :param xmin: optional lower limit :param xmax: optional upper limit :param kwargs: arguments passed to :class:`MixtureND` """ if lims is not None: limits = [(xmin if xmin is not None else lims[0], xmax if xmax is not None else lims[1])] else: limits = [(xmin, xmax)] covs = [np.atleast_2d(sigma**2) for sigma in sigmas] means = [[mean] for mean in means] super().__init__(means, covs, weights, limits, names=[name], **kwargs)
[docs] def pdf(self, x): return self.pdf_marged(0, x)
[docs] class Gaussian1D(Mixture1D): """ Simple 1D Gaussian """ def __init__(self, mean, sigma, **kwargs): """ :param mean: mean :param sigma: standard deviation :param kwargs: arguments passed to :class:`Mixture1D` """ super().__init__([mean], [sigma], **kwargs)
[docs] class RandomTestMixtureND(MixtureND): """ class for randomly generating an N-D gaussian mixture for testing (a mixture with random parameters, not random samples from the mixture). """ def __init__(self, ndim=4, ncomponent=1, names=None, weights=None, seed=None, label="RandomMixture"): """ :param ndim: number of dimensions :param ncomponent: number of components :param names: names for the parameters :param weights: weights for each component :param seed: random seed or Generator :param label: label for the generated mixture """ random_state = np.random.default_rng(seed) covs = [] for _ in range(ncomponent): A = random_state.random((ndim, ndim)) covs.append(np.dot(A, A.T)) super().__init__( random_state.random((ncomponent, ndim)), covs, weights=weights, lims=None, names=names, label=label )
[docs] def randomTestMCSamples(ndim=4, ncomponent=1, nsamp=10009, nMCSamples=1, seed=10, names=None, labels=None): """ get a MCSamples instance, or list of MCSamples instances with random samples from random covariances and y """ if names is None: names = ["x%s" % i for i in range(ndim)] if labels is None: labels = ["x_{%s}" % i for i in range(ndim)] seed = np.random.default_rng(seed) result = [ RandomTestMixtureND(ndim, ncomponent, names, seed=seed).MCSamples( nsamp, labels=labels, name_tag="Sim %s" % (i + 1), random_state=seed ) for i in range(nMCSamples) ] if nMCSamples > 1: return result else: return result[0]