You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
236 lines
6.8 KiB
236 lines
6.8 KiB
import os
|
|
import secrets
|
|
import shutil
|
|
import mimetypes
|
|
import re
|
|
from urllib.parse import urlparse
|
|
|
|
from django.conf import settings
|
|
from django.core.files import File
|
|
from django.http import HttpRequest
|
|
from django.core.mail import send_mail
|
|
|
|
from rest_framework import serializers, status
|
|
from rest_framework.generics import GenericAPIView
|
|
from rest_framework.parsers import MultiPartParser, FormParser
|
|
from rest_framework.response import Response
|
|
from unidecode import unidecode
|
|
from django.utils.text import slugify
|
|
import random
|
|
import string
|
|
|
|
|
|
|
|
|
|
|
|
def send_email(recipient, code):
    """Send the test e-mail containing ``code`` to ``recipient``.

    Args:
        recipient: A single address string, or a list/tuple of addresses.
        code: Value interpolated into the message body.

    Returns:
        True once ``send_mail`` has returned without raising.

    Raises:
        Any exception raised by ``send_mail`` (``fail_silently`` is False).
    """
    # Django documents ``recipient_list`` as a list of strings, so wrap a
    # bare address; anything else is passed through untouched.
    recipients = [recipient] if isinstance(recipient, str) else recipient

    send_mail(
        'Test Email',
        f'This is a test email {code} from Django using Gmail SMTP.',
        'aliabdolahi.171@gmail.com',
        recipients,
        fail_silently=False,
    )
    return True
|
|
|
|
|
|
def is_valid_email(email):
    """Return True when ``email`` looks like a syntactically valid address.

    Args:
        email: The candidate address. Non-string values are rejected.

    Returns:
        True if the whole string matches the address pattern, else False.
    """
    # Regex pattern describing an acceptable e-mail address.
    email_regex = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'

    if not isinstance(email, str):
        return False

    # fullmatch (rather than match) is required: with match(), "$" also
    # matches just before a trailing newline, so "a@b.com\n" would pass.
    return re.fullmatch(email_regex, email) is not None
|
|
|
|
|
|
def generate_slug_for_model(model, value: str, recycled_count: int = 0):
    """Build a slug from ``value`` that no existing ``model`` row uses.

    The value is transliterated with ``unidecode`` and slugified; when the
    slug is taken, ``-1``, ``-2``, ... are appended until a free one is found.

    NOTE: this module defines ``generate_slug_for_model`` a second time
    further down; at import time that later definition replaces this one.

    Args:
        model: Model class exposing ``objects.filter(slug=...)``.
        value: Source text for the slug.
        recycled_count: Numeric suffix to start from (0 means no suffix).

    Returns:
        A slug of at most 50 characters, or a random 8-letter string when
        anything in the process raises.
    """
    from slugify import slugify

    max_length = 50

    try:
        base_slug = slugify(unidecode(value))
        counter = recycled_count
        while True:
            suffix = f"-{counter}" if counter > 0 else ""
            # Truncate the base *before* the uniqueness check and keep the
            # suffix intact, so the slug that is checked is the slug returned.
            slug = f"{base_slug[:max_length - len(suffix)]}{suffix}"
            if not model.objects.filter(slug=slug).exists():
                return slug
            counter += 1
    except Exception:
        # Best-effort fallback kept from the original implementation: any
        # failure yields a random lowercase token instead of an error.
        return ''.join(random.choice(string.ascii_lowercase) for _ in range(8))
|
|
|
|
|
|
def generate_slugen_for_model(model, value_en, value_pk):
    """Return a slug for ``value_en``, generating a unique one when needed.

    When both ``value_pk`` and ``value_en`` are truthy, ``value_en`` is
    returned unchanged. Otherwise a slug is derived from ``value_en`` and a
    numeric suffix (``-1``, ``-2``, ...) is appended until no ``model`` row
    uses it. Any exception results in a random 8-letter lowercase string.
    """
    try:
        if value_pk and value_en:
            return value_en

        stem = slugify(unidecode(value_en))
        candidate = stem
        attempt = 0
        while model.objects.filter(slug=candidate).exists():
            attempt += 1
            candidate = f"{stem}-{attempt}"
        return candidate
    except Exception:
        alphabet = string.ascii_lowercase
        return ''.join(random.choice(alphabet) for _ in range(8))
|
|
|
|
|
|
def exclude_host_from_url(url):
    """Strip scheme and host from ``url``, keeping the path and query string.

    Args:
        url: An absolute or relative URL.

    Returns:
        ``path`` when there is no query string, otherwise ``path?query``.
    """
    parsed_url = urlparse(url)

    # urlparse strips the "?" separator, so it has to be put back when a
    # query string is present.
    if parsed_url.query:
        return f"{parsed_url.path}?{parsed_url.query}"
    return parsed_url.path
|
|
|
|
|
|
def generate_slug_for_model(model, value: str, recycled_count: int = 0):
    """Return a slug for ``value`` that no existing ``model`` row uses.

    The slug of ``value`` is tried first; while it is taken, the numeric
    suffix ``-<n>`` is incremented (starting after ``recycled_count``).

    This is the second definition of this name in the module and therefore
    the one callers actually get.

    Args:
        model: Model class exposing ``objects.filter(slug=...)``.
        value: Source text for the slug.
        recycled_count: Counter already applied to ``value``. If ``value``
            ends with ``-<recycled_count>`` that suffix is treated as the
            counter and replaced rather than stacked.

    Returns:
        A slug of at most 50 characters that was unused when checked.
    """
    from slugify import slugify

    max_length = 50

    # Split off only a *trailing* "-<recycled_count>". The previous
    # implementation used str.replace, which also rewrote matching text in
    # the middle of the value (e.g. "top-10-1" became "top-20-2").
    marker = f"-{recycled_count}"
    if recycled_count > 0 and value.endswith(marker):
        stem = value[:-len(marker)]
        suffix = marker
    else:
        stem = value
        suffix = ""

    base_slug = slugify(stem)
    counter = recycled_count
    while True:
        # Truncate the base, not the finished slug, so the suffix survives
        # and the slug that is checked is exactly the slug that is returned.
        head = base_slug[:max_length - len(suffix)].rstrip('-')
        slug = f"{head}{suffix}".lstrip('-')
        if not model.objects.filter(slug=slug).exists():
            return slug
        counter += 1
        suffix = f"-{counter}"
|
|
|
|
def absolute_url(req, url):
    """
    Build an absolute URI through ``req.build_absolute_uri``.

    ``url`` can either be a file instance (anything with a ``url``
    attribute) or a URL string. Returns None if anything raises.
    """
    try:
        target = getattr(url, 'url', url)
        return req.build_absolute_uri(target)
    except Exception:
        return None
|
|
|
|
def sizeof_fmt(num, suffix="B"):
    """Format a byte count as a human-readable string using 1024 steps.

    Args:
        num: The size to format (may be negative).
        suffix: Unit suffix appended after the magnitude prefix.

    Returns:
        A string such as ``"512.0 B"``, ``"1.5 KB"`` or ``"2.0 TB"``.
    """
    # The unit list previously stopped at "G" while the fallback label was
    # "Yi", so terabyte-sized values were reported as "YiB".
    for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
        if abs(num) < 1024.0:
            return f"{num:3.1f} {unit}{suffix}"
        num /= 1024.0
    return f"{num:.1f} Y{suffix}"
|
|
|
|
|
|
def file_location(path):
    """Map a static URL or relative path to a location under ``STATIC_ROOT``.

    Args:
        path: An ``http(s)`` URL, a ``/static/...`` path, or a path relative
            to ``STATIC_ROOT``.

    Returns:
        The normalized absolute filesystem path inside ``STATIC_ROOT``.

    Raises:
        SuspiciousFileOperation: If the path would resolve outside
            ``STATIC_ROOT`` (e.g. via ``..`` or an absolute path).
    """
    from django.conf import settings
    from django.core.exceptions import SuspiciousFileOperation
    import os

    if path.startswith('http'):
        path = exclude_host_from_url(path)

    # Only strip a real "/static" path segment, not any name that merely
    # begins with those characters (e.g. "/staticfiles/...").
    if path == "/static" or path.startswith("/static/"):
        path = path[len("/static"):]

    # Drop every leading slash: os.path.join discards the root when the
    # second argument is absolute.
    relative = path.lstrip('/')

    root = os.path.abspath(settings.STATIC_ROOT)
    location = os.path.abspath(os.path.join(root, relative))

    # The path can come from client input, so refuse anything that escapes
    # the static root after normalization.
    try:
        inside_root = os.path.commonpath([root, location]) == root
    except ValueError:
        # commonpath raises for paths on different drives (Windows).
        inside_root = False
    if not inside_root:
        raise SuspiciousFileOperation(
            "The requested path is located outside of the static root."
        )

    return location
|
|
|
|
|
|
def guess_file_type(filename):
    """Return the top-level MIME family of ``filename`` (e.g. ``"image"``).

    Returns False when the type cannot be guessed or anything raises.
    """
    try:
        mime, _encoding = mimetypes.guess_type(filename)
        # mime is None for unknown types; the resulting AttributeError is
        # handled below.
        family, _sep, _subtype = mime.partition('/')
        return family
    except Exception:
        return False
|
|
|
|
class FileFieldSerializer(serializers.CharField):
    """
    A field to handle an uploaded file.

    On output it renders the stored file as a URL or path; on input it
    accepts either an unchanged file path/URL or a path containing
    ``/tmp/``, which is opened from the static root.
    """

    def get_rpath(self, p):
        """Return the part of ``p`` that follows the ``/static`` prefix.

        The leading slash after ``/static`` is kept, so
        ``http://host/static/tmp/a.png`` becomes ``/tmp/a.png``. A value
        without ``/static/`` is returned unchanged.
        """
        index = p.find('/static/')
        if index == -1:
            # Without this guard find() returns -1 and the slice below
            # would silently drop the first six characters of the value.
            return p
        return p[index + len('/static'):]

    def to_representation(self, value):
        """Render ``value`` as a string path or URL, or None when empty."""
        request = self.context.get('request', None)
        if value:
            if isinstance(value, str):
                # If value is a string, assume it's a file path
                return value
            elif hasattr(value, 'url'):
                if 'http://' in str(value) or 'https://' in str(value):
                    # The stored name is already an absolute URL.
                    return str(value)

                return absolute_url(request, value.url) if request else value.url
        return None

    def to_internal_value(self, data):
        """Turn the submitted path into a stored path or an open ``File``.

        Returns:
            None for empty input, the relative path when ``data`` does not
            contain ``/tmp/``, otherwise a ``File`` wrapping the file on disk.

        Raises:
            serializers.ValidationError: If the referenced file is missing.
        """
        if not data:
            return None

        if "/tmp/" not in data:
            # value not changed and here we simply return old file path
            return self.get_rpath(data)

        if data.startswith('http'):
            data = self.get_rpath(data)

        fpath = file_location(data)
        if not os.path.exists(fpath):
            raise serializers.ValidationError(f"File: '{fpath}' Does not exist")

        # The handle is deliberately left open: the returned File wraps it
        # and whoever consumes the File reads from it.
        return File(open(fpath, 'rb'), os.path.basename(data))
|
|
|
|
|
|
class UploadTmpSerializer(serializers.Serializer):
    """Accept one uploaded file and store it under ``STATIC_ROOT/tmp``.

    Validation has a side effect: ``validate`` stores the file and replaces
    the validated data with a description of the stored file.
    """

    file = serializers.FileField()
    url = serializers.URLField(read_only=True)
    name = serializers.CharField(read_only=True)
    size = serializers.CharField(read_only=True)
    mime_type = serializers.CharField(read_only=True)

    def to_representation(self, instance):
        """Serialize ``instance`` and expose the stored path under ``file``."""
        data = super().to_representation(instance)
        # Put the stored relative path back under 'file', replacing whatever
        # the FileField produced for it.
        data['file'] = instance['file']
        return data

    def store_file(self, file):
        """Store ``file`` in the temporary static directory.

        Args:
            file: The uploaded file object from the request.

        Returns:
            A dict with the relative path (``file``), absolute ``url``,
            original ``name``, human-readable ``size`` and ``mime_type``.
        """
        from django.conf import settings
        static_path = settings.STATIC_ROOT

        os.makedirs(f'{static_path}/tmp', exist_ok=True)
        # A random token prefix keeps equally named uploads from clashing.
        fpath = f"/tmp/{secrets.token_urlsafe(4)}-{file.name}"
        # Built with an f-string so a pathlib.Path STATIC_ROOT also works.
        destination = f"{static_path}{fpath}"

        if hasattr(file, 'temporary_file_path'):
            shutil.move(file.temporary_file_path(), destination)
        else:
            # In-memory uploads have no file on disk to move, so stream the
            # content out instead of failing with AttributeError.
            with open(destination, 'wb') as target:
                for chunk in file.chunks():
                    target.write(chunk)
        os.chmod(destination, 0o644)

        return {
            'file': fpath,
            'url': absolute_url(self.context['request'], f"/static{fpath}"),
            'name': file.name,
            'size': sizeof_fmt(file.size),
            'mime_type': guess_file_type(fpath)
        }

    def validate(self, attrs):
        """Store the uploaded file and return its details as validated data."""
        file_details = self.store_file(attrs['file'])
        return file_details
|
|
|
|
|
|
class UploadTmpMedia(GenericAPIView):
    """
    Temporary file upload endpoint.

    Files are removed every hour according to the original note; the
    cleanup itself is not part of this view.
    """
    parser_classes = (FormParser, MultiPartParser)
    serializer_class = UploadTmpSerializer

    def post(self, request: HttpRequest, *args, **kwargs):
        """Validate and store the uploaded file, then return its details."""
        serializer = UploadTmpSerializer(data=request.FILES, context={'request': request})
        # raise_exception=True makes invalid input raise ValidationError, so
        # no manual error response is needed (the old branch was unreachable).
        serializer.is_valid(raise_exception=True)

        return Response(serializer.data)
|