基礎邏輯
一般來說這個驗證碼登錄分為手機號、以及郵箱登錄
手機號短信驗證,以騰訊云SMS 服務為例:
這個操作無非對后端來說就是兩個接口:
一個是獲取驗證碼,這塊后端生成6位數字+expire_time 去推送到騰訊云sdk ,騰訊云sdk再去推送運營商,運營商再去推送到用戶
一個就是驗證驗證碼,根據phone+ expire 去redis查就行了
注意 第一個用戶 使用手機號獲取驗證碼的時候,不要插庫,因為沒有經過驗證,所有涉及到手機號相關的,特別是綁定、登錄等必須要驗證手機號
騰訊云短信sms 服務sdk 封裝,注意python只有同步線程版,故使用asyncio.to_thread 封裝
import asyncio
import jsonfrom tencentcloud.common import credential
from tencentcloud.sms.v20210111 import sms_client, modelsfrom producer.utils.custom_exception_utils import CustomExceptiondef send_sms_sync(phone, code, expiration_date):try:cred = credential.Credential(settings.TENCENT_SMS_SECRET_ID, settings.TENCENT_SMS_SECRET_KEY)client = sms_client.SmsClient(cred, settings.TENCENT_SMS_REGION)req = models.SendSmsRequest()req.SmsSdkAppId = settings.TENCENT_SMS_APP_IDreq.SignName = settings.TENCENT_SMS_SIGN_NAMEreq.TemplateId = settings.TENCENT_SMS_TEMPLATE_IDreq.TemplateParamSet = [code, expiration_date]req.PhoneNumberSet = [phone]resp = client.SendSms(req)logger.info(resp.to_json_string(indent=2))# 將 JSON 字符串轉換為字典r = json.loads(resp.to_json_string())if r["SendStatusSet"][0]["Code"] == "Ok":return relse:error_message = r["SendStatusSet"][0]["Message"]logger.error(f"短信SMS服務失敗:{error_message}")raise CustomException(message=f"{phone}短信SMS服務失敗\n原因:{error_message}")except Exception as e:logger.error(f"發送短信失敗: {str(e)}")raise CustomException(message=str(e))async def send_sms_async(phone, code, expiration_date):result = await asyncio.to_thread(send_sms_sync, phone, code, str(expiration_date))return result
注意先插redis 再發送,因為 發送可能有異常,但是能保證測試的時候redis 有數據
一般來說會以redis 作為expire處理:
async def get_code(self, telephone: str):"""獲取驗證碼"""code = await self.login_code_services.create_login_code(telephone)await send_sms_async(phone=telephone, code=code, expiration_date=settings.SMS_VALIDATION_CODE_EXPIRATION)
創建六位數字的邏輯
create_login_code的邏輯:
async def create_login_code(self, telephone: str) -> str:"""根據手機號生成登錄驗證碼并存儲到 Redis 中:param telephone: 用戶手機號:return: 生成的驗證碼"""# 生成驗證碼# 生成驗證碼 generate_code 則是加密函數code = self.generate_code(telephone)# Redis 鍵名(可用手機號做區分)redis_key = f"{self.login_core_prefix}:{telephone}"# 使用 Redis 存儲驗證碼,有效時間 10 分鐘async with redis_client.get_client() as client:try:# 刪除已有的驗證碼await client.delete(redis_key)logger.info(f"Existing login code for {telephone} deleted from Redis.")# 設置新的驗證碼await client.set(redis_key, code, ex=self.login_code_expiration)logger.info(f"New login code for {telephone} stored in Redis: {code}")except Exception as e:logger.error(f"Failed to store login code in Redis: {e}")raisereturn code
以上展示的是最簡單的驗證登錄(設計sdk + 簡單的expire 處理)
高并發請求限制
1、問題:假設你的系統面對高并發用戶時,短信驗證碼的請求頻率可能會非常高。如何防止惡意用戶利用暴力破解或刷驗證碼的方式發起過多請求?
考察點:
? 防止頻繁請求(限流)。
? 防止濫用驗證碼接口。
解決方案:
? 限流機制:使用 Redis 或類似工具實現用戶請求頻率限制。比如,使用令牌桶算法(Token Bucket)或者漏桶算法(Leaky Bucket)來限制每個手機號每分鐘的驗證碼請求次數。
令牌桶
Token Bucket 令牌桶 [適用于需要平滑請求速率的場景,特別是在高并發的情況下,它可以平衡請求流量。]
令牌桶 本質:
先查redis 對應的鍵里面的值 的長度 是否超標了
超標了 就refuse 否則就是pass
插入redis的邏輯: 使用zset (有序集合)在[0, time] 插入值
import redis
import timeredis_client = redis.StrictRedis(host='localhost', port=6379, db=0)# 配置
max_tokens = 5 # 最大令牌數
rate = 1 # 每秒生成一個令牌,每秒只能插入一個新的令牌時間戳(即每秒最多允許一個請求),
# 如果超出這個速率,后續的請求就會被拒絕。
interval = 60 # 限流時間窗口,單位為秒def generate_token(phone_number):# 構造桶鍵bucket_key = f"sms_rate_limit:{phone_number}"# 獲取當前時間(秒),這個就是rate 內容current_time = int(time.time())# 清除過期的令牌# ? 這行代碼用來移除掉超過時間窗口 interval 的過期令牌。# 如果設置的是 interval=60,那么每次請求時都會移除掉超過 60 秒的令牌,# 確保令牌桶中只包含當前時間窗口內的令牌。# 刪除 score 在 [0, current_time - interval] 之間的所有元素。redis_client.zremrangebyscore(bucket_key, 0, current_time - interval)# 獲取當前桶內的令牌數(即時間戳數)tokens = redis_client.zrange(bucket_key, 0, -1)# 如果桶里有足夠的令牌,則拒絕請求if len(tokens) >= max_tokens:return False # 限流拒絕else:# 向桶中添加當前時間戳作為新的令牌redis_client.zadd(bucket_key, {current_time: current_time})return True # 允許請求def check_rate_limit(phone_number):# 獲取桶的令牌數量bucket_key = f"sms_rate_limit:{phone_number}"tokens = redis_client.llen(bucket_key)# 如果桶內令牌超過最大容量,表示請求超限if tokens >= max_tokens:return False # 拒絕請求else:return True # 允許請求
漏桶算法
漏桶算法 (Leaky Bucket)
它的水流速率是固定的,水桶有固定容量。當水桶滿了,任何新的請求都會被丟棄。[適用于流量穩定的場景,具有更強的固定速率處理能力。]
import redis
import timeredis_client = redis.StrictRedis(host='localhost', port=6379, db=0)# 配置
bucket_key = "sms_rate_limit:phone_number"
max_capacity = 5 # 桶的容量,最大允許的請求數
rate = 1 # 處理請求的速率(每秒處理一個請求)def process_request(phone_number):bucket_key = f"sms_rate_limit:{phone_number}"# 當前時間current_time = int(time.time())# 清理過期請求(處理漏桶)redis_client.zremrangebyscore(bucket_key, 0, current_time - 60)# 判斷請求是否超出容量限制if redis_client.zcard(bucket_key) >= max_capacity:return False # 超出請求限制,拒絕請求# 添加當前請求時間戳redis_client.zadd(bucket_key, {current_time: current_time})return True # 允許請求
,但漏桶著重于處理速率限制和固定容量控制,而令牌桶則關注請求的流量速率和生成令牌的動態過程。
滑動窗口限流
? 驗證碼有效期:設置合理的驗證碼過期時間(通常是 3-5 分鐘),避免用戶在很長時間內嘗試。
? 滑動窗口限流:每次請求時,記錄用戶請求的時間戳,在每次請求時檢查用戶在最近一分鐘內的請求次數,如果超過限制,則拒絕該請求。
# 假設我們用 Redis 記錄每個手機號的請求次數
import redis
from time import timeredis_client = redis.StrictRedis(host='localhost', port=6379, db=0)def check_sms_limit(phone_number):key = f"sms_request_count:{phone_number}"current_time = int(time())expire_time = 60 # 1 minutemax_requests = 5 # 最大請求次數# 檢查手機號最近一段時間內的請求次數requests = redis_client.lrange(key, 0, -1)requests = [int(r) for r in requests]# 刪除過期請求(超過1分鐘的請求)requests = [r for r in requests if r > current_time - expire_time]if len(requests) >= max_requests:return False # 超過最大請求次數# 添加當前請求時間redis_client.rpush(key, current_time)redis_client.expire(key, expire_time)return True
驗證碼泄漏
2、問題:用戶的手機可能存在被盜的風險,如何結合其他認證機制提高安全性?比如通過 動態密碼 或 生物識別 進一步加強登錄安全性。
考察點:
? 多因素認證(MFA)。
? 雙重驗證的實現。
雙重驗證(2FA)方案,結合了 一次性密碼(OTP) 和 二維碼生成 來增強系統的安全性。這個過程通常分為兩個步驟:
-
生成一次性密碼:通過一個標準的算法(如 TOTP)生成一次性密碼。
-
二維碼展示與掃描:將 OTP 所需的密鑰(通常是一個隨機生成的密鑰)通過二維碼的方式呈現給用戶,用戶可以使用 TOTP 兼容的應用(如 Google Authenticator 或 Authy)來生成驗證碼。
使用 TOTP (基于時間的一次性密碼) 生成 OTP
TOTP(Time-based One-Time Password)算法基于時間生成一次性密碼,通常使用 HMAC-SHA1 算法。這個算法確保了每隔一段時間生成一個新的密碼。
Time-Based One-Time Password
import pyotp
import qrcode
from hashlib import sha256# 假設用戶的手機號是唯一標識符
user_phone_number = "13800000000"# 使用手機號作為種子生成唯一的 secret
# 使用 hashlib 將手機號進行哈希處理,生成一個固定的 secret
# 這樣即使服務重啟,每次生成的 secret 都是一樣的
secret = pyotp.random_base32() # 你可以先生成一個固定的 secret, 并保存到數據庫# 如果希望生成基于手機號的 secret,可以使用 hashlib 和手機號
hashed_phone = sha256(user_phone_number.encode()).hexdigest()
secret = hashed_phone[:16] # 使用手機號的哈希值的一部分作為 secret# 將用戶手機號與生成的 secret 綁定存儲(此處示例,實際應存數據庫)
user = {"phone_number": user_phone_number,"2fa_enabled": True,"2fa_secret": secret
}# 生成二維碼URL
totp = pyotp.TOTP(secret)
uri = totp.provisioning_uri(user_phone_number, issuer_name="YourApp")# 生成二維碼(可以通過Web頁面顯示)
qr = qrcode.make(uri)
qr.show() # 展示二維碼# 用戶掃碼后,輸入驗證碼(假設用戶輸入了 '123456')
user_input_otp = "123456"# 驗證用戶輸入的OTP是否有效
if totp.verify(user_input_otp):print("2FA Verification Success")
else:print("Invalid OTP")
驗證碼頻繁失效
問題:驗證碼的過期時間通常比較短,但某些場景下可能會發生驗證碼失效,用戶卻沒有及時看到短信,如何處理這種情況?
延長驗證碼的過期時間雖然能夠解決部分問題,但帶來的一些安全和性能隱患也是不容忽視的。最好的解決方案是在驗證碼過期之前提供驗證碼重發、動態刷新或者適當的過期提醒等方式來保障用戶的體驗,同時確保系統的安全性和效率。
跨設備驗證碼
問題:同一個用戶在不同設備上登錄時,可能會因為設備間驗證碼的同步問題導致登錄失敗或需要重復輸入驗證碼。如何在跨設備的場景下保證一致性?
考察點:
? 跨設備的一致性。
? 設備間驗證碼的共享和同步。
. 生成設備標識(Device ID)
每次用戶登錄時,前端可以生成一個唯一的設備標識(Device ID),并將其作為參數與驗證碼一起發送到后端。這個設備標識可以基于設備的硬件信息、安裝的應用ID,或者生成一個唯一的UUID。比如可以通過瀏覽器的 localStorage、sessionStorage 或者移動端的設備ID生成。
2. 發送驗證碼時包含設備標識
后端在發送驗證碼時,將設備標識與手機號和驗證碼一起存儲在 Redis 或數據庫中。這樣,無論用戶在哪個設備上獲取驗證碼,都能根據設備標識進行有效的關聯。