Rpc導讀

手寫Rpc框架 - 導讀

git倉庫-all-rpc

GTIEE:https://gitee.com/quercus-sp204/all-rpc 【參考源碼 yrpc】

1. Rpc概念

RPC 即遠程過程調用(Remote Procedure Call) ,就是通過網絡從遠程計算機程序上請求服務。

  • 本地調用抽象:允許程序像調用本地函數一樣調用遠程計算機上的函數。開發者無需編寫復雜的網絡通信代碼來處理諸如建立連接、發送請求、等待響應等細節,只需關注業務邏輯的實現。例如,在一個分布式系統中,A 服務器上的程序需要調用 B 服務器上的某個函數來獲取數據,使用 RPC 就可以像調用本地函數一樣簡單。
  • 通信協議與序列化:為了實現這種跨網絡的函數調用,RPC 框架通常會使用特定的通信協議(如 TCP/IP)來傳輸數據,并通過序列化和反序列化技術,將調用的參數和返回值轉換為適合在網絡上傳輸的格式。比如,將參數對象轉換為字節流進行傳輸,在接收端再將字節流還原為對象。

那么,它的應用場景有哪些呢?我們平時用到了嗎?

  • 微服務架構:在微服務架構中,各個微服務之間通常需要相互通信來完成復雜的業務流程。例如,一個電商系統中,訂單服務可能需要調用庫存服務來檢查商品庫存,調用用戶服務來驗證用戶信息等,RPC 可以高效地實現這些微服務間的通信。
  • 分布式系統:在大型分布式系統中,不同節點可能負責不同的功能模塊。例如,在搜索引擎系統中,索引構建節點和查詢服務節點可能分布在不同的服務器上,通過 RPC 可以實現節點之間的協同工作。

說白了,就是不同服務之間的網絡通信嘛。那你可能會問了,假如我的系統有User、Order、Shipment三個服務【爪哇SpringBoot編寫的嚯】,如果User想要訪問Order上面的函數,我只需要將此函數以Http接口的形式暴露出去,然后User那邊使用RestTemplate來訪問不就好了嗎?何必這么費勁還要用框架呢?

仔細想一下,確實有那么點兒卵道理,但是又仔細一想,

  • 你會發現,你使用RestTemplate的時候,需要手動處理請求的 URL 拼接、請求頭設置、參數序列化和響應反序列化等操作,對了,還有異常也需要自己處理,需要自己處理調用失敗的情況,如重試幾次啊等等
  • HTTP 協議是一種文本協議,存在較多的頭部信息,在數據傳輸時會帶來額外的開銷。而rpc框架的協議通常相比http協議,是很輕量的。
  • 如果Order部署在了多臺機器上面,代碼里面肯定是存了這些機器的地址,如果擴容或者縮容,都要修改代碼,同時還需要我們手動選擇調用哪一臺機器上面的Order【需要手動實現負載均衡】

那么,rpc都可以完成這些,并且服務的地址信息那些啊,可以在rpc的注冊中心拉取到,動態感知。

綜上所述,一個基本的Rpc框架主要應該具有的能力是:

基礎通信能力

  • 高效序列化、優化網絡傳輸,采用高效的網絡協議和傳輸方式

  • 穩定性與可靠性

    • 連接管理:具備完善的連接池管理機制,對連接進行復用,減少頻繁建立和銷毀連接的開銷,同時保證連接的穩定性。例如,在高并發場景下,能自動處理連接的超時、重連等問題

服務發現與治理能力

  • 服務注冊與發現: 1. 服務提供者能夠在啟動時自動將自己的服務信息(如服務名稱、地址、端口等)注冊到服務注冊中心,方便服務消費者發現和調用。2.實時感知:調用方能夠及時感知調用的服務信息并更新。
  • 負載均衡: 如隨機、輪詢、最少活躍調用數、一致性哈希
  • 服務容錯
    • 熔斷機制:當服務提供者出現故障或響應時間過長時,能夠自動熔斷對該服務的調用,避免大量請求積壓,影響整個系統的穩定性。
    • 降級策略:在系統資源緊張或服務出現故障時,能夠自動降級服務,提供默認的響應結果或采取其他臨時措施,保證系統的基本可用性。

易用性與可擴展性

  • 簡單的編程模型:讓開發者能夠像調用本地函數一樣調用遠程服務,無需關注底層的網絡通信細節,降低開發難度,提高開發效率
  • 插件化與擴展性
    • SPI 機制:具備良好的插件化架構,通過服務提供者接口(SPI)機制,允許開發者根據實際需求擴展框架的功能,如自定義序列化方式、負載均衡策略、過濾器等。

在這里插入圖片描述

現在就按照上面的能力,來一個一個實現,最終將其組合成一個框架。

2. 角色

一個Rpc框架的大致角色分布:

在這里插入圖片描述

服務提供者將自己的數據信息【例如,端口、ip、接口等信息提供給注冊中心】(服務注冊)

消費者從注冊中心拉取到可用的服務信息(服務發現),然后選擇一個合適的服務(負載均衡),發送網絡請求【請求里面封裝了需要調用的接口,參數等等】,

服務提供者接收到請求之后,本地調用方法,然后通過網絡把響應結果過傳輸給消費者

最后消費者解析響應結果。

注冊中心: (本文就選擇zookeeper為注冊中心)

3. 注冊中心的接入

zookeeper的安裝與啟動,就不在這里贅述了。

思考注冊中心的主要能力:【服務注冊,服務發現】

定義接口

Registry接口

/** 注冊中心的能力: 注冊服務, 拉取服務列表*/
public interface Registry {/*** 注冊服務* @param serviceConfig 服務的配置內容*/void register(ServiceConfig<?> serviceConfig);/*** 從注冊中心拉取服務列表* @param serviceName 服務的名稱* @return 服務的地址*/List<InetSocketAddress> lookup(String serviceName, String group);
}

ServiceConfig 封裝服務信息的class

/*服務信息*/
public class ServiceConfig<T> {// 接口的類型/*比如UserServiceImpl實現了UserService, 真實對象就是實現類,interfaceProvider就是UserService.class*/private Class<?> interfaceProvider;private Object ref; // 真實對象private String group = "default"; // 分組// get set.....
}

Zookeeper注冊中心的實現類 ZookeeperRegistry

服務注冊在zookeeper上面的節點如圖所示

trpc根節點
==?消費者節點
==▼生產者節點
====▼接口的全限定名 
======▼分組
========▼地址信息1...

在這里插入圖片描述

@Slf4j
public class ZookeeperRegistry extends AbstractRegistry {// 維護一個zk實例private ZooKeeper zooKeeper;public ZookeeperRegistry() {this.zooKeeper = ZookeeperUtil.createZookeeper();}public ZookeeperRegistry(String connectString,int timeout) {this.zooKeeper = ZookeeperUtil.createZookeeper(connectString,timeout);}@Overridepublic void register(ServiceConfig<?> service) {// 服務名稱的節點 ----  "/tprc-metadata/providers/接口全限定名"String parentNode = Constant.BASE_PROVIDERS_PATH +"/"+service.getInterface().getName();// 建立服務節點這個節點應該是一個持久節點if(!ZookeeperUtil.exists(zooKeeper,parentNode,null)){ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode,null);ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);}// 建立分組節點parentNode = parentNode + "/" + service.getGroup();if(!ZookeeperUtil.exists(zooKeeper,parentNode,null)){ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode,null);ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);}// 創建本機的臨時節點, ip:port ,// 服務提供方的端口一般自己設定,我們還需要一個獲取ip的方法// ip我們通常是需要一個局域網ip,不是127.0.0.1,也不是ipv6// 192.168.12.121String node = parentNode + "/" + NetUtils.getIp() + ":" + TrpcBootstrap.getInstance().getConfiguration().getPort();if(!ZookeeperUtil.exists(zooKeeper,node,null)){ZookeeperNode zookeeperNode = new ZookeeperNode(node,null);ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.EPHEMERAL);}log.info("服務{},注冊ok",service.getInterface().getName());}/*** 拉取合適的服務列表* @param serviceName 服務名稱* @return 服務列表*/@Overridepublic List<InetSocketAddress> lookup(String serviceName,String group) {// 1、找到服務對應的節點String serviceNode = Constant.BASE_PROVIDERS_PATH + "/" + serviceName + "/" +group;List<String> children = ZookeeperUtil.getChildren(zooKeeper, serviceNode, null);// 獲取了所有的可用的服務列表List<InetSocketAddress> inetSocketAddresses = children.stream().map(ipString -> {String[] ipAndPort = ipString.split(":");String ip = ipAndPort[0];int port = Integer.parseInt(ipAndPort[1]);return new InetSocketAddress(ip, port);}).toList();if(inetSocketAddresses.isEmpty()){throw new DiscoveryException("未發現任何可用的服務主機.");}return inetSocketAddresses;}
}

上面的ZookeeperUtil是自定義的操作Zookeeper的工具類。-- 詳情見源碼里面的注釋,值得說明一下,Zookeeper要先有父結點才能創建子節點,不能把路徑直接寫全了直接創建,故在源碼里面會用createRoot方法初始化所有的父結點。

public static ZooKeeper createZookeeper(String connectPath, int timeout) {CountDownLatch countDownLatch = new CountDownLatch(1);try {.................// 連接成功就創建根節點,檢查是否存在rpc根節點  /trpc-metadata/providers  &&  /trpc-metadata/consumerscreateRoot(zooKeeper);...............} catch (IOException | InterruptedException e) {log.info("創建zookeeper實例時發生異常:",e);throw new ZookeeperException("創建zookeeper實例時發生異常");}
}

至此,注冊中心的兩個基本方法就可以告一段落了。

4.Trpc框架啟動器

既然是一個框架,那么,我們必然有一個入口來啟動這一套流程。

①服務提供方

- 基本功能信息appName、registry

形如: all-tRpc-demo / demo-simple-provider / …/ProviderApplication.java 這樣來啟動我們的提供方。

public class ProviderApplication {public static void main(String[] args) {TrpcBootstrap.getInstance() .appName("user-provider")// 配置注冊中心.registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))// 掃描包下的類,然后批量發布接口.scan("com.feng.demo")// 啟動服務.start();}
}

在TrpcBootstrap.java里面

@Slf4j
public class TrpcBootstrap {// 單例,每個應用程序只有一個private static final TrpcBootstrap trpcBootstrap = new TrpcBootstrap();// 配置private final RpcConfiguration configuration;// 獲取實例public static TrpcBootstrap getInstance() {return trpcBootstrap;}// 設置應用名稱 *****public TrpcBootstrap appName( String appName ) {configuration.setAppName(appName);return this;}// 配置注冊中心 *****public TrpcBootstrap registry( RegistryConfig registryConfig ) {// 傳遞過來的參數registryConfig,還沒有創建連接,在這里創建與注冊中心的連接registryConfig.createRegistryConnection(); // 創建注冊中心的連接configuration.setRegistryConfig(registryConfig); // 設置服務注冊中心return this;}
}

RpcConfiguration是封裝的配置信息

// 全局的配置類,代碼配置 --> xml配置 --> 默認項
@Data
public class RpcConfiguration {// 配置信息-->端口號private int port = 8094;// 配置信息-->應用程序的名字private String appName = "default";// 分組信息private String group = "default";// 配置信息-->注冊中心private RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:2181"); // 默認的// 配置信息-->序列化協議private String serializeType = "jdk";// 配置信息-->壓縮使用的協議private String compressType = "gzip";// 配置信息@Getterpublic IdGenerator idGenerator = new IdGenerator(1, 2);// 配置信息-->負載均衡策略private LoadBalancer loadBalancer = new RoundRobinLoadBalancer();// 為每一個ip配置一個限流器private final Map<SocketAddress, RateLimiter> everyIpRateLimiter = new ConcurrentHashMap<>(16);// 為每一個ip配置一個斷路器,熔斷private final Map<SocketAddress, CircuitBreaker> everyIpCircuitBreaker = new ConcurrentHashMap<>(16);// 讀xml,dom4jpublic RpcConfiguration() {............}
}// 里面又持有注冊中心的類
@Slf4j
public class RegistryConfig {// 定義連接的 url zookeeper://127.0.0.1:2181  redis://192.168.12.125:3306private final String connectString;// 持有一個 Registryprivate Registry registry;public RegistryConfig(String connectString) {this.connectString = connectString;}public Registry getRegistry() {if ( registry == null ) createRegistryConnection();return registry;}/*** 可以使用簡單工廠來完成* @return 具體的注冊中心實例*/public void createRegistryConnection() {if ( connectString == null ) throw new DiscoveryException("未配置注冊中心!");// 1、獲取注冊中心的類型String registryType = getRegistryType(connectString,true).toLowerCase().trim();log.info("【創建與注冊中心的連接~~~】 注冊中心的類型: {}", registryType);// 2、通過類型獲取具體注冊中心if( registryType.equals("zookeeper") ){String host = getRegistryType(connectString, false);this.registry = new ZookeeperRegistry(host, Constant.ZK_TIME_OUT);} else if (registryType.equals("nacos")){String host = getRegistryType(connectString, false);this.registry = new NacosRegistry(host, Constant.ZK_TIME_OUT);} else {throw new DiscoveryException("未發現合適的注冊中心。");}}private String getRegistryType(String connectString,boolean ifType){String[] typeAndHost = connectString.split("://");if(typeAndHost.length != 2){throw new RuntimeException("給定的注冊中心連接url不合法");}if(ifType){return typeAndHost[0];} else {return typeAndHost[1];}}
}
- 掃描接口并發布scan
// 掃描項目指定包下面的接口,并且將他們發布到注冊中心
public TrpcBootstrap scan(String packageName) {// 1. 獲取指定包 path 下的所有類的全限定名List<String> classNames = getAllClassName(packageName);// 2.1 拿到所有標注了TrpcApi注解的類List<? extends Class<?>> classes = getTrpcClassesByList(classNames);//  2.2遍歷這些構建實例for (Class<?> clazz : classes) {Class<?>[] interfaces = clazz.getInterfaces(); // 獲取到他的接口Object instance;try {instance = clazz.getConstructor().newInstance(); // 通過無參構造器創建一個實例} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {throw new RuntimeException(e);}// 獲取注解的groupTrpcApi annotation = clazz.getAnnotation(TrpcApi.class);String group = annotation.group();// 3.將這些接口發布for (Class<?> anInterface : interfaces) {ServiceConfig<?> serviceConfig = new ServiceConfig<>();serviceConfig.setInterface(anInterface);serviceConfig.setRef(instance);serviceConfig.setGroup(group);if (log.isDebugEnabled()){log.debug("---->已經通過包掃描,將服務【{}】發布.",anInterface);}// 3、發布publish(serviceConfig);}}return this;
}//
private TrpcBootstrap publish( ServiceConfig<?> service ) {configuration.getRegistryConfig().getRegistry().register(service);// 維護一個映射關系SERVERS_LIST.put(service.getInterface().getName(), service);return this;
}

具體可以看源碼里面的實現

- 啟動netty
public void start() {// 1、創建eventLoop,老板只負責處理請求,之后會將請求分發至workerEventLoopGroup boss = new NioEventLoopGroup(2);EventLoopGroup worker = new NioEventLoopGroup(10);try {ServerBootstrap bootstrap = new ServerBootstrap();// 3.配置服務bootstrap = bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 是核心,我們需要添加很多入站和出站的handlersocketChannel.pipeline().addLast(new LoggingHandler()) // 打印日志.addLast(new TrpcRequestDecoder())  // 請求過來,需要解碼// 根據請求進行方法調用.addLast(new MethodCallHandler()).addLast(new TrpcResponseEncoder()) // 響應回去,需要編碼;}});// 4.綁定端口ChannelFuture channelFuture = bootstrap.bind(configuration.getPort()).sync();// 5.關閉channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}
}

② 服務消費方

形如 all-tRpc-demo / demo-simple-consumer /…/ConsumerApplication.java

public class ConsumerApplication {public static void main(String[] args) {ReferenceConfig<UserService> reference = new ReferenceConfig<>();reference.setReference(UserService.class);// 1、連接注冊中心// 2、拉取服務列表// 3、選擇一個服務并建立連接// 4、發送請求,攜帶一些信息(接口名,參數列表,方法的名字),獲得結果TrpcBootstrap.getInstance().appName("user-consumer").registry(new RegistryConfig("zookeeper://127.0.0.1:2181")).reference(reference);UserService userService = reference.get();System.out.println("=======================================");for (int i = 0; i < 10; i++) {System.out.println("【rpc調用開始=============】");// 開始時間long start = System.currentTimeMillis();List<User> users = userService.getUserByName("田小鋒q" + i);for (User user : users) {System.out.println(user);}// 結束時間long end = System.currentTimeMillis();System.out.println("rpc執行耗時:" + (end - start));System.out.println("【rpc調用=============結束-----】");}}
}@Slf4j
public class ReferenceConfig<T> {// 接口類型private Class<T> interfaceRef;// 注冊中心private Registry registry;// 分組信息private String group;/*** 代理設計模式* @return 代理對象*/public T get() {// 此處一定是使用動態代理完成了一些工作ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); // 類加載器Class<T>[] classes = new Class[]{interfaceRef}; // 接口類型InvocationHandler handler = new ProxyConsumerInvocationHandler(registry,interfaceRef,group);// 使用動態代理生成代理對象Object helloProxy = Proxy.newProxyInstance(classLoader, classes, handler);return (T)helloProxy;}
}

主要是jdk動態代理 在invoke里面實現我們的遠程調用

5. 序列化&&壓縮

在 RPC(遠程過程調用)框架中,序列化和壓縮是兩個重要的概念,它們在數據傳輸過程中起著關鍵作用

序列化

上圖里面可以看出來,序列化是將對象轉換為字節流的過程,以便在網絡上傳輸或存儲到文件中。在 RPC 中,客戶端調用遠程服務時,需要將調用的參數對象序列化為字節流,通過網絡發送到服務端;服務端接收到字節流后,再將其反序列化為對象進行處理。處理完后,又將結果對象序列化為字節流返回給客戶端,客戶端再反序列化得到結果。

常見的序列化方式

那么我們就定義一下序列化的接口

public interface Serializer {/*** 抽象的用來做序列化的方法*/byte[] serialize(Object object);/*** 反序列化的方法*/<T> T deserialize(byte[] bytes, Class<T> clazz);
}
1. JDK序列化
@Slf4j // lombok里面的日志注解
public class JdkSerializer implements Serializer {@Overridepublic byte[] serialize(Object object) {if (object == null) return null;try (ByteArrayOutputStream baos = new ByteArrayOutputStream();ObjectOutputStream outputStream = new ObjectOutputStream(baos);) { // try - with寫法outputStream.writeObject(object);byte[] result = baos.toByteArray();if(log.isInfoEnabled()){log.info("對象【{}】已經完成了序列化操作,序列化后的字節數為【{}】",object,result.length);}return result;} catch (IOException e) {log.error("序列化對象【{}】時放生異常.",object);throw new SerializeException(e);}}@Overridepublic <T> T deserialize(byte[] bytes, Class<T> clazz) {if(bytes == null || clazz == null) return null;try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);ObjectInputStream objectInputStream = new ObjectInputStream(bais);) {Object object = objectInputStream.readObject();if(log.isInfoEnabled()){log.info("類【{}】已經完成了反序列化操作.",clazz);}return (T)object;} catch (IOException | ClassNotFoundException e) {log.error("反序列化對象【{}】時放生異常.",clazz);throw new SerializeException(e);}}
}
2.JSON序列化

這里就用fastjson了

@Slf4j
public class JsonSerializer implements Serializer {@Overridepublic byte[] serialize(Object object) {if (object == null) return null;byte[] result = JSON.toJSONBytes(object);if (log.isInfoEnabled()) {log.info("對象【{}】已經完成了序列化操作,序列化后的字節數為【{}】", object, result.length);}return result;}@Overridepublic <T> T deserialize(byte[] bytes, Class<T> clazz) {if (bytes == null || clazz == null) return null;T t = JSON.parseObject(bytes, clazz);if (log.isInfoEnabled()) {log.info("類【{}】已經完成了反序列化操作.", clazz);}return t;}
}
3.Hessian序列化

Hessian是一個輕量級的、基于HTTP的RPC(遠程過程調用)框架,由Resin開源提供。它使用一個簡單的、基于二進制的協議來序列化對象,并通過HTTP進行傳輸。Hessian的設計目標是提供一種高效、可靠且易于使用的遠程服務調用機制。

maven依賴

<dependency><groupId>com.caucho</groupId><artifactId>hessian</artifactId><version>版本號</version>  <!-- 4.0.66 -->
</dependency>
@Slf4j
public class HessianSerializer implements Serializer {@Overridepublic byte[] serialize(Object object) {if (object == null) return null;try (ByteArrayOutputStream baos = new ByteArrayOutputStream();) {Hessian2Output hessian2Output = new Hessian2Output(baos);hessian2Output.writeObject(object);hessian2Output.flush();byte[] result = baos.toByteArray();if(log.isInfoEnabled())log.info("對象【{}】已經完成了序列化操作,序列化后的字節數為【{}】",object,result.length);return result;} catch (IOException e) {log.error("使用hessian進行序列化對象【{}】時放生異常.",object);throw new SerializeException(e);}}@Overridepublic <T> T deserialize(byte[] bytes, Class<T> clazz) {if(bytes == null || clazz == null) return null;try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);) {Hessian2Input hessian2Input = new Hessian2Input(bais);T t = (T) hessian2Input.readObject();if(log.isInfoEnabled()log.info("類【{}】已經使用hessian完成了反序列化操作.",clazz);return t;} catch (IOException  e) {log.error("使用hessian進行反序列化對象【{}】時發生異常.",clazz);throw new SerializeException(e);}}
}
4.序列化工廠

轉念一想嚯,我們這是實現一個框架額,肯定是要有對外擴展的能力的,那么,我們就將序列化的所有方式默認加載到內存中去,通過一個工廠類來獲取指定的序列化方法就可以了,然后暴露添加其他序列化方法的接口。

/*** @version 1.0* @Author txf* @Date 2025/2/10 15:17* @注釋 序列化工廠*/
@Slf4j
public class SerializerFactory {private final static ConcurrentHashMap<String, ObjectWrapper<Serializer>> SERIALIZER_CACHE = new ConcurrentHashMap<>(8);private final static ConcurrentHashMap<Byte, ObjectWrapper<Serializer>> SERIALIZER_CACHE_CODE = new ConcurrentHashMap<>(8);static {ObjectWrapper<Serializer> jdk = new  ObjectWrapper<>((byte) 1, "jdk", new JdkSerializer());ObjectWrapper<Serializer> json = new  ObjectWrapper<>((byte) 2, "json", new JsonSerializer());ObjectWrapper<Serializer> hessian = new  ObjectWrapper<>((byte) 3, "hessian", new HessianSerializer());SERIALIZER_CACHE.put("jdk",jdk);SERIALIZER_CACHE.put("json",json);SERIALIZER_CACHE.put("hessian",hessian);SERIALIZER_CACHE_CODE.put((byte) 1, jdk);SERIALIZER_CACHE_CODE.put((byte) 2, json);SERIALIZER_CACHE_CODE.put((byte) 3, hessian);}/*** 使用工廠方法獲取一個SerializerWrapper* @param serializeType 序列化的類型* @return SerializerWrapper*/public static  ObjectWrapper<Serializer> getSerializer(String serializeType) {ObjectWrapper<Serializer> serializerWrapper = SERIALIZER_CACHE.get(serializeType);if(serializerWrapper == null){log.error("未找到您配置的【{}】序列化工具,默認選用jdk的序列化方式。",serializeType);return SERIALIZER_CACHE.get("jdk");}return SERIALIZER_CACHE.get(serializeType);}public static  ObjectWrapper<Serializer> getSerializer(Byte serializeCode) {ObjectWrapper<Serializer> serializerWrapper = SERIALIZER_CACHE_CODE.get(serializeCode);if(serializerWrapper == null){log.error("未找到您配置的【{}】序列化工具,默認選用jdk的序列化方式。",serializeCode);return SERIALIZER_CACHE.get("jdk");}return SERIALIZER_CACHE_CODE.get(serializeCode);}/*** 新增一個新的序列化器* @param serializerObjectWrapper 序列化器的包裝*/public static void addSerializer(ObjectWrapper<Serializer> serializerObjectWrapper){SERIALIZER_CACHE.put(serializerObjectWrapper.getName(),serializerObjectWrapper);SERIALIZER_CACHE_CODE.put(serializerObjectWrapper.getCode(),serializerObjectWrapper);}
}

壓縮

壓縮是指通過特定的算法對數據進行處理,減少數據的存儲空間或傳輸帶寬。在 RPC 中,對序列化后的字節流進行壓縮可以進一步減少數據傳輸量,提高傳輸效率。說白了,就是傳的少了。RPC嘛,將性能追求到極致!!!!

設計方式同序列化一樣的。

public interface Compressor {// 序列化后的字節數組壓縮byte[] compress(byte[] bytes);// 解壓縮byte[] decompress(byte[] bytes);
}

Gzip壓縮

@Slf4j
public class GzipCompressor implements Compressor {@Overridepublic byte[] compress(byte[] bytes) {try (ByteArrayOutputStream baos = new ByteArrayOutputStream();GZIPOutputStream gzipOutputStream = new GZIPOutputStream(baos);) {gzipOutputStream.write(bytes);gzipOutputStream.finish();byte[] result = baos.toByteArray();if(log.isInfoEnabled())log.info("對字節數組進行了壓縮長度由【{}】壓縮至【{}】.",bytes.length,result.length);return result;} catch (IOException e){log.error("對字節數組進行壓縮時發生異常",e);throw new CompressException(e);}}@Overridepublic byte[] decompress(byte[] bytes) {try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);GZIPInputStream gzipInputStream = new GZIPInputStream(bais);) {byte[] result = gzipInputStream.readAllBytes();if(log.isInfoEnabled())log.info("對字節數組進行了解壓縮長度由【{}】變為【{}】.",bytes.length,result.length);return result;} catch (IOException e){log.error("對字節數組進行壓縮時發生異常",e);throw new CompressException(e);}}
}

壓縮工廠就不在這里寫了。

導讀部分結束了。。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/71565.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/71565.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/71565.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

網絡安全:防范NetBIOS漏洞的攻擊

稍微懂點電腦知識的朋友都知道&#xff0c;NetBIOS 是計算機局域網領域流行的一種傳輸方式&#xff0c;但你是否還知道&#xff0c;對于連接互聯網的機器來講&#xff0c;NetBIOS是一大隱患。 漏洞描述 NetBIOS(Network Basic Input Output System&#xff0c;網絡基本輸入輸…

VIE(可變利益實體)架構通俗解析 —— 以阿里巴巴為例(中英雙語)

VIE&#xff08;可變利益實體&#xff09;架構通俗解析 —— 以阿里巴巴為例 什么是 VIE 架構&#xff1f; VIE&#xff08;Variable Interest Entity&#xff0c;可變利益實體&#xff09;是一種特殊的法律結構&#xff0c;主要用于中國企業在海外上市&#xff0c;特別是受中…

使用代碼與 AnythingLLM 交互的基本方法和示例

AnythingLLM 是一個基于大語言模型&#xff08;LLM&#xff09;的工具&#xff0c;主要用于構建和管理個人或企業知識庫。雖然它主要提供圖形化界面&#xff08;GUI&#xff09;進行操作&#xff0c;但也可以通過代碼進行一些高級配置和集成。以下是使用代碼與 AnythingLLM 交互…

用DeepSeek零基礎預測《哪吒之魔童鬧海》票房——從數據爬取到模型實戰

系列文章目錄 1.元件基礎 2.電路設計 3.PCB設計 4.元件焊接 5.板子調試 6.程序設計 7.算法學習 8.編寫exe 9.檢測標準 10.項目舉例 11.職業規劃 文章目錄 **一、為什么要預測票房&#xff1f;****二、準備工作****三、實戰步驟詳解****Step 1&#xff1a;數據爬取與清洗&am…

如何將MySQL數據庫遷移至阿里云

將 MySQL 數據庫遷移至阿里云可以通過幾種不同的方法&#xff0c;具體選擇哪種方式取決于你的數據庫大小、數據復雜性以及對遷移速度的需求。阿里云提供了多種遷移工具和服務&#xff0c;本文將為你介紹幾種常見的方法。 方法一&#xff1a;使用 阿里云數據庫遷移服務 (DTS) 阿…

Ubuntu22.04 - gflags的安裝和使用

目錄 gflags 介紹gflags 安裝gflags 使用 gflags 介紹 gflags 是Google 開發的一個開源庫&#xff0c;用于 C應用程序中命令行參數的聲明、定義和解析。gflags 庫提供了一種簡單的方式來添加、解析和文檔化命令行標志(flags),使得程序可以根據不同的運行時配置進行調整。 它具…

Git LFS介紹(Large File Storage)大文件擴展,將大文件存儲在外部存儲,倉庫中只記錄文件的元數據(大文件的指針,類似一個小的占位符文件)

文章目錄 LFS的功能&#xff1f;如何使用LFS&#xff1f;將大文件存儲在外部系統是什么意思&#xff1f;具體是如何運作的&#xff1f;為什么要這樣做&#xff1f; 對開發者的影響&#xff1f;1. **性能和效率**2. **協作體驗**3. **版本管理差異**4. **額外的工具和配置** LFS…

Fastgpt學習(5)- FastGPT 私有化部署問題解決

1.? 問題描述&#xff1a; Windows系統&#xff0c;本地私有化部署&#xff0c;postgresql數據庫鏡像日志持續報錯" data directory “/var/lib/postgresql/data” has invalid permissions "&#xff0c;“ DETAIL: Permissions should be urwx (0700) or urwx,gr…

2026考研趨勢深度解析:政策變化+高效工具指南

2026考研深度解析&#xff1a;趨勢洞察高效工具指南&#xff0c;助你科學備戰上岸 從政策變化到工具實戰&#xff0c;這份千字攻略解決99%考生的核心焦慮 【熱點引入&#xff1a;考研賽道進入“高難度模式”】 2025年全國碩士研究生報名人數突破520萬&#xff0c;報錄比預計擴…

娛樂使用,可以生成轉賬、圖片、聊天等對話內容

軟件介紹 今天要給大家介紹一款由吾愛大佬 lifeixue 開發的趣味軟件。它的玩法超豐富&#xff0c;能夠生成各式各樣的角色&#xff0c;支持文字聊天、發紅包、轉賬、發語音以及分享圖片等多種互動形式&#xff0c;不過在分享前得著重提醒&#xff0c;此軟件僅供娛樂&#xff0…

DeepSeek動畫視頻全攻略:從架構到本地部署

DeepSeek 本身并不直接生成動畫視頻,而是通過與一系列先進的 AI 工具和傳統軟件協作,完成動畫視頻的制作任務。這一獨特的架構模式,使得 DeepSeek 在動畫視頻創作領域發揮著不可或缺的輔助作用。其核心流程主要包括腳本生成、畫面設計、視頻合成與后期處理這幾個關鍵環節。 …

C++類與對象深度解析(一):從引用、內聯函數到構造析構的編程實踐

目錄 一.引用 引用的特征&#xff1a;1.引用必須初始化 2.本質是別名 3.函數參數傳遞 4.常引用 5.函數返回值 6.權限 放大 縮小 平移 引用 vs 指針 二.內聯函數 關鍵點說明 三.宏函數 四.類 什么是類&#xff1f; 簡單的類 五.構造函數與析構函數 1. 構造函數&…

vsan數據恢復—vsan緩存盤故障導致虛擬磁盤文件丟失的數據恢復案例

vsan數據恢復環境&故障&#xff1a; VMware vsan架構采用21模式。每臺設備只有一個磁盤組&#xff08;71&#xff09;&#xff0c;緩存盤的大小為240GB&#xff0c;容量盤的大小為1.2TB。 由于其中一臺主機&#xff08;0號組設備&#xff09;的緩存盤出現故障&#xff0c;導…

開源在線考試系統開源在線考試系統:支持數學公式的前后端分離解決方案

開源在線考試系統&#xff1a;支持數學公式的前后端分離解決方案 項目介紹項目概述&#xff1a;技術棧&#xff1a;版本要求主要功能&#xff1a;特色亮點 項目倉庫地址演示地址GiteeGitHub 系統效果展示教師端系統部分功能截圖學生端系統部分功能截圖 結語 項目介紹 項目概述…

redis解決高并發看門狗策略

當一個業務執行時間超過自己設定的鎖釋放時間&#xff0c;那么會導致有其他線程進入&#xff0c;從而搶到同一個票,所有需要使用看門狗策略&#xff0c;其實就是開一個守護線程&#xff0c;讓守護線程去監控key&#xff0c;如果到時間了還未結束&#xff0c;就會將這個key重新s…

新數據結構(12)——代理

什么是代理 在進行操作時有時不希望用戶直接接觸到目標&#xff0c;這時需要使用代理讓用戶間接接觸到目標 給目標對象提供一個代理對象&#xff0c;并且由代理對象控制著對目標對象的引用 圖解&#xff1a; 代理的目的 控制訪問&#xff1a;通過代理對象的方式間接的訪問目…

Unity Shader Graph 2D - Procedural程序化圖形之夾心圓環

前言 本文將使用Unity Shader Graph的節點來繪制一個夾心圓環,分成三部分外環、內環和中心環。通過制作一個夾心圓環能夠更好地理解和實踐Shader Graph中的基礎節點以及思維。 創建一個Ring的Shader Graph文件,再創建一個對應的材質球M_Ring以及一個Texture2D的MainT…

緩存三大問題及其解決方案

緩存三大問題及其解決方案 1. 前言 ? 在現代系統架構中&#xff0c;緩存與數據庫的結合使用是一種經典的設計模式。為了確保緩存中的數據與數據庫中的數據保持一致&#xff0c;通常會給緩存數據設置一個過期時間。當系統接收到用戶請求時&#xff0c;首先會訪問緩存。如果緩…

【算法】----多重背包問題I,II(動態規劃)

&#x1f339;作者:云小逸 &#x1f4dd;個人主頁:云小逸的主頁 &#x1f4dd;Github:云小逸的Github &#x1f91f;motto:要敢于一個人默默的面對自己&#xff0c;強大自己才是核心。不要等到什么都沒有了&#xff0c;才下定決心去做。種一顆樹&#xff0c;最好的時間是十年前…

LeetCode-524. 通過刪除字母匹配到字典里最長單詞

1、題目描述&#xff1a; 給你一個字符串 s 和一個字符串數組 dictionary &#xff0c;找出并返回 dictionary 中最長的字符串&#xff0c;該字符串可以通過刪除 s 中的某些字符得到。 如果答案不止一個&#xff0c;返回長度最長且字母序最小的字符串。如果答案不存在&#x…