Source code for ultranest.calibrator

# noqa: D400 D205
"""
Calibration of step sampler
---------------------------
"""

import os

import numpy as np

from ultranest.integrator import ReactiveNestedSampler


def _substitute_log_dir(init_args, nsteps):
    """Append `nsteps` to `log_dir` argument, if set.

    Parameters
    ----------
    init_args: dict
        arguments passed :py:class:`ReactiveNestedSampler`,
        may contain the key `'log_dir'`.
    nsteps: int
        number of steps

    Returns
    -------
    new_init_args: dict
        same as init_args, but if `'log_dir'` was set,
        it now has `'-nsteps'+str(nsteps)` appended.
    """
    if 'log_dir' in init_args:
        args = dict(init_args)
        args['log_dir'] = init_args['log_dir'] + '-nsteps%d' % nsteps
        return args
    return init_args


[docs] class ReactiveNestedCalibrator(): """Calibrator for the number of steps in step samplers. The number of steps in a step sampler needs to be chosen. A calibration recommended (e.g. https://ui.adsabs.harvard.edu/abs/2019MNRAS.483.2044H) is to run a sequence of nested sampling runs with increasing number of steps, and stop when log(Z) converges. This class automates this. See the :py:meth:`ReactiveNestedCalibrator.run` for details. Usage ----- Usage is designed to be a drop-in replacement for ReactiveNestedSampler. If your code was:: sampler = ReactiveNestedSampler(my_param_names, my_loglike, my_transform) sampler.stepsampler = SliceSampler(nsteps=10, generate_direction=region_oriented_direction) sampler.run(min_num_livepoints=400) You would change it to:: sampler = ReactiveNestedCalibrator(my_param_names, my_loglike, my_transform) sampler.stepsampler = SliceSampler(nsteps=10, generate_direction=region_oriented_direction) sampler.run(min_num_livepoints=400) The run() command will print the number of slice sampler steps that appear safe for the inference task. The initial value for nsteps (e.g. in `SliceSampler(nsteps=...)`) is overwritten by this class. """ def __init__(self, param_names, loglike, transform=None, **kwargs ): """Initialise nested sampler calibrator. Parameters ----------- param_names: list of str Names of the parameters. Length gives dimensionality of the sampling problem. loglike: function log-likelihood function. transform: function parameter transform from unit cube to physical parameters. kwargs: dict further arguments passed to ReactiveNestedSampler if `log_dir` is set, then the suffix `-nsteps%d` is added for each run where %d is replaced with the number of steps (2, 4, 8 etc). """ self.init_args = dict(param_names=param_names, loglike=loglike, transform=transform, **kwargs) self.stepsampler = None
[docs] def run(self, **kwargs): """Run a sequence of ReactiveNestedSampler runs until convergence. The first run is made with the number of steps set to the number of parameters. Each subsequent run doubles the number of steps. Runs are made until convergence is reached. Then this generator stops yielding results. Convergence is defined as three consecutive runs which 1) are not ordered in their log(Z) results, and 2) the consecutive log(Z) error bars must overlap. Parameters ----------- **kwargs: dict All arguments are passed to :py:meth:`ReactiveNestedSampler.run`. Yields ------- nsteps: int number of steps for the current run result: dict return value of :py:meth:`ReactiveNestedSampler.run` for the current run """ assert self.stepsampler is not None self.run_args = kwargs # start with nsteps=d nsteps = len(self.init_args['param_names']) self.results = [] self.nsteps = [] self.relsteps = [] while True: print("running with %d steps ..." % nsteps) init_args = _substitute_log_dir(self.init_args, nsteps) sampler = ReactiveNestedSampler(**init_args) sampler.stepsampler = self.stepsampler.__class__( nsteps=nsteps, generate_direction=self.stepsampler.generate_direction, check_nsteps=self.stepsampler.check_nsteps, adaptive_nsteps=self.stepsampler.adaptive_nsteps, log=open(init_args['log_dir'] + '/stepsampler.log', 'w') if 'log_dir' in self.init_args else None) # noqa: SIM115 self.sampler = sampler result = sampler.run(**self.run_args) print("Z=%(logz).2f +- %(logzerr).2f" % result) if self.sampler.log_to_disk: sampler.stepsampler.plot(os.path.join(self.sampler.logs['plots'], 'stepsampler.pdf')) sampler.stepsampler.plot_jump_diagnostic_histogram( os.path.join(self.sampler.logs['plots'], 'stepsampler-jumphist.pdf'), histtype='step', bins='auto') sampler.stepsampler.print_diagnostic() if 'jump-distance' in sampler.stepsampler.logstat_labels and 'reference-distance' in sampler.stepsampler.logstat_labels: i = sampler.stepsampler.logstat_labels.index('jump-distance') j = sampler.stepsampler.logstat_labels.index('reference-distance') jump_distances = np.array([entry[i] for entry in sampler.stepsampler.logstat]) reference_distances = np.array([entry[j] for entry in sampler.stepsampler.logstat]) self.relsteps.append(jump_distances / reference_distances) # TODO: handle population step samplers self.results.append(result) self.nsteps.append(nsteps) yield nsteps, result if len(self.results) > 2: last_result = self.results[-2] last_result2 = self.results[-3] # check if they agree within the error bars last_significant = abs(result['logz'] - last_result['logz']) > (result['logzerr']**2 + last_result['logzerr']**2)**0.5 last2_significant = abs(last_result2['logz'] - last_result['logz']) > (last_result2['logzerr']**2 + last_result['logzerr']**2)**0.5 # check if there is order monotonic_increase = result['logz'] > last_result['logz'] > last_result2['logz'] monotonic_decrease = result['logz'] < last_result['logz'] < last_result2['logz'] if last_significant: print("not converged: last two Z were significantly different") elif last2_significant: print("not yet converged: previous two Z were significantly different") elif monotonic_increase: print("not converged: monotonic increase in the last three Z results") elif monotonic_decrease: print("not converged: monotonic decrease in the last three Z results") else: print("converged! nsteps=%d appears safe" % nsteps) break nsteps *= 2
[docs] def plot(self): """Visualise the convergence diagnostics. Stores into `<log_dir>/plots/` folder: * stepsampler.pdf: diagnostic of stepsampler, see :py:meth:`StepSampler.plot` * nsteps-calibration-jumps.pdf: distribution of relative jump distance * nsteps-calibration.pdf: evolution of ln(Z) with nsteps """ self.sampler.stepsampler.plot(os.path.join(self.sampler.logs['plots'], 'stepsampler.pdf')) # plot U-test convergence run length (at 4 sigma) (or niter) vs nsteps # plot step > reference fraction vs nsteps calibration_results = [] import matplotlib.pyplot as plt plt.figure("jump-distance") print("jump distance diagnostic:") for nsteps, relsteps, result in zip(self.nsteps, self.relsteps, self.results): calibration_results.append([ nsteps, result['logz'], result['logzerr'], min(result['niter'], result['insertion_order_MWW_test']['independent_iterations']), result['insertion_order_MWW_test']['converged'] * 1, np.nanmean(relsteps > 1)]) plt.hist(np.log10(relsteps + 1e-10), histtype='step', bins='auto', label=nsteps) print(' %-4d: %.2f%% avg:%.2f' % (nsteps, np.nanmean(relsteps > 1) * 100.0, np.exp(np.nanmean(np.log(relsteps))))) if 'log_dir' in self.init_args: np.savetxt( self.init_args['log_dir'] + 'calibration.csv', calibration_results, delimiter=',', comments='', header='nsteps,logz,logzerr,maxUrun,Uconverged,stepfrac', fmt='%d,%.3f,%.3f,%d,%d,%.5f') plt.xlabel('$log_{10}$(relative step distance)') plt.ylabel('Frequency') plt.legend(title='nsteps', loc='best') if self.sampler.log_to_disk: plt.savefig(os.path.join(self.sampler.logs['plots'], 'nsteps-calibration-jumps.pdf'), bbox_inches='tight') plt.close() plt.figure("logz") plt.errorbar( x=self.nsteps, y=[result['logz'] for result in self.results], yerr=[result['logzerr'] for result in self.results], ) plt.title('Step sampler calibration') plt.xlabel('Number of steps') plt.ylabel('ln(Z)') if self.sampler.log_to_disk: plt.savefig(os.path.join(self.sampler.logs['plots'], 'nsteps-calibration.pdf'), bbox_inches='tight') plt.close() self.sampler.logger.debug('Making nsteps calibration plot ... done')