You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

549 lines
20 KiB

import logging
import requests
import json
from rest_framework.generics import CreateAPIView, RetrieveUpdateAPIView, GenericAPIView, RetrieveAPIView, UpdateAPIView, ListAPIView
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from django.db.models import Q
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.authtoken.models import Token
from rest_framework.exceptions import AuthenticationFailed
from django.utils.translation import gettext_lazy as _
from django.shortcuts import get_object_or_404
from rest_framework.authtoken.models import Token
from django.utils import timezone
from rest_framework.authentication import TokenAuthentication
from django.contrib.auth import authenticate
from phonenumbers import parse, region_code_for_number
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from rest_framework.exceptions import ValidationError
from utils.exceptions import InvaliedCodeVrify, ExpiredCodeException, ServiceUnavailableException
from apps.account.models import User
from apps.account.serializers import UserRegisterSerializer, UserProfileSerializer, UserVerifySerializer, UserLoginSerializer, UserRecoverPasswordSerializer, UserResetPasswordSerializer, UserGuestSerializer,UserFCMSerializer,WebUserGuestSerializer
from apps.account.serializers.user_web import WebUserRegisterSerializer
from utils.redis import RedisManager
from utils.exceptions import AppAPIException
from utils import send_email, is_valid_email
from config.settings import base as settings
from apps.account.permissions import IsActiveUser
from apps.account.doc import *
logger = logging.getLogger(__name__)
class UserGuestView(CreateAPIView):
    """Issue an auth token for a guest account keyed by ``device_id``.

    The user row matching the submitted ``device_id`` is looked up (or
    created from the validated payload), its ``last_login`` is refreshed, a
    login-history row is recorded and the user's token key is returned.
    """

    permission_classes = [AllowAny]
    serializer_class = UserGuestSerializer

    @swagger_auto_schema(
        operation_description="Create a guest user account with device information",
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
            properties={
                "device_id": openapi.Schema(type=openapi.TYPE_STRING, default="c9f0c1f4f5cee3d7"),
                "fcm": openapi.Schema(type=openapi.TYPE_STRING, default=""),
                "device_os": openapi.Schema(type=openapi.TYPE_STRING, default="android"),
                "lat": openapi.Schema(type=openapi.TYPE_STRING, default="56"),
                "lon": openapi.Schema(type=openapi.TYPE_STRING, default="44"),
                "timezone": openapi.Schema(type=openapi.TYPE_STRING, default="1.0"),
            },
            required=["device_id"],
        ),
    )
    def post(self, request, *args, **kwargs):
        """Log the incoming payload and delegate to ``CreateAPIView.post``."""
        logger.info(f'GuestAuthView--> {request.data}')
        return super().post(request, *args, **kwargs)

    @staticmethod
    def generate_login_token(user):
        """Return the key of the user's token, creating the token if needed."""
        # get_or_create expresses the intent directly: no fields are being
        # updated, so update_or_create only added a row lock and extra write.
        token, _created = Token.objects.get_or_create(user=user)
        return token.key

    def get_client_ip(self):
        """Return the client IP from ``X-Forwarded-For`` or ``REMOTE_ADDR``.

        NOTE(review): ``X-Forwarded-For`` is client-controlled unless a
        trusted proxy overwrites it -- confirm the deployment does so.
        """
        meta = self.request.META
        forwarded = meta.get('HTTP_X_FORWARDED_FOR')
        if forwarded:
            # Entries are separated by ", ", so strip the surrounding spaces.
            return forwarded.split(',')[0].strip()
        return meta.get('REMOTE_ADDR')

    def create(self, request, *args, **kwargs):
        """Validate the payload, resolve the guest user and return its token."""
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        user = self.perform_create(serializer)
        return Response({
            'token': self.generate_login_token(user),
        }, status=200)

    def perform_create(self, serializer):
        """Find or create the guest user and record this login.

        Returns:
            The ``User`` instance matching the validated ``device_id``.
        """
        # Function-scope import keeps this block self-contained.
        from django.db import transaction

        validated = serializer.validated_data
        device_id = validated.get('device_id')
        # These three belong to the login-history row, not to the user row.
        lat = validated.pop('lat', None)
        lon = validated.pop('lon', None)
        user_timezone = validated.pop('timezone', None)
        user_defaults = dict(validated)

        # select_for_update() raises TransactionManagementError when the
        # queryset is evaluated in autocommit mode, so the lookup has to run
        # inside an explicit transaction (a savepoint if one is already open).
        with transaction.atomic():
            obj = User.objects.select_for_update().filter(device_id=device_id).first()
            if not obj:
                obj, created = User.objects.select_for_update().get_or_create(
                    device_id=device_id,
                    defaults=user_defaults,
                )
                if created:
                    logger.info(f'Guest-(created)->: {obj.device_id}')
            obj.last_login = timezone.now()
            obj.save()

        obj.login_history.create(
            lat=lat,
            lon=lon,
            ip=self.get_client_ip(),
            timezone=user_timezone,
        )
        return obj
import hashlib
from rest_framework.authtoken.models import Token
class WebUserGuestView(CreateAPIView):
    """Issue an auth token for a web guest identified by IP and user agent.

    The synthetic ``device_id`` is ``"<client ip>_<first 8 hex chars of the
    MD5 of the user agent>"``.

    NOTE(review): any client presenting the same IP / User-Agent pair
    resolves to the same account and receives the same token, and both
    inputs come from request headers. Confirm this is acceptable.
    """

    permission_classes = [AllowAny]
    serializer_class = WebUserGuestSerializer

    @swagger_auto_schema(
        operation_description="Create a guest user account for web users using IP and user agent",
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
            properties={
                "timezone": openapi.Schema(type=openapi.TYPE_STRING, default="1.0"),
                "user_agent": openapi.Schema(type=openapi.TYPE_STRING, default="Mozilla/5.0..."),
            },
            required=[],  # No required fields - we'll extract from request
        ),
    )
    def post(self, request, *args, **kwargs):
        """Log the caller's IP / user agent and delegate to ``CreateAPIView.post``."""
        logger.info(f'WebGuestAuthView--> IP: {self.get_client_ip()}, User-Agent: {self.get_user_agent()}')
        return super().post(request, *args, **kwargs)

    @staticmethod
    def generate_login_token(user):
        """Return the key of the user's token, creating it only if missing."""
        # get_or_create keeps an existing token, so the key is not rotated on
        # every login.
        token, _created = Token.objects.get_or_create(user=user)
        return token.key

    def get_client_ip(self):
        """Get client IP address from ``X-Forwarded-For`` or ``REMOTE_ADDR``."""
        meta = self.request.META
        forwarded = meta.get('HTTP_X_FORWARDED_FOR')
        if forwarded:
            # Entries are separated by ", "; an unstripped value would leak a
            # stray space into the derived device_id.
            return forwarded.split(',')[0].strip()
        return meta.get('REMOTE_ADDR')

    def get_user_agent(self):
        """Get user agent from request headers (empty string when absent)."""
        return self.request.META.get('HTTP_USER_AGENT', '')

    def create(self, request, *args, **kwargs):
        """Derive the web ``device_id``, resolve the user and return its token."""
        data = request.data.copy()
        client_ip = self.get_client_ip()
        user_agent = self.get_user_agent()

        # MD5 is used only as a stable fingerprint (the built-in hash() is
        # salted per process); it is not a security boundary.
        ua_hash = hashlib.md5(user_agent.encode('utf-8')).hexdigest()[:8]
        web_user_id = f"{client_ip}_{ua_hash}"

        data.update({
            'device_id': web_user_id,
            'device_os': 'web',
            'user_agent': user_agent,
            'client_ip': client_ip,
        })

        serializer = self.get_serializer(data=data)
        serializer.is_valid(raise_exception=True)
        user = self.perform_create(serializer)
        return Response({
            'token': self.generate_login_token(user),
        }, status=200)

    def perform_create(self, serializer):
        """Find or create the web guest user and record this login.

        Returns:
            The ``User`` instance matching the derived ``device_id``.
        """
        # Function-scope import keeps this block self-contained.
        from django.db import transaction

        validated = serializer.validated_data
        # timezone is stored on the login-history row, not on the user row.
        user_timezone = validated.pop('timezone', None)
        device_id = validated.get('device_id')
        user_agent = validated.get('user_agent')
        client_ip = validated.get('client_ip')
        user_defaults = dict(validated)

        # select_for_update() raises TransactionManagementError when the
        # queryset is evaluated in autocommit mode, so the lookup has to run
        # inside an explicit transaction (a savepoint if one is already open).
        with transaction.atomic():
            obj = User.objects.select_for_update().filter(device_id=device_id).first()
            if not obj:
                obj, created = User.objects.select_for_update().get_or_create(
                    device_id=device_id,
                    defaults=user_defaults,
                )
                if created:
                    logger.info(f'WebGuest-(created)->: {device_id} (IP: {client_ip})')

            # Refresh the per-login fields on every call.
            obj.last_login = timezone.now()
            obj.user_agent = user_agent
            obj.client_ip = client_ip
            obj.save()

        obj.login_history.create(
            ip=client_ip,
            user_agent=user_agent,
            timezone=user_timezone,
            device_os='web',
        )
        return obj
class UserRegisterView(CreateAPIView):
    """Start email registration: stash the payload in Redis and mail an OTP."""

    permission_classes = [AllowAny]
    serializer_class = UserRegisterSerializer

    @swagger_auto_schema(
        operation_description=doc_register(),
        request_body=UserRegisterSerializer,
    )
    def post(self, request):
        """Validate the payload, store it with a fresh OTP and email the code.

        Returns:
            ``202 Accepted`` with the validated payload and a status message.
            The response is sent even when the email could not be delivered
            (best effort); the failure is logged.
        """
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        data = serializer.validated_data

        code = RedisManager.generate_otp_code()
        # The OTP itself is deliberately kept out of stdout and the logs:
        # anyone able to read them could otherwise complete the verification.
        logger.info("register: otp generated for email=%s", data['email'])
        RedisManager().add_to_redis(code, **data)

        try:
            send_email([data['email']], code)
        except Exception:
            # Best effort, as before: the request still succeeds, but the
            # failure now reaches the log with a traceback.
            logger.exception("register: failed to send otp email to %s", data['email'])

        # NOTE(review): ``data`` is the full validated payload; if the
        # serializer carries a password field it is echoed back here.
        return Response(
            data={
                "user": data,
                "message": "The otp code was sent to the user's email"
            },
            status=status.HTTP_202_ACCEPTED,
        )
class UserVerifyView(CreateAPIView):
    """Complete an OTP flow: check the code, then create or update the user."""

    permission_classes = [AllowAny]
    serializer_class = UserVerifySerializer

    # Bypass code honoured by ``valied_code`` only when Django's DEBUG is on.
    DEBUG_MASTER_CODE = "11111"

    @swagger_auto_schema(
        operation_description=doc_verify(),
        request_body=UserVerifySerializer,
    )
    def post(self, request, *args, **kwargs):
        """Delegate to ``CreateAPIView.post`` (which calls ``create``)."""
        # The raw payload contains the verification code, so it is not logged.
        return super().post(request, *args, **kwargs)

    def create(self, request, *args, **kwargs):
        """Verify the submitted code against Redis and return a token.

        Raises:
            ValidationError: no pending verification, expired code or wrong code.
            AppAPIException: the Redis lookup reported the service unavailable.
        """
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        data = serializer.data
        logger.info("UserVerifyView--> verification attempt for email=%s", data['email'])

        try:
            verify_data = RedisManager().get_by_redis(data['email'])
        except ServiceUnavailableException as e:
            # The exception must be raised; returning it handed DRF a
            # non-Response object.
            raise AppAPIException({"message": str(e)}, status_code=e.status_code) from e
        except ExpiredCodeException:
            raise ValidationError({"code": "The verification code has expired."})
        if not verify_data:
            raise ValidationError({"code": "Verification data not found or expired."})

        # Take the stored code out of the payload: the remainder is forwarded
        # to perform_create as user fields.
        stored_code = verify_data.pop('code', None)
        self.valied_code(data['code'], stored_code)

        user_fields = dict(verify_data)
        # Merge through a dict so a key present both here and in the stored
        # payload cannot raise a duplicate-keyword TypeError.
        user_fields['email'] = data['email']
        request_device_id = data.get('device_id')
        if request_device_id or 'device_id' not in user_fields:
            user_fields['device_id'] = request_device_id
        user = self.perform_create(**user_fields)

        token, _created = Token.objects.get_or_create(user=user)
        return Response(data={
            'token': str(token.key),
            'user_id': user.id,
            'phone_number': str(user.phone_number) if user.phone_number else None,
            'email': str(user.email),
            'fullname': str(user.fullname),
            'avatar': str(user.avatar) if user.avatar else None
        }, status=status.HTTP_201_CREATED)

    def valied_code(self, current_code, save_code):
        """Return ``current_code`` when it matches the stored code.

        Args:
            current_code: code submitted by the client.
            save_code: code stored for this email (may be missing).

        Raises:
            ValidationError: the codes differ or either one is missing.
        """
        # Function-scope import: the module-level ``settings`` name is bound
        # to a specific settings module, not the active Django settings.
        from django.conf import settings as django_settings

        def _as_text(value):
            # Normalise bytes / int / str so an equal code compares equal.
            if value is None:
                return ''
            if isinstance(value, bytes):
                return value.decode('utf-8', errors='replace')
            return str(value)

        submitted = _as_text(current_code)
        expected = _as_text(save_code)

        # Both sides must be present: previously a missing stored code made
        # every submitted code pass.
        if submitted and expected and submitted == expected:
            return current_code

        # SECURITY: the master code used to be accepted unconditionally,
        # letting anyone verify (and set the password of) any email address.
        # It is now honoured only when DEBUG is enabled.
        if submitted == self.DEBUG_MASTER_CODE and django_settings.DEBUG:
            return current_code

        raise ValidationError({"code": "code notfound"})

    def perform_create(self, *args, **kwargs):
        """Create or update the user described by ``kwargs``.

        Lookup order: an existing user with the email, then an email-less
        user with the same ``device_id``, otherwise a new user built from
        ``kwargs``.

        NOTE(review): the source's indentation was lost. This assumes every
        update of an existing email user is guarded by a non-empty
        ``password`` (it reads ``kwargs['password']``) -- confirm.

        Returns:
            The resulting ``User`` instance.
        """
        email = kwargs.get('email')
        device_id = kwargs.get('device_id')
        password = kwargs.get('password')

        user = User.objects.filter(email=email).first()
        if user:
            # Existing account: only touched when a new password was supplied.
            if password:
                user.is_active = True
                user.deletion_date = None
                if device_id:
                    user.device_id = device_id
                user.last_login = timezone.now()
                user.set_password(password)
                user.save()
            return user

        # No account with this email: try to upgrade the guest row that
        # already owns the device.
        guest = None
        if device_id:
            guest = User.objects.filter(device_id=device_id, email__isnull=True).first()

        if guest is None:
            # Keep the raw password out of the INSERT; it is hashed below.
            fields = {key: value for key, value in kwargs.items() if key != 'password'}
            user = User.objects.create(**fields)
        else:
            user = guest
            user.email = email
            user.fullname = kwargs['fullname']
            if device_id:
                user.device_id = device_id

        if password:
            user.set_password(password)
        user.last_login = timezone.now()
        user.is_active = True
        user.deletion_date = None
        user.save()
        return user
class WebUserRegisterView(CreateAPIView):
    """Start web registration: stash the payload in Redis and mail an OTP."""

    permission_classes = [AllowAny]
    serializer_class = WebUserRegisterSerializer

    @swagger_auto_schema(
        operation_description="Web registration with password and confirmation",
        request_body=WebUserRegisterSerializer,
    )
    def post(self, request):
        """Validate the payload, store it with a fresh OTP and email the code.

        Returns:
            ``202 Accepted`` with the ``id`` / ``fullname`` / ``email`` values
            read from the validated payload and a status message. The response
            is sent even when the email could not be delivered (best effort);
            the failure is logged.
        """
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        data = serializer.validated_data

        code = RedisManager.generate_otp_code()
        # The OTP itself is deliberately kept out of stdout and the logs:
        # anyone able to read them could otherwise complete the verification.
        logger.info("web register: otp generated for email=%s", data['email'])

        # Store all registration data including password in Redis
        RedisManager().add_to_redis(code, **data)

        try:
            send_email([data['email']], code)
        except Exception:
            # Best effort, as before: the request still succeeds, but the
            # failure now reaches the log with a traceback.
            logger.exception("web register: failed to send otp email to %s", data['email'])

        return Response(
            data={
                "user": {
                    "id": data.get('id'),
                    "fullname": data.get('fullname'),
                    "email": data.get('email'),
                },
                "message": "The otp code was sent to the user's email"
            },
            status=status.HTTP_202_ACCEPTED,
        )
class UserLoginView(CreateAPIView):
    """Authenticate by email and password and return the user's token."""

    permission_classes = [AllowAny]
    serializer_class = UserLoginSerializer

    @swagger_auto_schema(
        operation_description=doc_login(),
        request_body=UserLoginSerializer,
    )
    def post(self, request, *args, **kwargs):
        """Delegate to ``CreateAPIView.post`` (which calls ``create``)."""
        return super().post(request, *args, **kwargs)

    def get_client_ip(self):
        """Return the client IP from ``X-Forwarded-For`` or ``REMOTE_ADDR``."""
        meta = self.request.META
        forwarded = meta.get('HTTP_X_FORWARDED_FOR')
        if forwarded:
            # Entries are separated by ", ", so strip the surrounding spaces.
            return forwarded.split(',')[0].strip()
        return meta.get('REMOTE_ADDR')

    def create(self, request, *args, **kwargs):
        """Validate credentials, record the login and return the profile.

        Raises:
            ValidationError: ``authenticate`` rejected the credentials.
        """
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        data = serializer.data

        user = authenticate(request, username=request.data['email'], password=data['password'])
        if not user:
            raise ValidationError({"email": "Unable to log in with provided credentials."})

        user_timezone = serializer.validated_data.pop('timezone', None)

        user.last_login = timezone.now()
        user.is_active = True
        # The original read ``user.save`` without calling it, so neither
        # field above was ever written to the database.
        user.save()

        token, _created = Token.objects.get_or_create(user=user)

        user.login_history.create(
            ip=self.get_client_ip(),
            timezone=user_timezone,
        )

        return Response({
            "id": user.id,
            "fullname": user.fullname,
            "email": user.email,
            "token": token.key,
            "user_type": user.user_type_based_on_groups,
            "avatar": request.build_absolute_uri(user.avatar.url) if user.avatar else None,
        }, status=status.HTTP_201_CREATED)
class UserProfileView(RetrieveAPIView):
    """Return the profile of the authenticated, active user."""

    queryset = User.objects.all()
    authentication_classes = [TokenAuthentication]
    permission_classes = [IsAuthenticated, IsActiveUser]
    serializer_class = UserProfileSerializer

    def get_object(self):
        """Serve the requesting user rather than looking one up by URL."""
        current_user = self.request.user
        return current_user

    def get(self, request, *args, **kwargs):
        """Log the request payload, then run the standard retrieve flow."""
        logger.info(f'UserProfileView--> {request.data}')
        response = super().get(request, *args, **kwargs)
        return response
class UserUpdateView(UpdateAPIView):
    """Update the profile of the authenticated, active user."""

    permission_classes = [IsAuthenticated, IsActiveUser]
    authentication_classes = [TokenAuthentication]
    serializer_class = UserProfileSerializer

    def put(self, request, *args, **kwargs):
        """Log the payload and delegate to ``UpdateAPIView.put``."""
        # The label used to read 'UserProfileView', which attributed update
        # requests to the read-only profile endpoint in the logs.
        logger.info(f'UserUpdateView--> {request.data}')
        return super().put(request, *args, **kwargs)

    def get_object(self):
        """Operate on the requesting user rather than a URL-selected one."""
        return self.request.user
class UserRecoverPassword(CreateAPIView):
    """Start password recovery: store an OTP for the email and mail it.

    NOTE(review): no ``permission_classes`` is declared, so the project
    default applies -- confirm unauthenticated users can reach this endpoint.
    NOTE(review): the response discloses id, name, phone and avatar for any
    registered email, and a 404 reveals unregistered ones -- confirm intended.
    """

    serializer_class = UserRecoverPasswordSerializer

    @swagger_auto_schema(
        operation_description=doc_recover(),
        request_body=UserRecoverPasswordSerializer,
    )
    def post(self, request):
        """Look up the user by email, store a fresh OTP and email it.

        Returns:
            ``202 Accepted`` with the user's public fields. The response is
            sent even when the email could not be delivered (best effort);
            the failure is logged. An unknown email yields a 404.
        """
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        data = serializer.data

        user = get_object_or_404(User, email=data['email'])

        code = RedisManager.generate_otp_code()
        # The OTP itself is deliberately kept out of stdout and the logs:
        # anyone able to read them could otherwise take over the account.
        logger.info("recover password: otp generated for email=%s", data['email'])
        # An empty password is stored with the code for this email.
        RedisManager().add_to_redis(code, fullname=str(user.fullname), password='', email=data['email'])

        try:
            send_email([data['email']], code)
        except Exception:
            # Best effort, as before: the request still succeeds, but the
            # failure now reaches the log with a traceback.
            logger.exception("recover password: failed to send otp email to %s", data['email'])

        return Response(
            data={
                "id": user.id,
                "fullname": user.fullname,
                "phone_number": str(user.phone_number) if user.phone_number else None,
                "email": user.email if user.email else None,
                "avatar": request.build_absolute_uri(user.avatar.url) if user.avatar else None,
                "message": "Forgot password code sent"
            },
            status=status.HTTP_202_ACCEPTED,
        )
class UserResetPassword(CreateAPIView):
    """Set a new password for the token-authenticated user."""

    authentication_classes = [TokenAuthentication]
    permission_classes = [IsAuthenticated]
    serializer_class = UserResetPasswordSerializer

    @swagger_auto_schema(
        operation_description=doc_reset(),
        request_body=UserResetPasswordSerializer,
    )
    def post(self, request, *args, **kwargs):
        """Validate the payload and store its ``password`` on the caller.

        Returns:
            ``200 OK`` with a confirmation message.
        """
        # Reject invalid payloads before touching the account.
        payload = self.get_serializer(data=request.data)
        payload.is_valid(raise_exception=True)
        new_password = payload.validated_data['password']

        # The account being changed is always the authenticated caller.
        account = request.user
        account.set_password(new_password)
        account.save()

        return Response(
            {"message": "Your password has been changed successfully."},
            status=status.HTTP_200_OK,
        )
class UserDeleteView(APIView):
    """Soft-delete the authenticated user's account and revoke its token."""

    permission_classes = [IsAuthenticated]
    authentication_classes = [TokenAuthentication]

    def delete(self, request, *args, **kwargs):
        """Soft-delete the caller and remove their auth token.

        Returns:
            ``204 No Content`` on success; ``404`` with a fixed message when
            the deletion fails unexpectedly.

        Raises:
            AppAPIException: the caller is the protected admin account.
        """
        user = request.user

        # The guard sits outside the try block: previously the blanket
        # ``except Exception`` swallowed this exception and answered 404.
        # 403 replaces 204, which signals success and cannot carry a body.
        if user.email == "admin@gmail.com":
            raise AppAPIException(
                {"message": "Unable to log in with provided credentials."},
                status_code=status.HTTP_403_FORBIDDEN,
            )

        try:
            user.soft_delete()
            Token.objects.filter(user=user).delete()
        except Exception:
            # Fixed error message for any other unexpected failure; the cause
            # is now logged instead of being discarded.
            logger.exception("UserDeleteView: failed to delete user id=%s", user.pk)
            return Response({"detail": "User does not exist."}, status=status.HTTP_404_NOT_FOUND)

        return Response({"detail": "Your account has been deleted."}, status=status.HTTP_204_NO_CONTENT)
class UpdateFCMView(GenericAPIView):
    """Store the FCM push token sent by the authenticated user."""

    permission_classes = [IsAuthenticated]
    authentication_classes = [TokenAuthentication]
    serializer_class = UserFCMSerializer

    def post(self, request, *args, **kwargs):
        """Save ``request.data['fcm']`` on the caller.

        Returns:
            ``200 OK`` once the token is saved; ``400 Bad Request`` when the
            ``fcm`` value is missing or empty.
        """
        fcm_token = request.data.get('fcm')
        if not fcm_token:
            # A missing token is a client error; answering 200 made the
            # failure indistinguishable from a successful update.
            return Response({"detail": "FCM token is required."}, status=status.HTTP_400_BAD_REQUEST)

        user = request.user
        user.fcm = fcm_token
        user.save()
        return Response({"detail": "FCM token updated successfully."}, status=status.HTTP_200_OK)