Source code for mt


import healpy as hp
import numpy as np
import matplotlib.pyplot as plt
from scipy.special import gamma as gafun
from scipy.stats import norm
import scipy.integrate as integrate
# import os
# import warnings
# import subprocess
import pandas as pd
# import numpy.ma as ma

 
[docs]def f_fromks(k1,k2): '''Get the theoretical maxima distribution f, from the values of k_1, k_2. For more information, see [1]_ or [2]_ (in the latter, k_1 corresponds to (\kappa_j)^2 and k_2 to (\eta_j)^2). Parameters ---------- k1 : float The parameter k_1. k2 : float The parameter k_2. Returns ------- function Theoretical distribution of the maxima, f. .. [1] Cheng, D., Cammarota, V., Fantaye, Y., Marinucci, D., & Schwartzman, A. (2016). Multiple testing of local maxima for detection of peaks on the (celestial) sphere. *Bernoulli, in press*, arXiv preprint arXiv:1602.08296. .. [2] Carron Duque, J., Buzzelli, A., Fantaye, Y., Marinucci, D., Schwartzman, A., & Vittorio, N. (2019). Point Source Detection and False Discovery Rate Control on CMB Maps. https://doi.org/10.1016/j.ascom.2019.100310 . ''' def f(x): fx=((2.*np.sqrt(3.+k1))/(2.+k1*np.sqrt(3.+k1))* ((k1+k2*(x**2.-1.))*norm.pdf(x)*norm.cdf((x*np.sqrt(k2))/(np.sqrt(2.+k1-k2))) + np.sqrt(k2*(2.+k1-k2))/(2.*np.pi)*x*np.exp((-(2.+k1)*x**2.)/(2.*(2.+k1-k2))) + np.sqrt(2./(np.pi*(3.+k1-k2)))*np.exp((-(3.+k1)*x**2.)/(2.*(3.+k1-k2)))* norm.cdf((np.sqrt(k2)*x)/np.sqrt((2.+k1-k2)*(3.+k1-k2))) )) return(fx) return(f)
[docs]def f_fromcl(cls): '''Get the theoretical maxima distribution f, from the angular power spectra C_l of a map. For more information, see [1]_ or [2]_ (in the latter, k_1 corresponds to (\kappa_j)^2 and k_2 to (\eta_j)^2). Parameters ---------- cls : np.ndarray Angular power spectrum of the map. Returns ------- function Theoretical distribution of the maxima, f. .. [1] Cheng, D., Cammarota, V., Fantaye, Y., Marinucci, D., & Schwartzman, A. (2016). Multiple testing of local maxima for detection of peaks on the (celestial) sphere. *Bernoulli, in press*, arXiv preprint arXiv:1602.08296. .. [2] Carron Duque, J., Buzzelli, A., Fantaye, Y., Marinucci, D., Schwartzman, A., & Vittorio, N. (2019). Point Source Detection and False Discovery Rate Control on CMB Maps. https://doi.org/10.1016/j.ascom.2019.100310 . ''' ls=np.arange(len(cls)) c1=np.sum((2.*ls+1.)*ls*(ls+1.)/(8.*np.pi)*cls) c2=np.sum((2.*ls+1.)*ls*(ls+1.)*(ls-1.)*(ls+2.)/(32.*np.pi)*cls) k1=c1/c2 k2=c1**2./c2 f=f_fromks(k1,k2) return(f)
def _getcs(gamma,n,p=1.): '''Returns c_{p,2n}(gamma) ''' return(2.**(gamma/2. - 2. - n - 2.*p) * gafun(1.- gamma/2. + n +2.*p))
[docs]def f_fromSW(j,B,gamma=2.5,p=1): '''Get the theoretical maxima distribution for a Sachs-Wolfe-like spectra filtered with a Mexican needlet. For more information, see [1]_. Parameters ---------- j : float Frequency ``j`` of the Mexican needlet B : float Parameter ``B`` of the Mexican needlet. gamma : float, optional Parameter ``gamma`` for the Sach-Wolfe spectra. In the CMB, 2 < gamma < 3. p : int, optional Order of the Mexican needlet Returns ------- function Theoretical distribution of the maxima, f. .. [1] Cheng, D., Cammarota, V., Fantaye, Y., Marinucci, D., & Schwartzman, A. (2016). Multiple testing of local maxima for detection of peaks on the (celestial) sphere. *Bernoulli, in press*, arXiv preprint arXiv:1602.08296. ''' cp0=_getcs(gamma,0.,p) cp2=_getcs(gamma,1.,p) cp4=_getcs(gamma,2.,p) k1=4.*(cp2/cp4)*(B**(-2.*j)) k2=2.*(cp2)**2./(cp0*cp4) f=f_fromks(k1,k2) return(f)
def _pvalue(x,f,**kwargs): '''Not vectorised version, accept only one x''' return(integrate.quad(f,x,np.inf,**kwargs))
[docs]def pvalues(xvec,f,returnerror=False,**kwargs): '''Calculate the p-values for a certain maxima distribution f, of diferent values of the intensity. Parameters ---------- xvec : np.ndarray Array with the intensities where the p-values will be calculated. f : function Theoretical distribution of the maxima, f. returnerror : bool, optional If ``True``, the output will contain both the p-value and the error of the integration. **kwargs : dict Additional arguments will be passed to np.integrate.quad for the calculation of the p-value. Returns ------- np.ndarray Array containing the p-values calculated. If ``returnerror`` is ``True``, it will also containg the error of the integration. ''' pvals=np.vectorize(_pvalue) if returnerror == True: return(pvals(xvec,f,**kwargs)) else: return(pvals(xvec,f,**kwargs)[0])
def _max_getpvalue_exact(vec_max,f,**kwargs): '''Calculate the exact p-values for the given intensities.''' return(pvalues(vec_max,f,**kwargs)) def _max_getpvalue_approx(vec_max,f,step=0.05,**kwargs): '''Calculate the approximated p-values for the given intensities. It interpolates from exact values calculated at intervals of ``step``.''' xvec=np.arange(vec_max.min()-step,vec_max.max()+step,step) f_inxvec=pvalues(xvec,f,**kwargs) approx_pvalues=np.interp(vec_max,xvec,f_inxvec) return(approx_pvalues)
[docs]def max_getpvalue(maxima,f,n_exact=1000,step=0.05,correct=True,**kwargs): '''Get the p-values for the maxima for a given expected distribution. Parameters ---------- maxima : pd.DataFrame Pandas DataFrame with the information of the maxima. Has to contain at least the column 'Intensity'. f : function Theoretical distribution of the maxima, f. n_exact : int, optional Number of maxima where the p-value is computed directly. After the ``n_exact`` most intense values, the p-values are computed in an array with step ``step`` and then interpolated. This introduces a small error in the computation but greatly speeds up the procedure. step : float, optional Step of the array where the p-values are exactly computed after the first ``n_exact`` maxima. correct : bool, optional If ``True``, check that the pvalues are decreasing with intensity and, if it is not the case, change the value accordingly. Returns ------- pd.DataFrame The same Pandas DataFrame as the input ``maxima``, but with an additional column ``pvalues``. ''' maxima=maxima.sort_values(by='Intensity',ascending=False) max_exact=maxima.iloc[:n_exact] max_approx=maxima.iloc[n_exact:] pval_exact=_max_getpvalue_exact(max_exact['Intensity'],f,**kwargs) pval_approx=_max_getpvalue_approx(max_approx['Intensity'],f,step=step,**kwargs) pvals=np.concatenate((pval_exact,pval_approx)) maxima['pvalue']=pvals if correct: for icorrect in np.argwhere(maxima['pvalue'].diff()<0.)[:,0]: #correct the pvalues maxima['pvalue'].iloc[icorrect]=maxima['pvalue'].iloc[icorrect-1] return(maxima)
[docs]def benjamini_hochberg(maxima,alpha,plot=False): '''Select a subset of the maxima to be candidates to Point Source. It applies the procedure in multiple testing called Benjamini-Hochberg procedure. Parameters ---------- maxima : pd.DataFrame Pandas DataFrame with the information of the maxima. Has to contain at least the column 'pvalue'. alpha : float Parameter alpha of the Benjamini-Hochberg procedure. Should be between 0 and 1. plot : bool, optional If ``True``, the function prints the expected number of sources for the intensities of the maxima versus the actual number of maxima at that intensity, along with the threshold (expected*alpha). Two plots are given: one with all the detections plus 5; and one with all the maxima. Returns ------- pd.DataFrame The same Pandas DataFrame as the input ``maxima``, but only with the candidates to be point sources. ''' if 'pvalue' not in maxima: raise ValueError('maxima does not have a "pvalue" column, try running max_getpvalue(maxima,f) to calculate it.') pval=maxima['pvalue'] expect=pval*pval.size limit=np.arange(1,pval.size+1)*alpha if np.sum(expect<limit) == 0: detect=0 else: detect=np.argwhere(expect<limit)[-1][0]+1 print(f'{detect} points have been reported as detections (alpha={alpha})') if plot == True: fig,[ax1,ax2]=plt.subplots(1,2,figsize=(10,5)) ax1.plot(np.arange(detect+5)+1,expect[:detect+5],label='Measured') ax1.plot(np.arange(detect+5)+1,limit[:detect+5],label=f'Expected * {alpha}') lim1y=ax1.get_ylim() lim1x=ax1.get_xlim() ax1.plot(np.arange(detect+6),np.arange(detect+6),'--',color='grey',linewidth=1,label='Expected') ax1.set_ylim(lim1y) ax1.set_xlim(lim1x) ax1.legend() ax2.plot(np.arange(pval.size)+1,expect,label='Measured') ax2.plot(np.arange(pval.size)+1,limit,label=f'Expected * {alpha}') lim2y=ax2.get_ylim() lim2x=ax2.get_xlim() ax2.plot(np.arange(pval.size+1),np.arange(pval.size+1),'--',color='grey',linewidth=1,label='Expected') ax2.set_ylim(lim2y) ax2.set_xlim(lim2x) ax2.legend() plt.tight_layout() return(maxima[:detect])