"""
https://zwickytransientfacility.github.io/ztf-avro-alert/filtering.html

Based on tests done at IPAC (F. Masci, priv. comm.), the following filter
delivers a relatively pure sample:

    rb >= 0.65 and nbad == 0 and fwhm <= 5 and elong <= 1.2 and abs(magdiff) <= 0.1
"""

from django.core.management.base import BaseCommand, CommandError

import os
import io
import gzip
import tarfile
import warnings
import logging
import json
import pprint
import re
from collections import defaultdict

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import fastavro
import requests
import aplpy

from astropy.time import Time
from astropy.io import fits
import astropy.units as u
from astropy.coordinates import SkyCoord                  # High-level coordinates
from astropy.coordinates import ICRS, Galactic, FK4, FK5  # Low-level frames
from astropy.coordinates import Angle, Latitude, Longitude  # Angles
from astropy_healpix import HEALPix

from astrobasis.models import ZTFAlert
from srglib.utils import find_ztf_in_survey
from erotrans.models import EroTransSource, EroTrans
from erosurvey.models import NSIDE_PLATES, ORDER_PLATES
from heasarc.models import NSIDE_SOURCES, ORDER
from monthplan.models import Survey


def plot_lightcurve(dflc, ax=None, days_ago=True):
    """Plot the alert light curve, one color per ZTF filter band."""
    filter_color = {1: 'green', 2: 'red', 3: 'pink'}
    if days_ago:
        now = Time.now().jd
        t = dflc.jd - now
        xlabel = 'Days Ago'
    else:
        t = dflc.jd
        xlabel = 'Time (JD)'

    if ax is None:
        plt.figure()
    for fid, color in filter_color.items():
        # plot detections in this filter:
        w = (dflc.fid == fid) & ~dflc.magpsf.isnull()
        if np.sum(w):
            plt.errorbar(t[w], dflc.loc[w, 'magpsf'], dflc.loc[w, 'sigmapsf'],
                         fmt='.', color=color)
        # plot upper limits (non-detections) in this filter:
        wnodet = (dflc.fid == fid) & dflc.magpsf.isnull()
        if np.sum(wnodet):
            plt.scatter(t[wnodet], dflc.loc[wnodet, 'diffmaglim'],
                        marker='v', color=color, alpha=0.25)

    plt.gca().invert_yaxis()
    plt.xlabel(xlabel)
    plt.ylabel('Magnitude')
    plt.title(dflc.objectId)


def plot_cutout(stamp, fig=None, subplot=None, **kwargs):
    """Display one gzipped FITS cutout stamp with aplpy."""
    with gzip.open(io.BytesIO(stamp), 'rb') as f:
        with fits.open(io.BytesIO(f.read())) as hdul:
            if fig is None:
                fig = plt.figure(figsize=(4, 4))
            if subplot is None:
                subplot = (1, 1, 1)
            ffig = aplpy.FITSFigure(hdul[0], figure=fig, subplot=subplot, **kwargs)
            ffig.show_grayscale(stretch='arcsinh')
    return ffig


def show_stamps(packet):
    """Show the Science/Template/Difference cutouts side by side."""
    fig = plt.figure(figsize=(12, 4))
    for i, cutout in enumerate(['Science', 'Template', 'Difference']):
        stamp = packet['cutout{}'.format(cutout)]['stampData']
        ffig = plot_cutout(stamp, fig=fig, subplot=(1, 3, i + 1))
        ffig.set_title(cutout)


def show_all(packet):
    """Show the light curve plus the three cutouts in a single figure."""
    fig = plt.figure(figsize=(16, 4))
    dflc = make_dataframe(packet)
    plot_lightcurve(dflc, ax=plt.subplot(1, 4, 1))
    for i, cutout in enumerate(['Science', 'Template', 'Difference']):
        stamp = packet['cutout{}'.format(cutout)]['stampData']
        ffig = plot_cutout(stamp, fig=fig, subplot=(1, 4, i + 2))
        ffig.set_title(cutout)


def make_dataframe(packet):
    """Combine the current candidate and its previous candidates into one DataFrame."""
    dfc = pd.DataFrame(packet['candidate'], index=[0])
    df_prv = pd.DataFrame(packet['prv_candidates'])
    dflc = pd.concat([dfc, df_prv], ignore_index=True, sort=False)
    # We'll attach some metadata -- note this may not be preserved after all operations.
    # https://stackoverflow.com/questions/14688306/adding-meta-information-metadata-to-pandas-dataframe
    dflc.objectId = packet['objectId']
    dflc.candid = packet['candid']
    return dflc
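
# A minimal alternative sketch for the metadata hack above, assuming
# pandas >= 1.0: DataFrame.attrs is a real (if still experimental) dict for
# per-frame metadata, though its propagation through operations is also not
# guaranteed. This helper is illustrative only and is not used elsewhere
# in this module.
def make_dataframe_attrs(packet):
    dflc = make_dataframe(packet)
    dflc.attrs['objectId'] = packet['objectId']
    dflc.attrs['candid'] = packet['candid']
    return dflc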
def is_alert_pure(packet):
    """Apply the IPAC purity cuts quoted in the module docstring."""
    pure = True
    pure &= packet['candidate']['rb'] >= 0.65
    pure &= packet['candidate']['nbad'] == 0
    pure &= packet['candidate']['fwhm'] <= 5
    pure &= packet['candidate']['elong'] <= 1.2
    pure &= np.abs(packet['candidate']['magdiff']) <= 0.1
    return pure


def generate_dictionaries(root_dir):
    """Yield every alert packet found under root_dir."""
    for fname in find_files(root_dir):
        for packet in open_avro(fname):
            yield packet


def open_avro(fname):
    """Yield the alert packets stored in a single Avro file."""
    with open(fname, 'rb') as f:
        freader = fastavro.reader(f)
        # in principle there can be multiple packets per file
        for packet in freader:
            yield packet


def find_files(root_dir):
    """Yield the paths of all .avro files under root_dir."""
    for dir_name, subdir_list, file_list in os.walk(root_dir, followlinks=True):
        for fname in file_list:
            if fname.endswith('.avro'):
                yield os.path.join(dir_name, fname)


def is_transient(dflc):
    """Heuristic transient selection: positive subtraction, no point-source
    counterpart, not a moving object, and no known solar-system object."""
    candidate = dflc.loc[0]

    is_positive_sub = candidate['isdiffpos'] == 't'

    # No PS1 point-source counterpart within 1.5": either nothing is nearby,
    # or the nearest match looks like a galaxy (sgscore1 < 0.5).
    if (candidate['distpsnr1'] is None) or (candidate['distpsnr1'] > 1.5):
        no_pointsource_counterpart = True
    else:
        no_pointsource_counterpart = candidate['sgscore1'] < 0.5

    # Require at least two detections separated by more than 30 minutes,
    # which rejects most solar-system moving objects.
    where_detected = (dflc['isdiffpos'] == 't')  # nondetections will be None
    if np.sum(where_detected) >= 2:
        detection_times = dflc.loc[where_detected, 'jd'].values
        dt = np.diff(detection_times)
        not_moving = np.max(dt) >= (30 * u.minute).to(u.day).value
    else:
        not_moving = False

    no_ssobject = ((candidate['ssdistnr'] is None)
                   or (candidate['ssdistnr'] < 0)
                   or (candidate['ssdistnr'] > 5))

    return is_positive_sub and no_pointsource_counterpart and not_moving and no_ssobject


def load_ztf_packet(item):
    """Create a ZTFAlert database record from an alert packet, unless one
    with the same objectId is already loaded."""
    hp = HEALPix(nside=NSIDE_SOURCES, order=ORDER, frame=FK5())
    hp_plate = HEALPix(nside=NSIDE_PLATES, order=ORDER_PLATES, frame=FK5())

    name = item['objectId']
    try:
        ZTFAlert.objects.get(objectId=name)
        print("ZTF alert ID %s is already loaded, skip" % name)
        return
    except ZTFAlert.DoesNotExist:
        print("ZTF alert ID %s is not loaded" % name)

    #date_string = item['candidate']['wall_time']
    #dt = datetime.strptime(date_string, format_string)
    ra = item['candidate']['ra']
    dec = item['candidate']['dec']
    crd = SkyCoord(ra, dec, frame=FK5(), unit="deg")
    healpix = hp.skycoord_to_healpix(crd)
    healpix_plate = hp_plate.skycoord_to_healpix(crd)

    ztf = ZTFAlert(
        objectId=item['objectId'],
        fid=item['candidate']['fid'],
        lco_id=item['candid'],
        healpix=healpix,
        healpix_plate=healpix_plate,
        ra=ra,
        dec=dec,
        programid=item['candidate']['programid'],
        magpsf=item['candidate']['magpsf'],
        sigmapsf=item['candidate']['sigmapsf'],
        magap=item['candidate']['magap'],
        magdiff=item['candidate']['magdiff'],
        nbad=item['candidate']['nbad'],
        fwhm=item['candidate']['fwhm'],
        elong=item['candidate']['elong'],
        sigmagap=item['candidate']['sigmagap'],
        #wall_time=datetime.strptime(date_string, format_string),
        diffmaglim=item['candidate']['diffmaglim'],
        #deltamagref=item['candidate']['deltamagref'],
        #deltamaglatest=item['candidate']['deltamaglatest'],
        rb=item['candidate']['rb'])
    ztf.save()
    find_ztf_in_survey(ztf)


class Command(BaseCommand):
    help = 'Loads ZTF public alerts into the database'

    def handle(self, *args, **options):
        tar_archive = '/data/ZTF/ztf_public_20200126.tar.gz'
        # 'ztf_public_20200126.tar.gz' -> 'ztf_public_20200126'
        output_dir = tar_archive.split('/')[-1].split('.')[-3]
        #archive = tarfile.open(tar_archive, 'r:gz')
        #archive.extractall(path=output_dir)
        #archive.close()
        print(output_dir)
        print('{} has {} avro files'.format(output_dir, len(list(find_files(output_dir)))))

        transient_alerts = []
        for packet in filter(is_alert_pure, generate_dictionaries(output_dir)):
            dflc = make_dataframe(packet)
            if is_transient(dflc):
                transient_alerts.append(packet)
                load_ztf_packet(packet)
        """
        for packet in transient_alerts:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                show_all(packet)
        """
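
# Usage sketch (the command name below is hypothetical -- Django derives it
# from this file's name under <app>/management/commands/):
#
#   python manage.py load_ztf_alerts
#
# Note that the tar archive path is currently hard-coded in handle() above,
# and the extraction step is commented out, so the .avro files are assumed
# to already be unpacked in the output directory.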