"""Django management command that imports legacy Hadith data from JSON, CSV, and media folders."""

import os
import json
import csv
import re
from django.core.management.base import BaseCommand
from django.core.files import File
from django.db import transaction
from django.conf import settings
from apps.hadis.models import (
HadisCategory, HadisSect, HadisStatus, HadisTag, Hadis,
HadisCorrection, HadisReference, ReferenceImage, HadisTransmitter,
BookReference, BookReferenceImage, BookReferenceDocument, BookAuthor, BookSubjectArea,
Transmitters, NarratorLayer, TransmitterReliability, OpinionStatus,
TransmitterOpinion, TransmitterOriginalText, OriginalTextReference, OriginalTextReferenceImage,
HadisInterpretation, InterpretationReference, InterpretationReferenceImage, CorrectionReference, CorrectionReferenceImage
)
class Command(BaseCommand):
    """Import legacy books, narrators and hadiths from a dump directory.

    The directory passed as ``base_dir`` is read for the following entries
    (each one is optional; a missing entry is simply skipped):

    * ``AUT_UI.csv``      - scholar id, Arabic name, Russian name.
    * ``bib.csv``         - one row per book (see ``_import_book_row``).
    * ``narrators.json``  - ``{"narrators": [...]}``.
    * ``tathir.json``     - ``{"materials": [...]}`` with items of type
      ``arguments``, ``authenticity_analysis`` or ``interpretation``.
    * ``books/<book id>/``                - book PDFs and per-volume image folders.
    * ``screens/<item id>/<folder>/``     - screenshots for material editions.
    * ``screens_trx/<narrator id>/<folder>/`` - screenshots for narrator excerpts.

    The whole import runs inside a single database transaction.
    """

    help = 'Import legacy Hadith data from JSON, CSV, and Media folders'

    # File extensions treated as images when scanning media folders.
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.gif')

    def add_arguments(self, parser):
        """Register the single positional ``base_dir`` argument."""
        parser.add_argument('base_dir', type=str, help='Absolute path to the "тестовая база данных" directory')

    def wrap_lang(self, text, lang="ru"):
        """Wrap ``text`` in the one-element multilingual list structure.

        ``None`` becomes an empty string; any other value is converted with
        ``str`` and stripped of surrounding whitespace.
        """
        if text is None:
            text = ""
        return [{"language_code": lang, "text": str(text).strip()}]

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _text(value):
        """Return ``str(value)``, or ``''`` for ``None``.

        Plain ``str(None)`` would store the literal text ``'None'`` for JSON
        nulls.
        """
        return '' if value is None else str(value)

    @staticmethod
    def _int_or(value, default=None):
        """Return ``value`` as an int when it is a plain digit string, else ``default``."""
        text = str(value).strip()
        if not text.isdigit():
            return default
        try:
            return int(text)
        except ValueError:
            # Some characters satisfy str.isdigit() but are rejected by int().
            return default

    @staticmethod
    def _canonical_volume(raw):
        """Return the volume key used in legacy ids: ``'02'`` -> ``'2'``.

        Values that are not integers are returned unchanged.
        """
        try:
            return str(int(raw))
        except ValueError:
            return raw

    @classmethod
    def _volume_candidates(cls, volume_str):
        """Return the volume keys to try, most specific first, for a cited volume.

        The canonical form comes first because that is how volumes are stored
        at import time; the digits-only forms keep the previous lookup
        behaviour for values such as ``'Vol. 2'``.
        """
        if not volume_str:
            return []
        text = str(volume_str).strip()
        digits = ''.join(filter(str.isdigit, text))
        candidates = []
        for candidate in (cls._canonical_volume(text), cls._canonical_volume(digits), digits):
            if candidate and candidate.lower() != 'none' and candidate not in candidates:
                candidates.append(candidate)
        return candidates

    def _bilingual(self, ru_source, ar_source, key):
        """Build a Russian + Arabic multilingual value from two dicts sharing ``key``."""
        return self.wrap_lang(ru_source.get(key, ''), 'ru') + self.wrap_lang(ar_source.get(key, ''), 'ar')

    def _reference_defaults(self, edition):
        """Return the field values shared by hadith/correction/interpretation references."""
        return {
            'hadith_number': self._text(edition.get('hadith_number')),
            'volume': self._text(edition.get('volume')),
            'pages': self._text(edition.get('pages')),
            'url': edition.get('url') or '',
        }

    def _warn_skip(self, message):
        """Report a record that was skipped instead of dropping it silently."""
        self.stdout.write(self.style.WARNING(message))

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------

    @transaction.atomic
    def handle(self, *args, **kwargs):
        """Run the full import: pre-scan, cleanup, books, narrators, hadiths."""
        base_dir = kwargs['base_dir']
        if not os.path.exists(base_dir):
            self.stderr.write(self.style.ERROR(f'Directory not found: {base_dir}'))
            return
        self.stdout.write(self.style.SUCCESS(f'Starting import from: {base_dir}'))

        # Paths
        aut_ui_path = os.path.join(base_dir, 'AUT_UI.csv')
        bib_path = os.path.join(base_dir, 'bib.csv')
        narrators_path = os.path.join(base_dir, 'narrators.json')
        tathir_path = os.path.join(base_dir, 'tathir.json')

        # --- PRE-SCAN TATHIR.JSON FOR CITED VOLUMES ---
        cited_book_volumes = self._scan_cited_volumes(tathir_path)

        # --- PRE-FLIGHT CLEANUP ---
        # NOTE(review): this removes every legacy book even when bib.csv is
        # absent and nothing will be re-created - confirm that is intended.
        self.stdout.write(self.style.WARNING('\n--- PRE-FLIGHT: Cleaning up old legacy books ---'))
        BookReference.objects.exclude(legacy_id__isnull=True).exclude(legacy_id__exact='').delete()

        # --- PHASE 1: SCHOLARS & BOOKS ---
        self.stdout.write(self.style.WARNING('\n--- PHASE 1: Loading Scholars & Books ---'))
        scholars_map = self._load_scholars(aut_ui_path)
        self._import_books(base_dir, bib_path, cited_book_volumes)
        self.stdout.write(self.style.SUCCESS('Books loaded successfully.'))

        # --- PHASE 2: NARRATORS ---
        self.stdout.write(self.style.WARNING('\n--- PHASE 2: Loading Narrators ---'))
        self._import_narrators(base_dir, narrators_path, scholars_map)
        self.stdout.write(self.style.SUCCESS('Narrators loaded successfully.'))

        # --- PHASE 3: HADITHS ---
        self.stdout.write(self.style.WARNING('\n--- PHASE 3: Loading Hadiths ---'))
        self._import_hadiths(base_dir, tathir_path)
        self.stdout.write(self.style.SUCCESS('\nAll Hadiths, Corrections, and Interpretations Imported Successfully!'))

    # ------------------------------------------------------------------
    # Pre-scan and phase 1: scholars and books
    # ------------------------------------------------------------------

    def _scan_cited_volumes(self, tathir_path):
        """Collect, per book id, the volumes cited by material editions in tathir.json.

        Returns a dict mapping the book id (as a stripped string, so it can be
        matched against the ids read from bib.csv) to a set of canonical
        volume keys. A book cited without a volume maps to an empty set.
        """
        cited = {}
        if not os.path.exists(tathir_path):
            return cited
        with open(tathir_path, 'r', encoding='utf-8') as f:
            materials = json.load(f).get('materials', [])
        for item in materials:
            for edition in item.get('editions') or []:
                book_id = edition.get('book_id')
                if not book_id:
                    continue
                volumes = cited.setdefault(str(book_id).strip(), set())
                raw_volume = edition.get('volume')
                volume = '' if raw_volume is None else str(raw_volume).strip()
                if volume and volume.lower() != 'none':
                    volumes.add(self._canonical_volume(volume))
        return cited

    def _load_scholars(self, aut_ui_path):
        """Read AUT_UI.csv into ``{scholar id: {"ar": name, "ru": name}}``."""
        scholars_map = {}
        if not os.path.exists(aut_ui_path):
            return scholars_map
        # newline='' lets the csv module handle line endings inside quoted fields.
        with open(aut_ui_path, 'r', encoding='utf-8', newline='') as f:
            for row in csv.reader(f):
                if len(row) >= 3:
                    scholars_map[row[0].strip()] = {"ar": row[1].strip(), "ru": row[2].strip()}
        return scholars_map

    def _import_books(self, base_dir, bib_path, cited_book_volumes):
        """Create one BookReference per book volume listed in bib.csv."""
        if not os.path.exists(bib_path):
            return
        with open(bib_path, 'r', encoding='utf-8', newline='') as f:
            for row in csv.reader(f):
                if len(row) < 5:
                    continue
                self._import_book_row(base_dir, row, cited_book_volumes)

    def _import_book_row(self, base_dir, row, cited_book_volumes):
        """Import a single bib.csv row.

        Columns read: 0 legacy id, 1 order, 2 author, 3 title, 4 researcher,
        5 publisher, 6 city, 7 country, 8 edition number, 9 year,
        10 number of volumes, 11 source URL, 12 description,
        13 comma-separated subject areas.

        One BookReference is created per volume, where the volumes are the
        union of the sub-folders found under ``books/<id>/`` and the volumes
        cited in tathir.json. A book with no known volume gets one record
        with an empty volume.
        """
        def col(index):
            return row[index].strip() if len(row) > index else ''

        base_legacy_id = col(0)
        if not base_legacy_id:
            # A blank id would make ``books/`` itself the book folder and would
            # update_or_create(legacy_id=''), which the cleanup step treats as
            # a non-legacy record.
            self._warn_skip('Skipping bib.csv row without a book id.')
            return
        author_name = col(2)
        base_title = col(3)
        total_vols_int = self._int_or(col(10), 1)
        order = self._int_or(col(1), 0)

        # Walk the media folder once per book rather than once per volume.
        book_folder = os.path.join(base_dir, 'books', base_legacy_id)
        existing_vols = set()
        media_files = []
        if os.path.isdir(book_folder):
            for entry in os.listdir(book_folder):
                if os.path.isdir(os.path.join(book_folder, entry)):
                    existing_vols.add(self._canonical_volume(entry))
            for root, _dirs, files in os.walk(book_folder):
                media_files.extend((root, filename) for filename in files)

        volumes_to_create = existing_vols | cited_book_volumes.get(base_legacy_id, set())
        if not volumes_to_create:
            volumes_to_create = {''}

        for v in sorted(volumes_to_create):
            legacy_id = f"{base_legacy_id}-v{v}" if v else base_legacy_id
            title_text = f"{base_title} (Vol {v})" if v else base_title
            book, _ = BookReference.objects.update_or_create(
                legacy_id=legacy_id,
                defaults={
                    'title': self.wrap_lang(title_text),
                    'order': order,
                    'researcher': self.wrap_lang(col(4)),
                    'publisher': self.wrap_lang(col(5)),
                    'city_of_publication': self.wrap_lang(col(6)),
                    'country_of_publication': self.wrap_lang(col(7)),
                    'edition_number': col(8),
                    'year_of_publication': col(9),
                    'number_of_volumes': total_vols_int,
                    'volume': v,
                    'source_url': col(11),
                    'description': self.wrap_lang(col(12)),
                    'language': self.wrap_lang(''),
                },
            )
            if author_name:
                author, _ = BookAuthor.objects.get_or_create(name=self.wrap_lang(author_name))
                book.authors.add(author)

            # Map Book Tags
            for tag in col(13).split(','):
                if tag.strip():
                    btag, _ = BookSubjectArea.objects.get_or_create(title=self.wrap_lang(tag.strip()))
                    book.subject_area.add(btag)

            # Attach Media
            self._attach_book_media(book, v, book_folder, media_files)

    def _attach_book_media(self, book, v, book_folder, media_files):
        """Attach the PDFs and images belonging to volume ``v`` of ``book``.

        ``media_files`` is a list of ``(directory, filename)`` pairs found
        under ``book_folder``.

        * PDF, with a volume: the file must be named ``<v>.pdf`` or the
          two-digit zero-padded form, in any folder of the tree.
        * PDF, without a volume: files in the book's root folder whose name
          does not start with a digit.
        * Image, with a volume: files in a sub-folder whose name equals the
          volume, ignoring leading zeros.
        * Image, without a volume: files in the book's root folder.
        """
        for root, filename in media_files:
            file_lower = filename.lower()
            file_path = os.path.join(root, filename)
            is_root = (root == book_folder)
            if file_lower.endswith('.pdf'):
                if v:
                    wanted = file_lower in (f"{v}.pdf", f"{v.zfill(2)}.pdf")
                else:
                    wanted = is_root and not file_lower[0].isdigit()
                if wanted:
                    with open(file_path, 'rb') as doc_f:
                        doc = BookReferenceDocument(book_reference=book, volume=v, title=filename)
                        doc.file.save(filename, File(doc_f), save=True)
            elif file_lower.endswith(self._IMAGE_EXTENSIONS):
                if v:
                    folder_name = os.path.basename(root)
                    wanted = not is_root and folder_name.lstrip('0') == v.lstrip('0')
                else:
                    wanted = is_root
                if wanted:
                    with open(file_path, 'rb') as img_f:
                        img = BookReferenceImage(book_reference=book, volume=v)
                        img.image.save(filename, File(img_f), save=True)

    # ------------------------------------------------------------------
    # Phase 2: narrators
    # ------------------------------------------------------------------

    def _import_narrators(self, base_dir, narrators_path, scholars_map):
        """Import every entry of the ``narrators`` list in narrators.json."""
        if not os.path.exists(narrators_path):
            return
        with open(narrators_path, 'r', encoding='utf-8') as f:
            narrators = json.load(f).get('narrators', [])
        for n_data in narrators:
            self._import_narrator(base_dir, n_data, scholars_map)

    def _import_narrator(self, base_dir, n_data, scholars_map):
        """Create or update one transmitter with its opinions and excerpts."""
        legacy_id = n_data.get('id')
        if legacy_id is None:
            # update_or_create(legacy_id=None) is an IS NULL lookup and could
            # match a transmitter that was not created by this import.
            self._warn_skip('Skipping narrator without an id.')
            return
        legacy_number = self._int_or(n_data.get('narrator_number'))
        info = n_data.get('info') or {}
        ar_info = info.get('arabic') or {}

        reliability, _ = TransmitterReliability.objects.get_or_create(
            title=self.wrap_lang(n_data.get('reliability', 'Unknown'))
        )
        generation = self._int_or(n_data.get('generation'))
        if generation:
            NarratorLayer.objects.get_or_create(
                number=generation,
                defaults={'name': self.wrap_lang(f'Layer {generation}'), 'description': self.wrap_lang('')},
            )

        # Safe Age Extraction: the first run of digits in the age text, if any.
        age_nums = re.findall(r'\d+', str(info.get('age', '')))
        age_val = int(age_nums[0]) if age_nums else None

        # Madhhab Translation (the source values are Russian words).
        madhhab_list = n_data.get('madhab', [])
        madhhab_val = Transmitters.MadhhabChoices.UNKNOWN
        if madhhab_list:
            m_str = str(madhhab_list[0]).lower()
            if 'шиит' in m_str:
                madhhab_val = Transmitters.MadhhabChoices.SHIA
            elif 'суннит' in m_str:
                madhhab_val = Transmitters.MadhhabChoices.SUNNI
            else:
                madhhab_val = Transmitters.MadhhabChoices.OTHER

        defaults = {
            'legacy_number': legacy_number,
            'description': self.wrap_lang(''),
            'generation': generation,
            'reliability': reliability,
            'in_sahih_bukhari': n_data.get('transmitted_to_bukhari', False),
            'in_sahih_muslim': n_data.get('transmitted_to_muslim', False),
            'relatives_raw': info.get('relatives', {}),
            'birth_year_hijri': self._text(info.get('birth_year')),
            'death_year_hijri': self._text(info.get('death_year')),
            'age_at_death': age_val,
            'tags': n_data.get('tags', []),
            'madhhab': madhhab_val,
        }
        # Model field -> key shared by the Russian and Arabic info dicts.
        bilingual_fields = (
            ('full_name', 'name'),
            ('known_as', 'known_name'),
            ('kunya', 'kunya'),
            ('nickname', 'nickname'),
            ('origin', 'origin'),
            ('lived_in', 'city_of_residence'),
            ('died_in', 'city_of_death'),
            ('freed_slave_of', 'freed_slave_of'),
            ('occupation', 'occupation'),
            ('features', 'features'),
        )
        for field, key in bilingual_fields:
            defaults[field] = self._bilingual(info, ar_info, key)

        transmitter, _ = Transmitters.objects.update_or_create(legacy_id=legacy_id, defaults=defaults)

        reviews = (n_data.get('strengthened_weakened') or {}).get('review') or []
        for op in reviews:
            author_ui = op.get('author_ui')
            # Fall back to the raw id when the scholar is not listed in AUT_UI.csv.
            scholar_data = scholars_map.get(author_ui, {"ar": author_ui, "ru": author_ui})
            TransmitterOpinion.objects.get_or_create(
                transmitter=transmitter,
                opinion_text=(
                    self.wrap_lang(op.get('quote_original', ''), 'ar')
                    + self.wrap_lang(op.get('quote_translated', ''), 'ru')
                ),
                scholar_name=self.wrap_lang(scholar_data['ar'], 'ar') + self.wrap_lang(scholar_data['ru'], 'ru'),
            )

        for text_data in n_data.get('excerpts') or []:
            orig_text, _ = TransmitterOriginalText.objects.get_or_create(
                transmitter=transmitter,
                title=self.wrap_lang(text_data.get('title')),
                text=self.wrap_lang(text_data.get('text'), 'ar'),
                translation=self.wrap_lang(text_data.get('translation'), 'ru'),
            )
            for ed in text_data.get('editions') or []:
                book_ref = self._get_book_volume(ed.get('book_id'), ed.get('volume'))
                ref_obj, _ = OriginalTextReference.objects.update_or_create(
                    original_text=orig_text, book_reference=book_ref,
                    defaults={
                        'volume': self._text(ed.get('volume')),
                        # The JSON key is 'pages'; the model field is 'page'.
                        'page': self._text(ed.get('pages')),
                        'url': ed.get('url') or '',
                    },
                )
                folder = ed.get('screenshots_folder')
                if folder:
                    self._attach_images(
                        os.path.join(base_dir, 'screens_trx', str(legacy_id), str(folder)),
                        OriginalTextReferenceImage, ref_obj,
                    )

    # ------------------------------------------------------------------
    # Phase 3: hadiths, corrections, interpretations
    # ------------------------------------------------------------------

    def _import_hadiths(self, base_dir, tathir_path):
        """Import all materials from tathir.json.

        Hadiths (type ``arguments``) are imported first, because corrections
        look up their parent hadith and interpretations look up their
        category; importing in file order dropped any that appeared before
        their parent.
        """
        default_sect, _ = HadisSect.objects.get_or_create(
            sect_type='sunni',
            defaults={'title': self.wrap_lang('Sunni'), 'description': self.wrap_lang('')},
        )
        if not os.path.exists(tathir_path):
            return
        with open(tathir_path, 'r', encoding='utf-8') as f:
            materials = json.load(f).get('materials', [])

        # A hadith lists the ids of its corrections under 'confirmation'.
        correction_to_hadith_map = {}
        for item in materials:
            if item.get('type') == 'arguments':
                for conf_id in item.get('confirmation') or []:
                    correction_to_hadith_map[conf_id] = item.get('id')

        # A: BASE HADITHS
        for item in materials:
            if item.get('type') == 'arguments':
                self._import_hadith(base_dir, item, default_sect)

        for item in materials:
            i_type = item.get('type')
            # B: CORRECTIONS
            if i_type == 'authenticity_analysis':
                self._import_correction(base_dir, item, correction_to_hadith_map.get(item.get('id')))
            # C: INTERPRETATIONS
            elif i_type == 'interpretation':
                self._import_interpretation(base_dir, item)

    def _import_hadith(self, base_dir, item, default_sect):
        """Create or update one hadith with its tags, narrator chains and references."""
        legacy_id = item.get('id')
        if legacy_id is None:
            self._warn_skip('Skipping hadith without an id.')
            return

        categories = item.get('category') or ['']
        category, _ = HadisCategory.objects.get_or_create(
            title=self.wrap_lang(categories[0]),
            defaults={
                'sect': default_sect,
                'source_type': item.get('subtype', 'hadith') or 'hadith',
                'description': self.wrap_lang(''),
            },
        )
        status, _ = HadisStatus.objects.get_or_create(
            title=self.wrap_lang(item.get('authenticity', '')),
            defaults={'description': self.wrap_lang('')},
        )
        aliases = item.get('aliases')
        first_alias = aliases[0] if aliases else ''
        hadis, _ = Hadis.objects.update_or_create(
            legacy_id=legacy_id,
            defaults={
                'category': category, 'hadis_status': status,
                'title': self.wrap_lang(first_alias),
                'title_narrator': self.wrap_lang(first_alias),
                'description': self.wrap_lang(''),
                'explanation': self.wrap_lang(''),
                'address': self.wrap_lang(''),
                'hadis_status_text': self.wrap_lang(''),
                'text': item.get('original_text', ''),
                'translation': self.wrap_lang(item.get('translation', ''), 'ru'),
            },
        )

        # Map Hadith Tags (replaced wholesale on every import).
        hadis.tags.clear()
        for tag_str in item.get('tags') or []:
            htag, _ = HadisTag.objects.get_or_create(title=self.wrap_lang(tag_str))
            hadis.tags.add(htag)

        # 'chain' is either one flat list of narrator numbers or a list of such lists.
        raw_chain = item.get('chain') or []
        chain_arrays = []
        if raw_chain:
            chain_arrays = [raw_chain] if isinstance(raw_chain[0], int) else raw_chain
        for chain_idx, narrator_ids in enumerate(chain_arrays):
            for order_idx, n_id in enumerate(narrator_ids):
                if n_id is None:
                    # filter(legacy_number=None) would match transmitters that
                    # have no legacy number at all.
                    continue
                transmitter = Transmitters.objects.filter(legacy_number=n_id).first()
                if transmitter:
                    layer = NarratorLayer.objects.filter(number=transmitter.generation).first()
                    HadisTransmitter.objects.get_or_create(
                        hadis=hadis, transmitter=transmitter, chain_index=chain_idx, order=order_idx,
                        defaults={'narrator_layer': layer, 'status': transmitter.reliability},
                    )

        for ed in item.get('editions') or []:
            book = self._get_book_volume(ed.get('book_id'), ed.get('volume'))
            defaults = self._reference_defaults(ed)
            defaults['description'] = self.wrap_lang('')
            href, _ = HadisReference.objects.update_or_create(
                hadis=hadis, book_reference=book, defaults=defaults,
            )
            if ed.get('screenshots_folder'):
                self._attach_images(
                    os.path.join(base_dir, 'screens', str(legacy_id), str(ed.get('screenshots_folder'))),
                    ReferenceImage, href, field_name='thumbnail',
                )

    def _import_correction(self, base_dir, item, parent_id):
        """Create or update one correction under the hadith with legacy id ``parent_id``."""
        legacy_id = item.get('id')
        if legacy_id is None or parent_id is None:
            # A None id would turn the lookups below into IS NULL queries.
            self._warn_skip(f'Skipping correction {legacy_id}: no parent hadith references it.')
            return
        parent_hadith = Hadis.objects.filter(legacy_id=parent_id).first()
        if not parent_hadith:
            self._warn_skip(f'Skipping correction {legacy_id}: parent hadith {parent_id} not found.')
            return

        corr, _ = HadisCorrection.objects.update_or_create(
            hadis=parent_hadith, legacy_id=legacy_id,
            defaults={
                'title': self.wrap_lang(''),
                'text': item.get('original_text', ''),
                'translation': self.wrap_lang(item.get('translation', ''), 'ru'),
            },
        )
        for ed in item.get('editions') or []:
            book = self._get_book_volume(ed.get('book_id'), ed.get('volume'))
            cref, _ = CorrectionReference.objects.update_or_create(
                correction=corr, book_reference=book, defaults=self._reference_defaults(ed),
            )
            if ed.get('screenshots_folder'):
                self._attach_images(
                    os.path.join(base_dir, 'screens', str(legacy_id), str(ed.get('screenshots_folder'))),
                    CorrectionReferenceImage, cref,
                )

    def _import_interpretation(self, base_dir, item):
        """Create or update one interpretation under the category named by the item."""
        legacy_id = item.get('id')
        if legacy_id is None:
            self._warn_skip('Skipping interpretation without an id.')
            return
        cat_str = item.get('category', [''])[0] if item.get('category') else ''
        category = HadisCategory.objects.filter(title__contains=[{'text': cat_str}]).first()
        if not category:
            self._warn_skip(f'Skipping interpretation {legacy_id}: category "{cat_str}" not found.')
            return

        interp, _ = HadisInterpretation.objects.update_or_create(
            category=category, legacy_id=legacy_id,
            defaults={
                'title': self.wrap_lang(''),
                'text': item.get('original_text', ''),
                'translation': self.wrap_lang(item.get('translation', ''), 'ru'),
            },
        )
        for ed in item.get('editions') or []:
            book = self._get_book_volume(ed.get('book_id'), ed.get('volume'))
            iref, _ = InterpretationReference.objects.update_or_create(
                interpretation=interp, book_reference=book, defaults=self._reference_defaults(ed),
            )
            if ed.get('screenshots_folder'):
                self._attach_images(
                    os.path.join(base_dir, 'screens', str(legacy_id), str(ed.get('screenshots_folder'))),
                    InterpretationReferenceImage, iref,
                )

    # ------------------------------------------------------------------
    # Shared lookups
    # ------------------------------------------------------------------

    def _get_book_volume(self, book_id, volume_str):
        """Find the BookReference for a cited book volume, with fallbacks.

        Lookup order:

        1. ``<book_id>-v<volume>`` for each candidate form of the volume.
        2. The book record without a volume (``legacy_id == book_id``).
        3. Any volume of that book (``legacy_id`` starting with ``<book_id>-v``).

        The last step is anchored on ``-v`` so that book ``1`` can never
        resolve to book ``10``. Returns ``None`` when ``book_id`` is empty or
        nothing matches.
        """
        if not book_id:
            return None
        book_key = str(book_id).strip()
        if not book_key:
            return None
        for volume in self._volume_candidates(volume_str):
            book = BookReference.objects.filter(legacy_id=f"{book_key}-v{volume}").first()
            if book:
                return book
        exact = BookReference.objects.filter(legacy_id=book_key).first()
        if exact:
            return exact
        return BookReference.objects.filter(legacy_id__startswith=f"{book_key}-v").first()

    def _attach_images(self, folder_path, ImageModelClass, reference_instance, field_name='image'):
        """Attach every image file in ``folder_path`` to ``reference_instance``.

        Files are processed in sorted name order. ``priority`` is the file's
        index in the sorted directory listing, so non-image files leave gaps.
        Does nothing when ``folder_path`` is not an existing directory.

        NOTE(review): images are always appended, so re-running the import
        adds the same screenshots again to references that already exist.
        """
        if not os.path.isdir(folder_path):
            return
        for i, filename in enumerate(sorted(os.listdir(folder_path))):
            if not filename.lower().endswith(self._IMAGE_EXTENSIONS):
                continue
            file_path = os.path.join(folder_path, filename)
            with open(file_path, 'rb') as f:
                img_obj = ImageModelClass(reference=reference_instance, priority=i)
                image_field = getattr(img_obj, field_name)
                image_field.save(filename, File(f), save=True)