You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

611 lines
21 KiB

import os
import secrets
import shutil
import mimetypes
import re
from urllib.parse import urlparse
from django.core.files.storage import default_storage
from django.core.files.base import ContentFile
from pathlib import Path
from django.utils.text import get_valid_filename
from django.conf import settings
from django.core.files import File
from django.http import HttpRequest
from django.core.mail import send_mail
from rest_framework import serializers, status
from rest_framework.generics import GenericAPIView
from rest_framework.parsers import MultiPartParser, FormParser
from rest_framework.response import Response
from unidecode import unidecode
from django.utils.text import slugify
import random
import string
from django.conf import settings
from django.utils.translation import gettext_lazy as _
from cachetools.func import lru_cache
from django.http import HttpRequest
from django.contrib import admin
# Moved filer imports to avoid circular imports
# These will be imported when needed in functions
@lru_cache
def qs_thumbs():
from filer.models import ThumbnailOption
return ThumbnailOption.objects.all()
def get_thumbs(obj, request: HttpRequest = None) -> dict:
# print(f'----> {obj}')
if not obj:
return {}
try:
# تعریف سه سایز ثابت
sizes = ['sm', 'md', 'lg']
thumbnail_object = {}
# گرفتن URL اصلی تصویر
if hasattr(obj, 'url'):
original_url = obj.url
else:
return {}
# برای هر سه سایز، همان URL اصلی را برگردان
for size in sizes:
if request:
url = request.build_absolute_uri(original_url)
else:
url = original_url
thumbnail_object[size] = url
return thumbnail_object
except Exception as p:
print(p)
return {}
def environment_callback(request):
if settings.DEBUG:
return [_("Development"), "primary"]
return [_("Production"), "primary"]
def send_email(recipient, code):
import requests
from django.conf import settings
if not settings.RESEND_API_KEY:
print("RESEND_API_KEY is not set in settings.")
return False
url = "https://api.resend.com/emails"
headers = {
"Authorization": f"Bearer {settings.RESEND_API_KEY}",
"Content-Type": "application/json",
}
subject = 'Verification Code'
html_content = f"""
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto; padding: 20px; border: 1px solid #e0e0e0; border-radius: 10px;">
<h2 style="color: #333; text-align: center;">Verification Code</h2>
<p style="font-size: 16px; color: #555;">Hello,</p>
<p style="font-size: 16px; color: #555;">Your verification code for <strong>Imam Javad App</strong> is:</p>
<div style="text-align: center; margin: 30px 0;">
<span style="font-size: 32px; font-weight: bold; color: #007bff; letter-spacing: 5px; background: #f8f9fa; padding: 10px 20px; border-radius: 5px; border: 1px dashed #007bff;">{code}</span>
</div>
<p style="font-size: 14px; color: #777; text-align: center;">This code will expire shortly. If you did not request this code, please ignore this email.</p>
<hr style="border: 0; border-top: 1px solid #eee; margin: 20px 0;">
<p style="font-size: 12px; color: #999; text-align: center;">Sent via Resend API</p>
</div>
"""
payload = {
"from": settings.RESEND_FROM_EMAIL,
"to": recipient,
"subject": subject,
"html": html_content,
}
try:
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
return True
except Exception as e:
print(f"Failed to send email via Resend: {str(e)}")
return False
def is_valid_email(email):
# تعریف الگوی regex برای یک ایمیل معتبر
email_regex = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'
# بررسی اینکه آیا ایمیل با regex مطابقت دارد یا خیر
if re.match(email_regex, email):
return True
return False
def generate_slug_for_model(model, value: str, recycled_count: int = 0):
from slugify import slugify
try:
base_slug = slugify(unidecode(value))
slug = base_slug
if recycled_count > 0:
slug = f"{base_slug}-{recycled_count}"
if model.objects.filter(slug=slug).exists():
return generate_slug_for_model(model, value, recycled_count + 1)
return slug[:50]
except Exception as exp:
letters = string.ascii_lowercase
result_str = ''.join(random.choice(letters) for i in range(8))
return result_str
def generate_slugen_for_model(model, value_en, value_pk):
try:
unique_slug = value_en
if not value_pk or not value_en:
base_slug = slugify(unidecode(value_en))
unique_slug = base_slug
num = 1
while model.objects.filter(slug=unique_slug).exists():
unique_slug = f"{base_slug}-{num}"
num += 1
return unique_slug
except Exception as exp:
letters = string.ascii_lowercase
result_str = ''.join(random.choice(letters) for i in range(8))
return result_str
def exclude_host_from_url(url):
# Parse the URL
parsed_url = urlparse(url)
# Extract the path and query parameters
path_with_query = parsed_url.path + parsed_url.query
return path_with_query
def generate_slug_for_model(model, value: str, recycled_count: int = 0):
from slugify import slugify
slug = slugify(value)
if model.objects.filter(slug=slug).exists():
recycled_count += 1
if value.endswith(f'-{recycled_count - 1}'):
value = value.replace(f'-{recycled_count - 1}', f'-{recycled_count}')
else:
value = f"{value}-{recycled_count}"
return generate_slug_for_model(model, value, recycled_count)
return slug[:50]
def generate_language_slugs(translations):
"""
Build a list of {language_code, title} where title is a slugified string
from provided multilingual translations list.
Expected input shape:
- list[dict]: [{'language_code': 'fa', 'title': 'متن'}, ...]
Fallback keys supported: code/lang/language for language, value/text for content.
"""
try:
result = []
if isinstance(translations, list):
for tr in translations:
if isinstance(tr, dict):
language_code = tr.get('language_code') or tr.get('code') or tr.get('lang') or tr.get('language')
text = tr.get('title') or tr.get('text') or tr.get('value')
if language_code and text:
slug_text = slugify(text, allow_unicode=True)
result.append({'language_code': str(language_code), 'title': slug_text})
return result
except Exception as e:
print(f"Error generating slugs: {e}")
return []
def absolute_url(req, url):
"""
can either be a file instance or a URL string
"""
try:
return req.build_absolute_uri(url.url if hasattr(url, 'url') else url)
except Exception:
return None
def sizeof_fmt(num, suffix="B"):
for unit in ["", "K", "M", "G"]:
if abs(num) < 1024.0:
return f"{num:3.1f} {unit}{suffix}"
num /= 1024.0
return f"{num:.1f} Yi{suffix}"
def file_location_media(path: str):
"""
Resolve a media URL/relative path to absolute filesystem path under MEDIA_ROOT.
"""
from django.conf import settings
media_url = (getattr(settings, "MEDIA_URL", "/media/") or "/media/").rstrip("/")
media_root = settings.MEDIA_ROOT
if path.startswith("http"):
path = exclude_host_from_url(path)
if path.startswith(media_url + "/"):
path = path[len(media_url):]
if path.startswith("/"):
path = path[1:]
return os.path.join(media_root, path)
def file_location(path):
from django.conf import settings
import os
if path.startswith('http'):
path = exclude_host_from_url(path)
if path.startswith("/static"):
path = path[7:]
if path.startswith('/'):
path = path[1:]
return os.path.join(settings.STATIC_ROOT, path)
def guess_file_type(filename):
try:
mimetype = mimetypes.guess_type(filename)[0].split('/')[0]
return mimetype
except Exception:
return False
class FileFieldSerializer(serializers.CharField):
"""
a field to handle uploaded file
"""
def get_rpath(self, p):
# extract relative path of doc
return p[p.find('/static/') + 7:]
def to_representation(self, value):
request = self.context.get('request', None)
if value:
if isinstance(value, str):
# If value is a string, assume it's a file path
return value
elif hasattr(value, 'url'):
if 'http://' in str(value) or 'https://' in str(value):
return str(value)
return absolute_url(request, value.url) if request else value.url
return None
def to_internal_value(self, data):
if not data:
return None
if "/tmp/" not in data:
# value not changed and here we simply return old file path
return self.get_rpath(data)
# if data.startswith('http'):
# data = self.get_rpath(data)
fpath = file_location_media(data)
if not os.path.exists(fpath):
raise serializers.ValidationError(f"File: '{fpath}' Does not exist")
rel = os.path.basename(data)
return File(open(fpath, "rb"), rel)
# return File(open(fpath, 'rb'), os.path.basename(data))
class UploadChatMediaSerializer(serializers.Serializer):
"""Upload files permanently to /media/chat/"""
file = serializers.FileField()
url = serializers.URLField(read_only=True)
name = serializers.CharField(read_only=True)
size = serializers.CharField(read_only=True)
mime_type = serializers.CharField(read_only=True)
thumbnail_url = serializers.URLField(read_only=True, required=False)
def to_representation(self, instance):
data = super(UploadChatMediaSerializer, self).to_representation(instance)
data['file'] = instance['file']
return data
# def store_file(self, file):
# from django.conf import settings
# from utils.image_utils import (
# create_thumbnail,
# is_image_file,
# is_video_file,
# extract_video_thumbnail
# )
# media_path = settings.MEDIA_ROOT
# os.makedirs(f'{media_path}/chat/uploads', exist_ok=True)
# fpath = f"/chat/uploads/{secrets.token_urlsafe(4)}-{file.name}"
# full_path = media_path + fpath
# if hasattr(file, 'read'):
# default_storage.save(str(full_path), ContentFile(file.read()))
# else:
# default_storage.save(str(full_path), file)
# os.chmod(full_path, 0o644)
# result = {
# 'file': fpath,
# 'url': absolute_url(self.context['request'], f"/media{fpath}"),
# 'name': file.name,
# 'size': sizeof_fmt(file.size),
# 'mime_type': guess_file_type(fpath)
# }
# # Generate thumbnail if file is an image (low quality for preview)
# if is_image_file(full_path):
# try:
# thumbnail_path = create_thumbnail(full_path, size=(200, 200), quality=60)
# thumbnail_relative = thumbnail_path.replace(media_path, '')
# result['thumbnail_url'] = absolute_url(
# self.context['request'],
# f"/media{thumbnail_relative}"
# )
# except Exception as e:
# print(f"Failed to generate image thumbnail: {e}")
# result['thumbnail_url'] = None
# # Generate thumbnail if file is a video (low quality for preview)
# elif is_video_file(full_path):
# try:
# thumbnail_path = extract_video_thumbnail(
# full_path,
# time_offset='00:00:01',
# size=(200, 200),
# quality=60
# )
# thumbnail_relative = thumbnail_path.replace(media_path, '')
# result['thumbnail_url'] = absolute_url(
# self.context['request'],
# f"/media{thumbnail_relative}"
# )
# except Exception as e:
# print(f"Failed to generate video thumbnail: {e}")
# result['thumbnail_url'] = None
# else:
# result['thumbnail_url'] = None
# return result
def store_file(self, file):
from django.conf import settings
from utils.image_utils import (
create_thumbnail,
is_image_file,
is_video_file,
extract_video_thumbnail,
)
media_root = Path(settings.MEDIA_ROOT) # Path object
chat_dir = media_root / "chat" / "uploads"
chat_dir.mkdir(parents=True, exist_ok=True)
safe_name = get_valid_filename(os.path.basename(file.name))
rel_path = Path("chat") / "uploads" / f"{secrets.token_urlsafe(4)}-{safe_name}"
full_path = media_root / rel_path # Path object
# Save via default_storage (path must be relative to MEDIA_ROOT)
if hasattr(file, "read"):
default_storage.save(str(rel_path), ContentFile(file.read()))
else:
default_storage.save(str(rel_path), file)
# Optional: chmod only if local filesystem and OS supports it
try:
os.chmod(full_path, 0o644)
except PermissionError:
pass
result = {
"file": f"/{rel_path.as_posix()}",
"url": absolute_url(
self.context["request"],
f"{settings.MEDIA_URL.rstrip('/')}/{rel_path.as_posix()}",
),
"name": safe_name,
"size": sizeof_fmt(file.size),
"mime_type": guess_file_type(rel_path.name),
}
# For images
if is_image_file(str(full_path)):
try:
thumbnail_path = create_thumbnail(str(full_path), size=(200, 200), quality=60)
thumb_rel = str(thumbnail_path).replace(str(media_root), "").lstrip("\\/")
result["thumbnail_url"] = absolute_url(
self.context["request"],
f"{settings.MEDIA_URL.rstrip('/')}/{thumb_rel.replace(os.sep, '/')}",
)
except Exception as e:
print(f"Failed to generate image thumbnail: {e}")
result["thumbnail_url"] = None
# For videos
elif is_video_file(str(full_path)):
try:
thumbnail_path = extract_video_thumbnail(
str(full_path),
time_offset="00:00:01",
size=(200, 200),
quality=60,
)
thumb_rel = str(thumbnail_path).replace(str(media_root), "").lstrip("\\/")
result["thumbnail_url"] = absolute_url(
self.context["request"],
f"{settings.MEDIA_URL.rstrip('/')}/{thumb_rel.replace(os.sep, '/')}",
)
except Exception as e:
print(f"Failed to generate video thumbnail: {e}")
result["thumbnail_url"] = None
else:
result["thumbnail_url"] = None
return result
def validate(self, attrs):
file_details = self.store_file(attrs['file'])
return file_details
class UploadTmpSerializer(serializers.Serializer):
file = serializers.FileField()
url = serializers.URLField(read_only=True)
name = serializers.CharField(read_only=True)
size = serializers.CharField(read_only=True)
mime_type = serializers.CharField(read_only=True)
thumbnail_url = serializers.URLField(read_only=True, required=False)
def to_representation(self, instance):
data = super(UploadTmpSerializer, self).to_representation(instance)
data['file'] = instance['file']
return data
def store_file(self, file):
from django.conf import settings
from utils.image_utils import (
create_thumbnail,
is_image_file,
is_video_file,
extract_video_thumbnail,
)
media_root = Path(settings.MEDIA_ROOT)
tmp_dir = media_root / "tmp"
tmp_dir.mkdir(parents=True, exist_ok=True)
safe_name = get_valid_filename(os.path.basename(file.name))
rel_path = Path("tmp") / f"{secrets.token_urlsafe(4)}-{safe_name}"
# Save the file using Django storage (safe on Windows)
# If the file is an InMemoryFile or TemporaryUploadedFile, read its content
if hasattr(file, 'read'):
default_storage.save(str(rel_path), ContentFile(file.read()))
else:
default_storage.save(str(rel_path), file)
full_path = str(media_root / rel_path)
result = {
"file": f"/{rel_path.as_posix()}",
"url": absolute_url(
self.context["request"],
f"{settings.MEDIA_URL.rstrip('/')}/{rel_path.as_posix()}",
),
"name": safe_name,
"size": sizeof_fmt(file.size),
"mime_type": guess_file_type(rel_path.name),
}
# Generate thumbnail if image
if is_image_file(full_path):
try:
thumb_path = create_thumbnail(full_path, size=(200, 200), quality=60)
thumb_rel = thumb_path.replace(str(media_root), "").lstrip("\\/")
result["thumbnail_url"] = absolute_url(
self.context["request"],
f"{settings.MEDIA_URL.rstrip('/')}/{thumb_rel.replace(os.sep, '/')}",
)
except Exception as e:
print("Failed to generate image thumbnail:", e)
result["thumbnail_url"] = None
# Generate thumbnail if video
elif is_video_file(full_path):
try:
thumb_path = extract_video_thumbnail(
full_path,
time_offset="00:00:01",
size=(200, 200),
quality=60,
)
thumb_rel = thumb_path.replace(str(media_root), "").lstrip("\\/")
result["thumbnail_url"] = absolute_url(
self.context["request"],
f"{settings.MEDIA_URL.rstrip('/')}/{thumb_rel.replace(os.sep, '/')}",
)
except Exception as e:
print("Failed to generate video thumbnail:", e)
result["thumbnail_url"] = None
else:
result["thumbnail_url"] = None
return result
def validate(self, attrs):
file_details = self.store_file(attrs['file'])
return file_details
class UploadChatMedia(GenericAPIView):
"""
Upload files permanently to /media/chat/
Files are stored permanently and will NOT be removed
"""
parser_classes = (FormParser, MultiPartParser)
serializer_class = UploadChatMediaSerializer
def post(self, request: HttpRequest, *args, **kwargs):
serializer = UploadChatMediaSerializer(data=request.FILES, context={'request': request})
is_valid = serializer.is_valid(raise_exception=True)
if not is_valid:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
return Response(serializer.data)
class UploadTmpMedia(GenericAPIView):
"""
Upload files temporarily to /static/tmp/
Files will be removed every 1 hour
"""
parser_classes = (FormParser, MultiPartParser)
serializer_class = UploadTmpSerializer
def post(self, request: HttpRequest, *args, **kwargs):
serializer = UploadTmpSerializer(data=request.FILES, context={'request': request})
is_valid = serializer.is_valid(raise_exception=True)
if not is_valid:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
return Response(serializer.data)
# Configure filer admin after Django is fully loaded
def configure_filer_admin():
try:
from filer.admin.fileadmin import FileAdmin
from filer.apps import FilerConfig
FileAdmin.readonly_fields += ('owner',)
FilerConfig.icon = 'icon-folder'
except ImportError:
pass
# This will be executed when this module is imported after Django is fully loaded