"""Shared helpers: thumbnails, slugs, media path resolution and upload endpoints."""
import logging
import mimetypes
import os
import random
import re
import secrets
import shutil
import string
from pathlib import Path
from urllib.parse import urlparse

from cachetools.func import lru_cache
from django.conf import settings
from django.contrib import admin
from django.core.files import File
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
from django.core.mail import send_mail
from django.http import HttpRequest
from django.utils.text import get_valid_filename, slugify
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers, status
from rest_framework.generics import GenericAPIView
from rest_framework.parsers import FormParser, MultiPartParser
from rest_framework.response import Response
from unidecode import unidecode

# filer (and utils.image_utils) are imported lazily inside the functions that
# need them, to avoid circular imports at module load time.

logger = logging.getLogger(__name__)

# Pattern accepted by is_valid_email().
_EMAIL_RE = re.compile(r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$')

# Maximum length of slugs produced by generate_slug_for_model().
_MAX_SLUG_LENGTH = 50


@lru_cache
def qs_thumbs():
    """Return the (cached) queryset of all filer ``ThumbnailOption`` rows."""
    from filer.models import ThumbnailOption

    return ThumbnailOption.objects.all()


def get_thumbs(obj, request: HttpRequest = None) -> dict:
    """Return a ``{'sm': url, 'md': url, 'lg': url}`` mapping for *obj*.

    All three sizes currently point at the original file URL. The URL is made
    absolute when *request* is given. An empty dict is returned when *obj* is
    falsy, has no ``url`` attribute, or resolving the URL fails.
    """
    if not obj:
        return {}
    try:
        if not hasattr(obj, 'url'):
            return {}
        original_url = obj.url
        url = request.build_absolute_uri(original_url) if request else original_url
        # Three fixed sizes, each mapped to the same original URL.
        return {size: url for size in ('sm', 'md', 'lg')}
    except Exception:
        # Best effort: a broken file reference must not break the caller.
        logger.exception("Failed to build thumbnail URLs")
        return {}


def environment_callback(request):
    """Return ``[label, color]`` describing the environment, based on ``settings.DEBUG``."""
    label = _("Development") if settings.DEBUG else _("Production")
    return [label, "primary"]


def send_email(recipient, code):
    """Send the test e-mail containing *code* and return ``True``.

    *recipient* may be a single address or an iterable of addresses; a single
    string is wrapped in a list because ``send_mail`` expects a list/tuple.
    Delivery errors propagate (``fail_silently=False``).
    """
    recipients = [recipient] if isinstance(recipient, str) else list(recipient)
    send_mail(
        'Test Email',
        f'This is a test email {code} from Django using Gmail SMTP.',
        'aliabdolahi.171@gmail.com',
        recipients,
        fail_silently=False,
    )
    return True


def is_valid_email(email):
    """Return ``True`` if *email* is a string that fully matches the e-mail pattern."""
    # fullmatch (rather than match) so a trailing newline is not accepted.
    return isinstance(email, str) and _EMAIL_RE.fullmatch(email) is not None


def _random_slug(length: int = 8) -> str:
    """Return *length* random lowercase ASCII letters (fallback slug)."""
    return ''.join(random.choice(string.ascii_lowercase) for _i in range(length))


def generate_slug_for_model(model, value: str, recycled_count: int = 0):
    """Return a slug for *value* that no row of *model* uses yet.

    The first candidate is the plain slug. On collision a numeric suffix is
    appended, starting at ``recycled_count + 1`` and incrementing until the
    candidate is free. Candidates are truncated to 50 characters *before* the
    uniqueness check, so the returned value is the one that was checked.
    """
    # Third-party ``slugify`` package; intentionally shadows Django's slugify here.
    from slugify import slugify

    base_slug = slugify(value)
    candidate = base_slug[:_MAX_SLUG_LENGTH]
    counter = recycled_count
    while model.objects.filter(slug=candidate).exists():
        counter += 1
        suffix = f"-{counter}"
        # Trim the base so the suffix always survives truncation.
        candidate = f"{base_slug[:_MAX_SLUG_LENGTH - len(suffix)]}{suffix}"
    return candidate


def generate_slugen_for_model(model, value_en, value_pk):
    """Return the English slug for an object.

    When both *value_pk* and *value_en* are truthy, *value_en* is returned
    unchanged. Otherwise a slug is derived from *value_en* and suffixed with
    ``-1``, ``-2``, ... until it is not used by *model*. If anything fails,
    eight random lowercase letters are returned instead.
    """
    try:
        if value_pk and value_en:
            return value_en
        base_slug = slugify(unidecode(value_en))
        unique_slug = base_slug
        num = 1
        while model.objects.filter(slug=unique_slug).exists():
            unique_slug = f"{base_slug}-{num}"
            num += 1
        return unique_slug
    except Exception:
        logger.exception("Failed to generate slug; falling back to a random one")
        return _random_slug()


def exclude_host_from_url(url):
    """Return the path of *url*, followed by ``?query`` when a query string is present."""
    parsed_url = urlparse(url)
    if parsed_url.query:
        return f"{parsed_url.path}?{parsed_url.query}"
    return parsed_url.path


def generate_language_slugs(translations):
    """
    Build a list of {language_code, title} where title is a slugified string
    from provided multilingual translations list.

    Expected input shape:
    - list[dict]: [{'language_code': 'fa', 'title': 'متن'}, ...]
    Fallback keys supported: code/lang/language for language, value/text for content.

    Entries that are not dicts, or that lack a language or a text, are skipped.
    Returns an empty list when *translations* is not a list or on any error.
    """
    try:
        result = []
        if not isinstance(translations, list):
            return result
        for tr in translations:
            if not isinstance(tr, dict):
                continue
            language_code = (
                tr.get('language_code') or tr.get('code') or tr.get('lang') or tr.get('language')
            )
            text = tr.get('title') or tr.get('text') or tr.get('value')
            if language_code and text:
                result.append({
                    'language_code': str(language_code),
                    'title': slugify(text, allow_unicode=True),
                })
        return result
    except Exception:
        logger.exception("Error generating slugs")
        return []


def absolute_url(req, url):
    """Return the absolute URI for *url* (a file-like object with ``.url`` or a URL string).

    Returns ``None`` when the URI cannot be built (e.g. *req* is ``None``).
    """
    try:
        return req.build_absolute_uri(url.url if hasattr(url, 'url') else url)
    except Exception:
        return None


def sizeof_fmt(num, suffix="B"):
    """Format a byte count using 1024-based units, e.g. ``'1.5 KB'``; tops out at ``T``."""
    for unit in ("", "K", "M", "G"):
        if abs(num) < 1024.0:
            return f"{num:3.1f} {unit}{suffix}"
        num /= 1024.0
    # Four divisions by 1024 have happened, so the remaining unit is tera.
    return f"{num:.1f} T{suffix}"


def _strip_host_and_query(path: str) -> str:
    """Return only the path component when *path* looks like an http(s) URL."""
    if path.startswith("http"):
        # The query string is never part of a filesystem location.
        return urlparse(path).path
    return path


def file_location_media(path: str):
    """
    Resolve a media URL/relative path to absolute filesystem path under MEDIA_ROOT.

    NOTE: the result is not checked for ``..`` segments; callers handling
    untrusted input must verify that it stays inside MEDIA_ROOT.
    """
    media_url = (getattr(settings, "MEDIA_URL", "/media/") or "/media/").rstrip("/")
    path = _strip_host_and_query(path)
    if path.startswith(media_url + "/"):
        path = path[len(media_url):]
    if path.startswith("/"):
        path = path[1:]
    return os.path.join(settings.MEDIA_ROOT, path)


def file_location(path):
    """Resolve a static URL/relative path to a filesystem path under STATIC_ROOT."""
    path = _strip_host_and_query(path)
    if path.startswith("/static"):
        path = path[7:]  # len("/static")
    if path.startswith('/'):
        path = path[1:]
    return os.path.join(settings.STATIC_ROOT, path)


def guess_file_type(filename):
    """Return the major MIME type of *filename* (e.g. ``'image'``), or ``False`` if unknown."""
    try:
        mimetype = mimetypes.guess_type(filename)[0]
    except (TypeError, ValueError, AttributeError):
        # Non path-like input.
        return False
    if not mimetype:
        return False
    return mimetype.split('/')[0]


class FileFieldSerializer(serializers.CharField):
    """
    a field to handle uploaded file
    """

    def get_rpath(self, p):
        """Return the part of *p* that follows the ``/static`` prefix."""
        # NOTE(review): when '/static/' is absent, find() returns -1 and this
        # drops the first 6 characters of p. Left unchanged because existing
        # callers may depend on it (e.g. it strips '/media') -- confirm.
        return p[p.find('/static/') + 7:]

    def to_representation(self, value):
        """Return a URL/path string for *value*, or ``None`` when there is nothing to show."""
        request = self.context.get('request', None)
        if not value:
            return None
        if isinstance(value, str):
            # Already a path/URL string.
            return value
        if hasattr(value, 'url'):
            if 'http://' in str(value) or 'https://' in str(value):
                return str(value)
            return absolute_url(request, value.url) if request else value.url
        return None

    def to_internal_value(self, data):
        """Turn submitted *data* into a stored path or an opened ``File``.

        - Empty data -> ``None``.
        - Data without ``/tmp/`` is treated as unchanged; its relative path is returned.
        - Otherwise the referenced file is opened from MEDIA_ROOT and returned as a
          ``File`` (the caller/storage layer is responsible for closing it).

        Raises ``serializers.ValidationError`` if the path escapes MEDIA_ROOT or
        does not point to an existing file.
        """
        if not data:
            return None
        if "/tmp/" not in data:
            # value not changed and here we simply return old file path
            return self.get_rpath(data)

        fpath = file_location_media(data)

        # data comes from the client: refuse anything that resolves outside
        # MEDIA_ROOT (e.g. via '..' segments or a doubled leading slash).
        media_root = Path(settings.MEDIA_ROOT).resolve()
        try:
            Path(fpath).resolve().relative_to(media_root)
        except ValueError:
            raise serializers.ValidationError("Invalid file path") from None

        if not os.path.isfile(fpath):
            raise serializers.ValidationError(f"File: '{fpath}' Does not exist")
        return File(open(fpath, "rb"), os.path.basename(data))


def _build_thumbnail_url(request, full_path: str, media_root: Path):
    """Create a preview thumbnail for an image/video and return its absolute URL, else ``None``."""
    from utils.image_utils import (
        create_thumbnail,
        extract_video_thumbnail,
        is_image_file,
        is_video_file,
    )

    if is_image_file(full_path):
        kind = "image"
    elif is_video_file(full_path):
        kind = "video"
    else:
        return None

    try:
        if kind == "image":
            thumbnail_path = create_thumbnail(full_path, size=(200, 200), quality=60)
        else:
            thumbnail_path = extract_video_thumbnail(
                full_path,
                time_offset="00:00:01",
                size=(200, 200),
                quality=60,
            )
        thumb_rel = str(thumbnail_path).replace(str(media_root), "").lstrip("\\/")
        return absolute_url(
            request,
            f"{settings.MEDIA_URL.rstrip('/')}/{thumb_rel.replace(os.sep, '/')}",
        )
    except Exception:
        # Thumbnails are optional; the upload itself has already succeeded.
        logger.exception("Failed to generate %s thumbnail", kind)
        return None


class _BaseUploadSerializer(serializers.Serializer):
    """Common fields and storage logic for the upload serializers below."""

    # Directory (relative to MEDIA_ROOT) where uploads are stored.
    upload_dir = Path("tmp")
    # Whether to chmod the stored file to 0o644 after saving.
    chmod_after_save = False

    file = serializers.FileField()
    url = serializers.URLField(read_only=True)
    name = serializers.CharField(read_only=True)
    size = serializers.CharField(read_only=True)
    mime_type = serializers.CharField(read_only=True)
    thumbnail_url = serializers.URLField(read_only=True, required=False)

    def to_representation(self, instance):
        """Serialize *instance*, exposing the stored relative path under ``file``."""
        data = super().to_representation(instance)
        data['file'] = instance['file']
        return data

    def store_file(self, file):
        """Save *file* under ``upload_dir`` and return its details.

        Returns a dict with ``file`` (path relative to MEDIA_ROOT, with a leading
        slash), ``url``, ``name``, ``size``, ``mime_type`` and ``thumbnail_url``
        (``None`` unless a thumbnail could be generated).
        """
        request = self.context["request"]
        media_root = Path(settings.MEDIA_ROOT)
        (media_root / self.upload_dir).mkdir(parents=True, exist_ok=True)

        safe_name = get_valid_filename(os.path.basename(file.name))
        requested = self.upload_dir / f"{secrets.token_urlsafe(4)}-{safe_name}"

        # Hand the upload straight to the storage backend (it copies in chunks),
        # and keep the name it actually used -- it may differ from the request.
        rel_path = Path(default_storage.save(requested.as_posix(), file))
        full_path = media_root / rel_path

        if self.chmod_after_save:
            # Optional: only meaningful on a local filesystem that supports it.
            try:
                os.chmod(full_path, 0o644)
            except OSError:
                pass

        return {
            "file": f"/{rel_path.as_posix()}",
            "url": absolute_url(
                request,
                f"{settings.MEDIA_URL.rstrip('/')}/{rel_path.as_posix()}",
            ),
            "name": safe_name,
            "size": sizeof_fmt(file.size),
            "mime_type": guess_file_type(rel_path.name),
            "thumbnail_url": _build_thumbnail_url(request, str(full_path), media_root),
        }

    def validate(self, attrs):
        """Store the uploaded file and use its details as the validated data."""
        return self.store_file(attrs['file'])


class UploadChatMediaSerializer(_BaseUploadSerializer):
    """Upload files permanently to /media/chat/"""

    upload_dir = Path("chat") / "uploads"
    chmod_after_save = True


class UploadTmpSerializer(_BaseUploadSerializer):
    """Upload files to the ``tmp`` directory under MEDIA_ROOT."""

    upload_dir = Path("tmp")


class UploadChatMedia(GenericAPIView):
    """
    Upload files permanently to /media/chat/
    Files are stored permanently and will NOT be removed
    """
    parser_classes = (FormParser, MultiPartParser)
    serializer_class = UploadChatMediaSerializer

    def post(self, request: HttpRequest, *args, **kwargs):
        """Validate and store the uploaded file, then return its details."""
        serializer = UploadChatMediaSerializer(data=request.FILES, context={'request': request})
        # raise_exception=True makes DRF answer 400 with the errors on failure.
        serializer.is_valid(raise_exception=True)
        return Response(serializer.data)


class UploadTmpMedia(GenericAPIView):
    """
    Upload files temporarily to /media/tmp/
    Files are expected to be purged periodically (originally documented as
    hourly) by a job that lives outside this module.
    """
    parser_classes = (FormParser, MultiPartParser)
    serializer_class = UploadTmpSerializer

    def post(self, request: HttpRequest, *args, **kwargs):
        """Validate and store the uploaded file, then return its details."""
        serializer = UploadTmpSerializer(data=request.FILES, context={'request': request})
        # raise_exception=True makes DRF answer 400 with the errors on failure.
        serializer.is_valid(raise_exception=True)
        return Response(serializer.data)


# Configure filer admin after Django is fully loaded
def configure_filer_admin():
    """Make ``owner`` read-only in filer's FileAdmin and set the filer app icon.

    Safe to call more than once; does nothing when filer is not installed.
    """
    try:
        from filer.admin.fileadmin import FileAdmin
        from filer.apps import FilerConfig
    except ImportError:
        return

    if 'owner' not in FileAdmin.readonly_fields:
        FileAdmin.readonly_fields += ('owner',)
    FilerConfig.icon = 'icon-folder'

# This will be executed when this module is imported after Django is fully loaded