在當今數字化時代,分布式系統的重要性愈發凸顯。它不僅能提升數據的存儲安全性和可靠性,還能增強通信的效率和隱私性。于是我做了這個去中心化共享及通訊的程序,它構建了一個強大的分布式存儲和通信網絡,下面我們就來詳細了解其實現原理和核心功能。
程序概述
實現了一個基于點對點(P2P)網絡的分布式存儲和通信系統。該系統允許節點加入網絡、共享文件、存儲文件塊以及進行加密通信。它采用了分布式哈希表(DHT)的思想,通過節點間的協作來存儲和檢索文件。
核心類與功能
1.?Node
?類
Node
?類代表網絡中的一個節點,它包含了節點的基本信息和狀態。以下是該類的主要功能:
- 地址簿管理:維護一個地址簿,記錄網絡中其他節點的信息,包括主機名、端口號和最后活躍時間。
- 文件管理:存儲文件的元數據和文件塊,支持文件的注冊和廣播。
- 加密密鑰管理:使用 SHA - 256 算法為每個節點生成共享加密密鑰,確保通信的安全性。
class Node:def __init__(self, host, port, domain):self.host = hostself.port = portself.domain = domainself.address_book = {} # {domain: (host, port, last_seen)}self.files = {} # {file_hash: {'name': str, 'size': int, 'chunks': list}}self.stored_chunks = {} # {chunk_hash: data}self.network_initialized = Falseself.is_bootstrap = Falseself.peer_lock = threading.Lock()self.file_lock = threading.Lock()self.shared_secrets = {} # 存儲加密密鑰# 初始化時將自己加入地址簿self.add_peer(domain, host, port)
2.?NetworkManager
?類
NetworkManager
?類負責節點的網絡通信和管理,它處理節點的加入、消息傳遞、文件上傳和下載等任務。以下是該類的主要功能:
- 服務器啟動:啟動一個 TCP 服務器,監聽其他節點的連接請求。
- 節點加入網絡:允許節點通過引導節點加入網絡,并同步地址簿信息。
- 消息傳遞:支持直接消息和中繼消息的發送和接收,確保通信的可靠性。
- 文件操作:實現文件的上傳、下載和存儲,將文件分割成塊并存儲在多個節點上。
class NetworkManager:def __init__(self, node, log_callback):self.node = nodeself.log_callback = log_callbackself.server_socket = Noneself.running = False# 啟動節點狀態檢查self.status_check_timer = threading.Timer(PING_INTERVAL, self.check_peer_status)self.status_check_timer.daemon = Trueself.status_check_timer.start()def start_server(self):self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.server_socket.bind((self.node.host, self.node.port))self.server_socket.listen(5)self.running = Trueserver_thread = threading.Thread(target=self._accept_connections)server_thread.daemon = Trueserver_thread.start()self.log_callback(f"服務器已啟動在 {self.node.host}:{self.node.port}")
核心功能實現
1. 節點加入網絡
節點通過向引導節點發送?join
?請求來加入網絡。引導節點收到請求后,會檢查域名是否已存在,如果不存在則將新節點添加到地址簿,并廣播新節點信息給其他節點。
def _handle_join(self, message, client_socket):domain = message['domain']host = message['host']port = message['port']# 檢查域名是否已存在if domain in self.node.address_book:response = {'type': 'join_error','message': '域名已存在'}client_socket.send(json.dumps(response).encode())self.log_callback(f"拒絕加入: 域名 {domain} 已存在")return# 添加新節點到地址簿self.node.add_peer(domain, host, port)# 發送完整的地址簿給新節點response = {'type': 'join_ack','bootstrap': self.node.is_bootstrap,'address_book': self.node.get_all_peers_with_status()}client_socket.send(json.dumps(response).encode())# 通知日志self.log_callback(f"新節點加入: {domain} ({host}:{port})")# 廣播新節點信息給所有已知節點(除了新節點自己)self._broadcast_new_node(domain, host, port)
2. 文件上傳與下載
文件上傳時,系統會將文件分割成多個塊,并將每個塊存儲在多個節點上。同時,文件的元數據會被廣播到網絡中的其他節點。文件下載時,系統會首先獲取文件的元數據,然后從各個節點收集文件塊,最后驗證文件的完整性并保存。
def upload_file(self, file_path, num_chunks=4, num_replicas=2):try:file_name = os.path.basename(file_path)file_size = os.path.getsize(file_path)# 讀取文件內容with open(file_path, 'rb') as f:file_data = f.read()# 創建文件哈希file_hash = hashlib.sha256(file_data).hexdigest()# 分割文件為多個塊chunks = []chunk_size = max(FILE_CHUNK_SIZE, (len(file_data) + num_chunks - 1) // num_chunks)for i in range(0, len(file_data), chunk_size):chunk = file_data[i:i+chunk_size]chunk_hash = hashlib.sha256(chunk).hexdigest()chunks.append(chunk_hash)# 存儲塊到多個節點self._store_chunk(chunk, chunk_hash, num_replicas)# 注冊文件元數據metadata = {'name': file_name,'size': file_size,'chunks': chunks}self.node.register_file(file_hash, file_name, file_size, chunks)# 廣播文件元數據self.node.broadcast_file_metadata(file_hash, metadata)# 通知日志self.log_callback(f"文件上傳成功: {file_name} (哈希: {file_hash})")return file_hashexcept Exception as e:self.log_callback(f"文件上傳失敗: {str(e)}")return None
3. 加密通信
系統使用 AES 算法對消息進行加密,確保通信內容的隱私性。每個節點會根據雙方的域名生成一個共享密鑰,用于加密和解密消息。
def _generate_shared_key(self, target_domain):"""生成共享密鑰 - 基于雙方域名生成確定性密鑰"""# 按字母順序排序域名以確保雙方生成相同密鑰domains = sorted([self.node.domain, target_domain])key_str = f"{domains[0]}-{domains[1]}"# 使用SHA-256生成256位哈希,取前16字節作為AES-128密鑰key = hashlib.sha256(key_str.encode()).digest()[:16]self.node.shared_secrets[target_domain] = keyreturn keydef _encrypt_message(self, message, key):iv = get_random_bytes(16)cipher = AES.new(key, AES.MODE_CFB, iv)encrypted = cipher.encrypt(message.encode())encrypted_base64 = base64.b64encode(encrypted).decode()while len(encrypted_base64) % 4 != 0:encrypted_base64 += '='iv_base64 = base64.b64encode(iv).decode()while len(iv_base64) % 4 != 0:iv_base64 += '='return encrypted_base64, iv_base64
總結
DSAC
是一個功能強大的分布式存儲和通信系統,它通過節點間的協作實現了文件的安全存儲和高效通信。其采用的分布式架構和加密技術確保了系統的可靠性和隱私性。通過深入了解這個程序,我們可以學習到分布式系統的設計和實現思路,為構建更復雜的分布式應用打下基礎。
希望這篇文章能幫助你更好地理解?DSAC
程序,如果你有任何問題或建議,歡迎在評論區留言。