原文:http://blog.csdn.net/thomas0yang/article/details/41211259
----------------------------------------------------------------------------------------------
1、RPC框架概述
1.1 RPC(Remote Procedure Call Protocol)——遠程過程調用協議,它是一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,為通信程序之間攜帶信息數據。在OSI網絡通信模型中,RPC跨越了傳輸層和應用層。RPC使得開發包括網絡分布式多程序在內的應用程序更加容易。
1.2 RPC通常采用客戶端服務器模型,其框架主要有以下幾部分
- 通信模塊:實現請求應該協議。主要分為同步方式和異步方式。
- stub程序:客戶端和服務器均包含stub程序,可以看做代理程序。使得遠程函數表現的跟本地調用一樣,對用戶程序完全透明。
- 調度程序:接受來自通信模塊的請求消息,根據標識選擇stub程序處理。并發量大一般采用線程池處理。
- 客戶程序/服務過程:請求發出者和請求的處理者。
1.3 RPC流程圖
2、Hadoop RPC基本框架
2.1Hadoop RPC的使用方法見代碼
服務 public interface MyBiz extends VersionedProtocol { ??? long PROTOCOL_VERSION = 12321443L; ??? String hello(String name); } public class MyBizImpl implements MyBiz { ??? @Override ??? public long getProtocolVersion(String arg0, long arg1) throws IOException { ??????? return PROTOCOL_VERSION; ??? } ??? @Override ??? public String hello(String name) { ??????? System. out.println( "invoked"); ??????? return "hello " + name; ??? } } 服務器 public class MyServer { ??? public static final String SERVER_ADDRESS = "localhost"; ??? public static final int SERVER_PORT = 12345; ??? public static void main(String[] args) throws IOException { ??????? Server server = RPC. getServer(new MyBizImpl(), SERVER_ADDRESS, SERVER_PORT , new Configuration()); ??????? server.start(); ??? } } 客戶端 public class MyClient { ??? public static void main(String[] args) throws IOException { ??????? MyBiz proxy = (MyBiz) RPC. getProxy(MyBiz.class, MyBiz.PROTOCOL_VERSION, ??????????????? new InetSocketAddress(MyServer. SERVER_ADDRESS,MyServer.SERVER_PORT), ??????????????? new Configuration()); ??????? String result = proxy.hello( "5"); ??????? System. out.println(result); ??????? RPC.stopProxy(proxy); ??? } } |
2.2 org.apache.hadoop.ipc.RPC類解析
RPC類主要包含三部分:
- ClientCache(成員變量):根據用戶提供的SocketFactory來緩存Client對象,以便重用Client對象。
- Server(內部類):繼承Server抽象類,利用反射實現了call方法,即客戶端請求的方法和對應參數完成方法調用。
- Invocation(內部類):將要調用的方法名和參數打包成可序列化的對象,方便客戶端和服務器之間傳遞。
2.3 客戶端和服務器端的關系
- Client-NameNode之間,其中NameNode是服務器
- Client-DataNode之間,其中DataNode是服務器
- DataNode-NameNode之間,其中NameNode是服務器
- DataNode-DateNode之間,其中某一個DateNode是服務器,另一個是客戶端
2.4?org.apache.hadoop.ipc.Client類解析
2.4.1 Client類中主要包含:
- Call(內部類):封裝了一個RPC請求,包含5個成員變量,唯一表示id、函數調用信息param、函數返回值value、函數異常信息error、函數完成標識done。Hadoop rpc?server采用異步方式處理客戶端請求,使得遠程過程調用的發生順序和返回順序無直接關系,而客戶端正是通過id識別不同的函數調用。當客戶端向服務器發送請求,只需填充id和param兩個變量,其余3個變量由服務器端根據函數執行情況填充。
- Connection(內部類,一個線程):是client和server之間的一個通信連接,封裝了連接先關的基本信息和操作。基本信息包括:通信連接唯一標識remoteId(ConnectionId)、與Server端通信的scoket、網絡輸入輸出流in/out、保存RPC請求的哈希表calls(Hashtable<Integer, Call>)。操作包括:addCall將一個Call對象添加到哈希表中;sendParam想服務器端發送RPC請求;receiveResponse從服務器端接收已經處理完成的RPC請求;run調用receiveResponse方法,等待返回結果。
- ConnectionId(內部類):連接的標記(包括server地址,協議,其他一些連接的配置項信息)
- ParallelCall(內部類):實現并行調用的請求
- ParallelResults(內部類):并行調用的執行結果
2.4.2 Client類中主要對外通過兩個接口,分別用于單個遠程調用和批量遠程調用。
public Writable call(Writable param, ConnectionId remoteId)? throws InterruptedException, IOExceptionpublic Writable call(Writable param, InetSocketAddress addr,? Class<?> protocol, UserGroupInformation ticket,int rpcTimeout, Configuration conf)? throws InterruptedException, IOException
2.4.3 調用流程分析,當調用call函數執行某個遠程方法時,有以下幾個步驟:
1)創建一個Connection對象,并將遠程方法調用信息封裝成Call對象,放到Connection對象中的哈希表中;
2)調用Connection類中的sendRpcRequest()方法將當前Call對象發送給Server端;
3)Server端處理完RPC請求后,將結果通過網絡返回給Client端,Client端通過receiveRpcResponse()函數獲取結果;
4)Client檢查結果處理狀態(成功還是失敗),并將對應Call對象從哈希表中刪除。
2)調用Connection類中的sendRpcRequest()方法將當前Call對象發送給Server端;
3)Server端處理完RPC請求后,將結果通過網絡返回給Client端,Client端通過receiveRpcResponse()函數獲取結果;
4)Client檢查結果處理狀態(成功還是失敗),并將對應Call對象從哈希表中刪除。
2.4.4 一個Client包含多個連接,private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>();
2.5?org.apache.hadoop.ipc.Server類解析
2.5.1 背景
Hadoop采用了Master/Slave結構,其中Master是整個系統的單點,如NameNode或JobTracker,這是制約系統性能和可擴展性的最關鍵因素之一;而Master通過ipc.Server接收并處理所有Slave發送的請求,這就要求ipc.Server 將高并發和可擴展性作為設計目標。為此,ipc.Server采用了很多提高并發處理能力的技術,主要包括線程池、事件驅動和Reactor設計模式等,這些技術均采用了JDK自帶的庫實現,這里重點分析它是如何利用Reactor設計模式提高整體性能的。
2.5.2 reactor設計模式
Reactor是并發編程中的一種基于事件驅動的設計模式,它具有以下兩個特點:通過派發/分離I/O操作事件提高系統的并發性能;提供了粗粒度的并發控制,使用單線程實現,避免了復雜的同步處理。典型的Reactor實現原理如圖所示。
典型的Reactor模式中主要包括以下幾個角色。
- Reactor:I/O事件的派發者。
- Acceptor:接受來自Client的連接,建立與Client對應的Handler,并向Reactor注冊此Handler。
- Handler:與一個Client通信的實體,并按一定的過程實現業務的處理。Handler內部往往會有更進一步的層次劃分,用來抽象諸如read、decode、compute、encode和send等過程。在Reactor模式中,業務邏輯被分散的I/O事件所打破,所以Handler需要有適當的機制在所需的信息還不全(讀到一半)的時候保存上下文,并在下一次I/O事件到來的時候(另一半可讀)能繼續上次中斷的處理。
- Reader/Sender:為了加速處理速度,Reactor模式往往構建一個存放數據處理線程的線程池,這樣數據讀出后,立即扔到線程池中等待后續處理即可。為此,Reactor模式一般分離Handler中的讀和寫兩個過程,分別注冊成單獨的讀事件和寫事件,并由對應的Reader和Sender線程處理。
2.5.3 java nio代碼實例
package com.sohu.tv.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; /** * NIO服務端 * @author 小路 */ public class NIOServer { ??? //通道管理器 ??? private Selector selector; ??? /** ???? * 獲得一個ServerSocket通道,并對該通道做一些初始化的工作 ???? * @param port? 綁定的端口號 ???? * @throws IOException ???? */ ??? public void initServer(int port) throws IOException { ??????? // 獲得一個ServerSocket通道 ??????? ServerSocketChannel serverChannel = ServerSocketChannel.open(); ??????? // 設置通道為非阻塞 ??????? serverChannel.configureBlocking(false); ??????? // 將該通道對應的ServerSocket綁定到port端口 ??????? serverChannel.socket().bind(new InetSocketAddress(port)); ??????? // 獲得一個通道管理器 ??????? this.selector = Selector.open(); ??????? //將通道管理器和該通道綁定,并為該通道注冊SelectionKey.OP_ACCEPT事件,注冊該事件后, ??????? //當該事件到達時,selector.select()會返回,如果該事件沒到達selector.select()會一直阻塞。 ??????? serverChannel.register(selector, SelectionKey.OP_ACCEPT); ??? } ??? /** ???? * 采用輪詢的方式監聽selector上是否有需要處理的事件,如果有,則進行處理 ???? * @throws IOException ???? */ ??? @SuppressWarnings("unchecked") ??? public void listen() throws IOException { ??????? System.out.println("服務端啟動成功!"); ??????? // 輪詢訪問selector ??????? while (true) { ??????????? //當注冊的事件到達時,方法返回;否則,該方法會一直阻塞 ????????????selector.select(); ??????????? // 獲得selector中選中的項的迭代器,選中的項為注冊的事件 ??????????? Iterator ite = this.selector.selectedKeys().iterator(); ??????????? while (ite.hasNext()) { ??????????????? SelectionKey key = (SelectionKey) ite.next(); ??????????????? // 刪除已選的key,以防重復處理 ??????????????? ite.remove(); ??????????????? // 客戶端請求連接事件 ??????????????? if (key.isAcceptable()) { ??????????????????? ServerSocketChannel server = (ServerSocketChannel) key ??????????????????????????? .channel(); ??????????????????? // 獲得和客戶端連接的通道 ??????????????????? SocketChannel channel = server.accept(); ??????????????????? // 設置成非阻塞 ??????????????????? channel.configureBlocking(false); ??????????????????? //在這里可以給客戶端發送信息哦 ??????????????????? channel.write(ByteBuffer.wrap(new String("向客戶端發送了一條信息").getBytes())); ??????????????????? //在和客戶端連接成功之后,為了可以接收到客戶端的信息,需要給通道設置讀的權限。 ????????????????????channel.register(this.selector, SelectionKey.OP_READ); ??????????????????? // 獲得了可讀的事件 ??????????????? } else if (key.isReadable()) { ??????????????????? read(key); ??????????????? } ??????????? } ??????? } ??? } ??? /** ???? * 處理讀取客戶端發來的信息 的事件 ???? * @param key ???? * @throws IOException ???? */ ??? public void read(SelectionKey key) throws IOException{ ??????? // 服務器可讀取消息:得到事件發生的Socket通道 ??????? SocketChannel channel = (SocketChannel) key.channel(); ??????? // 創建讀取的緩沖區 ??????? ByteBuffer buffer = ByteBuffer.allocate(10); ??????? channel.read(buffer); ??????? byte[] data = buffer.array(); ??????? String msg = new String(data).trim(); ??????? System.out.println("服務端收到信息:"+msg); ??????? ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes()); ??????? channel.write(outBuffer);// 將消息回送給客戶端 ??? } ??? /** ???? * 啟動服務端測試 ???? * @throws IOException ???? */ ??? public static void main(String[] args) throws IOException { ??????? NIOServer server = new NIOServer(); ??????? server.initServer(8000); ??????? server.listen(); ??? } } package com.sohu.tv.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; /** * NIO客戶端 * @author 小路 */ public class NIOClient { ??? //通道管理器 ??? private Selector selector; ??? /** ???? * 獲得一個Socket通道,并對該通道做一些初始化的工作 ???? * @param ip 連接的服務器的ip ???? * @param port? 連接的服務器的端口號 ???? * @throws IOException ???? */ ??? public void initClient(String ip,int port) throws IOException { ??????? // 獲得一個Socket通道 ??????? SocketChannel channel = SocketChannel.open(); ??????? // 設置通道為非阻塞 ??????? channel.configureBlocking(false); ??????? // 獲得一個通道管理器 ??????? this.selector = Selector.open(); ??????? // 客戶端連接服務器,其實方法執行并沒有實現連接,需要在listen()方法中調 ??????? //用channel.finishConnect();才能完成連接 ??????? channel.connect(new InetSocketAddress(ip,port)); ??????? //將通道管理器和該通道綁定,并為該通道注冊SelectionKey.OP_CONNECT事件。 ??????? channel.register(selector, SelectionKey.OP_CONNECT); ??? } ??? /** ???? * 采用輪詢的方式監聽selector上是否有需要處理的事件,如果有,則進行處理 ???? * @throws IOException ???? */ ??? @SuppressWarnings("unchecked") ??? public void listen() throws IOException { ??????? // 輪詢訪問selector ??????? while (true) { ??????????? selector.select(); ??????????? // 獲得selector中選中的項的迭代器 ??????????? Iterator ite = this.selector.selectedKeys().iterator(); ??????????? while (ite.hasNext()) { ??????????????? SelectionKey key = (SelectionKey) ite.next(); ??????????????? // 刪除已選的key,以防重復處理 ??????????????? ite.remove(); ??????????????? // 連接事件發生 ??????????????? if (key.isConnectable()) { ??????????????????? SocketChannel channel = (SocketChannel) key ??????????????????????????? .channel(); ??????????????????? // 如果正在連接,則完成連接 ??????????????????? if(channel.isConnectionPending()){ ??????????????????????? channel.finishConnect(); ??????????????????? } ??????????????????? // 設置成非阻塞 ??????????????????? channel.configureBlocking(false); ??????????????????? //在這里可以給服務端發送信息哦 ??????????????????? channel.write(ByteBuffer.wrap(new String("向服務端發送了一條信息").getBytes())); ??????????????????? //在和服務端連接成功之后,為了可以接收到服務端的信息,需要給通道設置讀的權限。 ??????????????????? channel.register(this.selector, SelectionKey.OP_READ); ??????????????????? // 獲得了可讀的事件 ??????????????? } else if (key.isReadable()) { ??????????????????? read(key); ??????????????? } ??????????? } ??????? } ??? } ??? /** ???? * 處理讀取服務端發來的信息 的事件 ???? * @param key ???? * @throws IOException ???? */ ??? public void read(SelectionKey key) throws IOException{ ??????? //和服務端的read方法一樣 ??? } ??? /** ???? * 啟動客戶端測試 ???? * @throws IOException ???? */ ??? public static void main(String[] args) throws IOException { ??????? NIOClient client = new NIOClient(); ??????? client.initClient("localhost",8000); ??????? client.listen(); ??? } } |
2.5.4 server處理流程
ipc.Server的主要功能是接收來自客戶端的RPC請求,經過調用相應的函數獲取結果后,返回給對應的客戶端。為此,ipc.Server被劃分成3個階段:接收請求、處理請求和返回結果。
(1)接收請求
? ? ?該階段主要任務是接收來自各個客戶端的RPC請求,并將它們封裝成固定的格式(Call類)放到一個共享隊列(callQueue)中,以便進行后續處理。該階段內部又分為建立連接和接收請求兩個子階段,分別由Listener和Reader兩種線程完成。
? ? ?整個Server只有一個Listener線程,統一負責監聽來自客戶端的連接請求,一旦有新的請求到達,它會采用輪詢的方式從線程池中選擇一個Reader線程進行處理,而Reader線程可同時存在多個,它們分別負責接收一部分客戶端連接的RPC請求,至于每個Reader線程負責哪些客戶端連接,完全由Listener決定,當前Listener只是采用了簡單的輪詢分配機制。
? ? ?Listener和Reader線程內部各自包含一個Selector對象,分別用于監聽SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。對于Listener線程,主循環的實現體是監聽是否有新的連接請求到達,并采用輪詢策略選擇一個Reader線程處理新連接;對于Reader線程,主循環的實現體是監聽(它負責的那部分)客戶端連接中是否有新的RPC請求到達,并將新的RPC請求封裝成Call對象,放到共享隊列callQueue中。
(2)處理請求
? ? ?該階段主要任務是從共享隊列callQueue中獲取Call對象,執行對應的函數調用,并將結果返回給客戶端,這全部由Handler線程完成。
? ? ?Server端可同時存在多個Handler線程,它們并行從共享隊列中讀取Call對象,經執行對應的函數調用后,將嘗試著直接將結果返回給對應的客戶端。但考慮到某些函數調用返回結果很大或者網絡速度過慢,可能難以將結果一次性發送到客戶端,此時Handler將嘗試著將后續發送任務交給Responder線程。
(3)返回結果
? ? ?前面提到,每個Handler線程執行完函數調用后,會嘗試著將執行結果返回給客戶端,但對于特殊情況,比如函數調用返回結果過大或者網絡異常情況(網速過慢),會將發送任務交給Responder線程。
? ? ?Server端僅存在一個Responder線程,它的內部包含一個Selector對象,用于監聽SelectionKey.OP_WRITE事件。當Handler沒能將結果一次性發送到客戶端時,會向該Selector對象注冊SelectionKey.OP_WRITE事件,進而由Responder線程采用異步方式繼續發送未發送完成的結果。
? ? ?該階段主要任務是接收來自各個客戶端的RPC請求,并將它們封裝成固定的格式(Call類)放到一個共享隊列(callQueue)中,以便進行后續處理。該階段內部又分為建立連接和接收請求兩個子階段,分別由Listener和Reader兩種線程完成。
? ? ?整個Server只有一個Listener線程,統一負責監聽來自客戶端的連接請求,一旦有新的請求到達,它會采用輪詢的方式從線程池中選擇一個Reader線程進行處理,而Reader線程可同時存在多個,它們分別負責接收一部分客戶端連接的RPC請求,至于每個Reader線程負責哪些客戶端連接,完全由Listener決定,當前Listener只是采用了簡單的輪詢分配機制。
? ? ?Listener和Reader線程內部各自包含一個Selector對象,分別用于監聽SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。對于Listener線程,主循環的實現體是監聽是否有新的連接請求到達,并采用輪詢策略選擇一個Reader線程處理新連接;對于Reader線程,主循環的實現體是監聽(它負責的那部分)客戶端連接中是否有新的RPC請求到達,并將新的RPC請求封裝成Call對象,放到共享隊列callQueue中。
(2)處理請求
? ? ?該階段主要任務是從共享隊列callQueue中獲取Call對象,執行對應的函數調用,并將結果返回給客戶端,這全部由Handler線程完成。
? ? ?Server端可同時存在多個Handler線程,它們并行從共享隊列中讀取Call對象,經執行對應的函數調用后,將嘗試著直接將結果返回給對應的客戶端。但考慮到某些函數調用返回結果很大或者網絡速度過慢,可能難以將結果一次性發送到客戶端,此時Handler將嘗試著將后續發送任務交給Responder線程。
(3)返回結果
? ? ?前面提到,每個Handler線程執行完函數調用后,會嘗試著將執行結果返回給客戶端,但對于特殊情況,比如函數調用返回結果過大或者網絡異常情況(網速過慢),會將發送任務交給Responder線程。
? ? ?Server端僅存在一個Responder線程,它的內部包含一個Selector對象,用于監聽SelectionKey.OP_WRITE事件。當Handler沒能將結果一次性發送到客戶端時,會向該Selector對象注冊SelectionKey.OP_WRITE事件,進而由Responder線程采用異步方式繼續發送未發送完成的結果。