Indexer: switched to interactive user input for the NSIDE value, remembering the previously entered value as the default. Added an experimental indexer_multithread version for testing. Ingester: minor code tweaks.
This commit is contained in:
parent
e83b23a35c
commit
eec4ebbbfc
@ -4,6 +4,9 @@ from astropy.coordinates import SkyCoord
|
|||||||
import astropy_healpix as ah
|
import astropy_healpix as ah
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
import gc
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -18,13 +21,41 @@ def update_catalog_file_status(catalog_file, status): #catalog file
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
def healpix(nside): #NSIDE 2048, ORDER ring or nested
|
def healpix(): #NSIDE 2048, ORDER ring or nested
|
||||||
|
|
||||||
|
interrupted_files = CatalogFile.objects.filter(status='INDEX_IN_PROGRESS')
|
||||||
|
total_interrupted = interrupted_files.count()
|
||||||
|
print(f'[{current_time()}] Found {total_interrupted} files interrupted mid-index. Cleaning up.')
|
||||||
|
|
||||||
|
for interrupted_file in interrupted_files:
|
||||||
|
GaiaSource.objects.filter(catalog_file=interrupted_file).update(
|
||||||
|
healpix_ring_index=None,
|
||||||
|
healpix_nested_index=None
|
||||||
|
)
|
||||||
|
update_catalog_file_status(interrupted_file, 'INGESTED')
|
||||||
|
|
||||||
ingested_files = CatalogFile.objects.filter(status='INGESTED')
|
ingested_files = CatalogFile.objects.filter(status='INGESTED')
|
||||||
total_files = ingested_files.count()
|
total_files = ingested_files.count()
|
||||||
print(f'[{current_time()}] There are {total_files} ingested files.')
|
print(f'[{current_time()}] There are {total_files} ingested files.')
|
||||||
current_file_index = 0
|
current_file_index = 0
|
||||||
|
|
||||||
|
if os.path.exists('config.json'):
|
||||||
|
with open('config.json', 'r') as config_file:
|
||||||
|
config = json.load(config_file)
|
||||||
|
previous_choice = config.get('nside', '')
|
||||||
|
else:
|
||||||
|
previous_choice = ''
|
||||||
|
|
||||||
|
nside = input(f'[{current_time()}] Please enter the NSIDE HEALPix parameter [{previous_choice}]:')
|
||||||
|
|
||||||
|
if not nside:
|
||||||
|
nside = previous_choice
|
||||||
|
|
||||||
|
with open('config.json', 'w') as config_file:
|
||||||
|
json.dump({'nside': nside}, config_file)
|
||||||
|
|
||||||
|
print(f"[{current_time()}] Selected nside: {nside}")
|
||||||
|
|
||||||
for catalog_file in ingested_files:
|
for catalog_file in ingested_files:
|
||||||
|
|
||||||
current_file_index += 1
|
current_file_index += 1
|
||||||
@ -33,14 +64,7 @@ def healpix(nside): #NSIDE 2048, ORDER ring or nested
|
|||||||
sources = list(catalog_file.sources.all())
|
sources = list(catalog_file.sources.all())
|
||||||
print(f'[{current_time()}] Sources loaded, calculating indices.')
|
print(f'[{current_time()}] Sources loaded, calculating indices.')
|
||||||
|
|
||||||
if catalog_file.status == 'INDEX_IN_PROGRESS':
|
|
||||||
#sources.update(healpix_ring_index=None, healpix_nested_index=None)
|
|
||||||
GaiaSource.objects.filter(catalog_file=catalog_file).update(
|
|
||||||
healpix_ring_index=None,
|
|
||||||
healpix_nested_index=None
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
ra_list = np.array([source.ra for source in sources])
|
ra_list = np.array([source.ra for source in sources])
|
||||||
dec_list = np.array([source.dec for source in sources])
|
dec_list = np.array([source.dec for source in sources])
|
||||||
|
|
||||||
@ -83,7 +107,13 @@ def healpix(nside): #NSIDE 2048, ORDER ring or nested
|
|||||||
|
|
||||||
update_catalog_file_status(catalog_file, 'INDEXED')
|
update_catalog_file_status(catalog_file, 'INDEXED')
|
||||||
|
|
||||||
print(f'[{current_time()}] Database updated, sources indexed successfully.')
|
print(f'[{current_time()}] Database updated, sources indexed successfully. Initializing garbage collection.')
|
||||||
|
|
||||||
|
del sources, ra_list, dec_list, skycoord, ring_healpix, ring_healpix_indices, nested_healpix, nested_healpix_indices
|
||||||
|
|
||||||
|
gc.collect()
|
||||||
|
|
||||||
|
print(f'[{current_time()}] Garbage collection complete.')
|
||||||
print(f'[{current_time()}] Progress: {current_file_index}/{total_files} files indexed.')
|
print(f'[{current_time()}] Progress: {current_file_index}/{total_files} files indexed.')
|
||||||
|
|
||||||
|
|
||||||
@ -91,18 +121,6 @@ def healpix(nside): #NSIDE 2048, ORDER ring or nested
|
|||||||
|
|
||||||
class Command(BaseCommand):
|
class Command(BaseCommand):
|
||||||
help = 'Index sources using healpix.'
|
help = 'Index sources using healpix.'
|
||||||
|
|
||||||
|
|
||||||
def add_arguments(self, parser):
|
|
||||||
parser.add_argument(
|
|
||||||
'nside',
|
|
||||||
type=int,
|
|
||||||
help='NSIDE parameter for HEALPix'
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def handle(self, *args, **options):
|
def handle(self, *args, **options):
|
||||||
nside = options['nside']
|
healpix()
|
||||||
|
|
||||||
healpix(nside)
|
|
||||||
|
|
126
management/commands/indexer_multithread.py
Normal file
126
management/commands/indexer_multithread.py
Normal file
@ -0,0 +1,126 @@
|
|||||||
|
import os
|
||||||
|
import django
|
||||||
|
import multiprocessing as mp
|
||||||
|
import time
|
||||||
|
from django.core.management.base import BaseCommand
|
||||||
|
from GaiaDBInterface.models import GaiaSource, CatalogFile
|
||||||
|
from astropy.coordinates import SkyCoord
|
||||||
|
import astropy_healpix as ah
|
||||||
|
import numpy as np
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
import gc
|
||||||
|
import json
|
||||||
|
|
||||||
|
# Size of the multiprocessing pool used to write index values to the database.
MAX_WORKERS = 2 * mp.cpu_count() - 1
# Number of source rows handed to a single bulk_update call.
CHUNK_SIZE = 1_000
|
||||||
|
|
||||||
|
def current_time(offset_hours=3):
    """Return the current time, shifted by ``offset_hours``, as ``HH:MM:SS``.

    Used as the timestamp prefix for the progress messages printed by this
    command.

    Args:
        offset_hours: Hours added to ``datetime.now()`` before formatting.
            Defaults to 3, the offset that used to be hard-coded here
            (presumably the difference between the host clock and the
            operator's timezone -- TODO confirm).

    Returns:
        The shifted time formatted with ``%H:%M:%S``.
    """
    shifted = datetime.now() + timedelta(hours=offset_hours)
    return shifted.strftime("%H:%M:%S")
|
||||||
|
|
||||||
|
def update_catalog_file_status(catalog_file, status):
    """Assign ``status`` to ``catalog_file`` and persist it with ``save()``.

    Args:
        catalog_file: Object exposing a ``status`` attribute and a ``save()``
            method (a ``CatalogFile`` instance in this module).
        status: New status value, e.g. ``'INGESTED'``, ``'INDEX_IN_PROGRESS'``
            or ``'INDEXED'``.
    """
    catalog_file.status = status
    catalog_file.save()
|
||||||
|
|
||||||
|
def _load_previous_nside(config_path='config.json'):
    """Return the NSIDE value stored by a previous run, or '' when unavailable."""
    if not os.path.exists(config_path):
        return ''
    try:
        with open(config_path, 'r') as config_file:
            config = json.load(config_file)
    except (OSError, json.JSONDecodeError):
        # A missing/corrupt config only costs us the suggested default.
        return ''
    if not isinstance(config, dict):
        return ''
    return config.get('nside', '')


def _parse_nside(raw_value):
    """Return ``raw_value`` as a positive power-of-two int, or None if invalid."""
    try:
        nside = int(raw_value)
    except (TypeError, ValueError):
        return None
    # astropy_healpix rejects NSIDE values that are not a power of two.
    if nside < 1 or nside & (nside - 1):
        return None
    return nside


def _prompt_nside(config_path='config.json'):
    """Ask the user for NSIDE (Enter keeps the previous value) and store it."""
    previous_choice = _load_previous_nside(config_path)
    while True:
        answer = input(f'[{current_time()}] Please enter the NSIDE HEALPix parameter [{previous_choice}]:')
        # input() always returns text; an empty answer falls back to the
        # previously stored value, and either one must become a valid int.
        nside = _parse_nside(answer.strip() or previous_choice)
        if nside is not None:
            break
        print(f'[{current_time()}] NSIDE must be a positive power of two (e.g. 2048).')

    with open(config_path, 'w') as config_file:
        json.dump({'nside': nside}, config_file)
    return nside


def healpix():
    """Compute and store HEALPix ring/nested indices for all ingested files.

    Steps:
      1. Files left in ``INDEX_IN_PROGRESS`` by an interrupted run get their
         sources' index columns reset to NULL and are put back to
         ``INGESTED``.
      2. The NSIDE parameter is read interactively; the last value entered is
         kept in ``config.json`` and offered as the default.
      3. For every ``INGESTED`` file the sources are loaded, both index
         flavours are computed from RA/Dec, and the rows are written back in
         chunks of ``CHUNK_SIZE`` by a pool of ``MAX_WORKERS`` processes.
         The file is marked ``INDEX_IN_PROGRESS`` while writing and
         ``INDEXED`` afterwards.

    Raises:
        EOFError: propagated from ``input()`` when no interactive input is
            available.
    """
    # --- 1. Recover from a previous interrupted run -------------------------
    interrupted_files = CatalogFile.objects.filter(status='INDEX_IN_PROGRESS')
    total_interrupted = interrupted_files.count()
    print(f'[{current_time()}] Found {total_interrupted} files interrupted mid-index. Cleaning up.')

    for interrupted_file in interrupted_files:
        GaiaSource.objects.filter(catalog_file=interrupted_file).update(
            healpix_ring_index=None,
            healpix_nested_index=None
        )
        update_catalog_file_status(interrupted_file, 'INGESTED')

    ingested_files = CatalogFile.objects.filter(status='INGESTED')
    total_files = ingested_files.count()
    print(f'[{current_time()}] There are {total_files} ingested files.')

    # --- 2. NSIDE selection ---------------------------------------------------
    nside = _prompt_nside()
    print(f"[{current_time()}] Selected nside: {nside}")

    # The pixelisations depend only on NSIDE, so build them once rather than
    # once per catalog file.
    ring_healpix = ah.HEALPix(
        nside=nside,
        order='ring',
        frame='fk5'
    )
    nested_healpix = ah.HEALPix(
        nside=nside,
        order='nested',
        frame='fk5'
    )

    # --- 3. Index every ingested file ----------------------------------------
    for current_file_index, catalog_file in enumerate(ingested_files, start=1):
        print(f'[{current_time()}] Processing sources from {catalog_file.name}')
        sources = list(catalog_file.sources.all())
        print(f'[{current_time()}] Sources loaded, calculating indices.')

        ra_list = np.array([source.ra for source in sources])
        dec_list = np.array([source.dec for source in sources])

        skycoord = SkyCoord(
            ra=ra_list,
            dec=dec_list,
            unit='deg',
            frame='fk5'
        )

        ring_healpix_indices = ring_healpix.skycoord_to_healpix(skycoord)
        nested_healpix_indices = nested_healpix.skycoord_to_healpix(skycoord)

        for source, ring_index, nested_index in zip(
            sources, ring_healpix_indices, nested_healpix_indices
        ):
            source.healpix_ring_index = ring_index
            source.healpix_nested_index = nested_index

        print(f'[{current_time()}] Calculations done, updating database.')

        update_catalog_file_status(catalog_file, 'INDEX_IN_PROGRESS')

        # Split the sources into chunks for each process to work on.
        source_chunks = [sources[i:i + CHUNK_SIZE] for i in range(0, len(sources), CHUNK_SIZE)]

        # Close the connection before forking.
        django.db.connections.close_all()

        # Create a pool of processes and run the update_sources function on each chunk.
        with mp.Pool(MAX_WORKERS) as pool:
            pool.map(update_sources, source_chunks, chunksize=10)

        update_catalog_file_status(catalog_file, 'INDEXED')

        print(f'[{current_time()}] Database updated, sources indexed successfully. Initializing garbage collection.')

        # source_chunks holds references to the same source objects, so it
        # has to go too or the collection below cannot free them.
        del sources, source_chunks, ra_list, dec_list, skycoord, ring_healpix_indices, nested_healpix_indices
        gc.collect()

        print(f'[{current_time()}] Garbage collection complete.')
        print(f'[{current_time()}] Progress: {current_file_index}/{total_files} files indexed.')
|
||||||
|
|
||||||
|
def update_sources(source_chunk):
    """Write the HEALPix index fields of one chunk of sources to the database.

    Executed inside the pool worker processes started by ``healpix``.

    Args:
        source_chunk: List of ``GaiaSource`` instances whose
            ``healpix_ring_index`` and ``healpix_nested_index`` attributes
            have already been set.
    """
    index_fields = ['healpix_ring_index', 'healpix_nested_index']
    GaiaSource.objects.bulk_update(source_chunk, index_fields)
|
||||||
|
|
||||||
|
class Command(BaseCommand):
    """Management command entry point that runs the HEALPix indexer."""

    help = 'Index sources using healpix.'

    def handle(self, *args, **options):
        """Run the indexer; positional and keyword options are not used."""
        healpix()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # This module defines no main(); the previous call raised NameError.
    # The indexer is exposed as a Django management command instead.
    raise SystemExit(
        "indexer_multithread is a Django management command; "
        "run it with: python manage.py indexer_multithread"
    )
|
@ -166,11 +166,8 @@ class Command(BaseCommand):
|
|||||||
#Timestamp and progress self.stdout.write statement
|
#Timestamp and progress self.stdout.write statement
|
||||||
self.stdout.write(f"[{current_time()}] {ingested_files_count}/{len(catalog_files)}")
|
self.stdout.write(f"[{current_time()}] {ingested_files_count}/{len(catalog_files)}")
|
||||||
|
|
||||||
del df
|
del df, gaiasource_fields, common_fields, df_filtered, data_dict
|
||||||
del gaiasource_fields
|
|
||||||
del common_fields
|
|
||||||
del df_filtered
|
|
||||||
del data_dict
|
|
||||||
gc.collect()
|
gc.collect()
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user