You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

179 lines
5.8 KiB

import logging
import math
from pathlib import Path

import geoip2.database
import geoip2.errors
from django.core.cache import cache
from django.db import connection
# Module-level logger named after this module
logger = logging.getLogger(__name__)

# Location of the GeoLite2 City database file. The path is relative, so it is
# resolved against the process's current working directory.
CITY_DB_PATH = Path("utils/country_city_db/GeoLite2-City.mmdb")

# (latitude, longitude) pairs for which the optimized coordinate lookup
# deliberately returns None instead of querying the city table.
# NOTE(review): presumably default/placeholder positions reported by clients;
# confirm where these values come from.
SPECIAL_COORDINATES = [
    (32.616565, 44.03462),
    (51.5287718, -0.2416802),
    (40.3947021, 49.78492),
    (55.751199, 37.614706),
    (48.8589466, 2.2769956),
    (40.4381311, -3.8196194),
    (-6.2295712, 106.759478),
    (33.6158004, 72.8059198),
]
# Marker cached when a lookup finds no city. A cached ``None`` cannot be told
# apart from a cache miss when it is read back with ``cache.get(key)``.
_GEO_CACHE_NOT_FOUND = "__geo_not_found__"


def get_location_by_coordinates_optimized(lat, lon):
    """Return the nearest city to (lat, lon) using a bounding-box pre-filter.

    The lookup first restricts ``geonames_city`` rows to a latitude/longitude
    box around the point, then orders the remaining rows by great-circle
    distance (Earth radius 6371 km) and keeps the closest one within 300 km.
    Results are cached under a key built from the coordinates rounded to two
    decimals: hits for 24 hours, "nothing found" for 1 hour.

    Args:
        lat: Latitude in degrees; anything ``float()`` accepts.
        lon: Longitude in degrees; anything ``float()`` accepts.

    Returns:
        ``{'status': 'success', 'city': ..., 'countryCode': ...}`` for the
        nearest city, or ``None`` when the input is missing or not a finite
        number, when the point matches an entry of ``SPECIAL_COORDINATES``,
        or when no city lies within 300 km. If the cache or the database
        raises, the result of ``get_location_by_coordinates_original`` is
        returned instead.
    """
    # Only None / empty string mean "missing": 0 is a valid coordinate
    # (equator, prime meridian) and must not be rejected by a truthiness test.
    if lat is None or lon is None or lat == "" or lon == "":
        return None
    try:
        lat, lon = float(lat), float(lon)
    except (TypeError, ValueError):
        # Unparseable input can never match a city; answer without touching
        # the cache or the database.
        return None
    if not (math.isfinite(lat) and math.isfinite(lon)):
        return None

    try:
        # Special coordinates are never geo-looked-up here; None tells the
        # caller to use another detection method.
        for special_lat, special_lon in SPECIAL_COORDINATES:
            if abs(lat - special_lat) < 0.001 and abs(lon - special_lon) < 0.001:
                logger.debug(
                    "Special coordinate detected: (%s, %s) - skipping geo lookup",
                    lat,
                    lon,
                )
                return None

        cache_key = f'geo_{round(lat, 2)}_{round(lon, 2)}'
        cached_result = cache.get(cache_key)
        if cached_result is not None:
            if cached_result == _GEO_CACHE_NOT_FOUND:
                return None
            return cached_result

        # Bounding box that must contain the whole 300 km search radius.
        lat_range = 3.0  # ~333 km
        # One degree of longitude shrinks with cos(latitude), so the box is
        # widened accordingly; a fixed 3 degrees would be narrower than the
        # search radius away from the equator and could miss the true nearest
        # city.
        cos_lat = math.cos(math.radians(lat))
        if cos_lat > 1e-6:
            lon_range = min(180.0, lat_range / cos_lat)
        else:
            lon_range = 180.0
        lat_min = lat - lat_range
        lat_max = lat + lat_range
        # NOTE: the box does not wrap across the +/-180 degree meridian, so
        # cities on the other side of it are not considered by this query.
        lon_min = lon - lon_range
        lon_max = lon + lon_range

        with connection.cursor() as cursor:
            cursor.execute("""
                WITH bounded_cities AS (
                    SELECT name, country_code, latitude, longitude
                    FROM geonames_city
                    WHERE feature_class = 'P'
                    AND latitude BETWEEN %s AND %s
                    AND longitude BETWEEN %s AND %s
                ),
                distance_calc AS (
                    SELECT name, country_code,
                    (6371 * acos(least(1, greatest(-1,
                        cos(radians(%s)) * cos(radians(latitude)) *
                        cos(radians(longitude) - radians(%s)) +
                        sin(radians(%s)) * sin(radians(latitude))
                    )))) AS distance
                    FROM bounded_cities
                )
                SELECT name, country_code
                FROM distance_calc
                WHERE distance <= 300
                ORDER BY distance
                LIMIT 1
            """, [lat_min, lat_max, lon_min, lon_max, lat, lon, lat])
            result = cursor.fetchone()

        if not result:
            # Remember the negative answer for 1 hour
            cache.set(cache_key, _GEO_CACHE_NOT_FOUND, 3600)
            return None

        name, country_code = result
        response = {
            'status': 'success',
            'city': name,
            'countryCode': country_code
        }
        # Cache for 24 hours
        cache.set(cache_key, response, 86400)
        return response
    except Exception:
        # Best effort: record the failure, then try the unoptimized query.
        logger.exception(
            "Optimized coordinate lookup failed for (%s, %s); using fallback",
            lat,
            lon,
        )
        return get_location_by_coordinates_original(lat, lon)
def get_location_by_coordinates_original(lat, lon):
    """Return the nearest city to (lat, lon) by scanning every city row.

    Computes the great-circle distance (Earth radius 6371 km) from the point
    to every ``geonames_city`` row with ``feature_class = 'P'`` and keeps the
    closest one within 300 km. No bounding box and no caching are used, so
    this serves as the fallback for the optimized lookup.

    Args:
        lat: Latitude in degrees, passed to the query as a parameter.
        lon: Longitude in degrees, passed to the query as a parameter.

    Returns:
        ``{'status': 'success', 'city': ..., 'countryCode': ...}`` for the
        nearest city, or ``None`` when no city lies within 300 km or the
        query fails.
    """
    try:
        with connection.cursor() as cursor:
            cursor.execute("""
                WITH distance_calc AS (
                    SELECT name, country_code,
                    (6371 * acos(least(1, greatest(-1, cos(radians(%s)) * cos(radians(latitude)) *
                    cos(radians(longitude) - radians(%s)) +
                    sin(radians(%s)) * sin(radians(latitude)))))) AS distance
                    FROM geonames_city
                    WHERE feature_class = 'P'
                )
                SELECT name, country_code
                FROM distance_calc
                WHERE distance <= 300
                ORDER BY distance
                LIMIT 1
            """, [lat, lon, lat])
            result = cursor.fetchone()
    except Exception:
        # Still best effort (callers get None), but the failure is recorded
        # instead of vanishing silently.
        logger.exception("Coordinate lookup failed for (%s, %s)", lat, lon)
        return None

    if not result:
        return None

    name, country_code = result
    return {
        'status': 'success',
        'city': name,
        'countryCode': country_code
    }
def get_location_by_coordinates(lat, lon):
    """
    Resolve coordinates to a city, preferring the fast path.

    The answer of get_location_by_coordinates_optimized is returned when it
    is not None; otherwise get_location_by_coordinates_original is called
    with the same arguments and its answer (possibly None) is returned.
    """
    location = get_location_by_coordinates_optimized(lat, lon)
    if location is not None:
        return location
    # NOTE(review): the optimized function also returns None for points that
    # match SPECIAL_COORDINATES, so those still reach the unrestricted lookup
    # below - confirm that this is intended.
    return get_location_by_coordinates_original(lat, lon)
def get_location_by_ip(ip):
    """Look up the city and country of an IP address in the GeoLite2 City file.

    The database at ``CITY_DB_PATH`` is opened for each call and closed again
    before returning.
    NOTE(review): opening the reader on every call may be costly on hot
    paths; consider reusing one reader if profiling shows it matters.

    Args:
        ip: IP address to look up.

    Returns:
        ``{'status': 'success', 'countryCode': ..., 'city': ...}`` on a match,
        or ``None`` when ``ip`` is empty, the database file is missing, the
        address is unknown or malformed, or the lookup fails for any other
        reason (which is logged).
    """
    if not ip:
        # Nothing to look up; avoid opening the database just to fail.
        return None
    try:
        if not CITY_DB_PATH.exists():
            return None
        with geoip2.database.Reader(CITY_DB_PATH) as reader:
            response = reader.city(ip)
            # NOTE(review): response.city / response.country look like record
            # objects that may be truthy even when their fields are empty, so
            # 'city' or 'countryCode' can presumably be None here - confirm
            # against geoip2 and the callers.
            if response and response.city and response.country:
                return {
                    'status': 'success',
                    'countryCode': response.country.iso_code,
                    'city': response.city.name
                }
        return None
    except (geoip2.errors.AddressNotFoundError, ValueError):
        # Expected misses: address not in the database, or not a valid IP.
        return None
    except Exception:
        # Unexpected failures (e.g. unreadable or corrupt database) stay
        # best effort for callers but are no longer silent.
        logger.exception("GeoIP lookup failed for %s", ip)
        return None