以下結合案例:統計消息中單詞出現次數,來測試并說明kafka消息流式處理的執行流程
Maven依賴
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency></dependencies>
準備工作
首先編寫創建三個類,分別作為消息生產者、消息消費者、流式處理者
KafkaStreamProducer
:消息生產者
public class KafkaStreamProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();//kafka的連接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");//發送失敗,失敗的重試次數properties.put(ProducerConfig.RETRIES_CONFIG, 5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(properties);for (int i = 0; i < 5; i++) {ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kafka-stream-topic-input", "hello kafka");producer.send(producerRecord);}producer.close();}
}
該消息生產者向主題kafka-stream-topic-input
發送五次hello kafka
KafkaStreamConsumer
:消息消費者
public class KafkaStreamConsumer {public static void main(String[] args) {Properties properties = new Properties();//kafka的連接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");//消費者組properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//手動提交偏移量properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//訂閱主題consumer.subscribe(Collections.singletonList("kafka-stream-topic-output"));try {while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println("consumerRecord.key() = " + consumerRecord.key());System.out.println("consumerRecord.value() = " + consumerRecord.value());}// 異步提交偏移量consumer.commitAsync();}} catch (Exception e) {e.printStackTrace();} finally {// 同步提交偏移量consumer.commitSync();}}
}
KafkaStreamQuickStart
:流式處理類
public class KafkaStreamQuickStart {public static void main(String[] args) {Properties properties = new Properties();properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");StreamsBuilder streamsBuilder = new StreamsBuilder();//流式計算streamProcessor(streamsBuilder);KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);kafkaStreams.start();}/*** 消息格式:hello world hello world* 配置并處理流數據。* 使用StreamsBuilder創建并配置KStream,對輸入的主題中的數據進行處理,然后將處理結果發送到輸出主題。* 具體處理包括:分割每個消息的值,按值分組,對每個分組在10秒的時間窗口內進行計數,然后將結果轉換為KeyValue對并發送到輸出主題。** @param streamsBuilder 用于構建KStream對象的StreamsBuilder。*/private static void streamProcessor(StreamsBuilder streamsBuilder) {// 從"kafka-stream-topic-input"主題中讀取數據流KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");System.out.println("stream = " + stream);// 將每個值按空格分割成數組,并將數組轉換為列表,以擴展單個消息的值stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {String[] valAry = value.split(" ");return Arrays.asList(valAry);})// 按消息的值進行分組,為后續的窗口化計數操作做準備.groupBy((key, value) -> value)// 定義10秒的時間窗口,在每個窗口內對每個分組進行計數.windowedBy(TimeWindows.of(Duration.ofSeconds(10))).count()// 將計數結果轉換為流,以便進行進一步的處理和轉換.toStream()// 顯示鍵值對的內容,并將鍵和值轉換為字符串格式.map((key, value) -> {System.out.println("key = " + key);System.out.println("value = " + value);return new KeyValue<>(key.key().toString(), value.toString());})// 將處理后的流數據發送到"kafka-stream-topic-output"主題.to("kafka-stream-topic-output");}}
該處理類首先從主題kafka-stream-topic-input
中獲取消息數據,經處理后發送到主題kafka-stream-topic-output
中,再由消息消費者KafkaStreamConsumer
進行消費
執行結果
流式處理流程及原理說明
初始階段
當從輸入主題kafka-stream-topic-input
讀取數據流時,每個消息都是一個鍵值對。假設輸入消息的鍵是null
或一個特定的字符串,這取決于消息是如何被發送到輸入主題的。
KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");
分割消息值
使用flatMapValues
方法分割消息的值,但這個操作不會改變消息的鍵。如果輸入消息的鍵是null
,那么在這個階段消息的鍵仍然是null
。
stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {String[] valAry = value.split(" ");return Arrays.asList(valAry);
})
按消息的值進行分組
在 Kafka Streams 中,當使用groupBy
方法對流進行分組時,實際上是在指定一個新的鍵,這個鍵將用于后續的窗口化操作和聚合操作。在這個案例中groupBy
方法被用來按消息的值進行分組:
.groupBy((key, value) -> value)
這意味著在分組操作之后,流中的每個消息的鍵被設置為消息的值。因此,當你在后續的map
方法中看到key
參數時,這個key
實際上是消息的原始值,因為在groupBy
之后,消息的值已經變成了鍵。
定義時間窗口并計數
在這個階段,消息被窗口化并計數,但是鍵保持不變。
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()
將計數結果轉換為流
當將計數結果轉換為流時,鍵仍然是之前分組時的鍵
.toStream()
處理和轉換結果
在map
方法中,你看到的key
參數實際上是分組后的鍵,也就是消息的原始值:
.map((key, value) -> {System.out.println("key = " + key);System.out.println("value = " + value);return new KeyValue<>(key.key().toString(), value.toString());
})
map
方法中的key.key().toString()
是為了獲取鍵的字符串表示,而value.toString()
是為了將計數值轉換為字符串。
將處理后的數據發送到輸出主題
.to("kafka-stream-topic-output");