I. Introduction
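Why design our own Kafka alerting scheme? A quick search turns up plenty of ready-made monitoring projects, such as KafkaOffsetMonitor, KafkaManager and Burrow (see: Kafka message backlog monitoring). However, the Kafka cluster used by our team's project is not managed by the company's kafka-manager, so we had to build a simple alert ourselves.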
II. Alerting scheme
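The scheme uses two scheduled tasks that communicate through a delayed queue.
The first task periodically scans the configured Topic-Consumer list. It fetches consumption details through the Kafka API and checks whether the message backlog exceeds the threshold. If the check fails, the Topic-Consumer pair is put into the delayed queue.
The second task periodically takes one Topic-Consumer pair from the real queue behind the delayed queue and checks the threshold again. An alert SMS is sent only if this second check also fails, so a backlog that clears up during the delay never triggers an alert.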
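To make the double check concrete, here is a minimal in-memory model of the two tasks. It is a sketch under stated assumptions, not the project's code:

- The clock is a plain number of seconds.
- A `PriorityQueue` ordered by due time stands in for the Redis-backed delayed queue.
- `lagOf` is a hypothetical function returning a pair's current total lag.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.function.ToLongFunction;

// In-memory model of the scan -> delay -> re-check -> alert flow.
// Assumptions: manual clock in seconds, PriorityQueue as the "delayed queue",
// and a hypothetical lagOf lookup returning the current total lag of a pair.
final class TwoPhaseAlarm {

    private static final class Entry {
        final String pair;  // e.g. "topic/consumer"
        final long dueAt;   // clock value at which the re-check may run

        Entry(String pair, long dueAt) {
            this.pair = pair;
            this.dueAt = dueAt;
        }
    }

    private final PriorityQueue<Entry> delayed =
            new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.dueAt));
    private final ToLongFunction<String> lagOf;
    private final long threshold;
    private final long delaySeconds;

    TwoPhaseAlarm(ToLongFunction<String> lagOf, long threshold, long delaySeconds) {
        this.lagOf = lagOf;
        this.threshold = threshold;
        this.delaySeconds = delaySeconds;
    }

    /** First task: queue every over-threshold pair that is not already waiting. */
    void scan(List<String> pairs, long now) {
        for (String pair : pairs) {
            boolean waiting = delayed.stream().anyMatch(e -> e.pair.equals(pair));
            if (!waiting && lagOf.applyAsLong(pair) > threshold) {
                delayed.add(new Entry(pair, now + delaySeconds));
            }
        }
    }

    /** Second task: take one due pair and report it only if it is still over threshold. */
    Optional<String> alertOnce(long now) {
        Entry head = delayed.peek();
        if (head == null || head.dueAt > now) {
            return Optional.empty();
        }
        delayed.poll();
        return lagOf.applyAsLong(head.pair) > threshold ? Optional.of(head.pair) : Optional.empty();
    }
}
```

With the default delay of 600 seconds, a backlog that recovers within roughly ten minutes of being detected is dropped at the re-check instead of producing an SMS.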
III. Preparation
1. Relying on a configuration center
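The configuration center is a key part of the scheme. It lets us fetch alert-related properties dynamically and refresh the corresponding Java bean. The configuration bean for alerting is shown below.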
```java
@ConfigCenterBean
@ConfigurationProperties(prefix = "wmhcontrol.alarm")
@Component
public class AlarmConstants extends BaseConfigCenterBean {

    private static Logger LOGGER = LoggerFactory.getLogger(AlarmConstants.class);

    // Phone numbers that receive alerts
    @ConfigField
    private String[] phones;

    // SMS template ID
    @ConfigField
    private String templateId;

    // Delay before the re-check, in seconds
    @ConfigField
    private Integer delay = 600;

    // Polling period, in seconds
    @ConfigField
    private Integer period = 60;

    // URL for fetching topic-consumer consumption details
    @ConfigField
    private String tcsr;

    // URL for viewing topic-consumer consumption details
    @ConfigField
    private String tclr;

    // Global threshold
    @ConfigField
    private Integer threshold = 1000;

    // List of topic-consumer relationships
    @ConfigField
    private List<TCR> tcrs;

    @ToInitial
    private void refreshProperties() {
        try {
            super.doBind();
            LOGGER.info(String.format("%s refreshed..., current config=%s...", this.getModuleName(), this));
        } catch (Exception e) {
            LOGGER.error("Failed to bind AlarmConstants properties...", e);
        }
    }

    private void toRefresh() {
        try {
            if (isCfgCenterEffect()) {
                ZookeeperPropertySource propertySource = ConfigHelper.getZookeeperPropertySource();
                if (ConfigCenterUtils.propertySourceShouldRefresh(this.getModuleName(), propertySource)) {
                    this.refreshProperties();
                }
            }
        } catch (Exception e) {
            LOGGER.error("Failed to refresh AlarmConstants properties", e);
        }
    }

    @ToRefresh
    public Integer getThreshold() { return threshold; }
    public void setThreshold(Integer threshold) { this.threshold = threshold; }

    @ToRefresh
    public List<TCR> getTcrs() { return tcrs; }
    public void setTcrs(List<TCR> tcrs) { this.tcrs = tcrs; }

    @ToRefresh
    public String getTcsr() { return tcsr; }
    public void setTcsr(String tcsr) { this.tcsr = tcsr; }

    @ToRefresh
    public Integer getPeriod() { return period; }
    public void setPeriod(Integer period) { this.period = period; }

    @ToRefresh
    public Integer getDelay() { return delay; }
    public void setDelay(Integer delay) { this.delay = delay; }

    @ToRefresh
    public String[] getPhones() { return phones; }
    public void setPhones(String[] phones) { this.phones = phones; }

    @ToRefresh
    public String getTemplateId() { return templateId; }
    public void setTemplateId(String templateId) { this.templateId = templateId; }

    @ToRefresh
    public String getTclr() { return tclr; }
    public void setTclr(String tclr) { this.tclr = tclr; }

    @Override
    public String toString() {
        return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE, false, false, AlarmConstants.class);
    }

    @Override
    public String getDefaultResourcePath() { return "config/alarm.properties"; }

    @Override
    public String getConfigPrefix() { return "wmhcontrol.alarm"; }

    @Override
    public String getModuleName() {
        return "告警配置"; // module name ("alarm configuration"), used as the config-center key
    }

    @Override
    public void refreshForEvent() { this.refreshProperties(); }

    /**
     * Relationship between a topic and a consumer
     */
    public static class TCR {
        private String topic;
        private String consumer;
        private String channel;
        private Integer threshold; // optional per-pair threshold, overrides the global one

        public String getTopic() { return topic; }
        public void setTopic(String topic) { this.topic = topic; }
        public String getConsumer() { return consumer; }
        public void setConsumer(String consumer) { this.consumer = consumer; }
        public String getChannel() { return channel; }
        public void setChannel(String channel) { this.channel = channel; }
        public Integer getThreshold() { return threshold; }
        public void setThreshold(Integer threshold) { this.threshold = threshold; }

        @Override
        public String toString() {
            return "TCR{" + "topic='" + topic + '\'' + ", consumer='" + consumer + '\''
                    + ", channel='" + channel + '\'' + ", threshold=" + threshold + '}';
        }
    }

    /**
     * One row (one partition) of the consumption details returned by the Kafka API
     */
    public static class TopicConsumerDetail {
        private String group;
        private String topic;
        private Integer pid;
        private Long offset;
        private Long logsize;
        private Long lag;
        private String owner;

        public String getGroup() { return group; }
        public void setGroup(String group) { this.group = group; }
        public String getTopic() { return topic; }
        public void setTopic(String topic) { this.topic = topic; }
        public Integer getPid() { return pid; }
        public void setPid(Integer pid) { this.pid = pid; }
        public Long getOffset() { return offset; }
        public void setOffset(Long offset) { this.offset = offset; }
        public Long getLogsize() { return logsize; }
        public void setLogsize(Long logsize) { this.logsize = logsize; }
        public Long getLag() { return lag; }
        public void setLag(Long lag) { this.lag = lag; }
        public String getOwner() { return owner; }
        public void setOwner(String owner) { this.owner = owner; }

        @Override
        public String toString() {
            return "TopicConsumerDetail{" + "group='" + group + '\'' + ", topic='" + topic + '\''
                    + ", pid=" + pid + ", offset=" + offset + ", logsize=" + logsize
                    + ", lag=" + lag + ", owner='" + owner + '\'' + '}';
        }
    }
}
```
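Alerting uses a single global threshold. Each topic (more precisely, each Topic-Consumer entry) can override it with its own threshold.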
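The fallback rule can be sketched as follows. `ThresholdPolicy` and its nested `Tcr` are hypothetical; `Tcr` is a trimmed-down stand-in for `AlarmConstants.TCR`. The strict `>` comparison matches the threshold check used later in this article.

```java
// Per-pair threshold with fallback to the global value.
// Tcr is a hypothetical stand-in for AlarmConstants.TCR.
final class ThresholdPolicy {

    static final class Tcr {
        final String topic;
        final String consumer;
        final Integer threshold; // null means "use the global threshold"

        Tcr(String topic, String consumer, Integer threshold) {
            this.topic = topic;
            this.consumer = consumer;
            this.threshold = threshold;
        }
    }

    private final long globalThreshold;

    ThresholdPolicy(long globalThreshold) {
        this.globalThreshold = globalThreshold;
    }

    /** The pair's own threshold if configured, otherwise the global one. */
    long effectiveThreshold(Tcr tcr) {
        return tcr.threshold != null ? tcr.threshold : globalThreshold;
    }

    /** Alert condition: the backlog must be strictly greater than the threshold. */
    boolean exceeds(Tcr tcr, long lag) {
        return lag > effectiveThreshold(tcr);
    }
}
```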
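For how the configuration center is linked to the Java bean, see: Refreshing the properties of beans annotated with @ConfigurationProperties through a configuration center.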
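How the configuration center binds values is specific to the in-house framework and is not shown here. As a rough JDK-only illustration of why the tasks always see fresh values, the hypothetical `AlarmSettings` below works as follows:

- It re-binds from a `java.util.Properties` snapshot.
- It keeps its fields `volatile`, and callers read the getters on every run.
- The keys reuse the `wmhcontrol.alarm` prefix, and the defaults match those of `AlarmConstants`.

Keeping the old value when a key is missing is an assumption, not the framework's documented behavior.

```java
import java.util.Properties;

// Hypothetical stand-in for a config-center-backed bean. Scheduled tasks call the
// getters on every run, so a refresh takes effect on the next tick without a restart.
final class AlarmSettings {
    private volatile int delay = 600;      // seconds before the re-check
    private volatile int period = 60;      // scan period in seconds
    private volatile int threshold = 1000; // global lag threshold

    /** Re-bind from the latest snapshot; keys absent from the snapshot keep their current value. */
    synchronized void refresh(Properties snapshot, String prefix) {
        delay = intOr(snapshot, prefix + ".delay", delay);
        period = intOr(snapshot, prefix + ".period", period);
        threshold = intOr(snapshot, prefix + ".threshold", threshold);
    }

    private static int intOr(Properties p, String key, int current) {
        String raw = p.getProperty(key);
        return raw == null ? current : Integer.parseInt(raw.trim());
    }

    int getDelay() { return delay; }
    int getPeriod() { return period; }
    int getThreshold() { return threshold; }
}
```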
2. Making the period of the scheduled tasks dynamically configurable
This relies on org.springframework.scheduling.annotation.SchedulingConfigurer.
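According to its Javadoc, it is an optional interface to be implemented by @Configuration classes annotated with @EnableScheduling. It is typically used for setting a specific TaskScheduler bean to be used when executing scheduled tasks, or for registering scheduled tasks programmatically instead of declaratively with the @Scheduled annotation. For example, this may be necessary when implementing trigger-based tasks, which the @Scheduled annotation does not support.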
The basic outline of the code is as follows.
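```java
// Assumes @EnableScheduling is declared on this or another @Configuration class
@Configuration
public class MessageCenterAlarmTask implements SchedulingConfigurer {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageCenterAlarmTask.class);

    @Autowired
    private AlarmConstants alarmConstants;

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        try {
            // Check the queue every 5 seconds
            taskRegistrar.addFixedRateTask(() -> {}, 5 * 1000L);
            // Trigger task: the period is read from alarmConstants again before every run,
            // so changing it in the configuration center changes the schedule
            taskRegistrar.addTriggerTask(() -> {},
                    triggerContext -> new PeriodicTrigger(alarmConstants.getPeriod(), TimeUnit.SECONDS)
                            .nextExecutionTime(triggerContext));
        } catch (Exception e) {
            LOGGER.error("Failed to initialize the message-center topic-consumer scheduled tasks...", e);
        }
    }
}
```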
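In the code above, the fixed-rate task (every 5 seconds) is the second task of the scheme, which takes pairs off the queue and sends alerts. The trigger task is the first task, which scans the Topic-Consumer list at the configured period.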
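Outside Spring, the effect of a trigger task whose period is read on every run can be approximated with a self-rescheduling task on a `ScheduledExecutorService`. This is an analogy to Spring's trigger mechanism, not a reimplementation of it. `DynamicPeriodTask` and its parameters are hypothetical.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// JDK-only analogue of a trigger task with a configurable period: after each run the
// next delay is read again from periodMillis, so a changed value applies to the next run.
final class DynamicPeriodTask {
    private final ScheduledExecutorService scheduler;
    private final Runnable body;
    private final LongSupplier periodMillis;
    private volatile boolean stopped;

    DynamicPeriodTask(ScheduledExecutorService scheduler, Runnable body, LongSupplier periodMillis) {
        this.scheduler = scheduler;
        this.body = body;
        this.periodMillis = periodMillis;
    }

    void start() { scheduleNext(); }

    void stop() { stopped = true; }

    private void scheduleNext() {
        if (stopped) {
            return;
        }
        // Re-read the period every time instead of fixing it at start-up
        scheduler.schedule(this::runOnce, periodMillis.getAsLong(), TimeUnit.MILLISECONDS);
    }

    private void runOnce() {
        try {
            body.run();
        } catch (RuntimeException e) {
            // Keep the schedule alive even if one run fails
            System.err.println("scheduled run failed: " + e);
        } finally {
            // Next run is measured from the end of this one (fixed-delay style)
            scheduleNext();
        }
    }
}
```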
3. Implementing the delayed queue
A distributed delayed queue can be built either with Redisson's distributed delayed queue or with java.util.concurrent.DelayQueue plus RedisTemplate.
Reference: Redisson distributed delayed queue, official documentation
Reference: How Redisson's DelayedQueue is implemented
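For the single-JVM `DelayQueue` option, each element must implement `java.util.concurrent.Delayed`. The sketch below wraps a topic/consumer pair; `DelayedTcr` is a hypothetical name. Such a queue lives in one process only and is lost on restart. That is presumably why the text pairs it with Redis for the distributed case, but how the two would be combined is not shown here.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical DelayQueue element for a topic/consumer pair.
// DelayQueue only hands out elements whose getDelay(...) is <= 0.
final class DelayedTcr implements Delayed {
    final String topic;
    final String consumer;
    private final long dueNanos; // System.nanoTime() value at which the element becomes available

    DelayedTcr(String topic, String consumer, long delay, TimeUnit unit) {
        this.topic = topic;
        this.consumer = consumer;
        this.dueNanos = System.nanoTime() + unit.toNanos(delay);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(dueNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Earliest due element first, as DelayQueue requires
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}
```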
Redisson is configured in cluster mode as follows.
```java
public class RedissonBuilder {

    private static Logger LOGGER = LoggerFactory.getLogger(RedissonBuilder.class);

    public static RedissonClient getRedisson(String cluster) {
        // Redisson expects node addresses with a scheme, e.g. redis://host:port
        String[] nodes = cluster.split(",");
        for (int i = 0; i < nodes.length; i++) {
            nodes[i] = "redis://" + nodes[i].trim();
        }
        Config config = new Config();
        config.useClusterServers()      // use cluster servers
                .setScanInterval(2000)  // cluster state scan interval (ms)
                .setConnectTimeout(2000)
                .addNodeAddress(nodes);
        try {
            return Redisson.create(config);
        } catch (Exception e) {
            LOGGER.error("RedissonClient getRedisson errors...", e);
            return null;
        }
    }
}

@Configuration
public class RedissonConfig {

    private static Logger LOGGER = LoggerFactory.getLogger(RedissonConfig.class);

    @Bean
    public RedissonClient redissonClient(@Value("${redisAddress}") String cacheAddress) {
        RedissonClient rc = RedissonBuilder.getRedisson(cacheAddress);
        try {
            if (!Objects.isNull(rc)) {
                LOGGER.info(rc.getConfig().toJSON());
            }
        } catch (IOException e) {
            LOGGER.error("RedissonConfig redissonClient errors... ", e);
        }
        return rc;
    }
}
```
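The builder assumes `redisAddress` is a comma-separated `host:port` list. A slightly more defensive way to derive the node addresses might look like the sketch below. It trims blanks, skips empty entries, and leaves addresses that already carry a `redis://` or `rediss://` scheme untouched. The helper name and the extra handling are assumptions, not part of the original code.

```java
import java.util.Arrays;

// Hypothetical helper: "10.0.0.1:7000, 10.0.0.2:7001" -> {"redis://10.0.0.1:7000", "redis://10.0.0.2:7001"}
final class RedisNodes {
    static String[] toNodeAddresses(String cluster) {
        return Arrays.stream(cluster.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())            // tolerate ",," and trailing commas
                .map(s -> s.startsWith("redis://") || s.startsWith("rediss://") ? s : "redis://" + s)
                .toArray(String[]::new);
    }
}
```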
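Creating the delayed queue with Redisson:

```java
RQueue<AlarmConstants.TCR> topicConsumerQueue = redissonClient.getQueue(commonRedisKey + "message_center_tcrs");
RDelayedQueue<AlarmConstants.TCR> topicConsumerDelayedQueue = redissonClient.getDelayedQueue(topicConsumerQueue);
```

First create the target queue, then obtain the delayed queue from it. Elements offered to the delayed queue are transferred to the target queue once their delay expires, so consumers poll the target queue.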
4. Processing the data returned by the Kafka API
Reference: A simple wrapper for the Kafka APIs
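Given a topic and a consumer group, the API returns plain-text tabular data. Its header row names the columns (group, topic, pid, offset, logsize, lag and owner, matching the fields of TopicConsumerDetail), and the Lag column is the number of messages not yet consumed.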
This data has to be mapped onto the AlarmConstants.TopicConsumerDetail Java bean, which is done with Spring's BeanWrapperImpl:
```java
private static List<AlarmConstants.TopicConsumerDetail> retrieveDetail(String detailResponse) {
    List<AlarmConstants.TopicConsumerDetail> result = new ArrayList<>();
    if (StringUtils.isBlank(detailResponse)) {
        return result;
    }
    try {
        Scanner scanner = new Scanner(detailResponse
                .replace("<pre>", StringUtils.EMPTY)
                .replace("</pre>", StringUtils.EMPTY));
        String[] propertyNames = null;
        // The first non-blank line holds the column names, i.e. the bean's field names
        while (propertyNames == null && scanner.hasNextLine()) {
            String nameLine = scanner.nextLine();
            if (StringUtils.isNotBlank(nameLine)) {
                propertyNames = Arrays.stream(nameLine.trim().split("\\s+"))
                        .map(String::toLowerCase)
                        .toArray(String[]::new);
            }
        }
        // Each remaining line holds the field values of one partition
        while (propertyNames != null && scanner.hasNextLine()) {
            String valueLine = scanner.nextLine();
            if (StringUtils.isBlank(valueLine)) {
                continue;
            }
            AlarmConstants.TopicConsumerDetail tcd = new AlarmConstants.TopicConsumerDetail();
            BeanWrapper br = new BeanWrapperImpl(tcd);
            String[] propertyValues = valueLine.trim().split("\\s+");
            // Never index past the header, even if a row has extra cells
            for (int i = 0; i < Math.min(propertyNames.length, propertyValues.length); i++) {
                // BeanWrapperImpl converts the String value to the field type (Integer/Long)
                br.setPropertyValue(propertyNames[i], propertyValues[i]);
            }
            result.add(tcd);
        }
        LOGGER.info("Message center extracted topic-consumer details: " + result);
    } catch (Exception e) {
        LOGGER.error("Message center failed to extract topic-consumer details..." + detailResponse, e);
    }
    return result;
}
```
After processing, the result looks like this:
```
[TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=0, offset=10956087, logsize=10956091, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'},
 TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=1, offset=10950487, logsize=10950502, lag=15, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'},
 TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=2, offset=10958523, logsize=10958529, lag=6, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'},
 TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=3, offset=10955709, logsize=10955717, lag=8, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'},
 TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=4, offset=10956550, logsize=10956563, lag=13, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'},
 TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=5, offset=10956343, logsize=10956347, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'},
 TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=6, offset=10954124, logsize=10954128, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'},
 TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=7, offset=10949075, logsize=10949082, lag=7, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'},
 TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=8, offset=10963839, logsize=10963843, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'},
 TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=9, offset=10958536, logsize=10958540, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'},
 TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=10, offset=10955316, logsize=10955327, lag=11, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'},
 TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=11, offset=10957850, logsize=10957856, lag=6, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'},
 TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=12, offset=10954508, logsize=10954515, lag=7, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'},
 TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=13, offset=10960468, logsize=10960477, lag=9, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'},
 TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=14, offset=10955540, logsize=10955544, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'}]
```
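For comparison, the same header-driven parsing can be sketched without Spring. The first non-blank line supplies lower-cased column names, and every later non-blank line becomes a name-to-value map. `OffsetTableParser` is hypothetical. The test input is an illustrative row with column headers in the style of Kafka's `ConsumerOffsetChecker` output (Group, Topic, Pid, Offset, logSize, Lag, Owner), which is assumed to match what the wrapped API returns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Spring-free sketch of the header-driven parsing used by retrieveDetail.
// Rows shorter than the header simply omit the missing columns (an assumption).
final class OffsetTableParser {
    static List<Map<String, String>> parse(String response) {
        List<Map<String, String>> rows = new ArrayList<>();
        String[] header = null;
        String cleaned = response.replace("<pre>", "").replace("</pre>", "");
        for (String line : cleaned.split("\\R")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // skip blank lines anywhere, including before the header
            }
            String[] cells = trimmed.split("\\s+");
            if (header == null) {
                header = Arrays.stream(cells).map(s -> s.toLowerCase(Locale.ROOT)).toArray(String[]::new);
                continue;
            }
            Map<String, String> row = new LinkedHashMap<>();
            for (int i = 0; i < Math.min(header.length, cells.length); i++) {
                row.put(header[i], cells[i]);
            }
            rows.add(row);
        }
        return rows;
    }
}
```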
IV. Alerting logic
1. Sending the SMS
```java
private String toShortMessage(AlarmConstants.TCR tcr, Long lag) {
    JSONObject info = new JSONObject();
    StringBuilder text = new StringBuilder();
    String messageDate = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);
    text.append("【Topic-Consumer threshold alarm " + messageDate + "】\r\n");
    text.append("\tChannel: " + tcr.getChannel() + "\r\n");
    text.append("\tTopic: " + tcr.getTopic() + "\r\n");
    text.append("\tConsumer: " + tcr.getConsumer() + "\r\n");
    // Per-pair threshold if configured, otherwise the global one
    text.append("\tThreshold: " + (Objects.isNull(tcr.getThreshold()) ? alarmConstants.getThreshold() : tcr.getThreshold()) + "\r\n");
    text.append("\tBacklog: " + lag + "\r\n");
    try {
        // Shorten the link to the consumption-detail page with the dwz.cn service
        String refUrl = alarmConstants.getTclr() + "?topic=" + tcr.getTopic() + "&group=" + tcr.getConsumer();
        JSONObject params = new JSONObject();
        params.put("url", refUrl);
        String shortUrlResponse = HttpClient.post("https://dwz.cn/admin/create", params.toJSONString(), "application/json");
        LOGGER.info("Short link response: " + shortUrlResponse);
        if (StringUtils.isNotBlank(shortUrlResponse)) {
            JSONObject shortUrlJson = JSON.parseObject(shortUrlResponse);
            Integer code = (Integer) FastJsonUtils.search(shortUrlJson, "Code");
            if (Integer.valueOf(0).equals(code)) {
                String shortUrl = (String) FastJsonUtils.search(shortUrlJson, "ShortUrl");
                if (StringUtils.isNotBlank(shortUrl)) {
                    text.append("\tView: " + shortUrl + "\r\n");
                }
            }
        }
    } catch (Exception e) {
        // The link is optional: the alert is still sent without it
        LOGGER.error("Failed to generate short link...", e);
    }
    info.put("txt_name", "Message Center");
    info.put("txt_result", text.toString());
    return info.toJSONString();
}
```
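The detail link above concatenates topic and group into the query string without encoding. Below is a hedged sketch of the same message body with `URLEncoder`-encoded parameters. The short-link call is omitted, `AlertText` is a hypothetical helper, and `detailBaseUrl` stands in for the configured `tclr` value.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: alert body in the same field order as toShortMessage,
// plus a detail URL whose query parameters are URL-encoded.
final class AlertText {

    static String detailUrl(String detailBaseUrl, String topic, String group) {
        return detailBaseUrl + "?topic=" + encode(topic) + "&group=" + encode(group);
    }

    static String body(String channel, String topic, String consumer, long threshold, long lag, LocalDateTime at) {
        return "[Topic-Consumer threshold alarm " + at.format(DateTimeFormatter.ISO_DATE_TIME) + "]\r\n"
                + "\tChannel: " + channel + "\r\n"
                + "\tTopic: " + topic + "\r\n"
                + "\tConsumer: " + consumer + "\r\n"
                + "\tThreshold: " + threshold + "\r\n"
                + "\tBacklog: " + lag + "\r\n";
    }

    private static String encode(String value) {
        try {
            // Form encoding: spaces become '+', reserved characters become %XX
            return URLEncoder.encode(value, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```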
2. Threshold check
```java
private Long thresholdCheck(AlarmConstants.TCR tcr) {
    String detailUrl = alarmConstants.getTcsr() + "?topic=" + tcr.getTopic() + "&group=" + tcr.getConsumer();
    String detailResponseStr = HttpClient.get(detailUrl);
    LOGGER.info(detailUrl + " " + detailResponseStr);
    List<AlarmConstants.TopicConsumerDetail> detailResponse = retrieveDetail(detailResponseStr);
    if (CollectionUtils.isEmpty(detailResponse)) {
        return -1L;
    }
    // Total backlog = sum of the lag of every partition (rows without a lag value are ignored)
    long lag = detailResponse.stream()
            .map(AlarmConstants.TopicConsumerDetail::getLag)
            .filter(Objects::nonNull)
            .mapToLong(Long::longValue)
            .sum();
    long threshold = Objects.isNull(tcr.getThreshold()) ? alarmConstants.getThreshold() : tcr.getThreshold();
    // Return the backlog if it exceeds the threshold, otherwise -1
    return lag > threshold ? lag : -1L;
}
```
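The decision itself is pure arithmetic and can be tested without HTTP. The 15 partition lags in the sample output (4, 15, 6, 8, 13, 4, 4, 7, 4, 4, 11, 6, 7, 9, 4) add up to a total backlog of 106. Under the default global threshold of 1000, the check therefore returns -1 and nothing is queued. `LagCheck` is a hypothetical helper that mirrors `thresholdCheck`'s return contract.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical pure version of the thresholdCheck decision:
// returns the total lag if it strictly exceeds the threshold, otherwise -1.
final class LagCheck {
    static long check(List<Long> partitionLags, long threshold) {
        if (partitionLags.isEmpty()) {
            return -1L; // no data: treat as "no alert", like the original
        }
        long total = partitionLags.stream()
                .filter(Objects::nonNull)
                .mapToLong(Long::longValue)
                .sum();
        return total > threshold ? total : -1L;
    }
}
```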
3. Completing the logic of the two scheduled tasks
```java
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
    try {
        RQueue<AlarmConstants.TCR> topicConsumerQueue = redissonClient.getQueue(commonRedisKey + "message_center_tcrs");
        RDelayedQueue<AlarmConstants.TCR> topicConsumerDelayedQueue = redissonClient.getDelayedQueue(topicConsumerQueue);

        // Second task: check the target queue every 5 seconds
        taskRegistrar.addFixedRateTask(() -> {
            AlarmConstants.TCR tcr = topicConsumerQueue.poll();
            if (Objects.isNull(tcr)) {
                return;
            }
            // Re-check the threshold; send the alert only if it still fails
            Long lag = thresholdCheck(tcr);
            if (lag <= 0 || ArrayUtils.isEmpty(alarmConstants.getPhones())) {
                return;
            }
            String message = toShortMessage(tcr, lag);
            String templateId = alarmConstants.getTemplateId();
            LOGGER.info("Message center alert SMS content: " + message);
            for (String phone : alarmConstants.getPhones()) {
                try {
                    MessageUtils.sendMessage(phone, messageUrl, message, templateId);
                } catch (Exception e) {
                    LOGGER.error(String.format("Failed to send message center alert SMS...%s %s %s", phone, messageUrl, message), e);
                }
            }
        }, 5 * 1000L);

        // First task: check the Topic-Consumer thresholds with a dynamically configured period
        taskRegistrar.addTriggerTask(() -> {
            RLock lock = redissonClient.getLock(commonRedisKey + "TopicConsumerForEach");
            try {
                // Wait at most 5 seconds for the lock; it is released automatically 5 seconds after being acquired
                if (!lock.tryLock(5, 5, TimeUnit.SECONDS)) {
                    return;
                }
                if (!CollectionUtils.isEmpty(alarmConstants.getTcrs())) {
                    alarmConstants.getTcrs().stream()
                            .filter(tcr -> !topicConsumerDelayedQueue.contains(tcr) && thresholdCheck(tcr) > 0)
                            .forEach(tcr -> topicConsumerDelayedQueue.offer(tcr, alarmConstants.getDelay(), TimeUnit.SECONDS));
                }
            } catch (Exception e) {
                LOGGER.error("Message center topic-consumer scheduled task failed...", e);
            } finally {
                // Unlock only if this thread still holds the lock: tryLock may have failed,
                // or the 5-second lease may have expired during a slow scan
                if (lock.isHeldByCurrentThread()) {
                    lock.unlock();
                }
            }
        }, triggerContext -> new PeriodicTrigger(alarmConstants.getPeriod(), TimeUnit.SECONDS).nextExecutionTime(triggerContext));
    } catch (Exception e) {
        LOGGER.error("Failed to initialize the message-center topic-consumer scheduled tasks...", e);
    }
}
```
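The lock handling matters because the trigger task returns early when `tryLock` fails, and Redisson's lease can also expire before a slow scan finishes. In both cases an unconditional `unlock()` in `finally` throws `IllegalMonitorStateException`. The same guarded pattern can be tested locally with the JDK's `ReentrantLock`. That lock has no lease, so only the failed-`tryLock` case applies. `GuardedScan` is a hypothetical helper.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper: run the scan only if the lock is obtained, and release
// the lock only when this thread actually holds it.
final class GuardedScan {
    static boolean runExclusively(ReentrantLock lock, long waitMillis, Runnable scan) throws InterruptedException {
        boolean acquired = lock.tryLock(waitMillis, TimeUnit.MILLISECONDS);
        try {
            if (!acquired) {
                return false; // another instance is scanning this period
            }
            scan.run();
            return true;
        } finally {
            // Unconditional unlock() here would throw IllegalMonitorStateException when tryLock failed
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}
```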
V. Source code of the alert scheduled tasks
To view it, follow the subscription account 算法和技術SHARING and reply "kafka告警".