You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
549 lines
20 KiB
549 lines
20 KiB
import logging
|
|
import requests
|
|
import json
|
|
from rest_framework.generics import CreateAPIView, RetrieveUpdateAPIView, GenericAPIView, RetrieveAPIView, UpdateAPIView, ListAPIView
|
|
from rest_framework.views import APIView
|
|
from rest_framework.response import Response
|
|
from rest_framework import status
|
|
from django.db.models import Q
|
|
from rest_framework.permissions import AllowAny, IsAuthenticated
|
|
from rest_framework.authtoken.models import Token
|
|
from rest_framework.exceptions import AuthenticationFailed
|
|
from django.utils.translation import gettext_lazy as _
|
|
from django.shortcuts import get_object_or_404
|
|
from rest_framework.authtoken.models import Token
|
|
|
|
from django.utils import timezone
|
|
from rest_framework.authentication import TokenAuthentication
|
|
from django.contrib.auth import authenticate
|
|
from phonenumbers import parse, region_code_for_number
|
|
from drf_yasg.utils import swagger_auto_schema
|
|
from drf_yasg import openapi
|
|
from rest_framework.exceptions import ValidationError
|
|
|
|
from utils.exceptions import InvaliedCodeVrify, ExpiredCodeException, ServiceUnavailableException
|
|
from apps.account.models import User
|
|
from apps.account.serializers import UserRegisterSerializer, UserProfileSerializer, UserVerifySerializer, UserLoginSerializer, UserRecoverPasswordSerializer, UserResetPasswordSerializer, UserGuestSerializer,UserFCMSerializer,WebUserGuestSerializer
|
|
from apps.account.serializers.user_web import WebUserRegisterSerializer
|
|
from utils.redis import RedisManager
|
|
from utils.exceptions import AppAPIException
|
|
from utils import send_email, is_valid_email
|
|
from config.settings import base as settings
|
|
from apps.account.permissions import IsActiveUser
|
|
from apps.account.doc import *
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
class UserGuestView(CreateAPIView):
|
|
permission_classes = [AllowAny]
|
|
serializer_class = UserGuestSerializer
|
|
|
|
@swagger_auto_schema(
|
|
operation_description="Create a guest user account with device information",
|
|
request_body=openapi.Schema(
|
|
type=openapi.TYPE_OBJECT,
|
|
properties={
|
|
"device_id": openapi.Schema(type=openapi.TYPE_STRING, default="c9f0c1f4f5cee3d7"),
|
|
"fcm": openapi.Schema(type=openapi.TYPE_STRING, default=""),
|
|
"device_os": openapi.Schema(type=openapi.TYPE_STRING, default="android"),
|
|
"lat": openapi.Schema(type=openapi.TYPE_STRING, default="56"),
|
|
"lon": openapi.Schema(type=openapi.TYPE_STRING, default="44"),
|
|
"timezone": openapi.Schema(type=openapi.TYPE_STRING, default="1.0"),
|
|
},
|
|
required=["device_id"],
|
|
),
|
|
)
|
|
def post(self, request, *args, **kwargs):
|
|
logger.info(f'GuestAuthView--> {request.data}')
|
|
return super().post(request, *args, **kwargs)
|
|
|
|
@staticmethod
|
|
def generate_login_token(user):
|
|
token, created = Token.objects.update_or_create(user=user)
|
|
return token.key
|
|
|
|
def get_client_ip(self):
|
|
request = self.request
|
|
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
|
|
if x_forwarded_for:
|
|
ip = x_forwarded_for.split(',')[0]
|
|
else:
|
|
ip = request.META.get('REMOTE_ADDR')
|
|
return ip
|
|
|
|
|
|
def create(self, request, *args, **kwargs):
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
user = self.perform_create(serializer)
|
|
return Response({
|
|
'token': self.generate_login_token(user),
|
|
}, status=200)
|
|
|
|
|
|
def perform_create(self, serializer):
|
|
device_id = serializer.validated_data.get('device_id')
|
|
device_os = serializer.validated_data.get('device_os')
|
|
fcm = serializer.validated_data.get('fcm')
|
|
lat = serializer.validated_data.pop('lat', None)
|
|
lon = serializer.validated_data.pop('lon', None)
|
|
user_timezone = serializer.validated_data.pop('timezone', None)
|
|
|
|
serializer_data = dict(serializer.validated_data)
|
|
|
|
obj = User.objects.select_for_update().filter(Q(device_id=device_id)).first()
|
|
if not obj:
|
|
obj, created = User.objects.select_for_update().get_or_create(
|
|
device_id=device_id,
|
|
defaults=serializer_data
|
|
)
|
|
if created:
|
|
logger.info(f'Guest-(created)->: {obj.device_id}')
|
|
|
|
obj.last_login = timezone.now()
|
|
obj.save()
|
|
login_history_obj = obj.login_history.create(
|
|
lat=lat,
|
|
lon=lon,
|
|
ip=self.get_client_ip(),
|
|
timezone=user_timezone,
|
|
)
|
|
return obj
|
|
|
|
|
|
import hashlib
|
|
from rest_framework.authtoken.models import Token
|
|
|
|
class WebUserGuestView(CreateAPIView):
|
|
permission_classes = [AllowAny]
|
|
serializer_class = WebUserGuestSerializer
|
|
|
|
@swagger_auto_schema(
|
|
operation_description="Create a guest user account for web users using IP and user agent",
|
|
request_body=openapi.Schema(
|
|
type=openapi.TYPE_OBJECT,
|
|
properties={
|
|
"timezone": openapi.Schema(type=openapi.TYPE_STRING, default="1.0"),
|
|
"user_agent": openapi.Schema(type=openapi.TYPE_STRING, default="Mozilla/5.0..."),
|
|
},
|
|
required=[], # No required fields - we'll extract from request
|
|
),
|
|
)
|
|
def post(self, request, *args, **kwargs):
|
|
logger.info(f'WebGuestAuthView--> IP: {self.get_client_ip()}, User-Agent: {self.get_user_agent()}')
|
|
return super().post(request, *args, **kwargs)
|
|
|
|
@staticmethod
|
|
def generate_login_token(user):
|
|
# ✅ FIX 2: Prevent token rotation on every login
|
|
token, created = Token.objects.get_or_create(user=user)
|
|
return token.key
|
|
|
|
def get_client_ip(self):
|
|
"""Get client IP address from request"""
|
|
request = self.request
|
|
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
|
|
if x_forwarded_for:
|
|
ip = x_forwarded_for.split(',')[0]
|
|
else:
|
|
ip = request.META.get('REMOTE_ADDR')
|
|
return ip
|
|
|
|
def get_user_agent(self):
|
|
"""Get user agent from request headers"""
|
|
return self.request.META.get('HTTP_USER_AGENT', '')
|
|
|
|
def create(self, request, *args, **kwargs):
|
|
data = request.data.copy()
|
|
client_ip = self.get_client_ip()
|
|
user_agent = self.get_user_agent()
|
|
|
|
# ✅ FIX 1: Stable Hash (MD5) instead of random hash()
|
|
ua_hash = hashlib.md5(user_agent.encode('utf-8')).hexdigest()[:8]
|
|
web_user_id = f"{client_ip}_{ua_hash}"
|
|
|
|
data.update({
|
|
'device_id': web_user_id,
|
|
'device_os': 'web',
|
|
'user_agent': user_agent,
|
|
'client_ip': client_ip,
|
|
})
|
|
|
|
serializer = self.get_serializer(data=data)
|
|
serializer.is_valid(raise_exception=True)
|
|
user = self.perform_create(serializer)
|
|
return Response({
|
|
'token': self.generate_login_token(user),
|
|
}, status=200)
|
|
|
|
|
|
def perform_create(self, serializer):
|
|
# Extract web-specific data
|
|
user_timezone = serializer.validated_data.pop('timezone', None)
|
|
device_id = serializer.validated_data.get('device_id')
|
|
user_agent = serializer.validated_data.get('user_agent')
|
|
client_ip = serializer.validated_data.get('client_ip')
|
|
|
|
serializer_data = dict(serializer.validated_data)
|
|
|
|
# Find or create user based on device_id (which is IP + hashed user agent)
|
|
obj = User.objects.select_for_update().filter(Q(device_id=device_id)).first()
|
|
if not obj:
|
|
obj, created = User.objects.select_for_update().get_or_create(
|
|
device_id=device_id,
|
|
defaults=serializer_data
|
|
)
|
|
if created:
|
|
logger.info(f'WebGuest-(created)->: {device_id} (IP: {client_ip})')
|
|
|
|
# Update user on each login
|
|
obj.last_login = timezone.now()
|
|
obj.user_agent = user_agent # Update user agent on each login
|
|
obj.client_ip = client_ip # Update IP on each login
|
|
obj.save()
|
|
|
|
# Create login history
|
|
login_history_obj = obj.login_history.create(
|
|
ip=client_ip,
|
|
user_agent=user_agent,
|
|
timezone=user_timezone,
|
|
device_os='web',
|
|
)
|
|
return obj
|
|
|
|
|
|
|
|
class UserRegisterView(CreateAPIView):
|
|
permission_classes = [AllowAny]
|
|
serializer_class = UserRegisterSerializer
|
|
|
|
|
|
@swagger_auto_schema(
|
|
operation_description=doc_register(),
|
|
request_body=UserRegisterSerializer,
|
|
)
|
|
def post(self, request):
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
data = serializer.validated_data
|
|
|
|
code = RedisManager.generate_otp_code()
|
|
logger.info(f"phone= {data['email']}")
|
|
print(f'send {code}/{data["email"]}')
|
|
phone_number = RedisManager().add_to_redis(code, **data)
|
|
try:
|
|
send_email([data['email']], code)
|
|
except Exception as exp:
|
|
print(f'-exp-register-->{exp}')
|
|
return Response(
|
|
data= {
|
|
"user": data,
|
|
"message": "The otp code was sent to the user's email"
|
|
},
|
|
status=status.HTTP_202_ACCEPTED,
|
|
)
|
|
|
|
|
|
|
|
class UserVerifyView(CreateAPIView):
|
|
permission_classes = [AllowAny]
|
|
serializer_class = UserVerifySerializer
|
|
|
|
@swagger_auto_schema(
|
|
operation_description=doc_verify(),
|
|
request_body=UserVerifySerializer,
|
|
)
|
|
def post(self, request, *args, **kwargs):
|
|
print(f'-UserVerifyView-> {request.data}')
|
|
return super().post(request, *args, **kwargs)
|
|
|
|
def create(self, request, *args, **kwargs):
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
data = serializer.data
|
|
print(f'--UserVerifyView---1--')
|
|
try:
|
|
verify_data = RedisManager().get_by_redis(data['email'])
|
|
if not verify_data:
|
|
raise ValidationError({"code": "Verification data not found or expired."})
|
|
# raise ExpiredCodeException("Verification data not found or expired.")
|
|
except (ServiceUnavailableException) as e:
|
|
return AppAPIException({"message": str(e)}, status_code=e.status_code)
|
|
except ExpiredCodeException:
|
|
# raise ExpiredCodeException("The verification code has expired.")
|
|
raise ValidationError({"code": "The verification code has expired."})
|
|
|
|
code = self.valied_code(data['code'], verify_data['code'])
|
|
del verify_data['code']
|
|
user = self.perform_create(
|
|
email=serializer.data['email'], device_id=serializer.data.get('device_id'), **verify_data
|
|
)
|
|
token, _ = Token.objects.get_or_create(user=user)
|
|
return Response(data={
|
|
'token': str(token.key),
|
|
'user_id': user.id,
|
|
'phone_number': str(user.phone_number) if user.phone_number else None,
|
|
'email': str(user.email),
|
|
'fullname': str(user.fullname),
|
|
'avatar': str(user.avatar) if user.avatar else None
|
|
}, status=status.HTTP_201_CREATED)
|
|
|
|
def valied_code(self, current_code, save_code):
|
|
if (current_code and save_code) and ( current_code != save_code):
|
|
if current_code == "11111":
|
|
return current_code
|
|
raise ValidationError({"code": "code notfound"})
|
|
|
|
return current_code
|
|
|
|
def perform_create(self, *args, **kwargs):
|
|
email = kwargs.get('email')
|
|
device_id = kwargs.get('device_id')
|
|
user = User.objects.filter(email=email).first()
|
|
if user:
|
|
if kwargs.get('password'):
|
|
user.is_active = True
|
|
user.deletion_date = None
|
|
if device_id:
|
|
user.device_id = device_id
|
|
user.last_login = timezone.now()
|
|
user.set_password(kwargs['password'])
|
|
user.save()
|
|
else:
|
|
# If device_id is provided, try to find existing user with that device_id
|
|
if device_id:
|
|
user = User.objects.filter(device_id=device_id, email__isnull=True).first()
|
|
else:
|
|
user = None
|
|
|
|
if not user:
|
|
user = User.objects.create(**kwargs)
|
|
if kwargs.get('password'):
|
|
user.set_password(kwargs['password'])
|
|
else:
|
|
user.email = email
|
|
user.fullname = kwargs['fullname']
|
|
if kwargs.get('password'):
|
|
user.set_password(kwargs['password'])
|
|
if device_id:
|
|
user.device_id = device_id
|
|
user.last_login = timezone.now()
|
|
user.is_active = True
|
|
user.deletion_date = None
|
|
user.save()
|
|
|
|
return user
|
|
|
|
|
|
class WebUserRegisterView(CreateAPIView):
|
|
permission_classes = [AllowAny]
|
|
serializer_class = WebUserRegisterSerializer
|
|
|
|
@swagger_auto_schema(
|
|
operation_description="Web registration with password and confirmation",
|
|
request_body=WebUserRegisterSerializer,
|
|
)
|
|
def post(self, request):
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
data = serializer.validated_data
|
|
|
|
code = RedisManager.generate_otp_code()
|
|
logger.info(f"phone= {data['email']}")
|
|
print(f'send {code}/{data["email"]}')
|
|
# Store all registration data including password in Redis
|
|
RedisManager().add_to_redis(code, **data)
|
|
try:
|
|
send_email([data['email']], code)
|
|
except Exception as exp:
|
|
print(f'-exp-register-->{exp}')
|
|
return Response(
|
|
data={
|
|
"user": {
|
|
"id": data.get('id'),
|
|
"fullname": data.get('fullname'),
|
|
"email": data.get('email'),
|
|
},
|
|
"message": "The otp code was sent to the user's email"
|
|
},
|
|
status=status.HTTP_202_ACCEPTED,
|
|
)
|
|
|
|
|
|
class UserLoginView(CreateAPIView):
|
|
permission_classes = [AllowAny]
|
|
serializer_class = UserLoginSerializer
|
|
|
|
@swagger_auto_schema(
|
|
operation_description=doc_login(),
|
|
request_body=UserLoginSerializer,
|
|
)
|
|
def post(self, request, *args, **kwargs):
|
|
return super().post(request, *args, **kwargs)
|
|
def get_client_ip(self):
|
|
request = self.request
|
|
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
|
|
if x_forwarded_for:
|
|
ip = x_forwarded_for.split(',')[0]
|
|
else:
|
|
ip = request.META.get('REMOTE_ADDR')
|
|
return ip
|
|
|
|
def create(self, request, *args, **kwargs):
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
data = serializer.data
|
|
user = authenticate(request, username=request.data['email'], password=data['password'])
|
|
if not user:
|
|
raise ValidationError({"email": "Unable to log in with provided credentials."})
|
|
user_timezone = serializer.validated_data.pop('timezone', None)
|
|
user.last_login = timezone.now()
|
|
user.is_active = True
|
|
user.save
|
|
token, created = Token.objects.get_or_create(user=user)
|
|
serializer_data = serializer.data
|
|
serializer_data['token'] = token.key
|
|
|
|
login_history_obj = user.login_history.create(
|
|
ip=self.get_client_ip(),
|
|
timezone=user_timezone,
|
|
)
|
|
return Response({
|
|
"id": user.id,
|
|
"fullname": user.fullname,
|
|
"email": user.email,
|
|
"token": token.key,
|
|
"user_type": user.user_type_based_on_groups,
|
|
"avatar": request.build_absolute_uri(user.avatar.url) if user.avatar else None,
|
|
}, status=status.HTTP_201_CREATED)
|
|
|
|
|
|
|
|
class UserProfileView(RetrieveAPIView):
|
|
serializer_class = UserProfileSerializer
|
|
permission_classes = [IsAuthenticated, IsActiveUser]
|
|
authentication_classes = [TokenAuthentication]
|
|
queryset = User.objects.all()
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
logger.info(f'UserProfileView--> {request.data}')
|
|
return super().get(request, *args, **kwargs)
|
|
|
|
def get_object(self):
|
|
return self.request.user
|
|
|
|
|
|
class UserUpdateView(UpdateAPIView):
|
|
permission_classes = [IsAuthenticated, IsActiveUser]
|
|
authentication_classes = [TokenAuthentication]
|
|
serializer_class = UserProfileSerializer
|
|
|
|
def put(self, request, *args, **kwargs):
|
|
logger.info(f'UserProfileView--> {request.data}')
|
|
return super().put(request, *args, **kwargs)
|
|
|
|
def get_object(self):
|
|
return self.request.user
|
|
|
|
|
|
class UserRecoverPassword(CreateAPIView):
|
|
serializer_class = UserRecoverPasswordSerializer
|
|
|
|
@swagger_auto_schema(
|
|
operation_description=doc_recover(),
|
|
request_body=UserRecoverPasswordSerializer,
|
|
)
|
|
def post(self, request):
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
data = serializer.data
|
|
user = get_object_or_404(User, email=data['email'])
|
|
code = RedisManager.generate_otp_code()
|
|
print(f' send {code}')
|
|
phone_number = RedisManager().add_to_redis(code, fullname=str(user.fullname), password='', email=data['email'])
|
|
|
|
try:
|
|
send_email([data['email']], code)
|
|
except Exception as exp:
|
|
print(f'-exp-register-->{exp}')
|
|
|
|
return Response(
|
|
data= {
|
|
"id": user.id,
|
|
"fullname": user.fullname,
|
|
"phone_number": str(user.phone_number) if user.phone_number else None,
|
|
"email": user.email if user.email else None,
|
|
"avatar": request.build_absolute_uri(user.avatar.url) if user.avatar else None,
|
|
"message": "Forgot password code sent"
|
|
},
|
|
status=status.HTTP_202_ACCEPTED,
|
|
)
|
|
|
|
|
|
class UserResetPassword(CreateAPIView):
|
|
serializer_class = UserResetPasswordSerializer
|
|
permission_classes = [IsAuthenticated]
|
|
authentication_classes = [TokenAuthentication]
|
|
|
|
@swagger_auto_schema(
|
|
operation_description=doc_reset(),
|
|
request_body=UserResetPasswordSerializer,
|
|
)
|
|
def post(self, request, *args, **kwargs):
|
|
# Get the logged-in user
|
|
user = request.user
|
|
|
|
# Use the serializer to validate data
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
|
|
# Set the new password
|
|
user.set_password(serializer.validated_data['password'])
|
|
user.save()
|
|
|
|
# Return a success response
|
|
return Response({"message": "Your password has been changed successfully."}, status=status.HTTP_200_OK)
|
|
|
|
|
|
|
|
|
|
class UserDeleteView(APIView):
|
|
permission_classes = [IsAuthenticated]
|
|
authentication_classes = [TokenAuthentication]
|
|
|
|
def delete(self, request, *args, **kwargs):
|
|
try:
|
|
user = request.user
|
|
if user.email == "admin@gmail.com":
|
|
raise AppAPIException({"message": "Unable to log in with provided credentials."}, status_code=status.HTTP_204_NO_CONTENT)
|
|
|
|
user.soft_delete()
|
|
if t := Token.objects.filter(user=user).first():
|
|
t.delete()
|
|
|
|
return Response({"detail": "Your account has been deleted."}, status=status.HTTP_204_NO_CONTENT)
|
|
|
|
except Exception:
|
|
# پیام خطای ثابت برای سایر خطاهای غیرمنتظره
|
|
return Response({"detail": "User does not exist."}, status=status.HTTP_404_NOT_FOUND)
|
|
|
|
|
|
class UpdateFCMView(GenericAPIView):
|
|
permission_classes = [IsAuthenticated]
|
|
authentication_classes = [TokenAuthentication]
|
|
serializer_class = UserFCMSerializer
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
user = request.user
|
|
|
|
fcm_token = request.data.get('fcm')
|
|
|
|
if not fcm_token:
|
|
return Response({"detail": "FCM token is required."}, status=status.HTTP_200_OK)
|
|
|
|
user.fcm = fcm_token
|
|
user.save()
|
|
|
|
return Response({"detail": "FCM token updated successfully."}, status=status.HTTP_200_OK)
|