文章目錄
- 前言
- 正文
- 實現架構
- 實現模型
- OAP 同步 Apollo
- ConfigWatcherRegister
- ConfigChangeWatcher
- Agent 側
前言
本文代碼 OAP 基于 v9.7,Java Agent 基于 v9.1,配置中心使用 apollo。
看本文需要配合代碼“食用”。
正文
Skywalking 中就使用這種模型實現了 Agent 同步Apollo 配置,本文介紹下提供的功能以及代碼實現,一起學習下。
Skywalking 支持 agent 動態更新配置,使 agent 可以依據業務需求進行自定義配置;更重要的是建立起這一個通信機制,那么 agent 的可管理性、擴展性都大大提升。
目前 Skywalking 提供了以下配置項
按照文檔描述,主要為以下內容:
-
控制采樣速率
-
忽略指定后綴的請求,注意必須是 first span 的 opretationName 匹配到
針對 web 服務,有些靜態資源是放在服務端,那么可以過濾掉這些請求
-
忽略某些 path 的 trace
-
限定每個 segment 中的 span 最大數量
-
是否收集執行 sql 的參數
樣例配置
configurations:serviceA:trace.sample_n_per_3_secs: 1000trace.ignore_path: /a/b/c,/a1/b1/c1serviceB:trace.sample_n_per_3_secs: 1000trace.ignore_path: /a/b/c,/a1/b1/c1
注意:這個是按照服務來進行逐項配置,如果不需要變動,不要添加對應 key,會使用默認值。
實現架構
-
OAP 同步 Apollo 配置
-
Agent 同步 OAP 配置。
每階段的操作無關聯,都是作為 Client 的一端發起的請求來同步數據。
實現模型
配置動態變更實際上是一個訂閱發布模型,簡單描述就是有發布者和訂閱者兩種角色,之間交互一般是:有一個注冊接口,方便訂閱者注冊自身,以及發布者可以獲取到訂閱者列表;一個通知接口,方便發布者發送消息給訂閱者。
例如需要訂水,只要給訂水公司留下自己的電話、地址及數量(發布者知道如何找到你),之后就有人送水上門(有水時進行派送)。
這種模型理解起來很簡單,實現上難度也不大,且使用場景很廣泛。
OAP 同步 Apollo
首先看下 OAP 是如何同步 apollo 數據。
ConfigWatcherRegister
這是一個抽象類,代表的是配置中心的角色,實現上有 apollo、nacos、zk 等方式。
先看下 notifySingleValue 方法:
protected void notifySingleValue(final ConfigChangeWatcher watcher, ConfigTable.ConfigItem configItem) {String newItemValue = configItem.getValue();if (newItemValue == null) {if (watcher.value() != null) {// Notify watcher, the new value is null with delete event type.// 調用 watcher 的 notify 進行處理 watcher.notify(new ConfigChangeWatcher.ConfigChangeEvent(null, ConfigChangeWatcher.EventType.DELETE));} else {// Don't need to notify, stay in null.}} else {if (!newItemValue.equals(watcher.value())) {watcher.notify(new ConfigChangeWatcher.ConfigChangeEvent(newItemValue,ConfigChangeWatcher.EventType.MODIFY));} else {// Don't need to notify, stay in the same config value.}}
}
該方法的邏輯是:讀取 configItem 中的值,并且與 watcher 中的值進行比較,不相等之后判定是 DELETE、還是 UPDATE 操作,并封裝成一個 ConfigChangeEvent 發送給 ConfigChangeWatcher,那么可以看出 ConfigChangeWatcher 是個訂閱者的角色。
繼續看下調用 notifySingleValue 方法的地方:
FetchingConfigWatcherRegister#singleConfigsSync
private final Register singleConfigChangeWatcherRegister = new Register();public abstract Optional<ConfigTable> readConfig(Set<String> keys);private void singleConfigsSync() {// 1. 讀取配置數據Optional<ConfigTable> configTable = readConfig(singleConfigChangeWatcherRegister.keys());// Config table would be null if no change detected from the implementation.configTable.ifPresent(config -> {config.getItems().forEach(item -> {// 2. 遍歷獲取配置中的 itemNameString itemName = item.getName();// 3. 依據 itemName 找到 WatcherHolderWatcherHolder holder = singleConfigChangeWatcherRegister.get(itemName);if (holder == null) {return;}ConfigChangeWatcher watcher = holder.getWatcher();// 從 WatcherHolder 得到 ConfigChangeWatcher,發送通知notifySingleValue(watcher, item);});});
}
該方法執行的邏輯就是:
- 依據 singleConfigChangeWatcherRegister.keys() 作為參數讀取配置信息
- 遍歷配置信息,依據配置中的 name(即 itemName)找到 WatcherHolder,進而獲取 ConfigChangeWatcher
- 調用 notifySingleValue。
readConfig 是個抽象方法,由具體的配置中心插件實現,本例中使用的 apollo,具體實現就是 ApolloConfigWatcherRegister。
讀取到的內容類型 ConfigTable,并且可以知道是存儲的 k-v 集合,那么 ConfigItem 就是每個配置項,itemName 就是 apollo 中配置的 key。
再看看調用 singleConfigsSync 的邏輯:
// FetchingConfigWatcherRegister.javapublic void start() {isStarted = true;Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new RunnableWithExceptionProtection(this::configSync, // 啟動定時任務來執行t -> log.error("Sync config center error.", t)), 0, syncPeriod, TimeUnit.SECONDS);
}void configSync() {singleConfigsSync();groupConfigsSync();
}
再回到 singleConfigsSync 中,讀取配置時需要先獲取到配置項的 key 的集合:singleConfigChangeWatcherRegister.keys()
先看下 singleConfigChangeWatcherRegister 的具體實現:FetchingConfigWatcherRegister$Register 內部就是一個 Map<String, WatcherHolder> 來存儲。
static class Register {private Map<String, WatcherHolder> register = new HashMap<>();private boolean containsKey(String key) {return register.containsKey(key);}private void put(String key, WatcherHolder holder) {register.put(key, holder);}public WatcherHolder get(String name) {return register.get(name);}public Set<String> keys() {return register.keySet();}
}
有讀取就有存儲,看看調用 put 邏輯:
// FetchingConfigWatcherRegister
synchronized public void registerConfigChangeWatcher(ConfigChangeWatcher watcher) {WatcherHolder holder = new WatcherHolder(watcher);if (singleConfigChangeWatcherRegister.containsKey(holder.getKey()) || groupConfigChangeWatcherRegister.containsKey(holder.getKey())) {}switch (holder.getWatcher().getWatchType()) {case SINGLE:// put 調用singleConfigChangeWatcherRegister.put(holder.getKey(), holder);break;case GROUP:groupConfigChangeWatcherRegister.put(holder.getKey(), holder);break;default:}
}
registerConfigChangeWatcher 方法,用于注冊 ConfigChangeWatcher ,內部處理邏輯:先將 watcher 放入 watchHolder 中,再以 holder key 分開存儲 holder(放入 FetchingConfigWatcherRegister$Register 中)。
WatcherHolder 是 ConfigWatcherRegister 一個內部類,代碼如下,重點是 key 生成規則:String.join(".", watcher.getModule(), watcher.getProvider().name(), watcher.getItemName());
,每個 itemName 對應一個 watcher。
@Getter
protected static class WatcherHolder {private ConfigChangeWatcher watcher;private final String key;public WatcherHolder(ConfigChangeWatcher watcher) {this.watcher = watcher;this.key = String.join(".", watcher.getModule(), watcher.getProvider().name(),watcher.getItemName());}
}
總結:OAP 啟動定時任務,同步 apollo 的配置數據,遍歷每個配置項(configItem),找到對應的 ConfigChangerWater,將 watcher 中的值與 configItem 中的值進行比較,不相等之后繼續判定是 DELETE、還是 UPDATE 操作,封裝成一個 ConfigChangeEvent 發送給對應的 ConfigChangeWatcher。
ConfigChangeWatcher
抽象類,依據命名,表示的是關注配置變化的 watcher,是 OAP 中定義的用于對不同配置的具體實現;對于 Apollo 上的每個 Key 都有對應的 ConfigChangeWatcher。
具體的 ConfigChangeWatcher 獲取到 ConfigChangeEvent,處理邏輯各有不同,本次具體看下 AgentConfigurationsWatcher。
private volatile String settingsString;private volatile AgentConfigurationsTable agentConfigurationsTable;public void notify(ConfigChangeEvent value) {if (value.getEventType().equals(EventType.DELETE)) {settingsString = null;this.agentConfigurationsTable = new AgentConfigurationsTable();} else {settingsString = value.getNewValue();AgentConfigurationsReader agentConfigurationsReader =new AgentConfigurationsReader(new StringReader(value.getNewValue()));this.agentConfigurationsTable = agentConfigurationsReader.readAgentConfigurationsTable();}
}
方法邏輯為:config value 存儲到了 agentConfigurationsTable。
apollo value 是什么樣子呢?
configurations:serviceA:trace.sample_n_per_3_secs: 1000trace.ignore_path: /a/b/c,/a1/b1/c1serviceB:trace.sample_n_per_3_secs: 1000trace.ignore_path: /a/b/c,/a1/b1/c1
AgentConfigurationsTable 如下具體實現
public class AgentConfigurationsTable {private Map<String, AgentConfigurations> agentConfigurationsCache;public AgentConfigurationsTable() {this.agentConfigurationsCache = new HashMap<>();}
}public class AgentConfigurations {private String service;private Map<String, String> configuration;/*** The uuid is calculated by the dynamic configuration of the service.*/private volatile String uuid;public AgentConfigurations(final String service, final Map<String, String> configuration, final String uuid) {this.service = service;this.configuration = configuration;this.uuid = uuid;}
}
將 agentConfigurationsTable 轉換成 json 展示更容里理解數據存儲的結構:
{"serviceB": {"service": "serviceB","configuration": {"trace.sample_n_per_3_secs": "1000","trace.ignore_path": "/a/b/c,/a1/b1/c1"},"uuid": "92670f1ccbdee60e14ffc0"},"serviceA": {"service": "serviceA","configuration": {"trace.sample_n_per_3_secs": "1000","trace.ignore_path": "/a/b/c,/a1/b1/c1"},"uuid": "92670f1ccbdee60e14ffc0"}
}
查看讀取 agentConfigurationsTable 值的邏輯:
// AgentConfigurationsWatcher#getAgentConfigurations
public AgentConfigurations getAgentConfigurations(String service) {// 依據 service 獲取數據AgentConfigurations agentConfigurations = this.agentConfigurationsTable.getAgentConfigurationsCache().get(service);if (null == agentConfigurations) {return emptyAgentConfigurations;} else {return agentConfigurations;}
}
繼續查看調用 getAgentConfigurations 的代碼,并且將 value 包裝成 ConfigurationDiscoveryCommand 返回。
// ConfigurationDiscoveryServiceHandler#fetchConfigurations
public void fetchConfigurations(final ConfigurationSyncRequest request,final StreamObserver<Commands> responseObserver) {Commands.Builder commandsBuilder = Commands.newBuilder();AgentConfigurations agentConfigurations = agentConfigurationsWatcher.getAgentConfigurations(request.getService());if (null != agentConfigurations) {// 請求時會帶有 uuid,會跟現有配置的 uuid 進行比對,如果不同,則獲取最新值 if (disableMessageDigest || !Objects.equals(agentConfigurations.getUuid(), request.getUuid())) {ConfigurationDiscoveryCommand configurationDiscoveryCommand =newAgentDynamicConfigCommand(agentConfigurations);commandsBuilder.addCommands(configurationDiscoveryCommand.serialize().build());}}responseObserver.onNext(commandsBuilder.build());responseObserver.onCompleted();
}
ConfigurationDiscoveryServiceHandler 屬于 GRPCHandler,類似 SpringBoot 中 Controller,暴露接口,外部就可以獲取數據。
ConfigurationDiscoveryCommand 這個方法中有個屬性來標識 command 的具體類型,這個在 agent 端接收到 command 需要依據 command 類型找到真正的處理器。
public static final String NAME = "ConfigurationDiscoveryCommand";
總結:當 AgentConfigurationsWatcher 收到訂閱的 ConfigChangeEvent 時,會將值存儲至 AgentConfigurationsTable,之后通過 ConfigurationDiscoveryServiceHandler 暴露接口,以方便 agent 可以獲取到相應服務的配置。
至此,OAP 與 Apollo 間的配置更新邏輯以及值的處理邏輯大致理清了。
接下來看看 agent 與 oap 間的交互。
Agent 側
找到調用 ConfigurationDiscoveryServiceGrpc#fetchConfigurations 的代碼,看到 ConfigurationDiscoveryService,查看具體調用邏輯:
// ConfigurationDiscoveryService
private void getAgentDynamicConfig() {if (GRPCChannelStatus.CONNECTED.equals(status)) {try {// 準備參數ConfigurationSyncRequest.Builder builder = ConfigurationSyncRequest.newBuilder();builder.setService(Config.Agent.SERVICE_NAME);if (configurationDiscoveryServiceBlockingStub != null) {final Commands commands = configurationDiscoveryServiceBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).fetchConfigurations(builder.build()); // 方法調用// 結果處理ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);}} catch (Throwable t) {}}
}
而 getAgentDynamicConfig 是在 ConfigurationDiscoveryService#boot 執行時 init 了一個定時任務調用。
public void boot() throws Throwable {getDynamicConfigurationFuture = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("ConfigurationDiscoveryService")).scheduleAtFixedRate(new RunnableWithExceptionProtection(this::getAgentDynamicConfig,t -> LOGGER.error("Sync config from OAP error.", t)),Config.Collector.GET_AGENT_DYNAMIC_CONFIG_INTERVAL,Config.Collector.GET_AGENT_DYNAMIC_CONFIG_INTERVAL,TimeUnit.SECONDS);
}
獲取結果后的處理邏輯:CommandService 接收 Commands,先是放入到隊列中,
private LinkedBlockingQueue<BaseCommand> commands = new LinkedBlockingQueue<>(64);public void receiveCommand(Commands commands) {for (Command command : commands.getCommandsList()) {try {BaseCommand baseCommand = CommandDeserializer.deserialize(command);// 將結果放入隊列中boolean success = this.commands.offer(baseCommand);if (!success && LOGGER.isWarnEnable()) {}} catch (UnsupportedCommandException e) {}}
}
新開線程來消費隊列,commandExecutorService 處理 Commands,通過代碼調用鏈看到,最后依據 command 的類型找到真正指令執行器。
// CommandService#run
public void run() {final CommandExecutorService commandExecutorService = ServiceManager.INSTANCE.findService(CommandExecutorService.class);while (isRunning) {try {// 消費隊列BaseCommand command = this.commands.take();// 判斷是否已經執行過了if (isCommandExecuted(command)) {continue;}// 分發 commandcommandExecutorService.execute(command);serialNumberCache.add(command.getSerialNumber());} catch (CommandExecutionException e) {}}
}// CommandExecutorService#execute
public void execute(final BaseCommand command) throws CommandExecutionException {this.executorForCommand(command).execute(command);
}
// CommandExecutorService#executorForCommand
private CommandExecutor executorForCommand(final BaseCommand command) {final CommandExecutor executor = this.commandExecutorMap.get(command.getCommand());if (executor != null) {return executor;}return NoopCommandExecutor.INSTANCE;
}
依據指令類型獲取具體的指令執行器,這里為 ConfigurationDiscoveryService,發現又調用了 ConfigurationDiscoveryService#handleConfigurationDiscoveryCommand 處理。
// ConfigurationDiscoveryService#handleConfigurationDiscoveryCommand
public void handleConfigurationDiscoveryCommand(ConfigurationDiscoveryCommand configurationDiscoveryCommand) {final String responseUuid = configurationDiscoveryCommand.getUuid();List<KeyStringValuePair> config = readConfig(configurationDiscoveryCommand);// 遍歷配置項config.forEach(property -> {String propertyKey = property.getKey();List<WatcherHolder> holderList = register.get(propertyKey);for (WatcherHolder holder : holderList) {if (holder != null) {// 依據配置項找到對應的 AgentConfigChangeWatcher,封裝成 ConfigChangeEvent AgentConfigChangeWatcher watcher = holder.getWatcher();String newPropertyValue = property.getValue();if (StringUtil.isBlank(newPropertyValue)) {if (watcher.value() != null) {// Notify watcher, the new value is null with delete event type.watcher.notify(new AgentConfigChangeWatcher.ConfigChangeEvent(null, AgentConfigChangeWatcher.EventType.DELETE));}} else {if (!newPropertyValue.equals(watcher.value())) {watcher.notify(new AgentConfigChangeWatcher.ConfigChangeEvent(newPropertyValue, AgentConfigChangeWatcher.EventType.MODIFY));}}}}});this.uuid = responseUuid;
}
ConfigurationDiscoveryService#handleConfigurationDiscoveryCommand 進行處理,遍歷配置項列表,依據 Key 找到對應的 AgentConfigChangeWatcher,進行 notify。
這個過程是不是很熟悉,跟 OAP 中處理邏輯不能說是完全一樣,簡直一模一樣。
AgentConfigChangeWatcher 是個抽象類,查看其具體實現,關注其注冊以及處理 value 的邏輯即可。
具體邏輯就不再展開細說了,需要自行了解下。
總之,agent 可以進行動態配置,能做的事情就多了,尤其是對 agent.config 中的配置大部分就可以實現動態管理了。