import os
import json
import csv
import re

from django.core.management.base import BaseCommand
from django.core.files import File
from django.db import transaction
from django.conf import settings

from apps.hadis.models import (
    HadisCategory, HadisSect, HadisStatus, HadisTag, Hadis, HadisCorrection,
    HadisReference, ReferenceImage, HadisTransmitter,
    BookReference, BookReferenceImage, BookReferenceDocument, BookAuthor, BookSubjectArea,
    Transmitters, NarratorLayer, TransmitterReliability, OpinionStatus, TransmitterOpinion,
    TransmitterOriginalText, OriginalTextReference, OriginalTextReferenceImage,
    HadisInterpretation, InterpretationReference, InterpretationReferenceImage,
    CorrectionReference, CorrectionReferenceImage
)


class Command(BaseCommand):
    """Import the legacy Hadith dataset from a directory on disk.

    The directory given as ``base_dir`` is read for:

    * ``AUT_UI.csv``     - scholar id, Arabic name, Russian name
    * ``bib.csv``        - one row per book
    * ``narrators.json`` - ``{"narrators": [...]}``
    * ``tathir.json``    - ``{"materials": [...]}`` (hadiths, corrections,
      interpretations)
    * ``books/<id>/``, ``screens/<id>/<folder>/`` and
      ``screens_trx/<id>/<folder>/`` - media files

    The whole run is wrapped in a single database transaction.
    """

    help = 'Import legacy Hadith data from JSON, CSV, and Media folders'

    # File suffixes (lower case) that are imported as images.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.gif')

    def add_arguments(self, parser):
        """Register the single positional ``base_dir`` argument."""
        parser.add_argument(
            'base_dir',
            type=str,
            help='Absolute path to the "тестовая база данных" directory',
        )

    def wrap_lang(self, text, lang="ru"):
        """Return ``text`` as a one-element list of language/text dicts.

        ``None`` becomes an empty string; everything else is converted with
        ``str`` and stripped.
        """
        if text is None:
            text = ""
        return [{"language_code": lang, "text": str(text).strip()}]

    # ------------------------------------------------------------------
    # Small value helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _as_text(value):
        """Return ``str(value)``, or ``''`` for ``None``.

        Plain ``str(None)`` would store the literal text ``"None"``.
        """
        return '' if value is None else str(value)

    @staticmethod
    def _to_int(value, default=None):
        """Return ``value`` as an int when its text form is all digits, else ``default``."""
        text = str(value)
        if not text.isdigit():
            return default
        try:
            return int(text)
        except ValueError:
            # str.isdigit() accepts characters such as superscript digits
            # that int() rejects.
            return default

    @staticmethod
    def _canonical_volume(text):
        """Return the volume label used inside ``legacy_id`` values.

        Labels that parse as an integer are rewritten without leading zeros
        ("03" -> "3"); any other label is returned unchanged.
        """
        try:
            return str(int(text))
        except ValueError:
            return text

    @staticmethod
    def _first(values):
        """Return the first element of ``values``, or ``''`` when it is empty or ``None``."""
        return values[0] if values else ''

    def _load_json_list(self, path, key):
        """Return the list stored under ``key`` in the JSON file at ``path``.

        Returns an empty list when the file does not exist or the key is
        missing or null.
        """
        if not os.path.exists(path):
            return []
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f).get(key) or []

    def _edition_defaults(self, ed):
        """Build the reference fields shared by hadith, correction and interpretation editions."""
        return {
            'hadith_number': self._as_text(ed.get('hadith_number')),
            'volume': self._as_text(ed.get('volume')),
            'pages': self._as_text(ed.get('pages')),
            'url': ed.get('url') or '',
        }

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------

    @transaction.atomic
    def handle(self, *args, **kwargs):
        """Run the import: pre-scan, cleanup, books, narrators, then hadith materials."""
        base_dir = kwargs['base_dir']
        if not os.path.exists(base_dir):
            self.stderr.write(self.style.ERROR(f'Directory not found: {base_dir}'))
            return

        self.stdout.write(self.style.SUCCESS(f'Starting import from: {base_dir}'))

        # Paths
        aut_ui_path = os.path.join(base_dir, 'AUT_UI.csv')
        bib_path = os.path.join(base_dir, 'bib.csv')
        narrators_path = os.path.join(base_dir, 'narrators.json')
        tathir_path = os.path.join(base_dir, 'tathir.json')

        # --- PRE-SCAN TATHIR.JSON FOR CITED VOLUMES ---
        # The file is parsed once and reused in phase 3.
        materials = self._load_json_list(tathir_path, 'materials')
        cited_book_volumes = self._collect_cited_volumes(materials)

        # --- PRE-FLIGHT CLEANUP ---
        self.stdout.write(self.style.WARNING('\n--- PRE-FLIGHT: Cleaning up old legacy books ---'))
        BookReference.objects.exclude(legacy_id__isnull=True).exclude(legacy_id__exact='').delete()

        # --- PHASE 1: SCHOLARS & BOOKS ---
        self.stdout.write(self.style.WARNING('\n--- PHASE 1: Loading Scholars & Books ---'))
        scholars_map = self._load_scholars(aut_ui_path)
        self._import_books(base_dir, bib_path, cited_book_volumes)
        self.stdout.write(self.style.SUCCESS('Books loaded successfully.'))

        # --- PHASE 2: NARRATORS ---
        self.stdout.write(self.style.WARNING('\n--- PHASE 2: Loading Narrators ---'))
        self._import_narrators(base_dir, narrators_path, scholars_map)
        self.stdout.write(self.style.SUCCESS('Narrators loaded successfully.'))

        # --- PHASE 3: HADITHS ---
        self.stdout.write(self.style.WARNING('\n--- PHASE 3: Loading Hadiths ---'))
        self._import_materials(base_dir, materials)
        self.stdout.write(self.style.SUCCESS('\nAll Hadiths, Corrections, and Interpretations Imported Successfully!'))

    # ------------------------------------------------------------------
    # Pre-scan
    # ------------------------------------------------------------------

    def _collect_cited_volumes(self, materials):
        """Map each cited book id to the set of volume labels cited for it.

        Book ids are keyed as stripped strings so they compare equal to the
        ids read from ``bib.csv`` even when the JSON stores them as numbers.
        A book cited without a volume still gets an (empty) entry.
        """
        cited = {}
        for item in materials:
            for ed in item.get('editions') or []:
                b_id = ed.get('book_id')
                if not b_id:
                    continue
                volumes = cited.setdefault(str(b_id).strip(), set())

                raw_vol = ed.get('volume')
                b_vol = '' if raw_vol is None else str(raw_vol).strip()
                if b_vol.lower() == 'none':
                    b_vol = ''
                if b_vol:
                    volumes.add(self._canonical_volume(b_vol))
        return cited

    # ------------------------------------------------------------------
    # Phase 1: scholars and books
    # ------------------------------------------------------------------

    def _load_scholars(self, aut_ui_path):
        """Read ``AUT_UI.csv`` into ``{id: {"ar": ..., "ru": ...}}``; rows with fewer than 3 cells are skipped."""
        scholars = {}
        if not os.path.exists(aut_ui_path):
            return scholars
        # newline='' is what the csv module expects for files it reads.
        with open(aut_ui_path, 'r', encoding='utf-8', newline='') as f:
            for row in csv.reader(f):
                if len(row) >= 3:
                    scholars[row[0].strip()] = {"ar": row[1].strip(), "ru": row[2].strip()}
        return scholars

    def _import_books(self, base_dir, bib_path, cited_book_volumes):
        """Create book records for every row of ``bib.csv`` that has at least 5 cells."""
        if not os.path.exists(bib_path):
            return
        with open(bib_path, 'r', encoding='utf-8', newline='') as f:
            for row in csv.reader(f):
                if len(row) < 5:
                    continue
                self._import_book_row(base_dir, row, cited_book_volumes)

    def _import_book_row(self, base_dir, row, cited_book_volumes):
        """Create one ``BookReference`` per volume for a single ``bib.csv`` row.

        Volumes are the union of the sub-folders found under
        ``books/<id>/`` and the volumes cited in ``tathir.json``. When there
        are none, a single volume-less book is created.
        """
        def cell(index):
            # Stripped cell text, or '' when the row is too short.
            return row[index].strip() if len(row) > index else ''

        base_legacy_id = cell(0)
        author_name = cell(2)
        base_title = cell(3)
        total_vols_int = self._to_int(cell(10), default=1)
        order = self._to_int(row[1], default=0) if len(row) > 1 else 0

        book_folder = os.path.join(base_dir, 'books', base_legacy_id)
        has_folder = os.path.isdir(book_folder)

        existing_vols = set()
        if has_folder:
            for entry in os.listdir(book_folder):
                if os.path.isdir(os.path.join(book_folder, entry)):
                    existing_vols.add(self._canonical_volume(entry))

        volumes_to_create = existing_vols | cited_book_volumes.get(base_legacy_id, set())
        if not volumes_to_create:
            volumes_to_create = {''}

        # Sorted so the creation order does not depend on set iteration order.
        for v in sorted(volumes_to_create):
            legacy_id = f"{base_legacy_id}-v{v}" if v else base_legacy_id
            title_text = f"{base_title} (Vol {v})" if v else base_title

            book, _ = BookReference.objects.update_or_create(
                legacy_id=legacy_id,
                defaults={
                    'title': self.wrap_lang(title_text),
                    'order': order,
                    'researcher': self.wrap_lang(cell(4)),
                    'publisher': self.wrap_lang(cell(5)),
                    'city_of_publication': self.wrap_lang(cell(6)),
                    'country_of_publication': self.wrap_lang(cell(7)),
                    'edition_number': cell(8),
                    'year_of_publication': cell(9),
                    'number_of_volumes': total_vols_int,
                    'volume': v,
                    'source_url': cell(11),
                    'description': self.wrap_lang(cell(12)),
                    'language': self.wrap_lang(''),
                },
            )

            if author_name:
                author, _ = BookAuthor.objects.get_or_create(name=self.wrap_lang(author_name))
                book.authors.add(author)

            # Map Book Tags (comma-separated list in column 13)
            for raw_tag in cell(13).split(','):
                tag_text = raw_tag.strip()
                if tag_text:
                    btag, _ = BookSubjectArea.objects.get_or_create(title=self.wrap_lang(tag_text))
                    book.subject_area.add(btag)

            # Attach Media
            if has_folder:
                self._attach_book_media(book, book_folder, v)

    def _attach_book_media(self, book, book_folder, v):
        """Attach the PDFs and images under ``book_folder`` that belong to volume ``v``.

        For a volume book: PDFs named ``<v>.pdf`` or ``<v zero-padded to 2>.pdf``
        anywhere in the tree, and images inside a sub-folder whose name equals
        ``v`` ignoring leading zeros. For a volume-less book: files directly
        in ``book_folder``, excluding PDFs whose name starts with a digit.
        """
        vol_lower = v.lower()
        pdf_names = (f"{vol_lower}.pdf", f"{vol_lower.zfill(2)}.pdf")

        for root, _dirs, files in os.walk(book_folder):
            folder_name = os.path.basename(root)
            is_root = (root == book_folder)

            for file_name in files:
                lowered = file_name.lower()
                file_path = os.path.join(root, file_name)

                if lowered.endswith('.pdf'):
                    if v:
                        wanted = lowered in pdf_names
                    else:
                        wanted = is_root and not lowered[0].isdigit()
                    if wanted:
                        with open(file_path, 'rb') as doc_f:
                            doc = BookReferenceDocument(book_reference=book, volume=v, title=file_name)
                            doc.file.save(file_name, File(doc_f), save=True)

                elif lowered.endswith(self.IMAGE_EXTENSIONS):
                    if v:
                        wanted = (not is_root) and folder_name.lstrip('0') == v.lstrip('0')
                    else:
                        wanted = is_root
                    if wanted:
                        with open(file_path, 'rb') as img_f:
                            img = BookReferenceImage(book_reference=book, volume=v)
                            img.image.save(file_name, File(img_f), save=True)

    # ------------------------------------------------------------------
    # Phase 2: narrators
    # ------------------------------------------------------------------

    def _import_narrators(self, base_dir, narrators_path, scholars_map):
        """Import every entry of the ``narrators`` list in ``narrators.json``."""
        for n_data in self._load_json_list(narrators_path, 'narrators'):
            self._import_narrator(base_dir, n_data, scholars_map)

    def _import_narrator(self, base_dir, n_data, scholars_map):
        """Create or update one transmitter with its opinions, excerpts and excerpt references."""
        legacy_id = n_data.get('id')
        legacy_number = self._to_int(n_data.get('narrator_number'))
        info = n_data.get('info') or {}
        ar_info = info.get('arabic') or {}

        def bilingual(key):
            # Russian value from `info` followed by the Arabic value from `info['arabic']`.
            return self.wrap_lang(info.get(key, ''), 'ru') + self.wrap_lang(ar_info.get(key, ''), 'ar')

        reliability, _ = TransmitterReliability.objects.get_or_create(
            title=self.wrap_lang(n_data.get('reliability', 'Unknown'))
        )

        generation = self._to_int(n_data.get('generation'))
        if generation:
            NarratorLayer.objects.get_or_create(
                number=generation,
                defaults={'name': self.wrap_lang(f'Layer {generation}'), 'description': self.wrap_lang('')},
            )

        # Safe Age Extraction: first run of digits in the age text, if any
        age_nums = re.findall(r'\d+', str(info.get('age', '')))
        age_val = int(age_nums[0]) if age_nums else None

        # Madhhab Translation: only the first list entry is inspected
        madhhab_list = n_data.get('madhab') or []
        madhhab_val = Transmitters.MadhhabChoices.UNKNOWN
        if madhhab_list:
            m_str = str(madhhab_list[0]).lower()
            if 'шиит' in m_str:
                madhhab_val = Transmitters.MadhhabChoices.SHIA
            elif 'суннит' in m_str:
                madhhab_val = Transmitters.MadhhabChoices.SUNNI
            else:
                madhhab_val = Transmitters.MadhhabChoices.OTHER

        transmitter, _ = Transmitters.objects.update_or_create(
            legacy_id=legacy_id,
            defaults={
                'legacy_number': legacy_number,
                'full_name': bilingual('name'),
                'known_as': bilingual('known_name'),
                'kunya': bilingual('kunya'),
                'nickname': bilingual('nickname'),
                'origin': bilingual('origin'),
                'lived_in': bilingual('city_of_residence'),
                'died_in': bilingual('city_of_death'),
                'description': self.wrap_lang(''),
                'generation': generation,
                'reliability': reliability,
                'in_sahih_bukhari': n_data.get('transmitted_to_bukhari', False),
                'in_sahih_muslim': n_data.get('transmitted_to_muslim', False),
                'relatives_raw': info.get('relatives', {}),
                'freed_slave_of': bilingual('freed_slave_of'),
                'occupation': bilingual('occupation'),
                'features': bilingual('features'),
                'birth_year_hijri': self._as_text(info.get('birth_year')),
                'death_year_hijri': self._as_text(info.get('death_year')),
                'age_at_death': age_val,
                'tags': n_data.get('tags', []),
                'madhhab': madhhab_val,
            },
        )

        reviews = (n_data.get('strengthened_weakened') or {}).get('review') or []
        for op in reviews:
            author_ui = op.get('author_ui')
            # Unknown scholar ids fall back to the raw id for both languages.
            scholar_data = scholars_map.get(author_ui, {"ar": author_ui, "ru": author_ui})
            TransmitterOpinion.objects.get_or_create(
                transmitter=transmitter,
                opinion_text=(
                    self.wrap_lang(op.get('quote_original', ''), 'ar')
                    + self.wrap_lang(op.get('quote_translated', ''), 'ru')
                ),
                scholar_name=self.wrap_lang(scholar_data['ar'], 'ar') + self.wrap_lang(scholar_data['ru'], 'ru'),
            )

        for text_data in n_data.get('excerpts') or []:
            orig_text, _ = TransmitterOriginalText.objects.get_or_create(
                transmitter=transmitter,
                title=self.wrap_lang(text_data.get('title')),
                text=self.wrap_lang(text_data.get('text'), 'ar'),
                translation=self.wrap_lang(text_data.get('translation'), 'ru'),
            )

            for ed in text_data.get('editions') or []:
                book_ref = self._get_book_volume(ed.get('book_id'), ed.get('volume'))
                ref_obj, _ = OriginalTextReference.objects.update_or_create(
                    original_text=orig_text,
                    book_reference=book_ref,
                    defaults={
                        'volume': self._as_text(ed.get('volume')),
                        # The JSON key is 'pages'; the model field is 'page'.
                        'page': self._as_text(ed.get('pages')),
                        'url': ed.get('url') or '',
                    },
                )

                folder = ed.get('screenshots_folder')
                if folder:
                    # str() keeps os.path.join from raising on numeric ids.
                    self._attach_images(
                        os.path.join(base_dir, 'screens_trx', str(legacy_id), str(folder)),
                        OriginalTextReferenceImage,
                        ref_obj,
                    )

    # ------------------------------------------------------------------
    # Phase 3: hadiths, corrections, interpretations
    # ------------------------------------------------------------------

    def _import_materials(self, base_dir, materials):
        """Import every entry of ``materials`` according to its ``type``.

        The default 'sunni' sect is ensured even when ``materials`` is empty.
        """
        default_sect, _ = HadisSect.objects.get_or_create(
            sect_type='sunni',
            defaults={'title': self.wrap_lang('Sunni'), 'description': self.wrap_lang('')},
        )

        # Each hadith lists the ids of its corrections under 'confirmation';
        # invert that into correction id -> hadith id.
        correction_to_hadith_map = {}
        for item in materials:
            if item.get('type') == 'arguments':
                for conf_id in item.get('confirmation') or []:
                    correction_to_hadith_map[conf_id] = item.get('id')

        for item in materials:
            i_type = item.get('type')
            if i_type == 'arguments':
                self._import_hadith(base_dir, item, default_sect)
            elif i_type == 'authenticity_analysis':
                self._import_correction(base_dir, item, correction_to_hadith_map)
            elif i_type == 'interpretation':
                self._import_interpretation(base_dir, item)

    def _import_hadith(self, base_dir, item, default_sect):
        """A: BASE HADITHS - create or update one hadith with tags, chains and references."""
        # Guarded the same way as the interpretation branch: an empty or
        # missing category list yields '' instead of raising.
        cat_str = self._first(item.get('category'))
        category, _ = HadisCategory.objects.get_or_create(
            title=self.wrap_lang(cat_str),
            defaults={
                'sect': default_sect,
                'source_type': item.get('subtype', 'hadith') or 'hadith',
                'description': self.wrap_lang(''),
            },
        )

        status, _ = HadisStatus.objects.get_or_create(
            title=self.wrap_lang(item.get('authenticity', '')),
            defaults={'description': self.wrap_lang('')},
        )

        first_alias = self._first(item.get('aliases'))
        hadis, _ = Hadis.objects.update_or_create(
            legacy_id=item.get('id'),
            defaults={
                'category': category,
                'hadis_status': status,
                'title': self.wrap_lang(first_alias),
                'title_narrator': self.wrap_lang(first_alias),
                'description': self.wrap_lang(''),
                'explanation': self.wrap_lang(''),
                'address': self.wrap_lang(''),
                'hadis_status_text': self.wrap_lang(''),
                'text': item.get('original_text', ''),
                'translation': self.wrap_lang(item.get('translation', ''), 'ru'),
            },
        )

        # Map Hadith Tags
        hadis.tags.clear()
        for tag_str in item.get('tags') or []:
            htag, _ = HadisTag.objects.get_or_create(title=self.wrap_lang(tag_str))
            hadis.tags.add(htag)

        # 'chain' is either one flat list of narrator numbers or a list of such lists.
        raw_chain = item.get('chain') or []
        chain_arrays = []
        if raw_chain:
            chain_arrays = [raw_chain] if isinstance(raw_chain[0], int) else raw_chain

        for chain_idx, narrator_ids in enumerate(chain_arrays):
            for order_idx, n_id in enumerate(narrator_ids):
                transmitter = Transmitters.objects.filter(legacy_number=n_id).first()
                if not transmitter:
                    continue
                layer = NarratorLayer.objects.filter(number=transmitter.generation).first()
                HadisTransmitter.objects.get_or_create(
                    hadis=hadis,
                    transmitter=transmitter,
                    chain_index=chain_idx,
                    order=order_idx,
                    defaults={'narrator_layer': layer, 'status': transmitter.reliability},
                )

        for ed in item.get('editions') or []:
            book = self._get_book_volume(ed.get('book_id'), ed.get('volume'))
            defaults = self._edition_defaults(ed)
            defaults['description'] = self.wrap_lang('')
            href, _ = HadisReference.objects.update_or_create(
                hadis=hadis, book_reference=book, defaults=defaults
            )

            folder = ed.get('screenshots_folder')
            if folder:
                self._attach_images(
                    os.path.join(base_dir, 'screens', str(item.get('id')), str(folder)),
                    ReferenceImage,
                    href,
                    field_name='thumbnail',
                )

    def _import_correction(self, base_dir, item, correction_to_hadith_map):
        """B: CORRECTIONS - attach one correction to its parent hadith; skipped when no parent is found."""
        parent_id = correction_to_hadith_map.get(item.get('id'))
        parent_hadith = Hadis.objects.filter(legacy_id=parent_id).first()
        if not parent_hadith:
            return

        corr, _ = HadisCorrection.objects.update_or_create(
            hadis=parent_hadith,
            legacy_id=item.get('id'),
            defaults={
                'title': self.wrap_lang(''),
                'text': item.get('original_text', ''),
                'translation': self.wrap_lang(item.get('translation', ''), 'ru'),
            },
        )

        for ed in item.get('editions') or []:
            book = self._get_book_volume(ed.get('book_id'), ed.get('volume'))
            cref, _ = CorrectionReference.objects.update_or_create(
                correction=corr, book_reference=book, defaults=self._edition_defaults(ed)
            )

            folder = ed.get('screenshots_folder')
            if folder:
                self._attach_images(
                    os.path.join(base_dir, 'screens', str(item.get('id')), str(folder)),
                    CorrectionReferenceImage,
                    cref,
                )

    def _import_interpretation(self, base_dir, item):
        """C: INTERPRETATIONS - attach one interpretation to an existing category; skipped when none matches."""
        cat_str = self._first(item.get('category'))
        category = HadisCategory.objects.filter(title__contains=[{'text': cat_str}]).first()
        if not category:
            return

        interp, _ = HadisInterpretation.objects.update_or_create(
            category=category,
            legacy_id=item.get('id'),
            defaults={
                'title': self.wrap_lang(''),
                'text': item.get('original_text', ''),
                'translation': self.wrap_lang(item.get('translation', ''), 'ru'),
            },
        )

        for ed in item.get('editions') or []:
            book = self._get_book_volume(ed.get('book_id'), ed.get('volume'))
            iref, _ = InterpretationReference.objects.update_or_create(
                interpretation=interp, book_reference=book, defaults=self._edition_defaults(ed)
            )

            folder = ed.get('screenshots_folder')
            if folder:
                self._attach_images(
                    os.path.join(base_dir, 'screens', str(item.get('id')), str(folder)),
                    InterpretationReferenceImage,
                    iref,
                )

    # ------------------------------------------------------------------
    # Shared lookups
    # ------------------------------------------------------------------

    def _get_book_volume(self, book_id, volume_str):
        """Finds the specific volume of a book, with fallbacks.

        Lookup order:

        1. ``"<book_id>-v<volume>"`` for each spelling of the volume: the
           canonical label used at import time, the digits only, and the
           digits without leading zeros.
        2. The volume-less book whose ``legacy_id`` equals ``book_id``.
        3. The first book whose ``legacy_id`` starts with ``"<book_id>-v"``.

        The prefix in step 3 includes the ``-v`` separator so that book "1"
        cannot resolve to book "10".

        Returns ``None`` when ``book_id`` is empty or nothing matches.
        """
        if not book_id:
            return None
        book_key = str(book_id).strip()

        if volume_str:
            raw = str(volume_str).strip()
            digits = ''.join(filter(str.isdigit, raw))

            candidates = []
            for label in (self._canonical_volume(raw), digits, digits.lstrip('0')):
                if label and label not in candidates:
                    candidates.append(label)

            for label in candidates:
                book = BookReference.objects.filter(legacy_id=f"{book_key}-v{label}").first()
                if book:
                    return book

        exact = BookReference.objects.filter(legacy_id=book_key).first()
        if exact:
            return exact
        return BookReference.objects.filter(legacy_id__startswith=f"{book_key}-v").first()

    def _attach_images(self, folder_path, ImageModelClass, reference_instance, field_name='image'):
        """Helper to safely scan a folder and attach images to a specific reference instance.

        Entries are visited in sorted name order. ``priority`` is the entry's
        index in that sorted listing, so non-image entries leave gaps in the
        numbering. Does nothing when ``folder_path`` is not a directory.

        NOTE(review): a new image row is saved on every call; re-running the
        import against references that already have images presumably
        duplicates them - confirm whether a cleanup step is needed.
        """
        if not os.path.isdir(folder_path):
            return
        for i, filename in enumerate(sorted(os.listdir(folder_path))):
            if not filename.lower().endswith(self.IMAGE_EXTENSIONS):
                continue
            file_path = os.path.join(folder_path, filename)
            with open(file_path, 'rb') as f:
                img_obj = ImageModelClass(reference=reference_instance, priority=i)
                image_field = getattr(img_obj, field_name)
                image_field.save(filename, File(f), save=True)