Files
2025-09-17 18:33:33 +03:00

223 lines
7.5 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
"""
НАЗВАНИЕ:
gti.py
НАЗНАЧЕНИЕ:
рипт для создания интервалов хорошего времени по данным из файла, сбинированного по 60 минут программой create_binned_filed.py (на основе оригинального файла lc_orig_60-120.csv, выгруженного с СРГ L2 монитора https://monitor.srg.cosmos.ru/ )
Скрипт аппроксимирует кривую блеска полиномом, чтобы убрать синусоидальный тренд, ищет времена вспышек и возвращает интервалы времени без вспышек в офрмате секунд с начала 1 января 2000 года (Москва) (файл gti_sec.csv) и в формате дестичного года (файл gti_dyear.csv).
ВЫЗОВ:
conda activate
./gti.py
УПРАВЛЕНИЕ:
Название файла надо вставить внутрь скрипта (ищите lc_bin_60min_60-120.csv)
ПАРАМЕТРЫ:
N/A
ВЫВОД:
Файл с графиком monitor.png записывается в текущую директорию.
Файл с интервалами времени gti.csv записывается в текущую директорию.
ИСТОРИЯ:
Филиппова Екатерина, ИКИ РАН, kate@cosmos.ru
Сентябрь, 2025
"""
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import ticker
import pandas as pd
import datetime
import dateutil
from scipy.optimize import curve_fit
from scipy.ndimage import gaussian_filter1d
from scipy import signal
from scipy.stats import median_abs_deviation
import astropy.io.fits as pf
from astropy.time import Time
def polynomial_func(x, *coefficients):
"""General polynomial function"""
return sum(coef * x**i for i, coef in enumerate(coefficients))
def detect_outbursts_threshold(times, fluxes, threshold=2.0, min_duration=3):
"""
Detect outbursts using a threshold-based approach.
Parameters:
times: array of time values
fluxes: array of flux values
threshold: number of standard deviations above median to consider as outburst
min_duration: minimum number of consecutive points to consider as outburst
Returns:
outburst_intervals: list of (start_time, end_time) tuples
"""
# Calculate median and standard deviation
median_flux = np.median(fluxes)
mad = median_abs_deviation(fluxes)
print("median_flux",median_flux,"mad",mad)
# Create a threshold value
threshold_value = median_flux + threshold * mad
# Identify points above threshold
above_threshold = fluxes > threshold_value
# Find consecutive points above threshold
outburst_indices = []
in_outburst = False
start_idx = None
for i, is_above in enumerate(above_threshold):
if is_above and not in_outburst:
in_outburst = True
start_idx = i
elif not is_above and in_outburst:
in_outburst = False
# Check if the outburst duration meets the minimum requirement
if i - start_idx >= min_duration:
outburst_indices.append((start_idx, i-1))
start_idx = None
# Handle case where outburst continues until end of data
if in_outburst and len(fluxes) - start_idx >= min_duration:
outburst_indices.append((start_idx, len(fluxes)-1))
# Convert indices to time intervals
outburst_intervals = [(times[start], times[end]) for start, end in outburst_indices]
return outburst_intervals, median_flux,mad
fig, ax = plt.subplots(figsize=(9, 5), dpi=100)
ax.grid(visible=True,linestyle='dotted', )
ax.xaxis.set_minor_locator(ticker.MultipleLocator(1/12))
ax.yaxis.set_minor_locator(ticker.MultipleLocator(0.2))
ax.tick_params(axis="both", width=1, labelsize=14)
plt.ylim([-2,21])
plt.ylabel('Count rate (counts s$^{-1}$)',fontsize=14, fontweight='normal')
plt.xlabel('Year',fontsize=14, fontweight='normal')
for axis in ['top','bottom','left','right']:
ax.spines[axis].set_linewidth(1)
cl='black'
# Сюда надо вставить название файла с данными
df = pd.read_csv('lc_bin_60min_60-120.csv',)
tm=[]
rate=[]
error=[]
for index, row in df.iterrows():
dt = dateutil.parser.parse(row['timestamp'])
day_of_year = dt.timetuple().tm_yday+dt.timetuple().tm_hour/24+dt.timetuple().tm_min/60/24+dt.timetuple().tm_sec/60/60/24 # returns 1 for January 1st
is_leap = (dt.year % 4 == 0 and dt.year % 100 != 0) or (dt.year % 400 == 0)
days_in_year = 366 if is_leap else 365
tm.append(dt.year+(day_of_year/days_in_year))
rate.append(float(row['value_60.0-120.0']))
error.append(float(row['error_60.0-120.0']))
rate=np.array(rate)
tm=np.array(tm)
tmp=np.where(rate>0,1,0)
rate=np.compress(tmp,rate)
tm=np.compress(tmp,tm)
error=np.compress(tmp,error)
tmp=np.where(rate < 4.3,1,0)
tm_fit=np.compress(tmp,tm)
rate_fit=np.compress(tmp,rate)
# fit polinom of degree
degree = 6
initial_guess = [1] * (degree + 1) # Initial guess for coefficients
#initial_guess = [-3.67265411e+08, 5.44675290e+05, -2.69261063e+02, 4.43698213e-02]
initial_guess = [ -6.10911582e+07, 3.02005656e+04, 1.49297834e+01, 6.53394516e-09, -3.64859359e-06, -1.80368300e-09, 8.91651441e-13]
params, covariance = curve_fit(polynomial_func, tm_fit, rate_fit, p0=initial_guess)
#print("curve_fit:",params)
# Generate fitted curve
y_fit = polynomial_func(tm, *params)
rate_flat=rate-y_fit
plt.plot(tm, rate, color=cl, linewidth=1, linestyle='solid',marker='.',label="Original data")
#plt.plot(tm_fit, rate_fit, color=cl, marker="o")
plt.plot(tm, y_fit, color="blue", linewidth=1, linestyle='solid', label=f'Degree {degree} fit')
plt.plot(tm, rate_flat, color="red", linewidth=1, linestyle='solid',label="Residuals")
smoothed_rate = gaussian_filter1d(rate_flat, sigma=2)
plt.plot(tm, smoothed_rate, color=cl, linewidth=1, linestyle='solid',label="Smoothed resid")
threshold=8.
outbursts,median_flux,mad = detect_outbursts_threshold(tm, smoothed_rate, threshold=threshold, min_duration=1)
for start, end in outbursts:
mask = (tm >= start) & (tm <= end)
plt.plot(tm[mask], rate[mask], 'm-', linewidth=2, label='Threshold outbursts' if start == outbursts[0][0] else "")
horline=np.ones(len(tm))*(median_flux + threshold * mad)
plt.plot(tm,horline,color=cl, linewidth=1, linestyle='dashed',label="Threshold flux")
print("Number of outburst:",np.size(outbursts))
#print("Detected outburst intervals:", outbursts)
tstart=[tm[0]]
tstop=[]
for start,end in outbursts:
if end != tm[-1]:
tstop.append(start)
tstart.append(end)
else:
tstop.append(start)
if outbursts[-1][1] != tm[-1]:
tstop.append(tm[-1])
# plot GTI
for i in np.arange(len(tstart)):
mask = (tm >= tstart[i]) & (tm <= tstop[i])
# plt.plot(tm[mask], np.ones(len(tm[mask]))*2, 'y-', linewidth=2, label='GTI')
plt.legend()
fig.savefig("outbursts.png", bbox_inches='tight')
df = pd.DataFrame({'tstart': tstart,'tstop': tstop})
df.to_csv('gti_dyear.csv', index=False)
tstart_obj = Time(tstart, format='decimalyear')
tstop_obj=Time(tstop,format='decimalyear')
tstart_sec=(tstart_obj.mjd-51543.75)*86400
tstop_sec=(tstop_obj.mjd-51543.75)*86400
df = pd.DataFrame({'tstart': tstart_sec,'tstop': tstop_sec})
df.to_csv('gti_sec.csv', index=False)
print("Files gti_dyear.csv and gti_sec.csv are created")