最近在寫安全方面的程序,有需求,就做了這些TCP加密數據傳輸類。
utils.safeUtils的內容詳見:
- SafeObj:Python 高安全性加密數據容器類-CSDN博客
- SafeKey:Python 高安全性加密密碼容器類-CSDN博客
如有任何問題或漏洞歡迎批評指正!不勝感激!
# 代碼
1. 服務端
import sys
import tracebackimport os
import socket
from threading import Thread
from typing import Callable, Any
import secrets
import struct
from datetime import datetime as dt,timedelta as tdel
from utils.safeUtils import SafeKey,combine_keys,SafeObj,secure_eraseclass Server:def __init__(self, *,sendCallback: Callable[[str, int, bytes], bytes],host: str = "localhost",port: int = 0,errCallback: Callable[[Exception], Any] = lambda err: print(traceback.format_exc())):self.srv = socket.socket()self._host = hostself._port = portself._sendCallback = sendCallbackself._errCallback = errCallbackself._close = Falseself.srv.bind((host, port))self.srv.listen(16)self.srv.settimeout(1)self._mainThread = Thread(None, self._main, "TCP-Server_Main", daemon=True)self._mainThread.start()@propertydef host(self):return self._host@propertydef port(self):return self._portdef _main(self):while not self._close:try:conn, addr = self.srv.accept()except socket.timeout:continuetry:data = b""while block := conn.recv(1024):data += blockcontent = self._sendCallback(addr[0],addr[1],data)conn.send(content)except socket.error as err:self._errCallback(err)finally:conn.close()def close(self):self.srv.close()self._close = Trueself._mainThread.join()class CryptServer(Server):def __init__(self,*args,**kwargs):self.nonce = []super().__init__(*args,**kwargs)print(self.srv.getsockname())def _main(self):def ATTACK_ALERT(o = None,ts = None,n = None,c = None,r = None,s = True,t = "REPLAY or MAN-IN-THE-MIDDLE ATTACK"):L, R = CF.LIGHTWHITE_EX + CS.BRIGHT, CS.RESET_ALLerr = f'''{CB.BLUE}{L}!!! [SECURITY ALERT] !!!: {t}{R}\n{CB.RED}{L}{"?" *22} ATTACK {"?" * 22}{R}\n{"TIMESTAMP(Now/Response): " if s else "ATTACK Count: "}{("/".join(x.strftime("%Y-%m-%dT%H:%M:%S") for x in (o, ts))) if s else str(c)}\n{"NONCE: " if s else "RAM Tamper: "}{n.hex() if s else str(r)}\n{CB.RED}{L}{"?" * 20} SECURITY ALERT {"?" * 20}{R}'''.replace(" " * 4, "")os.popen("msg * SECURITY ALERT: ************ has been ATTACKED");raise RuntimeError(err)last_nonce = len(self.nonce)while not self._close:try:conn, addr = self.srv.accept()print(addr)except socket.timeout:continuekey, safeData, cliKey, mainKey, content, data = (None for _ in range(6)) # 初始化(finally要用)try:key = SafeKey(secrets.token_bytes(128)) # 每次會話的唯一公鑰key.use_password(conn.sendall,t = bytes) # 向客戶端發送公鑰cliKey = conn.recv(128) # 讀取客戶端私鑰ts = conn.recv(20).decode() # [安全校驗] 讀取時間戳(%Y%m%d%H%M%S%f)ts = dt.strptime(ts,"%Y%m%d%H%M%S%f") # [安全校驗] 格式化時間戳n = conn.recv(12) # [安全校驗] 讀取會話唯一 nonce# [安全校驗] 檢測是否可能為重放if abs((o:=dt.now())-ts).total_seconds()>1or n in self.nonce:ATTACK_ALERT(o = o,ts = ts,n = n)else:self.nonce.append(n)mainKey = combine_keys(key.get_password(bytes),cliKey) # 使用公鑰和私鑰派生主密鑰secure_erase(cliKey) # 擦除客戶端私鑰del key,cliKeylength = struct.unpack("!I",conn.recv(4))[0] # 讀取數據長度data = conn.recv(length)safeData = SafeObj.from_encrypted_package(data,mainKey) # 獲取SafeObj對象content = self._sendCallback(addr[0],addr[1],safeData.get_data()) # 從處理函數獲取數據encrypted_package = SafeObj(content,mainKey).get_encrypted_package()length = len(encrypted_package)conn.sendall(struct.pack("!I",length))conn.sendall(encrypted_package) # 發送加密數據secure_erase(bytearray(encrypted_package)) # 安全擦除所有中間數據secure_erase(bytearray(data))secure_erase(bytearray(content))print(mainKey.get_password(bytes),"\n",length,"\n",data,"\n",encrypted_package)except Exception as err:self._errCallback(err)finally:conn.close()try:del keyexcept NameError:passtry:del cliKeyexcept NameError:passdel safeData, mainKey, content, datad = len(self.nonce) - last_nonceif d < 0 or d > 10:ATTACK_ALERT(c = d,r = d < 0,t = "EXPLOSIVE ATTACK or RAM TAMPERING")last_nonce = len(self.nonce)conn.close()print("close:",addr)if __name__ == '__main__':def send(host,port,content):return secrets.token_bytes(256)srv = CryptServer(sendCallback = send)srv._mainThread.join()
2. 客戶端
import socket
import sys
import timeimport secrets
from datetime import datetime
import struct
from utils.safeUtils import SafeKey, combine_keys, SafeObj, secure_eraseclass Client:def __init__(self):self.cli = socket.socket()self.cli.settimeout(10)def connect(self,host:str,port:int):self.cli.connect((host,port))def request(self,data:bytes) -> bytes:self.cli.send(data)return self.cli.recv(1024)def close(self):self.cli.close()class CryptClient(Client):def __init__(self):super().__init__()def request(self, data: bytes) -> bytes:"""安全加密請求流程:1. 接收服務端公鑰2. 生成并發送客戶端私鑰3. 發送時間戳和隨機nonce4. 派生主密鑰5. 加密并發送請求數據6. 接收并解密響應參數:data: 要發送的原始數據(字節類型)返回:bytes: 解密后的響應數據"""# 生成客戶端私鑰(128字節安全隨機令牌)cli_key = secrets.token_bytes(128)# 1. 接收服務端公鑰(128字節)srv_key = self.cli.recv(128)if len(srv_key) != 128:raise ConnectionError("無效的公鑰長度")# 2. 發送客戶端私鑰self.cli.send(cli_key)# 3. 生成并發送安全參數# 時間戳(精確到微秒,20字節字符串)timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")[:20]# 隨機nonce(12字節)nonce = secrets.token_bytes(12)self.cli.send(timestamp.encode('utf-8')) # 發送時間戳self.cli.send(nonce) # 發送隨機noncetry:# 4. 派生主密鑰(組合服務端公鑰和客戶端私鑰)main_key = combine_keys(srv_key, cli_key)# 5. 加密請求數據# 創建安全對象加密數據safe_request = SafeObj(data, main_key)# 獲取完整加密包(包含鹽/nonce/標簽/加密數據)encrypted_package = safe_request.get_encrypted_package()# 發送長度length = len(encrypted_package)self.cli.sendall(struct.pack("!I",length))# 發送加密數據包self.cli.sendall(encrypted_package)# 6. 接收服務端響應# 接收數據長度length = struct.unpack("!I",self.cli.recv(4))[0]response = self.cli.recv(length)# 解密響應數據safe_response = SafeObj.from_encrypted_package(response, main_key)return safe_response.get_data()finally:# 安全清除所有敏感數據secure_erase(srv_key)secure_erase(cli_key)secure_erase(nonce)if 'main_key' in locals():del main_keyif 'safe_request' in locals():del safe_requestif 'safe_response' in locals():del safe_responseif __name__ == '__main__':data = b"hello world!" * 100cli = CryptClient()cli.connect("localhost", YOU_SERVER_PORT)print(cli.request(data))cli.close()
3.?utils.safeUtils
import ctypes
import mmap
import os
import pickle
import random
import secrets
import sys
import hashlib
import hmac
import time
from typing import Any, Optional, Callable, Union
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.kdf.hkdf import HKDFfrom utils.Infoget import infogetclass PasswordError(Exception): passclass SafeKey:passclass CryptConfig:erase_count = 7PBKDF2HMAC_iterations = 1000 # 建議1000-100000def secure_erase(buffer) -> None:"""安全清除內存內容(支持多種類型)"""if buffer is None:returntry:if isinstance(buffer, (bytes, bytearray)):# 多次覆蓋內存mutable = bytearray(buffer)for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)# 最后填充零for i in range(len(mutable)):mutable[i] = 0# 防止編譯器優化if len(mutable) > 0:ctypes.memset(ctypes.addressof(ctypes.c_char.from_buffer(mutable)), 0, len(mutable))elif isinstance(buffer, mmap.mmap):# 處理內存映射對象buffer.seek(0)data = buffer.read()mutable = bytearray(data)for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)for i in range(len(mutable)):mutable[i] = 0buffer.seek(0)buffer.write(mutable)elif isinstance(buffer, str):# 字符串清除 - 創建可變副本mutable = bytearray(buffer.encode('utf-8'))for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)for i in range(len(mutable)):mutable[i] = 0except:passdef combine_keys(key1: bytes, key2: bytes) -> SafeKey:"""安全混合兩個字節密鑰派生主密鑰并返回SafeKey對象參數:key1: 第一個輸入密鑰 (字節類型)key2: 第二個輸入密鑰 (字節類型)返回:SafeKey: 包含派生主密鑰的安全對象步驟:1. 驗證輸入密鑰有效性2. 使用HKDF混合派生密鑰3. 創建SafeKey對象4. 安全清除中間密鑰"""# 1. 驗證輸入密鑰if not key1 or len(key1) == 0:raise ValueError("key1 不能為空")if not key2 or len(key2) == 0:raise ValueError("key2 不能為空")try:# 2. 創建可變密鑰副本用于安全處理mutable_key1 = bytearray(key1)mutable_key2 = bytearray(key2)# 3. 拼接密鑰作為HKDF輸入combined_key = mutable_key1 + mutable_key2# 4. 使用HKDF派生主密鑰hkdf = HKDF(algorithm=hashes.SHA3_512(),length=32, # 輸出32字節密鑰(AES-256兼容)salt=None, # 不使用鹽(輸入密鑰已足夠隨機)info=b"SafeKey-Derivation", # 上下文信息backend=default_backend())# 派生密鑰derived_key = hkdf.derive(combined_key)# 5. 創建SafeKey對象safekey = SafeKey(derived_key)return safekeyfinally:# 6. 安全清除所有中間密鑰(確保即使異常也執行)if 'mutable_key1' in locals():secure_erase(mutable_key1) # 使用SafeKey中的安全擦除方法if 'mutable_key2' in locals():secure_erase(mutable_key2)if 'combined_key' in locals():secure_erase(combined_key)if 'derived_key' in locals():secure_erase(derived_key)class SafeKey:def __init__(self, password: Union[str,bytes,bytearray]):"""加固版絕對安全的密鑰存儲 - 單次使用"""# 1. 將密碼轉換為可變字節數組if isinstance(password,str):password_bytes = bytearray(password.encode('utf-8'))elif isinstance(password,bytes):password_bytes = bytearray(password)elif isinstance(password,bytearray):password_bytes = passwordelse:password_type = type(password)del passwordraise TypeError(f"unsupported password type: '{password_type}'")# 2. 生成隨機內部主密鑰 (32字節)self._internal_master_key = secrets.token_bytes(32)# 3. 生成隨機noncenonce = secrets.token_bytes(12)# 4. 使用AES-GCM加密密碼cipher = Cipher(algorithms.AES(self._internal_master_key),modes.GCM(nonce),backend=default_backend())encryptor = cipher.encryptor()encrypted = encryptor.update(password_bytes) + encryptor.finalize()# 5. 創建加密數據包并添加HMAC校驗encrypted_package = nonce + encryptor.tag + encryptedhmac_digest = hmac.new(self._internal_master_key,encrypted_package,hashlib.sha3_256).digest()self._encrypted_data = hmac_digest + encrypted_package# 6. 安全清除原始密碼secure_erase(password_bytes)del password_bytesdel password # 刪除引用# 7. 鎖定內部主密鑰和加密數據在內存中self._locked_master_key = self._lock_in_memory(self._internal_master_key)self._locked_encrypted_data = self._lock_in_memory(self._encrypted_data)# 8. 清除原始密鑰引用secure_erase(self._internal_master_key)self._internal_master_key = Nonesecure_erase(self._encrypted_data)self._encrypted_data = None# 9. 標記對象狀態self._is_active = Trueself._password_ref = None# 10. 反調試保護self._check_security_environment()def _lock_in_memory(self, data: bytes) -> mmap.mmap:"""將數據安全鎖定在內存中,防止交換到磁盤"""size = len(data)# 創建共享內存區域if sys.platform == 'win32':shm = mmap.mmap(-1, size, access=mmap.ACCESS_WRITE)else:# 類Unix系統內存保護PROT_READ = 1PROT_WRITE = 2PROT_READWRITE = PROT_READ | PROT_WRITEshm = mmap.mmap(-1, size, prot=PROT_READWRITE)# 復制數據到鎖定內存shm.write(data)# 鎖定內存防止交換if not self._mlock_memory(shm, size):# 鎖定失敗時立即清除并終止secure_erase(shm)shm.close()self._terminate_with_error("內存鎖定失敗")# 安全清除原始數據secure_erase(data)return shmdef _mlock_memory(self, shm: mmap.mmap, size: int) -> bool:"""鎖定內存防止交換到磁盤(跨平臺實現)"""try:if sys.platform == 'win32':# Windows內存鎖定return ctypes.windll.kernel32.VirtualLock(ctypes.c_void_p(ctypes.addressof(ctypes.c_char.from_buffer(shm))),size) != 0else:# Unix系統內存鎖定libc = ctypes.CDLL(None)return libc.mlock(ctypes.c_void_p(ctypes.addressof(ctypes.c_char.from_buffer(shm))),size) == 0except:return Falsedef _check_security_environment(self):"""檢查安全環境,防御調試和內存轉儲"""# 1. 檢測調試器if self._is_debugger_present():self._terminate_with_error("檢測到調試器 - 安全終止")# 2. 檢測虛擬機(可選)if self._is_running_in_vm():self._terminate_with_error("虛擬機環境不安全 - 安全終止")def _is_debugger_present(self) -> bool:"""檢測調試器存在"""try:# Windows調試器檢測if sys.platform == 'win32':kernel32 = ctypes.windll.kernel32return kernel32.IsDebuggerPresent() != 0# Linux調試器檢測elif sys.platform.startswith('linux'):try:with open('/proc/self/status', 'r') as status_file:for line in status_file:if line.startswith('TracerPid:'):tracer_pid = int(line.split(':')[1].strip())return tracer_pid != 0except:pass# 檢查LD_PRELOAD劫持if 'LD_PRELOAD' in os.environ:return Trueexcept:passreturn Falsedef _is_running_in_vm(self) -> bool:"""檢測是否在虛擬機中運行"""try:# 簡單虛擬機檢測if sys.platform == 'win32':import winregwith winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,r"SYSTEM\CurrentControlSet\Control\SystemInformation") as key:system_manufacturer = winreg.QueryValueEx(key, "SystemManufacturer")[0]system_product = winreg.QueryValueEx(key, "SystemProductName")[0]vm_indicators = ["VMware", "Virtual", "Xen", "QEMU", "KVM"]return any(indicator in system_manufacturer or indicator in system_product for indicator in vm_indicators)elif sys.platform.startswith('linux'):# 檢查DMI信息dmi_files = ['/sys/class/dmi/id/product_name','/sys/class/dmi/id/sys_vendor']for dmi_file in dmi_files:if os.path.exists(dmi_file):with open(dmi_file, 'r') as f:content = f.read().lower()if 'vmware' in content or 'virtual' in content or 'qemu' in content:return Trueexcept:passreturn Falsedef _terminate_with_error(self, message: str):"""安全終止程序并顯示錯誤消息"""infoget(f"安全錯誤: {message}",type = "err")# 嘗試安全清除內存if hasattr(self, '_locked_master_key'):secure_erase(self._locked_master_key)if hasattr(self, '_locked_encrypted_data'):secure_erase(self._locked_encrypted_data)# 立即終止程序os._exit(1)def __enter__(self):"""進入上下文時獲取密碼訪問對象"""if not self._is_active:raise RuntimeError("SafeKey 已銷毀")# 返回自身而不是直接解密密碼return selfdef get_password(self,t:type = str) -> Union[str,bytes]:"""安全獲取密碼(使用后立即清除)"""if not self._is_active:raise RuntimeError("SafeKey 已銷毀")# 1. 從鎖定內存中獲取加密數據self._locked_encrypted_data.seek(0)full_package = self._locked_encrypted_data.read()# 驗證數據包長度if len(full_package) < 32 + 12 + 16: # HMAC(32) + nonce(12) + tag(16)self._terminate_with_error("加密數據包損壞")# 2. 分離HMAC和加密數據hmac_digest = full_package[:32]encrypted_package = full_package[32:]# 3. 從鎖定內存中獲取主密鑰self._locked_master_key.seek(0)master_key = self._locked_master_key.read()# 4. 驗證HMACexpected_hmac = hmac.new(master_key,encrypted_package,hashlib.sha3_256).digest()if not hmac.compare_digest(hmac_digest, expected_hmac):self._terminate_with_error("加密數據完整性校驗失敗")# 5. 解包加密數據nonce = encrypted_package[:12]tag = encrypted_package[12:28]ciphertext = encrypted_package[28:]# 6. 使用AES-GCM解密cipher = Cipher(algorithms.AES(master_key),modes.GCM(nonce, tag),backend=default_backend())decryptor = cipher.decryptor()decrypted = decryptor.update(ciphertext) + decryptor.finalize()# 7. 創建可變密碼副本password = decrypted.decode("utf-8") if t == str else decrypted# 8. 安全清除臨時數據secure_erase(full_package)secure_erase(master_key)secure_erase(decrypted)return passworddef use_password(self, callback: callable,t = str) -> None:"""安全使用密碼(自動清除)"""password = self.get_password(t)try:callback(password)finally:# 確保密碼被清除if 'password' in locals():secure_erase(bytearray(password))def __exit__(self, exc_type, exc_value, traceback):"""退出上下文時銷毀所有敏感數據"""# 安全清除并銷毀密碼引用if self._password_ref:secure_erase(self._password_ref)try:self._password_ref.close()except:passself._password_ref = None# 安全清除并銷毀主密鑰if self._locked_master_key:secure_erase(self._locked_master_key)try:self._locked_master_key.close()except:passself._locked_master_key = None# 安全清除并銷毀加密數據if self._locked_encrypted_data:secure_erase(self._locked_encrypted_data)try:self._locked_encrypted_data.close()except:passself._locked_encrypted_data = None# 標記對象為已銷毀self._is_active = Falsedef __del__(self):"""析構時確保內存安全清除"""if hasattr(self, '_is_active') and self._is_active:self.__exit__(None, None, None)class SafeObj:def __init__(self, data: Any, key: Union[str, 'SafeKey']):"""安全數據保護對象:param data: 需要保護的任意類型數據:param key: 用于加密的密鑰(字符串或SafeKey對象)"""# 初始化安全屬性self._key = keyself._encrypted_data: bytes = b''self._salt = os.urandom(16)self._nonce = os.urandom(12)self._tag: bytes = b''self._is_active = True# 加密數據self._encrypt(data)def _secure_memzero(self, buffer) -> None:"""安全清除內存內容"""if buffer is None:returntry:if isinstance(buffer, (bytes, bytearray)):# 多次覆蓋內存mutable = bytearray(buffer)for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)# 最后填充零for i in range(len(mutable)):mutable[i] = 0# 防止編譯器優化if len(mutable) > 0:ctypes.memset(ctypes.addressof(ctypes.c_char.from_buffer(mutable)), 0, len(mutable))elif isinstance(buffer, mmap.mmap):# 處理內存映射對象buffer.seek(0)data = buffer.read()mutable = bytearray(data)for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)for i in range(len(mutable)):mutable[i] = 0buffer.seek(0)buffer.write(mutable)except (ValueError, TypeError, OSError):passdef _get_key_bytes(self) -> bytes:"""從密鑰源獲取字節形式密鑰"""if isinstance(self._key, str):# 字符串密鑰直接使用return self._key.encode()elif hasattr(self._key, 'get_password'):# SafeKey對象 - 安全獲取密碼return self._key.get_password(bytes)else:raise TypeError("不支持的密鑰類型")def _derive_key(self, key_length: int = 32) -> bytes:"""派生加密密鑰"""# 獲取原始密鑰字節key_bytes = self._get_key_bytes()# 派生加密密鑰kdf = PBKDF2HMAC(algorithm=hashes.SHA3_512(),length=key_length,salt=self._salt,iterations=CryptConfig.PBKDF2HMAC_iterations,backend=default_backend())derived_key = kdf.derive(key_bytes)# 安全清除中間數據self._secure_memzero(key_bytes)return derived_keydef _encrypt(self, data: Any) -> None:"""加密數據并存儲在內存中"""# 序列化數據serialized = pickle.dumps(data)try:# 派生加密密鑰derived_key = self._derive_key()# 加密數據cipher = Cipher(algorithms.AES(derived_key),modes.GCM(self._nonce),backend=default_backend())encryptor = cipher.encryptor()self._encrypted_data = encryptor.update(serialized) + encryptor.finalize()self._tag = encryptor.tagfinally:# 安全清除中間數據self._secure_memzero(serialized)if "derived_key" in locals():self._secure_memzero(derived_key)def get_data(self) -> Any:"""安全獲取解密后的原始數據- 返回原始數據類型- 調用者負責安全處理返回的數據"""if not self._is_active:raise RuntimeError("SafeObj 已銷毀")# 派生解密密鑰derived_key = self._derive_key()try:# 解密數據cipher = Cipher(algorithms.AES(derived_key),modes.GCM(self._nonce, self._tag),backend=default_backend())decryptor = cipher.decryptor()decrypted = decryptor.update(self._encrypted_data) + decryptor.finalize()# 反序列化數據data = pickle.loads(decrypted)return datafinally:# 安全清除中間數據self._secure_memzero(derived_key)self._secure_memzero(decrypted)def use_data(self, callback: Callable[[Any], None]) -> None:"""安全使用數據(自動清除):param callback: 回調函數,接受解密后的數據作為參數"""data = Nonetry:data = self.get_data()callback(data)finally:# 安全清除數據(如果可能)if data is not None:self._secure_erase_recursive(data)def _secure_erase_recursive(self, obj: Any) -> None:"""遞歸安全清除數據結構中的敏感內容"""try:if isinstance(obj, (bytes, bytearray)):self._secure_memzero(obj)elif isinstance(obj, str):# 字符串不可變,創建可變副本后清除mutable = bytearray(obj.encode('utf-8'))self._secure_memzero(mutable)elif isinstance(obj, dict):for key, value in obj.items():self._secure_erase_recursive(value)self._secure_erase_recursive(key)elif isinstance(obj, (list, tuple, set)):for item in obj:self._secure_erase_recursive(item)elif hasattr(obj, '__dict__'):# 處理自定義對象for attr in vars(obj):self._secure_erase_recursive(getattr(obj, attr))except:pass@propertydef encrypted_data(self) -> bytes:"""獲取加密后的數據(不含鹽和nonce)"""return self._encrypted_datadef get_encrypted_package(self) -> bytes:"""獲取完整加密包(包含鹽、nonce、標簽和加密數據)"""return self._salt + self._nonce + self._tag + self._encrypted_data@classmethoddef from_encrypted_package(cls, encrypted_package: bytes, key: Union[str, 'SafeKey']) -> 'SafeObj':"""從加密包創建SafeObj實例"""if len(encrypted_package) < 16 + 12 + 16:raise ValueError("無效的加密包")# 解包數據salt = encrypted_package[:16]nonce = encrypted_package[16:28]tag = encrypted_package[28:44]encrypted_data = encrypted_package[44:]# 創建虛擬實例instance = cls.__new__(cls)instance._key = keyinstance._salt = saltinstance._nonce = nonceinstance._tag = taginstance._encrypted_data = encrypted_datainstance._is_active = Truereturn instancedef __del__(self):"""析構時確保內存安全清除"""if self._is_active:self._is_active = False# 安全清除所有敏感數據self._secure_memzero(self._salt)self._secure_memzero(self._nonce)self._secure_memzero(self._tag)self._secure_memzero(self._encrypted_data)
# 分析
1. 連接步驟流程
初始化階段:
????????服務端啟動時生成隨機公鑰(128字節),綁定端口監聽連接。
????????客戶端連接服務端后,服務端立即發送公鑰。
密鑰交換階段:
????????客戶端:
????????????????生成隨機私鑰(128字節)并發送給服務端。
????????????????發送精確到微秒的當前時間戳(20字節)和隨機 nonce(12字節)。
????????服務端:
????????????????校驗時間戳(與當前時間差≤1秒)和 nonce(是否重復),防止重放攻擊。
????????????????組合服務端公鑰 + 客戶端私鑰,派生主密鑰(HKDF-SHA3-512)。
數據傳輸階段:
????????客戶端:
????????????????用主密鑰加密數據(AES-GCM + PBKDF2HMAC-SHA3-512)。
????????????????發送加密數據包(鹽 + nonce + 認證標簽 + 密文)。
????????服務端:
????????????????用主密鑰解密數據,處理業務邏輯。
????????????????加密響應數據并返回相同結構的加密包。
連接終止:
????????雙方安全擦除會話密鑰、臨時密鑰等敏感數據。
????????關閉連接。
2. 加密方式
密鑰派生:
????????主密鑰派生:
????????????????服務端公鑰(128B) + 客戶端私鑰(128B) → HKDF-SHA3-512 → 32字節主密鑰。
????????數據加密密鑰派生:
????????????????主密鑰 + 隨機鹽(16B) → PBKDF2HMAC-SHA3-512 → 32字節數據密鑰。
數據加密:
????????算法:
????????????????AES-256-GCM(認證加密)。
????????數據包結構:
????????????????16B鹽 + 12B nonce + 16B認證標簽 + 變長密文。
完整性保護:
????????時間戳 + nonce 防重放。
????????GCM模式提供數據完整性和來源認證。
3. 安全設計
前向安全性:
????????每次會話使用臨時密鑰(公鑰/私鑰對),會話結束立即擦除。
抗重放攻擊:
????????時間戳校驗(1秒窗口) + nonce唯一性檢測(服務端維護nonce列表)。
內存安全:
????????敏感數據擦除:密鑰、中間值用隨機數據覆蓋7次(secure_erase)。
????????內存鎖定:密鑰存儲在不可交換的內存區域(mlock)。
環境安全檢測:
????????反調試:檢查調試器附加(IsDebuggerPresent/TracerPid)。
????????反Hook:驗證關鍵函數地址完整性。
????????反虛擬機:檢測VM特征(注冊表/DMI信息)。
密鑰管理:
????????SafeKey對象:密鑰全程加密存儲,僅在使用時短暫解密。
????????單次使用:密鑰解密后立即擦除,不可復用。
4. 不足
長連接支持(暫未實現):
????????當前設計每次請求新建連接,可擴展為k復用連接的會話模式。
速度(本機測試最大速度僅2Mbps = 256KB/s):
? ? ? ? 加密操作大量耗時,CPU密集型操作。
適用性:
????????僅支持單次、慢速通信(進程間加密數據傳輸),不能用于遠程通信。
可能的漏洞:
? ? ? ? 中間人?(MITM)?攻擊(當前無身份驗證)
????????內存耗盡攻擊(惡意請求使服務器nonce列表無限增長)
????????時間窗口攻擊(攻擊者可快速重放合法請求)
????????內存安全殘留風險(Python內部對象清除延時)
????????拒絕服務 (DoS) 漏洞(Nonce 洪水、密鑰派生消耗、連接耗盡等)
????????側信道攻擊(時間、內存、物理監聽等)
制作不易,感謝大家的支持!如有任何問題或建議歡迎評論!