commit 3a9ba2f320 (parent 0f1088a066)

    reorganization

gaia_orm/settings.py
@@ -20,7 +20,7 @@ BASE_DIR = Path(__file__).resolve().parent.parent
 # See https://docs.djangoproject.com/en/5.1/howto/deployment/checklist/
 
 # SECURITY WARNING: keep the secret key used in production secret!
-SECRET_KEY = 'django-insecure-k_$r^3e!9ycqnt0=+ur&sx#hsl44_+v3=al2$_gpnh3u^w!xj$'
+SECRET_KEY = 'django-insecure-m!o^q^+en&_v%64&m8%d^%_olkzf7$8jbp0^4dph2=1rn=666m'
 
 # SECURITY WARNING: don't run with debug turned on in production!
 DEBUG = True
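
The new key is still a dev-grade value, and both the old and the new key are now readable in the repository history, so neither should ever reach production. Not part of this commit, but a common follow-up is to read the key from the environment; a minimal sketch, assuming a DJANGO_SECRET_KEY variable is set in deployment:

# Hypothetical hardening, not in the commit: DJANGO_SECRET_KEY is an assumed
# variable name; the insecure fallback keeps local development running.
import os

SECRET_KEY = os.environ.get(
    'DJANGO_SECRET_KEY',
    'django-insecure-dev-only-fallback',
)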

@@ -31,7 +31,6 @@ ALLOWED_HOSTS = []
 # Application definition
 
 INSTALLED_APPS = [
-    #'rest_framework',
     'sample_app',
     'django.contrib.admin',
     'django.contrib.auth',

gaia_orm/urls.py
@@ -15,10 +15,8 @@ Including another URLconf
     2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
 """
 from django.contrib import admin
-from django.urls import path, include
+from django.urls import path
 
 
 urlpatterns = [
     path('admin/', admin.site.urls),
-    path('api/', include('sample_app.urls')), # Include your app's URLs
 ]

sample_app/management/commands/… (ingest command)
@@ -4,141 +4,129 @@ import glob
 import uuid
 import asyncio
 from datetime import datetime, timedelta
+import pandas as pd
 import django
 from asgiref.sync import sync_to_async
-import pandas as pd
+from django.core.management.base import BaseCommand
 
 
-# #environment init for django
-# current_dir = os.getcwd()
-# relative_path = os.path.join(current_dir, 'gaia_orm')
-# os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'gaia_orm.settings')
-
-# sys.path.append(os.path.normpath(relative_path))
-# #sys.path.append('/home/kityr/practiceproject/gaia_orm')
-# django.setup()
-
-#import models for both the sources and the files
 from sample_app.models import GaiaSource, CatalogFile
 
 
+class Command(BaseCommand):
+    help = 'Ingest CSV files into the database'
+
+    def handle(self, *args, **options):
+
         #fetching the file list
         directory = input("Please enter the path to the directory containing the csv files (or leave empty if the files are in the same directory as this script): ")
         csv_files = glob.glob(os.path.join(directory, '*csv*'))
-print(f"Files found: {len(csv_files)}.")
-print("Populating the file database...")
+        self.stdout.write(f"Files found: {len(csv_files)}.")
+        self.stdout.write("Populating the file database...")
         #initialize the counter
         new_files_count = 0
         #add files as catalogfile instances into the database
         for file_path in csv_files:
             file_name = os.path.basename(file_path)
             #use get_or_create to not add files twice
            catalog_file, created = CatalogFile.objects.get_or_create(
                 name=file_name,
                 defaults={'uuid': uuid.uuid4(), 'status': 'PENDING'}
             )
             if created:
                 new_files_count += 1
         #show how many duplicates were already in db
-print(f"File database populated. {len(csv_files) - new_files_count} were already in the database.")
+        self.stdout.write(f"File database populated. {len(csv_files) - new_files_count} were already in the database.")
 
         input("Press Enter to continue...")
 
         #bulk creation function
         @sync_to_async
         def bulk_create_gaia_sources(instances): #catalog status update to ingested after ingesting the sources
             GaiaSource.objects.bulk_create(instances)
 
         @sync_to_async
         def update_catalog_file_status(catalog_file, status): #catalog file status updater
             catalog_file.status = status
             catalog_file.save()
 
         @sync_to_async
         def get_all_catalog_files(): #catalog file list getter
             return list(CatalogFile.objects.all())
 
         @sync_to_async
         def delete_gaiasources_for_catalogfile(catalog_file): #for deleting the sources from partially ingested files in case of improper shutdown
             GaiaSource.objects.filter(catalog_file=catalog_file).delete()
 
         @sync_to_async
         def count_ingested_files():
             return CatalogFile.objects.filter(status='INGESTED').count()
 
         current_time = (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S")
-print(f"[{current_time}] Starting the data ingestion.")
+        self.stdout.write(f"[{current_time}] Starting the data ingestion.")
 
         #function that iterates over all catalog files and ingests sources from them
         async def ingest_files():
             #catalog_files = CatalogFile.objects.all()
             catalog_files = await get_all_catalog_files()
             for catalog_file in catalog_files:
                 if catalog_file.status == 'INGESTED':
-                    print(f"Skipping {catalog_file.name} as it is already ingested.")
+                    self.stdout.write(f"Skipping {catalog_file.name} as it is already ingested.")
                     continue
 
                 file_path = os.path.join(directory, catalog_file.name)
-                #print(file_path)
+                #self.stdout.write(file_path)
 
                 if os.path.exists(file_path): #check if the file exists at all just in case
+                    pass
 
                 if catalog_file.status == 'IN_PROGRESS':
-                    print(f"{catalog_file.name} seems to have been interrupted, starting over.")#if the file status is in_progress, it must've
+                    self.stdout.write(f"{catalog_file.name} seems to have been interrupted, starting over.")#if the file status is in_progress, it must've
                     await delete_gaiasources_for_catalogfile(catalog_file)  #been interrupted mid-ingest, delete all the
                                                                             #associated sources if there are any and start over
 
                 #comment # to avoid the ecsv metadata lines
                 #switched to header=1000 and pyarrow backend for speed
                 df = pd.read_csv(
                     file_path,
                     #comment='#',
                     header=1000,
                     engine="pyarrow"
                 )
 
                 gaiasource_fields = [field.name for field in GaiaSource._meta.get_fields()] #get fields from the model
                 common_fields = [field for field in gaiasource_fields if field in df.columns] #find common fields between the df and the model
                                                                                               #this way we can add new fields to the model and
                 df_filtered = df[common_fields] #the code will pick them up here
 
                 data_dict = df_filtered.to_dict(orient='records') #translate the df into a dict
 
                 gaia_source_instances = [
                     GaiaSource(**data, catalog_file=catalog_file) for data in data_dict #create gaiasource instances, set the foreignkey
                 ]
 
                 await update_catalog_file_status(catalog_file, 'IN_PROGRESS')
                 await bulk_create_gaia_sources(gaia_source_instances) #bulk-create instances from the dict
                 await update_catalog_file_status(catalog_file,'INGESTED') #update the catalogfile instance status field to 'INGESTED'
 
                 ingested_files_count = await count_ingested_files()
-                current_time = (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S") #Timestamp and progress print statement
-                print(f"[{current_time}] {ingested_files_count}/{len(catalog_files)}")
+                current_time = (datetime.now() + timedelta(hours=3)).strftime("%H:%M:%S") #Timestamp and progress self.stdout.write statement
+                self.stdout.write(f"[{current_time}] {ingested_files_count}/{len(catalog_files)}")
 
         asyncio.run(ingest_files())
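
This hunk converts the standalone script into a Django management command: manage.py configures Django before handle() runs, which is why the commented-out os.environ.setdefault()/django.setup() bootstrap block could be deleted outright, and self.stdout.write() replaces print() as the recommended output channel for commands. The @sync_to_async wrappers stay because the ORM is synchronous and raises SynchronousOnlyOperation when called directly from the asyncio event loop. A minimal runnable sketch of the same pattern; the command's file name is not visible in this capture, so the module name ingest below is an assumption (with that layout it would run as python manage.py ingest):

# Sketch of the command pattern used above, placed at the assumed path
# sample_app/management/commands/ingest.py.
import asyncio

from asgiref.sync import sync_to_async
from django.core.management.base import BaseCommand

from sample_app.models import CatalogFile


@sync_to_async
def get_pending_files():
    # ORM calls are synchronous; sync_to_async runs them in a worker thread
    # so they can be awaited. list() forces the queryset to evaluate there.
    return list(CatalogFile.objects.filter(status='PENDING'))


class Command(BaseCommand):
    help = 'Skeleton of the async ingest loop'

    def handle(self, *args, **options):
        async def main():
            for catalog_file in await get_pending_files():
                self.stdout.write(f"would ingest {catalog_file.name}")

        asyncio.run(main())

Two details in the hunk worth flagging. First, pandas' pyarrow engine does not support the comment= option, which is presumably why the ECSV metadata lines are now skipped with header=1000 instead; that only parses correctly if every catalog file has exactly 1000 lines above the column-header row. Second, as rendered here the os.path.exists() check now guards only a pass, so a file that is recorded in the database but missing on disk would surface as a pd.read_csv error rather than being skipped.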

sample_app/migrations/0001_initial.py
@@ -1,4 +1,4 @@
-# Generated by Django 5.1.1 on 2024-09-09 12:01
+# Generated by Django 5.1.1 on 2024-09-10 11:15
 
 import django.db.models.deletion
 import uuid
@@ -18,7 +18,7 @@ class Migration(migrations.Migration):
             fields=[
                 ('uuid', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
                 ('name', models.CharField(max_length=32)),
-                ('status', models.CharField(choices=[('PENDING', 'Pending'), ('INGESTED', 'Ingested'), ('INDEXED', 'Indexed')], default='PENDING', max_length=10)),
+                ('status', models.CharField(choices=[('PENDING', 'Pending'), ('IN_PROGRESS', 'In Progress'), ('INGESTED', 'Ingested'), ('INDEXED', 'Indexed')], default='PENDING', max_length=11)),
             ],
         ),
         migrations.CreateModel(
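
With IN_PROGRESS folded directly into the regenerated 0001_initial (and the follow-up AlterField migration deleted in the next hunk), the CatalogFile model implied by this migration reads roughly as below. sample_app/models.py itself is not part of the diff, so the choices-list name is an assumption:

# Hedged reconstruction from the migration fields; STATUS_CHOICES is an
# assumed name, the field definitions mirror the migration.
import uuid

from django.db import models


class CatalogFile(models.Model):
    STATUS_CHOICES = [
        ('PENDING', 'Pending'),
        ('IN_PROGRESS', 'In Progress'),
        ('INGESTED', 'Ingested'),
        ('INDEXED', 'Indexed'),
    ]

    uuid = models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True)
    name = models.CharField(max_length=32)
    # max_length=11 is exactly the length of 'IN_PROGRESS', the longest value
    status = models.CharField(choices=STATUS_CHOICES, default='PENDING', max_length=11)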

sample_app/migrations/0002_… (deleted)
@@ -1,18 +0,0 @@
-# Generated by Django 5.1.1 on 2024-09-09 14:26
-
-from django.db import migrations, models
-
-
-class Migration(migrations.Migration):
-
-    dependencies = [
-        ('sample_app', '0001_initial'),
-    ]
-
-    operations = [
-        migrations.AlterField(
-            model_name='catalogfile',
-            name='status',
-            field=models.CharField(choices=[('PENDING', 'Pending'), ('IN_PROGRESS', 'In Progress'), ('INGESTED', 'Ingested'), ('INDEXED', 'Indexed')], default='PENDING', max_length=11),
-        ),
-    ]
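
Deleting an already-generated migration instead of superseding it rewrites history: any database that recorded the old 0002 in django_migrations now disagrees with the files on disk. On a disposable development database the simplest recovery is to rebuild the app's schema from zero; a sketch using Django's call_command, equivalent to running manage.py migrate by hand:

# Recovery sketch for a throwaway development database, not part of the commit.
from django.core.management import call_command

call_command('migrate', 'sample_app', 'zero')  # unapply the app's recorded history
call_command('migrate', 'sample_app')          # re-apply the rewritten 0001_initial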

sample_app/serializers.py (deleted)
@@ -1,7 +0,0 @@
-from rest_framework import serializers
-from .models import GaiaSource
-
-class GaiaSourceSerializer(serializers.ModelSerializer):
-    class Meta:
-        model = GaiaSource
-        fields = '__all__'

sample_app/urls.py
@@ -1,7 +1 @@
 from django.urls import path
-from .views import GaiaSourceListCreate, GaiaSourceDetail
-
-urlpatterns = [
-    path('GaiaSource/', GaiaSourceListCreate.as_view(), name='GaiaSource-list-create'),
-    path('GaiaSource/<int:pk>/', GaiaSourceDetail.as_view(), name='GaiaSource-detail'),
-]

sample_app/views.py
@@ -1,13 +1 @@
 from django.shortcuts import render
-
-from rest_framework import generics
-from .models import GaiaSource
-from .serializers import GaiaSourceSerializer
-
-class GaiaSourceListCreate(generics.ListCreateAPIView):
-    queryset = GaiaSource.objects.all()
-    serializer_class = GaiaSourceSerializer
-
-class GaiaSourceDetail(generics.RetrieveUpdateDestroyAPIView):
-    queryset = GaiaSource.objects.all()
-    serializer_class = GaiaSourceSerializer
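
Together with the INSTALLED_APPS and project urls.py hunks above, these last three deletions (serializer, app routes, DRF generic views) remove the REST API as a unit; each piece imports the next, so none of them could survive alone. For reference, the endpoints that disappear looked like this from the client side (the dev-server address is an assumption for illustration):

# Illustration only: a client-side view of what this commit removes.
# http://localhost:8000 assumes the default runserver address.
import requests

r = requests.get('http://localhost:8000/api/GaiaSource/')    # ListCreateAPIView, now gone
r = requests.get('http://localhost:8000/api/GaiaSource/1/')  # RetrieveUpdateDestroyAPIView, now gone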