Thrift源碼學習二——Server層

Thrift 提供了如圖五種模式:TSimpleServer、TNonblockingServer、THsHaServer、TThreadPoolServer、TThreadSelectorServer

image.png??

TSimpleServer、TThreadPoolServer 屬于阻塞模型

TNonblockingServer、THsHaServer、TThreadedSelectorServer 屬于非阻塞模型

TServer

TServer 為抽象類

public static class Args extends AbstractServerArgs<Args> {public Args(TServerTransport transport) {super(transport);}
}public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {final TServerTransport serverTransport;// 處理層工廠
  TProcessorFactory processorFactory;// 傳輸層工廠TTransportFactory inputTransportFactory = new TTransportFactory();TTransportFactory outputTransportFactory = new TTransportFactory();// 協議層工廠TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
}

TServer 定義的對外方法

/*** The run method fires up the server and gets things going.*/public abstract void serve();
/*** Stop the server. This is optional on a per-implementation basis. Not* all servers are required to be cleanly stoppable.*/public void stop() {}

stop 并不是每個服務都需要優雅的退出,所以沒有定義為抽象方法

抽象方法 serve() 由具體的 TServer 實例實現

image.png

TSimpleServer

TSimpleServer 實現比較簡單,是單線程阻塞模型,只適合測試開發使用

serve 方法源碼分析

public void serve() {// 啟動監聽 socket
  serverTransport.listen();// 設置服務狀態setServing(true);// 不斷的等待與處理 socket 請求while(!stopped) {// accept 一個業務 socket 請求client = serverTransport_.accept();if (client != null) {// 通過工廠獲取 server 定義的處理層、傳輸層和協議層processor = processorFactory_.getProcessor(client);inputTransport = inputTransportFactory_.getTransport(client);outputTransport = outputTransportFactory_.getTransport(client);inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);if (eventHandler_ != null) {connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);}// 阻塞式處理while (true) {// 處理請求事件if (eventHandler_ != null) {eventHandler_.processContext(connectionContext, inputTransport, outputTransport);}// 如果處理層為異步,則退出if(!processor.process(inputProtocol, outputProtocol)) {break;}}}// 關閉
    eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);inputTransport.close();outputTransport.close();setServing(false);}
}

TSimpleServer 工作圖

draw.io

TThreadPoolServer

ThreadPoolServer 解決了 TSimple 不支持并發和多連接的問題,引入了線程池

與 TSimple 相同,主線程負責阻塞式監聽 socket,而剩下的業務處理則全部交由線程池去處理

public void serve() {// 主線程啟動監聽 socket
  serverTransport_.listen();// 設置服務狀態stopped_ = false;setServing(true);// 等待并處理 socket 請求while (!stopped_) {TTransport client = serverTransport_.accept();// Runnable run 邏輯與 TSimpleServer 類似WorkerProcess wp = new WorkerProcess(client);int retryCount = 0;long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);while(true) {// 交由線程池來處理
        executorService_.execute(wp);break;} }executorService_.shutdown();setServing(false);
}

TThreadPoolServer 的缺點:

處理能力的好壞受限于線程池的設置

TNoblockingServer

TNoblockingServer 是單線程工作,但該模式采用了 NIO,所有的 socket 被注冊到 selector 中,通過一個線程循環 selector 來監控所有 socket,當有就緒的 socket 時,根據不同的請求做不同的動作(讀取、寫入數據或 accept 新連接)

TNoblockingServer 的 serve 方法在其父類 AbstractNonblockingServer 中定義

/*** Begin accepting connections and processing invocations.* 開始接受并處理調用*/
public void serve() {// start any IO threads// 注冊一些監聽 socket 的線程到 selector 中if (!startThreads()) {return;}// start listening, or exit// 開始監聽if (!startListening()) {return;}// 設置服務狀態setServing(true);// this will block while we serve// TNonblocking 中實現為 selectAcceptThread_.join(); // 主線程等待 selectAcceptThread 執行完畢// SelectAcceptThread 的 run 方法為 select();// 取出一個就緒的 socket
  waitForShutdown();setServing(false);// do a little cleanup
  stopListening();
}// SelectAcceptThread run 方法
public void run() {while (!stopped_) {select();processInterestChanges();}for (SelectionKey selectionKey : selector.keys()) {cleanupSelectionKey(selectionKey);}
}// SelectAcceptThread Select 過程
private void select() {try {// wait for io events.// NIO 取出一個
    selector.select();// process the io events we receivedIterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();// 遍歷就緒的 socketwhile (!stopped_ && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();// if the key is marked Accept, then it has to be the server// transport.// accept 新 socket 并將其注冊到 selector 中if (key.isAcceptable()) {handleAccept();} else if (key.isReadable()) {// deal with reads// 處理讀數據的 socket 請求
        handleRead(key);} else if (key.isWritable()) {// deal with writes// 處理寫數據的 socket 請求
        handleWrite(key);} else {LOGGER.warn("Unexpected state in select! " + key.interestOps());}}} catch (IOException e) {LOGGER.warn("Got an IOException while selecting!", e);}
}// 接收新的連接
private void handleAccept() throws IOException {SelectionKey clientKey = null;TNonblockingTransport client = null;// accept the connectionclient = (TNonblockingTransport)serverTransport.accept();// 注冊到 selector 中clientKey = client.registerSelector(selector, SelectionKey.OP_READ);// add this key to the mapFrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);clientKey.attach(frameBuffer);
}

TNonblockingServer 模式的缺點:

其還是采用單線程順序來完成,當業務處理比較復雜耗時,該模式的效率將會下降

TNonblockingServer 工作圖:

draw.io

THsHaServer

THsHaServer 是 TNoblockingServer 的子類,處理邏輯基本相同,不同的是,在處理讀取請求時,THsHaServer 將處理過程交由線程池來完成,主線程直接返回進行下一次循環,提高了效率

THsHaServer 模式的缺點:

其主線程需要完成對所有 socket 的監聽一級數據的寫操作,當大請求量時,效率較低

TThreadedSelectorServer

TThreadedSelectorServer 是 Thrift 目前提供的最高級模式,生產環境的首選,其對 TNonblockingServer 進行了擴展

TThreadedSelectorServer 源碼中一些關鍵的屬性

public static class Args extends AbstractNonblockingServerArgs<Args> {// 在已接收的連接中選擇線程的個數public int selectorThreads = 2;// 執行線程池 ExecutorService 的線程個數private int workerThreads = 5;// 執行請求具體任務的線程池private ExecutorService executorService = null;
}
// The thread handling all accepts
private AcceptThread acceptThread;
// Threads handling events on client transports
private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();
// This wraps all the functionality of queueing and thread pool management
// for the passing of Invocations from the selector thread(s) to the workers
// (if any).
private final ExecutorService invoker;
/*** 循環模式的負載均衡器,用于為新的連接選擇 SelectorThread*/
protected static class SelectorThreadLoadBalancer {}
  1. AcceptThread 線程對象,用于監聽 socket 的新連接

  2. 多個 SelectorThread 線程對象,用于處理 socket 的讀寫操作

  3. 一個負載均衡對象 SelectorThreadLoadBalancer,用于決定將 AcceptThread 接收到的 socket 請求分配給哪個 SelectorThread 線程

  4. SelectorThread 線程執行過讀寫操作后,通過 ExecutorService 線程池來完成此次調用的具體執行

SelectorThread 對象源碼解析

/*** 多個 SelectorThread 負責處理 socket 的 I/O 操作*/
protected class SelectorThread extends AbstractSelectThread {/*** The work loop. Handles selecting (read/write IO), dispatching, and* managing the selection preferences of all existing connections.* 選擇(處理 socket 的網絡讀寫 IO),分配和管理現有連接*/public void run() {while (!stopped_) {select();}}private void select() {// process the io events we receivedIterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();while (!stopped_ && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();// skip if not validif (!key.isValid()) {cleanupSelectionKey(key);continue;}if (key.isReadable()) {// deal with reads
        handleRead(key);} else if (key.isWritable()) {// deal with writes
        handleWrite(key);} else {LOGGER.warn("Unexpected state in select! " + key.interestOps());}}}
}

AcceptThread 對象源碼解析

/*** 在服務器傳輸中選擇線程(監聽 socket 請求)并向 IO 選擇器(SelectorThread)提供新連接*/
protected class AcceptThread extends Thread {// The listen socket to accept onprivate final TNonblockingServerTransport serverTransport;private final Selector acceptSelector;// 負載均衡器,決定將連接分配給哪個 SelectorThreadprivate final SelectorThreadLoadBalancer threadChooser;public void run() {while (!stopped_) {select();}}private void select() {// process the io events we receivedIterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();while (!stopped_ && selectedKeys.hasNext()) {SelectionKey key = selectedKeys.next();selectedKeys.remove();// 處理接收的新情求if (key.isAcceptable()) {handleAccept();} else {LOGGER.warn("Unexpected state in select! " + key.interestOps());}}}/*** Accept a new connection.*/private void handleAccept() {final TNonblockingTransport client = doAccept();if (client != null) {// 從負載均衡器中,獲取 SelectorThread 線程final SelectorThread targetThread = threadChooser.nextThread();if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {doAddAccept(targetThread, client);} else {// FAIR_ACCEPTinvoker.submit(new Runnable() {public void run() {// 將選擇到的線程和連接放入 線程池 處理// 用 targetThread 線程取處理一個給接受的鏈接 client,如果新連接的隊列處于滿的狀態,則將處于阻塞狀態
            doAddAccept(targetThread, client);}});}}}private TNonblockingTransport doAccept() {return (TNonblockingTransport) serverTransport.accept();}// 用 targetThread 線程取處理一個給接受的鏈接 client,如果新連接的隊列處于滿的狀態,則將處于阻塞狀態private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {if (!thread.addAcceptedConnection(client)) {client.close();}}
}

TThreadedSelectorServer 工作圖

draw.io

參考資料

  • Thrift server端的幾種工作模式分析:http://blog.csdn.net/houjixin/article/details/42779915
  • Thrift 網絡服務模型:http://www.cnblogs.com/mumuxinfei/p/3875165.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/539229.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/539229.shtml
英文地址,請注明出處:http://en.pswp.cn/news/539229.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

linux top 命令可視化_25個Linux性能監控工具

一段時間以來&#xff0c;我們在網上向讀者介紹了如何為Linux以及類Linux操作系統配置多種不同的性能監控工具。在這篇文章中我們將羅列一系列使用最頻繁的性能監控工具&#xff0c;并對介紹到的每一個工具提供了相應的簡介鏈接&#xff0c;大致將其劃分為兩類&#xff0c;基于…

base64是哪個jar包的_漲知識 | 用maven輕松管理jar包

前言相信只要做過 Java 開發的童鞋們&#xff0c;對 Ant 想必都不陌生&#xff0c;我們往往使用 Ant 來構建項目&#xff0c;尤其是涉及到特別繁雜的工作量&#xff0c;一個 build.xml 能夠完成編譯、測試、打包、部署等很多任務&#xff0c;這在很大的程度上解放了程序員們的雙…

Hive數據類型

概述 Hive的內置數據類型可以分為兩大類&#xff1a;(1)、基礎數據類型&#xff1b;(2)、復雜數據類型。 基礎數據類型 數據類型 所占字節 開始支持版本 TINYINT 1byte&#xff0c;-128 ~ 127 SMALLINT 2byte&#xff0c;-32,768 ~ 32,767 INT 4byte,-2,147,483,648 ~ 2,14…

JMS(Java消息服務)與消息隊列ActiveMQ基本使用(一)

最近的項目中用到了mq&#xff0c;之前自己一直在碼農一樣的照葫蘆畫瓢。最近幾天研究了下&#xff0c;把自己所有看下來的文檔和了解總結一下。 一. 認識JMS 1.概述 對于JMS,百度百科&#xff0c;是這樣介紹的&#xff1a;JMS即Java消息服務&#xff08;Java Message Service&…

python單詞反轉_python文本 字符串逐字符反轉以及逐單詞反轉

python文本 字符串逐字符反轉以及逐單詞反轉 場景&#xff1a; 字符串逐字符反轉以及逐單詞反轉 首先來看字符串逐字符反轉&#xff0c;由于python提供了非常有用的切片&#xff0c;所以只需要一句就可以搞定了 >>> aabc edf degd >>> a[::-1] dged fde cba …

hive復合數據類型之struct

概述 STRUCT&#xff1a;STRUCT可以包含不同數據類型的元素。這些元素可以通過”點語法”的方式來得到所需要的元素&#xff0c;比如user是一個STRUCT類型&#xff0c;那么可以通過user.address得到這個用戶的地址。 操作實例 1、創建表 create table student_test(id int,in…

pycharm 運行celery_Celery全面學習筆記

來源介紹Celery 是 Distributed Task Queue&#xff0c;分布式任務隊列。分布式決定了可以有多個 worker 的存在&#xff0c;隊列表示其是異步操作。Celery 核心模塊Celery有一下5個核心角色Task就是任務&#xff0c;有異步任務和定時任務Broker中間人&#xff0c;接收生產者發…

hive復合數據類型之array

概述 ARRAY&#xff1a;ARRAY類型是由一系列相同數據類型的元素組成&#xff0c;這些元素可以通過下標來訪問。比如有一個ARRAY類型的變量fruits&#xff0c;它是由[apple,orange,mango]組成&#xff0c;那么我們可以通過fruits[1]來訪問元素orange&#xff0c;因為ARRAY類型的…

Exploit開發系列教程-Mona 2 SEH

P3nro5e 2015/07/10 10:580x00 Mona 2 前言 & 準備Mona 2是一種非常有用的插件&#xff0c;它由Corelan Team開發。起初是為Immunity Debugger寫的&#xff0c;現在它適用于WinDbg調試器。你將需要為WinDbg x86 和 WinDbg x64安裝一些工具&#xff1a;安裝Python 2.7 (從這…

python集合的元素可以是_Python集合的元素中,為什么不可以是包含嵌套列表的元組?...

你有一個誤解&#xff0c;hash算法針對的是元素的內容&#xff0c;并不是針對指針&#xff0c;所以指針不變不等于可hash。 如果你想深究細節的話&#xff0c;可以看tuple的源碼&#xff1a; static Py_hash_t tuplehash(PyTupleObject *v) { Py_uhash_t x; /* Unsigned for de…

python lib庫_python_lib基礎庫

1&#xff1a;argv傳遞給python腳本的命令行參數列表&#xff0c;argv[0]是腳本的名字(他是平臺獨立的&#xff0c;不管他是一個路徑全名或不是)&#xff0c;如果使用了-c參數選項&#xff0c;argv[0]會被設置為字符串-c&#xff0c;如果沒有腳本名傳遞給python解釋器&#xff…

hive復合數據類型之map

概述 MAP&#xff1a;MAP包含key->value鍵值對&#xff0c;可以通過key來訪問元素。比如”userlist”是一個map類型&#xff0c;其中username是key&#xff0c;password是value&#xff1b;那么我們可以通過userlist[username]來得到這個用戶對應的password&#xff1b; 操…

Beego框架使用

為什么80%的碼農都做不了架構師&#xff1f;>>> Beego Web項目目錄結構 new 命令是新建一個 Web 項目&#xff0c;我們在命令行下執行 bee new <項目名> 就可以創建一個新的項目。但是注意該命令必須在 $GOPATH/src 下執行。最后會在 $GOPATH/src 相應目錄下…

oracle下lag和lead分析函數

Lag和Lead分析函數可以在同一次查詢中取出同一字段的前N行的數據(Lag)和后N行的數據(Lead)作為獨立的列。 這種操作可以代替表的自聯接&#xff0c;并且LAG和LEAD有更高的效率。 語法&#xff1a; [sql] view plaincopy /*語法*/ lag(exp_str,offset,defval) over() Lead(…

802d簡明調試手冊_SINUMERIK-828D簡明調試手冊.pdf

SINUMERIK 828D / 828D BASIC簡明調試手冊SINUMERIKAnswers for industry. SIEMENSABC01.2012 ASINUMERIK 828D / 828D BASIC V04.04SP01123PLC 45NC 67PLC 891011121314151617PLC 18i1 11.1 11.1.1 NC 31.1.2 31.2

jtessboxeditorfx 界面顯示不出來_macOS 使用 XQuartz 支持 X11 實現 Linux 圖形化界面顯示...

更多奇技淫巧歡迎訂閱博客&#xff1a;https://fuckcloudnative.io前言在 Windows 中相信大家已經很熟悉使用 Xmanager(Xshell), MobaXterm, SecureCRT 通過 X11 實現 Linux 圖形化界面顯示&#xff0c;我的需求是在 macOS 下使用 iTerm2 作為 Terminal 實現 X11 圖形化界面顯示…

EntityFramework Core 2.0 Explicitly Compiled Query(顯式編譯查詢)

前言 EntityFramework Core 2.0引入了顯式編譯查詢&#xff0c;在查詢數據時預先編譯好LINQ查詢便于在請求數據時能夠立即響應。顯式編譯查詢提供了高可用場景&#xff0c;通過使用顯式編譯的查詢可以提高查詢性能。EF Core已經使用查詢表達式的散列來表示自動編譯和緩存查詢&a…

Oracle Minus關鍵字 不包含 取差集

Oracle Minus關鍵字   SQL中的MINUS關鍵字   SQL中有一個MINUS關鍵字&#xff0c;它運用在兩個SQL語句上&#xff0c;它先找出第一條SQL語句所產生的結果&#xff0c;然后看這些結果有沒有在第二個SQL語句的結果 中。如果有的話&#xff0c;那這一筆記錄就被去除&#xff0…

python掃描器甄別操作系統類型_20189317 《網絡攻防技術》 第三周作業

一.教材內容總結1.網絡踩點&#xff1a;web搜索與挖掘、DNS和IP查詢、網絡拓撲偵察(1)網絡踩點目標確定(2)技術手段&#xff1a;web信息搜索與挖掘、DNS和IP查詢、網絡拓撲偵察(3)web信息搜索與挖掘&#xff1a;基本搜索與挖掘技巧、高級搜索與挖掘技巧、編程實現google搜索、元…

python 網頁重定向_小試牛刀:python爬蟲爬取springer開放電子書.

首先聲明,本文旨在記錄反思,并沒有資源,代碼也不具有借鑒意義(水平實在不行.某天,水群的時候發現群友發了一個文件,里面是疫情時期springer開放的免費電子書名單,同時還附有下載鏈接,總共有400多本,這要是一個一個下載不得累死個人,只下載自己感興趣的書也是一個好主意,但是,我…