Resources used:
Files shared via netdisk: spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar and 3 other files (4 files in total)
Link: https://pan.baidu.com/s/1zYHu29tLgDvS_L2Ud-22ZA?pwd=hnpg  Extraction code: hnpg
1. Requirements analysis:
Assume a mobile-phone billing system. When users make calls, the related records are temporarily stored on the base-station switches. Because the switches have limited capacity and are scattered across many locations, these call records must be collected into the billing system promptly for long-term storage, so that they can be analyzed statistically later.
2. Preparation:
(1) Make sure the Kafka service has been started; the relevant processes can be checked with the jps command in a Linux terminal window.
spark@vm01:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties &
[1] 2770
spark@vm01:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties &
[2] 3128
spark@vm01:/usr/local/kafka$ jps
2770 QuorumPeerMain
3128 Kafka
2104 Main
3529 Jps
(2) Create the ten topics named 130 through 139. For simplicity, use the script that ships with Kafka (a Python alternative that creates all ten topics at once is sketched after the listing below).
spark@vm01:/usr/local/kafka$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic 130
Created topic 130.
List the topics to verify:
spark@vm01:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2181
130
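Instead of running the shell script ten times, the topics could also be created in one go with the kafka-python admin client. This is only a minimal sketch; it assumes the kafka-python library from step (5) below is already installed and that the broker at localhost:9092 allows topic creation through the admin API.
# create_topics.py - optional sketch, not part of the original steps
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
# Build NewTopic objects for topics '130' ... '139', each with 1 partition and replication factor 1
topics = [NewTopic(name=str(n), num_partitions=1, replication_factor=1) for n in range(130, 140)]
admin.create_topics(new_topics=topics)
admin.close()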
(3) Start the HDFS service and use jps to check that the related processes are running.
spark@vm01:/usr/local/kafka$ start-dfs.sh
Starting namenodes on [localhost]
localhost: starting namenode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-namenode-vm01.out
localhost: starting datanode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-datanode-vm01.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-secondarynamenode-vm01.out
spark@vm01:/usr/local/kafka$ ^C
spark@vm01:/usr/local/kafka$ jps
4081 SecondaryNameNode
2770 QuorumPeerMain
4181 Jps
3128 Kafka
2104 Main
3710 NameNode
3887 DataNode
(4) In the HDFS root directory, create a datas directory with year-month subdirectories; create them according to the date on which you actually run the programs.
spark@vm01:/usr/local/kafka$ cd
spark@vm01:~$ hdfs dfs -mkdir -p /datas/202505
spark@vm01:~$ hdfs dfs -mkdir -p /datas/202506
(5) Install the kafka-python library in the python3.6 environment so that the programs can access Kafka. Later we will write a dedicated Python program that simulates the base-station switches and randomly generates call records.
spark@vm01:~$ sudo pip install kafka-python
[sudo] password for spark:
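A quick optional check (a sketch, nothing more) that the library is importable from the python3.6 interpreter:
import kafka                 # should import without error after the pip install above
print(kafka.__version__)     # prints the installed kafka-python version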
(6) Start the PyCharm IDE and create a project named SparkKafkaBilling in it, using python3.6 as the project's Python interpreter.
Click File > New Project
Location: /home/spark/PycharmProjects/SparkKafkaBilling
Interpreter: python3.6
Click Create
3. Simulating the call-record producer:
(1) In the new SparkKafkaBilling project, create a file named CallMsgProducer.py and enter the following code. It generates call messages in the required record format and sends them to the Kafka topics.
from kafka import KafkaProducer
import random, datetime, time

# Generate an 11-digit phone number string starting with '13'
def gen_phone_num():
    phone = '13'
    for x in range(9):
        phone = phone + str(random.randint(0, 9))
    return phone
(2) To keep generating new call records, use a loop that builds call-record strings in the required format; after each message is produced, sleep for a random amount of time and then generate the next record.
# Prepare the Kafka message producer object
producer = KafkaProducer(bootstrap_servers="localhost:9092")
working = True
tformat = '%Y-%m-%d %H:%M:%S'    # time format string
while working:
    # caller number, callee number, dial time (simulated as one day before now), answer time, hang-up time
    src_phone = gen_phone_num()
    dst_phone = gen_phone_num()
    dail_time = datetime.datetime.now() + datetime.timedelta(days=-1)
    call_time = dail_time + datetime.timedelta(seconds=random.randint(0, 10))
    hangup_time = call_time + datetime.timedelta(seconds=random.randint(5, 600))
    # Format the times as strings such as 2025-05-27 09:30:25
    s_dail_time = dail_time.strftime(tformat)
    s_call_time = call_time.strftime(tformat)
    s_hangup_time = hangup_time.strftime(tformat)
    # Build the call-record message string
    record = '%s,%s,%s,%s,%s' % (src_phone, dst_phone, s_dail_time, s_call_time, s_hangup_time)
    print('send : ', record)
    # The first three digits of the caller number are used as the topic
    topic = src_phone[0:3]
    # Convert the call-record string to a byte array
    msg = bytes(record, encoding='utf-8')
    # Call send() to publish the call record to Kafka
    producer.send(topic=topic, key=b"call", value=msg)
    # Sleep for a random interval between 0 and 1 second
    time.sleep(random.random())
producer.close()
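Before wiring up Spark, the producer can optionally be checked with a plain kafka-python consumer. The following is only a sketch (it assumes the kafka-python library and the topic '130' created earlier); it reads a few records back from the broker and stops after 5 seconds of inactivity.
# check_producer.py - optional sketch, not part of the original steps
from kafka import KafkaConsumer

consumer = KafkaConsumer('130', bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest', consumer_timeout_ms=5000)
for msg in consumer:
    # msg.key is the b"call" key set by the producer, msg.value is the record bytes
    print(msg.key, msg.value.decode('utf-8'))
consumer.close()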
4. Testing the message receiver:
(1) In the SparkKafkaBilling project, create a file named CallMsgBilling.py that receives the messages from the ten Kafka topics 130 through 139 and prints them on the screen.
If an error is reported, perform step (2) first and then rerun the program.
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext('local[2]','KafkaStreamBilling')
sc.setLogLevel("OFF")
ssc = StreamingContext(sc, 5)
streamRdd = KafkaUtils.createDirectStream(ssc,
                topics=['130', '131', '132', '133', '134',
                        '135', '136', '137', '138', '139'],
                kafkaParams={"metadata.broker.list": "localhost:9092"})
streamRdd.pprint()
ssc.start()
ssc.awaitTermination()
(2) Open a Linux terminal window and enter the commands below to submit the message receiver program to Spark. The dependency spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar used here was downloaded earlier into the ~/streaming directory. To avoid specifying it manually every time an application is submitted (for example via spark-submit's --jars option), copy it into the Spark installation directory on every node of the cluster (the /usr/local/spark/jars directory).
spark@vm01:~$ ls streaming
FileStreamDemo.py
'IDLE (Python 3.7 64-bit).lnk'
KafkaStreamDemo.py
NetworkWordCountall.py
NetworkWordCount.py
NetworkWordCount.py.txt
NetworkWordCountSave.py
NetworkWordCountWindow.py
spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar
spark@vm01:~$ cp streaming/*.jar /usr/local/spark/jars
spark@vm01:~$ cd PycharmProjects/SparkKafkaBilling
spark@vm01:~/PycharmProjects/SparkKafkaBilling$ spark-submit CallMsgBilling.py
(3) Go back to the PyCharm IDE and run the CallMsgProducer.py program; the simulated call-record messages are printed continuously at the bottom of the window.
(4) Then switch to the Linux terminal running the message receiver program and observe that it keeps receiving the messages being sent over.
The output clearly shows that the received Kafka messages are a series of (K, V) key-value tuples, where K is the "call" string set in CallMsgProducer.py and V is the message content. The key (K) can be set to any string, or omitted altogether; what is actually used is the value (V) of the tuple, i.e. the message content.
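If only the record text itself is of interest, the key can be dropped from the stream before printing. A minimal sketch that could be added to CallMsgBilling.py before ssc.start() (not part of the original program):
# Keep only the value (the call-record string) from each (K, V) tuple
streamRdd.map(lambda kv: kv[1]).pprint()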
5. Processing the call-record messages with Spark Streaming: perform simple processing on the generated call records and save them to HDFS.
(1) In the project's main.py file, delete the existing code and add the code below.
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from datetime import datetime

# Initialize the core variables sc, ssc and spark
spark = SparkSession.builder \
    .master('local[2]') \
    .appName('KafkaStreamingBilling') \
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("OFF")   # turn off logging
ssc = StreamingContext(sc, 5)
spark = SparkSession(sc)
(2) Define the process() and saveYmCallData() functions.
# Process one message and return a call-record tuple:
# (caller, dial time, answer time, hang-up time, call minutes, callee, year-month)
def process(x):
    v = x[1].split(',')
    tformat = '%Y-%m-%d %H:%M:%S'
    d1 = datetime.strptime(v[3], tformat)
    d2 = datetime.strptime(v[4], tformat)
    ym = '%d%02d' % (d1.year, d1.month)
    sec = (d2 - d1).seconds
    # billing minutes: round the call duration up to the next whole minute
    minutes = sec // 60 if sec % 60 == 0 else sec // 60 + 1
    return (v[0], v[2], v[3], v[4], str(minutes), v[1], ym)

# Fetch the call records for the year-month given in the parameter row and save them to HDFS
def saveYmCallData(row):
    year_month = row.ym
    path = "hdfs://localhost:9000/datas/" + year_month + "/"
    ymdf = spark.sql("select * from phonecall where ym='" + year_month + "'")
    ymdf.drop('ym').write.save(path, format="csv", mode="append")
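For illustration only, this is what process() would return for one hypothetical received message in the (key, value) form shown in section 4; the 65-second call is billed as 2 minutes:
# hypothetical sample, not part of main.py
sample = ('call', '13912345678,13787654321,2025-05-27 09:30:20,2025-05-27 09:30:25,2025-05-27 09:31:30')
print(process(sample))
# ('13912345678', '2025-05-27 09:30:20', '2025-05-27 09:30:25', '2025-05-27 09:31:30', '2', '13787654321', '202505')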
(3) Define one more function, save(), which saves the DStream's call-record messages.
# Save the message records of the DStream
def save(rdd):
    if not rdd.isEmpty():
        rdd2 = rdd.map(lambda x: process(x))
        print(rdd2.count())
        df0 = rdd2.toDF(['src_phone', 'dail_time', 'call_time', 'hangup_time',
                         'call_minutes', 'dst_phone', 'ym'])
        df0.createOrReplaceTempView("phonecall")
        df1 = spark.sql('select distinct ym from phonecall')
        if df1.count() == 1:
            # all records of this batch belong to the same year-month: write them directly
            print('ooooooooooo')
            year_month = df1.first().ym
            path = "hdfs://localhost:9000/datas/" + year_month + "/"
            df0.drop("ym").write.save(path, format="csv", mode="append")
        else:
            # otherwise save the records of each year-month separately
            df1.foreach(saveYmCallData)
(4) Create a DStream object from the Kafka data source and start the Spark Streaming application's processing loop.
# Receive messages from multiple Kafka topics
streamRdd = KafkaUtils.createDirectStream(ssc,
                topics=['130', '131', '132', '133', '134',
                        '135', '136', '137', '138', '139'],
                kafkaParams={"metadata.broker.list": "localhost:9092"})
streamRdd.pprint()
streamRdd.foreachRDD(save)
ssc.start()
ssc.awaitTermination()
(5) The functional code is now complete; switch to a Linux terminal window and start the main.py program.
spark@vm01:~/PycharmProjects/SparkKafkaBilling$ spark-submit main.py
(6) Open another Linux terminal window and start the message producer program CallMsgProducer.py.
cd ~/PycharmProjects/SparkKafkaBilling
spark@vm01:~/PycharmProjects/SparkKafkaBilling$ python CallMsgProducer.py
Then check the call-record messages displayed in the terminal window where main.py is running.
(7) Finally, verify on HDFS that the received call-record messages have been saved successfully. Note that the year-month in the directory path below should be changed to the actual date, because the data is written according to the current machine time.
spark@vm01:~$ hdfs dfs -cat /datas/202505/part-*
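As an optional follow-up, the saved CSV files can be read back into a DataFrame for the kind of statistics mentioned in the requirements analysis. This is only a sketch: it assumes the same HDFS layout and the column order written by save() (ym dropped), and the month in the path is illustrative.
# billing_check.py - optional sketch, e.g. total billed minutes per caller
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as _sum

spark = SparkSession.builder.appName('BillingCheck').getOrCreate()
# Column order matches df0 in main.py with the ym column dropped
df = spark.read.csv('hdfs://localhost:9000/datas/202505/', inferSchema=True) \
          .toDF('src_phone', 'dail_time', 'call_time', 'hangup_time', 'call_minutes', 'dst_phone')
df.groupBy('src_phone').agg(_sum('call_minutes').alias('total_minutes')).show(5)
spark.stop()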
This completes the implementation of the whole call-record processing pipeline.