Source code for astroML.density_estimation.xdeconv

"""
Extreme deconvolution solver

This follows Bovy et al.
http://arxiv.org/pdf/0905.2979v2.pdf

Arbitrary mixing matrices R are not yet implemented: currently, this only
works with R = I.
"""
from time import time

import numpy as np
from scipy import linalg
try:  # SciPy >= 0.19
    from scipy.special import logsumexp as logsumexp
except ImportError:
    from scipy.misc import logsumexp as logsumexp

from sklearn.mixture import GaussianMixture
from sklearn.utils import check_random_state

from ..utils import log_multivariate_gaussian


[docs]class XDGMM: """Extreme Deconvolution Fit an extreme deconvolution (XD) model to the data Parameters ---------- n_components: integer number of gaussian components to fit to the data max_iter: integer (optional) number of EM iterations to perform (default=100) tol: float (optional) stopping criterion for EM iterations (default=1E-5) Notes ----- This implementation follows Bovy et al. arXiv 0905.2979 """
[docs] def __init__(self, n_components, max_iter=100, tol=1E-5, verbose=False, random_state = None): self.n_components = n_components self.max_iter = max_iter self.tol = tol self.verbose = verbose self.random_state = random_state # model parameters: these are set by the fit() method self.V = None self.mu = None self.alpha = None
def fit(self, X, Xerr, R=None): """Fit the XD model to data Parameters ---------- X: array_like Input data. shape = (n_samples, n_features) Xerr: array_like Error on input data. shape = (n_samples, n_features, n_features) R : array_like (TODO: not implemented) Transformation matrix from underlying to observed data. If unspecified, then it is assumed to be the identity matrix. """ if R is not None: raise NotImplementedError("mixing matrix R is not yet implemented") X = np.asarray(X) Xerr = np.asarray(Xerr) n_samples, n_features = X.shape # assume full covariances of data assert Xerr.shape == (n_samples, n_features, n_features) # initialize components via a few steps of GaussianMixture # this doesn't take into account errors, but is a fast first-guess gmm = GaussianMixture(self.n_components, max_iter=10, covariance_type='full', random_state=self.random_state).fit(X) self.mu = gmm.means_ self.alpha = gmm.weights_ self.V = gmm.covariances_ logL = self.logL(X, Xerr) for i in range(self.max_iter): t0 = time() self._EMstep(X, Xerr) logL_next = self.logL(X, Xerr) t1 = time() if self.verbose: print("%i: log(L) = %.5g" % (i + 1, logL_next)) print(" (%.2g sec)" % (t1 - t0)) if logL_next < logL + self.tol: break logL = logL_next return self def logprob_a(self, X, Xerr): """ Evaluate the probability for a set of points Parameters ---------- X: array_like Input data. shape = (n_samples, n_features) Xerr: array_like Error on input data. shape = (n_samples, n_features, n_features) Returns ------- p: ndarray Probabilities. shape = (n_samples,) """ X = np.asarray(X) Xerr = np.asarray(Xerr) n_samples, n_features = X.shape # assume full covariances of data assert Xerr.shape == (n_samples, n_features, n_features) X = X[:, np.newaxis, :] Xerr = Xerr[:, np.newaxis, :, :] T = Xerr + self.V return log_multivariate_gaussian(X, self.mu, T) + np.log(self.alpha) def logL(self, X, Xerr): """Compute the log-likelihood of data given the model Parameters ---------- X: array_like data, shape = (n_samples, n_features) Xerr: array_like errors, shape = (n_samples, n_features, n_features) Returns ------- logL : float log-likelihood """ return np.sum(logsumexp(self.logprob_a(X, Xerr), -1)) def _EMstep(self, X, Xerr): """ Perform the E-step (eq 16 of Bovy et al) """ n_samples, n_features = X.shape X = X[:, np.newaxis, :] Xerr = Xerr[:, np.newaxis, :, :] w_m = X - self.mu T = Xerr + self.V #------------------------------------------------------------ # compute inverse of each covariance matrix T Tshape = T.shape T = T.reshape([n_samples * self.n_components, n_features, n_features]) Tinv = np.array([linalg.inv(T[i]) for i in range(T.shape[0])]).reshape(Tshape) T = T.reshape(Tshape) #------------------------------------------------------------ # evaluate each mixture at each point N = np.exp(log_multivariate_gaussian(X, self.mu, T, Vinv=Tinv)) #------------------------------------------------------------ # E-step: # compute q_ij, b_ij, and B_ij q = (N * self.alpha) / np.dot(N, self.alpha)[:, None] tmp = np.sum(Tinv * w_m[:, :, np.newaxis, :], -1) b = self.mu + np.sum(self.V * tmp[:, :, np.newaxis, :], -1) tmp = np.sum(Tinv[:, :, :, :, np.newaxis] * self.V[:, np.newaxis, :, :], -2) B = self.V - np.sum(self.V[:, :, :, np.newaxis] * tmp[:, :, np.newaxis, :, :], -2) #------------------------------------------------------------ # M-step: # compute alpha, m, V qj = q.sum(0) self.alpha = qj / n_samples self.mu = np.sum(q[:, :, np.newaxis] * b, 0) / qj[:, np.newaxis] m_b = self.mu - b tmp = m_b[:, :, np.newaxis, :] * m_b[:, :, :, np.newaxis] tmp += B tmp *= q[:, :, np.newaxis, np.newaxis] self.V = tmp.sum(0) / qj[:, np.newaxis, np.newaxis] def sample(self, size=1, random_state=None): if random_state is None: random_state = self.random_state rng = check_random_state(random_state) shape = tuple(np.atleast_1d(size)) + (self.mu.shape[1],) npts = np.prod(size) alpha_cs = np.cumsum(self.alpha) r = np.atleast_1d(np.random.random(size)) r.sort() ind = r.searchsorted(alpha_cs) ind = np.concatenate(([0], ind)) if ind[-1] != size: ind[-1] = size draw = np.vstack([np.random.multivariate_normal(self.mu[i], self.V[i], (ind[i + 1] - ind[i],)) for i in range(len(self.alpha))]) return draw.reshape(shape)