手寫Rpc框架 - 導讀
git倉庫-all-rpc
GTIEE:https://gitee.com/quercus-sp204/all-rpc 【參考源碼 yrpc】
1. Rpc概念
RPC 即遠程過程調用(Remote Procedure Call) ,就是通過網絡從遠程計算機程序上請求服務。
- 本地調用抽象:允許程序像調用本地函數一樣調用遠程計算機上的函數。開發者無需編寫復雜的網絡通信代碼來處理諸如建立連接、發送請求、等待響應等細節,只需關注業務邏輯的實現。例如,在一個分布式系統中,A 服務器上的程序需要調用 B 服務器上的某個函數來獲取數據,使用 RPC 就可以像調用本地函數一樣簡單。
- 通信協議與序列化:為了實現這種跨網絡的函數調用,RPC 框架通常會使用特定的通信協議(如 TCP/IP)來傳輸數據,并通過序列化和反序列化技術,將調用的參數和返回值轉換為適合在網絡上傳輸的格式。比如,將參數對象轉換為字節流進行傳輸,在接收端再將字節流還原為對象。
那么,它的應用場景有哪些呢?我們平時用到了嗎?
- 微服務架構:在微服務架構中,各個微服務之間通常需要相互通信來完成復雜的業務流程。例如,一個電商系統中,訂單服務可能需要調用庫存服務來檢查商品庫存,調用用戶服務來驗證用戶信息等,RPC 可以高效地實現這些微服務間的通信。
- 分布式系統:在大型分布式系統中,不同節點可能負責不同的功能模塊。例如,在搜索引擎系統中,索引構建節點和查詢服務節點可能分布在不同的服務器上,通過 RPC 可以實現節點之間的協同工作。
說白了,就是不同服務之間的網絡通信嘛。那你可能會問了,假如我的系統有User、Order、Shipment三個服務【爪哇SpringBoot編寫的嚯】,如果User想要訪問Order上面的函數,我只需要將此函數以Http接口的形式暴露出去,然后User那邊使用RestTemplate來訪問不就好了嗎?何必這么費勁還要用框架呢?
仔細想一下,確實有那么點兒卵道理,但是又仔細一想,
- 你會發現,你使用RestTemplate的時候,需要手動處理請求的 URL 拼接、請求頭設置、參數序列化和響應反序列化等操作,對了,還有異常也需要自己處理,需要自己處理調用失敗的情況,如重試幾次啊等等
- HTTP 協議是一種文本協議,存在較多的頭部信息,在數據傳輸時會帶來額外的開銷。而rpc框架的協議通常相比http協議,是很輕量的。
- 如果Order部署在了多臺機器上面,代碼里面肯定是存了這些機器的地址,如果擴容或者縮容,都要修改代碼,同時還需要我們手動選擇調用哪一臺機器上面的Order【需要手動實現負載均衡】
那么,rpc都可以完成這些,并且服務的地址信息那些啊,可以在rpc的注冊中心拉取到,動態感知。
綜上所述,一個基本的Rpc框架主要應該具有的能力是:
基礎通信能力
-
高效序列化、優化網絡傳輸,采用高效的網絡協議和傳輸方式
-
穩定性與可靠性
- 連接管理:具備完善的連接池管理機制,對連接進行復用,減少頻繁建立和銷毀連接的開銷,同時保證連接的穩定性。例如,在高并發場景下,能自動處理連接的超時、重連等問題
服務發現與治理能力
- 服務注冊與發現: 1. 服務提供者能夠在啟動時自動將自己的服務信息(如服務名稱、地址、端口等)注冊到服務注冊中心,方便服務消費者發現和調用。2.實時感知:調用方能夠及時感知調用的服務信息并更新。
- 負載均衡: 如隨機、輪詢、最少活躍調用數、一致性哈希
- 服務容錯
- 熔斷機制:當服務提供者出現故障或響應時間過長時,能夠自動熔斷對該服務的調用,避免大量請求積壓,影響整個系統的穩定性。
- 降級策略:在系統資源緊張或服務出現故障時,能夠自動降級服務,提供默認的響應結果或采取其他臨時措施,保證系統的基本可用性。
易用性與可擴展性
- 簡單的編程模型:讓開發者能夠像調用本地函數一樣調用遠程服務,無需關注底層的網絡通信細節,降低開發難度,提高開發效率
- 插件化與擴展性
- SPI 機制:具備良好的插件化架構,通過服務提供者接口(SPI)機制,允許開發者根據實際需求擴展框架的功能,如自定義序列化方式、負載均衡策略、過濾器等。
現在就按照上面的能力,來一個一個實現,最終將其組合成一個框架。
2. 角色
一個Rpc框架的大致角色分布:
服務提供者將自己的數據信息【例如,端口、ip、接口等信息提供給注冊中心】(服務注冊)
消費者從注冊中心拉取到可用的服務信息(服務發現),然后選擇一個合適的服務(負載均衡),發送網絡請求【請求里面封裝了需要調用的接口,參數等等】,
服務提供者接收到請求之后,本地調用方法,然后通過網絡把響應結果過傳輸給消費者
最后消費者解析響應結果。
注冊中心: (本文就選擇zookeeper為注冊中心)
3. 注冊中心的接入
zookeeper的安裝與啟動,就不在這里贅述了。
思考注冊中心的主要能力:【服務注冊,服務發現】
定義接口
Registry接口
/** 注冊中心的能力: 注冊服務, 拉取服務列表*/
public interface Registry {/*** 注冊服務* @param serviceConfig 服務的配置內容*/void register(ServiceConfig<?> serviceConfig);/*** 從注冊中心拉取服務列表* @param serviceName 服務的名稱* @return 服務的地址*/List<InetSocketAddress> lookup(String serviceName, String group);
}
ServiceConfig 封裝服務信息的class
/*服務信息*/
public class ServiceConfig<T> {// 接口的類型/*比如UserServiceImpl實現了UserService, 真實對象就是實現類,interfaceProvider就是UserService.class*/private Class<?> interfaceProvider;private Object ref; // 真實對象private String group = "default"; // 分組// get set.....
}
Zookeeper注冊中心的實現類 ZookeeperRegistry
服務注冊在zookeeper上面的節點如圖所示
trpc根節點
==?消費者節點
==▼生產者節點
====▼接口的全限定名
======▼分組
========▼地址信息1...
@Slf4j
public class ZookeeperRegistry extends AbstractRegistry {// 維護一個zk實例private ZooKeeper zooKeeper;public ZookeeperRegistry() {this.zooKeeper = ZookeeperUtil.createZookeeper();}public ZookeeperRegistry(String connectString,int timeout) {this.zooKeeper = ZookeeperUtil.createZookeeper(connectString,timeout);}@Overridepublic void register(ServiceConfig<?> service) {// 服務名稱的節點 ---- "/tprc-metadata/providers/接口全限定名"String parentNode = Constant.BASE_PROVIDERS_PATH +"/"+service.getInterface().getName();// 建立服務節點這個節點應該是一個持久節點if(!ZookeeperUtil.exists(zooKeeper,parentNode,null)){ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode,null);ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);}// 建立分組節點parentNode = parentNode + "/" + service.getGroup();if(!ZookeeperUtil.exists(zooKeeper,parentNode,null)){ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode,null);ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);}// 創建本機的臨時節點, ip:port ,// 服務提供方的端口一般自己設定,我們還需要一個獲取ip的方法// ip我們通常是需要一個局域網ip,不是127.0.0.1,也不是ipv6// 192.168.12.121String node = parentNode + "/" + NetUtils.getIp() + ":" + TrpcBootstrap.getInstance().getConfiguration().getPort();if(!ZookeeperUtil.exists(zooKeeper,node,null)){ZookeeperNode zookeeperNode = new ZookeeperNode(node,null);ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.EPHEMERAL);}log.info("服務{},注冊ok",service.getInterface().getName());}/*** 拉取合適的服務列表* @param serviceName 服務名稱* @return 服務列表*/@Overridepublic List<InetSocketAddress> lookup(String serviceName,String group) {// 1、找到服務對應的節點String serviceNode = Constant.BASE_PROVIDERS_PATH + "/" + serviceName + "/" +group;List<String> children = ZookeeperUtil.getChildren(zooKeeper, serviceNode, null);// 獲取了所有的可用的服務列表List<InetSocketAddress> inetSocketAddresses = children.stream().map(ipString -> {String[] ipAndPort = ipString.split(":");String ip = ipAndPort[0];int port = Integer.parseInt(ipAndPort[1]);return new InetSocketAddress(ip, port);}).toList();if(inetSocketAddresses.isEmpty()){throw new DiscoveryException("未發現任何可用的服務主機.");}return inetSocketAddresses;}
}
上面的ZookeeperUtil是自定義的操作Zookeeper的工具類。-- 詳情見源碼里面的注釋,值得說明一下,Zookeeper要先有父結點才能創建子節點,不能把路徑直接寫全了直接創建,故在源碼里面會用createRoot方法初始化所有的父結點。
public static ZooKeeper createZookeeper(String connectPath, int timeout) {CountDownLatch countDownLatch = new CountDownLatch(1);try {.................// 連接成功就創建根節點,檢查是否存在rpc根節點 /trpc-metadata/providers && /trpc-metadata/consumerscreateRoot(zooKeeper);...............} catch (IOException | InterruptedException e) {log.info("創建zookeeper實例時發生異常:",e);throw new ZookeeperException("創建zookeeper實例時發生異常");}
}
至此,注冊中心的兩個基本方法就可以告一段落了。
4.Trpc框架啟動器
既然是一個框架,那么,我們必然有一個入口來啟動這一套流程。
①服務提供方
- 基本功能信息appName、registry
形如: all-tRpc-demo / demo-simple-provider / …/ProviderApplication.java 這樣來啟動我們的提供方。
public class ProviderApplication {public static void main(String[] args) {TrpcBootstrap.getInstance() .appName("user-provider")// 配置注冊中心.registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))// 掃描包下的類,然后批量發布接口.scan("com.feng.demo")// 啟動服務.start();}
}
在TrpcBootstrap.java里面
@Slf4j
public class TrpcBootstrap {// 單例,每個應用程序只有一個private static final TrpcBootstrap trpcBootstrap = new TrpcBootstrap();// 配置private final RpcConfiguration configuration;// 獲取實例public static TrpcBootstrap getInstance() {return trpcBootstrap;}// 設置應用名稱 *****public TrpcBootstrap appName( String appName ) {configuration.setAppName(appName);return this;}// 配置注冊中心 *****public TrpcBootstrap registry( RegistryConfig registryConfig ) {// 傳遞過來的參數registryConfig,還沒有創建連接,在這里創建與注冊中心的連接registryConfig.createRegistryConnection(); // 創建注冊中心的連接configuration.setRegistryConfig(registryConfig); // 設置服務注冊中心return this;}
}
RpcConfiguration是封裝的配置信息
// 全局的配置類,代碼配置 --> xml配置 --> 默認項
@Data
public class RpcConfiguration {// 配置信息-->端口號private int port = 8094;// 配置信息-->應用程序的名字private String appName = "default";// 分組信息private String group = "default";// 配置信息-->注冊中心private RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:2181"); // 默認的// 配置信息-->序列化協議private String serializeType = "jdk";// 配置信息-->壓縮使用的協議private String compressType = "gzip";// 配置信息@Getterpublic IdGenerator idGenerator = new IdGenerator(1, 2);// 配置信息-->負載均衡策略private LoadBalancer loadBalancer = new RoundRobinLoadBalancer();// 為每一個ip配置一個限流器private final Map<SocketAddress, RateLimiter> everyIpRateLimiter = new ConcurrentHashMap<>(16);// 為每一個ip配置一個斷路器,熔斷private final Map<SocketAddress, CircuitBreaker> everyIpCircuitBreaker = new ConcurrentHashMap<>(16);// 讀xml,dom4jpublic RpcConfiguration() {............}
}// 里面又持有注冊中心的類
@Slf4j
public class RegistryConfig {// 定義連接的 url zookeeper://127.0.0.1:2181 redis://192.168.12.125:3306private final String connectString;// 持有一個 Registryprivate Registry registry;public RegistryConfig(String connectString) {this.connectString = connectString;}public Registry getRegistry() {if ( registry == null ) createRegistryConnection();return registry;}/*** 可以使用簡單工廠來完成* @return 具體的注冊中心實例*/public void createRegistryConnection() {if ( connectString == null ) throw new DiscoveryException("未配置注冊中心!");// 1、獲取注冊中心的類型String registryType = getRegistryType(connectString,true).toLowerCase().trim();log.info("【創建與注冊中心的連接~~~】 注冊中心的類型: {}", registryType);// 2、通過類型獲取具體注冊中心if( registryType.equals("zookeeper") ){String host = getRegistryType(connectString, false);this.registry = new ZookeeperRegistry(host, Constant.ZK_TIME_OUT);} else if (registryType.equals("nacos")){String host = getRegistryType(connectString, false);this.registry = new NacosRegistry(host, Constant.ZK_TIME_OUT);} else {throw new DiscoveryException("未發現合適的注冊中心。");}}private String getRegistryType(String connectString,boolean ifType){String[] typeAndHost = connectString.split("://");if(typeAndHost.length != 2){throw new RuntimeException("給定的注冊中心連接url不合法");}if(ifType){return typeAndHost[0];} else {return typeAndHost[1];}}
}
- 掃描接口并發布scan
// 掃描項目指定包下面的接口,并且將他們發布到注冊中心
public TrpcBootstrap scan(String packageName) {// 1. 獲取指定包 path 下的所有類的全限定名List<String> classNames = getAllClassName(packageName);// 2.1 拿到所有標注了TrpcApi注解的類List<? extends Class<?>> classes = getTrpcClassesByList(classNames);// 2.2遍歷這些構建實例for (Class<?> clazz : classes) {Class<?>[] interfaces = clazz.getInterfaces(); // 獲取到他的接口Object instance;try {instance = clazz.getConstructor().newInstance(); // 通過無參構造器創建一個實例} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {throw new RuntimeException(e);}// 獲取注解的groupTrpcApi annotation = clazz.getAnnotation(TrpcApi.class);String group = annotation.group();// 3.將這些接口發布for (Class<?> anInterface : interfaces) {ServiceConfig<?> serviceConfig = new ServiceConfig<>();serviceConfig.setInterface(anInterface);serviceConfig.setRef(instance);serviceConfig.setGroup(group);if (log.isDebugEnabled()){log.debug("---->已經通過包掃描,將服務【{}】發布.",anInterface);}// 3、發布publish(serviceConfig);}}return this;
}//
private TrpcBootstrap publish( ServiceConfig<?> service ) {configuration.getRegistryConfig().getRegistry().register(service);// 維護一個映射關系SERVERS_LIST.put(service.getInterface().getName(), service);return this;
}
具體可以看源碼里面的實現
- 啟動netty
public void start() {// 1、創建eventLoop,老板只負責處理請求,之后會將請求分發至workerEventLoopGroup boss = new NioEventLoopGroup(2);EventLoopGroup worker = new NioEventLoopGroup(10);try {ServerBootstrap bootstrap = new ServerBootstrap();// 3.配置服務bootstrap = bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 是核心,我們需要添加很多入站和出站的handlersocketChannel.pipeline().addLast(new LoggingHandler()) // 打印日志.addLast(new TrpcRequestDecoder()) // 請求過來,需要解碼// 根據請求進行方法調用.addLast(new MethodCallHandler()).addLast(new TrpcResponseEncoder()) // 響應回去,需要編碼;}});// 4.綁定端口ChannelFuture channelFuture = bootstrap.bind(configuration.getPort()).sync();// 5.關閉channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}
}
② 服務消費方
形如 all-tRpc-demo / demo-simple-consumer /…/ConsumerApplication.java
public class ConsumerApplication {public static void main(String[] args) {ReferenceConfig<UserService> reference = new ReferenceConfig<>();reference.setReference(UserService.class);// 1、連接注冊中心// 2、拉取服務列表// 3、選擇一個服務并建立連接// 4、發送請求,攜帶一些信息(接口名,參數列表,方法的名字),獲得結果TrpcBootstrap.getInstance().appName("user-consumer").registry(new RegistryConfig("zookeeper://127.0.0.1:2181")).reference(reference);UserService userService = reference.get();System.out.println("=======================================");for (int i = 0; i < 10; i++) {System.out.println("【rpc調用開始=============】");// 開始時間long start = System.currentTimeMillis();List<User> users = userService.getUserByName("田小鋒q" + i);for (User user : users) {System.out.println(user);}// 結束時間long end = System.currentTimeMillis();System.out.println("rpc執行耗時:" + (end - start));System.out.println("【rpc調用=============結束-----】");}}
}@Slf4j
public class ReferenceConfig<T> {// 接口類型private Class<T> interfaceRef;// 注冊中心private Registry registry;// 分組信息private String group;/*** 代理設計模式* @return 代理對象*/public T get() {// 此處一定是使用動態代理完成了一些工作ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); // 類加載器Class<T>[] classes = new Class[]{interfaceRef}; // 接口類型InvocationHandler handler = new ProxyConsumerInvocationHandler(registry,interfaceRef,group);// 使用動態代理生成代理對象Object helloProxy = Proxy.newProxyInstance(classLoader, classes, handler);return (T)helloProxy;}
}
主要是jdk動態代理 在invoke里面實現我們的遠程調用
5. 序列化&&壓縮
在 RPC(遠程過程調用)框架中,序列化和壓縮是兩個重要的概念,它們在數據傳輸過程中起著關鍵作用
序列化
上圖里面可以看出來,序列化是將對象轉換為字節流的過程,以便在網絡上傳輸或存儲到文件中。在 RPC 中,客戶端調用遠程服務時,需要將調用的參數對象序列化為字節流,通過網絡發送到服務端;服務端接收到字節流后,再將其反序列化為對象進行處理。處理完后,又將結果對象序列化為字節流返回給客戶端,客戶端再反序列化得到結果。
常見的序列化方式
那么我們就定義一下序列化的接口
public interface Serializer {/*** 抽象的用來做序列化的方法*/byte[] serialize(Object object);/*** 反序列化的方法*/<T> T deserialize(byte[] bytes, Class<T> clazz);
}
1. JDK序列化
@Slf4j // lombok里面的日志注解
public class JdkSerializer implements Serializer {@Overridepublic byte[] serialize(Object object) {if (object == null) return null;try (ByteArrayOutputStream baos = new ByteArrayOutputStream();ObjectOutputStream outputStream = new ObjectOutputStream(baos);) { // try - with寫法outputStream.writeObject(object);byte[] result = baos.toByteArray();if(log.isInfoEnabled()){log.info("對象【{}】已經完成了序列化操作,序列化后的字節數為【{}】",object,result.length);}return result;} catch (IOException e) {log.error("序列化對象【{}】時放生異常.",object);throw new SerializeException(e);}}@Overridepublic <T> T deserialize(byte[] bytes, Class<T> clazz) {if(bytes == null || clazz == null) return null;try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);ObjectInputStream objectInputStream = new ObjectInputStream(bais);) {Object object = objectInputStream.readObject();if(log.isInfoEnabled()){log.info("類【{}】已經完成了反序列化操作.",clazz);}return (T)object;} catch (IOException | ClassNotFoundException e) {log.error("反序列化對象【{}】時放生異常.",clazz);throw new SerializeException(e);}}
}
2.JSON序列化
這里就用fastjson了
@Slf4j
public class JsonSerializer implements Serializer {@Overridepublic byte[] serialize(Object object) {if (object == null) return null;byte[] result = JSON.toJSONBytes(object);if (log.isInfoEnabled()) {log.info("對象【{}】已經完成了序列化操作,序列化后的字節數為【{}】", object, result.length);}return result;}@Overridepublic <T> T deserialize(byte[] bytes, Class<T> clazz) {if (bytes == null || clazz == null) return null;T t = JSON.parseObject(bytes, clazz);if (log.isInfoEnabled()) {log.info("類【{}】已經完成了反序列化操作.", clazz);}return t;}
}
3.Hessian序列化
Hessian是一個輕量級的、基于HTTP的RPC(遠程過程調用)框架,由Resin開源提供。它使用一個簡單的、基于二進制的協議來序列化對象,并通過HTTP進行傳輸。Hessian的設計目標是提供一種高效、可靠且易于使用的遠程服務調用機制。
maven依賴
<dependency><groupId>com.caucho</groupId><artifactId>hessian</artifactId><version>版本號</version> <!-- 4.0.66 -->
</dependency>
@Slf4j
public class HessianSerializer implements Serializer {@Overridepublic byte[] serialize(Object object) {if (object == null) return null;try (ByteArrayOutputStream baos = new ByteArrayOutputStream();) {Hessian2Output hessian2Output = new Hessian2Output(baos);hessian2Output.writeObject(object);hessian2Output.flush();byte[] result = baos.toByteArray();if(log.isInfoEnabled())log.info("對象【{}】已經完成了序列化操作,序列化后的字節數為【{}】",object,result.length);return result;} catch (IOException e) {log.error("使用hessian進行序列化對象【{}】時放生異常.",object);throw new SerializeException(e);}}@Overridepublic <T> T deserialize(byte[] bytes, Class<T> clazz) {if(bytes == null || clazz == null) return null;try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);) {Hessian2Input hessian2Input = new Hessian2Input(bais);T t = (T) hessian2Input.readObject();if(log.isInfoEnabled()log.info("類【{}】已經使用hessian完成了反序列化操作.",clazz);return t;} catch (IOException e) {log.error("使用hessian進行反序列化對象【{}】時發生異常.",clazz);throw new SerializeException(e);}}
}
4.序列化工廠
轉念一想嚯,我們這是實現一個框架額,肯定是要有對外擴展的能力的,那么,我們就將序列化的所有方式默認加載到內存中去,通過一個工廠類來獲取指定的序列化方法就可以了,然后暴露添加其他序列化方法的接口。
/*** @version 1.0* @Author txf* @Date 2025/2/10 15:17* @注釋 序列化工廠*/
@Slf4j
public class SerializerFactory {private final static ConcurrentHashMap<String, ObjectWrapper<Serializer>> SERIALIZER_CACHE = new ConcurrentHashMap<>(8);private final static ConcurrentHashMap<Byte, ObjectWrapper<Serializer>> SERIALIZER_CACHE_CODE = new ConcurrentHashMap<>(8);static {ObjectWrapper<Serializer> jdk = new ObjectWrapper<>((byte) 1, "jdk", new JdkSerializer());ObjectWrapper<Serializer> json = new ObjectWrapper<>((byte) 2, "json", new JsonSerializer());ObjectWrapper<Serializer> hessian = new ObjectWrapper<>((byte) 3, "hessian", new HessianSerializer());SERIALIZER_CACHE.put("jdk",jdk);SERIALIZER_CACHE.put("json",json);SERIALIZER_CACHE.put("hessian",hessian);SERIALIZER_CACHE_CODE.put((byte) 1, jdk);SERIALIZER_CACHE_CODE.put((byte) 2, json);SERIALIZER_CACHE_CODE.put((byte) 3, hessian);}/*** 使用工廠方法獲取一個SerializerWrapper* @param serializeType 序列化的類型* @return SerializerWrapper*/public static ObjectWrapper<Serializer> getSerializer(String serializeType) {ObjectWrapper<Serializer> serializerWrapper = SERIALIZER_CACHE.get(serializeType);if(serializerWrapper == null){log.error("未找到您配置的【{}】序列化工具,默認選用jdk的序列化方式。",serializeType);return SERIALIZER_CACHE.get("jdk");}return SERIALIZER_CACHE.get(serializeType);}public static ObjectWrapper<Serializer> getSerializer(Byte serializeCode) {ObjectWrapper<Serializer> serializerWrapper = SERIALIZER_CACHE_CODE.get(serializeCode);if(serializerWrapper == null){log.error("未找到您配置的【{}】序列化工具,默認選用jdk的序列化方式。",serializeCode);return SERIALIZER_CACHE.get("jdk");}return SERIALIZER_CACHE_CODE.get(serializeCode);}/*** 新增一個新的序列化器* @param serializerObjectWrapper 序列化器的包裝*/public static void addSerializer(ObjectWrapper<Serializer> serializerObjectWrapper){SERIALIZER_CACHE.put(serializerObjectWrapper.getName(),serializerObjectWrapper);SERIALIZER_CACHE_CODE.put(serializerObjectWrapper.getCode(),serializerObjectWrapper);}
}
壓縮
壓縮是指通過特定的算法對數據進行處理,減少數據的存儲空間或傳輸帶寬。在 RPC 中,對序列化后的字節流進行壓縮可以進一步減少數據傳輸量,提高傳輸效率。說白了,就是傳的少了。RPC嘛,將性能追求到極致!!!!
設計方式同序列化一樣的。
public interface Compressor {// 序列化后的字節數組壓縮byte[] compress(byte[] bytes);// 解壓縮byte[] decompress(byte[] bytes);
}
Gzip壓縮
@Slf4j
public class GzipCompressor implements Compressor {@Overridepublic byte[] compress(byte[] bytes) {try (ByteArrayOutputStream baos = new ByteArrayOutputStream();GZIPOutputStream gzipOutputStream = new GZIPOutputStream(baos);) {gzipOutputStream.write(bytes);gzipOutputStream.finish();byte[] result = baos.toByteArray();if(log.isInfoEnabled())log.info("對字節數組進行了壓縮長度由【{}】壓縮至【{}】.",bytes.length,result.length);return result;} catch (IOException e){log.error("對字節數組進行壓縮時發生異常",e);throw new CompressException(e);}}@Overridepublic byte[] decompress(byte[] bytes) {try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);GZIPInputStream gzipInputStream = new GZIPInputStream(bais);) {byte[] result = gzipInputStream.readAllBytes();if(log.isInfoEnabled())log.info("對字節數組進行了解壓縮長度由【{}】變為【{}】.",bytes.length,result.length);return result;} catch (IOException e){log.error("對字節數組進行壓縮時發生異常",e);throw new CompressException(e);}}
}
壓縮工廠就不在這里寫了。
導讀部分結束了。。