import os
import json
import glob
import uuid
import asyncio
from datetime import datetime, timedelta

import pandas as pd
from asgiref.sync import sync_to_async
from django.core.management.base import BaseCommand

from sample_app.models import GaiaSource, CatalogFile


class Command(BaseCommand):
    help = 'Ingest CSV files into the database'

    def handle(self, *args, **options):
        # Load the previously selected directory, if any
        if os.path.exists('config.json'):
            with open('config.json', 'r') as config_file:
                config = json.load(config_file)
            previous_choice = config.get('directory', '')
        else:
            previous_choice = ''

        # Fetch the file list
        directory = input(f"Please enter the path to the directory containing the csv files [{previous_choice}]: ")
        if not directory:
            directory = previous_choice
        with open('config.json', 'w') as config_file:
            json.dump({'directory': directory}, config_file)
        print(f"Selected directory: {directory}")
        csv_files = glob.glob(os.path.join(directory, '*csv*'))

        self.stdout.write(f"Files found: {len(csv_files)}.")
        self.stdout.write("Populating the file database...")

        # Initialize the counter
        new_files_count = 0

        # Add files as CatalogFile instances to the database
        for file_path in csv_files:
            file_name = os.path.basename(file_path)
            # get_or_create avoids adding the same file twice
            catalog_file, created = CatalogFile.objects.get_or_create(
                name=file_name,
                defaults={'uuid': uuid.uuid4(), 'status': 'PENDING'}
            )
            if created:
                new_files_count += 1

        # Show how many files were already in the database
        self.stdout.write(
            f"File database populated.\n"
            f"{len(csv_files) - new_files_count} files were already in the database."
        )
        input("Press Enter to continue...")

        @sync_to_async
        def bulk_create_gaia_sources(instances):
            # Bulk-create GaiaSource rows for a catalog file
            GaiaSource.objects.bulk_create(instances)

        @sync_to_async
        def update_catalog_file_status(catalog_file, status):
            # Catalog file status updater
            catalog_file.status = status
            catalog_file.save()

        @sync_to_async
        def get_all_catalog_files():
            # Catalog file list getter
            return list(CatalogFile.objects.all())

        @sync_to_async
        def delete_gaiasources_for_catalogfile(catalog_file):
            # Deletes sources from partially ingested files in case of an improper shutdown
            GaiaSource.objects.filter(catalog_file=catalog_file).delete()

        @sync_to_async
        def count_ingested_files():
            return CatalogFile.objects.filter(status='INGESTED').count()

        # Crude UTC+3 timestamp for logging
        current_time = (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S")
        self.stdout.write(f"[{current_time}] Starting the data ingestion.")

        # Iterate over all catalog files and ingest sources from them
        async def ingest_files():
            catalog_files = await get_all_catalog_files()
            for catalog_file in catalog_files:
                if catalog_file.status == 'INGESTED':
                    self.stdout.write(f"Skipping {catalog_file.name} as it is already ingested.")
                    continue

                file_path = os.path.join(directory, catalog_file.name)
                if not os.path.exists(file_path):
                    # Guard against files that have disappeared from the directory
                    self.stdout.write(f"{catalog_file.name} not found on disk, skipping.")
                    continue

                if catalog_file.status == 'IN_PROGRESS':
                    # An IN_PROGRESS file must have been interrupted mid-ingest:
                    # delete any associated sources and start over
                    self.stdout.write(f"{catalog_file.name} seems to have been interrupted, starting over.")
                    await delete_gaiasources_for_catalogfile(catalog_file)

                # Switched from comment='#' (which skipped the ECSV metadata
                # lines) to header=1000 and the pyarrow backend for speed
                df = pd.read_csv(
                    file_path,
                    header=1000,
                    engine="pyarrow"
                )
                # Get the fields from the model, then keep only the columns the
                # DataFrame and the model have in common; new fields added to
                # both the model and the data get picked up automatically
                gaiasource_fields = [field.name for field in GaiaSource._meta.get_fields()]
                common_fields = [field for field in gaiasource_fields if field in df.columns]
                df_filtered = df[common_fields]

                # Translate the DataFrame into a list of dicts and create
                # GaiaSource instances with the foreign key set
                data_dict = df_filtered.to_dict(orient='records')
                gaia_source_instances = [
                    GaiaSource(**data, catalog_file=catalog_file) for data in data_dict
                ]

                await update_catalog_file_status(catalog_file, 'IN_PROGRESS')
                # Bulk-create the instances, then mark the CatalogFile as
                # 'INGESTED' only once the bulk create has succeeded
                await bulk_create_gaia_sources(gaia_source_instances)
                await update_catalog_file_status(catalog_file, 'INGESTED')

                # Timestamped progress report
                ingested_files_count = await count_ingested_files()
                current_time = (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S")
                self.stdout.write(f"[{current_time}] {ingested_files_count}/{len(catalog_files)}")

        asyncio.run(ingest_files())
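
# Usage sketch. The command name comes from this file's name, which is not
# shown in the snippet; "ingest_csv" below is an assumed placeholder, i.e. a
# path like sample_app/management/commands/ingest_csv.py:
#
#   python manage.py ingest_csv
#
# The command prompts for the CSV directory (remembering the last choice in
# config.json) and is safe to re-run: already-INGESTED files are skipped, and
# files left IN_PROGRESS by an interrupted run are wiped and re-ingested.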