Spark實時流數據處理實例(SparkStreaming通話記錄消息處理)

所用資源:

通過網盤分享的文件:spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar等4個文件
鏈接: https://pan.baidu.com/s/1zYHu29tLgDvS_L2Ud-22ZA?pwd=hnpg 提取碼: hnpg

1.需求分析 :

假定有一個手機通信計費系統,用戶通話時在基站交換機上臨時保存了相關記錄,由于交換機的容量 有限且分散在各地,因此需要及時將這些通話記錄匯總到計費系統中進行長時間保存,以方便后續的 統計分析。

2.準備工作:

(1)確保Kafka服務已經啟動,可在Linux終端窗體中使用jps命令查看具體的進程

spark@vm01:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties &

[1] 2770

spark@vm01:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties &

[2] 3128

spark@vm01:/usr/local/kafka$ jps

2770 QuorumPeerMain

3128 Kafka

2104 Main

3529 Jps

(2)創建從130到139的十個主題,為簡單起見,通過kafka附帶的腳本命令來完成

spark@vm01:/usr/local/kafka$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic 130

Created topic 130.

查看:

spark@vm01:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2181

130

(3)啟動HDFS服務,使用jps查看是否有相關進程在運行

spark@vm01:/usr/local/kafka$ start-dfs.sh

Starting namenodes on [localhost]

localhost: starting namenode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-namenode-vm01.out

localhost: starting datanode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-datanode-vm01.out

Starting secondary namenodes [0.0.0.0]

0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-secondarynamenode-vm01.out

spark@vm01:/usr/local/kafka$ ^C

spark@vm01:/usr/local/kafka$ jps

4081 SecondaryNameNode

2770 QuorumPeerMain

4181 Jps

3128 Kafka

2104 Main

3710 NameNode

3887 DataNode

(4)在HDFS根目錄中創建datas目錄及日期的子目錄,根據自己當前運行的程序時間進行創建即可

spark@vm01:/usr/local/kafka$ cd

spark@vm01:~$ hdfs dfs -mkdir -p /datas/202505

spark@vm01:~$ hdfs dfs -mkdir -p /datas/202506

(5)在python3.6環境中安裝一個kafka-python庫,以便程序能夠正常訪問kafka,后面需要填寫一個專門的python程序來模擬基站交換機隨機產生通話記錄

spark@vm01:~$ sudo pip install kafka-python

[sudo] spark 的密碼:

(6)啟動PyCharm集成開發環境,在其中創建一個名為SparkKafkaBilling的項目,對應的Python解釋器使用python3.6即可

點擊file,newproject

文件名(位置):/home/spark/PycharmProjects/SparkKafkaBilling

編譯:python3.6

點擊創建

3.通話記錄生產者模擬:

(1)在新建的項目SparkKafkaBilling中創建CallMsgProducer.py文件,然后輸入代碼,負責按照要求的記錄格式模擬產生通話消息,并將其發送到Kafka的topic中。

from kafka import KafkaProducer

import random, datetime, time

# 產生一個以13開頭的手機號字符串,共11位

def gen_phone_num():

????phone = '13'

????for x in range(9):

????????phone = phone + str(random.randint(0, 9))

????return phone

(2)為了持續不斷地生成新的通話記錄信息,可以使用一個循環創建符合格式要求的通話記錄信息字符串,且每產生一條消息后休眠隨機的時長,然后繼續生成下一條通話記錄

# Kafka的消息生產者對象準備

producer = KafkaProducer(bootstrap_servers="localhost:9092")

working = True

tformat = '%Y-%m-%d %H:%M:%S' ????#設置時間日志格式

while working:

????# 主叫號碼,被叫號碼,呼叫時間(模擬當前時間的前一天),接通時間,掛斷時間

????src_phone = gen_phone_num()

????dst_phone = gen_phone_num()

????dail_time = datetime.datetime.now() + datetime.timedelta(days=-1)

????call_time = dail_time + datetime.timedelta(seconds=random.randint(0, 10))

????hangup_time = call_time + datetime.timedelta(seconds=random.randint(5, 600))

????# 將時間格式化為所需的字符串格式,類似2025-05-27 09:30:25

????s_dail_time = dail_time.strftime(tformat)

????s_call_time = call_time.strftime(tformat)

????s_hangup_time = hangup_time.strftime(tformat)

????# 生成通話記錄消息字符串

????record = '%s,%s,%s,%s,%s' % (src_phone, dst_phone, s_dail_time, s_call_time, s_hangup_time)

????print('send : ', record)

????# 通話記錄的主叫號碼前三位為topic主題

????topic = src_phone[0:3]

????# 將通話記錄字符串轉換為字節數組

????msg = bytes(record, encoding='utf-8')

????# 調用send()將通話記錄消息發送給Kafka

????producer.send(topic=topic, key=b"call",value=msg)

????# 休眠一個隨機的時長,為一個0-1秒之間的隨機小數

????time.sleep( random.random() )

producer.close()

4.消息接收者測試:

(1)在SparkKafkaBilling項目中創建CallMsgBilling.py文件,將Kafka中130~139這10個topic(主題)的消息接收并在屏幕上打印顯示出來

如果報錯 先執行(2),再重新運行

from pyspark.streaming.kafka import KafkaUtils

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

sc = SparkContext('local[2]','KafkaStreamBilling')

sc.setLogLevel("OFF")

ssc = StreamingContext(sc, 5)

streamRdd = KafkaUtils.createDirectStream(ssc,

???????????????topics = ['130','131','132','133','134',

?????????????????????????'135','136','137','138','139'],

???????????????kafkaParams = {"metadata.broker.list":"localhost:9092"} )

streamRdd.pprint()

ssc.start()

ssc.awaitTermination()

(2)打開一個Linux終端窗體,在其中輸入下面的命令,將消息接收者程序提交到Spark中運行,其中用到的spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar依賴庫文件此前已下載放在~/streaming目錄中,為避免每次提交應用程序時在命令行手動指定,可以將其復制到集群的各節點Spark安裝目錄中(位于/usr/local/spark/jars目錄)

spark@vm01:~$ ls streaming

?FileStreamDemo.py

'IDLE (Python 3.7 64-bit).lnk'

?KafkaStreamDemo.py

?NetworkWordCountall.py

?NetworkWordCount.py

?NetworkWordCount.py.txt

?NetworkWordCountSave.py

?NetworkWordCountWindow.py

?spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar

spark@vm01:~$ cp streaming/*.jar /usr/local/spark/jars

spark@vm01:~$ cd PycharmProjects/SparkKafkaBilling

spark@vm01:~/PycharmProjects/SparkKafkaBilling$ spark-submit CallMsgBilling.py

(3)回到PyCharm集成開發環境中,運行CallMsgProducer.py程序,在底部會源源不斷地顯示模擬產生的通話記錄消息

(4)再切換到運行消息接收者程序的Linux終端窗體,發現其不斷地接收發送過來的消息

從輸出結果可以清楚地看到,接收的Kafka消息是一系列(K,V)鍵值對形式的二元組,其中的K代表

CallMsgProducer.py程序中設定的"call"字符串,V代表消息內容。鍵(K)可以設置成任意字符串,當然

也可以不設置,實際使用的是二元組里面的值(V),即消息內容

5.Spark Streaming通話記錄消息處理:將生成的通話記錄消息進行簡單的處理并保存在HDFS中

(1)在項目的main.py文件中將原有代碼刪除,并添加下面的代碼

from pyspark.streaming.kafka import KafkaUtils

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

from pyspark.sql import SparkSession

from datetime import datetime

# 初始化sc、ssc、spark等幾個核心變量

spark = SparkSession.builder \

????.master('local[2]') \

????.appName('KafkaStreamingBilling') \

????.getOrCreate()

sc = spark.sparkContext

sc.setLogLevel("OFF") ?#關閉日志

ssc = StreamingContext(sc, 5)

spark = SparkSession(sc)

(2)定義process()和saveYmCallData()函數

# 定義一個處理消息的函數,返回一個通話記錄的元組

# (主叫號碼,呼叫時間,接通時間,掛斷時間,通話時長,被叫號碼,年月)

def process(x):

????v = x[1].split(',')

????tformat = '%Y-%m-%d %H:%M:%S'

????d1 = datetime.strptime(v[3], tformat)

????d2 = datetime.strptime(v[4], tformat)

????ym = '%d%02d' % (d1.year, d1.month)

????sec = (d2-d1).seconds

????minutes = sec//60 if sec%60==0 else sec//60+1

????return (v[0],v[2],v[3],v[4],str(minutes),v[1],ym)

# 根據參數row中的年月信息,獲取相應的通話消息記錄,并保存到HDFS

def saveYmCallData(row):

????year_month = row.ym

????path = "hdfs://localhost:9000/datas/" + year_month + "/"

????ymdf = spark.sql("select * from phonecall where ym='" + year_month +"'")

????ymdf.drop('ym').write.save(path, format="csv", mode="append")

(3)再定義一個save()函數,以實現DStream的通話記錄消息保存

# 保存DStream的消息記錄

def save(rdd):

????if not rdd.isEmpty():

????????rdd2 = rdd.map(lambda x: process(x))

????????print(rdd2.count())

????????df0 = rdd2.toDF(['src_phone', 'dail_time', 'call_time', 'hangup_time',

?????????????????????????'call_minutes', 'dst_phone', 'ym'])

????????df0.createOrReplaceTempView("phonecall")

????????df1 = spark.sql('select distinct ym from phonecall')

????????if df1.count() == 1:

????????????print('ooooooooooo')

????????????year_month = df1.first().ym

????????????path = "hdfs://localhost:9000/datas/" + year_month + "/"

????????????df0.drop("ym").write.save(path, format="csv", mode="append")

????????else:

????????????df1.foreach(saveYmCallData)

(4)通過Kafka數據源創建一個DSteam對象,并開始Spark Streaming應用程序的循環

# 從Kafka的多個topic中接收消息

streamRdd = KafkaUtils.createDirectStream(ssc,

???????????????topics = ['130','131','132','133','134',

?????????????????????????'135','136','137','138','139'],

???????????????kafkaParams = {"metadata.broker.list":"localhost:9092"})

streamRdd.pprint()

streamRdd.foreachRDD(save)

ssc.start()

ssc.awaitTermination()

(5)功能代碼編寫完畢,現在可以切換到Linux終端窗體,啟動main.py程序

spark@vm01:~/PycharmProjects/SparkKafkaBilling$ spark-submit main.py

(6)再打開一個新的Linux終端窗體,啟動消息生產者程序CallMsgProducer.py

cd ~/PycharmProjects/SparkKafkaBilling

spark@vm01:~/PycharmProjects/SparkKafkaBilling$ python CallMsgProducer.py

然后可以查看main.py程序所在終端窗體顯示的通話記錄消息

(7)最后,在HDFS上可以驗證收到的通話記錄消息是否被成功保存,注意應將下面目錄路徑中的年月改為實際的時間,這是因為數據是按照當前機器時間在運行的

spark@vm01:~$ hdfs dfs -cat /datas/202505/part-*

至此,我們就完成了整個通話記錄處理功能的實現

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/81911.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/81911.shtml
英文地址,請注明出處:http://en.pswp.cn/web/81911.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Vue3處理number輸入框避免NaN

在 Vue3 中處理 number 類型輸入框避免顯示 NaN&#xff0c;核心在于正確處理用戶輸入的非數字值。以下是幾種解決方案&#xff1a; 方案1&#xff1a;使用字符串中轉 計算屬性&#xff08;推薦&#xff09; vue 復制 下載 <template><input v-model"input…

Python自動化之selenium語句——瀏覽器設置顯示尺寸、截圖、刷新網頁

目錄 一、瀏覽器設置最大化、最小化 1.瀏覽器最大化 2.瀏覽器最小化 二、瀏覽器打開的位置、尺寸 1.瀏覽器打開位置 2.瀏覽器打開尺寸 三、瀏覽器截圖 1.截圖語句 2.運行成功后查看 四、刷新網頁 上一節實現了打開瀏覽器、打開指定網址、關閉瀏覽器的操作&#xff0c…

model.classifier:分類頭

model.classifier:分類頭 分類頭(model.classifier)含義 在基于Transformer架構的模型(如BERT、GPT等 )用于分類任務時,“分類頭(model.classifier)” 是模型的一個重要組成部分。以Hugging Face的Transformers庫為例,許多預訓練模型在完成通用的預訓練任務(如語言…

4.1.2 操作數據集

在本實戰中&#xff0c;我們深入學習了Spark SQL的操作數據集&#xff0c;包括了解Spark會話、準備數據文件、啟動Spark Shell以及獲取和操作學生數據集。通過Spark Shell&#xff0c;我們可以直接使用SparkSession實例來加載、轉換和處理數據。我們學習了如何將文本文件加載為…

LangChain整合Milvus向量數據庫實戰:數據新增與刪除操作

導讀&#xff1a;在AI應用開發中&#xff0c;向量數據庫已成為處理大規模語義搜索和相似性匹配的核心組件。本文通過詳實的代碼示例&#xff0c;深入探討LangChain框架與Milvus向量數據庫的集成實踐&#xff0c;為開發者提供生產級別的向量數據管理解決方案。 文章聚焦于向量數…

從根源解決Augment免費額度限制問題:Windows詳細教程

從根源解決Augment免費額度限制問題&#xff1a;Windows詳細教程 本文將詳細介紹如何在Windows系統上解決Augment AI助手的"Too many free trials"限制問題&#xff0c;通過清理VS Code緩存和修改設備ID實現無限制使用Augment的方法。 視頻地址 augment從根源上解決免…

IoTDB 集成 DBeaver,簡易操作實現時序數據清晰管理

數據結構一目了然&#xff0c;跨庫分析輕松實現&#xff0c;方便 IoTDB “內部構造”管理&#xff01; 隨著物聯網場景對時序數據處理需求激增&#xff0c;時序數據庫與數據庫管理工具的集成尤為關鍵。作為數據資產的 “智能管家”&#xff0c;借助數據庫管理工具的可視化操作界…

應用層協議http(無代碼版)

目錄 認識URL urlencode 和 urldecode HTTP 協議請求與響應格式 HTTP 的請求方法 GET 方法 POST 方法 HTTP 的狀態碼 HTTP 常見 Header Location 關于 connection 報頭 HTTP版本 遠程連接服務器工具 setsockopt 我們來學習應用層協議http。 雖然我們說, 應用層協…

Cangjie 中的值類型與引用類型

1. 值類型和引用類型 1.1 值的存儲方式 所有變量在底層實現中&#xff0c;都會關聯一個具體的“值”&#xff0c;這個值可能存儲在 內存地址 或 寄存器 中。 寄存器用于優化常用變量的訪問速度。只有局部、小、頻繁使用的變量才更可能被分配到寄存器中。實際行為由編譯器根據…

使用el-input數字校驗,輸入漢字之后校驗取消不掉

先說說復現方式 本來input是只能輸入數字的&#xff0c;然后你不小心輸入了漢字&#xff0c;觸發校驗了&#xff0c;然后這時候&#xff0c;你發現校驗取消不掉了 就這樣了 咋辦啊&#xff0c;你一看校驗沒錯啊&#xff0c;各種number啥的也寫了,發現沒問題啊 <el-inputv…

使用 Zabbix 監控 MySQL 存儲空間和性能指標的完整實踐指南

目錄 引言 一、最終目標支持功能 二、監控方案設計 2.1 技術選型 2.2 設計思路 三、實現步驟 3.1 準備工作 3.11 創建 MySQL 監控賬號 3.12 配置 .my.cnf 文件 3.2 編寫統一腳本 3.3 配置 Zabbix Agent UserParameter 3.4 Zabbix 前端配置建議 四、總結 引言 MySQL …

多元素納米顆粒:開啟能源催化新紀元

在能源轉型的浪潮中&#xff0c;納米催化劑正成為推動能源技術突破的關鍵力量。多元素納米顆粒&#xff08;Polyelemental Nanoparticles&#xff09;憑借其獨特的元素協同效應&#xff0c;展現出在能源催化領域的巨大潛力。然而&#xff0c;合成這些復雜體系的納米顆粒面臨著諸…

鐵路行業數字化應用建設方案

數字化轉型面臨的挑戰 鐵路行業正處于數字化轉型的關鍵時期&#xff0c;鐵路行業應用場景復雜&#xff0c;數據量巨大&#xff0c;傳統信息化建設模式難以滿足日益增長的業務需求。鐵路企業亟需引入敏捷高效的數字化工具&#xff0c;加速推進業務創新&#xff0c;實現提質增效…

PlankAssembly 筆記 DeepWiki 正交視圖三維重建

manycore-research/PlankAssembly | DeepWiki PlankAssembly項目原理 這個項目是一個基于深度學習的3D重建系統&#xff0c;其核心原理是從三個正交視圖的工程圖紙中重建出3D形狀的結構化程序表示。 核心技術原理 1. 問題定義 PlankAssembly旨在從三個正交視圖的工程圖紙中…

分布式不同數據的一致性模型

1. 強一致性&#xff08;Strong Consistency&#xff09; 定義&#xff1a;所有節點在任何時間點看到的數據完全一致&#xff0c;讀操作總是返回最近的寫操作結果。特點&#xff1a; 寫操作完成后&#xff0c;所有后續讀操作都能立即看到更新。通常需要同步機制&#xff08;如…

C文件操作1

一、為什么使用文件 如果沒有文件&#xff0c;我們寫的程序的數據是存儲在電腦的內存中&#xff0c;如果程序退出&#xff0c;內存回收&#xff0c;數據就丟失 了&#xff0c;等再次運行程序&#xff0c;是看不到上次程序的數據的&#xff0c;如果要將數據進行持久化的保存&am…

Centos7.x內網環境Jenkins前端打包環境配置

Centos7.x內網環境Jenkins前端打包環境配置 參考地址&#xff1a; https://www.cnblogs.com/guangdelw/p/18763336 https://2048.csdn.net/682c1be8606a8318e857d687.html 前言&#xff1a;環境描述和目標 最近公司新接了一個項目&#xff0c;要求是&#xff1a;需要再桌面…

Hash 的工程優勢: port range 匹配

昨天和朋友聊到 “如何匹配一個 port range”&#xff0c;覺得挺有意思&#xff0c;簡單寫篇散文。 回想起十多年前&#xff0c;我移植并優化了 nf-HiPAC&#xff0c;當時還看不上 ipset hash&#xff0c;后來大約七八年前&#xff0c;我又舔 nftables&#xff0c;因為用它可直…

kafka學習筆記(三、消費者Consumer使用教程——使用實例及及核心流程源碼講解)

1.核心概念與架構 1.1.消費者與消費者組 Kafka消費者是訂閱主題&#xff08;Topic&#xff09;并拉取消息的客戶端實例&#xff0c;其核心邏輯通過KafkaConsumer類實現。消費者組&#xff08;Consumer Group&#xff09;是由多個邏輯關聯的消費者組成的集合。 核心規則 同一…

《java創世手記》---java基礎篇(下)

《Java 創世手記 - 基礎篇&#xff08;下&#xff09;》 第五章&#xff1a;契約與規范 —— 接口 (Interfaces) 與抽象類 (Abstract Classes) 造物主&#xff0c;在你日益繁榮的世界里&#xff0c;你發現僅僅依靠“繼承”來構建“物種體系”有時會遇到一些限制。比如&#x…