RPC(Remote Procedure Call Protocol)遠程過程調用協議,它是一種通過網絡,從遠程計算機程序上請求服務,而不必了解底層網絡技術的協議。說的再直白一點,就是客戶端在不必知道調用細節的前提之下,調用遠程計算機上運行的某個對象,使用起來就像調用本地的對象一樣。目前典型的RPC實現框架有:Thrift(facebook開源)、Dubbo(alibaba開源)等等。RPC框架針對網絡協議、網絡I/O模型的封裝是透明的,對于調用的客戶端而言,它就認為自己在調用本地的一個對象。至于傳輸層上,運用的是TCP協議、UDP協議、亦或是HTTP協議,一概不關心。從網絡I/O模型上來看,是基于select、poll、epoll方式、還是IOCP(I/O Completion Port)方式承載實現的,對于調用者而言也不用關心。
目前,主流的RPC框架都支持跨語言調用,即有所謂的IDL(接口定義語言),其實,這個并不是RPC所必須要求的。如果你的RPC框架沒有跨語言的要求,IDL就可以不用包括了。
最后,值得一提的是,衡量一個RPC框架性能的好壞與否,RPC的網絡I/O模型的選擇,至關重要。在此基礎上,設計出來的RPC服務器,可以考慮支持阻塞式同步IO、非阻塞式同步IO、當然還有所謂的多路復用IO模型、異步IO模型。支持不同的網絡IO模型,在高并發的狀態下,處理性能上會有很大的差別。還有一個衡量的標準,就是選擇的傳輸協議。是基于TCP協議、還是HTTP協議、還是UDP協議?對性能也有一定的影響。但是從我目前了解的情況來看,大多數RPC開源實現框架都是基于TCP、或者HTTP的,目測沒有采用UDP協議做為主要的傳輸協議的。
明白了RPC的使用原理和性能要求。現在,我們能不能撇開那些RPC開源框架,自己動手開發一個高性能的RPC服務器呢?我想,還是可以的。現在本人就使用Java,基于Netty,開發實現一個高性能的RPC服務器。
如何實現、基于什么原理?并發處理性能如何?請繼續接著看下文。
我們有的時候,為了提高單個節點的通信吞吐量,提高通信性能。如果是基于Java后端的,一般首選的是NIO框架(No-block IO)。但是問題也來了,Java的NIO掌握起來要相當的技術功底,和足夠的技術積累,使用起來才能得心應手。一般的開發人員,如果要使用NIO開發一個后端的TCP/HTTP服務器,附帶考慮TCP粘包、網絡通信異常、消息鏈接處理等等網絡通信細節,開發門檻太高,所以比較明智的選擇是,采用業界主流的NIO框架進行服務器后端開發。主流的NIO框架主要有Netty、Mina。它們主要都是基于TCP通信,非阻塞的IO、靈活的IO線程池而設計的,應對高并發請求也是綽綽有余。隨著Netty、Mina這樣優秀的NIO框架,設計上日趨完善,Java后端高性能服務器開發,在技術上提供了有力的支持保障,從而打破了C++在服務器后端,一統天下的局面。因為在此之前,Java的NIO一直受人詬病,讓人敬而遠之!
既然,這個RPC服務器是基于Netty的,那就在說說Netty吧。實際上Netty是對JAVA NIO框架的再次封裝,它的開源網址是http://netty.io/,本文中使用的Netty版本是:4.0版本,可以通過http://dl.bintray.com/netty/downloads/netty-4.0.37.Final.tar.bz2,進行下載使用。那也許你會問,如何使用Netty進行RPC服務器的開發呢?實際不難,下面我就簡單的說明一下技術原理:
1、定義RPC請求消息、應答消息結構,里面要包括RPC的接口定義模塊、包括遠程調用的類名、方法名稱、參數結構、參數值等信息。
2、服務端初始化的時候通過容器加載RPC接口定義和RPC接口實現類對象的映射關系,然后等待客戶端發起調用請求。
3、客戶端發起的RPC消息里面包含,遠程調用的類名、方法名稱、參數結構、參數值等信息,通過網絡,以字節流的方式送給RPC服務端,RPC服務端接收到字節流的請求之后,去對應的容器里面,查找客戶端接口映射的具體實現對象。
4、RPC服務端找到實現對象的參數信息,通過反射機制創建該對象的實例,并返回調用處理結果,最后封裝成RPC應答消息通知到客戶端。
5、客戶端通過網絡,收到字節流形式的RPC應答消息,進行拆包、解析之后,顯示遠程調用結果。
上面說的是很簡單,但是實現的時候,我們還要考慮如下的問題:
1、RPC服務器的傳輸層是基于TCP協議的,出現粘包咋辦?這樣客戶端的請求,服務端不是會解析失敗?好在Netty里面已經提供了解決TCP粘包問題的解碼器:LengthFieldBasedFrameDecoder,可以靠它輕松搞定TCP粘包問題。
2、Netty服務端的線程模型是單線程、多線程(一個線程負責客戶端連接,連接成功之后,丟給后端IO的線程池處理)、還是主從模式(客戶端連接、后端IO處理都是基于線程池的實現)。當然在這里,我出于性能考慮,使用了Netty主從線程池模型。
3、Netty的IO處理線程池,如果遇到非常耗時的業務,出現阻塞了咋辦?這樣不是很容易把后端的NIO線程給掛死、阻塞?本文的處理方式是,對于復雜的后端業務,分派到專門的業務線程池里面,進行異步回調處理。
4、RPC消息的傳輸是通過字節流在NIO的通道(Channel)之間傳輸,那具體如何實現呢?本文,是通過基于Java原生對象序列化機制的編碼、解碼器(ObjectEncoder、ObjectDecoder)進行實現的。當然出于性能考慮,這個可能不是最優的方案。更優的方案是把消息的編碼、解碼器,搞成可以配置實現的。具體比如可以通過:protobuf、JBoss?Marshalling方式進行解碼和編碼,以提高網絡消息的傳輸效率。
5、RPC服務器要考慮多線程、高并發的使用場景,所以線程安全是必須的。此外盡量不要使用synchronized進行加鎖,改用輕量級的ReentrantLock方式進行代碼塊的條件加鎖。比如本文中的RPC消息處理回調,就有這方面的使用。
6、RPC服務端的服務接口對象和服務接口實現對象要能輕易的配置,輕松進行加載、卸載。在這里,本文是通過Spring容器進行統一的對象管理。
綜上所述,本文設計的RPC服務器調用的流程圖如下所示:
? ? ?
客戶端并發發起RPC調用請求,然后RPC服務端使用Netty連接器,分派出N個NIO連接線程,這個時候Netty連接器的任務結束。然后NIO連接線程是統一放到Netty NIO處理線程池進行管理,這個線程池里面會對具體的RPC請求連接進行消息編碼、消息解碼、消息處理等等一系列操作。最后進行消息處理(Handler)的時候,處于性能考慮,這里的設計是,直接把復雜的消息處理過程,丟給專門的RPC業務處理線程池集中處理,然后Handler對應的NIO線程就立即返回、不會阻塞。這個時候RPC調用結束,客戶端會異步等待服務端消息的處理結果,本文是通過消息回調機制實現(MessageCallBack)。
再來說一說Netty對于RPC消息的解碼、編碼、處理對應的模塊和流程,具體如下圖所示:
? ?
從上圖可以看出客戶端、服務端對RPC消息編碼、解碼、處理調用的模塊以及調用順序了。Netty就是把這樣一個一個的處理器串在一起,形成一個責任鏈,統一進行調用。
說了這么多,現在先簡單看下,我設計實現的NettyRPC的代碼目錄層級結構:
? ? ?
其中newlandframework.netty.rpc.core包是NettyRPC的核心實現。newlandframework.netty.rpc.model包里面,則封裝了RPC消息請求、應答報文結構,以及RPC服務接口與實現綁定關系的容器定義。newlandframework.netty.rpc.config里面定義了NettyRPC的服務端文件配置屬性。
下面先來看下newlandframework.netty.rpc.model包中定義的內容。具體是RPC消息請求、應答消息的結構定義:
RPC請求消息結構
/*** @filename:MessageRequest.java** Newland Co. Ltd. All rights reserved.** @Description:rpc服務請求結構* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.model;import java.io.Serializable; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle;public class MessageRequest implements Serializable {private String messageId;private String className;private String methodName;private Class<?>[] typeParameters;private Object[] parametersVal;public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId = messageId;}public String getClassName() {return className;}public void setClassName(String className) {this.className = className;}public String getMethodName() {return methodName;}public void setMethodName(String methodName) {this.methodName = methodName;}public Class<?>[] getTypeParameters() {return typeParameters;}public void setTypeParameters(Class<?>[] typeParameters) {this.typeParameters = typeParameters;}public Object[] getParameters() {return parametersVal;}public void setParameters(Object[] parametersVal) {this.parametersVal = parametersVal;}public String toString() {return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("messageId", messageId).append("className", className).append("methodName", methodName).toString();} }
RPC應答消息結構
/*** @filename:MessageResponse.java** Newland Co. Ltd. All rights reserved.** @Description:rpc服務應答結構* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.model;import java.io.Serializable; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle;public class MessageResponse implements Serializable {private String messageId;private String error;private Object resultDesc;public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId = messageId;}public String getError() {return error;}public void setError(String error) {this.error = error;}public Object getResult() {return resultDesc;}public void setResult(Object resultDesc) {this.resultDesc = resultDesc;}public String toString() {return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("messageId", messageId).append("error", error).toString();} }
RPC服務接口定義、服務接口實現綁定關系容器定義,提供給spring作為容器使用。
/*** @filename:MessageKeyVal.java** Newland Co. Ltd. All rights reserved.** @Description:rpc服務映射容器* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.model;import java.util.Map;public class MessageKeyVal {private Map<String, Object> messageKeyVal;public void setMessageKeyVal(Map<String, Object> messageKeyVal) {this.messageKeyVal = messageKeyVal;}public Map<String, Object> getMessageKeyVal() {return messageKeyVal;} }
好了,定義好核心模型結構之后,現在再向大家展示一下NettyRPC核心包:newlandframework.netty.rpc.core的關鍵部分實現代碼,首先是業務線程池相關類的實現代碼,具體如下:
線程工廠定義實現
/*** @filename:NamedThreadFactory.java** Newland Co. Ltd. All rights reserved.** @Description:線程工廠* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.core;import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger;public class NamedThreadFactory implements ThreadFactory {private static final AtomicInteger threadNumber = new AtomicInteger(1);private final AtomicInteger mThreadNum = new AtomicInteger(1);private final String prefix;private final boolean daemoThread;private final ThreadGroup threadGroup;public NamedThreadFactory() {this("rpcserver-threadpool-" + threadNumber.getAndIncrement(), false);}public NamedThreadFactory(String prefix) {this(prefix, false);}public NamedThreadFactory(String prefix, boolean daemo) {this.prefix = prefix + "-thread-";daemoThread = daemo;SecurityManager s = System.getSecurityManager();threadGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();}public Thread newThread(Runnable runnable) {String name = prefix + mThreadNum.getAndIncrement();Thread ret = new Thread(threadGroup, runnable, name, 0);ret.setDaemon(daemoThread);return ret;}public ThreadGroup getThreadGroup() {return threadGroup;} }
業務線程池定義實現
/*** @filename:RpcThreadPool.java** Newland Co. Ltd. All rights reserved.** @Description:rpc線程池封裝* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.core;import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;public class RpcThreadPool {//獨立出線程池主要是為了應對復雜耗I/O操作的業務,不阻塞netty的handler線程而引入//當然如果業務足夠簡單,把處理邏輯寫入netty的handler(ChannelInboundHandlerAdapter)也未嘗不可public static Executor getExecutor(int threads, int queues) {String name = "RpcThreadPool";return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>(): (queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));} }
/*** @filename:AbortPolicyWithReport.java** Newland Co. Ltd. All rights reserved.** @Description:線程池異常策略* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.core;import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor;public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {private final String threadName;public AbortPolicyWithReport(String threadName) {this.threadName = threadName;}public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {String msg = String.format("RpcServer["+ " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d),"+ " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)]",threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());System.out.println(msg);throw new RejectedExecutionException(msg);} }
RPC調用客戶端定義實現
/*** @filename:MessageSendExecutor.java** Newland Co. Ltd. All rights reserved.** @Description:Rpc客戶端執行模塊* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.core;import java.lang.reflect.Proxy;public class MessageSendExecutor {private RpcServerLoader loader = RpcServerLoader.getInstance();public MessageSendExecutor(String serverAddress) {loader.load(serverAddress);}public void stop() {loader.unLoad();}public static <T> T execute(Class<T> rpcInterface) {return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(),new Class<?>[]{rpcInterface},new MessageSendProxy<T>(rpcInterface));} }
這里的RPC客戶端實際上,是動態代理了MessageSendProxy,當然這里是應用了,JDK原生的動態代理實現,你還可以改成CGLIB(Code Generation Library)方式。不過本人測試了一下CGLIB方式,在高并發的情況下面會出現空指針異常,但是同樣的情況,JDK原生的動態代理卻沒有問題。并發程度不高的情況下面,兩種代理方式都運行正常。后續再深入研究看看吧!廢話不說了,現在給出MessageSendProxy的實現方式
/*** @filename:MessageSendProxy.java** Newland Co. Ltd. All rights reserved.** @Description:Rpc客戶端消息處理* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.core;import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.util.UUID; import newlandframework.netty.rpc.model.MessageRequest;public class MessageSendProxy<T> implements InvocationHandler {private Class<T> cls;public MessageSendProxy(Class<T> cls) {this.cls = cls;}public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {MessageRequest request = new MessageRequest();request.setMessageId(UUID.randomUUID().toString());request.setClassName(method.getDeclaringClass().getName());request.setMethodName(method.getName());request.setTypeParameters(method.getParameterTypes());request.setParameters(args);MessageSendHandler handler = RpcServerLoader.getInstance().getMessageSendHandler();MessageCallBack callBack = handler.sendRequest(request);return callBack.start();} }
進一步發現MessageSendProxy其實是把消息發送給RpcServerLoader模塊,它的代碼如下:
/*** @filename:RpcServerLoader.java** Newland Co. Ltd. All rights reserved.** @Description:rpc服務器配置加載* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.core;import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import java.net.InetSocketAddress; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;public class RpcServerLoader {private volatile static RpcServerLoader rpcServerLoader;private final static String DELIMITER = ":";private RpcSerializeProtocol serializeProtocol = RpcSerializeProtocol.JDKSERIALIZE;//方法返回到Java虛擬機的可用的處理器數量private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;//netty nio線程池private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(parallel);private static ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);private MessageSendHandler messageSendHandler = null;//等待Netty服務端鏈路建立通知信號private Lock lock = new ReentrantLock();private Condition signal = lock.newCondition();private RpcServerLoader() {}//并發雙重鎖定public static RpcServerLoader getInstance() {if (rpcServerLoader == null) {synchronized (RpcServerLoader.class) {if (rpcServerLoader == null) {rpcServerLoader = new RpcServerLoader();}}}return rpcServerLoader;}public void load(String serverAddress, RpcSerializeProtocol serializeProtocol) {String[] ipAddr = serverAddress.split(RpcServerLoader.DELIMITER);if (ipAddr.length == 2) {String host = ipAddr[0];int port = Integer.parseInt(ipAddr[1]);final InetSocketAddress remoteAddr = new InetSocketAddress(host, port);threadPoolExecutor.submit(new MessageSendInitializeTask(eventLoopGroup, remoteAddr, this, serializeProtocol));}}public void setMessageSendHandler(MessageSendHandler messageInHandler) {try {lock.lock();this.messageSendHandler = messageInHandler;//喚醒所有等待客戶端RPC線程signal.signalAll();} finally {lock.unlock();}}public MessageSendHandler getMessageSendHandler() throws InterruptedException {try {lock.lock();//Netty服務端鏈路沒有建立完畢之前,先掛起等待if (messageSendHandler == null) {signal.await();}return messageSendHandler;} finally {lock.unlock();}}public void unLoad() {messageSendHandler.close();threadPoolExecutor.shutdown();eventLoopGroup.shutdownGracefully();}public void setSerializeProtocol(RpcSerializeProtocol serializeProtocol) {this.serializeProtocol = serializeProtocol;} }
好了,現在一次性給出RPC客戶端消息編碼、解碼、處理的模塊實現代碼。
/*** @filename:MessageSendInitializeTask.java** Newland Co. Ltd. All rights reserved.** @Description:Rpc客戶端線程任務處理* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.core;import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress;public class MessageSendInitializeTask implements Runnable {private EventLoopGroup eventLoopGroup = null;private InetSocketAddress serverAddress = null;private RpcServerLoader loader = null;MessageSendInitializeTask(EventLoopGroup eventLoopGroup, InetSocketAddress serverAddress, RpcServerLoader loader) {this.eventLoopGroup = eventLoopGroup;this.serverAddress = serverAddress;this.loader = loader;}public void run() {Bootstrap b = new Bootstrap();b.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);b.handler(new MessageSendChannelInitializer());ChannelFuture channelFuture = b.connect(serverAddress);channelFuture.addListener(new ChannelFutureListener() {public void operationComplete(final ChannelFuture channelFuture) throws Exception {if (channelFuture.isSuccess()) {MessageSendHandler handler = channelFuture.channel().pipeline().get(MessageSendHandler.class);MessageSendInitializeTask.this.loader.setMessageSendHandler(handler);}}});} }
/*** @filename:MessageSendChannelInitializer.java** Newland Co. Ltd. All rights reserved.** @Description:Rpc客戶端管道初始化* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.core;import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder;public class MessageSendChannelInitializer extends ChannelInitializer<SocketChannel> {//ObjectDecoder 底層默認繼承半包解碼器LengthFieldBasedFrameDecoder處理粘包問題的時候,//消息頭開始即為長度字段,占據4個字節。這里出于保持兼容的考慮final public static int MESSAGE_LENGTH = 4;protected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();//ObjectDecoder的基類半包解碼器LengthFieldBasedFrameDecoder的報文格式保持兼容。因為底層的父類LengthFieldBasedFrameDecoder//的初始化參數即為super(maxObjectSize, 0, 4, 0, 4);pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageSendChannelInitializer.MESSAGE_LENGTH, 0, MessageSendChannelInitializer.MESSAGE_LENGTH));//利用LengthFieldPrepender回填補充ObjectDecoder消息報文頭pipeline.addLast(new LengthFieldPrepender(MessageSendChannelInitializer.MESSAGE_LENGTH));pipeline.addLast(new ObjectEncoder());//考慮到并發性能,采用weakCachingConcurrentResolver緩存策略。一般情況使用:cacheDisabled即可pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));pipeline.addLast(new MessageSendHandler());} }
/*** @filename:MessageSendHandler.java** Newland Co. Ltd. All rights reserved.** @Description:Rpc客戶端處理模塊* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.core;import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.net.SocketAddress; import java.util.concurrent.ConcurrentHashMap; import newlandframework.netty.rpc.model.MessageRequest; import newlandframework.netty.rpc.model.MessageResponse;public class MessageSendHandler extends ChannelInboundHandlerAdapter {private ConcurrentHashMap<String, MessageCallBack> mapCallBack = new ConcurrentHashMap<String, MessageCallBack>();private volatile Channel channel;private SocketAddress remoteAddr;public Channel getChannel() {return channel;}public SocketAddress getRemoteAddr() {return remoteAddr;}public void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);this.remoteAddr = this.channel.remoteAddress();}public void channelRegistered(ChannelHandlerContext ctx) throws Exception {super.channelRegistered(ctx);this.channel = ctx.channel();}public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {MessageResponse response = (MessageResponse) msg;String messageId = response.getMessageId();MessageCallBack callBack = mapCallBack.get(messageId);if (callBack != null) {mapCallBack.remove(messageId);callBack.over(response);}}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}public void close() {channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);}public MessageCallBack sendRequest(MessageRequest request) {MessageCallBack callBack = new MessageCallBack(request);mapCallBack.put(request.getMessageId(), callBack);channel.writeAndFlush(request);return callBack;} }
最后給出RPC服務端的實現。首先是通過spring自動加載RPC服務接口、接口實現容器綁定加載,初始化Netty主/從線程池等操作,具體是通過MessageRecvExecutor模塊實現的,現在給出實現代碼:
/*** @filename:MessageRecvExecutor.java** Newland Co. Ltd. All rights reserved.** @Description:Rpc服務器執行模塊* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.core;import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.logging.Level; import newlandframework.netty.rpc.model.MessageKeyVal; import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware;public class MessageRecvExecutor implements ApplicationContextAware, InitializingBean {private String serverAddress;private final static String DELIMITER = ":";private Map<String, Object> handlerMap = new ConcurrentHashMap<String, Object>();private static ThreadPoolExecutor threadPoolExecutor;public MessageRecvExecutor(String serverAddress) {this.serverAddress = serverAddress;}public static void submit(Runnable task) {if (threadPoolExecutor == null) {synchronized (MessageRecvExecutor.class) {if (threadPoolExecutor == null) {threadPoolExecutor = (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);}}}threadPoolExecutor.submit(task);}public void setApplicationContext(ApplicationContext ctx) throws BeansException {try {MessageKeyVal keyVal = (MessageKeyVal) ctx.getBean(Class.forName("newlandframework.netty.rpc.model.MessageKeyVal"));Map<String, Object> rpcServiceObject = keyVal.getMessageKeyVal();Set s = rpcServiceObject.entrySet();Iterator<Map.Entry<String, Object>> it = s.iterator();Map.Entry<String, Object> entry;while (it.hasNext()) {entry = it.next();handlerMap.put(entry.getKey(), entry.getValue());}} catch (ClassNotFoundException ex) {java.util.logging.Logger.getLogger(MessageRecvExecutor.class.getName()).log(Level.SEVERE, null, ex);}}public void afterPropertiesSet() throws Exception {//netty的線程池模型設置成主從線程池模式,這樣可以應對高并發請求//當然netty還支持單線程、多線程網絡IO模型,可以根據業務需求靈活配置ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPC ThreadFactory");//方法返回到Java虛擬機的可用的處理器數量int parallel = Runtime.getRuntime().availableProcessors() * 2;EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup worker = new NioEventLoopGroup(parallel,threadRpcFactory,SelectorProvider.provider());try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new MessageRecvChannelInitializer(handlerMap)).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);String[] ipAddr = serverAddress.split(MessageRecvExecutor.DELIMITER);if (ipAddr.length == 2) {String host = ipAddr[0];int port = Integer.parseInt(ipAddr[1]);ChannelFuture future = bootstrap.bind(host, port).sync();System.out.printf("[author tangjie] Netty RPC Server start success ip:%s port:%d\n", host, port);future.channel().closeFuture().sync();} else {System.out.printf("[author tangjie] Netty RPC Server start fail!\n");}} finally {worker.shutdownGracefully();boss.shutdownGracefully();}} }
最后還是老規矩,給出RPC服務端消息編碼、解碼、處理的核心模塊代碼實現,具體如下:
/*** @filename:MessageRecvChannelInitializer.java** Newland Co. Ltd. All rights reserved.** @Description:Rpc服務端管道初始化* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.core;import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import java.util.Map;public class MessageRecvChannelInitializer extends ChannelInitializer<SocketChannel> {//ObjectDecoder 底層默認繼承半包解碼器LengthFieldBasedFrameDecoder處理粘包問題的時候,//消息頭開始即為長度字段,占據4個字節。這里出于保持兼容的考慮final public static int MESSAGE_LENGTH = 4;private Map<String, Object> handlerMap = null;MessageRecvChannelInitializer(Map<String, Object> handlerMap) {this.handlerMap = handlerMap;}protected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();//ObjectDecoder的基類半包解碼器LengthFieldBasedFrameDecoder的報文格式保持兼容。因為底層的父類LengthFieldBasedFrameDecoder//的初始化參數即為super(maxObjectSize, 0, 4, 0, 4); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageRecvChannelInitializer.MESSAGE_LENGTH, 0, MessageRecvChannelInitializer.MESSAGE_LENGTH));//利用LengthFieldPrepender回填補充ObjectDecoder消息報文頭pipeline.addLast(new LengthFieldPrepender(MessageRecvChannelInitializer.MESSAGE_LENGTH));pipeline.addLast(new ObjectEncoder());//考慮到并發性能,采用weakCachingConcurrentResolver緩存策略。一般情況使用:cacheDisabled即可pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));pipeline.addLast(new MessageRecvHandler(handlerMap));} }
/*** @filename:MessageRecvHandler.java** Newland Co. Ltd. All rights reserved.** @Description:Rpc服務器消息處理* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.core;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Map; import newlandframework.netty.rpc.model.MessageRequest; import newlandframework.netty.rpc.model.MessageResponse;public class MessageRecvHandler extends ChannelInboundHandlerAdapter {private final Map<String, Object> handlerMap;public MessageRecvHandler(Map<String, Object> handlerMap) {this.handlerMap = handlerMap;}public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {MessageRequest request = (MessageRequest) msg;MessageResponse response = new MessageResponse();MessageRecvInitializeTask recvTask = new MessageRecvInitializeTask(request, response, handlerMap, ctx);//不要阻塞nio線程,復雜的業務邏輯丟給專門的線程池MessageRecvExecutor.submit(recvTask);}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {//網絡有異常要關閉通道ctx.close();} }
/*** @filename:MessageRecvInitializeTask.java** Newland Co. Ltd. All rights reserved.** @Description:Rpc服務器消息線程任務處理* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.core;import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import java.util.Map; import newlandframework.netty.rpc.model.MessageRequest; import newlandframework.netty.rpc.model.MessageResponse; import org.apache.commons.beanutils.MethodUtils;public class MessageRecvInitializeTask implements Runnable {private MessageRequest request = null;private MessageResponse response = null;private Map<String, Object> handlerMap = null;private ChannelHandlerContext ctx = null;public MessageResponse getResponse() {return response;}public MessageRequest getRequest() {return request;}public void setRequest(MessageRequest request) {this.request = request;}MessageRecvInitializeTask(MessageRequest request, MessageResponse response, Map<String, Object> handlerMap, ChannelHandlerContext ctx) {this.request = request;this.response = response;this.handlerMap = handlerMap;this.ctx = ctx;}public void run() {response.setMessageId(request.getMessageId());try {Object result = reflect(request);response.setResult(result);} catch (Throwable t) {response.setError(t.toString());t.printStackTrace();System.err.printf("RPC Server invoke error!\n");}ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture channelFuture) throws Exception {System.out.println("RPC Server Send message-id respone:" + request.getMessageId());}});}private Object reflect(MessageRequest request) throws Throwable {String className = request.getClassName();Object serviceBean = handlerMap.get(className);String methodName = request.getMethodName();Object[] parameters = request.getParameters();return MethodUtils.invokeMethod(serviceBean, methodName, parameters);} }
然后是RPC消息處理的回調實現模塊代碼
/*** @filename:MessageCallBack.java** Newland Co. Ltd. All rights reserved.** @Description:Rpc消息回調* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.core;import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import newlandframework.netty.rpc.model.MessageRequest; import newlandframework.netty.rpc.model.MessageResponse;public class MessageCallBack {private MessageRequest request;private MessageResponse response;private Lock lock = new ReentrantLock();private Condition finish = lock.newCondition();public MessageCallBack(MessageRequest request) {this.request = request;}public Object start() throws InterruptedException {try {lock.lock();//設定一下超時時間,rpc服務器太久沒有相應的話,就默認返回空吧。finish.await(10*1000, TimeUnit.MILLISECONDS);if (this.response != null) {return this.response.getResult();} else {return null;}} finally {lock.unlock();}}public void over(MessageResponse reponse) {try {lock.lock();finish.signal();this.response = reponse;} finally {lock.unlock();}} }
到此為止,NettyRPC的關鍵部分:服務端、客戶端的模塊已經通過Netty全部實現了。現在給出spring加載配置rpc-invoke-config.xml的內容:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context.xsd"><context:component-scan base-package="newlandframework.netty.rpc.core"/><context:property-placeholder location="classpath:newlandframework/netty/rpc/config/rpc-server.properties"/><bean id="rpcbean" class="newlandframework.netty.rpc.model.MessageKeyVal"><property name="messageKeyVal"><map><entry key="newlandframework.netty.rpc.servicebean.Calculate"><ref bean="calc"/></entry></map></property></bean><bean id="calc" class="newlandframework.netty.rpc.servicebean.CalculateImpl"/><bean id="rpcServer" class="newlandframework.netty.rpc.core.MessageRecvExecutor"><constructor-arg name="serverAddress" value="${rpc.server.addr}"/></bean> </beans>
再貼出RPC服務綁定ip信息的配置文件:rpc-server.properties的內容。
#rpc server's ip address config rpc.server.addr=127.0.0.1:18888
最后NettyRPC服務端啟動方式參考如下:
new ClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config.xml");
如果一切順利,沒有出現意外的話,控制臺上面,會出現如下截圖所示的情況:
如果出現了,說明NettyRPC服務器,已經啟動成功!
上面基于Netty的RPC服務器,并發處理性能如何呢?實踐是檢驗真理的唯一標準,下面我們就來實戰一下。
下面的測試案例,是基于RPC遠程調用兩數相加函數,并返回計算結果。客戶端同時開1W個線程,同一時刻,瞬時發起并發計算請求,然后觀察Netty的RPC服務器是否有正常應答回復響應,以及客戶端是否有正常返回調用計算結果。值得注意的是,測試案例是基于1W個線程瞬時并發請求而設計的,并不是1W個線程循環發起請求。這兩者對于衡量RPC服務器的并發處理性能,還是有很大差別的。當然,前者對于并發性能的處理要求,要高上很多很多。
現在,先給出RPC計算接口、RPC計算接口實現類的代碼實現:
/*** @filename:Calculate.java** Newland Co. Ltd. All rights reserved.** @Description:計算器定義接口* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.servicebean;public interface Calculate {//兩數相加int add(int a, int b); }
/*** @filename:CalculateImpl.java** Newland Co. Ltd. All rights reserved.** @Description:計算器定義接口實現* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.servicebean;public class CalculateImpl implements Calculate {//兩數相加public int add(int a, int b) {return a + b;} }
下面是瞬時并發RPC請求的測試樣例:
/*** @filename:CalcParallelRequestThread.java** Newland Co. Ltd. All rights reserved.** @Description:并發線程模擬* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.servicebean;import newlandframework.netty.rpc.core.MessageSendExecutor; import java.util.concurrent.CountDownLatch; import java.util.logging.Level; import java.util.logging.Logger;public class CalcParallelRequestThread implements Runnable {private CountDownLatch signal;private CountDownLatch finish;private MessageSendExecutor executor;private int taskNumber = 0;public CalcParallelRequestThread(MessageSendExecutor executor, CountDownLatch signal, CountDownLatch finish, int taskNumber) {this.signal = signal;this.finish = finish;this.taskNumber = taskNumber;this.executor = executor;}public void run() {try {signal.await();Calculate calc = executor.execute(Calculate.class);int add = calc.add(taskNumber, taskNumber);System.out.println("calc add result:[" + add + "]");finish.countDown();} catch (InterruptedException ex) {Logger.getLogger(CalcParallelRequestThread.class.getName()).log(Level.SEVERE, null, ex);}} }
/*** @filename:RpcParallelTest.java** Newland Co. Ltd. All rights reserved.** @Description:rpc并發測試代碼* @author tangjie* @version 1.0**/ package newlandframework.netty.rpc.servicebean;import java.util.concurrent.CountDownLatch; import newlandframework.netty.rpc.core.MessageSendExecutor; import org.apache.commons.lang.time.StopWatch;public class RpcParallelTest {public static void main(String[] args) throws Exception {final MessageSendExecutor executor = new MessageSendExecutor("127.0.0.1:18888");//并行度10000int parallel = 10000;//開始計時StopWatch sw = new StopWatch();sw.start();CountDownLatch signal = new CountDownLatch(1);CountDownLatch finish = new CountDownLatch(parallel);for (int index = 0; index < parallel; index++) {CalcParallelRequestThread client = new CalcParallelRequestThread(executor, signal, finish, index);new Thread(client).start();}//10000個并發線程瞬間發起請求操作signal.countDown();finish.await();sw.stop();String tip = String.format("RPC調用總共耗時: [%s] 毫秒", sw.getTime());System.out.println(tip);executor.stop();} }
好了,現在先啟動NettyRPC服務器,確認沒有問題之后,運行并發RPC請求客戶端,看下客戶端打印的計算結果,以及處理耗時。
? ? ?
從上面來看,10000個瞬時RPC計算請求,總共耗時接近11秒。我們在來看下NettyRPC的服務端運行情況,如下所示:
? ? ?
可以很清楚地看到,RPC服務端都有收到客戶端發起的RPC計算請求,并返回消息應答。
最后我們還是要分別驗證一下,RPC服務端是否存在丟包、粘包、IO阻塞的情況?1W個并發計算請求,是否成功接收處理并應答了?實際情況說明一切,看下圖所示:
? ? ??
? 非常給力,RPC的服務端確實成功接收到了客戶端發起的1W筆瞬時并發計算請求,并且成功應答處理了。并沒有出現:丟包、粘包、IO阻塞的情況。再看下RPC客戶端,是否成功得到計算結果的應答返回了呢?
很好,RPC的客戶端,確實收到了RPC服務端計算的1W筆加法請求的計算結果,而且耗時接近11秒。由此可見,基于Netty+業務線程池的NettyRPC服務器,應對并發多線程RPC請求,處理起來是得心應手,游刃有余!
最后,本文通過Netty這個NIO框架,實現了一個很簡單的“高性能”的RPC服務器,代碼雖然寫出來了,但是還是有一些值得改進的地方,比如:
1、對象序列化傳輸可以支持目前主流的序列化框架:protobuf、JBoss?Marshalling、Avro等等。
2、Netty的線程模型可以根據業務需求,進行定制。因為,并不是每筆業務都需要這么強大的并發處理性能。
3、目前RPC計算只支持一個RPC服務接口映射綁定一個對應的實現,后續要支持一對多的情況。
4、業務線程池的啟動參數、線程池并發阻塞容器模型等等,可以配置化管理。
5、Netty的Handler處理部分,對于復雜的業務邏輯,現在是統一分派到特定的線程池進行后臺異步處理。當然你還可以考慮JMS(消息隊列)方式進行解耦,統一分派給消息隊列的訂閱者,統一處理。目前實現JMS的開源框架也有很多,ActiveMQ、RocketMQ等等,都可以考慮。
本文實現的NettyRPC,對于面前的您而言,一定還有很多地方,可以加以完善和改進,優化改進的工作就交給您自由發揮了。
由于本人技術能力、認知水平有限。本文中有說不對的地方,懇請園友們批評指正!不吝賜教!最后,感謝面前的您,耐心的閱讀完本文,相信現在的你,對于Java開發高性能的服務端應用,又有了一個更深入的了解!本文算是對我Netty學習成果的階段性總結,后續有時間,我還會繼續推出Netty工業級開發的相關文章,敬請期待!
PS:還有興趣的朋友可以參考、閱讀一下,我的另外一篇文章:Netty實現高性能RPC服務器優化篇之消息序列化。此外,自從在博客園發表了兩篇:基于Netty開發高性能RPC服務器的文章之后,本人收到很多園友們索要源代碼進行學習交流的請求。為了方便大家,本人把NettyRPC的代碼開源托管到github上面,歡迎有興趣的朋友一起學習、研究!
附上NettyRPC項目的下載路徑:https://github.com/tang-jie/NettyRPC
?
Netty工業級開發系列文章進階:Netty構建分布式消息隊列(AvatarMQ)設計指南之架構篇
談談如何使用Netty開發實現高性能的RPC服務器、Netty實現高性能RPC服務器優化篇之消息序列化。這兩篇文章主要設計的思路是,基于Netty構建了一個高性能的RPC服務器,而這些前期代碼的準備工作,主要是為了設計、實現一個基于Netty的分布式消息隊列系統做鋪墊,本人把這個分布式消息隊列系統,命名為:AvatarMQ。作為Netty工業級開發系列的進階篇,感興趣的朋友可以點擊關注:Netty構建分布式消息隊列(AvatarMQ)設計指南之架構篇,一定不會讓您失望!
AvatarMQ項目開源網址:https://github.com/tang-jie/AvatarMQ。