diff --git a/nuwavsource/nuwavsource.py b/nuwavsource/nuwavsource.py
index 8b42112..5412c24 100644
--- a/nuwavsource/nuwavsource.py
+++ b/nuwavsource/nuwavsource.py
@@ -17,7 +17,7 @@ from warnings import filterwarnings
 filterwarnings('ignore')
 
 
-def get_link_list(folder: str, sort_list=True) -> list[str]:
+def get_link_list(folder: str, sort_list: bool = True) -> list[str]:
     """
     Returns array of paths to all *_cl.evt files in the directory recursively.
     """
@@ -40,7 +40,7 @@ def binary_array(num: int) -> list[list[bool]]:
     return out
 
 
-def create_array(file_path: str, mode='Sky') -> list[int]:
+def create_array(file_path: str, mode: str = 'Sky') -> list[int]:
     """
     Returns a 2d array of counts for given observation file.
     Modes 'Sky' and 'Det' return arrays in (X,Y) and DET1 respectively.
     """
@@ -75,7 +75,7 @@ def get_wcs(file):
     return wcs
 
 
-def atrous(level=0, max_size=1001) -> list[list[float]]:
+def atrous(level: int = 0, max_size: int = 1001) -> list[list[float]]:
     """
     Returns a trou kernel with the size 2**level and corresponding shape.
     """
@@ -95,7 +95,15 @@ def atrous(level=0, max_size=1001) -> list[list[float]]:
     return output
 
 
-def gauss(level=0, max_size=1000) -> list[list[float]]:
+def atrous_sig(level: int = 0) -> float:
+    sig_values = [0.8908, 0.20066, 0.08551, 0.04122, 0.02042]
+    if level < 5:
+        return sig_values[level]
+    else:
+        return sig_values[4]/2**(level-4)
+
+
+def gauss(level: int = 0, max_size: int = 1000) -> list[list[float]]:
     """
     Returns gaussian kernel with sigma = 2**level
     """
@@ -122,8 +130,7 @@ def adjecent(array):
                                 np.logical_not(array.mask))
     except AttributeError:
         output = np.logical_and(output, np.logical_not(array))
-    output = np.argwhere(output == True)
-    return output[:, 0], output[:, 1]
+    return output
 
 
 def add_borders(array, middle=True):
@@ -255,41 +262,42 @@ class Observation:
         # INIT NEEDED VARIABLES
         wavelet = globals()[mode]
         max_level = 8
-        conv_out = np.zeros((max_level+1, self.data.shape[0], self.data.shape[1]))
+        conv_out = np.zeros(
+            (max_level+1, self.data.shape[0], self.data.shape[1])
+        )
         size = self.data.shape[0]
         # PREPARE ORIGINAL DATA FOR ANALYSIS: FILL THE HOLES + MIRROR + DETECTOR CORRECTION
         data = fill_poisson(self.data)
         if occ_coeff:
             data = data*self.get_coeff()
         data = mirror(data)
-        data_bkg = data.copy()
         # ITERATIVELY CONDUCT WAVLET DECOMPOSITION
         for i in range(max_level):
             conv = fftconvolve(data, wavelet(i), mode='same')
+            conv[conv < 0] = 0
             temp_out = data-conv
             # ERRORMAP CALCULATION
             if thresh_max != 0:
-                sig = ((wavelet(i)**2).sum())**0.5
-                bkg = fftconvolve(data_bkg, wavelet(i), mode='same')
+                if mode == 'gauss':
+                    sig = ((wavelet(i)**2).sum())**0.5
+                if mode == 'atrous':
+                    sig = atrous_sig(i)
+                bkg = fftconvolve(data, wavelet(i), mode='same')
                 bkg[bkg < 0] = 0
                 err = (1+np.sqrt(bkg+0.75))*sig
-                significant = (temp_out > thresh_max*err)[size:2*size, size:2*size]
+                significant = (np.abs(temp_out) > thresh_max*err)[size:2*size, size:2*size]
                 if thresh_add != 0:
-                    add_significant = (temp_out > thresh_add*err)[size:2*size, size:2*size]
+                    add_significant = (np.abs(temp_out) > thresh_add*err)[size:2*size, size:2*size]
                     adj = adjecent(significant)
-                    add_condition = np.logical_and(add_significant[adj[0], adj[1]],
-                                                   np.logical_not(significant[adj[0], adj[1]]))
+                    add_condition = np.logical_and(adj, add_significant)
                     while (add_condition).any():
-                        to_add = adj[0][add_condition], adj[1][add_condition]
-                        significant[to_add[0], to_add[1]] = True
+                        significant = np.logical_or(significant, add_condition)
                         adj = adjecent(significant)
-                        add_condition = np.logical_and(add_significant[adj[0], adj[1]],
-                                                       np.logical_not(significant[adj[0],adj[1]]))
-                temp_out[size:2*size, size:2*size][np.logical_not(significant)] = 0
+                        add_condition = np.logical_and(adj, add_significant)
+                significant = mirror(significant)
+                temp_out[np.logical_not(significant)] = 0
             # WRITING THE WAVELET DECOMP LAYER
             conv_out[i] = +temp_out[size:2*size, size:2*size]
-            # DISCARDING NEGATIVE COMPONENTS OF WAVELETS TO MAKE MASK BY SUMMING WAVELET LAYERS
-            conv_out[i][conv_out[i] < 0] = 0
             data = conv
         conv_out[max_level] = conv[size:2*size, size:2*size]
         return conv_out
@@ -341,11 +349,13 @@ def process(args):
     good_lvl = np.zeros(bin_num, dtype=bool)
     good_idx = 0
     if obs.exposure > 1000:
-        wav_obs = obs.wavdecomp('gauss', thresh, occ_coeff=True)
+        wav_obs = obs.wavdecomp('atrous', thresh, occ_coeff=True)
+        wav_obs[wav_obs < 0] = 0
         occ_coeff = obs.get_coeff()
     for idx, lvl in enumerate(binary_array(bin_num)):
         try:
             region = wav_obs[2:-1][lvl].sum(0) > 0
+            # region = fftconvolve(wav_obs[2:-1][lvl].sum(0)>0, gauss(3),mode='same') > 0.5
         except ValueError:
             region = np.zeros(obs.data.shape, dtype=bool)
         masked_obs = np.ma.masked_array(obs.data, mask=region)*occ_coeff
@@ -360,6 +370,7 @@ def process(args):
             good_lvl = lvl
     try:
         region = wav_obs[2:-1][good_lvl].sum(0) > 0
+        # region = fftconvolve(wav_obs[2:-1][good_lvl].sum(0)>0, gauss(2),mode='same')>0.2
         if region.sum() > 0:
             region_raw = obs.region_to_raw(region.astype(int))
     except ValueError:
@@ -453,13 +464,36 @@ def process_folder(input_folder=None, start_new_file=None, fits_folder=None, thr
         out_table.to_csv(f'{fits_folder}\\test.csv')
         out_table.to_csv(f'{fits_folder}\\test_skipped.csv')
     # FILTERING OUT PROCESSED OBSERVATIONS
-    already_processed_list = read_csv(f'{fits_folder}\\test.csv', index_col=0, dtype={'obs_id':str})
-    already_skipped_list = read_csv(f'{fits_folder}\\test_skipped.csv', index_col=0, dtype={'obs_id':str})
-    already_processed = (already_processed_list['obs_id'].astype(str)+already_processed_list['detector']).values
-    already_skipped = (already_skipped_list['obs_id'].astype(str)+already_skipped_list['detector']).values
-    obs_list_names = [curr[curr.index('nu')+2:curr.index('_cl.evt')-2] for curr in obs_list]
-    not_processed = np.array([(curr not in already_processed) for curr in obs_list_names])
-    not_skipped = np.array([(curr not in already_skipped) for curr in obs_list_names])
+    already_processed_list = read_csv(
+        f'{fits_folder}\\test.csv',
+        index_col=0,
+        dtype={'obs_id': str}
+    )
+    already_skipped_list = read_csv(
+        f'{fits_folder}\\test_skipped.csv',
+        index_col=0,
+        dtype={'obs_id': str}
+    )
+    already_processed = (
+        already_processed_list['obs_id'].astype(str) +
+        already_processed_list['detector']
+    ).values
+    already_skipped = (
+        already_skipped_list['obs_id'].astype(str) +
+        already_skipped_list['detector']
+    ).values
+    obs_list_names = [
+        curr[curr.index('nu')+2:curr.index('_cl.evt')-2]
+        for curr in obs_list
+    ]
+    not_processed = np.array([
+        (curr not in already_processed)
+        for curr in obs_list_names
+    ])
+    not_skipped = np.array([
+        (curr not in already_skipped)
+        for curr in obs_list_names
+    ])
     obs_list = obs_list[np.logical_and(not_processed, not_skipped)]
     print(f'Removed already processed observations. {len(obs_list)} observations remain.')
     # START PROCESSING
@@ -468,8 +502,11 @@ def process_folder(input_folder=None, start_new_file=None, fits_folder=None, thr
     for group_idx in range(len(obs_list)//group_size+1):
         print(f'Started group {group_idx}')
         group_list = obs_list[group_size*group_idx:min(group_size*(group_idx+1), len(obs_list))]
-        max_size = np.array([stat(file).st_size/2**20 for file in group_list]).max()
-        process_num = cpu_count() if max_size < 50 else (cpu_count()//2 if max_size < 200 else (cpu_count()//4 if max_size < 1000 else 1))
+        max_size = np.array([
+            stat(file).st_size/2**20
+            for file in group_list
+        ]).max()
+        process_num = (cpu_count() if max_size < 50 else (cpu_count()//2 if max_size < 200 else (cpu_count()//4 if max_size < 1000 else 1)))
         print(f"Max file size in group is {max_size:.2f}Mb, create {process_num} processes")
         with get_context('spawn').Pool(processes=process_num) as pool:
            packed_args = map(lambda _: (_, thresh), group_list)
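
Note on the new noise model: `atrous_sig` replaces the generic `((wavelet(i)**2).sum())**0.5` estimate with tabulated per-level sigmas for the à trous kernel, and the error map keeps the Gehrels-style form `err = (1 + sqrt(bkg + 0.75))*sig`. A minimal standalone sketch of the per-level significance test, assuming a toy Poisson image and a placeholder smoothing kernel in place of the real `atrous(i)` (`data`, `kernel`, and `i` are illustrative names, not package code):

```python
import numpy as np
from scipy.signal import fftconvolve


def atrous_sig(level: int = 0) -> float:
    # Tabulated noise sigma per a trous level (copied from the patch);
    # halves for every level past the end of the table.
    sig_values = [0.8908, 0.20066, 0.08551, 0.04122, 0.02042]
    if level < 5:
        return sig_values[level]
    return sig_values[4]/2**(level-4)


rng = np.random.default_rng(0)
data = rng.poisson(2.0, size=(64, 64)).astype(float)

i = 1                          # wavelet level under test
kernel = np.full((3, 3), 1/9)  # stand-in for atrous(i)
conv = fftconvolve(data, kernel, mode='same')
conv[conv < 0] = 0             # clip negative smoothed counts, as in the patch
temp_out = data - conv         # detail (wavelet) layer

# Error map scaled by the tabulated per-level sigma; the two-sided
# test matches the new np.abs(temp_out) comparison.
err = (1 + np.sqrt(conv + 0.75))*atrous_sig(i)
significant = np.abs(temp_out) > 5.0*err
print(significant.sum(), 'pixels exceed 5 sigma at level', i)
```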
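
The rewritten `thresh_add` loop grows the `thresh_max` mask by whole boolean layers instead of indexing coordinate pairs. A self-contained model of that loop, with a hypothetical `adjecent` stand-in (an 8-connected dilation ring; only its contract matters here: neighbours of the mask with the mask itself excluded, as the retained `np.logical_not(array)` line implies):

```python
import numpy as np
from scipy.signal import fftconvolve


def adjecent(mask):
    # Stand-in for the package's adjecent(): boolean map of the
    # 8-connected neighbours of `mask`, excluding `mask` itself.
    grown = fftconvolve(mask.astype(float), np.ones((3, 3)), mode='same') > 0.5
    return np.logical_and(grown, np.logical_not(mask))


rng = np.random.default_rng(1)
significant = rng.random((32, 32)) > 0.99      # seeds above thresh_max
add_significant = rng.random((32, 32)) > 0.90  # pixels above thresh_add

adj = adjecent(significant)
add_condition = np.logical_and(adj, add_significant)
while add_condition.any():
    # Absorb neighbouring thresh_add pixels until the mask stops growing;
    # each pass strictly enlarges `significant`, so the loop terminates.
    significant = np.logical_or(significant, add_condition)
    adj = adjecent(significant)
    add_condition = np.logical_and(adj, add_significant)
print('final region size:', significant.sum())
```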
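
For readability, the nested `process_num` ternary in `process_folder` unpacks to the ladder below (thresholds in MiB, as in the patch; the helper name is hypothetical):

```python
from multiprocessing import cpu_count


def pick_process_num(max_size_mb: float) -> int:
    # Fewer workers for larger event files to keep peak memory bounded.
    if max_size_mb < 50:
        return cpu_count()
    if max_size_mb < 200:
        return cpu_count()//2
    if max_size_mb < 1000:
        return cpu_count()//4
    return 1
```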