import pandas
from scipy.spatial import cKDTree
import numpy as np
import pickle
from astropy.io import fits
from astropy.table import Table
from math import sin, cos, pi, sqrt, log
from scipy.sparse import coo_matrix
from scipy.stats import norm as normdist
import healpy
import sys, os
"""
a note on who rosat-erosita scaling was obtained
significant amount of rosat sources in rxs2 catalog have accumulated count below 10 photons,
therefore poisson statisitcs has to be applied for fitting.
first 5000 erosita sources were crossmatched with 2rxs, 1365 counterparts were found
along which 744 were one-to-one counterparts
we cat that to 596 sources, by excluding 0-0.1 and 0.9-1.0 quantiles of rxs2.flux/erosita.flux distribution
(we assume that those quantiles contain significant fraction of transients and small fraction of statistical outliers.
for the rest of the sources we minimized lkl function L = -ln \prod_i \int N(x, eflux_i, esigma_i)*Poisson(rasscounts_i, x*rassexposure_i) dx
thus coefficient erosita.flux = 5.73*rxs2.countrate was obtained
"""
FSCALES = {"3XMM": 1.24e+12, "2rxs": 5.74, "sxps2": 16.73, "csc2": 2.3e3}
CFLUX = {"3XMM": lambda x: (x.EP_1_FLUX + x.EP_2_FLUX + x.EP_3_FLUX).values, "2rxs": lambda x: x.RATE.values, "sxps2": lambda x: (x.Rate_band0 + x.Rate_band1 + x.Rate_band2).values}
EFLUX = {"3XMM": lambda x: np.sqrt(x.EP_1_FLUX_ERR**2. + x.EP_2_FLUX_ERR**2. + x.EP_3_FLUX_ERR**2.).values, "2rxs": lambda x: x.ERATE.values, "sxps2": lambda x: (x.Rate_band0_pos + x.Rate_band1_pos + x.Rate_band2_pos).values}
CATALOGUES = {"3XMM": "xmm3_2.pickle", "2rxs": "rxs2table.pickle", "sxps2": "sxps2.pickle", }
CATCOLORS = {"3XMM": "r", "2rxs": "g", "sxps2": "b", "csc2": "m"}
RASSEMAP = None
def load_rassemap():
global RASSEMAP
if RASSEMAP is None:
RASSEMAP = healpy.read_map("/data/rosat/RASS_EXP_1_2048.fits")
def vectopol(cat1):
    """
    from a catalogue in the form of a pandas table, which has RA and DEC columns
    with values in degrees, produces unit vectors in cartesian space

    Parameters:
        catalogue with RA and DEC columns in degrees
    Returns:
        array of shape Nx3 where N is the length of the catalogue; the columns
        contain the X, Y and Z cartesian coordinates
    """
    vec = np.empty((cat1.index.size, 3), np.double)
    vec[:, 0] = np.cos(cat1.DEC.values*pi/180.)*np.sin(cat1.RA.values*pi/180.)
    vec[:, 1] = np.cos(cat1.DEC.values*pi/180.)*np.cos(cat1.RA.values*pi/180.)
    vec[:, 2] = np.sin(cat1.DEC.values*pi/180.)
    return vec
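# A quick usage sketch (hypothetical coordinates): every row of the result is a
# unit vector, so np.sum(vec**2., axis=1) is ~1 for each source.
def _example_vectopol():
    cat = pandas.DataFrame({"RA": [10.68, 83.82], "DEC": [41.27, -5.39]})
    return vectopol(cat)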
def prepare_ecat(ecat):
    ecat["error_radius"] = ecat.RADEC_ERR
    ecat["flux"] = ecat.ML_FLUX_0
    return makehealpixindex(ecat)
def pairwise_separation(cat1, cat2):
    """
    given two pandas catalogues, produces one-to-one distances in arcseconds
    based on the RA, DEC coordinates of each entry
    """
vec1 = vectopol(cat1)
vec2 = vectopol(cat2)
return np.arccos(np.minimum(np.sum(vec1*vec2, axis=1), 1))*180.*3600./pi
def makedistmat3d(cat1, cat2, angularseparation=10.):
    """
    produces a distance matrix between two catalogues; each must have RA and DEC columns
    Parameters:
        cat1 and cat2: two catalogues in the form of pandas tables, which have RA and DEC columns in degrees
        angularseparation: maximum accepted angular separation between objects in the catalogues, in arcseconds
    Returns:
        sparse matrix of the separations lower than angularseparation between the two input catalogues
    """
vec1 = vectopol(cat1)
vec2 = vectopol(cat2)
dd = 2.*sin(angularseparation*pi/180./3600./2.)
c1 = cKDTree(vec1)
c2 = cKDTree(vec2)
dmat = c1.sparse_distance_matrix(c2, dd).tocoo()
dmat.data = 2.*np.arcsin(dmat.data/2.)*180./pi*3600. #distance in arcseconds
sidx = np.argsort(dmat.row)
return coo_matrix((dmat.data[sidx], (dmat.row[sidx], dmat.col[sidx])), dmat.shape)
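# Usage sketch (hypothetical catalogues): the returned COO matrix is row-indexed
# by cat1 and column-indexed by cat2, with separations stored in arcseconds.
def _example_makedistmat3d():
    cat1 = pandas.DataFrame({"RA": [10.0, 20.0], "DEC": [0.0, 0.0]})
    cat2 = pandas.DataFrame({"RA": [10.001, 50.0], "DEC": [0.0, 0.0]})
    dmat = makedistmat3d(cat1, cat2, angularseparation=10.)
    # only the (0, 0) pair survives the 10" cut, at ~3.6 arcsec
    return dmat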
def get_closest_unique(cat1, cat2, angularseparation=5):
dmat = makedistmat3d(cat1, cat2, angularseparation)
idxd = np.argsort(dmat.data)
row = np.copy(dmat.row[idxd])
col = np.copy(dmat.col[idxd])
urow, uidx = np.unique(row, return_index=True)
ucol, uuidx, ucts = np.unique(col[uidx], return_counts=True, return_index=True)
    masku = np.zeros(urow.size, bool)
masku[uuidx] = True
urow, ucol = urow[masku], col[uidx][masku] #one to one closest neighbours
return urow, ucol
def select_closest_pairs(dist, row, col):
sidx = np.argsort(dist)
dist = dist[sidx]
row = row[sidx]
col = col[sidx]
ur, uc = [], []
while True:
urow, uidx = np.unique(row, return_index=True)
ucol, uuidx, ucts = np.unique(col[uidx], return_counts=True, return_index=True)
        masku = np.zeros(urow.size, bool)
masku[uuidx] = True
urow, ucol = urow[masku], col[uidx][masku] #one to one closest neighbours
ur.append(urow)
uc.append(ucol)
maskrow = np.isin(row, urow, invert=True)
maskcol = np.isin(col, ucol, invert=True)
mask = maskrow & maskcol
if not np.any(mask):
break
row, col, dist = row[mask], col[mask], dist[mask]
    return np.concatenate(ur).astype(int), np.concatenate(uc).astype(int)
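# The loop above performs an iterative one-to-one assignment: at each pass every
# remaining row keeps its closest column, duplicate column claims are resolved,
# and matched rows/columns are removed before the next pass. A small sketch:
def _example_select_closest_pairs():
    dist = np.array([0.5, 1.0, 2.0])
    row = np.array([0, 1, 0])
    col = np.array([0, 1, 1])
    # pairs (0, 0) and (1, 1) are kept; the farther (0, 1) candidate is dropped
    return select_closest_pairs(dist, row, col)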
def get_all_closest_unique(cat1, cat2, asep):
    dmat = makedistmat3d(cat1, cat2, asep)
    return select_closest_pairs(dmat.data, dmat.row, dmat.col)
def get_closest(scat, ccat):
ccat.error_radius.fillna(0., inplace=True)
dmat = makedistmat3d(scat, ccat, 1200)
sidx = np.argsort(dmat.data)
urow, uidx = np.unique(dmat.row[sidx], return_index=True)
ucol = dmat.col[sidx][uidx]
return urow, ucol
def cutfaraway(cat1, cat2, dmat, minrad=30, exclrad=0.):
    """
    given a distance matrix and two catalogues, drops all matrix elements which are
    greater than three times the combined positional error of the corresponding
    catalogue entries (or minrad, whichever is larger);
    the catalogues should have an error_radius column for this function to work
    """
mask = (dmat.data < np.maximum(np.sqrt(cat1.iloc[dmat.row].error_radius.values**2. + cat2.iloc[dmat.col].error_radius.values**2.)*3, minrad)) & (dmat.data > exclrad)
dmat = coo_matrix((dmat.data[mask], (dmat.row[mask], dmat.col[mask])), dmat.shape)
return dmat
def makeposerrdist(cat1, cat2, maxdist=None, minrad=30, **kwargs):
    """
    assuming a 2d gaussian probability distribution of the true source position
    around the catalogue position, produces a distance matrix cut at three times
    the combined positional error (see cutfaraway)
    """
    if maxdist is None:
        maxdist = max(min(sqrt(cat1.error_radius.dropna().values.max()**2. + cat2.error_radius.dropna().values.max()**2.)*3, 1200), minrad) #search radius not more than 20'
        print("used maxdist", maxdist)
dmat = makedistmat3d(cat1, cat2, maxdist)
return cutfaraway(cat1, cat2, dmat, minrad, **kwargs)
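# Usage sketch (hypothetical positional errors, in arcseconds): a 7.2" pair well
# within the combined 3-sigma error (and the 30" floor) is kept, the distant one is cut.
def _example_makeposerrdist():
    cat1 = pandas.DataFrame({"RA": [10.0], "DEC": [0.0], "error_radius": [5.]})
    cat2 = pandas.DataFrame({"RA": [10.002, 10.1], "DEC": [0.0, 0.0], "error_radius": [5., 5.]})
    return makeposerrdist(cat1, cat2)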
def dmattodistprobmat(cat1, cat2, dmat):
pmat = coo_matrix((normdist.pdf(dmat.data/np.sqrt(cat1.iloc[dmat.row].error_radius.values**2. + cat2.iloc[dmat.col].error_radius.values**2.)), (dmat.row, dmat.col)), dmat.shape)
return pmat
def makehealpixindex(cat, nside=1024, overwrite=False):
"""
produces healpix index for catalogue
parameteres:
cat [mandatory]: a catalogue containing RA and DEC columns in degrees
nside : nside for healpix
overwrite[True]: overwrite healpix indexation if exists
"""
if not overwrite and isinstance(cat.index, pandas.MultiIndex) and "healpix" in cat.index.names:
print("already has healpix indexing")
return cat
cat.reset_index(drop=True, inplace=True)
hpix = healpy.ang2pix(nside, cat.RA.values, cat.DEC.values, lonlat=True)
cat.set_index(hpix, inplace=True)
cat.sort_index(inplace=True)
hpix = np.copy(cat.index.values)
cat.set_index(pandas.MultiIndex.from_arrays(
[hpix, np.arange(cat.index.size)],
sortorder=0, names=["healpix", "Num"]), inplace=True)
cat.sort_index(inplace=True)
return cat
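# Usage sketch: after indexing, the catalogue carries a (healpix, Num) MultiIndex,
# so all sources in a given healpix pixel can be selected via .loc.
def _example_makehealpixindex():
    cat = pandas.DataFrame({"RA": [10.0, 10.01, 250.0], "DEC": [0.0, 0.0, 60.0]})
    cat = makehealpixindex(cat, nside=1024)
    pix = healpy.ang2pix(1024, 10.0, 0.0, lonlat=True)
    return cat.loc[[pix]]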
def merge_coo_matrixes(dmatlist):
"""
assuming that the are several fraction of sparse matrix produces common sparse distance matrix
"""
rows = np.concatenate([d.row for d in dmatlist])
cols = np.concatenate([d.col for d in dmatlist])
data = np.concatenate([d.data for d in dmatlist])
return coo_matrix((data, (rows, cols)), shape=np.max([d.shape for d in dmatlist], axis=0))
def distwithhealpix(cat1, cat2, nside=1024, maxarea=20, maxcsize=100000, **kwargs):
"""
find counterparts of cat1 relative to cat2 using healpixes
"""
print("kwargs", kwargs)
hpix = np.unique(cat1.index.get_level_values("healpix").values)
hpix2 = np.unique(cat2.index.get_level_values("healpix").values)
invert = False
if hpix2.size < hpix.size:
cat1, cat2 = cat2, cat1
invert = True
hpix = hpix2
print("run distwithhealpix, unique pix size in first catalogue", hpix.size)
catarea = hpix.size*healpy.nside2pixarea(nside)
print("catalogue surface area", catarea)
    if catarea > maxarea or hpix.size > maxcsize:
        print("catalogue area is greater than the maximum allowed")
        chunks = int(max(catarea//maxarea + 1, hpix.size//maxcsize + 1))
        print("split into %d parts" % chunks)
        step = (hpix.size + (chunks - 1))//chunks
        dmats = []
        for i in range(chunks):
            dmats.append(distwithhealpix(cat1.loc[hpix[i*step:min((i + 1)*step, hpix.size)]],
                                         cat2, nside, maxarea + 1, **kwargs))
        return merge_coo_matrixes(dmats)
    hpix = np.unique(np.concatenate([hpix, healpy.get_all_neighbours(nside, hpix).ravel()]))
catsm = cat2.loc[hpix]
dmat = makeposerrdist(cat1, catsm, **kwargs)
rows = cat1.index.get_level_values("Num").values[dmat.row]
cols = catsm.index.get_level_values("Num").values[dmat.col]
if invert:
rows, cols = cols, rows
sidx = np.argsort(rows)
dmat = coo_matrix((dmat.data[sidx], (rows[sidx], cols[sidx])), shape = (rows.max() + 1, cols.max() + 1))
return dmat
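# Typical use (a sketch): both catalogues must already carry the (healpix, Num)
# MultiIndex produced by makehealpixindex; the resulting COO matrix is indexed by
# the "Num" level of each catalogue, with separations in arcseconds:
#     cat1 = makehealpixindex(cat1)
#     cat2 = makehealpixindex(cat2)
#     dmat = distwithhealpix(cat1, cat2, minrad=30)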
def get_brightest_counterpart(cat1, cat2, dmat, axis=0):
if axis == 0:
fluxcat2 = coo_matrix((cat2.flux.values[dmat.col], (dmat.row, dmat.col)), dmat.shape)
urow, idx1 = np.unique(dmat.row, return_index=True)
fmcol = np.asarray(np.argmax(fluxcat2, axis=1)[urow]).reshape(-1)
return urow, fmcol
if axis == 1:
fluxcat1 = coo_matrix((cat1.flux.values[dmat.row], (dmat.row, dmat.col)), dmat.shape)
ucol = np.unique(dmat.col)
fmrow = np.asarray(np.argmax(fluxcat1, axis=0)[:, ucol]).reshape(-1)
return ucol, fmrow
def get_brighter_counterpart(cat1, cat2, dmat):
fluxcat1 = coo_matrix((cat1.flux.values[dmat.row], (dmat.row, dmat.col)), dmat.shape)
fluxcat2 = coo_matrix((cat2.flux.values[dmat.col], (dmat.row, dmat.col)), dmat.shape)
urow = np.unique(dmat.row)
fmcol = np.unique(np.asarray(np.argmax(fluxcat1, axis=1)[urow]).reshape(-1))
fmrow = np.asarray(np.argmax(fluxcat2, axis=0)[:, fmcol]).reshape(-1)
return fmrow, fmcol
def get_closest_counterpart(dmat, axis=0):
dmati = coo_matrix((1./np.maximum(dmat.data, 1e-9), (dmat.row, dmat.col)), dmat.shape)
if axis==0:
fmrow = np.unique(dmat.row)
fmcol = np.asarray(np.argmax(dmati, axis=1)[fmrow]).reshape(-1)
if axis==1:
fmcol = np.unique(dmat.col)
fmrow = np.asarray(np.argmax(dmati, axis=0)[:, fmcol]).reshape(-1)
return fmrow, fmcol
def coo_mat_argmax(coomat, axis=0):
if axis==0:
fmrow = np.unique(coomat.row)
fmcol = np.asarray(np.argmax(coomat, axis=1)[fmrow]).reshape(-1)
return fmcol
if axis==1:
fmcol = np.unique(coomat.col)
fmrow = np.asarray(np.argmax(coomat, axis=0)[:, fmcol]).reshape(-1)
return fmrow
def argmax_plane_sorted_row_sparse_mat(smat):
"""
for a sparse matrix filled with some vlaues
produces 1d array of indexes cooresponding to the data structure stored in smat, which provides
argmax for each column of matrix
"""
urow, uiidx, ucts = np.unique(smat.row, return_counts=True, return_inverse=True)
ii = np.arange(uiidx.size)
uicts = np.empty(uiidx.size, np.int)
uicts[:] = ucts[uiidx]
ipos = np.empty(urow.size, np.int)
for i in range(1, ucts.max() + 1):
mask = ucts == i
if not np.any(mask):
continue
maskall = uicts == i
ms = mask.sum()
ipos[mask] = ii[maskall].reshape((-1, i))[np.arange(ms), smat.data[maskall].reshape((-1, i)).argmax(axis=1)]
return ipos
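# A small check of the helper above (hypothetical 2x2 matrix; data ordered by row):
def _example_argmax_plane_sorted():
    m = coo_matrix((np.array([1., 5., 2., 7.]),
                    (np.array([0, 0, 1, 1]), np.array([0, 1, 0, 1]))), shape=(2, 2))
    ipos = argmax_plane_sorted_row_sparse_mat(m)
    # ipos indexes into m.data: m.data[ipos] -> [5., 7.], the per-row maxima
    return ipos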
def get_all_neighbours_fluxes(fluxmat, axis=0):
    #note: this helper was left unfinished in the original code; below is a minimal
    #completion (an assumption about the intent) grouping the stored fluxes by row
    sidx = np.argsort(fluxmat.row)
    urow, ucts = np.unique(fluxmat.row[sidx], return_counts=True)
    return urow, np.split(fluxmat.data[sidx], np.cumsum(ucts)[:-1])
def onetoonedistance(cat1, cat2):
    vec1 = vectopol(cat1)
    vec2 = vectopol(cat2)
    return np.arccos(np.minimum(np.sum(vec1*vec2, axis=1), 1.))*180./pi*3600. #clip to avoid NaN from rounding
class CrossCorr(object):
def __init__(self, cat1, cat2):
self.cat1 = cat1
self.cat2 = cat2
self.dmat = distwithhealpix(cat1, cat2)
self.flux2 = self.cat2.flux.values[self.dmat.col]
self.flux2err = self.cat2.fluxerr.values[self.dmat.col]
self.flux1 = self.cat1.flux.values[self.dmat.row]
        self.pval = normdist.pdf(self.dmat.data/np.sqrt(self.cat1.iloc[self.dmat.row].error_radius.values**2. + \
                                                        self.cat2.iloc[self.dmat.col].error_radius.values**2.))
        #sparse matrix of the positional coincidence probabilities (used for bestpmat below)
        self.pmat = coo_matrix((self.pval, (self.dmat.row, self.dmat.col)), self.dmat.shape)
        self.urow, self.idx, self.iidx, self.ucts = np.unique(self.dmat.row,
                return_index=True, return_inverse=True, return_counts=True)
        self.pidx = np.empty(self.ucts.size + 1, int)
        self.pidx[1:] = np.cumsum(self.ucts)
        self.pidx[0] = 0
self.eidx = np.lib.stride_tricks.as_strided(self.pidx,
(self.pidx.size - 1, 2),
self.pidx.strides*2)
self.mflux = np.empty(self.urow.size, np.double)
self.maxflux = np.zeros(self.urow.size, np.double)
np.maximum.at(self.maxflux, self.iidx, self.flux2)
self.minflux = np.full(self.urow.size, np.inf, np.double)
np.minimum.at(self.minflux, self.iidx, self.flux2)
self.medflux = np.array([np.median(self.flux2[s:e]) for s, e in self.eidx])
self.uflux = self.cat1["flux"].iloc[self.urow] #flux1.data[self.idx]
self.newmask = np.isin(self.cat1.index.get_level_values("Num").values, self.urow, assume_unique=True, invert=True)
self.hascandidates = np.logical_not(self.newmask)
self.new = self.cat1[self.newmask]
self.newidx = self.new.index.get_level_values("Num").values
self.analyse_fluxes("")
self.bestdmat = get_closest_counterpart(self.dmat)[1]
self.bestpmat = coo_mat_argmax(self.pmat)
self.bestdist = pairwise_separation(self.cat1.iloc[self.urow], self.cat2.iloc[self.bestdmat])
self.bestdistprob = normdist.pdf(self.bestdist/np.sqrt(cat1.iloc[self.urow].error_radius.values**2. + cat2.iloc[self.bestdmat].error_radius.values**2.))
self.bestdflux = self.cat2.iloc[self.bestdmat].flux.values
def analyse_fluxes(self, IndexSlice):
for i in range(2, self.ucts.max() + 1):
mask = self.ucts == i
            fl = np.array([self.flux2[self.pidx[:-1][mask] + j] for j in range(i)])
flmax = np.max(fl, axis=0)
flmin = np.min(fl, axis=0)
flmed = np.median(fl, axis=0)
self.mflux[mask] = flmed
self.maxflux[mask] = flmax
self.minflux[mask] = flmin
#for k, uidx in enumerate(self.urow[mask]):
# print(uidx, flmax[k], flmin[k], flmed[k], i)
def get_all_counterparts(self, idx):
idx = np.sort(np.asarray(idx))
idxmask = self.hascandidates[idx]
urmask = np.isin(self.dmat.row, idx)
catidx = pandas.DataFrame.copy(self.cat2.iloc[self.dmat.col[urmask]])
catidx["counterpart"] = self.dmat.row[urmask]
return catidx
def get_flux_outliers_mask(self, fscale):
#print("uflux size", self.uflux.size)
mask = np.logical_or(self.uflux > self.maxflux*fscale*10., self.uflux < self.minflux*fscale/10.)
        maskout = np.zeros(self.cat1.index.size, bool)
maskout[self.hascandidates] = mask
return maskout
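# Typical use of CrossCorr (a sketch; cat2 needs flux, fluxerr and error_radius
# columns and both catalogues the (healpix, Num) MultiIndex):
#     cattable = pickle.load(open(CATALOGUES["3XMM"], "rb"))
#     crc = CrossCorr(ecat, cattable)
#     outliers = crc.get_flux_outliers_mask(FSCALES["3XMM"])
#     counterparts = crc.get_all_counterparts(np.where(outliers)[0])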
#RASSEMAP = healpy.read_map("RASS_EXP_1_2048.fits")
#rets, rcval = pickle.load(open("rxsmediandetection.pickle", "rb"))
#rcval = np.array(rcval)
def rxs_detection_limit(exposure):
"""
    returns the number of counts required for a source to be detected, as a function of exposure
"""
return 4.*(1. + (exposure/425)**4.)**(0.6/4.)
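# For example, rxs_detection_limit(425.) = 4.*2.**0.15 ~ 4.4 counts; the limit is
# ~4 counts at low exposures and approaches 4.*(exposure/425)**0.6 for deep fields.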
def check_rss_cts(ecat):
    load_rassemap() #make sure the RASS exposure map is available
    expp = RASSEMAP[healpy.ang2pix(2048, ecat.RA.values, ecat.DEC.values, lonlat=True)]
    alert = ecat.flux.values > rxs_detection_limit(expp)*FSCALES["2rxs"]*10./expp
    return ecat.index.get_level_values("Num").values[alert]
def check_rss_cts_individual(esrc):
    load_rassemap()
    expp = RASSEMAP[healpy.ang2pix(2048, esrc.lii, esrc.bii, lonlat=True)]
    alert = False if expp == 0 else esrc.ML_FLUX_0 > rxs_detection_limit(expp)*FSCALES["2rxs"]*10/expp
    return alert
#========================================================================================
#to use with django HeasarcBase models
def check_rass_transient(erositasource, rasssources):
rassfluxes = np.array([rass.count_rate for rass in rasssources])
if np.any(erositasource.ML_FLUX_0 < rassfluxes*FSCALES["2rxs"]/10.) or \
np.any(erositasource.ML_FLUX_0 > rassfluxes*FSCALES["2rxs"]*10.):
erositasource.rosat_transient = True
def check_sxps_transients(erositasource, sxpscounterparts, transientstatus):
flux = np.array([s.PeakRate_band1 + s.PeakRate_band2 for s in sxpscounterparts])
transient = np.all(flux*10*FSCALES["sxps2"] < erositasource.ML_FLUX_0) or np.all(flux*0.1*FSCALES["sxps2"] > erositasource.ML_FLUX_0)
return transientstatus & transient
def check_xmm3_transients(erositasource, xmmcounterparts, transientstatus):
flux = np.array([s.ep_1_flux + s.ep_3_flux + s.ep_2_flux for s in xmmcounterparts])
transient = np.all(flux*10*FSCALES["3XMM"] < erositasource.ML_FLUX_0)
return transientstatus & transient
#======================================================================================
def run_all(ecat):
    newsources = np.ones(ecat.index.size, bool)
transients = None #no information which of new sources are transients yet
transient_catalogues = {}
crosscat = {}
crclist = {}
rosat_outliers = {}
    #note: the logic below relies on 2rxs being the last catalogue processed in this loop
for catalog in ["3XMM", "sxps2", "2rxs"]:
cattable = pickle.load(open(CATALOGUES[catalog], "rb"))
crc = CrossCorr(ecat, cattable)
"""
if catalog == "sxps2":
midx = dmat.col[np.argsort(dmat.row)]
mflx = (cattable.iloc[midx].PeakRate_band0.values + cattable.iloc[midx].PeakRate_band1.values)/1.9
crc.maxflux[crc.dmat.row] = (cattable.iloc[crc.dmat.col].PeakRate_band0.values + cattable.iloc[crc.dmat.col].PeakRate_band1.values)/1.9
"""
crclist[catalog] = crc
outliers = crc.get_flux_outliers_mask(FSCALES[catalog])
transient_catalogues[catalog] = crc.get_all_counterparts(ecat.index.get_level_values("Num")[outliers])
if transients is None:
"""first candidates to be transients"""
transients = outliers
print("outliers indices", ecat.index.get_level_values("Num").values[outliers])
print("%s catalog outliers" % catalog, outliers.sum())
print("%s catalog newsources" % catalog, crc.newmask.sum())
print("transient and new", np.sum(transients & crc.newmask))
print("transient and outlier", np.sum(transients & outliers))
transients = np.logical_or(transients & crc.newmask, transients & outliers)
newsources = np.logical_and(newsources, crc.newmask)
print("transients:", transients.sum())
nonrxs2_outliers = check_rss_cts(ecat[crc.newmask])
haslcs = np.logical_not(newsources[nonrxs2_outliers])
transients = transients & outliers
transients[nonrxs2_outliers] = True
#transients = transients & nonrxs2_outliers
return transient_catalogues, transients, newsources, nonrxs2_outliers, crclist
def make_report(ecat, transients, newsources):
    ntrans = transients.sum()
    ndetect = newsources.size - newsources.sum() - transients.sum()
    nnew = newsources.sum()
    report = \
    f"""
    total sources in erosita catalogue {ecat.index.size},
    number of detections: {ndetect},
    new sources: {nnew},
    transients: {ntrans},
    """
    return report
if __name__ == "__main__":
"""
idea to implement heare:
generally speaking we want to find potnential transients in the daily produced by erosita source catalogue
therefore in main I expected to find counterparts in several X-ray catalogues to the input erosita source catalogues
"""
#fname = sys.argv[1]
#data = pickle.load(open("eruds2.pickle", "rb")) #fits.getdata(fname)
#data = pickle.load(open("/srg/work/medvedev/for_andrey/catalog_10000000101.pkl", "rb"))
print("\n\n\n\nstart %s analysis" % sys.argv[1])
data = pickle.load(open(sys.argv[1], "rb"))
try:
ecat = pandas.DataFrame(data)
        ecat[:10] #touch the data: raises ValueError if the byte order is wrong for pandas
except ValueError:
dswp = data.newbyteorder("l")
dswp.byteswap(inplace=True)
data = dswp
ecat = pandas.DataFrame(data)
ecat["error_radius"] = ecat.RADEC_ERR
ecat["flux"] = ecat.ML_FLUX_0
#ecat.DEC += 0.001
ecat = makehealpixindex(ecat)
transient_catalogues, transients, newsources, nonrxs2_outliers, crclist = run_all(ecat)
ecat_transients = ecat[transients]
print("overall statistics, number of sources in catalogue", ecat.index.size, "newsources:", newsources.sum(), "transients:", transients.sum())
#for row in ecat.transients:
# print(row[["RA", "DEC", "flux", "DET_LIKE_0"]])
#for transient_catalogues
#rssoutl = check_rss_cts(ecat)
#rssoutl = rssout[newsources[rssoutl]
    if transients.sum():
        eratransients = ecat[transients]
        eratransients["Num"] = eratransients.index.get_level_values("Num").values
        hdulist = [fits.PrimaryHDU(), fits.BinTableHDU(Table.from_pandas(eratransients), name="SRGE"),]
for catalogue in transient_catalogues:
cat = transient_catalogues[catalogue]
tcat = cat[transients[cat.counterpart.values]]
if tcat.index.size > 0:
hdulist.append(fits.BinTableHDU(Table.from_pandas(tcat), name=catalogue))
hdulist[-1].header.update({"CATALOG": catalogue})
obsid = os.path.basename(sys.argv[1])
fits.HDUList(hdulist).writeto("transients_%s.fits" % obsid[8:-4], overwrite=True)