"""
PCA-based background model.
"""
from __future__ import print_function
import json
import logging
import os

import numpy
if 'MAKESPHINXDOC' not in os.environ:
import sherpa.astro.ui as ui
from sherpa.stats import Cash, CStat
from sherpa.models.parameter import Parameter
from sherpa.models import ArithmeticModel, CompositeModel
from sherpa.astro.ui import *
from sherpa.astro.instrument import RSPModelNoPHA, RMFModelNoPHA, create_delta_rmf, create_arf
else:
# mock objects when sherpa doc is built
ArithmeticModel = object
class CompositeModel(object):
pass
RSPModelNoPHA, RMFModelNoPHA = object, object
def pca(M):
mean = M.mean(axis=0)
Moffset = M - mean.reshape((1,-1))
U, s, Vt = numpy.linalg.svd(Moffset, full_matrices=False)
V = Vt.T
print('variance explained:', s**2/len(M))
c = numpy.cumsum(s**2/len(M))
c = c / c[-1]
    for cut in 0.80, 0.90, 0.95, 0.99:
        idx, = numpy.where(c > cut)
        n = idx.min() + 1
        print('  --> need %d components to explain %.2f%% of the variance' % (n, cut * 100))
return U, s, V, mean
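
# A minimal usage sketch (hypothetical data): M is a (nspectra, nchannels)
# matrix of normalised, log-transformed background spectra, matching the
# transform used in PCAFitter.decompose below.
#
#     M = numpy.log10(counts / counts.sum(axis=1, keepdims=True) + 1.0)
#     U, s, V, mean = pca(M)
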
def pca_predict(U, s, V, mean):
S = numpy.diag(s)
return numpy.dot(U, numpy.dot(S, V.T)) + mean.reshape((1,-1))
def pca_get_vectors(s, V, mean):
#U = numpy.eye(len(s))
#return pca_predict(U, s, V, mean)
Sroot = numpy.diag(s**0.5)
return numpy.dot(Sroot, V.T)
def pca_cut(U, s, V, mean, ncomponents=20):
return U[:, :ncomponents], s[:ncomponents], V[:,:ncomponents], mean
def pca_cut_adapt(U, s, V, mean, cut=0.99):
    # adaptive variant: keep enough components to explain a fraction `cut` of
    # the variance (this threshold is an assumption; the original code
    # referenced an undefined `ncomponents` here)
    c = numpy.cumsum(s**2) / (s**2).sum()
    ncomponents = int(numpy.where(c > cut)[0].min() + 1)
    return U[:, :ncomponents], s[:ncomponents], V[:, :ncomponents], mean
def pca_check(M, U, s, V, mean):
# if we use only the first 20 PCs the reconstruction is less accurate
Mhat2 = pca_predict(U, s, V, mean)
print("Using %d PCs, MSE = %.6G" % (len(s), numpy.mean((M - Mhat2)**2)))
return M - Mhat2
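
# Sketch of the truncation round-trip (names from the sketch above): keep 10
# components and inspect the reconstruction residuals.
#
#     U10, s10, V10, mean = pca_cut(U, s, V, mean, ncomponents=10)
#     resid = pca_check(M, U10, s10, V10, mean)
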
def get_unit_response(i):
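    """Return the background response of data set *i*, with the ARF
    replaced by a flat effective area of 1, so that models are
    evaluated in count space."""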
    copy_data(i, "temp_unitrsp")
unit_arf = get_bkg_arf("temp_unitrsp")
unit_arf.specresp = 0. * unit_arf.specresp + 1.0
bunitrsp = get_response("temp_unitrsp", bkg_id=1)
delete_data("temp_unitrsp")
return bunitrsp
class IdentityResponse(RSPModelNoPHA):
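    """Response that passes the model through unchanged, skipping
    ARF/RMF convolution; the model is evaluated on channel indices."""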
def __init__(self, n, model, arf, rmf):
self.n = n
RSPModelNoPHA.__init__(self, arf=arf, rmf=rmf, model=model)
self.elo = numpy.arange(n)
self.ehi = numpy.arange(n)
self.lo = numpy.arange(n)
self.hi = numpy.arange(n)
self.xlo = numpy.arange(n)
self.xhi = numpy.arange(n)
    def apply_rmf(self, src):
        return src
def calc(self, p, x, xhi=None, *args, **kwargs):
src = self.model.calc(p, self.xlo, self.xhi)
assert numpy.isfinite(src).all(), src
return src
class IdentityRMF(RMFModelNoPHA):
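    """RMF-only variant of IdentityResponse: passes the model through
    unchanged, evaluated on channel indices."""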
def __init__(self, n, model, rmf):
self.n = n
RMFModelNoPHA.__init__(self, rmf=rmf, model=model)
self.elo = numpy.arange(n)
self.ehi = numpy.arange(n)
self.lo = numpy.arange(n)
self.hi = numpy.arange(n)
self.xlo = numpy.arange(n)
self.xhi = numpy.arange(n)
    def apply_rmf(self, src):
        return src
def calc(self, p, x, xhi=None, *args, **kwargs):
src = self.model.calc(p, self.xlo, self.xhi)
assert numpy.isfinite(src).all(), src
return src
def get_identity_response(i):
rmf = get_rmf(i)
delta_rmf = create_delta_rmf(rmf.e_min, rmf.e_max, offset=1, e_min=rmf.e_min, e_max=rmf.e_max, ethresh=rmf.ethresh)
flat_arf = create_arf(rmf.e_min, rmf.e_max, ethresh=rmf.ethresh)
return lambda model: RSPModelNoPHA(flat_arf, delta_rmf, model)
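
# Example (a sketch): wrap a background model so that it is evaluated directly
# in detector channels, through a flat ARF and a delta-function RMF.
#
#     rsp = get_identity_response(1)
#     set_bkg_full_model(1, rsp(mybkgmodel))
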
logf = logging.getLogger('bxa.Fitter')
logf.setLevel(logging.INFO)
# Model
class PCAModel(ArithmeticModel):
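    """Background model built from a PCA decomposition: the mean
    log-spectrum plus a linear combination of principal components,
    scaled by a log10 normalisation (lognorm)."""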
def __init__(self, modelname, data):
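        # data holds the PCA decomposition: 'U' (per-spectrum coefficients),
        # 'components' (principal components), 'mean' (mean log-spectrum),
        # 'values' (singular values) and the 'ilo'/'ihi' channel range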
        self.U = data['U']
        self.V = numpy.matrix(data['components'])
        self.mean = data['mean']
        self.s = data['values']
        self.ilo = int(data['ilo'])
        self.ihi = int(data['ihi'])
p0 = Parameter(modelname=modelname, name='lognorm', val=1, min=-5, max=20,
hard_min=-100, hard_max=100)
pars = [p0]
for i in range(len(self.s)):
pi = Parameter(modelname=modelname, name='PC%d' % (i+1),
val=0, min=-20, max=20,
hard_min=-1e300, hard_max=1e300)
pars.append(pi)
        super(PCAModel, self).__init__(modelname, pars=pars)
def calc(self, p, left, right, *args, **kwargs):
try:
lognorm = p[0]
pars = numpy.array(p[1:])
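            # reconstruct the log10 spectrum from the PC coefficients, then
            # undo the log transform; lognorm scales the total count rate
            # (this is the inverse of the projection in PCAFitter.decompose)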
y = numpy.array(pars * self.V.T + self.mean).flatten()
cts = (10**y - 1) * 10**lognorm
out = left * 0.0
out[self.ilo:self.ihi] = cts
return out
        except Exception as e:
            print("Exception in PCA model:", e, p)
            raise
def startup(self, *args):
pass
def teardown(self, *args):
pass
def guess(self, dep, *args, **kwargs):
self._load_params()
class GaussModel(ArithmeticModel):
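    """Simple Gaussian line model, evaluated at the left bin edges."""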
def __init__(self, modelname):
self.LineE = Parameter(modelname=modelname, name='LineE', val=1, min=0, max=1e38)
self.Sigma = Parameter(modelname=modelname, name='Sigma', val=1, min=0, max=1e38)
self.norm = Parameter(modelname=modelname, name='norm', val=1, min=0, max=1e38)
pars = (self.LineE, self.Sigma, self.norm)
        super(GaussModel, self).__init__(modelname, pars=pars)
    def calc(self, p, left, right, *args, **kwargs):
        try:
            LineE, Sigma, norm = p
            cts = norm * numpy.exp(-0.5 * ((left - LineE) / Sigma)**2)
            return cts
        except Exception as e:
            print("Exception in Gauss model:", e, p)
            raise
def startup(self):
pass
def teardown(self):
pass
def guess(self, dep, *args, **kwargs):
self._load_params()
class PCAFitter(object):
"""Fitter mixing PCA-based templates and gaussian lines """
def __init__(self, id=None):
"""
id: which data id to fit
filename: prefix for where to store background information
load: whether the background file should be loaded now
"""
self.id = id
logf.info('PCAFitter(for ID=%s)' % (id))
hdr = get_bkg(id).header
self.ndata = len(get_bkg(id).counts)
telescope = hdr.get('TELESCOP','')
instrument = hdr.get('INSTRUME', '')
if telescope == '' and instrument == '':
raise Exception('ERROR: The TELESCOP/INSTRUME headers are not set in the data file.')
for folder in os.environ.get('BKGMODELDIR', '.'), os.path.dirname(__file__):
filename = os.path.join(folder, ('%s_%s_%d.json' % (telescope, instrument, self.ndata)).lower())
if os.path.exists(filename):
self.load(filename)
return
filename = os.path.join(folder, ('%s_%d.json' % (telescope, self.ndata)).lower())
if os.path.exists(filename):
self.load(filename)
return
raise Exception('ERROR: Could not load PCA components for this detector (%s %s, %d channels). Try the SingleFitter instead.' % (telescope, instrument, self.ndata))
    def load(self, filename):
        logf.info('loading PCA information from %s' % (filename))
        with open(filename) as f:
            data = json.load(f)
        self.pca = dict()
        for k, v in data.items():
            self.pca[k] = numpy.array(v)
def decompose(self):
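        """Project the observed background spectrum onto the PCA basis and
        return the initial parameter vector [lognorm, PC1, PC2, ...]."""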
ilo = int(self.pca['ilo'])
ihi = int(self.pca['ihi'])
lo = self.pca['lo']
hi = self.pca['hi']
mean = self.pca['mean']
V = numpy.matrix(self.pca['components'])
s = self.pca['values']
U = self.pca['U']
        cts = get_bkg(self.id).counts[ilo:ihi]
ncts = cts.sum()
logf.info('have %d background counts for deconvolution' % ncts)
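        # normalise and log-transform as in the PCA training set, then project
        # onto the principal components (PCAModel.calc inverts this mapping)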
y = numpy.log10(cts * 1. / ncts + 1.0)
z = (y - mean) * V
assert z.shape == (1,len(s)), z.shape
z = z.tolist()[0]
return numpy.array([numpy.log10(ncts + 0.1)] + z)
def calc_bkg_stat(self):
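        """Return the current fit statistic of the background associated
        with this data set."""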
ss = [s for s in get_stat_info() if self.id in s.ids and s.bkg_ids is not None and len(s.bkg_ids) > 0]
if len(ss) != 1:
for s in get_stat_info():
if self.id in s.ids and len(s.bkg_ids) > 0:
print('get_stat_info returned: ids=%s bkg_ids=%s' % (s.ids, s.bkg_ids))
assert len(ss) == 1
return ss[0].statval
def fit(self, max_lines=10):
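        """Fit the background with the PCA model, select the number of
        principal components by AIC, then add up to *max_lines* Gaussian
        lines while the AIC keeps improving."""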
# try a PCA decomposition of this spectrum
logf.info('fitting background of ID=%s using PCA method' % (self.id))
initial = self.decompose()
logf.info('fit: initial PCA decomposition: %s' % (initial))
id = self.id
set_method('neldermead')
bkgmodel = PCAModel('pca%s' % id, data=self.pca)
self.bkgmodel = bkgmodel
response = get_identity_response(self.id)
convbkgmodel = response(bkgmodel)
set_bkg_full_model(self.id, convbkgmodel)
for p, v in zip(bkgmodel.pars, initial):
p.val = v
srcmodel = get_model(self.id)
set_full_model(self.id, srcmodel)
fit_bkg(id=self.id)
logf.info('fit: first full fit done')
final = [p.val for p in get_bkg_model(id).pars]
logf.info('fit: parameters: %s' % (final))
initial_v = self.calc_bkg_stat()
logf.info('fit: stat: %s' % (initial_v))
        # let's try again from zero
        logf.info('fit: second full fit from zero')
        for p in bkgmodel.pars:
            p.val = 0
        fit_bkg(id=self.id)
        initial_v0 = self.calc_bkg_stat()
        logf.info('fit: parameters: %s' % ([p.val for p in get_bkg_model(id).pars]))
        logf.info('fit: stat: %s' % (initial_v0))
# pick the better starting point
if initial_v0 < initial_v:
logf.info('fit: using zero-fit')
initial_v = initial_v0
final = [p.val for p in get_bkg_model(id).pars]
else:
logf.info('fit: using decomposed-fit')
for p, v in zip(bkgmodel.pars, final):
p.val = v
        # start from the full fit and successively remove (freeze) parameters
print('%d parameters, stat=%.2f' % (len(initial), initial_v))
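        # model selection by AIC = stat + 2 * nparams; with Cash/CStat the
        # statistic is -2 ln(likelihood), so this is the Akaike criterion.
        # results entries are (AIC, parameter values, nparams, stat)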
results = [(2 * len(final) + initial_v, final, len(final), initial_v)]
for i in range(len(initial)-1, 0, -1):
bkgmodel.pars[i].val = 0
bkgmodel.pars[i].freeze()
fit_bkg(id=self.id)
final = [p.val for p in get_bkg_model(id).pars]
v = self.calc_bkg_stat()
print('--> %d parameters, stat=%.2f' % (i, v))
results.insert(0, (v + 2*i, final, i, v))
print()
print('Background PCA fitting AIC results:')
print('-----------------------------------')
print()
print('stat Ncomp AIC')
for aic, params, nparams, val in results:
            print('%-5.1f %2d %-5.1f' % (val, nparams, aic))
aic, final, nparams, val = min(results)
for p, v in zip(bkgmodel.pars, final):
p.val = v
for i in range(nparams):
bkgmodel.pars[i].thaw()
print()
print('Increasing parameters again...')
# now increase the number of parameters again
#results = [(aic, final, nparams, val)]
last_aic, last_final, last_nparams, last_val = aic, final, nparams, val
for i in range(last_nparams, len(bkgmodel.pars)):
next_nparams = i + 1
bkgmodel.pars[i].thaw()
for p, v in zip(bkgmodel.pars, last_final):
p.val = v
fit_bkg(id=self.id)
next_final = [p.val for p in get_bkg_model(id).pars]
v = self.calc_bkg_stat()
next_aic = v + 2*next_nparams
if next_aic < last_aic:
# accept
print('%d parameters, aic=%.2f ** accepting' % (next_nparams, next_aic))
last_aic, last_final, last_nparams, last_val = next_aic, next_final, next_nparams, v
else:
print('%d parameters, aic=%.2f' % (next_nparams, next_aic))
                # stop once we are 3 parameters beyond the last accepted solution
                if next_nparams >= last_nparams + 3:
break
print('Final choice: %d parameters, aic=%.2f' % (last_nparams, last_aic))
# reset to the last good solution
for p, v in zip(bkgmodel.pars, last_final):
p.val = v
last_model = convbkgmodel
for i in range(1, max_lines + 1):
print()
print('Adding Gaussian#%d' % (i))
# find largest discrepancy
set_analysis(id, "ener", "rate")
m = get_bkg_fit_plot(id)
y = m.dataplot.y.cumsum()
z = m.modelplot.y.cumsum()
diff_rate = numpy.abs(y - z)
set_analysis(id, "ener", "counts")
m = get_bkg_fit_plot(id)
x = m.dataplot.x
y = m.dataplot.y.cumsum()
z = m.modelplot.y.cumsum()
diff = numpy.abs(y - z)
imax = numpy.argmax(diff)
energies = x
e = x[imax]
            print('largest remaining discrepancy at %.3f keV [%d], need %d counts' % (x[imax], imax, diff[imax]))
power = diff_rate[imax]
# power = diff[imax]
            # let's try to inject a gaussian there
g = xsgaussian('g_%d_%d' % (id, i))
            print('placing gaussian at %.2f keV, with power %s' % (e, power))
# we work in energy bins, not energy
g.LineE.min = energies[0]
g.LineE.max = energies[-1]
g.LineE.val = e
if imax > len(diff) - 2:
imax = len(diff) - 2
if imax < 2:
imax = 2
g.Sigma = (x[imax + 1] - x[imax - 1])
            try:
                g.Sigma.hard_max = 1e10
            except Exception:
                try:
                    g.Sigma._hard_max = 1e10
                except Exception:
                    pass
g.Sigma.min = (x[imax + 1] - x[imax - 1])/3
g.Sigma.max = x[-1] - x[0]
g.norm.min = power * 1e-6
g.norm.val = power
convbkgmodel2 = response(g)
next_model = last_model + convbkgmodel2
set_bkg_full_model(self.id, next_model)
fit_bkg(id=self.id)
next_final = [p.val for p in get_bkg_model(id).pars]
next_nparams = len(next_final)
v = self.calc_bkg_stat()
next_aic = v + 2 * next_nparams
print('with Gaussian:', next_aic, '; change: %.1f (negative is good)' % (next_aic - last_aic))
if next_aic < last_aic:
print('accepting')
last_model = next_model
last_aic, last_final, last_nparams, last_val = next_aic, next_final, next_nparams, v
else:
print('not significant, rejecting')
set_bkg_full_model(self.id, last_model)
for p, v in zip(last_model.pars, last_final):
p.val = v
break
def auto_background(id, max_lines=10):
"""Automatically fits background *id* based on PCA-based templates,
and additional gaussian lines as needed by AIC."""
bkgmodel = PCAFitter(id)
log_sherpa = logging.getLogger('sherpa.astro.ui.utils')
prev_level = log_sherpa.level
try:
log_sherpa.setLevel(logging.WARN)
bkgmodel.fit(max_lines)
finally:
log_sherpa.setLevel(prev_level)
m = get_bkg_fit_plot(id)
numpy.savetxt('test_bkg.txt', numpy.transpose([m.dataplot.x, m.dataplot.y, m.modelplot.x, m.modelplot.y]))
return get_bkg_model(id)
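
# Example usage (a sketch; assumes 'spectrum.pha' has a background and a
# response, and that PCA templates exist for its TELESCOP/INSTRUME headers):
#
#     from sherpa.astro.ui import load_pha, set_model, powlaw1d
#     load_pha(1, 'spectrum.pha')
#     set_model(1, powlaw1d.mysrc)
#     bkgmodel = auto_background(1)
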
__all__ = ['PCAFitter', 'PCAModel', 'auto_background', 'get_identity_response', 'get_unit_response']