Source code for nwaylib.bayesdistance

"""
Probabilistic Cross-Identification of Astronomical Sources

Reference: Budavari & Szalay (2008), ApJ, 679:301-309
Authors: Johannes Buchner (C) 2013-2020
Authors: Tamas Budavari (C) 2012
"""
from __future__ import division, print_function

import numpy
from numpy import e, log, log10, pi

# use base 10 everywhere

log_arcsec2rad = log(3600 * 180 / pi)


[docs] def log_posterior(prior, log_bf): """ Returns log10 posterior probability normalised against alternative hypothesis that the sources are unrelated (Budavari+08) """ return -log10(1 + (1 - prior) * 10**(-log_bf - log10(prior)))
[docs] def posterior(prior, log_bf): """ Returns posterior probability normalised against alternative hypothesis that the sources are unrelated (Budavari+08) """ with numpy.errstate(over='ignore'): return 1. / (1 + (1 - prior) * 10**(-log_bf - log10(prior)))
[docs] def unnormalised_log_posterior(prior, log_bf, ncat): """ Returns posterior probability (without normalisation against alternatives) """ return log_bf + log10(prior)
[docs] def log_bf2(psi, s1, s2): """ log10 of the 2-way Bayes factor, see eq.(16) psi separation s1 and s2=accuracy of coordinates """ s = s1 * s1 + s2 * s2 return (log(2) + 2 * log_arcsec2rad - log(s) - psi * psi / 2 / s) * log10(e)
[docs] def log_bf3(p12,p23,p31, s1,s2,s3): """ log10 of the 3-way Bayes factor, see eq.(17) """ ss1 = s1*s1 ss2 = s2*s2 ss3 = s3*s3 s = ss1*ss2 + ss2*ss3 + ss3*ss1 q = ss3 * p12**2 + ss1 * p23**2 + ss2 * p31**2 return (log(4) + 4 * log_arcsec2rad - log(s) - q / 2 / s) * log10(e)
[docs] def log_bf(p, s): """ log10 of the multi-way Bayes factor, see eq.(18) p: separations matrix (NxN matrix of arrays) s: errors (list of N arrays) """ n = len(s) # precision parameter w = 1/sigma^2 w = [numpy.asarray(si, dtype=float)**-2. for si in s] norm = (n - 1) * log(2) + 2 * (n - 1) * log_arcsec2rad del s wsum = numpy.sum(w, axis=0) slog = numpy.sum(log(w), axis=0) - log(wsum) q = 0 for i, wi in enumerate(w): for j, wj in enumerate(w): if i < j: q += wi * wj * p[i][j]**2 exponent = - q / 2 / wsum return (norm + slog + exponent) * log10(e)
# vectorized in the following means that many 2D matrices/vectors are going to be handled. # i.e., each entry in the matrix or vector, is a vector of numbers.
[docs] def assert_possemdef(M): """Check that the 2x2 matrix M is positive semi-definite, vectorized""" tr = M[0][0] + M[1][1] det = M[0][0] * M[1][1] - M[0][1] * M[0][1] mask = numpy.isclose(tr**2, 4 * det) if mask.all(): return # bad square root: mask_badsqrt = numpy.logical_and(~mask, tr**2 < 4 * det) assert not numpy.any(mask_badsqrt), (tr[mask_badsqrt], det[mask_badsqrt], M) ev1 = (tr[~mask] + (tr[~mask]**2 - 4 * det[~mask])**0.5) / 2 ev2 = (tr[~mask] - (tr[~mask]**2 - 4 * det[~mask])**0.5) / 2 assert (ev1 >= 0).all(), ("EV1:", ev1[~(ev1 > 0)], tr[~mask][~(ev1 > 0)], det[~mask][~(ev1 > 0)], M) assert (ev2 >= 0).all(), ("EV2:", ev2[~(ev2 > 0)], tr[~mask][~(ev2 > 0)], det[~mask][~(ev2 > 0)], M)
[docs] def matrix_add(A, B): """ 2D matrix addition, vectorized """ # assert_possemdef(A) # assert_possemdef(B) (a11, a12), (a21, a22) = A (b11, b12), (b21, b22) = B M = (a11 + b11, a12 + b12), (a21 + b21, a22 + b22) # assert_possemdef(M) return M
[docs] def matrix_multiply(A, B): """ 2D matrix multiplication, vectorized """ (a11, a12), (a21, a22) = A (b11, b12), (b21, b22) = B M = (a11*b11+a12*b21, a11*b12+a12*b22), (a21*b11+a22*b21, a21*b12+a22*b22) return M
[docs] def matrix_invert(A): """ invert a matrix A, vectorized """ (a, b), (c, d) = A F = 1.0 / (a * d - c * b) assert (F > 0).all() return (F * d, -F * b), (-F * c, F * a)
[docs] def matrix_det(A): """ 2D matrix determinant, vectorized """ (a, b), (c, d) = A return a * d - b * c
[docs] def apply_vector_right(A, b): """ multiply 2D vector with 2D matrix, vectorized """ (a11, a12), (a21, a22) = A (b1, b2) = b return a11*b1+a12*b2, a21*b1+a22*b2
[docs] def apply_vector_left(a, B): """ multiply 2D matrix with 2D vector, vectorized """ a1, a2 = a (b11, b12), (b21, b22) = B return a1*b11+a2*b21, a1*b12+a2*b22
[docs] def vector_multiply(a, b): """ multiply two vectors, vectorized """ a1, a2 = a b1, b2 = b return a1*b1+a2*b2
[docs] def vector_normalised(v): vnorm = (v[0]**2 + v[1]**2)**0.5 return ( numpy.where(vnorm == 0, 2**-0.5, v[0] / (vnorm + 1e-300)), numpy.where(vnorm == 0, 2**-0.5, v[1] / (vnorm + 1e-300)), )
[docs] def apply_vABv(v, A, B): """ compute v^T A B v, vectorized """ return vector_multiply(v, apply_vector_right(matrix_add(A, B), v))
#return vector_multiply(apply_vector_left(v, A), apply_vector_right(B, v))
[docs] def make_covmatrix(sigma_x, sigma_y, rho=0): """ create covariance matrix from given standard deviations and normalised correlation rho, vectorized """ return (sigma_x**2, rho * sigma_x * sigma_y), (rho * sigma_x * sigma_y, sigma_y**2)
[docs] def make_invcovmatrix(sigma_x, sigma_y, rho=0): """ create inverse covariance matrix from given standard deviations and normalised correlation rho, vectorized """ F = 1.0 / (sigma_x**2 * sigma_y**2 * (1 - rho**2)) return (F * sigma_y**2, F * -rho * sigma_x * sigma_y), \ (F * -rho * sigma_x * sigma_y, F * sigma_x**2)
[docs] def convert_from_ellipse(a, b, phi): """ create covariance parameters from ellipse major axis a, minor axis b and angle phi; vectorized as in Pineau+16, eq 8-10, for example. """ a2 = a**2 b2 = b**2 s = numpy.sin(phi) c = numpy.cos(phi) s2 = s**2 c2 = c**2 sigma_x = (a2 * s2 + b2 * c2)**0.5 sigma_y = (a2 * c2 + b2 * s2)**0.5 rho = c * s * (a2 - b2) / (sigma_x * sigma_y) return sigma_x, sigma_y, rho
[docs] def log_bf_elliptical(separations_ra, separations_dec, pos_errors): """ log10 of the multi-way Bayes factor, see eq.(18) separations_ra: RA separations matrix (NxN matrix of arrays) separations_dec: DEC separations matrix (NxN matrix of arrays) pos_errors: errors (list of (N,3) arrays) giving sigma_RA, sigma_DEC, rho """ error_matrices = [make_invcovmatrix(si, sj, rho) for si, sj, rho in pos_errors] circ_pos_errors = [((si**2 + sj**2) / 2)**0.5 for si, sj, rho in pos_errors] new_separations = [[None for j in range(len(error_matrices))] for i in range(len(error_matrices))] for i, Mi in enumerate(error_matrices): for j, Mj in enumerate(error_matrices): if i < j: v = (separations_ra[i][j], separations_dec[i][j]) # get separation length d2 = vector_multiply(v, v) d = d2**0.5 # get error in direction of separation vnormed = vector_normalised(v) wi = vector_multiply(apply_vector_left(vnormed, Mi), vnormed) wj = vector_multiply(apply_vector_left(vnormed, Mj), vnormed) # ratio of circular error to directional error: # combine the uncertainties by adding the variances dist_ratio = (circ_pos_errors[i]**2 + circ_pos_errors[j]**2) / (1/wi + 1/wj) # provide new separation new_separations[i][j] = d * dist_ratio**-0.5 return log_bf(new_separations, circ_pos_errors)