基于Dubbo 3.1,詳細介紹了Dubbo Consumer接收服務調用響應
此前我們學習了Dubbo Provider處理服務調用請求的流程,現在我們來學習Dubbo Consumer接收服務調用響應流程。
實際上接收請求和接收響應同屬于接收消息,它們的流程的很多步驟是一樣的。下面我們僅分析關鍵步驟。
Dubbo 3.x服務調用源碼:
- Dubbo 3.x源碼(29)—Dubbo Consumer服務調用源碼(1)服務調用入口
- Dubbo 3.x源碼(30)—Dubbo Consumer服務調用源碼(2)發起遠程調用
- Dubbo 3.x源碼(31)—Dubbo消息的編碼解碼
- Dubbo 3.x源碼(32)—Dubbo Provider處理服務調用請求源碼
- Dubbo 3.x源碼(33)—Dubbo Consumer接收服務調用響應
Dubbo 3.x服務引用源碼:
- Dubbo 3.x源碼(11)—Dubbo服務的發布與引用的入口
- Dubbo 3.x源碼(18)—Dubbo服務引用源碼(1)
- Dubbo 3.x源碼(19)—Dubbo服務引用源碼(2)
- Dubbo 3.x源碼(20)—Dubbo服務引用源碼(3)
- Dubbo 3.x源碼(21)—Dubbo服務引用源碼(4)
- Dubbo 3.x源碼(22)—Dubbo服務引用源碼(5)服務引用bean的獲取以及懶加載原理
- Dubbo 3.x源碼(23)—Dubbo服務引用源碼(6)MigrationRuleListener遷移規則監聽器
- Dubbo 3.x源碼(24)—Dubbo服務引用源碼(7)接口級服務發現訂閱refreshInterfaceInvoker
- Dubbo 3.x源碼(25)—Dubbo服務引用源碼(8)notify訂閱服務通知更新
- Dubbo 3.x源碼(26)—Dubbo服務引用源碼(9)應用級服務發現訂閱refreshServiceDiscoveryInvoker
- Dubbo 3.x源碼(27)—Dubbo服務引用源碼(10)subscribeURLs訂閱應用級服務url
Dubbo 3.x服務發布源碼:
- Dubbo 3.x源碼(11)—Dubbo服務的發布與引用的入口
- Dubbo 3.x源碼(12)—Dubbo服務發布導出源碼(1)
- Dubbo 3.x源碼(13)—Dubbo服務發布導出源碼(2)
- Dubbo 3.x源碼(14)—Dubbo服務發布導出源碼(3)
- Dubbo 3.x源碼(15)—Dubbo服務發布導出源碼(4)
- Dubbo 3.x源碼(16)—Dubbo服務發布導出源碼(5)
- Dubbo 3.x源碼(17)—Dubbo服務發布導出源碼(6)
- Dubbo 3.x源碼(28)—Dubbo服務發布導出源碼(7)應用級服務接口元數據發布
文章目錄
- AllChannelHandler#received分發任務
- getPreferredExecutorService獲取首選線程池
- ThreadlessExecutor#execute執行
- HeaderExchangeHandler#received處理消息
- HeaderExchangeHandler#handleResponse處理響應
- DefaultFuture#received處理響應
- DefaultFuture#doReceived處理響應
AllChannelHandler#received分發任務
將當前消息包裝為一個ChannelEventRunnable分發給對應的線程池執行,這里的線程池就是dubbo業務線程池,到此IO線程的任務結束。
這種方式實現了線程資源的隔離,釋放了IO線程,可以快速處理更多的IO操作,提升了系統吞吐量。
/*** AllChannelHandler的方法* <p>* 處理普通rpc請求請求** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {//獲取對應的線程池,可能是ThreadlessExecutorExecutorService executor = getPreferredExecutorService(message);try {//創建一個線程任務ChannelEventRunnable,通過線程池執行//這里的handler是DecodeHandlerexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if (message instanceof Request && t instanceof RejectedExecutionException) {sendFeedback(channel, (Request) message, t);return;}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}
}
getPreferredExecutorService獲取首選線程池
該方法獲取處理業務的首選線程池,目前,這種方法主要是為了方便消費者端的線程模型而定制的。這是Dubo2.7.5之后的線程模型的優化:
- 對于響應消息,那么從DefaultFuture的靜態字段緩存映射FUTURES中獲取請求id對應的DefaultFuture。請求的id就是對應的響應的id。
- 然后獲取執DefaultFuture的執行器,對于默認的同步請求,那自然是ThreadlessExecutor,這里面阻塞著發起同步調用的線程,將回調直接委派給發起調用的線程。對于異步請求,則獲取異步請求的線程池。
- 因此后續的處理包括請求體的解碼都是由發起調用的線程來執行,這樣減輕了業務線程池的壓力。后面我們將消費者接受響應的時候,會講解對應的源碼。
- 對于請求消息,則使用共享executor執行后續邏輯。對于共享線程池,默認為FixedThreadPool,固定200線程,阻塞隊列長度為0,拒絕策略為打印異常日志并且拋出異常。
對于同步阻塞請求的響應,這是默認的請求方式,將會獲取ThreadlessExecutor執行器,對于異步請求的響應,將會獲取一個多線程的線程池。
/*** WrappedChannelHandler的方法* <p>* 目前,這種方法主要是為了方便消費者端的線程模型而定制的。* 1. 使用ThreadlessExecutor,又名,將回調直接委派給發起調用的線程。* 2. 使用共享executor執行回調。** @param msg 消息* @return 執行器*/
public ExecutorService getPreferredExecutorService(Object msg) {//如果是響應消息if (msg instanceof Response) {Response response = (Response) msg;//從DefaultFuture的靜態字段緩存映射FUTURES中獲取請求id對應的DefaultFutureDefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());// a typical scenario is the response returned after timeout, the timeout response may have completed the future//一個典型的場景是響應超時后返回,超時后的響應可能已經完成if (responseFuture == null) {//獲取當前服務器或客戶端的共享執行器return getSharedExecutorService();} else {//獲取執future的執行器,對于默認的同步請i去,那自然是ThreadlessExecutor,這里面阻塞著發起同步調用的線程ExecutorService executor = responseFuture.getExecutor();if (executor == null || executor.isShutdown()) {//獲取當前服務器或客戶端的共享執行器executor = getSharedExecutorService();}return executor;}} else {//獲取當前服務器或客戶端的共享執行器return getSharedExecutorService();}
}/*** get the shared executor for current Server or Client** @return*/
public ExecutorService getSharedExecutorService() {// Application may be destroyed before channel disconnected, avoid create new application model// see https://github.com/apache/dubbo/issues/9127//在斷開通道之前,應用程序可能被銷毀,避免創建新的應用程序模型if (url.getApplicationModel() == null || url.getApplicationModel().isDestroyed()) {return GlobalResourcesRepository.getGlobalExecutorService();}// note: url.getOrDefaultApplicationModel() may create new application modelApplicationModel applicationModel = url.getOrDefaultApplicationModel();ExecutorRepository executorRepository =applicationModel.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();//從執行器倉庫中根據url獲取對應的執行器,默認INTERNAL_SERVICE_EXECUTOR對應的執行器ExecutorService executor = executorRepository.getExecutor(url);if (executor == null) {executor = executorRepository.createExecutorIfAbsent(url);}return executor;
}
ThreadlessExecutor#execute執行
還記得我們此前學習Dubbo Consumer發起請求的流程嗎?對于同步請求,在發送請求之后,在AbstractInvoker#waitForResultIfSync方法中將會執行異步轉同步等待,具體的等待方法就是ThreadlessExecutor#waitAndDrain方法。
waitAndDrain方法中,請求調用線程將會執行queue.take()方法嘗試獲取一個任務,如果沒有任務,那么當前線程等待直到任務隊列中有一個任務,那么獲取并執行。實際上,這里等待的任務就是請求對應的響應結果。
ThreadlessExecutor#execute方法在響應返回之后會執行。首先將ChannelEventRunnable包裝為RunnableWrapper,對于run方法添加了try-catch,不會拋出異常僅僅打印日志。然后判斷如果同步調用線程沒有處于等待狀態,那么當前線程直接執行該線程任務。
否則將當前任務加入到阻塞隊列,如果同步調用線程還在因為調用waitAndDrain方而處于等待狀態,那么將會因為隊列中添加了元素而被喚醒,進而執行該線程任務。
/*** 如果調用線程仍在等待回調任務,則將該任務添加到阻塞隊列中以等待調度。否則,直接提交到共享回調執行器。** @param runnable 可執行的任務ChannelEventRunnable*/
@Override
public void execute(Runnable runnable) {//包裝RunnableWrapper,對于run方法添加了try-catch,不會拋出異常僅僅打印日志runnable = new RunnableWrapper(runnable);synchronized (lock) {//如果同步調用線程沒有處于等待狀態if (!isWaiting()) {//那么當前線程直接執行該線程任務runnable.run();return;}/** 將當前任務加入到阻塞隊列,如果同步調用線程還在因為調用waitAndDrain方而處于等待狀態* 那么將會因為隊列中添加了元素而被喚醒,進而執行該線程任務*/queue.add(runnable);}
}
HeaderExchangeHandler#received處理消息
HeaderExchangeHandler#received方法對于消息進行分類并調用不同的方法繼續處理。
對于請求消息,如果是雙向消息,那么調用handleRequest方法繼續處理,將會創建Response對象,然后調用dubboProtocol.requestHandler完成請求處理獲取結果,并將結果封裝到Response中后返回給客戶端。如果是單向消息則僅僅調用dubboProtocol.requestHandler完成請求處理即可。
對于響應消息,將會調用DefaultFuture#received方法處理,此時就會根據響應id獲取對應的DefaultFuture,將響應結果設置進去。這里的源碼我們后面講consumer獲取響應結果的時候再講解。
/*** HeaderExchangeHandler的方法* <p>* 分類處理消息** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);//請求消息if (message instanceof Request) {// handle request.Request request = (Request) message;if (request.isEvent()) {//處理事件消息handlerEvent(channel, request);} else {if (request.isTwoWay()) {//額外處理雙向消息handleRequest(exchangeChannel, request);} else {//處理單向消息,直接調用下層DubboProtocol.requestHandler#received方法handler.received(exchangeChannel, request.getData());}}}//響應消息else if (message instanceof Response) {handleResponse(channel, (Response) message);}//字符串else if (message instanceof String) {if (isClientSide(channel)) {Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());logger.error(e.getMessage(), e);} else {String echo = handler.telnet(channel, (String) message);if (StringUtils.isNotEmpty(echo)) {channel.send(echo);}}} else {handler.received(exchangeChannel, message);}
}
HeaderExchangeHandler#handleResponse處理響應
該方法用于處理響應,將會判斷如果不是心跳響應,那么繼續通過DefaultFuture#received處理響應。
我們在此前學習Dubbo Consumer發起請求的流程的時候就講過,超時檢查任務TimeoutCheckTask#timeoutCheck方法中,如果請求超時,那么同樣會調用DefaultFuture#received方法處理超時響應。
/*** HeaderExchangeHandler的方法* <p>* 處理響應消息** @param channel NettyChannel* @param response Response*/
static void handleResponse(Channel channel, Response response) throws RemotingException {//如果不是心跳響應,那么繼續通過DefaultFuture#received處理響應if (response != null && !response.isHeartbeat()) {DefaultFuture.received(channel, response);}
}
DefaultFuture#received處理響應
當請求結果返回或者請求超時之后,將會通過DefaultFuture#received處理響應。
/*** DefaultFuture的方法* <p>* 處理響應結果** @param channel NettyChannel* @param response Response*/
public static void received(Channel channel, Response response) {//調用另一個received方法,timeout參數為falsereceived(channel, response, false);
}public static void received(Channel channel, Response response, boolean timeout) {try {//由于獲得了響應,那么將該請求的緩存從FUTURES中移除DefaultFuture future = FUTURES.remove(response.getId());if (future != null) {//該請求的超時檢查任務Timeout t = future.timeoutCheckTask;//如果沒有超時,那么if (!timeout) {// decrease Time//取消該任務t.cancel();}//通過future處理響應,設置結果future.doReceived(response);} else {logger.warn("The timeout response finally returned at "+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))+ ", response status is " + response.getStatus()+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()+ " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");}} finally {//移除正在處理的channel緩存CHANNELS.remove(response.getId());}
}
DefaultFuture#doReceived處理響應
設置響應結果并喚醒在AsyncRpcResult#getAppResponse方法中因為調用responseFuture.get()而阻塞的線程。隨后AsyncRpcResult#getAppResponse方法可以獲取響應結果并返回。
/*** DefaultFuture的方法* <p>* 處理響應設置結果*/
private void doReceived(Response res) {if (res == null) {throw new IllegalStateException("response cannot be null");}//如果響應成功,則將設置響應結果并喚醒在AsyncRpcResult#getAppResponse方法中因為調用responseFuture.get()而阻塞的線程if (res.getStatus() == Response.OK) {this.complete(res.getResult());}//如果是客戶端或者服務端超時,拋出超時異常并喚醒在AsyncRpcResult#getAppResponse方法中因為調用responseFuture.get()而阻塞的線程else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));}//拋出遠程調用異常并喚醒在AsyncRpcResult#getAppResponse方法中因為調用responseFuture.get()而阻塞的線程else {this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));}// the result is returning, but the caller thread may still wait// to avoid endless waiting for whatever reason, notify caller thread to return.//結果正在返回,但是調用者線程可能仍在等待,為了避免無休止的等待,通知調用線程返回。if (executor != null && executor instanceof ThreadlessExecutor) {ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;if (threadlessExecutor.isWaiting()) {threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +" which is not an expected state, interrupt the thread manually by returning an exception."));}}
}