?netty系列文章:
01-netty基礎-socket |
02-netty基礎-java四種IO模型 |
03-netty基礎-多路復用select、poll、epoll |
04-netty基礎-Reactor三種模型 |
05-netty基礎-ByteBuf數據結構 |
06-netty基礎-編碼解碼 |
07-netty基礎-自定義編解碼器 |
08-netty基礎-自定義序列化和反序列化 |
09-netty基礎-手寫rpc-原理-01 |
10-netty基礎-手寫rpc-定義協議頭-02 |
11-netty基礎-手寫rpc-支持多序列化協議-03 |
12-netty基礎-手寫rpc-編解碼-04 |
13-netty基礎-手寫rpc-消費方生成代理-05 |
14-netty基礎-手寫rpc-提供方(服務端)-06 |
1 功能邏輯
在客戶端啟動的時候要為添加了BonnieRemoteReference注解的屬性生成一個代理類;代理類的主要功能:在spring容器加載完BeanDefinition之后,在Bean初始化之前,觸發生成代理類。
邏輯:
- 獲取到所有的BeanDefinition
- 拿到BeanDefinition對應的class
- 遍歷class下的所有被BonnieRemoteReference修飾的屬性(成員變量)
- 為被BonnieRemoteReference修飾的屬性,使用BeanDefinitionBuilder構建BeanDefinition,設置interfaceClass、serviceAddress、servicePort屬性,并放入到spring容器中,對象的類型為SpringRpcReferenceBean;
- SpringRpcReferenceBean實現FactoryBean接口,然后在getObject中返回代理對象。
- 編寫NettyClient代碼
補充:
Spring 的?FactoryBean
?是一個工廠 bean 接口,用于自定義 bean 的創建邏輯。它的核心作用是:
- 當容器獲取該 bean 時(如?
getBean("xxx")
),實際返回的是?getObject()
?方法創建的對象,而非?SpringRpcReferenceBean
?自身實例。 - 常用于創建復雜對象(如遠程服務代理、數據庫連接池等)
2 重點代碼介紹
2.1 觸發生成代理類入口代碼
在spring容器加載BeanDefinition之后,在Bean初始化之前執行,實現接口BeanFactoryPostProcessor接口中postProcessBeanFactory方法即可
?
獲取所有的beanDefinitionNames
String[] beanDefinitionNames = beanFactory.getBeanDefinitionNames();
獲取beanClassName對應的類信息
Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);
獲取clazz上的所有屬性(成員變量)
ReflectionUtils.doWithFields(clazz, this::parseRpcReference);
當前這個field是否被BonnieRemoteReference注解修飾
BonnieRemoteReference remoteReference = AnnotationUtils.getAnnotation(field, BonnieRemoteReference.class);
生成SpringRpcReferenceBean的BeanDefinition
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class)
放入屬性,遠程調用中需要的內容,比如是那個類,以及地址端口信息
builder.addPropertyValue("interfaceClass", field.getType());
builder.addPropertyValue("serviceAddress", rpcClientProperties.getServiceAddress());
builder.addPropertyValue("servicePort", rpcClientProperties.getServicePort());
BeanDefinition beanDefinition = builder.getBeanDefinition();
rpcRefBeanDefinitionMap.put(field.getName(), beanDefinition);
放入到spring容器中
registry.registerBeanDefinition(entry.getKey(), entry.getValue());
package com.bonnie.protocol.spring.reference;import com.bonnie.protocol.annotation.BonnieRemoteReference;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;import java.lang.reflect.Field;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {private ApplicationContext applicationContext;private ClassLoader classLoader;//保存發布的引用bean的信息private final Map<String, BeanDefinition> rpcRefBeanDefinitionMap = new ConcurrentHashMap<>();private RpcClientProperties rpcClientProperties;public SpringRpcReferencePostProcessor(RpcClientProperties rpcClientProperties) {this.rpcClientProperties = rpcClientProperties;}/*** 實現postProcessBeanFactory方法,spring容器加載了bean的定義文件之后, 在bean實例化之前執行* 1、將類型的存在的BonnieRemoteReference注解的屬性,構造BeanDefinition放在容器中,beanName是類的全限定名, BeanDefinition(類的全限定名,客戶端IP,客戶端端口號)* @param beanFactory* @throws BeansException*/@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {// 獲取到所有的beanDefinitionString[] beanDefinitionNames = beanFactory.getBeanDefinitionNames();// 遍歷for (String beanDefinitionName : beanDefinitionNames) {BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);String beanClassName = beanDefinition.getBeanClassName();if (Objects.nonNull(beanClassName)) {// 獲取到這個類的所有fieldClass<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);// 該方法遍歷class對象中的所有的field屬性,并且作為參數傳入到parseRpcReference方法中ReflectionUtils.doWithFields(clazz, this::parseRpcReference);}}// 將生成的BeanDefinition放入到容器中BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;Set<Map.Entry<String, BeanDefinition>> entries = this.rpcRefBeanDefinitionMap.entrySet();for (Map.Entry<String, BeanDefinition> entry : entries) {if (applicationContext.containsBean(entry.getKey())) {log.warn("SpringContext already register bean {}", entry.getKey());} else {registry.registerBeanDefinition(entry.getKey(), entry.getValue());log.info("registered RpcReferenceBean {} success", entry.getKey());}}}private void parseRpcReference(Field field) {// 當前這個field是否被BonnieRemoteReference注解修飾BonnieRemoteReference remoteReference = AnnotationUtils.getAnnotation(field, BonnieRemoteReference.class);// BonnieRemoteReference注解修飾if (Objects.nonNull(remoteReference)) {BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);builder.addPropertyValue("interfaceClass", field.getType());builder.addPropertyValue("serviceAddress", rpcClientProperties.getServiceAddress());builder.addPropertyValue("servicePort", rpcClientProperties.getServicePort());BeanDefinition beanDefinition = builder.getBeanDefinition();rpcRefBeanDefinitionMap.put(field.getName(), beanDefinition);}}@Overridepublic void setBeanClassLoader(ClassLoader classLoader) {this.classLoader = classLoader;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}}
2.2?生成代理類代碼
上面會被BonnieRemoteReference修飾的屬性(Field)為生成SpringRpcReferenceBean對象,并添加相關的屬性。
實現FactoryBean
接口,當spring獲取
SpringRpcReferenceBean對象的時候,調用的就是里面的getObject對象,在getObject里面生成一個代理類,即可代理被BonnieRemoteReference修飾的類。
package com.bonnie.protocol.spring.reference;import lombok.Setter;
import org.springframework.beans.factory.FactoryBean;import java.lang.reflect.Proxy;/*** 創建SpringRpcReferenceBean的代理對象*/
@Setter
public class SpringRpcReferenceBean implements FactoryBean<Object> {private String serviceAddress;private Integer servicePort;private Class<?> interfaceClass;/*** 返回由工廠創建的目標Bean實例* @return* @throws Exception*/@Overridepublic Object getObject() throws Exception {System.out.println("代理類 serviceAddress "+serviceAddress);System.out.println("代理類 servicePort "+servicePort);System.out.println("代理類 interfaceClass "+interfaceClass);// 為BonnieRemoteReference生成一個代理類return Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[]{interfaceClass},new RpcInvokerProxy(serviceAddress, servicePort));}/*** 返回目標Bean的類型* @return*/@Overridepublic Class<?> getObjectType() {return this.interfaceClass;}}
2.3 代理類handler
這塊主要是在發生rpc調用的時候,組裝請求信息,并通過nettyClient向服務端發起連接并且發送請求。
package com.bonnie.protocol.spring.reference;import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.Header;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.enums.ReqTypeEnum;
import com.bonnie.protocol.enums.RpcConstant;
import com.bonnie.protocol.enums.SerialTypeEnum;
import com.bonnie.protocol.netty.NettyClient;
import io.netty.channel.DefaultEventLoop;
import io.netty.util.concurrent.DefaultPromise;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;public class RpcInvokerProxy implements InvocationHandler {private String host;private Integer port;public RpcInvokerProxy(String host, Integer port) {this.host = host;this.port = port;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {/*** 構建發送的請求報文,首先去創建RequestHold類,在這個類定義一個原子自增的RequestId,* 在一個就是每次請求都會有結果,那么請求id和結果的關系要有一個映射關系*/RpcProtocol<RpcRequest> reqProtocol = new RpcProtocol<>();long requestId = RequestHolder.REQUEST_ID.incrementAndGet();System.out.println("生成的requestId:" + requestId);Header header = new Header();header.setMagic(RpcConstant.MAGIC);header.setSerialType(SerialTypeEnum.JAVA_SERIAL.getCode());header.setReqType(ReqTypeEnum.REQUEST.getCode());header.setRequestId(requestId);header.setLength(0);RpcRequest rpcRequest = new RpcRequest();rpcRequest.setClassName(method.getDeclaringClass().getName());rpcRequest.setMethodName(method.getName());rpcRequest.setParams(args);rpcRequest.setParameterTypes(method.getParameterTypes());reqProtocol.setHeader(header);reqProtocol.setContent(rpcRequest);// 發起遠程調用NettyClient nettyClient = new NettyClient(host, port);System.out.println("代理發送到服務端請求內容:" + JSONObject.toJSONString(reqProtocol));// new DefaultEventLoop(),是用來去執行監聽器的RpcFuture<RpcResponse> future = new RpcFuture<>(new DefaultPromise<RpcResponse>(new DefaultEventLoop()));// 在發起請求之前,添加映射關系到map中RequestHolder.REQUEST_MAP.put(header.getRequestId(), future);// 客戶端發送數據nettyClient.sendRequest(reqProtocol);// 通過promise,異步等待服務端發送數據來,不然就會一直在此等待// get方法得到的是RpcResponse類,然后調用getData方法獲取到數據return future.getPromise().get().getData();}
}
2.4 netty客戶端代碼
這塊主要包含創建客戶端、向服務端發起連接、發送請求,也會設置前文中自定義編解碼、序列化的操作
package com.bonnie.protocol.netty;import com.bonnie.protocol.code.BonnieDecoder;
import com.bonnie.protocol.code.BonnieEncoder;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class NettyClient {private final Bootstrap bootstrap;private final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();private String serviceAddress;private Integer servicePort;public NettyClient(String serviceAddress, Integer servicePort) {log.info("開始初始化NettyClient======");bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.info("開始初始化RpcClientInitializer======");ch.pipeline().addLast(new LoggingHandler()).addLast(new BonnieEncoder()).addLast(new BonnieDecoder()).addLast(new RpcClientHandler());}});this.serviceAddress = serviceAddress;this.servicePort = servicePort;}/*** 發送數據* @param protocol* @throws Exception*/public void sendRequest(RpcProtocol<RpcRequest> protocol) {try {System.out.println(this.serviceAddress+ "===="+this.servicePort);final ChannelFuture channelFuture = bootstrap.connect(this.serviceAddress, this.servicePort).sync();// 注冊一個監聽器,如果出問題就關閉groupchannelFuture.addListener(listener -> {if (channelFuture.isSuccess()) {log.info("connect rpc server {} success.",this.serviceAddress);} else {log.error("connect rpc server {} failed. ",this.servicePort);channelFuture.cause().printStackTrace();eventLoopGroup.shutdownGracefully();}});log.info("begin transfer data");// 向服務端發送數據channelFuture.channel().writeAndFlush(protocol);} catch (InterruptedException e) {e.printStackTrace();}}}
2.5 netty客戶端接收服務端響應數據
package com.bonnie.protocol.netty;import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.spring.reference.RequestHolder;
import com.bonnie.protocol.spring.reference.RpcFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcResponse>> {/*** 接收服務端響應數據* @param ctx* @param msg* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcResponse> msg) throws Exception {long requestId = msg.getHeader().getRequestId();log.info("接收服務端響應的結果====== requestId {} {}", requestId, JSONObject.toJSONString(msg));// 刪除映射關系RpcFuture<RpcResponse> future = RequestHolder.REQUEST_MAP.remove(requestId);// 我們之前說異步等待服務端發送數據過來,那么只要服務端發送數據過來,就會調用管道RpcClentHandler的read方法// 那么當初future.getPromise().get()如果不再阻塞獲取數據呢?就是通過給Promise中的Success設置值,同時會喚醒阻塞的線程// 一當喚醒線程, future.getPromise().get()就會不再阻塞,就獲取到服務端返回的數據future.getPromise().setSuccess(msg.getContent());}}