You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

369 lines
20 KiB

import os
import json
import csv
from django.core.management.base import BaseCommand
from django.core.files import File
from django.db import transaction
from django.conf import settings
# Import all necessary models
from apps.hadis.models import (
HadisCategory, HadisSect, HadisStatus, HadisTag, Hadis,
HadisCorrection, HadisReference, ReferenceImage, HadisTransmitter,
BookReference, BookReferenceImage, BookReferenceDocument, BookAuthor, BookSubjectArea,
Transmitters, NarratorLayer, TransmitterReliability, OpinionStatus,
TransmitterOpinion, TransmitterOriginalText, OriginalTextReference, OriginalTextReferenceImage,
HadisInterpretation, InterpretationReference, InterpretationReferenceImage, CorrectionReference, CorrectionReferenceImage
)
class Command(BaseCommand):
    """Import legacy Hadith data (books, narrators, hadiths) from a dump directory.

    The directory passed as ``base_dir`` is probed for the entries below.
    Every entry is optional; a phase whose input file is missing is skipped.

    * ``AUT_UI.csv``     -- scholar id, Arabic name, Russian name.
    * ``bib.csv``        -- bibliography; one ``BookReference`` per volume.
    * ``narrators.json`` -- transmitters with opinions and excerpts.
    * ``tathir.json``    -- hadiths, corrections and interpretations.
    * ``books/``, ``screens/``, ``screens_trx/`` -- PDFs and scans to attach.

    NOTE(review): database rows are created with ``get_or_create`` /
    ``update_or_create``, but attached files and images are always saved as
    new rows, so re-running the command appears to duplicate attachments --
    confirm whether that is acceptable. Files written to storage are also not
    undone if the surrounding database transaction rolls back.
    """

    help = 'Import legacy Hadith data from JSON, CSV, and Media folders'

    # File extensions (lower-case) that are treated as attachable images.
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.gif')

    def add_arguments(self, parser):
        """Register the single positional ``base_dir`` argument."""
        parser.add_argument('base_dir', type=str, help='Absolute path to the "тестовая база данных" directory')

    def wrap_lang(self, text, lang="ru"):
        """Wrap *text* in the one-entry multilingual list schema.

        Returns ``[{'language_code': lang, 'text': <stripped str(text)>}]``.
        ``None`` is converted to an empty string, so the result is always a
        non-empty list (the original author relies on this to satisfy
        ``blank=False`` validation on the model fields).
        """
        if text is None:
            text = ""
        return [{"language_code": lang, "text": str(text).strip()}]

    @transaction.atomic
    def handle(self, *args, **kwargs):
        """Run the three import phases inside one database transaction.

        Writes an error to stderr and returns without importing anything when
        ``base_dir`` does not exist.
        """
        base_dir = kwargs['base_dir']
        if not os.path.exists(base_dir):
            self.stderr.write(self.style.ERROR(f'Directory not found: {base_dir}'))
            return
        self.stdout.write(self.style.SUCCESS(f'Starting import from: {base_dir}'))

        # --- PHASE 1: SCHOLARS & BOOKS ---
        self.stdout.write(self.style.WARNING('\n--- PHASE 1: Loading Scholars & Books ---'))
        scholars_map = self._load_scholars(os.path.join(base_dir, 'AUT_UI.csv'))
        self.stdout.write(f'Loaded {len(scholars_map)} scholars into memory.')
        self._import_books(base_dir, os.path.join(base_dir, 'bib.csv'))
        self.stdout.write(self.style.SUCCESS('Books (split by volumes) loaded successfully.'))

        # --- PHASE 2: NARRATORS ---
        self.stdout.write(self.style.WARNING('\n--- PHASE 2: Loading Narrators ---'))
        self._import_narrators(base_dir, os.path.join(base_dir, 'narrators.json'), scholars_map)
        self.stdout.write(self.style.SUCCESS('Narrators loaded successfully.'))

        # --- PHASE 3: HADITHS (Arguments, Corrections, Interpretations) ---
        self.stdout.write(self.style.WARNING('\n--- PHASE 3: Loading Hadiths ---'))
        self._import_hadiths(base_dir, os.path.join(base_dir, 'tathir.json'))
        self.stdout.write(self.style.SUCCESS('\nAll Hadiths, Corrections, and Interpretations Imported Successfully!'))

    # ------------------------------------------------------------------
    # Small value helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_int(value):
        """Return *value* as an ``int``, or ``None`` when it is not a plain number.

        ``str.isdigit`` accepts characters such as superscript digits that
        ``int`` rejects, so the conversion is additionally guarded.
        """
        text = str(value).strip()
        if not text.isdigit():
            return None
        try:
            return int(text)
        except ValueError:
            return None

    @staticmethod
    def _first(values):
        """Return the first element of *values*, or ``''`` if it is empty or ``None``."""
        return values[0] if values else ''

    # ------------------------------------------------------------------
    # Phase 1: scholars and books
    # ------------------------------------------------------------------
    def _load_scholars(self, csv_path):
        """Read ``AUT_UI.csv`` into ``{scholar_id: {'ar': name, 'ru': name}}``.

        Rows with fewer than three columns are ignored. A missing file yields
        an empty mapping.
        """
        scholars = {}
        if not os.path.exists(csv_path):
            return scholars
        # newline='' is what the csv module expects; utf-8-sig additionally
        # strips a BOM that would otherwise end up inside the first key.
        with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
            for row in csv.reader(f):
                if len(row) >= 3:
                    scholars[row[0].strip()] = {
                        "ar": row[1].strip(),
                        "ru": row[2].strip(),
                    }
        return scholars

    def _import_books(self, base_dir, bib_path):
        """Create or update book references for every usable row of ``bib.csv``."""
        if not os.path.exists(bib_path):
            return
        with open(bib_path, 'r', encoding='utf-8-sig', newline='') as f:
            for row in csv.reader(f):
                if len(row) < 5:
                    continue
                self._import_book_row(base_dir, row)

    def _import_book_row(self, base_dir, row):
        """Create one ``BookReference`` per volume described by a ``bib.csv`` row.

        Multi-volume books get the legacy id ``<id>-v<N>`` and a
        ``(Vol N)`` title suffix; single-volume books keep the base id/title.
        """
        def cell(index):
            # Missing trailing columns are treated as empty strings.
            return row[index].strip() if len(row) > index else ''

        base_legacy_id = cell(0)
        author_name = cell(2)
        base_title = cell(3)
        # Column 11 (index 10) holds the volume count. Anything that is not a
        # positive number -- including "0" -- falls back to a single volume so
        # the book is never silently dropped.
        total_vols = self._parse_int(cell(10)) or 1
        is_multi_vol = total_vols > 1

        author = None
        if author_name:
            author, _ = BookAuthor.objects.get_or_create(name=self.wrap_lang(author_name))

        book_folder = os.path.join(base_dir, 'books', base_legacy_id)
        has_folder = os.path.isdir(book_folder)

        for v in range(1, total_vols + 1):
            legacy_id = f"{base_legacy_id}-v{v}" if is_multi_vol else base_legacy_id
            title_text = f"{base_title} (Vol {v})" if is_multi_vol else base_title
            book, _ = BookReference.objects.update_or_create(
                legacy_id=legacy_id,
                defaults={
                    'title': self.wrap_lang(title_text),
                    'number_of_volumes': total_vols,
                    'volume': str(v),
                    'year_of_publication': cell(9),
                    'source_url': cell(11),
                    'description': self.wrap_lang(cell(12)),
                    'publisher': self.wrap_lang(cell(5)),
                    'language': self.wrap_lang(''),
                },
            )
            if author is not None:
                book.authors.add(author)
            if has_folder:
                self._attach_book_files(book, book_folder, v, is_multi_vol)

    def _attach_book_files(self, book, book_folder, volume, is_multi_vol):
        """Attach PDFs and images found under *book_folder* to one volume.

        For multi-volume books a PDF must be named ``<N>.pdf`` or ``<NN>.pdf``
        and an image must sit in a folder named ``<N>`` or ``<NN>``. For
        single-volume books every PDF and image in the tree is attached.
        """
        vol_num_str = str(volume)
        vol_names = (vol_num_str, vol_num_str.zfill(2))  # "1" and "01"
        pdf_names = tuple(f"{name}.pdf" for name in vol_names)

        for root, _dirs, files in os.walk(book_folder):
            folder_name = os.path.basename(root)
            for file_name in sorted(files):
                file_path = os.path.join(root, file_name)
                lowered = file_name.lower()
                if lowered.endswith('.pdf'):
                    if not is_multi_vol or lowered in pdf_names:
                        with open(file_path, 'rb') as doc_f:
                            doc = BookReferenceDocument(book_reference=book, volume=vol_num_str, title=file_name)
                            doc.file.save(file_name, File(doc_f), save=True)
                elif lowered.endswith(self._IMAGE_EXTENSIONS):
                    if not is_multi_vol or folder_name in vol_names:
                        with open(file_path, 'rb') as img_f:
                            img = BookReferenceImage(book_reference=book, volume=vol_num_str)
                            img.image.save(file_name, File(img_f), save=True)

    # ------------------------------------------------------------------
    # Phase 2: narrators
    # ------------------------------------------------------------------
    def _import_narrators(self, base_dir, narrators_path, scholars_map):
        """Import every entry of the ``narrators`` list in ``narrators.json``."""
        if not os.path.exists(narrators_path):
            return
        with open(narrators_path, 'r', encoding='utf-8') as f:
            narrators = json.load(f).get('narrators', [])
        for n_data in narrators:
            self._import_narrator(base_dir, n_data, scholars_map)

    def _import_narrator(self, base_dir, n_data, scholars_map):
        """Create/update one transmitter with its opinions and original texts."""
        legacy_id = n_data.get('id')
        legacy_number = self._parse_int(n_data.get('narrator_number'))
        info = n_data.get('info') or {}
        ar_info = info.get('arabic') or {}

        def both(key):
            # Russian value comes from ``info``, Arabic from ``info['arabic']``.
            return self.wrap_lang(info.get(key, ''), 'ru') + self.wrap_lang(ar_info.get(key, ''), 'ar')

        reliability, _ = TransmitterReliability.objects.get_or_create(
            title=self.wrap_lang(n_data.get('reliability', 'Unknown'))
        )

        generation = self._parse_int(n_data.get('generation'))
        if generation:
            NarratorLayer.objects.get_or_create(
                number=generation,
                defaults={
                    'name': self.wrap_lang(f'Layer {generation}'),
                    'description': self.wrap_lang(''),
                },
            )

        transmitter, _ = Transmitters.objects.update_or_create(
            legacy_id=legacy_id,
            defaults={
                'legacy_number': legacy_number,
                'full_name': both('name'),
                'known_as': both('known_name'),
                'kunya': both('kunya'),
                'nickname': both('nickname'),
                'origin': both('origin'),
                'lived_in': both('city_of_residence'),
                'died_in': both('city_of_death'),
                'description': self.wrap_lang(''),
                'generation': generation,
                'reliability': reliability,
                'in_sahih_bukhari': n_data.get('transmitted_to_bukhari', False),
                'in_sahih_muslim': n_data.get('transmitted_to_muslim', False),
                'relatives_raw': info.get('relatives', {}),
            },
        )

        # Opinions: scholar names are resolved through AUT_UI.csv, falling
        # back to the raw identifier when the scholar is unknown.
        reviews = (n_data.get('strengthened_weakened') or {}).get('review') or []
        for op in reviews:
            author_ui = op.get('author_ui')
            scholar_data = scholars_map.get(author_ui, {"ar": author_ui, "ru": author_ui})
            TransmitterOpinion.objects.get_or_create(
                transmitter=transmitter,
                opinion_text=self.wrap_lang(op.get('quote_original', ''), 'ar') + self.wrap_lang(op.get('quote_translated', ''), 'ru'),
                scholar_name=self.wrap_lang(scholar_data['ar'], 'ar') + self.wrap_lang(scholar_data['ru'], 'ru'),
            )

        # Original texts (excerpts) and the editions they are quoted from.
        for text_data in n_data.get('excerpts') or []:
            orig_text, _ = TransmitterOriginalText.objects.get_or_create(
                transmitter=transmitter,
                title=self.wrap_lang(text_data.get('title')),
                text=self.wrap_lang(text_data.get('text'), 'ar'),
                translation=self.wrap_lang(text_data.get('translation'), 'ru'),
            )
            for ed in text_data.get('editions') or []:
                book_ref = self._get_book_volume(ed.get('book_id'), ed.get('volume'))
                ref_obj, _ = OriginalTextReference.objects.get_or_create(
                    original_text=orig_text, book_reference=book_ref,
                    volume=ed.get('volume'), page=ed.get('page'), url=ed.get('url'),
                )
                folder = ed.get('screenshots_folder')
                if folder:
                    # str() keeps os.path.join from raising on non-string ids.
                    self._attach_images(
                        os.path.join(base_dir, 'screens_trx', str(legacy_id), str(folder)),
                        OriginalTextReferenceImage, ref_obj,
                    )

    # ------------------------------------------------------------------
    # Phase 3: hadiths, corrections, interpretations
    # ------------------------------------------------------------------
    def _import_hadiths(self, base_dir, tathir_path):
        """Import the ``materials`` list of ``tathir.json``.

        Items of type ``arguments`` are imported first; corrections
        (``authenticity_analysis``) and interpretations are imported in a
        second pass so they can find their parent hadith / category no matter
        where they appear in the file.
        """
        default_sect, _ = HadisSect.objects.get_or_create(
            sect_type='sunni',
            defaults={
                'title': self.wrap_lang('Sunni'),
                'description': self.wrap_lang(''),
            },
        )
        if not os.path.exists(tathir_path):
            return
        with open(tathir_path, 'r', encoding='utf-8') as f:
            materials = json.load(f).get('materials', [])

        # Map each correction id to the id of the hadith that lists it.
        correction_to_hadith_map = {}
        for item in materials:
            if item.get('type') == 'arguments':
                for conf_id in item.get('confirmation') or []:
                    correction_to_hadith_map[conf_id] = item.get('id')

        # Pass 1 -- A: BASE HADITHS
        for item in materials:
            if item.get('type') == 'arguments':
                self._import_argument(base_dir, item, default_sect)

        # Pass 2 -- B: CORRECTIONS, C: INTERPRETATIONS
        for item in materials:
            i_type = item.get('type')
            if i_type == 'authenticity_analysis':
                self._import_correction(base_dir, item, correction_to_hadith_map.get(item.get('id')))
            elif i_type == 'interpretation':
                self._import_interpretation(base_dir, item)

    def _attach_edition_screens(self, base_dir, item_id, edition, image_model, reference, field_name='image'):
        """Attach the images of ``screens/<item_id>/<screenshots_folder>``, if any."""
        folder = edition.get('screenshots_folder')
        if folder:
            self._attach_images(
                os.path.join(base_dir, 'screens', str(item_id), str(folder)),
                image_model, reference, field_name=field_name,
            )

    def _import_argument(self, base_dir, item, default_sect):
        """Create/update one hadith with its transmitter chains and editions."""
        # Empty or null ``category`` / ``aliases`` lists resolve to ''.
        cat_str = self._first(item.get('category'))
        category, _ = HadisCategory.objects.get_or_create(
            title=self.wrap_lang(cat_str),
            defaults={
                'sect': default_sect,
                'source_type': item.get('subtype', 'hadith') or 'hadith',
                'description': self.wrap_lang(''),
            },
        )
        status, _ = HadisStatus.objects.get_or_create(
            title=self.wrap_lang(item.get('authenticity', '')),
            defaults={'description': self.wrap_lang('')},
        )
        alias = self._first(item.get('aliases'))
        hadis, _ = Hadis.objects.update_or_create(
            legacy_id=item.get('id'),
            defaults={
                'category': category,
                'hadis_status': status,
                'title': self.wrap_lang(alias),
                'title_narrator': self.wrap_lang(alias),
                'description': self.wrap_lang(''),
                'explanation': self.wrap_lang(''),
                'address': self.wrap_lang(''),
                'hadis_status_text': self.wrap_lang(''),
                'text': item.get('original_text', ''),
                'translation': self.wrap_lang(item.get('translation', ''), 'ru'),
            },
        )

        # ``chain`` is either a flat list of narrator numbers or a list of
        # such lists. Anything whose first element is not itself a list is
        # treated as one flat chain, so string ids are not split into chars.
        raw_chain = item.get('chain') or []
        if raw_chain and not isinstance(raw_chain[0], (list, tuple)):
            chain_arrays = [raw_chain]
        else:
            chain_arrays = raw_chain

        for chain_idx, narrator_ids in enumerate(chain_arrays):
            for order_idx, n_id in enumerate(narrator_ids):
                transmitter = Transmitters.objects.filter(legacy_number=n_id).first()
                if not transmitter:
                    continue
                layer = NarratorLayer.objects.filter(number=transmitter.generation).first()
                HadisTransmitter.objects.get_or_create(
                    hadis=hadis, transmitter=transmitter, chain_index=chain_idx, order=order_idx,
                    defaults={'narrator_layer': layer, 'status': transmitter.reliability},
                )

        # Editions & Images
        for ed in item.get('editions') or []:
            book = self._get_book_volume(ed.get('book_id'), ed.get('volume'))
            href, _ = HadisReference.objects.get_or_create(
                hadis=hadis, book_reference=book,
                defaults={'hadith_number': str(ed.get('hadith_number', '')), 'description': self.wrap_lang('')},
            )
            self._attach_edition_screens(base_dir, item.get('id'), ed, ReferenceImage, href, field_name='thumbnail')

    def _import_correction(self, base_dir, item, parent_id):
        """Create one correction under the hadith identified by *parent_id*.

        A correction that no hadith lists in ``confirmation`` has no parent
        id. It is skipped explicitly: filtering on ``legacy_id=None`` would
        match any hadith whose legacy id is NULL.
        """
        parent_hadith = Hadis.objects.filter(legacy_id=parent_id).first() if parent_id else None
        if not parent_hadith:
            self.stdout.write(self.style.WARNING(
                f"Skipping correction {item.get('id')}: parent hadith not found"
            ))
            return
        corr, _ = HadisCorrection.objects.get_or_create(
            hadis=parent_hadith, legacy_id=item.get('id'),
            defaults={
                'title': self.wrap_lang(''),
                'text': item.get('original_text', ''),  # Directly mapped to TextField
                'translation': self.wrap_lang(item.get('translation', ''), 'ru'),
            },
        )
        for ed in item.get('editions') or []:
            book = self._get_book_volume(ed.get('book_id'), ed.get('volume'))
            cref, _ = CorrectionReference.objects.get_or_create(
                correction=corr, book_reference=book,
                defaults={'hadith_number': str(ed.get('hadith_number', ''))},
            )
            self._attach_edition_screens(base_dir, item.get('id'), ed, CorrectionReferenceImage, cref)

    def _import_interpretation(self, base_dir, item):
        """Create one interpretation under the category named by the item."""
        cat_str = self._first(item.get('category'))
        category = HadisCategory.objects.filter(title__contains=[{'text': cat_str}]).first()
        if not category:
            self.stdout.write(self.style.WARNING(
                f"Skipping interpretation {item.get('id')}: category not found"
            ))
            return
        interp, _ = HadisInterpretation.objects.get_or_create(
            category=category, legacy_id=item.get('id'),
            defaults={
                'title': self.wrap_lang(''),
                'text': item.get('original_text', ''),
                'translation': self.wrap_lang(item.get('translation', ''), 'ru'),
            },
        )
        for ed in item.get('editions') or []:
            book = self._get_book_volume(ed.get('book_id'), ed.get('volume'))
            iref, _ = InterpretationReference.objects.get_or_create(
                interpretation=interp, book_reference=book,
                defaults={'hadith_number': str(ed.get('hadith_number', ''))},
            )
            self._attach_edition_screens(base_dir, item.get('id'), ed, InterpretationReferenceImage, iref)

    # ------------------------------------------------------------------
    # Shared lookups
    # ------------------------------------------------------------------
    def _get_book_volume(self, book_id, volume_str):
        """Find the ``BookReference`` for a book id and optional volume label.

        Lookup order:

        1. ``<book_id>-v<N>`` where ``N`` is the number formed by the digits
           of *volume_str* ("Vol 2" and "02" both resolve to ``-v2``).
        2. The exact ``book_id`` (single-volume book).
        3. The earliest-created ``<book_id>-v*`` row.

        Returns ``None`` when *book_id* is empty or nothing matches.
        """
        if not book_id:
            return None

        if volume_str:
            digits = ''.join(filter(str.isdigit, str(volume_str)))
            # Going through int drops zero padding, matching how ids are built.
            volume = self._parse_int(digits)
            if volume is not None:
                book = BookReference.objects.filter(legacy_id=f"{book_id}-v{volume}").first()
                if book:
                    return book

        exact = BookReference.objects.filter(legacy_id=book_id).first()
        if exact:
            return exact

        # Restricting the prefix to "<id>-v" keeps an id such as "10" from
        # matching an unrelated book "100"; ordering makes the pick stable.
        return (
            BookReference.objects
            .filter(legacy_id__startswith=f"{book_id}-v")
            .order_by('pk')
            .first()
        )

    def _attach_images(self, folder_path, ImageModelClass, reference_instance, field_name='image'):
        """Attach every image file in *folder_path* to *reference_instance*.

        Files are visited in sorted name order; ``priority`` is the file's
        position in that listing. *field_name* selects which file field of
        ``ImageModelClass`` receives the upload ('image' or 'thumbnail').
        Does nothing when *folder_path* is not an existing directory.
        """
        if not os.path.isdir(folder_path):
            return
        for i, filename in enumerate(sorted(os.listdir(folder_path))):
            if not filename.lower().endswith(self._IMAGE_EXTENSIONS):
                continue
            file_path = os.path.join(folder_path, filename)
            with open(file_path, 'rb') as f:
                img_obj = ImageModelClass(reference=reference_instance, priority=i)
                # Dynamically grab the correct field ('image' or 'thumbnail')
                image_field = getattr(img_obj, field_name)
                image_field.save(filename, File(f), save=True)