分布式中間件彈性擴容與rebalance沖平衡
176_如果宕機的數據節點事后再次重啟會發生什么事情?
某個之前某個宕機的數據節點DataNode-A又重啟后,肯定會再次注冊,并進行全量上報的流程,此時,就會導致DataNode-A上的文件副本,實際上在整個DataNode集群中存了3份
177_接收數據節點存儲上報的時候發現副本冗余生成刪除任務
178_將冗余副本的刪除任務下發給對應的數據節點
179_在數據節點上刪除磁盤上的冗余圖片副本
180_測試數據節點掛掉之后副本能否正常復制到其他節點
復制任務的目標數據節點
- 第一,不能是已經死掉的節點
- 第二,不能是已經有這個副本的節點
181_測試宕機的數據節點再次重啟時能否正常刪除冗余副本
如果宕機的DataNode-A上有25萬個文件副本,DataNode-A宕機以后,這25萬個文件副本會打散復制到別的DataNode上去
此時,如果宕機的DataNode-A重新啟動,那么它就會向NameNode進行全量上報,把25萬個文件副本都全量上報到NameNode上去。NameNode就會生成25萬個冗余副本刪除任務RemoveReplicaTask,并放入NameNode內存中管理的DataNode-A對應的DataNodeInfo中的刪除任務阻塞隊列removeReplicaTaskQueue中去
當宕機的DataNode-A重新啟動后的下一次心跳發送到NameNode時,NameNode就會把這25w個RemoveReplicaTask都下發給DataNode-A,DataNode-A拿到這25w個RemoveReplicaTask后,就會開始執行它們,對應的也就是刪除DataNode-A本地的25萬個文件副本。對應的實現代碼如上
182_在上傳文件的時候發現數據節點宕機該如何進行處理?
最最典型的一個客戶端上傳的容錯機制,就是感知到網絡故障之后,就得去進行一些容錯的處理
解決方案就是:
客戶端找某個DataNode上傳文件如果失敗,那么客戶端就將宕機的DataNode傳給NameNode,讓NameNode重新分配除了宕機的DataNode以外的另一個DataNode,客戶端再次進行上傳
183_在客戶端的代碼中找找如何感知到上傳過程中的網絡故障?
這里做的比較粗,就是最外層的catch到Exception,就認為是上傳過程中出現了網絡故障。就不管它是在建立短連接的過程中就建立失敗,還是連接建立成功后,channel.write(file)執行失敗拋出了異常
184_改造代碼實現發現網絡故障時重新分配一個數據節點
通過這種方式,實現客戶端的容錯、故障的轉移,如果一個數據節點有故障,在客戶端是可以進行容錯的,客戶端在找一個數據節點上傳失敗,就會找NameNode重新分配一個DataNode并重新進行上傳
給上傳文件的方法,變成了有返回值的,上傳成功返回true,失敗返回false
185_定義一個新接口:重新分配數據節點以及排除故障節點
186_實現重新分配數據節點這個接口的代碼業務邏輯
187_在下載文件的時候發現數據節點宕機該如何進行處理?
如果某個數據節點掛了,某個數據節點剛掛,NameNode還沒有感知到它掛了,就把它分配給了客戶端,客戶端此時對著這個掛了個數據節點上傳文件/下載文件,肯定就會失敗的
188_在客戶端的代碼中找找如何感知到下載過程中的網絡故障?
189_改造代碼實現下載文件發現網絡故障重新申請一個數據節點
客戶端往第一個數據節點下載文件失敗時,需要找NameNode節點重新分配一個除了這個下載失敗的節點以外的別的數據節點
190_改造已有的舊接口:為下載文件重新分配一個數據節點
191_重寫為下載文件分配數據節點的接口:加入排除故障節點邏輯
將前面的getDatanodeForFile()改成了下面的chooseDataNodeFromReplicas(),就是從幾臺有fileName文件對應的副本的數據節點中,隨機的選擇出一臺數據節點,且這個選出的數據節點還不能是第一次下載失敗的數據節點
第479行,就是在排除第一次下載文件失敗的數據節點
192_海量數據存儲:分布式存儲、多副本冗余以及高可用架構
海量數據的存儲,主要針對的各種小文件、小圖片。海量數據的存儲,用一臺機器肯定是不行,所以首先做的就是分布式存儲架構
多副本冗余,高可用架構(任何一臺機器宕機數據不會丟,上傳、下載的過程中失敗,可以換數據節點重試)
一邊要多思考里面的架構設計思想,FastDFS和TFS。FastDFS是一個國產的開源項目,c語言開發的,中小型公司在使用,一般在分布式文件存儲的場景中,都會采用FastDFS來使用,最大缺點就是c語言開發,我們沒辦法閱讀里面的源碼,出問題的時候極坑,全部c異常,Java工程師沒有能力維護FastDFS集群的。我們需要有發現問題,改源碼、編譯、打包重新部署的能力
我們可以百度一下“FastDFS架構原理”,分布式存儲、副本冗余、高可用架構,跟我們設計的這套架構類似的
TFS,淘寶內部開發的分布式文件系統,主要是針對淘寶上面,大量的店鋪中的商品的小圖片,4kb~400mb之間,分布式存儲,元數據管理機制,副本冗余,高可用架構,和我們的架構也是類似的。但是,和我們的有一點不同的是,很多小圖片會被合并為一個大文件來存儲,每個文件都會有一個對應的索引文件。網上關于FastDFS和TFS的資料也不會太多
我們這邊參考了HDFS的架構,Hadoop分成了三大系統:Hadoop分布式文件系統,分布式計算系統,分布式資源調度系統,尤其是里面的元數據管理架構完全參考了HDFS
193_分布式文件系統的可伸縮架構值的是什么以及如何設計
六大架構
分布式存儲架構:
容錯架構:
高可用架構:除了NameNode需要依賴zk還沒做
可伸縮架構:可以隨時增加或下線機器
高性能架構:讓上傳和下載速度更快
高并發架構:大量客戶端同時連接過來,進行上傳下載
可伸縮架構,對于集群而言,就是可以保證加機器去里面,或者是下線機器都可以實現
加一臺機器,你接下來如何做,假設已經有4臺機器,每臺機器上的磁盤空間都快滿了,這個是很常見的場景,大數據的同學玩HDFS的時候就會有這種情況,已有的幾臺機器的磁盤空間都滿了,無法寫入新的數據了
首先你得保證接下來要優先往最空的這臺機器去寫入數據,接下來在后臺應該啟動一大堆的定時任務,要慢慢的把快滿的4臺機器上的數據逐步逐步的遷移到空的機器上去,緩解已有4臺機器的存儲壓力,可以讓空機器放更多的數據
下線機器,有某一臺機器不需要了,關閉DataNode,把機器干掉。這種情況就可以看做是屬于機器的宕機,NameNode感知到以后,會自動的去進行副本的復制,保證數據不丟失
194_上線新機器之后是否會自動優先往里面寫入數據?
如果每次擴容,一般來說針對我們的這個系統,要么不擴容,要么擴容都是>=2臺機器起步來進行擴容,每次擴容2臺機器,那么在這里一排序,就會優先往2臺空的機器里寫入數據,就可以立馬緩解住已有的4臺機器磁盤快寫滿的壓力
195_到底什么時候應該從磁盤快滿的機器緩慢遷移數據出去
假設每臺機器是100GB存儲空間:
機器01:90GB
機器02:90GB
機器03:90GB
機器04:90GB
新加入兩臺機器:
機器05:0GB
機器06:0GB
把所有的機器全部加起來,算一個平均數,一共360GB,每臺平均60GB
(機器01、機器02、機器03、機器04) ->(機器05、機器06)
舉個例子:機器01 -> 機器05,遷移30GB,首先要確定這30G具體包含對應的是哪些文件。需要生成兩種任務,第一種任務是復制任務。對于機器05而言有復制任務,他需要從機器01復制指定的文件過來
第二種就是刪除任務。對機器01而言就有刪除任務。每隔一段時間,你都可以讓他去進行一次全量存儲的匯報,把這個節點原先的各種存儲信息重新刷新一遍,同時在全量存儲匯報的時候,就可以檢查一下每個圖片的副本數量,如果超出了2個,就生成刪除任務也可以
196_定義一個新接口:手動觸發集群數據的rebalance
上線了新的幾臺機器,立馬就應該執行一個命令
我們可以提供一個用python編寫的腳本,在腳本里基于python調用gRPC提供的接口,調用到Master上去,執行某個命令觸發rebalance,gRPC本來就是支持多語言的
同理,shutdown()之類的接口,我們也可以寫Python腳本,來手動調用shutdown,來優雅關閉
199_實現rebalance的核心算法:集群存儲資源重平衡算法邏輯
假設現在
機器01:90GB
機器02:90GB
機器03:90GB
機器04:90GB
機器05:0GB
機器06:0GB
平均就是應該每臺60G
/*** 這個組件,就是負責管理集群里的所有的datanode的*/
public class DataNodeManager {/*** 為重平衡去創建副本復制的任務*/public void createReBalanceTasks() {// 沖平衡時,加大鎖synchronized(this) {long totalStoredDataSize = 0;for(DataNodeInfo datanode : datanodes.values()) {totalStoredDataSize += datanode.getStoredDataSize();}// 計算集群節點存儲數據的平均值long averageStoredDataSize = totalStoredDataSize / datanodes.size();// 將集群中的節點區分為兩類:遷出節點和遷入節點List<DataNodeInfo> sourceDataNodes = new ArrayList<>();List<DataNodeInfo> destDataNodes = new ArrayList<>();for(DataNodeInfo datanode : datanodes.values()) {// 遷出節點if(datanode.getStoredDataSize() > averageStoredDataSize) {sourceDataNodes.add(datanode);}// 遷入節點if(datanode.getStoredDataSize() < averageStoredDataSize) {destDataNodes.add(datanode);}}// 為遷入節點生成復制的任務,為遷出節點生成刪除的任務// 在這里生成的刪除任務統一放到24小時之后延遲調度執行,咱們可以實現一個延遲調度執行的線程List<RemoveReplicaTask> removeReplicaTasks = new ArrayList<>();for(DataNodeInfo sourceDatanode : sourceDataNodes) {// 當前源數據節點,需要遷移的數據大小long toRemoveDataSize = sourceDatanode.getStoredDataSize() - averageStoredDataSize;for(DataNodeInfo destDatanode : destDataNodes) {// 直接將sourceDatanode要遷移的數據,一次性放到一臺destDatanode機器就可以了if(destDatanode.getStoredDataSize() + toRemoveDataSize <= averageStoredDataSize) {createReBalanceTasks(sourceDatanode, destDatanode, removeReplicaTasks, toRemoveDataSize);break;}// 只能把部分數據放到這臺機器上去else if(destDatanode.getStoredDataSize() < averageStoredDataSize) {// sourceDatanode要遷移的數據,最多只能遷移maxRemoveDataSize的數據,到destDatanode上去// 比如sourceDatanode要遷移的數據一共30G,但是當前的destDatanode最多只能接收15G數據long maxRemoveDataSize = averageStoredDataSize - destDatanode.getStoredDataSize();long removedDataSize = createReBalanceTasks(sourceDatanode, destDatanode, removeReplicaTasks, maxRemoveDataSize);// 將本sourceDatanode節點,待遷移的量toRemoveDataSize,減去本次遷移的量removedDataSizetoRemoveDataSize -= removedDataSize;} }}// 交給一個延遲線程去24小時之后執行刪除副本的任務// 保證開始執行刪除任務時,前面的復制任務已經全部執行完畢了new DelayRemoveReplicaThread(removeReplicaTasks).start(); } }private long createReBalanceTasks(DataNodeInfo sourceDatanode, DataNodeInfo destDatanode,List<RemoveReplicaTask> removeReplicaTasks, long maxRemoveDataSize) {List<String> files = namesystem.getFilesByDatanode(sourceDatanode.getIp(), sourceDatanode.getHostname());long removedDataSize = 0;// 遍歷文件,不停的為每個文件生成一個復制的任務,直到準備遷移的文件的大小// 超過了待遷移總數據量maxRemoveDataSize為止for(String file : files) {String filename = file.split("_")[0];long fileLength = Long.parseLong(file.split("_")[1]);if(removedDataSize + fileLength >= maxRemoveDataSize) {break;}/** 為這個文件生成,針對目標節點的復制任務 */ReplicateTask replicateTask = new ReplicateTask(filename, fileLength, sourceDatanode, destDatanode);// 復制任務時立馬下發的destDatanode.addReplicateTask(replicateTask); destDatanode.addStoredDataSize(fileLength); /** 為這個文件生成,針對源節點的刪除任務 */sourceDatanode.addStoredDataSize(-fileLength); namesystem.removeReplicaFromDataNode(sourceDatanode.getId(), file); RemoveReplicaTask removeReplicaTask = new RemoveReplicaTask(filename, sourceDatanode);// 針對刪除任務,統一攢起來,24小時后一起執行removeReplicaTasks.add(removeReplicaTask); // 遷移一個文件,就累加一份fileLengthremovedDataSize += fileLength;}return removedDataSize;}/*** 延遲刪除副本的線程*/static class DelayRemoveReplicaThread extends Thread {private final List<RemoveReplicaTask> removeReplicaTasks;public DelayRemoveReplicaThread(List<RemoveReplicaTask> removeReplicaTasks) {this.removeReplicaTasks = removeReplicaTasks;}@Overridepublic void run() {long start = System.currentTimeMillis();while(true) {try {long now = System.currentTimeMillis();if(now - start > 24 * 60 * 60 * 1000) {for(RemoveReplicaTask removeReplicaTask : removeReplicaTasks) {// 真正的將刪除任務下發下去,相應的DataNode下一輪心跳過來,就能認領這些刪除任務removeReplicaTask.getDatanode().addRemoveReplicaTask(removeReplicaTask); }break;}Thread.sleep(60 * 1000); } catch (Exception e) {e.printStackTrace();}}}}}
200_基于可伸縮架構實現集群擴容支撐海浪數據的存儲
海量數據存儲架構:分布式存儲架構 + 可伸縮架構
高可用+高容錯架構:多副本冗余 + 副本自動遷移(數據節點宕機時) + 冗余副本自動刪除(宕機數據節點重啟后) + 客戶端容錯機制(文件上傳、下載時更換數據節點)
高性能架構:盡可能提升客戶端文件上傳和下載的性能和速度
高并發架構:盡可能讓每個數據節點可以支撐更多的客戶端的并發上傳和下載
201_在分布式文件系統中高并發主要指的是什么?
高并發和高性能的架構改造,這是比較升華的一個部分
第一塊:NameNode,元數據變更,能否承載高并發
第二塊:DataNode,文件的上傳和下載,能否承載高并發
對于NameNode而言,假設高峰時期,一萬個客戶端,同時發起請求要創建文件,一秒鐘內高峰期直接來一萬個請求去訪問NameNode
對于DataNode而言,1000個客戶端連接到DataNode上去,同時進行文件的 上傳和下載,能否扛得住
202_看看NameNode中有哪些接口可能會被高并發的訪問?
上傳接口:create、allocateDataNodes、informReplicaReceived
下載接口:chooseDataNodeFromReplicas
203_分析一下文件上傳的三個接口能否支撐幾千的QPS
NameNode節點,一般在生產部署的時候,肯定是高配置物理機,不會是虛擬機,起碼都是32核128G的配置。這種配置正常情況下,應該一臺機器極限支撐個每秒幾萬的請求都是可以的
比如,一秒鐘來1萬個請求,每個請求排隊獲取鎖,從而進入執行更新文件目錄樹的代碼邏輯。因為是基于純內存的操作,一個請求需要多少時間,1毫秒都不會到,可能一個請求就0.01毫秒,1毫秒可以執行100個請求,一秒就可以執行100 * 1000 = 10w個請求
雖然說有并發邏輯里會加鎖,但是不要緊,只要基于純內存,每個請求速度依然是極快的,就可以做到每秒處理幾萬個請求
絕大部分的創建文件的請求,可能就兩個操作:更新內存里的文件目錄樹 、editlog寫入內存緩沖 -> 0.01毫秒 -> 每秒執行10w次請求沒問題。可能只有隔一段時間才會有一個請求,雙緩沖的currentBuffer滿了的時候,才會輪到這個線程執行一下刷磁盤,并且這個刷磁盤也是順序寫
Kafka之類的中間件系統,其實本質也是大量的基于內存來實現核心邏輯的,在高配置物理機的場支持下,抗下來每秒10萬的QPS完全不是問題
204_分析一下文件上傳的三個接口能否支撐幾千的QPS(2)
平時寫CRUD的業務系統,用不著高并發、IO、網絡、磁盤、Netty、ZK一些技術。Java Web里最復雜的一塊東西,其實是Tomcat,人家Tomcat作為一個Web服務器,他底層就要去做網絡通信監聽某個端口、內存管理、并發控制。你寫的Servlet、SSM,其實就是嵌入在Tomcat容器里,執行的一些業務代碼,你就是CRUD。互聯網系統,緩存,MQ,數據庫,ES,NoSQL,架構設計
但是我們現在自己寫中間件系統,分布式文件系統,微服務注冊中心給完成,這兩個項目搞完有三層意義:第一個,把你底層的技術全部打通,基礎會極度的扎實;第二個,后面看一些開源中間件系統的源碼,會非常的輕松;第三個,這兩個都是工業級的項目,直接是可以在出去面試的時候寫簡歷上的,比如起個名字叫“盤古”分布式圖片存儲系統,替換你的很Low的CRUD的一些項目經歷
每秒10w的文件上傳/下載的請求,在NameNode這塊是沒有任何的瓶頸的,雖然NameNode是單機,但是也是高配的物理機。而有很多的中間件系統是基于zk來做元數據管理,每次更新元數據的時候,都需要走網絡請求,純內存一般就是0.001毫秒~0.01毫秒,這個時候性能就沒辦法保證了
因為只要一走網絡請求,耗時直接就到毫秒級,一個請求過來,你需要去請求zk來做一些事情,直接就會到1毫秒+,幾毫秒,10毫秒,直接會導致你的NameNode承載的并發能力,可能下降到每秒幾千QPS了
雖然,后面我們也會用ZK,但是不是用ZK管理分布式文件系統中的各種元數據,而是負責管理NameNode集群的高可用。我們目前NameNode使用的是單機,沒有辦法做到某臺NameNode掛了以后,自動切換到另一臺NameNode對外提供服務,所以需要ZK的協助
205_DataNode的NIO網絡通信架構能支撐高并發嗎?
后面會把這套分布式圖片存儲系統整合到電商平臺里去,電商平臺中大量的用到了很多的圖片,圖片其實都應該存儲在在這個分布式圖片系統里,對圖片的讀取,主要也是走圖片系統,評論曬圖、商品圖片。主要的壓力就是圖片讀取,評論曬圖的頻率一般都是很低的,畢竟寫評論的人是少數
主要的壓力可能就是來自電商首頁、商品詳情頁,可能會有很多圖片讀取的請求,每秒上萬的請求。但是針對這種情況,你肯定必須得做靜態化圖片的緩存,不可能說每次都從分布式圖片系統里來讀圖片,前置的Nginx本身就可以做靜態圖片的緩存
CDN緩存,大量的靜態資源可以在前置的很多地方做緩存,Nginx、緩存服務器、CDN做緩存和加速,不需要每次都請求到底層的分布式圖片系統里去的
比如說假設你每秒有1萬個請求,一共部署了10臺數據節點,每臺機器要每秒要承擔1000個QPS,目前的一個DataNode架構,每臺機器接收1000個連接和請求能否實現?
目前的網絡NIO通信架構,一個selector線程就需要監聽1000個sockeChannel,后面只有3個worker線程,要同時做兩件事情:1. 解析自定義的二進制通信協議的請求之外(很快),2. 最核心最笨重的就是執行本地磁盤的讀寫邏輯(很慢)
這個架構最大的問題,就是將解析請求和磁盤IO混在了一起,如果其中一個磁盤IO卡住了,那么這個worker負責的后續的所有請求的處理都會跟著受影響,從而導致高并發過來后,會產生大量積壓
206_基于Reactor模式重新設計DataNode的網絡通信架構
直接參考Kafka服務端的網絡通信架構,就是基于如下的Reactor模式來實現的
1000個客戶端同時連接過來,發送請求,高并發的場景下,用Reactor模式來支撐是很輕松的
10個Processor線程,每個線程也就處理100個客戶端
30個IO線程:執行比較慢的磁盤IO操作
207_重寫DataNode的NioServer讓其僅僅監聽客戶端連接請求
208_讓NioServer將建立好的連接均勻分發給Processor線程
209_Processor線程將均勻分配的連接注冊到自己的Selector上
210_在一個循環中以限時阻塞的方式完成客戶端請求的感知
NioProcessor
/*** 負責解析請求以及發送響應的線程*/
public class NioProcessor extends Thread {/*** 多路復用監聽時的最大阻塞時間*/public static final Long POLL_BLOCK_MAX_TIME = 1000L;// 等待注冊的網絡連接的隊列private ConcurrentLinkedQueue<SocketChannel> channelQueue = new ConcurrentLinkedQueue<SocketChannel>();// 每個Processor私有的Selector多路復用器private Selector selector;public NioProcessor() {try {this.selector = Selector.open();} catch (IOException e) {e.printStackTrace();}}/*** 給這個Processor線程分配一個網絡連接*/public void addChannel(SocketChannel channel) {channelQueue.offer(channel);// 喚醒在POLL_BLOCK_MAX_TIME處,等待的selectorselector.wakeup();}/*** 線程的核心主邏輯*/@Overridepublic void run() {while(true) {try {// 注冊排隊等待的連接registerQueuedClients();// 以限時阻塞的方式感知連接中的請求poll();} catch (Exception e) {e.printStackTrace(); }}}/*** 將排隊中的等待注冊的連接注冊到Selector上去*/private void registerQueuedClients() {SocketChannel channel = null;while((channel = channelQueue.poll()) != null) {try {channel.register(selector, SelectionKey.OP_READ);} catch (ClosedChannelException e) {e.printStackTrace();}}}/*** 以多路復用的方式來監聽各個連接的請求*/private void poll() {try {// 以限時阻塞的方式完成客戶端請求的感知(在一個循環中)int keys = selector.select(POLL_BLOCK_MAX_TIME);if(keys > 0) {Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();while(keyIterator.hasNext()) {SelectionKey key = keyIterator.next();keyIterator.remove();// 如果接受到了某個客戶端的請求if(key.isReadable()) {SocketChannel channel = (SocketChannel) key.channel();}}}} catch (Exception e) {e.printStackTrace();}}}
注意這里的selector.wakeup()的使用場景
213_封裝NetworkRequest來解析與抽取一個完整的請求
NetworkRequest
/*** 從channel中讀取一次二進制格式的網絡請求數據解析,并封裝為一個NetworkRequest對象NetworkRequest對象中的CachedRequest的內部,會記錄解析出來的文件名,文件內容等信息 */
public class NetworkRequest {public static final Integer REQUEST_SEND_FILE = 1;public static final Integer REQUEST_READ_FILE = 2;// 本次網絡請求對應的連接private SelectionKey key;// 本次網絡請求對應的連接private SocketChannel channel;// 緩存中的數據private final CachedRequest cachedRequest = new CachedRequest();private ByteBuffer cachedRequestTypeBuffer;private ByteBuffer cachedFilenameLengthBuffer;private ByteBuffer cachedFilenameBuffer;private ByteBuffer cachedFileLengthBuffer;private ByteBuffer cachedFileBuffer;public SelectionKey getKey() {return key;}public void setKey(SelectionKey key) {this.key = key;}public SocketChannel getChannel() {return channel;}public void setChannel(SocketChannel channel) {this.channel = channel;}/*** 從網絡連接中讀取與解析出來一個請求*/public void read() {try {// 假如說你這個一次讀取的數據里包含了多個文件的話// 這個時候我們會先讀取文件名,然后根據文件的大小去讀取這么多的數據// 需要先提取出來這次請求是什么類型:1 發送文件;2 讀取文件Integer requestType = null;if(cachedRequest.requestType != null) {requestType = cachedRequest.requestType;} else {requestType = getRequestType(channel); // 但是此時channel的position肯定也變為了4}if(requestType == null) {return;}System.out.println("從請求中解析出來請求類型:" + requestType); // 拆包,就是說人家一次請求,本來是包含了:requestType + filenameLength + filename [+ imageLength + image]// 這次OP_READ事件,就讀取到了requestType的4個字節中的2個字節,剩余的數據// 就被放在了下一次OP_READ事件中了if(REQUEST_SEND_FILE.equals(requestType)) {handleSendFileRequest(channel, key);} else if(REQUEST_READ_FILE.equals(requestType)) {handleReadFileRequest(channel, key);}} catch (Exception e) {e.printStackTrace();}}/*** 獲取本次請求的類型*/public Integer getRequestType(SocketChannel channel) throws Exception {Integer requestType = null;if(cachedRequest.requestType != null) {return cachedRequest.requestType;}ByteBuffer requestTypeBuffer = null;if(cachedRequestTypeBuffer != null) {requestTypeBuffer = cachedRequestTypeBuffer;} else {requestTypeBuffer = ByteBuffer.allocate(4);}channel.read(requestTypeBuffer); // 此時requestType ByteBuffer,position跟limit都是4,remaining是0if(!requestTypeBuffer.hasRemaining()) {// 已經讀取出來了4個字節,可以提取出來requestType了requestTypeBuffer.rewind(); // 將position變為0,limit還是維持著4requestType = requestTypeBuffer.getInt();cachedRequest.requestType = requestType;} else {cachedRequestTypeBuffer = requestTypeBuffer; }return requestType;}/*** 是否已經完成了一個請求的讀取* @return*/public Boolean hasCompletedRead() {return cachedRequest.hasCompletedRead;}/*** 獲取文件名同時轉換為本地磁盤目錄中的絕對路徑* @param channel* @return* @throws Exception*/private Filename getFilename(SocketChannel channel) throws Exception {Filename filename = new Filename(); if(cachedRequest.filename != null) {return cachedRequest.filename;} else {String relativeFilename = getRelativeFilename(channel);if(relativeFilename == null) {return null;}String absoluteFilename = getAbsoluteFilename(relativeFilename);// /image/product/iphone.jpgfilename.relativeFilename = relativeFilename;filename.absoluteFilename = absoluteFilename;cachedRequest.filename = filename;} return filename;}/*** 獲取相對路徑的文件名*/private String getRelativeFilename(SocketChannel channel) throws Exception {Integer filenameLength = null;String filename = null;// 讀取文件名的大小if(cachedRequest.filenameLength == null) {ByteBuffer filenameLengthBuffer = null;if(cachedFilenameLengthBuffer != null) {filenameLengthBuffer = cachedFilenameLengthBuffer;} else {filenameLengthBuffer = ByteBuffer.allocate(4);}channel.read(filenameLengthBuffer); if(!filenameLengthBuffer.hasRemaining()) { filenameLengthBuffer.rewind();filenameLength = filenameLengthBuffer.getInt();cachedRequest.filenameLength = filenameLength;} else {cachedFilenameLengthBuffer = filenameLengthBuffer;return null;}}// 讀取文件名ByteBuffer filenameBuffer = null;if(cachedFilenameBuffer != null) {filenameBuffer = cachedFilenameBuffer;} else {filenameBuffer = ByteBuffer.allocate(filenameLength);}channel.read(filenameBuffer);if(!filenameBuffer.hasRemaining()) {filenameBuffer.rewind();filename = new String(filenameBuffer.array()); } else {cachedFilenameBuffer = filenameBuffer;}return filename;}/*** 獲取文件在本地磁盤上的絕對路徑名*/private String getAbsoluteFilename(String relativeFilename) {String[] relativeFilenameSplited = relativeFilename.split("/"); String dirPath = DATA_DIR;for(int i = 0; i < relativeFilenameSplited.length - 1; i++) {if(i == 0) {continue;}dirPath += "\\" + relativeFilenameSplited[i];}File dir = new File(dirPath);if(!dir.exists()) {dir.mkdirs();}String absoluteFilename = dirPath + "\\" + relativeFilenameSplited[relativeFilenameSplited.length - 1];return absoluteFilename;}/*** 從網絡請求中獲取文件大小*/private Long getFileLength(SocketChannel channel) throws Exception {Long fileLength = null;if(cachedRequest.fileLength != null) {return cachedRequest.fileLength;} else {ByteBuffer fileLengthBuffer = null;if(cachedFileLengthBuffer != null) { fileLengthBuffer = cachedFileLengthBuffer;} else {fileLengthBuffer = ByteBuffer.allocate(8);}channel.read(fileLengthBuffer);if(!fileLengthBuffer.hasRemaining()) {fileLengthBuffer.rewind();fileLength = fileLengthBuffer.getLong();cachedRequest.fileLength = fileLength;} else {cachedFileLengthBuffer = fileLengthBuffer;}}return fileLength;}/*** 發送文件*/private void handleSendFileRequest(SocketChannel channel, SelectionKey key) throws Exception {// 從請求中解析文件名Filename filename = getFilename(channel); System.out.println("從網絡請求中解析出來文件名:" + filename); if(filename == null) {return;}// 從請求中解析文件大小Long fileLength = getFileLength(channel); System.out.println("從網絡請求中解析出來文件大小:" + fileLength); if(fileLength == null) {return;}// 循環不斷的從channel里讀取數據,并寫入磁盤文件ByteBuffer fileBuffer = null;if(cachedFileBuffer != null) {fileBuffer = cachedFileBuffer;} else {fileBuffer = ByteBuffer.allocate(Integer.parseInt(String.valueOf(fileLength)));}channel.read(fileBuffer);if(!fileBuffer.hasRemaining()) {fileBuffer.rewind();cachedRequest.file = fileBuffer;cachedRequest.hasCompletedRead = true;System.out.println("本次文件上傳請求讀取完畢......."); } else {cachedFileBuffer = fileBuffer;System.out.println("本次文件上傳出現拆包問題,緩存起來,下次繼續讀取......."); }}/*** 讀取文件*/private void handleReadFileRequest(SocketChannel channel, SelectionKey key) throws Exception {// 從請求中解析文件名// 已經是:F:\\development\\tmp1\\image\\product\\iphone.jpgFilename filename = getFilename(channel); System.out.println("從網絡請求中解析出來文件名:" + filename); if(filename == null) {return;}cachedRequest.hasCompletedRead = true;}/*** 文件名*/class Filename {// 相對路徑名String relativeFilename;// 絕對路徑名String absoluteFilename;@Overridepublic String toString() {return "Filename [relativeFilename=" + relativeFilename + ", absoluteFilename=" + absoluteFilename + "]";}}/*** 緩存文件*/class CachedRequest {Integer requestType;Filename filename;Integer filenameLength;Long fileLength;ByteBuffer file;Boolean hasCompletedRead = false;}}
214_將讀取完畢的網絡請求分發到全局的請求隊列中
NetworkRequestQueue?
package com.zhss.dfs.datanode.server;import java.util.concurrent.ConcurrentLinkedQueue;/*** 公共網絡請求存放隊列*/
public class NetworkRequestQueue {private static volatile NetworkRequestQueue instance = null;public static NetworkRequestQueue get() {if(instance == null) {synchronized(NetworkRequestQueue.class) {if(instance == null) {instance = new NetworkRequestQueue();}}}return instance;}// 一個全局的請求隊列private final ConcurrentLinkedQueue<NetworkRequest> requestQueue = new ConcurrentLinkedQueue<NetworkRequest>();public void offer(NetworkRequest request) {requestQueue.offer(request);}public NetworkRequest poll() {return requestQueue.poll();}}
216_實現IO線程從請求隊列中爭搶請求以及執行磁盤IO操作
IOThread
/*** 負責執行磁盤IO的線程*/
public class IOThread extends Thread {public static final Integer REQUEST_SEND_FILE = 1;public static final Integer REQUEST_READ_FILE = 2;// 拿出單例的公共請求存放隊列private final NetworkRequestQueue requestQueue = NetworkRequestQueue.get();private final NameNodeRpcClient namenode;public IOThread(NameNodeRpcClient namenode) {this.namenode = namenode;}@Overridepublic void run() {while(true) {try {NetworkRequest request = requestQueue.poll();if(request == null) {Thread.sleep(100);continue;}Integer requestType = request.getRequestType();if(requestType.equals(REQUEST_SEND_FILE)) {// 對于上傳文件,將文件寫入本地磁盤即可writeFileToLocalDisk(request);} else if(requestType.equals(REQUEST_READ_FILE)) {// 對于下載文件,從本地磁盤讀取文件readFileFromLocalDisk(request);}} catch (Exception e) {e.printStackTrace();}}}private void readFileFromLocalDisk(NetworkRequest request) throws Exception {FileInputStream localFileIn = null;FileChannel localFileChannel = null;try {File file = new File(request.getAbsoluteFilename());Long fileLength = file.length();localFileIn = new FileInputStream(request.getAbsoluteFilename()); localFileChannel = localFileIn.getChannel();// 循環不斷的從channel里讀取數據,并寫入磁盤文件ByteBuffer buffer = ByteBuffer.allocate(8 + Integer.parseInt(String.valueOf(fileLength)));buffer.putLong(fileLength);int hasReadImageLength = localFileChannel.read(buffer);System.out.println("從本次磁盤文件中讀取了" + hasReadImageLength + " bytes的數據"); buffer.rewind();} finally {if(localFileChannel != null) {localFileChannel.close();}if(localFileIn != null) {localFileIn.close();}}}private void writeFileToLocalDisk(NetworkRequest request) throws Exception {// 構建針對本地文件的輸出流FileOutputStream localFileOut = null;FileChannel localFileChannel = null;try {localFileOut = new FileOutputStream(request.getAbsoluteFilename()); localFileChannel = localFileOut.getChannel();localFileChannel.position(localFileChannel.size());System.out.println("對本地磁盤文件定位到position=" + localFileChannel.size()); int written = localFileChannel.write(request.getFile());System.out.println("本次文件上傳完畢,將" + written + " bytes的數據寫入本地磁盤文件......."); // 增量上報Master節點自己接收到了一個文件的副本// /image/product/iphone.jpgnamenode.informReplicaReceived(request.getRelativeFilename() + "_" + request.getFileLength());System.out.println("增量上報收到的文件副本給NameNode節點......"); } finally {localFileChannel.close();localFileOut.close();}}}