You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

257 lines
9.3 KiB

"""
Django Management Command: Regenerate TransmitterOriginalText Slugs
This command:
1. Takes all existing TransmitterOriginalText objects
2. Extracts their title (first N words, default 8)
3. Generates smart, short, meaningful slugs
4. Handles uniqueness with counters (-1, -2, -3, etc.)
5. REPLACES all old slug values with new ones
6. Detects and numbers duplicates automatically
Usage:
python manage.py regenerate_transmitter_originaltext_slugs
python manage.py regenerate_transmitter_originaltext_slugs --max-length 75
python manage.py regenerate_transmitter_originaltext_slugs --keep-words 6
python manage.py regenerate_transmitter_originaltext_slugs --dry-run
"""
from collections import defaultdict

from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from django.utils.text import slugify

from apps.hadis.models import TransmitterOriginalText
from utils.slug import generate_smart_slug  # your existing helper
class Command(BaseCommand):
    """Regenerate the ``slug`` of every ``TransmitterOriginalText`` object.

    The command works in three steps:

    1. Derive a base slug for each object from its title (or, failing that,
       from the transmitter's name, or finally from the object id).
    2. Group objects by base slug and give every object a distinct final
       slug: the first object (lowest id) keeps the bare base slug, later
       ones get a ``-1``, ``-2``, ... suffix that is guaranteed not to clash
       with any other slug produced in the same run.
    3. Save the slugs that changed (skipped with ``--dry-run``).
    """

    help = (
        "Regenerate smart slugs for all TransmitterOriginalText objects. "
        "Replaces existing slug values with optimized, short, meaningful ones. "
        "Automatically adds counters for duplicates."
    )

    # Slug used when no sluggable text can be derived for an object.
    FALLBACK_SLUG = "transmitter-originaltext-unknown"

    # Characters of --max-length held back when building base slugs so that a
    # short counter suffix ("-1" .. "-9999") fits without extra truncation.
    COUNTER_RESERVE = 5

    def add_arguments(self, parser):
        """Register the optional ``--max-length``, ``--keep-words`` and ``--dry-run`` flags."""
        parser.add_argument(
            "--max-length",
            type=int,
            default=100,
            help="Maximum slug length (default: 100)",
        )
        parser.add_argument(
            "--keep-words",
            type=int,
            default=8,
            help="Maximum number of words to keep in slug (default: 8)",
        )
        parser.add_argument(
            "--dry-run",
            action="store_true",
            help="Show what would change without saving",
        )

    @transaction.atomic
    def handle(self, *args, **options):
        """Run the regeneration.

        Reads ``max_length``, ``keep_words`` and ``dry_run`` from *options*.

        Raises:
            CommandError: if ``max_length`` leaves no room for a base slug
                after reserving space for the counter, or if ``keep_words``
                is smaller than 1.
        """
        max_length = options["max_length"]
        keep_words = options["keep_words"]
        dry_run = options["dry_run"]

        # Reject values that would make the slicing below produce empty or
        # nonsensical slugs (a zero/negative slice bound).
        if max_length <= self.COUNTER_RESERVE:
            raise CommandError(
                f"--max-length must be greater than {self.COUNTER_RESERVE}."
            )
        if keep_words < 1:
            raise CommandError("--keep-words must be at least 1.")

        self.stdout.write(self.style.HTTP_INFO("=" * 80))
        self.stdout.write("🔄 TRANSMITTER ORIGINAL TEXT SLUG REGENERATION\n")
        self.stdout.write("Configuration:")
        self.stdout.write(f" • Max length: {max_length} chars")
        self.stdout.write(f" • Keep words: {keep_words} words")
        self.stdout.write(f" • Dry run: {'Yes (no changes)' if dry_run else 'No (will save)'}")
        self.stdout.write(self.style.HTTP_INFO("=" * 80) + "\n")

        # Ordering by id makes the "first object keeps the bare slug" rule
        # deterministic across runs.
        qs = TransmitterOriginalText.objects.all().order_by("id")
        total = qs.count()
        self.stdout.write(f"Step 1: Analyzing {total} TransmitterOriginalText objects...\n")

        slug_map = defaultdict(list)  # {base_slug: [object ids, in id order]}
        obj_slug_map = {}  # {object id: {"base_slug", "title_text", "old_slug"}}
        errors = []

        # First pass: derive a base slug per object and group by it.
        for obj in qs:
            try:
                title_text = self._source_text(obj)
                base_slug = self._generate_base_slug(
                    title_text,
                    max_length - self.COUNTER_RESERVE,  # Reserve space for counter
                    keep_words,
                )
                obj_slug_map[obj.id] = {
                    "base_slug": base_slug,
                    "title_text": title_text,
                    "old_slug": obj.slug or "(empty)",
                }
                slug_map[base_slug].append(obj.id)
            except Exception as e:
                # Best-effort per object: report, remember, and keep going.
                self.stdout.write(
                    self.style.ERROR(f" ❌ Error analyzing TransmitterOriginalText {obj.id}: {str(e)}")
                )
                errors.append(f"TransmitterOriginalText {obj.id}: {str(e)}")

        self.stdout.write("✅ Analysis complete!\n")
        self.stdout.write("Step 2: Detecting duplicates...\n")

        final_slugs = self._assign_unique_slugs(slug_map, max_length)
        duplicate_groups = {
            slug: ids for slug, ids in slug_map.items() if len(ids) > 1
        }
        if duplicate_groups:
            self.stdout.write(
                self.style.WARNING(
                    f"⚠️ Found {len(duplicate_groups)} groups with duplicate slugs:\n"
                )
            )
            for base_slug, ids in sorted(duplicate_groups.items()):
                self.stdout.write(f"'{base_slug}' → {len(ids)} objects")
                for obj_id in ids:
                    # Show the slug that will really be written, not a guess.
                    self.stdout.write(
                        f" - TransmitterOriginalText {obj_id}: {final_slugs[obj_id]}"
                    )
        else:
            self.stdout.write("✅ No duplicates found! All slugs are unique.\n")

        self.stdout.write("\nStep 3: Applying slugs with counters...\n")

        # Second pass: write the final slugs.
        updated = 0
        unchanged = 0
        for obj in qs:
            if obj.id not in final_slugs:
                continue  # analysis failed for this object; already reported
            new_slug = final_slugs[obj.id]
            old_slug = obj_slug_map[obj.id]["old_slug"]
            try:
                if obj.slug == new_slug:
                    unchanged += 1
                    continue
                self.stdout.write(
                    f" [{obj.id:5d}] {old_slug:45s} → {new_slug}"
                )
                if not dry_run:
                    # Nested atomic() creates a savepoint. Without it, a
                    # database error caught below would leave the outer
                    # transaction unusable and every later save would fail.
                    # NOTE(review): if `slug` carries a unique constraint, a
                    # row whose new slug equals another row's not-yet-updated
                    # old slug fails here and is reported - confirm whether a
                    # re-run is an acceptable way to resolve such chains.
                    with transaction.atomic():
                        obj.slug = new_slug
                        obj.save(update_fields=["slug"])
                updated += 1
            except Exception as e:
                error_msg = f"TransmitterOriginalText {obj.id}: {str(e)}"
                self.stdout.write(self.style.ERROR(f" ❌ {error_msg}"))
                errors.append(error_msg)

        self._report_summary(updated, unchanged, errors, len(duplicate_groups), dry_run)

    def _source_text(self, obj):
        """Return the text a slug should be built from for *obj*.

        Preference order:

        1. ``obj.title[0]["text"]`` when ``title`` is a non-empty list whose
           first element is a dict with a truthy ``"text"`` value.
        2. ``obj.transmitter.full_name[0]["text"]`` under the same shape rules.
        3. ``"transmitter-originaltext-<id>"``.
        """
        title = obj.title
        if isinstance(title, list) and title:
            first_item = title[0]
            if isinstance(first_item, dict):
                text = first_item.get("text")
                if text:
                    return text

        transmitter = obj.transmitter
        if transmitter:
            full_name = transmitter.full_name
            # The emptiness check avoids an IndexError on `[]`.
            if isinstance(full_name, list) and full_name:
                first_name = full_name[0]
                if isinstance(first_name, dict):
                    name = first_name.get("text")
                    if name:
                        return name

        return f"transmitter-originaltext-{obj.id}"

    def _assign_unique_slugs(self, slug_map, max_length):
        """Map every object id in *slug_map* to a distinct final slug.

        Args:
            slug_map: ``{base_slug: [object ids]}``; the order of each id list
                decides who keeps the bare slug (first) and who gets counters.
            max_length: upper bound on the length of a suffixed slug.

        Returns:
            ``{object id: final slug}``. The first id of each group keeps the
            base slug; the others receive ``-<n>`` with the smallest ``n`` not
            already taken, truncating the base so the result fits.
        """
        # Every bare base slug is owned by the first object of its group, so a
        # counter slug such as "foo-1" must not reuse another group's base.
        taken = set(slug_map)
        assigned = {}
        for base_slug, ids in slug_map.items():
            assigned[ids[0]] = base_slug
            counter = 0
            for obj_id in ids[1:]:
                while True:
                    counter += 1
                    suffix = f"-{counter}"
                    room = max(max_length - len(suffix), 0)
                    candidate = base_slug[:room].rstrip("-") + suffix
                    if candidate not in taken:
                        break
                taken.add(candidate)
                assigned[obj_id] = candidate
        return assigned

    def _report_summary(self, updated, unchanged, errors, duplicate_group_count, dry_run):
        """Print the final counts, the dry-run notice and any collected errors."""
        self.stdout.write("\n" + self.style.HTTP_INFO("=" * 80))
        self.stdout.write("📊 RESULTS\n")
        self.stdout.write(f" ✅ Updated: {updated}")
        self.stdout.write(f" ➡️ Unchanged: {unchanged}")
        self.stdout.write(f" ❌ Errors: {len(errors)}")
        self.stdout.write(
            f" 🔢 Duplicate groups handled: {duplicate_group_count}"
        )
        if dry_run:
            self.stdout.write(
                self.style.WARNING(
                    "\n ⚠️ DRY RUN MODE: No changes were saved."
                )
            )
            self.stdout.write(
                self.style.WARNING(
                    " Run without --dry-run to apply changes."
                )
            )
        if errors:
            self.stdout.write(self.style.ERROR("\n❌ ERRORS ENCOUNTERED:"))
            for error in errors:
                self.stdout.write(f" • {error}")
        self.stdout.write(self.style.HTTP_INFO("=" * 80) + "\n")
        if not dry_run and updated > 0:
            self.stdout.write(
                self.style.SUCCESS(
                    f"✅ Successfully regenerated {updated} slug(s) for TransmitterOriginalText!"
                )
            )
        elif dry_run:
            self.stdout.write(
                self.style.HTTP_INFO(
                    f"ℹ️ Would update {updated} slug(s) in production run."
                )
            )

    def _generate_base_slug(self, text: str, max_length: int, keep_words: int) -> str:
        """Build a counter-less slug from the first *keep_words* words of *text*.

        Several objects may share the returned slug; de-duplication happens
        later.

        Args:
            text: source text; anything that is not a non-empty ``str`` yields
                the fallback slug.
            max_length: maximum length of the returned slug (must be >= 1).
            keep_words: number of leading whitespace-separated words to keep.

        Returns:
            A non-empty slug of at most *max_length* characters with no
            trailing dash. Falls back to (a truncated)
            ``"transmitter-originaltext-unknown"`` when *text* is unusable or
            slugifies to nothing.

        Raises:
            ValueError: if *max_length* is smaller than 1.
        """
        if max_length < 1:
            raise ValueError("max_length must be a positive integer")

        fallback = self.FALLBACK_SLUG[:max_length].rstrip("-")
        if not text or not isinstance(text, str):
            return fallback

        # Keep only the leading words, then let Django normalise them.
        words = text.strip().split()[:keep_words]
        base_slug = slugify(" ".join(words), allow_unicode=True)

        # Truncation can leave a dangling dash; drop it.
        slug = base_slug[:max_length].rstrip("-")

        # slugify() returns "" for text made only of characters it strips
        # (e.g. punctuation); never hand back an empty slug.
        return slug or fallback