You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

278 lines
11 KiB

import ipaddress
import logging
import re
from pathlib import Path

import geoip2.database
import geoip2.errors
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

from apps.geolocation_package.serializers import IPGeolocationSerializer, ReverseGeolocationSerializer, ReverseGeolocationResponseSerializer
from city_detection_ip import get_location_by_coordinates
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# GeoLite2 database path
# NOTE(review): this is a relative path, so it is resolved against the
# process's current working directory at lookup time — confirm the service is
# always started from the directory that contains ``utils/``.
CITY_DB_PATH = Path("utils/country_city_db/GeoLite2-City.mmdb")
class IPGeolocationAPIView(APIView):
    """
    API endpoint to get geolocation information from the client's IP address.

    Returns: country, city, latitude, longitude, timezone, and other location data
    """

    # Empty list: no DRF permission checks are applied to this view.
    permission_classes = []

    # Keys of the location payload other than 'ip'; used to build the
    # all-None payload returned for local/private addresses.
    _LOCATION_FIELDS = (
        'city',
        'country',
        'country_code',
        'latitude',
        'longitude',
        'accuracy_radius',
        'time_zone',
        'postal_code',
    )

    @swagger_auto_schema(
        operation_description="Get geolocation information based on the client's IP address",
        responses={
            200: openapi.Response(
                description="Geolocation information",
                schema=IPGeolocationSerializer()
            ),
            404: openapi.Response(
                description="IP address not found in database or database not available"
            ),
            500: openapi.Response(
                description="Internal server error"
            )
        },
        tags=['account']
    )
    def get(self, request):
        """
        Get geolocation info from the request's IP address.

        Responds with 400 when no client IP can be determined, 404 when the
        lookup yields no data, and 200 with the location dict otherwise.
        """
        ip = self.get_client_ip(request)
        if not ip:
            return Response(
                {'error': 'Could not determine client IP address'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Log the detected IP for debugging
        logger.info("Detecting location for IP: %s", ip)

        location_data = self.get_location_from_ip(ip)
        if not location_data:
            return Response(
                {
                    'error': 'Could not find location data for this IP address',
                    'ip': ip
                },
                status=status.HTTP_404_NOT_FOUND
            )

        # Return location data directly
        # Serializer with all read_only fields doesn't work with data parameter
        return Response(location_data, status=status.HTTP_200_OK)

    def get_client_ip(self, request):
        """
        Extract the client IP from the request.

        The first entry of the ``X-Forwarded-For`` header is preferred; when
        the header is absent, or its first entry is blank, ``REMOTE_ADDR`` is
        used instead.

        NOTE(review): ``X-Forwarded-For`` is supplied by the client unless a
        trusted proxy overwrites it — confirm the deployment sits behind one.
        """
        forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if forwarded_for:
            first_hop = forwarded_for.split(',')[0].strip()
            if first_hop:
                return first_hop
            # A blank first entry (e.g. ", 1.2.3.4") carries no usable
            # address, so fall through to the socket peer address.
        return request.META.get('REMOTE_ADDR')

    @staticmethod
    def _is_local_address(ip):
        """Return True if *ip* is 'localhost' or a private/loopback/link-local address."""
        if ip == 'localhost':
            return True
        try:
            address = ipaddress.ip_address(ip)
        except ValueError:
            # Not a parseable IP literal (for example "10.0.0.1:8080"); keep
            # the historical prefix test so such values are still skipped.
            return isinstance(ip, str) and ip.startswith(('192.168.', '10.'))
        # Covers IPv4 and IPv6: 10/8, 172.16/12, 192.168/16, 127/8,
        # 169.254/16, ::1, fc00::/7, fe80::/10, ...
        return address.is_private or address.is_loopback or address.is_link_local

    @classmethod
    def _empty_location(cls, ip):
        """Build a location payload for *ip* in which every other field is None."""
        payload = {'ip': ip}
        payload.update(dict.fromkeys(cls._LOCATION_FIELDS))
        return payload

    def get_location_from_ip(self, ip):
        """
        Get location data for *ip* from the GeoIP2 city database.

        Returns:
            A dict with the keys 'ip', 'city', 'country', 'country_code',
            'latitude', 'longitude', 'accuracy_radius', 'time_zone' and
            'postal_code'. For local/private addresses every field except
            'ip' is None. Returns None when the database file is missing,
            the address is not in the database, or any other error occurs.
        """
        try:
            # Skip local/private IPs
            if self._is_local_address(ip):
                logger.warning("Skipping local/private IP: %s", ip)
                # Return empty data instead of None to avoid 404
                return self._empty_location(ip)

            if not CITY_DB_PATH.exists():
                logger.error("GeoIP2 database not found at %s", CITY_DB_PATH)
                return None

            with geoip2.database.Reader(CITY_DB_PATH) as reader:
                response = reader.city(ip)

            # Extract city name with validation
            city_name = None
            if response.city and response.city.name:
                # Check if city name is actually a subdivision (region)
                # This is a known issue in GeoIP2 where subdivision names appear as city names
                subdivision_names = {s.name for s in (response.subdivisions or ())}
                if response.city.name not in subdivision_names:
                    # City name is valid - not a subdivision
                    city_name = response.city.name
                else:
                    # City name matches a subdivision - this is a region, not a city
                    logger.warning(
                        "IP %s: City name '%s' matches subdivision - treating as region",
                        ip,
                        response.city.name,
                    )

            country = response.country
            location = response.location
            postal = response.postal
            location_data = {
                'ip': ip,
                'city': city_name,
                'country': country.name if country else None,
                'country_code': country.iso_code if country else None,
                'latitude': location.latitude if location else None,
                'longitude': location.longitude if location else None,
                'accuracy_radius': location.accuracy_radius if location else None,
                'time_zone': location.time_zone if location else None,
                'postal_code': postal.code if postal else None,
            }
            logger.info(
                "Successfully found location for IP %s: %s, %s",
                ip,
                location_data.get('city'),
                location_data.get('country'),
            )
            return location_data
        except geoip2.errors.AddressNotFoundError:
            logger.warning("IP address %s not found in GeoIP2 database", ip)
            return None
        except Exception as exc:
            # Deliberate best-effort boundary: any failure becomes a 404 for
            # the caller, but the traceback is kept in the log.
            logger.exception("Error getting location from IP %s: %s", ip, exc)
            return None
class ReverseGeolocationAPIView(APIView):
    """
    API endpoint to get location information from geographic coordinates.

    Returns: city, country, country_code based on latitude and longitude
    """

    # Empty list: no DRF permission checks are applied to this view.
    permission_classes = []

    # Words that mark a name as an administrative subdivision rather than a
    # city, in English and in Persian/Arabic script.
    _SUBDIVISION_KEYWORDS = (
        'Province', 'Region', 'Oblast', 'Governorate',
        'District', 'County', 'State', 'Territory',
        'استان', 'منطقه', 'ولایت', 'محافظه',
    )

    # Compiled once. Keywords are matched as whole words, case-insensitively,
    # so that names which merely embed a keyword ("Hoffman Estates",
    # "Statesboro", "Provincetown") are not mistaken for regions.
    _SUBDIVISION_PATTERN = re.compile(
        r'\b(?:' + '|'.join(map(re.escape, _SUBDIVISION_KEYWORDS)) + r')\b',
        re.IGNORECASE,
    )

    def validate_city_name_from_coordinates(self, lat, lon, city_name):
        """
        Validate that the city name is not actually a subdivision (region).

        Uses a keyword-based heuristic: a name that contains one of the
        subdivision keywords as a whole word is treated as a region.

        Args:
            lat: Latitude coordinate (used only in log messages)
            lon: Longitude coordinate (used only in log messages)
            city_name: City name to validate

        Returns:
            The city name unchanged, or None if it is empty or looks like a
            subdivision. A non-string value is returned as-is.
        """
        if not city_name:
            return None

        try:
            match = self._SUBDIVISION_PATTERN.search(city_name)
        except TypeError as exc:
            # city_name is not a str; keep it rather than drop data.
            logger.exception(
                "❌ Error validating city name for coordinates (%s, %s): %s",
                lat, lon, exc,
            )
            return city_name  # Return as-is on error

        if match:
            logger.warning(
                "⚠️ City name '%s' at (%s, %s) "
                "contains subdivision keyword '%s' - treating as region (returning None)",
                city_name, lat, lon, match.group(0),
            )
            return None

        logger.debug("✅ City name '%s' validated for (%s, %s)", city_name, lat, lon)
        return city_name

    @swagger_auto_schema(
        operation_description="Get location information (city, country) based on geographic coordinates using reverse geocoding",
        manual_parameters=[
            openapi.Parameter(
                'lat',
                openapi.IN_QUERY,
                description="Latitude coordinate (-90 to 90)",
                type=openapi.TYPE_NUMBER,
                required=True
            ),
            openapi.Parameter(
                'lon',
                openapi.IN_QUERY,
                description="Longitude coordinate (-180 to 180)",
                type=openapi.TYPE_NUMBER,
                required=True
            ),
        ],
        responses={
            200: openapi.Response(
                description="Location information",
                schema=ReverseGeolocationResponseSerializer()
            ),
            400: openapi.Response(
                description="Invalid or missing coordinates"
            ),
            404: openapi.Response(
                description="No location found for the given coordinates"
            ),
            500: openapi.Response(
                description="Internal server error"
            )
        },
        tags=['account']
    )
    def get(self, request):
        """
        Get location info from the 'lat' and 'lon' query parameters.

        Responds with 400 when the parameters fail serializer validation,
        404 when the reverse lookup returns nothing or a status other than
        'success', and 200 with the formatted location otherwise.
        """
        # Validate query parameters
        serializer = ReverseGeolocationSerializer(data=request.query_params)
        if not serializer.is_valid():
            return Response(
                {
                    'error': 'Invalid coordinates',
                    'details': serializer.errors
                },
                status=status.HTTP_400_BAD_REQUEST
            )

        lat = serializer.validated_data['lat']
        lon = serializer.validated_data['lon']

        # Log the coordinates for debugging
        logger.info("Reverse geocoding for coordinates: (%s, %s)", lat, lon)

        # Get location data using the existing function from city_detection_ip.py
        location_data = get_location_by_coordinates(lat, lon)
        if not location_data or location_data.get('status') != 'success':
            return Response(
                {
                    'error': 'Could not find location data for these coordinates',
                    'latitude': lat,
                    'longitude': lon
                },
                status=status.HTTP_404_NOT_FOUND
            )

        # Validate city name to ensure it's not a subdivision (region)
        validated_city = self.validate_city_name_from_coordinates(
            lat, lon, location_data.get('city')
        )

        # Format response
        response_data = {
            'latitude': lat,
            'longitude': lon,
            'city': validated_city,
            'country': None,  # GeoNames only returns country_code
            'country_code': location_data.get('countryCode'),
            'accuracy_radius': None,
            'time_zone': None,
            'postal_code': None,
        }
        logger.info(
            "Successfully found location for coordinates (%s, %s): %s, %s",
            lat,
            lon,
            response_data.get('city'),
            response_data.get('country_code'),
        )
        return Response(response_data, status=status.HTTP_200_OK)