import os
import sys
import glob
import uuid
import asyncio
from datetime import datetime, timedelta

import django
from asgiref.sync import sync_to_async
import pandas as pd

#environment init for django: make the project importable and load the
#settings before the models are imported below
current_dir = os.getcwd()
relative_path = os.path.join(current_dir, 'gaia_orm')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'gaia_orm.settings')
sys.path.append(os.path.normpath(relative_path))
#sys.path.append('/home/kityr/practiceproject/gaia_orm')
django.setup()

#import models for both the sources and the files
from sample_app.models import GaiaSource, CatalogFile
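# For reference, a minimal sketch of the two models this script assumes
# (hypothetical; the real definitions live in sample_app/models.py, and only
# the fields used below are shown):
#
# class CatalogFile(models.Model):
#     name = models.CharField(max_length=255, unique=True)
#     uuid = models.UUIDField()
#     status = models.CharField(max_length=16)  # 'PENDING' / 'IN_PROGRESS' / 'INGESTED'
#
# class GaiaSource(models.Model):
#     catalog_file = models.ForeignKey(CatalogFile, on_delete=models.CASCADE)
#     #...one field per Gaia csv column that should be ingested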
#fetching the file list
directory = input("Please enter the path to the directory containing the csv files (or leave empty if the files are in the same directory as this script): ")
csv_files = glob.glob(os.path.join(directory, '*csv*'))  #loose pattern: matches any name containing 'csv'
print(f"Files found: {len(csv_files)}.")
print("Populating the file database...")

#initialize the counter of newly registered files
new_files_count = 0

#add files as CatalogFile instances into the database
for file_path in csv_files:
    file_name = os.path.basename(file_path)
    #use get_or_create so rerunning the script does not add files twice
    catalog_file, created = CatalogFile.objects.get_or_create(
        name=file_name,
        defaults={'uuid': uuid.uuid4(), 'status': 'PENDING'}
    )
    if created:
        new_files_count += 1

#show how many of the files were already in the db
print(f"File database populated. {len(csv_files) - new_files_count} were already in the database.")
input("Press Enter to continue...")

@sync_to_async
def bulk_create_gaia_sources(instances):
    #bulk-insert the GaiaSource instances in a single query
    GaiaSource.objects.bulk_create(instances)

@sync_to_async
def update_catalog_file_status(catalog_file, status):
    #catalog file status updater
    catalog_file.status = status
    catalog_file.save()

@sync_to_async
def get_all_catalog_files():
    #catalog file list getter
    return list(CatalogFile.objects.all())

@sync_to_async
def delete_gaiasources_for_catalogfile(catalog_file):
    #delete the sources from partially ingested files in case of improper shutdown
    GaiaSource.objects.filter(catalog_file=catalog_file).delete()

@sync_to_async
def count_ingested_files():
    return CatalogFile.objects.filter(status='INGESTED').count()

current_time = (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S")  #shift timestamps by a fixed 3 h (presumably the local timezone offset)
print(f"[{current_time}] Starting the data ingestion.")

#iterate over all catalog files and ingest sources from them
async def ingest_files():
    catalog_files = await get_all_catalog_files()
    for catalog_file in catalog_files:
        if catalog_file.status == 'INGESTED':
            print(f"Skipping {catalog_file.name} as it is already ingested.")
            continue
        file_path = os.path.join(directory, catalog_file.name)
        if os.path.exists(file_path):  #check that the file still exists, just in case
            #a file stuck in IN_PROGRESS must have been interrupted mid-ingest:
            #delete any associated sources and start over
            if catalog_file.status == 'IN_PROGRESS':
                print(f"{catalog_file.name} seems to have been interrupted, starting over.")
                await delete_gaiasources_for_catalogfile(catalog_file)
            #originally used comment='#' to skip the ecsv metadata lines;
            #switched to header=1000 and the pyarrow backend for speed
            #(see the note at the bottom of this script)
            df = pd.read_csv(
                file_path,
                #comment='#',
                header=1000,
                engine="pyarrow"
            )
            gaiasource_fields = [field.name for field in GaiaSource._meta.get_fields()]  #get the field names from the model
            common_fields = [field for field in gaiasource_fields if field in df.columns]  #find fields common to the df and the model,
            df_filtered = df[common_fields]                                                #so new fields added to the model are picked up here
            data_dict = df_filtered.to_dict(orient='records')  #translate the df into a list of row dicts
            gaia_source_instances = [
                GaiaSource(**data, catalog_file=catalog_file) for data in data_dict  #create GaiaSource instances, set the foreign key
            ]
            await update_catalog_file_status(catalog_file, 'IN_PROGRESS')
            await bulk_create_gaia_sources(gaia_source_instances)  #bulk-create instances from the dicts
            await update_catalog_file_status(catalog_file, 'INGESTED')  #mark the CatalogFile as fully ingested
            ingested_files_count = await count_ingested_files()
            current_time = (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S")
            #timestamped progress line
            print(f"[{current_time}] {ingested_files_count}/{len(catalog_files)}")

asyncio.run(ingest_files())
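# Note on header=1000 above: it assumes every catalog file carries exactly
# 1000 leading '#' metadata lines before the column header row (the pyarrow
# engine does not accept the comment= option). If that count can vary, a
# pre-pass like this hypothetical helper could locate the header row instead,
# at the cost of an extra pass over the file (plain-text files only):
#
# def find_header_row(path):
#     #the index of the first non-'#' line is the header row index
#     with open(path) as fh:
#         for i, line in enumerate(fh):
#             if not line.startswith('#'):
#                 return i
#     return 0
#
# df = pd.read_csv(file_path, header=find_header_row(file_path), engine="pyarrow")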