【RocketMQ Broker 相關源碼】- NettyRemotingClient 和 NettyRemotingServer

文章目錄

  • 1. 前言
  • 2. BrokerOuterAPI
    • 2.1 NettyRemotingClient
    • 2.2 start 啟動
      • 2.2.1 NettyRemotingClient#start
  • 3. NettyRemotingServer
    • 3.1 ClientHousekeepingService
    • 3.2 ProducerManager#doChannelCloseEvent
    • 3.3 ConsumerManager#doChannelCloseEvent
      • 3.3.1 DefaultConsumerIdsChangeListener#handle
      • 3.3.2 ConsumerFilterManager#unRegister
      • 3.3.3 Broker2Client#notifyConsumerIdsChanged
    • 3.4 FilterServerManager#doChannelCloseEvent
  • 4. 小結


本文章基于 RocketMQ 4.9.3

1. 前言

  • 【RocketMQ】- 源碼系列目錄
  • 【RocketMQ NameServer】- NameServer 啟動源碼

在前面的 NameServer 啟動源碼,我們探討了 NettyRemotingServer 的啟動源碼,NameServer 作為服務端通過 NettyRemotingServer 處理 Broker 上報的信息,當然了 NettyRemotingServer 不單單是 NameServer 獨有的,Broker 既可以作為服務端和生產者、消費者通信,又可以作為客戶端和 NameServer 通信。

Broker 作為客戶端通信時,將一些方法比如 fetchNameServerAddr 拉取 NameServer 地址、注冊 Broker 信息到 NameServer、取消注冊 Broker 信息都封裝到了 BrokerOuterAPI 中。


2. BrokerOuterAPI

在這里插入圖片描述BrokerOuterAPI 在 BrokerController 構造器中被初始化,可以看到傳入的就是 NettyClientConfig,也就是 Netty 客戶端的配置,下面就來看下構造器的初始化。

public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {this(nettyClientConfig, null);
}public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {this.remotingClient = new NettyRemotingClient(nettyClientConfig);this.remotingClient.registerRPCHook(rpcHook);
}

可以看到 BrokerOuterAPI 的構造器就是初始化了 NettyRemotingClient 這個客戶端通信類,同時注冊進去的 RPC 鉤子為 null,也就是在跟 NameServer 通信的前后不會調用 RPC 鉤子的前置和后置方法,所以核心還是 NettyRemotingClient。


2.1 NettyRemotingClient

public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {this(nettyClientConfig, null);
}public NettyRemotingClient(final NettyClientConfig nettyClientConfig,final ChannelEventListener channelEventListener) {// 首先設置服務器單向、異步發送請求的信號量, 這個是為了防止同一時間發送太多單向請求或者異步請求, 默認都是 65536super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());// 客戶端配置this.nettyClientConfig = nettyClientConfig;// 連接事件監聽器this.channelEventListener = channelEventListener;// 公開線程池 publicExecutor 的核心線程數, 默認就是 4int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}// 創建一個默認線程數為 4 的線程池, 這個線程池可以用于處理請求回調this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 單線程 I/O 事件處理器(線程數 1)this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));}});if (nettyClientConfig.isUseTLS()) {try {sslContext = TlsHelper.buildSslContext(true);log.info("SSL enabled for client");} catch (IOException e) {log.error("Failed to create SSLContext", e);} catch (CertificateException e) {log.error("Failed to create SSLContext", e);throw new RuntimeException("Failed to create SSLContext", e);}}
}

這里的邏輯跟 NettyRemotingServer 差不多,只是作為客戶端不需要創建 Accept 事件 Reactor 組,也就是 bossGroup。


2.2 start 啟動

創建出 BrokerOuterAPI 之后,會在 BrokerController#start 方法中啟動。
在這里插入圖片描述
在這里插入圖片描述
這里面也是啟動了 remotingClient,所以下面就來看下 NettyRemotingClient 的 start 方法。


2.2.1 NettyRemotingClient#start

/*** 客戶端 Netty 服務啟動*/
@Override
public void start() {// 創建默認的事件處理組, 專門用于處理一些執行前的編解碼、連接空閑狀態檢測、網絡連接管理等操作, 不管是接收到消息還是發送消息(入站或者出站處理器)// 都可以通過這個線程池去做處理, 這樣就能避免 I/O 線程浪費資源在這些狀態檢測上, 這里默認創建的是 4 個線程大小的線程池this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {// 創建出來的線程是 NettyClientWorkerThread_0、NettyClientWorkerThread_1 ...return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());}});// I/O 事件處理器(線程數 1)Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)// 禁用 Nagle 算法, Nagle 算法是說當應用程序發送的數據量較小時,TCP 不會立即將這些數據發送出去,而是會等待一段時間,將后續的數據合并// 成一個較大的數據包后再發送。這樣可以減少網絡上的數據包數量,降低網絡擁塞的可能性,提高傳輸效率.option(ChannelOption.TCP_NODELAY, true)// 用于 TCP 連接心跳檢測, 檢測 TCP 連接是否活躍, 這里可能是 Netty 自己已經存在更精細的 IdleStateHandler 處理器就行了.option(ChannelOption.SO_KEEPALIVE, false)// 連接超時時間.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();if (nettyClientConfig.isUseTLS()) {if (null != sslContext) {pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));log.info("Prepend SSL handler");} else {log.warn("Connections are insecure as SSLContext is null!");}}pipeline.addLast(defaultEventExecutorGroup,// 編碼處理器new NettyEncoder(),// 解碼處理器new NettyDecoder(),// 維護心跳連接的處理器, 指定的三個參數是值讀空閑時間、寫空閑時間、總空閑時間, 如果服務端一定時間內沒有讀寫就會出發不同的事件,這三個參數對應的// 事件分別是: IdleStateEvent.READER_IDLE、IdleStateEvent.WRITER_IDLE、IdleStateEvent.ALL_IDLE, 總之這玩意就是用來檢測有沒有長時間不// 讀寫的, 這樣可以判斷一個連接是不是空閑連接new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),// 連接管理器, 主要是管理連接的活躍、非活躍、注冊、取消注冊 ... 事件new NettyConnectManageHandler(),// 客戶端業務處理器new NettyClientHandler());}});// 設置 Netty 客戶端的消息接收緩沖區, 默認是 0if (nettyClientConfig.getClientSocketSndBufSize() > 0) {log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());}// 設置 Netty 客戶端的消息發送緩沖區, 默認也是 0if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());}// 寫緩沖區的低水位和高水位if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));}// 定時任務, 初始化之后 3s 執行, 之后 1s 執行一次this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {// 掃描 responseTable, 將超時的 ResponseFuture 刪掉, 然后執行回調邏輯NettyRemotingClient.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);// 如果連接監聽器設置了, 就啟動 NettyEventExecutor 去處理 NettyEventif (this.channelEventListener != null) {// broker 啟動的時候是不會設置連接監聽器的, 生產者和消費者啟動才會設置this.nettyEventExecutor.start();}
}

這里的方法跟 NettyRemotingServer 也是差不多的,首先是創建 defaultEventExecutorGroup 默認的事件處理組,專門用于處理一些執行前的編解碼、連接空閑狀態檢測、網絡連接管理等操作,不管是接收到消息還是發送消息(入站或者出站處理器) 都可以通過這個線程池去做處理,這樣就能避免 I/O 線程浪費資源在這些狀態檢測上, 這里默認創建的是 4 個線程大小的線程池。

接下來就是創建 Bootstrap,因為客戶端不涉及處理 Accept 連接事件,所以只需要設置從 Reactor 組就可以,不過這里面的 Reactor 線程組大小是 1,所以是單線程處理 IO 讀寫事件?很久沒看了也不是很清楚。

接著往下看,下面 initChannel 方法中往 Channel 的 Pipeline 里面設置的處理器包括編碼處理器,解碼處理器,心跳連接處理器,連接管理器和客戶端業務處理器。參看 NettyRemotingServer 的處理器。
在這里插入圖片描述
可以看到的是跟 NettyRemotingServer 對比,入站處理器就少了一個 HandshakeHandler,因為不需要處理連接事件,同時最后的 NettyServerHandler 換成了 NettyClientHandler。出站處理器則是一樣的。
在這里插入圖片描述
最后就是啟動定時任務,初始化之后 3s 執行, 之后 1s 執行一次,掃描 responseTable,將超時的 ResponseFuture 刪掉,然后執行回調邏輯。

由于 BrokerOuterAPI 并沒有設置 channelEventListener,所以就不會啟動 nettyEventExecutor,broker 啟動的時候是不會設置連接監聽器的, 生產者和消費者啟動才會設置,關于 nettyEventExecutor 可以看這篇文章:【RocketMQ NameServer】- NettyEventExecutor 處理 Netty 事件。


3. NettyRemotingServer

在第二小節我們說了,Broker 可以作為服務端跟生產者、消費者、過濾服務通信,也可以作為客戶端向 NameServer 發送請求,作為服務端,Broker 會在 initialize 初始化的時候創建出 fastRemotingServer 和 remotingServer。
在這里插入圖片描述
fastRemotingServer 和 remotingServer 都是作為 Netty 服務端通信用的,只是一個監聽 10909,一個監聽 10911,fastRemotingServer 所建立的連接通道也是 VIP 通道,VIP 意思就是性能比較高的,一般都是用于內部使用,比如超時同步加鎖請求、獲取配置等等,而 remotingServer 監聽端口 10911,專門處理來自生產者和消費者的請求,比如消息發送、消息拉取,因為生產者和消費者的交互是很頻繁的,所以自熱而然請求的性能就會慢一點。

當然了,創建 NettyRemotingServer 和啟動 NettyRemotingServer 的流程在 NameServer 啟動的文章中已經有講解過源碼,在 NettyRemotingServer 啟動的源碼中,我們也知道了 NettyRemotingServer 啟動的時候會啟動 nettyEventExecutor 線程,專門從 eventQueue 獲取不同的連接事件然后做不同的處理,關于連接事件 NettyEvent,后面我也寫了一篇文章 【RocketMQ NameServer】- NettyEventExecutor 處理 Netty 事件 來講述 NettyEventExecutor 是如何處理 Netty 事件的,NettyEvent 又是怎么來的。

當時寫上面這篇文章的時候可以看到 NettyEventExecutor#run 方法實際上是調用了 BrokerHousekeepingService 去處理連接事件,而 broker 的 remotingServer 看源碼也能看到是傳入了 clientHousekeepingService 去處理生產者和消費者的連接事件的,所以這里我們就來看下 clientHousekeepingService 這個類的邏輯。


3.1 ClientHousekeepingService

其實看名字就能看出來了,NameServer 通過 BrokerHousekeepingService 是專門處理 Broker 的連接事件,而 Broker 通過 ClientHousekeepingService 是專門處理生產者、消費者、過濾服務(已廢棄)的連接通道。

【RocketMQ NameServer】- NettyEventExecutor 處理 Netty 事件 這篇文章我們也知道了

  • NettyEventExecutor 會調用 ClientHousekeepingService#onChannelIdle 處理連接空閑事件(IDLE)
  • NettyEventExecutor 會調用 ClientHousekeepingService#onChannelClose 處理連接關閉事件(CLOSE)
  • NettyEventExecutor 會調用 ClientHousekeepingService#onChannelConnect 處理生產者和消費者連接事件(CONNECT)
  • NettyEventExecutor 會調用 ClientHousekeepingService#onChannelException 處理連接異常事件(EXCEPTION)

那下面來看下這幾個方法,由于源碼不多,就直接貼出來。

@Override
public void onChannelConnect(String remoteAddr, Channel channel) {}/*** 連接關閉* @param remoteAddr* @param channel*/
@Override
public void onChannelClose(String remoteAddr, Channel channel) {this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}/*** 連接處理發生異常* @param remoteAddr* @param channel*/
@Override
public void onChannelException(String remoteAddr, Channel channel) {this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}/*** 空閑連接處理* @param remoteAddr* @param channel*/
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}

可以看到,onChannelConnect 方法是一個空實現,應該是因為生產者和消費者都會定時上報心跳給 broker,所以連接不需要額外處理。

然后其他三個類型的事件都是調用同一個方法 doChannelCloseEvent 去處理,只是生產者、消費者、過濾服務器的邏輯不同,下面就一個一個來看下這里面的處理邏輯。


3.2 ProducerManager#doChannelCloseEvent

/*** broker 關閉和生產者的連接* @param remoteAddr* @param channel*/
public synchronized void doChannelCloseEvent(final String remoteAddr, final Channel channel) {if (channel != null) {// 遍歷所有生產者連接for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) {// 生產者組final String group = entry.getKey();// 生產者組下面的生產者連接final ConcurrentHashMap<Channel, ClientChannelInfo> clientChannelInfoTable =entry.getValue();// 刪除這個連接final ClientChannelInfo clientChannelInfo =clientChannelInfoTable.remove(channel);if (clientChannelInfo != null) {// 刪除成功之后, 將這個生產者的客戶端 ID 也從 clientChannelTable 集合中刪掉clientChannelTable.remove(clientChannelInfo.getClientId());log.info("NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",clientChannelInfo.toString(), remoteAddr, group);}}}
}

生產者管理類 ProducerManager#groupChannelTable 集合會存儲生產者組下面的所有生產者連接,clientChannelTable 存儲了【客戶端 ID -> 連接】映射關系。

/*** 生產者組 -> 生產者組下面的生產者連接*/
private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable =new ConcurrentHashMap<>();
/*** 客戶端 ID -> 生產者連接*/
private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();

上面的 doChannelCloseEvent 方法就是在處理這兩個集合,由于參數只有一個 channel,所以不知道是哪個生產者組下面的,就需要遍歷所有生產者組,一個一個去判斷刪除。


3.3 ConsumerManager#doChannelCloseEvent

/*** 關閉消費者連接* @param remoteAddr* @param channel*/
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {// 遍歷消費者組下面的消費者Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, ConsumerGroupInfo> next = it.next();// 獲取消費者組消息, 里面包括這個消費者組的訂閱信息、連接信息ConsumerGroupInfo info = next.getValue();// 刪除, 因為參數只有連接通道, 不確定是哪個消費者組下面的, 所以會遍歷所有消費組一個一個去判斷刪除boolean removed = info.doChannelCloseEvent(remoteAddr, channel);if (removed) {// 刪除成功了, 判斷下這個消費者組下面還有沒有消費者連接if (info.getChannelInfoTable().isEmpty()) {// 如果沒有了, 把這個消費者組也刪掉ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());if (remove != null) {// 如果刪除成功了log.info("unregister consumer ok, no any connection, and remove consumer group, {}",next.getKey());// 處理消費者組 UNREGISTER 事件, 這里面不回去通知消費者重平衡, 因為這個消費者組下面的消費者都刪掉了this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey());}}// 通知其他所有消費者去重平衡, 但是由于上面 handle 之后沒有返回, 下面這里就會有兩種情況// 1.info.getChannelInfoTable().isEmpty() = true, 這里 getAllChannel 獲取到的就是空集合, CHANGE 事件對于空集合就是不處理// 2.info.getChannelInfoTable().isEmpty() = false, 意思就是只刪除了消費者組下面的一個消費者, 所以就通知其余消費者進行重平衡this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());}}
}/*** 關閉連接* @param remoteAddr* @param channel* @return*/
public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {// 從 channelInfoTable 中刪除消費者連接final ClientChannelInfo info = this.channelInfoTable.remove(channel);if (info != null) {// 刪除成功log.warn("NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}",info.toString(), groupName);return true;}// 刪除失敗, 就是沒找到消費連接return false;
}

消費者組會稍微復雜一點,由于消費者的信息比較多(消費點位、topic 訂閱消息),因此消費者的管理使用了 consumerTable 去管理消費者組下面的消費者信息。

private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);public class ConsumerGroupInfo {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);// 消費者組名private final String groupName;// 消費者組下面的消費者的訂閱情況, 不過一個消費者組一般都只會訂閱消費一個 topic 吧private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =new ConcurrentHashMap<String, SubscriptionData>();// 消費者組下面的連接信息private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =new ConcurrentHashMap<Channel, ClientChannelInfo>(16);// 消費者組的消費類型, PULL 或者 PUSHprivate volatile ConsumeType consumeType;// 消費模式, 廣播還是集群private volatile MessageModel messageModel;// 從哪個位置開始消費private volatile ConsumeFromWhere consumeFromWhere;// 上一次上報心跳的事件private volatile long lastUpdateTimestamp = System.currentTimeMillis();...
}

因此這里面的處理邏輯就是遍歷 consumerTable 集合,使用 doChannelCloseEvent 方法去判斷這個消費者組下面的 channelInfoTable 集合是否包含要刪除的 channel,如果包含就刪掉。刪除成功之后如果這個消費者組已經沒有消費者了,那么這個消費者組也可以刪掉了。

不過要注意一下,消費者涉及到重平衡的邏輯,但是重平衡是針對消費者組下面的消費者對 topic 下面的隊列去重平衡,如果消費者組都沒了就沒必要重平衡,因此可以看到如果說刪除消費者組成功了,通過 handle 處理的是 UNREGISTER 事件。而如果沒有刪除消費者組,只是刪除了消費者,那么通過 handle 處理的是 CHANGE 事件。


3.3.1 DefaultConsumerIdsChangeListener#handle

這個方法就是處理消費者變更的方法,只需要關注 CHANGE 和 UNREGISTER 就行。

@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {if (event == null) {return;}switch (event) {case CHANGE:// 消費者變更, 但是消費者組還在, 同時消費者組下面的其余消費者進行重平衡if (args == null || args.length < 1) {return;}// 消費者組下面的消費者連接List<Channel> channels = (List<Channel>) args[0];// 如果參數里面傳入了連接且配置是設置了當消費者變更就通知其他消費者重平衡if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {for (Channel chl : channels) {// 通知消費者重平衡this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);}}break;case UNREGISTER:// 這里就是整個消費者組從 consumerTable 集合中刪掉了, 由于消費者組下面沒有消費者, 因此也不需要重平衡this.brokerController.getConsumerFilterManager().unRegister(group);break;case REGISTER:// 消費者注冊if (args == null || args.length < 1) {return;}Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);break;default:throw new RuntimeException("Unknown event " + event);}
}

3.3.2 ConsumerFilterManager#unRegister

UNREGISTER 就是整個消費者組從 consumerTable 集合中刪掉了, 由于消費者組下面沒有消費者, 因此也不需要重平衡,所以 unRegister 就是簡單處理下消費者組的消費者過濾信息。

/*** 刪除消費者組過濾信息* @param consumerGroup*/
public void unRegister(String consumerGroup) {// 如果不包括, 直接返回if (!this.groupFilterData.containsKey(consumerGroup)) {return;}// 獲取消費者組的過濾信息ConsumerFilterData data = this.groupFilterData.get(consumerGroup);// 如果過濾信息過期了, 就是 dead > bornTimeif (data == null || data.isDead()) {return;}long now = System.currentTimeMillis();log.info("Unregister consumer filter: {}, deadTime: {}", data, now);// 設置刪除時間是當前時間data.setDeadTime(now);
}

當然了這里也是簡單看下里面的邏輯,等到后面講消費者的時候也會講消息過濾這塊的內容。


3.3.3 Broker2Client#notifyConsumerIdsChanged

/*** 通知消費者重平衡* @param channel* @param consumerGroup*/
public void notifyConsumerIdsChanged(final Channel channel,final String consumerGroup) {if (null == consumerGroup) {log.error("notifyConsumerIdsChanged consumerGroup is null");return;}// 構建請求頭NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();requestHeader.setConsumerGroup(consumerGroup);// 請求類型是 NOTIFY_CONSUMER_IDS_CHANGEDRemotingCommand request =RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);try {// 發送 OneWay 請求this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);} catch (Exception e) {log.error("notifyConsumerIdsChanged exception. group={}, error={}", consumerGroup, e.toString());}
}

這里就是直接發送單向請求,請求 Code 為 NOTIFY_CONSUMER_IDS_CHANGED,意思是消費者組消費者發生變化,發送請求進行重平衡。

消費者的重平衡我們后面講消費者的時候再詳細介紹,這里先不說,先知道 Broker 怎么處理的就行。


3.4 FilterServerManager#doChannelCloseEvent

過濾服務的 doChannelCloseEvent 就是簡單講過濾服務連接從 filterServerTable 中刪掉。

/*** 刪除過濾服務連接, 直接從 filterServerTable 集合中刪除* @param remoteAddr* @param channel*/
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {FilterServerInfo old = this.filterServerTable.remove(channel);if (old != null) {log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(),remoteAddr);}
}

4. 小結

這篇文章中我們主要是講述了 broker 的 NettyRemotingClient 和 NettyRemotingServer,其中 NettyRemotingServer 重點是講了 ClientHousekeepingServer 這個連接處理類,根據前面的探討我們也知道了 broker 既可以作為服務端和生產者、消費者、消息過濾服務通信,也可以作為客戶端和 NameServer 通信。





如有錯誤,歡迎指出!!!!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/80316.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/80316.shtml
英文地址,請注明出處:http://en.pswp.cn/web/80316.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

C++性能測試工具——AMD CodeAnalyst及其新工具的使用

一、CodeAnalyst及其新的替代工具 與VTune相比&#xff0c;AMD也有自己的性能測試工具&#xff0c;也就是CodeAnalyst。不過目前看&#xff0c;其應該已經有些過時&#xff0c;目前AMD提供了更新的性能測試工具uProf或CodeXL&#xff0c;這些新工具的優點在于對新的硬件架構和…

ProfibusDP主站轉modbusTCP網關與ABB電機保護器數據交互

ProfibusDP主站轉modbusTCP網關與ABB電機保護器數據交互 在工業自動化領域&#xff0c;Profibus DP&#xff08;Process Field Bus&#xff09;和Modbus TCP是兩種常見的通訊協議&#xff0c;它們各自在不同的場合發揮著重要作用。然而&#xff0c;隨著技術的發展和應用需求的…

2025.05.17淘天機考筆試真題第三題

&#x1f4cc; 點擊直達筆試專欄 &#x1f449;《大廠筆試突圍》 &#x1f4bb; 春秋招筆試突圍在線OJ &#x1f449; 筆試突圍OJ 03. 奇偶平衡樹分割問題 問題描述 K小姐是一位園林設計師&#xff0c;她設計了一個由多個花壇組成的樹形公園。每個花壇中種植了不同數量的花…

第三十五節:特征檢測與描述-ORB 特征

1. 引言:為什么需要ORB? 在計算機視覺領域,特征檢測與描述是許多任務(如圖像匹配、目標跟蹤、三維重建等)的核心基礎。傳統的算法如SIFT(尺度不變特征變換)和SURF(加速穩健特征)因其優異的性能被廣泛應用,但它們存在兩個顯著問題: 專利限制:SIFT和SURF受專利保護,…

深入解讀WPDRRC信息安全模型:構建中國特色的信息安全防護體系

目錄 前言1 WPDRRC模型概述2 模型結構詳解2.1 預警&#xff08;Warning&#xff09;2.2 保護&#xff08;Protect&#xff09;2.3 檢測&#xff08;Detect&#xff09;2.4 響應&#xff08;React&#xff09;2.5 恢復&#xff08;Restore&#xff09;2.6 反擊&#xff08;Count…

《算法導論(第4版)》閱讀筆記:p82-p82

《算法導論(第4版)》學習第 17 天&#xff0c;p82-p82 總結&#xff0c;總計 1 頁。 一、技術總結 1. Matrix Matrices(矩陣) (1)教材 因為第 4 章涉及到矩陣&#xff0c;矩陣屬于線性代數(linear algebra)范疇&#xff0c;如果不熟悉&#xff0c;可以看一下作者推薦的兩本…

基于Spring Boot和Vue的在線考試系統架構設計與實現(源碼+論文+部署講解等)

源碼項目獲取聯系 請文末卡片dd我獲取更詳細的演示視頻 系統介紹 基于Spring Boot和Vue的在線考試系統。為學生和教師/管理員提供一個高效、便捷的在線學習、考試及管理平臺。系統采用前后端分離的架構&#xff0c;后端基于成熟穩定的Spring Boot框架&#xff0c;負責數據處理…

Codeforces Round 1024 (Div.2)

比賽鏈接&#xff1a;CF1024 A. Dinner Time 只有當 n n n 是 p p p 的倍數而且 n ? q p ? m \frac{n \cdot q}{p} \not m pn?q?m 時輸出 NO&#xff0c;其余情況均滿足條件。 時間復雜度&#xff1a; O ( 1 ) O(1) O(1)。 #include <bits/stdc.h> using na…

【LeetCode 熱題 100】二叉樹的最大深度 / 翻轉二叉樹 / 二叉樹的直徑 / 驗證二叉搜索樹

??個人主頁&#xff1a;小羊 ??所屬專欄&#xff1a;LeetCode 熱題 100 很榮幸您能閱讀我的文章&#xff0c;誠請評論指點&#xff0c;歡迎歡迎 ~ 目錄 二叉樹的中序遍歷二叉樹的最大深度翻轉二叉樹對稱二叉樹二叉樹的直徑二叉樹的層序遍歷將有序數組轉換為二叉搜索樹驗…

Tomcat發布websocket

一、tomcal的lib放入文件 tomcat-websocket.jar websocket-api.jar 二、代碼示例 package com.test.ws;import com.test.core.json.Jmode;import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.util.concurrent.CopyOnWriteArraySet; imp…

LLM筆記(二)LLM數據基礎-分詞算法(2)

文章目錄 1. 分詞算法概述1.1 基于詞典的&#xff08;或基于規則的&#xff09;分詞算法1.2 基于統計的&#xff08;或基于機器學習的&#xff09;分詞算法1.3 基于深度學習的分詞算法1.4 子詞&#xff08;Subword&#xff09;分詞算法1.5 混合分詞算法1.6 針對不同語言的特點 …

Uniapp開發鴻蒙應用時如何運行和調試項目

經過前幾天的分享&#xff0c;大家應該應該對uniapp開發鴻蒙應用的開發語法有了一定的了解&#xff0c;可以進行一些簡單的應用開發&#xff0c;今天分享一下在使用uniapp開發鴻蒙應用時怎么運行到鴻蒙設備&#xff0c;并且在開發中怎么調試程序。 運行 Uniapp項目支持運行到…

數據湖與數據倉庫融合:Hudi、Iceberg、Delta Lake 實踐對比

在實時與離線一體化的今天,數據湖與數據倉庫邊界不斷融合,越來越多企業選用如 Hudi、Iceberg、Delta Lake 等開源方案實現統一的數據存儲、計算、分析平臺。本篇將圍繞以下關鍵點,展開實戰對比與解決方案分享: ? 實時寫入能力 ? ACID 保證 ? 增量數據處理能力 ? 流批一…

Python爬蟲(29)Python爬蟲高階:動態頁面處理與云原生部署全鏈路實踐(Selenium、Scrapy、K8s)

目錄 引言&#xff1a;動態爬蟲的技術挑戰與云原生機遇一、動態頁面處理&#xff1a;Selenium與Scrapy的協同作戰1.1 Selenium的核心價值與局限1.2 Scrapy-Selenium中間件開發1.3 動態分頁處理實戰&#xff1a;京東商品爬蟲 二、云原生部署&#xff1a;Kubernetes架構設計與優化…

數據結構(十)——排序

一、選擇排序 1.簡單選擇排序 基本思想&#xff1a;假設排序表為[1,…,n]&#xff0c;第i趟排序即從[i,…,n]中選擇關鍵字最小的元素與L[i]交換 eg&#xff1a;給定關鍵字序列{87&#xff0c;45&#xff0c;78&#xff0c;32&#xff0c;17&#xff0c;65&#xff0c;53&…

小結:jvm 類加載過程

類加載過程 是Java虛擬機&#xff08;JVM&#xff09;將字節碼文件&#xff08;.class文件&#xff09;加載到內存中&#xff0c;并轉換為運行時數據結構的過程。這個過程可以分為多個步驟&#xff0c;每個步驟都有其特定的任務和目的。根據你提供的信息&#xff0c;以下是類加…

2024 山東省ccpc省賽

目錄 I&#xff08;簽到&#xff09; 題目簡述&#xff1a; 思路&#xff1a; 代碼&#xff1a; A&#xff08;二分答案&#xff09; 題目簡述&#xff1a; 思路&#xff1a; 代碼&#xff1a; K&#xff08;構造&#xff09; 題目&#xff1a; 思路&#xff1a; 代…

turn.js與 PHP 結合使用來實現 PDF 文件的頁面切換效果

將 Turn.js 與 PHP 結合使用來實現 PDF 文件的頁面切換效果&#xff0c;你需要一個中間步驟將 PDF 轉換為 Turn.js 可以處理的格式&#xff08;如 HTML 頁面或圖片&#xff09;。以下是實現這一功能的步驟和示例代碼&#xff1a; 步驟 1: 安裝必要的庫 首先&#xff0c;你需要…

Python實現NOA星雀優化算法優化卷積神經網絡CNN回歸模型項目實戰

說明&#xff1a;這是一個機器學習實戰項目&#xff08;附帶數據代碼文檔視頻講解&#xff09;&#xff0c;如需數據代碼文檔視頻講解可以直接到文章最后關注獲取。 1.項目背景 在當今數據驅動的時代&#xff0c;卷積神經網絡&#xff08;CNN&#xff09;不僅在圖像分類任務中…

(面試)View相關知識

1、View繪制流程 onMeasure() 確定View的測量寬高。onLayout() 確定View的最終寬高和四個頂點的位置。onDraw() 將View 繪制到屏幕上。 2、MeasureSpec有三種測量模式&#xff1a; 2.1. EXACTLY&#xff08;精確模式&#xff09; 含義&#xff1a;父容器明確指定了子View的精…