Source code for

#! /usr/bin/env python

Module with utility functions to the MCMC (``emcee``) sampling for NEGFC
parameter estimation.

__author__ = 'V. Christiaens, O. Wertz, Carlos Alberto Gomez Gonzalez'
__all__ = ['gelman_rubin',

import numpy as np
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

[docs] def gelman_rubin(x): r""" Determine the Gelman-Rubin :math:`\hat{R}` statistical test between Markov chains. Parameters ---------- x: numpy.array The numpy.array on which the Gelman-Rubin test is applied. This array should contain at least 2 set of data, i.e. x.shape >= (2,). Returns ------- out: float The Gelman-Rubin :math:`\hat{R}`. Example ------- >>> x1 = np.random.normal(0.0,1.0,(1,100)) >>> x2 = np.random.normal(0.1,1.3,(1,100)) >>> x = np.vstack((x1,x2)) >>> gelman_rubin(x) 1.0366629898991262 >>> gelman_rubin(np.vstack((x1,x1))) 0.99 """ if np.shape(x) < (2,): msg = 'Gelman-Rubin diagnostic requires multiple chains of the same ' msg += 'length' raise ValueError(msg) try: m, n = np.shape(x) except ValueError: print("Bad shape for the chains") return # Calculate between-chain variance B_over_n = np.sum((np.mean(x, 1) - np.mean(x)) ** 2) / (m - 1) # Calculate within-chain variances W = np.sum([(x[i] - xbar) ** 2 for i, xbar in enumerate(np.mean(x, 1))]) / (m * (n - 1)) # (over) estimate of variance s2 = W * (n - 1) / n + B_over_n # Pooled posterior variance estimate V = s2 + B_over_n / m # Calculate PSRF R = V / W return R
[docs] def gelman_rubin_from_chain(chain, burnin): r""" Pack the MCMC chain and determine the Gelman-Rubin :math:`\hat{R}` statistical test. In other words, two sub-sets are extracted from the chain (burnin parts are taken into account) and the Gelman-Rubin statistical test is performed. Parameters ---------- chain: numpy.array The MCMC chain with the shape walkers x steps x model_parameters burnin: float \in [0,1] The fraction of a walker which is discarded. Returns ------- out: float The Gelman-Rubin :math:`\hat{R}`. """ dim = chain.shape[2] k = chain.shape[1] thr0 = int(np.floor(burnin*k)) thr1 = int(np.floor((1-burnin) * k * 0.25)) rhat = np.zeros(dim) for j in range(dim): part1 = chain[:, thr0:thr0+thr1, j].reshape((-1)) part2 = chain[:, thr0+3*thr1:thr0+4*thr1, j].reshape((-1)) series = np.vstack((part1, part2)) rhat[j] = gelman_rubin(series) return rhat
def next_pow_two(n): i = 1 while i < n: i = i << 1 return i def autocorr_func_1d(x, norm=True): x = np.atleast_1d(x) if len(x.shape) != 1: raise ValueError("invalid dimensions for 1D autocorrelation function") n = next_pow_two(len(x)) # Compute the FFT and then (from that) the auto-correlation function f = np.fft.fft(x - np.mean(x), n=2 * n) acf = np.fft.ifft(f * np.conjugate(f))[: len(x)].real acf /= 4 * n # Optionally normalize if norm: acf /= acf[0] return acf def auto_window(taus, c): m = np.arange(len(taus)) < c * taus if np.any(m): return np.argmin(m) return len(taus) - 1 def autocorr(y, c=5.0): f = np.zeros(y.shape[1]) for yy in y: f += autocorr_func_1d(yy) f /= len(y) taus = 2.0 * np.cumsum(f) - 1.0 window = auto_window(taus, c) return taus[window] def autocorr_test(chain): N = chain.shape[1] tau = autocorr(chain) return tau/N