基于Dubbo 3.1,詳細介紹了Dubbo Provider處理服務調用請求源碼
上文我們學習了,Dubbo消息的編碼解的源碼。現在我們來學習一下Dubbo Provider處理服務調用請求源碼。
當前consumer發起了rpc請求,經過請求編碼之后到達provider端,在經過InternalDecoder#decode解碼請求解碼之后,后續調用鏈路為:InternalDecoder、IdleStateHandler#channelRead、NettyServerHandler#channelRead, NettyServerHandler#channelRead方法將會進行后續業務處理。
Dubbo 3.x服務調用源碼:
- Dubbo 3.x源碼(29)—Dubbo Consumer服務調用源碼(1)服務調用入口
- Dubbo 3.x源碼(30)—Dubbo Consumer服務調用源碼(2)發起遠程調用
- Dubbo 3.x源碼(31)—Dubbo消息的編碼解碼
- Dubbo 3.x源碼(32)—Dubbo Provider處理服務調用請求源碼
Dubbo 3.x服務引用源碼:
- Dubbo 3.x源碼(11)—Dubbo服務的發布與引用的入口
- Dubbo 3.x源碼(18)—Dubbo服務引用源碼(1)
- Dubbo 3.x源碼(19)—Dubbo服務引用源碼(2)
- Dubbo 3.x源碼(20)—Dubbo服務引用源碼(3)
- Dubbo 3.x源碼(21)—Dubbo服務引用源碼(4)
- Dubbo 3.x源碼(22)—Dubbo服務引用源碼(5)服務引用bean的獲取以及懶加載原理
- Dubbo 3.x源碼(23)—Dubbo服務引用源碼(6)MigrationRuleListener遷移規則監聽器
- Dubbo 3.x源碼(24)—Dubbo服務引用源碼(7)接口級服務發現訂閱refreshInterfaceInvoker
- Dubbo 3.x源碼(25)—Dubbo服務引用源碼(8)notify訂閱服務通知更新
- Dubbo 3.x源碼(26)—Dubbo服務引用源碼(9)應用級服務發現訂閱refreshServiceDiscoveryInvoker
- Dubbo 3.x源碼(27)—Dubbo服務引用源碼(10)subscribeURLs訂閱應用級服務url
Dubbo 3.x服務發布源碼:
- Dubbo 3.x源碼(11)—Dubbo服務的發布與引用的入口
- Dubbo 3.x源碼(12)—Dubbo服務發布導出源碼(1)
- Dubbo 3.x源碼(13)—Dubbo服務發布導出源碼(2)
- Dubbo 3.x源碼(14)—Dubbo服務發布導出源碼(3)
- Dubbo 3.x源碼(15)—Dubbo服務發布導出源碼(4)
- Dubbo 3.x源碼(16)—Dubbo服務發布導出源碼(5)
- Dubbo 3.x源碼(17)—Dubbo服務發布導出源碼(6)
- Dubbo 3.x源碼(28)—Dubbo服務發布導出源碼(7)應用級服務接口元數據發布
文章目錄
- NettyServerHandler#channelRead業務處理入口
- AbstractPeer#received接收請求
- MultiMessageHandler#received處理多個消息
- HeartbeatHandler處理心跳消息
- AllChannelHandler#received分發任務
- getPreferredExecutorService獲取首選線程池
- ChannelEventRunnable#run業務線程執行
- DecodeHandler#received解碼消息體
- HeaderExchangeHandler#received處理消息
- HeaderExchangeHandler#handleRequest處理請求
- DubboProtocol.requestHandler#reply真實方法調用
- DubboProtocol#getInvoker獲取invoker
- CallbackRegistrationInvoker#invoke過濾器鏈調用
- AbstractProxyInvoker#invoke業務實現接口調用
- doInvoke執行業務調用
- 總結
NettyServerHandler#channelRead業務處理入口
該方法內部根據netty NioSocketChannel獲取對應的dubbo NettyChannel,然后通過handler#received方法處理消息。關于這個handler,我們在服務的導出時就說過了,這實際上時很多個handler的嵌套對象,而最外層的就是在這里的handler,即NettyServer。
此時調用線程還是IO線程。
/*** NettyServerHandler的方法* <p>* 請求業務處理入口** @param ctx DefaultChannelHandlerContext* @param msg Request* @throws Exception*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//根據netty NioSocketChannel獲取對應的dubbo NettyChannelNettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);/** 通過NettyServer#received方法處理消息*/handler.received(channel, msg);// trigger qos handlerctx.fireChannelRead(msg);
}
AbstractPeer#received接收請求
NettyServer#received接收請求的方法由其父類AbstractPeer#received實現,該方法內部又委托MultiMessageHandler#received方法實現。
此時調用線程還是IO線程。
/*** AbstractPeer的方法* * @param ch NettyChannel* @param msg 消息*/
@Override
public void received(Channel ch, Object msg) throws RemotingException {if (closed) {return;}//調用MultiMessageHandler#received方法handler.received(ch, msg);
}
MultiMessageHandler#received處理多個消息
MultiMessageHandler#received方法主要是處理同時接收到多條消息的情況。如果msg屬于MultiMessage,那么循環調用下層handler#received方法,否則直接調用下層handler#received方法,下層handler是HeartbeatHandler。
此時調用線程還是IO線程。
/*** MultiMessageHandler的方法* <p>* 處理多消息的情況** @param channel NettyChannel* @param message 消息*/
@SuppressWarnings("unchecked")
@Override
public void received(Channel channel, Object message) throws RemotingException {//如果是多消息類型,即包含多條消息if (message instanceof MultiMessage) {MultiMessage list = (MultiMessage) message;//循環處理多條消息for (Object obj : list) {try {//調用下層HeartbeatHandler#received方法handler.received(channel, obj);} catch (Throwable t) {logger.error("MultiMessageHandler received fail.", t);try {handler.caught(channel, t);} catch (Throwable t1) {logger.error("MultiMessageHandler caught fail.", t1);}}}} else {//直接調用下層HeartbeatHandler#received方法handler.received(channel, message);}
}
HeartbeatHandler處理心跳消息
該方法專門用于處理心跳請求和響應,對于心跳消息,那么這里處理了之后直接return了,不再進行下層handler處理,而普通rpc請求則繼續調用下層AllChannelHandler#received方法處理。
此時調用線程還是IO線程。
/*** HeartbeatHandler的方法* <p>* 處理心跳請求** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {//為NettyChannel設置READ_TIMESTAMP屬性,置為當前時間戳setReadTimestamp(channel);//是否是心跳請求if (isHeartbeatRequest(message)) {Request req = (Request) message;//如果是雙向請求if (req.isTwoWay()) {//那么直接創建一個Response返回Response res = new Response(req.getId(), req.getVersion());res.setEvent(HEARTBEAT_EVENT);//直接發送響應channel.send(res);if (logger.isDebugEnabled()) {int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()+ ", cause: The channel has no data-transmission exceeds a heartbeat period"+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));}}//直接返回,不用后續處理return;}//如果是心跳響應,直接返回,不用后續處理if (isHeartbeatResponse(message)) {if (logger.isDebugEnabled()) {logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());}return;}//調用下層AllChannelHandler#received方法handler.received(channel, message);
}
AllChannelHandler#received分發任務
將當前消息包裝為一個ChannelEventRunnable分發給對應的線程池執行,這里的線程池就是dubbo業務線程池,到此IO線程的任務結束。
這種方式實現了線程資源的隔離,釋放了IO線程,可以快速處理更多的IO操作,提升了系統吞吐量。
實際上這對應的就是AllDispatcher的線程模型,也是Dubbo的默認線程模型,即所有消息都派發到業務線程池,包括建立連接CONNECTED、關閉連接DISCONNECTED、接受請求RECEIVED、處理異常CAUGHT、發送響應SENT等等,響應消息會優先使用對于請求所使用的線程池。這是默認策略。
/*** AllChannelHandler的方法* <p>* 處理普通rpc請求請求** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {//獲取對應的線程池,可能是ThreadlessExecutorExecutorService executor = getPreferredExecutorService(message);try {//創建一個線程任務ChannelEventRunnable,通過線程池執行//這里的handler是DecodeHandlerexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if (message instanceof Request && t instanceof RejectedExecutionException) {sendFeedback(channel, (Request) message, t);return;}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}
}
getPreferredExecutorService獲取首選線程池
該方法獲取處理業務的首選線程池,目前這種方法主要是為了方便消費者端的線程模型而定制的。這是Dubo2.7.5之后的線程模型的優化:
- 對于響應消息,那么從DefaultFuture的靜態字段緩存映射FUTURES中獲取請求id對應的DefaultFuture。請求的id就是對應的響應的id。
1. 然后獲取執DefaultFuture的執行器,對于默認的同步請求,那自然是ThreadlessExecutor,這里面阻塞著發起同步調用的線程,將回調直接委派給發起調用的線程。對于異步請求,則獲取異步請求的線程池。
2. 因此后續的處理包括響應體的解碼都是由發起調用的線程來執行,這樣減輕了業務線程池的壓力。后面我們講消費者接受響應的時候,會講解對應的源碼。 - 對于請求消息,則使用共享executor執行后續邏輯。對于共享線程池,默認為FixedThreadPool,固定200線程,阻塞隊列長度為0,拒絕策略為打印異常日志并且拋出異常。
/*** WrappedChannelHandler的方法* <p>* 目前,這種方法主要是為了方便消費者端的線程模型而定制的。* 1. 使用ThreadlessExecutor,又名,將回調直接委派給發起調用的線程。* 2. 使用共享executor執行回調。** @param msg 消息* @return 執行器*/
public ExecutorService getPreferredExecutorService(Object msg) {//如果是響應消息if (msg instanceof Response) {Response response = (Response) msg;//從DefaultFuture的靜態字段緩存映射FUTURES中獲取請求id對應的DefaultFutureDefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());// a typical scenario is the response returned after timeout, the timeout response may have completed the future//一個典型的場景是響應超時后返回,超時后的響應可能已經完成if (responseFuture == null) {//獲取當前服務器或客戶端的共享執行器return getSharedExecutorService();} else {//獲取執future的執行器,對于默認的同步請i去,那自然是ThreadlessExecutor,這里面阻塞著發起同步調用的線程ExecutorService executor = responseFuture.getExecutor();if (executor == null || executor.isShutdown()) {//獲取當前服務器或客戶端的共享執行器executor = getSharedExecutorService();}return executor;}} else {//獲取當前服務器或客戶端的共享執行器return getSharedExecutorService();}
}/*** get the shared executor for current Server or Client** @return*/
public ExecutorService getSharedExecutorService() {// Application may be destroyed before channel disconnected, avoid create new application model// see https://github.com/apache/dubbo/issues/9127//在斷開通道之前,應用程序可能被銷毀,避免創建新的應用程序模型if (url.getApplicationModel() == null || url.getApplicationModel().isDestroyed()) {return GlobalResourcesRepository.getGlobalExecutorService();}// note: url.getOrDefaultApplicationModel() may create new application modelApplicationModel applicationModel = url.getOrDefaultApplicationModel();ExecutorRepository executorRepository =applicationModel.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();//從執行器倉庫中根據url獲取對應的執行器,默認INTERNAL_SERVICE_EXECUTOR對應的執行器ExecutorService executor = executorRepository.getExecutor(url);if (executor == null) {executor = executorRepository.createExecutorIfAbsent(url);}return executor;
}
ChannelEventRunnable#run業務線程執行
ChannelEventRunnable#run方法就是業務線程執行的入口,構建任務的時候傳遞的handler是DecodeHandler。此時調用線程已經變成了業務線程。
該方法將會判斷通道狀態的各種狀態,并且調用handler的各種對應的方法處理。這里的handler是DecodeHandler。
@Override
public void run() {//通道狀態如果是處理消息狀態,該狀態占大多數情況if (state == ChannelState.RECEIVED) {try {/** 調用DecodeHandler#received方法*/handler.received(channel, message);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}} else {//其他狀態switch (state) {case CONNECTED:try {handler.connected(channel);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case DISCONNECTED:try {handler.disconnected(channel);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case SENT:try {handler.sent(channel, message);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}break;case CAUGHT:try {handler.caught(channel, exception);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is: " + message + ", exception is " + exception, e);}break;default:logger.warn("unknown state: " + state + ", message is " + message);}}}
DecodeHandler#received解碼消息體
這個DecodeHandler主要是對于請求消息或者響應消息的消息體進行解碼,對于請求消息(request.mData),這一步將會解碼出調用的方法名,方法參數類型,傳遞的方法參數,attachments等信息,隨后調用下層HeaderExchangeHandler#received方法。
此前的InternalDecoder#decode方法解碼了Reuqest,但是對于內部的data并沒有解碼,而是留給了業務線程在DecodeHandler中去操作,這樣可以盡量快速釋放IO線程。
/*** DecodeHandler的方法** 消息體的解碼** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {//Decodeable類型,直接處理if (message instanceof Decodeable) {decode(message);}//請求消息if (message instanceof Request) {decode(((Request) message).getData());}//響應消息if (message instanceof Response) {decode(((Response) message).getResult());}//調用下層HeaderExchangeHandler#received方法handler.received(channel, message);
}
HeaderExchangeHandler#received處理消息
HeaderExchangeHandler#received方法對于消息進行分類并調用不同的方法繼續處理。
對于請求消息,如果是雙向消息,那么調用handleRequest方法繼續處理,將會創建Response對象,然后調用dubboProtocol.requestHandler完成請求處理獲取結果,并將結果封裝到Response中后返回給客戶端。如果是單向消息則僅僅調用dubboProtocol.requestHandler完成請求處理即可。
對于響應消息,將會調用DefaultFuture.received方法處理,此時就會根據響應id獲取對應的DefaultFuture,將響應結果設置進去。這里的源碼我們后面將consumer獲取響應結果的時候再講解。
/*** HeaderExchangeHandler的方法* <p>* 分類處理消息** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);//請求消息if (message instanceof Request) {// handle request.Request request = (Request) message;if (request.isEvent()) {//處理事件消息handlerEvent(channel, request);} else {if (request.isTwoWay()) {//額外處理雙向消息handleRequest(exchangeChannel, request);} else {//處理單向消息,直接調用下層DubboProtocol.requestHandler#received方法handler.received(exchangeChannel, request.getData());}}}//響應消息else if (message instanceof Response) {handleResponse(channel, (Response) message);}//字符串else if (message instanceof String) {if (isClientSide(channel)) {Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());logger.error(e.getMessage(), e);} else {String echo = handler.telnet(channel, (String) message);if (StringUtils.isNotEmpty(echo)) {channel.send(echo);}}} else {handler.received(exchangeChannel, message);}
}
HeaderExchangeHandler#handleRequest處理請求
該方法用于處理雙向請求,也就是普通rpc請求。將會創建Response對象,然后調用dubboProtocol.requestHandler完成請求處理獲取結果,并將結果封裝到Response中后返回給客戶端。
/*** HeaderExchangeHandler的方法* <p>* 處理雙向rpc請求消息** @param channel NettyChannel* @param req 請求消息*/
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {//創建一個Response響應對象//id是請求id,這樣當響應返回之后,請求端便能找到對應的DefaultFuture進行后續結果的封裝Response res = new Response(req.getId(), req.getVersion());//如果是損壞的請求,則直接返回錯誤響應,通常解碼失敗這樣返回if (req.isBroken()) {Object data = req.getData();String msg;if (data == null) {msg = null;} else if (data instanceof Throwable) {msg = StringUtils.toString((Throwable) data);} else {msg = data.toString();}res.setErrorMessage("Fail to decode request due to: " + msg);res.setStatus(Response.BAD_REQUEST);channel.send(res);return;}// find handler by message class.//獲取請求體 DecodeableRpcInvocationObject msg = req.getData();try {/** 異步調用下層handler DubboProtocol.requestHandler#reply方法* DubboProtocol.requestHandler是DubboProtocol中的一個匿名內部類實現*/CompletionStage<Object> future = handler.reply(channel, msg);/** 阻塞等待直到reply調用完成,則將結果設置到Response中,然后調用channel.send方法將調用結果返回給服務消費端*/future.whenComplete((appResult, t) -> {try {if (t == null) {res.setStatus(Response.OK);res.setResult(appResult);} else {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(t));}//將調用結果返回給服務消費端channel.send(res);} catch (RemotingException e) {logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);}});} catch (Throwable e) {//調用異常處理,設置SERVICE_ERROR狀態碼res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(e));//將異常調用結果返回給服務消費端channel.send(res);}
}
DubboProtocol.requestHandler#reply真實方法調用
該方法首先根據請求參數從DubboProtocol的exporterMap緩存中獲取對應的exporter,然后獲取exporter中的invoker,然后調用invoker#invoke方法對于真實的方法實現進行調用,然后返回調用結果。
這里獲取的Invoker,實際上是FilterChainBuilder的內部類CallbackRegistrationInvoker,我們在前面學習consumer發起服務調用請求的時候,實際上也是走的這個方法。可見dubbo在方法的復用的設計上還是很牛的。
CallbackRegistrationInvoker實際上內部持有是一個過濾器鏈,調用它的invoke方法,會觸發一系列過濾器filter的的調用,執行過濾操作。
/*** DubboProtocol.ExchangeHandler的匿名內部類實現*/
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {if (!(message instanceof Invocation)) {throw new RemotingException(channel, "Unsupported request: "+ (message == null ? null : (message.getClass().getName() + ": " + message))+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());}//轉換為調用抽象Invocation inv = (Invocation) message;//獲取對應的服務提供者invoker實例Invoker<?> invoker = getInvoker(channel, inv);inv.setServiceModel(invoker.getUrl().getServiceModel());// switch TCCLif (invoker.getUrl().getServiceModel() != null) {Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader());}// need to consider backward-compatibility if it's a callback//如果是回調方法,需要考慮向后兼容,驗回調方法是否存在if (Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(IS_CALLBACK_SERVICE_INVOKE))) {String methodsStr = invoker.getUrl().getParameters().get("methods");boolean hasMethod = false;if (methodsStr == null || !methodsStr.contains(",")) {hasMethod = inv.getMethodName().equals(methodsStr);} else {String[] methods = methodsStr.split(",");for (String method : methods) {if (inv.getMethodName().equals(method)) {hasMethod = true;break;}}}if (!hasMethod) {logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()+ " not found in callback service interface ,invoke will be ignored."+ " please update the api interface. url is:"+ invoker.getUrl()) + " ,invocation is :" + inv);return null;}}//設置調用端地址RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());/** 基于invoker調用invoke方法,內部將會對服務接口的真實實現進行調用并獲取結果*/Result result = invoker.invoke(inv);//同步等待結果并將結果封裝為CompletableFuture返回return result.thenApply(Function.identity());
}
DubboProtocol#getInvoker獲取invoker
該方法獲取從調用端傳遞的RpcInvocation中獲取各種信息,然后組裝成serviceKey:{group}/{servicePath}:{version}:{port},隨后從DubboProtocol的exporterMap緩存中根據serviceKey獲取對應的exporter,然后獲取exporter中的invoker返回。
我們在此前學習dubbo服務導出的時候,就學習了這個exporterMap,其內部存放著serviecKey到對應的exporter的實現。現在,在服務調用的時候,就用到了這個緩存,這樣一下就和之前學習的內容對應上了。
/*** DubboProtocol的方法* * @param channel HeaderExchangeChannel* @param inv 調用抽象,RpcInvocation* @return invoker*/
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {boolean isCallBackServiceInvoke;boolean isStubServiceInvoke;//端口號int port = channel.getLocalAddress().getPort();//獲取path,服務接口String path = (String) inv.getObjectAttachmentWithoutConvert(PATH_KEY);//if it's stub service on client side(after enable stubevent, usually is set up onconnect or ondisconnect method)//客戶端的存根服務isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(STUB_EVENT_KEY));if (isStubServiceInvoke) {//when a stub service export to local, it usually can't be exposed to portport = 0;}// if it's callback service on client side//如果是客戶端的回調服務isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;if (isCallBackServiceInvoke) {path += "." + inv.getObjectAttachmentWithoutConvert(CALLBACK_SERVICE_KEY);inv.setObjectAttachment(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());}//構建服務key,{group}/{servicePath}:{version}:{port}String serviceKey = serviceKey(port,path,(String) inv.getObjectAttachmentWithoutConvert(VERSION_KEY),(String) inv.getObjectAttachmentWithoutConvert(GROUP_KEY));//從DubboProtocol的exporterMap緩存中根據serviceKey獲取對應的exporterDubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);if (exporter == null) {throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + getInvocationWithoutData(inv));}//獲取exporter中的invokerreturn exporter.getInvoker();
}
CallbackRegistrationInvoker#invoke過濾器鏈調用
CallbackRegistrationInvoker#invoke方法用于添加一個回調,它可以在RPC調用完成時觸發,在這回調函數將會倒序執行filters中的過濾器。
/*** FilterChainBuilder的內部類CallbackRegistrationInvoker的方法*/
@Override
public Result invoke(Invocation invocation) throws RpcException {/** 繼續調用下層invoker#invoke*/Result asyncResult = filterInvoker.invoke(invocation);//添加一個回調,它可以在RPC調用完成時觸發。asyncResult.whenCompleteWithContext((r, t) -> {RuntimeException filterRuntimeException = null;//過濾器倒序執行for (int i = filters.size() - 1; i >= 0; i--) {FILTER filter = filters.get(i);try {InvocationProfilerUtils.releaseDetailProfiler(invocation);if (filter instanceof ListenableFilter) {//執行過濾器ListenableFilter listenableFilter = ((ListenableFilter) filter);Filter.Listener listener = listenableFilter.listener(invocation);try {if (listener != null) {if (t == null) {listener.onResponse(r, filterInvoker, invocation);} else {listener.onError(t, filterInvoker, invocation);}}} finally {listenableFilter.removeListener(invocation);}} else if (filter instanceof FILTER.Listener) {FILTER.Listener listener = (FILTER.Listener) filter;if (t == null) {listener.onResponse(r, filterInvoker, invocation);} else {listener.onError(t, filterInvoker, invocation);}}} catch (RuntimeException runtimeException) {LOGGER.error(String.format("Exception occurred while executing the %s filter named %s.", i, filter.getClass().getSimpleName()));if (LOGGER.isDebugEnabled()) {LOGGER.debug(String.format("Whole filter list is: %s", filters.stream().map(tmpFilter -> tmpFilter.getClass().getSimpleName()).collect(Collectors.toList())));}filterRuntimeException = runtimeException;t = runtimeException;}}if (filterRuntimeException != null) {throw filterRuntimeException;}});return asyncResult;
}/*** FilterChainBuilder#CopyOfFilterChainNode的方法*/
@Override
public Result invoke(Invocation invocation) throws RpcException {Result asyncResult;try {//進入細節分析器InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Filter " + filter.getClass().getName() + " invoke.");/** 執行過濾器操作*/asyncResult = filter.invoke(nextNode, invocation);} catch (Exception e) {InvocationProfilerUtils.releaseDetailProfiler(invocation);if (filter instanceof ListenableFilter) {ListenableFilter listenableFilter = ((ListenableFilter) filter);try {Filter.Listener listener = listenableFilter.listener(invocation);if (listener != null) {listener.onError(e, originalInvoker, invocation);}} finally {listenableFilter.removeListener(invocation);}} else if (filter instanceof FILTER.Listener) {FILTER.Listener listener = (FILTER.Listener) filter;listener.onError(e, originalInvoker, invocation);}throw e;} finally {}return asyncResult;
}
過濾器鏈的盡頭就是真實invoker,invoke方法經過的過濾器filter如下:ContextFilter、ProfilerServerFilter、EchoFilter、ClassLoaderFilter、GenericFilter、ExceptionFilter、MonitorFilter、TimeoutFilter、TraceFilter、ClassLoaderCallbackFilter、InvokerWrapper、DelegateProviderMetaDataInvoker、AbstractProxyInvoker。
AbstractProxyInvoker#invoke業務實現接口調用
AbstractProxyInvoker#invoke方法內部將會調用doInvoke方法,該方法會調用業務接口,最終統一個返回AsyncRpcResult。
/*** AbstractProxyInvoker的方法* <p>* 執行業務接口實現** @param invocation 調用抽象*/
@Override
public Result invoke(Invocation invocation) throws RpcException {try {ProfilerEntry originEntry = null;//性能分析if (ProfilerSwitch.isEnableSimpleProfiler()) {Object fromInvocation = invocation.get(Profiler.PROFILER_KEY);if (fromInvocation instanceof ProfilerEntry) {ProfilerEntry profiler = Profiler.enter((ProfilerEntry) fromInvocation, "Receive request. Server biz impl invoke begin.");invocation.put(Profiler.PROFILER_KEY, profiler);originEntry = Profiler.setToBizProfiler(profiler);}}/** 執行doInvoke,該方法由子類實現*/Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());//性能分析if (ProfilerSwitch.isEnableSimpleProfiler()) {Object fromInvocation = invocation.get(Profiler.PROFILER_KEY);if (fromInvocation instanceof ProfilerEntry) {ProfilerEntry profiler = Profiler.release((ProfilerEntry) fromInvocation);invocation.put(Profiler.PROFILER_KEY, profiler);}}Profiler.removeBizProfiler();if (originEntry != null) {Profiler.setToBizProfiler(originEntry);}//將返回結果轉換為CompletableFutureCompletableFuture<Object> future = wrapWithFuture(value, invocation);//轉換為AppResponse的CompletableFutureCompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {//創建AppResponseAppResponse result = new AppResponse(invocation);if (t != null) {if (t instanceof CompletionException) {result.setException(t.getCause());} else {result.setException(t);}} else {result.setValue(obj);}return result;});//統一返回AsyncRpcResultreturn new AsyncRpcResult(appResponseFuture, invocation);} catch (InvocationTargetException e) {if (RpcContext.getServiceContext().isAsyncStarted() && !RpcContext.getServiceContext().stopAsync()) {logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);}return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);} catch (Throwable e) {throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);}
}
doInvoke執行業務調用
到這一步,實際上以及調用到了基于JavassistProxyFactory#getInvoker或者JdkProxyFactory#getInvoker方法創建的AbstractProxyInvoker匿名實現類實例。我們在學習Dubbo服務導出的時候就說過了,這就是最底層的Invoker,其內部包含了真實的服務實現。
JavassistProxyFactory 是默認的invoker工廠。dubbo利用javassist動態創建了Class對應的Wrapper對象,動態生成的Wrapper類改寫invokeMethod方法,其內部會被改寫為根據接口方法名和參數直接調用ref對應名字的方法,避免通過Jdk的反射調用方法帶來的性能問題。
JavassistProxyFactory#getInvoker方法如下,可以看到返回的AbstractProxyInvoker實例重寫了doInvoke方法,而AbstractProxyInvoker#invoke方法內部將會調用doInvoke方法,在doInvoke方法中,內部實際調用的wrapper#invokeMethod方法,該方法中將會通過方法名字符串去調用對應真實接口實現的方法,而不是反射查找真實接口實現的方法去調用,提升了調用速度。
/*** JavassistProxyFactory的方法* <p>* 將ref、interfaceClass、url包裝成一個Invoker代理對象** @param proxy 被代理的實例* @param type 代理接口Class* @param url 服務url,可能是基于injvm協議的服務url(本地導出),也可能是追加了export=url參數的注冊中心url(遠程導出),還可能是原始的服務url(直連導出)* @return 一個Invoker可執行體實例*/
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {try {// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'//基于javassist動態創建了Class對應的Wrapper對象,動態生成的Wrapper類改寫invokeMethod方法,其內部會被改寫為根據接口方法名和參數直接調用ref對應名字的方法//這樣后續調用方法時,就可以避免Jdk動態代理中通過反射調用方法帶來的性能問題final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);//創建一個AbstractProxyInvoker的匿名實例return new AbstractProxyInvoker<T>(proxy, type, url) {@Overrideprotected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable {//調用wrapper的invokeMethod方法return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);}};} catch (Throwable fromJavassist) {// try fall back to JDK proxy factorytry {Invoker<T> invoker = jdkProxyFactory.getInvoker(proxy, type, url);logger.error("Failed to generate invoker by Javassist failed. Fallback to use JDK proxy success. " +"Interfaces: " + type, fromJavassist);// log out errorreturn invoker;} catch (Throwable fromJdk) {logger.error("Failed to generate invoker by Javassist failed. Fallback to use JDK proxy is also failed. " +"Interfaces: " + type + " Javassist Error.", fromJavassist);logger.error("Failed to generate invoker by Javassist failed. Fallback to use JDK proxy is also failed. " +"Interfaces: " + type + " JDK Error.", fromJdk);throw fromJavassist;}}
}
總結
本次我們學習了provider接收并處理consumer遠程rpc請求的流程,大概的流程如下:
服務端接受請求之后會反序列化(解碼)請求參數并通過參數找到之前暴露存儲的exporterMap緩存中獲取對應的exporter,然后獲取exporter中的invoker,而invoker內部持有真正的業務服務實現類對象,最終會調用真正的實現類對象的方法,獲取結果后會組裝響應并序列化并返回,這個響應的id和對應的請求的id是一樣的。
對應的方法調用棧為,IO線程調用棧:
- NettyServerHandler#channelRead(ChannelHandlerContext, MessageEvent)
- AbstractPeer#received(Channel, Object)
- MultiMessageHandler#received(Channel, Object)
- HeartbeatHandler#received(Channel, Object)
- AllChannelHandler#received(Channel, Object) – 業務線程池執行線程任務
業務線程調用棧:
- ChannelEventRunnable#run()
- DecodeHandler#received(Channel, Object)
- HeaderExchangeHandler#received(Channel, Object)
- HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
- DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
- CallbackRegistrationInvoker#invoke(Invocation) --各種Filter
- InvokerWrapper#invoke(Invocation)
- DelegateProviderMetaDataInvoker#invoke(Invocation)
- AbstractProxyInvoker#invoke(Invocation)
- AbstractProxyInvoker#doInvoke(Invocation) --基于javassist創建的invoker
- wrapper#invokeMethod(Object, String, Class[], Object[])
- xxxServiceImpl#xxx() --業務方法調用