Indexer: additional work on the multithreaded version

This commit is contained in:
Никита Тырин 2024-09-16 10:35:43 +03:00
parent eec4ebbbfc
commit 862efc5cbd
2 changed files with 8 additions and 5 deletions

View File

@@ -94,13 +94,13 @@ def healpix():
update_catalog_file_status(catalog_file, 'INDEX_IN_PROGRESS')
# Split the sources into chunks for each process to work on.
source_chunks = [sources[i:i + CHUNK_SIZE] for i in range(0, len(sources), CHUNK_SIZE)]
# Close the connection before forking.
django.db.connections.close_all()
# Create a pool of processes and run the update_sources function on each chunk.
with mp.Pool(MAX_WORKERS) as pool:
pool.map(update_sources, source_chunks, chunksize=10)
@@ -115,7 +115,7 @@ def healpix():
print(f'[{current_time()}] Progress: {current_file_index}/{total_files} files indexed.')
def update_sources(source_chunk):
-GaiaSource.objects.bulk_update(source_chunk, ['healpix_ring_index', 'healpix_nested_index'])
+GaiaSource.objects.bulk_update(source_chunk, ['healpix_ring_index', 'healpix_nested_index'], batch_size = 1000)
class Command(BaseCommand):
help = 'Index sources using healpix.'

View File

@@ -14,6 +14,9 @@ from asgiref.sync import sync_to_async
from django.core.management.base import BaseCommand
from GaiaDBInterface.models import GaiaSource, CatalogFile
def input_with_timeout(prompt, timeout=30):
print(prompt, end='', flush=True)
ready, _, _ = select.select([sys.stdin], [], [], timeout)
@@ -167,7 +170,7 @@ class Command(BaseCommand):
self.stdout.write(f"[{current_time()}] {ingested_files_count}/{len(catalog_files)}")
del df, gaiasource_fields, common_fields, df_filtered, data_dict
gc.collect()