# nuwavdet/nuwavsource.py
# %%
import numpy as np
import itertools
from pandas import DataFrame, read_csv
from astropy.table import Table
from astropy.coordinates import SkyCoord
from astropy import units as u
from multiprocessing import get_context, cpu_count
from time import perf_counter
from os import stat, makedirs
from os.path import dirname
from scipy.signal import fftconvolve, convolve2d
from astropy.io import fits
from astropy.wcs import WCS
from glob import glob
from warnings import filterwarnings
filterwarnings('ignore')
def get_link_list(folder, sort_list=True):
links = glob(f'{folder}\\**\\*_cl.evt',recursive=True)
if sort_list:
sorted_list = sorted(links, key=lambda x: stat(x).st_size)
return np.array(sorted_list)
else:
return np.array(links)
def binary_array(num):
variants = [[0,1],]*num
out = np.zeros((2**num,num),bool)
for idx, level in enumerate(itertools.product(*variants)):
out[idx] = np.array(level,dtype=bool)
return out
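# binary_array(num) enumerates every on/off combination of `num` items, e.g.
# binary_array(2) gives the rows [False,False], [False,True], [True,False],
# [True,True]; process() below uses it to try all 2**6 subsets of wavelet
# layers when searching for the best source mask.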
def create_array(filename,mode='Sky'):
temp = fits.getdata(filename,1)
if mode == 'Sky':
return np.histogram2d(temp['Y'],temp['X'],1000,[[0, 1000], [0, 1000]])[0]
if mode == 'Det':
return np.histogram2d(temp['DET1Y'],temp['DET1X'],360,[[0, 360], [0, 360]])[0]
def get_wcs(file):
header = file[1].header
wcs = WCS({
'CTYPE1': header['TCTYP38'], 'CTYPE2': header['TCTYP39'],
'CUNIT1': header['TCUNI38'], 'CUNIT2': header['TCUNI39'],
'CDELT1': header['TCDLT38'], 'CDELT2': header['TCDLT39'],
'CRPIX1': header['TCRPX38'], 'CRPIX2': header['TCRPX39'],
'CRVAL1': header['TCRVL38'], 'CRVAL2': header['TCRVL39'],
'NAXIS1': header['TLMAX38'], 'NAXIS2': header['TLMAX39']
})
return wcs
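# The TCTYP38/TCTYP39 etc. keywords describe table columns 38 and 39, which
# appear to hold the sky X/Y coordinates in these event files. A minimal usage
# sketch (`path` is hypothetical; pixel_to_world is astropy's high-level API):
#   with fits.open(path) as file:
#       sky = get_wcs(file).pixel_to_world(500, 500)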
def atrous(level = 0, max_size = 1001):
base = 1/256*np.array([
[1, 4, 6, 4, 1],
[4, 16, 24, 16, 4],
[6, 24, 36, 24, 6],
[4, 16, 24, 16, 4],
[1, 4, 6, 4, 1],
])
size = 2**level * (base.shape[0]-1)+1
output = np.zeros((size,size))
output[::2**level, ::2**level] = base
    if output.shape[0] > max_size:
        # crop the dilated kernel symmetrically to max_size x max_size
        lo = (size-1)//2 - (max_size-1)//2
        hi = (size-1)//2 + (max_size-1)//2 + 1
        return output[lo:hi, lo:hi]
return output
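# atrous(level) is the kernel of the "a trous" wavelet transform: the 5x5
# B3-spline base dilated by inserting 2**level - 1 zeros between taps, so its
# support grows as 4 * 2**level + 1 while the coefficients stay unchanged.
# For example, atrous(1) is a 9x9 array with the base values on every second
# row and column and zeros elsewhere.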
def gauss(level=0, max_size = 1000):
size = min(5*2**(level+1)+1, max_size)
sigma = 2**(level)
A = 1/(2*np.pi*sigma**2)**0.5
x = A*np.exp( (-(np.arange(size)-(size-1)//2)**2)/(2*sigma**2))
out = np.multiply.outer(x,x)
return out
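# gauss(level) is the alternative smoothing kernel: a separable 2-D Gaussian
# with sigma = 2**level, built as the outer product of two 1-D profiles and
# truncated at +/-5 sigma (size 5 * 2**(level+1) + 1, capped at max_size).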
def adjacent(array):
    # indices of pixels adjacent to True pixels that are neither True
    # themselves nor masked
    grid = np.array([
        [1,1,1],
        [1,0,1],
        [1,1,1]
    ])
    output = fftconvolve(array,grid,mode='same') >= 0.5
    try:
        output = np.logical_and(np.logical_and(output, np.logical_not(array)),
                                np.logical_not(array.mask))
    except AttributeError:
        output = np.logical_and(output, np.logical_not(array))
    output = np.argwhere(output)
    return output[:,0], output[:,1]
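# A small illustration: for a 3x3 boolean array with only the centre pixel
# set, adjacent() returns the indices of its eight unset neighbours:
#   >>> a = np.zeros((3, 3), dtype=bool); a[1, 1] = True
#   >>> rows, cols = adjacent(a)  # the eight pixels around the centre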
def add_borders(array,middle=True):
mask = np.zeros(array.shape)
datax, datay = np.any(array>0,0), np.any(array>0,1)
#Add border masks
x_min, y_min = np.argmax(datax), np.argmax(datay)
x_max, y_max = len(datax) - np.argmax(datax[::-1]), len(datay) - np.argmax(datay[::-1])
mask[y_min:y_max,x_min:x_max] = True
if middle is True:
mask[176:191,:] = False
mask[:,176:191] = False
mask = np.logical_not(mask)
return mask
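# add_borders() returns a mask that is True outside the exposed part of the
# detector: the bounding box of all non-zero pixels is kept and, with
# middle=True, the central cross at rows/columns 176:191 (presumably the gap
# between the detector quadrants) is masked out as well.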
def fill_poisson(array, size_input=32):
if not(isinstance(array,np.ma.MaskedArray)):
print('No mask found')
return array
size = size_input
output = array.data.copy()
mask = array.mask.copy()
    while mask.sum() > 0:
kernel = np.ones((size,size))/size**2
coeff = fftconvolve(np.logical_not(mask),kernel,mode='same')
mean = fftconvolve(output,kernel,mode='same')
idx = np.where(np.logical_and(mask,coeff>0.1))
output[idx] = np.random.poisson(np.abs(mean[idx]/coeff[idx]))
mask[idx] = False
size *= 2
return output
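# fill_poisson() replaces masked pixels with Poisson draws whose expectation
# is the local mean of the unmasked data, estimated with a box average whose
# size doubles on every pass until each masked pixel has enough unmasked
# neighbours (coeff > 0.1) to be filled. A minimal sketch with hypothetical
# inputs:
#   >>> img = np.ma.masked_array(counts, mask=bad_pixels)
#   >>> filled = fill_poisson(img)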
def mirror(array):
size = array.shape[0]
output = np.tile(array,(3,3))
output[0:size] = np.flipud(output[0:size])
output[2*size:3*size] = np.flipud(output[2*size:3*size])
output[:,0:size] = np.fliplr(output[:,0:size])
output[:,2*size:3*size] = np.fliplr(output[:,2*size:3*size])
return output
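# mirror() pads an N x N image to 3N x 3N by reflecting it across every edge
# and corner, so the fftconvolve calls in wavdecomp() see mirrored data
# rather than zeros beyond the field of view; only the central N x N block
# of each convolution is kept afterwards.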
class Observation:
def __init__(self, file_name, E_borders=[3,20]):
self.filename = file_name
self.name = file_name[file_name.find('nu'):].replace('_cl.evt','')
with fits.open(file_name) as file:
self.obs_id = file[0].header['OBS_ID']
self.ra = file[0].header['RA_NOM']
self.dec = file[0].header['DEC_NOM']
self.time_start = file[0].header['TSTART']
self.header = file[0].header
self.det = self.header['INSTRUME'][-1]
self.wcs = get_wcs(file)
self.data = np.ma.masked_array(*self.get_data(file,E_borders))
self.hard_mask = add_borders(self.data.data,middle=False)
self.exposure = self.header['EXPOSURE']
    def get_coeff(self):
        # per-quadrant correction coefficients for telescope A or B, expanded
        # into 180x180 blocks to cover the 360x360 DET1 grid
        coeff = np.array([0.977,0.861,1.163,1.05]) if self.det=='A' else np.array([1.004,0.997,1.025,0.979])
        resized_coeff = coeff.reshape(2,2).repeat(180,0).repeat(180,1)
        return resized_coeff
    def get_data(self, file, E_borders=[3,20]):
        # NuSTAR PI channel conversion: E[keV] = 0.04*PI + 1.6
        PI_min, PI_max = (np.array(E_borders)-1.6)/0.04
data = file[1].data.copy()
idx_mask = (data['STATUS'].sum(1) == 0)
idx_output = np.logical_and(idx_mask,np.logical_and((data['PI']>PI_min),(data['PI']<PI_max)))
data_output = data[idx_output]
data_mask = data[np.logical_not(idx_mask)]
build_hist = lambda array: np.histogram2d(array['DET1Y'],array['DET1X'],360,[[0,360],[0,360]])[0]
output = build_hist(data_output)
mask = build_hist(data_mask)
mask = np.logical_or(mask,add_borders(output))
mask = np.logical_or(mask, self.get_bad_pix(file))
return output, mask
def get_bad_pix(self, file):
output = np.zeros((360,360))
kernel = np.ones((5,5))
        current_dir = dirname(__file__)
        for i in range(4):
            pixpos_file = fits.getdata(f'{current_dir}\\pixpos\\nu{self.det}pixpos20100101v007.fits',i+1)
bad_pix_file = file[3+i].data.copy()
temp = np.zeros(len(pixpos_file),dtype=bool)
for x,y in zip(bad_pix_file['rawx'],bad_pix_file['rawy']):
temp = np.logical_or(temp, np.equal(pixpos_file['rawx'],x)*np.equal(pixpos_file['rawy'],y))
temp = pixpos_file[temp]
output += np.histogram2d(temp['REF_DET1Y'],temp['REF_DET1X'], 360, [[0,360],[0,360]])[0]
output = convolve2d(output, kernel, mode='same') > 0
return output
    def wavdecomp(self, mode='gauss', thresh=False, occ_coeff=False):
        #THRESHOLD
        if type(thresh) is int:
            thresh_max, thresh_add = thresh, thresh/2
        elif type(thresh) is tuple:
            thresh_max, thresh_add = thresh
        else:
            thresh_max, thresh_add = 0, 0  # no thresholding when thresh is False
        #INIT NEEDED VARIABLES
        wavelet = globals()[mode]
        max_level = 8
conv_out = np.zeros((max_level+1,self.data.shape[0],self.data.shape[1]))
size = self.data.shape[0]
#PREPARE ORIGINAL DATA FOR ANALYSIS: FILL THE HOLES + MIRROR + DETECTOR CORRECTION
data = fill_poisson(self.data)
if occ_coeff: data = data*self.get_coeff()
data = mirror(data)
data_bkg = data.copy()
        #ITERATIVELY PERFORM THE WAVELET DECOMPOSITION
for i in range(max_level):
conv = fftconvolve(data,wavelet(i),mode='same')
temp_out = data-conv
#ERRORMAP CALCULATION
if thresh_max != 0:
sig = ((wavelet(i)**2).sum())**0.5
bkg = fftconvolve(data_bkg, wavelet(i),mode='same')
bkg[bkg<0] = 0
# err = (1+np.sqrt(bkg/sig**2 + 0.75))*sig**3
err = (1+np.sqrt(bkg+0.75))*sig
# significant = (np.abs(temp_out)> thresh_max*err)[size:2*size,size:2*size]
significant = (temp_out > thresh_max*err)[size:2*size,size:2*size]
if thresh_add != 0:
# add_significant = (np.abs(temp_out)> thresh_add*err)[size:2*size,size:2*size]
add_significant = (temp_out > thresh_add*err)[size:2*size,size:2*size]
                    adj = adjacent(significant)
add_condition = np.logical_and(add_significant[adj[0],adj[1]],np.logical_not(significant[adj[0],adj[1]]))
while (add_condition).any():
to_add = adj[0][add_condition], adj[1][add_condition]
significant[to_add[0],to_add[1]] = True
                        adj = adjacent(significant)
add_condition = np.logical_and(add_significant[adj[0],adj[1]],np.logical_not(significant[adj[0],adj[1]]))
# add_condition = np.logical_and(np.abs(temp_out)[adj[0],adj[1]] >= thresh_add*err[adj[0],adj[1]], np.logical_not(significant)[adj[0],adj[1]])
temp_out[size:2*size,size:2*size][np.logical_not(significant)] = 0
#WRITING THE WAVELET DECOMP LAYER
            conv_out[i] = temp_out[size:2*size,size:2*size]
            conv_out[i][conv_out[i]<0] = 0  # keep only positive values to avoid problems when summing layers
data = conv
conv_out[max_level] = conv[size:2*size,size:2*size]
return conv_out
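# A minimal usage sketch (the event-file path and OBSID are hypothetical):
#   obs = Observation('.../nu60001068002A01_cl.evt')
#   layers = obs.wavdecomp('gauss', thresh=(5, 3), occ_coeff=True)
#   # layers has shape (9, 360, 360): eight thresholded detail planes plus
#   # the final smoothed plane; a source mask is e.g. layers[2:-1].sum(0) > 0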
def process(obs_path):
bin_num = 6
try:
obs = Observation(obs_path)
sky_coord = SkyCoord(ra=obs.ra*u.deg,dec=obs.dec*u.deg,frame='fk5').transform_to('galactic')
lon, lat = sky_coord.l.value, sky_coord.b.value
rem_signal, rem_area, poiss_comp, rms = np.zeros((4,2**bin_num))
region = np.zeros(obs.data.shape, dtype=bool)
rem_region = np.logical_and(region, np.logical_not(obs.data.mask))
masked_obs = np.ma.masked_array(obs.data, mask = region)
good_lvl = np.zeros(bin_num,dtype=bool)
good_idx = 0
if obs.exposure > 1000:
wav_obs = obs.wavdecomp('gauss',(5,3),occ_coeff=True)
occ_coeff = obs.get_coeff()
for idx, lvl in enumerate(binary_array(bin_num)):
try:
region = wav_obs[2:-1][lvl].sum(0)>0
except ValueError:
region = np.zeros(obs.data.shape,dtype=bool)
masked_obs = np.ma.masked_array(obs.data, mask = region)*occ_coeff
rem_region = np.logical_and(region, np.logical_not(obs.data.mask))
rem_signal[idx] = 1-obs.data[region].sum()/obs.data.sum()
rem_area[idx] = 1 - rem_region.sum()/np.logical_not(obs.data.mask).sum()
poiss_comp[idx] = np.mean((masked_obs-masked_obs.mean())**2/masked_obs.mean())
rms[idx] = np.sqrt(((masked_obs-masked_obs.mean())**2).mean())
parameter = lambda idx: ((poiss_comp[idx])**2+((1-rem_area[idx])*0.5)**2)
if (parameter(idx)<parameter(good_idx)):
good_idx = idx
good_lvl = lvl
try:
region = wav_obs[2:-1][good_lvl].sum(0)>0
except ValueError:
region = np.zeros(obs.data.shape,dtype=bool)
masked_obs = np.ma.masked_array(obs.data, mask = region)
rem_region = np.logical_and(region, np.logical_not(obs.data.mask))
to_table = [obs.obs_id,
obs.det,
obs.ra,
obs.dec,
lon,
lat,
obs.time_start,
obs.exposure,
masked_obs.mean()/obs.exposure, #count rate
1 - rem_region.sum()/np.logical_not(obs.data.mask).sum(), #rem_area
poiss_comp[good_idx],
poiss_comp[0],
rms[good_idx]
]
else:
            to_table = [obs.obs_id,
                        obs.det,
                        obs.ra,
                        obs.dec,
                        lon,
                        lat,
                        obs.time_start,
                        obs.exposure,
                        -1, #count rate
                        -1, #rem_area
                        -1, #poisson_chi2
                        -1, #poisson_chi2_full
                        -1  #rms
                        ]
return to_table, region.astype(int)
except TypeError:
return obs_path, np.zeros((360,360))
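# process() is the per-observation worker used by the multiprocessing pool in
# process_folder(): it returns one table row plus the chosen source region,
# or (obs_path, zeros) when the event file cannot be read. A sketch with a
# hypothetical path:
#   row, region = process('.../nu60001068002A01_cl.evt')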
def process_folder(input_folder = None, continue_old = None, fits_folder = None):
#DIALOGUE
if not(input_folder):
print('Enter path to the input folder')
input_folder = input()
if not(continue_old):
print('Create new file for this processing? y/n')
continue_old = input()
if continue_old == 'y':
start_new = True
elif continue_old == 'n':
start_new = False
else:
        print('Cannot interpret input, closing script')
raise SystemExit(0)
if not(fits_folder):
        print('Enter path to the output folder')
fits_folder = input()
region_folder = f'{fits_folder}\\Region'
    #CREATE ALL NECESSARY FILES AND VARIABLES
obs_list = get_link_list(input_folder,sort_list = True)
start = perf_counter()
group_size = 50
makedirs(region_folder,exist_ok = True)
    #THE OBSERVATION LIST IS SORTED BY FILE SIZE (SMALLEST FIRST)
print(f'Finished scanning folders. Found {len(obs_list)} observations.')
table = {
'obs_id':[], 'detector':[], 'ra':[], 'dec':[], 'lon':[], 'lat':[], 't_start':[], 'exposure':[],
'count_rate':[], 'remaining_area':[], 'poisson_chi2':[], 'poisson_chi2_full':[], 'rms':[]
}
if start_new:
out_table = DataFrame(table)
out_table.to_csv(f'{fits_folder}\\test.csv')
out_table.to_csv(f'{fits_folder}\\test_skipped.csv')
#FILTERING OUT PROCESSED OBSERVATIONS
already_processed_list = read_csv(f'{fits_folder}\\test.csv',index_col=0,dtype={'obs_id':str})
already_skipped_list = read_csv(f'{fits_folder}\\test_skipped.csv',index_col=0,dtype={'obs_id':str})
already_processed = (already_processed_list['obs_id'].astype(str)+already_processed_list['detector']).values
already_skipped = (already_skipped_list['obs_id'].astype(str)+already_skipped_list['detector']).values
obs_list_names = [curr[curr.index('nu')+2:curr.index('_cl.evt')-2] for curr in obs_list]
not_processed = np.array([(curr not in already_processed) for curr in obs_list_names])
not_skipped = np.array([(curr not in already_skipped) for curr in obs_list_names])
obs_list = obs_list[np.logical_and(not_processed,not_skipped)]
print(f'Removed already processed observations. {len(obs_list)} observations remain.')
#START PROCESSING
print('Started processing...')
num = 0
    # ceil division so that an exact multiple of group_size does not produce an empty final group
    for group_idx in range((len(obs_list)+group_size-1)//group_size):
print(f'Started group {group_idx}')
group_list = obs_list[group_size*group_idx:min(group_size*(group_idx+1),len(obs_list))]
max_size = np.array([stat(file).st_size/2**20 for file in group_list]).max()
process_num = cpu_count() if max_size<50 else (cpu_count()//2 if max_size<200 else (cpu_count()//4 if max_size<1000 else 1))
print(f"Max file size in group is {max_size:.2f}Mb, create {process_num} processes")
with get_context('spawn').Pool(processes=process_num) as pool:
for result,region in pool.imap(process,group_list):
if type(result) is np.str_:
obs_id = result[result.index('nu'):result.index('_cl.evt')]
print(f'{num:>3} is skipped. File {obs_id}')
num +=1
continue
for key,value in zip(table.keys(),result):
table[key] = [value]
if table['exposure'][0] < 1000:
print(f'{num:>3} {str(result[0])+result[1]} is skipped. Exposure < 1000')
DataFrame(table).to_csv(f'{fits_folder}\\test_skipped.csv',mode='a',header=False)
num +=1
continue
DataFrame(table).to_csv(f'{fits_folder}\\test.csv',mode='a',header=False)
fits.writeto(f'{region_folder}\\{str(result[0])+result[1]}_region.fits', region, overwrite= True)
print(f'{num:>3} {str(result[0])+result[1]} is written.')
num +=1
print('Converting generated csv to fits file...')
        print(f'Elapsed time: {(perf_counter()-start):.2f} s')
print(f'Processed {num/len(obs_list)*100:.2f} percent')
csv_file = read_csv(f'{fits_folder}\\test.csv',index_col=0,dtype={'obs_id':str})
Table.from_pandas(csv_file).write(f'{fits_folder}\\test.fits',overwrite=True)
    print(f'Finished writing: {perf_counter()-start:.2f} s')
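# A minimal entry point, assuming the script is meant to be run directly;
# process_folder() prompts for any arguments that are not supplied.
if __name__ == '__main__':
    process_folder()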