import random
from datetime import datetime, timedelta

from redis.exceptions import RedisError

from config.redis_config import RedisConfig
from utils.exceptions import ServiceUnavailableException, NotFoundException


class RedisManager(RedisConfig):
    """Store and look up short-lived ``code,fullname,password`` records in Redis.

    Records are keyed by e-mail address and written with a 20-minute expiry.
    The Redis client is read from ``self.redis`` (presumably provided by
    ``RedisConfig`` -- confirm against that class).
    """

    def __serialize(self, code, fullname, password):
        """Join the three fields into the comma-separated stored form.

        NOTE(review): fields are joined verbatim, so a comma inside
        ``fullname`` or ``password`` makes the value impossible to split back
        into three parts (``get_by_redis`` then reports "not found"), and a
        ``None`` password is stored as the text ``'None'``. The format is left
        unchanged here to stay compatible with values already in Redis.
        """
        return f'{code},{fullname},{password}'

    def add_to_redis(self, code, **kwargs) -> str:
        """Store a record under ``kwargs['email']`` for 20 minutes.

        Args:
            code: Value stored as the first field of the record.
            **kwargs: Must contain ``fullname`` and ``email``; ``password`` is
                optional and defaults to ``None``.

        Returns:
            The e-mail address used as the Redis key.

        Raises:
            KeyError: If ``fullname`` or ``email`` is missing from ``kwargs``.
            ServiceUnavailableException: If the Redis call fails.
        """
        try:
            payload = self.__serialize(
                code=code,
                fullname=kwargs['fullname'],
                password=kwargs.get('password'),
            )
            self.redis.set(kwargs["email"], payload, ex=timedelta(minutes=20))
            return kwargs["email"]
        except RedisError as exp:
            raise ServiceUnavailableException() from exp

    def __deserialize(self, value: str, key: 'list | None' = None):
        """Split a stored comma-separated value into a dict.

        Args:
            value: The stored string, e.g. ``'12345,Jane,secret'``.
            key: Field names, in order. Defaults to
                ``['code', 'fullname', 'password']``.

        Returns:
            A dict mapping each field name to its value; an empty name or an
            empty value maps to ``None``.

        Raises:
            ValueError: If the number of fields differs from the number of names.
        """
        # Build the default per call instead of sharing one mutable list
        # object across every invocation.
        if key is None:
            key = ['code', 'fullname', 'password']

        values = value.split(',')
        if len(key) != len(values):
            raise ValueError("The number of keys does not match the number of values.")

        return {k: (v if k and v else None) for k, v in zip(key, values)}

    def get_by_redis(self, key: str):
        """Fetch and decode the record stored under ``key``.

        Returns:
            A dict with ``code``, ``fullname`` and ``password`` entries.

        Raises:
            ServiceUnavailableException: If the Redis call fails.
            NotFoundException: If the key is absent or the stored value
                cannot be decoded into the expected three fields.
        """
        try:
            data = self.redis.get(key)
        except RedisError as exp:
            raise ServiceUnavailableException() from exp

        # The stored value embeds the password field, so it is deliberately
        # not printed or logged here.
        if data is None:
            raise NotFoundException()

        try:
            # Clients created with decode_responses=True already return str;
            # only raw bytes need decoding.
            if isinstance(data, bytes):
                data = data.decode()
            return self.__deserialize(data)
        except (TypeError, ValueError, AttributeError) as exp:
            raise NotFoundException() from exp

    def check_exists_redis(self, email: str) -> bool:
        """Return whether a record is currently stored under ``email``.

        Raises:
            ServiceUnavailableException: If the Redis call fails.
        """
        try:
            # EXISTS reports a count of matching keys; collapse it to the
            # boolean the signature promises.
            return bool(self.redis.exists(email))
        except RedisError as exp:
            raise ServiceUnavailableException() from exp

    @staticmethod
    def generate_otp_code() -> int:
        """Return a random five-digit code in the range 10000-99999.

        Uses the operating system's entropy source rather than the default
        pseudo-random generator, since the result is used as a one-time
        passcode.
        """
        return random.SystemRandom().randint(10000, 99999)