? ? ? ? 一個月前機緣巧合,有朋友向我推薦了麻省理工學院非常著名的分布式系統課程MIT 6.824,是由世界五大黑客之一,蠕蟲病毒之父Robert Morris教授進行授課。由于我自己也在做基于分布式微服務架構的業務項目,所以對構建分布式系統這個課題非常感興趣,想要探尋其中的一些底層原理。經過一段時間的學習確實感覺受益匪淺!目前還在學習課程和做lab的過程中,不得不說還是很有挑戰性的,所以也想用文字的形式來記錄下自己曾經的學習心得體會分享給大家,有不當之處還請多多批評指正!
一.分布式系統概述
? ? ? ? 在早期的實際應用中,大多數系統屬于“集中式”架構。所有的存儲,計算都集中在一臺服務器上處理。確實,這種系統在架構層面很簡單,作為開發人員只需要重點關心服務端的處理邏輯即可,但是毫無疑問,存在明顯的瓶頸。首先系統性能會受限于單機資源,因為一臺物理服務器它的內存,磁盤,CPU資源都是很有限的;其次單點故障會使得整個系統不可用。而且這種單機系統可擴展性很差,難以應對業務規模的爆發增長。
? ? ? ? 在上世紀80~90年代,人們開始意識到了集中式架構已經很難滿足互聯網發展的需要了,分布式系統架構的設計思想由此應運而生。分布式系統采用多臺計算機進行互聯,共同完成某個任務,而對外屏蔽了系統內部的細節,從用戶或者應用開發者的角度來看仍然只是一個系統。分布式系統中的每臺機器稱為“節點”。由此也衍生出了一些比較輕量級的RPC框架,分布式數據庫等等。
? ? ? ? 隨著互聯網行業的快速發展,系統架構也在逐漸演變。在業務層面上將單體架構拆分成多個微服務部署到不同的服務器上,利用更多的物理計算資源并行工作,分布式微服務架構也由此誕生。同時,數據存儲的方式也發生了巨大的變化,由集中存儲到分片存儲,也由此引出了水平分片,主從復制等架構方案。特別是Google三大論文(GFS,MapReduce,BigTable)更是推進了分布式系統發展的速度。而現代的分布式系統逐漸趨向于云原生。充分利用云計算資源的彈性伸縮,自動化編排等能力。分布式系統的主要設計目標主要是應對以下幾個方面的挑戰:
(一)可擴展性
????????請看下圖中的場景:
????????
? ? ? ? 假如你在一臺Web服務器上搭建了一個網站,而此時有多個客戶端同時訪問這臺服務器的資源。由于服務器的CPU,內存等硬件資源有限,當客戶端請求達到一定數量之后,服務器會難以承受壓力而宕機。所以我們會很自然的想到增加Web服務器的數量,把這些請求分配到多臺機器上進行處理,這就是水平擴展的策略。但是,無限增加Web服務器的數量,數據庫又會成為系統的性能瓶頸,而我們又很難保證這些請求能夠平均分配到每臺機器上。所以我們使用一致性哈希等分片策略保證請求的均勻分配。可擴展性追求的是在增加硬件資源的同時,系統的性能可以獲得接近線性的提升。
(二)容錯性
????????假如你設計了一個分布式系統,如果你需要讓這個系統足夠健壯,在部分節點或者組件發生故障時仍然能正常對外提供服務,或者采用降級策略,就必須要在容錯層面上有比較科學的設計。其共同思想是可用性和自身可恢復性。比如在一個電商系統當中,如果因為某一個訂單服務所在的服務器節點宕機,而導致用戶無法下單,那說明這個系統的容錯性還有待提升。我們真正期望的設計是如果你部署了多個訂單服務節點實例,當其中一個節點掛了,另外一個節點可以頂上,如果實在沒有辦法,仍然有例如返回默認數據等服務降級策略。而保證容錯性的兩個關鍵要點分別是非易失性存儲和復制。由此我們需要在設計架構時根據實際業務需求考慮部署多個副本,主從也好,集群也罷。而且要有合理的限流,熔斷,降級策略和健康檢查機制。
(三)一致性
????????在分布式系統中,只要存在多個服務器節點,那必然會存在數據同步的相關問題。在分布式系統中,多個副本同時進行讀寫IO操作,很容易引發數據不一致。而實際上,一致性追求的是所有節點的數據狀況是相同的。一致性分為兩種情況,一種是強一致性。追求的是所有節點的數據必須始終保持同步,比如說在用戶注冊的時候,在寫完注冊數據之后,立刻可以讀取返回新的信息,而且所有節點都必須能夠讀取到新數據,這個過程中不能有延遲。因為這種全量即時同步的機制,所以如果要追求強一致性,那一定意味著更復雜的系統設計和更昂貴的通信成本。另外一種是弱一致性,數據最終會同步,但是會存在延遲。比如說用戶剛注冊完就從服務器讀取用戶數據時可能會讀取到舊數據,幾秒之后新的數據才同步到服務端。我們在系統設計時具體需要采用哪種理念,需要嚴格結合業務場景進行分析。開發者對于一致性的接受范圍往往取決于他們對異常行為的可接受度,比如會妥協追求最終一致性或版本一致性。
(四)CAP定理
? ? ? ? 首先我們要引出一個概念--分區容忍性。這個性質是針對節點間的通信來講的。當分布式系統中發生網絡分區(通信失敗)時,系統仍然能夠運行,而不是直接掛掉。假如你的公司在福州廈門兩地都部署了機房,某一時刻如果網絡出現問題,服務間的通信斷了,我們依然能夠訪問某些服務,哪怕只是功能受限。
? ? ? ? 2000年,加州大學伯克利分校的Eric Brewer教授提出了CAP定理,被視為分布式系統設計的的“鐵律”。即在分布式系統中,不可能同時滿足以下三點:一致性(C),可用性(A),分區容忍性(P),因為你不能同時在網絡斷開的情況下,既響應用戶請求,又確保所有副本的數據是一樣的。而分布式系統中一定會存在網絡分區,所以分區容忍性(P)是必須保證的。所以在實際架構設計中,我們只能在一致性和可用性二者之間做權衡。如果犧牲可用性,追求強一致性(如Zookeeper,Etcd),網絡異常時會直接拒絕服務。如果犧牲一致性,追求高可用(如Cassandra),則必須允許短時間內的數據不一致。比如在一個實際的后端項目中,Redis緩存的設計會更偏向AP,緩存中的數據可能不是最新的,但是保證了高可用和容錯性。而Etcd配置中心的設計偏向CP,配置必須保證強一致性不能有偏差。
二.分布式高性能計算框架-MapReduce
(一)引子-單詞計數
? ? ? ? MapReduce是一個用于大規模數據集(TB級)并行運算的分布式框架,于2003年由Google公司首先提出。不少公司在面試過程中會出現類似MapReduce框架設計的場景題,所以對于我們來說大致理解MapReduce的設計思想對于加深對分布式系統的理解是很有幫助的。這里我把MapReduce的論文鏈接直接貼在下方,如果想要真正深入理解MapReduce框架還是建議去讀一讀這篇論文,也許會有不一樣的收獲。
MapReduce論文原文
? ? ? ? MapReduce設計的初衷在于對業務開發者屏蔽掉分布式計算的相關細節,由MapReduce框架自動完成,開發者只需關注Map和Reduce兩個函數即可。在這里我們先不直接講述Map函數和Reduce函數的含義,我們先通過一個簡單的單詞計數實例大概了解下MapReduce框架。
? ? ? ? 如上圖所示,有三個客戶端分別向服務器輸入了不同的單詞數據,我們服務端的設計目的,是為了計算出每個單詞出現的次數并且返回。對于這個框架,只有Map函數和Reduce函數是提供給開發者的顯式接口。Map函數負責讀取輸入,并將其轉換為(key,value)鍵值對。在上面這個例子當中,Map函數讀取輸入的單詞并以此為key,統計詞頻為value。當然,key和value的實際意義是根據開發者的實際需求來定制的。而由Map產生的這個(key,value)鍵值對便是這個計算過程的一個中間結果。之后在框架內部,有一個Shuffle函數(對開發者透明,由框架自動執行)將所有Map階段輸出相同的key的記錄匯總到一起,交給對應的Reduce函數進行處理,由Reduce函數進行匯總過程的執行。而emit函數存在于Map/Reduce函數內部,負責輸出(key,value)鍵值對。由上面的過程我們不難看出,運用MapReduce進行單詞計數的過程可以用“分詞-分組-計數”來概括。而MapReduce框架的核心處理流程便是“局部處理-分組-匯總處理”。簡單了解了MapReduce處理問題的方法之后,我們來結合MapReduce的論文和架構圖來更詳細嚴謹的討論MapReduce框架。
(二)邏輯模型
? ? ? ? MapReduce論文中對于Map和Reduce函數的定義,翻譯過來是這樣的:“Map函數由用戶編寫,其主要功能是獲取一個輸入的(key,value)鍵值對并生成一個中間態的(key,value)鍵值對。
MapReduce框架會自動對所有的(key,value)鍵值對進行分組,使得所有有著相同中間態key值的(key,value)鍵值對的value組合在一起,然后將其傳遞給Reduce函數進行處理。Reduce函數也由用戶編寫,其主要功能是接收一個中間態的key值和與該鍵對應的一組value值的集合。它會將這些value值進行統一的合并以形成一個可能更小的value值集合。”如果能夠理解上面的例子,那么相信對于這段定義的理解就很容易了。MapReduce不僅僅可以用于進行詞頻統計,還可以應用于日志統計,倒排索引,社交推薦,分布式排序等更復雜的場景。
? ? ? ? 接下來我們來討論下MapReduce框架的邏輯模型,不過實際的系統設計遠遠不會這么簡單,但是一定與這個邏輯模型有共通之處。上圖是MapReduce論文中的架構圖,我們結合著這張架構圖來進行分析會比較直觀。
? ? ? ? 首先用戶程序的MapReduce庫,也就是客戶端會將輸入的數據進行分片(假設分片數為M)。論文給出的每個分片的參考大小通常為16MB至64MB,可以由用戶通過相關的參數自行設定。之后用戶程序便在服務器集群的其中一組物理機器上通過fork()系統調用,啟動多個該程序的副本,此時用戶程序。這些副本構成了MapReduce框架的處理集群。其中,有一個副本是特殊的,也就是圖中的Master。Master是這個計算集群的中心節點,主要用于分配任務。Master中維護了一些數據結構,存儲了每個Map/Reduce的任務狀態以及Worker機器的ID。而其他的副本都是Worker,這些才是真正的處理程序。Master選擇空閑的Worker,并給其分配Map任務(assign map)或者Reduce任務(assign reduce),被分配了相應任務的Worker節點也稱Map Worker或Reduce Worker。分配任務之后Worker會進行下一步的處理。
? ? ? ? MapWorker會通過read()系統調用讀取分片后的數據內容。從輸入的數據中通過某些邏輯解析出(key,value)鍵值對,然后將這個(key,value)鍵值對作為參數傳遞給用戶定義的Map函數,由Map函數進行處理生成中間態(key,value)鍵值對(該過程也叫emit)。這個中間態鍵值對會被緩存在內存中。緩存在內存中的中間態(key,value)對會被定期寫入本地磁盤(local disks)中進行持久化。而這些被緩存的中間態(key,value)對的位置會被回傳給Master節點,而Master本質上是一個管道,負責將這些位置信息傳遞給對應的Reduce Worker。
? ? ? ? 當一個Reduce Worker被Master節點告知了某個中間態(key,value)鍵值對的位置信息,會使用RPC(遠程過程調用)從對應Map Worker的本地磁盤上讀取這些緩存數據(remote read)。當一個Reduce Worker已經讀取了所有的中間態數據時,會根據其key值進行排序,擁有相同key值的(key,value)鍵值對會被分類到一起,這便是我們前面例子中的shuffle過程。需要進行排序的原因是通常會有不同key的(key,value)對會被映射到同一Reduce任務。如果需要排序的中間態數據量過大,內存無法一次裝載,可能需要考慮使用一些外部排序的方法。之后Reduce Worker會遍歷排好序的中間態(key,value)鍵值對,并將所遇到的每一個唯一的key值和對應的中間態value值集合作為參數傳遞給用戶自定義的reduce函數。reduce函數產生的輸出將會追加在一個該分區的輸出文件內,每個Reduce任務都會對應一個輸出文件。在很多情況下,用戶無需對這些輸出文件進行合并,而是傳遞這些文件,也許它們也可以作為下一個MapReduce任務的輸入。在所有的Map和Reduce任務都結束之后,Master會喚醒用戶程序。此時MapReduce框架的執行結果才返回給用戶程序。
(三)問題討論
? ? ? ? 以上便是MapReduce框架工作流程的概述。畢竟這是一個基于分布式架構設計的框架,所以接下來我們要討論這個系統中可能發生的問題,以及如何保證這個系統應有的分布式特性。
? ? ? ? 首先我們來討論MapReduce框架如何保證容錯性呢?在實際場景中,很有可能會出現以下兩種情況,即Worker節點掛掉或者Master節點掛掉。首先我們需要了解,Master節點和Worker節點之間是如何通信的。在MapReduce論文中提到的通信方式是Master會定期向Worker發送心跳包確認其活性,并且當有任務需要分配時會主動通知Worker。
????????假如是Worker節點出了故障,Master會周期性的ping每一個worker,當在一定時間內沒有收到來自Worker的響應時,Master就會將這些Worker標記為有故障。所有由該Worker完成的Map任務將會失效,重新初始化,因為這些Map任務產生的中間態數據是存儲在Map Worker所在服務器的本地磁盤的,宕機后會訪問失敗。由此這些Map任務可以被其他Worker調度執行。執行中的Map,Reduce任務也是同理,為什么執行完成的Reduce任務不是這樣呢?因為Reduce任務產生的結果是儲存在全局文件系統中的。由此,MapReduce能從大范圍的Worker故障中迅速的修復。
? ? ? ? 假如是Master節點出現了故障,其實這是個非常棘手的問題,畢竟Master節點是MapReduce框架的核心。所以我們只能做一些預防措施在Master節點掛了的時候進行補救,而補救的最好方法就是通過持久化的數據。我們可以讓master周期性的將master數據結構以檢查點的形式進行持久化。如果Master節點掛了,新的master備份節點將會從最新的檢查點狀態處啟動。
? ? ? ? 接下來,在一個龐大的分布式系統中,成千上萬臺物理機器要進行通信,此時網絡IO無疑成為了這個系統最大的性能瓶頸。為了盡可能的節省網絡帶寬,開發者會將GFS服務器和MapReduce Worker運行在同一個計算機集群中(GFS的相關知識我們后續會單獨出專題進行介紹)。GFS將每個文件分割為64MB的塊,同時為每一個塊存儲幾個備份在不同的機器上。MapReduce的master調度map任務時盡量在包含對應輸入數據副本的機器上調度執行一個map任務。如果任務失敗了,調度map任務時會讓執行任務的機器盡量靠近任務所需輸入數據所在的機器,例如被選中的worker機器與包含數據的機器位于同一網段中。當集群中的相當一部分worker都在執行大型MapReduce操作時,絕大多數的輸入數據都在本地讀取從而不會消耗網絡帶寬。
? ? ? ? 現在有一個疑問,在一個MapReduce框架中,Map任務數量(M)和Reduce任務數量(R)具體應該怎么確定呢,也就是說MapReduce框架的任務粒度是怎么確定的?論文中給出的闡述是這樣的:M的數量取決于輸入文件的分片個數,即輸入數據的切分粒度。如果數量太少,則系統并行度太低;如果數量太多,則會給Master的管理造成很大的負擔,IO壓力也隨之增加。在Map輸出階段之后,所有的中間態(key,value)鍵值對會根據key進行哈希。而R的數量應該是相對固定的,根據最終的輸出量進行確定,因為所有 map 輸出都要知道最終 key 屬于哪個 reduce 文件,必須事先固定好 R的數量,整個系統才能協調中間結果。理想情況下,M和R的值都應該遠大于worker機器的數量,Worker數量才是動態運行的實體數,這樣可以讓每一個worker執行很多不同的任務可以提高動態負載均衡的效率,同時也能加快worker故障時的恢復速度。
? ? ? ? ?以上便是我對分布式系統和MapReduce框架的粗淺認知,目前還在做MapReduce框架的lab,所以這篇文章的觀點會根據我做lab出現的問題實時更新,lab完成后我也會在本文加上lab的實現思路供大家參考,有不當之處還請批評指正,我們一起進步!