From c122b271e965e7034d5f6688c5e0212e4ed71dfe Mon Sep 17 00:00:00 2001 From: tyrin Date: Mon, 16 Sep 2024 10:44:40 +0300 Subject: [PATCH] Indexer: made code more legible, added some comments --- management/commands/indexer_multithread.py | 44 +++++++++++++++------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/management/commands/indexer_multithread.py b/management/commands/indexer_multithread.py index 536da97..63f036a 100644 --- a/management/commands/indexer_multithread.py +++ b/management/commands/indexer_multithread.py @@ -11,32 +11,39 @@ from datetime import datetime, timedelta import gc import json -MAX_WORKERS = 2 * mp.cpu_count() - 1 -CHUNK_SIZE = 1_000 +MAX_WORKERS = 2 * mp.cpu_count() - 1 #max number of workers for multiprocessing +CHUNK_SIZE = 1_000 #size of individual chunk -def current_time(): +def current_time(): #a function for timestamps return (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S") -def update_catalog_file_status(catalog_file, status): +def update_catalog_file_status(catalog_file, status): #file status updater function catalog_file.status = status catalog_file.save() def healpix(): - interrupted_files = CatalogFile.objects.filter(status='INDEX_IN_PROGRESS') + + + + interrupted_files = CatalogFile.objects.filter(status='INDEX_IN_PROGRESS') total_interrupted = interrupted_files.count() print(f'[{current_time()}] Found {total_interrupted} files interrupted mid-index. Cleaning up.') - for interrupted_file in interrupted_files: + for interrupted_file in interrupted_files: #handling apparent interrupted files GaiaSource.objects.filter(catalog_file=interrupted_file).update( healpix_ring_index=None, healpix_nested_index=None ) update_catalog_file_status(interrupted_file, 'INGESTED') - ingested_files = CatalogFile.objects.filter(status='INGESTED') + + + ingested_files = CatalogFile.objects.filter(status='INGESTED') #find files ready for indexing total_files = ingested_files.count() print(f'[{current_time()}] There are {total_files} ingested files.') - current_file_index = 0 + + + if os.path.exists('config.json'): with open('config.json', 'r') as config_file: @@ -54,7 +61,13 @@ def healpix(): print(f"[{current_time()}] Selected nside: {nside}") + + + + current_file_index = 0 + for catalog_file in ingested_files: + current_file_index += 1 print(f'[{current_time()}] Processing sources from {catalog_file.name}') sources = list(catalog_file.sources.all()) @@ -70,6 +83,8 @@ def healpix(): frame='fk5' ) + + ring_healpix = ah.HEALPix( nside=nside, order='ring', @@ -77,6 +92,11 @@ def healpix(): ) ring_healpix_indices = ring_healpix.skycoord_to_healpix(skycoord) + for source, healpix_index in zip(sources, ring_healpix_indices): + source.healpix_ring_index = healpix_index + + + nested_healpix = ah.HEALPix( nside=nside, order='nested', @@ -84,23 +104,19 @@ def healpix(): ) nested_healpix_indices = nested_healpix.skycoord_to_healpix(skycoord) - for source, healpix_index in zip(sources, ring_healpix_indices): - source.healpix_ring_index = healpix_index - for source, healpix_index in zip(sources, nested_healpix_indices): source.healpix_nested_index = healpix_index + + print(f'[{current_time()}] Calculations done, updating database.') update_catalog_file_status(catalog_file, 'INDEX_IN_PROGRESS') - source_chunks = [sources[i:i + CHUNK_SIZE] for i in range(0, len(sources), CHUNK_SIZE)] - django.db.connections.close_all() - with mp.Pool(MAX_WORKERS) as pool: pool.map(update_sources, source_chunks, chunksize=10)