import os
import gc
import time
import sys
import json
import glob
import uuid
import asyncio
from datetime import datetime, timedelta

import pandas as pd
import django
from asgiref.sync import sync_to_async
from django.core.management.base import BaseCommand

from GaiaDBInterface.models import GaiaSource, CatalogFile


class Command(BaseCommand):
    help = 'Ingest CSV files into the database'

    def handle(self, *args, **options):
        # Load the previously selected directory, if any
        if os.path.exists('config.json'):
            with open('config.json', 'r') as config_file:
                config = json.load(config_file)
                previous_choice = config.get('directory', '')
        else:
            previous_choice = ''

        # Fetch the file list
        directory = input(f"Please enter the path to the directory containing the csv files [{previous_choice}]: ")
        if not directory:
            directory = previous_choice

        with open('config.json', 'w') as config_file:
            json.dump({'directory': directory}, config_file)

        print(f"Selected directory: {directory}")

        csv_files = glob.glob(os.path.join(directory, '*csv*'))
        self.stdout.write(f"Files found: {len(csv_files)}.")
        self.stdout.write("Populating the file database...")

        # Initialize the counter
        new_files_count = 0

        # Add files as CatalogFile instances to the database
        for file_path in csv_files:
            file_name = os.path.basename(file_path)
            # Use get_or_create so files are not added twice
            catalog_file, created = CatalogFile.objects.get_or_create(
                name=file_name,
                defaults={'uuid': uuid.uuid4(), 'status': 'PENDING'}
            )
            if created:
                new_files_count += 1

        # Show how many files were already in the database
        self.stdout.write(f"File database populated.\n{len(csv_files) - new_files_count} were already in the database.")
        input("Press Enter to continue...")

        @sync_to_async
        def bulk_create_gaia_sources(instances):
            # Bulk-insert the GaiaSource instances
            GaiaSource.objects.bulk_create(instances)

        @sync_to_async
        def update_catalog_file_status(catalog_file, status):
            # Update the status field of a CatalogFile
            catalog_file.status = status
            catalog_file.save()

        @sync_to_async
        def get_all_catalog_files():
            # Fetch the full list of catalog files
            return list(CatalogFile.objects.all())

        @sync_to_async
        def delete_gaiasources_for_catalogfile(catalog_file):
            # Delete the sources of partially ingested files in case of an improper shutdown
            GaiaSource.objects.filter(catalog_file=catalog_file).delete()

        @sync_to_async
        def count_ingested_files():
            return CatalogFile.objects.filter(status='INGESTED').count()

        current_time = (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S")
        self.stdout.write(f"[{current_time}] Starting the data ingestion.")

        # Iterate over all catalog files and ingest the sources from them
        async def ingest_files():
            # Garbage collection trigger initialization
            gc_interval = 10 * 60  # 10 min
            last_gc_time = time.time()

            catalog_files = await get_all_catalog_files()

            for catalog_file in catalog_files:
                if catalog_file.status == 'INGESTED':
                    self.stdout.write(f"Skipping {catalog_file.name} as it is already ingested.")
                    continue

                file_path = os.path.join(directory, catalog_file.name)

                # Skip files that are in the database but no longer on disk
                if not os.path.exists(file_path):
                    self.stdout.write(f"{catalog_file.name} not found on disk, skipping.")
                    continue

                # An IN_PROGRESS file must have been interrupted mid-ingest:
                # delete any associated sources and start over
                if catalog_file.status == 'IN_PROGRESS':
                    self.stdout.write(f"{catalog_file.name} seems to have been interrupted, starting over.")
                    await delete_gaiasources_for_catalogfile(catalog_file)

                # header=1000 skips the ECSV metadata lines (instead of comment='#'),
                # and the pyarrow backend is used for speed
                df = pd.read_csv(
                    file_path,
                    header=1000,
                    engine="pyarrow"
                )

                # Get the field names from the model and keep only the columns that both
                # the dataframe and the model share, so new fields added to the model
                # are picked up here automatically
                gaiasource_fields = [field.name for field in GaiaSource._meta.get_fields()]
                common_fields = [field for field in gaiasource_fields if field in df.columns]
                df_filtered = df[common_fields]

                # Translate the dataframe into a list of dicts and build the
                # GaiaSource instances, setting the foreign key to the catalog file
                data_dict = df_filtered.to_dict(orient='records')
                gaia_source_instances = [
                    GaiaSource(**data, catalog_file=catalog_file) for data in data_dict
                ]

                await update_catalog_file_status(catalog_file, 'IN_PROGRESS')
                await bulk_create_gaia_sources(gaia_source_instances)
                await update_catalog_file_status(catalog_file, 'INGESTED')

                # Timestamp and progress report
                ingested_files_count = await count_ingested_files()
                current_time = (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S")
                self.stdout.write(f"[{current_time}] {ingested_files_count}/{len(catalog_files)}")

                # Garbage collection timer logic
                now = time.time()
                if now - last_gc_time >= gc_interval:
                    gc.collect()
                    last_gc_time = now
                    self.stdout.write(f"[{current_time}] Garbage collection triggered.")

        asyncio.run(ingest_files())
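
# For context, a minimal sketch of the two models this command assumes, based only on the
# fields used above (name, uuid, status, catalog_file). The Gaia columns shown (source_id,
# ra, dec) are illustrative placeholders; the real definitions live in
# GaiaDBInterface/models.py and may differ.
#
#     class CatalogFile(models.Model):
#         uuid = models.UUIDField(default=uuid.uuid4, editable=False)
#         name = models.CharField(max_length=255, unique=True)
#         status = models.CharField(max_length=20, default='PENDING')  # PENDING / IN_PROGRESS / INGESTED
#
#     class GaiaSource(models.Model):
#         catalog_file = models.ForeignKey(CatalogFile, on_delete=models.CASCADE)
#         source_id = models.BigIntegerField(null=True)
#         ra = models.FloatField(null=True)
#         dec = models.FloatField(null=True)
#         # ...one field per Gaia CSV column that should be ingested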