分布式文件系統06-分布式中間件彈性擴容與rebalance沖平衡

分布式中間件彈性擴容與rebalance沖平衡

176_如果宕機的數據節點事后再次重啟會發生什么事情?

某個之前某個宕機的數據節點DataNode-A又重啟后,肯定會再次注冊,并進行全量上報的流程,此時,就會導致DataNode-A上的文件副本,實際上在整個DataNode集群中存了3份

177_接收數據節點存儲上報的時候發現副本冗余生成刪除任務

178_將冗余副本的刪除任務下發給對應的數據節點

179_在數據節點上刪除磁盤上的冗余圖片副本

180_測試數據節點掛掉之后副本能否正常復制到其他節點

復制任務的目標數據節點

  • 第一,不能是已經死掉的節點
  • 第二,不能是已經有這個副本的節點

181_測試宕機的數據節點再次重啟時能否正常刪除冗余副本

如果宕機的DataNode-A上有25萬個文件副本,DataNode-A宕機以后,這25萬個文件副本會打散復制到別的DataNode上去

此時,如果宕機的DataNode-A重新啟動,那么它就會向NameNode進行全量上報,把25萬個文件副本都全量上報到NameNode上去。NameNode就會生成25萬個冗余副本刪除任務RemoveReplicaTask,并放入NameNode內存中管理的DataNode-A對應的DataNodeInfo中的刪除任務阻塞隊列removeReplicaTaskQueue中去

當宕機的DataNode-A重新啟動后的下一次心跳發送到NameNode時,NameNode就會把這25w個RemoveReplicaTask都下發給DataNode-A,DataNode-A拿到這25w個RemoveReplicaTask后,就會開始執行它們,對應的也就是刪除DataNode-A本地的25萬個文件副本。對應的實現代碼如上

182_在上傳文件的時候發現數據節點宕機該如何進行處理?

最最典型的一個客戶端上傳的容錯機制,就是感知到網絡故障之后,就得去進行一些容錯的處理

解決方案就是:

客戶端找某個DataNode上傳文件如果失敗,那么客戶端就將宕機的DataNode傳給NameNode,讓NameNode重新分配除了宕機的DataNode以外的另一個DataNode,客戶端再次進行上傳

183_在客戶端的代碼中找找如何感知到上傳過程中的網絡故障?

這里做的比較粗,就是最外層的catch到Exception,就認為是上傳過程中出現了網絡故障。就不管它是在建立短連接的過程中就建立失敗,還是連接建立成功后,channel.write(file)執行失敗拋出了異常

184_改造代碼實現發現網絡故障時重新分配一個數據節點

通過這種方式,實現客戶端的容錯、故障的轉移,如果一個數據節點有故障,在客戶端是可以進行容錯的,客戶端在找一個數據節點上傳失敗,就會找NameNode重新分配一個DataNode并重新進行上傳


給上傳文件的方法,變成了有返回值的,上傳成功返回true,失敗返回false

185_定義一個新接口:重新分配數據節點以及排除故障節點

186_實現重新分配數據節點這個接口的代碼業務邏輯

187_在下載文件的時候發現數據節點宕機該如何進行處理?

如果某個數據節點掛了,某個數據節點剛掛,NameNode還沒有感知到它掛了,就把它分配給了客戶端,客戶端此時對著這個掛了個數據節點上傳文件/下載文件,肯定就會失敗的

188_在客戶端的代碼中找找如何感知到下載過程中的網絡故障?

189_改造代碼實現下載文件發現網絡故障重新申請一個數據節點

客戶端往第一個數據節點下載文件失敗時,需要找NameNode節點重新分配一個除了這個下載失敗的節點以外的別的數據節點

190_改造已有的舊接口:為下載文件重新分配一個數據節點

191_重寫為下載文件分配數據節點的接口:加入排除故障節點邏輯

將前面的getDatanodeForFile()改成了下面的chooseDataNodeFromReplicas(),就是從幾臺有fileName文件對應的副本的數據節點中,隨機的選擇出一臺數據節點,且這個選出的數據節點還不能是第一次下載失敗的數據節點

第479行,就是在排除第一次下載文件失敗的數據節點

192_海量數據存儲:分布式存儲、多副本冗余以及高可用架構

海量數據的存儲,主要針對的各種小文件、小圖片。海量數據的存儲,用一臺機器肯定是不行,所以首先做的就是分布式存儲架構

多副本冗余,高可用架構(任何一臺機器宕機數據不會丟,上傳、下載的過程中失敗,可以換數據節點重試)

一邊要多思考里面的架構設計思想,FastDFS和TFS。FastDFS是一個國產的開源項目,c語言開發的,中小型公司在使用,一般在分布式文件存儲的場景中,都會采用FastDFS來使用,最大缺點就是c語言開發,我們沒辦法閱讀里面的源碼,出問題的時候極坑,全部c異常,Java工程師沒有能力維護FastDFS集群的。我們需要有發現問題,改源碼、編譯、打包重新部署的能力

我們可以百度一下“FastDFS架構原理”,分布式存儲、副本冗余、高可用架構,跟我們設計的這套架構類似的

TFS,淘寶內部開發的分布式文件系統,主要是針對淘寶上面,大量的店鋪中的商品的小圖片,4kb~400mb之間,分布式存儲,元數據管理機制,副本冗余,高可用架構,和我們的架構也是類似的。但是,和我們的有一點不同的是,很多小圖片會被合并為一個大文件來存儲,每個文件都會有一個對應的索引文件。網上關于FastDFS和TFS的資料也不會太多

我們這邊參考了HDFS的架構,Hadoop分成了三大系統:Hadoop分布式文件系統,分布式計算系統,分布式資源調度系統,尤其是里面的元數據管理架構完全參考了HDFS

193_分布式文件系統的可伸縮架構值的是什么以及如何設計

六大架構

分布式存儲架構:

容錯架構:

高可用架構:除了NameNode需要依賴zk還沒做

可伸縮架構:可以隨時增加或下線機器

高性能架構:讓上傳和下載速度更快

高并發架構:大量客戶端同時連接過來,進行上傳下載

可伸縮架構,對于集群而言,就是可以保證加機器去里面,或者是下線機器都可以實現

加一臺機器,你接下來如何做,假設已經有4臺機器,每臺機器上的磁盤空間都快滿了,這個是很常見的場景,大數據的同學玩HDFS的時候就會有這種情況,已有的幾臺機器的磁盤空間都滿了,無法寫入新的數據了

首先你得保證接下來要優先往最空的這臺機器去寫入數據,接下來在后臺應該啟動一大堆的定時任務,要慢慢的把快滿的4臺機器上的數據逐步逐步的遷移到空的機器上去,緩解已有4臺機器的存儲壓力,可以讓空機器放更多的數據

下線機器,有某一臺機器不需要了,關閉DataNode,把機器干掉。這種情況就可以看做是屬于機器的宕機,NameNode感知到以后,會自動的去進行副本的復制,保證數據不丟失

194_上線新機器之后是否會自動優先往里面寫入數據?

如果每次擴容,一般來說針對我們的這個系統,要么不擴容,要么擴容都是>=2臺機器起步來進行擴容,每次擴容2臺機器,那么在這里一排序,就會優先往2臺空的機器里寫入數據,就可以立馬緩解住已有的4臺機器磁盤快寫滿的壓力

195_到底什么時候應該從磁盤快滿的機器緩慢遷移數據出去

假設每臺機器是100GB存儲空間:

機器01:90GB

機器02:90GB

機器03:90GB

機器04:90GB

新加入兩臺機器:

機器05:0GB

機器06:0GB

把所有的機器全部加起來,算一個平均數,一共360GB,每臺平均60GB

(機器01、機器02、機器03、機器04) ->(機器05、機器06)

舉個例子:機器01 -> 機器05,遷移30GB,首先要確定這30G具體包含對應的是哪些文件。需要生成兩種任務,第一種任務是復制任務。對于機器05而言有復制任務,他需要從機器01復制指定的文件過來

第二種就是刪除任務。對機器01而言就有刪除任務。每隔一段時間,你都可以讓他去進行一次全量存儲的匯報,把這個節點原先的各種存儲信息重新刷新一遍,同時在全量存儲匯報的時候,就可以檢查一下每個圖片的副本數量,如果超出了2個,就生成刪除任務也可以

196_定義一個新接口:手動觸發集群數據的rebalance

上線了新的幾臺機器,立馬就應該執行一個命令

我們可以提供一個用python編寫的腳本,在腳本里基于python調用gRPC提供的接口,調用到Master上去,執行某個命令觸發rebalance,gRPC本來就是支持多語言的

同理,shutdown()之類的接口,我們也可以寫Python腳本,來手動調用shutdown,來優雅關閉

199_實現rebalance的核心算法:集群存儲資源重平衡算法邏輯

假設現在

機器01:90GB

機器02:90GB

機器03:90GB

機器04:90GB

機器05:0GB

機器06:0GB

平均就是應該每臺60G

/*** 這個組件,就是負責管理集群里的所有的datanode的*/
public class DataNodeManager {/*** 為重平衡去創建副本復制的任務*/public void createReBalanceTasks() {// 沖平衡時,加大鎖synchronized(this) {long totalStoredDataSize = 0;for(DataNodeInfo datanode : datanodes.values()) {totalStoredDataSize += datanode.getStoredDataSize();}// 計算集群節點存儲數據的平均值long averageStoredDataSize = totalStoredDataSize / datanodes.size();// 將集群中的節點區分為兩類:遷出節點和遷入節點List<DataNodeInfo> sourceDataNodes = new ArrayList<>();List<DataNodeInfo> destDataNodes = new ArrayList<>();for(DataNodeInfo datanode : datanodes.values()) {// 遷出節點if(datanode.getStoredDataSize() > averageStoredDataSize) {sourceDataNodes.add(datanode);}// 遷入節點if(datanode.getStoredDataSize() < averageStoredDataSize) {destDataNodes.add(datanode);}}// 為遷入節點生成復制的任務,為遷出節點生成刪除的任務// 在這里生成的刪除任務統一放到24小時之后延遲調度執行,咱們可以實現一個延遲調度執行的線程List<RemoveReplicaTask> removeReplicaTasks = new ArrayList<>();for(DataNodeInfo sourceDatanode : sourceDataNodes) {// 當前源數據節點,需要遷移的數據大小long toRemoveDataSize = sourceDatanode.getStoredDataSize() - averageStoredDataSize;for(DataNodeInfo destDatanode : destDataNodes) {// 直接將sourceDatanode要遷移的數據,一次性放到一臺destDatanode機器就可以了if(destDatanode.getStoredDataSize() + toRemoveDataSize <= averageStoredDataSize) {createReBalanceTasks(sourceDatanode, destDatanode, removeReplicaTasks, toRemoveDataSize);break;}// 只能把部分數據放到這臺機器上去else if(destDatanode.getStoredDataSize() < averageStoredDataSize) {// sourceDatanode要遷移的數據,最多只能遷移maxRemoveDataSize的數據,到destDatanode上去// 比如sourceDatanode要遷移的數據一共30G,但是當前的destDatanode最多只能接收15G數據long maxRemoveDataSize = averageStoredDataSize - destDatanode.getStoredDataSize();long removedDataSize = createReBalanceTasks(sourceDatanode, destDatanode, removeReplicaTasks, maxRemoveDataSize);// 將本sourceDatanode節點,待遷移的量toRemoveDataSize,減去本次遷移的量removedDataSizetoRemoveDataSize -= removedDataSize;} }}// 交給一個延遲線程去24小時之后執行刪除副本的任務// 保證開始執行刪除任務時,前面的復制任務已經全部執行完畢了new DelayRemoveReplicaThread(removeReplicaTasks).start(); } }private long createReBalanceTasks(DataNodeInfo sourceDatanode, DataNodeInfo destDatanode,List<RemoveReplicaTask> removeReplicaTasks, long maxRemoveDataSize) {List<String> files = namesystem.getFilesByDatanode(sourceDatanode.getIp(), sourceDatanode.getHostname());long removedDataSize = 0;// 遍歷文件,不停的為每個文件生成一個復制的任務,直到準備遷移的文件的大小// 超過了待遷移總數據量maxRemoveDataSize為止for(String file : files) {String filename = file.split("_")[0];long fileLength = Long.parseLong(file.split("_")[1]);if(removedDataSize + fileLength >= maxRemoveDataSize) {break;}/** 為這個文件生成,針對目標節點的復制任務 */ReplicateTask replicateTask = new ReplicateTask(filename, fileLength, sourceDatanode, destDatanode);// 復制任務時立馬下發的destDatanode.addReplicateTask(replicateTask); destDatanode.addStoredDataSize(fileLength); /** 為這個文件生成,針對源節點的刪除任務 */sourceDatanode.addStoredDataSize(-fileLength); namesystem.removeReplicaFromDataNode(sourceDatanode.getId(), file); RemoveReplicaTask removeReplicaTask = new RemoveReplicaTask(filename, sourceDatanode);// 針對刪除任務,統一攢起來,24小時后一起執行removeReplicaTasks.add(removeReplicaTask);  // 遷移一個文件,就累加一份fileLengthremovedDataSize += fileLength;}return removedDataSize;}/*** 延遲刪除副本的線程*/static class DelayRemoveReplicaThread extends Thread {private final List<RemoveReplicaTask> removeReplicaTasks;public DelayRemoveReplicaThread(List<RemoveReplicaTask> removeReplicaTasks) {this.removeReplicaTasks = removeReplicaTasks;}@Overridepublic void run() {long start = System.currentTimeMillis();while(true) {try {long now = System.currentTimeMillis();if(now - start > 24 * 60 * 60 * 1000) {for(RemoveReplicaTask removeReplicaTask : removeReplicaTasks) {// 真正的將刪除任務下發下去,相應的DataNode下一輪心跳過來,就能認領這些刪除任務removeReplicaTask.getDatanode().addRemoveReplicaTask(removeReplicaTask);  }break;}Thread.sleep(60 * 1000); } catch (Exception e) {e.printStackTrace();}}}}}

200_基于可伸縮架構實現集群擴容支撐海浪數據的存儲

海量數據存儲架構:分布式存儲架構 + 可伸縮架構

高可用+高容錯架構:多副本冗余 + 副本自動遷移(數據節點宕機時) + 冗余副本自動刪除(宕機數據節點重啟后) + 客戶端容錯機制(文件上傳、下載時更換數據節點)

高性能架構:盡可能提升客戶端文件上傳和下載的性能和速度

高并發架構:盡可能讓每個數據節點可以支撐更多的客戶端的并發上傳和下載

201_在分布式文件系統中高并發主要指的是什么?

高并發和高性能的架構改造,這是比較升華的一個部分

第一塊:NameNode,元數據變更,能否承載高并發

第二塊:DataNode,文件的上傳和下載,能否承載高并發

對于NameNode而言,假設高峰時期,一萬個客戶端,同時發起請求要創建文件,一秒鐘內高峰期直接來一萬個請求去訪問NameNode

對于DataNode而言,1000個客戶端連接到DataNode上去,同時進行文件的 上傳和下載,能否扛得住

202_看看NameNode中有哪些接口可能會被高并發的訪問?

上傳接口:create、allocateDataNodes、informReplicaReceived

下載接口:chooseDataNodeFromReplicas

203_分析一下文件上傳的三個接口能否支撐幾千的QPS

NameNode節點,一般在生產部署的時候,肯定是高配置物理機,不會是虛擬機,起碼都是32核128G的配置。這種配置正常情況下,應該一臺機器極限支撐個每秒幾萬的請求都是可以的

比如,一秒鐘來1萬個請求,每個請求排隊獲取鎖,從而進入執行更新文件目錄樹的代碼邏輯。因為是基于純內存的操作,一個請求需要多少時間,1毫秒都不會到,可能一個請求就0.01毫秒,1毫秒可以執行100個請求,一秒就可以執行100 * 1000 = 10w個請求

雖然說有并發邏輯里會加鎖,但是不要緊,只要基于純內存,每個請求速度依然是極快的,就可以做到每秒處理幾萬個請求

絕大部分的創建文件的請求,可能就兩個操作:更新內存里的文件目錄樹 、editlog寫入內存緩沖 -> 0.01毫秒 -> 每秒執行10w次請求沒問題。可能只有隔一段時間才會有一個請求,雙緩沖的currentBuffer滿了的時候,才會輪到這個線程執行一下刷磁盤,并且這個刷磁盤也是順序寫

Kafka之類的中間件系統,其實本質也是大量的基于內存來實現核心邏輯的,在高配置物理機的場支持下,抗下來每秒10萬的QPS完全不是問題

204_分析一下文件上傳的三個接口能否支撐幾千的QPS(2)

平時寫CRUD的業務系統,用不著高并發、IO、網絡、磁盤、Netty、ZK一些技術。Java Web里最復雜的一塊東西,其實是Tomcat,人家Tomcat作為一個Web服務器,他底層就要去做網絡通信監聽某個端口、內存管理、并發控制。你寫的Servlet、SSM,其實就是嵌入在Tomcat容器里,執行的一些業務代碼,你就是CRUD。互聯網系統,緩存,MQ,數據庫,ES,NoSQL,架構設計

但是我們現在自己寫中間件系統,分布式文件系統,微服務注冊中心給完成,這兩個項目搞完有三層意義:第一個,把你底層的技術全部打通,基礎會極度的扎實;第二個,后面看一些開源中間件系統的源碼,會非常的輕松;第三個,這兩個都是工業級的項目,直接是可以在出去面試的時候寫簡歷上的,比如起個名字叫“盤古”分布式圖片存儲系統,替換你的很Low的CRUD的一些項目經歷

每秒10w的文件上傳/下載的請求,在NameNode這塊是沒有任何的瓶頸的,雖然NameNode是單機,但是也是高配的物理機。而有很多的中間件系統是基于zk來做元數據管理,每次更新元數據的時候,都需要走網絡請求,純內存一般就是0.001毫秒~0.01毫秒,這個時候性能就沒辦法保證了

因為只要一走網絡請求,耗時直接就到毫秒級,一個請求過來,你需要去請求zk來做一些事情,直接就會到1毫秒+,幾毫秒,10毫秒,直接會導致你的NameNode承載的并發能力,可能下降到每秒幾千QPS了

雖然,后面我們也會用ZK,但是不是用ZK管理分布式文件系統中的各種元數據,而是負責管理NameNode集群的高可用。我們目前NameNode使用的是單機,沒有辦法做到某臺NameNode掛了以后,自動切換到另一臺NameNode對外提供服務,所以需要ZK的協助

205_DataNode的NIO網絡通信架構能支撐高并發嗎?

后面會把這套分布式圖片存儲系統整合到電商平臺里去,電商平臺中大量的用到了很多的圖片,圖片其實都應該存儲在在這個分布式圖片系統里,對圖片的讀取,主要也是走圖片系統,評論曬圖、商品圖片。主要的壓力就是圖片讀取,評論曬圖的頻率一般都是很低的,畢竟寫評論的人是少數

主要的壓力可能就是來自電商首頁、商品詳情頁,可能會有很多圖片讀取的請求,每秒上萬的請求。但是針對這種情況,你肯定必須得做靜態化圖片的緩存,不可能說每次都從分布式圖片系統里來讀圖片,前置的Nginx本身就可以做靜態圖片的緩存

CDN緩存,大量的靜態資源可以在前置的很多地方做緩存,Nginx、緩存服務器、CDN做緩存和加速,不需要每次都請求到底層的分布式圖片系統里去的

比如說假設你每秒有1萬個請求,一共部署了10臺數據節點,每臺機器要每秒要承擔1000個QPS,目前的一個DataNode架構,每臺機器接收1000個連接和請求能否實現?

目前的網絡NIO通信架構,一個selector線程就需要監聽1000個sockeChannel,后面只有3個worker線程,要同時做兩件事情:1. 解析自定義的二進制通信協議的請求之外(很快),2. 最核心最笨重的就是執行本地磁盤的讀寫邏輯(很慢)

這個架構最大的問題,就是將解析請求和磁盤IO混在了一起,如果其中一個磁盤IO卡住了,那么這個worker負責的后續的所有請求的處理都會跟著受影響,從而導致高并發過來后,會產生大量積壓

206_基于Reactor模式重新設計DataNode的網絡通信架構

直接參考Kafka服務端的網絡通信架構,就是基于如下的Reactor模式來實現的

1000個客戶端同時連接過來,發送請求,高并發的場景下,用Reactor模式來支撐是很輕松的

10個Processor線程,每個線程也就處理100個客戶端

30個IO線程:執行比較慢的磁盤IO操作

207_重寫DataNode的NioServer讓其僅僅監聽客戶端連接請求

208_讓NioServer將建立好的連接均勻分發給Processor線程

209_Processor線程將均勻分配的連接注冊到自己的Selector上

210_在一個循環中以限時阻塞的方式完成客戶端請求的感知

NioProcessor

/*** 負責解析請求以及發送響應的線程*/
public class NioProcessor extends Thread {/*** 多路復用監聽時的最大阻塞時間*/public static final Long POLL_BLOCK_MAX_TIME = 1000L;// 等待注冊的網絡連接的隊列private ConcurrentLinkedQueue<SocketChannel> channelQueue = new ConcurrentLinkedQueue<SocketChannel>();// 每個Processor私有的Selector多路復用器private Selector selector;public NioProcessor() {try {this.selector = Selector.open();} catch (IOException e) {e.printStackTrace();}}/*** 給這個Processor線程分配一個網絡連接*/public void addChannel(SocketChannel channel) {channelQueue.offer(channel);// 喚醒在POLL_BLOCK_MAX_TIME處,等待的selectorselector.wakeup();}/*** 線程的核心主邏輯*/@Overridepublic void run() {while(true) {try {// 注冊排隊等待的連接registerQueuedClients();// 以限時阻塞的方式感知連接中的請求poll();} catch (Exception e) {e.printStackTrace();  }}}/*** 將排隊中的等待注冊的連接注冊到Selector上去*/private void registerQueuedClients() {SocketChannel channel = null;while((channel = channelQueue.poll()) != null) {try {channel.register(selector, SelectionKey.OP_READ);} catch (ClosedChannelException e) {e.printStackTrace();}}}/*** 以多路復用的方式來監聽各個連接的請求*/private void poll() {try {// 以限時阻塞的方式完成客戶端請求的感知(在一個循環中)int keys = selector.select(POLL_BLOCK_MAX_TIME);if(keys > 0) {Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();while(keyIterator.hasNext()) {SelectionKey key = keyIterator.next();keyIterator.remove();// 如果接受到了某個客戶端的請求if(key.isReadable()) {SocketChannel channel = (SocketChannel) key.channel();}}}} catch (Exception e) {e.printStackTrace();}}}

注意這里的selector.wakeup()的使用場景

213_封裝NetworkRequest來解析與抽取一個完整的請求

NetworkRequest

/*** 從channel中讀取一次二進制格式的網絡請求數據解析,并封裝為一個NetworkRequest對象NetworkRequest對象中的CachedRequest的內部,會記錄解析出來的文件名,文件內容等信息 */
public class NetworkRequest {public static final Integer REQUEST_SEND_FILE = 1;public static final Integer REQUEST_READ_FILE = 2;// 本次網絡請求對應的連接private SelectionKey key;// 本次網絡請求對應的連接private SocketChannel channel;// 緩存中的數據private final CachedRequest cachedRequest = new CachedRequest();private ByteBuffer cachedRequestTypeBuffer;private ByteBuffer cachedFilenameLengthBuffer;private ByteBuffer cachedFilenameBuffer;private ByteBuffer cachedFileLengthBuffer;private ByteBuffer cachedFileBuffer;public SelectionKey getKey() {return key;}public void setKey(SelectionKey key) {this.key = key;}public SocketChannel getChannel() {return channel;}public void setChannel(SocketChannel channel) {this.channel = channel;}/*** 從網絡連接中讀取與解析出來一個請求*/public void read() {try {// 假如說你這個一次讀取的數據里包含了多個文件的話// 這個時候我們會先讀取文件名,然后根據文件的大小去讀取這么多的數據// 需要先提取出來這次請求是什么類型:1 發送文件;2 讀取文件Integer requestType = null;if(cachedRequest.requestType != null) {requestType = cachedRequest.requestType;} else {requestType = getRequestType(channel); // 但是此時channel的position肯定也變為了4}if(requestType == null) {return;}System.out.println("從請求中解析出來請求類型:" + requestType); // 拆包,就是說人家一次請求,本來是包含了:requestType + filenameLength + filename [+ imageLength + image]// 這次OP_READ事件,就讀取到了requestType的4個字節中的2個字節,剩余的數據// 就被放在了下一次OP_READ事件中了if(REQUEST_SEND_FILE.equals(requestType)) {handleSendFileRequest(channel, key);} else if(REQUEST_READ_FILE.equals(requestType)) {handleReadFileRequest(channel, key);}} catch (Exception e) {e.printStackTrace();}}/*** 獲取本次請求的類型*/public Integer getRequestType(SocketChannel channel) throws Exception {Integer requestType = null;if(cachedRequest.requestType != null) {return cachedRequest.requestType;}ByteBuffer requestTypeBuffer = null;if(cachedRequestTypeBuffer != null) {requestTypeBuffer = cachedRequestTypeBuffer;} else {requestTypeBuffer = ByteBuffer.allocate(4);}channel.read(requestTypeBuffer);  // 此時requestType ByteBuffer,position跟limit都是4,remaining是0if(!requestTypeBuffer.hasRemaining()) {// 已經讀取出來了4個字節,可以提取出來requestType了requestTypeBuffer.rewind(); // 將position變為0,limit還是維持著4requestType = requestTypeBuffer.getInt();cachedRequest.requestType = requestType;} else {cachedRequestTypeBuffer = requestTypeBuffer;  }return requestType;}/*** 是否已經完成了一個請求的讀取* @return*/public Boolean hasCompletedRead() {return cachedRequest.hasCompletedRead;}/*** 獲取文件名同時轉換為本地磁盤目錄中的絕對路徑* @param channel* @return* @throws Exception*/private Filename getFilename(SocketChannel channel) throws Exception {Filename filename = new Filename(); if(cachedRequest.filename != null) {return cachedRequest.filename;} else {String relativeFilename = getRelativeFilename(channel);if(relativeFilename == null) {return null;}String absoluteFilename = getAbsoluteFilename(relativeFilename);// /image/product/iphone.jpgfilename.relativeFilename = relativeFilename;filename.absoluteFilename = absoluteFilename;cachedRequest.filename = filename;} return filename;}/*** 獲取相對路徑的文件名*/private String getRelativeFilename(SocketChannel channel) throws Exception {Integer filenameLength = null;String filename = null;// 讀取文件名的大小if(cachedRequest.filenameLength == null) {ByteBuffer filenameLengthBuffer = null;if(cachedFilenameLengthBuffer != null) {filenameLengthBuffer = cachedFilenameLengthBuffer;} else {filenameLengthBuffer = ByteBuffer.allocate(4);}channel.read(filenameLengthBuffer); if(!filenameLengthBuffer.hasRemaining()) { filenameLengthBuffer.rewind();filenameLength = filenameLengthBuffer.getInt();cachedRequest.filenameLength = filenameLength;} else {cachedFilenameLengthBuffer = filenameLengthBuffer;return null;}}// 讀取文件名ByteBuffer filenameBuffer = null;if(cachedFilenameBuffer != null) {filenameBuffer = cachedFilenameBuffer;} else {filenameBuffer = ByteBuffer.allocate(filenameLength);}channel.read(filenameBuffer);if(!filenameBuffer.hasRemaining()) {filenameBuffer.rewind();filename = new String(filenameBuffer.array());  } else {cachedFilenameBuffer = filenameBuffer;}return filename;}/*** 獲取文件在本地磁盤上的絕對路徑名*/private String getAbsoluteFilename(String relativeFilename) {String[] relativeFilenameSplited = relativeFilename.split("/"); String dirPath = DATA_DIR;for(int i = 0; i < relativeFilenameSplited.length - 1; i++) {if(i == 0) {continue;}dirPath += "\\" + relativeFilenameSplited[i];}File dir = new File(dirPath);if(!dir.exists()) {dir.mkdirs();}String absoluteFilename = dirPath + "\\" + relativeFilenameSplited[relativeFilenameSplited.length - 1];return absoluteFilename;}/*** 從網絡請求中獲取文件大小*/private Long getFileLength(SocketChannel channel) throws Exception {Long fileLength = null;if(cachedRequest.fileLength != null) {return cachedRequest.fileLength;} else {ByteBuffer fileLengthBuffer = null;if(cachedFileLengthBuffer != null) {  fileLengthBuffer = cachedFileLengthBuffer;} else {fileLengthBuffer = ByteBuffer.allocate(8);}channel.read(fileLengthBuffer);if(!fileLengthBuffer.hasRemaining()) {fileLengthBuffer.rewind();fileLength = fileLengthBuffer.getLong();cachedRequest.fileLength = fileLength;} else {cachedFileLengthBuffer = fileLengthBuffer;}}return fileLength;}/*** 發送文件*/private void handleSendFileRequest(SocketChannel channel, SelectionKey key) throws Exception {// 從請求中解析文件名Filename filename = getFilename(channel); System.out.println("從網絡請求中解析出來文件名:" + filename); if(filename == null) {return;}// 從請求中解析文件大小Long fileLength = getFileLength(channel); System.out.println("從網絡請求中解析出來文件大小:" + fileLength); if(fileLength == null) {return;}// 循環不斷的從channel里讀取數據,并寫入磁盤文件ByteBuffer fileBuffer = null;if(cachedFileBuffer != null) {fileBuffer = cachedFileBuffer;} else {fileBuffer = ByteBuffer.allocate(Integer.parseInt(String.valueOf(fileLength)));}channel.read(fileBuffer);if(!fileBuffer.hasRemaining()) {fileBuffer.rewind();cachedRequest.file = fileBuffer;cachedRequest.hasCompletedRead = true;System.out.println("本次文件上傳請求讀取完畢.......");  } else {cachedFileBuffer = fileBuffer;System.out.println("本次文件上傳出現拆包問題,緩存起來,下次繼續讀取.......");  }}/*** 讀取文件*/private void handleReadFileRequest(SocketChannel channel, SelectionKey key) throws Exception {// 從請求中解析文件名// 已經是:F:\\development\\tmp1\\image\\product\\iphone.jpgFilename filename = getFilename(channel); System.out.println("從網絡請求中解析出來文件名:" + filename); if(filename == null) {return;}cachedRequest.hasCompletedRead = true;}/*** 文件名*/class Filename {// 相對路徑名String relativeFilename;// 絕對路徑名String absoluteFilename;@Overridepublic String toString() {return "Filename [relativeFilename=" + relativeFilename + ", absoluteFilename=" + absoluteFilename + "]";}}/*** 緩存文件*/class CachedRequest {Integer requestType;Filename filename;Integer filenameLength;Long fileLength;ByteBuffer file;Boolean hasCompletedRead = false;}}

214_將讀取完畢的網絡請求分發到全局的請求隊列中

NetworkRequestQueue?

package com.zhss.dfs.datanode.server;import java.util.concurrent.ConcurrentLinkedQueue;/*** 公共網絡請求存放隊列*/
public class NetworkRequestQueue {private static volatile NetworkRequestQueue instance = null;public static NetworkRequestQueue get() {if(instance == null) {synchronized(NetworkRequestQueue.class) {if(instance == null) {instance = new NetworkRequestQueue();}}}return instance;}// 一個全局的請求隊列private final ConcurrentLinkedQueue<NetworkRequest> requestQueue = new ConcurrentLinkedQueue<NetworkRequest>();public void offer(NetworkRequest request) {requestQueue.offer(request);}public NetworkRequest poll() {return requestQueue.poll();}}

216_實現IO線程從請求隊列中爭搶請求以及執行磁盤IO操作

IOThread

/*** 負責執行磁盤IO的線程*/
public class IOThread extends Thread {public static final Integer REQUEST_SEND_FILE = 1;public static final Integer REQUEST_READ_FILE = 2;// 拿出單例的公共請求存放隊列private final NetworkRequestQueue requestQueue = NetworkRequestQueue.get();private final NameNodeRpcClient namenode;public IOThread(NameNodeRpcClient namenode) {this.namenode = namenode;}@Overridepublic void run() {while(true) {try {NetworkRequest request = requestQueue.poll();if(request == null) {Thread.sleep(100);continue;}Integer requestType = request.getRequestType();if(requestType.equals(REQUEST_SEND_FILE)) {// 對于上傳文件,將文件寫入本地磁盤即可writeFileToLocalDisk(request);} else if(requestType.equals(REQUEST_READ_FILE)) {// 對于下載文件,從本地磁盤讀取文件readFileFromLocalDisk(request);}} catch (Exception e) {e.printStackTrace();}}}private void readFileFromLocalDisk(NetworkRequest request) throws Exception {FileInputStream localFileIn = null;FileChannel localFileChannel = null;try {File file = new File(request.getAbsoluteFilename());Long fileLength = file.length();localFileIn = new FileInputStream(request.getAbsoluteFilename());    localFileChannel = localFileIn.getChannel();// 循環不斷的從channel里讀取數據,并寫入磁盤文件ByteBuffer buffer = ByteBuffer.allocate(8 + Integer.parseInt(String.valueOf(fileLength)));buffer.putLong(fileLength);int hasReadImageLength = localFileChannel.read(buffer);System.out.println("從本次磁盤文件中讀取了" + hasReadImageLength + " bytes的數據"); buffer.rewind();} finally {if(localFileChannel != null) {localFileChannel.close();}if(localFileIn != null) {localFileIn.close();}}}private void writeFileToLocalDisk(NetworkRequest request) throws Exception {// 構建針對本地文件的輸出流FileOutputStream localFileOut = null;FileChannel localFileChannel = null;try {localFileOut = new FileOutputStream(request.getAbsoluteFilename());    localFileChannel = localFileOut.getChannel();localFileChannel.position(localFileChannel.size());System.out.println("對本地磁盤文件定位到position=" + localFileChannel.size()); int written = localFileChannel.write(request.getFile());System.out.println("本次文件上傳完畢,將" + written + " bytes的數據寫入本地磁盤文件.......");  // 增量上報Master節點自己接收到了一個文件的副本// /image/product/iphone.jpgnamenode.informReplicaReceived(request.getRelativeFilename() + "_" + request.getFileLength());System.out.println("增量上報收到的文件副本給NameNode節點......"); } finally {localFileChannel.close();localFileOut.close();}}}

217_完成磁盤IO之后封裝響應并且放入對應的響應隊列中

220_整體走讀Reactor模式重構的網絡通信架構的代碼流程

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/92106.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/92106.shtml
英文地址,請注明出處:http://en.pswp.cn/web/92106.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

芯祥科技:工業/車規級BMS芯片廠商 規格選型對比

芯祥科技公司專注于工業和車規級BMS芯片&#xff0c;電源芯片及可編程模擬芯片的研發與銷售&#xff0c;客戶遍及新能源儲能&#xff0c;汽車&#xff0c;電腦&#xff0c;服務器及電動工具等領域。并具有創業公司成功經驗&#xff0c;平均具有逾17年以上的芯片研發和市場銷售經…

莫隊基礎(Mo‘s algorithm)

莫隊算法簡介 莫隊算法是一種用于高效處理離線區間查詢問題的算法&#xff0c;由莫濤&#xff08;Mo Tao&#xff09;在2009年提出。其核心思想是通過對查詢區間進行分塊和排序&#xff0c;利用前一次查詢的結果來減少計算量&#xff0c;從而將時間復雜度優化至接近線性。 莫…

板卡兩個ADC,一個JESD204b sync正常,另一個JESD204B同步不上的問題

目錄 1.問題來源: 2.問題分析 進一步測試表現: 抓取204B高速鏈路數據如上所示。 說明不是配置流程的問題 1.問題來源: 在工控機上和部分電腦上面出現時鐘鎖不住的現象,無法正常使用板卡。 經過分析,發現板卡上有兩片ADC,其中一片的ADC的sync信號經過測量,是正常的,…

Android10 系統休眠調試相關

Android10 系統休眠調試相關實時打印休眠日志(實測好像沒作用)&#xff1a;echo 1 > /sys/module/printk/parameters/console_suspend查看喚醒鎖&#xff1a;cat sys/power/wake_lock msm8953_64:/ # cat sys/power/wake_lock PowerManager.SuspendLockout PowerManagerServ…

一文掌握Bard機器翻譯,以及用python調用的4種方式(現已升級為 Gemini)

文章目錄一、Bard機器翻譯概述1.1. Bard機器翻譯介紹1.2 Bard機器翻譯的核心特點1.3 技術背景1.4 與同類模型對比二、Bard機器翻譯案例2.1 官方 REST API&#xff08;推薦生產&#xff09;2.2 通過Google Cloud API調用2.3 私有化部署方案2.4 開源鏡像 PyBard&#xff08;無需 …

Kafka-Eagle 安裝

Kafka-Eagle官網 1&#xff09;上傳壓縮包 kafka-eagle-bin-2.0.8.tar.gz 到集群第一臺的/opt/modules 目錄 2&#xff09;解壓到本地 tar -zxvf kafka-eagle-bin-2.0.8.tar.gz 3&#xff09;將 efak-web-2.0.8-bin.tar.gz 解壓至/opt/installs cd kafka-eagle-bin-2.0.8 …

接口請求的后臺發起確認

場景講解做業務開發時經常遇到這些場景&#xff0c;在后端代碼執行命中了些業務規則&#xff0c;需要前端用戶確認一下再往下執行。示例1&#xff1a;后端判斷申請1筆超過5萬的資金時會發起監管流程&#xff0c;告訴前端操作用戶風險并詢問是否確認執行。示例2&#xff1a;數據…

完整學習MySQL

DML 等術語概念 DML&#xff08;Data Manipulation Language&#xff0c;數據操縱語言&#xff09;&#xff1a; DML主要用于插入、更新、刪除和查詢數據庫中的數據。常見的DML語句包括&#xff1a; INSERT&#xff1a;用于向表中插入新的數據行。UPDATE&#xff1a;用于修改…

大模型筆記1——李宏毅《2025機器學習》第一講

本篇筆記內容1、學習本節課需要的前置知識了解大模型的訓練過程&#xff1a;預訓練、后訓練、強化學習&#xff08;2024年生成式AI導論前8講&#xff09;了解基礎機器學習、深度學習概念&#xff08;如transformer&#xff09;&#xff08;2021年機器學習課程&#xff09;2、本…

CSS scrollbar-width:輕松定制滾動條寬度的隱藏屬性

在前端設計中&#xff0c;滾動條往往是一個容易被忽略的細節。默認的滾動條樣式常常與頁面設計格格不入&#xff0c;尤其是寬度 —— 過寬的滾動條會擠占內容空間&#xff0c;過窄又可能影響用戶操作。而 CSS 的scrollbar-width屬性&#xff0c;就像一把 “精細的尺子”&#x…

小迪23年-28~31-js簡單回顧

前端-js開發 課堂完結后欲復習鞏固也方便后續-重游-故寫此篇 從實現功能過渡到涉及的相關知識點 知識點 1、 JS 是前端語言&#xff0c;是可以被瀏覽器“看到”的&#xff0c;當然也可以被修改啊&#xff0c;被瀏覽器禁用網頁的 JS 功能啊之類的。所以一般都是前后端分離開發&…

JavaScript 概述

JavaScript 是一種高級、解釋型編程語言&#xff0c;主要用于網頁開發&#xff0c;使其具備動態交互功能。它是網頁三大核心技術之一&#xff08;HTML、CSS、JavaScript&#xff09;&#xff0c;能夠直接嵌入 HTML 頁面并在瀏覽器中執行。核心特性動態弱類型語言 JavaScript 是…

Mermaid流程圖可視化系統:基于Spring Boot與Node.js的三層架構實現

什么是Mermaid?系統架構設計 三層架構 overview架構交互流程 核心組件詳解 1. Spring Boot后端2. Node.js中間層3. 前端界面 功能實現 1. 節點和關系管理2. 流程圖渲染3. 主題切換4. 導出功能 使用指南 啟動步驟頁面操作 總結與展望 什么是Mermaid? Mermaid流程圖可視化系統…

R 數據框:高效數據處理與分析的利器

R 數據框:高效數據處理與分析的利器 引言 在數據科學和統計分析領域,R語言因其強大的數據處理能力和豐富的統計模型而備受推崇。R數據框(data frame)是R語言中一種重要的數據結構,它以表格形式存儲數據,使得數據的組織、操作和分析變得簡單高效。本文將深入探討R數據框…

論文閱讀筆記:《Curriculum Coarse-to-Fine Selection for High-IPC Dataset Distillation》

論文閱讀筆記&#xff1a;《Curriculum Coarse-to-Fine Selection for High-IPC Dataset Distillation》1.背景與動機2.核心貢獻3.方法詳解4.實驗結果與貢獻主體代碼算法整體邏輯CVPR25 github 一句話總結&#xff1a; CCFS基于組合范式&#xff08;軌跡匹配選擇真實圖像&…

【Linux系統】詳解,進程控制

前言&#xff1a; 上文我們講到了Linux中的虛擬空間地址&#xff0c;知道了一個進程對應一個虛擬地址空間&#xff0c;虛擬空間地址與物理地址之間通過頁表映射....【Linux】虛擬地址空間-CSDN博客 本文我們來講一講Linux系統是如何控制進程的&#xff01; 如果喜歡本期文章&am…

Matplotlib(五)- 繪制子圖

文章目錄一、子圖概述1. 子圖介紹2. 子圖布局2.1 網格布局2.2 自由布局二、繪制等分區域子圖1. 使用 plt.subplot() 繪制子圖示例&#xff1a;繪制多個子圖示例&#xff1a;工業月度同比情況2. 使用 plt.subplots() 繪制子圖示例&#xff1a;繪制多個子圖示例&#xff1a;部分國…

C++中互斥鎖、共享鎖深度解析

一&#xff0c;互斥鎖互斥鎖&#xff08;Mutex&#xff0c;全稱 Mutual Exclusion&#xff09;是并發編程中用于保護共享資源的核心同步機制。它通過確保同一時間僅有一個線程訪問臨界區&#xff08;Critical Section&#xff09;&#xff0c;解決多線程環境下的數據競爭和不一…

Qt中的QWebSocket 和 QWebSocketServer詳解:從協議說明到實際應用解析

前言 本篇圍繞 QWebSocket 和 QWebSocketServer&#xff0c;從協議基礎、通信模式、數據傳輸特點等方面展開&#xff0c;結合具體接口應用與實戰案例進行說明。 在實時網絡通信領域&#xff0c;WebSocket 技術以其獨特的全雙工通信能力&#xff0c;成為連接客戶端與服務器的重要…

機器學習 —— 決策樹

機器學習 —— 決策樹&#xff08;Decision Tree&#xff09;詳細介紹決策樹是一種直觀且易于解釋的監督學習算法&#xff0c;廣泛應用于分類和回歸任務。它通過模擬人類決策過程&#xff0c;將復雜問題拆解為一系列簡單的判斷規則&#xff0c;最終形成類似 “樹” 狀的結構。以…