Added comments to all functions

This commit is contained in:
Andrey Mukhin 2022-09-20 12:46:57 +03:00
parent 6c99182db4
commit e3046baa68

View File

@ -17,7 +17,10 @@ from warnings import filterwarnings
filterwarnings('ignore') filterwarnings('ignore')
def get_link_list(folder, sort_list=True): def get_link_list(folder: str, sort_list=True) -> list[str]:
"""
Returns array of paths to all *_cl.evt files in the directory recursively.
"""
links = glob(f'{folder}\\**\\*_cl.evt', recursive=True) links = glob(f'{folder}\\**\\*_cl.evt', recursive=True)
if sort_list: if sort_list:
sorted_list = sorted(links, key=lambda x: stat(x).st_size) sorted_list = sorted(links, key=lambda x: stat(x).st_size)
@ -26,7 +29,10 @@ def get_link_list(folder, sort_list=True):
return np.array(links) return np.array(links)
def binary_array(num): def binary_array(num: int) -> list[list[bool]]:
"""
Returns list of all possible combinations of num of bool values.
"""
variants = [[0, 1], ]*num variants = [[0, 1], ]*num
out = np.zeros((2**num, num), bool) out = np.zeros((2**num, num), bool)
for idx, level in enumerate(itertools.product(*variants)): for idx, level in enumerate(itertools.product(*variants)):
@ -34,8 +40,12 @@ def binary_array(num):
return out return out
def create_array(filename, mode='Sky'): def create_array(file_path: str, mode='Sky') -> list[int]:
temp = fits.getdata(filename, 1) """
Returns a 2d array of counts for given observation file.
Modes 'Sky' and 'Det' return arrays in (X,Y) and DET1 respectively.
"""
temp = fits.getdata(file_path, 1)
if mode == 'Sky': if mode == 'Sky':
return np.histogram2d(temp['Y'], return np.histogram2d(temp['Y'],
temp['X'], temp['X'],
@ -49,6 +59,10 @@ def create_array(filename, mode='Sky'):
def get_wcs(file): def get_wcs(file):
"""
Returns WCS for given observation.
Note that argument here is an opened fits file, not a path.
"""
header = file[1].header header = file[1].header
wcs = WCS({ wcs = WCS({
'CTYPE1': header['TCTYP38'], 'CTYPE2': header['TCTYP39'], 'CTYPE1': header['TCTYP38'], 'CTYPE2': header['TCTYP39'],
@ -61,7 +75,10 @@ def get_wcs(file):
return wcs return wcs
def atrous(level=0, max_size=1001): def atrous(level=0, max_size=1001) -> list[list[float]]:
"""
Returns a trou kernel with the size 2**level and corresponding shape.
"""
base = 1/256*np.array([ base = 1/256*np.array([
[1, 4, 6, 4, 1], [1, 4, 6, 4, 1],
[4, 16, 24, 16, 4], [4, 16, 24, 16, 4],
@ -78,7 +95,10 @@ def atrous(level=0, max_size=1001):
return output return output
def gauss(level=0, max_size=1000): def gauss(level=0, max_size=1000) -> list[list[float]]:
"""
Returns gaussian kernel with sigma = 2**level
"""
size = min(5*2**(level+1)+1, max_size) size = min(5*2**(level+1)+1, max_size)
sigma = 2**(level) sigma = 2**(level)
A = 1/(2*np.pi*sigma**2)**0.5 A = 1/(2*np.pi*sigma**2)**0.5
@ -88,6 +108,9 @@ def gauss(level=0, max_size=1000):
def adjecent(array): def adjecent(array):
"""
Returns two lists of indices of cells adjecent or diagonal to non-zero cells of given array
"""
grid = np.array([ grid = np.array([
[1, 1, 1], [1, 1, 1],
[1, 0, 1], [1, 0, 1],
@ -104,6 +127,9 @@ def adjecent(array):
def add_borders(array, middle=True): def add_borders(array, middle=True):
"""
Returns border mask for an DET1 observation array
"""
mask = np.zeros(array.shape) mask = np.zeros(array.shape)
datax, datay = np.any(array > 0, 0), np.any(array > 0, 1) datax, datay = np.any(array > 0, 0), np.any(array > 0, 1)
# Add border masks # Add border masks
@ -119,6 +145,9 @@ def add_borders(array, middle=True):
def fill_poisson(array, size_input=32): def fill_poisson(array, size_input=32):
"""
Fills all masked elements of an array with poisson signal with local expected value.
"""
if not (isinstance(array, np.ma.MaskedArray)): if not (isinstance(array, np.ma.MaskedArray)):
print('No mask found') print('No mask found')
return array return array
@ -137,6 +166,10 @@ def fill_poisson(array, size_input=32):
def mirror(array): def mirror(array):
"""
Returns 3 times bigger array with mirrored elements.
Original array is located in center.
"""
size = array.shape[0] size = array.shape[0]
output = np.tile(array, (3, 3)) output = np.tile(array, (3, 3))
output[0:size] = np.flipud(output[0:size]) output[0:size] = np.flipud(output[0:size])
@ -147,6 +180,9 @@ def mirror(array):
class Observation: class Observation:
"""
Main class, contains information about the observation given.
"""
def __init__(self, file_name, E_borders=[3, 20]): def __init__(self, file_name, E_borders=[3, 20]):
self.filename = file_name self.filename = file_name
self.name = file_name[file_name.find('nu'):].replace('_cl.evt','') self.name = file_name[file_name.find('nu'):].replace('_cl.evt','')
@ -163,11 +199,19 @@ class Observation:
self.exposure = self.header['EXPOSURE'] self.exposure = self.header['EXPOSURE']
def get_coeff(self): def get_coeff(self):
"""
Returns normalalizing coefficients for different chips of the observation detector.
Coefficients are obtained from stacked observations in OCC mode.
"""
coeff = np.array([0.977, 0.861, 1.163, 1.05]) if self.det == 'A' else np.array([1.004, 0.997, 1.025, 0.979]) coeff = np.array([0.977, 0.861, 1.163, 1.05]) if self.det == 'A' else np.array([1.004, 0.997, 1.025, 0.979])
resized_coeff = (coeff).reshape(2, 2).repeat(180, 0).repeat(180, 1) resized_coeff = (coeff).reshape(2, 2).repeat(180, 0).repeat(180, 1)
return resized_coeff return resized_coeff
def get_data(self, file, E_borders=[3, 20]): def get_data(self, file, E_borders=[3, 20]):
"""
Returns masked array with DET1 image data for given energy band.
Mask is created from observations badpix tables and to mask the border and gaps.
"""
PI_min, PI_max = (np.array(E_borders)-1.6)/0.04 PI_min, PI_max = (np.array(E_borders)-1.6)/0.04
data = file[1].data.copy() data = file[1].data.copy()
idx_mask = (data['STATUS'].sum(1) == 0) idx_mask = (data['STATUS'].sum(1) == 0)
@ -182,6 +226,9 @@ class Observation:
return output, mask return output, mask
def get_bad_pix(self, file): def get_bad_pix(self, file):
"""
Creates a mask for observation based on badpix tables.
"""
output = np.zeros((360, 360)) output = np.zeros((360, 360))
kernel = np.ones((5, 5)) kernel = np.ones((5, 5))
for i in range(4): for i in range(4):
@ -197,6 +244,9 @@ class Observation:
return output return output
def wavdecomp(self, mode='gauss', thresh=False, occ_coeff=False): def wavdecomp(self, mode='gauss', thresh=False, occ_coeff=False):
"""
Performs a wavelet decomposition of image.
"""
# THRESHOLD # THRESHOLD
if type(thresh) is int: if type(thresh) is int:
thresh_max, thresh_add = thresh, thresh/2 thresh_max, thresh_add = thresh, thresh/2
@ -245,6 +295,9 @@ class Observation:
return conv_out return conv_out
def region_to_raw(self, region): def region_to_raw(self, region):
"""
Returns a hdu_list with positions of masked pixels in RAW coordinates.
"""
x_region, y_region = np.where(region) x_region, y_region = np.where(region)
tables = [] tables = []
for i in range(4): for i in range(4):
@ -349,6 +402,10 @@ def process(args):
def process_folder(input_folder=None, start_new_file=None, fits_folder=None, thresh=None): def process_folder(input_folder=None, start_new_file=None, fits_folder=None, thresh=None):
"""
Generates a fits-table of parameters, folder with mask images in DET1 and BADPIX tables in RAW for all observations in given folder.
Note that observations with exposure < 1000 sec a skipped.
"""
# DIALOGUE # DIALOGUE
if not (input_folder): if not (input_folder):
print('Enter path to the input folder') print('Enter path to the input folder')