Thrift provides the five server modes shown in the figure: TSimpleServer, TNonblockingServer, THsHaServer, TThreadPoolServer, and TThreadedSelectorServer
TSimpleServer and TThreadPoolServer use a blocking I/O model
TNonblockingServer, THsHaServer, and TThreadedSelectorServer use a non-blocking I/O model
TServer
TServer is an abstract class
public static class Args extends AbstractServerArgs&lt;Args&gt; {
  public Args(TServerTransport transport) {
    super(transport);
  }
}

public static abstract class AbstractServerArgs&lt;T extends AbstractServerArgs&lt;T&gt;&gt; {
  final TServerTransport serverTransport;
  // processor-layer factory
  TProcessorFactory processorFactory;
  // transport-layer factories
  TTransportFactory inputTransportFactory = new TTransportFactory();
  TTransportFactory outputTransportFactory = new TTransportFactory();
  // protocol-layer factories
  TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
  TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
}
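The `AbstractServerArgs<T extends AbstractServerArgs<T>>` declaration is the self-referencing generics ("curiously recurring") pattern: it lets setters declared on the base class return the concrete subclass type so calls can be chained. A minimal pure-Java sketch of the same pattern (class and field names here are illustrative, not the real Thrift ones):

```java
public class ArgsSketch {
    static abstract class AbstractArgs<T extends AbstractArgs<T>> {
        int minThreads = 5;
        int maxThreads = 10;

        // Each setter returns the concrete subclass type T, enabling chaining.
        @SuppressWarnings("unchecked")
        T minWorkerThreads(int n) { minThreads = n; return (T) this; }

        @SuppressWarnings("unchecked")
        T maxWorkerThreads(int n) { maxThreads = n; return (T) this; }
    }

    static class Args extends AbstractArgs<Args> {}

    public static void main(String[] argv) {
        // The chain keeps the concrete Args type even though the
        // setters are declared on the abstract base class.
        Args a = new Args().minWorkerThreads(2).maxWorkerThreads(8);
        System.out.println(a.minThreads + "," + a.maxThreads); // prints 2,8
    }
}
```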
The public methods TServer defines
/**
 * The run method fires up the server and gets things going.
 */
public abstract void serve();

/**
 * Stop the server. This is optional on a per-implementation basis. Not
 * all servers are required to be cleanly stoppable.
 */
public void stop() {}
Not every server needs a graceful shutdown, so stop() is not declared abstract
The abstract serve() method is implemented by each concrete TServer subclass
TSimpleServer
TSimpleServer is the simplest implementation: a single-threaded blocking model, suitable only for development and testing
Source analysis of the serve method
public void serve() {
  // start the listening socket
  serverTransport_.listen();
  // mark the server as serving
  setServing(true);
  // loop, waiting for and handling socket requests
  while (!stopped_) {
    // accept one client socket
    client = serverTransport_.accept();
    if (client != null) {
      // obtain the server's processor, transport and protocol layers from their factories
      processor = processorFactory_.getProcessor(client);
      inputTransport = inputTransportFactory_.getTransport(client);
      outputTransport = outputTransportFactory_.getTransport(client);
      inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
      outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
      if (eventHandler_ != null) {
        connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
      }
      // blocking processing loop
      while (true) {
        // fire the per-request event
        if (eventHandler_ != null) {
          eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
        }
        // exit when process() reports the connection is finished
        if (!processor.process(inputProtocol, outputProtocol)) {
          break;
        }
      }
    }
    // clean up the connection
    eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
    inputTransport.close();
    outputTransport.close();
  }
  setServing(false);
}
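The essential shape of that loop — keep calling `process()` on one connection until it returns false — can be sketched in pure Java without the Thrift API. `Processor`, `serveConnection`, and `echo` below are illustrative stand-ins, not Thrift classes:

```java
import java.util.*;

public class SimpleServerSketch {
    // Stand-in for TProcessor.process(): returns false when the connection is done.
    interface Processor {
        boolean process(Deque<String> in, List<String> out);
    }

    // Mirrors TSimpleServer's inner loop: keep calling process() on the same
    // connection until it reports there is nothing left to do.
    static List<String> serveConnection(Processor p, Deque<String> input) {
        List<String> output = new ArrayList<>();
        while (p.process(input, output)) {
            // keep handling requests on this single connection
        }
        return output;
    }

    // Echo processor: consumes one request per call, stops when input is drained.
    static Processor echo() {
        return (in, out) -> {
            if (in.isEmpty()) return false;
            out.add("echo:" + in.pollFirst());
            return true;
        };
    }

    public static void main(String[] args) {
        Deque<String> requests = new ArrayDeque<>(List.of("a", "b"));
        System.out.println(serveConnection(echo(), requests)); // prints [echo:a, echo:b]
    }
}
```

Because the outer loop only accepts the next client after this inner loop ends, a single slow connection blocks everyone else — the reason TSimpleServer is for testing only.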
TSimpleServer working diagram
TThreadPoolServer
TThreadPoolServer fixes TSimpleServer's lack of concurrency and multi-connection support by introducing a thread pool
As with TSimpleServer, the main thread blocks while listening on the socket, but all business processing is handed off to the thread pool
public void serve() {
  // the main thread starts the listening socket
  serverTransport_.listen();
  // mark the server as serving
  stopped_ = false;
  setServing(true);
  // wait for and dispatch socket requests
  while (!stopped_) {
    TTransport client = serverTransport_.accept();
    // WorkerProcess.run() follows the same logic as TSimpleServer
    WorkerProcess wp = new WorkerProcess(client);
    int retryCount = 0;
    long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);
    while (true) {
      // hand the work to the thread pool
      executorService_.execute(wp);
      break;
    }
  }
  executorService_.shutdown();
  setServing(false);
}
Drawbacks of TThreadPoolServer:
Processing capacity is limited by how the thread pool is configured
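A pure-Java sketch of that dispatch model — one acceptor loop handing each "connection" to a fixed-size pool, so concurrency is capped by the pool size. `handleConnection` is an illustrative stand-in for `WorkerProcess.run()`, not Thrift code:

```java
import java.util.*;
import java.util.concurrent.*;

public class PoolServerSketch {
    // Stand-in for WorkerProcess.run(): the per-connection business logic.
    static String handleConnection(int id) {
        return "handled-" + id;
    }

    public static void main(String[] args) throws Exception {
        // Pool size plays the role of the min/max worker-thread settings:
        // at most 4 connections are processed concurrently here.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<String>> results = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            final int id = i;
            // corresponds to executorService_.execute(wp) in serve()
            results.add(pool.submit(() -> handleConnection(id)));
        }
        for (Future<String> f : results) System.out.println(f.get());
        pool.shutdown();
    }
}
```

If every pool thread is busy, newly accepted connections queue up, which is exactly the sensitivity to pool configuration noted above.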
TNonblockingServer
TNonblockingServer also works single-threaded, but it uses NIO: all sockets are registered with a selector, and one thread loops over the selector to monitor them; when a socket is ready, the appropriate action is taken (reading data, writing data, or accepting a new connection)
TNonblockingServer's serve method is defined in its parent class, AbstractNonblockingServer
/**
 * Begin accepting connections and processing invocations.
 */
public void serve() {
  // start any IO threads
  // register the threads that watch sockets with the selector
  if (!startThreads()) {
    return;
  }
  // start listening, or exit
  if (!startListening()) {
    return;
  }
  // mark the server as serving
  setServing(true);
  // this will block while we serve
  // in TNonblockingServer this is selectAcceptThread_.join(): the main thread
  // waits for selectAcceptThread to finish; SelectAcceptThread's run method
  // calls select() to pick out ready sockets
  waitForShutdown();
  setServing(false);
  // do a little cleanup
  stopListening();
}

// SelectAcceptThread run method
public void run() {
  while (!stopped_) {
    select();
    processInterestChanges();
  }
  for (SelectionKey selectionKey : selector.keys()) {
    cleanupSelectionKey(selectionKey);
  }
}

// the SelectAcceptThread select() loop
private void select() {
  try {
    // wait for io events.
    selector.select();
    // process the io events we received
    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
    // iterate over the ready sockets
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();
      // if the key is marked Accept, then it has to be the server transport:
      // accept the new socket and register it with the selector
      if (key.isAcceptable()) {
        handleAccept();
      } else if (key.isReadable()) {
        // deal with reads
        handleRead(key);
      } else if (key.isWritable()) {
        // deal with writes
        handleWrite(key);
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }
}

// accept a new connection
private void handleAccept() throws IOException {
  SelectionKey clientKey = null;
  TNonblockingTransport client = null;
  // accept the connection
  client = (TNonblockingTransport) serverTransport.accept();
  // register it with the selector
  clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
  // add this key to the map
  FrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);
  clientKey.attach(frameBuffer);
}
Drawbacks of TNonblockingServer:
Everything still runs sequentially on a single thread, so when business processing is complex or slow, throughput drops
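The heart of `select()` is a readiness dispatch: each ready key is tested for accept, then read, then write. A pure-Java sketch of that branching, with an enum standing in for NIO `SelectionKey` states (not Thrift or NIO code):

```java
import java.util.*;

public class SelectLoopSketch {
    enum Event { ACCEPT, READ, WRITE }

    // Mirrors the if/else chain inside select(): one handler per readiness state.
    static List<String> dispatch(List<Event> readyKeys) {
        List<String> log = new ArrayList<>();
        for (Event key : readyKeys) {
            switch (key) {
                case ACCEPT: log.add("handleAccept"); break; // register new socket
                case READ:   log.add("handleRead");   break; // read request data
                case WRITE:  log.add("handleWrite");  break; // write response data
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(dispatch(List.of(Event.ACCEPT, Event.READ, Event.WRITE)));
        // prints [handleAccept, handleRead, handleWrite]
    }
}
```

Because the handlers run inline on the selector thread, one slow `handleRead` delays every other ready socket — the weakness the text describes.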
TNonblockingServer working diagram:
THsHaServer
THsHaServer is a subclass of TNonblockingServer with essentially the same processing logic; the difference is that read handling is delegated to a thread pool, so the main thread returns immediately to the next loop iteration, improving efficiency
Drawbacks of THsHaServer:
The main thread must still monitor all sockets and perform all write operations, so efficiency degrades under heavy request load
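The half-sync/half-async idea can be sketched in pure Java: the "I/O thread" hands each decoded request to a worker pool without waiting, then writes responses itself as they complete. `invoke` is an illustrative stand-in for running the processor, not the Thrift API:

```java
import java.util.*;
import java.util.concurrent.*;

public class HsHaSketch {
    // Stand-in for running the processor on a worker thread.
    static String invoke(String req) {
        return "resp:" + req;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        List<Future<String>> pending = new ArrayList<>();

        // "I/O thread": dispatch each decoded request without blocking on the result
        for (String req : List.of("r1", "r2", "r3")) {
            pending.add(workers.submit(() -> invoke(req)));
        }

        // writes still happen on this single thread
        for (Future<String> f : pending) System.out.println(f.get());
        workers.shutdown();
    }
}
```

Invocations run concurrently, but the single dispatching/writing thread remains the bottleneck under heavy load.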
TThreadedSelectorServer
TThreadedSelectorServer is the most advanced mode Thrift currently offers and the first choice for production; it extends TNonblockingServer
Key fields in the TThreadedSelectorServer source
public static class Args extends AbstractNonblockingServerArgs&lt;Args&gt; {
  // number of selector threads for accepted connections
  public int selectorThreads = 2;
  // number of threads in the ExecutorService worker pool
  private int workerThreads = 5;
  // the thread pool that executes the actual invocations
  private ExecutorService executorService = null;
}

// The thread handling all accepts
private AcceptThread acceptThread;

// Threads handling events on client transports
private final Set&lt;SelectorThread&gt; selectorThreads = new HashSet&lt;SelectorThread&gt;();

// This wraps all the functionality of queueing and thread pool management
// for the passing of Invocations from the selector thread(s) to the workers
// (if any).
private final ExecutorService invoker;

/**
 * A round-robin load balancer that picks a SelectorThread for each new connection.
 */
protected static class SelectorThreadLoadBalancer {}
TThreadedSelectorServer's cooperating components:
- An AcceptThread that listens for new socket connections
- Several SelectorThread instances that handle socket reads and writes
- A SelectorThreadLoadBalancer that decides which SelectorThread each socket accepted by AcceptThread is handed to
- After a SelectorThread has performed the read/write, the ExecutorService thread pool carries out the actual invocation
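The load balancer's job is simple round-robin: hand out selector threads in rotation so accepted connections spread evenly. A self-contained sketch of that behavior (the real `SelectorThreadLoadBalancer` is an inner class of TThreadedSelectorServer; this version is illustrative):

```java
import java.util.*;

public class RoundRobinSketch<T> {
    private final List<T> pool;
    private Iterator<T> it;

    public RoundRobinSketch(Collection<T> threads) {
        this.pool = new ArrayList<>(threads);
        this.it = pool.iterator();
    }

    // Return the next thread in rotation, wrapping around at the end.
    public T nextThread() {
        if (!it.hasNext()) it = pool.iterator();
        return it.next();
    }

    public static void main(String[] args) {
        RoundRobinSketch<String> lb = new RoundRobinSketch<>(List.of("sel-0", "sel-1"));
        System.out.println(lb.nextThread()); // sel-0
        System.out.println(lb.nextThread()); // sel-1
        System.out.println(lb.nextThread()); // sel-0 again
    }
}
```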
SelectorThread source walkthrough
/**
 * Several SelectorThreads handle socket I/O.
 */
protected class SelectorThread extends AbstractSelectThread {
  /**
   * The work loop. Handles selecting (read/write IO), dispatching, and
   * managing the selection preferences of all existing connections.
   */
  public void run() {
    while (!stopped_) {
      select();
    }
  }

  private void select() {
    // process the io events we received
    Iterator&lt;SelectionKey&gt; selectedKeys = selector.selectedKeys().iterator();
    while (!stopped_ &amp;&amp; selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();
      // skip if not valid
      if (!key.isValid()) {
        cleanupSelectionKey(key);
        continue;
      }
      if (key.isReadable()) {
        // deal with reads
        handleRead(key);
      } else if (key.isWritable()) {
        // deal with writes
        handleWrite(key);
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  }
}
AcceptThread source walkthrough
/**
 * Selects on the server transport (listening for socket requests) and
 * feeds new connections to the IO selectors (SelectorThreads).
 */
protected class AcceptThread extends Thread {
  // The listen socket to accept on
  private final TNonblockingServerTransport serverTransport;
  private final Selector acceptSelector;
  // load balancer that decides which SelectorThread gets each connection
  private final SelectorThreadLoadBalancer threadChooser;

  public void run() {
    while (!stopped_) {
      select();
    }
  }

  private void select() {
    // process the io events we received
    Iterator&lt;SelectionKey&gt; selectedKeys = acceptSelector.selectedKeys().iterator();
    while (!stopped_ &amp;&amp; selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();
      // handle a newly accepted request
      if (key.isAcceptable()) {
        handleAccept();
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  }

  /**
   * Accept a new connection.
   */
  private void handleAccept() {
    final TNonblockingTransport client = doAccept();
    if (client != null) {
      // pick a SelectorThread from the load balancer
      final SelectorThread targetThread = threadChooser.nextThread();
      if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
        doAddAccept(targetThread, client);
      } else {
        // FAIR_ACCEPT: route the hand-off through the invoker thread pool
        invoker.submit(new Runnable() {
          public void run() {
            doAddAccept(targetThread, client);
          }
        });
      }
    }
  }

  private TNonblockingTransport doAccept() {
    return (TNonblockingTransport) serverTransport.accept();
  }

  // hand the accepted connection to the given SelectorThread;
  // blocks if that thread's queue of new connections is full
  private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
    if (!thread.addAcceptedConnection(client)) {
      client.close();
    }
  }
}
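The two accept policies in `handleAccept()` can be sketched in pure Java: FAST_ACCEPT hands the connection to a selector thread inline, while FAIR_ACCEPT routes the hand-off through the invoker pool so a flood of accepts cannot starve request processing. The types below are illustrative stand-ins for the Thrift internals:

```java
import java.util.concurrent.*;

public class AcceptPolicySketch {
    enum AcceptPolicy { FAST_ACCEPT, FAIR_ACCEPT }

    // Stand-in for AcceptThread.doAddAccept(): hand the client to a selector thread.
    static String doAddAccept(String thread, String client) {
        return thread + "<-" + client;
    }

    // Mirrors the branching in handleAccept().
    static String handleAccept(AcceptPolicy policy, ExecutorService invoker,
                               String targetThread, String client) throws Exception {
        if (policy == AcceptPolicy.FAST_ACCEPT || invoker == null) {
            return doAddAccept(targetThread, client);           // inline, lowest latency
        }
        return invoker.submit(() -> doAddAccept(targetThread, client)).get(); // via the pool
    }

    public static void main(String[] args) throws Exception {
        ExecutorService invoker = Executors.newSingleThreadExecutor();
        System.out.println(handleAccept(AcceptPolicy.FAST_ACCEPT, invoker, "sel-0", "conn-1"));
        System.out.println(handleAccept(AcceptPolicy.FAIR_ACCEPT, invoker, "sel-1", "conn-2"));
        invoker.shutdown();
    }
}
```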
TThreadedSelectorServer working diagram
References
- Analysis of Thrift server-side working modes: http://blog.csdn.net/houjixin/article/details/42779915
- Thrift network service models: http://www.cnblogs.com/mumuxinfei/p/3875165.html