Experimental fix to the memory leak issue in ingester. Additional work on the indexer
This commit is contained in:
parent
e5663e3c30
commit
c27ac6ce97
@ -16,6 +16,8 @@ def healpix(nside): #NSIDE 2048, ORDER ring or nested
|
|||||||
|
|
||||||
for catalog_file in CatalogFile.objects.filter(status='INGESTED'):
|
for catalog_file in CatalogFile.objects.filter(status='INGESTED'):
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
print(f'[{current_time()}] Loading sources from {catalog_file.name}...')
|
print(f'[{current_time()}] Loading sources from {catalog_file.name}...')
|
||||||
sources = list(catalog_file.sources.all())
|
sources = list(catalog_file.sources.all())
|
||||||
print(f'[{current_time()}] Sources ready. Forming ra & dec arrays...')
|
print(f'[{current_time()}] Sources ready. Forming ra & dec arrays...')
|
||||||
|
@ -7,7 +7,6 @@ import glob
|
|||||||
import uuid
|
import uuid
|
||||||
import asyncio
|
import asyncio
|
||||||
import select
|
import select
|
||||||
import tracemalloc
|
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import django
|
import django
|
||||||
@ -146,19 +145,19 @@ class Command(BaseCommand):
|
|||||||
|
|
||||||
data_dict = df_filtered.to_dict(orient='records') #translate the df into a dict
|
data_dict = df_filtered.to_dict(orient='records') #translate the df into a dict
|
||||||
|
|
||||||
#df = None #free up memory
|
|
||||||
|
|
||||||
gaia_source_instances = [
|
gaia_source_instances = [
|
||||||
GaiaSource(**data, catalog_file=catalog_file) for data in data_dict #create gaiasource instances, set the foreignkey
|
GaiaSource(**data, catalog_file=catalog_file) for data in data_dict #create gaiasource instances, set the foreignkey
|
||||||
]
|
]
|
||||||
|
|
||||||
#data_dict = None #free up memory
|
|
||||||
|
|
||||||
await update_catalog_file_status(catalog_file, 'IN_PROGRESS')
|
await update_catalog_file_status(catalog_file, 'IN_PROGRESS')
|
||||||
|
|
||||||
await bulk_create_gaia_sources(gaia_source_instances) #bulk-create instances from the dict
|
await bulk_create_gaia_sources(gaia_source_instances) #bulk-create instances from the dict
|
||||||
|
|
||||||
#gaia_source_instances = None #free up memory
|
|
||||||
|
|
||||||
await update_catalog_file_status(catalog_file,'INGESTED') #update the catalogfile instance status field to 'INGESTED'
|
await update_catalog_file_status(catalog_file,'INGESTED') #update the catalogfile instance status field to 'INGESTED'
|
||||||
|
|
||||||
@ -166,6 +165,14 @@ class Command(BaseCommand):
|
|||||||
|
|
||||||
#Timestamp and progress self.stdout.write statement
|
#Timestamp and progress self.stdout.write statement
|
||||||
self.stdout.write(f"[{current_time()}] {ingested_files_count}/{len(catalog_files)}")
|
self.stdout.write(f"[{current_time()}] {ingested_files_count}/{len(catalog_files)}")
|
||||||
|
|
||||||
|
del df
|
||||||
|
del gaiasource_fields
|
||||||
|
del common_fields
|
||||||
|
del df_filtered
|
||||||
|
del data_dict
|
||||||
|
gc.collect()
|
||||||
|
|
||||||
|
|
||||||
# Create a completion flag file
|
# Create a completion flag file
|
||||||
with open("ingester_done.flag", "w") as f:
|
with open("ingester_done.flag", "w") as f:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user