Python 進程間通信:TCP安全加密數據傳輸

最近在寫安全方面的程序,有需求,就做了這些TCP加密數據傳輸類。
utils.safeUtils的內容詳見:

  • SafeObj:Python 高安全性加密數據容器類-CSDN博客
  • SafeKey:Python 高安全性加密密碼容器類-CSDN博客

如有任何問題或漏洞歡迎批評指正!不勝感激!

# 代碼

1. 服務端

import sys
import tracebackimport os
import socket
from threading import Thread
from typing import Callable, Any
import secrets
import struct
from datetime import datetime as dt,timedelta as tdel
from utils.safeUtils import SafeKey,combine_keys,SafeObj,secure_eraseclass Server:def __init__(self, *,sendCallback: Callable[[str, int, bytes], bytes],host: str = "localhost",port: int = 0,errCallback: Callable[[Exception], Any] = lambda err: print(traceback.format_exc())):self.srv = socket.socket()self._host = hostself._port = portself._sendCallback = sendCallbackself._errCallback = errCallbackself._close = Falseself.srv.bind((host, port))self.srv.listen(16)self.srv.settimeout(1)self._mainThread = Thread(None, self._main, "TCP-Server_Main", daemon=True)self._mainThread.start()@propertydef host(self):return self._host@propertydef port(self):return self._portdef _main(self):while not self._close:try:conn, addr = self.srv.accept()except socket.timeout:continuetry:data = b""while block := conn.recv(1024):data += blockcontent = self._sendCallback(addr[0],addr[1],data)conn.send(content)except socket.error as err:self._errCallback(err)finally:conn.close()def close(self):self.srv.close()self._close = Trueself._mainThread.join()class CryptServer(Server):def __init__(self,*args,**kwargs):self.nonce = []super().__init__(*args,**kwargs)print(self.srv.getsockname())def _main(self):def ATTACK_ALERT(o = None,ts = None,n = None,c = None,r = None,s = True,t = "REPLAY or MAN-IN-THE-MIDDLE ATTACK"):L, R = CF.LIGHTWHITE_EX + CS.BRIGHT, CS.RESET_ALLerr = f'''{CB.BLUE}{L}!!! [SECURITY ALERT] !!!: {t}{R}\n{CB.RED}{L}{"?" *22} ATTACK {"?" * 22}{R}\n{"TIMESTAMP(Now/Response): " if s else "ATTACK Count: "}{("/".join(x.strftime("%Y-%m-%dT%H:%M:%S") for x in (o, ts))) if s else str(c)}\n{"NONCE: " if s else "RAM Tamper: "}{n.hex() if s else str(r)}\n{CB.RED}{L}{"?" * 20} SECURITY ALERT {"?" * 20}{R}'''.replace(" " * 4, "")os.popen("msg * SECURITY ALERT: ************ has been ATTACKED");raise RuntimeError(err)last_nonce = len(self.nonce)while not self._close:try:conn, addr = self.srv.accept()print(addr)except socket.timeout:continuekey, safeData, cliKey, mainKey, content, data = (None for _ in range(6)) # 初始化(finally要用)try:key = SafeKey(secrets.token_bytes(128))                       # 每次會話的唯一公鑰key.use_password(conn.sendall,t = bytes)                                   # 向客戶端發送公鑰cliKey = conn.recv(128)                                       # 讀取客戶端私鑰ts = conn.recv(20).decode()                                   # [安全校驗] 讀取時間戳(%Y%m%d%H%M%S%f)ts = dt.strptime(ts,"%Y%m%d%H%M%S%f")                 # [安全校驗] 格式化時間戳n = conn.recv(12)                                             # [安全校驗] 讀取會話唯一 nonce# [安全校驗] 檢測是否可能為重放if abs((o:=dt.now())-ts).total_seconds()>1or n in self.nonce:ATTACK_ALERT(o = o,ts = ts,n = n)else:self.nonce.append(n)mainKey = combine_keys(key.get_password(bytes),cliKey)            # 使用公鑰和私鑰派生主密鑰secure_erase(cliKey)                                              # 擦除客戶端私鑰del key,cliKeylength = struct.unpack("!I",conn.recv(4))[0]              # 讀取數據長度data = conn.recv(length)safeData = SafeObj.from_encrypted_package(data,mainKey)           # 獲取SafeObj對象content = self._sendCallback(addr[0],addr[1],safeData.get_data()) # 從處理函數獲取數據encrypted_package = SafeObj(content,mainKey).get_encrypted_package()length = len(encrypted_package)conn.sendall(struct.pack("!I",length))conn.sendall(encrypted_package)                                                 # 發送加密數據secure_erase(bytearray(encrypted_package))                                   # 安全擦除所有中間數據secure_erase(bytearray(data))secure_erase(bytearray(content))print(mainKey.get_password(bytes),"\n",length,"\n",data,"\n",encrypted_package)except Exception as err:self._errCallback(err)finally:conn.close()try:del keyexcept NameError:passtry:del cliKeyexcept NameError:passdel safeData, mainKey, content, datad = len(self.nonce) - last_nonceif d < 0 or d > 10:ATTACK_ALERT(c = d,r = d < 0,t = "EXPLOSIVE ATTACK or RAM TAMPERING")last_nonce = len(self.nonce)conn.close()print("close:",addr)if __name__ == '__main__':def send(host,port,content):return secrets.token_bytes(256)srv = CryptServer(sendCallback = send)srv._mainThread.join()

2. 客戶端

import socket
import sys
import timeimport secrets
from datetime import datetime
import struct
from utils.safeUtils import SafeKey, combine_keys, SafeObj, secure_eraseclass Client:def __init__(self):self.cli = socket.socket()self.cli.settimeout(10)def connect(self,host:str,port:int):self.cli.connect((host,port))def request(self,data:bytes) -> bytes:self.cli.send(data)return self.cli.recv(1024)def close(self):self.cli.close()class CryptClient(Client):def __init__(self):super().__init__()def request(self, data: bytes) -> bytes:"""安全加密請求流程:1. 接收服務端公鑰2. 生成并發送客戶端私鑰3. 發送時間戳和隨機nonce4. 派生主密鑰5. 加密并發送請求數據6. 接收并解密響應參數:data: 要發送的原始數據(字節類型)返回:bytes: 解密后的響應數據"""# 生成客戶端私鑰(128字節安全隨機令牌)cli_key = secrets.token_bytes(128)# 1. 接收服務端公鑰(128字節)srv_key = self.cli.recv(128)if len(srv_key) != 128:raise ConnectionError("無效的公鑰長度")# 2. 發送客戶端私鑰self.cli.send(cli_key)# 3. 生成并發送安全參數# 時間戳(精確到微秒,20字節字符串)timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")[:20]# 隨機nonce(12字節)nonce = secrets.token_bytes(12)self.cli.send(timestamp.encode('utf-8'))  # 發送時間戳self.cli.send(nonce)  # 發送隨機noncetry:# 4. 派生主密鑰(組合服務端公鑰和客戶端私鑰)main_key = combine_keys(srv_key, cli_key)# 5. 加密請求數據# 創建安全對象加密數據safe_request = SafeObj(data, main_key)# 獲取完整加密包(包含鹽/nonce/標簽/加密數據)encrypted_package = safe_request.get_encrypted_package()# 發送長度length = len(encrypted_package)self.cli.sendall(struct.pack("!I",length))# 發送加密數據包self.cli.sendall(encrypted_package)# 6. 接收服務端響應# 接收數據長度length = struct.unpack("!I",self.cli.recv(4))[0]response = self.cli.recv(length)# 解密響應數據safe_response = SafeObj.from_encrypted_package(response, main_key)return safe_response.get_data()finally:# 安全清除所有敏感數據secure_erase(srv_key)secure_erase(cli_key)secure_erase(nonce)if 'main_key' in locals():del main_keyif 'safe_request' in locals():del safe_requestif 'safe_response' in locals():del safe_responseif __name__ == '__main__':data = b"hello world!" * 100cli = CryptClient()cli.connect("localhost", YOU_SERVER_PORT)print(cli.request(data))cli.close()

3.?utils.safeUtils

import ctypes
import mmap
import os
import pickle
import random
import secrets
import sys
import hashlib
import hmac
import time
from typing import Any, Optional, Callable, Union
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.kdf.hkdf import HKDFfrom utils.Infoget import infogetclass PasswordError(Exception): passclass SafeKey:passclass CryptConfig:erase_count = 7PBKDF2HMAC_iterations = 1000  # 建議1000-100000def secure_erase(buffer) -> None:"""安全清除內存內容(支持多種類型)"""if buffer is None:returntry:if isinstance(buffer, (bytes, bytearray)):# 多次覆蓋內存mutable = bytearray(buffer)for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)# 最后填充零for i in range(len(mutable)):mutable[i] = 0# 防止編譯器優化if len(mutable) > 0:ctypes.memset(ctypes.addressof(ctypes.c_char.from_buffer(mutable)), 0, len(mutable))elif isinstance(buffer, mmap.mmap):# 處理內存映射對象buffer.seek(0)data = buffer.read()mutable = bytearray(data)for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)for i in range(len(mutable)):mutable[i] = 0buffer.seek(0)buffer.write(mutable)elif isinstance(buffer, str):# 字符串清除 - 創建可變副本mutable = bytearray(buffer.encode('utf-8'))for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)for i in range(len(mutable)):mutable[i] = 0except:passdef combine_keys(key1: bytes, key2: bytes) -> SafeKey:"""安全混合兩個字節密鑰派生主密鑰并返回SafeKey對象參數:key1: 第一個輸入密鑰 (字節類型)key2: 第二個輸入密鑰 (字節類型)返回:SafeKey: 包含派生主密鑰的安全對象步驟:1. 驗證輸入密鑰有效性2. 使用HKDF混合派生密鑰3. 創建SafeKey對象4. 安全清除中間密鑰"""# 1. 驗證輸入密鑰if not key1 or len(key1) == 0:raise ValueError("key1 不能為空")if not key2 or len(key2) == 0:raise ValueError("key2 不能為空")try:# 2. 創建可變密鑰副本用于安全處理mutable_key1 = bytearray(key1)mutable_key2 = bytearray(key2)# 3. 拼接密鑰作為HKDF輸入combined_key = mutable_key1 + mutable_key2# 4. 使用HKDF派生主密鑰hkdf = HKDF(algorithm=hashes.SHA3_512(),length=32,  # 輸出32字節密鑰(AES-256兼容)salt=None,  # 不使用鹽(輸入密鑰已足夠隨機)info=b"SafeKey-Derivation",  # 上下文信息backend=default_backend())# 派生密鑰derived_key = hkdf.derive(combined_key)# 5. 創建SafeKey對象safekey = SafeKey(derived_key)return safekeyfinally:# 6. 安全清除所有中間密鑰(確保即使異常也執行)if 'mutable_key1' in locals():secure_erase(mutable_key1)  # 使用SafeKey中的安全擦除方法if 'mutable_key2' in locals():secure_erase(mutable_key2)if 'combined_key' in locals():secure_erase(combined_key)if 'derived_key' in locals():secure_erase(derived_key)class SafeKey:def __init__(self, password: Union[str,bytes,bytearray]):"""加固版絕對安全的密鑰存儲 - 單次使用"""# 1. 將密碼轉換為可變字節數組if isinstance(password,str):password_bytes = bytearray(password.encode('utf-8'))elif isinstance(password,bytes):password_bytes = bytearray(password)elif isinstance(password,bytearray):password_bytes = passwordelse:password_type = type(password)del passwordraise TypeError(f"unsupported password type: '{password_type}'")# 2. 生成隨機內部主密鑰 (32字節)self._internal_master_key = secrets.token_bytes(32)# 3. 生成隨機noncenonce = secrets.token_bytes(12)# 4. 使用AES-GCM加密密碼cipher = Cipher(algorithms.AES(self._internal_master_key),modes.GCM(nonce),backend=default_backend())encryptor = cipher.encryptor()encrypted = encryptor.update(password_bytes) + encryptor.finalize()# 5. 創建加密數據包并添加HMAC校驗encrypted_package = nonce + encryptor.tag + encryptedhmac_digest = hmac.new(self._internal_master_key,encrypted_package,hashlib.sha3_256).digest()self._encrypted_data = hmac_digest + encrypted_package# 6. 安全清除原始密碼secure_erase(password_bytes)del password_bytesdel password  # 刪除引用# 7. 鎖定內部主密鑰和加密數據在內存中self._locked_master_key = self._lock_in_memory(self._internal_master_key)self._locked_encrypted_data = self._lock_in_memory(self._encrypted_data)# 8. 清除原始密鑰引用secure_erase(self._internal_master_key)self._internal_master_key = Nonesecure_erase(self._encrypted_data)self._encrypted_data = None# 9. 標記對象狀態self._is_active = Trueself._password_ref = None# 10. 反調試保護self._check_security_environment()def _lock_in_memory(self, data: bytes) -> mmap.mmap:"""將數據安全鎖定在內存中,防止交換到磁盤"""size = len(data)# 創建共享內存區域if sys.platform == 'win32':shm = mmap.mmap(-1, size, access=mmap.ACCESS_WRITE)else:# 類Unix系統內存保護PROT_READ = 1PROT_WRITE = 2PROT_READWRITE = PROT_READ | PROT_WRITEshm = mmap.mmap(-1, size, prot=PROT_READWRITE)# 復制數據到鎖定內存shm.write(data)# 鎖定內存防止交換if not self._mlock_memory(shm, size):# 鎖定失敗時立即清除并終止secure_erase(shm)shm.close()self._terminate_with_error("內存鎖定失敗")# 安全清除原始數據secure_erase(data)return shmdef _mlock_memory(self, shm: mmap.mmap, size: int) -> bool:"""鎖定內存防止交換到磁盤(跨平臺實現)"""try:if sys.platform == 'win32':# Windows內存鎖定return ctypes.windll.kernel32.VirtualLock(ctypes.c_void_p(ctypes.addressof(ctypes.c_char.from_buffer(shm))),size) != 0else:# Unix系統內存鎖定libc = ctypes.CDLL(None)return libc.mlock(ctypes.c_void_p(ctypes.addressof(ctypes.c_char.from_buffer(shm))),size) == 0except:return Falsedef _check_security_environment(self):"""檢查安全環境,防御調試和內存轉儲"""# 1. 檢測調試器if self._is_debugger_present():self._terminate_with_error("檢測到調試器 - 安全終止")# 2. 檢測虛擬機(可選)if self._is_running_in_vm():self._terminate_with_error("虛擬機環境不安全 - 安全終止")def _is_debugger_present(self) -> bool:"""檢測調試器存在"""try:# Windows調試器檢測if sys.platform == 'win32':kernel32 = ctypes.windll.kernel32return kernel32.IsDebuggerPresent() != 0# Linux調試器檢測elif sys.platform.startswith('linux'):try:with open('/proc/self/status', 'r') as status_file:for line in status_file:if line.startswith('TracerPid:'):tracer_pid = int(line.split(':')[1].strip())return tracer_pid != 0except:pass# 檢查LD_PRELOAD劫持if 'LD_PRELOAD' in os.environ:return Trueexcept:passreturn Falsedef _is_running_in_vm(self) -> bool:"""檢測是否在虛擬機中運行"""try:# 簡單虛擬機檢測if sys.platform == 'win32':import winregwith winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,r"SYSTEM\CurrentControlSet\Control\SystemInformation") as key:system_manufacturer = winreg.QueryValueEx(key, "SystemManufacturer")[0]system_product = winreg.QueryValueEx(key, "SystemProductName")[0]vm_indicators = ["VMware", "Virtual", "Xen", "QEMU", "KVM"]return any(indicator in system_manufacturer or indicator in system_product for indicator in vm_indicators)elif sys.platform.startswith('linux'):# 檢查DMI信息dmi_files = ['/sys/class/dmi/id/product_name','/sys/class/dmi/id/sys_vendor']for dmi_file in dmi_files:if os.path.exists(dmi_file):with open(dmi_file, 'r') as f:content = f.read().lower()if 'vmware' in content or 'virtual' in content or 'qemu' in content:return Trueexcept:passreturn Falsedef _terminate_with_error(self, message: str):"""安全終止程序并顯示錯誤消息"""infoget(f"安全錯誤: {message}",type = "err")# 嘗試安全清除內存if hasattr(self, '_locked_master_key'):secure_erase(self._locked_master_key)if hasattr(self, '_locked_encrypted_data'):secure_erase(self._locked_encrypted_data)# 立即終止程序os._exit(1)def __enter__(self):"""進入上下文時獲取密碼訪問對象"""if not self._is_active:raise RuntimeError("SafeKey 已銷毀")# 返回自身而不是直接解密密碼return selfdef get_password(self,t:type = str) -> Union[str,bytes]:"""安全獲取密碼(使用后立即清除)"""if not self._is_active:raise RuntimeError("SafeKey 已銷毀")# 1. 從鎖定內存中獲取加密數據self._locked_encrypted_data.seek(0)full_package = self._locked_encrypted_data.read()# 驗證數據包長度if len(full_package) < 32 + 12 + 16:  # HMAC(32) + nonce(12) + tag(16)self._terminate_with_error("加密數據包損壞")# 2. 分離HMAC和加密數據hmac_digest = full_package[:32]encrypted_package = full_package[32:]# 3. 從鎖定內存中獲取主密鑰self._locked_master_key.seek(0)master_key = self._locked_master_key.read()# 4. 驗證HMACexpected_hmac = hmac.new(master_key,encrypted_package,hashlib.sha3_256).digest()if not hmac.compare_digest(hmac_digest, expected_hmac):self._terminate_with_error("加密數據完整性校驗失敗")# 5. 解包加密數據nonce = encrypted_package[:12]tag = encrypted_package[12:28]ciphertext = encrypted_package[28:]# 6. 使用AES-GCM解密cipher = Cipher(algorithms.AES(master_key),modes.GCM(nonce, tag),backend=default_backend())decryptor = cipher.decryptor()decrypted = decryptor.update(ciphertext) + decryptor.finalize()# 7. 創建可變密碼副本password = decrypted.decode("utf-8") if t == str else decrypted# 8. 安全清除臨時數據secure_erase(full_package)secure_erase(master_key)secure_erase(decrypted)return passworddef use_password(self, callback: callable,t = str) -> None:"""安全使用密碼(自動清除)"""password = self.get_password(t)try:callback(password)finally:# 確保密碼被清除if 'password' in locals():secure_erase(bytearray(password))def __exit__(self, exc_type, exc_value, traceback):"""退出上下文時銷毀所有敏感數據"""# 安全清除并銷毀密碼引用if self._password_ref:secure_erase(self._password_ref)try:self._password_ref.close()except:passself._password_ref = None# 安全清除并銷毀主密鑰if self._locked_master_key:secure_erase(self._locked_master_key)try:self._locked_master_key.close()except:passself._locked_master_key = None# 安全清除并銷毀加密數據if self._locked_encrypted_data:secure_erase(self._locked_encrypted_data)try:self._locked_encrypted_data.close()except:passself._locked_encrypted_data = None# 標記對象為已銷毀self._is_active = Falsedef __del__(self):"""析構時確保內存安全清除"""if hasattr(self, '_is_active') and self._is_active:self.__exit__(None, None, None)class SafeObj:def __init__(self, data: Any, key: Union[str, 'SafeKey']):"""安全數據保護對象:param data: 需要保護的任意類型數據:param key: 用于加密的密鑰(字符串或SafeKey對象)"""# 初始化安全屬性self._key = keyself._encrypted_data: bytes = b''self._salt = os.urandom(16)self._nonce = os.urandom(12)self._tag: bytes = b''self._is_active = True# 加密數據self._encrypt(data)def _secure_memzero(self, buffer) -> None:"""安全清除內存內容"""if buffer is None:returntry:if isinstance(buffer, (bytes, bytearray)):# 多次覆蓋內存mutable = bytearray(buffer)for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)# 最后填充零for i in range(len(mutable)):mutable[i] = 0# 防止編譯器優化if len(mutable) > 0:ctypes.memset(ctypes.addressof(ctypes.c_char.from_buffer(mutable)), 0, len(mutable))elif isinstance(buffer, mmap.mmap):# 處理內存映射對象buffer.seek(0)data = buffer.read()mutable = bytearray(data)for _ in range(CryptConfig.erase_count):for i in range(len(mutable)):mutable[i] = secrets.randbelow(256)for i in range(len(mutable)):mutable[i] = 0buffer.seek(0)buffer.write(mutable)except (ValueError, TypeError, OSError):passdef _get_key_bytes(self) -> bytes:"""從密鑰源獲取字節形式密鑰"""if isinstance(self._key, str):# 字符串密鑰直接使用return self._key.encode()elif hasattr(self._key, 'get_password'):# SafeKey對象 - 安全獲取密碼return self._key.get_password(bytes)else:raise TypeError("不支持的密鑰類型")def _derive_key(self, key_length: int = 32) -> bytes:"""派生加密密鑰"""# 獲取原始密鑰字節key_bytes = self._get_key_bytes()# 派生加密密鑰kdf = PBKDF2HMAC(algorithm=hashes.SHA3_512(),length=key_length,salt=self._salt,iterations=CryptConfig.PBKDF2HMAC_iterations,backend=default_backend())derived_key = kdf.derive(key_bytes)# 安全清除中間數據self._secure_memzero(key_bytes)return derived_keydef _encrypt(self, data: Any) -> None:"""加密數據并存儲在內存中"""# 序列化數據serialized = pickle.dumps(data)try:# 派生加密密鑰derived_key = self._derive_key()# 加密數據cipher = Cipher(algorithms.AES(derived_key),modes.GCM(self._nonce),backend=default_backend())encryptor = cipher.encryptor()self._encrypted_data = encryptor.update(serialized) + encryptor.finalize()self._tag = encryptor.tagfinally:# 安全清除中間數據self._secure_memzero(serialized)if "derived_key" in locals():self._secure_memzero(derived_key)def get_data(self) -> Any:"""安全獲取解密后的原始數據- 返回原始數據類型- 調用者負責安全處理返回的數據"""if not self._is_active:raise RuntimeError("SafeObj 已銷毀")# 派生解密密鑰derived_key = self._derive_key()try:# 解密數據cipher = Cipher(algorithms.AES(derived_key),modes.GCM(self._nonce, self._tag),backend=default_backend())decryptor = cipher.decryptor()decrypted = decryptor.update(self._encrypted_data) + decryptor.finalize()# 反序列化數據data = pickle.loads(decrypted)return datafinally:# 安全清除中間數據self._secure_memzero(derived_key)self._secure_memzero(decrypted)def use_data(self, callback: Callable[[Any], None]) -> None:"""安全使用數據(自動清除):param callback: 回調函數,接受解密后的數據作為參數"""data = Nonetry:data = self.get_data()callback(data)finally:# 安全清除數據(如果可能)if data is not None:self._secure_erase_recursive(data)def _secure_erase_recursive(self, obj: Any) -> None:"""遞歸安全清除數據結構中的敏感內容"""try:if isinstance(obj, (bytes, bytearray)):self._secure_memzero(obj)elif isinstance(obj, str):# 字符串不可變,創建可變副本后清除mutable = bytearray(obj.encode('utf-8'))self._secure_memzero(mutable)elif isinstance(obj, dict):for key, value in obj.items():self._secure_erase_recursive(value)self._secure_erase_recursive(key)elif isinstance(obj, (list, tuple, set)):for item in obj:self._secure_erase_recursive(item)elif hasattr(obj, '__dict__'):# 處理自定義對象for attr in vars(obj):self._secure_erase_recursive(getattr(obj, attr))except:pass@propertydef encrypted_data(self) -> bytes:"""獲取加密后的數據(不含鹽和nonce)"""return self._encrypted_datadef get_encrypted_package(self) -> bytes:"""獲取完整加密包(包含鹽、nonce、標簽和加密數據)"""return self._salt + self._nonce + self._tag + self._encrypted_data@classmethoddef from_encrypted_package(cls, encrypted_package: bytes, key: Union[str, 'SafeKey']) -> 'SafeObj':"""從加密包創建SafeObj實例"""if len(encrypted_package) < 16 + 12 + 16:raise ValueError("無效的加密包")# 解包數據salt = encrypted_package[:16]nonce = encrypted_package[16:28]tag = encrypted_package[28:44]encrypted_data = encrypted_package[44:]# 創建虛擬實例instance = cls.__new__(cls)instance._key = keyinstance._salt = saltinstance._nonce = nonceinstance._tag = taginstance._encrypted_data = encrypted_datainstance._is_active = Truereturn instancedef __del__(self):"""析構時確保內存安全清除"""if self._is_active:self._is_active = False# 安全清除所有敏感數據self._secure_memzero(self._salt)self._secure_memzero(self._nonce)self._secure_memzero(self._tag)self._secure_memzero(self._encrypted_data)

# 分析

1. 連接步驟流程

初始化階段:
????????服務端啟動時生成隨機公鑰(128字節),綁定端口監聽連接。
????????客戶端連接服務端后,服務端立即發送公鑰。
密鑰交換階段:
????????客戶端:
????????????????生成隨機私鑰(128字節)并發送給服務端。
????????????????發送精確到微秒的當前時間戳(20字節)和隨機 nonce(12字節)。
????????服務端:
????????????????校驗時間戳(與當前時間差≤1秒)和 nonce(是否重復),防止重放攻擊
????????????????組合服務端公鑰 + 客戶端私鑰,派生主密鑰(HKDF-SHA3-512)
數據傳輸階段:
????????客戶端:
????????????????用主密鑰加密數據(AES-GCM + PBKDF2HMAC-SHA3-512)。
????????????????發送加密數據包(鹽 + nonce + 認證標簽 + 密文)。
????????服務端:
????????????????用主密鑰解密數據,處理業務邏輯。
????????????????加密響應數據并返回相同結構的加密包。
連接終止:
????????雙方安全擦除會話密鑰、臨時密鑰等敏感數據
????????關閉連接。


2. 加密方式

密鑰派生:
????????主密鑰派生:
????????????????服務端公鑰(128B) + 客戶端私鑰(128B) → HKDF-SHA3-512 → 32字節主密鑰。
????????數據加密密鑰派生:
????????????????主密鑰 + 隨機鹽(16B) → PBKDF2HMAC-SHA3-512 → 32字節數據密鑰。
數據加密:
????????算法:
????????????????AES-256-GCM(認證加密)。
????????數據包結構:
????????????????16B鹽 + 12B nonce + 16B認證標簽 + 變長密文。
完整性保護:
????????時間戳 + nonce 防重放。
????????GCM模式提供數據完整性和來源認證。


3. 安全設計

前向安全性:
????????每次會話使用臨時密鑰(公鑰/私鑰對),會話結束立即擦除。
抗重放攻擊:
????????時間戳校驗(1秒窗口) + nonce唯一性檢測(服務端維護nonce列表)。
內存安全:
????????敏感數據擦除:密鑰、中間值用隨機數據覆蓋7次(secure_erase)。
????????內存鎖定:密鑰存儲在不可交換的內存區域(mlock)。
環境安全檢測:
????????反調試:檢查調試器附加(IsDebuggerPresent/TracerPid)。
????????反Hook:驗證關鍵函數地址完整性。
????????反虛擬機:檢測VM特征(注冊表/DMI信息)。
密鑰管理:
????????SafeKey對象:密鑰全程加密存儲,僅在使用時短暫解密。
????????單次使用:密鑰解密后立即擦除,不可復用。


4. 不足

長連接支持(暫未實現):
????????當前設計每次請求新建連接,可擴展為k復用連接的會話模式。

速度(本機測試最大速度僅2Mbps = 256KB/s):

? ? ? ? 加密操作大量耗時,CPU密集型操作。

適用性:
????????僅支持單次、慢速通信(進程間加密數據傳輸),不能用于遠程通信

可能的漏洞:

? ? ? ? 中間人?(MITM)?攻擊(當前無身份驗證)
????????內存耗盡攻擊(惡意請求使服務器nonce列表無限增長)
????????時間窗口攻擊(攻擊者可快速重放合法請求)
????????內存安全殘留風險(Python內部對象清除延時)
????????拒絕服務 (DoS) 漏洞(Nonce 洪水、密鑰派生消耗、連接耗盡等)
????????側信道攻擊(時間、內存、物理監聽等)


制作不易,感謝大家的支持!如有任何問題或建議歡迎評論!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/92024.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/92024.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/92024.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Windows批量修改文件屬性方法

標題使用icacls命令&#xff08;推薦批量操作&#xff09;打開管理員權限的命令提示符&#xff08;CMD&#xff09;執行以下命令&#xff1a;cmd icacls "文件夾路徑" /grant 用戶名:(OI)(CI)F /T /C 參數說明&#xff1a;(OI)&#xff1a;對象繼承 - 適用于文件夾(C…

Entity Component System架構

ECS架構 1 簡介 在當今快速發展的軟件開發領域&#xff0c;游戲開發、實時模擬等場景對系統的性能、靈活性和可擴展性提出了極高的要求。傳統的面向對象架構在面對復雜且動態變化的實體時&#xff0c;往往會出現代碼耦合度高、擴展性差等問題。? ECS&#xff08;Entity - Com…

.vscode 擴展配置

一、vue快捷鍵配置 在項目.vscode下新建vue3.0.code-snippets 每當輸入vue3.0后自動生成代碼片段 {"Vue3.0快速生成模板": {"scope": "vue","prefix": "Vue3.0","body": ["<template>"," &…

一個基于阿里云的C端Java服務的整體項目架構

1.背景介紹 總結一下工作使用到的基于通常的公有云的項目整體架構&#xff0c;如何基于公有云建設安全可靠的服務&#xff0c;以阿里云為例的整體架構&#xff1b;1. 全局流量治理層&#xff08;用戶請求入口&#xff09;1.1 域名與 DNS 解析域名注冊與備案&#xff1a;通過阿里…

《剝開洋蔥看中間件:Node.js請求處理效率與錯誤控制的深層邏輯》

在Node.js的運行時環境中&#xff0c;中間件如同一系列精密咬合的齒輪&#xff0c;驅動著請求從進入到響應的完整旅程&#xff0c;而洋蔥模型則是這組齒輪的傳動系統。它以一種看似矛盾的方式融合了順序與逆序、分離與協作——讓每個處理環節既能獨立工作&#xff0c;又能感知全…

GaussDB union 的用法

1 union 的作用union 運算符用于組合兩個或更多 select 語句的結果集。2 union 使用前提union 中的每個 select 語句必須具有相同的列數這些列也必須具有相似的數據類型每個 select 語句中的列也必須以相同的順序排列3 union 語法select column_name(s) from table1 union sele…

構建足球實時比分APP:REST API與WebSocket接入方案詳解

在開發足球實時比分應用時&#xff0c;數據接入方式的選擇直接影響用戶體驗和系統性能。本文將客觀分析REST API和WebSocket兩種主流接入方案的技術特點、適用場景和實現策略&#xff0c;幫助開發者做出合理選擇。一、REST API&#xff1a;靈活的數據獲取方案核心優勢標準化接口…

Linux文件系統三要素:塊劃分、分區管理與inode結構解析

理解文件系統 我們知道文件可以分為磁盤文件和內存文件&#xff0c;內存文件前面我們已經談過了&#xff0c;下面我們來談談磁盤文件。 目錄 一、引入"塊"概念 解析 stat demo.c 命令輸出 基本信息 設備信息 索引節點信息 權限信息 時間戳 二、引入"分區…

基于paddleDetect的半監督目標檢測實戰

基于paddleDetect的半監督目標檢測實戰前言相關介紹前提條件實驗環境安裝環境項目地址使用paddleDetect的半監督方法訓練自己的數據集準備數據分割數據集配置參數文件PaddleDetection-2.7.0/configs/semi_det/denseteacher/denseteacher_ppyoloe_plus_crn_l_coco_semi010.ymlPa…

計算機網絡:(十)虛擬專用網 VPN 和網絡地址轉換 NAT

計算機網絡&#xff1a;&#xff08;十&#xff09;虛擬專用網 VPN 和網絡地址轉換 NAT前言一、虛擬專用網 VPN1. 基礎概念與作用2. 工作原理3. 常見類型4. 協議對比二、NAT&#xff1a;網絡地址轉換1. 基礎概念與作用2. 工作原理與類型3. 優缺點與問題4. 進階類型三、VPN 與 N…

數位 dp

數位dp 特點 問題大多是指“在 [l,r][l,r][l,r] 的區間內&#xff0c;滿足……的數字的個數、種類&#xff0c;等等。” 但是顯然&#xff0c;出題人想要卡你&#xff0c;rrr 肯定是非常大的&#xff0c;暴力枚舉一定超時。 于是就有了數位 dp。 基本思路 數位 dp 說白了…

Selector的用法

Selector的用法 Selector是基于lxml構建的支持XPath選擇器、CSS選擇器&#xff0c;以及正則表達式&#xff0c;功能全面&#xff0c;解析速度和準確度非常高 from scrapy import Selectorbody <html><head><title>HelloWorld</title></head>&…

Netty封裝Websocket并實現動態路由

引言 關于Netty和Websocket的介紹我就不多講了,網上一搜一大片。現如今AI的趨勢發展很熱門,長連接對話也是會經常接觸到的,使用Websocket實現長連接,那么很多人為了快速開發快速集成就會使用spring-boot-starter-websocket依賴快速實現,但是注意該實現是基于tomcat的,有…

行為型設計模式:解釋器模式

解釋器模式 解釋器模式介紹 解釋器模式使用頻率不算高&#xff0c;通常用來描述如何構建一個簡單“語言”的語法解釋器。它只在一些非常特定的領域被用到&#xff0c;比如編譯器、規則引擎、正則表達式、SQL 解析等。不過&#xff0c;了解它的實現原理同樣很重要&#xff0c;能…

SaTokenException: 未能獲取對應StpLogic 問題解決

&#x1f4dd; Sa-Token 異常處&#xff1a;未能獲取對應StpLogic&#xff0c;typeuser&#x1f9e8; 異常信息 cn.dev33.satoken.exception.SaTokenException: 未能獲取對應StpLogic&#xff0c;typeuser拋出位置&#xff1a; throw new SaTokenException("未能獲取對應S…

Web前端性能優化原理與方法

一、概述 1.1 性能對業務的影響 大部分網站的作用是&#xff1a;產品信息載體、用戶交互工具或商品流通渠道。這就要求網站與更多用戶建立聯系&#xff0c;同時還要保持良好的用戶黏性&#xff0c;所以網站就不能只關注自我表達&#xff0c;而不顧及用戶是否喜歡。看看網站性…

第十八節:第六部分:java高級:注解、自定義注解、元注解

認識注解自定義注解注解的原理元注解常用的兩個元注解代碼&#xff1a; MyTest1&#xff08;注解類&#xff09; package com.itheima.day10_annotation;import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.Retent…

北京科技企業在軟文推廣發稿平臺發布文章,如何精準觸達客戶?

大家好&#xff01;我是你們的老朋友&#xff0c;今天咱們聊聊北京科技企業如何通過軟文推廣發稿平臺精準觸達目標客戶這個話題。作為企業營銷的老司機&#xff0c;我深知在這個信息爆炸的時代&#xff0c;如何讓你的品牌聲音被目標客戶聽到是多么重要。下面就讓我來分享一些實…

UE蒙太奇和動畫序列有什么區別?

在 UE5 中&#xff0c;Animation Sequence&#xff08;動畫序列&#xff09;和 Animation Montage&#xff08;動畫蒙太奇&#xff09;雖然都能播放骨骼動畫&#xff0c;但它們的定位、功能和使用場景有較大區別&#xff1a;1. 概念定位Animation Sequence&#xff08;動畫序列…

Nordic打印RTT[屏蔽打印中的<info> app]

屏蔽打印中的 app Nordic原裝的程序答應是這樣的,這個有" app"打印,因為習慣問題,有時候也不想打印太多造成RTT VIEW顯示被沖點,所以要把" app"去掉:這里把prefix_process函數調用屏蔽到,主要涉及到nrf_log_hexdump_entry_process和nrf_log_std_entry_proc…