Dubbo 3.x源碼(32)—Dubbo Provider處理服務調用請求源碼

基于Dubbo 3.1,詳細介紹了Dubbo Provider處理服務調用請求源碼

上文我們學習了,Dubbo消息的編碼解的源碼。現在我們來學習一下Dubbo Provider處理服務調用請求源碼。

當前consumer發起了rpc請求,經過請求編碼之后到達provider端,在經過InternalDecoder#decode解碼請求解碼之后,后續調用鏈路為:InternalDecoder、IdleStateHandler#channelRead、NettyServerHandler#channelRead, NettyServerHandler#channelRead方法將會進行后續業務處理。

Dubbo 3.x服務調用源碼:

  1. Dubbo 3.x源碼(29)—Dubbo Consumer服務調用源碼(1)服務調用入口
  2. Dubbo 3.x源碼(30)—Dubbo Consumer服務調用源碼(2)發起遠程調用
  3. Dubbo 3.x源碼(31)—Dubbo消息的編碼解碼
  4. Dubbo 3.x源碼(32)—Dubbo Provider處理服務調用請求源碼

Dubbo 3.x服務引用源碼:

  1. Dubbo 3.x源碼(11)—Dubbo服務的發布與引用的入口
  2. Dubbo 3.x源碼(18)—Dubbo服務引用源碼(1)
  3. Dubbo 3.x源碼(19)—Dubbo服務引用源碼(2)
  4. Dubbo 3.x源碼(20)—Dubbo服務引用源碼(3)
  5. Dubbo 3.x源碼(21)—Dubbo服務引用源碼(4)
  6. Dubbo 3.x源碼(22)—Dubbo服務引用源碼(5)服務引用bean的獲取以及懶加載原理
  7. Dubbo 3.x源碼(23)—Dubbo服務引用源碼(6)MigrationRuleListener遷移規則監聽器
  8. Dubbo 3.x源碼(24)—Dubbo服務引用源碼(7)接口級服務發現訂閱refreshInterfaceInvoker
  9. Dubbo 3.x源碼(25)—Dubbo服務引用源碼(8)notify訂閱服務通知更新
  10. Dubbo 3.x源碼(26)—Dubbo服務引用源碼(9)應用級服務發現訂閱refreshServiceDiscoveryInvoker
  11. Dubbo 3.x源碼(27)—Dubbo服務引用源碼(10)subscribeURLs訂閱應用級服務url

Dubbo 3.x服務發布源碼:

  1. Dubbo 3.x源碼(11)—Dubbo服務的發布與引用的入口
  2. Dubbo 3.x源碼(12)—Dubbo服務發布導出源碼(1)
  3. Dubbo 3.x源碼(13)—Dubbo服務發布導出源碼(2)
  4. Dubbo 3.x源碼(14)—Dubbo服務發布導出源碼(3)
  5. Dubbo 3.x源碼(15)—Dubbo服務發布導出源碼(4)
  6. Dubbo 3.x源碼(16)—Dubbo服務發布導出源碼(5)
  7. Dubbo 3.x源碼(17)—Dubbo服務發布導出源碼(6)
  8. Dubbo 3.x源碼(28)—Dubbo服務發布導出源碼(7)應用級服務接口元數據發布

文章目錄

  • NettyServerHandler#channelRead業務處理入口
  • AbstractPeer#received接收請求
  • MultiMessageHandler#received處理多個消息
  • HeartbeatHandler處理心跳消息
  • AllChannelHandler#received分發任務
    • getPreferredExecutorService獲取首選線程池
  • ChannelEventRunnable#run業務線程執行
  • DecodeHandler#received解碼消息體
  • HeaderExchangeHandler#received處理消息
  • HeaderExchangeHandler#handleRequest處理請求
    • DubboProtocol.requestHandler#reply真實方法調用
      • DubboProtocol#getInvoker獲取invoker
      • CallbackRegistrationInvoker#invoke過濾器鏈調用
    • AbstractProxyInvoker#invoke業務實現接口調用
      • doInvoke執行業務調用
  • 總結

NettyServerHandler#channelRead業務處理入口

該方法內部根據netty NioSocketChannel獲取對應的dubbo NettyChannel,然后通過handler#received方法處理消息。關于這個handler,我們在服務的導出時就說過了,這實際上時很多個handler的嵌套對象,而最外層的就是在這里的handler,即NettyServer。

此時調用線程還是IO線程。

/*** NettyServerHandler的方法* <p>* 請求業務處理入口** @param ctx DefaultChannelHandlerContext* @param msg Request* @throws Exception*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//根據netty NioSocketChannel獲取對應的dubbo NettyChannelNettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);/** 通過NettyServer#received方法處理消息*/handler.received(channel, msg);// trigger qos handlerctx.fireChannelRead(msg);
}

AbstractPeer#received接收請求

NettyServer#received接收請求的方法由其父類AbstractPeer#received實現,該方法內部又委托MultiMessageHandler#received方法實現。

此時調用線程還是IO線程。

/*** AbstractPeer的方法* * @param ch NettyChannel* @param msg 消息*/
@Override
public void received(Channel ch, Object msg) throws RemotingException {if (closed) {return;}//調用MultiMessageHandler#received方法handler.received(ch, msg);
}

MultiMessageHandler#received處理多個消息

MultiMessageHandler#received方法主要是處理同時接收到多條消息的情況。如果msg屬于MultiMessage,那么循環調用下層handler#received方法,否則直接調用下層handler#received方法,下層handler是HeartbeatHandler。

此時調用線程還是IO線程。

/*** MultiMessageHandler的方法* <p>* 處理多消息的情況** @param channel  NettyChannel* @param message 消息*/
@SuppressWarnings("unchecked")
@Override
public void received(Channel channel, Object message) throws RemotingException {//如果是多消息類型,即包含多條消息if (message instanceof MultiMessage) {MultiMessage list = (MultiMessage) message;//循環處理多條消息for (Object obj : list) {try {//調用下層HeartbeatHandler#received方法handler.received(channel, obj);} catch (Throwable t) {logger.error("MultiMessageHandler received fail.", t);try {handler.caught(channel, t);} catch (Throwable t1) {logger.error("MultiMessageHandler caught fail.", t1);}}}} else {//直接調用下層HeartbeatHandler#received方法handler.received(channel, message);}
}

HeartbeatHandler處理心跳消息

該方法專門用于處理心跳請求和響應,對于心跳消息,那么這里處理了之后直接return了,不再進行下層handler處理,而普通rpc請求則繼續調用下層AllChannelHandler#received方法處理。

此時調用線程還是IO線程。

/*** HeartbeatHandler的方法* <p>* 處理心跳請求** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {//為NettyChannel設置READ_TIMESTAMP屬性,置為當前時間戳setReadTimestamp(channel);//是否是心跳請求if (isHeartbeatRequest(message)) {Request req = (Request) message;//如果是雙向請求if (req.isTwoWay()) {//那么直接創建一個Response返回Response res = new Response(req.getId(), req.getVersion());res.setEvent(HEARTBEAT_EVENT);//直接發送響應channel.send(res);if (logger.isDebugEnabled()) {int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()+ ", cause: The channel has no data-transmission exceeds a heartbeat period"+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));}}//直接返回,不用后續處理return;}//如果是心跳響應,直接返回,不用后續處理if (isHeartbeatResponse(message)) {if (logger.isDebugEnabled()) {logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());}return;}//調用下層AllChannelHandler#received方法handler.received(channel, message);
}

AllChannelHandler#received分發任務

將當前消息包裝為一個ChannelEventRunnable分發給對應的線程池執行,這里的線程池就是dubbo業務線程池,到此IO線程的任務結束。


這種方式實現了線程資源的隔離,釋放了IO線程,可以快速處理更多的IO操作,提升了系統吞吐量。


實際上這對應的就是AllDispatcher的線程模型,也是Dubbo的默認線程模型,即所有消息都派發到業務線程池,包括建立連接CONNECTED、關閉連接DISCONNECTED、接受請求RECEIVED、處理異常CAUGHT、發送響應SENT等等,響應消息會優先使用對于請求所使用的線程池。這是默認策略。

/*** AllChannelHandler的方法* <p>* 處理普通rpc請求請求** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {//獲取對應的線程池,可能是ThreadlessExecutorExecutorService executor = getPreferredExecutorService(message);try {//創建一個線程任務ChannelEventRunnable,通過線程池執行//這里的handler是DecodeHandlerexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if (message instanceof Request && t instanceof RejectedExecutionException) {sendFeedback(channel, (Request) message, t);return;}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}
}

getPreferredExecutorService獲取首選線程池

該方法獲取處理業務的首選線程池,目前這種方法主要是為了方便消費者端的線程模型而定制的。這是Dubo2.7.5之后的線程模型的優化:

  1. 對于響應消息,那么從DefaultFuture的靜態字段緩存映射FUTURES中獲取請求id對應的DefaultFuture。請求的id就是對應的響應的id。
    1. 然后獲取執DefaultFuture的執行器,對于默認的同步請求,那自然是ThreadlessExecutor,這里面阻塞著發起同步調用的線程,將回調直接委派給發起調用的線程。對于異步請求,則獲取異步請求的線程池。
    2. 因此后續的處理包括響應體的解碼都是由發起調用的線程來執行,這樣減輕了業務線程池的壓力。后面我們講消費者接受響應的時候,會講解對應的源碼。
  2. 對于請求消息,則使用共享executor執行后續邏輯。對于共享線程池,默認為FixedThreadPool,固定200線程,阻塞隊列長度為0,拒絕策略為打印異常日志并且拋出異常。
/*** WrappedChannelHandler的方法* <p>* 目前,這種方法主要是為了方便消費者端的線程模型而定制的。* 1. 使用ThreadlessExecutor,又名,將回調直接委派給發起調用的線程。* 2. 使用共享executor執行回調。** @param msg 消息* @return 執行器*/
public ExecutorService getPreferredExecutorService(Object msg) {//如果是響應消息if (msg instanceof Response) {Response response = (Response) msg;//從DefaultFuture的靜態字段緩存映射FUTURES中獲取請求id對應的DefaultFutureDefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());// a typical scenario is the response returned after timeout, the timeout response may have completed the future//一個典型的場景是響應超時后返回,超時后的響應可能已經完成if (responseFuture == null) {//獲取當前服務器或客戶端的共享執行器return getSharedExecutorService();} else {//獲取執future的執行器,對于默認的同步請i去,那自然是ThreadlessExecutor,這里面阻塞著發起同步調用的線程ExecutorService executor = responseFuture.getExecutor();if (executor == null || executor.isShutdown()) {//獲取當前服務器或客戶端的共享執行器executor = getSharedExecutorService();}return executor;}} else {//獲取當前服務器或客戶端的共享執行器return getSharedExecutorService();}
}/*** get the shared executor for current Server or Client** @return*/
public ExecutorService getSharedExecutorService() {// Application may be destroyed before channel disconnected, avoid create new application model// see https://github.com/apache/dubbo/issues/9127//在斷開通道之前,應用程序可能被銷毀,避免創建新的應用程序模型if (url.getApplicationModel() == null || url.getApplicationModel().isDestroyed()) {return GlobalResourcesRepository.getGlobalExecutorService();}// note: url.getOrDefaultApplicationModel() may create new application modelApplicationModel applicationModel = url.getOrDefaultApplicationModel();ExecutorRepository executorRepository =applicationModel.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();//從執行器倉庫中根據url獲取對應的執行器,默認INTERNAL_SERVICE_EXECUTOR對應的執行器ExecutorService executor = executorRepository.getExecutor(url);if (executor == null) {executor = executorRepository.createExecutorIfAbsent(url);}return executor;
}

ChannelEventRunnable#run業務線程執行

ChannelEventRunnable#run方法就是業務線程執行的入口,構建任務的時候傳遞的handler是DecodeHandler。此時調用線程已經變成了業務線程。

該方法將會判斷通道狀態的各種狀態,并且調用handler的各種對應的方法處理。這里的handler是DecodeHandler。

@Override
public void run() {//通道狀態如果是處理消息狀態,該狀態占大多數情況if (state == ChannelState.RECEIVED) {try {/** 調用DecodeHandler#received方法*/handler.received(channel, message);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}} else {//其他狀態switch (state) {case CONNECTED:try {handler.connected(channel);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case DISCONNECTED:try {handler.disconnected(channel);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case SENT:try {handler.sent(channel, message);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}break;case CAUGHT:try {handler.caught(channel, exception);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is: " + message + ", exception is " + exception, e);}break;default:logger.warn("unknown state: " + state + ", message is " + message);}}}

DecodeHandler#received解碼消息體

這個DecodeHandler主要是對于請求消息或者響應消息的消息體進行解碼,對于請求消息(request.mData),這一步將會解碼出調用的方法名,方法參數類型,傳遞的方法參數,attachments等信息,隨后調用下層HeaderExchangeHandler#received方法。

此前的InternalDecoder#decode方法解碼了Reuqest,但是對于內部的data并沒有解碼,而是留給了業務線程在DecodeHandler中去操作,這樣可以盡量快速釋放IO線程。

/*** DecodeHandler的方法** 消息體的解碼** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {//Decodeable類型,直接處理if (message instanceof Decodeable) {decode(message);}//請求消息if (message instanceof Request) {decode(((Request) message).getData());}//響應消息if (message instanceof Response) {decode(((Response) message).getResult());}//調用下層HeaderExchangeHandler#received方法handler.received(channel, message);
}

HeaderExchangeHandler#received處理消息

HeaderExchangeHandler#received方法對于消息進行分類并調用不同的方法繼續處理。

對于請求消息,如果是雙向消息,那么調用handleRequest方法繼續處理,將會創建Response對象,然后調用dubboProtocol.requestHandler完成請求處理獲取結果,并將結果封裝到Response中后返回給客戶端。如果是單向消息則僅僅調用dubboProtocol.requestHandler完成請求處理即可。

對于響應消息,將會調用DefaultFuture.received方法處理,此時就會根據響應id獲取對應的DefaultFuture,將響應結果設置進去。這里的源碼我們后面將consumer獲取響應結果的時候再講解。

/*** HeaderExchangeHandler的方法* <p>* 分類處理消息** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);//請求消息if (message instanceof Request) {// handle request.Request request = (Request) message;if (request.isEvent()) {//處理事件消息handlerEvent(channel, request);} else {if (request.isTwoWay()) {//額外處理雙向消息handleRequest(exchangeChannel, request);} else {//處理單向消息,直接調用下層DubboProtocol.requestHandler#received方法handler.received(exchangeChannel, request.getData());}}}//響應消息else if (message instanceof Response) {handleResponse(channel, (Response) message);}//字符串else if (message instanceof String) {if (isClientSide(channel)) {Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());logger.error(e.getMessage(), e);} else {String echo = handler.telnet(channel, (String) message);if (StringUtils.isNotEmpty(echo)) {channel.send(echo);}}} else {handler.received(exchangeChannel, message);}
}

HeaderExchangeHandler#handleRequest處理請求

該方法用于處理雙向請求,也就是普通rpc請求。將會創建Response對象,然后調用dubboProtocol.requestHandler完成請求處理獲取結果,并將結果封裝到Response中后返回給客戶端。

/*** HeaderExchangeHandler的方法* <p>* 處理雙向rpc請求消息** @param channel NettyChannel* @param req     請求消息*/
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {//創建一個Response響應對象//id是請求id,這樣當響應返回之后,請求端便能找到對應的DefaultFuture進行后續結果的封裝Response res = new Response(req.getId(), req.getVersion());//如果是損壞的請求,則直接返回錯誤響應,通常解碼失敗這樣返回if (req.isBroken()) {Object data = req.getData();String msg;if (data == null) {msg = null;} else if (data instanceof Throwable) {msg = StringUtils.toString((Throwable) data);} else {msg = data.toString();}res.setErrorMessage("Fail to decode request due to: " + msg);res.setStatus(Response.BAD_REQUEST);channel.send(res);return;}// find handler by message class.//獲取請求體 DecodeableRpcInvocationObject msg = req.getData();try {/** 異步調用下層handler  DubboProtocol.requestHandler#reply方法* DubboProtocol.requestHandler是DubboProtocol中的一個匿名內部類實現*/CompletionStage<Object> future = handler.reply(channel, msg);/** 阻塞等待直到reply調用完成,則將結果設置到Response中,然后調用channel.send方法將調用結果返回給服務消費端*/future.whenComplete((appResult, t) -> {try {if (t == null) {res.setStatus(Response.OK);res.setResult(appResult);} else {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(t));}//將調用結果返回給服務消費端channel.send(res);} catch (RemotingException e) {logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);}});} catch (Throwable e) {//調用異常處理,設置SERVICE_ERROR狀態碼res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(e));//將異常調用結果返回給服務消費端channel.send(res);}
}

DubboProtocol.requestHandler#reply真實方法調用

該方法首先根據請求參數從DubboProtocol的exporterMap緩存中獲取對應的exporter,然后獲取exporter中的invoker,然后調用invoker#invoke方法對于真實的方法實現進行調用,然后返回調用結果。

這里獲取的Invoker,實際上是FilterChainBuilder的內部類CallbackRegistrationInvoker,我們在前面學習consumer發起服務調用請求的時候,實際上也是走的這個方法。可見dubbo在方法的復用的設計上還是很牛的。

CallbackRegistrationInvoker實際上內部持有是一個過濾器鏈,調用它的invoke方法,會觸發一系列過濾器filter的的調用,執行過濾操作。

/*** DubboProtocol.ExchangeHandler的匿名內部類實現*/
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {if (!(message instanceof Invocation)) {throw new RemotingException(channel, "Unsupported request: "+ (message == null ? null : (message.getClass().getName() + ": " + message))+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());}//轉換為調用抽象Invocation inv = (Invocation) message;//獲取對應的服務提供者invoker實例Invoker<?> invoker = getInvoker(channel, inv);inv.setServiceModel(invoker.getUrl().getServiceModel());// switch TCCLif (invoker.getUrl().getServiceModel() != null) {Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader());}// need to consider backward-compatibility if it's a callback//如果是回調方法,需要考慮向后兼容,驗回調方法是否存在if (Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(IS_CALLBACK_SERVICE_INVOKE))) {String methodsStr = invoker.getUrl().getParameters().get("methods");boolean hasMethod = false;if (methodsStr == null || !methodsStr.contains(",")) {hasMethod = inv.getMethodName().equals(methodsStr);} else {String[] methods = methodsStr.split(",");for (String method : methods) {if (inv.getMethodName().equals(method)) {hasMethod = true;break;}}}if (!hasMethod) {logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()+ " not found in callback service interface ,invoke will be ignored."+ " please update the api interface. url is:"+ invoker.getUrl()) + " ,invocation is :" + inv);return null;}}//設置調用端地址RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());/** 基于invoker調用invoke方法,內部將會對服務接口的真實實現進行調用并獲取結果*/Result result = invoker.invoke(inv);//同步等待結果并將結果封裝為CompletableFuture返回return result.thenApply(Function.identity());
}

DubboProtocol#getInvoker獲取invoker

該方法獲取從調用端傳遞的RpcInvocation中獲取各種信息,然后組裝成serviceKey:{group}/{servicePath}:{version}:{port},隨后從DubboProtocol的exporterMap緩存中根據serviceKey獲取對應的exporter,然后獲取exporter中的invoker返回。

我們在此前學習dubbo服務導出的時候,就學習了這個exporterMap,其內部存放著serviecKey到對應的exporter的實現。現在,在服務調用的時候,就用到了這個緩存,這樣一下就和之前學習的內容對應上了。

/*** DubboProtocol的方法* * @param channel HeaderExchangeChannel* @param inv 調用抽象,RpcInvocation* @return invoker*/
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {boolean isCallBackServiceInvoke;boolean isStubServiceInvoke;//端口號int port = channel.getLocalAddress().getPort();//獲取path,服務接口String path = (String) inv.getObjectAttachmentWithoutConvert(PATH_KEY);//if it's stub service on client side(after enable stubevent, usually is set up onconnect or ondisconnect method)//客戶端的存根服務isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(STUB_EVENT_KEY));if (isStubServiceInvoke) {//when a stub service export to local, it usually can't be exposed to portport = 0;}// if it's callback service on client side//如果是客戶端的回調服務isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;if (isCallBackServiceInvoke) {path += "." + inv.getObjectAttachmentWithoutConvert(CALLBACK_SERVICE_KEY);inv.setObjectAttachment(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());}//構建服務key,{group}/{servicePath}:{version}:{port}String serviceKey = serviceKey(port,path,(String) inv.getObjectAttachmentWithoutConvert(VERSION_KEY),(String) inv.getObjectAttachmentWithoutConvert(GROUP_KEY));//從DubboProtocol的exporterMap緩存中根據serviceKey獲取對應的exporterDubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);if (exporter == null) {throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + getInvocationWithoutData(inv));}//獲取exporter中的invokerreturn exporter.getInvoker();
}

CallbackRegistrationInvoker#invoke過濾器鏈調用

CallbackRegistrationInvoker#invoke方法用于添加一個回調,它可以在RPC調用完成時觸發,在這回調函數將會倒序執行filters中的過濾器。

/*** FilterChainBuilder的內部類CallbackRegistrationInvoker的方法*/
@Override
public Result invoke(Invocation invocation) throws RpcException {/** 繼續調用下層invoker#invoke*/Result asyncResult = filterInvoker.invoke(invocation);//添加一個回調,它可以在RPC調用完成時觸發。asyncResult.whenCompleteWithContext((r, t) -> {RuntimeException filterRuntimeException = null;//過濾器倒序執行for (int i = filters.size() - 1; i >= 0; i--) {FILTER filter = filters.get(i);try {InvocationProfilerUtils.releaseDetailProfiler(invocation);if (filter instanceof ListenableFilter) {//執行過濾器ListenableFilter listenableFilter = ((ListenableFilter) filter);Filter.Listener listener = listenableFilter.listener(invocation);try {if (listener != null) {if (t == null) {listener.onResponse(r, filterInvoker, invocation);} else {listener.onError(t, filterInvoker, invocation);}}} finally {listenableFilter.removeListener(invocation);}} else if (filter instanceof FILTER.Listener) {FILTER.Listener listener = (FILTER.Listener) filter;if (t == null) {listener.onResponse(r, filterInvoker, invocation);} else {listener.onError(t, filterInvoker, invocation);}}} catch (RuntimeException runtimeException) {LOGGER.error(String.format("Exception occurred while executing the %s filter named %s.", i, filter.getClass().getSimpleName()));if (LOGGER.isDebugEnabled()) {LOGGER.debug(String.format("Whole filter list is: %s", filters.stream().map(tmpFilter -> tmpFilter.getClass().getSimpleName()).collect(Collectors.toList())));}filterRuntimeException = runtimeException;t = runtimeException;}}if (filterRuntimeException != null) {throw filterRuntimeException;}});return asyncResult;
}/*** FilterChainBuilder#CopyOfFilterChainNode的方法*/
@Override
public Result invoke(Invocation invocation) throws RpcException {Result asyncResult;try {//進入細節分析器InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Filter " + filter.getClass().getName() + " invoke.");/** 執行過濾器操作*/asyncResult = filter.invoke(nextNode, invocation);} catch (Exception e) {InvocationProfilerUtils.releaseDetailProfiler(invocation);if (filter instanceof ListenableFilter) {ListenableFilter listenableFilter = ((ListenableFilter) filter);try {Filter.Listener listener = listenableFilter.listener(invocation);if (listener != null) {listener.onError(e, originalInvoker, invocation);}} finally {listenableFilter.removeListener(invocation);}} else if (filter instanceof FILTER.Listener) {FILTER.Listener listener = (FILTER.Listener) filter;listener.onError(e, originalInvoker, invocation);}throw e;} finally {}return asyncResult;
}

過濾器鏈的盡頭就是真實invoker,invoke方法經過的過濾器filter如下:ContextFilter、ProfilerServerFilter、EchoFilter、ClassLoaderFilter、GenericFilter、ExceptionFilter、MonitorFilter、TimeoutFilter、TraceFilter、ClassLoaderCallbackFilter、InvokerWrapper、DelegateProviderMetaDataInvoker、AbstractProxyInvoker。

AbstractProxyInvoker#invoke業務實現接口調用

AbstractProxyInvoker#invoke方法內部將會調用doInvoke方法,該方法會調用業務接口,最終統一個返回AsyncRpcResult。

/*** AbstractProxyInvoker的方法* <p>* 執行業務接口實現** @param invocation 調用抽象*/
@Override
public Result invoke(Invocation invocation) throws RpcException {try {ProfilerEntry originEntry = null;//性能分析if (ProfilerSwitch.isEnableSimpleProfiler()) {Object fromInvocation = invocation.get(Profiler.PROFILER_KEY);if (fromInvocation instanceof ProfilerEntry) {ProfilerEntry profiler = Profiler.enter((ProfilerEntry) fromInvocation, "Receive request. Server biz impl invoke begin.");invocation.put(Profiler.PROFILER_KEY, profiler);originEntry = Profiler.setToBizProfiler(profiler);}}/** 執行doInvoke,該方法由子類實現*/Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());//性能分析if (ProfilerSwitch.isEnableSimpleProfiler()) {Object fromInvocation = invocation.get(Profiler.PROFILER_KEY);if (fromInvocation instanceof ProfilerEntry) {ProfilerEntry profiler = Profiler.release((ProfilerEntry) fromInvocation);invocation.put(Profiler.PROFILER_KEY, profiler);}}Profiler.removeBizProfiler();if (originEntry != null) {Profiler.setToBizProfiler(originEntry);}//將返回結果轉換為CompletableFutureCompletableFuture<Object> future = wrapWithFuture(value, invocation);//轉換為AppResponse的CompletableFutureCompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {//創建AppResponseAppResponse result = new AppResponse(invocation);if (t != null) {if (t instanceof CompletionException) {result.setException(t.getCause());} else {result.setException(t);}} else {result.setValue(obj);}return result;});//統一返回AsyncRpcResultreturn new AsyncRpcResult(appResponseFuture, invocation);} catch (InvocationTargetException e) {if (RpcContext.getServiceContext().isAsyncStarted() && !RpcContext.getServiceContext().stopAsync()) {logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);}return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);} catch (Throwable e) {throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);}
}

doInvoke執行業務調用

到這一步,實際上以及調用到了基于JavassistProxyFactory#getInvoker或者JdkProxyFactory#getInvoker方法創建的AbstractProxyInvoker匿名實現類實例。我們在學習Dubbo服務導出的時候就說過了,這就是最底層的Invoker,其內部包含了真實的服務實現。

JavassistProxyFactory 是默認的invoker工廠。dubbo利用javassist動態創建了Class對應的Wrapper對象,動態生成的Wrapper類改寫invokeMethod方法,其內部會被改寫為根據接口方法名和參數直接調用ref對應名字的方法,避免通過Jdk的反射調用方法帶來的性能問題。

JavassistProxyFactory#getInvoker方法如下,可以看到返回的AbstractProxyInvoker實例重寫了doInvoke方法,而AbstractProxyInvoker#invoke方法內部將會調用doInvoke方法,在doInvoke方法中,內部實際調用的wrapper#invokeMethod方法,該方法中將會通過方法名字符串去調用對應真實接口實現的方法,而不是反射查找真實接口實現的方法去調用,提升了調用速度。

/*** JavassistProxyFactory的方法* <p>* 將ref、interfaceClass、url包裝成一個Invoker代理對象** @param proxy 被代理的實例* @param type  代理接口Class* @param url   服務url,可能是基于injvm協議的服務url(本地導出),也可能是追加了export=url參數的注冊中心url(遠程導出),還可能是原始的服務url(直連導出)* @return 一個Invoker可執行體實例*/
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {try {// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'//基于javassist動態創建了Class對應的Wrapper對象,動態生成的Wrapper類改寫invokeMethod方法,其內部會被改寫為根據接口方法名和參數直接調用ref對應名字的方法//這樣后續調用方法時,就可以避免Jdk動態代理中通過反射調用方法帶來的性能問題final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);//創建一個AbstractProxyInvoker的匿名實例return new AbstractProxyInvoker<T>(proxy, type, url) {@Overrideprotected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable {//調用wrapper的invokeMethod方法return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);}};} catch (Throwable fromJavassist) {// try fall back to JDK proxy factorytry {Invoker<T> invoker = jdkProxyFactory.getInvoker(proxy, type, url);logger.error("Failed to generate invoker by Javassist failed. Fallback to use JDK proxy success. " +"Interfaces: " + type, fromJavassist);// log out errorreturn invoker;} catch (Throwable fromJdk) {logger.error("Failed to generate invoker by Javassist failed. Fallback to use JDK proxy is also failed. " +"Interfaces: " + type + " Javassist Error.", fromJavassist);logger.error("Failed to generate invoker by Javassist failed. Fallback to use JDK proxy is also failed. " +"Interfaces: " + type + " JDK Error.", fromJdk);throw fromJavassist;}}
}

總結

本次我們學習了provider接收并處理consumer遠程rpc請求的流程,大概的流程如下:

服務端接受請求之后會反序列化(解碼)請求參數并通過參數找到之前暴露存儲的exporterMap緩存中獲取對應的exporter,然后獲取exporter中的invoker,而invoker內部持有真正的業務服務實現類對象,最終會調用真正的實現類對象的方法,獲取結果后會組裝響應并序列化并返回,這個響應的id和對應的請求的id是一樣的。

對應的方法調用棧為,IO線程調用棧:

  1. NettyServerHandler#channelRead(ChannelHandlerContext, MessageEvent)
  2. AbstractPeer#received(Channel, Object)
  3. MultiMessageHandler#received(Channel, Object)
  4. HeartbeatHandler#received(Channel, Object)
  5. AllChannelHandler#received(Channel, Object) – 業務線程池執行線程任務

業務線程調用棧:

  1. ChannelEventRunnable#run()
  2. DecodeHandler#received(Channel, Object)
  3. HeaderExchangeHandler#received(Channel, Object)
  4. HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
  5. DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
  6. CallbackRegistrationInvoker#invoke(Invocation) --各種Filter
  7. InvokerWrapper#invoke(Invocation)
  8. DelegateProviderMetaDataInvoker#invoke(Invocation)
  9. AbstractProxyInvoker#invoke(Invocation)
  10. AbstractProxyInvoker#doInvoke(Invocation) --基于javassist創建的invoker
  11. wrapper#invokeMethod(Object, String, Class[], Object[])
  12. xxxServiceImpl#xxx() --業務方法調用

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/94290.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/94290.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/94290.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

每日一leetcode:移動零

目錄 解題過程: 描述: 分析條件: 解題思路: 通過這道題可以學到什么: 解題過程: 描述: 給定一個數組 nums&#xff0c;編寫一個函數將所有 0 移動到數組的末尾&#xff0c;同時保持非零元素的相對順序。 請注意 &#xff0c;必須在不復制數組的情況下原地對數組進行操…

6-Django項目實戰-[dtoken]-用戶登錄模塊

1.創建應用 python manage.py startapp dtoken 2.注冊應用 settings.py中注冊 3.匹配路由4.編寫登錄功能視圖函數 import hashlib import json import timeimport jwt from django.conf import settings from django.http import JsonResponse from user.models import UserPro…

Axure日期日歷高保真動態交互原型

在數字化產品設計中&#xff0c;日期日歷組件作為高頻交互元素&#xff0c;其功能完整性與用戶體驗直接影響著用戶對產品的信任度。本次帶來的日期日歷高保真動態交互原型&#xff0c;依照Element UI、View UI等主流前端框架為參考&#xff0c;通過動態面板、中繼器、函數、交互…

【YOLOv4】

YOLOv4 論文地址&#xff1a;&#xff1a;【https://arxiv.org/pdf/2004.10934】 YOLOv4 論文中文翻譯地址&#xff1a;【深度學習論文閱讀目標檢測篇&#xff08;七&#xff09;中文版&#xff1a;YOLOv4《Optimal Speed and Accuracy of Object Detection》-CSDN博客】 yol…

【秋招筆試】2025.08.03蝦皮秋招筆試-第一題

?? 點擊直達筆試專欄 ??《大廠筆試突圍》 ?? 春秋招筆試突圍在線OJ ?? 筆試突圍在線刷題 bishipass.com 01. 蛋糕切分的最大收益 問題描述 K小姐經營著一家甜品店,今天她有一塊長度為 n n n 厘米的長條蛋糕需要切分。根據店里的規定,她必須將蛋糕切成至少 2 2

2.0 vue工程項目的創建

前提準備.需要電腦上已經安裝了nodejs 參考 7.nodejs和npm簡單使用_npmjs官網-CSDN博客 創建vue2工程 全局安裝 Vue CLI 在終端中運行以下命令來全局安裝 Vue CLI&#xff1a; npm install -g vue/cli npm install -g 表示全局安裝。vue/cli 是 Vue CLI 的包名。 安裝完成后…

視覺圖像處理中級篇 [2]—— 外觀檢查 / 傷痕模式的原理與優化設置方法

外觀缺陷檢測是工業生產中的關鍵環節&#xff0c;而傷痕模式作為圖像處理的核心算法&#xff0c;能精準識別工件表面的劃痕、污跡等缺陷。掌握其原理和優化方法&#xff0c;對提升檢測效率至關重要。一、利用傷痕模式進行外觀檢查雖然總稱為外觀檢查&#xff0c;但根據檢查對象…

ethtool,lspci,iperf工具常用命令總結

ethtool、lspci 和 iperf 是 Linux 系統中進行網絡硬件查看、配置和性能測試的核心命令行工具。下面是它們的常用命令分析和總結&#xff1a; 核心作用總結&#xff1a; lspci: 偵察兵 - 列出系統所有 PCI/PCIe 總線上的硬件設備信息&#xff0c;主要用于識別網卡型號、制造商、…

DAY10DAY11-新世紀DL(DeepLearning/深度學習)戰士:序

本文參考視頻[雙語字幕]吳恩達深度學習deeplearning.ai_嗶哩嗶哩_bilibili 參考文章0.0 目錄-深度學習第一課《神經網絡與深度學習》-Stanford吳恩達教授-CSDN博客 1深度學習概論 1.舉例介紹 lg房價預測&#xff1a;房價與面積之間的坐標關系如圖所示&#xff0c;由線性回歸…

flutter release調試插件

chucker_flutter &#xff08;只有網絡請求的信息&#xff0c;親測可以用&#xff09; flutter&#xff1a;3.24.3 使用版本 chucker_flutter: 1.8.2 chucker_flutter | Flutter package void main() async {// 可以控制顯示ChuckerFlutter.showNotification false;ChuckerF…

基于開源鏈動2+1模式AI智能名片S2B2C商城小程序的私域流量拉新策略研究

摘要&#xff1a;私域流量運營已成為企業數字化轉型的核心戰略&#xff0c;其本質是通過精細化用戶運營實現流量價值最大化。本文以“定位、拉新、養熟、成交、裂變、留存”全鏈路為框架&#xff0c;聚焦開源鏈動21模式、AI智能名片與S2B2C商城小程序的協同創新&#xff0c;揭示…

華為云云服務高級顧問葉正暉:華為對多模態大模型的思考與實踐

嘉賓介紹&#xff1a;葉正暉&#xff0c;華為云云服務高級顧問&#xff0c;全球化企業信息化專家&#xff0c;從業年限超過23年&#xff0c;在華為任職超過21年&#xff0c;涉及運營商、企業、消費者、云服務、安全與隱私等領域&#xff0c;精通云服務、安全合規、隱私保護等領…

【機器學習(二)】KNN算法與模型評估調優

目錄 一、寫在前面的話 二、KNN&#xff08;K-Nearest Neighbor&#xff09; 2.1 KNN算法介紹 2.1.1 概念介紹 2.1.2 算法特點 2.1.3 API 講解 2.2 樣本距離計算 2.2.1 距離的類型 &#xff08;1&#xff09;歐幾里得距離&#xff08;Euclidean Distance&#xff09; …

《Uniapp-Vue 3-TS 實戰開發》實現自定義頭部導航欄

本文介紹了如何將Vue2組件遷移至Vue3的組合式API。主要內容包括:1) 使用<script setup lang="ts">語法;2) 通過接口定義props類型約束;3) 用defineProps替代props選項;4) 將data變量轉為ref響應式變量;5) 使用computed替代計算屬性;6) 將created生命周期…

GitCode疑難問題診療

問題診斷與解決框架通用問題排查流程&#xff08;適用于大多數場景&#xff09; 版本兼容性驗證方法 網絡連接與權限檢查清單常見錯誤分類與解決方案倉庫克隆失敗場景分析 HTTP/SSH協議錯誤代碼解讀 403/404錯誤深層原因排查高級疑難問題處理分支合并沖突的深度解決 .gitignore…

告別物業思維:科技正重構產業園區的價值坐標系

文 | 方寸控股引言&#xff1a;當產業園區的競爭升維為“科技軍備競賽”&#xff0c;土地紅利消退&#xff0c;政策優勢趨同&#xff0c;傳統園區運營陷入增長困局。當招商團隊還在用Excel統計企業需求&#xff0c;當能耗管理依賴保安夜間巡檢&#xff0c;當企業服務停留在“修…

GitHub 熱門項目 PandaWiki:零門檻搭建智能漏洞庫,支持 10 + 大模型接入

轉自&#xff1a;Khan安全團隊你還沒有自己的漏洞庫嗎&#xff1f;一條命令教你搭建。PandaWiki 是一款 AI 大模型驅動的開源知識庫搭建系統&#xff0c;幫助你快速構建智能化的 產品文檔、技術文檔、FAQ、博客系統&#xff0c;借助大模型的力量為你提供 AI 創作、AI 問答、AI …

Python 程序設計講義(55):Python 的函數——函數的參數

Python 程序設計講義&#xff08;55&#xff09;&#xff1a;Python 的函數——函數的參數 目錄Python 程序設計講義&#xff08;55&#xff09;&#xff1a;Python 的函數——函數的參數一、聲明形參二、傳遞實參&#xff08;位置參數&#xff09;1、在調用函數進行傳遞參數時…

機器學習sklearn:支持向量機svm

概述&#xff1a;現在就只知道這個svm可以畫出決策邊界&#xff0c;對數據的劃分。簡單舉例就是&#xff1a;好的和壞的數據分開&#xff0c;中間的再驗證from sklearn.datasets import make_blobs from sklearn.svm import SVC import matplotlib.pyplot as plt import numpy …

Django ORM 詳解:從入門到進階,掌握數據庫操作的核心

在 Django 中&#xff0c;ORM&#xff08;Object Relational Mapping&#xff0c;對象關系映射&#xff09; 是框架的核心之一。它允許你使用 Python 代碼來操作數據庫&#xff0c;而無需編寫原始 SQL 語句。Django ORM 不僅簡化了數據庫操作&#xff0c;還提升了代碼的可讀性、…