
import os
import json
import glob
import uuid
import asyncio
from datetime import datetime, timedelta

import pandas as pd
from asgiref.sync import sync_to_async
from django.core.management.base import BaseCommand

from GaiaDBInterface.models import GaiaSource, CatalogFile


class Command(BaseCommand):
    help = 'Ingest CSV files into the database'

    def handle(self, *args, **options):
        # Load the previously chosen directory, if any
        if os.path.exists('config.json'):
            with open('config.json', 'r') as config_file:
                config = json.load(config_file)
                previous_choice = config.get('directory', '')
        else:
            previous_choice = ''

        # Fetch the file list
        directory = input(
            f"Please enter the path to the directory containing the CSV files [{previous_choice}]: "
        )
        if not directory:
            directory = previous_choice
        with open('config.json', 'w') as config_file:
            json.dump({'directory': directory}, config_file)
        print(f"Selected directory: {directory}")

        # '*csv*' matches any file name containing 'csv' (e.g. .csv or .csv.gz)
        csv_files = glob.glob(os.path.join(directory, '*csv*'))
        self.stdout.write(f"Files found: {len(csv_files)}.")
        self.stdout.write("Populating the file database...")

        # Initialize the counter
        new_files_count = 0

        # Add the files to the database as CatalogFile instances
        for file_path in csv_files:
            file_name = os.path.basename(file_path)
            # get_or_create avoids adding the same file twice
            catalog_file, created = CatalogFile.objects.get_or_create(
                name=file_name,
                defaults={'uuid': uuid.uuid4(), 'status': 'PENDING'}
            )
            if created:
                new_files_count += 1

        # Report how many of the files were already in the database
        self.stdout.write(
            f"File database populated.\n"
            f"{len(csv_files) - new_files_count} files were already in the database."
        )
        input("Press Enter to continue...")
        # Async wrappers around the ORM calls used during the ingest
        @sync_to_async
        def bulk_create_gaia_sources(instances):
            GaiaSource.objects.bulk_create(instances)
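
        # Note: bulk_create also accepts a batch_size argument; for very large
        # catalog files it may be worth splitting the single INSERT into chunks.
        # A sketch (the batch_size value is an assumption, not tuned):
        #
        #   @sync_to_async
        #   def bulk_create_gaia_sources(instances, batch_size=5000):
        #       GaiaSource.objects.bulk_create(instances, batch_size=batch_size)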

        @sync_to_async
        def update_catalog_file_status(catalog_file, status):
            # Catalog file status updater
            catalog_file.status = status
            catalog_file.save()

        @sync_to_async
        def get_all_catalog_files():
            # Catalog file list getter
            return list(CatalogFile.objects.all())

        @sync_to_async
        def delete_gaiasources_for_catalogfile(catalog_file):
            # Deletes the sources of partially ingested files,
            # left over in case of an improper shutdown
            GaiaSource.objects.filter(catalog_file=catalog_file).delete()

        @sync_to_async
        def count_ingested_files():
            return CatalogFile.objects.filter(status='INGESTED').count()
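
        # Alternative sketch (not used here): wrapping the insert and the status
        # flip in one transaction would make each file's ingest all-or-nothing,
        # so an interrupted run would leave no partial rows to clean up.
        # transaction.atomic is standard Django; the helper name is hypothetical:
        #
        #   from django.db import transaction
        #
        #   @sync_to_async
        #   def ingest_atomically(catalog_file, instances):
        #       with transaction.atomic():
        #           GaiaSource.objects.bulk_create(instances)
        #           catalog_file.status = 'INGESTED'
        #           catalog_file.save()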

        current_time = (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S")
        self.stdout.write(f"[{current_time}] Starting the data ingestion.")

        # Iterate over all catalog files and ingest the sources from them
        async def ingest_files():
            catalog_files = await get_all_catalog_files()
            for catalog_file in catalog_files:
                if catalog_file.status == 'INGESTED':
                    self.stdout.write(f"Skipping {catalog_file.name} as it is already ingested.")
                    continue

                file_path = os.path.join(directory, catalog_file.name)
                # Skip files that have disappeared from the directory
                if not os.path.exists(file_path):
                    self.stdout.write(f"{catalog_file.name} not found on disk, skipping.")
                    continue

                if catalog_file.status == 'IN_PROGRESS':
                    # An IN_PROGRESS file must have been interrupted mid-ingest:
                    # delete any associated sources and start over
                    self.stdout.write(f"{catalog_file.name} seems to have been interrupted, starting over.")
                    await delete_gaiasources_for_catalogfile(catalog_file)

                # header=1000 skips past the ECSV metadata lines; the pyarrow
                # engine is used for speed, but it does not accept the comment='#'
                # option that would strip the metadata directly
                df = pd.read_csv(
                    file_path,
                    header=1000,
                    engine="pyarrow"
                )
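
                # If the metadata block is not exactly 1000 lines, a slower but
                # more robust fallback (sketch, standard pandas options) is to
                # let the default engine strip the commented lines instead:
                #
                #   df = pd.read_csv(file_path, comment='#')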

                # Keep only the columns that exist on the model, so new fields
                # added to GaiaSource are picked up here automatically
                gaiasource_fields = [field.name for field in GaiaSource._meta.get_fields()]
                common_fields = [field for field in gaiasource_fields if field in df.columns]
                df_filtered = df[common_fields]

                # Translate the dataframe into GaiaSource instances,
                # setting the foreign key to the owning catalog file
                data_dict = df_filtered.to_dict(orient='records')
                gaia_source_instances = [
                    GaiaSource(**data, catalog_file=catalog_file) for data in data_dict
                ]

                # Mark the file IN_PROGRESS, bulk-create the instances, then
                # flip the CatalogFile status to 'INGESTED'
                await update_catalog_file_status(catalog_file, 'IN_PROGRESS')
                await bulk_create_gaia_sources(gaia_source_instances)
                await update_catalog_file_status(catalog_file, 'INGESTED')

                # Timestamp and progress report
                ingested_files_count = await count_ingested_files()
                current_time = (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S")
                self.stdout.write(f"[{current_time}] {ingested_files_count}/{len(catalog_files)}")

        asyncio.run(ingest_files())
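
# Usage sketch: management commands are invoked through manage.py under the
# name of this module inside <app>/management/commands/. Assuming the file is
# saved as ingest_catalog.py (the name here is an assumption), it would run as:
#
#   python manage.py ingest_catalog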