# uplim/views.py # from astropy_healpix import HEALPix does not have an option to # search for pixels non-inclusively import healpy as hp import astropy.units as u import numpy as np import scipy.special as sp from astropy.coordinates import SkyCoord, Angle from astropy.stats import poisson_conf_interval from django.db.models import Sum from django.shortcuts import get_object_or_404 from rest_framework.views import APIView from rest_framework.response import Response from rest_framework import status from uplim.models import Pixel, CatalogSource # SANITIZE RESPONSE DATA BEFORE JSON CONVERSION FOR DEBUGGING NANS # now NaNs are converted to 'null' beforehand # **************************************************************** def sanitize(obj): if isinstance(obj, dict): return {k: sanitize(v) for k, v in obj.items()} if isinstance(obj, (list, tuple)): return [sanitize(v) for v in obj] # handle numpy scalars if isinstance(obj, np.generic): v = obj.item() return None if (np.isnan(v) or np.isinf(v)) else v if isinstance(obj, float): return None if (np.isnan(obj) or np.isinf(obj)) else obj return obj # SURVEY PARAMETER PARSER # ************************************************************** def parse_survey_param(raw): surveys = set() for part in raw.split(","): if "-" in part: start, end = part.split("-", 1) surveys.update(range(int(start), int(end) + 1)) else: surveys.add(int(part)) return sorted(surveys) # PIXEL VIEW (MOSTLY FOR TESTING) # add healpix indices into the output # ************************************************************** class PixelAggregateView(APIView): def get(self, request): # GET PARAMETERS FROM THE QUERY # ************************************************************** raw_pixel = request.query_params.get("pixel") raw_survey = request.query_params.get("survey") # 400 BADREQUEST # ************************************************************** if raw_pixel is None or raw_survey is None: return Response( {"detail": "Both `pixel` and `survey` parameters are required."}, status=status.HTTP_400_BAD_REQUEST, ) # FILTER THE INPUTS # ************************************************************** try: hpid = int(raw_pixel) except ValueError: return Response( {"detail": "`pixel` must be an integer."}, status=status.HTTP_400_BAD_REQUEST, ) try: survey_numbers = parse_survey_param(raw_survey) except ValueError: return Response( {"detail": "Malformed `survey`; use N, N,M or N-M formats."}, status=status.HTTP_400_BAD_REQUEST, ) # FILTER AND AGGREGATE # ************************************************************** qs = Pixel.objects.filter(hpid=hpid, survey__in=survey_numbers) if not qs.exists(): # no matching pixel(s) → 404 get_object_or_404(Pixel, hpid=hpid, survey__in=survey_numbers) aggregates = qs.aggregate( # pixel_hpid=hpid, # survey_number=survey_numbers, total_counts=Sum("counts"), total_exposure=Sum("exposure"), ) plusdata = {"pixel_hpid": hpid, "surveys": survey_numbers} result = {**aggregates, **plusdata} # RETURN THE SUMS # ************************************************************** return Response(result, status=status.HTTP_200_OK) # UPPER LIMIT COMPUTATION VIEW # ************************************************************** class UpperLimitView(APIView): """ Calculate confidence bounds based on aperture photometry using classic and bayesian methods """ def get(self, request): # GET PARAMETERS FROM THE REQUEST # ************************************************************** try: ra = float(request.query_params.get("ra")) dec = float(request.query_params.get("dec")) confidence_level = float(request.query_params.get("cl")) except (TypeError, ValueError): return Response( {"error": "Invalud parameters, provide RA, DEC, and CL"}, status=status.HTTP_400_BAD_REQUEST, ) # pull & parse survey selection raw_survey = request.query_params.get("survey") if raw_survey is None: return Response( {"error": "Missing required `survey` parameter (e.g. ?survey=1,3-5)"}, status=status.HTTP_400_BAD_REQUEST, ) try: survey_numbers = parse_survey_param(raw_survey) except ValueError: return Response( {"error": "Malformed `survey`; use formats like 1, 2-5, or 1,3-4"}, status=status.HTTP_400_BAD_REQUEST, ) map_radius_value = request.query_params.get("mr") # CREATE SKYCOORD, CONVERT TO GALACTIC BECAUSE THE HEALPIX # MAP ITSELF WAS MADE IN GALACTIC COORDINATES # ************************************************************** src_coord = SkyCoord(ra, dec, unit="deg", frame="icrs") gal = src_coord.galactic src_vec = hp.ang2vec(gal.l.deg, gal.b.deg, lonlat=True) # DEFINE APERTURE AND ANNULUS RADII # ************************************************************** aperture_radius = 71 # radius of the aperture in arc seconds # HPD ~48 arcseconds # 90% ~100 arcseconds annulus_inner = 142 # 2 * aperture_radius annulus_outer = 284 # 4 * aperture_radius # FETCH PIXEL DATA DEFINED VIA HP.QUERY_DISC (INCLUSIVE=FALSE) # ************************************************************** source_pixel_list = hp.query_disc( nside=4096, vec=src_vec, inclusive=False, nest=False, radius=(aperture_radius * u.arcsecond).to(u.radian).value, ) inner_pixel_list = hp.query_disc( nside=4096, vec=src_vec, inclusive=False, nest=False, radius=(annulus_inner * u.arcsecond).to(u.radian).value, ) outer_pixel_list = hp.query_disc( nside=4096, vec=src_vec, inclusive=False, nest=False, radius=(annulus_outer * u.arcsecond).to(u.radian).value, ) annulus_pixel_list = [ item for item in outer_pixel_list if item not in inner_pixel_list ] source_pixels = Pixel.objects.filter( hpid__in=source_pixel_list, survey__in=survey_numbers ) annulus_pixels = Pixel.objects.filter( hpid__in=annulus_pixel_list, survey__in=survey_numbers ) # check contamination contamination = ( source_pixels.filter(contaminated=True).exists() or annulus_pixels.filter(contaminated=True).exists() ) if not source_pixels.exists() and not annulus_pixels.exists(): return Response( {"detail": "No pixel data for the given survey selection."}, status=status.HTTP_404_NOT_FOUND, ) # COMPUTE COUNTS, BACKGROUND ESTIMATE, EXPOSURE # ************************************************************** N = sum(obj.counts for obj in source_pixels) Nnpix = len(source_pixels) Bcounts = sum(obj.counts for obj in annulus_pixels) Bnpix = len(annulus_pixels) B = Bcounts / Bnpix * Nnpix tsum = sum(obj.exposure for obj in source_pixels) t = tsum / Nnpix # CONSTANTS # ************************************************************** # EEF = .9 # eclosed energy fraction, .5 for hpd, .9 for w90 # ECF = 4e-11 # energy conversion factor EEF = 0.80091 # use values from the paper ECF = 3.3423184e-11 # BAYESIAN IMPLEMENTATION VIA POISSON_CONF_INTERVAL # ************************************************************** # double sided interval low, high = poisson_conf_interval( n=N, background=B, interval="kraft-burrows-nousek", confidence_level=confidence_level, ) # because poisson_conf_interval returns lists bayesian_count_ul = high[0] bayesian_count_ll = low[0] bayesian_count_UL = ( sp.gammaincinv( N + 1, confidence_level * sp.gammaincc(N + 1, B) + sp.gammainc(N + 1, B) ) - B ) bayesian_rate_ul = bayesian_count_ul / t / EEF # count rate limits bayesian_rate_ll = bayesian_count_ll / t / EEF bayesian_rate_UL = bayesian_count_UL / t / EEF bayesian_flux_ul = bayesian_rate_ul * ECF # flux limits bayesian_flux_ll = bayesian_rate_ll * ECF bayesian_flux_UL = bayesian_rate_UL * ECF # CLASSICAL IMPLEMENTATION VIA GAMMAINCCINV # **************************************************************** # confidence level for the double sided interval cl2 = (1 + confidence_level) / 2 # double sided interval classic_count_ul = sp.gammainccinv(N + 1, 1 - cl2) - B classic_count_ll = sp.gammainccinv(N, cl2) - B # one sided interval classic_count_UL = sp.gammainccinv(N + 1, 1 - confidence_level) - B if not np.isfinite(classic_count_ll) or classic_count_ll < 0: classic_count_ll = 0.0 classic_rate_ul = classic_count_ul / t / EEF # count rate limits classic_rate_ll = classic_count_ll / t / EEF classic_rate_UL = classic_count_UL / t / EEF classic_flux_ul = classic_rate_ul * ECF # flux limits classic_flux_ll = classic_rate_ll * ECF classic_flux_UL = classic_rate_UL * ECF # FLUX ESTIMATION # **************************************************************** S = N - B # counts as simply counts within aperture # with the background estimate subtracted CR = S / t / EEF # source count rate BR = B / t # background rate within aperture FL = CR * ECF # conversion to flux Flux = max(FL, 0) # flux cannot be lower than zero # NEARBY SOURCES CHECK # **************************************************************** radius_as = 120 radius_deg = radius_as / 3600 dec_min = max(dec - radius_deg, -90) dec_max = min(dec + radius_deg, 90) # cheap belt query belt_sources = CatalogSource.objects.filter( dec_deg__gte=dec_min, dec_deg__lte=dec_max ) center_coord = SkyCoord(ra, dec, unit="deg") nearby_sources = [] # refine belt to circular region using astropy separation for catsrc in belt_sources: catsrc_coord = SkyCoord(catsrc.ra_deg, catsrc.dec_deg, unit="deg") if center_coord.separation(catsrc_coord).deg <= radius_deg: nearby_sources.append( { "srcid": catsrc.srcid, "name": catsrc.name, "ra_deg": catsrc.ra_deg, "dec_deg": catsrc.dec_deg, "pos_error": catsrc.pos_error, "significance": catsrc.significance, "flux": catsrc.flux, "flux_error": catsrc.flux_error, "catalog_name": catsrc.catalog_name, "new_xray": catsrc.new_xray, "source_type": catsrc.source_type, } ) # SQUARE REGION IMAGE SERVING # **************************************************************** # default value if not specified in the query # get hpids within a circle of radius sqrt(2) * outer annulus radius if map_radius_value is None: map_radius = annulus_outer * np.sqrt(2) else: map_radius = float(map_radius_value) map_pixel_list = hp.query_disc( nside=4096, vec=src_vec, inclusive=False, nest=False, radius=(map_radius * u.arcsecond).to(u.radian).value, ) # fetch those pixels for the requested surveys # summing counts and sorting by hpid map_pixels_qs = ( Pixel.objects.filter(hpid__in=map_pixel_list, survey__in=survey_numbers) .values("hpid") .annotate(counts=Sum("counts")) .order_by("hpid") ) # turn the queryset to a list map_pixels_list = list(map_pixels_qs) # get lists of healpix indices and count values map_healpix_list = [d["hpid"] for d in map_pixels_list] map_counts_list = [d["counts"] for d in map_pixels_list] # set map nside map_nside = 4096 # set map order map_order = "ring" # assemble the result dict map_dict = { "healpix": map_healpix_list, "counts": map_counts_list, "nside": map_nside, "order": map_order, "radius_as": map_radius, } # RESULT JSON # **************************************************************** result = { # frequentist limits "ClassicUpperLimit": classic_count_ul, "ClassicLowerLimit": classic_count_ll, "ClassicOneSideUL": classic_count_UL, "ClassicCountRateUpperLimit": classic_rate_ul, "ClassicCountRateLowerLimit": classic_rate_ll, "ClassicCountRateOneSideUL": classic_rate_UL, "ClassicFluxUpperLimit": classic_flux_ul, "ClassicFluxLowerLimit": classic_flux_ll, "ClassicFluxOneSideUL": classic_flux_UL, # bayesian limits "BayesianUpperLimit": bayesian_count_ul, "BayesianLowerLimit": bayesian_count_ll, "BayesianOneSideUL": bayesian_count_UL, "BayesianCountRateUpperLimit": bayesian_rate_ul, "BayesianCountRateLowerLimit": bayesian_rate_ll, "BayesianCountRateOneSideUL": bayesian_rate_UL, "BayesianFluxUpperLimit": bayesian_flux_ul, "BayesianFluxLowerLimit": bayesian_flux_ll, "BayesianFluxOneSideUL": bayesian_flux_UL, # flux 'center value' estimate "FluxEstimate": Flux, # raw data "ApertureCounts": N, "ApertureBackgroundCounts": B, "SourceCounts": S, "Exposure": t, # count rates "SourceRate": CR, "BackgroundRate": BR, # contamination "Contamination": contamination, "NearbySources": nearby_sources, # count map for the frontend image "CountMap": map_dict, } clean = sanitize(result) # calling sanitize() to convert NaN to null return Response(clean, status=status.HTTP_200_OK)