# GaiaDBInterface/management/commands/indexer_multithread.py

import os
import django
import multiprocessing as mp
import time
from django.core.management.base import BaseCommand
from GaiaDBInterface.models import GaiaSource, CatalogFile
from astropy.coordinates import SkyCoord
import astropy_healpix as ah
import numpy as np
from datetime import datetime, timedelta
import gc
import json

MAX_WORKERS = 2 * mp.cpu_count() - 1  # maximum number of worker processes for multiprocessing
CHUNK_SIZE = 1_000  # number of sources per chunk handed to each worker


def current_time():
    """Return a timestamp string for log messages (clock shifted by +3 hours)."""
    return (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S")


def update_catalog_file_status(catalog_file, status):
    """Set and persist the status field of a CatalogFile record."""
    catalog_file.status = status
    catalog_file.save()


def healpix():
    # Reset files left in INDEX_IN_PROGRESS by a previously interrupted run,
    # clearing any partially written indices before re-indexing.
    interrupted_files = CatalogFile.objects.filter(status='INDEX_IN_PROGRESS')
    total_interrupted = interrupted_files.count()
    print(f'[{current_time()}] Found {total_interrupted} files interrupted mid-index. Cleaning up.')
    for interrupted_file in interrupted_files:
        GaiaSource.objects.filter(catalog_file=interrupted_file).update(
            healpix_ring_index=None,
            healpix_nested_index=None
        )
        update_catalog_file_status(interrupted_file, 'INGESTED')

    # Files that have been ingested and are ready for indexing.
    ingested_files = CatalogFile.objects.filter(status='INGESTED')
    total_files = ingested_files.count()
    print(f'[{current_time()}] There are {total_files} ingested files.')

    # Load the previously chosen NSIDE (if any) to offer it as the default.
    if os.path.exists('config.json'):
        with open('config.json', 'r') as config_file:
            config = json.load(config_file)
            previous_choice = config.get('nside', '')
    else:
        previous_choice = ''

    nside = input(f'[{current_time()}] Please enter the NSIDE HEALPix parameter [{previous_choice}]: ')
    if not nside:
        nside = previous_choice
    nside = int(nside)  # input() returns a string; HEALPix needs an integer NSIDE
    with open('config.json', 'w') as config_file:
        json.dump({'nside': nside}, config_file)
    print(f"[{current_time()}] Selected nside: {nside}")

    for current_file_index, catalog_file in enumerate(ingested_files, start=1):
        print(f'[{current_time()}] Processing sources from {catalog_file.name}')
        sources = list(catalog_file.sources.all())
        print(f'[{current_time()}] Sources loaded, calculating indices.')

        # Build a single SkyCoord array from all source positions in this file.
        ra_list = np.array([source.ra for source in sources])
        dec_list = np.array([source.dec for source in sources])
        skycoord = SkyCoord(
            ra=ra_list,
            dec=dec_list,
            unit='deg',
            frame='fk5'
        )

        # RING-ordered HEALPix indices.
        ring_healpix = ah.HEALPix(
            nside=nside,
            order='ring',
            frame='fk5'
        )
        ring_healpix_indices = ring_healpix.skycoord_to_healpix(skycoord)
        for source, healpix_index in zip(sources, ring_healpix_indices):
            source.healpix_ring_index = healpix_index

        # NESTED-ordered HEALPix indices.
        nested_healpix = ah.HEALPix(
            nside=nside,
            order='nested',
            frame='fk5'
        )
        nested_healpix_indices = nested_healpix.skycoord_to_healpix(skycoord)
        for source, healpix_index in zip(sources, nested_healpix_indices):
            source.healpix_nested_index = healpix_index

        print(f'[{current_time()}] Calculations done, updating database.')
        update_catalog_file_status(catalog_file, 'INDEX_IN_PROGRESS')

        # Split the sources into chunks and write them back in parallel.
        # Inherited DB connections are closed before forking so that each
        # worker process opens its own connection.
        source_chunks = [sources[i:i + CHUNK_SIZE] for i in range(0, len(sources), CHUNK_SIZE)]
        django.db.connections.close_all()
        with mp.Pool(MAX_WORKERS) as pool:
            pool.map(update_sources, source_chunks, chunksize=10)
        update_catalog_file_status(catalog_file, 'INDEXED')

        print(f'[{current_time()}] Database updated, sources indexed successfully. Initializing garbage collection.')
        del sources, ra_list, dec_list, skycoord, ring_healpix, ring_healpix_indices, nested_healpix, nested_healpix_indices
        gc.collect()
        print(f'[{current_time()}] Garbage collection complete.')
        print(f'[{current_time()}] Progress: {current_file_index}/{total_files} files indexed.')
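
# Minimal standalone sketch of the indexing step above (same astropy_healpix
# calls as used in healpix(); the coordinate and NSIDE values here are only
# illustrative assumptions):
#
#     coords = SkyCoord(ra=[10.68], dec=[41.27], unit='deg', frame='fk5')
#     hp = ah.HEALPix(nside=64, order='ring', frame='fk5')
#     hp.skycoord_to_healpix(coords)  # pixel indices in [0, 12 * 64**2 - 1]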


def update_sources(source_chunk):
    # Worker: bulk-update the HEALPix indices for one chunk of sources.
    GaiaSource.objects.bulk_update(
        source_chunk,
        ['healpix_ring_index', 'healpix_nested_index'],
        batch_size=1000
    )


class Command(BaseCommand):
    help = 'Index sources using HEALPix.'

    def handle(self, *args, **options):
        healpix()


if __name__ == "__main__":
    # Running this module directly bypasses manage.py, so Django has to be
    # initialised explicitly (DJANGO_SETTINGS_MODULE must be set in the
    # environment for this to work).
    django.setup()
    healpix()
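
# Usage sketch (assuming the standard Django project layout with manage.py at
# the project root; the command name is taken from this file's name):
#
#     python manage.py indexer_multithread
#
# The command prompts for the NSIDE parameter interactively and remembers the
# last choice in config.json for the next run.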