# srg/astrobasis/management/commands/00_load_ztf_archive.py
# 2024-04-26 12:43:00 +03:00
# 257 lines, 8.4 KiB, Python
from django.core.management.base import BaseCommand, CommandError
import os
import io
import gzip
import tarfile
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
import fastavro
from astropy.time import Time
from astropy.io import fits
import astropy.units as u
import aplpy
from collections import defaultdict
import logging
import requests
import json
import pprint
from astrobasis.models import ZTFAlert
from srglib.utils import find_ztf_in_survey
from astropy.coordinates import SkyCoord # High-level coordinates
from astropy.coordinates import ICRS, Galactic, FK4, FK5 # Low-level frames
from astropy.coordinates import Angle, Latitude, Longitude # Angles
import astropy.units as u
import re
import json
import pandas as pd
from astropy_healpix import HEALPix
from erotrans.models import EroTransSource, EroTrans
from erosurvey.models import NSIDE_PLATES, ORDER_PLATES
from heasarc.models import NSIDE_SOURCES, ORDER
from monthplan.models import Survey
"""
https://zwickytransientfacility.github.io/ztf-avro-alert/filtering.html
Based on tests done at IPAC (F. Masci, priv. comm), the following filter delivers a relatively pure sample:
rb >= 0.65 and
nbad = 0 and
fwhm <= 5 and
elong <= 1.2 and
abs(magdiff) <= 0.1
"""
def plot_lightcurve(dflc, ax=None, days_ago=True):
    """Plot a ZTF alert light curve, one color per filter id.

    Detections are drawn as error bars (magpsf +/- sigmapsf); epochs with a
    null magpsf are drawn as upper-limit triangles at diffmaglim.

    Parameters
    ----------
    dflc : pandas.DataFrame
        Light-curve frame as produced by make_dataframe (needs jd, fid,
        magpsf, sigmapsf and diffmaglim columns plus an objectId attribute).
    ax : matplotlib axes, optional
        Target axes.  Bug fix: previously this argument was accepted but
        silently ignored and everything went to the current axes; it is now
        honored by selecting it as the current axes.
    days_ago : bool
        If True (default), plot time as days before now; otherwise raw JD.
    """
    filter_color = {1: 'green', 2: 'red', 3: 'pink'}
    if days_ago:
        now = Time.now().jd
        t = dflc.jd - now
        xlabel = 'Days Ago'
    else:
        t = dflc.jd
        xlabel = 'Time (JD)'
    if ax is None:
        plt.figure()
    else:
        # Route the pyplot state-machine calls below to the caller's axes.
        plt.sca(ax)
    for fid, color in filter_color.items():
        # plot detections in this filter:
        w = (dflc.fid == fid) & ~dflc.magpsf.isnull()
        if np.sum(w):
            plt.errorbar(t[w], dflc.loc[w, 'magpsf'], dflc.loc[w, 'sigmapsf'], fmt='.', color=color)
        # non-detections: only an upper limit is available
        wnodet = (dflc.fid == fid) & dflc.magpsf.isnull()
        if np.sum(wnodet):
            plt.scatter(t[wnodet], dflc.loc[wnodet, 'diffmaglim'], marker='v', color=color, alpha=0.25)
    plt.gca().invert_yaxis()
    plt.xlabel(xlabel)
    plt.ylabel('Magnitude')
    plt.title(dflc.objectId)
def plot_cutout(stamp, fig=None, subplot=None, **kwargs):
    """Render one gzipped-FITS postage stamp with aplpy and return the figure.

    `stamp` is the raw gzip-compressed FITS bytes from an alert packet.
    A new 4x4-inch figure (single panel) is created when none is supplied.
    """
    with gzip.open(io.BytesIO(stamp), 'rb') as gz:
        with fits.open(io.BytesIO(gz.read())) as hdul:
            if fig is None:
                fig = plt.figure(figsize=(4, 4))
            if subplot is None:
                subplot = (1, 1, 1)
            stamp_fig = aplpy.FITSFigure(hdul[0], figure=fig, subplot=subplot, **kwargs)
            stamp_fig.show_grayscale(stretch='arcsinh')
    return stamp_fig
def show_stamps(packet):
    """Show the Science/Template/Difference cutouts of one packet side by side."""
    fig = plt.figure(figsize=(12, 4))
    for panel_index, label in enumerate(('Science', 'Template', 'Difference'), start=1):
        stamp = packet['cutout{}'.format(label)]['stampData']
        panel = plot_cutout(stamp, fig=fig, subplot=(1, 3, panel_index))
        panel.set_title(label)
def show_all(packet):
    """Show the light curve plus the three cutouts of one packet in a 1x4 grid."""
    fig = plt.figure(figsize=(16, 4))
    dflc = make_dataframe(packet)
    # panel 1: the light curve; panels 2-4: the image stamps
    plot_lightcurve(dflc, ax=plt.subplot(1, 4, 1))
    for panel_index, label in enumerate(('Science', 'Template', 'Difference'), start=2):
        stamp = packet['cutout{}'.format(label)]['stampData']
        panel = plot_cutout(stamp, fig=fig, subplot=(1, 4, panel_index))
        panel.set_title(label)
def make_dataframe(packet):
    """Build a light-curve DataFrame from one alert packet.

    Row 0 is the triggering candidate; the following rows are the packet's
    prv_candidates history.  The packet's objectId and candid are attached
    as plain attributes of the frame.
    """
    latest = pd.DataFrame(packet['candidate'], index=[0])
    history = pd.DataFrame(packet['prv_candidates'])
    lightcurve = pd.concat([latest, history], ignore_index=True, sort=False)
    # we'll attach some metadata--note this may not be preserved after all operations
    # https://stackoverflow.com/questions/14688306/adding-meta-information-metadata-to-pandas-dataframe
    lightcurve.objectId = packet['objectId']
    lightcurve.candid = packet['candid']
    return lightcurve
def is_alert_pure(packet):
    """Return True when the packet passes the IPAC purity filter.

    Cuts (see module note): rb >= 0.65, nbad == 0, fwhm <= 5,
    elong <= 1.2 and |magdiff| <= 0.1.
    """
    cand = packet['candidate']
    criteria = (
        cand['rb'] >= 0.65,
        cand['nbad'] == 0,
        cand['fwhm'] <= 5,
        cand['elong'] <= 1.2,
        abs(cand['magdiff']) <= 0.1,
    )
    return all(criteria)
def generate_dictionaries(root_dir):
    """Yield every alert packet found in any .avro file under root_dir."""
    for avro_path in find_files(root_dir):
        yield from open_avro(avro_path)
def open_avro(fname):
    """Yield each packet stored in one avro file.

    In principle a single file can hold multiple packets.
    """
    with open(fname, 'rb') as stream:
        yield from fastavro.reader(stream)
def find_files(root_dir):
    """Yield the path of every .avro file under root_dir (symlinks followed)."""
    for current_dir, _subdirs, filenames in os.walk(root_dir, followlinks=True):
        for name in filenames:
            if not name.endswith('.avro'):
                continue
            yield current_dir + '/' + name
def is_transient(dflc):
    """Heuristic transient classifier for one alert light curve.

    True only when ALL hold for row 0 (the triggering candidate):
      * positive difference-image subtraction (isdiffpos == 't'),
      * no stellar PS1 counterpart within 1.5" (or counterpart is extended,
        sgscore1 < 0.5),
      * at least two detections separated by >= 30 minutes (rules out movers),
      * no known solar-system object within 5" (ssdistnr absent/negative/large).
    """
    candidate = dflc.loc[0]
    is_positive_sub = candidate['isdiffpos'] == 't'

    # nearest PS1 source: missing or distant -> no counterpart; close and
    # star-like (sgscore1 >= 0.5) -> counterpart present.
    nearest_dist = candidate['distpsnr1']
    if nearest_dist is None or nearest_dist > 1.5:
        no_pointsource_counterpart = True
    else:
        no_pointsource_counterpart = candidate['sgscore1'] < 0.5

    where_detected = dflc['isdiffpos'] == 't'  # nondetections will be None
    not_moving = False
    if np.sum(where_detected) >= 2:
        detection_times = dflc.loc[where_detected, 'jd'].values
        gaps = np.diff(detection_times)
        not_moving = np.max(gaps) >= (30 * u.minute).to(u.day).value

    ss_dist = candidate['ssdistnr']
    no_ssobject = (ss_dist is None) or (ss_dist < 0) or (ss_dist > 5)

    return is_positive_sub and no_pointsource_counterpart and not_moving and no_ssobject
def load_ztf_packet(item):
    """Persist one ZTF avro packet as a ZTFAlert row, unless already present.

    Computes the source and plate healpix indices from the candidate
    coordinates, saves the new ZTFAlert and cross-matches it against the
    survey via find_ztf_in_survey.  Duplicate objectIds are skipped.
    """
    hp = HEALPix(nside=NSIDE_SOURCES, order=ORDER, frame=FK5())
    hp_plate = HEALPix(nside=NSIDE_PLATES, order=ORDER_PLATES, frame=FK5())

    name = item['objectId']
    try:
        ZTFAlert.objects.get(objectId=name)
    except ZTFAlert.DoesNotExist:
        print("ZTF alert ID %s is not loaded" % name)
    else:
        # Already in the database -- nothing to do.
        print("ZTF alert ID %s is already loaded, skip" % name)
        return

    #date_string = item['candidate']['wall_time']
    #dt = datetime.strptime(date_string, format_string)
    cand = item['candidate']
    ra = cand['ra']
    dec = cand['dec']
    crd = SkyCoord(ra, dec, frame=FK5(), unit="deg")

    alert = ZTFAlert(
        objectId=name,
        fid=cand['fid'],
        lco_id=item['candid'],
        healpix=hp.skycoord_to_healpix(crd),
        healpix_plate=hp_plate.skycoord_to_healpix(crd),
        ra=ra, dec=dec,
        programid=cand['programid'],
        magpsf=cand['magpsf'],
        sigmapsf=cand['sigmapsf'],
        magap=cand['magap'],
        magdiff=cand['magdiff'],
        nbad=cand['nbad'],
        fwhm=cand['fwhm'],
        elong=cand['elong'],
        sigmagap=cand['sigmagap'],
        #wall_time = datetime.strptime(date_string, format_string),
        diffmaglim=cand['diffmaglim'],
        #deltamagref = cand['deltamagref'],
        #deltamaglatest = cand['deltamaglatest'],
        rb=cand['rb'],
    )
    alert.save()
    find_ztf_in_survey(alert)
class Command(BaseCommand):
    """Load an (already extracted) ZTF public alert archive into the database.

    Walks the extracted archive directory, keeps only alerts passing the
    IPAC purity cuts (is_alert_pure) that also look like real transients
    (is_transient), and stores each one via load_ztf_packet.
    """
    # Fixed typo: was 'Initiates data dase'.
    help = 'Initiates database with ZTF alerts from a public tar archive'

    def add_arguments(self, parser):
        # Generalization: the archive path used to be hard-coded; the old
        # path stays as the default, so existing invocations are unchanged.
        parser.add_argument(
            '--archive',
            default='/data/ZTF/ztf_public_20200126.tar.gz',
            help='Path to the ZTF public alert .tar.gz archive',
        )

    def handle(self, *args, **options):
        tar_archive = options.get('archive') or '/data/ZTF/ztf_public_20200126.tar.gz'
        # "ztf_public_20200126.tar.gz" -> "ztf_public_20200126"
        # (basename minus the trailing ".tar.gz"); assumes that exact suffix.
        output_dir = tar_archive.split('/')[-1].split('.')[-3]
        # NOTE(review): extraction is assumed to have been done beforehand;
        # uncomment to extract the archive in place.
        #archive = tarfile.open(tar_archive,'r:gz')
        #archive.extractall(path=output_dir)
        #archive.close()
        print(output_dir)
        print('{} has {} avro files'.format(output_dir, len(list(find_files(output_dir)))))
        transient_alerts = []
        for packet in filter(is_alert_pure, generate_dictionaries(output_dir)):
            dflc = make_dataframe(packet)
            if is_transient(dflc):
                transient_alerts.append(packet)
                load_ztf_packet(packet)