srg/monthplan/views.py
2024-04-26 12:43:00 +03:00

554 lines
23 KiB
Python

from django.shortcuts import render
from monthplan.models import Head
from monthplan.models import Observation, Seance, Correction, Scan
from monthplan.models import Survey, MonthPlanUpload, MonthPlanUploadForm
from monthplan.models import FlightPlan, DataDump, ScanPath
from monthplan.forms import ObsIDSearchForm
from plan.models import LaunchDate
from datetime import datetime as dt
from django.utils import timezone
from django.http import HttpResponse
from django.http import HttpResponseRedirect
from django.http import HttpResponseForbidden
import csv
from django.utils.encoding import smart_str
from django.contrib.auth.models import User
from django.core.mail import send_mail
import pandas as pd
from astropy.table import Table
from astropy.io import fits
import math
from Quaternion import Quat
from django.http import JsonResponse
from django.db.models import Q
def find_closest(now, closest_greater_qs, closest_less_qs):
try:
try:
closest_greater = closest_greater_qs[0]
except IndexError:
closest = closest_less_qs[0]
try:
closest_less = closest_less_qs[0]
except IndexError:
closest = closest_greater_qs[0]
except IndexError:
raise self.model.DoesNotExist("There is no closest object"
" because there are no objects.")
if closest_greater.dtime - now > now - closest_less.dtime:
closest = closest_less
else:
closest = closest_greater
return closest.ra, closest.dec
# Create your views here.
def time(request):
""" AJAX time procedure """
now = timezone.localtime(timezone.now())
data = {
'time': now.strftime('%d %b %Y %H:%M:%S MSK'),
}
return JsonResponse(data)
def wherenow(request):
request_path = request.GET.get('request_path', None)
now = timezone.localtime(timezone.now()) # timezone.now() # dt.now()
target = 'None'
obsid = 'None'
url = '#'
ra = 'n/a'
dec = 'n/a'
mode=''
msg=''
path=list()
try:
#surveys=Survey.objects.filter(experiment__contains="10000100100")
surveys = Survey.objects.filter(Q(start__lte=now) & Q(stop__gte=now))
# take first element only:
for survey in surveys:
target = survey.target
obsid = survey.experiment
url = survey.get_absolute_url()
ra="%.4f" % survey.ra_p
dec="%.4f" % survey.dec_p
mode='Survey'
"""
if request_path == "load":
path=list(survey.surveypath_set.all().values('ra', 'dec'))[::200]
"""
closest_greater_qs = survey.surveypath_set.filter(dtime__gt=now).order_by('dtime')
closest_less_qs = survey.surveypath_set.filter(dtime__lt=now).order_by('-dtime')
closest_ra, closest_dec = find_closest(now, closest_greater_qs, closest_less_qs)
ra="%.4f" % closest_ra
dec="%.4f" % closest_dec
break
except NameError as error:
pass
try:
scans = Scan.objects.filter(Q(start__lte=now) & Q(stop__gte=now))
# take first element only
for scan in scans:
target = scan.target
obsid = scan.experiment
url = scan.get_absolute_url()
ra="%.4f" % scan.ra
dec="%.4f" % scan.dec
mode='Scan'
if request_path == "load":
path=list(scan.scanpath_set.all().values('ra', 'dec'))[::50]
closest_greater_qs = scan.scanpath_set.filter(dtime__gt=now).order_by('dtime')
closest_less_qs = scan.scanpath_set.filter(dtime__lt=now).order_by('-dtime')
try:
try:
closest_greater = closest_greater_qs[0]
except IndexError:
closest = closest_less_qs[0]
try:
closest_less = closest_less_qs[0]
except IndexError:
closest = closest_greater_qs[0]
except IndexError:
raise self.model.DoesNotExist("There is no closest object"
" because there are no objects.")
if closest_greater.dtime - now > now - closest_less.dtime:
closest = closest_less
else:
closest = closest_greater
# update ra, dec
ra="%.4f" % closest.ra
dec="%.4f" % closest.dec
break
except NameError as error:
pass
try:
observations = Observation.objects.filter(Q(start__lte=now) & Q(stop__gte=now))
# take first element only:
for obs in observations:
target = obs.target
obsid = obs.experiment
url = obs.get_absolute_url()
ra="%.4f" % obs.ra
dec="%.4f" % obs.dec
mode='Pointing'
break
except NameError as error:
pass
data = {
'time': now.strftime('%d %b %Y %H:%M:%S MSK'),
'target': target,
'obsid': obsid,
'url': url,
'msg': msg,
'ra': ra, 'dec': dec,
#'observations': list(observations.values()) ,
#'scans': list(scans.values()) ,
#'surveys': list(surveys.values()),
'path':path,
'mode':mode,
}
return JsonResponse(data)
def index(request):
heads = Head.objects.all().order_by('-start')
return render(request,'monthplan/index.html', {'heads': heads,'today':dt.now(),})
def index_flightplan(request):
flightplans = FlightPlan.objects.all()
return render(request,'monthplan/index_flightplan.html', {'flightplans': flightplans,'today':dt.now(),})
def index_flightplan_valid(request):
flightplans = FlightPlan.objects.filter(valid__exact=True)
return render(request,'monthplan/index_flightplan.html', {'flightplans': flightplans,'today':dt.now(),})
def index_flightplan_invalid(request):
flightplans = FlightPlan.objects.filter(valid__exact=False)
return render(request,'monthplan/index_flightplan.html', {'flightplans': flightplans,})
def index_admin(request):
heads = Head.objects.all()
return render(request,'monthplan/index.html', {'heads': heads,'admin':True,})
def show_scans(request):
scans = Scan.objects.all().order_by('start')
return render(request,'monthplan/show_targets.html', {'scans':scans,})
def show_surveys(request):
surveys = Survey.objects.all().order_by('start')
return render(request,'monthplan/show_targets.html', {'surveys':surveys,})
def show_observations(request):
observations = Observation.objects.all().order_by('start')
return render(request,'monthplan/show_targets.html', {'observations': observations,})
def load_monthplan(request):
heads = Head.objects.all()
return render(request,'monthplan/index.html', {'heads': heads,'admin':True,})
def show_monthplan(request, monthplan_id):
try:
head=Head.objects.get(pk=monthplan_id)
except:
return HttpResponse("This month plan was not found")
seances = head.seance_set.all()
observations = head.observation_set.all()
corrections = head.correction_set.all()
scans = head.scan_set.all()
surveys = head.survey_set.all()
seq = range(1,head.nrows+1)
return render(request,'monthplan/show_monthplan.html', {'head': head,
'seances':seances,
'observations':observations,
'corrections':corrections,
'scans':scans,
'surveys':surveys,
'seq':seq,
'today':dt.now(),})
def share_monthplan(request, monthplan_id):
try:
head=Head.objects.get(pk=monthplan_id)
except:
return HttpResponse("This month plan was not found")
seances = head.seance_set.all()
observations = head.observation_set.all()
corrections = head.correction_set.all()
scans = head.scan_set.all()
surveys = head.survey_set.all()
seq = range(1,head.nrows+1)
emails = []
users = User.objects.all()
for user in users:
emails.append(user.email)
mid=head.get_datetime()
email_subject = "SRG-IKI month plan for %s" % (mid.strftime("%B %Y"))
email_body = u"The following SRG month plan for %s has been added/updated: http://193.232.10.38/%s\n\nThis SRG-IKI service email is sent to all registered users on srg.cosmos.ru. This is an automatic email, please do not reply. If you do not want to receive this kind of notifications, please drop a message to Roman Krivonos (krivonos@cosmos.ru)." % (mid.strftime("%B %Y"), head.get_absolute_url())
for email in emails:
send_mail(email_subject,email_body,'krivonos@cosmos.ru',[email])
return render(request,'monthplan/show_monthplan.html', {'head': head,
'seances':seances,
'observations':observations,
'corrections':corrections,
'scans':scans,
'surveys':surveys,
'seq':seq,
'today':dt.now(),})
def show_monthplan_table(request, monthplan_id):
try:
head=Head.objects.get(pk=monthplan_id)
except:
return HttpResponse("This month plan was not found")
seances = head.seance_set.all()
observations = head.observation_set.all()
corrections = head.correction_set.all()
scans = head.scan_set.all()
surveys = head.survey_set.all()
seq = range(1,head.nrows+1)
return render(request,'monthplan/show_monthplan_table.html', {'head': head,
'seances':seances,
'observations':observations,
'corrections':corrections,
'scans':scans,
'surveys':surveys,
'seq':seq,
'today':dt.now(),})
def show_monthplan_aladin(request, monthplan_id):
try:
head=Head.objects.get(pk=monthplan_id)
except:
return HttpResponse("This month plan was not found")
seances = head.seance_set.all()
observations = head.observation_set.all()
corrections = head.correction_set.all()
scans = head.scan_set.all()
surveys = head.survey_set.all()
surveypaths = []
for survey in surveys:
surveypaths.extend(list(survey.surveypath_set.all().order_by('obt')))
seq = range(1,head.nrows+1)
return render(request,'monthplan/show_monthplan_aladin.html', {'surveypaths':surveypaths, 'head': head,
'seances':seances,
'observations':observations,
'corrections':corrections,
'scans':scans,
'surveys':surveys,
'seq':seq,
'today':dt.now(),})
def delete_monthplan(request, monthplan_id):
try:
head=Head.objects.get(pk=monthplan_id)
except:
return HttpResponse("This month plan was not found")
#head.upload.delete()
head.delete()
heads = Head.objects.all()
return render(request,'monthplan/index.html', {'heads': heads,'today':dt.now(),'admin':True,})
def show_observation_info(request, observation_id):
try:
observation=Observation.objects.get(pk=observation_id)
except Observation.DoesNotExist:
return HttpResponseRedirect('/monthplan')
flightplans=FlightPlan.objects.filter(experiment__contains=observation.experiment)
datadumps=observation.datadump_set.all()
return render(request,'monthplan/show_observation_info.html', {'head': observation.head, 'observation': observation,'flightplans':flightplans,'datadumps':datadumps,'today':dt.now(),})
def show_flightplan_info(request, flightplan_id):
try:
pz=FlightPlan.objects.get(pk=flightplan_id)
except FlightPlan.DoesNotExist:
return HttpResponseRedirect('/monthplan')
observations=Observation.objects.filter(experiment__exact=pz.experiment)
scans=Scan.objects.filter(experiment__exact=pz.experiment)
surveys=Survey.objects.filter(experiment__exact=pz.experiment)
return render(request,'monthplan/show_flightplan_info.html', {'observations': observations, 'scans':scans, 'surveys':surveys, 'obs':pz, 'today':dt.now(),})
def show_scan_info(request, scan_id):
try:
scan=Scan.objects.get(pk=scan_id)
except Scan.DoesNotExist:
return HttpResponseRedirect('/monthplan')
flightplans=FlightPlan.objects.filter(experiment__contains=scan.experiment)
datadumps=scan.datadump_set.all()
scanpaths=scan.scanpath_set.all()
return render(request,'monthplan/show_scan_info.html', {'head':scan.head, 'scan': scan,'flightplans':flightplans,'scanpaths':scanpaths,'datadumps':datadumps,'today':dt.now(),})
def scan_download_csv(request, scan_id):
try:
scan=Scan.objects.get(pk=scan_id)
except Scan.DoesNotExist:
return HttpResponseRedirect('/monthplan')
scanpaths=scan.scanpath_set.all()#[::50]
response = HttpResponse(content_type='text/csv')
#decide the file name
response['Content-Disposition'] = 'attachment; filename='+"scan_%s.csv" % (scan.experiment,)
writer = csv.writer(response, csv.excel)
#response.write(u'\ufeff'.encode('utf8'))
response.write(u'# '.encode('utf8'))
#write the headers
writer.writerow([
smart_str(u"MJD"),
smart_str(u"EroDay"),
smart_str(u"RA"),
smart_str(u"Dec"),
])
for path in scanpaths:
writer.writerow([
smart_str(path.mjd),
smart_str(path.eroday),
smart_str(path.ra),
smart_str(path.dec),
])
return response
# fsock = open('/path/to/file.mp3', 'r')
# response = HttpResponse(fsock, content_type='audio/mpeg')
# response['Content-Disposition'] = "attachment; filename=%s - %s.mp3" % \
# (song.artist, song.title)
# return response
def show_tracking(request):
return render(request,'monthplan/show_tracking.html')
def show_survey_info(request, survey_id):
try:
survey=Survey.objects.get(pk=survey_id)
except Survey.DoesNotExist:
return HttpResponseRedirect('/monthplan')
flightplans=FlightPlan.objects.filter(experiment__contains=survey.experiment)
surveypaths=survey.surveypath_set.all().order_by('obt')
plates = survey.surveyhealpixplate_set.all().order_by('healpix')
return render(request,'monthplan/show_survey_info.html', {'head':survey.head, 'surveypaths':surveypaths, 'survey': survey,'flightplans':flightplans,'plates':plates,'today':dt.now(),})
def show_monthplan_seance(request, monthplan_id):
try:
head=Head.objects.get(pk=monthplan_id)
except:
return HttpResponseRedirect('/monthplan')
seances = head.seance_set.all()
return render(request,'monthplan/show_monthplan.html', {'head': head,'seances':seances,'today':dt.now(),})
def show_monthplan_observation(request, monthplan_id):
try:
head=Head.objects.get(pk=monthplan_id)
except Head.DoesNotExist:
return HttpResponseRedirect('/monthplan')
observations = head.observation_set.all()
return render(request,'monthplan/show_monthplan.html', {'head': head,'observations':observations,'today':dt.now(),})
def show_monthplan_correction(request, monthplan_id):
try:
head=Head.objects.get(pk=monthplan_id)
except:
return HttpResponseRedirect('/monthplan')
corrections = head.correction_set.all()
return render(request,'monthplan/show_monthplan.html', {'head': head,'corrections':corrections,'today':dt.now(),})
def show_monthplan_scan(request, monthplan_id):
try:
head=Head.objects.get(pk=monthplan_id)
except:
return HttpResponseRedirect('/monthplan')
scans = head.scan_set.all()
return render(request,'monthplan/show_monthplan.html', {'head': head,'scans':scans,'today':dt.now(),})
def show_monthplan_survey(request, monthplan_id):
try:
head=Head.objects.get(pk=monthplan_id)
except:
return HttpResponseRedirect('/monthplan')
surveys = head.survey_set.all()
return render(request,'monthplan/show_monthplan.html', {'head': head,'surveys':surveys,'today':dt.now(),})
def upload(request):
if not request.user.is_authenticated:
html = "<html><body>You have no rights for this operation.</body></html>"
return HttpResponseForbidden(html)
try:
user_profile=request.user.profile
except:
html = "<html><body>You have no user profile.</body></html>"
return HttpResponseForbidden(html)
if request.method == 'POST':
form = MonthPlanUploadForm(request.POST, request.FILES)
if form.is_valid():
upload = form.save(commit=False)
upload.owner = user_profile
upload.filename = request.FILES['filefield'].name.encode('utf-8')
upload.save()
table = Table.read(upload.filefield.path)
df = table.to_pandas()
hdul = fits.open(upload.filefield.path)
hdul.info()
head=Head()
head.title=upload.title
head.start=hdul[1].header['START']
head.stop=hdul[1].header['STOP']
head.gentime=hdul[1].header['GENTIME']
head.version=hdul[1].header['VERSION']
head.author=hdul[1].header['AUTHOR']
head.nrows=df.shape[0]
head.save()
upload.head=head
upload.save()
for i in df.index:
row=(i+1)
if(df['TYPE'][i].decode().strip() == 'SEANCE'):
print('SEA')
print(df['START'][i].decode().strip())
seance=Seance(head=head, row=row)
seance.start=df['START'][i].decode().strip()
seance.stop=df['STOP'][i].decode().strip()
seance.stations=df['STATIONS'][i].decode().strip()
seance.guid=df['GUID'][i].decode().strip()
seance.save()
if(df['TYPE'][i].decode().strip() == 'OBSERVATION'):
obs=Observation(head=head, row=row)
obs.start=df['START'][i].decode().strip()
obs.stop=df['STOP'][i].decode().strip()
obs.target=df['TARGET'][i].decode().strip()
obs.experiment=df['EXPERIMENT'][i].decode().strip()
obs.ra=df['RA'][i]
obs.dec=df['DEC'][i]
if('RA_OBJ' in df):
obs.ra_obj=df['RA_OBJ'][i]
if('DEC_OBJ' in df):
obs.dec_obj=df['DEC_OBJ'][i]
if('OBJECT' in df):
obs.name_obj=df['OBJECT'][i].decode().strip()
obs.roll_angle=df['ROLL_ANGLE'][i]
obs.sun_xoz_angle=df['SUN_XOZ_ANGLE'][i]
obs.guid=df['GUID'][i].decode().strip()
obs.save()
if(df['TYPE'][i].decode().strip() == 'CORRECTION'):
cor=Correction(head=head, row=row)
cor.start=df['START'][i].decode().strip()
cor.stop=df['STOP'][i].decode().strip()
cor.impstart=df['IMPSTART'][i].decode().strip()
cor.guid=df['GUID'][i].decode().strip()
cor.save()
if(df['TYPE'][i].decode().strip() == 'SCAN'):
print('SCAN')
scan=Scan(head=head, row=row)
scan.start=df['START'][i].decode().strip()
scan.stop=df['STOP'][i].decode().strip()
scan.target=df['TARGET'][i].decode().strip()
scan.experiment=df['EXPERIMENT'][i].decode().strip()
scan.ra=df['RA'][i]
scan.dec=df['DEC'][i]
scan.roll_angle=df['ROLL_ANGLE'][i]
scan.sun_xoz_angle=df['SUN_XOZ_ANGLE'][i]
scan.template=int(df['TEMPLATE'][i].decode().strip())
scan.guid=df['GUID'][i].decode().strip()
scan.save()
if(df['TYPE'][i].decode().strip() == 'SURVEY'):
sur=Survey(head=head, row=row)
sur.start=df['START'][i].decode().strip()
sur.stop=df['STOP'][i].decode().strip()
sur.target=df['TARGET'][i].decode().strip()
sur.experiment=df['EXPERIMENT'][i].decode().strip()
sur.ra_p=df['RA_P'][i]
sur.dec_p=df['DEC_P'][i]
sur.ra_z0=df['RA_Z0'][i]
sur.dec_z0=df['DEC_Z0'][i]
sur.ra_zk=df['RA_ZK'][i]
sur.dec_zk=df['DEC_ZK'][i]
sur.z_speed=df['Z_SPEED'][i]
sur.guid=df['GUID'][i].decode().strip()
sur.save()
return HttpResponseRedirect(head.get_absolute_url())
else:
form = MonthPlanUploadForm()
return render(request,'monthplan/upload.html',
{'UserProfile':user_profile, 'form': form,}
)
def get_obsid_search(request):
# if this is a POST request we need to process the form data
if request.method == 'POST':
# create a form instance and populate it with data from the request:
form = ObsIDSearchForm(request.POST)
# check whether it's valid:
if form.is_valid():
obsid = form.cleaned_data['obsid']
observations=Observation.objects.filter(experiment__contains=obsid)
scans=Scan.objects.filter(experiment__contains=obsid)
surveys=Survey.objects.filter(experiment__contains=obsid)
flightplans=FlightPlan.objects.filter(experiment__contains=obsid)
#ordered=phots.order_by('separation')
return render(request, 'monthplan/obsid_search.html', {'form': form, 'observations':observations,'scans':scans,'surveys':surveys,'flightplans':flightplans,})
# if a GET (or any other method) we'll create a blank form
else:
form = ObsIDSearchForm()
#MyModel.objects.all().annotate(mycolumn=Value('xxx', output_field=CharField()))
return render(request, 'monthplan/obsid_search.html', {'form': form})