RocketMQ NettyRemotingServer、NettyRemotingClient 實例化、初始化、啟動源碼解析

在這里插入圖片描述

🔭 嗨,您好 👋 我是 vnjohn,在互聯網企業擔任后端開發,CSDN 優質創作者
📖 推薦專欄:Spring、MySQL、Nacos、Java,后續其他專欄會持續優化更新迭代
🌲文章所在專欄:RocketMQ
🤔 我當前正在學習微服務領域、云原生領域、消息中間件等架構、原理知識
💬 向我詢問任何您想要的東西,ID:vnjohn
🔥覺得博主文章寫的還 OK,能夠幫助到您的,感謝三連支持博客🙏
😄 代詞: vnjohn
? 有趣的事實:音樂、跑步、電影、游戲

目錄

  • 前言
  • new
    • BrokerOuterAPI
    • MQClientInstance
    • NettyRemotingClient
  • initialize
    • NettyRemotingServer
  • start
    • NettyRemotingServer
    • NettyRemotingClient
  • 總結

前言

RocketMQ 專欄篇:

從零開始:手把手搭建 RocketMQ 單節點、集群節點實例

保護數據完整性:探索 RocketMQ 分布式事務消息的力量

RocketMQ 分布式事務消息實戰指南:確保數據一致性的關鍵設計

RocketMQ 生產者源碼分析:DefaultMQProducer、DefaultMQProducerImpl

RocketMQ MQClientInstance、生產者實例啟動源碼分析

RocketMQ 投遞消息方式以及消息體結構分析:Message、MessageQueueSelector

RocketMQ DefaultMQProducer#send 方法源碼解析:生產者投遞消息(一)
RocketMQ DefaultMQProducer#send 方法源碼解析:生產者投遞消息(二)
RocketMQ 通信機制底層數據結構及源碼解析

上篇文章【RocketMQ 通信機制底層數據結構及源碼解析 】主要介紹了 RocketMQ 中底層的網絡通信機制涉及到的數據結構以及線程模型通信,未做過多源碼的介紹,這篇文章主要圍繞著一塊的源碼解讀.

new

在 Broker 服務端創建 BrokerController 時,會實例化 BrokerController,在里面會傳遞 NettyServerConfig、NettyClientConfig,如下:

final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
nettyServerConfig.setListenPort(10911);
nettyServerConfig.setUseEpollNativeSelector(true);
// .....
final BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);

Netty Server Boss 默認綁定的端口:10911

BrokerOuterAPI

在實例化 BrokerController 時,會先將 NettyRemotingClient 先創建好,它主要用來與其他 Broker 之間進行相互通信的,比如:當通過命令在某臺 Broker 創建一個 Topic,會通過當前 Broker 組裝好信息,發送給其他 Broker 進行 Topic 路由信息進行傳遞,以便于其他 Broker 都得知該 Topic 信息,進行消息的接收.

public BrokerController(final BrokerConfig brokerConfig,final NettyServerConfig nettyServerConfig,final NettyClientConfig nettyClientConfig,final MessageStoreConfig messageStoreConfig) {this.brokerConfig = brokerConfig;this.nettyServerConfig = nettyServerConfig;// ....this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {this.remotingClient = new NettyRemotingClient(nettyClientConfig);this.remotingClient.registerRPCHook(rpcHook);
}

MQClientInstance

在生產者、消費者啟動時,通過 MQClientManager#getOrCreateMQClientInstance會創建 MQClientInstance 實例,會將 NettyClientConfig 綁定好

public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {this.nettyClientConfig = new NettyClientConfig();this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());// 客戶端遠程調用的處理器,接受來自 Broker 請求并做出響應this.clientRemotingProcessor = new ClientRemotingProcessor(this);// MQ 客戶端 API 發起請求的類this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
}
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,final ClientRemotingProcessor clientRemotingProcessor,RPCHook rpcHook, final ClientConfig clientConfig) {this.clientConfig = clientConfig;// RocketMQ 網絡模型的核心類this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);this.clientRemotingProcessor = clientRemotingProcessor;this.remotingClient.registerRPCHook(rpcHook); 		this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);// 消費組數量發生變化,觸發重平衡this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);// 消費者客戶端重置偏移量this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);// 獲取消費者狀態this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);// 獲取消費者運行的信息this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);// 消費消息this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);// 回復消息this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}

NettyRemotingClient

NettyRemotingClient 充當 RocketMQ 網絡通信模型下的客戶端,生產者、消費者、Broker 都持有對它的引用進行使用,它整體的實例化過程源碼如下:

public NettyRemotingClient(final NettyClientConfig nettyClientConfig,final ChannelEventListener channelEventListener) {// 單向發送信號量數、異步發送信號量數 = 65535super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());this.nettyClientConfig = nettyClientConfig;this.channelEventListener = channelEventListener;int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}// 使用公共線程池處理來自客戶端的各種 Processor,最低線程數為 4、最大線程數為 CPU 核數this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());}});this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));}});if (nettyClientConfig.isUseTLS()) {try {sslContext = TlsHelper.buildSslContext(true);log.info("SSL enabled for client");} catch (IOException e) {log.error("Failed to create SSLContext", e);} catch (CertificateException e) {log.error("Failed to create SSLContext", e);throw new RuntimeException("Failed to create SSLContext", e);}}
}

在其實例化時,提供了一個內部局部變量為 Bootstrap

 private final Bootstrap bootstrap = new Bootstrap();

initialize

在實例化 BrokerController 期間,只是會將 Netty 服務端,給設置好,不做任何處理

NettyRemotingServer

調用 BrokerController#initialize 初始化方法時,會實例化 NettyRemotingServer

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap = new ServerBootstrap();this.nettyServerConfig = nettyServerConfig;this.channelEventListener = channelEventListener;int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}// Processor 公共處理的線程池,當未指定 Executor 時this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 默認都是創建 EpollEventLoopGroupif (useEpoll()) {this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}loadSslContext();
}
  1. 創建 Semaphore Oneway 信號量:256,Semaphore Async 信號量:64
  2. 創建 Processor 公共處理的線程池,當 Processor 未指定 Executor 時,分配給這個 Executor 進行處理,公共的業務線程池
  3. 創建 1 個線程數的 EpollEventLoopGroup,Reactor 主線程
  4. 創建 3 個線程數的 EpollEventLoopGroup,Reactor 線程池

通過 useEpoll 方法來判別 EpollEventLoopGroup 還是 NioEventLoopGroup

private boolean useEpoll() {// OS 類型:Windows、Linuxreturn RemotingUtil.isLinuxPlatform()// 通過 NettyServerConfig.setUseEpollNativeSelector 方法設置是否開啟 Epoll Selector 模型&& nettyServerConfig.isUseEpollNativeSelector()&& Epoll.isAvailable();
}

在實例化 BrokerController 時已經設置 useEpollNativeSelector 變量為 true.

start

NettyRemotingServer

通過 BrokerController#start 方法再調用 NettyRemotingServer#start 方法啟動 Netty Server 服務端

public void start() {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});prepareSharableHandlers();/*1. SslHandler:SSL安全套接字協議2. ?3. FileRegionEncoder:文件區域采用 Zero-Copy SendFile 編碼傳輸4. ?5. NettyEncoder:編碼器6. ?7. NettyDecoder:解碼器8. ?9. IdleStateHandler:空閑檢查10. ?11. NettyConnectManageHandler:網絡連接管理12. ?13. NettyServerHandler:服務端請求處理器*/ServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 在 TCP 協議中,當服務器端接收到客戶端的連接請求時,會創建一個連接隊列來存儲這些請求,然后依次處理// ChannelOption.SO_BACKLOG 參數就是用來設置這個連接隊列的大小.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())// 默認情況下,TCP 連接在 TIME_WAIT 狀態時,不能立即被重用,必須等待一段時間才能重用// 通過給套接字配置可重用屬性,告訴操作系統內核,這樣的 TCP 連接可以復用 TIME_WAIT 狀態的連接.option(ChannelOption.SO_REUSEADDR, true)// 用于開啟或者關閉保活探測,默認情況下是關閉的// 當 SO_KEEPALIVE 開啟時,可以保持連接檢測對方主機是否崩潰,避免(服務器)永遠阻塞于 TCP 連接的輸入.option(ChannelOption.SO_KEEPALIVE, false)// TCP_NODELAY 是禁用Nagle算法,即數據包立即發送出去// 如果要求高實時性,有數據發送時就馬上發送,就將該選項設置為 true 關閉 Nagle 算法// 如果要減少發送次數減少網絡交互,就設置為 false 等累積一定大小后再發送。默認為 false.childOption(ChannelOption.TCP_NODELAY, true)// 綁定本地端口 Broker:10911、NameSrv:9876.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});// 設置發送緩沖區大小if (nettyServerConfig.getServerSocketSndBufSize() > 0) {log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());}// 設置接收緩沖區大小if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());}// 設置寫緩沖區大小if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));}// 設置是否開啟池化 ByteBufAllocator,采用默認的 PooledByteBufAllocatorif (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}if (this.channelEventListener != null) {this.nettyEventExecutor.start();}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);
}

啟動 NettyRemotingServer 流程如下:

  1. 創建 DefaultEventExecutorGroup Worker 線程池,默認線程數量:8,線程名 prefix:NettyServerCodecThread_
  2. 通過 ServerBootstrap 指定好分組:Reactor 主線程、Reactor 線程池
  3. 創建 EpollServerSocketChannel,ServerSocketChannel 實現類
  4. 設置服務端參數,如下表
  5. 調用 io.netty.bootstrap.AbstractBootstrap#bind 方法創建一個 EpollServerSocketChannel,并且綁定好地址、端口
  6. 啟動 NettyEventExecutor,它是一個單獨的線程,用來接收來自 Netty 客戶端空閑、關閉、連接、異常事件并進行監聽回調處理.
  7. 創建一個 Timer 定時器,每隔 3 秒掃描哪些超時等待的客戶端請求,并對它們進行處理,響應超時等待回調請求返回給客戶端
參數名參數值參數描述
ChannelOption.SO_BACKLOG1024當服務器端接收到客戶端的連接請求時,會創建一個連接隊列來存儲這些請求
ChannelOption.SO_REUSEADDRtrue通過給套接字配置可重用屬性,告訴操作系統內核,這樣的 TCP 連接可以復用 TIME_WAIT 狀態的連接
ChannelOption.SO_KEEPALIVEfalse用于開啟或者關閉保活探測,默認情況下是關閉的
ChannelOption.TCP_NODELAYtrue如果要求高實時性,有數據發送時就馬上發送,就將該選項設置為 true 關閉 Nagle 算法
如果要減少發送次數減少網絡交互,就設置為 false 等累積一定大小后再發送,默認為 false
ChannelOption.SO_SNDBUF0設置發送緩沖區大小
ChannelOption.SO_RCVBUF0設置接收緩沖區大小
ChannelOption.WRITE_BUFFER_WATER_MARK0設置寫緩沖區大小
ChannelOption.ALLOCATORPooledByteBufAllocator.DEFAULT優先分配直接內存

Broker 服務端會在初始化階段,通過調用 BrokerController#registerProcessor 方法注冊,請求 -> Processor 處理器之間的映射關系,將其寫入到 NettyRemotingAbstract#processorTable 集合中,當接收來自客戶端請求時,代表輸入由 Netty 最后一個處理器:NettyRemotingServer.NettyServerHandler 接收處理,執行其內部的 channelRead0 方法處理消息收到的請求,根據請求體 RequestCommand 攜帶的 code,從 processorTable 集合中找到 Pair 組合「Processor,Executor」等待 Broker 處理完成之后,再執行客戶端的回調方法,返回給客戶端具體的請求結果.

NettyRemotingClient

在執行 BrokerController#start 時,同時會將 BrokerOuterAPI 啟動,也就是啟動 NettyRemotingClient

在執行 DefaultMQProducer#start、DefaultMQPushConsumerImpl#start 方法時,同時會將 MQClientAPIImpl 也啟動,也就是啟動 NettyRemotingClient

所以,從 Broker、生產者、消費者角度作為客戶端,它們使用的都是同一個類 NettyRemotingClient 邏輯作為 Netty 客戶端使用,以下是其啟動時具體的源碼:

public void start() {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(// 默認線程數為 4nettyClientConfig.getClientWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());}});// 后續會發起請求時會通過 eventLoopGroupWorker 去建立 Socket 連接與服務端之間進行讀、寫交互,NioSocketChannel 代表的就是非阻塞的 SocketChannelBootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)// 數據包組裝為更大的幀然后進行發送.option(ChannelOption.TCP_NODELAY, true)// 定時發送探測包來探測連接的對端是否存活.option(ChannelOption.SO_KEEPALIVE, false).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();if (nettyClientConfig.isUseTLS()) {if (null != sslContext) {pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));log.info("Prepend SSL handler");} else {log.warn("Connections are insecure as SSLContext is null!");}}// DefaultEventExecutorGroup 用來執行以下五個 ChannelHandlerpipeline.addLast(defaultEventExecutorGroup,// 編碼 -> 處理請求new NettyEncoder(),// 解碼 -> 處理響應new NettyDecoder(),new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),new NettyConnectManageHandler(),// 遠程調用->請求、響應處理器new NettyClientHandler());}});// 操作系統客戶端發送緩沖區的大小if (nettyClientConfig.getClientSocketSndBufSize() > 0) {log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());}// 操作系統客戶端接收緩沖區的大小if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());}if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));}// Timer 定時執行哪些請求過期的事件,每隔 3 秒this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingClient.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);// 生產者、消費者客戶端一般為空,在 nameserver 與 Broker 交互時會使用到,做一些連接、關閉、異常、死亡狀態的回調處理if (this.channelEventListener != null) {this.nettyEventExecutor.start();}
}

啟動 NettyRemotingClient 流程如下:

  1. 創建 DefaultEventExecutorGroup Worker 線程池,用于向客戶端發起寫事件、接收讀事件的處理
  2. 通過 group 綁定 Worker 主線程,創建 NioSocketChannel 非阻塞 SocketChannel
  3. 設置相關的客戶端參數,如下表
  4. 設置客戶端請求、響應時要執行的處理器邏輯,主要是:編碼-NettyEncoder、解碼-NettyDecoder、請求_響應處理器-NettyClientHandler
  5. 創建一個 Timer 定時器,每隔 3 秒掃描哪些超時等待的請求,并對它們進行處理,響應超時等待回調請求返回給業務調用方
參數名參數值參數描述
ChannelOption.TCP_NODELAYtrue如果要求高實時性,有數據發送時就馬上發送,就將該選項設置為 true 關閉 Nagle 算法
如果要減少發送次數減少網絡交互,就設置為 false 等累積一定大小后再發送,默認為 false
ChannelOption.SO_KEEPALIVEfalse用于開啟或者關閉保活探測,默認情況下是關閉的
CONNECT_TIMEOUT_MILLIS3000連接超時時長 3 秒,在規定時間內未處理完成返回 Timeout 異常
ChannelOption.SO_SNDBUF0客戶端發送緩沖區的大小
ChannelOption.SO_RCVBUF0客戶端接收緩沖區的大小
ChannelOption.WRITE_BUFFER_WATER_MARK0設置寫緩沖區大小

在作為客戶端角度,只有當每次發起投遞消息、消費消息請求時,才會創建與服務端之間的 Channel 通道,核心方法 NettyRemotingClient#createChannel 內部調用 Bootstrap#connect(java.net.SocketAddress) 建立與服務端之間的連接,然后再發起請求,請求的內容以及協議已經在本節專欄的上一篇博文講到過了.

總結

該篇文章主要介紹在 RocketMQ remoting 底層通信模塊中的 NettyRemotingServer、NettyRemotingClient 實例化、初始化、啟動時源碼的分析,在 BrokerController 實例化會優先構建好 Netty 客戶端實例,在其初始化階段會構建好 Netty 服務端實例,而在生產者、消費者側,是在實例化 MQClientInstance 實例時會將 Netty 客戶端實例也構建好,同時在 Broker、生產者、消費者啟動時,會將對應的 Netty 服務端、客戶端都一并啟動,比編寫文章不易,希望對您有幫助,能夠喜歡~

博文放在 RocketMQ 專欄里,歡迎訂閱,會持續更新!

如果覺得博文不錯,關注我 vnjohn,后續會有更多實戰、源碼、架構干貨分享!

推薦專欄:Spring、MySQL,訂閱一波不再迷路

大家的「關注?? + 點贊👍 + 收藏?」就是我創作的最大動力!謝謝大家的支持,我們下文見!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/41628.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/41628.shtml
英文地址,請注明出處:http://en.pswp.cn/web/41628.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

數學系C++ 類與對象 STL(九)

目錄 目錄 面向對象&#xff1a;py&#xff0c;c艸&#xff0c;Java都是,但c是面向過程 特征&#xff1a; 對象 內斂成員函數【是啥】&#xff1a; 構造函數和析構函數 構造函數 復制構造函數/拷貝構造函數&#xff1a; ?【……】 實參與形參的傳遞方式&#xff1a;值…

Node.js Stream

Node.js Stream Node.js 是一個基于 Chrome V8 引擎的 JavaScript 運行環境&#xff0c;它允許開發者使用 JavaScript 編寫服務器端代碼。Node.js 的一個核心特性是其對流&#xff08;Stream&#xff09;的處理能力。流是一種在 Node.js 中處理讀/寫文件、網絡通信或任何端到端…

【LeetCode】螺旋矩陣

目錄 一、題目二、解法完整代碼 一、題目 給你一個 m 行 n 列的矩陣 matrix &#xff0c;請按照 順時針螺旋順序 &#xff0c;返回矩陣中的所有元素。 示例 1&#xff1a; 輸入&#xff1a;matrix [[1,2,3],[4,5,6],[7,8,9]] 輸出&#xff1a;[1,2,3,6,9,8,7,4,5] 示例 2&…

go-redis 封裝事件-client封裝模型、批量數據處理的導出器設計

一、redis-go的封裝實踐-client模型 // Copyright 2020 Lingfei Kong <colin404foxmail.com>. All rights reserved. // Use of this source code is governed by a MIT style // license that can be found in the LICENSE file.package storageimport ("context&q…

MySQL性能優化 二、表結構設計優化

1.設計中間表 設計中間表&#xff0c;一般針對于統計分析功能&#xff0c;或者實時性不高的需求。 2.設計冗余字段 為減少關聯查詢&#xff0c;創建合理的冗余字段&#xff08;創建冗余字段還需要注意數據一致性問題&#xff09; 3.折表 對于字段太多的大表&#xff0c;考…

C++ STL容器:序列式容器-鏈list,forward_list

摘要&#xff1a; CC STL&#xff08;Standard Template Library&#xff0c;標準模板庫&#xff09;在C編程中的重要性不容忽視&#xff0c;STL提供了一系列容器、迭代器、算法和函數對象&#xff0c;這些組件極大地提高了C程序的開發效率和代碼質量。 STL 容器 分為 2 大類 …

Halcon 銑刀刀口破損缺陷檢測

一 OTSU OTSU&#xff0c;是一種自適應閾值確定的方法,又叫大津法&#xff0c;簡稱OTSU&#xff0c;是一種基于全局的二值化算法,它是根據圖像的灰度特性,將圖像分為前景和背景兩個部分。當取最佳閾值時&#xff0c;兩部分之間的差別應該是最大的&#xff0c;在OTSU算法中所采…

排序 -- 萬能測試oj

. - 力扣&#xff08;LeetCode&#xff09; 這道題我們可以使用我們學過的那些常見的排序方法來進行解答 //插入排序 void InsertSort(int* nums, int n) {for (int i 0; i < n-1; i){int end i;int tmp nums[end 1];while (end > 0){if (tmp < nums[end]){nums[…

PyVideoTrans:一款功能全面的視頻翻譯配音工具!【送源碼】

PyVideoTrans是一款功能全面的視頻翻譯配音工具&#xff0c;專為視頻內容創作者設計。它能夠將視頻中的語言翻譯成另一種語言&#xff0c;并自動生成與之匹配的字幕和配音。支持多種語言&#xff0c;包括但不限于中文&#xff08;簡繁體&#xff09;、英語、韓語、日語、俄語、…

10、廣告-用戶數據中心

用戶數據中心 用戶數據中心在程序化廣告中扮演著至關重要的角色&#xff0c;它主要包括DMP原理、用戶畫像邏輯、Look Alike原理和DMP對接DSP四個部分。下面&#xff0c;我們將詳細講解每個部分的內容。 &#xff08;一&#xff09;DMP原理 數據管理平臺&#xff08;Data Man…

Wormhole Filters: Caching Your Hash on Persistent Memory——泛讀筆記

EuroSys 2024 Paper 論文閱讀筆記整理 問題 近似成員關系查詢&#xff08;AMQ&#xff09;數據結構可以高效地近似確定元素是否在集合中&#xff0c;例如Bloom濾波器[10]、cuckoo濾波器[23]、quotient濾波器[8]及其變體。但AMQ數據結構的內存消耗隨著數據規模的增長而快速增長…

MSPM0G3507——串口0從數據線傳輸變為IO口傳輸

默認的跳線帽時這樣的&#xff0c;這樣時是數據線傳輸 需要改成這樣&#xff0c;即可用IO口進行數據傳輸

windows系統本地端口被占用的問題

第一步&#xff1a;查找所有運行的端口 按住“WindowsR”組合鍵&#xff0c;打開命令窗口&#xff0c;輸入【cmd】命令&#xff0c;回車。在彈出的窗口中輸入 命令【netstat -ano】&#xff0c;再按一下回車鍵 Win系統端口被占用-查找所有運行的端口 第二步&#xff1a;查看…

opencv_C++學習筆記(入門30講)

文章目錄 1.配置開發環境2.圖像讀取與顯示3.圖像色彩空間轉換4.圖像對象的創建與賦值5.圖像像素的讀寫操作6.圖像像素的算數操作7.滾動條-調整圖像亮度8.滾動條-調整對比度和亮度9.鍵盤響應操作10.圖像像素的邏輯操作11.圖像的通道分離和合并12.圖像色彩空間轉換13.圖像的像素值…

阿里云存儲的降本增效與運維

小浩負責公司存儲架構層&#xff0c;需要確保存儲層不會成為公司業務系統的性能瓶頸&#xff0c;讓數據讀寫達到最佳性能。那么小浩可以從哪些方面著手優化性能呢&#xff1f;他繼續求助系統架構師大雷。 小浩&#xff1a;雷哥&#xff0c;PD反饋公司系統最近響應很慢&#xff…

HTTP模塊(一)

HTTP服務 本小節主要講解HTTP服務如何創建服務&#xff0c;查看HTTP請求&響應報文&#xff0c;還有注意事項說明&#xff0c;另外講解本地環境&Node環境&瀏覽器之間的鏈路圖示&#xff0c;如何提取HTTP報文字符串&#xff0c;及報錯信息查詢。 創建HTTP服務端 c…

lspci

【原】Linux之PCIE三種空間解析 PCIe學習筆記——2.PCIe配置空間 PCIE學習&#xff08;2&#xff09;PCIE配置空間詳解 開發者分享 | 使用 lspci 和 setpci 調試 PCIe 問題 b : 字節 w&#xff1a;word L&#xff1a; 4byte

LLM - 詞表示和語言模型

一. 詞的相似度表示 (1): 用一系列與該詞相關的詞來表示 (2): 把每個詞表示一個獨立的符號(one hot) (3): 利用該詞上下文的詞來表示該詞 (3): 建立一個低維度的向量空間&#xff0c;用深度學習方法將該詞映射到這個空間里(Word Embedding) 二&#xff1a;語言模型 (1): 根…

Postman中數據文件的高效使用:測試自動化與數據驅動測試實踐

摘要 Postman 是一個強大的 API 開發工具&#xff0c;它不僅支持 API 的設計、開發和測試&#xff0c;還提供了數據驅動測試的功能。通過使用數據文件&#xff0c;我們可以模擬不同的測試場景&#xff0c;實現測試的自動化和重復執行。本文將詳細介紹如何在 Postman 中使用數據…

PHP-實例-CSRF

1 需求 按照用途分類&#xff1a; 會話&#xff08;會話ID和會話令牌 二選一&#xff09; 會話ID&#xff1a;服務器側自動生成&#xff0c;自動存儲在cookie中&#xff0c;需要在服務器側存儲會話令牌&#xff1a;服務器側手動生成&#xff0c;手動存儲在cookie中&#xff0…