【踩坑記錄】
本人下載的Nacos 服務端版本是2.3.2,在開始進行源碼編譯便遇到問題,下面是各個問題記錄
源碼大量爆紅
在最開始用Idea加載Maven項目的時候,發現項目中大量的代碼爆紅,提示其類或者包不存在,后來結果查閱資料發現,是Nacos在2.x的版本引入了Grpc通信方式,并且采用Protobuf作為序列化協議。
導致源碼中有一些類是采用其進行編寫的,我們需要對其進行編譯,才能生成對應的包和Class,大家如果在閱讀其他源碼時遇到這種情況,可以向這個方面去想一想。
具體來說有圖中兩個模塊,需要編譯,但是直接在父項目進行Compile即可,編譯后重新刷新項目或者重新構建即可。
斷點失效?
在成功啟動Nacos服務的時候,是采用斷點調試方式啟動的
同時也對其進行設置了單機模式
最后也成功啟動了,就在我創建Demo項目后,進行啟動。
控制臺也有了對應的服務(這里需要提示一下,Nacos的客戶端必須要有 name 這個配置,否則不會注冊
spring:application:name: nacos-democloud:nacos:discovery:server-addr: 127.0.0.1:8841ephemeral: true // 是否臨時注冊,默認為true
通過上一篇客戶端的文章,跨域知道,其底層進行注冊時是調用
this.reqApi(UtilAndComs.nacosUrlInstance, params, "POST");
可以知道,其接口地址是/nacos/v1/ns/instance,于是我興高采烈的去服務的打斷點,如下
然后重新啟動客戶端,奇怪的是,并沒有在斷點處阻塞,而是直接注冊成功了,這我非常蒙,就在絞盡腦汁的時候,突然想到前面不是說了,引入了Grpc嗎,后面一搜,果然,2.x版本默認采用grpc進行交互。那這個坑到這就解決了。
對于Nacos 2.x版本,默認是通過gRPC協議進行通信的
正確的入口在這。
實例注冊源碼分析
1.0 入口
客戶端調用入口,首先通過傳入的元信息,插件Service,然后根據操作類型,進入對應的函數;
2.0注冊流程開始
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)throws NacosException {clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),request.getInstance().getIp(), request.getInstance().getPort()));return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);}
2.1 注冊并發布事件
@Overridepublic void registerInstance(Service service, Instance instance, String clientId) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);// 通過單例模式 + map 獲取對應的服務(沒有就添加到對應的map中)Service singleton = ServiceManager.getInstance().getSingleton(service);if (!singleton.isEphemeral()) {throw new NacosRuntimeException(NacosException.INVALID_PARAM,String.format("Current service %s is persistent service, can't register ephemeral instance.",singleton.getGroupedServiceName()));}// 獲取當前連接ID,也是通過并發map 管理連接,注意不要長時間阻塞斷點,否則連接會斷開,導致拿不到Client client = clientManager.getClient(clientId);checkClientIsLegal(client, clientId);InstancePublishInfo instanceInfo = getPublishInfo(instance);client.addServiceInstance(singleton, instanceInfo);// 設置當前連接最新更新的時間client.setLastUpdatedTime();// 重新設置版本號,用于版本控制client.recalculateRevision();// 發布事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));}
3.0 NotifyCenter - 統一事件通知中心。
/** Copyright 1999-2018 Alibaba Group Holding Ltd.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/import static com.alibaba.nacos.api.exception.NacosException.SERVER_ERROR;
/*** Unified Event Notify Center.* 統一事件通知中心 - 實現了一個事件發布-訂閱框架,用于處理系統內的事件通知** @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>* @author zongtanghu*/
public class NotifyCenter {// 日志記錄器private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class);// 環形緩沖區大小 - 用于普通事件發布器的隊列大小public static int ringBufferSize;// 共享緩沖區大小 - 用于慢速事件共享發布器的隊列大小public static int shareBufferSize;// 通知中心關閉狀態標志 - 使用原子布爾值確保線程安全private static final AtomicBoolean CLOSED = new AtomicBoolean(false);// 默認事件發布器工廠 - 用于創建不同類型的事件發布器private static final EventPublisherFactory DEFAULT_PUBLISHER_FACTORY;// NotifyCenter單例實例 - 通過靜態final保證單例private static final NotifyCenter INSTANCE = new NotifyCenter();// 共享發布器實例 - 用于處理所有SlowEvent慢速事件private DefaultSharePublisher sharePublisher;// 事件發布器類型 - 通過SPI加載或使用默認實現private static Class<? extends EventPublisher> clazz;/*** 發布器管理容器 - 存儲不同事件類型對應的發布器* key: 事件類的規范名稱* value: 對應的事件發布器*/private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);// 靜態初始化塊 - 在類首次加載時執行初始化static {// 從系統屬性讀取環形緩沖區大小,默認為16384// 對于寫入吞吐量高的應用,需要適當增加這個值String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);// 從系統屬性讀取共享緩沖區大小,默認為1024// 用于公共發布器的消息暫存隊列緩沖區大小String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);// 通過SPI機制加載EventPublisher的實現類final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);Iterator<EventPublisher> iterator = publishers.iterator();// 如果找到自定義實現類,使用第一個;否則使用默認實現if (iterator.hasNext()) {clazz = iterator.next().getClass();} else {clazz = DefaultPublisher.class;}// 初始化默認發布器工廠 - 使用lambda表達式實現工廠接口DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {try {// 創建發布器實例并初始化EventPublisher publisher = clazz.newInstance();publisher.init(cls, buffer);return publisher;} catch (Throwable ex) {LOGGER.error("Service class newInstance has error : ", ex);throw new NacosRuntimeException(SERVER_ERROR, ex);}};try {// 創建并初始化共享發布器實例 - 用于處理所有SlowEventINSTANCE.sharePublisher = new DefaultSharePublisher();INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);} catch (Throwable ex) {LOGGER.error("Service class newInstance has error : ", ex);}// 添加JVM關閉鉤子,確保系統關閉時能夠正確釋放資源ThreadUtils.addShutdownHook(NotifyCenter::shutdown);}/*** 獲取發布器映射表 - 僅用于測試** @return 發布器映射表*/@JustForTestpublic static Map<String, EventPublisher> getPublisherMap() {return INSTANCE.publisherMap;}/*** 根據事件類型獲取對應的發布器** @param topic 事件類型* @return 對應的事件發布器*/public static EventPublisher getPublisher(Class<? extends Event> topic) {// 如果是SlowEvent類型,使用共享發布器if (ClassUtils.isAssignableFrom(SlowEvent.class, topic)) {return INSTANCE.sharePublisher;}// 否則從映射表中獲取對應的發布器return INSTANCE.publisherMap.get(topic.getCanonicalName());}/*** 獲取共享發布器實例** @return 共享發布器實例*/public static EventPublisher getSharePublisher() {return INSTANCE.sharePublisher;}/*** 關閉通知中心及其包含的所有發布器實例* 使用CAS操作確保只執行一次*/public static void shutdown() {// 如果已經關閉,則直接返回,避免重復關閉if (!CLOSED.compareAndSet(false, true)) {return;}LOGGER.warn("[NotifyCenter] Start destroying Publisher");// 關閉所有普通發布器for (Map.Entry<String, EventPublisher> entry : INSTANCE.publisherMap.entrySet()) {try {EventPublisher eventPublisher = entry.getValue();eventPublisher.shutdown();} catch (Throwable e) {LOGGER.error("[EventPublisher] shutdown has error : ", e);}}// 關閉共享發布器try {INSTANCE.sharePublisher.shutdown();} catch (Throwable e) {LOGGER.error("[SharePublisher] shutdown has error : ", e);}LOGGER.warn("[NotifyCenter] Destruction of the end");}/*** 注冊訂閱者(使用默認發布器工廠)* 如果發布器不存在,會使用默認工廠創建一個新的發布器** @param consumer 訂閱者實例*/public static void registerSubscriber(final Subscriber consumer) {registerSubscriber(consumer, DEFAULT_PUBLISHER_FACTORY);}/*** 使用指定工廠注冊訂閱者* 如果發布器不存在,會使用指定工廠創建一個新的發布器** @param consumer 訂閱者實例* @param factory 發布器工廠*/public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {// 處理智能訂閱者 - 可以訂閱多種事件類型// 如果要監聽多個事件,需要分別進行處理// 基于子類的subscribeTypes方法返回的列表,可以注冊到發布器if (consumer instanceof SmartSubscriber) {for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {// 對于慢速事件,注冊到共享發布器if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);} else {// 對于普通事件,注冊到對應的發布器addSubscriber(consumer, subscribeType, factory);}}return;}// 處理普通訂閱者 - 只訂閱一種事件類型final Class<? extends Event> subscribeType = consumer.subscribeType();// 如果是慢速事件,注冊到共享發布器if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);return;}// 對于普通事件,注冊到對應的發布器addSubscriber(consumer, subscribeType, factory);}/*** 將訂閱者添加到發布器中* 如果發布器不存在,會先創建發布器** @param consumer 訂閱者實例* @param subscribeType 訂閱的事件類型* @param factory 發布器工廠*/private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,EventPublisherFactory factory) {// 獲取事件類型的規范名稱作為topicfinal String topic = ClassUtils.getCanonicalName(subscribeType);synchronized (NotifyCenter.class) {// 確保發布器存在,如果不存在則創建// 注釋說MapUtils.computeIfAbsent是不安全的方法,這里使用自定義的MapUtilMapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);}// 獲取發布器并添加訂閱者EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher instanceof ShardedEventPublisher) {// 如果是分片發布器,需要傳入訂閱類型((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);} else {// 普通發布器直接添加訂閱者publisher.addSubscriber(consumer);}}/*** 取消訂閱者的注冊** @param consumer 訂閱者實例* @throws NoSuchElementException 如果訂閱者沒有對應的發布器*/public static void deregisterSubscriber(final Subscriber consumer) {// 處理智能訂閱者 - 需要逐一取消多個事件類型的訂閱if (consumer instanceof SmartSubscriber) {for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {// 從共享發布器中移除訂閱INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);} else {// 從普通發布器中移除訂閱removeSubscriber(consumer, subscribeType);}}return;}// 處理普通訂閱者 - 只有一個訂閱類型final Class<? extends Event> subscribeType = consumer.subscribeType();if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {// 從共享發布器中移除訂閱INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);return;}// 從普通發布器中移除訂閱,如果移除失敗則拋出異常if (removeSubscriber(consumer, subscribeType)) {return;}throw new NoSuchElementException("The subscriber has no event publisher");}/*** 從發布器中移除訂閱者** @param consumer 訂閱者實例* @param subscribeType 訂閱的事件類型* @return 移除是否成功*/private static boolean removeSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {// 獲取事件類型的規范名稱作為topicfinal String topic = ClassUtils.getCanonicalName(subscribeType);// 查找對應的發布器EventPublisher eventPublisher = INSTANCE.publisherMap.get(topic);if (null == eventPublisher) {return false;}// 根據發布器類型調用不同的移除方法if (eventPublisher instanceof ShardedEventPublisher) {((ShardedEventPublisher) eventPublisher).removeSubscriber(consumer, subscribeType);} else {eventPublisher.removeSubscriber(consumer);}return true;}/*** 請求發布器發布事件* 發布器采用懶加載模式,只有在實際發布事件時才會調用publisher.start()** @param event 事件實例* @return 發布是否成功*/public static boolean publishEvent(final Event event) {try {// 通過事件實例獲取事件類型,并調用內部發布方法return publishEvent(event.getClass(), event);} catch (Throwable ex) {// 捕獲所有異常,確保不影響調用方LOGGER.error("There was an exception to the message publishing : ", ex);return false;}}/*** 請求發布器發布事件的內部實現** @param eventType 事件類型* @param event 事件實例* @return 發布是否成功*/private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {// 如果是慢速事件,使用共享發布器發布if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher.publish(event);}// 獲取事件類型的規范名稱作為topicfinal String topic = ClassUtils.getCanonicalName(eventType);// 查找對應的發布器并發布事件EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {return publisher.publish(event);}// 對于插件事件,允許沒有對應發布器if (event.isPluginEvent()) {return true;}// 找不到發布器,記錄警告日志LOGGER.warn("There are no [{}] publishers for this event, please register", topic);return false;}/*** 注冊到共享發布器* 用于將慢速事件類型注冊到共享發布器** @param eventType 慢速事件類型* @return 共享發布器實例*/public static EventPublisher registerToSharePublisher(final Class<? extends SlowEvent> eventType) {return INSTANCE.sharePublisher;}/*** 使用默認工廠注冊發布器** @param eventType 事件類型* @param queueMaxSize 發布器隊列最大大小* @return 注冊的發布器實例*/public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {return registerToPublisher(eventType, DEFAULT_PUBLISHER_FACTORY, queueMaxSize);}/*** 使用指定工廠注冊發布器** @param eventType 事件類型* @param factory 發布器工廠* @param queueMaxSize 發布器隊列最大大小* @return 注冊的發布器實例*/public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,final EventPublisherFactory factory, final int queueMaxSize) {// 如果是慢速事件,直接返回共享發布器if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher;}// 獲取事件類型的規范名稱作為topicfinal String topic = ClassUtils.getCanonicalName(eventType);synchronized (NotifyCenter.class) {// 確保發布器存在,如果不存在則創建// 注釋說MapUtils.computeIfAbsent是不安全的方法,這里使用自定義的MapUtilMapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);}return INSTANCE.publisherMap.get(topic);}/*** 注冊指定的發布器* 允許用戶提供自定義的發布器實例** @param eventType 事件類型* @param publisher 指定的事件發布器*/public static void registerToPublisher(final Class<? extends Event> eventType, final EventPublisher publisher) {// 空檢查,避免空指針異常if (null == publisher) {return;}// 獲取事件類型的規范名稱作為topicfinal String topic = ClassUtils.getCanonicalName(eventType);synchronized (NotifyCenter.class) {// 只有在不存在時才放入,避免覆蓋已有發布器INSTANCE.publisherMap.putIfAbsent(topic, publisher);}}/*** 取消注冊發布器* 將指定事件類型的發布器從管理容器中移除并關閉** @param eventType 事件類型*/public static void deregisterPublisher(final Class<? extends Event> eventType) {// 獲取事件類型的規范名稱作為topicfinal String topic = ClassUtils.getCanonicalName(eventType);// 從管理容器中移除發布器EventPublisher publisher = INSTANCE.publisherMap.remove(topic);try {// 關閉發布器,釋放資源publisher.shutdown();} catch (Throwable ex) {LOGGER.error("There was an exception when publisher shutdown : ", ex);}}}
Nacos NotifyCenter 核心設計與功能
一、整體架構
NotifyCenter 是 Nacos 中的統一事件通知中心,實現了一個高效的事件發布-訂閱框架,用于系統內部組件間的解耦通信。它采用單例模式,集中管理事件的發布和訂閱,支持高吞吐量的事件處理。
┌────────────────────────────────────────────────────────┐
│ NotifyCenter │
├────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌───────────┐ │
│ │ 事件發布管理 │ │ 訂閱者管理 │ │ 資源管理 │ │
│ └─────────────┘ └─────────────┘ └───────────┘ │
│ │
└────────────────────────────────────────────────────────┘▲ ▲ ▲│ │ │
┌─────────┴──────┐ ┌────────┴────────┐ ┌────┴─────┐
│ Publisher │ │ Subscriber │ │ Event │
└────────────────┘ └─────────────────┘ └──────────┘
二、核心設計特點
1. 雙類型事件機制
NotifyCenter 將事件分為兩類,并采用不同的處理策略:
- 普通事件:每種事件類型對應一個專用發布器,存儲在
publisherMap
中 - 慢速事件(SlowEvent):所有慢速事件共享一個發布器
sharePublisher
,避免阻塞普通事件處理
// 決定事件路由到哪個發布器
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher.publish(event);
} else {EventPublisher publisher = INSTANCE.publisherMap.get(topic);return publisher.publish(event);
}
2. 延遲加載與按需創建
發布器采用懶加載模式,僅在需要時才創建,節約系統資源:
// 發布器不存在時才創建
synchronized (NotifyCenter.class) {MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, ringBufferSize);
}
3. 可配置的緩沖區大小
通過系統屬性支持自定義緩沖區大小,適應不同場景的性能需求:
// 讀取系統配置設置緩沖區大小
String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";
ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";
shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);
4. SPI 擴展機制
使用 Java SPI 機制支持自定義發布器實現,提高框架擴展性:
// 通過SPI加載自定義發布器實現
final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);
5. 線程安全設計
使用并發工具和同步機制確保多線程環境下的正確性:
- 采用
ConcurrentHashMap
存儲發布器 - 使用
synchronized
塊保護關鍵操作 - 使用
AtomicBoolean
確保關閉操作僅執行一次
三、核心功能模塊
1. 訂閱者管理
支持注冊和注銷訂閱者,支持兩類訂閱者:
- 普通訂閱者(Subscriber):只訂閱一種事件類型
- 智能訂閱者(SmartSubscriber):可同時訂閱多種事件類型
public static void registerSubscriber(final Subscriber consumer) {// 針對SmartSubscriber,遍歷其所有訂閱類型if (consumer instanceof SmartSubscriber) {for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {// 分別注冊每種事件類型// ...}} else {// 針對普通Subscriber處理單一事件類型// ...}
}
2. 發布器管理
維護事件類型到發布器的映射,支持注冊、獲取和注銷發布器:
// 發布器容器
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);// 注冊發布器
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {// ...
}// 獲取發布器
public static EventPublisher getPublisher(Class<? extends Event> topic) {// ...
}// 注銷發布器
public static void deregisterPublisher(final Class<? extends Event> eventType) {// ...
}
3. 事件發布
提供統一的事件發布入口,根據事件類型路由到對應發布器:
public static boolean publishEvent(final Event event) {try {return publishEvent(event.getClass(), event);} catch (Throwable ex) {LOGGER.error("There was an exception to the message publishing : ", ex);return false;}
}
4. 資源管理
提供優雅的資源釋放機制,確保系統關閉時能正確清理資源:
public static void shutdown() {if (!CLOSED.compareAndSet(false, true)) {return;}// 關閉所有普通發布器for (EventPublisher publisher : publisherMap.values()) {publisher.shutdown();}// 關閉共享發布器sharePublisher.shutdown();
}
四、使用流程示例
1. 定義事件
// 普通事件
public class ConfigChangeEvent extends Event {// 事件內容
}// 慢速事件
public class ServiceChangeEvent extends SlowEvent {// 事件內容
}
2. 創建訂閱者
// 普通訂閱者
public class ConfigChangeListener implements Subscriber<ConfigChangeEvent> {@Overridepublic void onEvent(ConfigChangeEvent event) {// 處理事件}@Overridepublic Class<? extends Event> subscribeType() {return ConfigChangeEvent.class;}
}
3. 注冊訂閱者
NotifyCenter.registerSubscriber(new ConfigChangeListener());
4. 發布事件
ConfigChangeEvent event = new ConfigChangeEvent();
NotifyCenter.publishEvent(event);
五、設計優勢
- 高性能:通過區分快慢事件、使用環形緩沖區提高事件處理吞吐量
- 高可靠:完善的異常處理和資源管理,確保系統穩定性
- 良好擴展性:通過SPI機制支持定制發布器實現
- 靈活配置:可通過系統屬性調整性能參數
- 解耦設計:發布者和訂閱者完全解耦,提高系統模塊化程度
NotifyCenter 作為 Nacos 中的核心基礎設施,為整個系統提供了高效可靠的事件通知機制,是理解 Nacos 內部通信機制的關鍵組件。