🔭 Hi there 👋 I'm vnjohn, a backend developer at an Internet company and a CSDN featured creator
📖 Recommended columns: Spring, MySQL, Nacos, Java; other columns will keep being updated and iterated
🌲 Column this article belongs to: RocketMQ
🤔 I'm currently studying microservices, cloud native, and message-middleware architecture and internals
💬 Ask me anything, ID: vnjohn
🔥 If my articles help you, your likes, favorites, and follows are much appreciated 🙏
😄 Pronouns: vnjohn
Fun facts: music, running, movies, games
Table of Contents
- Preface
- new
- BrokerOuterAPI
- MQClientInstance
- NettyRemotingClient
- initialize
- NettyRemotingServer
- start
- NettyRemotingServer
- NettyRemotingClient
- Summary
Preface
Previous posts in the RocketMQ column:
From Scratch: Building RocketMQ Single-Node and Cluster Instances Step by Step
Protecting Data Integrity: Exploring the Power of RocketMQ Distributed Transactional Messages
A Hands-On Guide to RocketMQ Distributed Transactional Messages: Key Designs for Data Consistency
RocketMQ Producer Source Code Analysis: DefaultMQProducer, DefaultMQProducerImpl
RocketMQ MQClientInstance and Producer Instance Startup: Source Code Analysis
RocketMQ Message Delivery Modes and Message Body Structure: Message, MessageQueueSelector
RocketMQ DefaultMQProducer#send Source Code Walkthrough: Producer Message Delivery (Part 1)
RocketMQ DefaultMQProducer#send Source Code Walkthrough: Producer Message Delivery (Part 2)
RocketMQ Communication Layer: Underlying Data Structures and Source Code Analysis
The previous article, "RocketMQ Communication Layer: Underlying Data Structures and Source Code Analysis", introduced the data structures and thread model behind RocketMQ's network communication but did not dig into the source code. This article walks through that source code in detail.
new
When the Broker side creates its BrokerController, it passes in a NettyServerConfig and a NettyClientConfig, as follows:
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
nettyServerConfig.setListenPort(10911);
nettyServerConfig.setUseEpollNativeSelector(true);
// .....
final BrokerController controller = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig);
The Netty server boss binds to port 10911 by default.
BrokerOuterAPI
When the BrokerController is instantiated, the NettyRemotingClient is created first. It is used for communication with other Brokers: for example, when a Topic is created on one Broker via a command, that Broker assembles the Topic route information and sends it to the other Brokers, so that every Broker learns about the Topic and can accept messages for it.
public BrokerController(
    final BrokerConfig brokerConfig,
    final NettyServerConfig nettyServerConfig,
    final NettyClientConfig nettyClientConfig,
    final MessageStoreConfig messageStoreConfig) {
    this.brokerConfig = brokerConfig;
    this.nettyServerConfig = nettyServerConfig;
    // ....
    this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
    // ....
}

public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
    this.remotingClient = new NettyRemotingClient(nettyClientConfig);
    this.remotingClient.registerRPCHook(rpcHook);
}
MQClientInstance
When a producer or consumer starts, MQClientManager#getOrCreateMQClientInstance creates the MQClientInstance, which wires up the NettyClientConfig:
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
    // Client-side remoting processor: accepts requests coming from the Broker and responds to them
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    // The class through which the MQ client API issues requests
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
}
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
    final ClientRemotingProcessor clientRemotingProcessor,
    RPCHook rpcHook, final ClientConfig clientConfig) {
    this.clientConfig = clientConfig;
    // The core class of RocketMQ's network model
    this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
    this.clientRemotingProcessor = clientRemotingProcessor;
    this.remotingClient.registerRPCHook(rpcHook);
    // Transactional message state back-check
    this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);
    // Consumer group membership changed, triggering a rebalance
    this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);
    // Reset the consumer client's offsets
    this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);
    // Fetch consumer status
    this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);
    // Fetch consumer runtime information
    this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
    // Consume a message directly
    this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
    // Reply message
    this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}
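These registrations mean the client also behaves as a small server for Broker-initiated requests. As a reference, here is an abridged sketch of how ClientRemotingProcessor#processRequest dispatches on the request code (method names follow the 4.x source; several cases and all response-building details are omitted):

```java
// Abridged from org.apache.rocketmq.client.impl.ClientRemotingProcessor (4.x);
// the omitted request codes follow the same dispatch pattern.
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.CHECK_TRANSACTION_STATE:
            return this.checkTransactionState(ctx, request);    // transactional message back-check
        case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
            return this.notifyConsumerIdsChanged(ctx, request); // group membership changed, rebalance
        case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
            return this.resetOffset(ctx, request);              // reset consumer offsets
        case RequestCode.GET_CONSUMER_RUNNING_INFO:
            return this.getConsumerRunningInfo(ctx, request);   // report consumer runtime info
        default:
            break;
    }
    return null;
}
```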
NettyRemotingClient
NettyRemotingClient is the client side of RocketMQ's network communication model; producers, consumers, and Brokers all hold a reference to it. Its full instantiation source is as follows:
public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
    final ChannelEventListener channelEventListener) {
    // Oneway semaphore permits and async semaphore permits both default to 65535
    super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
    this.nettyClientConfig = nettyClientConfig;
    this.channelEventListener = channelEventListener;

    int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }
    // Shared thread pool that runs the client-side processors: defaults to the number of
    // CPU cores, with a floor of 4 threads
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
        }
    });

    if (nettyClientConfig.isUseTLS()) {
        try {
            sslContext = TlsHelper.buildSslContext(true);
            log.info("SSL enabled for client");
        } catch (IOException e) {
            log.error("Failed to create SSLContext", e);
        } catch (CertificateException e) {
            log.error("Failed to create SSLContext", e);
            throw new RuntimeException("Failed to create SSLContext", e);
        }
    }
}
At instantiation time it also holds an internal Bootstrap field:
private final Bootstrap bootstrap = new Bootstrap();
initialize
During BrokerController instantiation, the Netty server side is only configured; nothing is actually created until the initialization phase.
NettyRemotingServer
When BrokerController#initialize is called, it instantiates the NettyRemotingServer:
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) {
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    this.channelEventListener = channelEventListener;

    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }
    // Shared processor thread pool, used when a Processor is registered without its own Executor
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    // On Linux the EpollEventLoopGroup branch is taken by default
    if (useEpoll()) {
        this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    } else {
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    }

    loadSslContext();
}
- Creates the oneway Semaphore with 256 permits and the async Semaphore with 64 permits
- Creates the shared processor thread pool: when a Processor is registered without its own Executor, its requests are handled by this common business pool
- Creates an EpollEventLoopGroup with 1 thread: the Reactor main (boss) thread
- Creates an EpollEventLoopGroup with 3 threads: the Reactor selector pool
The useEpoll method decides between EpollEventLoopGroup and NioEventLoopGroup:
private boolean useEpoll() {
    // OS check: epoll is only available on Linux, not Windows
    return RemotingUtil.isLinuxPlatform()
        // Toggled via NettyServerConfig.setUseEpollNativeSelector
        && nettyServerConfig.isUseEpollNativeSelector()
        && Epoll.isAvailable();
}
The useEpollNativeSelector flag was already set to true when the BrokerController was instantiated.
start
NettyRemotingServer
BrokerController#start calls NettyRemotingServer#start to launch the Netty server:
public void start() {
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });

    prepareSharableHandlers();

    /*
     * Pipeline handlers:
     * 1. HandshakeHandler / SslHandler: SSL/TLS secure socket layer
     * 2. FileRegionEncoder: zero-copy sendfile encoding for file regions
     * 3. NettyEncoder: encoder
     * 4. NettyDecoder: decoder
     * 5. IdleStateHandler: idle-connection detection
     * 6. NettyConnectManageHandler: connection management
     * 7. NettyServerHandler: server-side request processor
     */
    ServerBootstrap childHandler =
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            // In TCP, incoming connection requests are queued before being accepted;
            // ChannelOption.SO_BACKLOG sets the size of that accept queue
            .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
            // By default a TCP address in TIME_WAIT cannot be reused immediately;
            // SO_REUSEADDR tells the kernel to allow rebinding over TIME_WAIT connections
            .option(ChannelOption.SO_REUSEADDR, true)
            // Enables/disables TCP keep-alive probing (off by default); when on, it detects
            // dead peers so the server does not block forever on a broken connection
            .option(ChannelOption.SO_KEEPALIVE, false)
            // TCP_NODELAY disables the Nagle algorithm so packets are sent immediately;
            // set it to true for low latency, or leave it false to batch small writes
            .childOption(ChannelOption.TCP_NODELAY, true)
            // Bind the local port: Broker 10911, NameServer 9876
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                        .addLast(defaultEventExecutorGroup,
                            encoder,
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            connectionManageHandler,
                            serverHandler);
                }
            });

    // Send buffer size
    if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
        log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());
        childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());
    }
    // Receive buffer size
    if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
        log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());
        childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());
    }
    // Write buffer water marks
    if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
        log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
            nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());
        childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
            nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));
    }
    // Pooled ByteBuf allocation, using the default PooledByteBufAllocator
    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }

    // Scan the response table after an initial 3-second delay, then once per second
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
Starting the NettyRemotingServer proceeds as follows:
- Creates the DefaultEventExecutorGroup worker pool, 8 threads by default, thread name prefix NettyServerCodecThread_
- Wires both groups into the ServerBootstrap: the Reactor main (boss) thread and the Reactor selector pool
- Uses EpollServerSocketChannel, an implementation of ServerSocketChannel
- Sets the server-side options listed in the table below
- Calls io.netty.bootstrap.AbstractBootstrap#bind to create the EpollServerSocketChannel and bind the address and port
- Starts the NettyEventExecutor, a dedicated thread that listens for idle, close, connect, and exception events from Netty clients and runs the callback handling for them
- Creates a Timer that, after an initial 3-second delay, scans once per second for client requests that have waited past their timeout, handles them, and completes their callbacks with a timeout response to the client
| Option | Value | Description |
| --- | --- | --- |
| ChannelOption.SO_BACKLOG | 1024 | Size of the queue in which the server stores incoming client connection requests before accepting them |
| ChannelOption.SO_REUSEADDR | true | Marks the socket address reusable, telling the kernel that connections still in the TIME_WAIT state may be rebound |
| ChannelOption.SO_KEEPALIVE | false | Enables or disables TCP keep-alive probing; disabled by default |
| ChannelOption.TCP_NODELAY | true | true disables the Nagle algorithm so data is sent immediately (low latency); false batches small writes to cut network round trips and is the default |
| ChannelOption.SO_SNDBUF | 0 | Send buffer size; only applied when configured greater than 0 |
| ChannelOption.SO_RCVBUF | 0 | Receive buffer size; only applied when configured greater than 0 |
| ChannelOption.WRITE_BUFFER_WATER_MARK | 0 | Write buffer low/high water marks; only applied when both are configured greater than 0 |
| ChannelOption.ALLOCATOR | PooledByteBufAllocator.DEFAULT | Pooled allocator, preferring direct memory |
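All of the values above originate from NettyServerConfig. As a hedged illustration (the setters shown exist on the 4.x NettyServerConfig, but the values are arbitrary, not recommendations), they can be overridden before the server is constructed:

```java
// Illustrative only: tune NettyServerConfig before handing it to NettyRemotingServer.
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(10911);              // Broker port; the NameServer uses 9876
serverConfig.setServerWorkerThreads(8);         // DefaultEventExecutorGroup size (NettyServerCodecThread_)
serverConfig.setServerSelectorThreads(3);       // Reactor selector thread count
serverConfig.setServerSocketSndBufSize(65535);  // becomes SO_SNDBUF because it is > 0
serverConfig.setServerSocketRcvBufSize(65535);  // becomes SO_RCVBUF because it is > 0
RemotingServer server = new NettyRemotingServer(serverConfig, null);
server.start();
```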
During its initialization phase, the Broker calls BrokerController#registerProcessor to register the request-code to Processor mappings, writing them into the NettyRemotingAbstract#processorTable map. When a client request arrives, it is handled by the last handler in the pipeline, NettyRemotingServer.NettyServerHandler, whose channelRead0 method processes the incoming message: using the code carried in the RemotingCommand request body, it looks up the Pair of (Processor, Executor) in processorTable; once the Broker finishes processing, it invokes the client's callback and returns the concrete result to the client.
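To make the mapping concrete, here is a minimal sketch of both ends of that path; the registration lines are abridged from BrokerController#registerProcessor, while the dispatch part is paraphrased in comments from NettyRemotingAbstract#processRequestCommand rather than quoted verbatim:

```java
// Registration (abridged from BrokerController#registerProcessor): bind a request code to a
// processor plus the executor that will run it; the mapping lands in processorTable.
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);

// Dispatch (paraphrased from NettyRemotingAbstract#processRequestCommand): NettyServerHandler's
// channelRead0 hands the decoded RemotingCommand here, where its code selects the Pair.
// Pair<NettyRequestProcessor, ExecutorService> pair = this.processorTable.get(cmd.getCode());
// pair.getObject2().submit(() -> {
//     RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
//     ctx.writeAndFlush(response);   // completes the client's pending callback
// });
```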
NettyRemotingClient
When BrokerController#start runs, it also starts the BrokerOuterAPI, which in turn starts its NettyRemotingClient.
When DefaultMQProducer#start or DefaultMQPushConsumerImpl#start runs, it also starts the MQClientAPIImpl, which likewise starts its NettyRemotingClient.
So whether the client role is played by a Broker, a producer, or a consumer, they all run the same NettyRemotingClient logic as the Netty client. Its startup source is as follows:
public void start() {
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        // 4 threads by default
        nettyClientConfig.getClientWorkerThreads(),
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
            }
        });

    // When requests are issued later, eventLoopGroupWorker establishes the socket connection and
    // performs reads/writes against the server; NioSocketChannel is the non-blocking SocketChannel
    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
        // true disables the Nagle algorithm: packets are sent immediately instead of being
        // accumulated into larger frames first
        .option(ChannelOption.TCP_NODELAY, true)
        // Would periodically send keep-alive probes to detect a dead peer; disabled here
        .option(ChannelOption.SO_KEEPALIVE, false)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (nettyClientConfig.isUseTLS()) {
                    if (null != sslContext) {
                        pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                        log.info("Prepend SSL handler");
                    } else {
                        log.warn("Connections are insecure as SSLContext is null!");
                    }
                }
                // defaultEventExecutorGroup executes the following five ChannelHandlers
                pipeline.addLast(defaultEventExecutorGroup,
                    // Encoding -> outgoing requests
                    new NettyEncoder(),
                    // Decoding -> incoming responses
                    new NettyDecoder(),
                    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    new NettyConnectManageHandler(),
                    // Remote-call request/response processor
                    new NettyClientHandler());
            }
        });

    // OS-level client send buffer size
    if (nettyClientConfig.getClientSocketSndBufSize() > 0) {
        log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());
        handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());
    }
    // OS-level client receive buffer size
    if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {
        log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());
        handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
    }
    if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {
        log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",
            nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());
        handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
            nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));
    }

    // Timer that sweeps expired requests: first run after 3 seconds, then once per second
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingClient.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);

    // Usually null for producer/consumer clients; used when the NameServer interacts with Brokers,
    // running callbacks for connect, close, idle, and exception channel events
    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
}
Starting the NettyRemotingClient proceeds as follows:
- Creates the DefaultEventExecutorGroup worker pool, which handles outgoing write events and incoming read events
- Binds the worker event loop group via group and uses NioSocketChannel, the non-blocking SocketChannel
- Sets the client-side options listed in the table below
- Registers the handlers run on every request and response: encoding (NettyEncoder), decoding (NettyDecoder), and the request/response processor (NettyClientHandler)
- Creates a Timer that, after an initial 3-second delay, scans once per second for requests that have waited past their timeout, handles them, and completes their callbacks with a timeout response to the business caller
| Option | Value | Description |
| --- | --- | --- |
| ChannelOption.TCP_NODELAY | true | true disables the Nagle algorithm so data is sent immediately (low latency); false batches small writes to cut network round trips and is the default |
| ChannelOption.SO_KEEPALIVE | false | Enables or disables TCP keep-alive probing; disabled by default |
| ChannelOption.CONNECT_TIMEOUT_MILLIS | 3000 | Connection timeout of 3 seconds; a Timeout exception is returned if the connection is not established in time |
| ChannelOption.SO_SNDBUF | 0 | Client send buffer size; only applied when configured greater than 0 |
| ChannelOption.SO_RCVBUF | 0 | Client receive buffer size; only applied when configured greater than 0 |
| ChannelOption.WRITE_BUFFER_WATER_MARK | 0 | Write buffer low/high water marks; only applied when both are configured greater than 0 |
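The client-side counterparts of these options live on NettyClientConfig. A hedged example of adjusting them (these setters exist on the 4.x NettyClientConfig; the values are illustrative):

```java
// Illustrative only: the client-side options come from NettyClientConfig.
NettyClientConfig clientCfg = new NettyClientConfig();
clientCfg.setClientWorkerThreads(4);           // NettyClientWorkerThread_ pool size
clientCfg.setConnectTimeoutMillis(3000);       // CONNECT_TIMEOUT_MILLIS
clientCfg.setClientSocketSndBufSize(65535);    // becomes SO_SNDBUF because it is > 0
clientCfg.setClientSocketRcvBufSize(65535);    // becomes SO_RCVBUF because it is > 0
RemotingClient client = new NettyRemotingClient(clientCfg, null);
client.start();
```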
From the client's point of view, the Channel to the server is only created when a request (delivering or consuming a message) is actually issued: the core method NettyRemotingClient#createChannel internally calls Bootstrap#connect(java.net.SocketAddress) to establish the connection to the server before sending the request. The request payload and the wire protocol were covered in the previous post of this column. A minimal sketch of that lazy-connect pattern follows.
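The sketch below strips away the locking and ChannelWrapper bookkeeping of the real NettyRemotingClient#createChannel; channelTable is a hypothetical stand-in for the real field, while Bootstrap#connect and RemotingHelper#string2SocketAddress are the actual APIs:

```java
// Simplified stand-in for NettyRemotingClient#createChannel: connect on first use, cache the
// ChannelFuture per address, and reconnect when the cached channel is no longer active.
private final ConcurrentMap<String, ChannelFuture> channelTable = new ConcurrentHashMap<>();

private Channel createChannel(final String addr) {
    ChannelFuture cf = this.channelTable.get(addr);
    if (cf == null || !cf.channel().isActive()) {
        // Bootstrap#connect only establishes the TCP connection when the first request is issued
        cf = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
        cf.awaitUninterruptibly(nettyClientConfig.getConnectTimeoutMillis());
        this.channelTable.put(addr, cf);
    }
    return cf.channel().isActive() ? cf.channel() : null;
}
```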
Summary
This article walked through the instantiation, initialization, and startup source of NettyRemotingServer and NettyRemotingClient in RocketMQ's remoting module. During instantiation the BrokerController first builds the Netty client instance, and during its initialization phase it builds the Netty server instance; on the producer and consumer side, the Netty client instance is built when the MQClientInstance is created. When the Broker, producer, or consumer starts, the corresponding Netty server and client are started along with it. Writing these articles takes real effort; I hope this one helps you and that you enjoyed it.
This post lives in the RocketMQ column; subscriptions are welcome, and it will keep being updated!
If you found this post useful, follow me, vnjohn; more hands-on, source-code, and architecture content is on the way!
Recommended columns: Spring, MySQL; subscribe and you won't lose your way
Your follows, likes 👍, and favorites are my biggest motivation! Thanks for your support, and see you in the next post!