Spark Streaming是Apache Spark中用于實時流數據處理的模塊。以下是一些常見功能的實用PySpark代碼示例:
- 基礎流處理:從TCP套接字讀取數據并統計單詞數量
from pyspark import `SparkContext
from pyspark.streaming import StreamingContext# 創建SparkContext和StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1) # 1秒的批處理間隔# 創建一個DStream,從TCP源讀取數據
lines = ssc.socketTextStream("localhost", 9999)# 對每一行數據進行分詞,映射為(word, 1)的鍵值對,然后按單詞統計數量
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)# 打印每個RDD中的前10個元素
word_counts.pprint()# 啟動流計算
ssc.start()
# 等待流計算結束
ssc.awaitTermination()
在上述代碼中:
- sc 是 SparkContext ,用于與Spark集群交互。
- ssc 是 StreamingContext ,定義了批處理間隔。
- lines 是一個 DStream ,從指定的TCP套接字讀取數據。
- words 對每行數據進行分詞, word_counts 統計每個單詞出現的次數。
- pprint 方法打印每個批次的前10個元素。
- 使用窗口函數
from pyspark import SparkContext
from pyspark.streaming import StreamingContextsc = SparkContext("local[2]", "WindowedWordCount")
ssc = StreamingContext(sc, 1)lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))# 使用窗口函數,窗口大小為3秒,滑動間隔為1秒
windowed_word_counts = word_counts.reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 3, 1)windowed_word_counts.pprint()ssc.start()
ssc.awaitTermination()
在這個示例中:
- reduceByKeyAndWindow 方法用于在窗口上進行聚合操作。
- 第一個參數是用于合并窗口內元素的函數,第二個參數是用于移除窗口外元素的函數。
- 狀態更新
from pyspark import SparkContext
from pyspark.streaming import StreamingContextsc = SparkContext("local[2]", "StatefulWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint") # 啟用檢查點def updateFunction(new_values, running_count):if running_count is None:running_count = 0return sum(new_values, running_count)lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))# 使用updateStateByKey進行狀態更新
stateful_word_counts = word_counts.updateStateByKey(updateFunction)stateful_word_counts.pprint()ssc.start()
ssc.awaitTermination()
在上述代碼中:
- updateStateByKey 方法用于維護每個鍵的狀態。
- updateFunction 定義了如何根據新值和現有狀態更新狀態。
- 與Kafka集成
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtilssc = SparkContext("local[2]", "KafkaWordCount")
ssc = StreamingContext(sc, 1)# Kafka參數
kafkaParams = {"metadata.broker.list": "localhost:9092"}
topics = ["test"]# 創建Kafka輸入DStream
kvs = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)
lines = kvs.map(lambda x: x[1])words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)word_counts.pprint()ssc.start()
ssc.awaitTermination()
在這個示例中:
- KafkaUtils.createDirectStream 用于從Kafka主題讀取數據。
- kvs 是一個包含Kafka消息的DStream, lines 提取消息內容。