文章目錄
- 1. 前言
- 2. BrokerOuterAPI
- 2.1 NettyRemotingClient
- 2.2 start 啟動
- 2.2.1 NettyRemotingClient#start
- 3. NettyRemotingServer
- 3.1 ClientHousekeepingService
- 3.2 ProducerManager#doChannelCloseEvent
- 3.3 ConsumerManager#doChannelCloseEvent
- 3.3.1 DefaultConsumerIdsChangeListener#handle
- 3.3.2 ConsumerFilterManager#unRegister
- 3.3.3 Broker2Client#notifyConsumerIdsChanged
- 3.4 FilterServerManager#doChannelCloseEvent
- 4. 小結
本文章基于 RocketMQ 4.9.3
1. 前言
- 【RocketMQ】- 源碼系列目錄
- 【RocketMQ NameServer】- NameServer 啟動源碼
在前面的 NameServer 啟動源碼,我們探討了 NettyRemotingServer 的啟動源碼,NameServer 作為服務端通過 NettyRemotingServer 處理 Broker 上報的信息,當然了 NettyRemotingServer 不單單是 NameServer 獨有的,Broker 既可以作為服務端和生產者、消費者通信,又可以作為客戶端和 NameServer 通信。
Broker 作為客戶端通信時,將一些方法比如 fetchNameServerAddr 拉取 NameServer 地址、注冊 Broker 信息到 NameServer、取消注冊 Broker 信息都封裝到了 BrokerOuterAPI 中。
2. BrokerOuterAPI
BrokerOuterAPI 在 BrokerController 構造器中被初始化,可以看到傳入的就是 NettyClientConfig,也就是 Netty 客戶端的配置,下面就來看下構造器的初始化。
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {this(nettyClientConfig, null);
}public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {this.remotingClient = new NettyRemotingClient(nettyClientConfig);this.remotingClient.registerRPCHook(rpcHook);
}
可以看到 BrokerOuterAPI 的構造器就是初始化了 NettyRemotingClient 這個客戶端通信類,同時注冊進去的 RPC 鉤子為 null,也就是在跟 NameServer 通信的前后不會調用 RPC 鉤子的前置和后置方法,所以核心還是 NettyRemotingClient。
2.1 NettyRemotingClient
public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {this(nettyClientConfig, null);
}public NettyRemotingClient(final NettyClientConfig nettyClientConfig,final ChannelEventListener channelEventListener) {// 首先設置服務器單向、異步發送請求的信號量, 這個是為了防止同一時間發送太多單向請求或者異步請求, 默認都是 65536super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());// 客戶端配置this.nettyClientConfig = nettyClientConfig;// 連接事件監聽器this.channelEventListener = channelEventListener;// 公開線程池 publicExecutor 的核心線程數, 默認就是 4int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}// 創建一個默認線程數為 4 的線程池, 這個線程池可以用于處理請求回調this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 單線程 I/O 事件處理器(線程數 1)this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));}});if (nettyClientConfig.isUseTLS()) {try {sslContext = TlsHelper.buildSslContext(true);log.info("SSL enabled for client");} catch (IOException e) {log.error("Failed to create SSLContext", e);} catch (CertificateException e) {log.error("Failed to create SSLContext", e);throw new RuntimeException("Failed to create SSLContext", e);}}
}
這里的邏輯跟 NettyRemotingServer 差不多,只是作為客戶端不需要創建 Accept 事件 Reactor 組,也就是 bossGroup。
2.2 start 啟動
創建出 BrokerOuterAPI 之后,會在 BrokerController#start 方法中啟動。
這里面也是啟動了 remotingClient,所以下面就來看下 NettyRemotingClient 的 start 方法。
2.2.1 NettyRemotingClient#start
/*** 客戶端 Netty 服務啟動*/
@Override
public void start() {// 創建默認的事件處理組, 專門用于處理一些執行前的編解碼、連接空閑狀態檢測、網絡連接管理等操作, 不管是接收到消息還是發送消息(入站或者出站處理器)// 都可以通過這個線程池去做處理, 這樣就能避免 I/O 線程浪費資源在這些狀態檢測上, 這里默認創建的是 4 個線程大小的線程池this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {// 創建出來的線程是 NettyClientWorkerThread_0、NettyClientWorkerThread_1 ...return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());}});// I/O 事件處理器(線程數 1)Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)// 禁用 Nagle 算法, Nagle 算法是說當應用程序發送的數據量較小時,TCP 不會立即將這些數據發送出去,而是會等待一段時間,將后續的數據合并// 成一個較大的數據包后再發送。這樣可以減少網絡上的數據包數量,降低網絡擁塞的可能性,提高傳輸效率.option(ChannelOption.TCP_NODELAY, true)// 用于 TCP 連接心跳檢測, 檢測 TCP 連接是否活躍, 這里可能是 Netty 自己已經存在更精細的 IdleStateHandler 處理器就行了.option(ChannelOption.SO_KEEPALIVE, false)// 連接超時時間.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();if (nettyClientConfig.isUseTLS()) {if (null != sslContext) {pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));log.info("Prepend SSL handler");} else {log.warn("Connections are insecure as SSLContext is null!");}}pipeline.addLast(defaultEventExecutorGroup,// 編碼處理器new NettyEncoder(),// 解碼處理器new NettyDecoder(),// 維護心跳連接的處理器, 指定的三個參數是值讀空閑時間、寫空閑時間、總空閑時間, 如果服務端一定時間內沒有讀寫就會出發不同的事件,這三個參數對應的// 事件分別是: IdleStateEvent.READER_IDLE、IdleStateEvent.WRITER_IDLE、IdleStateEvent.ALL_IDLE, 總之這玩意就是用來檢測有沒有長時間不// 讀寫的, 這樣可以判斷一個連接是不是空閑連接new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),// 連接管理器, 主要是管理連接的活躍、非活躍、注冊、取消注冊 ... 事件new NettyConnectManageHandler(),// 客戶端業務處理器new NettyClientHandler());}});// 設置 Netty 客戶端的消息接收緩沖區, 默認是 0if (nettyClientConfig.getClientSocketSndBufSize() > 0) {log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());}// 設置 Netty 客戶端的消息發送緩沖區, 默認也是 0if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());}// 寫緩沖區的低水位和高水位if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));}// 定時任務, 初始化之后 3s 執行, 之后 1s 執行一次this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {// 掃描 responseTable, 將超時的 ResponseFuture 刪掉, 然后執行回調邏輯NettyRemotingClient.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);// 如果連接監聽器設置了, 就啟動 NettyEventExecutor 去處理 NettyEventif (this.channelEventListener != null) {// broker 啟動的時候是不會設置連接監聽器的, 生產者和消費者啟動才會設置this.nettyEventExecutor.start();}
}
這里的方法跟 NettyRemotingServer 也是差不多的,首先是創建 defaultEventExecutorGroup 默認的事件處理組,專門用于處理一些執行前的編解碼、連接空閑狀態檢測、網絡連接管理等操作,不管是接收到消息還是發送消息(入站或者出站處理器) 都可以通過這個線程池去做處理,這樣就能避免 I/O 線程浪費資源在這些狀態檢測上, 這里默認創建的是 4 個線程大小的線程池。
接下來就是創建 Bootstrap,因為客戶端不涉及處理 Accept 連接事件,所以只需要設置從 Reactor 組就可以,不過這里面的 Reactor 線程組大小是 1,所以是單線程處理 IO 讀寫事件?很久沒看了也不是很清楚。
接著往下看,下面 initChannel 方法中往 Channel 的 Pipeline 里面設置的處理器包括編碼處理器,解碼處理器,心跳連接處理器,連接管理器和客戶端業務處理器。參看 NettyRemotingServer 的處理器。
可以看到的是跟 NettyRemotingServer 對比,入站處理器就少了一個 HandshakeHandler,因為不需要處理連接事件,同時最后的 NettyServerHandler 換成了 NettyClientHandler。出站處理器則是一樣的。
最后就是啟動定時任務,初始化之后 3s 執行, 之后 1s 執行一次,掃描 responseTable,將超時的 ResponseFuture 刪掉,然后執行回調邏輯。
由于 BrokerOuterAPI 并沒有設置 channelEventListener,所以就不會啟動 nettyEventExecutor,broker 啟動的時候是不會設置連接監聽器的, 生產者和消費者啟動才會設置,關于 nettyEventExecutor 可以看這篇文章:【RocketMQ NameServer】- NettyEventExecutor 處理 Netty 事件。
3. NettyRemotingServer
在第二小節我們說了,Broker 可以作為服務端跟生產者、消費者、過濾服務通信,也可以作為客戶端向 NameServer 發送請求,作為服務端,Broker 會在 initialize 初始化的時候創建出 fastRemotingServer 和 remotingServer。
fastRemotingServer 和 remotingServer 都是作為 Netty 服務端通信用的,只是一個監聽 10909,一個監聽 10911,fastRemotingServer 所建立的連接通道也是 VIP 通道,VIP 意思就是性能比較高的,一般都是用于內部使用,比如超時同步加鎖請求、獲取配置等等,而 remotingServer 監聽端口 10911,專門處理來自生產者和消費者的請求,比如消息發送、消息拉取,因為生產者和消費者的交互是很頻繁的,所以自熱而然請求的性能就會慢一點。
當然了,創建 NettyRemotingServer 和啟動 NettyRemotingServer 的流程在 NameServer 啟動的文章中已經有講解過源碼,在 NettyRemotingServer 啟動的源碼中,我們也知道了 NettyRemotingServer 啟動的時候會啟動 nettyEventExecutor 線程,專門從 eventQueue 獲取不同的連接事件然后做不同的處理,關于連接事件 NettyEvent,后面我也寫了一篇文章 【RocketMQ NameServer】- NettyEventExecutor 處理 Netty 事件 來講述 NettyEventExecutor 是如何處理 Netty 事件的,NettyEvent 又是怎么來的。
當時寫上面這篇文章的時候可以看到 NettyEventExecutor#run 方法實際上是調用了 BrokerHousekeepingService 去處理連接事件,而 broker 的 remotingServer 看源碼也能看到是傳入了 clientHousekeepingService 去處理生產者和消費者的連接事件的,所以這里我們就來看下 clientHousekeepingService 這個類的邏輯。
3.1 ClientHousekeepingService
其實看名字就能看出來了,NameServer 通過 BrokerHousekeepingService 是專門處理 Broker 的連接事件,而 Broker 通過 ClientHousekeepingService 是專門處理生產者、消費者、過濾服務(已廢棄)的連接通道。
在 【RocketMQ NameServer】- NettyEventExecutor 處理 Netty 事件
這篇文章我們也知道了
- NettyEventExecutor 會調用 ClientHousekeepingService#onChannelIdle 處理連接空閑事件(IDLE)
- NettyEventExecutor 會調用 ClientHousekeepingService#onChannelClose 處理連接關閉事件(CLOSE)
- NettyEventExecutor 會調用 ClientHousekeepingService#onChannelConnect 處理生產者和消費者連接事件(CONNECT)
- NettyEventExecutor 會調用 ClientHousekeepingService#onChannelException 處理連接異常事件(EXCEPTION)
那下面來看下這幾個方法,由于源碼不多,就直接貼出來。
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {}/*** 連接關閉* @param remoteAddr* @param channel*/
@Override
public void onChannelClose(String remoteAddr, Channel channel) {this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}/*** 連接處理發生異常* @param remoteAddr* @param channel*/
@Override
public void onChannelException(String remoteAddr, Channel channel) {this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}/*** 空閑連接處理* @param remoteAddr* @param channel*/
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}
可以看到,onChannelConnect 方法是一個空實現,應該是因為生產者和消費者都會定時上報心跳給 broker,所以連接不需要額外處理。
然后其他三個類型的事件都是調用同一個方法 doChannelCloseEvent 去處理,只是生產者、消費者、過濾服務器的邏輯不同,下面就一個一個來看下這里面的處理邏輯。
3.2 ProducerManager#doChannelCloseEvent
/*** broker 關閉和生產者的連接* @param remoteAddr* @param channel*/
public synchronized void doChannelCloseEvent(final String remoteAddr, final Channel channel) {if (channel != null) {// 遍歷所有生產者連接for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) {// 生產者組final String group = entry.getKey();// 生產者組下面的生產者連接final ConcurrentHashMap<Channel, ClientChannelInfo> clientChannelInfoTable =entry.getValue();// 刪除這個連接final ClientChannelInfo clientChannelInfo =clientChannelInfoTable.remove(channel);if (clientChannelInfo != null) {// 刪除成功之后, 將這個生產者的客戶端 ID 也從 clientChannelTable 集合中刪掉clientChannelTable.remove(clientChannelInfo.getClientId());log.info("NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",clientChannelInfo.toString(), remoteAddr, group);}}}
}
生產者管理類 ProducerManager#groupChannelTable
集合會存儲生產者組下面的所有生產者連接,clientChannelTable
存儲了【客戶端 ID -> 連接】映射關系。
/*** 生產者組 -> 生產者組下面的生產者連接*/
private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable =new ConcurrentHashMap<>();
/*** 客戶端 ID -> 生產者連接*/
private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
上面的 doChannelCloseEvent 方法就是在處理這兩個集合,由于參數只有一個 channel,所以不知道是哪個生產者組下面的,就需要遍歷所有生產者組,一個一個去判斷刪除。
3.3 ConsumerManager#doChannelCloseEvent
/*** 關閉消費者連接* @param remoteAddr* @param channel*/
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {// 遍歷消費者組下面的消費者Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, ConsumerGroupInfo> next = it.next();// 獲取消費者組消息, 里面包括這個消費者組的訂閱信息、連接信息ConsumerGroupInfo info = next.getValue();// 刪除, 因為參數只有連接通道, 不確定是哪個消費者組下面的, 所以會遍歷所有消費組一個一個去判斷刪除boolean removed = info.doChannelCloseEvent(remoteAddr, channel);if (removed) {// 刪除成功了, 判斷下這個消費者組下面還有沒有消費者連接if (info.getChannelInfoTable().isEmpty()) {// 如果沒有了, 把這個消費者組也刪掉ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());if (remove != null) {// 如果刪除成功了log.info("unregister consumer ok, no any connection, and remove consumer group, {}",next.getKey());// 處理消費者組 UNREGISTER 事件, 這里面不回去通知消費者重平衡, 因為這個消費者組下面的消費者都刪掉了this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey());}}// 通知其他所有消費者去重平衡, 但是由于上面 handle 之后沒有返回, 下面這里就會有兩種情況// 1.info.getChannelInfoTable().isEmpty() = true, 這里 getAllChannel 獲取到的就是空集合, CHANGE 事件對于空集合就是不處理// 2.info.getChannelInfoTable().isEmpty() = false, 意思就是只刪除了消費者組下面的一個消費者, 所以就通知其余消費者進行重平衡this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());}}
}/*** 關閉連接* @param remoteAddr* @param channel* @return*/
public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {// 從 channelInfoTable 中刪除消費者連接final ClientChannelInfo info = this.channelInfoTable.remove(channel);if (info != null) {// 刪除成功log.warn("NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}",info.toString(), groupName);return true;}// 刪除失敗, 就是沒找到消費連接return false;
}
消費者組會稍微復雜一點,由于消費者的信息比較多(消費點位、topic 訂閱消息),因此消費者的管理使用了 consumerTable
去管理消費者組下面的消費者信息。
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);public class ConsumerGroupInfo {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);// 消費者組名private final String groupName;// 消費者組下面的消費者的訂閱情況, 不過一個消費者組一般都只會訂閱消費一個 topic 吧private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =new ConcurrentHashMap<String, SubscriptionData>();// 消費者組下面的連接信息private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =new ConcurrentHashMap<Channel, ClientChannelInfo>(16);// 消費者組的消費類型, PULL 或者 PUSHprivate volatile ConsumeType consumeType;// 消費模式, 廣播還是集群private volatile MessageModel messageModel;// 從哪個位置開始消費private volatile ConsumeFromWhere consumeFromWhere;// 上一次上報心跳的事件private volatile long lastUpdateTimestamp = System.currentTimeMillis();...
}
因此這里面的處理邏輯就是遍歷 consumerTable
集合,使用 doChannelCloseEvent
方法去判斷這個消費者組下面的 channelInfoTable
集合是否包含要刪除的 channel
,如果包含就刪掉。刪除成功之后如果這個消費者組已經沒有消費者了,那么這個消費者組也可以刪掉了。
不過要注意一下,消費者涉及到重平衡的邏輯,但是重平衡是針對消費者組下面的消費者對 topic 下面的隊列去重平衡,如果消費者組都沒了就沒必要重平衡,因此可以看到如果說刪除消費者組成功了,通過 handle 處理的是 UNREGISTER 事件。而如果沒有刪除消費者組,只是刪除了消費者,那么通過 handle 處理的是 CHANGE 事件。
3.3.1 DefaultConsumerIdsChangeListener#handle
這個方法就是處理消費者變更的方法,只需要關注 CHANGE 和 UNREGISTER 就行。
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {if (event == null) {return;}switch (event) {case CHANGE:// 消費者變更, 但是消費者組還在, 同時消費者組下面的其余消費者進行重平衡if (args == null || args.length < 1) {return;}// 消費者組下面的消費者連接List<Channel> channels = (List<Channel>) args[0];// 如果參數里面傳入了連接且配置是設置了當消費者變更就通知其他消費者重平衡if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {for (Channel chl : channels) {// 通知消費者重平衡this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);}}break;case UNREGISTER:// 這里就是整個消費者組從 consumerTable 集合中刪掉了, 由于消費者組下面沒有消費者, 因此也不需要重平衡this.brokerController.getConsumerFilterManager().unRegister(group);break;case REGISTER:// 消費者注冊if (args == null || args.length < 1) {return;}Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);break;default:throw new RuntimeException("Unknown event " + event);}
}
3.3.2 ConsumerFilterManager#unRegister
UNREGISTER 就是整個消費者組從 consumerTable 集合中刪掉了, 由于消費者組下面沒有消費者, 因此也不需要重平衡,所以 unRegister
就是簡單處理下消費者組的消費者過濾信息。
/*** 刪除消費者組過濾信息* @param consumerGroup*/
public void unRegister(String consumerGroup) {// 如果不包括, 直接返回if (!this.groupFilterData.containsKey(consumerGroup)) {return;}// 獲取消費者組的過濾信息ConsumerFilterData data = this.groupFilterData.get(consumerGroup);// 如果過濾信息過期了, 就是 dead > bornTimeif (data == null || data.isDead()) {return;}long now = System.currentTimeMillis();log.info("Unregister consumer filter: {}, deadTime: {}", data, now);// 設置刪除時間是當前時間data.setDeadTime(now);
}
當然了這里也是簡單看下里面的邏輯,等到后面講消費者的時候也會講消息過濾這塊的內容。
3.3.3 Broker2Client#notifyConsumerIdsChanged
/*** 通知消費者重平衡* @param channel* @param consumerGroup*/
public void notifyConsumerIdsChanged(final Channel channel,final String consumerGroup) {if (null == consumerGroup) {log.error("notifyConsumerIdsChanged consumerGroup is null");return;}// 構建請求頭NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();requestHeader.setConsumerGroup(consumerGroup);// 請求類型是 NOTIFY_CONSUMER_IDS_CHANGEDRemotingCommand request =RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);try {// 發送 OneWay 請求this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);} catch (Exception e) {log.error("notifyConsumerIdsChanged exception. group={}, error={}", consumerGroup, e.toString());}
}
這里就是直接發送單向請求,請求 Code 為 NOTIFY_CONSUMER_IDS_CHANGED,意思是消費者組消費者發生變化,發送請求進行重平衡。
消費者的重平衡我們后面講消費者的時候再詳細介紹,這里先不說,先知道 Broker 怎么處理的就行。
3.4 FilterServerManager#doChannelCloseEvent
過濾服務的 doChannelCloseEvent 就是簡單講過濾服務連接從 filterServerTable 中刪掉。
/*** 刪除過濾服務連接, 直接從 filterServerTable 集合中刪除* @param remoteAddr* @param channel*/
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {FilterServerInfo old = this.filterServerTable.remove(channel);if (old != null) {log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(),remoteAddr);}
}
4. 小結
這篇文章中我們主要是講述了 broker 的 NettyRemotingClient 和 NettyRemotingServer,其中 NettyRemotingServer 重點是講了 ClientHousekeepingServer 這個連接處理類,根據前面的探討我們也知道了 broker 既可以作為服務端和生產者、消費者、消息過濾服務通信,也可以作為客戶端和 NameServer 通信。
如有錯誤,歡迎指出!!!!