Life is innocent, long live health. I am laity.
I have despised my soul seven times:
The first time, when it could have striven upward but feigned humility instead;
The second time, when it was empty and filled the void with lust;
The third time, when it had to choose between the hard and the easy, and chose the easy;
The fourth time, when it erred and consoled itself with the thought that others err too;
The fifth time, when it forbore out of weakness yet took that forbearance for the resilience of life;
The sixth time, when it despised an ugly face, not knowing that face was one of its own masks;
The seventh time, when it lay in the mire of life, unwilling, yet too timid to climb out.
Implementing a Simple Delay Queue with Kafka
Business scenario:
Listen to a specified Kafka topic, receive and process its messages, then push the data to the front end over WebSocket. The front end puts the received data into a timed queue and starts a 5-second countdown:
Case 1: when the countdown expires, the next-step interface is triggered (automatic operation);
Case 2: before the countdown expires, someone intervenes and clicks through to the next-step interface (manual operation).
Problem: when the user switches pages, the front end cannot persist the data sitting in its timed queue, so the queue is simply cleared and those messages are never processed;
Solution: a Kafka delay queue takes care of the messages left unhandled after a page switch.
1. Solution flow:
(1) after the listener receives and processes a message, it immediately runs the automatic operation once,
(2) the saveId of the record written to the database is returned inside the JSON data that the WebSocket pushes to the front end,
(3) the same JSONObject is then sent to the Kafka message queue via the prepared send method,
(4) afterwards, whether triggered manually or automatically, an update is performed (the saveId is used to look up the trace left by the "automatic operation"; check whether update_time is already set and only update if it is not, since the update can only be triggered by the front-end countdown expiring or by the listener of the Kafka delay queue). A sketch of this idempotent check follows the list.
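The idempotent update in step (4) can be sketched as below. This is only a minimal illustration: TaskResultMapper, the TaskResult entity, and their methods are hypothetical names standing in for the project's real mapper and entity.

public void triggerNextStep(Long saveId) {
    // load the trace left by the automatic run
    TaskResult record = taskResultMapper.selectById(saveId);
    // update_time still empty means neither the front-end countdown nor the
    // delay-queue listener has handled this message yet
    if (record != null && record.getUpdateTime() == null) {
        record.setUpdateTime(new Date());
        taskResultMapper.updateById(record);
        // ... call the next-step interface here ...
    }
    // otherwise the record was already handled: do nothing, the call stays idempotent
}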
Implementation steps
1. Add the pom dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <optional>true</optional>
</dependency>
2. Auto-configuration via yaml
spring:
  application:
    # application name
    name: youServerName
  kafka:
    bootstrap-servers: youKafkaIp:9092
    consumer:
      enable-auto-commit: false
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      key-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
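The yaml above only configures the consumer. If you would rather let Spring Boot manage the producer as well, instead of hard-coding its settings in KafkaUtil below, the serializers can be declared in the same file; this is an optional variation on the original setup, not a required step:

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer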
3. Implementation of each class
DelayEnum.java
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author laity
 */
@Getter
@AllArgsConstructor
public enum DelayEnum {

    FIVE_S(5, "topic_message_5s"),
    TEN_S(10, "topic_message_10s");

    private final int delay_time;
    private final String topic_name;
}
KafkaDelayMsg.java
import lombok.Data;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @author laity
 */
@Data
public class KafkaDelayMsg implements Delayed {

    private String msg; // content - can be replaced with your own entity
    private DelayEnum delayEnum;
    private long time;

    public KafkaDelayMsg() {
    }

    public KafkaDelayMsg(String msg, DelayEnum delayEnum) {
        this.msg = msg;
        this.delayEnum = delayEnum;
        this.time = System.currentTimeMillis() + delayEnum.getDelay_time() * 1000;
    }

    /* the key to the delay-queue implementation */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.time, ((KafkaDelayMsg) o).time);
    }
}
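To make the Delayed contract concrete, here is a small self-contained demo (not part of the project code; the class name is made up) showing that DelayQueue.take() only hands back an element once its getDelay() has counted down to zero:

import java.util.concurrent.DelayQueue;

public class DelayQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<KafkaDelayMsg> queue = new DelayQueue<>();
        queue.add(new KafkaDelayMsg("hello", DelayEnum.FIVE_S)); // becomes eligible after ~5 seconds

        long start = System.currentTimeMillis();
        KafkaDelayMsg msg = queue.take(); // blocks until the 5-second delay has elapsed
        System.out.println(msg.getMsg() + " taken after " + (System.currentTimeMillis() - start) + " ms");
    }
}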
KafkaUtil.java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

/**
 * @author laity
 */
@Slf4j
public class KafkaUtil {

    private static final Map<String, Object> map = new HashMap<>();

    public KafkaUtil() {
        map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.x.x:9092");
        map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    }

    // put the message into the matching delay queue and start the sender loop
    public void send(KafkaDelayMsg msg) {
        KafkaProducer<String, String> producer = new KafkaProducer<>(map);
        if (msg.getDelayEnum().getDelay_time() == DelayEnum.FIVE_S.getDelay_time()) {
            KafkaDelayQueue.FIVE_S.add(msg);
        }
        KafkaDelayQueue.run(producer);
    }
}
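Note that send() as written builds a brand-new KafkaProducer and calls KafkaDelayQueue.run() on every invocation, which also spawns another polling thread each time. A common refinement, sketched below with a hypothetical class name and not part of the original post, is to keep one shared producer and start the drain loop exactly once (KafkaProducer is thread-safe, so a single instance can serve the whole application):

import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaDelaySender {

    private final KafkaProducer<String, String> producer;

    public KafkaDelaySender(KafkaProducer<String, String> producer) {
        this.producer = producer;
        KafkaDelayQueue.run(producer); // start the drain loop once, at construction time
    }

    public void send(KafkaDelayMsg msg) {
        if (msg.getDelayEnum() == DelayEnum.FIVE_S) {
            KafkaDelayQueue.FIVE_S.add(msg); // only enqueue; the background loop does the sending
        }
    }
}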
KafkaDelayQueue.java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author laity  the delay queue itself
 */
@Slf4j
public class KafkaDelayQueue {

    public static DelayQueue<KafkaDelayMsg> FIVE_S = new DelayQueue<>();

    // the function that actually sends the messages
    public static void run(KafkaProducer<String, String> producer) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        executorService.execute(() -> {
            log.info("=============== delay-queue sender started ==================");
            while (true) {
                try {
                    // blocks until a message's delay has expired
                    KafkaDelayMsg take = FIVE_S.take();
                    // push the message to the topic configured on its DelayEnum
                    RecordMetadata recordMetadata = producer.send(
                            new ProducerRecord<>(take.getDelayEnum().getTopic_name(), take.getMsg())).get();
                    log.info("============================= CONTENT:" + take + "-"
                            + recordMetadata.topic() + "-" + recordMetadata.partition()
                            + " ===============================");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
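As a usage sketch, step (3) of the solution flow maps onto these classes as follows (the JSON payload and its fields are purely illustrative): build a KafkaDelayMsg carrying the JSON that was pushed over WebSocket and hand it to KafkaUtil.send(); the message then shows up on topic_message_5s roughly five seconds later, where the listener below picks it up.

// illustrative payload; in the real flow this is the JSON pushed to the front end
String payload = "{\"saveId\": 123, \"status\": \"AUTO_DONE\"}";
new KafkaUtil().send(new KafkaDelayMsg(payload, DelayEnum.FIVE_S));
// ~5 seconds later the message is published to topic_message_5s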
KafkaConsumer.java - business code
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;

/**
 * @author laity  consumer
 */
@Component
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class KafkaConsumer {

    private final KelCjTaskResultMapper mapper;

    // resolved through the bean reference (#{__listener.getTopicName()}) in the annotation below
    public String getTopicName() {
        return DelayEnum.FIVE_S.getTopic_name();
    }

    // @KafkaListener(topics = "topic_message_5s", groupId = "consumer-laity")
    @KafkaListener(topics = "#{__listener.getTopicName()}", groupId = "consumer-laity")
    public void listen(String message) {
        // 1. implement your business logic here
    }
}
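A possible body for listen(), closing the loop with step (4) of the solution flow, is sketched below. The JSON shape, entity class, and mapper methods (selectById, updateById, getUpdateTime) are assumptions for illustration only; the real project works against KelCjTaskResultMapper and its own entity, and this sketch additionally needs com.fasterxml.jackson.databind.JsonNode imported.

@KafkaListener(topics = "#{__listener.getTopicName()}", groupId = "consumer-laity")
public void listen(String message) {
    try {
        ObjectMapper objectMapper = new ObjectMapper();
        JsonNode node = objectMapper.readTree(message);        // parse the delayed JSON message
        long saveId = node.get("saveId").asLong();              // hypothetical field name

        KelCjTaskResult record = mapper.selectById(saveId);     // hypothetical entity and mapper method
        if (record != null && record.getUpdateTime() == null) {
            // neither the front-end countdown nor an earlier run has handled it: update now
            record.setUpdateTime(new Date());
            mapper.updateById(record);                           // hypothetical mapper method
            // ... trigger the next-step interface here ...
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}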
Summary
Young man, your duty is to level the land, not to fret over time. Do the work of March and April, and August and September will bring their own answers. I am Laity, a Laity who keeps moving forward.