大綱
7.服務端對服務實例進行健康檢查
8.服務下線如何注銷注冊表和客戶端等信息
9.事件驅動架構源碼分析
一.處理ClientChangedEvent事件
也就是同步數據到集群節點:
public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {...@Overridepublic void onEvent(Event event) {...if (event instanceof ClientEvent.ClientVerifyFailedEvent) {syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);} else {syncToAllServer((ClientEvent) event);}}private void syncToAllServer(ClientEvent event) {Client client = event.getClient();//Only ephemeral data sync by Distro, persist client should sync by raft.//臨時實例使用Distro協議,持久化實例使用Raft協議//ClientManager.isResponsibleClient()方法,判斷只有該client的責任節點才能進行集群數據同步if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {//如果event是客戶端注銷實例時需要進行集群節點同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {//如果event是客戶端注冊實例時需要進行集群節點同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}}...
}@Component
public class DistroProtocol {private final ServerMemberManager memberManager;private final DistroTaskEngineHolder distroTaskEngineHolder;...//Start to sync by configured delay.public void sync(DistroKey distroKey, DataOperation action) {sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());}//Start to sync data to all remote server.public void sync(DistroKey distroKey, DataOperation action, long delay) {//遍歷集群中除自身節點外的其他節點for (Member each : memberManager.allMembersWithoutSelf()) {syncToTarget(distroKey, action, each.getAddress(), delay);}}//Start to sync to target server.public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {//先把要同步的集群節點targetServer包裝成DistroKey對象DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), targetServer);//然后根據DistroKey對象創建DistroDelayTask任務DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);//接著調用NacosDelayTaskExecuteEngine.addTask()方法//往延遲任務執行引擎DistroDelayTaskExecuteEngine中添加延遲任務DistroDelayTaskdistroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);}}...
}
二.處理ClientDeregisterServiceEvent事件
也就是移除注冊表 + 訂閱表的服務實例:
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {//注冊表(服務提供者),一個Service服務對象,對應多個服務實例的clientIdprivate final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();//訂閱者列表(服務消費者),一個Service服務對象,對應多個訂閱者的clientIdprivate final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();...@Overridepublic void onEvent(Event event) {if (event instanceof ClientEvent.ClientDisconnectEvent) {handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);} else if (event instanceof ClientOperationEvent) {handleClientOperation((ClientOperationEvent) event);}}private void handleClientOperation(ClientOperationEvent event) {Service service = event.getService();String clientId = event.getClientId();if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {//處理客戶端注冊事件ClientRegisterServiceEventaddPublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {//處理客戶端注銷事件ClientDeregisterServiceEventremovePublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {//處理客戶端訂閱服務事件ClientSubscribeServiceEventaddSubscriberIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {//處理客戶端取消訂閱事件ClientUnsubscribeServiceEventremoveSubscriberIndexes(service, clientId);}}private void removePublisherIndexes(Service service, String clientId) {if (!publisherIndexes.containsKey(service)) {return;}//移除注冊表中的服務實例publisherIndexes.get(service).remove(clientId);//發布服務改變事件ServiceChangedEventNotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));}...
}
三.處理ServiceChangeEvent事件
也就是通知訂閱了該服務的客戶端:
@org.springframework.stereotype.Service
public class NamingSubscriberServiceV2Impl extends SmartSubscriber implements NamingSubscriberService {...@Overridepublic void onEvent(Event event) {if (!upgradeJudgement.isUseGrpcFeatures()) {return;}if (event instanceof ServiceEvent.ServiceChangedEvent) {//If service changed, push to all subscribers.//如果服務變動,會向Service服務的所有訂閱者推送Service服務的實例信息,讓訂閱者(客戶端)更新本地緩存ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;Service service = serviceChangedEvent.getService();//調用NacosDelayTaskExecuteEngine.addTask()方法,往延遲任務執行引擎添加任務delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {//If service is subscribed by one client, only push this client.//如果Service服務被一個客戶端訂閱,則只推送Service服務的實例信息給該客戶端ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;Service service = subscribedEvent.getService();//調用NacosDelayTaskExecuteEngine.addTask()方法,往延遲任務執行引擎添加任務delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(), subscribedEvent.getClientId()));}}...
}
(3)總結
9.事件驅動架構源碼分析
(1)如何使用Nacos的事件發布
(2)Nacos通知中心的事件發布源碼
(3)Nacos通知中心注冊訂閱者的源碼
Nacos 2.x大量使用了事件發布的動作,比如客戶端注冊服務實例、客戶端下線服務實例、服務改變、服務訂閱等。
(1)如何使用Nacos的事件發布
一.首先自定義一個事件
下面定義了一個名為TestEvent的事件,繼承自Nacos的Event類。
import com.alibaba.nacos.common.notify.Event;public class TestEvent extends Event {}
二.然后定義一個訂閱者
有了事件之后,還需要一個訂閱者,這樣發布的事件才能被這個訂閱者進行處理。
自定義的訂閱者需要繼承Nacos的SmartSubscriber抽象類,自定義的訂閱者需要實現三個方法。
方法一:構造方法
需要將自定義的訂閱者注冊到Nacos的通知中心NotifyCenter里,這樣NotifyCenter在發布自定義事件時,才能讓自定義的訂閱者進行響應。
方法二:subscribeTypes()方法
實現該方法時,需要把自定義的事件添加到方法的返回結果中,所以可以通過該方法獲取自定義訂閱者監聽了哪些事件。
方法三:onEvent()方法
Nacos的通知中心NotifyCenter在發布自定義事件時,便會調用該方法,所以該方法中需要實現自定義訂閱者對自定義事件的處理。
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.SmartSubscriber;
import org.springframework.stereotype.Component;
import java.util.LinkedList;
import java.util.List;//自定義的訂閱者需要繼承Nacos的SmartSubscriber抽象類
@Component
public class TestSubscriber extends SmartSubscriber {//構造方法中需要將自定義的訂閱者TestSubscriber注冊到Nacos的通知中心NotifyCenterpublic TestSubscriber() {NotifyCenter.registerSubscriber(this);}//實現subscribeTypes()方法時,把自定義的事件TestEvent添加進去返回@Overridepublic List<Class<? extends Event>> subscribeTypes() {List<Class<? extends Event>> result = new LinkedList<>();result.add(TestEvent.class);return result;}//實現onEvent()方法//當Nacos的通知中心NotifyCenter發布一個TestEvent事件時,就會響應該方法處理訂閱者的邏輯@Overridepublic void onEvent(Event event) {System.out.println("TestSubscriber onEvent");}
}
三.最后通過Nacos的通知中心NotifyCenter發布自定義事件
這樣便完成了自定義事件、自定義訂閱者通過Nacos實現發布訂閱功能。
@RestController
@RequestMapping("/sub/")
public class SubscriberController {@GetMapping("/test")public void test() {NotifyCenter.publishEvent(new TestEvent()); }
}
(2)Nacos通知中心的事件發布源碼
通知中心NotifyCenter執行publishEvent()方法發布事件時,比如會調用DefaultPublisher的publish()方法來發布事件。
DefaultPublisher的publish()方法會先把事件放入到一個阻塞隊列queue中,而在DefaultPublisher創建時會啟動一個線程從阻塞隊列取出事件來處理。處理時就會調用到DefaultPublisher的receiveEvent()方法通知事件訂閱者,也就是執行DefaultPublisher的notifySubscriber()方法通知事件訂閱者。
在DefaultPublisher的notifySubscriber()方法中,首先會創建一個調用訂閱者的onEvent()方法的任務,然后如果訂閱者有線程池,則將任務提交給訂閱者的線程池去執行。如果訂閱者沒有線程池,則直接執行該任務。
可見事件的發布也使用了阻塞隊列 + 異步任務,來實現對訂閱者的通知。
public class NotifyCenter {private static final NotifyCenter INSTANCE = new NotifyCenter();//key是事件Class的canonicalName,value是EventPublisher對象,一個事件對應一個EventPublisher對象//在EventPublisher對象中就會包含訂閱了該事件的所有訂閱者//EventPublisher的實現類有DefaultPublisher、NamingEventPublisherprivate final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);...//Request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is actually published.public static boolean publishEvent(final Event event) {try {return publishEvent(event.getClass(), event);} catch (Throwable ex) {LOGGER.error("There was an exception to the message publishing : ", ex);return false;}}//Request publisher publish event Publishers load lazily, calling publisher.private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher.publish(event);}//獲取發布的事件的Class的canonicalNamefinal String topic = ClassUtils.getCanonicalName(eventType);//根據發布事件類型獲取EventPublisher對象,該對象中會包含所發布事件的所有訂閱者信息EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {//比如調用DefaultPublisher.publish()方法發布事件return publisher.publish(event);}LOGGER.warn("There are no [{}] publishers for this event, please register", topic);return false;}...
}//The default event publisher implementation.
//一個事件只會對應一個DefaultPublisher
public class DefaultPublisher extends Thread implements EventPublisher {private Class<? extends Event> eventType;//阻塞隊列存放待發布的事件private BlockingQueue<Event> queue;//Class為eventType的事件的所有訂閱者protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<>();@Overridepublic void init(Class<? extends Event> type, int bufferSize) {...start();}@Overridepublic synchronized void start() {if (!initialized) {super.start();...}}@Overridepublic void run() {openEventHandler();}void openEventHandler() {try {...for (; ;) {...//從阻塞隊列取數據final Event event = queue.take();//處理事件receiveEvent(event);...}} catch (Throwable ex) {LOGGER.error("Event listener exception : ", ex);}}...@Overridepublic boolean publish(Event event) {checkIsStart();//把事件放入到了一個阻塞隊列queue中,由DefaultPublisher創建時啟動的線程來處理boolean success = this.queue.offer(event);if (!success) {//如果事件放入阻塞隊列失敗,則直接處理LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);//通知事件的訂閱者去進行事件處理receiveEvent(event);return true;}return true;}//通知事件的訂閱者去進行事件處理void receiveEvent(Event event) {...//遍歷當前事件的訂閱者,對訂閱者執行notifySubscriber()方法,實際上就是執行訂閱者的onEvent()方法for (Subscriber subscriber : subscribers) {...//觸發執行訂閱者的onEvent()方法,實現對訂閱者的通知notifySubscriber(subscriber, event);}}@Overridepublic void notifySubscriber(final Subscriber subscriber, final Event event) {//創建一個任務,該任務會調用訂閱者的onEvent方法final Runnable job = () -> subscriber.onEvent(event);final Executor executor = subscriber.executor();if (executor != null) {//將任務提交給訂閱者的線程池去執行executor.execute(job);} else {try {//如果訂閱者沒有線程池,則直接執行該任務job.run();} catch (Throwable e) {LOGGER.error("Event callback exception: ", e);}}}...
}
(3)Nacos通知中心注冊訂閱者的源碼
在執行NotifyCenter的registerSubscriber()方法注冊訂閱者時,會調用訂閱者實現的subscribeTypes()方法獲取訂閱者要監聽的所有事件,然后遍歷這些事件并調用NotifyCenter的addSubscriber()方法。
執行NotifyCenter的addSubscriber()方法時會為這些事件添加訂閱者。由于每個事件都會對應一個EventPublisher對象,所以會先從NotifyCenter.publisherMap中獲取EventPublisher對象,然后調用EventPublisher的addSubscriber()方法向EventPublisher添加訂閱者,從而完成向通知中心注冊訂閱者。
public class NotifyCenter {private static final NotifyCenter INSTANCE = new NotifyCenter();//key是事件Class的canonicalName,value是EventPublisher對象,一個事件對應一個EventPublisher對象//在EventPublisher對象中就會包含訂閱了該事件的所有訂閱者//EventPublisher的實現類有DefaultPublisher、NamingEventPublisherprivate final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);...public static void registerSubscriber(final Subscriber consumer) {//注冊訂閱者registerSubscriber(consumer, DEFAULT_PUBLISHER_FACTORY);}public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {if (consumer instanceof SmartSubscriber) {//調用subscribeTypes()方法獲取訂閱者consumer需要監聽的事件,然后對這些事件進行遍歷for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {//For case, producer: defaultSharePublisher -> consumer: smartSubscriber.if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {//添加訂閱者INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);} else {//For case, producer: defaultPublisher -> consumer: subscriber.//添加訂閱者addSubscriber(consumer, subscribeType, factory);}}return;}final Class<? extends Event> subscribeType = consumer.subscribeType();if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);return;}addSubscriber(consumer, subscribeType, factory);}//Add a subscriber to publisher.private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType, EventPublisherFactory factory) {//獲取訂閱的事件的Class的canonicalNamefinal String topic = ClassUtils.getCanonicalName(subscribeType);synchronized (NotifyCenter.class) {//MapUtils.computeIfAbsent is a unsafe method.//創建EventPublisher對象,一個事件會對應一個EventPublisher對象MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);}//獲取事件對應的EventPublisher對象,比如DefaultPublisher對象EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher instanceof ShardedEventPublisher) {((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);} else {//往EventPublisher對象添加訂閱者信息,比如調用DefaultPublisher.addSubscriber()方法publisher.addSubscriber(consumer);}}...
}//一個事件只會對應一個DefaultPublisher
public class DefaultPublisher extends Thread implements EventPublisher {private Class<? extends Event> eventType;//Class為eventType的事件的所有訂閱者protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<>();...@Overridepublic void addSubscriber(Subscriber subscriber) {//添加訂閱者subscribers.add(subscriber);}...
}