From 3a9ba2f32062855bee3edf545957f826843e7d1c Mon Sep 17 00:00:00 2001 From: tyrin Date: Tue, 10 Sep 2024 14:28:57 +0300 Subject: [PATCH] reorganization --- gaia_orm/settings.py | 3 +- gaia_orm/urls.py | 4 +- sample_app/management/commands/ingester.py | 242 +++++++++--------- sample_app/migrations/0001_initial.py | 4 +- .../0002_alter_catalogfile_status.py | 18 -- sample_app/serializers.py | 7 - sample_app/urls.py | 6 - sample_app/views.py | 12 - 8 files changed, 119 insertions(+), 177 deletions(-) delete mode 100644 sample_app/migrations/0002_alter_catalogfile_status.py delete mode 100644 sample_app/serializers.py diff --git a/gaia_orm/settings.py b/gaia_orm/settings.py index 8387ea1..7c4cd33 100644 --- a/gaia_orm/settings.py +++ b/gaia_orm/settings.py @@ -20,7 +20,7 @@ BASE_DIR = Path(__file__).resolve().parent.parent # See https://docs.djangoproject.com/en/5.1/howto/deployment/checklist/ # SECURITY WARNING: keep the secret key used in production secret! -SECRET_KEY = 'django-insecure-k_$r^3e!9ycqnt0=+ur&sx#hsl44_+v3=al2$_gpnh3u^w!xj$' +SECRET_KEY = 'django-insecure-m!o^q^+en&_v%64&m8%d^%_olkzf7$8jbp0^4dph2=1rn=666m' # SECURITY WARNING: don't run with debug turned on in production! DEBUG = True @@ -31,7 +31,6 @@ ALLOWED_HOSTS = [] # Application definition INSTALLED_APPS = [ - #'rest_framework', 'sample_app', 'django.contrib.admin', 'django.contrib.auth', diff --git a/gaia_orm/urls.py b/gaia_orm/urls.py index 3723feb..58f2198 100644 --- a/gaia_orm/urls.py +++ b/gaia_orm/urls.py @@ -15,10 +15,8 @@ Including another URLconf 2. Add a URL to urlpatterns: path('blog/', include('blog.urls')) """ from django.contrib import admin -from django.urls import path, include - +from django.urls import path urlpatterns = [ path('admin/', admin.site.urls), - path('api/', include('sample_app.urls')), # Include your app's URLs ] diff --git a/sample_app/management/commands/ingester.py b/sample_app/management/commands/ingester.py index 0bc3733..fd253fd 100644 --- a/sample_app/management/commands/ingester.py +++ b/sample_app/management/commands/ingester.py @@ -4,141 +4,129 @@ import glob import uuid import asyncio from datetime import datetime, timedelta - +import pandas as pd import django from asgiref.sync import sync_to_async - -import pandas as pd - - - -# #environment init for django -# current_dir = os.getcwd() -# relative_path = os.path.join(current_dir, 'gaia_orm') -# os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'gaia_orm.settings') - -# sys.path.append(os.path.normpath(relative_path)) -# #sys.path.append('/home/kityr/practiceproject/gaia_orm') -# django.setup() - - - -#import models for both the sources and the files +from django.core.management.base import BaseCommand from sample_app.models import GaiaSource, CatalogFile +class Command(BaseCommand): + help = 'Ingest CSV files into the database' - -#fetching the file list -directory = input("Please enter the path to the directory containing the csv files (or leave empty if the files are in the same directory as this script): ") -csv_files = glob.glob(os.path.join(directory, '*csv*')) -print(f"Files found: {len(csv_files)}.") -print("Populating the file database...") -#initialize the counter -new_files_count = 0 -#add files as catalogfile instances into the database -for file_path in csv_files: - file_name = os.path.basename(file_path) - #use get_or_create to not add files twice - catalog_file, created = CatalogFile.objects.get_or_create( - name=file_name, - defaults={'uuid': uuid.uuid4(), 'status': 'PENDING'} - ) - - if created: - new_files_count += 1 -#show how many duplicates were already in db -print(f"File database populated. {len(csv_files) - new_files_count} were already in the database.") - - - -input("Press Enter to continue...") - - - -#bulk creation function -@sync_to_async -def bulk_create_gaia_sources(instances): #catalog status update to ingested after ingesting the sources - GaiaSource.objects.bulk_create(instances) - -@sync_to_async -def update_catalog_file_status(catalog_file, status): #catalog file status updater - catalog_file.status = status - catalog_file.save() - -@sync_to_async -def get_all_catalog_files(): #catalog file list getter - return list(CatalogFile.objects.all()) - -@sync_to_async -def delete_gaiasources_for_catalogfile(catalog_file): #for deleting the sources from partially ingested files in case of improper shutdown - GaiaSource.objects.filter(catalog_file=catalog_file).delete() - -@sync_to_async -def count_ingested_files(): - return CatalogFile.objects.filter(status='INGESTED').count() - - - -current_time = (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S") -print(f"[{current_time}] Starting the data ingestion.") - -#function that iterates over all catalog files and ingests sources from them -async def ingest_files(): - - #catalog_files = CatalogFile.objects.all() - - catalog_files = await get_all_catalog_files() - - for catalog_file in catalog_files: - - if catalog_file.status == 'INGESTED': - print(f"Skipping {catalog_file.name} as it is already ingested.") - continue - - file_path = os.path.join(directory, catalog_file.name) - - #print(file_path) + def handle(self, *args, **options): - if os.path.exists(file_path): #check if the file exists at all just in case + #fetching the file list + directory = input("Please enter the path to the directory containing the csv files (or leave empty if the files are in the same directory as this script): ") + csv_files = glob.glob(os.path.join(directory, '*csv*')) + self.stdout.write(f"Files found: {len(csv_files)}.") + self.stdout.write("Populating the file database...") + #initialize the counter + new_files_count = 0 + #add files as catalogfile instances into the database + for file_path in csv_files: + file_name = os.path.basename(file_path) + #use get_or_create to not add files twice + catalog_file, created = CatalogFile.objects.get_or_create( + name=file_name, + defaults={'uuid': uuid.uuid4(), 'status': 'PENDING'} + ) - if catalog_file.status == 'IN_PROGRESS': - print(f"{catalog_file.name} seems to have been interrupted, starting over.")#if the file status is in_progress, it must've - await delete_gaiasources_for_catalogfile(catalog_file) #been interrupted mid-ingest, delete all the - #associated sources if there are any and start over - - #comment # to avoid the ecsv metadata lines - #switched to header=1000 and pyarrow backend for speed - df = pd.read_csv( - file_path, - #comment='#', - header=1000, - engine="pyarrow" - ) - - gaiasource_fields = [field.name for field in GaiaSource._meta.get_fields()] #get fields from the model - - common_fields = [field for field in gaiasource_fields if field in df.columns] #find common fields between the df and the model - #this way we can add new fields to the model and - df_filtered = df[common_fields] #the code will pick them up here - - data_dict = df_filtered.to_dict(orient='records') #translate the df into a dict - - gaia_source_instances = [ - GaiaSource(**data, catalog_file=catalog_file) for data in data_dict #create gaiasource instances, set the foreignkey - ] - - await update_catalog_file_status(catalog_file, 'IN_PROGRESS') - - await bulk_create_gaia_sources(gaia_source_instances) #bulk-create instances from the dict - - await update_catalog_file_status(catalog_file,'INGESTED') #update the catalogfile instance status field to 'INGESTED' - - ingested_files_count = await count_ingested_files() - - current_time = (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S") #Timestamp and progress print statement - print(f"[{current_time}] {ingested_files_count}/{len(catalog_files)}") + if created: + new_files_count += 1 + #show how many duplicates were already in db + self.stdout.write(f"File database populated. {len(csv_files) - new_files_count} were already in the database.") -asyncio.run(ingest_files()) \ No newline at end of file + input("Press Enter to continue...") + + + + #bulk creation function + @sync_to_async + def bulk_create_gaia_sources(instances): #catalog status update to ingested after ingesting the sources + GaiaSource.objects.bulk_create(instances) + + @sync_to_async + def update_catalog_file_status(catalog_file, status): #catalog file status updater + catalog_file.status = status + catalog_file.save() + + @sync_to_async + def get_all_catalog_files(): #catalog file list getter + return list(CatalogFile.objects.all()) + + @sync_to_async + def delete_gaiasources_for_catalogfile(catalog_file): #for deleting the sources from partially ingested files in case of improper shutdown + GaiaSource.objects.filter(catalog_file=catalog_file).delete() + + @sync_to_async + def count_ingested_files(): + return CatalogFile.objects.filter(status='INGESTED').count() + + + + current_time = (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S") + self.stdout.write(f"[{current_time}] Starting the data ingestion.") + + #function that iterates over all catalog files and ingests sources from them + async def ingest_files(): + + #catalog_files = CatalogFile.objects.all() + + catalog_files = await get_all_catalog_files() + + for catalog_file in catalog_files: + + if catalog_file.status == 'INGESTED': + self.stdout.write(f"Skipping {catalog_file.name} as it is already ingested.") + continue + + file_path = os.path.join(directory, catalog_file.name) + + #self.stdout.write(file_path) + + if os.path.exists(file_path): #check if the file exists at all just in case + pass + + if catalog_file.status == 'IN_PROGRESS': + self.stdout.write(f"{catalog_file.name} seems to have been interrupted, starting over.")#if the file status is in_progress, it must've + await delete_gaiasources_for_catalogfile(catalog_file) #been interrupted mid-ingest, delete all the + #associated sources if there are any and start over + + #comment # to avoid the ecsv metadata lines + #switched to header=1000 and pyarrow backend for speed + df = pd.read_csv( + file_path, + #comment='#', + header=1000, + engine="pyarrow" + ) + + gaiasource_fields = [field.name for field in GaiaSource._meta.get_fields()] #get fields from the model + + common_fields = [field for field in gaiasource_fields if field in df.columns] #find common fields between the df and the model + #this way we can add new fields to the model and + df_filtered = df[common_fields] #the code will pick them up here + + data_dict = df_filtered.to_dict(orient='records') #translate the df into a dict + + gaia_source_instances = [ + GaiaSource(**data, catalog_file=catalog_file) for data in data_dict #create gaiasource instances, set the foreignkey + ] + + await update_catalog_file_status(catalog_file, 'IN_PROGRESS') + + await bulk_create_gaia_sources(gaia_source_instances) #bulk-create instances from the dict + + await update_catalog_file_status(catalog_file,'INGESTED') #update the catalogfile instance status field to 'INGESTED' + + ingested_files_count = await count_ingested_files() + + current_time = (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S") #Timestamp and progress self.stdout.write statement + self.stdout.write(f"[{current_time}] {ingested_files_count}/{len(catalog_files)}") + + + + asyncio.run(ingest_files()) \ No newline at end of file diff --git a/sample_app/migrations/0001_initial.py b/sample_app/migrations/0001_initial.py index b18cb0f..ec2d4bf 100644 --- a/sample_app/migrations/0001_initial.py +++ b/sample_app/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 5.1.1 on 2024-09-09 12:01 +# Generated by Django 5.1.1 on 2024-09-10 11:15 import django.db.models.deletion import uuid @@ -18,7 +18,7 @@ class Migration(migrations.Migration): fields=[ ('uuid', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), ('name', models.CharField(max_length=32)), - ('status', models.CharField(choices=[('PENDING', 'Pending'), ('INGESTED', 'Ingested'), ('INDEXED', 'Indexed')], default='PENDING', max_length=10)), + ('status', models.CharField(choices=[('PENDING', 'Pending'), ('IN_PROGRESS', 'In Progress'), ('INGESTED', 'Ingested'), ('INDEXED', 'Indexed')], default='PENDING', max_length=11)), ], ), migrations.CreateModel( diff --git a/sample_app/migrations/0002_alter_catalogfile_status.py b/sample_app/migrations/0002_alter_catalogfile_status.py deleted file mode 100644 index 6231877..0000000 --- a/sample_app/migrations/0002_alter_catalogfile_status.py +++ /dev/null @@ -1,18 +0,0 @@ -# Generated by Django 5.1.1 on 2024-09-09 14:26 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ('sample_app', '0001_initial'), - ] - - operations = [ - migrations.AlterField( - model_name='catalogfile', - name='status', - field=models.CharField(choices=[('PENDING', 'Pending'), ('IN_PROGRESS', 'In Progress'), ('INGESTED', 'Ingested'), ('INDEXED', 'Indexed')], default='PENDING', max_length=11), - ), - ] diff --git a/sample_app/serializers.py b/sample_app/serializers.py deleted file mode 100644 index 227d010..0000000 --- a/sample_app/serializers.py +++ /dev/null @@ -1,7 +0,0 @@ -from rest_framework import serializers -from .models import GaiaSource - -class GaiaSourceSerializer(serializers.ModelSerializer): - class Meta: - model = GaiaSource - fields = '__all__' diff --git a/sample_app/urls.py b/sample_app/urls.py index dc04be1..8b13789 100644 --- a/sample_app/urls.py +++ b/sample_app/urls.py @@ -1,7 +1 @@ -from django.urls import path -from .views import GaiaSourceListCreate, GaiaSourceDetail -urlpatterns = [ - path('GaiaSource/', GaiaSourceListCreate.as_view(), name='GaiaSource-list-create'), - path('GaiaSource//', GaiaSourceDetail.as_view(), name='GaiaSource-detail'), -] diff --git a/sample_app/views.py b/sample_app/views.py index b138ed4..2536b37 100644 --- a/sample_app/views.py +++ b/sample_app/views.py @@ -1,13 +1 @@ from django.shortcuts import render - -from rest_framework import generics -from .models import GaiaSource -from .serializers import GaiaSourceSerializer - -class GaiaSourceListCreate(generics.ListCreateAPIView): - queryset = GaiaSource.objects.all() - serializer_class = GaiaSourceSerializer - -class GaiaSourceDetail(generics.RetrieveUpdateDestroyAPIView): - queryset = GaiaSource.objects.all() - serializer_class = GaiaSourceSerializer