diff --git a/management/commands/indexer_multithread.py b/management/commands/indexer_multithread.py index 2352687..536da97 100644 --- a/management/commands/indexer_multithread.py +++ b/management/commands/indexer_multithread.py @@ -94,13 +94,13 @@ def healpix(): update_catalog_file_status(catalog_file, 'INDEX_IN_PROGRESS') - # Split the sources into chunks for each process to work on. + source_chunks = [sources[i:i + CHUNK_SIZE] for i in range(0, len(sources), CHUNK_SIZE)] - # Close the connection before forking. + django.db.connections.close_all() - # Create a pool of processes and run the update_sources function on each chunk. + with mp.Pool(MAX_WORKERS) as pool: pool.map(update_sources, source_chunks, chunksize=10) @@ -115,7 +115,7 @@ def healpix(): print(f'[{current_time()}] Progress: {current_file_index}/{total_files} files indexed.') def update_sources(source_chunk): - GaiaSource.objects.bulk_update(source_chunk, ['healpix_ring_index', 'healpix_nested_index']) + GaiaSource.objects.bulk_update(source_chunk, ['healpix_ring_index', 'healpix_nested_index'], batch_size = 1000) class Command(BaseCommand): help = 'Index sources using healpix.' diff --git a/management/commands/ingester.py b/management/commands/ingester.py index a6f40ab..c53fa6b 100644 --- a/management/commands/ingester.py +++ b/management/commands/ingester.py @@ -14,6 +14,9 @@ from asgiref.sync import sync_to_async from django.core.management.base import BaseCommand from GaiaDBInterface.models import GaiaSource, CatalogFile + + + def input_with_timeout(prompt, timeout=30): print(prompt, end='', flush=True) ready, _, _ = select.select([sys.stdin], [], [], timeout) @@ -167,7 +170,7 @@ class Command(BaseCommand): self.stdout.write(f"[{current_time()}] {ingested_files_count}/{len(catalog_files)}") del df, gaiasource_fields, common_fields, df_filtered, data_dict - + gc.collect()