import ipaddress
import logging
import re
from pathlib import Path

import geoip2.database
import geoip2.errors
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
from rest_framework import status
from rest_framework.generics import GenericAPIView
from rest_framework.mixins import CreateModelMixin
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView

from apps.account.models import LocationHistory
from apps.account.serializers import (
    LocationHistorySerializer,
    ReverseGeolocationResponseSerializer,
    ReverseGeolocationSerializer,
)
from city_detection_ip import (
    SPECIAL_COORDINATES,
    get_location_by_coordinates,
    get_location_by_ip,
)

logger = logging.getLogger(__name__)

# GeoLite2 database path. It is relative, so it is resolved against the
# current working directory of the running process.
CITY_DB_PATH = Path("utils/country_city_db/GeoLite2-City.mmdb")

# User-Agent fragments that identify non-browser clients (Flutter/Dart apps
# and other mobile apps); requests carrying them report no browser.
_NON_BROWSER_KEYWORDS = ('dart:io', 'flutter', 'dart/', 'habibapp', 'mobile app')

# Browser detection patterns, matched against the lower-cased User-Agent.
# Order matters - more specific tokens first: Edge/Opera/Chromium UAs also
# carry "chrome/", and almost every WebKit/Blink UA also carries "safari/".
_BROWSER_PATTERNS = tuple(
    (re.compile(pattern), browser_name)
    for pattern, browser_name in (
        (r'edg/', 'Edge'),               # Microsoft Edge (Chromium-based)
        (r'edge/', 'Edge'),              # Microsoft Edge (Legacy)
        (r'edga/', 'Edge'),              # Microsoft Edge for Android
        (r'edgios/', 'Edge'),            # Microsoft Edge for iOS
        (r'opr/', 'Opera'),              # Opera
        (r'opera/', 'Opera'),            # Opera
        (r'chromium/', 'Chromium'),      # Chromium (its UA also contains "chrome/")
        (r'chrome/', 'Chrome'),          # Google Chrome
        (r'crios/', 'Chrome'),           # Chrome for iOS
        (r'firefox/', 'Firefox'),        # Mozilla Firefox
        (r'fxios/', 'Firefox'),          # Firefox for iOS
        (r'safari/', 'Safari'),          # Safari (checked after the others)
        (r'version/.*safari', 'Safari'),  # Safari with version
    )
)

# Substrings whose presence means a "safari" match is really another browser.
_SAFARI_IMPOSTOR_TOKENS = ('chrome', 'edg', 'opr', 'opera')

# Words that mark a place name as an administrative subdivision rather than
# a city, in several languages.
_SUBDIVISION_KEYWORDS = (
    'Province', 'Region', 'Oblast', 'Governorate', 'District',
    'County', 'State', 'Territory', 'استان', 'منطقه', 'ولایت', 'محافظه',
)

# Whole-word, case-insensitive matchers for the keywords above. Whole-word
# matching keeps real cities such as "Staten Island" or "Provincetown" from
# being mistaken for a state/province.
_SUBDIVISION_PATTERNS = tuple(
    (keyword, re.compile(r'\b' + re.escape(keyword) + r'\b', re.IGNORECASE))
    for keyword in _SUBDIVISION_KEYWORDS
)


def _extract_client_ip(request):
    """Return the client IP: first X-Forwarded-For entry, else REMOTE_ADDR."""
    forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for:
        first_hop = forwarded_for.split(',')[0].strip()
        if first_hop:
            return first_hop
    return request.META.get('REMOTE_ADDR')


def _is_non_public_ip(ip):
    """Return True for 'localhost' and private/loopback/link-local addresses.

    Strings that are not valid IP addresses return False so the caller's
    normal lookup/error path decides what to do with them.
    """
    if ip == 'localhost':
        return True
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        return False
    return address.is_private or address.is_loopback or address.is_link_local


def _empty_location(ip):
    """Return a location record for *ip* with every geo field set to None."""
    return {
        'ip': ip,
        'city': None,
        'country': None,
        'country_code': None,
        'latitude': None,
        'longitude': None,
        'accuracy_radius': None,
        'time_zone': None,
        'postal_code': None,
    }


class LocationHistoryView(GenericAPIView, CreateModelMixin):
    """Create location-history entries for the authenticated user."""

    permission_classes = [IsAuthenticated]
    serializer_class = LocationHistorySerializer

    def post(self, request, *args, **kwargs):
        """Validate the payload, stamp it with the client IP and save it.

        Returns:
            Response: 201 with the serialized entry, or 400 with the
            serializer errors when validation fails.
        """
        payload = request.data.copy()
        payload['ip'] = self.get_client_ip()
        serializer = self.get_serializer(data=payload)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        serializer.save(user=request.user)
        return Response(serializer.data, status=status.HTTP_201_CREATED)

    def get_queryset(self):
        """Return only the location history of the requesting user."""
        return LocationHistory.objects.filter(user=self.request.user)

    def get_client_ip(self):
        """Return the client's IP address taken from the request headers."""
        return _extract_client_ip(self.request)


def detect_browser_from_user_agent(user_agent):
    """
    Detect browser name from User-Agent string.

    Args:
        user_agent (str): The User-Agent header value

    Returns:
        str or None: Browser name if detected, 'Unknown Browser' for an
        unrecognised Mozilla-compatible agent, None if not a browser or
        detection fails
    """
    if not user_agent:
        return None

    try:
        ua = user_agent.lower()

        # Flutter/Dart and other app clients are not browsers.
        if any(keyword in ua for keyword in _NON_BROWSER_KEYWORDS):
            return None

        # Computed once: a "safari" hit only counts when no other browser
        # token is present in the UA.
        safari_is_impostor = any(token in ua for token in _SAFARI_IMPOSTOR_TOKENS)

        for pattern, browser_name in _BROWSER_PATTERNS:
            if not pattern.search(ua):
                continue
            if browser_name == 'Safari' and safari_is_impostor:
                continue
            return browser_name

        # No specific browser detected, but a Mozilla-compatible engine
        # token suggests an unknown browser.
        if 'mozilla' in ua and ('gecko' in ua or 'webkit' in ua):
            return 'Unknown Browser'

        return None
    except Exception as e:
        # Deliberate best-effort boundary: browser detection must never
        # break the API response (e.g. when given a non-string value).
        logger.warning("Error detecting browser from user agent: %s", e)
        return None


class RegionInfoView(GenericAPIView):
    """Report region, IP, browser and GeoIP2 details for the caller."""

    def get(self, request, *args, **kwargs):
        """Return region information built from request headers and GeoIP2.

        The HTTP_CF_* values are read straight from the request headers;
        the GeoIP2 fields are added only when a lookup result is available.
        """
        # detect_browser_from_user_agent handles its own errors and
        # returns None on failure, so no extra guard is needed here.
        browser = detect_browser_from_user_agent(
            request.META.get('HTTP_USER_AGENT', '')
        )

        # Use one address for both the reported 'ip' and the geolocation
        # lookup so the response never mixes data for two different IPs.
        ip = request.META.get('HTTP_CF_CONNECTING_IP') or self.get_client_ip(request)

        # Get geolocation data from GeoIP2 database
        geo_data = self.get_location_from_ip(ip)

        region_info = {
            'ip': ip,
            'country': request.META.get('HTTP_CF_IPCOUNTRY'),
            'region': request.META.get('HTTP_CF_REGION'),
            'region_code': request.META.get('HTTP_CF_REGION_CODE'),
            'city': request.META.get('HTTP_CF_CITY'),
            'timezone': request.META.get('HTTP_CF_TIMEZONE'),
            'browser': browser,
        }

        # Add geolocation data if available
        if geo_data:
            region_info.update({
                'country_code': geo_data.get('country_code'),
                'latitude': geo_data.get('latitude'),
                'longitude': geo_data.get('longitude'),
                'accuracy_radius': geo_data.get('accuracy_radius'),
                'time_zone': geo_data.get('time_zone'),
                'postal_code': geo_data.get('postal_code'),
            })

        return Response(region_info)

    def get_client_ip(self, request):
        """Extract client IP from request"""
        return _extract_client_ip(request)

    def get_location_from_ip(self, ip):
        """Get location data from IP using GeoIP2 database.

        Args:
            ip (str): Client IP address.

        Returns:
            dict or None: Location fields for the address. Local/private
            addresses yield a dict whose geo fields are all None. None is
            returned when no IP is given, the database file is missing,
            the address is unknown, or the lookup fails.
        """
        if not ip:
            logger.warning("No client IP available for GeoIP2 lookup")
            return None

        try:
            # Skip local/private IPs - they cannot be geolocated.
            if _is_non_public_ip(ip):
                logger.warning("Skipping local/private IP: %s", ip)
                return _empty_location(ip)

            if not CITY_DB_PATH.exists():
                logger.error("GeoIP2 database not found at %s", CITY_DB_PATH)
                return None

            with geoip2.database.Reader(CITY_DB_PATH) as reader:
                response = reader.city(ip)

                # Known GeoIP2 quirk: a subdivision (region) name can show
                # up as the city name. Never report a region as a city.
                city_name = (response.city.name or None) if response.city else None
                if city_name:
                    subdivision_names = (
                        [s.name for s in response.subdivisions]
                        if response.subdivisions else []
                    )
                    if city_name in subdivision_names:
                        logger.warning(
                            "IP %s: City name '%s' matches subdivision - treating as region",
                            ip, city_name,
                        )
                        city_name = None

                country = response.country
                location = response.location
                location_data = {
                    'ip': ip,
                    'city': city_name,
                    'country': country.name if country else None,
                    'country_code': country.iso_code if country else None,
                    'latitude': location.latitude if location else None,
                    'longitude': location.longitude if location else None,
                    'accuracy_radius': location.accuracy_radius if location else None,
                    'time_zone': location.time_zone if location else None,
                    'postal_code': response.postal.code if response.postal else None,
                }

                logger.info(
                    "Successfully found location for IP %s: %s, %s",
                    ip, location_data.get('city'), location_data.get('country'),
                )
                return location_data

        except geoip2.errors.AddressNotFoundError:
            logger.warning("IP address %s not found in GeoIP2 database", ip)
            return None
        except Exception as e:
            # Best-effort lookup: any other failure (malformed address,
            # unreadable database, ...) is logged and reported as "no data".
            logger.exception("Error getting location from IP %s: %s", ip, e)
            return None


class ReverseGeolocationAPIView(APIView):
    """
    API endpoint to get location information from geographic coordinates
    Returns: city, country, country_code based on latitude and longitude
    """
    permission_classes = []

    def validate_city_name_from_coordinates(self, lat, lon, city_name):
        """
        Validate that the city name is not actually a subdivision (region).
        Uses keyword-based heuristic to detect subdivision names: a name
        containing a subdivision keyword as a whole word is rejected.

        Args:
            lat: Latitude coordinate (used for logging only)
            lon: Longitude coordinate (used for logging only)
            city_name: City name to validate

        Returns:
            Validated city name or None if it's a subdivision
        """
        if not city_name:
            return None

        try:
            for keyword, pattern in _SUBDIVISION_PATTERNS:
                if pattern.search(city_name):
                    logger.warning(
                        "⚠️ City name '%s' at (%s, %s) "
                        "contains subdivision keyword '%s' - treating as region (returning None)",
                        city_name, lat, lon, keyword,
                    )
                    return None
        except TypeError as e:
            # A non-string city name cannot be checked; return it as-is.
            logger.error(
                "❌ Error validating city name for coordinates (%s, %s): %s",
                lat, lon, e,
            )
            return city_name

        logger.debug("✅ City name '%s' validated for (%s, %s)", city_name, lat, lon)
        return city_name

    @swagger_auto_schema(
        operation_description="Get location information (city, country) based on geographic coordinates using reverse geocoding",
        manual_parameters=[
            openapi.Parameter(
                'lat',
                openapi.IN_QUERY,
                description="Latitude coordinate (-90 to 90)",
                type=openapi.TYPE_NUMBER,
                required=True
            ),
            openapi.Parameter(
                'lon',
                openapi.IN_QUERY,
                description="Longitude coordinate (-180 to 180)",
                type=openapi.TYPE_NUMBER,
                required=True
            ),
        ],
        responses={
            200: openapi.Response(
                description="Location information",
                schema=ReverseGeolocationResponseSerializer()
            ),
            400: openapi.Response(
                description="Invalid or missing coordinates"
            ),
            404: openapi.Response(
                description="No location found for the given coordinates"
            ),
            500: openapi.Response(
                description="Internal server error"
            )
        },
        tags=['account']
    )
    def get(self, request):
        """Get location info from coordinates"""
        # Validate query parameters
        serializer = ReverseGeolocationSerializer(data=request.query_params)
        if not serializer.is_valid():
            return Response(
                {
                    'error': 'Invalid coordinates',
                    'details': serializer.errors
                },
                status=status.HTTP_400_BAD_REQUEST
            )

        lat = serializer.validated_data['lat']
        lon = serializer.validated_data['lon']

        # Log the coordinates for debugging
        logger.info("Reverse geocoding for coordinates: (%s, %s)", lat, lon)

        # Get location data using the existing function from city_detection_ip.py
        location_data = get_location_by_coordinates(lat, lon)

        if not location_data or location_data.get('status') != 'success':
            return Response(
                {
                    'error': 'Could not find location data for these coordinates',
                    'latitude': lat,
                    'longitude': lon
                },
                status=status.HTTP_404_NOT_FOUND
            )

        # Validate city name to ensure it's not a subdivision (region)
        validated_city = self.validate_city_name_from_coordinates(
            lat, lon, location_data.get('city')
        )

        # Format response
        response_data = {
            'latitude': lat,
            'longitude': lon,
            'city': validated_city,
            'country': None,  # GeoNames only returns country_code
            'country_code': location_data.get('countryCode'),
            'accuracy_radius': None,
            'time_zone': None,
            'postal_code': None,
        }

        logger.info(
            "Successfully found location for coordinates (%s, %s): %s, %s",
            lat, lon, response_data.get('city'), response_data.get('country_code'),
        )

        return Response(response_data, status=status.HTTP_200_OK)