Fixed wavelet decomp. Optimized neighbour search.

Andrey Mukhin 2022-10-21 13:11:19 +03:00
parent d09c715505
commit 3d95e8356a

@@ -17,7 +17,7 @@ from warnings import filterwarnings
 filterwarnings('ignore')
-def get_link_list(folder: str, sort_list=True) -> list[str]:
+def get_link_list(folder: str, sort_list: bool = True) -> list[str]:
     """
     Returns array of paths to all *_cl.evt files in the directory recursively.
     """
@@ -40,7 +40,7 @@ def binary_array(num: int) -> list[list[bool]]:
     return out
-def create_array(file_path: str, mode='Sky') -> list[int]:
+def create_array(file_path: str, mode: str = 'Sky') -> list[int]:
     """
     Returns a 2d array of counts for given observation file.
     Modes 'Sky' and 'Det' return arrays in sky (X,Y) and detector (DET1) coordinates, respectively.
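binary_array, whose tail appears in this hunk, is consumed below as `for idx, lvl in enumerate(binary_array(bin_num))`. A sketch consistent with that usage and the `list[list[bool]]` signature, assuming it enumerates every on/off combination of num flags (the body is outside this diff):

    def binary_array_sketch(num: int) -> list[list[bool]]:
        # All 2**num combinations of num boolean flags, one row each.
        return [
            [bool((i >> bit) & 1) for bit in range(num)]
            for i in range(2**num)
        ]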
@@ -75,7 +75,7 @@ def get_wcs(file):
     return wcs
-def atrous(level=0, max_size=1001) -> list[list[float]]:
+def atrous(level: int = 0, max_size: int = 1001) -> list[list[float]]:
     """
     Returns an à trous kernel with size 2**level and the corresponding shape.
     """
@@ -95,7 +95,15 @@ def atrous(level=0, max_size=1001) -> list[list[float]]:
     return output
-def gauss(level=0, max_size=1000) -> list[list[float]]:
+def atrous_sig(level: int = 0) -> float:
+    sig_values = [0.8908, 0.20066, 0.08551, 0.04122, 0.02042]
+    if level < 5:
+        return sig_values[level]
+    else:
+        return sig_values[4]/2**(level-4)
+def gauss(level: int = 0, max_size: int = 1000) -> list[list[float]]:
     """
     Returns gaussian kernel with sigma = 2**level
     """
@@ -122,8 +130,7 @@ def adjecent(array):
                                 np.logical_not(array.mask))
     except AttributeError:
         output = np.logical_and(output, np.logical_not(array))
-    output = np.argwhere(output == True)
-    return output[:, 0], output[:, 1]
+    return output
 def add_borders(array, middle=True):
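This is the "optimized neighbour search" of the commit message: adjecent now returns a boolean mask of neighbour cells rather than np.argwhere index pairs, so callers can combine it with other masks by whole-array boolean logic instead of fancy indexing. The top of the function is outside the hunk, so the kernel below is an assumption; a self-contained sketch of the same mask-based idea:

    import numpy as np
    from scipy.signal import fftconvolve

    def adjacent_mask_sketch(mask: np.ndarray) -> np.ndarray:
        # Boolean mask of cells that touch `mask` (8-connectivity)
        # but do not belong to it.
        kernel = np.array([[1, 1, 1],
                           [1, 0, 1],
                           [1, 1, 1]])
        touched = fftconvolve(mask.astype(float), kernel, mode='same') >= 0.5
        return np.logical_and(touched, np.logical_not(mask))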
@@ -255,41 +262,42 @@ class Observation:
         # INIT NEEDED VARIABLES
         wavelet = globals()[mode]
         max_level = 8
-        conv_out = np.zeros((max_level+1, self.data.shape[0], self.data.shape[1]))
+        conv_out = np.zeros(
+            (max_level+1, self.data.shape[0], self.data.shape[1])
+        )
         size = self.data.shape[0]
         # PREPARE ORIGINAL DATA FOR ANALYSIS: FILL THE HOLES + MIRROR + DETECTOR CORRECTION
         data = fill_poisson(self.data)
         if occ_coeff:
             data = data*self.get_coeff()
         data = mirror(data)
-        data_bkg = data.copy()
         # ITERATIVELY CONDUCT WAVELET DECOMPOSITION
         for i in range(max_level):
             conv = fftconvolve(data, wavelet(i), mode='same')
             conv[conv < 0] = 0
             temp_out = data-conv
             # ERRORMAP CALCULATION
             if thresh_max != 0:
-                sig = ((wavelet(i)**2).sum())**0.5
-                bkg = fftconvolve(data_bkg, wavelet(i), mode='same')
+                if mode == 'gauss':
+                    sig = ((wavelet(i)**2).sum())**0.5
+                if mode == 'atrous':
+                    sig = atrous_sig(i)
+                bkg = fftconvolve(data, wavelet(i), mode='same')
                 bkg[bkg < 0] = 0
                 err = (1+np.sqrt(bkg+0.75))*sig
-                significant = (temp_out > thresh_max*err)[size:2*size, size:2*size]
+                significant = (np.abs(temp_out) > thresh_max*err)[size:2*size, size:2*size]
                 if thresh_add != 0:
-                    add_significant = (temp_out > thresh_add*err)[size:2*size, size:2*size]
+                    add_significant = (np.abs(temp_out) > thresh_add*err)[size:2*size, size:2*size]
                     adj = adjecent(significant)
-                    add_condition = np.logical_and(add_significant[adj[0], adj[1]],
-                                                   np.logical_not(significant[adj[0], adj[1]]))
+                    add_condition = np.logical_and(adj, add_significant)
                     while (add_condition).any():
-                        to_add = adj[0][add_condition], adj[1][add_condition]
-                        significant[to_add[0], to_add[1]] = True
+                        significant = np.logical_or(significant, add_condition)
                         adj = adjecent(significant)
-                        add_condition = np.logical_and(add_significant[adj[0], adj[1]],
-                                                       np.logical_not(significant[adj[0],adj[1]]))
-                temp_out[size:2*size, size:2*size][np.logical_not(significant)] = 0
+                        add_condition = np.logical_and(adj, add_significant)
+                significant = mirror(significant)
+                temp_out[np.logical_not(significant)] = 0
             # WRITING THE WAVELET DECOMP LAYER
             conv_out[i] = +temp_out[size:2*size, size:2*size]
             # DISCARDING NEGATIVE COMPONENTS OF WAVELETS TO MAKE MASK BY SUMMING WAVELET LAYERS
             conv_out[i][conv_out[i] < 0] = 0
             data = conv
         conv_out[max_level] = conv[size:2*size, size:2*size]
         return conv_out
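The rewritten growth loop is hysteresis thresholding: pixels above thresh_max seed the significance mask, which then repeatedly absorbs adjacent pixels above the weaker thresh_add threshold until nothing new qualifies. With adjecent returning a mask, each iteration reduces to whole-array boolean algebra. A condensed sketch of the same idea, assuming the strong mask is a subset of the weak one (true whenever thresh_add < thresh_max):

    import numpy as np
    from scipy.ndimage import binary_dilation

    def hysteresis_sketch(strong: np.ndarray, weak: np.ndarray) -> np.ndarray:
        # Grow the strong mask into weak-but-adjacent pixels until stable.
        significant = strong.copy()
        while True:
            grown = np.logical_and(binary_dilation(significant), weak)
            if np.array_equal(grown, significant):
                return significant
            significant = grown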
@@ -341,11 +349,13 @@ def process(args):
     good_lvl = np.zeros(bin_num, dtype=bool)
     good_idx = 0
     if obs.exposure > 1000:
-        wav_obs = obs.wavdecomp('gauss', thresh, occ_coeff=True)
+        wav_obs = obs.wavdecomp('atrous', thresh, occ_coeff=True)
+        wav_obs[wav_obs < 0] = 0
         occ_coeff = obs.get_coeff()
         for idx, lvl in enumerate(binary_array(bin_num)):
             try:
                 region = wav_obs[2:-1][lvl].sum(0) > 0
+                # region = fftconvolve(wav_obs[2:-1][lvl].sum(0)>0, gauss(3),mode='same') > 0.5
             except ValueError:
                 region = np.zeros(obs.data.shape, dtype=bool)
             masked_obs = np.ma.masked_array(obs.data, mask=region)*occ_coeff
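Here `wav_obs[2:-1]` drops the two finest planes and the final smooth plane, and lvl, a boolean vector from binary_array, selects a subset of the remaining wavelet layers; summing the subset and thresholding at zero yields a candidate source mask. A toy example of that boolean layer selection:

    import numpy as np

    wav = np.arange(2 * 3 * 3).reshape(2, 3, 3)  # two toy wavelet layers
    lvl = np.array([True, False])                # keep only the first layer
    region = wav[lvl].sum(0) > 0                 # 2-D candidate source mask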
@@ -360,6 +370,7 @@ def process(args):
                     good_lvl = lvl
         try:
             region = wav_obs[2:-1][good_lvl].sum(0) > 0
+            # region = fftconvolve(wav_obs[2:-1][good_lvl].sum(0)>0, gauss(2),mode='same')>0.2
             if region.sum() > 0:
                 region_raw = obs.region_to_raw(region.astype(int))
         except ValueError:
@@ -453,13 +464,36 @@ def process_folder(input_folder=None, start_new_file=None, fits_folder=None, thr
         out_table.to_csv(f'{fits_folder}\\test.csv')
         out_table.to_csv(f'{fits_folder}\\test_skipped.csv')
     # FILTERING OUT PROCESSED OBSERVATIONS
-    already_processed_list = read_csv(f'{fits_folder}\\test.csv', index_col=0, dtype={'obs_id':str})
-    already_skipped_list = read_csv(f'{fits_folder}\\test_skipped.csv', index_col=0, dtype={'obs_id':str})
-    already_processed = (already_processed_list['obs_id'].astype(str)+already_processed_list['detector']).values
-    already_skipped = (already_skipped_list['obs_id'].astype(str)+already_skipped_list['detector']).values
-    obs_list_names = [curr[curr.index('nu')+2:curr.index('_cl.evt')-2] for curr in obs_list]
-    not_processed = np.array([(curr not in already_processed) for curr in obs_list_names])
-    not_skipped = np.array([(curr not in already_skipped) for curr in obs_list_names])
+    already_processed_list = read_csv(
+        f'{fits_folder}\\test.csv',
+        index_col=0,
+        dtype={'obs_id': str}
+    )
+    already_skipped_list = read_csv(
+        f'{fits_folder}\\test_skipped.csv',
+        index_col=0,
+        dtype={'obs_id': str}
+    )
+    already_processed = (
+        already_processed_list['obs_id'].astype(str) +
+        already_processed_list['detector']
+    ).values
+    already_skipped = (
+        already_skipped_list['obs_id'].astype(str) +
+        already_skipped_list['detector']
+    ).values
+    obs_list_names = [
+        curr[curr.index('nu')+2:curr.index('_cl.evt')-2]
+        for curr in obs_list
+    ]
+    not_processed = np.array([
+        (curr not in already_processed)
+        for curr in obs_list_names
+    ])
+    not_skipped = np.array([
+        (curr not in already_skipped)
+        for curr in obs_list_names
+    ])
     obs_list = obs_list[np.logical_and(not_processed, not_skipped)]
     print(f'Removed already processed observations. {len(obs_list)} observations remain.')
     # START PROCESSING
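The slice `curr[curr.index('nu')+2:curr.index('_cl.evt')-2]` pulls the observation ID plus detector module out of a NuSTAR event-file name, matching the obs_id+detector keys built from the CSVs. A worked example on a hypothetical file name (invented for illustration):

    curr = 'nu80002092008A01_cl.evt'  # hypothetical NuSTAR event file
    name = curr[curr.index('nu')+2:curr.index('_cl.evt')-2]
    # name == '80002092008A': 11-digit obs_id plus detector module 'A'

Note that index('nu') matches the first occurrence, so a path with 'nu' in a parent directory name would shift the slice; the code presumably receives bare file names or paths where this cannot happen.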
@@ -468,8 +502,11 @@ def process_folder(input_folder=None, start_new_file=None, fits_folder=None, thr
     for group_idx in range(len(obs_list)//group_size+1):
         print(f'Started group {group_idx}')
         group_list = obs_list[group_size*group_idx:min(group_size*(group_idx+1), len(obs_list))]
-        max_size = np.array([stat(file).st_size/2**20 for file in group_list]).max()
-        process_num = cpu_count() if max_size < 50 else (cpu_count()//2 if max_size < 200 else (cpu_count()//4 if max_size < 1000 else 1))
+        max_size = np.array([
+            stat(file).st_size/2**20
+            for file in group_list
+        ]).max()
+        process_num = (cpu_count() if max_size < 50 else (cpu_count()//2 if max_size < 200 else (cpu_count()//4 if max_size < 1000 else 1)))
         print(f"Max file size in group is {max_size:.2f}Mb, create {process_num} processes")
         with get_context('spawn').Pool(processes=process_num) as pool:
             packed_args = map(lambda _: (_, thresh), group_list)
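The process_num ladder throttles parallelism by the largest file in the group (st_size/2**20, i.e. MiB): full cpu_count below 50 MiB, half below 200 MiB, a quarter below 1000 MiB, and a single process beyond that, trading speed for bounded peak memory. The same ladder written as a standalone helper for readability (a sketch, not part of the diff; cpu_count comes from multiprocessing as in this module):

    from multiprocessing import cpu_count

    def workers_for(max_size_mb: float) -> int:
        # Fewer workers as the largest event file grows,
        # to keep peak memory in check.
        if max_size_mb < 50:
            return cpu_count()
        if max_size_mb < 200:
            return cpu_count()//2
        if max_size_mb < 1000:
            return cpu_count()//4
        return 1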