Source code for nwaylib

"""Multiway association between astrometric catalogues"""
from __future__ import division, print_function

from collections import OrderedDict

import numpy
import pandas
from numpy import log10, pi

from . import bayesdistance as bayesdist
from . import fastskymatch as match
from . import magnitudeweights as magnitudeweights
from .logger import NormalLogger, NullOutputLogger

__author__ = """Johannes Buchner"""
__email__ = 'johannes.buchner.acad@gmx.com'
__version__ = '4.7.0'


class UndersampledException(Exception): pass


class EmptyResultException(Exception): pass


default_logger = NormalLogger()
def nway_match(match_tables, match_radius, prior_completeness,
        mag_include_radius=None, mag_exclude_radius=None,
        magauto_post_single_minvalue=0.9, prob_ratio_secondary=0.5, min_prob=0.,
        consider_unrelated_associations=True, store_mag_hists=True, logger=default_logger):
    """
    match_tables: list of catalogues, each a dict with entries:

    - name (short catalogue name, no spaces, used in output columns)
    - ra (RA in degrees)
    - dec (dec in degrees)
    - error (positional error in arcsec)
    - area (sky area covered by the catalogue in square degrees)
    - mags (list of additional columns to consider as priors)
    - magnames (short name for each entry in mags, used in output columns)
    - maghists (list of prior information for each entry in mags).
      Use None to automatically build a histogram of target/non-target sources.
      Otherwise, supply the histogram manually: bins_lo, bins_hi, hist_sel, hist_all.

    match_radius: maximum radius in arcsec to consider. More distant counterparts
        are cut off. Set to a very large value, e.g. 5 times the largest positional
        error. Setting it larger does not change the result, but setting it smaller
        improves performance.

    prior_completeness: expected fraction of sources in the primary catalogue that
        have a counterpart (e.g., 90%). If an array is passed, one completeness
        value per catalogue; the first entry has to be 1.

    mag_include_radius: search radius for building the magnitude histogram of
        target sources. If None (default), the Bayesian posterior is used.

    mag_exclude_radius: exclusion radius for building the magnitude histogram of
        field sources. If None (default), mag_include_radius is used.

    magauto_post_single_minvalue: minimum posterior probability (default: 0.9) for
        the magnitude histogram of secure target sources. Used in the Bayesian
        procedure.

    prob_ratio_secondary: for each primary catalogue entry, the most probable
        association is marked with match_flag=1. Comparably good solutions receive
        match_flag=2 (all others are match_flag=0). The prob_ratio_secondary
        parameter controls the lowest probability ratio p(secondary)/p(primary)
        still considered comparable.

    min_prob: minimum probability for truncating the output table.
        Default: 0 (no truncation).

    consider_unrelated_associations: correction for missing sources in partial
        associations. Keep set to True.

    store_mag_hists: write constructed magnitude histograms to file
        (filename based on table name and mags).

    logger: NormalLogger for stderr output and progress bars,
        NullOutputLogger if silent.
    """
    if mag_exclude_radius is None:
        mag_exclude_radius = mag_include_radius
    if mag_include_radius is not None:
        if mag_include_radius >= match_radius:
            logger.warn('WARNING: magnitude radius is very large (>= matching radius). Consider using a smaller value.')
    ncats = len(match_tables)

    table, resultstable, separations, errors = _create_match_table(match_tables, match_radius, logger=logger)
    if not len(table) > 0:
        raise EmptyResultException('No matches.')

    source_densities, source_densities_plus = _compute_source_densities(match_tables, logger=logger)

    # first pass: find secure matches and secure non-matches
    prior, log_bf = _compute_single_log_bf(match_tables, source_densities, source_densities_plus, table, separations, errors, prior_completeness, logger=logger)
    table = table.assign(
        dist_bayesfactor_uncorrected=log_bf,
        dist_bayesfactor=log_bf,
    )
    if consider_unrelated_associations:
        _correct_unrelated_associations(table, separations, errors, ncats, source_densities, source_densities_plus, logger=logger)
    log_bf = table['dist_bayesfactor']

    # add the additional columns
    post = bayesdist.posterior(prior, log_bf)
    table = table.assign(dist_post=post)

    # find magnitude biasing functions
    table, total = _apply_magnitude_biasing(match_tables, table, mag_include_radius, mag_exclude_radius, magauto_post_single_minvalue, store_mag_hists, logger=logger)

    table = _compute_final_probabilities(match_tables, table, prob_ratio_secondary, prior, total, logger=logger)
    table = _truncate_table(table, min_prob, logger=logger)
    return table
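
# Illustrative usage sketch (not part of the library source): the catalogue
# names, coordinates and numbers below are made up; only the match_tables keys
# and the nway_match() arguments follow the docstring above.
def _example_usage():
    rng = numpy.random.RandomState(1)
    n = 50
    primary = dict(
        name='XMM',                        # short name, used as output column prefix
        ra=rng.uniform(10.0, 10.5, n),     # degrees
        dec=rng.uniform(-0.25, 0.25, n),   # degrees
        error=numpy.ones(n) * 2.0,         # positional error in arcsec
        area=0.125,                        # square degrees covered by the catalogue
        mags=[], magnames=[], maghists=[]) # no magnitude priors in this sketch
    # secondary catalogue: jittered copies of the primary positions plus background sources
    sec_ra = numpy.concatenate((primary['ra'] + rng.normal(0, 1.0 / 3600, n), rng.uniform(10.0, 10.5, 3 * n)))
    sec_dec = numpy.concatenate((primary['dec'] + rng.normal(0, 1.0 / 3600, n), rng.uniform(-0.25, 0.25, 3 * n)))
    secondary = dict(
        name='OPT',
        ra=sec_ra, dec=sec_dec,
        error=numpy.ones(len(sec_ra)) * 0.5,
        area=0.125,
        mags=[], magnames=[], maghists=[])
    # match within 10 arcsec, expecting ~70% of primary sources to have a counterpart
    return nway_match([primary, secondary], match_radius=10.0,
        prior_completeness=0.7, logger=NullOutputLogger())
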

def _create_match_table(match_tables, match_radius, logger):
    # first match input catalogues, compute possible combinations in match_radius
    ratables = [(t['ra'], t['dec']) for t in match_tables]
    table_names = [t['name'] for t in match_tables]
    resultstable = match.crossproduct(ratables, match_radius / 60. / 60, logger=logger)
    #results = resultstable.view(dtype=[(t['name'], resultstable.dtype) for t in match_tables]).reshape((-1,))
    nresults = len(resultstable)

    keys = []
    columns = []
    for i, t in enumerate(match_tables):
        keys.append(t['name'])
        columns.append(resultstable[:,i])

    # matrix of separations
    separations = []
    errors = []
    invalid_separations = numpy.ones(nresults) * numpy.nan
    logger.log(' adding angular separation columns')
    max_separation = numpy.zeros(nresults)
    for i in range(len(match_tables)):
        a_ra = ratables[i][0][resultstable[:,i]]
        a_dec = ratables[i][1][resultstable[:,i]]
        errors.append(match_tables[i]['error'][resultstable[:,i]])
        row = []
        for j in range(len(match_tables)):
            if i < j:
                b_ra = ratables[j][0][resultstable[:,j]]
                b_dec = ratables[j][1][resultstable[:,j]]
                col = match.dist((a_ra, a_dec), (b_ra, b_dec))
                assert not numpy.isnan(col).any(), ['%d distances are nan' % numpy.isnan(col).sum(), a_ra[numpy.isnan(col)], a_dec[numpy.isnan(col)], b_ra[numpy.isnan(col)], b_dec[numpy.isnan(col)]]
                assert not (a_ra == b_ra).all()
                # store distance in arcsec
                col[resultstable[:,i] == -1] = numpy.nan
                col[resultstable[:,j] == -1] = numpy.nan
                col[a_ra == -99] = numpy.nan
                col[b_ra == -99] = numpy.nan
                col_arcsec = col * 60 * 60
                keys.append("Separation_%s_%s" % (table_names[i], table_names[j]))
                columns.append(col_arcsec)
                max_separation = numpy.nanmax([col_arcsec, max_separation], axis=0)
                row.append(col_arcsec)
            else:
                # mark the upper right triangle of the matrix as invalid data
                row.append(invalid_separations)
        separations.append(row)
        del row

    keys.append('Separation_max')
    columns.append(max_separation)
    keys.append('ncat')
    columns.append((resultstable > -1).sum(axis=1))

    # now truncate all columns:
    mask = max_separation < match_radius
    columns = [c[mask] for c in columns]
    errors = [e[mask] for e in errors]
    separations = [[cell[mask] for cell in row] for row in separations]
    #results = results[mask]
    resultstable = resultstable[mask,:]
    nresults = len(resultstable)
    logger.log('matching: %6d matches after filtering by search radius' % mask.sum())

    # now we have columns, which contains the distance information.
    assert len(columns) == len(keys), (len(columns), len(keys), keys)
    table = pandas.DataFrame(OrderedDict(zip(keys, columns)))
    assert len(table) == nresults, (len(table), nresults, mask.sum())
    return table, resultstable, separations, errors


def _compute_source_densities(match_tables, logger):
    source_densities = []
    source_densities_plus = []
    for i, match_table in enumerate(match_tables):
        n = len(match_table['ra'])
        area = match_table['area'] * 1.0  # in square degrees
        area_total = (4 * pi * (180 / pi)**2)
        density = n / area * area_total
        logger.log('%s "%s" (%d), density gives %.2e objects on entire sky' % ('Primary catalogue' if i == 0 else 'Catalogue', match_table['name'], n, density))
        # this takes into account that the source may be absent
        density_plus = (n + 1) / area * area_total
        source_densities.append(density)
        source_densities_plus.append(density_plus)
    # source can not be absent in primary catalogue
    source_densities_plus[0] = source_densities[0]
    source_densities_plus = numpy.array(source_densities_plus)
    source_densities = numpy.array(source_densities)
    return source_densities, source_densities_plus


def _compute_single_log_bf(match_tables, source_densities, source_densities_plus, table, separations, errors, prior_completeness, logger):
    logger.log('Computing distance-based probabilities ...')
    ncats = len(match_tables)
    if numpy.shape(prior_completeness) == ():
        prior_completeness = numpy.array([1.0] + [float(prior_completeness)**(1./(ncats-1)) for i in range(1, ncats)])
    if len(prior_completeness) != ncats:
        raise Exception('Prior completeness needs one value per catalog. Received "%s".' % prior_completeness)
    assert prior_completeness[0] == 1.0

    log_bf = numpy.zeros(len(table)) * numpy.nan
    prior = numpy.zeros(len(table)) * numpy.nan
    # handle all cases (also those with missing counterparts in some catalogues)
    for case in range(2**(ncats-1)):
        table_mask = numpy.array([True] + [(case // 2**(ti)) % 2 == 0 for ti in range(len(match_tables)-1)])
        # select those cases
        mask = True
        for i in range(1, len(match_tables)):
            if table_mask[i]:
                # require not nan
                mask = numpy.logical_and(mask, ~numpy.isnan(separations[0][i]))
            else:
                mask = numpy.logical_and(mask, numpy.isnan(separations[0][i]))

        # select errors
        errors_selected = [e[mask] for e, m in zip(errors, table_mask) if m]
        separations_selected = [[cell[mask] for cell, m in zip(row, table_mask) if m] for row, m2 in zip(separations, table_mask) if m2]
        # here we should call the elliptical error variant if errors is a 2d array
        r = bayesdist.log_bf(separations_selected, errors_selected)
        assert r.shape == separations_selected[0][0].shape, (r.shape, separations_selected[0][0].shape)
        assert r.shape == errors_selected[0].shape, (r.shape, errors_selected[0].shape)
        assert r.shape == (mask.sum(),), (r.shape, mask.sum(), mask.shape)

        log_bf[mask] = r
        prior[mask] = source_densities[0] * numpy.prod(prior_completeness[table_mask]) / numpy.prod(source_densities_plus[table_mask])
        assert numpy.isfinite(prior[mask]).all(), (source_densities, prior_completeness[table_mask], numpy.prod(source_densities_plus[table_mask]))

    assert numpy.isfinite(prior).all(), (prior, log_bf)
    assert numpy.isfinite(log_bf).all(), (prior, log_bf)
    return prior, log_bf


def _correct_unrelated_associations(table, separations, errors, ncats, source_densities, source_densities_plus, logger):
    logger.log(' correcting for unrelated associations ...')
    # correct for unrelated associations
    # identify those in need of correction
    # two unconsidered catalogues are needed for an unrelated association
    primary_id = table[table.columns[0]]
    #candidates = numpy.unique(primary_id[(ncat <= ncats - 2).values])
    pbar = logger.progress()
    for primary_id_key, group in pbar(table.groupby(primary_id, sort=False)):
        ncat_here = group['ncat']
        # list which ones we are missing
        best_logpost = 0
        # go through more complex associations
        for j in group[ncat_here > 2].index.values:
            missing_cats = [k for k, sep in enumerate(separations[0]) if numpy.isnan(sep[j])]
            # check if this association has sufficient overlap with the one we are looking for
            # it must contain at least two of the catalogues we are missing
            augmented_cats = []
            for k in missing_cats:
                if not numpy.isnan(separations[0][k][j]):
                    augmented_cats.append(k)
            n_augmented_cats = len(augmented_cats)
            if n_augmented_cats >= 2:
                # ok, this is helpful.
                # identify the separations and errors
                separations_selected = [[[separations[k][k2][j]] for k2 in augmented_cats] for k in augmented_cats]
                errors_selected = [[errors[k][j]] for k in augmented_cats]
                # identify the prior
                prior_j = source_densities[augmented_cats[0]] / numpy.prod(source_densities_plus[augmented_cats])
                # compute a log_bf
                log_bf_j = bayesdist.log_bf(numpy.array(separations_selected), numpy.array(errors_selected))
                logpost_j = bayesdist.unnormalised_log_posterior(prior_j, log_bf_j, n_augmented_cats)
                if logpost_j > best_logpost:
                    #print('post:', logpost_j, log_bf_j, prior_j)
                    best_logpost = logpost_j

        # ok, we have our correction factor, best_logpost
        # lets multiply it onto log_bf
        if best_logpost > 0:
            group['dist_bayesfactor'] += best_logpost


def _apply_magnitude_biasing(match_tables, table, mag_include_radius, mag_exclude_radius, magauto_post_single_minvalue, store_mag_hists, logger):
    biases = {}
    for i, t in enumerate(match_tables):
        table_name = t['name']
        for magvals, maghist, magname in zip(t['mags'], t['maghists'], t['magnames']):
            col_name = magname
            col = "%s_%s" % (table_name, col_name)
            mag = "%s:%s" % (table_name, col_name)
            logger.log('Incorporating bias "%s" ...' % mag)

            res = table[table.columns[i]].values
            res_defined = res != -1
            # get magnitudes of all
            # mark -99 as undefined
            mag_all = magvals
            mag_all[mag_all == -99] = numpy.nan
            # get magnitudes of selected
            mask_all = numpy.isfinite(mag_all)

            if maghist is None:
                if mag_include_radius is not None:
                    selection = table['Separation_max'].values < mag_include_radius
                    selection_possible = table['Separation_max'].values < mag_exclude_radius
                    selection_weights = numpy.ones(len(selection))
                else:
                    selection = (table['dist_post'] > magauto_post_single_minvalue).values
                    selection_weights = table['dist_post'].values
                    selection_possible = (table['dist_post'] > 0.01).values

                # ignore cases where counterpart is missing
                assert res_defined.shape == selection.shape, (res_defined.shape, selection.shape)
                selection = numpy.logical_and(selection, res_defined)
                selection_weights = selection_weights[res_defined]
                selection_possible = numpy.logical_and(selection_possible, res_defined)
                #print ' selection', selection.sum(), selection_possible.sum(), (-selection_possible).sum()

                #rows = results[table_name][selection].tolist()
                assert selection.shape == res.shape, (selection.shape, res.shape)
                rows, unique_indices = numpy.unique(res[selection], return_index=True)
                rows_weights = selection_weights[unique_indices]
                assert len(rows) > 0, 'No magnitude values within radius for "%s".' % mag
                mag_sel = magvals[rows]
                mag_sel_weights = rows_weights

                # remove vaguely possible options from alternative histogram
                assert selection_possible.shape == res.shape, (selection_possible.shape, res.shape)
                #print(res, selection, selection_possible)
                rows_possible = numpy.unique(res[selection_possible])
                mask_others = mask_all.copy()
                mask_others[rows_possible] = False
                # all options in the total (field+target sources) histogram
                mask_sel = ~numpy.logical_or(numpy.isnan(mag_sel), numpy.isinf(mag_sel))
                #print ' non-nans: ', mask_sel.sum(), mask_others.sum()
                logger.log('magnitude histogram of column "%s": %d secure matches, %d insecure matches and %d secure non-matches of %d total entries (%d valid)' % (col, mask_sel.sum(), len(rows_possible), mask_others.sum(), len(mag_all), mask_all.sum()))

                # make function fitting to ratio shape
                bins, hist_sel, hist_all = magnitudeweights.adaptive_histograms(mag_all[mask_others], mag_sel[mask_sel], weights=mag_sel_weights[mask_sel])
                if store_mag_hists:
                    logger.log('magnitude histogram stored to "%s".' % (mag.replace(':', '_') + '_fit.txt'))
                    with open(mag.replace(':', '_') + '_fit.txt', 'wb') as f:
                        f.write(b'# lo hi selected others\n')
                        numpy.savetxt(f, numpy.transpose([bins[:-1], bins[1:], hist_sel, hist_all]), fmt=["%10.5f"] * 4)
                if mask_sel.sum() < 100:
                    raise UndersampledException('ERROR: too few secure matches (%d) to make a good histogram. If you are sure you want to use this poorly sampled histogram, replace "auto" with the filename. You can also decrease the mag-auto-minprob parameter.' % mask_sel.sum())
            else:
                logger.log('magnitude histogramming: using user-supplied histogram for "%s"' % (col))
                bins_lo, bins_hi, hist_sel, hist_all = maghist
                bins = numpy.array(list(bins_lo) + [bins_hi[-1]])

            func = magnitudeweights.fitfunc_histogram(bins, hist_sel, hist_all)
            if store_mag_hists:
                magnitudeweights.plot_fit(bins, hist_sel, hist_all, func, mag)
            magcol = magvals[res]
            magcol[~numpy.logical_and(res_defined, numpy.isfinite(magcol))] = -99
            weights = log10(func(magcol))
            # undefined magnitudes do not contribute
            weights[numpy.isnan(weights)] = 0
            biases[col] = weights

    # add the bias columns
    table = table.assign(**{'bias_%s' % col: 10**weights for col, weights in biases.items()})
    log_bf = table['dist_bayesfactor'].values
    total = log_bf + sum(biases.values())
    return table, total


def _compute_final_probabilities(match_tables, table, prob_ratio_secondary, prior, total, logger):
    logger.log('')
    logger.log('Computing final probabilities ...')

    # add the posterior column
    post = bayesdist.posterior(prior, total)
    table = table.assign(p_single=post)

    # compute weights for group posteriors
    # 4pi comes from Eq.
    ncat = table['ncat'].values
    log_post_weight = bayesdist.unnormalised_log_posterior(prior, total, ncat)
    table = table.assign(log_post_weight=log_post_weight)

    # flagging of solutions. Go through groups by primary id (IDs in first catalogue)
    table = table.assign(
        match_flag=numpy.zeros(len(table), dtype=int),
        prob_has_match=numpy.zeros(len(table)),
        prob_this_match=numpy.zeros(len(table)),
    )

    logger.log(' grouping by primary catalogue ID and flagging ...')

    def compute_group_statistics(group):
        # group
        # compute no-match probability
        values = group['log_post_weight'].values
        #values = grpvalues.values
        offset = values.max()
        bfsum = log10((10**(values - offset)).sum()) + offset
        if len(values) > 1:
            offset = values[1:].max()
            bfsum1 = log10((10**(values[1:] - offset)).sum()) + offset
        else:
            bfsum1 = 0
        assert group['ncat'].values[0] == 1, group['ncat']
        #print(group['ncat'].values[0])

        # for p_any, find the one without counterparts
        p_none = values[0]
        p_any = 1 - 10**(p_none - bfsum)
        # this avoids overflows in the no-counterpart solution,
        # which we want to set to 0
        values[0] = bfsum1
        p_i = 10**(values - bfsum1)
        p_i[0] = 0
        best_val = p_i.max()
        # flag best & second best
        # ignore very poor solutions
        match_flag = numpy.where(best_val == p_i, 1, numpy.where(p_i > prob_ratio_secondary * best_val, 2, 0))

        group['prob_has_match'] = p_any
        group['prob_this_match'] = p_i
        group['match_flag'] = match_flag
        return group

    table = table.groupby(table.columns[0], sort=False).apply(compute_group_statistics)
    del table['log_post_weight']
    return table


def _truncate_table(table, min_prob, logger):
    # cut away poor posteriors if requested
    if min_prob > 0:
        mask = ~(table['prob_this_match'] < min_prob)
        logger.log(' cutting away %d (below p_i minimum)' % (len(mask) - mask.sum()))
        table = table[mask]
    return table
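
# Illustrative post-processing sketch (not part of the library source): the
# DataFrame returned by nway_match() carries the columns assigned above
# ('match_flag', 'prob_has_match', 'prob_this_match'); the probability
# thresholds below are arbitrary examples, not recommended defaults.
def _example_select_secure(result):
    # keep only the most probable association of each primary source ...
    best = result[result['match_flag'] == 1]
    # ... and require both that a counterpart likely exists at all and that this
    # particular association carries most of that probability
    return best[(best['prob_has_match'] > 0.9) & (best['prob_this_match'] > 0.8)]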