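Outline
1. Some questions about the Nacos configuration center
2. How Nacos integrates with Spring Boot to read remote configuration
3. Source code analysis of how Nacos loads and reads remote configuration data
4. How the client detects changes to remote configuration data
5. How configuration data is synchronized between nodes in a cluster
4. How the client detects changes to remote configuration data
(1) Using the ConfigService object
(2) Source code: how the client registers listeners
(3) Source code: how the listener callback is invoked
(4) Summary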
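(1) Using the ConfigService object
ConfigService is an interface that defines methods for getting, publishing and removing configuration. It has only one implementation, NacosConfigService, and this NacosConfigService object is essentially the core of the client-side source code of the Nacos configuration center.
Step 1: Create a ConfigService object manually
First define the basic Nacos connection information, then use the NacosFactory factory class to create the ConfigService object.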
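import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;

import java.util.Properties;

public class Demo {
    public static void main(String[] args) throws Exception {
        // Step 1: connection information
        String serverAddr = "124.223.102.236:8848";
        String dataId = "stock-service-test.yaml";
        String group = "DEFAULT_GROUP";
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
        // Step 1: create the configuration service client
        ConfigService configService = NacosFactory.createConfigService(properties);
    }
}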
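Step 2: Get and publish configuration
After the ConfigService object is created, call its getConfig() method to fetch configuration and its publishConfig() method to publish configuration.
The demo below fetches the configuration once, publishes a new configuration, and then fetches it again. The second fetch returns the changed data, which confirms that the publish succeeded.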
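import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.ConfigType;

import java.util.Properties;

public class Demo {
    public static void main(String[] args) throws Exception {
        // Step 1: connection information
        String serverAddr = "124.223.102.236:8848";
        String dataId = "stock-service-test.yaml";
        String group = "DEFAULT_GROUP";
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
        // Step 1: create the configuration service client
        ConfigService configService = NacosFactory.createConfigService(properties);
        // Step 2: read the configuration from the config center (5000 ms timeout)
        String content = configService.getConfig(dataId, group, 5000);
        System.out.println("Before publishing: " + content);
        // Step 2: publish a new configuration (the dataId is a YAML file, so publish it as YAML)
        configService.publishConfig(dataId, group, "userName: userName was modified", ConfigType.YAML.getType());
        // Give the server a moment to store the change before reading it back
        Thread.sleep(300L);
        // Step 2: read the configuration again
        content = configService.getConfig(dataId, group, 5000);
        System.out.println("After publishing: " + content);
    }
}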
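Step 3: Add a listener
Use the addListener() method of ConfigService to add a listener. A listener is registered for a dataId + group pair. When the configuration identified by that dataId + group changes on the server, the client's listener is notified promptly and can refresh the configuration data.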
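import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.ConfigType;
import com.alibaba.nacos.api.config.listener.Listener;

import java.util.Properties;
import java.util.concurrent.Executor;

public class Demo {
    public static void main(String[] args) throws Exception {
        // Step 1: connection information
        String serverAddr = "124.223.102.236:8848";
        String dataId = "stock-service-test.yaml";
        String group = "DEFAULT_GROUP";
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
        // Step 1: create the configuration service client
        ConfigService configService = NacosFactory.createConfigService(properties);
        // Step 2: read the configuration from the config center
        String content = configService.getConfig(dataId, group, 5000);
        System.out.println("Before publishing: " + content);
        // Step 2: publish a new configuration
        configService.publishConfig(dataId, group, "userName: userName was modified", ConfigType.YAML.getType());
        Thread.sleep(300L);
        // Step 2: read the configuration again
        content = configService.getConfig(dataId, group, 5000);
        System.out.println("After publishing: " + content);
        // Step 3: register a listener for dataId + group
        configService.addListener(dataId, group, new Listener() {
            @Override
            public void receiveConfigInfo(String configInfo) {
                System.out.println("Configuration change detected: " + configInfo);
            }

            @Override
            public Executor getExecutor() {
                // null: no dedicated executor, the callback runs on a Nacos client thread
                return null;
            }
        });
        // Keep the process alive so the listener can receive changes
        Thread.sleep(Integer.MAX_VALUE);
    }
}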
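(2) Source code: how the client registers listeners
When does the Nacos client register listeners for each dataId + group?
The spring.factories file in nacos-config declares an auto-configuration class, NacosConfigAutoConfiguration. This configuration class defines a NacosContextRefresher bean, and NacosContextRefresher listens for the ApplicationReadyEvent.
NacosContextRefresher's onApplicationEvent() method calls registerNacosListenersForApplications(), which iterates over every dataId + group and registers a Nacos listener for each one.
For each dataId + group, registration goes through registerNacosListener(), which ultimately calls the addListener() method of the ConfigService object.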
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigAutoConfiguration {
    ...
    @Bean
    public NacosContextRefresher nacosContextRefresher(NacosConfigManager nacosConfigManager, NacosRefreshHistory nacosRefreshHistory) {
        return new NacosContextRefresher(nacosConfigManager, nacosRefreshHistory);
    }
    ...
}

public class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware {
    private final ConfigService configService;
    ...

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        // many Spring context
        if (this.ready.compareAndSet(false, true)) {
            this.registerNacosListenersForApplications();
        }
    }

    // register Nacos Listeners.
    private void registerNacosListenersForApplications() {
        if (isRefreshEnabled()) {
            // Get all configurations
            for (NacosPropertySource propertySource : NacosPropertySourceRepository.getAll()) {
                // Skip configurations that are not refreshable
                if (!propertySource.isRefreshable()) {
                    continue;
                }
                String dataId = propertySource.getDataId();
                // Register the listener
                registerNacosListener(propertySource.getGroup(), dataId);
            }
        }
    }

    private void registerNacosListener(final String groupKey, final String dataKey) {
        String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
        Listener listener = listenerMap.computeIfAbsent(key, lst -> new AbstractSharedListener() {
            @Override
            public void innerReceive(String dataId, String group, String configInfo) {
                // Callback logic of the listener
                refreshCountIncrement();
                // Record the refresh history
                nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
                // Publish a RefreshEvent
                applicationContext.publishEvent(new RefreshEvent(this, null, "Refresh Nacos config"));
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Refresh Nacos config group=%s,dataId=%s,configInfo=%s", group, dataId, configInfo));
                }
            }
        });
        try {
            // Register the listener
            configService.addListener(dataKey, groupKey, listener);
        } catch (NacosException e) {
            log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey, groupKey), e);
        }
    }
    ...
}
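The key detail in registerNacosListener() is listenerMap.computeIfAbsent(key, ...). Each dataId + group key maps to exactly one listener, so running the registration again does not create a second listener for the same configuration. Below is a minimal sketch of that pattern in plain Java, with no Nacos or Spring dependency. ConfigListener, ListenerRegistry, the comma-joined key and the fireChange() simulation are all hypothetical, and the refresh log stands in for publishing a RefreshEvent.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the Nacos listener callback (AbstractSharedListener.innerReceive()).
interface ConfigListener {
    void receive(String dataId, String group, String configInfo);
}

class ListenerRegistry {
    // One listener per dataId+group key, like NacosContextRefresher.listenerMap.
    private final Map<String, ConfigListener> listenerMap = new ConcurrentHashMap<>();
    // Simulates what the client has passed to ConfigService.addListener(), per key.
    private final Map<String, CopyOnWriteArrayList<ConfigListener>> registered = new ConcurrentHashMap<>();
    // Records each refresh the callback triggers (stands in for publishing a RefreshEvent).
    final List<String> refreshLog = new CopyOnWriteArrayList<>();

    // Hypothetical key format; Nacos builds its own with NacosPropertySourceRepository.getMapKey().
    static String key(String dataId, String group) {
        return dataId + "," + group;
    }

    void registerListener(String dataId, String group) {
        String key = key(dataId, group);
        // computeIfAbsent returns the existing listener when the key was seen before,
        // so repeated registration reuses one listener object per dataId+group.
        ConfigListener listener = listenerMap.computeIfAbsent(key,
                k -> (d, g, info) -> refreshLog.add("refresh " + d + "@" + g));
        // addIfAbsent keeps the simulated addListener() idempotent for the same object.
        registered.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).addIfAbsent(listener);
    }

    // Simulates the client learning that dataId+group changed on the server.
    void fireChange(String dataId, String group, String content) {
        for (ConfigListener l : registered.getOrDefault(key(dataId, group), new CopyOnWriteArrayList<>())) {
            l.receive(dataId, group, content);
        }
    }

    int listenerCount(String dataId, String group) {
        return registered.getOrDefault(key(dataId, group), new CopyOnWriteArrayList<>()).size();
    }
}

class ListenerRegistryDemo {
    public static void main(String[] args) {
        ListenerRegistry registry = new ListenerRegistry();
        registry.registerListener("stock-service-test.yaml", "DEFAULT_GROUP");
        registry.registerListener("stock-service-test.yaml", "DEFAULT_GROUP"); // reuses the same listener
        registry.fireChange("stock-service-test.yaml", "DEFAULT_GROUP", "userName: changed");
        System.out.println(registry.listenerCount("stock-service-test.yaml", "DEFAULT_GROUP")); // 1
        System.out.println(registry.refreshLog); // [refresh stock-service-test.yaml@DEFAULT_GROUP]
    }
}
```

Keying by dataId + group, rather than by property source object, is what makes the registration safe to repeat. This matters because onApplicationEvent() can fire for several Spring contexts (the `ready` flag in the real code guards against that as well).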
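(3) Source code: how the listener callback is invoked
Once a Nacos listener has been registered for each dataId + group, a change to the configuration file on the Nacos server triggers the listener callback. That callback is the innerReceive() method of the AbstractSharedListener created in registerNacosListener(). It calls applicationContext.publishEvent() to publish a RefreshEvent, and the RefreshEventListener class handles that event.
RefreshEventListener is a Spring Cloud class, not a Nacos one. It handles the refresh event by delegating to Spring Cloud's ContextRefresher, which works in two steps. First it reloads the Environment's property sources. Then it destroys the cached instances of beans whose classes are annotated with @RefreshScope, running their destruction callbacks. A destroyed bean is recreated only when it is next needed, and the new instance is built from the latest configuration, which produces the refresh effect.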
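The destroy-then-recreate behaviour can be illustrated without Spring. A scope keeps a cache of instances, a refresh clears that cache, and the next lookup builds a fresh instance from the current configuration. This is a simplified sketch, not Spring Cloud's RefreshScope. ConfigSource and RefreshableCache are hypothetical names, and the "bean" is just a String built from the config.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical mutable config holder standing in for the Spring Environment.
class ConfigSource {
    private final Map<String, String> props = new ConcurrentHashMap<>();

    String get(String key) { return props.get(key); }

    void put(String key, String value) { props.put(key, value); }
}

// Sketch of refresh-scope semantics: instances are cached until a refresh clears them.
class RefreshableCache {
    private final Map<String, Object> cache = new ConcurrentHashMap<>();
    private final ConfigSource config;
    int creations = 0; // number of instances built so far

    RefreshableCache(ConfigSource config) { this.config = config; }

    // Returns the cached instance, or builds one from the current config on first access.
    @SuppressWarnings("unchecked")
    <T> T get(String name, Function<ConfigSource, T> factory) {
        return (T) cache.computeIfAbsent(name, n -> {
            creations++;
            return factory.apply(config);
        });
    }

    // What a refresh event triggers in this sketch: drop every cached instance.
    void onRefresh() { cache.clear(); }
}

class RefreshDemo {
    public static void main(String[] args) {
        ConfigSource config = new ConfigSource();
        config.put("userName", "alice");
        RefreshableCache scope = new RefreshableCache(config);
        Function<ConfigSource, String> greeter = c -> "hello " + c.get("userName");

        System.out.println(scope.get("greeter", greeter)); // hello alice
        config.put("userName", "bob");                      // remote config changed
        System.out.println(scope.get("greeter", greeter)); // hello alice (still cached)
        scope.onRefresh();                                  // refresh event handled
        System.out.println(scope.get("greeter", greeter)); // hello bob (rebuilt lazily)
    }
}
```

The middle call shows why both steps are needed: updating the configuration alone does not change an instance that was already built.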
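(4) Summary
The flow runs from the client registering Nacos listeners, through the server modifying a configuration, to the client refreshing it:
I. When the application is ready (ApplicationReadyEvent), NacosContextRefresher calls ConfigService.addListener() to register one listener per refreshable dataId + group.
II. When the configuration changes on the server, the client invokes the listener callback, AbstractSharedListener.innerReceive().
III. innerReceive() records the refresh history and publishes a RefreshEvent.
IV. Spring Cloud's RefreshEventListener handles the RefreshEvent. The configuration is reloaded and the @RefreshScope beans are destroyed, so they are recreated with the latest configuration the next time they are used.
5. How configuration data is synchronized between nodes in a cluster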
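(1) The configuration management module of the Nacos console
(2) Source code for changing configuration data
(3) Synchronizing configuration changes between cluster nodes
(4) How the server notifies clients that configuration data has changed
(5) Summary
(1) The configuration management module of the Nacos console
This module offers several functions:
- The configuration list maintains the configuration files.
- The history versions show the publish records of each configuration.
- Rollback returns a configuration to an earlier version.
When a configuration file is edited, clients detect the change promptly and refresh their configuration. Besides notifying clients of the change, the server also notifies the other cluster nodes so they can synchronize the data.
When a user clicks the Publish button in the Nacos console, Nacos roughly does the following:
I. Modify the configuration file data
II. Save the configuration publish history
III. Notify clients and trigger their listener events so they apply the configuration change
IV. Notify the cluster so the other nodes apply the configuration change
Clicking Publish sends an HTTP POST request to "/nacos/v1/cs/configs". From the request path we can tell that the entry point is the publishConfig() method of ConfigController.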
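Because the console's Publish button just sends an HTTP POST to /nacos/v1/cs/configs, you can build the same request by hand. The sketch below uses java.net.http (Java 11+) and sends only the dataId, group, content and type parameters, which are the ones read by ConfigController.publishConfig() below. It assumes authentication is disabled on the server, and the server address is a placeholder. The request is only built, not sent, so the sketch runs without a Nacos server.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class PublishRequestBuilder {
    // Encodes parameters as application/x-www-form-urlencoded, preserving insertion order.
    static String formBody(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                        + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
    }

    // Builds the same POST the console sends when Publish is clicked (no auth token).
    static HttpRequest build(String serverAddr, String dataId, String group, String content, String type) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("dataId", dataId);
        params.put("group", group);
        params.put("content", content);
        params.put("type", type);
        return HttpRequest.newBuilder(URI.create("http://" + serverAddr + "/nacos/v1/cs/configs"))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(formBody(params)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("127.0.0.1:8848", "stock-service-test.yaml", "DEFAULT_GROUP",
                "userName: changed", "yaml");
        System.out.println(request.method() + " " + request.uri());
        // To actually send it: HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    }
}
```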
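(2) Source code for changing configuration data
The publishConfig() method of ConfigController has two core lines:
I. PersistService.insertOrUpdate(), which inserts or updates the configuration data.
II. ConfigChangePublisher.notifyConfigChange(), which publishes the configuration change event.
I. Inserting or updating configuration data
PersistService has two implementations. EmbeddedStoragePersistServiceImpl uses Derby, the database embedded in Nacos. ExternalStoragePersistServiceImpl uses an external database such as MySQL.
ExternalStoragePersistServiceImpl.insertOrUpdate() first tries an insert through addConfigInfo(). If the insert hits a unique-constraint conflict (the configuration already exists), it calls updateConfigInfo() instead, which works in three steps:
- Query the existing configuration.
- Update it.
- Save a record to the configuration history.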
@RestController
@RequestMapping(Constants.CONFIG_CONTROLLER_PATH)
public class ConfigController {
    private final PersistService persistService;
    ...

    @PostMapping
    @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
    public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
            @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
            @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
            @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
            @RequestParam(value = "appName", required = false) String appName,
            @RequestParam(value = "src_user", required = false) String srcUser,
            @RequestParam(value = "config_tags", required = false) String configTags,
            @RequestParam(value = "desc", required = false) String desc,
            @RequestParam(value = "use", required = false) String use,
            @RequestParam(value = "effect", required = false) String effect,
            @RequestParam(value = "type", required = false) String type,
            @RequestParam(value = "schema", required = false) String schema) throws NacosException {
        final String srcIp = RequestUtil.getRemoteIp(request);
        final String requestIpApp = RequestUtil.getAppName(request);
        srcUser = RequestUtil.getSrcUserName(request);
        // check type
        if (!ConfigType.isValidType(type)) {
            type = ConfigType.getDefaultType().getType();
        }
        // check tenant
        ParamUtils.checkTenant(tenant);
        ParamUtils.checkParam(dataId, group, "datumId", content);
        ParamUtils.checkParam(tag);
        Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
        MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);
        MapUtils.putIfValNoNull(configAdvanceInfo, "desc", desc);
        MapUtils.putIfValNoNull(configAdvanceInfo, "use", use);
        MapUtils.putIfValNoNull(configAdvanceInfo, "effect", effect);
        MapUtils.putIfValNoNull(configAdvanceInfo, "type", type);
        MapUtils.putIfValNoNull(configAdvanceInfo, "schema", schema);
        ParamUtils.checkParam(configAdvanceInfo);

        if (AggrWhitelist.isAggrDataId(dataId)) {
            LOGGER.warn("[aggr-conflict] {} attemp to publish single data, {}, {}", RequestUtil.getRemoteIp(request), dataId, group);
            throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
        }

        final Timestamp time = TimeUtils.getCurrentTime();
        String betaIps = request.getHeader("betaIps");
        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
        configInfo.setType(type);
        if (StringUtils.isBlank(betaIps)) {
            if (StringUtils.isBlank(tag)) {
                // Insert or update the configuration
                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
                // Publish the configuration change event
                ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
            } else {
                persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
                // Publish the configuration change event
                ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
            }
        } else {
            // beta publish
            persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, true);
            // Publish the configuration change event
            ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
        }
        ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(), ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
        return true;
    }
    ...
}

// External Storage Persist Service.
@SuppressWarnings(value = {"PMD.MethodReturnWrapperTypeRule", "checkstyle:linelength"})
@Conditional(value = ConditionOnExternalStorage.class)
@Component
public class ExternalStoragePersistServiceImpl implements PersistService {
    private DataSourceService dataSourceService;
    ...

    @Override
    public void insertOrUpdate(String srcIp, String srcUser, ConfigInfo configInfo, Timestamp time, Map<String, Object> configAdvanceInfo, boolean notify) {
        try {
            addConfigInfo(srcIp, srcUser, configInfo, time, configAdvanceInfo, notify);
        } catch (DataIntegrityViolationException ive) { // Unique constraint conflict
            updateConfigInfo(configInfo, srcIp, srcUser, time, configAdvanceInfo, notify);
        }
    }

    @Override
    public void updateConfigInfo(final ConfigInfo configInfo, final String srcIp, final String srcUser, final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {
        boolean result = tjt.execute(status -> {
            try {
                // Query the existing configuration
                ConfigInfo oldConfigInfo = findConfigInfo(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant());
                String appNameTmp = oldConfigInfo.getAppName();
                if (configInfo.getAppName() == null) {
                    configInfo.setAppName(appNameTmp);
                }
                // Update the configuration
                updateConfigInfoAtomic(configInfo, srcIp, srcUser, time, configAdvanceInfo);
                String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");
                if (configTags != null) {
                    // delete all tags and then recreate
                    removeTagByIdAtomic(oldConfigInfo.getId());
                    addConfigTagsRelation(oldConfigInfo.getId(), configTags, configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant());
                }
                // Save the old version to the configuration publish history table
                insertConfigHistoryAtomic(oldConfigInfo.getId(), oldConfigInfo, srcIp, srcUser, time, "U");
            } catch (CannotGetJdbcConnectionException e) {
                LogUtil.FATAL_LOG.error("[db-error] " + e.toString(), e);
                throw e;
            }
            return Boolean.TRUE;
        });
    }

    @Override
    public ConfigInfo findConfigInfo(final String dataId, final String group, final String tenant) {
        final String tenantTmp = StringUtils.isBlank(tenant) ? StringUtils.EMPTY : tenant;
        try {
            return this.jt.queryForObject(
                    "SELECT ID,data_id,group_id,tenant_id,app_name,content,md5,type FROM config_info WHERE data_id=? AND group_id=? AND tenant_id=?",
                    new Object[] {dataId, group, tenantTmp}, CONFIG_INFO_ROW_MAPPER);
        } catch (EmptyResultDataAccessException e) { // Indicates that the data does not exist, returns null.
            return null;
        } catch (CannotGetJdbcConnectionException e) {
            LogUtil.FATAL_LOG.error("[db-error] " + e.toString(), e);
            throw e;
        }
    }
    ...
}
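insertOrUpdate() follows an "insert first, update on unique-key conflict" pattern. It attempts addConfigInfo(), and a DataIntegrityViolationException (the unique-constraint conflict noted in the code) sends execution to updateConfigInfo(). That method also writes the previous version to the history table with operation type "U". Below is a minimal in-memory sketch of this control flow. The map-backed "table", the history list and UniqueKeyViolation (used in place of Spring's exception) are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Spring's DataIntegrityViolationException.
class UniqueKeyViolation extends RuntimeException {
    UniqueKeyViolation(String message) { super(message); }
}

class InMemoryConfigStore {
    // Plays the config_info table; the map key plays the unique key (dataId, group, tenant).
    final Map<String, String> configInfo = new ConcurrentHashMap<>();
    // Plays the history table: "U:" + the content as it was before the update.
    final List<String> history = new ArrayList<>();

    static String key(String dataId, String group, String tenant) {
        return dataId + "|" + group + "|" + tenant;
    }

    // Insert first; fall back to update when the unique key already exists.
    void insertOrUpdate(String dataId, String group, String tenant, String content) {
        try {
            addConfigInfo(dataId, group, tenant, content);
        } catch (UniqueKeyViolation e) {
            updateConfigInfo(dataId, group, tenant, content);
        }
    }

    private void addConfigInfo(String dataId, String group, String tenant, String content) {
        String key = key(dataId, group, tenant);
        if (configInfo.putIfAbsent(key, content) != null) {
            throw new UniqueKeyViolation("duplicate key " + key);
        }
    }

    // Same order as updateConfigInfo(): read the old row, update it, then write history.
    // Not atomic: the real method runs these steps inside tjt.execute(...), a transaction.
    private void updateConfigInfo(String dataId, String group, String tenant, String content) {
        String key = key(dataId, group, tenant);
        String old = configInfo.get(key);
        configInfo.put(key, content);
        history.add("U:" + old);
    }
}

class StoreDemo {
    public static void main(String[] args) {
        InMemoryConfigStore store = new InMemoryConfigStore();
        store.insertOrUpdate("stock-service-test.yaml", "DEFAULT_GROUP", "", "userName: a"); // insert
        store.insertOrUpdate("stock-service-test.yaml", "DEFAULT_GROUP", "", "userName: b"); // update
        System.out.println(store.history); // [U:userName: a]
    }
}
```

Trying the insert first lets the unique index decide whether the row exists. The alternative, a separate "check, then insert" step, could race with a concurrent publish of the same dataId.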
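II. Publishing the configuration change event
When ConfigChangePublisher.notifyConfigChange() publishes the change event, the event ends up in the DefaultPublisher.queue blocking queue, and adding it to that queue completes the publication. As the code below shows, notifyConfigChange() returns without publishing when embedded storage is used in cluster mode.
NotifyCenter's static initializer creates and initializes the shared publisher, DefaultSharePublisher. The DefaultPublisher for a specific event type such as ConfigDataChangeEvent is created when NotifyCenter.registerToPublisher() is called; see the AsyncNotifyService constructor in section (3). DefaultPublisher extends Thread, and its init() method starts that thread. The thread keeps taking events from the blocking queue DefaultPublisher.queue and calls DefaultPublisher.receiveEvent() to process each configuration change event.
DefaultPublisher.receiveEvent() loops over the registered subscribers. If a subscriber ignores expired events, the loop skips it for any event older than the last one processed. For every other subscriber it calls DefaultPublisher.notifySubscriber(). This method wraps subscriber.onEvent() in a Runnable and runs it on the subscriber's own Executor, if the subscriber provides one; otherwise it runs the job directly on the publisher thread.
Two subscribers matter here. AsyncNotifyService notifies the other cluster nodes so they synchronize the data, and LongPollingService notifies clients that a configuration file has changed. Note from their code in sections (3) and (4) that they subscribe to different event types. AsyncNotifyService handles ConfigDataChangeEvent, whereas LongPollingService reacts to LocalDataChangeEvent, so clients are reached through a second event rather than directly by the ConfigDataChangeEvent published here.
In short, the event publishing mechanism works like this: the publisher keeps a Set of registered subscribers, and publishing an event means iterating over that Set and calling each subscriber's event-handling method.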
public class ConfigChangePublisher {
    // Notify ConfigChange.
    public static void notifyConfigChange(ConfigDataChangeEvent event) {
        if (PropertyUtil.isEmbeddedStorage() && !EnvUtil.getStandaloneMode()) {
            return;
        }
        NotifyCenter.publishEvent(event);
    }
}

// Unified Event Notify Center.
public class NotifyCenter {
    static {
        ...
        try {
            // Create and init DefaultSharePublisher instance.
            INSTANCE.sharePublisher = new DefaultSharePublisher();
            INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);
        } catch (Throwable ex) {
            LOGGER.error("Service class newInstance has error : {}", ex);
        }
        ThreadUtils.addShutdownHook(new Runnable() {
            @Override
            public void run() {
                shutdown();
            }
        });
    }

    // Register a subscriber
    public static <T> void registerSubscriber(final Subscriber consumer) {
        ...
        addSubscriber(consumer, subscribeType);
    }

    private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {
        ...
        EventPublisher publisher = INSTANCE.publisherMap.get(topic);
        // Calls DefaultPublisher.addSubscriber()
        publisher.addSubscriber(consumer);
    }
    ...

    // Request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is actually published.
    public static boolean publishEvent(final Event event) {
        try {
            return publishEvent(event.getClass(), event);
        } catch (Throwable ex) {
            LOGGER.error("There was an exception to the message publishing : {}", ex);
            return false;
        }
    }

    // Request publisher publish event Publishers load lazily, calling publisher.
    private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
        if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
            return INSTANCE.sharePublisher.publish(event);
        }
        final String topic = ClassUtils.getCanonicalName(eventType);
        EventPublisher publisher = INSTANCE.publisherMap.get(topic);
        if (publisher != null) {
            // Calls DefaultPublisher.publish()
            return publisher.publish(event);
        }
        LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
        return false;
    }
    ...
}

// The default event publisher implementation.
public class DefaultPublisher extends Thread implements EventPublisher {
    protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<Subscriber>();
    private BlockingQueue<Event> queue;
    ...

    @Override
    public void addSubscriber(Subscriber subscriber) {
        // Register an event subscriber
        subscribers.add(subscriber);
    }

    @Override
    public boolean publish(Event event) {
        checkIsStart();
        // Adding the event to the blocking queue completes the publication
        boolean success = this.queue.offer(event);
        if (!success) {
            LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
            receiveEvent(event);
            return true;
        }
        return true;
    }

    @Override
    public void init(Class<? extends Event> type, int bufferSize) {
        setDaemon(true);
        setName("nacos.publisher-" + type.getName());
        this.eventType = type;
        this.queueMaxSize = bufferSize;
        this.queue = new ArrayBlockingQueue<Event>(bufferSize);
        start();
    }

    @Override
    public synchronized void start() {
        if (!initialized) {
            // Starts the thread, which then executes run(); start just called once
            super.start();
            if (queueMaxSize == -1) {
                queueMaxSize = ringBufferSize;
            }
            initialized = true;
        }
    }

    @Override
    public void run() {
        openEventHandler();
    }

    void openEventHandler() {
        try {
            // This variable is defined to resolve the problem which message overstock in the queue.
            int waitTimes = 60;
            // To ensure that messages are not lost, enable EventHandler when waiting for the first Subscriber to register
            for (; ; ) {
                if (shutdown || hasSubscriber() || waitTimes <= 0) {
                    break;
                }
                ThreadUtils.sleep(1000L);
                waitTimes--;
            }
            for (; ; ) {
                if (shutdown) {
                    break;
                }
                final Event event = queue.take();
                receiveEvent(event);
                UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
            }
        } catch (Throwable ex) {
            LOGGER.error("Event listener exception : {}", ex);
        }
    }

    // Receive and notifySubscriber to process the event.
    void receiveEvent(Event event) {
        final long currentEventSequence = event.sequence();
        // Loop over the event's subscribers
        for (Subscriber subscriber : subscribers) {
            // Whether to ignore expiration events
            if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
                LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", event.getClass());
                continue;
            }
            // Notify the subscriber
            notifySubscriber(subscriber, event);
        }
    }

    @Override
    public void notifySubscriber(final Subscriber subscriber, final Event event) {
        LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
        final Runnable job = new Runnable() {
            @Override
            public void run() {
                // Run the subscriber's listening logic
                subscriber.onEvent(event);
            }
        };
        final Executor executor = subscriber.executor();
        if (executor != null) {
            // Asynchronous: runs on the subscriber's own executor
            executor.execute(job);
        } else {
            try {
                // No executor: runs synchronously on the publisher thread
                job.run();
            } catch (Throwable e) {
                LOGGER.error("Event callback exception : {}", e);
            }
        }
    }
    ...
}
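The summary above (a Set of subscribers, a blocking queue, and a thread that drains the queue and calls every subscriber) can be reduced to a small, dependency-free sketch. It keeps DefaultPublisher's basic shape:
- a daemon thread per publisher;
- offer() into a bounded queue;
- synchronous delivery as the fallback when the queue is full.

It leaves out the 60-second wait for the first subscriber, per-subscriber executors and event sequence checks. SimpleEvent, SimplePublisher and PublisherDemo are hypothetical names.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class SimpleEvent {
    final String groupKey;

    SimpleEvent(String groupKey) { this.groupKey = groupKey; }
}

// One publisher thread per event type, in the spirit of DefaultPublisher.
class SimplePublisher extends Thread {
    private final Set<Consumer<SimpleEvent>> subscribers = ConcurrentHashMap.newKeySet();
    private final BlockingQueue<SimpleEvent> queue;
    private volatile boolean stopped = false;

    SimplePublisher(int bufferSize) {
        this.queue = new ArrayBlockingQueue<>(bufferSize);
        setDaemon(true);
        setName("simple.publisher");
    }

    void addSubscriber(Consumer<SimpleEvent> subscriber) { subscribers.add(subscriber); }

    // Publishing = enqueueing. If the queue is full, deliver on the caller's thread,
    // which is what DefaultPublisher.publish() does when offer() fails.
    void publish(SimpleEvent event) {
        if (!queue.offer(event)) {
            receiveEvent(event);
        }
    }

    @Override
    public void run() {
        try {
            while (!stopped) {
                receiveEvent(queue.take()); // blocks until an event arrives
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdown() interrupts a blocked take()
        }
    }

    // Iterate over the subscriber set and hand the event to each one.
    private void receiveEvent(SimpleEvent event) {
        for (Consumer<SimpleEvent> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    void shutdown() {
        stopped = true;
        interrupt();
    }
}

class PublisherDemo {
    // Publishes one event to two subscribers and waits up to 1 s for both callbacks.
    static List<String> publishAndCollect(String groupKey) {
        SimplePublisher publisher = new SimplePublisher(16);
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        publisher.addSubscriber(e -> { received.add("first:" + e.groupKey); done.countDown(); });
        publisher.addSubscriber(e -> { received.add("second:" + e.groupKey); done.countDown(); });
        publisher.start();
        publisher.publish(new SimpleEvent(groupKey));
        try {
            done.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        } finally {
            publisher.shutdown();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect("stock-service-test.yaml+DEFAULT_GROUP"));
    }
}
```

The subscriber Set is unordered, so the two entries may appear in either order.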
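(3) Synchronizing configuration changes between cluster nodes
The core logic is the onEvent() method of the Subscriber that AsyncNotifyService registers in its constructor. It works in four steps:
- Get the list of cluster nodes.
- Build one NotifySingleTask per node.
- Add these tasks to a queue.
- Wrap the queue in an AsyncTask and submit it to a thread pool.
AsyncTask.run() then processes the NotifySingleTask items.
AsyncTask.run() keeps taking notification tasks from the queue and pushes each configuration change to the corresponding cluster node. Inside the while loop, it first gets the target node's IP address from the task. It then checks that this IP belongs to a known cluster member and whether that member is healthy. If the node is unhealthy, the task goes into a new queue, which is wrapped in another AsyncTask and scheduled with a delay. If the node is healthy, the change is sent with an HTTP GET to "/v1/cs/communication/dataChange". The request does not carry the configuration content; as shown below, the receiving node reloads the content from the database.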
@Service
public class AsyncNotifyService {
    ...

    @Autowired
    public AsyncNotifyService(ServerMemberManager memberManager) {
        this.memberManager = memberManager;
        // Register ConfigDataChangeEvent to NotifyCenter.
        NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
        // Register A Subscriber to subscribe ConfigDataChangeEvent.
        NotifyCenter.registerSubscriber(new Subscriber() {
            @Override
            public void onEvent(Event event) {
                // Configuration data changed: synchronize it to the other cluster nodes
                if (event instanceof ConfigDataChangeEvent) {
                    ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                    long dumpTs = evt.lastModifiedTs;
                    String dataId = evt.dataId;
                    String group = evt.group;
                    String tenant = evt.tenant;
                    String tag = evt.tag;
                    // Get the list of cluster nodes
                    Collection<Member> ipList = memberManager.allMembers();
                    Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
                    // Build one NotifySingleTask per cluster node to synchronize the data
                    for (Member member : ipList) {
                        // Add the NotifySingleTask to the queue
                        queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), evt.isBeta));
                    }
                    // Wrap the Queue<NotifySingleTask> in an AsyncTask and submit it to the thread pool
                    ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
                }
            }

            @Override
            public Class<? extends Event> subscribeType() {
                return ConfigDataChangeEvent.class;
            }
        });
    }
    ...

    class AsyncTask implements Runnable {
        private Queue<NotifySingleTask> queue;
        private NacosAsyncRestTemplate restTemplate;

        public AsyncTask(NacosAsyncRestTemplate restTemplate, Queue<NotifySingleTask> queue) {
            this.restTemplate = restTemplate;
            this.queue = queue;
        }

        @Override
        public void run() {
            executeAsyncInvoke();
        }

        private void executeAsyncInvoke() {
            while (!queue.isEmpty()) {
                // Keep taking notification tasks from the queue and sync each one to its target node
                NotifySingleTask task = queue.poll();
                // Get the IP address of the target cluster node
                String targetIp = task.getTargetIP();
                if (memberManager.hasMember(targetIp)) {
                    // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
                    // Check whether the target node is healthy
                    boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);
                    if (unHealthNeedDelay) {
                        // target ip is unhealthy, then put it in the notification list
                        // Unhealthy: put the task in a new queue and resubmit it as a delayed async task
                        ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                                task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
                                0, task.target);
                        // get delay time and set fail count to the task
                        asyncTaskExecute(task);
                    } else {
                        // Healthy: push the change to the node over HTTP
                        Header header = Header.newInstance();
                        header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
                        header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
                        if (task.isBeta) {
                            header.addParam("isBeta", "true");
                        }
                        AuthHeaderUtil.addIdentityToHeader(header);
                        // HTTP request to /v1/cs/communication/dataChange
                        restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
                    }
                }
            }
        }
    }

    private void asyncTaskExecute(NotifySingleTask task) {
        int delay = getDelayTime(task);
        Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
        queue.add(task);
        AsyncTask asyncTask = new AsyncTask(nacosAsyncRestTemplate, queue);
        // Submit the async task to the thread pool for delayed execution
        ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS);
    }
}
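AsyncTask's loop comes down to three moves: drain a queue of per-member tasks, notify healthy members right away, and re-queue unhealthy members as a delayed task. Below is a minimal sketch of that control flow using a ScheduledExecutorService. The health check is a hypothetical Predicate, appending to a list replaces the HTTP GET to /v1/cs/communication/dataChange, and a fixed retry delay stands in for getDelayTime(), whose logic is not shown here.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

class NotifyTask {
    final String groupKey;
    final String targetIp;

    NotifyTask(String groupKey, String targetIp) {
        this.groupKey = groupKey;
        this.targetIp = targetIp;
    }
}

class ClusterNotifier {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    private final Predicate<String> isUnhealthy; // hypothetical stand-in for memberManager.isUnHealth()
    private final long retryDelayMs;             // fixed delay instead of getDelayTime()
    final List<String> sent = new CopyOnWriteArrayList<>(); // "ip:groupKey" instead of an HTTP GET

    ClusterNotifier(Predicate<String> isUnhealthy, long retryDelayMs) {
        this.isUnhealthy = isUnhealthy;
        this.retryDelayMs = retryDelayMs;
    }

    // Like onEvent() above: one task per member, all drained by a single async job.
    void onConfigChanged(String groupKey, List<String> members) {
        Queue<NotifyTask> queue = new ArrayDeque<>();
        for (String ip : members) {
            queue.add(new NotifyTask(groupKey, ip));
        }
        executor.execute(() -> drain(queue));
    }

    // Like AsyncTask.executeAsyncInvoke().
    private void drain(Queue<NotifyTask> queue) {
        while (!queue.isEmpty()) {
            NotifyTask task = queue.poll();
            if (isUnhealthy.test(task.targetIp)) {
                // Unhealthy: wrap the task in its own queue and retry it after a delay.
                Queue<NotifyTask> retry = new ArrayDeque<>();
                retry.add(task);
                executor.schedule(() -> drain(retry), retryDelayMs, TimeUnit.MILLISECONDS);
            } else {
                sent.add(task.targetIp + ":" + task.groupKey); // healthy: "send" right away
            }
        }
    }

    // Polls until at least n notifications were recorded or the timeout expires.
    boolean awaitSent(int n, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (sent.size() < n && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return sent.size() >= n;
    }

    void shutdown() { executor.shutdownNow(); }
}

class NotifyDemo {
    public static void main(String[] args) {
        Set<String> unhealthy = ConcurrentHashMap.newKeySet();
        unhealthy.add("10.0.0.3:8848");
        ClusterNotifier notifier = new ClusterNotifier(unhealthy::contains, 100);
        notifier.onConfigChanged("stock-service-test.yaml+DEFAULT_GROUP",
                List.of("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848"));
        notifier.awaitSent(2, 1000);       // the two healthy nodes
        unhealthy.remove("10.0.0.3:8848"); // the third node recovers
        notifier.awaitSent(3, 1000);       // a delayed retry reaches it
        System.out.println(notifier.sent);
        notifier.shutdown();
    }
}
```

Re-queueing an unhealthy member as its own delayed task keeps one slow node from holding up notifications to the healthy ones.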
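When a cluster node handles the "/v1/cs/communication/dataChange" HTTP request, it calls the notifyConfigInfo() method of CommunicationController. That method calls DumpService.dump(), which wraps the request into a DumpTask (a data synchronization task), and TaskManager.addTask() then puts the DumpTask into a map, the task pool.
When NacosDelayTaskExecuteEngine, the parent class of TaskManager, is initialized, it schedules ProcessRunnable.run() to run repeatedly at a fixed delay. Each run takes the DumpTask items out of the map and calls DumpProcessor.process() to carry out the synchronization. That is, it queries the latest configuration from the database and persists it to local disk, which completes the synchronization of configuration data across the cluster.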
@RestController
@RequestMapping(Constants.COMMUNICATION_CONTROLLER_PATH)
public class CommunicationController {
    private final DumpService dumpService;
    ...

    @GetMapping("/dataChange")
    public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
            @RequestParam("group") String group,
            @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
            @RequestParam(value = "tag", required = false) String tag) {
        dataId = dataId.trim();
        group = group.trim();
        String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
        long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
        String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
        String isBetaStr = request.getHeader("isBeta");
        if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
            dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
        } else {
            dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
        }
        return true;
    }
    ...
}

public abstract class DumpService {
    private TaskManager dumpTaskMgr;

    public DumpService(PersistService persistService, ServerMemberManager memberManager) {
        ...
        this.processor = new DumpProcessor(this);
        this.dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager");
        this.dumpTaskMgr.setDefaultTaskProcessor(processor);
        ...
    }
    ...

    public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));
    }
    ...
}

public final class TaskManager extends NacosDelayTaskExecuteEngine implements TaskManagerMBean {
    ...

    @Override
    public void addTask(Object key, AbstractDelayTask newTask) {
        super.addTask(key, newTask);
        MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
    }
    ...
}

public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
    // Task pool
    protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;

    public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
        super(logger);
        tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
        processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
        // Schedule the delayed processing task
        processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
    }
    ...

    @Override
    public void addTask(Object key, AbstractDelayTask newTask) {
        lock.lock();
        try {
            AbstractDelayTask existTask = tasks.get(key);
            if (null != existTask) {
                newTask.merge(existTask);
            }
            // Finally put the task into the ConcurrentHashMap
            tasks.put(key, newTask);
        } finally {
            lock.unlock();
        }
    }
    ...

    private class ProcessRunnable implements Runnable {
        @Override
        public void run() {
            try {
                processTasks();
            } catch (Throwable e) {
                getEngineLog().error(e.toString(), e);
            }
        }
    }

    @Override
    public Collection<Object> getAllTaskKeys() {
        Collection<Object> keys = new HashSet<Object>();
        lock.lock();
        try {
            keys.addAll(tasks.keySet());
        } finally {
            lock.unlock();
        }
        return keys;
    }

    protected void processTasks() {
        // Get the keys of all tasks and iterate over them
        Collection<Object> keys = getAllTaskKeys();
        for (Object taskKey : keys) {
            // Get the task by its key and remove it from the task pool
            AbstractDelayTask task = removeTask(taskKey);
            if (null == task) {
                continue;
            }
            // DumpService sets DumpProcessor as TaskManager's default processor during initialization
            // Get the NacosTaskProcessor for taskKey: DumpProcessor
            NacosTaskProcessor processor = getProcessor(taskKey);
            if (null == processor) {
                getEngineLog().error("processor not found for task, so discarded. " + task);
                continue;
            }
            try {
                // ReAdd task if process failed
                // Calls DumpProcessor.process()
                if (!processor.process(task)) {
                    // On failure, put the task back into the tasks map for a retry
                    retryFailedTask(taskKey, task);
                }
            } catch (Throwable e) {
                getEngineLog().error("Nacos task execute error : " + e.toString(), e);
                retryFailedTask(taskKey, task);
            }
        }
    }
}
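NacosDelayTaskExecuteEngine combines three ideas:
- A ConcurrentHashMap task pool keyed by groupKey, so several pending changes to the same configuration collapse into one task.
- A single scheduled thread that drains the pool at a fixed delay.
- Re-adding a task whose processor fails.

Below is a compact sketch of those ideas. DumpLikeTask, its keep-the-newest merge rule and the Predicate-based processor are assumptions, since the real DumpTask.merge() and DumpProcessor are not shown here. ConcurrentHashMap.merge() takes the place of the explicit lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

class DumpLikeTask {
    final String groupKey;
    final long lastModified;

    DumpLikeTask(String groupKey, long lastModified) {
        this.groupKey = groupKey;
        this.lastModified = lastModified;
    }

    // Assumed merge rule for this sketch: keep whichever change is newer.
    DumpLikeTask merge(DumpLikeTask existing) {
        return existing.lastModified > lastModified ? existing : this;
    }
}

class DelayTaskEngine {
    private final Map<String, DumpLikeTask> tasks = new ConcurrentHashMap<>(); // task pool
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Predicate<DumpLikeTask> processor; // returns false when processing failed

    DelayTaskEngine(Predicate<DumpLikeTask> processor) { this.processor = processor; }

    // Started explicitly so the demo can queue tasks first (Nacos schedules in its constructor).
    void start(long intervalMs) {
        executor.scheduleWithFixedDelay(this::processTasks, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    // Like addTask(): a pending task for the same key is merged atomically, not duplicated.
    void addTask(DumpLikeTask newTask) {
        tasks.merge(newTask.groupKey, newTask, (existing, incoming) -> incoming.merge(existing));
    }

    // Like processTasks(): remove each task from the pool, process it, re-add it on failure.
    private void processTasks() {
        for (String key : new ArrayList<>(tasks.keySet())) {
            DumpLikeTask task = tasks.remove(key);
            if (task == null) {
                continue;
            }
            boolean ok;
            try {
                ok = processor.test(task);
            } catch (RuntimeException e) {
                ok = false; // an uncaught exception would cancel the scheduled job
            }
            if (!ok) {
                addTask(task); // retried on the next round, like retryFailedTask()
            }
        }
    }

    void shutdown() { executor.shutdownNow(); }
}

class EngineDemo {
    // Queues three changes (two for key "a"), fails the first attempt for "b" once,
    // and waits up to 2 s until both keys have been processed successfully.
    static List<String> run() {
        List<String> done = new CopyOnWriteArrayList<>();
        Set<String> failedOnce = ConcurrentHashMap.newKeySet();
        DelayTaskEngine engine = new DelayTaskEngine(task -> {
            if (task.groupKey.equals("b") && failedOnce.add("b")) {
                return false; // first attempt for "b" fails and goes back into the pool
            }
            done.add(task.groupKey + "@" + task.lastModified);
            return true;
        });
        engine.addTask(new DumpLikeTask("a", 1));
        engine.addTask(new DumpLikeTask("a", 2)); // merged into the pending "a" task
        engine.addTask(new DumpLikeTask("b", 5));
        engine.start(20);
        long deadline = System.currentTimeMillis() + 2000;
        while (done.size() < 2 && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        engine.shutdown();
        return done;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a@2, b@5]
    }
}
```

Keying the pool by groupKey means a burst of edits to one configuration costs a single dump, and only the latest state is ever written.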
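(4) How the server notifies clients that configuration data has changed
The server notifies clients of configuration file changes in LongPollingService.onEvent().
As described in section 4 (how the client detects changes to remote configuration data), the Nacos client calls ConfigService.addListener() at startup to add a listener for each dataId + group. In addition, NacosConfigService creates a ClientWorker object during initialization. ClientWorker runs checkConfigInfo() every 10 ms, which starts long-polling tasks as needed, that is, executes LongPollingRunnable.run().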
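Two pieces of arithmetic in the ClientWorker code below are easy to miss. First, checkConfigInfo() starts ceil(cacheMap.size() / perTaskConfigSize) LongPollingRunnable tasks, so each task polls for one batch of listened configurations. Second, checkUpdateConfigStr() sets the HTTP read timeout to timeout + (timeout >> 1), which is 1.5 times the long-polling timeout, so the client does not give up before the server answers a held request. Below is a worked sketch. The batch size of 3000 and the timeout of 30000 ms are illustrative values and are not taken from the code shown here.

```java
class LongPollingMath {
    // Mirrors checkConfigInfo(): one long-polling task per batch of listened configs.
    static int longPollingTaskCount(int listenerSize, double perTaskConfigSize) {
        return (int) Math.ceil(listenerSize / perTaskConfigSize);
    }

    // Mirrors checkUpdateConfigStr(): read timeout = timeout + timeout / 2.
    static long readTimeoutMs(long timeout) {
        return timeout + (timeout >> 1);
    }

    public static void main(String[] args) {
        System.out.println(longPollingTaskCount(1, 3000));    // 1
        System.out.println(longPollingTaskCount(3000, 3000)); // 1
        System.out.println(longPollingTaskCount(3001, 3000)); // 2
        System.out.println(readTimeoutMs(30000));             // 45000
    }
}
```

The division has to be done in floating point for Math.ceil() to round up. With two int operands, 3001 / 3000 would already be truncated to 1 before ceil() runs.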
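LongPollingRunnable.run() calls ClientWorker.checkUpdateDataIds(), which ends up calling the server's "/v1/cs/configs/listener" endpoint. The server holds this request and adds the current client to the allSubs property of LongPollingService.
Later, when the configuration of a dataId + group changes, the server runs LongPollingService.onEvent(). It iterates over LongPollingService.allSubs and responds to every client whose request watches that dataId + group, telling it that the configuration has changed.
The response only lists the changed keys. The client then fetches the new content and invokes the listeners registered for those keys. As a result, the latest configuration is refreshed into the container and the beans annotated with @RefreshScope are removed from the cache. The next time these beans are accessed they are recreated, so they read the latest configuration.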
public class NacosConfigService implements ConfigService {
    // long polling.
    private final ClientWorker worker;
    ...

    public NacosConfigService(Properties properties) throws NacosException {
        ...
        this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
    }
    ...
}

// Long polling.
public class ClientWorker implements Closeable {
    public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
        ...
        this.executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }

    // Check config info.
    public void checkConfigInfo() {
        // Dispatch taskes.
        int listenerSize = cacheMap.size();
        // Round up the longingTaskCount.
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // The task list is no order.So it maybe has issues when changing.
                // Run the long-polling task: LongPollingRunnable.run()
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }
    ...

    class LongPollingRunnable implements Runnable {
        ...
        @Override
        public void run() {
            ...
            // check server config
            List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
            ...
        }
    }

    // Fetch the dataId list from server.
    List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {
        ...
        return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
    }

    // Fetch the updated dataId list from server.
    List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
        ...
        try {
            // In order to prevent the server from handling the delay of the client's long task, increase the client's read timeout to avoid this problem.
            long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
            // HTTP request to /v1/cs/configs/listener; the server adds this client to LongPollingService.allSubs
            HttpRestResult<String> result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), readTimeoutMs);
            ...
        } catch (Exception e) {
            ...
        }
        return Collections.emptyList();
    }
}

@RestController
@RequestMapping(Constants.CONFIG_CONTROLLER_PATH)
public class ConfigController {
    private final ConfigServletInner inner;
    ...

    @PostMapping("/listener")
    @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
    public void listener(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        ...
        // do long-polling
        inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
    }
    ...
}

@Service
public class ConfigServletInner {
    ...

    // Polling endpoint.
    public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
        // Long polling.
        if (LongPollingService.isSupportLongPolling(request)) {
            longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
            return HttpServletResponse.SC_OK + "";
        }
        ...
    }
    ...
}

@Service
public class LongPollingService {
    // Long-polling client subscribers
    final Queue<ClientLongPolling> allSubs;
    ...

    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) {
        ...
        // Add the subscriber
        ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    }

    class ClientLongPolling implements Runnable {
        @Override
        public void run() {
            ...
            allSubs.add(this);
        }
        ...
    }
    ...

    public LongPollingService() {
        allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
        ...
        NotifyCenter.registerSubscriber(new Subscriber() {
            @Override
            public void onEvent(Event event) {
                if (isFixedPolling()) {
                    // Ignore.
                } else {
                    if (event instanceof LocalDataChangeEvent) {
                        LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                        // Triggers DataChangeTask.run()
                        ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
                    }
                }
            }
            ...
        });
    }

    class DataChangeTask implements Runnable {
        @Override
        public void run() {
            try {
                ConfigCacheService.getContentBetaMd5(groupKey);
                // Iterate over the clients subscribed to configuration changes
                for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                    ClientLongPolling clientSub = iter.next();
                    if (clientSub.clientMd5Map.containsKey(groupKey)) {
                        ...
                        getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                        iter.remove(); // Delete subscribers' relationships.
                        // Send the data-change response to the client
                        clientSub.sendResponse(Arrays.asList(groupKey));
                    }
                }
            } catch (Throwable t) {
                LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
            }
        }
    }
    ...
}
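On the server side, long polling means "park the request, and answer it when a watched key changes". Below is a minimal sketch of that idea. CompletableFuture stands in for the servlet AsyncContext, a ConcurrentLinkedQueue plays allSubs, and the "dataId+group" strings are illustrative group keys. The timeout path is left out (in the real service, a parked request is also answered when its long-polling timeout expires). HeldRequest and LongPollingSketch are hypothetical names.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// A parked client request: what it watches plus a future standing in for AsyncContext.
class HeldRequest {
    final String ip;
    final Map<String, String> clientMd5Map; // groupKey -> MD5 the client currently holds
    final CompletableFuture<List<String>> response = new CompletableFuture<>();

    HeldRequest(String ip, Map<String, String> clientMd5Map) {
        this.ip = ip;
        this.clientMd5Map = clientMd5Map;
    }
}

class LongPollingSketch {
    final Queue<HeldRequest> allSubs = new ConcurrentLinkedQueue<>();

    // Like addLongPollingClient(): park the request instead of answering it now.
    CompletableFuture<List<String>> addLongPollingClient(String ip, Map<String, String> clientMd5Map) {
        HeldRequest request = new HeldRequest(ip, clientMd5Map);
        allSubs.add(request);
        return request.response;
    }

    // Like DataChangeTask.run(): answer and drop every parked client watching groupKey.
    int onDataChange(String groupKey) {
        int notified = 0;
        for (Iterator<HeldRequest> iter = allSubs.iterator(); iter.hasNext(); ) {
            HeldRequest sub = iter.next();
            if (sub.clientMd5Map.containsKey(groupKey)) {
                iter.remove();
                sub.response.complete(List.of(groupKey)); // the response lists only changed keys
                notified++;
            }
        }
        return notified;
    }

    public static void main(String[] args) {
        LongPollingSketch server = new LongPollingSketch();
        CompletableFuture<List<String>> a = server.addLongPollingClient("10.0.0.8",
                Map.of("stock-service-test.yaml+DEFAULT_GROUP", "md5-1"));
        CompletableFuture<List<String>> b = server.addLongPollingClient("10.0.0.9",
                Map.of("order-service.yaml+DEFAULT_GROUP", "md5-2"));
        System.out.println(server.onDataChange("stock-service-test.yaml+DEFAULT_GROUP")); // 1
        System.out.println(a.getNow(null)); // [stock-service-test.yaml+DEFAULT_GROUP]
        System.out.println(b.isDone());     // false: still parked
    }
}
```

Removing a client from allSubs before responding mirrors iter.remove() in DataChangeTask: each held request is answered at most once, and the client then opens a new long-polling request.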
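(5) Summary
I. Overall logic of synchronizing configuration changes across cluster nodes
When configuration data is changed in the Nacos console, the process runs as follows:
- The node that handles the request writes the latest configuration to the database and records the change history.
- It notifies subscribers through the event publish/subscribe mechanism; the subscriber AsyncNotifyService notifies the other cluster nodes over HTTP.
- Each notified node queries the database again for the latest configuration and persists it to local disk, because the interface that serves configuration reads directly from the disk files.
- Once the cluster nodes have synchronized the configuration, the clients still have to be told that it has changed.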
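The last two points explain why the dump step exists. A node answers reads from its local copy, so a change in the shared database becomes visible on that node only after the node has re-read the row and rewritten its local file. Below is a minimal sketch of this. The directory layout (base/group/dataId), the map standing in for the database and the class LocalConfigDump are assumptions for illustration; they are not the actual storage paths or classes of Nacos.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LocalConfigDump {
    private final Path baseDir;
    private final Map<String, String> database; // stand-in for the shared DB, keyed "group/dataId"

    LocalConfigDump(Path baseDir, Map<String, String> database) {
        this.baseDir = baseDir;
        this.database = database;
    }

    // Creates a node whose local copies live in a fresh temporary directory.
    static LocalConfigDump inTempDir(Map<String, String> database) {
        try {
            return new LocalConfigDump(Files.createTempDirectory("config-data"), database);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private Path file(String dataId, String group) {
        return baseDir.resolve(group).resolve(dataId);
    }

    // On a dataChange notification: reload the latest row from the DB and write it locally.
    void dump(String dataId, String group) {
        Path target = file(dataId, group);
        String latest = database.get(group + "/" + dataId);
        try {
            if (latest == null) {
                Files.deleteIfExists(target); // the configuration was removed
            } else {
                Files.createDirectories(target.getParent());
                Files.write(target, latest.getBytes(StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Read path: serve the local copy; the DB is not consulted.
    String getConfig(String dataId, String group) {
        try {
            return new String(Files.readAllBytes(file(dataId, group)), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> db = new ConcurrentHashMap<>();
        LocalConfigDump node = inTempDir(db);
        db.put("DEFAULT_GROUP/stock-service-test.yaml", "userName: a");
        node.dump("stock-service-test.yaml", "DEFAULT_GROUP");
        db.put("DEFAULT_GROUP/stock-service-test.yaml", "userName: b"); // DB updated, no dump yet
        System.out.println(node.getConfig("stock-service-test.yaml", "DEFAULT_GROUP")); // userName: a
        node.dump("stock-service-test.yaml", "DEFAULT_GROUP");
        System.out.println(node.getConfig("stock-service-test.yaml", "DEFAULT_GROUP")); // userName: b
    }
}
```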
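II. How the server notifies clients that configuration data has changed
After the client adds listeners for dataId + group, it keeps a long-polling request open with the server. The other subscriber, LongPollingService, uses that long polling to notify clients: it iterates over the held client requests and responds to those that watch the changed configuration. This eventually triggers the callback of the client listener, which refreshes the client's configuration beans.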