Dubbo 3.x源碼(33)—Dubbo Consumer接收服務調用響應

基于Dubbo 3.1,詳細介紹了Dubbo Consumer接收服務調用響應

此前我們學習了Dubbo Provider處理服務調用請求的流程,現在我們來學習Dubbo Consumer接收服務調用響應流程。

實際上接收請求和接收響應同屬于接收消息,它們的流程的很多步驟是一樣的。下面我們僅分析關鍵步驟。

Dubbo 3.x服務調用源碼:

  1. Dubbo 3.x源碼(29)—Dubbo Consumer服務調用源碼(1)服務調用入口
  2. Dubbo 3.x源碼(30)—Dubbo Consumer服務調用源碼(2)發起遠程調用
  3. Dubbo 3.x源碼(31)—Dubbo消息的編碼解碼
  4. Dubbo 3.x源碼(32)—Dubbo Provider處理服務調用請求源碼
  5. Dubbo 3.x源碼(33)—Dubbo Consumer接收服務調用響應

Dubbo 3.x服務引用源碼:

  1. Dubbo 3.x源碼(11)—Dubbo服務的發布與引用的入口
  2. Dubbo 3.x源碼(18)—Dubbo服務引用源碼(1)
  3. Dubbo 3.x源碼(19)—Dubbo服務引用源碼(2)
  4. Dubbo 3.x源碼(20)—Dubbo服務引用源碼(3)
  5. Dubbo 3.x源碼(21)—Dubbo服務引用源碼(4)
  6. Dubbo 3.x源碼(22)—Dubbo服務引用源碼(5)服務引用bean的獲取以及懶加載原理
  7. Dubbo 3.x源碼(23)—Dubbo服務引用源碼(6)MigrationRuleListener遷移規則監聽器
  8. Dubbo 3.x源碼(24)—Dubbo服務引用源碼(7)接口級服務發現訂閱refreshInterfaceInvoker
  9. Dubbo 3.x源碼(25)—Dubbo服務引用源碼(8)notify訂閱服務通知更新
  10. Dubbo 3.x源碼(26)—Dubbo服務引用源碼(9)應用級服務發現訂閱refreshServiceDiscoveryInvoker
  11. Dubbo 3.x源碼(27)—Dubbo服務引用源碼(10)subscribeURLs訂閱應用級服務url

Dubbo 3.x服務發布源碼:

  1. Dubbo 3.x源碼(11)—Dubbo服務的發布與引用的入口
  2. Dubbo 3.x源碼(12)—Dubbo服務發布導出源碼(1)
  3. Dubbo 3.x源碼(13)—Dubbo服務發布導出源碼(2)
  4. Dubbo 3.x源碼(14)—Dubbo服務發布導出源碼(3)
  5. Dubbo 3.x源碼(15)—Dubbo服務發布導出源碼(4)
  6. Dubbo 3.x源碼(16)—Dubbo服務發布導出源碼(5)
  7. Dubbo 3.x源碼(17)—Dubbo服務發布導出源碼(6)
  8. Dubbo 3.x源碼(28)—Dubbo服務發布導出源碼(7)應用級服務接口元數據發布

文章目錄

  • AllChannelHandler#received分發任務
    • getPreferredExecutorService獲取首選線程池
  • ThreadlessExecutor#execute執行
  • HeaderExchangeHandler#received處理消息
  • HeaderExchangeHandler#handleResponse處理響應
  • DefaultFuture#received處理響應
  • DefaultFuture#doReceived處理響應

AllChannelHandler#received分發任務

將當前消息包裝為一個ChannelEventRunnable分發給對應的線程池執行,這里的線程池就是dubbo業務線程池,到此IO線程的任務結束。

這種方式實現了線程資源的隔離,釋放了IO線程,可以快速處理更多的IO操作,提升了系統吞吐量。

/*** AllChannelHandler的方法* <p>* 處理普通rpc請求請求** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {//獲取對應的線程池,可能是ThreadlessExecutorExecutorService executor = getPreferredExecutorService(message);try {//創建一個線程任務ChannelEventRunnable,通過線程池執行//這里的handler是DecodeHandlerexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if (message instanceof Request && t instanceof RejectedExecutionException) {sendFeedback(channel, (Request) message, t);return;}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}
}

getPreferredExecutorService獲取首選線程池

該方法獲取處理業務的首選線程池,目前,這種方法主要是為了方便消費者端的線程模型而定制的。這是Dubo2.7.5之后的線程模型的優化:

  1. 對于響應消息,那么從DefaultFuture的靜態字段緩存映射FUTURES中獲取請求id對應的DefaultFuture。請求的id就是對應的響應的id。
    1. 然后獲取執DefaultFuture的執行器,對于默認的同步請求,那自然是ThreadlessExecutor,這里面阻塞著發起同步調用的線程,將回調直接委派給發起調用的線程。對于異步請求,則獲取異步請求的線程池。
    2. 因此后續的處理包括請求體的解碼都是由發起調用的線程來執行,這樣減輕了業務線程池的壓力。后面我們將消費者接受響應的時候,會講解對應的源碼。
  2. 對于請求消息,則使用共享executor執行后續邏輯。對于共享線程池,默認為FixedThreadPool,固定200線程,阻塞隊列長度為0,拒絕策略為打印異常日志并且拋出異常。

對于同步阻塞請求的響應,這是默認的請求方式,將會獲取ThreadlessExecutor執行器,對于異步請求的響應,將會獲取一個多線程的線程池。

/*** WrappedChannelHandler的方法* <p>* 目前,這種方法主要是為了方便消費者端的線程模型而定制的。* 1. 使用ThreadlessExecutor,又名,將回調直接委派給發起調用的線程。* 2. 使用共享executor執行回調。** @param msg 消息* @return 執行器*/
public ExecutorService getPreferredExecutorService(Object msg) {//如果是響應消息if (msg instanceof Response) {Response response = (Response) msg;//從DefaultFuture的靜態字段緩存映射FUTURES中獲取請求id對應的DefaultFutureDefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());// a typical scenario is the response returned after timeout, the timeout response may have completed the future//一個典型的場景是響應超時后返回,超時后的響應可能已經完成if (responseFuture == null) {//獲取當前服務器或客戶端的共享執行器return getSharedExecutorService();} else {//獲取執future的執行器,對于默認的同步請i去,那自然是ThreadlessExecutor,這里面阻塞著發起同步調用的線程ExecutorService executor = responseFuture.getExecutor();if (executor == null || executor.isShutdown()) {//獲取當前服務器或客戶端的共享執行器executor = getSharedExecutorService();}return executor;}} else {//獲取當前服務器或客戶端的共享執行器return getSharedExecutorService();}
}/*** get the shared executor for current Server or Client** @return*/
public ExecutorService getSharedExecutorService() {// Application may be destroyed before channel disconnected, avoid create new application model// see https://github.com/apache/dubbo/issues/9127//在斷開通道之前,應用程序可能被銷毀,避免創建新的應用程序模型if (url.getApplicationModel() == null || url.getApplicationModel().isDestroyed()) {return GlobalResourcesRepository.getGlobalExecutorService();}// note: url.getOrDefaultApplicationModel() may create new application modelApplicationModel applicationModel = url.getOrDefaultApplicationModel();ExecutorRepository executorRepository =applicationModel.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();//從執行器倉庫中根據url獲取對應的執行器,默認INTERNAL_SERVICE_EXECUTOR對應的執行器ExecutorService executor = executorRepository.getExecutor(url);if (executor == null) {executor = executorRepository.createExecutorIfAbsent(url);}return executor;
}

ThreadlessExecutor#execute執行

還記得我們此前學習Dubbo Consumer發起請求的流程嗎?對于同步請求,在發送請求之后,在AbstractInvoker#waitForResultIfSync方法中將會執行異步轉同步等待,具體的等待方法就是ThreadlessExecutor#waitAndDrain方法。

waitAndDrain方法中,請求調用線程將會執行queue.take()方法嘗試獲取一個任務,如果沒有任務,那么當前線程等待直到任務隊列中有一個任務,那么獲取并執行。實際上,這里等待的任務就是請求對應的響應結果。

ThreadlessExecutor#execute方法在響應返回之后會執行。首先將ChannelEventRunnable包裝為RunnableWrapper,對于run方法添加了try-catch,不會拋出異常僅僅打印日志。然后判斷如果同步調用線程沒有處于等待狀態,那么當前線程直接執行該線程任務。

否則將當前任務加入到阻塞隊列,如果同步調用線程還在因為調用waitAndDrain方而處于等待狀態,那么將會因為隊列中添加了元素而被喚醒,進而執行該線程任務。

/*** 如果調用線程仍在等待回調任務,則將該任務添加到阻塞隊列中以等待調度。否則,直接提交到共享回調執行器。** @param runnable 可執行的任務ChannelEventRunnable*/
@Override
public void execute(Runnable runnable) {//包裝RunnableWrapper,對于run方法添加了try-catch,不會拋出異常僅僅打印日志runnable = new RunnableWrapper(runnable);synchronized (lock) {//如果同步調用線程沒有處于等待狀態if (!isWaiting()) {//那么當前線程直接執行該線程任務runnable.run();return;}/** 將當前任務加入到阻塞隊列,如果同步調用線程還在因為調用waitAndDrain方而處于等待狀態* 那么將會因為隊列中添加了元素而被喚醒,進而執行該線程任務*/queue.add(runnable);}
}

HeaderExchangeHandler#received處理消息

HeaderExchangeHandler#received方法對于消息進行分類并調用不同的方法繼續處理。

對于請求消息,如果是雙向消息,那么調用handleRequest方法繼續處理,將會創建Response對象,然后調用dubboProtocol.requestHandler完成請求處理獲取結果,并將結果封裝到Response中后返回給客戶端。如果是單向消息則僅僅調用dubboProtocol.requestHandler完成請求處理即可。

對于響應消息,將會調用DefaultFuture#received方法處理,此時就會根據響應id獲取對應的DefaultFuture,將響應結果設置進去。這里的源碼我們后面講consumer獲取響應結果的時候再講解。

/*** HeaderExchangeHandler的方法* <p>* 分類處理消息** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);//請求消息if (message instanceof Request) {// handle request.Request request = (Request) message;if (request.isEvent()) {//處理事件消息handlerEvent(channel, request);} else {if (request.isTwoWay()) {//額外處理雙向消息handleRequest(exchangeChannel, request);} else {//處理單向消息,直接調用下層DubboProtocol.requestHandler#received方法handler.received(exchangeChannel, request.getData());}}}//響應消息else if (message instanceof Response) {handleResponse(channel, (Response) message);}//字符串else if (message instanceof String) {if (isClientSide(channel)) {Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());logger.error(e.getMessage(), e);} else {String echo = handler.telnet(channel, (String) message);if (StringUtils.isNotEmpty(echo)) {channel.send(echo);}}} else {handler.received(exchangeChannel, message);}
}

HeaderExchangeHandler#handleResponse處理響應

該方法用于處理響應,將會判斷如果不是心跳響應,那么繼續通過DefaultFuture#received處理響應。

我們在此前學習Dubbo Consumer發起請求的流程的時候就講過,超時檢查任務TimeoutCheckTask#timeoutCheck方法中,如果請求超時,那么同樣會調用DefaultFuture#received方法處理超時響應。

/*** HeaderExchangeHandler的方法* <p>* 處理響應消息** @param channel  NettyChannel* @param response Response*/
static void handleResponse(Channel channel, Response response) throws RemotingException {//如果不是心跳響應,那么繼續通過DefaultFuture#received處理響應if (response != null && !response.isHeartbeat()) {DefaultFuture.received(channel, response);}
}

DefaultFuture#received處理響應

當請求結果返回或者請求超時之后,將會通過DefaultFuture#received處理響應。

/*** DefaultFuture的方法* <p>* 處理響應結果** @param channel  NettyChannel* @param response Response*/
public static void received(Channel channel, Response response) {//調用另一個received方法,timeout參數為falsereceived(channel, response, false);
}public static void received(Channel channel, Response response, boolean timeout) {try {//由于獲得了響應,那么將該請求的緩存從FUTURES中移除DefaultFuture future = FUTURES.remove(response.getId());if (future != null) {//該請求的超時檢查任務Timeout t = future.timeoutCheckTask;//如果沒有超時,那么if (!timeout) {// decrease Time//取消該任務t.cancel();}//通過future處理響應,設置結果future.doReceived(response);} else {logger.warn("The timeout response finally returned at "+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))+ ", response status is " + response.getStatus()+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()+ " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");}} finally {//移除正在處理的channel緩存CHANNELS.remove(response.getId());}
}

DefaultFuture#doReceived處理響應

設置響應結果并喚醒在AsyncRpcResult#getAppResponse方法中因為調用responseFuture.get()而阻塞的線程。隨后AsyncRpcResult#getAppResponse方法可以獲取響應結果并返回。

/*** DefaultFuture的方法* <p>* 處理響應設置結果*/
private void doReceived(Response res) {if (res == null) {throw new IllegalStateException("response cannot be null");}//如果響應成功,則將設置響應結果并喚醒在AsyncRpcResult#getAppResponse方法中因為調用responseFuture.get()而阻塞的線程if (res.getStatus() == Response.OK) {this.complete(res.getResult());}//如果是客戶端或者服務端超時,拋出超時異常并喚醒在AsyncRpcResult#getAppResponse方法中因為調用responseFuture.get()而阻塞的線程else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));}//拋出遠程調用異常并喚醒在AsyncRpcResult#getAppResponse方法中因為調用responseFuture.get()而阻塞的線程else {this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));}// the result is returning, but the caller thread may still wait// to avoid endless waiting for whatever reason, notify caller thread to return.//結果正在返回,但是調用者線程可能仍在等待,為了避免無休止的等待,通知調用線程返回。if (executor != null && executor instanceof ThreadlessExecutor) {ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;if (threadlessExecutor.isWaiting()) {threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +" which is not an expected state, interrupt the thread manually by returning an exception."));}}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/92873.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/92873.shtml
英文地址,請注明出處:http://en.pswp.cn/web/92873.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

棧和隊列:數據結構中的基礎與應用?

棧和隊列&#xff1a;數據結構中的基礎與應用在計算機科學的領域中&#xff0c;數據結構猶如大廈的基石&#xff0c;支撐著各類復雜軟件系統的構建。而棧和隊列作為兩種基礎且重要的數據結構&#xff0c;以其獨特的特性和廣泛的應用&#xff0c;在程序設計的舞臺上扮演著不可或…

服務端配置 CORS解決跨域問題的原理

服務端配置 CORS&#xff08;跨域資源共享&#xff09;的原理本質是 瀏覽器與服務器之間的安全協商機制。其核心在于服務器通過特定的 HTTP 響應頭聲明允許哪些外部源&#xff08;Origin&#xff09;訪問資源&#xff0c;瀏覽器根據這些響應頭決定是否放行跨域請求。以下是詳細…

Unity筆記(五)知識補充——場景切換、退出游戲、鼠標隱藏鎖定、隨機數、委托

寫在前面&#xff1a;寫本系列(自用)的目的是回顧已經學過的知識、記錄新學習的知識或是記錄心得理解&#xff0c;方便自己以后快速復習&#xff0c;減少遺忘。主要是C#代碼部分。十七、場景切換和退出游戲1、場景切換場景切換使用方法&#xff1a; SceneManager.LoadScene()&a…

用 Spring 思維快速上手 DDD——以 Kratos 為例的分層解讀

用 Spring 思維理解 DDD —— 以 Kratos 為參照 ? 在此前的學習工作中&#xff0c;使用的開發框架一直都是 SpringBoot&#xff0c;對 MVC 架構幾乎是肌肉記憶&#xff1a;Controller 接請求&#xff0c;Service 寫業務邏輯&#xff0c;Mapper 操作數據庫&#xff0c;這套套路…

docspace|Linux|使用docker完全離線化部署onlyoffice之docspace文檔協作系統(全網首發)

一、 前言 書接上回&#xff0c;Linux|實用工具|onlyoffice workspace使用docker快速部署&#xff08;離線和定制化部署&#xff09;-CSDN博客&#xff0c;如果是小公司或者比如某個項目組內部使用&#xff0c;那么&#xff0c;使用docspace這個文檔協同系統是非常合適的&…

【教程】如何高效提取胡蘿卜塊根形態和顏色特征?

胡蘿卜是全球不可或缺的健康食材和重要的經濟作物&#xff0c; 從田間到餐桌&#xff0c;從鮮食到深加工&#xff0c;胡蘿卜在現代人的飲食和健康中扮演著極其重要的角色&#xff0c;通過量化塊根形態和色澤均勻性&#xff0c;可實現對高產優質胡蘿卜品種的快速篩選。工具/材料…

Python初學者筆記第二十四期 -- (面向對象編程)

第33節課 面向對象編程 1. 面向對象編程基礎 1.1 什么是面向對象編程面向過程&#xff1a;執行者 耗時 費力 結果也不一定完美 面向對象&#xff1a;指揮者 省時 省力 結果比較完美面向對象編程(Object-Oriented Programming, OOP)是一種編程范式&#xff0c;它使用"對象&…

Go 語言 里 `var`、`make`、`new`、`:=` 的區別

把 Go 語言 里 var、make、new、: 的區別徹底梳理一下。1?? var 作用&#xff1a;聲明變量&#xff08;可以帶初始值&#xff0c;也可以不帶&#xff09;。語法&#xff1a; var a int // 聲明整型變量&#xff0c;默認值為 0 var b string // 默認值 ""…

計算機網絡---IP(互聯網協議)

一、IP協議概述 互聯網協議&#xff08;Internet Protocol&#xff0c;IP&#xff09;是TCP/IP協議族的核心成員&#xff0c;位于OSI模型的網絡層&#xff08;第三層&#xff09;&#xff0c;負責將數據包從源主機傳輸到目標主機。它是一種無連接、不可靠的協議&#xff0c;提供…

DataFun聯合開源AllData社區和開源Gravitino社區將在8月9日相聚數據治理峰會論壇

&#x1f525;&#x1f525; AllData大數據產品是可定義數據中臺&#xff0c;以數據平臺為底座&#xff0c;以數據中臺為橋梁&#xff0c;以機器學習平臺為中層框架&#xff0c;以大模型應用為上游產品&#xff0c;提供全鏈路數字化解決方案。 ?杭州奧零數據科技官網&#xff…

【工具】通用文檔轉換器 推薦 Markdown 轉為 Word 或者 Pdf格式 可以批量或者通過代碼調用

【工具】通用文檔轉換器 推薦 可以批量或者通過代碼調用 通用文檔轉換器 https://github.com/jgm/pandoc/ Pandoc - index 下載地址 https://github.com/jgm/pandoc/releases 使用方法: 比如 Markdown 轉為 Word 或者 Pdf格式 pandoc -s MANUAL.txt -o example29.docx …

【UEFI系列】Super IO

文章目錄一、什么是Super IO二、Super IO的作用常見廠商三、邏輯設備控制如何訪問SIO邏輯設備的配置寄存器具體配置數值四、硬件監控&#xff08;hardware monitor&#xff09;一、什么是Super IO Super Input/Output超級輸入輸出控制器。 通過LPC&#xff08;low pin count&a…

飛算 JavaAI 2.0.0 測評:自然語言編程如何顛覆傳統開發?

一、前言 在AI技術高速發展的今天&#xff0c;編程方式正在經歷一場革命。傳統的“手寫代碼”模式逐漸被AI輔助開發取代&#xff0c;而飛算JavaAI 2.0.0的推出&#xff0c;更是讓自然語言編程成為現實。 作為一名長期使用Java開發的程序員&#xff0c;我決定深度體驗飛算Java…

Dubbo + zk 微服務

一、安裝zk注冊中心 win版本&#xff1a;windows環境下安裝zookeeper教程詳解&#xff08;單機版&#xff09;-CSDN博客 linux版本&#xff1a; 二、服務提供方搭建 引入dubbo和zk依賴 提供接口 使用注解方式實現接口級注冊到zk&#xff0c;而springcloud是將服務注冊到注冊…

聆思duomotai_ap sdk適配dooiRobot

一、說明 1、duomotai_ap介紹 duomotai_ap是一個針對多模態開發板&#xff08;如 CSK6-MIX 開發板&#xff09;的大模型 AI 開發套件 SDK&#xff0c;主要用于開發語音、視覺等多模態 AI 應用。 2、dooiRobot介紹 基于Doly 機器人的經典外觀設計&#xff0c;采用聆思CSK6011A…

Photoshop軟件打開WebP文件格的操作教程

Photoshop軟件打開WebP文件格的操作教程&#xff0c;好吧&#xff0c;這是英文原版&#xff1a; Photoshop 23.2 原生支持 WebP 格式&#xff0c;無需插件即可打開、編輯和保存 WebP 文件。用戶可通過“文件 > 另存為副本”選擇 WebP 格式&#xff0c;調整無損/有損壓縮及質…

【數據結構】——順序表鏈表(超詳細解析!!!)

目錄一. 前言二. 順序表1. 順序表的特點2. 代碼實現三. 鏈表1. 單向鏈表代碼實現2.雙向鏈表代碼實現四. 順序表與鏈表的區別總結一. 前言 順序表和鏈表是最基礎的兩種線性表實現方式。它們各有特點&#xff0c;適用于不同的應用場景。本文將詳細介紹這兩種數據結構的實現原理、…

GitHub的簡單使用方法----(4)

在安裝完git之后&#xff0c;桌面右鍵會出現兩個git的選項第一個gui打開是這樣的用戶界面分別是新建倉庫&#xff0c;克隆倉庫&#xff0c;打開已經存在的倉庫。tips:Git Gui 默認只能操作本地倉庫——它本質上是一個圖形化的“本地 Git 客戶端”。 它本身不內置“下載遠程倉庫…

藍橋杯----大模板

在寫大模板之前&#xff0c;先講一個函System_Init()&#xff0c;用于系統初始化關閉所有LED與外設&#xff0c;關閉所有LED就是傳入0xff數據打開鎖存器&#xff0c;關閉外設就是傳入0x00打開鎖存器。現在所有底層已經提供給大家了&#xff0c;先提供最簡單版本的大模板&#x…

科技寫作改革我見:取消參考文獻,以點讀率取代引證率!

科技寫作改革我見&#xff1a;綜述應取消參考文獻&#xff0c;學術成就評估以點讀下載率取代參考文獻引證率&#xff01;李升偉 張君飛 韓若蘭引言在當今信息爆炸的時代&#xff0c;科技寫作作為知識傳播的核心載體&#xff0c;其形式與評價體系正面臨前所未有的挑戰。傳統…