diff --git a/management/commands/indexer.py b/management/commands/indexer.py index 322fc1a..e6b5b3d 100644 --- a/management/commands/indexer.py +++ b/management/commands/indexer.py @@ -4,6 +4,9 @@ from astropy.coordinates import SkyCoord import astropy_healpix as ah import numpy as np from datetime import datetime, timedelta +import gc +import os +import json @@ -18,13 +21,41 @@ def update_catalog_file_status(catalog_file, status): #catalog file -def healpix(nside): #NSIDE 2048, ORDER ring or nested +def healpix(): #NSIDE 2048, ORDER ring or nested + + interrupted_files = CatalogFile.objects.filter(status='INDEX_IN_PROGRESS') + total_interrupted = interrupted_files.count() + print(f'[{current_time()}] Found {total_interrupted} files interrupted mid-index. Cleaning up.') + + for interrupted_file in interrupted_files: + GaiaSource.objects.filter(catalog_file=interrupted_file).update( + healpix_ring_index=None, + healpix_nested_index=None + ) + update_catalog_file_status(interrupted_file, 'INGESTED') ingested_files = CatalogFile.objects.filter(status='INGESTED') total_files = ingested_files.count() print(f'[{current_time()}] There are {total_files} ingested files.') current_file_index = 0 + if os.path.exists('config.json'): + with open('config.json', 'r') as config_file: + config = json.load(config_file) + previous_choice = config.get('nside', '') + else: + previous_choice = '' + + nside = input(f'[{current_time()}] Please enter the NSIDE HEALPix parameter [{previous_choice}]:') + + if not nside: + nside = previous_choice + + with open('config.json', 'w') as config_file: + json.dump({'nside': nside}, config_file) + + print(f"[{current_time()}] Selected nside: {nside}") + for catalog_file in ingested_files: current_file_index += 1 @@ -33,14 +64,7 @@ def healpix(nside): #NSIDE 2048, ORDER ring or nested sources = list(catalog_file.sources.all()) print(f'[{current_time()}] Sources loaded, calculating indices.') - if catalog_file.status == 'INDEX_IN_PROGRESS': - #sources.update(healpix_ring_index=None, healpix_nested_index=None) - GaiaSource.objects.filter(catalog_file=catalog_file).update( - healpix_ring_index=None, - healpix_nested_index=None - ) - - + ra_list = np.array([source.ra for source in sources]) dec_list = np.array([source.dec for source in sources]) @@ -83,7 +107,13 @@ def healpix(nside): #NSIDE 2048, ORDER ring or nested update_catalog_file_status(catalog_file, 'INDEXED') - print(f'[{current_time()}] Database updated, sources indexed successfully.') + print(f'[{current_time()}] Database updated, sources indexed successfully. Initializing garbage collection.') + + del sources, ra_list, dec_list, skycoord, ring_healpix, ring_healpix_indices, nested_healpix, nested_healpix_indices + + gc.collect() + + print(f'[{current_time()}] Garbage collection complete.') print(f'[{current_time()}] Progress: {current_file_index}/{total_files} files indexed.') @@ -91,18 +121,6 @@ def healpix(nside): #NSIDE 2048, ORDER ring or nested class Command(BaseCommand): help = 'Index sources using healpix.' - - - def add_arguments(self, parser): - parser.add_argument( - 'nside', - type=int, - help='NSIDE parameter for HEALPix' - ) - - def handle(self, *args, **options): - nside = options['nside'] - - healpix(nside) + healpix() \ No newline at end of file diff --git a/management/commands/indexer_multithread.py b/management/commands/indexer_multithread.py new file mode 100644 index 0000000..2352687 --- /dev/null +++ b/management/commands/indexer_multithread.py @@ -0,0 +1,126 @@ +import os +import django +import multiprocessing as mp +import time +from django.core.management.base import BaseCommand +from GaiaDBInterface.models import GaiaSource, CatalogFile +from astropy.coordinates import SkyCoord +import astropy_healpix as ah +import numpy as np +from datetime import datetime, timedelta +import gc +import json + +MAX_WORKERS = 2 * mp.cpu_count() - 1 +CHUNK_SIZE = 1_000 + +def current_time(): + return (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S") + +def update_catalog_file_status(catalog_file, status): + catalog_file.status = status + catalog_file.save() + +def healpix(): + interrupted_files = CatalogFile.objects.filter(status='INDEX_IN_PROGRESS') + total_interrupted = interrupted_files.count() + print(f'[{current_time()}] Found {total_interrupted} files interrupted mid-index. Cleaning up.') + + for interrupted_file in interrupted_files: + GaiaSource.objects.filter(catalog_file=interrupted_file).update( + healpix_ring_index=None, + healpix_nested_index=None + ) + update_catalog_file_status(interrupted_file, 'INGESTED') + + ingested_files = CatalogFile.objects.filter(status='INGESTED') + total_files = ingested_files.count() + print(f'[{current_time()}] There are {total_files} ingested files.') + current_file_index = 0 + + if os.path.exists('config.json'): + with open('config.json', 'r') as config_file: + config = json.load(config_file) + previous_choice = config.get('nside', '') + else: + previous_choice = '' + + nside = input(f'[{current_time()}] Please enter the NSIDE HEALPix parameter [{previous_choice}]:') + if not nside: + nside = previous_choice + + with open('config.json', 'w') as config_file: + json.dump({'nside': nside}, config_file) + + print(f"[{current_time()}] Selected nside: {nside}") + + for catalog_file in ingested_files: + current_file_index += 1 + print(f'[{current_time()}] Processing sources from {catalog_file.name}') + sources = list(catalog_file.sources.all()) + print(f'[{current_time()}] Sources loaded, calculating indices.') + + ra_list = np.array([source.ra for source in sources]) + dec_list = np.array([source.dec for source in sources]) + + skycoord = SkyCoord( + ra=ra_list, + dec=dec_list, + unit='deg', + frame='fk5' + ) + + ring_healpix = ah.HEALPix( + nside=nside, + order='ring', + frame='fk5' + ) + ring_healpix_indices = ring_healpix.skycoord_to_healpix(skycoord) + + nested_healpix = ah.HEALPix( + nside=nside, + order='nested', + frame='fk5' + ) + nested_healpix_indices = nested_healpix.skycoord_to_healpix(skycoord) + + for source, healpix_index in zip(sources, ring_healpix_indices): + source.healpix_ring_index = healpix_index + + for source, healpix_index in zip(sources, nested_healpix_indices): + source.healpix_nested_index = healpix_index + + print(f'[{current_time()}] Calculations done, updating database.') + + update_catalog_file_status(catalog_file, 'INDEX_IN_PROGRESS') + + # Split the sources into chunks for each process to work on. + source_chunks = [sources[i:i + CHUNK_SIZE] for i in range(0, len(sources), CHUNK_SIZE)] + + # Close the connection before forking. + django.db.connections.close_all() + + # Create a pool of processes and run the update_sources function on each chunk. + with mp.Pool(MAX_WORKERS) as pool: + pool.map(update_sources, source_chunks, chunksize=10) + + update_catalog_file_status(catalog_file, 'INDEXED') + + print(f'[{current_time()}] Database updated, sources indexed successfully. Initializing garbage collection.') + + del sources, ra_list, dec_list, skycoord, ring_healpix, ring_healpix_indices, nested_healpix, nested_healpix_indices + gc.collect() + + print(f'[{current_time()}] Garbage collection complete.') + print(f'[{current_time()}] Progress: {current_file_index}/{total_files} files indexed.') + +def update_sources(source_chunk): + GaiaSource.objects.bulk_update(source_chunk, ['healpix_ring_index', 'healpix_nested_index']) + +class Command(BaseCommand): + help = 'Index sources using healpix.' + def handle(self, *args, **options): + healpix() + +if __name__ == "__main__": + main() diff --git a/management/commands/ingester.py b/management/commands/ingester.py index f8b8528..a6f40ab 100644 --- a/management/commands/ingester.py +++ b/management/commands/ingester.py @@ -166,11 +166,8 @@ class Command(BaseCommand): #Timestamp and progress self.stdout.write statement self.stdout.write(f"[{current_time()}] {ingested_files_count}/{len(catalog_files)}") - del df - del gaiasource_fields - del common_fields - del df_filtered - del data_dict + del df, gaiasource_fields, common_fields, df_filtered, data_dict + gc.collect()