initial project code commit

This commit is contained in:
2026-04-22 13:15:55 +03:00
parent e624299842
commit 9dcc2f9473
69 changed files with 7387 additions and 0 deletions

BIN
arttools/.DS_Store vendored Normal file

Binary file not shown.

6
arttools/pyproject.toml Normal file
View File

@@ -0,0 +1,6 @@
[build-system]
requires = [
"setuptools>=42",
]
build-backend = "setuptools.build_meta"

19
arttools/setup.py Normal file
View File

@@ -0,0 +1,19 @@
from setuptools import setup
# __version__ = "1.0.0"
exec(open('src/arttools/version.py').read())
setup(
name="arttools",
version=__version__,
author="M.Pavlinsky SRG/ART-XC software team",
description="SRG/ARTXC software tools",
package_dir={"": "src"},
packages=["arttools"],
entry_points={
"console_scripts": [
"arttime = arttools.arttime:main"
]
}
)

BIN
arttools/src/.DS_Store vendored Normal file

Binary file not shown.

BIN
arttools/src/arttools/.DS_Store vendored Normal file

Binary file not shown.

View File

View File

@@ -0,0 +1,27 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2021-2024 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the M. Pavlinsky SRG/ART-XC software project.
# -----------------------------------------------------------------------------
import argparse
class KvAction(argparse.Action):
def __init__(self, option_strings, dest, **kwargs):
super(KvAction, self).__init__(option_strings, dest, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
if "=" in values:
key, val = values.split("=", 1)
else:
key, val = self.dest, values
setattr(namespace, key, val)
if __name__ == '__main__':
pass

18
arttools/src/arttools/art.py Executable file
View File

@@ -0,0 +1,18 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
#
# -----------------------------------------------------------------------------
# Copyright (c) 2024 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the SRG/ART-XC software project.
# -----------------------------------------------------------------------------
if __name__ == '__main__':
pass

623
arttools/src/arttools/artindex.py Executable file
View File

@@ -0,0 +1,623 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2024 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the SRG/ART-XC software project.
# -----------------------------------------------------------------------------
import os
import shutil
from datetime import datetime
import json
from dataclasses import dataclass
import numpy as np
from astropy.io import fits
import polars as pl
import astropy.units as u
from astropy.time.formats import erfa
from astropy.time import Time
from astropy.time import TimezoneInfo
from arttools.arttime import Dtime
from arttools.arttime import ArtDay
from arttools.version import __version__
MJDREFD = 51543.875
MJDREF = Time(51543.875, format='mjd') # 01.01.2000 00:00:00 (UTC+3)
TZ_UTC = TimezoneInfo(utc_offset=0*u.hour) # UTC time zone
TZ_MSK = TimezoneInfo(utc_offset=3*u.hour) # UTC+3 (Moscow) time zone
@dataclass
class SessionIndex:
def __init__(self, fname):
self.key = []
self.npkey = np.array(self.key)
self.stem = []
self.station = []
self.sequence = []
self.data_quality = []
self.total_quality = []
self.processed = []
self.start = []
self.stop = []
self.T1_start = []
self.T1_stop = []
self.T2_start = []
self.T2_stop = []
self.T3_start = []
self.T3_stop = []
self.T4_start = []
self.T4_stop = []
self.T5_start = []
self.T5_stop = []
self.T6_start = []
self.T6_stop = []
self.T7_start = []
self.T7_stop = []
self.flag = []
self._index_file = fname
if os.path.isfile(self._index_file):
self.read(self._index_file)
def read(self, fname):
with fits.open(fname) as hdul:
self.stem = hdul[1].data['STEM'].tolist()
self.station = hdul[1].data['STATION'].tolist()
self.sequence = hdul[1].data['SEQUENCE'].tolist()
self.data_quality = hdul[1].data['DATA_QUALITY'].tolist()
self.total_quality = hdul[1].data['TOTAL_QUALITY'].tolist()
self.processed = hdul[1].data['PROCESSED'].tolist()
self.start = hdul[1].data['START' ].tolist()
self.stop = hdul[1].data['STOP' ].tolist()
self.T1_start = hdul[1].data['T1_START'].tolist()
self.T1_stop = hdul[1].data['T1_STOP' ].tolist()
self.T2_start = hdul[1].data['T2_START'].tolist()
self.T2_stop = hdul[1].data['T2_STOP' ].tolist()
self.T3_start = hdul[1].data['T3_START'].tolist()
self.T3_stop = hdul[1].data['T3_STOP' ].tolist()
self.T4_start = hdul[1].data['T4_START'].tolist()
self.T4_stop = hdul[1].data['T4_STOP' ].tolist()
self.T5_start = hdul[1].data['T5_START'].tolist()
self.T5_stop = hdul[1].data['T5_STOP' ].tolist()
self.T6_start = hdul[1].data['T6_START'].tolist()
self.T6_stop = hdul[1].data['T6_STOP' ].tolist()
self.T7_start = hdul[1].data['T7_START'].tolist()
self.T7_stop = hdul[1].data['T7_STOP' ].tolist()
self.flag = hdul[1].data['FLAG'].tolist()
for stm, sta, seq in zip(self.stem, self.station, self.sequence):
self.key.append('.'.join((stm, sta, seq)))
self.npkey = np.array(self.key)
def process(self, frame):
frame.make_key()
if frame.key in self.npkey:
index = np.where(self.npkey == frame.key)[0][0]
T1_start = frame.intervals['T1'][0]
T1_stop = frame.intervals['T1'][1]
T2_start = frame.intervals['T2'][0]
T2_stop = frame.intervals['T2'][1]
T3_start = frame.intervals['T3'][0]
T3_stop = frame.intervals['T3'][1]
T4_start = frame.intervals['T4'][0]
T4_stop = frame.intervals['T4'][1]
T5_start = frame.intervals['T5'][0]
T5_stop = frame.intervals['T5'][1]
T6_start = frame.intervals['T6'][0]
T6_stop = frame.intervals['T6'][1]
T7_start = frame.intervals['T7'][0]
T7_stop = frame.intervals['T7'][1]
times_ok = [
np.isclose(self.T1_start[index], T1_start),
np.isclose(self.T1_stop [index], T1_stop ),
np.isclose(self.T2_start[index], T2_start),
np.isclose(self.T2_stop [index], T2_stop ),
np.isclose(self.T3_start[index], T3_start),
np.isclose(self.T3_stop [index], T3_stop ),
np.isclose(self.T4_start[index], T4_start),
np.isclose(self.T4_stop [index], T4_stop ),
np.isclose(self.T5_start[index], T5_start),
np.isclose(self.T5_stop [index], T5_stop ),
np.isclose(self.T6_start[index], T6_start),
np.isclose(self.T6_stop [index], T6_stop ),
np.isclose(self.T7_start[index], T7_start),
np.isclose(self.T7_stop [index], T7_stop ),
]
quality_ok = [
np.isclose(self.data_quality[index] , frame.data_quality ),
np.isclose(self.total_quality[index], frame.total_quality),
]
all_ok = np.all(times_ok) and np.all(quality_ok)
if not all_ok:
print(
"WARNING: session {}.{}.{} differs from indexed".format(
frame.stem, frame.station, frame.sequence
)
)
else:
self.key.append(frame.key)
self.npkey = np.array(self.key)
self.stem.append(frame.stem)
self.station.append(frame.station)
self.sequence.append(frame.sequence)
self.data_quality.append(frame.data_quality)
self.total_quality.append(frame.total_quality)
self.processed.append(frame.processed)
start = np.array([
frame.intervals["T1"][0],
frame.intervals["T2"][0],
frame.intervals["T3"][0],
frame.intervals["T4"][0],
frame.intervals["T5"][0],
frame.intervals["T6"][0],
frame.intervals["T7"][0],
]).min()
self.start.append(start)
stop = np.array([
frame.intervals["T1"][1],
frame.intervals["T2"][1],
frame.intervals["T3"][1],
frame.intervals["T4"][1],
frame.intervals["T5"][1],
frame.intervals["T6"][1],
frame.intervals["T7"][1],
]).max()
self.stop.append(stop )
self.T1_start.append(frame.intervals["T1"][0])
self.T1_stop .append(frame.intervals["T1"][1])
self.T2_start.append(frame.intervals["T2"][0])
self.T2_stop .append(frame.intervals["T2"][1])
self.T3_start.append(frame.intervals["T3"][0])
self.T3_stop .append(frame.intervals["T3"][1])
self.T4_start.append(frame.intervals["T4"][0])
self.T4_stop .append(frame.intervals["T4"][1])
self.T5_start.append(frame.intervals["T5"][0])
self.T5_stop .append(frame.intervals["T5"][1])
self.T6_start.append(frame.intervals["T6"][0])
self.T6_stop .append(frame.intervals["T6"][1])
self.T7_start.append(frame.intervals["T7"][0])
self.T7_stop .append(frame.intervals["T7"][1])
# a FLAG is always 0 as we introduce session to index
self.flag.append(0)
def index(self):
return pl.DataFrame(
{
'key': self.key,
'stem': self.stem,
'station': self.station,
'sequence': self.sequence,
'data_quality': self.data_quality,
'total_quality': self.total_quality,
'processed': self.processed,
'start' : self.start ,
'stop' : self.stop ,
'T1_start' : self.T1_start ,
'T1_stop' : self.T1_stop ,
'T2_start' : self.T2_start ,
'T2_stop' : self.T2_stop ,
'T3_start' : self.T3_start ,
'T3_stop' : self.T3_stop ,
'T4_start' : self.T4_start ,
'T4_stop' : self.T4_stop ,
'T5_start' : self.T5_start ,
'T5_stop' : self.T5_stop ,
'T6_start' : self.T6_start ,
'T6_stop' : self.T6_stop ,
'T7_start' : self.T7_start ,
'T7_stop' : self.T7_stop ,
'flag' : self.flag ,
}
).sort('key')
def write_index_fits(self):
df = self.index()
hdu = fits.PrimaryHDU()
c00 = fits.Column(name='STEM', format='20A', unit='', array=np.array(df['stem']))
c01 = fits.Column(
name='STATION', format='2A', unit='', array=np.array(df['station'])
)
c02 = fits.Column(
name='SEQUENCE', format='3A', unit='', array=np.array(df['sequence'])
)
c03 = fits.Column(name='FLAG' , format='1I' , unit='' , array=np.array(df['flag' ]))
c04 = fits.Column(
name='DATA_QUALITY',
format='1D',
unit='%',
array=np.array(df['data_quality']),
)
c05 = fits.Column(
name='TOTAL_QUALITY',
format='1D',
unit='%',
array=np.array(df['total_quality']))
c06 = fits.Column(name='PROCESSED', format='19A', unit='date', array=np.array(df['processed']))
c07 = fits.Column(name='START' , format='1D' , unit='sec' , array=np.array(df['start' ]))
c08 = fits.Column(name='STOP' , format='1D' , unit='sec' , array=np.array(df['stop' ]))
c09 = fits.Column(name='T1_START' , format='1D' , unit='sec' , array=np.array(df['T1_start' ]))
c10 = fits.Column(name='T1_STOP' , format='1D' , unit='sec' , array=np.array(df['T1_stop' ]))
c11 = fits.Column(name='T2_START' , format='1D' , unit='sec' , array=np.array(df['T2_start' ]))
c12 = fits.Column(name='T2_STOP' , format='1D' , unit='sec' , array=np.array(df['T2_stop' ]))
c13 = fits.Column(name='T3_START' , format='1D' , unit='sec' , array=np.array(df['T3_start' ]))
c14 = fits.Column(name='T3_STOP' , format='1D' , unit='sec' , array=np.array(df['T3_stop' ]))
c15 = fits.Column(name='T4_START' , format='1D' , unit='sec' , array=np.array(df['T4_start' ]))
c16 = fits.Column(name='T4_STOP' , format='1D' , unit='sec' , array=np.array(df['T4_stop' ]))
c17 = fits.Column(name='T5_START' , format='1D' , unit='sec' , array=np.array(df['T5_start' ]))
c18 = fits.Column(name='T5_STOP' , format='1D' , unit='sec' , array=np.array(df['T5_stop' ]))
c19 = fits.Column(name='T6_START' , format='1D' , unit='sec' , array=np.array(df['T6_start' ]))
c20 = fits.Column(name='T6_STOP' , format='1D' , unit='sec' , array=np.array(df['T6_stop' ]))
c21 = fits.Column(name='T7_START' , format='1D' , unit='sec' , array=np.array(df['T7_start' ]))
c22 = fits.Column(name='T7_STOP' , format='1D' , unit='sec' , array=np.array(df['T7_stop' ]))
cols = fits.ColDefs(
[
c00, c01, c02, c03, c04, c05, c06, c07, c08, c09,
c10, c11, c12, c13, c14, c15, c16, c17, c18, c19,
c20, c21, c22
]
)
indexhdu = fits.BinTableHDU.from_columns(cols)
indexhdu.name = "INDEX"
creator = 'artindex v.{}'.format(__version__)
indexhdu.header['creator' ] = (creator, 'program and version that created this file')
dtnow = datetime.now(tz=TZ_MSK)
indexhdu.header['mjdref' ] = (MJDREFD , 'MJD corresponding to SC clock start 2000.0 MSK' )
indexhdu.header['date' ] = (dtnow.strftime('%Y/%m/%d'), 'date that FITS file was created' )
hdulist = fits.HDUList([hdu, indexhdu])
hdulist.writeto(self._index_file, overwrite=True)
print("written: {}".format(self._index_file))
@dataclass
class SessionIndexFrame:
def __init__(self):
self.key = ""
self.stem = ""
self.station = "00"
self.sequence = "000"
self.data_quality = 0
self.total_quality = 0
self.processed = ""
self.intervals = {
"T1": (0, 0),
"T2": (0, 0),
"T3": (0, 0),
"T4": (0, 0),
"T5": (0, 0),
"T6": (0, 0),
"T7": (0, 0),
}
self.flag = 0
def make_key(self):
self.key = ".".join((self.stem, self.station, self.sequence))
@dataclass
class ArtdayIndex:
def __init__(self, fname):
self.artday = []
self.msktime = []
self.complete = []
self.status = []
self.errors = []
self.processed = []
self.toolver = []
self.sciproc = []
self.scitool = []
self.scicaldb = []
self.start = []
self.stop = []
self.daylen = []
self.t1 = []
self.t2 = []
self.t3 = []
self.t4 = []
self.t5 = []
self.t6 = []
self.t7 = []
self.gyro = []
self.bokz = []
self.sed1 = []
self.sed2 = []
self._index = fname
if os.path.isfile(self._index):
self.read(self._index)
def read(self, fname):
with fits.open(fname) as hdul:
self.artday = hdul[1].data['ARTDAY' ].tolist()
self.msktime = hdul[1].data['MSKTIME' ].tolist()
self.complete = hdul[1].data['COMPLETE' ].tolist()
self.status = hdul[1].data['STATUS' ].tolist()
self.errors = hdul[1].data['ERRORS' ].tolist()
self.processed = hdul[1].data['PROCESSED' ].tolist()
self.toolver = hdul[1].data['TOOLVER' ].tolist()
self.sciproc = hdul[1].data['SCI_PROCESSED'].tolist()
self.scitool = hdul[1].data['SCI_TOOLVER' ].tolist()
self.scicaldb = hdul[1].data['SCI_CALDB' ].tolist()
self.start = hdul[1].data['START' ].tolist()
self.stop = hdul[1].data['STOP' ].tolist()
self.daylen = hdul[1].data['DAYLEN' ].tolist()
self.t1 = hdul[1].data['T1' ].tolist()
self.t2 = hdul[1].data['T2' ].tolist()
self.t3 = hdul[1].data['T3' ].tolist()
self.t4 = hdul[1].data['T4' ].tolist()
self.t5 = hdul[1].data['T5' ].tolist()
self.t6 = hdul[1].data['T6' ].tolist()
self.t7 = hdul[1].data['T7' ].tolist()
self.gyro = hdul[1].data['GYRO' ].tolist()
self.bokz = hdul[1].data['BOKZ' ].tolist()
self.sed1 = hdul[1].data['SED1' ].tolist()
self.sed2 = hdul[1].data['SED2' ].tolist()
def process(self, frame):
if frame['artday'] not in self.artday:
self.artday.append(frame['artday'])
self.msktime.append(frame['msktime'])
self.complete.append(frame['complete'])
self.status.append(-100)
self.errors.append(frame['errors'])
self.processed.append(frame['processed'])
self.toolver.append(frame['artfitstool'])
self.sciproc.append(' ')
self.scitool.append(' ')
self.scicaldb.append(' ')
self.start.append(frame['start'])
self.stop.append(frame['stop'])
self.daylen.append(frame['daylen'])
self.t1.append(frame['T1'])
self.t2.append(frame['T2'])
self.t3.append(frame['T3'])
self.t4.append(frame['T4'])
self.t5.append(frame['T5'])
self.t6.append(frame['T6'])
self.t7.append(frame['T7'])
self.gyro.append(frame['GYRO'])
self.bokz.append(frame['BOKZ'])
self.sed1.append(frame['SED1'])
self.sed2.append(frame['SED2'])
else:
index = np.where(np.array(self.artday) == frame['artday'])
if not index:
print('warning: failed to update index for artday={}'.format(frame['artday']))
return
index = index[0][0]
self.complete[index] = frame['complete']
self.errors[index] = frame['errors']
self.processed[index] = frame['processed']
self.toolver[index] = frame['artfitstool']
self.start[index] = frame['start']
self.stop[index] = frame['stop']
self.daylen[index] = frame['daylen']
self.t1[index] = frame['T1']
self.t2[index] = frame['T2']
self.t3[index] = frame['T3']
self.t4[index] = frame['T4']
self.t5[index] = frame['T5']
self.t6[index] = frame['T6']
self.t7[index] = frame['T7']
self.gyro[index] = frame['GYRO']
self.bokz[index] = frame['BOKZ']
self.sed1[index] = frame['SED1']
self.sed2[index] = frame['SED2']
def index(self):
return pl.DataFrame(
{
'artday' : self.artday ,
'msktime' : self.msktime ,
'complete' : self.complete ,
'status' : self.status ,
'errors' : self.errors ,
'processed' : self.processed,
'toolver' : self.toolver ,
'sciproc' : self.sciproc ,
'scitool' : self.scitool ,
'scicaldb' : self.scicaldb ,
'start' : self.start ,
'stop' : self.stop ,
'daylen' : self.daylen ,
'T1' : self.t1 ,
'T2' : self.t2 ,
'T3' : self.t3 ,
'T4' : self.t4 ,
'T5' : self.t5 ,
'T6' : self.t6 ,
'T7' : self.t7 ,
'GYRO' : self.gyro ,
'BOKZ' : self.bokz ,
'SED1' : self.sed1 ,
'SED2' : self.sed2 ,
}
, schema={
'artday' : pl.String ,
'msktime' : pl.String ,
'complete' : pl.Int16 ,
'status' : pl.Int16 ,
'errors' : pl.Int16 ,
'processed' : pl.String ,
'toolver' : pl.String ,
'sciproc' : pl.String ,
'scitool' : pl.String ,
'scicaldb' : pl.String ,
'start' : pl.Float64,
'stop' : pl.Float64,
'daylen' : pl.Float64,
'T1' : pl.Int16 ,
'T2' : pl.Int16 ,
'T3' : pl.Int16 ,
'T4' : pl.Int16 ,
'T5' : pl.Int16 ,
'T6' : pl.Int16 ,
'T7' : pl.Int16 ,
'GYRO' : pl.Int16 ,
'BOKZ' : pl.Int16 ,
'SED1' : pl.Int16 ,
'SED2' : pl.Int16
}
).sort('artday')
def _calculate_status(self, df):
status = np.array(df['status'])
errors = np.array(df['errors'])
artdays = np.array(df['artday'], dtype=int)
dtime = Dtime(datetime.utcnow().isoformat(), tzone=TZ_UTC)
nowday = ArtDay(dtime.to_artday()).get()
nowdays = np.full_like(artdays, nowday)
deltadays = nowdays - artdays
mask_maychange = status != 1
mask_haserrors = errors != 0
mask_freshdays = deltadays < 4
newstatus = np.zeros_like(artdays)
newstatus[mask_maychange & mask_freshdays & ~mask_haserrors] = -2
newstatus[mask_maychange & mask_freshdays & mask_haserrors] = -5
newstatus[mask_maychange & ~mask_freshdays & mask_haserrors] = -1
newstatus[~mask_maychange] = 1
return newstatus
def write_index_fits(self):
df = self.index()
status = self._calculate_status(df)
# print(df)
hdu = fits.PrimaryHDU()
# status = np.full_like(df['complete' ], -1)
c00 = fits.Column(name='ARTDAY' , format='6A' , unit='' , array=np.array(df['artday' ]))
c01 = fits.Column(name='MSKTIME' , format='19A', unit='date', array=np.array(df['msktime' ]))
c02 = fits.Column(name='COMPLETE' , format='1I' , unit='' , array=np.array(df['complete' ]))
c03 = fits.Column(name='STATUS' , format='1I' , unit='' , array=np.array(status ))
c04 = fits.Column(name='ERRORS' , format='1I' , unit='' , array=np.array(df['errors' ]))
c05 = fits.Column(name='PROCESSED' , format='19A', unit='date', array=np.array(df['processed']))
c06 = fits.Column(name='TOOLVER' , format='10A', unit='date', array=np.array(df['toolver' ]))
c07 = fits.Column(name='SCI_PROCESSED', format='19A', unit='' , array=np.array(df['sciproc' ]))
c08 = fits.Column(name='SCI_TOOLVER' , format='10A', unit='' , array=np.array(df['scitool' ]))
c09 = fits.Column(name='SCI_CALDB' , format='10A', unit='' , array=np.array(df['scicaldb' ]))
c10 = fits.Column(name='START' , format='1D' , unit='sec' , array=np.array(df['start' ]))
c11 = fits.Column(name='STOP' , format='1D' , unit='sec' , array=np.array(df['stop' ]))
c12 = fits.Column(name='DAYLEN' , format='1D' , unit='sec' , array=np.array(df['daylen' ]))
c13 = fits.Column(name='T1' , format='1I' , unit='' , array=np.array(df['T1' ]))
c14 = fits.Column(name='T2' , format='1I' , unit='' , array=np.array(df['T2' ]))
c15 = fits.Column(name='T3' , format='1I' , unit='' , array=np.array(df['T3' ]))
c16 = fits.Column(name='T4' , format='1I' , unit='' , array=np.array(df['T4' ]))
c17 = fits.Column(name='T5' , format='1I' , unit='' , array=np.array(df['T5' ]))
c18 = fits.Column(name='T6' , format='1I' , unit='' , array=np.array(df['T6' ]))
c19 = fits.Column(name='T7' , format='1I' , unit='' , array=np.array(df['T7' ]))
c20 = fits.Column(name='GYRO' , format='1I' , unit='' , array=np.array(df['GYRO' ]))
c21 = fits.Column(name='BOKZ' , format='1I' , unit='' , array=np.array(df['BOKZ' ]))
c22 = fits.Column(name='SED1' , format='1I' , unit='' , array=np.array(df['SED1' ]))
c23 = fits.Column(name='SED2' , format='1I' , unit='' , array=np.array(df['SED2' ]))
cols = fits.ColDefs(
[
c00, c01, c02, c03, c04, c05, c06, c07, c08, c09,
c10, c11, c12, c13, c14, c15, c16, c17, c18, c19,
c20, c21, c22, c23
]
)
indexhdu = fits.BinTableHDU.from_columns(cols)
indexhdu.name = "INDEX"
creator = 'artindex v.{}'.format(__version__)
indexhdu.header['creator' ] = (creator, 'program and version that created this file')
dtnow = datetime.now(tz=TZ_MSK)
indexhdu.header['mjdref' ] = (MJDREFD , 'MJD corresponding to SC clock start 2000.0 MSK' )
indexhdu.header['date' ] = (dtnow.strftime('%Y/%m/%d'), 'date that FITS file was created' )
hdulist = fits.HDUList([hdu, indexhdu])
hdulist.writeto(self._index, overwrite=True)
print("written: {}".format(self._index))
@dataclass
class SessionIndexFrame:
def __init__(self):
self.key = ""
self.stem = ""
self.station = "00"
self.sequence = "000"
self.data_quality = 0
self.total_quality = 0
self.processed = ""
self.intervals = {
"T1": (0, 0),
"T2": (0, 0),
"T3": (0, 0),
"T4": (0, 0),
"T5": (0, 0),
"T6": (0, 0),
"T7": (0, 0),
}
self.flag = 0
def make_key(self):
self.key = ".".join((self.stem, self.station, self.sequence))
if __name__ == "__main__":
pass

388
arttools/src/arttools/arttime.py Executable file
View File

@@ -0,0 +1,388 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2021-2024 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the M. Pavlinksy SRG/ART-XC telescope software.
# -----------------------------------------------------------------------------
import os
import sys
import shutil
import argparse
import numpy as np
from datetime import datetime
import astropy.units as u
from astropy.io import fits
from astropy.time.formats import erfa
from astropy.time import Time
from astropy.time import TimezoneInfo
from arttools.version import __version__
from arttools.argparse import KvAction
from arttools.toolargs import ToolArgs
MJDREF = Time(51543.875, format='mjd') #corresponds to date 01.01.2000 00:00:00 (UTC+3)
TZ_UTC = TimezoneInfo(utc_offset=0*u.hour) #UTC time zone
TZ_MSK = TimezoneInfo(utc_offset=3*u.hour) #UTC+3 (Moscow) time zone
class ArtTime(object):
def __init__(self, artday=None, missiontime=None, mjd=None, datetime_utc=None, datetime_msk=None):
self._has_value = 0
self._try_aday(artday)
self._try_mtime(missiontime)
self._try_mjd(mjd)
self._try_dtime_utc(datetime_utc)
self._try_dtime_msk(datetime_msk)
if self._has_value > 1:
print('Error: ArtTime needs only one argument')
exit()
if self._has_value == 0:
datetime_utc = datetime.utcnow()
self._try_dtime_utc(datetime_utc.isoformat())
def _try_mtime(self, missiontime):
if missiontime is None:
return
self._has_value = self._has_value+1
self._missiontime = MissionTime(missiontime)
self._artday = ArtDay(self._missiontime.to_artday())
self._mjd = MjdDay(self._missiontime.to_mjd())
self._datetime_utc = Dtime(dtime=str(self._missiontime.to_datetime(tzone=TZ_UTC)))
self._datetime_msk = Dtime(dtime=str(self._missiontime.to_datetime(tzone=TZ_MSK)))
return
def _try_aday(self, artday):
if artday is None:
return
self._has_value = self._has_value+1
self._artday = ArtDay(artday)
self._missiontime = MissionTime(self._artday.to_mission())
self._mjd = MjdDay(self._artday.to_mjd())
self._datetime_utc = Dtime(dtime=str(self._artday.to_datetime(tzone=TZ_UTC)))
self._datetime_msk = Dtime(dtime=str(self._artday.to_datetime(tzone=TZ_MSK)))
def _try_mjd(self, mjd):
if mjd is None:
return
self._has_value = self._has_value+1
self._mjd = MjdDay(mjd)
self._artday = ArtDay(self._mjd.to_artday())
self._missiontime = MissionTime(self._mjd.to_mission())
self._datetime_utc = Dtime(dtime=str(self._mjd.to_datetime(tzone=TZ_UTC)))
self._datetime_msk = Dtime(dtime=str(self._mjd.to_datetime(tzone=TZ_MSK)))
def _try_dtime_utc(self, datetime_utc):
if datetime_utc is None:
return
self._has_value = self._has_value+1
self._datetime_utc = Dtime(datetime_utc, tzone=TZ_UTC)
self._datetime_msk = Dtime(str(self._datetime_utc.to_msk()))
self._missiontime = MissionTime(self._datetime_utc.to_mission())
self._artday = ArtDay(self._datetime_utc.to_artday())
self._mjd = MjdDay(self._datetime_utc.to_mjd())
def _try_dtime_msk(self, datetime_msk):
if datetime_msk is None:
return
self._has_value = self._has_value+1
self._datetime_msk = Dtime(datetime_msk, tzone=TZ_MSK)
self._datetime_utc = Dtime(str(self._datetime_msk.to_utc()))
self._missiontime = MissionTime(self._datetime_msk.to_mission())
self._artday = ArtDay(self._datetime_msk.to_artday())
self._mjd = MjdDay(self._datetime_msk.to_mjd())
@property
def missiontime(self):
return self._missiontime
@property
def artday(self):
return self._artday
@property
def mjd(self):
return self._mjd
@property
def datetime_utc(self):
return self._datetime_utc
@property
def datetime_msk(self):
return self._datetime_msk
def __str__(self):
aday_str = 'Artday --> ' + str(self._artday )
mtime_str = 'Mission time --> ' + str(self._missiontime )
mjd_str = 'MJD --> ' + str(self._mjd )
date_msk_str = 'Date MSK --> ' + str(self._datetime_msk)
date_utc_str = 'Date UTC --> ' + str(self._datetime_utc)
return '\n'.join((aday_str, mtime_str, mjd_str, date_msk_str, date_utc_str))
class MissionTime(object):
def __init__(self, missiontime):
self._missiontime = missiontime
def get(self):
return self._missiontime
@property
def time(self):
return self._missiontime
def __str__(self):
return str(self._missiontime)
def to_datetime(self, tzone=TZ_UTC):
days = self.to_mjd()
return days.to_datetime(timezone=tzone)
def to_artday(self):
ARTDAYLEN = erfa.DAYSEC
return self._missiontime / ARTDAYLEN
def to_mjd(self):
mjd_value = MJDREF.mjd + Time(self._missiontime / erfa.DAYSEC, format='mjd').mjd
return Time(mjd_value, format='mjd')
class ArtDay(object):
def __init__(self, artday):
self._artday = artday
def get(self):
return self._artday
@property
def time(self):
return self._artday
def __str__(self):
return str(self._artday)
def to_mission(self):
ARTDAYLEN = erfa.DAYSEC
return self._artday * ARTDAYLEN
def to_datetime(self, tzone=TZ_UTC):
days = self.to_mjd()
return days.to_datetime(timezone=tzone)
def to_mjd(self):
mjd_value = MJDREF.mjd + Time(self._artday, format='mjd').mjd
return Time(mjd_value, format='mjd')
def to_str(self):
return '{:0>6}'.format(self._artday)
class MjdDay(object):
def __init__(self, mjd):
self._mjd = Time(mjd, format='mjd')
def get(self):
return self._mjd
@property
def time(self):
return self._mjd
def __str__(self):
return str(self._mjd)
def to_mission(self):
ARTDAYLEN = erfa.DAYSEC
return (self._mjd.mjd - MJDREF.mjd) * ARTDAYLEN
def to_artday(self):
return self._mjd.mjd - MJDREF.mjd
def to_datetime(self, tzone=TZ_UTC):
return Time(self._mjd, format='mjd').to_datetime(timezone=tzone)
class Dtime(object):
def __init__(self, dtime, tzone=None):
dtime = datetime.fromisoformat(dtime)
if tzone is None:
self._dtime = dtime
else:
self._dtime = dtime.replace(tzinfo=tzone)
self._tzone = tzone
def get(self):
return self._dtime
@property
def time(self):
return self._dtime
def __str__(self):
return str(self._dtime)
def to_mjd(self):
days = Time(self._dtime)
return days
def to_artday(self):
return self.to_mjd().mjd - MJDREF.mjd
def to_mission(self):
ARTDAYLEN = erfa.DAYSEC
return self.to_artday() * ARTDAYLEN
def to_msk(self):
dtime_utc = Time(self._dtime)
return dtime_utc.to_datetime(timezone=TZ_MSK)
def to_utc(self):
dtime_msk = Time(self._dtime)
return dtime_msk.to_datetime(timezone=TZ_UTC)
class Args(ToolArgs):
def __init__(self):
super().__init__()
self.artday = None
self.missiontime = None
self.mjd = None
self.datetime_utc = None
self.datetime_msk = None
def read(self):
argparser = argparse.ArgumentParser(
description=('The arttime tool converts artdays, mission time, MSK and UTC date time'
' and MJD to each other. Calling arttime without parameters will use the current '
'local time as input.'))
argparser.add_argument(
'--artday', '--aday',
help=('ART-XC days.'
'Example: arttime --artday=8000'),
action=KvAction)
argparser.add_argument(
'--missiontime', '--mtime',
help=('Onboard ART-XC seconds. '
'Example: arttime --missiontime=691200000'),
action=KvAction)
argparser.add_argument(
'--mjd',
help=('MJD. '
'Example: arttime --mjd=59543.875'),
action=KvAction)
argparser.add_argument(
'--datetime_utc', '--dtime_utc',
help=('Date time based on UTC time zone. '
'Example: arttime --datetime_utc=2021-11-25T21:00:00'),
action=KvAction)
argparser.add_argument(
'--datetime_msk', '--dtime_msk',
help=('Date time based on Moscow time zone (UTC+3). '
'Example: arttime --datetime_msk=2021-11-26T00:00:00'),
action=KvAction)
args = argparser.parse_args()
if args.artday is not None:
self.artday = float(args.artday)
if args.missiontime is not None:
self.missiontime = float(args.missiontime)
if args.mjd is not None:
self.mjd = float(args.mjd)
self.datetime_utc = args.datetime_utc
self.datetime_msk = args.datetime_msk
def print(self):
logstr = '\n'.join((
'Input parameters:',
' artday = {}'.format(self.artday ),
' missiontime = {}'.format(self.missiontime ),
' mjd = {}'.format(self.mjd ),
' datetime_utc = {}'.format(self.datetime_utc),
' datetime_msk = {}'.format(self.datetime_msk)
))
print(logstr)
class Task(object):
def __init__(self):
pass
def __enter__(self):
self.initialize()
return self
def __exit__(self, ex_type, ex_value, ex_traceback):
self.finalize()
def initialize(self):
print('running: arttime v.{}'.format(__version__) )
print('================================================================================')
self._args = Args()
self._args.read()
self._args.print()
def finalize(self):
print('--------------------------------------------------------------------------------')
def run(self):
if self._args.artday is not None:
print(ArtTime(artday=self._args.artday))
if self._args.missiontime is not None:
print(ArtTime(missiontime=self._args.missiontime))
if self._args.mjd is not None:
print(ArtTime(mjd=self._args.mjd))
if self._args.datetime_utc is not None:
print(ArtTime(datetime_utc=self._args.datetime_utc))
if self._args.datetime_msk is not None:
print(ArtTime(datetime_msk=self._args.datetime_msk))
if len(sys.argv) <= 1:
datetime_utc = datetime.utcnow()
print('*** Convertation for the current local time:')
print(ArtTime(datetime_utc=datetime_utc.isoformat()))
def main():
with Task() as task:
task.run()
if __name__ == '__main__':
main()

72
arttools/src/arttools/cmd.py Executable file
View File

@@ -0,0 +1,72 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2025 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the SRG/ART-XC software project.
# -----------------------------------------------------------------------------
import sys
import asyncio as aio
from arttools.errors import Error
class InternalError(Error):
pass
class UncleanExecutionError(Error):
pass
class ExecutionResult(object):
def __init__(self, cmd, retcode, stdout_data, stderr_data):
self.command = cmd
self.return_code = retcode
self.stdout_data = stdout_data
self.stderr_data = stderr_data
class Command(object):
def __init__(self, cmd, stdout=sys.stdout, stderr=sys.stderr):
self._cmd = cmd
self._stdout = stdout
self._stderr = stderr
self._result = None
async def _run(self):
process = await aio.create_subprocess_shell(
cmd=self._cmd,
stdout=self._stdout,
stderr=self._stderr
)
stdout_data, stderr_data = await process.communicate()
if stdout_data is not None:
stdout_data = stdout_data.decode()
if stderr_data is not None:
stderr_data = stderr_data.decode()
self._result = \
ExecutionResult(
self._cmd,
process.returncode,
stdout_data,
stderr_data
)
def run(self):
aio.run(self._run())
return self._result
if __name__ == "__main__":
pass

View File

@@ -0,0 +1,99 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2025 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the M. Pavlinsky SRG/ART-XC software project.
# -----------------------------------------------------------------------------
import polars as pl
import numpy as np
from astropy.io import fits
class DataTable(object):
def __init__(self):
self._dataframe = None
self._adapter = None
@property
def frame(self): return self._dataframe
@property
def size(self): return self._dataframe.height
def adapter(self, adapter):
self._adapter = adapter
return self
def read(self):
if self._adapter is not None:
self._dataframe = self._adapter.read()
return self
def column(self, name):
return self._dataframe.get_column(name)
def setcol(self, name, arr):
self._dataframe =\
self._dataframe.with_columns(
pl.Series(name, arr)
)
class FitsAdapter(object):
def __init__(self, filename, tablename):
self._FORMATS = {
'D': pl.Float64,
'J': pl.UInt32 ,
'I': pl.UInt16 ,
'B': pl.UInt8 ,
'A': pl.String
}
self._filename = filename
self._tablename = tablename
def _schema_for(self, columns):
schema = []
for col in columns:
colformat = ''.join([ch for ch in col.format if ch.isalpha()])
schema.append((col.name, self._FORMATS[colformat]))
return schema
def read(self):
with fits.open(self._filename) as hdul:
hdu = hdul[self._tablename]
schema = self._schema_for(hdu.data.columns)
data = {}
for col in hdu.data.columns:
colformat = ''.join([ch for ch in col.format if ch.isalpha()])
if colformat == 'D': data[col.name] = np.array(hdu.data[col.name], dtype=np.float64)
elif colformat == 'I': data[col.name] = np.array(hdu.data[col.name], dtype=np.uint16 )
else:
data[col.name] = hdu.data[col.name]
return pl.DataFrame(data, schema=schema)
def write(self):
pass
class Adapter(object):
pass
if __name__ == '__main__':
pass

21
arttools/src/arttools/errors.py Executable file
View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2025 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the MVN software project.
# -----------------------------------------------------------------------------
class Error(Exception):
pass
class ConfigNotFoundError(Error):
pass
class RegionsError(Error):
pass
if __name__ == "__main__":
pass

404
arttools/src/arttools/events.py Executable file
View File

@@ -0,0 +1,404 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2021-2026 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the M. Pavlinsky SRG/ART-XC software project.
# -----------------------------------------------------------------------------
import os
import shutil
import datetime
import numpy as np
from scipy.interpolate import interp1d
from astropy.io import fits
import astropy.units as u
from astropy.time.formats import erfa
from astropy.time import Time
from astropy.time import TimezoneInfo
TZ_UTC = TimezoneInfo(utc_offset=0*u.hour) # UTC time zone
TZ_MSK = TimezoneInfo(utc_offset=3*u.hour) # UTC+3 (Moscow) time zone
from arttools.arttime import MissionTime
from arttools.gti import Gti
from arttools.datatable import DataTable, FitsAdapter
from arttools.version import __version__
class Events(object):
_deadtimes = {
1: 770e-6,
2: 770e-6,
3: 770e-6,
4: 770e-6,
5: 770e-6,
6: 1060e-6,
7: 770e-6,
}
_eventlist_keys = [
'TIME',
'TIME_I',
'TIME_F',
'TIME_CORR',
'ENERGY',
'ENERGY_TOP',
'ENERGY_BOT',
'GRADE',
'FLAG',
'RA',
'DEC',
'RAW_X',
'RAW_Y',
'PHA_BOT',
'PHA_BOT_ADD1',
'PHA_BOT_SUB1',
'PHA_TOP',
'PHA_TOP_ADD1',
'PHA_TOP_SUB1',
'TRIGGER',
]
_eventlist_layout = {
'TIME' : {'format': '1D', 'unit': 'sec' },
'TIME_I' : {'format': '1J', 'unit': 'sec', 'bzero': 2147483648},
'TIME_F' : {'format': '1D', 'unit': 'sec' },
'TIME_CORR' : {'format': '1D', 'unit': 'sec' },
'ENERGY' : {'format': '1D', 'unit': 'keV' },
'ENERGY_TOP' : {'format': '1D', 'unit': 'keV' },
'ENERGY_BOT' : {'format': '1D', 'unit': 'keV' },
'GRADE' : {'format': '1I', 'unit': '' },
'FLAG' : {'format': '1I', 'unit': '' },
'RA' : {'format': '1D', 'unit': 'deg' },
'DEC' : {'format': '1D', 'unit': 'deg' },
'RAW_X' : {'format': '1I', 'unit': 'pix' },
'RAW_Y' : {'format': '1I', 'unit': 'pix' },
'PHA_BOT' : {'format': '1I', 'unit': '' },
'PHA_BOT_ADD1': {'format': '1I', 'unit': '' },
'PHA_BOT_SUB1': {'format': '1I', 'unit': '' },
'PHA_TOP' : {'format': '1I', 'unit': '' },
'PHA_TOP_ADD1': {'format': '1I', 'unit': '' },
'PHA_TOP_SUB1': {'format': '1I', 'unit': '' },
'TRIGGER' : {'format': '1I', 'unit': '' },
}
def __init__(self, srcfile):
super().__init__()
self._srcfile = srcfile
self._telname = 0
self._teln = 0
self._tstart = 0
self._tstop = 0
self._events = DataTable()
self._hk = DataTable()
self._gti = Gti()
@property
def empty(self):
return self.size == 0
@property
def size(self):
return self._events.size
@property
def teln(self):
return self._teln
@property
def telname(self):
return self._telname
@property
def mjdref(self):
return self._mjdref
@property
def tstart(self):
return self._tstart
@property
def tstop(self):
return self._tstop
# @property
# def deadtime(self):
# return self._deadtime
@property
def events(self):
return self._events
@property
def hk(self):
return self._hk
@property
def gti(self):
return self._gti
@property
def filename(self):
return self._srcfile
def _telescope_deadtime(self):
return self._deadtimes[self._teln]
def deadtime(self, start, stop):
interval = stop - start
hktimes = np.array(self.hk.column('TIME' ))
hkevt = np.array(self.hk.column('EVENTS')).astype(np.int64)
X = hktimes[1:]
Y = np.diff(hkevt) / np.diff(hktimes)
hkmask = Y > 0
X = X[hkmask]
Y = Y[hkmask]
datamask = (X >= start) & (X <= stop)
interp = interp1d(X, Y, kind='linear', bounds_error=False, fill_value='extrapolate')
begin = interp(start).item()
end = interp(stop).item()
middle = Y[datamask]
cnttimes = np.array([start, *X[datamask], stop])
countrate = np.array([begin, *middle, end])
return np.average(((countrate[1:] + countrate[:-1]) / 2.), weights=np.diff(cnttimes)) * interval * self._telescope_deadtime()
def bindeadtime(self, edges, bins):
hktimes = np.array(self.hk.column('TIME' ))
hkevt = np.array(self.hk.column('EVENTS')).astype(np.int64)
binrate = np.ones_like(bins)
evtcountfn = interp1d(hktimes, hkevt,
kind='linear', bounds_error=False, fill_value='extrapolate')
edge_events = evtcountfn(edges)
# 2) make mask for event counter reversals (negative diffs)
edgediff = np.diff(edge_events)
edgemask = edgediff > 0
# # 3) interpolate filtered (2) points
edgecountfn = interp1d(
edges[1:][edgemask], edgediff[edgemask],
kind='linear', bounds_error=False, fill_value='extrapolate')
# 4) calculate deadtimes and fractional exposure for bins
bindeadt = edgecountfn(edges) * self._telescope_deadtime()
binrate -= bindeadt[1:] / np.diff(edges)
return bindeadt[1:], binrate
def _read_header(self):
with fits.open(self._srcfile) as hdul:
self._teln = hdul['events'].header['teln' ]
self._telname = hdul['events'].header['instrume']
self._mjdref = hdul['events'].header['mjdref' ]
self._tstart = hdul['events'].header['tstart' ]
self._tstop = hdul['events'].header['tstop' ]
def read(self):
self._read_header()
self._events = DataTable().adapter(FitsAdapter(self._srcfile, 'events')).read()
self._hk = DataTable().adapter(FitsAdapter(self._srcfile, 'hk' )).read()
self._gti = Gti.from_hduname(self._srcfile, 'stdgti')
return self
def filter(self, gti):
pass
def write_cl(self, dstfile, cdbinfo=[]):
srchdul = fits.open(self._srcfile)
# --- primary HDU ---
hdu = fits.PrimaryHDU()
# --- EVENTS HDU ---
a00 = np.array(self._events.column('TIME' ))
a01 = np.array(self._events.column('TIME_I' ))
a02 = np.array(self._events.column('TIME_F' ))
a03 = np.array(self._events.column('TIME_CORR' ))
a04 = np.array(self._events.column('ENERGY' ))
a05 = np.array(self._events.column('ENERGY_TOP' ))
a06 = np.array(self._events.column('ENERGY_BOT' ))
a07 = np.array(self._events.column('GRADE' ))
a08 = np.array(self._events.column('FLAG' ))
a09 = np.array(self._events.column('RA' ))
a10 = np.array(self._events.column('DEC' ))
a11 = np.array(self._events.column('RAW_X' ))
a12 = np.array(self._events.column('RAW_Y' ))
a13 = np.array(self._events.column('PHA_BOT' ))
a14 = np.array(self._events.column('PHA_BOT_ADD1'))
a15 = np.array(self._events.column('PHA_BOT_SUB1'))
a16 = np.array(self._events.column('PHA_TOP' ))
a17 = np.array(self._events.column('PHA_TOP_ADD1'))
a18 = np.array(self._events.column('PHA_TOP_SUB1'))
a19 = np.array(self._events.column('TRIGGER' ))
c00 = fits.Column(name='TIME' , format='1D', unit='sec', array=a00 )
c01 = fits.Column(name='TIME_I' , format='1J', unit='sec', array=a01, bzero=2147483648)
c02 = fits.Column(name='TIME_F' , format='1D', unit='sec', array=a02 )
c03 = fits.Column(name='TIME_CORR' , format='1D', unit='sec', array=a03 )
c04 = fits.Column(name='ENERGY' , format='1D', unit='keV', array=a04 )
c05 = fits.Column(name='ENERGY_TOP' , format='1D', unit='keV', array=a05 )
c06 = fits.Column(name='ENERGY_BOT' , format='1D', unit='keV', array=a06 )
c07 = fits.Column(name='GRADE' , format='1I', unit='' , array=a07 )
c08 = fits.Column(name='FLAG' , format='1I', unit='' , array=a08 )
c09 = fits.Column(name='RA' , format='1D', unit='deg', array=a09 )
c10 = fits.Column(name='DEC' , format='1D', unit='deg', array=a10 )
c11 = fits.Column(name='RAW_X' , format='1I', unit='pix', array=a11 )
c12 = fits.Column(name='RAW_Y' , format='1I', unit='pix', array=a12 )
c13 = fits.Column(name='PHA_BOT' , format='1I', unit='' , array=a13 )
c14 = fits.Column(name='PHA_BOT_ADD1', format='1I', unit='' , array=a14 )
c15 = fits.Column(name='PHA_BOT_SUB1', format='1I', unit='' , array=a15 )
c16 = fits.Column(name='PHA_TOP' , format='1I', unit='' , array=a16 )
c17 = fits.Column(name='PHA_TOP_ADD1', format='1I', unit='' , array=a17 )
c18 = fits.Column(name='PHA_TOP_SUB1', format='1I', unit='' , array=a18 )
c19 = fits.Column(name='TRIGGER' , format='1I', unit='' , array=a19 )
cols = fits.ColDefs([
c00, c01, c02, c03, c04, c05, c06, c07, c08, c09,
c10, c11, c12, c13, c14, c15, c16, c17, c18, c19
])
evthdu = fits.BinTableHDU.from_columns(cols)
evthdu.name = 'EVENTS'
creator = 'artpipeline v.{}'.format(__version__)
evthdu.header['creator' ] = (creator , 'program and version that created this file' )
evthdu.header['origin' ] = ('IKI RAS' , 'institution that created this file' )
evthdu.header['telescop'] = ('SRG/ART-XC', 'mission name' )
# copy header keys
COPYKEYS = [
'instrume', 'teln', 'detn', 'urdn',
'mjdref', 'artday', 'tstart', 'tstop',
'date', 'date-obs', 'time-obs', 'date-end', 'time-end',
'maxticks', 'timedel', 'timeunit', 'timeref', 'tassign'
]
for key in COPYKEYS:
try:
evthdu.header[key] = srchdul['EVENTS'].header[key]
except KeyError:
pass
# add caldb key
for key in cdbinfo:
evthdu.header[key] = cdbinfo[key]
# --- make file ---
hdulist = fits.HDUList([hdu, evthdu, srchdul['KVEA'], srchdul['HK'], srchdul['STDGTI']])
hdulist.writeto(dstfile, overwrite=True, checksum=True)
def modify_and_write_cl(self, dstfile, creator, keys, layout, data, position, mask, gti, cdbinfo=[]):
srchdul = fits.open(self._srcfile)
mjdref = srchdul['EVENTS'].header['MJDREF']
# -- prepare keys
newkeys = self._eventlist_keys[:position] + keys + self._eventlist_keys[position:]
# -- prepare layout
newlayout = self._eventlist_layout
newlayout.update(layout)
# -- prepare data
newdata = { }
for key in self._eventlist_keys:
newdata[key] = np.array(self._events.column(key))
for key in keys:
newdata[key] = np.array(data[key])
# -- prepare fits structure
hdu = fits.PrimaryHDU()
cols = []
for key in newkeys:
lay = newlayout[key]
if 'bzero' in lay:
col = fits.Column(name=key, format=lay['format'], unit=lay['unit'], array=newdata[key][mask], bzero=lay['bzero'])
else:
col = fits.Column(name=key, format=lay['format'], unit=lay['unit'], array=newdata[key][mask])
cols.append(col)
evthdu = fits.BinTableHDU.from_columns(cols)
evthdu.name = 'EVENTS'
evthdu.header['creator' ] = (creator , 'program and version that created this file' )
evthdu.header['origin' ] = ('IKI RAS' , 'institution that created this file' )
evthdu.header['telescop'] = ('SRG/ART-XC', 'mission name' )
# copy header keys
COPYKEYS = [
'instrume', 'teln', 'detn', 'urdn',
'maxticks', 'timedel', 'timeunit', 'timeref', 'tassign'
]
for key in COPYKEYS:
try:
evthdu.header[key] = srchdul['EVENTS'].header[key]
except KeyError:
pass
dtnow = datetime.datetime.now(tz=TZ_MSK)
dtobs = MissionTime(self.gti.start).to_datetime(TZ_MSK)
dtend = MissionTime(self.gti.stop ).to_datetime(TZ_MSK)
deadtime = self.deadtime(self.gti.start, self.gti.stop)
evthdu.header['MJDREF' ] = (mjdref , 'MJD corresponding to SC clock start 2000.0 MSK')
evthdu.header['tstart' ] = (self.gti.start , 'start time of the observation [sec]' )
evthdu.header['tstop' ] = (self.gti.stop , 'end time of the observation [sec]' )
evthdu.header['date' ] = (dtnow.strftime('%Y/%m/%d') , 'date that FITS file was created' )
evthdu.header['date-obs'] = (dtobs.strftime('%Y/%m/%d') , 'start date of the observation MSK')
evthdu.header['date-end'] = (dtend.strftime('%Y/%m/%d') , 'end date of the observation MSK' )
evthdu.header['time-obs'] = (dtobs.strftime('%H:%M:%S') , 'start time of the observation MSK')
evthdu.header['time-end'] = (dtend.strftime('%H:%M:%S') , 'end time of the observation MSK' )
evthdu.header['onsource'] = (self.gti.onsource , '[sec]' )
evthdu.header['exposure'] = (self.gti.exposure - deadtime, '[sec]' )
# add caldb key
# for key in cdbinfo:
# evthdu.header[key] = cdbinfo[key]
# --- make file ---
hdulist = fits.HDUList([hdu, evthdu, srchdul['KVEA'], srchdul['HK'], gti.make_hdu(mjdref)])
hdulist.writeto(dstfile, overwrite=True, checksum=True)
def setcol(self, name, arr):
self._events.setcol(name, arr)
if __name__ == '__main__':
pass

View File

@@ -0,0 +1,87 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2025 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the M. Pavlinsky SRG/ART-XC software project.
# -----------------------------------------------------------------------------
import os
import shutil
from enum import Enum
class Dir(object):
class Type(Enum):
path = 1
file = 2
def __init__(self, path):
self._path = path
@property
def value(self):
return self._path
def get(self):
return self._path
def pathfor(self, name):
return os.path.join(self._path, name)
def __truediv__(self, path):
return Dir(os.path.join(self._path, path))
def ls(self, type=None, filt=None):
flist = [f for f in os.listdir(self._path)]
flist.sort()
if filt is not None:
flist = [f for f in flist if filt(f)]
typefn = {
self.Type.path: os.path.isdir ,
self.Type.file: os.path.isfile,
}.get(type, None)
if typefn is not None:
flist = [f for f in flist if typefn(os.path.join(self._path, f))]
return flist
def lsdirs(self, filt=None):
return self.ls(type=self.Type.path, filt=filt)
def lsfiles(self, filt=None):
return self.ls(type=self.Type.file, filt=filt)
def empty(self):
if not os.path.isdir(self._path):
return False
return len(os.listdir(self._path)) == 0
def delete(self):
if self.exists():
shutil.rmtree(self._path)
def exists(self):
if self._path is None:
return False
return os.path.exists(self._path)
def check(self, clean=False):
if clean:
self.delete()
if not self.exists():
os.makedirs(self._path)
if __name__ == "__main__":
pass

View File

@@ -0,0 +1,254 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2020-2025 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the SRG/ART-XC software project.
# -----------------------------------------------------------------------------
import numpy as np
from astropy.io import fits
class Gti(object):
def __init__(self, array=np.zeros((0,2), dtype=np.double), start=None, stop=None):
self._array = np.reshape(np.asarray(array), (-1,2))
self._start = self._ntime(start, default=lambda: self._array.min(axis=0)[0])
self._stop = self._ntime(stop , default=lambda: self._array.max(axis=0)[1])
@property
def array(self):
return self._array
@property
def empty(self):
return self._array.size == 0
@property
def size(self) :
return self._array.shape[0]
@property
def start(self):
return self._start
@property
def stop(self) :
return self._stop
@property
def exposure(self):
return -np.sum(np.subtract.reduce(self._array, axis=1))
@property
def onsource(self):
return self._stop - self._start
def _ntime(self, val, default):
if val is not None:
return val
if self._array.size == 0:
return None
return default()
def mask(self, times):
masks = []
for istart, istop in self._array:
masks.append(np.logical_and(times >= istart, times < istop))
return np.any(masks, axis=0)
@staticmethod
def from_mask(times, mask):
slices = np.ma.clump_masked(np.ma.masked_where(mask, mask))
gtis = [[times[s.start], times[s.stop - 1]] for s in slices]
return Gti(gtis)
@staticmethod
def from_hduname(fitsfile, hduname):
with fits.open(fitsfile) as hdul:
hdu = hdul[hduname]
array = np.array([hdu.data['START'], hdu.data['STOP']]).T
try:
start = hdu.header['TSTART']
stop = hdu.header['TSTOP' ]
except KeyError:
start = hdu.data['START'][ 0]
stop = hdu.data['STOP' ][-1]
return Gti(array, start, stop)
@staticmethod
def from_hdu(hdu):
array = np.array([hdu.data['START'], hdu.data['STOP']]).T
try:
start = hdu.header['TSTART']
stop = hdu.header['TSTOP' ]
except KeyError:
start = hdu.data['START'][ 0]
stop = hdu.data['STOP' ][-1]
return Gti(array, start, stop)
def clone(self):
return Gti(self._array, self._start, self._stop)
def normalize(self, edge_delta=0.5):
if self._array.size == 0:
return
gti_edges_dt = self._array[1:, 0] - self._array[:-1, 1]
gti_edges_mask = gti_edges_dt > edge_delta / 2.
gti_indices = np.ravel(np.where(gti_edges_mask))
new_gtis = []
current_index = 0
for index in gti_indices:
start = self._array[current_index, 0]
stop = self._array[index, 1]
new_gtis.append([start, stop])
current_index = index + 1
start = self._array[current_index, 0]
new_gtis.append([start, self._stop])
return Gti(new_gtis)
def __eq__(self, other):
if self is other: return True
if other is None : return False
return np.array_equal(self._array, other.array) \
and (self._start == other.start) and (self._stop == other.stop)
def __add__(self, other):
if self is other: return self
if other is None : return self
if other.empty : return self
if self.empty : return other
array = self._union(other.array)
start = min(self._start, other.start)
stop = max(self._stop , other.stop )
return Gti(array, start, stop)
def __sub__(self, other):
if self is other: return Gti()
if self.empty : return Gti()
if other is None : return self
if other.empty : return self
array = self._diff(other.array)
return Gti(array, self._start, self._stop)
def __mul__(self, other):
if self is other: return self
if other is None : return self
if self.empty : return Gti()
if other.empty : return Gti()
array = self._intersect(other.array)
start = min(self._start, other.start)
stop = max(self._stop , other.stop )
return Gti(array, start, stop)
def _union(self, array):
array = np.concatenate((self._array, array), axis=0)
array = np.sort(array, axis=0)
starts = array[:,0]
stops = array[:,1]
mask = np.ones(array.shape[0] + 1, dtype=bool)
mask[ 1:-1] = starts[1:] >= stops[:-1]
return np.vstack((starts[:][mask[:-1]], stops[:][mask[1:]])).T
def _diff(self, array):
starts = self._array[:,0]
stops = self._array[:,1]
for item in array:
istart = item[0]
istop = item[1]
mask_lower = np.logical_and(istart >= starts, istart <= stops)
mask_upper = np.logical_and(istop >= starts, istop <= stops)
starts[mask_lower] = istart
stops [mask_upper] = istop
return np.dstack((starts, stops))
def _intersect_helper(self, arr1, arr2):
newarray = []
for start1, stop1 in arr1:
for start2, stop2 in arr2:
start = max(start1, start2)
stop = min(stop1 , stop2 )
has_intersections = stop > start
if has_intersections:
newarray.append([start, stop])
return newarray
def _intersect(self, array):
newarray = []
newarray1 = self._intersect_helper(self._array, array)
newarray2 = self._intersect_helper(array, self._array)
newarray.extend(newarray1)
newarray.extend(newarray2)
return np.unique(np.array(newarray), axis=0)
def make_hdu(self, mjdref):
a1 = np.array(self._array[:, 0])
a2 = np.array(self._array[:, 1])
c1 = fits.Column(name='START', format='1D', unit='sec', array=a1)
c2 = fits.Column(name='STOP' , format='1D', unit='sec', array=a2)
cols = fits.ColDefs([c1, c2])
gtihdu = fits.BinTableHDU.from_columns(cols)
gtihdu.name = 'STDGTI'
gtihdu.header['TSTART'] = self._start
gtihdu.header['TSTOP' ] = self._stop
gtihdu.header['MJDREF'] = (mjdref, 'MJD corresponding to SC clock start 2000.0 MSK')
return gtihdu
def write_fits(self, gtifile, mjdref):
# --- GTI file structure
hdu = fits.PrimaryHDU()
a1 = np.array(self._array[:, 0])
a2 = np.array(self._array[:, 1])
c1 = fits.Column(name='START', format='1D', unit='sec', array=a1)
c2 = fits.Column(name='STOP' , format='1D', unit='sec', array=a2)
cols = fits.ColDefs([c1, c2])
gtihdu = fits.BinTableHDU.from_columns(cols)
gtihdu.name = 'STDGTI'
gtihdu.header['TSTART'] = self._start
gtihdu.header['TSTOP' ] = self._stop
gtihdu.header['MJDREF'] = (mjdref, 'MJD corresponding to SC clock start 2000.0 MSK')
# --- wirte GTI
hdulist = fits.HDUList([hdu, gtihdu])
hdulist.writeto(gtifile)
if __name__ == '__main__':
pass

View File

@@ -0,0 +1,93 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2024 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the M. Pavlinsky SRG/ART-XC software project.
# -----------------------------------------------------------------------------
import os
from arttools.cmd import Command, ExecutionResult
from arttools.errors import Error
class HeatoolsError(Error):
pass
class Heatools(object):
_ftcopy = 'ftcopy {src}[{ext}] {dst} copyall=no history=no clobber=yes'
_ftappend = 'ftappend {srcfile}[{srcext}]{filter} {dstfile} history=yes'
_maketime = 'maketime {srcfile}[{srcext}] {gtifile} "{filter_expr}" time=TIME prefr=0 postfr=0 compact=no histkw=no'
_ftmgtime = 'ftmgtime {gtifilelist} {dstgti} merge=and emptygti=apply history=yes clobber=yes'
_ftselect = 'ftselect "{srcfile}[{srcext}]" {dstfile} \'gtifilter("{gtifile}")\' copyall=no history=yes clobber=yes'
_ftchksum = 'ftchecksum {srcfile} update=yes chatter=0'
@classmethod
def check(cls, envvar='HEADAS'):
if not envvar in os.environ:
raise HeatoolsError('error: Heasoft environment not found (is it initialzed?)')
@classmethod
def ftcopy(cls, src, dst, ext):
Command(
cls._ftcopy.format(
src=src,
dst=dst,
ext=ext
)
).run()
@classmethod
def ftappend(cls, srcfile, srcext, dstfile, filt=''):
Command(
cls._ftappend.format(
srcfile=srcfile,
srcext=srcext,
filter=filt,
dstfile=dstfile
)
).run()
def maketime(self, srcfile, srcext, gtifile, filter_expr):
Command(
self._maketime.format(
srcfile=srcfile,
srcext=srcext,
gtifile=gtifile,
filter_expr=filter_expr
)
).run()
def ftmgtime(self, gtifilelist, dstgti):
Command(
self._ftmgtime.format(
gtifilelist=gtifilelist,
dstgti=dstgti
)
).run()
def ftselect(self, srcfile, srcext, dstfile, gtifile):
Command(
self._ftselect.format(
srcfile=srcfile,
srcext=srcext,
dstfile=dstfile,
gtifile=gtifile
)
).run()
def ftchecksum(self, srcfile):
Command(
self._ftchksum.format(
srcfile=srcfile
)
).run()
if __name__ == '__main__':
pass

137
arttools/src/arttools/printer.py Executable file
View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
#
# -----------------------------------------------------------------------------
# Copyright (c) 2024 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the SRG/ART-XC software project.
# -----------------------------------------------------------------------------
import sys
import logging
from enum import Enum
from datetime import datetime
# class Log(object):
# @staticmethod
# def configure(who, logfile):
# import logging
# logging.basicConfig(
# level=logging.INFO,
# format='{}: %(asctime)s [%(levelname)-7s] %(message)s'.format(who),
# handlers=[
# logging.FileHandler(logfile),
# logging.StreamHandler()
# ]
# )
class PrintType(Enum):
LOG = 'log'
INFO = 'info'
WARNING = 'warning'
ERROR = 'error'
class PrintFormatter(object):
def __init__(self):
self._newline = '\n'
self._line_width = 80
self._line_type_1 = '-' * self._line_width
self._line_type_2 = '=' * self._line_width
self._line_type_3 = '*' * 5
self._offset_w = 0
self._offset_str = ' ' * self._offset_w
self._separator = self._newline + self._offset_str
def offset(self, offset):
self._offset_w = offset
self._offset_str = ' ' * self._offset_w
self._separator = self._newline + self._offset_str
def header(self, toolname, version):
return self.multiline([
'[' + self._timestr() + '] running: {} v.{}'.format(toolname, version),
self._line_type_2
])
def footer(self, toolname, version, message):
return self.multiline([
'{} v.{}: exited with status: {}'.format(toolname, version, message),
self._line_type_1
])
def log(self, message):
return self._offset_str + message
def message(self, level, message):
return self._format(level, message)
def multiline(self, vmessage):
return self._separator.join(vmessage)
def _format(self, level, message):
return '[{}] {:>7}: {}'.format(self._timestr(), level, message)
def _timestr(self):
return str(datetime.now())
class Printer(object):
def __init__(self, toolname, version, logfile=None):
super().__init__()
self._toolname = toolname
self._version = version
self._formatter = PrintFormatter()
# -- setup logger
logfmt = logging.Formatter('%(message)s')
self._logger = logging.getLogger(self._toolname)
self._logger.setLevel(logging.INFO)
self._logger.handlers = [] # reset log handlers
logsh = logging.StreamHandler(sys.stdout)
logsh.setFormatter(logfmt)
self._logger.addHandler(logsh)
if logfile is not None:
logfh = logging.FileHandler(logfile)
logfh.setFormatter(logfmt)
self._logger.addHandler(logfh)
def info(self, message):
self._logger.info(
self._formatter.message('INFO' , message))
def warning(self, message):
self._logger.warning(
self._formatter.message('WARNING', message))
def error(self, message):
self._logger.error(
self._formatter.message('ERROR' , message))
def log(self, message):
self._logger.info(
self._formatter.log(message))
def header(self):
self._logger.info(
self._formatter.header(self._toolname, self._version))
def footer(self, message):
self._logger.info(
self._formatter.footer(self._toolname, self._version, message))
if __name__ == '__main__':
pass

42
arttools/src/arttools/random.py Executable file
View File

@@ -0,0 +1,42 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2025 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the M. Pavlinsky SRG/ART-XC software project.
# -----------------------------------------------------------------------------
import numpy as np
class Random(object):
def __init__(self, seed=None):
super().__init__()
self._seed = seed
self._rng = None
self._initialize()
def _initialize(self):
if self._seed is None:
sseq = np.random.SeedSequence()
self._seed = sseq.entropy
else:
sseq = np.random.SeedSequence(self._seed)
self._rng = np.random.default_rng(sseq)
@property
def seed(self): return self._seed
def uniform(self, low, high, size=None):
return self._rng.uniform(low, high, size)
if __name__ == '__main__':
pass

93
arttools/src/arttools/task.py Executable file
View File

@@ -0,0 +1,93 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2025 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the M. Pavlinsky SRG/ART-XC software project.
# -----------------------------------------------------------------------------
import os
import sys
from abc import ABC, abstractmethod
from arttools.cmd import Command
from arttools.errors import Error
class TaskError(Error):
pass
class TaskArgs(ABC):
def __init__(self, cfgfile=None):
super().__init__()
self.pid = os.getpid()
self.env = {}
self.cfg = {}
self.read_env()
self.read_cfg(cfgfile)
@abstractmethod
def read_env(self):
pass
@abstractmethod
def read_cfg(self, cfgfile):
pass
@abstractmethod
def read_args(self):
pass
def read(self):
self.read_args()
@abstractmethod
def check(self):
pass
class TaskCmds(ABC):
def __init__(self):
super().__init__()
def run(self):
pass
class Task(ABC):
def __init__(self):
super().__init__()
@abstractmethod
def initialize(self):
pass
@abstractmethod
def finalize(self):
pass
@abstractmethod
def runmain(self):
pass
def run(self):
try:
self.initialize()
self.runmain()
self.finalize()
except TaskError as e:
print('error: task execution:', e)
except Error as e:
print('error: unknown:', e)
if __name__ == "__main__":
pass

View File

@@ -0,0 +1,45 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2016-2025 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the M. Pavlinsky SRG/ART-XC software project.
# -----------------------------------------------------------------------------
class Teldef(object):
F = 2693.
d = .595
centeroffset = 23.5
assemblies = [
{'detn': 33, 'urdn': 28, 'teln': 1},
{'detn': 34, 'urdn': 22, 'teln': 2},
{'detn': 24, 'urdn': 23, 'teln': 3},
{'detn': 25, 'urdn': 24, 'teln': 4},
{'detn': 22, 'urdn': 25, 'teln': 5},
{'detn': 26, 'urdn': 26, 'teln': 6},
{'detn': 23, 'urdn': 30, 'teln': 7}
]
urdalist = ['02', '04', '08', '10', '20', '40', '80']
urdnlist = [ 28 , 22 , 23 , 24 , 25 , 26 , 30 ]
detnlist = [ 33 , 34 , 24 , 24 , 22 , 26 , 23 ]
telnlist = [ 1 , 2 , 3 , 4 , 5 , 6 , 7 ]
telnames = [''.join(('T', str(teln))) for teln in telnlist]
@classmethod
def telname(cls, teln):
if teln not in cls.telnlist:
return 'T0'
return ''.join(('T', str(teln)))
@classmethod
def teln(cls, detn, urdn):
for item in cls.assemblies:
if item['detn'] == detn and item['urdn'] == urdn:
return item['teln']
return 0
if __name__ == '__main__':
pass

View File

@@ -0,0 +1,45 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2016-2024 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the M. Pavlinksy SRG/ART-XC telescope software.
# -----------------------------------------------------------------------------
import os
import shutil
class ToolArgs(object):
def __init__(self, configfile=None):
self._configfile = configfile
self.pid = os.getpid()
self.config = {}
self._read_env()
def _read_env(self):
if self._configfile is None:
return
def make_tmpdir(self, path):
return os.path.join(path, str(self.pid))
def clean_path(self, path):
if os.path.exists(path):
shutil.rmtree(path)
def check_path(self, path, clean=False):
if clean and os.path.exists(path):
shutil.rmtree(path)
if not os.path.exists(path):
os.makedirs(path)
if __name__ == '__main__':
pass

View File

@@ -0,0 +1 @@
__version__ = '2.0.8'

View File

@@ -0,0 +1,143 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2021-2026 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the M. Pavlinsky SRG/ART-XC software project.
# -----------------------------------------------------------------------------
import json
import numpy as np
from astropy.io import fits
from astropy.wcs import WCS
import astropy.units as u
from astropy.coordinates import Angle
from astropy.coordinates import SkyCoord
class Fov(object):
def __init__(self, ra_cen, dec_cen):
self._ra_cen = ra_cen
self._dec_cen = dec_cen
fov_size = 0.66 * u.deg
pix_size = 45.5728027182 * u.arcsec / u.pix
fov_wcs, fov_shape = \
self._make_fov_wcs(
self._ra_cen,
self._dec_cen,
fov_size,
pix_size
)
self._wcs = Wcs()
self._wcs.init(fov_wcs)
self._shape = fov_shape
@property
def wcs(self):
return self._wcs
@property
def ra(self):
return self._ra_cen
@property
def dec(self):
return self._dec_cen
def _make_fov_wcs(self,
ra_cen, dec_cen,
fov_size, pix_size,
ref_pix=None,
projection='TAN' ):
center = SkyCoord(ra_cen, dec_cen, unit=u.deg)
fov_width = Angle(fov_size, unit=u.deg)
fov_height = Angle(fov_size, unit=u.deg)
pix_width = Angle(pix_size)
pix_height = Angle(pix_size)
nx = int(np.ceil((fov_width / pix_width ).decompose().value))
ny = int(np.ceil((fov_height / pix_height).decompose().value))
if ref_pix is None:
ref_pix = [(nx + 1) / 2., (ny + 1) / 2.]
wcs = Wcs(naxis=2)
wcs.wcs.radesys = 'FK5'
wcs.wcs.equinox = 2000.
wcs.wcs.crval = [center.ra.deg, center.dec.deg]
wcs.wcs.crpix = ref_pix
wcs.wcs.cdelt = [-pix_width.value, pix_height.value]
return wcs, (ny, nx)
class Wcs(object):
def __init__(self):
self._strip_size = 45.5728027182
def init(self, wcs):
self._wcs = wcs
self._projection = self._make_projection(self._wcs)
return self
def from_other(self, projection):
self._wcs = None
self._projection = projection
return self
@staticmethod
def from_file(fname):
with open(fname) as wcsfile:
return Wcs().init(json.load(wcsfile))
@staticmethod
def from_eventfile(fname):
with fits.open(fname) as hdul:
projection = WCS(hdul['EVENTS'].header)
return Wcs().from_other(projection)
def get(self):
return self._projection
def world2pix(self, ra, dec):
y, x = (
self._projection.all_world2pix(
np.array([ra, dec]).T, 1) - 0.5
).astype(int).T
return x, y
def _make_projection(self, wcs, subpixels=5):
pixsize = self._strip_size / 3600. / subpixels
projection = WCS(naxis=2)
projection.wcs.crpix = [wcs['nx' ], wcs['ny' ]]
projection.wcs.crval = [wcs['crval1'], wcs['crval2']]
projection.wcs.cdelt = [pixsize, pixsize]
projection.wcs.ctype = ["RA---TAN", "DEC--TAN"]
projection.wcs.radesys = "FK5"
return projection
if __name__ == '__main__':
pass

View File

@@ -0,0 +1,30 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2020 Space Research Institute (http://iki.rssi.ru/)
#
# This file is part of ARTXC software project.
# -----------------------------------------------------------------------------
import os, sys
import unittest
import unittest.mock as mock
import numpy as np
sys.path.append('../src')
from pyartxc.gti import Gti
class Test_(unittest.TestCase):
def setUp(self):
pass
def test_create_empty(self):
pass
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,208 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# -----------------------------------------------------------------------------
# Copyright (c) 2020-2025 IKI RAS (http://iki.rssi.ru/)
# Space Research Institute of the Russian Academy of Science
#
# This file is part of the M. Pavlinsky SRG/ART-XC software project.
# -----------------------------------------------------------------------------
import os, sys
import unittest
import unittest.mock as mock
import numpy as np
sys.path.append(os.path.join(os.getcwd(), '../src/'))
import arttools
from arttools.gti import Gti
class TestGti(unittest.TestCase):
def setUp(self):
self.gti_0 = Gti()
self.gti_1 = Gti([[100., 200.],])
self.gti_2 = Gti([[100., 200.], [300, 305]])
self.gti_3 = Gti([100., 200., 300., 400.])
def test_create_empty(self):
self.assertTrue (self.gti_0.empty)
self.assertEqual(self.gti_0.size, 0)
def test_create_from_intervals_1(self):
self.assertFalse(self.gti_1.empty)
self.assertEqual(self.gti_1.size, 1)
self.assertEqual(self.gti_1.exposure(), 100.)
def test_create_from_intervals_2(self):
self.assertFalse(self.gti_2.empty)
self.assertEqual(self.gti_2.size, 2)
self.assertEqual(self.gti_2.exposure(), 105.)
def test_create_from_intervals_3(self):
with self.assertRaises(ValueError):
Gti([100., 200., 300.])
def test_startstop(self):
self.assertEqual(self.gti_0.start, None)
self.assertEqual(self.gti_0.stop , None)
self.assertEqual(self.gti_2.start, 100.)
self.assertEqual(self.gti_2.stop , 305.)
def test_create_from_timemask(self):
times = np.array([100, 200, 300, 400, 500, 600])
param = np.array([-99, -98, -10, 0, -98, -99])
gti = Gti.from_timemask(times, param < -95)
self.assertFalse(gti.empty)
self.assertEqual(gti.size , 2)
self.assertEqual(gti.exposure(), 200)
self.assertEqual(gti.start , 100)
self.assertEqual(gti.stop , 600)
def test_clone(self):
gti_clone = self.gti_2.clone()
self.assertFalse(gti_clone.empty)
self.assertEqual(gti_clone.size, 2)
self.assertEqual(gti_clone.exposure(), 105.)
def test_equal(self):
gti = Gti([100., 200.])
self.assertTrue (gti == gti )
self.assertTrue (gti == self.gti_1)
self.assertFalse(gti == self.gti_2)
self.assertFalse(gti == None )
def test_union_1(self):
gti_1 = Gti([100., 200.])
gti_2 = Gti()
res_gti_1 = gti_1 + None
res_gti_2 = gti_1 + gti_1
res_gti_3 = gti_1 + gti_2
res_gti_4 = gti_2 + gti_1
self.assertTrue(res_gti_1 == gti_1 )
self.assertTrue(res_gti_2 == gti_1 )
self.assertTrue(res_gti_3 == gti_1 )
self.assertTrue(res_gti_4 == gti_1 )
self.assertTrue(res_gti_3 == res_gti_4)
def test_union_2(self):
gti_1 = Gti([100., 200.])
gti_2 = Gti([150., 250.])
res_gti_1 = gti_1 + gti_2
res_gti_2 = gti_2 + gti_1
self.assertEqual(res_gti_1.size , 1)
self.assertEqual(res_gti_1.exposure(), 150)
self.assertEqual(res_gti_1.start , 100)
self.assertEqual(res_gti_1.stop , 250)
self.assertTrue (res_gti_1 == res_gti_2)
def test_union_3(self):
gti_1 = Gti([100., 200., 300., 400.])
gti_2 = Gti([150., 250., 350., 450., 370., 470.])
gti_3 = Gti([170., 270.])
res_gti = gti_1 + gti_2 + gti_3
self.assertEqual(res_gti.size , 2)
self.assertEqual(res_gti.exposure(), 340)
self.assertEqual(res_gti.start , 100)
self.assertEqual(res_gti.stop , 470)
def test_subtraction_1(self):
gti_1 = Gti([100., 200.])
gti_2 = Gti()
res_gti_1 = gti_1 - None
res_gti_2 = gti_1 - gti_1
res_gti_3 = gti_1 - gti_2
res_gti_4 = gti_2 - gti_1
self.assertTrue(res_gti_1 == gti_1 )
self.assertTrue(res_gti_2 == self.gti_0)
self.assertTrue(res_gti_3 == gti_1 )
self.assertTrue(res_gti_4 == self.gti_0)
def test_subtraction_2(self):
gti_1 = Gti([100., 200.])
gti_2 = Gti([150., 250.])
res_gti = gti_1 - gti_2
self.assertEqual(res_gti.size , 1)
self.assertEqual(res_gti.start , 100)
self.assertEqual(res_gti.stop , 200)
self.assertEqual(res_gti.exposure(), 50)
self.assertTrue(
np.array_equal(res_gti.array, np.array([[150., 200.]])))
def test_subtraction_3(self):
gti_2 = Gti([50., 120., 170., 250.])
gti_1 = Gti([100., 200.])
res_gti = gti_2 - gti_1
self.assertEqual(res_gti.size , 2)
self.assertEqual(res_gti.start , 50)
self.assertEqual(res_gti.stop , 250)
self.assertEqual(res_gti.exposure(), 50)
self.assertTrue(
np.array_equal(res_gti.array, np.array([[100., 120.], [170., 200.]])))
def test_intersection_1(self):
gti_1 = Gti([100., 200.])
gti_2 = Gti()
res_gti_1 = gti_1 * None
res_gti_2 = gti_1 * gti_1
res_gti_3 = gti_1 * gti_2
res_gti_4 = gti_2 * gti_1
self.assertTrue(res_gti_1 == self.gti_0)
self.assertTrue(res_gti_2 == gti_1 )
self.assertTrue(res_gti_3 == self.gti_0)
self.assertTrue(res_gti_4 == self.gti_0)
def test_intersection_2(self):
gti_1 = Gti([100., 200.])
gti_2 = Gti([50., 120., 170., 250.])
res_gti = gti_2 * gti_1
self.assertEqual(res_gti.size , 2)
self.assertEqual(res_gti.start , 50)
self.assertEqual(res_gti.stop , 250)
self.assertEqual(res_gti.exposure(), 50)
self.assertTrue(
np.array_equal(res_gti.array, np.array([[100., 120.], [170., 200.]])))
def test_intersection_3(self):
gti_1 = Gti([100., 200.])
gti_2 = Gti([50., 120., 170., 250.])
res_gti = gti_1 * gti_2
self.assertEqual(res_gti.size , 2)
self.assertEqual(res_gti.start , 50)
self.assertEqual(res_gti.stop , 250)
self.assertEqual(res_gti.exposure(), 50)
self.assertTrue(
np.array_equal(res_gti.array, np.array([[100., 120.], [170., 200.]])))
# def test_inversion(self):
# pass
if __name__ == '__main__':
unittest.main()