You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
121 lines
4.1 KiB
121 lines
4.1 KiB
import json
|
|
import hashlib
|
|
import random
|
|
import secrets
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
|
|
|
|
from redis.exceptions import RedisError
|
|
|
|
from django.conf import settings
|
|
|
|
from config.redis_config import RedisConfig
|
|
from utils.exceptions import ServiceUnavailableException, NotFoundException
|
|
|
|
class RedisManager(RedisConfig):
    """Store and look up short-lived sign-up/OTP records in Redis.

    A record is kept under the user's e-mail address as a comma-separated
    ``code,fullname,password`` string.  The Redis client is read from
    ``self.redis`` (presumably created by ``RedisConfig`` -- TODO confirm).
    """

    # How long a stored record stays in Redis.
    _RECORD_TTL = timedelta(minutes=20)

    # Field order of a serialized record.
    _RECORD_FIELDS = ('code', 'fullname', 'password')

    def __serialize(self, code, fullname, password):
        """Return the ``code,fullname,password`` string stored in Redis.

        ``None`` is written as an empty field so that ``__deserialize`` maps
        it back to ``None`` instead of the literal text ``"None"``.
        """
        # NOTE(review): a comma inside ``fullname`` or ``password`` produces
        # extra fields and makes ``__deserialize`` reject the record.
        parts = ('' if part is None else str(part)
                 for part in (code, fullname, password))
        return ','.join(parts)

    def add_to_redis(self, code, **kwargs) -> str:
        """Store the record for ``kwargs['email']`` and return that e-mail.

        Args:
            code: Verification code to store.
            **kwargs: Must contain ``email`` and ``fullname``; ``password``
                is optional.

        Returns:
            The e-mail address used as the Redis key.

        Raises:
            KeyError: If ``email`` or ``fullname`` is missing.
            ServiceUnavailableException: If Redis reports an error.
        """
        record = self.__serialize(
            code=code,
            fullname=kwargs['fullname'],
            password=kwargs.get('password'),
        )
        email = kwargs["email"]
        try:
            self.redis.set(email, record, ex=self._RECORD_TTL)
        except RedisError as exp:
            raise ServiceUnavailableException() from exp
        return email

    def __deserialize(
        self,
        value: str,
        key: Optional[list] = None
    ):
        """Split a stored record into a dict keyed by field name.

        Args:
            value: Comma-separated record text.
            key: Field names, in order; defaults to
                ``code``, ``fullname``, ``password``.

        Returns:
            Dict mapping each field name to its value, with empty values
            (or values under an empty name) replaced by ``None``.

        Raises:
            ValueError: If the number of fields differs from the number of
                names.
        """
        names = self._RECORD_FIELDS if key is None else key
        values = value.split(',')
        if len(names) != len(values):
            raise ValueError("The number of keys does not match the number of values.")
        return {
            name: (item if name and item else None)
            for name, item in zip(names, values)
        }

    def get_by_redis(self, key: str):
        """Fetch and parse the record stored under ``key``.

        Returns:
            The dict produced by ``__deserialize``.

        Raises:
            ServiceUnavailableException: If Redis reports an error.
            NotFoundException: If the key is missing or the stored value
                cannot be parsed.
        """
        try:
            data = self.redis.get(key)
            if data is None:
                raise NotFoundException()
            # The client may return bytes or str depending on how it was
            # configured; accept both.
            if isinstance(data, bytes):
                data = data.decode()
            return self.__deserialize(data)
        except RedisError as exp:
            raise ServiceUnavailableException() from exp
        except (TypeError, ValueError, AttributeError) as exp:
            raise NotFoundException() from exp

    def check_exists_redis(self, email: str) -> bool:
        """Return whether a record is stored under ``email``.

        Raises:
            ServiceUnavailableException: If Redis reports an error.
        """
        try:
            return bool(self.redis.exists(email))
        except RedisError as exp:
            # Same exception the other methods of this class raise; the
            # previously referenced ``CustomException`` is not imported here.
            raise ServiceUnavailableException() from exp

    @staticmethod
    def generate_otp_code() -> int:
        """Return a random five-digit code in the range 10000-99999."""
        # ``secrets`` draws from the OS CSPRNG, unlike ``random``.
        return secrets.randbelow(90000) + 10000
|
|
|
|
|
|
class OnlineClassTokenManager(RedisConfig):
    """Manage temporary tokens used for joining online classes."""

    # Prefix put in front of every token to form its Redis key.
    KEY_PREFIX = "online_class_token:"

    def __init__(self, *args, **kwargs):
        """Initialise the base class and read the default token TTL.

        The TTL comes from ``settings.ONLINE_CLASS_TOKEN_TTL`` and falls back
        to 300 when that setting is absent.
        """
        super().__init__(*args, **kwargs)
        self.ttl = getattr(settings, "ONLINE_CLASS_TOKEN_TTL", 300)

    def _build_key(self, token: str) -> str:
        """Return the Redis key for ``token``."""
        return f"{self.KEY_PREFIX}{token}"

    def generate_token(self, course_id: int, user_identifier: str) -> str:
        """Return a new SHA-256 hex token for the given course and user.

        A fresh random component is mixed in on every call, so repeated calls
        with the same arguments yield different tokens.
        """
        nonce = secrets.token_urlsafe(16)
        seed = f"{course_id}:{user_identifier}:{nonce}"
        return hashlib.sha256(seed.encode()).hexdigest()

    def store_token(self, token: str, payload: dict, ttl: Optional[int] = None) -> None:
        """Save ``payload`` as JSON under ``token`` with an expiry.

        Args:
            token: Token the payload is stored under.
            payload: JSON-serializable data; a ``generated_at`` UTC timestamp
                is added (overriding any key of that name).
            ttl: Expiry passed to Redis as ``ex``; a falsy value selects
                ``self.ttl``.
        """
        # Local import: the module header only imports ``datetime`` and
        # ``timedelta``.
        from datetime import timezone

        # ``datetime.utcnow()`` is deprecated; dropping tzinfo keeps the
        # original "<naive ISO timestamp>Z" format.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        data = {
            **payload,
            "generated_at": now_utc.isoformat() + "Z",
        }
        self.redis.set(self._build_key(token), json.dumps(data), ex=ttl or self.ttl)

    def get_payload(self, token: str) -> dict:
        """Return the payload stored for ``token``.

        Raises:
            NotFoundException: If nothing is stored under the token.
        """
        stored = self.redis.get(self._build_key(token))
        if not stored:
            raise NotFoundException("Token not found or has expired.")
        return json.loads(stored)

    def delete_token(self, token: str) -> None:
        """Remove ``token`` from Redis."""
        self.redis.delete(self._build_key(token))

    @staticmethod
    def build_entry_url(token: str, base_url: Optional[str] = None) -> str:
        """Return the entry URL carrying ``token`` as a query parameter.

        Args:
            token: Token to put in the ``token`` query parameter.
            base_url: URL to extend.  When falsy,
                ``settings.ONLINE_CLASS_FRONTEND_DOMAIN`` is used, or
                ``settings.SITE_DOMAIN`` if that setting is absent.

        Returns:
            The base URL with its existing query parameters kept and
            ``token`` set, or a bare ``?token=...`` string when no base URL
            is available.
        """
        base = base_url or getattr(
            settings,
            "ONLINE_CLASS_FRONTEND_DOMAIN",
            getattr(settings, "SITE_DOMAIN", ""),
        )
        if not base:
            return f"?{urlencode({'token': token})}"

        parsed = urlparse(base)
        # Keep repeated and blank-valued parameters of the base URL; only an
        # existing ``token`` parameter is replaced.
        pairs = [
            (name, value)
            for name, value in parse_qsl(parsed.query, keep_blank_values=True)
            if name != "token"
        ]
        pairs.append(("token", token))
        return urlunparse(parsed._replace(query=urlencode(pairs)))
|