1.導入依賴?:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
2.編寫代碼?:
創建SparkConf和StreamingContext。
定義Kafka相關參數,如bootstrap servers、group id、key和value的deserializer。
使用KafkaUtils.createDirectStream方法創建DStream,該方法接受StreamingContext、位置策略、消費者策略等參數。
提取數據中的value部分,并進行word count計算。
啟動StreamingContext并等待其終止。
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
object DirectAPI {
? def main(args: Array[String]): Unit = {
??? val sparkConf = new SparkConf().setMaster("local[*]").setAppName("direct")
??? val ssc = new StreamingContext(sparkConf,Seconds(3))
??? //定義kafka相關參數
??? val kafkaPara :Map[String,Object] = Map[String,Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
????? ->"node01:9092,node02:9092,node03:9092",
????? ConsumerConfig.GROUP_ID_CONFIG->"kafka",
????? "key.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",
????? "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
??? )
??? //通過讀取kafka數據,創建DStream
??? val kafkaDStream:InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](
????? ssc,LocationStrategies.PreferConsistent,
????? ConsumerStrategies.Subscribe[String,String](Set("kafka"),kafkaPara)
??? )
??? //提取出數據中的value部分
??? val valueDStream :DStream[String] = kafkaDStream.map(record=>record.value())
??? //wordCount計算邏輯
??? valueDStream.flatMap(_.split(" "))
????? .map((_,1))
????? .reduceByKey(_+_)
????? .print()
??? ssc.start()
??? ssc.awaitTermination()
? }
? }
3.運行程序?:
開啟Kafka集群。
4.使用Kafka生產者產生數據。
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kafka
?5、運行Spark Streaming程序,接收Kafka生產的數據并進行處理。
6.查看消費進度?:
使用Kafka提供的kafka-consumer-groups.sh腳本查看消費組的消費進度。