GaiaDBInterface/ingester.py
2024-09-10 12:28:01 +03:00

144 lines
5.3 KiB
Python

import os
import sys
import glob
import uuid
import asyncio
from datetime import datetime, timedelta
import django
from asgiref.sync import sync_to_async
import pandas as pd
#environment init for django
current_dir = os.getcwd()
relative_path = os.path.join(current_dir, 'gaia_orm')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'gaia_orm.settings')
sys.path.append(os.path.normpath(relative_path))
#sys.path.append('/home/kityr/practiceproject/gaia_orm')
django.setup()
#import models for both the sources and the files
from sample_app.models import GaiaSource, CatalogFile
#fetching the file list
directory = input("Please enter the path to the directory containing the csv files (or leave empty if the files are in the same directory as this script): ")
csv_files = glob.glob(os.path.join(directory, '*csv*'))
print(f"Files found: {len(csv_files)}.")
print("Populating the file database...")
#initialize the counter
new_files_count = 0
#add files as catalogfile instances into the database
for file_path in csv_files:
file_name = os.path.basename(file_path)
#use get_or_create to not add files twice
catalog_file, created = CatalogFile.objects.get_or_create(
name=file_name,
defaults={'uuid': uuid.uuid4(), 'status': 'PENDING'}
)
if created:
new_files_count += 1
#show how many duplicates were already in db
print(f"File database populated. {len(csv_files) - new_files_count} were already in the database.")
input("Press Enter to continue...")
#bulk creation function
@sync_to_async
def bulk_create_gaia_sources(instances): #catalog status update to ingested after ingesting the sources
GaiaSource.objects.bulk_create(instances)
@sync_to_async
def update_catalog_file_status(catalog_file, status): #catalog file status updater
catalog_file.status = status
catalog_file.save()
@sync_to_async
def get_all_catalog_files(): #catalog file list getter
return list(CatalogFile.objects.all())
@sync_to_async
def delete_gaiasources_for_catalogfile(catalog_file): #for deleting the sources from partially ingested files in case of improper shutdown
GaiaSource.objects.filter(catalog_file=catalog_file).delete()
@sync_to_async
def count_ingested_files():
return CatalogFile.objects.filter(status='INGESTED').count()
current_time = (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S")
print(f"[{current_time}] Starting the data ingestion.")
#function that iterates over all catalog files and ingests sources from them
async def ingest_files():
#catalog_files = CatalogFile.objects.all()
catalog_files = await get_all_catalog_files()
for catalog_file in catalog_files:
if catalog_file.status == 'INGESTED':
print(f"Skipping {catalog_file.name} as it is already ingested.")
continue
file_path = os.path.join(directory, catalog_file.name)
#print(file_path)
if os.path.exists(file_path): #check if the file exists at all just in case
if catalog_file.status == 'IN_PROGRESS':
print(f"{catalog_file.name} seems to have been interrupted, starting over.")#if the file status is in_progress, it must've
await delete_gaiasources_for_catalogfile(catalog_file) #been interrupted mid-ingest, delete all the
#associated sources if there are any and start over
#comment # to avoid the ecsv metadata lines
#switched to header=1000 and pyarrow backend for speed
df = pd.read_csv(
file_path,
#comment='#',
header=1000,
engine="pyarrow"
)
gaiasource_fields = [field.name for field in GaiaSource._meta.get_fields()] #get fields from the model
common_fields = [field for field in gaiasource_fields if field in df.columns] #find common fields between the df and the model
#this way we can add new fields to the model and
df_filtered = df[common_fields] #the code will pick them up here
data_dict = df_filtered.to_dict(orient='records') #translate the df into a dict
gaia_source_instances = [
GaiaSource(**data, catalog_file=catalog_file) for data in data_dict #create gaiasource instances, set the foreignkey
]
await update_catalog_file_status(catalog_file, 'IN_PROGRESS')
await bulk_create_gaia_sources(gaia_source_instances) #bulk-create instances from the dict
await update_catalog_file_status(catalog_file,'INGESTED') #update the catalogfile instance status field to 'INGESTED'
ingested_files_count = await count_ingested_files()
current_time = (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S") #Timestamp and progress print statement
print(f"[{current_time}] {ingested_files_count}/{len(catalog_files)}")
asyncio.run(ingest_files())