You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
577 lines
19 KiB
577 lines
19 KiB
import os
|
|
import secrets
|
|
import shutil
|
|
import mimetypes
|
|
import re
|
|
from urllib.parse import urlparse
|
|
|
|
from django.core.files.storage import default_storage
|
|
from django.core.files.base import ContentFile
|
|
|
|
from pathlib import Path
|
|
from django.utils.text import get_valid_filename
|
|
from django.conf import settings
|
|
from django.core.files import File
|
|
from django.http import HttpRequest
|
|
from django.core.mail import send_mail
|
|
|
|
from rest_framework import serializers, status
|
|
from rest_framework.generics import GenericAPIView
|
|
from rest_framework.parsers import MultiPartParser, FormParser
|
|
from rest_framework.response import Response
|
|
from unidecode import unidecode
|
|
from django.utils.text import slugify
|
|
import random
|
|
import string
|
|
|
|
from django.conf import settings
|
|
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
from cachetools.func import lru_cache
|
|
from django.http import HttpRequest
|
|
from django.contrib import admin
|
|
|
|
# Moved filer imports to avoid circular imports
|
|
# These will be imported when needed in functions
|
|
|
|
@lru_cache
|
|
def qs_thumbs():
|
|
from filer.models import ThumbnailOption
|
|
return ThumbnailOption.objects.all()
|
|
|
|
|
|
def get_thumbs(obj, request: HttpRequest = None) -> dict:
|
|
# print(f'----> {obj}')
|
|
if not obj:
|
|
return {}
|
|
|
|
try:
|
|
# تعریف سه سایز ثابت
|
|
sizes = ['sm', 'md', 'lg']
|
|
thumbnail_object = {}
|
|
|
|
# گرفتن URL اصلی تصویر
|
|
if hasattr(obj, 'url'):
|
|
original_url = obj.url
|
|
else:
|
|
return {}
|
|
|
|
# برای هر سه سایز، همان URL اصلی را برگردان
|
|
for size in sizes:
|
|
if request:
|
|
url = request.build_absolute_uri(original_url)
|
|
else:
|
|
url = original_url
|
|
|
|
thumbnail_object[size] = url
|
|
|
|
return thumbnail_object
|
|
|
|
except Exception as p:
|
|
print(p)
|
|
return {}
|
|
|
|
|
|
def environment_callback(request):
|
|
if settings.DEBUG:
|
|
return [_("Development"), "primary"]
|
|
|
|
return [_("Production"), "primary"]
|
|
|
|
|
|
|
|
def send_email(recipient, code):
|
|
send_mail(
|
|
'Test Email',
|
|
f'This is a test email {code} from Django using Gmail SMTP.',
|
|
'aliabdolahi.171@gmail.com',
|
|
recipient,
|
|
fail_silently=False,
|
|
)
|
|
return True
|
|
|
|
|
|
def is_valid_email(email):
|
|
# تعریف الگوی regex برای یک ایمیل معتبر
|
|
email_regex = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'
|
|
|
|
# بررسی اینکه آیا ایمیل با regex مطابقت دارد یا خیر
|
|
if re.match(email_regex, email):
|
|
return True
|
|
return False
|
|
|
|
|
|
def generate_slug_for_model(model, value: str, recycled_count: int = 0):
|
|
from slugify import slugify
|
|
try:
|
|
|
|
base_slug = slugify(unidecode(value))
|
|
slug = base_slug
|
|
if recycled_count > 0:
|
|
slug = f"{base_slug}-{recycled_count}"
|
|
|
|
if model.objects.filter(slug=slug).exists():
|
|
return generate_slug_for_model(model, value, recycled_count + 1)
|
|
|
|
return slug[:50]
|
|
except Exception as exp:
|
|
letters = string.ascii_lowercase
|
|
result_str = ''.join(random.choice(letters) for i in range(8))
|
|
return result_str
|
|
|
|
|
|
def generate_slugen_for_model(model, value_en, value_pk):
|
|
try:
|
|
unique_slug = value_en
|
|
if not value_pk or not value_en:
|
|
base_slug = slugify(unidecode(value_en))
|
|
unique_slug = base_slug
|
|
num = 1
|
|
while model.objects.filter(slug=unique_slug).exists():
|
|
unique_slug = f"{base_slug}-{num}"
|
|
num += 1
|
|
|
|
return unique_slug
|
|
except Exception as exp:
|
|
letters = string.ascii_lowercase
|
|
result_str = ''.join(random.choice(letters) for i in range(8))
|
|
return result_str
|
|
|
|
|
|
def exclude_host_from_url(url):
|
|
# Parse the URL
|
|
parsed_url = urlparse(url)
|
|
|
|
# Extract the path and query parameters
|
|
path_with_query = parsed_url.path + parsed_url.query
|
|
|
|
return path_with_query
|
|
|
|
|
|
def generate_slug_for_model(model, value: str, recycled_count: int = 0):
|
|
from slugify import slugify
|
|
|
|
slug = slugify(value)
|
|
if model.objects.filter(slug=slug).exists():
|
|
recycled_count += 1
|
|
if value.endswith(f'-{recycled_count - 1}'):
|
|
value = value.replace(f'-{recycled_count - 1}', f'-{recycled_count}')
|
|
else:
|
|
value = f"{value}-{recycled_count}"
|
|
return generate_slug_for_model(model, value, recycled_count)
|
|
|
|
return slug[:50]
|
|
|
|
|
|
def generate_language_slugs(translations):
|
|
"""
|
|
Build a list of {language_code, title} where title is a slugified string
|
|
from provided multilingual translations list.
|
|
Expected input shape:
|
|
- list[dict]: [{'language_code': 'fa', 'title': 'متن'}, ...]
|
|
Fallback keys supported: code/lang/language for language, value/text for content.
|
|
"""
|
|
try:
|
|
result = []
|
|
if isinstance(translations, list):
|
|
for tr in translations:
|
|
if isinstance(tr, dict):
|
|
language_code = tr.get('language_code') or tr.get('code') or tr.get('lang') or tr.get('language')
|
|
text = tr.get('title') or tr.get('text') or tr.get('value')
|
|
if language_code and text:
|
|
slug_text = slugify(text, allow_unicode=True)
|
|
result.append({'language_code': str(language_code), 'title': slug_text})
|
|
return result
|
|
except Exception as e:
|
|
print(f"Error generating slugs: {e}")
|
|
return []
|
|
|
|
def absolute_url(req, url):
|
|
"""
|
|
can either be a file instance or a URL string
|
|
"""
|
|
try:
|
|
return req.build_absolute_uri(url.url if hasattr(url, 'url') else url)
|
|
except Exception:
|
|
return None
|
|
|
|
def sizeof_fmt(num, suffix="B"):
|
|
for unit in ["", "K", "M", "G"]:
|
|
if abs(num) < 1024.0:
|
|
return f"{num:3.1f} {unit}{suffix}"
|
|
num /= 1024.0
|
|
return f"{num:.1f} Yi{suffix}"
|
|
|
|
def file_location_media(path: str):
|
|
"""
|
|
Resolve a media URL/relative path to absolute filesystem path under MEDIA_ROOT.
|
|
"""
|
|
from django.conf import settings
|
|
|
|
media_url = (getattr(settings, "MEDIA_URL", "/media/") or "/media/").rstrip("/")
|
|
media_root = settings.MEDIA_ROOT
|
|
|
|
if path.startswith("http"):
|
|
path = exclude_host_from_url(path)
|
|
|
|
if path.startswith(media_url + "/"):
|
|
path = path[len(media_url):]
|
|
|
|
if path.startswith("/"):
|
|
path = path[1:]
|
|
|
|
return os.path.join(media_root, path)
|
|
|
|
|
|
def file_location(path):
|
|
from django.conf import settings
|
|
import os
|
|
|
|
if path.startswith('http'):
|
|
path = exclude_host_from_url(path)
|
|
|
|
if path.startswith("/static"):
|
|
path = path[7:]
|
|
|
|
if path.startswith('/'):
|
|
path = path[1:]
|
|
|
|
return os.path.join(settings.STATIC_ROOT, path)
|
|
|
|
|
|
def guess_file_type(filename):
|
|
try:
|
|
mimetype = mimetypes.guess_type(filename)[0].split('/')[0]
|
|
return mimetype
|
|
|
|
except Exception:
|
|
return False
|
|
|
|
class FileFieldSerializer(serializers.CharField):
|
|
"""
|
|
a field to handle uploaded file
|
|
"""
|
|
|
|
def get_rpath(self, p):
|
|
# extract relative path of doc
|
|
return p[p.find('/static/') + 7:]
|
|
|
|
def to_representation(self, value):
|
|
request = self.context.get('request', None)
|
|
if value:
|
|
if isinstance(value, str):
|
|
# If value is a string, assume it's a file path
|
|
return value
|
|
elif hasattr(value, 'url'):
|
|
if 'http://' in str(value) or 'https://' in str(value):
|
|
return str(value)
|
|
|
|
return absolute_url(request, value.url) if request else value.url
|
|
return None
|
|
|
|
def to_internal_value(self, data):
|
|
if not data:
|
|
return None
|
|
|
|
if "/tmp/" not in data:
|
|
# value not changed and here we simply return old file path
|
|
return self.get_rpath(data)
|
|
|
|
# if data.startswith('http'):
|
|
# data = self.get_rpath(data)
|
|
|
|
fpath = file_location_media(data)
|
|
if not os.path.exists(fpath):
|
|
raise serializers.ValidationError(f"File: '{fpath}' Does not exist")
|
|
|
|
rel = os.path.basename(data)
|
|
return File(open(fpath, "rb"), rel)
|
|
# return File(open(fpath, 'rb'), os.path.basename(data))
|
|
|
|
|
|
class UploadChatMediaSerializer(serializers.Serializer):
|
|
"""Upload files permanently to /media/chat/"""
|
|
file = serializers.FileField()
|
|
url = serializers.URLField(read_only=True)
|
|
name = serializers.CharField(read_only=True)
|
|
size = serializers.CharField(read_only=True)
|
|
mime_type = serializers.CharField(read_only=True)
|
|
thumbnail_url = serializers.URLField(read_only=True, required=False)
|
|
|
|
def to_representation(self, instance):
|
|
data = super(UploadChatMediaSerializer, self).to_representation(instance)
|
|
data['file'] = instance['file']
|
|
return data
|
|
|
|
# def store_file(self, file):
|
|
# from django.conf import settings
|
|
# from utils.image_utils import (
|
|
# create_thumbnail,
|
|
# is_image_file,
|
|
# is_video_file,
|
|
# extract_video_thumbnail
|
|
# )
|
|
# media_path = settings.MEDIA_ROOT
|
|
|
|
# os.makedirs(f'{media_path}/chat/uploads', exist_ok=True)
|
|
# fpath = f"/chat/uploads/{secrets.token_urlsafe(4)}-{file.name}"
|
|
# full_path = media_path + fpath
|
|
# if hasattr(file, 'read'):
|
|
# default_storage.save(str(full_path), ContentFile(file.read()))
|
|
# else:
|
|
# default_storage.save(str(full_path), file)
|
|
# os.chmod(full_path, 0o644)
|
|
|
|
# result = {
|
|
# 'file': fpath,
|
|
# 'url': absolute_url(self.context['request'], f"/media{fpath}"),
|
|
# 'name': file.name,
|
|
# 'size': sizeof_fmt(file.size),
|
|
# 'mime_type': guess_file_type(fpath)
|
|
# }
|
|
|
|
# # Generate thumbnail if file is an image (low quality for preview)
|
|
# if is_image_file(full_path):
|
|
# try:
|
|
# thumbnail_path = create_thumbnail(full_path, size=(200, 200), quality=60)
|
|
# thumbnail_relative = thumbnail_path.replace(media_path, '')
|
|
# result['thumbnail_url'] = absolute_url(
|
|
# self.context['request'],
|
|
# f"/media{thumbnail_relative}"
|
|
# )
|
|
# except Exception as e:
|
|
# print(f"Failed to generate image thumbnail: {e}")
|
|
# result['thumbnail_url'] = None
|
|
# # Generate thumbnail if file is a video (low quality for preview)
|
|
# elif is_video_file(full_path):
|
|
# try:
|
|
# thumbnail_path = extract_video_thumbnail(
|
|
# full_path,
|
|
# time_offset='00:00:01',
|
|
# size=(200, 200),
|
|
# quality=60
|
|
# )
|
|
# thumbnail_relative = thumbnail_path.replace(media_path, '')
|
|
# result['thumbnail_url'] = absolute_url(
|
|
# self.context['request'],
|
|
# f"/media{thumbnail_relative}"
|
|
# )
|
|
# except Exception as e:
|
|
# print(f"Failed to generate video thumbnail: {e}")
|
|
# result['thumbnail_url'] = None
|
|
# else:
|
|
# result['thumbnail_url'] = None
|
|
|
|
# return result
|
|
def store_file(self, file):
|
|
from django.conf import settings
|
|
from utils.image_utils import (
|
|
create_thumbnail,
|
|
is_image_file,
|
|
is_video_file,
|
|
extract_video_thumbnail,
|
|
)
|
|
|
|
media_root = Path(settings.MEDIA_ROOT) # Path object
|
|
chat_dir = media_root / "chat" / "uploads"
|
|
chat_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
safe_name = get_valid_filename(os.path.basename(file.name))
|
|
rel_path = Path("chat") / "uploads" / f"{secrets.token_urlsafe(4)}-{safe_name}"
|
|
full_path = media_root / rel_path # Path object
|
|
|
|
# Save via default_storage (path must be relative to MEDIA_ROOT)
|
|
if hasattr(file, "read"):
|
|
default_storage.save(str(rel_path), ContentFile(file.read()))
|
|
else:
|
|
default_storage.save(str(rel_path), file)
|
|
|
|
# Optional: chmod only if local filesystem and OS supports it
|
|
try:
|
|
os.chmod(full_path, 0o644)
|
|
except PermissionError:
|
|
pass
|
|
|
|
result = {
|
|
"file": f"/{rel_path.as_posix()}",
|
|
"url": absolute_url(
|
|
self.context["request"],
|
|
f"{settings.MEDIA_URL.rstrip('/')}/{rel_path.as_posix()}",
|
|
),
|
|
"name": safe_name,
|
|
"size": sizeof_fmt(file.size),
|
|
"mime_type": guess_file_type(rel_path.name),
|
|
}
|
|
|
|
# For images
|
|
if is_image_file(str(full_path)):
|
|
try:
|
|
thumbnail_path = create_thumbnail(str(full_path), size=(200, 200), quality=60)
|
|
thumb_rel = str(thumbnail_path).replace(str(media_root), "").lstrip("\\/")
|
|
result["thumbnail_url"] = absolute_url(
|
|
self.context["request"],
|
|
f"{settings.MEDIA_URL.rstrip('/')}/{thumb_rel.replace(os.sep, '/')}",
|
|
)
|
|
except Exception as e:
|
|
print(f"Failed to generate image thumbnail: {e}")
|
|
result["thumbnail_url"] = None
|
|
# For videos
|
|
elif is_video_file(str(full_path)):
|
|
try:
|
|
thumbnail_path = extract_video_thumbnail(
|
|
str(full_path),
|
|
time_offset="00:00:01",
|
|
size=(200, 200),
|
|
quality=60,
|
|
)
|
|
thumb_rel = str(thumbnail_path).replace(str(media_root), "").lstrip("\\/")
|
|
result["thumbnail_url"] = absolute_url(
|
|
self.context["request"],
|
|
f"{settings.MEDIA_URL.rstrip('/')}/{thumb_rel.replace(os.sep, '/')}",
|
|
)
|
|
except Exception as e:
|
|
print(f"Failed to generate video thumbnail: {e}")
|
|
result["thumbnail_url"] = None
|
|
else:
|
|
result["thumbnail_url"] = None
|
|
return result
|
|
|
|
def validate(self, attrs):
|
|
file_details = self.store_file(attrs['file'])
|
|
return file_details
|
|
|
|
|
|
class UploadTmpSerializer(serializers.Serializer):
|
|
file = serializers.FileField()
|
|
url = serializers.URLField(read_only=True)
|
|
name = serializers.CharField(read_only=True)
|
|
size = serializers.CharField(read_only=True)
|
|
mime_type = serializers.CharField(read_only=True)
|
|
thumbnail_url = serializers.URLField(read_only=True, required=False)
|
|
|
|
def to_representation(self, instance):
|
|
data = super(UploadTmpSerializer, self).to_representation(instance)
|
|
data['file'] = instance['file']
|
|
return data
|
|
|
|
def store_file(self, file):
|
|
from django.conf import settings
|
|
from utils.image_utils import (
|
|
create_thumbnail,
|
|
is_image_file,
|
|
is_video_file,
|
|
extract_video_thumbnail,
|
|
)
|
|
|
|
media_root = Path(settings.MEDIA_ROOT)
|
|
tmp_dir = media_root / "tmp"
|
|
tmp_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
safe_name = get_valid_filename(os.path.basename(file.name))
|
|
rel_path = Path("tmp") / f"{secrets.token_urlsafe(4)}-{safe_name}"
|
|
|
|
# Save the file using Django storage (safe on Windows)
|
|
# If the file is an InMemoryFile or TemporaryUploadedFile, read its content
|
|
if hasattr(file, 'read'):
|
|
default_storage.save(str(rel_path), ContentFile(file.read()))
|
|
else:
|
|
default_storage.save(str(rel_path), file)
|
|
|
|
full_path = str(media_root / rel_path)
|
|
|
|
result = {
|
|
"file": f"/{rel_path.as_posix()}",
|
|
"url": absolute_url(
|
|
self.context["request"],
|
|
f"{settings.MEDIA_URL.rstrip('/')}/{rel_path.as_posix()}",
|
|
),
|
|
"name": safe_name,
|
|
"size": sizeof_fmt(file.size),
|
|
"mime_type": guess_file_type(rel_path.name),
|
|
}
|
|
|
|
# Generate thumbnail if image
|
|
if is_image_file(full_path):
|
|
try:
|
|
thumb_path = create_thumbnail(full_path, size=(200, 200), quality=60)
|
|
thumb_rel = thumb_path.replace(str(media_root), "").lstrip("\\/")
|
|
result["thumbnail_url"] = absolute_url(
|
|
self.context["request"],
|
|
f"{settings.MEDIA_URL.rstrip('/')}/{thumb_rel.replace(os.sep, '/')}",
|
|
)
|
|
except Exception as e:
|
|
print("Failed to generate image thumbnail:", e)
|
|
result["thumbnail_url"] = None
|
|
# Generate thumbnail if video
|
|
elif is_video_file(full_path):
|
|
try:
|
|
thumb_path = extract_video_thumbnail(
|
|
full_path,
|
|
time_offset="00:00:01",
|
|
size=(200, 200),
|
|
quality=60,
|
|
)
|
|
thumb_rel = thumb_path.replace(str(media_root), "").lstrip("\\/")
|
|
result["thumbnail_url"] = absolute_url(
|
|
self.context["request"],
|
|
f"{settings.MEDIA_URL.rstrip('/')}/{thumb_rel.replace(os.sep, '/')}",
|
|
)
|
|
except Exception as e:
|
|
print("Failed to generate video thumbnail:", e)
|
|
result["thumbnail_url"] = None
|
|
else:
|
|
result["thumbnail_url"] = None
|
|
|
|
return result
|
|
|
|
def validate(self, attrs):
|
|
file_details = self.store_file(attrs['file'])
|
|
return file_details
|
|
|
|
|
|
class UploadChatMedia(GenericAPIView):
|
|
"""
|
|
Upload files permanently to /media/chat/
|
|
Files are stored permanently and will NOT be removed
|
|
"""
|
|
parser_classes = (FormParser, MultiPartParser)
|
|
serializer_class = UploadChatMediaSerializer
|
|
|
|
def post(self, request: HttpRequest, *args, **kwargs):
|
|
serializer = UploadChatMediaSerializer(data=request.FILES, context={'request': request})
|
|
is_valid = serializer.is_valid(raise_exception=True)
|
|
if not is_valid:
|
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
return Response(serializer.data)
|
|
|
|
|
|
class UploadTmpMedia(GenericAPIView):
|
|
"""
|
|
Upload files temporarily to /static/tmp/
|
|
Files will be removed every 1 hour
|
|
"""
|
|
parser_classes = (FormParser, MultiPartParser)
|
|
serializer_class = UploadTmpSerializer
|
|
|
|
def post(self, request: HttpRequest, *args, **kwargs):
|
|
serializer = UploadTmpSerializer(data=request.FILES, context={'request': request})
|
|
is_valid = serializer.is_valid(raise_exception=True)
|
|
if not is_valid:
|
|
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
return Response(serializer.data)
|
|
|
|
# Configure filer admin after Django is fully loaded
|
|
def configure_filer_admin():
|
|
try:
|
|
from filer.admin.fileadmin import FileAdmin
|
|
from filer.apps import FilerConfig
|
|
|
|
FileAdmin.readonly_fields += ('owner',)
|
|
FilerConfig.icon = 'icon-folder'
|
|
except ImportError:
|
|
pass
|
|
|
|
# This will be executed when this module is imported after Django is fully loaded
|