Hadoop源碼分析7: IPC流程(1) 主要類

1.服務器端主要類

public abstractclass?Server

{

??public static final ByteBuffer HEADER =ByteBuffer.wrap("hrpc".getBytes());

??public static final byte CURRENT_VERSION =4;

??private static finalThreadLocalServer?SERVER= new ThreadLocalServer();??

??private static finalThreadLocal<CallCurCall = newThreadLocal<Call>();

??private String bindAddress; ??

??private int port; ?? ? ?? ??

??private int handlerCount; ?? ?

??private int readThreads; ?? ?

??private Class<? extendsWritable> paramClass; ???

??private int maxIdleTime; ??

??private int thresholdIdleConnections;? ? ?

??private Configuration conf;

??private int maxQueueSize;

??private final int maxRespSize;

??private int socketSendBufferSize;

??volatile private boolean running =true;

??privateBlockingQueue<CallcallQueue;

??privateList<ConnectionconnectionList = Collections.synchronizedList(newLinkedList<Connection>());

??private?Listener?listener= null;

??private?Responder?responder= null;

??private int numConnections = 0;

??private?Handler[]handlers = null;

?

??//內部類Server.Call,包裝請求參數

??private static class?Call?{

?? ? private int id;? ? ?? ? ?? ? ?? ? ?? ? ? // theclient's call id ? ?

?? ? private Writable param;? ? ?? ? ?? ? ?? ? // the parameter passed? ?

?? ? private?Connection?connection;? ?

?? ? private ByteBuffer response;? ? ?

? }? ?

?

//內部類Server.Listener?,線程??

??private class?Listener?extendsThread {

?? ??privateServerSocketChannel acceptChannel =null; //the accept channel

?? ??privateSelector selector = null; //theselector that we use for the server

?? ? private?Reader[]readers = null;

?? ? private int currentReader =0;

?? ? private InetSocketAddressaddress; //the address we bind at

?? ? private Random rand = newRandom();

?? ? private longlastCleanupRunTime = 0;?

?? ? private ExecutorServicereadPool; ? ?

?

??//內部類Server.Listener.Reader?,?線程????

?? ?privateclass?Reader?implementsRunnable {

?? ? ? privatevolatile boolean adding = false;

?? ? ? privateSelector readSelector =null;

?? }??

?

???//內部類Server.Responder??,?線程??

?? ?privateclass?Responder?extendsThread {

?? ? ?privateSelector writeSelector;

?? ? ?private intpending; ? ??

?? }?

?

//內部類Server.Connection,而Client.Connection是線程?

?? ?publicclass?Connection?{

?? ? ?privateboolean rpcHeaderRead = false; // if initial rpc header isread

?? ? ?privateboolean headerRead = false;??

?? ? ?privateSocketChannel channel;

?? ? ?privateByteBuffer data;

?? ? ?privateByteBuffer dataLengthBuffer;

?? ? ?privateLinkedList<CallresponseQueue;

?? ? ?privatevolatile int rpcCount = 0; // number of outstanding rpcs

?? ? ?privatelong lastContact;

?? ? ?private intdataLength;

?? ? ?privateSocket socket;

?? ? ?privateString hostAddress;

?? ? ?private intremotePort;

?? ? ?privateInetAddress addr;

?? ??ConnectionHeader header = new ConnectionHeader();

?? ??Class<?> protocol;

?? ? ?privateAuthMethod authMethod;?

?? }??

?

??//內部類Server.Handler,線程?

??private class?Handler?extendsThread {

?}

?

}

?

2.客戶端主要類

public classClient {

  privateHashtable<ConnectionId, Connectionconnections = new Hashtable<ConnectionId,Connection>();?

?  privateClass? extendsWritable?valueClass;??

?  private intcounter; ? ? ?? ? ?? ? ?? ? ?? ?// counter for call ids

?  privateAtomicBoolean running = new AtomicBoolean(true); // if clientruns

?  finalprivate Configuration conf;

?  privateSocketFactory socketFactory; ? ?? ? ? // how tocreate sockets

?

?  private intrefCount = 1;


  //內部類Client.Call

?? private class Call {

?? ? int id; ?? ? ?? ? ?? ? ?? ? ?? ? ?? ? ? // callid

?? ? Writable param;? ? ?? ? ?? ? ?? ? ?? ? ? //parameter

?? ? Writable value;? ? ?? ? ?? ? ?? ? ?? ? ? // value,null if error

?? ? IOException error;? ? ?? ? ?? ? ?? ? ?? ?// exception, null ifvalue

?? ? boolean done;?

??}

???//內部類Client.Connection?,線程?,而Server.Connection不是線程?

?? private class Connection extendsThread {

????? ??privateInetSocketAddress server; ? ?? ? ?? // server ip:port

????? ??privateString serverPrincipal; ?// server's krb5principal name

????? ??privateConnectionHeader header; ? ?? ? ?? ?// connection header

????? ??privatefinal ConnectionId remoteId; ?? ? ?? ? ??// connection id

????? ??privateAuthMethod authMethod; // authentication method

?? ? ?? private Socket socket = null; ?? ? ?? ? ?? // connected socket

??? ????privateDataInputStream in;

????? ??privateDataOutputStream out;

????? ??private intrpcTimeout;

????? ??private intmaxIdleTime; ?

????? ??private intmaxRetries; //the max. no. of retries for socket connections

????? ??privateboolean tcpNoDelay; // if T then disable Nagle's Algorithm

????? ??private intpingInterval; /?

??? ????privateHashtable<Integer, Callcalls= new Hashtable<Integer, Call>();

????? ??privateAtomicLong lastActivity = new AtomicLong();?

????? ??privateAtomicBoolean shouldCloseConnection = new AtomicBoolean();??

?? ? ?? private IOException closeException; // closereason ?


   ?? ?//內部類Client.Connection.PingInputStream

?? ? ?? private class?PingInputStream?extendsFilterInputStream {

   ?? }


?? }

???//內部類Client.ParallelCall?

  ?privateclass ParallelCall extends Call {

?? ? ?privateParallelResults results;

?? ? ?private intindex;

?? } ??

???//內部類Client.ParallelResults?

?? ?private static classParallelResults {

?? ? ? privateWritable[] values;

?? ? ? privateint size;

?? ? ? privateint count;

?? ? //

?? ?}

???

??//內部類Client.ConnectionId?

?? static class ConnectionId {

?? ???InetSocketAddress address;

??????UserGroupInformationticket;

??????Class<?>protocol;

??????privatestatic final int PRIME = 16777619;

??????privateint rpcTimeout;

??????privateString serverPrincipal;

??????privateint maxIdleTime; ?

??????privateint maxRetries; //the max. no. of retries for socketconnections

??????privateboolean tcpNoDelay; // if T then disable Nagle's Algorithm

??????privateint pingInterval; // how often sends ping to the server inmsecs

?? ?}

?? ?

}

3.RPC主要類

public class RPC {

??private static ClientCache CLIENTS=newClientCache();


??//內部類RPC.ClientCache?

??static privateclass?ClientCache?{

?? ? private MapSocketFactory,?Client?clients= ?new HashMapSocketFactory, Client();

? }


?//內部類RPC.Invocation ,只是一個包裝請求參數的普通類,不執行動態代理方法

?? private static class Invocationimplements Writable, Configurable {

?? ? ?privateString methodName;

?? ? ?privateClass[] parameterClasses;

?? ? ?privateObject[] parameters;

?

?? ? ?privateConfiguration conf;

? }


??//內部類RPC.Invoker?,執行動態代理方法

?? private static class Invoker implementsInvocationHandler {

?? ? ? privateClient.ConnectionId remoteId;

?? ? ? privateClient client;

?? ? ? privateboolean isClosed = false;

?? }

????//內部類RPC.VersionMismatch?

?? ?public static classVersionMismatch extends IOException {

?? ? ? privateString interfaceName;

?? ? ? privatelong clientVersion;

?? ? ? privatelong serverVersion;

?? } ?


???//內部類RPC.Server?,添加了兩個成員??instance,verbose

??public static class Server extendsorg.apache.hadoop.ipc.Server {

?? ? ?? private Object instance;

?? ? ?? private boolean verbose;

?? }



}

4.其他類

?

//IPC所有類都要實現的接口

publicinterface?VersionedProtocol?{

? publiclong getProtocolVersion(String protocol, ?longclientVersion) throws IOException;

}

?

?

//連接頭信息,包括protocol,userGroupInformation?, ?authMethod三個成員變量

class?ConnectionHeader?implementsWritable ?{?

?? private String protocol;

?? private UserGroupInformation ugi =null;

?? private AuthMethod authMethod; ......

}?

?

//訪問狀況,包括SUCCESS、ERROR、FATAL

enum?Status{

? SUCCESS(0),

? ERROR(1),

? FATAL(-1);....... ?

}

?

//包裝IO異常

publicclass?RemoteException?extendsIOException {

}

?

?

轉載于:https://www.cnblogs.com/leeeee/p/7276533.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/375842.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/375842.shtml
英文地址,請注明出處:http://en.pswp.cn/news/375842.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

html5 服務器手機編程,html5實現服務器發送事件

頁面DocumentStatus:Server Datajs代碼創建一個新的 EventSource 對象,然后規定發送更新的頁面的 URL(本例中是 "demo_sse.php")每接收到一次更新,就會發生 onmessage 事件當 onmessage 事件發生時,把已接收的數據推入 id 為 "…

【動態規劃】【多重背包】[HDU 1291]悼念512汶川大地震遇難同胞――珍惜現在,感恩生活...

這道題目是一個多重背包的題目,多重背包實際上就是把整個物品的件數拆分成a0?20a1?21a2?22...an?2n且a0或1這樣每一次最優解實際上就是在之前的基礎上進行的最優解的累加,但是發現如果物品數量不是恰好是某幾個數之和,那么就會出現有幾個…

輸出字符串的比特串

48是0的ASCII碼&#xff0c;49是1的ASCII碼&#xff0c;char型占一個字節&#xff0c;四個比特。 #include<iostream> #include<string.h> using namespace std; string Str2Bin (char* str){int change,k0,mask8;char bit;char stack[100]{0};for (short i 0; i…

法律專業計算機基礎試卷答案,大學計算機基礎試題及答案

以下是小編整理的關于大學計算機基礎試題及答案&#xff0c;希望對你有幫助。一、單選題1、完整的計算機系統由(C)組成。A、運算器、控制器、存儲器、輸入設備和輸出設備B、主機和外部設備C、硬件系統和軟件系統D、主機箱、顯示器、鍵盤、鼠標、打印機2、以下軟件中&#xff0c…

憑據不工作

最悲催的事情是什么&#xff1f;那就是你可以遠程別人的電腦&#xff0c;但是別人不能遠程自己的電腦&#xff01; 背景&#xff1a; 換了個win8.1的系統&#xff0c;剛開始可以遠程上,過了幾天,電腦突然不能遠程了,讓我很是郁悶呀. 于是在網上查了好多資料,看看他到底是什么原…

【最后的沖刺】android中excel表的導入和數據處理

【最后的沖刺】android中excel表的導入和數據處理 ——學校課程的查詢和修改 1.編寫 The Class類把課程表courses.db當做一個實體類&#xff0c;hashcode和equals這兩個類是為了判斷輸入的查詢內容和Excel表中的內容是否一致。 并在java里面區別兩個對象是否一致 1 public clas…

詳解C++函數模板

函數模板屬于類屬&#xff0c;能夠處理不同的數據類型&#xff0c;當編譯器遇到函數調用是&#xff0c;將根據實際參數的類型產生特定的代碼&#xff0c;函數模板的定義形式是&#xff1a; template <類型參數表> 返回值類型 函數名&#xff08;形式參數表&#xff09;{…

計算機專業女兵,陳豪2010《點解阿Sir》劇照

0陳豪2010《點解阿Sir》劇照2012-07-21 08:24{"info": {"setname": "陳豪2010《點解阿Sir》劇照","imgsum_bk": 20,"imgsum": 20,"lmodify": "2012-07-21 08:24:00","prevue": " "…

ASP.NET MVC學習之Ajax(完結)

一.前言 通過上面的一番學習&#xff0c;大家一定收獲不少。但是總歸會有一個結束的時候&#xff0c;但是這個結束也意味著新的開始。 如果你是從事ASP.NET開發&#xff0c;并且也使用了第三方控件&#xff0c;那么一定會覺得ASP.NET開發ajax十分的簡單&#xff0c;而ASP.NET M…

認知計算機語言學,什么是認知語言學

文獻綜述&#xff1a;“語文素養”內涵研究綜述“語文素養”內涵研究綜述摘要&#xff1a;“語文素養”是新一輪語文課程改革所提出的一個重要概念&#xff0c;其作為語文課程改革的目標與核心理念&#xff0c;擠兌了“語文能力”的核心地位。目前&#xff0c;人們對“語文素養…

data URI scheme及其應用

data URI scheme通俗來講就是圖片直接塞到HTML而不是由HTTP。這樣從表面上看會降低一次HTTP的請求&#xff0c;實現了對于網頁的優化&#xff08;只是看了其它一些文章data URI由于將圖片採用了base 64的編碼方式進行表達&#xff0c;所以還是須要進行HTTP去下載內容&#xff0…

Linux 禁用觸摸板

1&#xff0c;首先需要查看觸摸板&#xff1a; 命令&#xff1a;xinput list 結果&#xff1a; ? Virtual core pointer         id2 [master pointer (3)]    ? ? Virtual core XTEST pointer      id4 […

大學新生學計算機推薦電腦,大學新生用什么電腦好呢?

科技的發展日新月異&#xff0c;數碼的yi巴為你資訊。今天是7月的開頭&#xff0c;我們正式邁入了2019下半年。7月開頭也正是許多大多數高考生快忙完志愿填報&#xff0c;開始考慮大學該選擇什么電腦的時候。今天yi巴就來跟大家聊聊該大學新生該怎么選擇電腦&#xff0c;并給予…

NewCode----句子反轉

題目描述 給定一個句子&#xff08;只包含字母和空格&#xff09;&#xff0c; 將句子中的單詞位置反轉&#xff0c;單詞用空格分割, 單詞之間只有一個空格&#xff0c;前后沒有空格。 比如&#xff1a; &#xff08;1&#xff09; “hello xiao mi”-> “mi xiao hello” …

mac boot2docker certs not valid with 1.7

摘自&#xff1a;https://github.com/boot2docker/boot2docker/issues/824 An error occurred trying to connect: Get https://192.168.59.103:2376/v1.19/containers/json: x509: certificate is valid for 127.0.0.1, 10.0.2.15, not 192.168.59.103 I come with the same p…

對象之間的交互

之前寫過一篇隨筆《剪刀剪紙》是給一些新同事講面向對象時用的&#xff0c;當時就感覺有些不順暢&#xff0c;不過用來給新同事入門足夠了就沒多想&#xff0c;最近看書時偶爾走神把這件事想起來了&#xff0c;順便群里討論時談到聚合之間的方法調用&#xff0c;于是決定寫一篇…

NewCode----數串

題目描述&#xff1a; 設有n個正整數&#xff0c;將他們連接成一排&#xff0c;組成一個最大的多位整數。 如:n3時&#xff0c;3個整數13,312,343,連成的最大整數為34331213。 如:n4時,4個整數7,13,4,246連接成的最大整數為7424613。 輸入描述: 有多組測試樣例&#xff0c…

計算機跨專業專插本學音樂,歡迎投稿丨專插本可以跨專業考,只要肯堅持!

點擊上方△藍字可關注我們昵稱E師姐性別女插本復習資料教材、小紅書、CB398、啟航等插本關注的公眾號、網站等介紹微信公眾號居多&#xff1a;專插本資料庫、專插本直通車、廣東省專插本、插本最前線等等……專科學校和專業廣州城市職業學院 會計插本學校和專業廣東財經大學華商…

Android,監控ContentProvider的數據改變

有時候應用中需要監聽ContentProvider的改變并提供響應&#xff0c;這時候就要利用ContentObserver類了 不管是ContentProvider中實現的,insert,delete,update方法中的任何一個&#xff0c;程序都會調用getContext().getContentResolver().notifyChange(uri,null); 這行代碼可用…

[leetcode]Sort List

題目要求&#xff1a;Sort a linked list in O(n log n) time using constant space complexity. 數據結構定義&#xff1a; 1 /** 2 * Definition for singly-linked list. 3 * struct ListNode { 4 * int val; 5 * ListNode *next; 6 * ListNode(int x) : v…