# %% import numpy as np import itertools from os import stat from scipy.signal import fftconvolve, convolve2d from astropy.io import fits from astropy.wcs import WCS from glob import glob # %% def binary_array(num): variants = [[0,1],]*num out = np.zeros((2**num,num),bool) for idx, level in enumerate(itertools.product(*variants)): out[idx] = np.array(level,dtype=bool) return out def create_array(filename,mode='Sky'): temp = fits.getdata(filename,1) if mode == 'Sky': return np.histogram2d(temp['Y'],temp['X'],1000,[[0, 1000], [0, 1000]])[0] if mode == 'Det': return np.histogram2d(temp['DET1Y'],temp['DET1X'],360,[[0, 360], [0, 360]])[0] def get_wcs(file): header = file[1].header wcs = WCS({ 'CTYPE1': header['TCTYP38'], 'CTYPE2': header['TCTYP39'], 'CUNIT1': header['TCUNI38'], 'CUNIT2': header['TCUNI39'], 'CDELT1': header['TCDLT38'], 'CDELT2': header['TCDLT39'], 'CRPIX1': header['TCRPX38'], 'CRPIX2': header['TCRPX39'], 'CRVAL1': header['TCRVL38'], 'CRVAL2': header['TCRVL39'], 'NAXIS1': header['TLMAX38'], 'NAXIS2': header['TLMAX39'] }) return wcs def get_link_list(folder, sort_list=True): links = glob(f'{folder}\\**\\*_cl.evt',recursive=True) if sort_list: sorted_list = sorted(links, key=lambda x: stat(x).st_size) return np.array(sorted_list) else: return np.array(links) def atrous(level = 0, resize = False, max_size = 1001): base = 1/256*np.array([ [1, 4, 6, 4, 1], [4, 16, 24, 16, 4], [6, 24, 36, 24, 6], [4, 16, 24, 16, 4], [1, 4, 6, 4, 1], ]) size = 2**level * (base.shape[0]-1)+1 output = np.zeros((size,size)) output[::2**level, ::2**level] = base if resize: output = np.pad(output, pad_width=2**(level+1)) if output.shape[0]>max_size: return output[(size-1)//2-(max_size-1)//2:(size-1)//2+(max_size-1)//2+1, (size-1)//2-(max_size-1)//2:(size-1)//2+(max_size-1)//2+1] return output def gauss(level=0, resize=False, max_size = 1000): size = min(5*2**(level+1)+1, max_size) sigma = 2**(level) A = 1/(2*np.pi*sigma**2)**0.5 x = A*np.exp( (-(np.arange(size)-(size-1)//2)**2)/(2*sigma**2)) out = np.multiply.outer(x,x) return out def adjecent(array): grid = np.array([ [1,1,1], [1,0,1], [1,1,1] ]) output = fftconvolve(array,grid,mode='same') >= 0.5 try: output = np.logical_and(np.logical_and(output, np.logical_not(array)),np.logical_not(array.mask)) except AttributeError: output = np.logical_and(output, np.logical_not(array)) output = np.argwhere(output == True) return output[:,0], output[:,1] def add_borders(array,middle=True): mask = np.zeros(array.shape) datax, datay = np.any(array>0,0), np.any(array>0,1) #Add border masks x_min, y_min = np.argmax(datax), np.argmax(datay) x_max, y_max = len(datax) - np.argmax(datax[::-1]), len(datay) - np.argmax(datay[::-1]) mask[y_min:y_max,x_min:x_max] = True if middle is True: mask[176:191,:] = False mask[:,176:191] = False mask = np.logical_not(mask) return mask def fill_poisson(array, size_input=32): if not(isinstance(array,np.ma.MaskedArray)): print('No mask found') return array size = size_input output = array.data.copy() mask = array.mask.copy() while mask.sum()>1: kernel = np.ones((size,size))/size**2 coeff = fftconvolve(np.logical_not(mask),kernel,mode='same') mean = fftconvolve(output,kernel,mode='same') idx = np.where(np.logical_and(mask,coeff>0.1)) output[idx] = np.random.poisson(np.abs(mean[idx]/coeff[idx])) mask[idx] = False size *= 2 return output def fill_mean(array,size_input=3): size = size_input if not(isinstance(array,np.ma.MaskedArray)): print('No mask found') return array output = array.filled(0) for i,j in zip(*np.where(array.mask)): output[i,j] = array[max(0,i-size):min(array.shape[0]-1,i+size+1),max(0,j-size):min(array.shape[1]-1,j+size+1)].mean() while np.isnan(output).any(): size += 5 for i,j in zip(*np.where(np.isnan(output))): output[i,j] = array[max(0,i-size):min(array.shape[0]-1,i+size+1),max(0,j-size):min(array.shape[1]-1,j+size+1)].mean() return output def mirror(array): size = array.shape[0] output = np.tile(array,(3,3)) output[0:size] = np.flipud(output[0:size]) output[2*size:3*size] = np.flipud(output[2*size:3*size]) output[:,0:size] = np.fliplr(output[:,0:size]) output[:,2*size:3*size] = np.fliplr(output[:,2*size:3*size]) return output class Observation: def __init__(self, file_name, E_borders=[3,20]): self.filename = file_name self.name = file_name[file_name.find('nu'):].replace('_cl.evt','') with fits.open(file_name) as file: self.obs_id = file[0].header['OBS_ID'] self.ra = file[0].header['RA_NOM'] self.dec = file[0].header['DEC_NOM'] self.time_start = file[0].header['TSTART'] self.header = file[0].header self.det = self.header['INSTRUME'][-1] self.wcs = get_wcs(file) self.data = np.ma.masked_array(*self.get_data(file,E_borders)) self.hard_mask = add_borders(self.data.data,middle=False) shift = shift = int((360-64)/2) self.model = (fits.getdata(f'D:\Programms\Jupyter\Science\Source_mask\Model/det1_fpm{self.det}.cxb.fits',0) *(180/np.pi/10150)**2)[shift:360+shift,shift:360+shift] self.exposure = self.header['EXPOSURE'] def get_coeff(self): coeff = np.array([0.977,0.861,1.163,1.05]) if self.det=='A' else np.array([1.004,0.997,1.025,0.979]) resized_coeff = (coeff).reshape(2,2).repeat(180,0).repeat(180,1) return resized_coeff def get_data(self, file, E_borders=[3,20]): PI_min, PI_max = (np.array(E_borders)-1.6)/0.04 data = file[1].data.copy() idx_mask = (data['STATUS'].sum(1) == 0) idx_output = np.logical_and(idx_mask,np.logical_and((data['PI']>PI_min),(data['PI'] 0 return output def wavdecomp(self, mode = 'atrous', thresh=False,occ_coeff = False): #THRESHOLD if type(thresh) is int: thresh_max, thresh_add = thresh, thresh/2 elif type(thresh) is tuple: thresh_max, thresh_add = thresh #INIT NEEDED VARIABLES wavelet = globals()[mode] max_level = 8 conv_out = np.zeros((max_level+1,self.data.shape[0],self.data.shape[1])) size = self.data.shape[0] #PREPARE ORIGINAL DATA FOR ANALYSIS: FILL THE HOLES + MIRROR + DETECTOR CORRECTION data = fill_poisson(self.data) if occ_coeff: data = data*self.get_coeff() data = mirror(data) data_bkg = data.copy() #ITERATIVELY CONDUCT WAVLET DECOMPOSITION for i in range(max_level): conv = fftconvolve(data,wavelet(i),mode='same') temp_out = data-conv #ERRORMAP CALCULATION if thresh_max != 0: sig = ((wavelet(i)**2).sum())**0.5 bkg = fftconvolve(data_bkg, wavelet(i),mode='same') bkg[bkg<0] = 0 # err = (1+np.sqrt(bkg/sig**2 + 0.75))*sig**3 err = (1+np.sqrt(bkg+0.75))*sig # significant = (np.abs(temp_out)> thresh_max*err)[size:2*size,size:2*size] significant = (temp_out > thresh_max*err)[size:2*size,size:2*size] if thresh_add != 0: # add_significant = (np.abs(temp_out)> thresh_add*err)[size:2*size,size:2*size] add_significant = (temp_out > thresh_add*err)[size:2*size,size:2*size] adj = adjecent(significant) add_condition = np.logical_and(add_significant[adj[0],adj[1]],np.logical_not(significant[adj[0],adj[1]])) while (add_condition).any(): to_add = adj[0][add_condition], adj[1][add_condition] significant[to_add[0],to_add[1]] = True adj = adjecent(significant) add_condition = np.logical_and(add_significant[adj[0],adj[1]],np.logical_not(significant[adj[0],adj[1]])) # add_condition = np.logical_and(np.abs(temp_out)[adj[0],adj[1]] >= thresh_add*err[adj[0],adj[1]], np.logical_not(significant)[adj[0],adj[1]]) temp_out[size:2*size,size:2*size][np.logical_not(significant)] = 0 #WRITING THE WAVELET DECOMP LAYER conv_out[i] = +temp_out[size:2*size,size:2*size] conv_out[i][conv_out[i]<0]=0 #leave only positive data to prevent problems while summing layers data = conv conv_out[max_level] = conv[size:2*size,size:2*size] return conv_out # %%