You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
453 lines
17 KiB
453 lines
17 KiB
import os
|
|
import time
|
|
import geoip2.database
|
|
from pathlib import Path
|
|
from django.db import connection
|
|
from django.db.models import Q
|
|
from django.core.cache import cache
|
|
from django.db import transaction
|
|
import logging
|
|
|
|
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.develop')
|
|
from django.core.wsgi import get_wsgi_application
|
|
|
|
application = get_wsgi_application()
|
|
|
|
from apps.account.models import LoginHistory
|
|
|
|
# Configure logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Location of the GeoLite2 City database, relative to the working directory.
CITY_DB_PATH = Path("utils") / "country_city_db" / "GeoLite2-City.mmdb"

# (latitude, longitude) pairs that get special treatment in this module:
# records at these coordinates are resolved through IP detection instead of
# the GeoNames coordinate lookup.
SPECIAL_COORDINATES = [
    (32.616565, 44.03462),
    (51.5287718, -0.2416802),
    (40.3947021, 49.78492),
    (55.751199, 37.614706),
    (48.8589466, 2.2769956),
    (40.4381311, -3.8196194),
    (-6.2295712, 106.759478),
    (33.6158004, 72.8059198),
]
|
|
|
|
def get_location_by_coordinates_optimized(lat, lon):
    """
    Resolve coordinates to a city using a bounding-box GeoNames query.

    Coordinates listed in SPECIAL_COORDINATES (within 0.001 degrees) are
    skipped so the caller can resolve them another way. Results, including
    "no city found", are cached under a key rounded to two decimals.

    Args:
        lat: Latitude; anything accepted by ``float()``.
        lon: Longitude; anything accepted by ``float()``.

    Returns:
        ``{'status': 'success', 'city': ..., 'countryCode': ...}`` for the
        best-scoring populated place within 100 km, or ``None`` when the
        input is missing, the coordinates are special, or nothing matches.
        On any unexpected error the result of
        ``get_location_by_coordinates_original`` is returned instead.
    """
    try:
        # Reject only genuinely missing values: 0.0 is a valid latitude
        # (equator) and a valid longitude (prime meridian).
        if lat is None or lon is None or lat == "" or lon == "":
            return None

        lat, lon = float(lat), float(lon)

        # Special coordinates should use IP detection, not geo lookup.
        # Returning None triggers the caller's fallback.
        if any(
            abs(lat - special_lat) < 0.001 and abs(lon - special_lon) < 0.001
            for special_lat, special_lon in SPECIAL_COORDINATES
        ):
            logger.debug(f"Special coordinate detected: ({lat}, {lon}) - skipping geo lookup")
            return None

        cache_key = f'geo_{round(lat, 2)}_{round(lon, 2)}'

        # A sentinel default distinguishes "nothing cached" from a cached
        # None, so the negative result stored below is actually honoured.
        cache_miss = object()
        cached_result = cache.get(cache_key, cache_miss)
        if cached_result is not cache_miss:
            return cached_result

        # Simple bounding box (larger range for better coverage)
        lat_range = 3.0  # ~330km
        lon_range = 3.0

        lat_min = lat - lat_range
        lat_max = lat + lat_range
        lon_min = lon - lon_range
        lon_max = lon + lon_range

        # Both queries take the same parameters in the same order.
        params = [lat_min, lat_max, lon_min, lon_max, lat, lon, lat]

        with connection.cursor() as cursor:
            # The first query only feeds log output, so skip the round trip
            # entirely when INFO logging is disabled.
            if logger.isEnabledFor(logging.INFO):
                cursor.execute("""
                    WITH bounded_cities AS (
                        SELECT name, country_code, latitude, longitude, population
                        FROM geonames_city
                        WHERE feature_class = 'P'
                        AND latitude BETWEEN %s AND %s
                        AND longitude BETWEEN %s AND %s
                    ),
                    distance_calc AS (
                        SELECT name, country_code, population,
                            (6371 * acos(least(1, greatest(-1,
                                cos(radians(%s)) * cos(radians(latitude)) *
                                cos(radians(longitude) - radians(%s)) +
                                sin(radians(%s)) * sin(radians(latitude))
                            )))) AS distance
                        FROM bounded_cities
                    )
                    SELECT name, country_code, population, distance
                    FROM distance_calc
                    WHERE distance <= 100
                    ORDER BY distance
                    LIMIT 10
                """, params)

                debug_results = cursor.fetchall()
                if debug_results:
                    logger.info(f"🔍 Top 10 nearby cities for coordinates ({lat}, {lon}):")
                    for name, cc, pop, dist in debug_results:
                        # This query does not filter out NULL populations;
                        # formatting None with ',' would raise TypeError.
                        pop_text = f"{pop:,}" if pop is not None else "unknown"
                        logger.info(f"  📍 {name} ({cc}): population={pop_text}, distance={dist:.2f}km")

            # Pick the best city using a population-weighted score.
            cursor.execute("""
                WITH bounded_cities AS (
                    SELECT name, country_code, latitude, longitude, population
                    FROM geonames_city
                    WHERE feature_class = 'P'
                    AND latitude BETWEEN %s AND %s
                    AND longitude BETWEEN %s AND %s
                    AND population IS NOT NULL
                    AND population > 0
                ),
                distance_calc AS (
                    SELECT name, country_code, population,
                        (6371 * acos(least(1, greatest(-1,
                            cos(radians(%s)) * cos(radians(latitude)) *
                            cos(radians(longitude) - radians(%s)) +
                            sin(radians(%s)) * sin(radians(latitude))
                        )))) AS distance
                    FROM bounded_cities
                ),
                scored_cities AS (
                    SELECT name, country_code, distance, population,
                        -- Score: prefer closer cities, but weight population heavily
                        -- Cities within 30km: prioritize by population
                        -- Cities beyond 30km: balance distance and population
                        CASE
                            WHEN distance <= 30 THEN population / (distance + 1)
                            ELSE population / POWER(distance, 2)
                        END AS score
                    FROM distance_calc
                    WHERE distance <= 100
                )
                SELECT name, country_code
                FROM scored_cities
                ORDER BY score DESC
                LIMIT 1
            """, params)

            result = cursor.fetchone()

        if result:
            name, country_code = result
            logger.info(f"✅ Selected city: {name} ({country_code}) for coordinates ({lat}, {lon})")
            response = {
                'status': 'success',
                'city': name,
                'countryCode': country_code
            }

            # Cache for 24 hours
            cache.set(cache_key, response, 86400)
            return response

        logger.warning(f"⚠️ No city found within 100km for coordinates ({lat}, {lon})")
        # Cache None for 1 hour
        cache.set(cache_key, None, 3600)
        return None

    except Exception:
        # Keep the best-effort fallback, but record why the optimized path
        # failed instead of discarding the error.
        logger.warning(
            f"Optimized lookup failed for coordinates ({lat}, {lon}); using fallback method",
            exc_info=True,
        )
        return get_location_by_coordinates_original(lat, lon)
|
|
|
|
|
|
def get_location_by_coordinates_original(lat, lon):
    """
    Find the nearest populated place to the given coordinates.

    Computes the great-circle distance to every ``feature_class = 'P'`` row
    of ``geonames_city`` and keeps the closest one within 300 km.

    Returns:
        ``{'status': 'success', 'city': ..., 'countryCode': ...}`` for the
        nearest match, or ``None`` when nothing is in range or the query
        fails (the failure is logged).
    """
    nearest_city_sql = """
        WITH distance_calc AS (
            SELECT name, country_code, latitude, longitude,
                (6371 * acos(least(1, greatest(-1, cos(radians(%s)) * cos(radians(latitude)) *
                cos(radians(longitude) - radians(%s)) +
                sin(radians(%s)) * sin(radians(latitude)))))) AS distance
            FROM geonames_city
            WHERE feature_class = 'P'
        )
        SELECT name, country_code
        FROM distance_calc
        WHERE distance <= 300
        ORDER BY distance
        LIMIT 1
    """

    try:
        with connection.cursor() as cursor:
            cursor.execute(nearest_city_sql, [lat, lon, lat])
            row = cursor.fetchone()

        # Guard clause: no row means nothing lies within the 300 km radius.
        if not row:
            return None

        name, country_code = row
        logger.info(f"🔄 Fallback method selected city: {name} ({country_code}) for coordinates ({lat}, {lon})")
        return dict(status='success', city=name, countryCode=country_code)

    except Exception as e:
        logger.error(f"❌ Error in fallback method for coordinates ({lat}, {lon}): {str(e)}")
        return None
|
|
|
|
|
|
def get_location_by_coordinates(lat, lon):
    """
    Resolve coordinates to a location, trying the optimized lookup first.

    Calls ``get_location_by_coordinates_optimized`` and, when that returns
    ``None``, retries with ``get_location_by_coordinates_original``.

    Args:
        lat: Latitude of the point to resolve.
        lon: Longitude of the point to resolve.

    Returns:
        The dict produced by whichever lookup succeeded, or ``None`` when
        both returned ``None``.
    """
    # Diagnostics go through the module logger at DEBUG level rather than
    # stdout, so batch runs are not flooded with one line per record.
    logger.debug("lat: %s lon: %s", lat, lon)

    result = get_location_by_coordinates_optimized(lat, lon)
    logger.debug("optimize: %s", result)

    # If optimized fails, use original as fallback
    if result is None:
        result = get_location_by_coordinates_original(lat, lon)
        logger.debug("original: %s", result)

    logger.debug("out: %s", result)
    return result
|
|
|
|
def get_location_by_ip(ip):
    """
    Get location from an IP address using the MaxMind MMDB file directly.

    Args:
        ip: IP address to look up in the database at ``CITY_DB_PATH``.

    Returns:
        ``{'status': 'success', 'countryCode': ..., 'city': ...}`` when the
        database returns a country. ``city`` is ``None`` when no city name
        is present or when the name equals one of the response's
        subdivision names. Returns ``None`` when the database file is
        missing, the response has no country, or the lookup raises.
    """
    try:
        if not CITY_DB_PATH.exists():
            # Surface the misconfiguration; otherwise every lookup fails
            # without leaving any trace.
            logger.error(f"GeoLite2 database not found at {CITY_DB_PATH}")
            return None

        with geoip2.database.Reader(CITY_DB_PATH) as reader:
            response = reader.city(ip)

        if not (response and response.country):
            return None

        # Validate city name - a name that matches a subdivision is treated
        # as a region, not a city.
        city_name = None
        if response.city and response.city.name:
            subdivision_names = {s.name for s in response.subdivisions or ()}
            if response.city.name not in subdivision_names:
                city_name = response.city.name
            else:
                logger.warning(f"IP {ip}: City name '{response.city.name}' matches subdivision - treating as region")

        return {
            'status': 'success',
            'countryCode': response.country.iso_code,
            'city': city_name
        }

    except Exception as e:
        # Still best-effort (callers treat None as "unknown"), but record
        # the reason instead of discarding it.
        logger.warning(f"IP lookup failed for {ip}: {e}")
        return None
|
|
|
|
def _flush_login_batch(batch, kind, final=False):
    """Persist country/city/location_method for *batch* in one transaction and log it."""
    with transaction.atomic():
        LoginHistory.objects.bulk_update(
            batch,
            ['country', 'city', 'location_method']
        )
    label = "final " if final else ""
    logger.info(f"Updated {label}{len(batch)} {kind} coordinate records")


def _resolve_login_records(records, lookup, method, batch_size, kind):
    """Resolve each login via *lookup*, saving in batches; return how many were resolved."""
    pending = []
    resolved = 0

    for login in records:
        try:
            location_data = lookup(login)
            if location_data and location_data['status'] == 'success':
                login.country = location_data['countryCode']
                login.city = location_data['city']
                login.location_method = method
                pending.append(login)
                resolved += 1

            if len(pending) >= batch_size:
                _flush_login_batch(pending, kind)
                pending = []
        except Exception as e:
            logger.error(f"Error processing {kind} record {login.id}: {e}")
            continue

    # Save whatever is left over after the last full batch.
    if pending:
        _flush_login_batch(pending, kind, final=True)

    return resolved


def update_login_history_optimized():
    """
    Fill in country and city on login history records, in batches.

    Considers records that have coordinates and whose location method is
    not ``IP_DETECTION``, up to 1000 per group and run:

    * records sitting exactly on a ``SPECIAL_COORDINATES`` pair are resolved
      from ``login.ip`` and stamped ``IP_DETECTION`` (saved 50 at a time);
    * all other records are resolved from their coordinates and stamped
      ``COORDINATES`` (saved 20 at a time).

    Per-record failures are logged and skipped.

    NOTE(review): records stamped ``COORDINATES`` are not excluded by the
    selection filter, so they look eligible again on the next run - confirm
    whether that is intended.
    """
    logger.info("Starting optimized login history update...")

    candidates = (
        LoginHistory.objects
        .exclude(location_method="IP_DETECTION")
        .exclude(lat__isnull=True)
        .exclude(lon__isnull=True)
    )

    # Match latitude and longitude as pairs. Separate lat__in / lon__in
    # filters would also match a special latitude combined with a
    # *different* special longitude.
    special_pairs = Q()
    for special_lat, special_lon in SPECIAL_COORDINATES:
        special_pairs |= Q(lat=special_lat, lon=special_lon)

    if SPECIAL_COORDINATES:
        special_records = candidates.filter(special_pairs)[:1000]  # Limit batch size
        normal_records = candidates.exclude(special_pairs)[:1000]  # Limit batch size
    else:
        # An empty Q would not restrict anything; with no special pairs
        # every candidate is a normal record.
        special_records = candidates.none()
        normal_records = candidates[:1000]

    # Special coordinates: resolve from the IP address.
    _resolve_login_records(
        special_records,
        lookup=lambda login: get_location_by_ip(login.ip),
        method='IP_DETECTION',
        batch_size=50,
        kind='special',
    )

    # Normal coordinates: resolve via GeoNames (smaller batch for geo queries).
    processed_normal = _resolve_login_records(
        normal_records,
        lookup=lambda login: get_location_by_coordinates(login.lat, login.lon),
        method='COORDINATES',
        batch_size=20,
        kind='normal',
    )

    logger.info(f"Completed login history update. Processed {processed_normal} normal records.")
|
|
|
|
|
|
def update_login_history():
    """Legacy entry point: delegates to ``update_login_history_optimized``."""
    outcome = update_login_history_optimized()
    return outcome
|
|
|
|
def _bulk_save_location_fields(model, batch):
    """Write city and country for every record in *batch* in one transaction."""
    with transaction.atomic():
        model.objects.bulk_update(
            batch,
            ['city', 'country']
        )


def update_location_history_records_optimized():
    """
    Fill in city and country on location history records, in batches.

    Selects up to 1000 records that have coordinates but an empty or NULL
    city or country, resolves each through ``get_location_by_coordinates``
    and saves the results 20 records at a time. Per-record failures are
    logged and skipped.

    NOTE(review): records that cannot be resolved keep matching the
    selection filter, so they appear to be picked up again on later runs -
    confirm whether that is acceptable.
    """
    from apps.account.models import LocationHistory

    logger.info("Starting optimized location history update...")

    # Materialize the batch once so the reported total always matches the
    # rows actually processed, and the sliced queryset is queried only once.
    records = list(
        LocationHistory.objects.filter(
            Q(city__isnull=True) | Q(city='') | Q(country__isnull=True) | Q(country=''),
            lat__isnull=False,
            lon__isnull=False
        )[:1000]  # Process in batches of 1000
    )

    total_records = len(records)
    logger.info(f"Found {total_records} location history records to update")

    if not records:
        logger.info("No records to update")
        return

    updated_count = 0
    batch_updates = []

    for i, record in enumerate(records, 1):
        try:
            # Get location data based on coordinates
            location_data = get_location_by_coordinates(record.lat, record.lon)

            if location_data and location_data['status'] == 'success':
                record.city = location_data['city']
                record.country = location_data['countryCode']
                batch_updates.append(record)
                updated_count += 1

            # Progress logging every 50 records
            if i % 50 == 0:
                logger.info(f"Processed {i}/{total_records} records ({updated_count} updated)")

            # Batch update every 20 records
            if len(batch_updates) >= 20:
                _bulk_save_location_fields(LocationHistory, batch_updates)
                logger.info(f"Bulk updated {len(batch_updates)} location history records")
                batch_updates = []

        except Exception as e:
            logger.error(f"Error processing location history record {record.id}: {e}")
            continue

    # Final batch update for remaining records
    if batch_updates:
        _bulk_save_location_fields(LocationHistory, batch_updates)
        logger.info(f"Final bulk update of {len(batch_updates)} location history records")

    logger.info(f"Completed location history update. Updated {updated_count}/{total_records} records.")
|
|
|
|
|
|
def update_location_history_records():
    """Legacy entry point: delegates to ``update_location_history_records_optimized``."""
    outcome = update_location_history_records_optimized()
    return outcome
|
|
|
|
if __name__ == "__main__":
    # Script entry point: log to both a file and the console, then run the
    # login-history and location-history updates in that order.
    log_handlers = [
        logging.FileHandler('geo_optimization.log'),
        logging.StreamHandler(),
    ]
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=log_handlers,
    )

    logger.info("Starting optimized geo location processing...")
    start_time = time.time()

    try:
        for job in (update_login_history, update_location_history_records):
            job()

        total_time = time.time() - start_time
        logger.info(f"Completed all geo location processing in {total_time:.2f} seconds")

    except Exception as e:
        # Log the failure, then re-raise so the error still propagates.
        logger.error(f"Error in main execution: {e}")
        raise
|