Spark-Streaming

找出所有有效數據,要求電話號碼為11位,但只要列中沒有空值就算有效數據。
按地址分類,輸出條數最多的前20個地址及其數據。
代碼講解:

導包和聲明對象,設置Spark配置對象和SparkContext對象。
使用Spark SQL語言進行數據處理,包括創建數據庫、數據表,導入數據文件,進行數據轉換。
篩選有效數據并存儲到新表中。
按地址分組并統計出現次數,排序并輸出前20個地址。

代碼如下

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSessionobject Demo {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Demo")val spark = SparkSession.builder().enableHiveSupport().config("spark.sql.warehouse.dir", "hdfs://node01:9000/user/hive/warehouse").config(sparkConf).getOrCreate()spark.sql(sqlText = "create database spark_sql_2")spark.sql(sqlText = "use spark_sql_2")//創建存放原始數據的表spark.sql("""|create table user_login_info(data string|row format delimited|""".stripMargin)spark.sql(sqlText = "load data local inpath 'Spark-SQL/input/user_login_info.json' into table user_login_info")//利用get_json_object將數據做轉換spark.sql("""|create table user_login_info_1|as|select get_json_object(data,'$.uid') as uid,|get_json_object(data,'$.phone') as phone,|get_json_object(data,'$.addr') as addr from user_login_info|""".stripMargin)spark.sql(sqlText = "select count(*) count from user_login_info_1").show()//獲取有效數據spark.sql("""|create table user_login_info_2|as|select * from user_login_info_1|where uid != ' ' and phone != ' ' and addr != ' '|""".stripMargin)spark.sql(sqlText = "select count(*) count from user_login_info_2").show()//獲取前20個地址spark.sql("""|create table hot_addr|as|select addr,count(addr) count from user_login_info_2|group by addr order by count desc limit 20|""".stripMargin)spark.sql(sqlText = "select * from hot_addr").show()spark.stop()}}


Spark Streaming介紹
Spark Streaming概述:

用于流式計算,處理實時數據流。
支持多種數據輸入源(如Kafka、Flume、Twitter、TCP套接字等)和輸出存儲位置(如HDFS、數據庫等)。
Spark Streaming特點:

易用性:支持Java、Python、Scala等編程語言,編寫實時計算程序如同編寫批處理程序。
容錯性:無需額外代碼和配置即可恢復丟失的數據,確保實時計算的可靠性。
整合性:可以在Spark上運行,允許重復使用相關代碼進行批處理,實現交互式查詢操作。
Spark Streaming架構:

驅動程序(StreamingContext)處理數據并傳給SparkContext。
工作節點接收和處理數據,執行任務并備份數據到其他節點。
背壓機制協調數據接收能力和資源處理能力,避免數據堆積和資源浪費。
Spark Streaming實操
詞頻統計案例:

使用ipad工具向999端口發送數據,Spark Streaming讀取端口數據并統計單詞出現次數。
代碼配置包括設置關鍵對象、接收TCP套接字數據、扁平化處理、累加相同鍵值對、分組統計詞頻。
啟動和運行:

啟動netpad發送數據,Spark Streaming每隔三秒收集和處理數據。
代碼中沒有顯式關閉狀態,流式計算默認持續運行,確保數據處理不間斷。
DStream創建
DStream創建方式:

RDD隊列:通過SSC創建RDD隊列,將RDD推送到隊列中作為DStream處理。
自定義數據源:下節課詳細講解。
RDD隊列案例:

循環創建多個RDD并推送到隊列中,使用Spark Streaming處理RDD隊列進行詞頻統計。
代碼包括配置對象、創建可變隊列、轉換RDD為DStream、累加和分組統計詞頻。
代碼如下

import org.apache.spark.SparkConfobject WordCount {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")val ssc = new StreamingContext(sparkConf,Seconds(3))val lineStreams = ssc.socketTextStream("node01",9999)val wordStreams = lineStreams.flatMap(_.split(" "))val wordAndOneStreams = wordStreams.map((_,1))val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)wordAndCountStreams.print()ssc.start()ssc.awaitTermination()}}


結果展示:


展示了詞頻統計的結果,驗證了Spark Streaming的正確性和有效性。

自定義數據源的實現

需要導入新的函數并繼承現有的函數。
創建數據源時需選擇class而不是object。
在class中定義on start和on stop方法,并在這些方法中實現具體的功能。
類的定義和初始化

類的定義包括數據類型的設定,如端口號和TCP名稱。
使用extends關鍵字繼承父類的方法。
數據存儲類型設定為內存中保存。
數據接收和處理

在on start方法中創建新線程并調用接收數據的方法。
連接到指定的主機和端口號,創建輸入流并轉換為字符流。
逐行讀取數據并寫入到spark stream中,進行詞頻統計。
數據扁平化和詞頻統計

使用block map進行數據扁平化處理。
將原始數據轉換為鍵值對形式,并根據相同鍵進行分組和累加。
輸出詞頻統計結果。
程序終止條件

設定手動終止和程序異常時的終止條件。
在滿足終止條件時輸出結果并終止程序。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/902575.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/902575.shtml
英文地址,請注明出處:http://en.pswp.cn/news/902575.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Sentinel源碼—9.限流算法的實現對比一

大綱 1.漏桶算法的實現對比 (1)普通思路的漏桶算法實現 (2)節省線程的漏桶算法實現 (3)Sentinel中的漏桶算法實現 (4)Sentinel中的漏桶算法與普通漏桶算法的區別 (5)Sentinel中的漏桶算法存在的問題 2.令牌桶算法的實現對比 (1)普通思路的令牌桶算法實現 (2)節省線程的…

Redis 詳解:安裝、數據類型、事務、配置、持久化、訂閱/發布、主從復制、哨兵機制、緩存

目錄 Redis 安裝與數據類型 安裝指南 Windows Linux 性能測試 基本知識 數據類型 String List(雙向列表) Set(集合) Hash(哈希) Zset(有序集合) 高級功能 地理位置&am…

Docker配置帶證書的遠程訪問監聽

一、生成證書和密鑰 1、準備證書目錄和生成CA證書 # 創建證書目錄 mkdir -p /etc/docker/tls cd /etc/docker/tls # 生成CA密鑰和證書 openssl req -x509 -newkey rsa:4096 -keyout ca-key.pem \ -out ca-cert.pem -days 365 -nodes -subj "/CNDocker CA" 2、為…

MCP接入方式介紹

上一篇文章,我們介紹了MCP是什么以及MCP的使用。 MCP是什么,MCP的使用 接下來,我們來詳細介紹一下MCP的接入 先看官網的架構圖 上圖的MCP 服務 A、MCP 服務 B、MCP 服務 C是可以運行在你的本地計算機(本地服務器方式&#xff…

關于Agent的簡單構建和分享

前言:Agent 具備自主性、環境感知能力和決策執行能力,能夠根據環境的變化自動調整行為,以實現特定的目標。 一、Agent 的原理 Agent(智能體)被提出時,具有四大能力 感知、分析、決策和執行。是一種能夠在特定環境中自主行動、感…

Gitlab runner 安裝和注冊

Gitlab Runner GitLab Runner是一個用于運行GitLab CI/CD流水線作業的軟件包,由GitLab官方開發,完全開源。你可以在很多主流的系統環境或平臺上安裝它,如Linux、macOS、Windows和Kubernetes。如果你熟悉Jenkins 的話,你可以把它…

精益數據分析(18/126):權衡數據運用,精準把握創業方向

精益數據分析(18/126):權衡數據運用,精準把握創業方向 大家好!一直以來,我都希望能和大家在創業與數據分析的領域共同探索、共同進步。今天,我們繼續深入研讀《精益數據分析》,探討…

Git技術詳解:從核心原理到實際應用

Git技術詳解:從核心原理到實際應用 一、Git的本質與核心價值 Git是由Linux之父Linus Torvalds在2005年開發的分布式版本控制系統,其核心功能是通過記錄文件變更歷史,幫助開發者實現以下目標: 版本回溯:隨時恢復到項…

Java從入門到“放棄”(精通)之旅——String類⑩

Java從入門到“放棄”(精通)之旅🚀——String類⑩ 前言 在Java編程中,String類是最常用也是最重要的類之一。無論是日常開發還是面試,對String類的深入理解都是必不可少的。 1. String類的重要性 在C語言中&#xf…

抓取淘寶數據RPA--影刀

最近用了一下RPA軟件,挑了影刀,發現很無腦也很簡單,其語法大概是JAVA和PYTHON的混合體,如果懂爬蟲的話,學這個軟件就快的很,看了一下官方的教程,對于有基礎的人來說很有點枯燥,但又不…

docker部署seafile修改默認端口并安裝配置onlyoffice實現在線編輯

背景 有很多場景會用到類似seafile功能的需求,比如: 在內網中傳輸和共享文件個人部署私人網盤文檔協同在線編輯寫筆記… 這些功能seafile均有實現,并且社區版提供的功能基本可以滿足個人或者小型團隊的日常需求 問題 由于主機的80和443端…

計算機視覺cv2入門之視頻處理

在我們進行計算機視覺任務時,經常會對視頻中的圖像進行操作,這里我來給大家分享一下,cv2對視頻文件的操作方法。這里我們主要介紹cv2.VideoCapture函數的基本使用方法。 cv2.VideoCapture函數 當我們在使用cv2.VideoCapture函數時&#xff…

Linux之徹底掌握防火墻-----安全管理詳解

—— 小 峰 編 程 目錄: 一、防火墻作用 二、防火墻分類 1、邏輯上劃分:大體分為 主機防火墻 和 網絡防火墻 2、物理上劃分: 硬件防火墻 和 軟件防火墻 三、硬件防火墻 四、軟件防火墻 五、iptables 1、iptables的介紹 2、netfilter/…

python項目實戰-后端個人博客系統

本文分享一個基于 Flask 框架開發的個人博客系統后端項目,涵蓋用戶注冊登錄、文章發布、分類管理、評論功能等核心模塊。適合初學者學習和中小型博客系統開發。 一、項目結構 blog │ app.py │ forms.py │ models.py │ ├───instance │ blog.d…

Unity 接入阿里的全模態大模型Qwen2.5-Omni

1 參考 根據B站up主陰沉的怪咖 開源的項目的基礎上修改接入 AI二次元老婆開源項目地址(unity-AI-Chat-Toolkit): Github地址:https://github.com/zhangliwei7758/unity-AI-Chat-Toolkit Gitee地址:https://gitee.com/DammonSpace/unity-ai-chat-too…

第十五屆藍橋杯 2024 C/C++組 合法密碼

目錄 題目: 題目描述: 題目鏈接: 思路: substr函數: 思路詳解: 代碼: 代碼詳解; 題目: 題目描述: 題目鏈接: P10906 [藍橋杯 2024 國 B] 合法密碼 -…

NoSQL 簡單講解

目錄 1. NoSQL 的背景與意義 1.1 數據庫的演變 1.2 NoSQL 的興起 2. NoSQL 數據庫的分類 2.1 鍵值存儲(Key-Value Stores) 2.2 文檔數據庫(Document Stores) 2.3 列族存儲(Column-Family Stores) 2.…

122.在 Vue3 中使用 OpenLayers 實現圖層層級控制(zIndex)顯示與設置詳解

?? 作者:彭麒 ?? 郵箱:1062470959@qq.com ?? 聲明:本文源碼歸吉檀迦俐所有,歡迎學習借鑒,如用于商業項目請注明出處 ?? ?? 技術棧:Vue 3 + Composition API + OpenLayers 6+ + Element Plus + Tailwind CSS ?? 一、什么是 zIndex(圖層層級)? 在地圖開發中…

車載測試用例開發-如何平衡用例覆蓋度和測試效率的方法論

1 摘要 在進行車載測試用例編寫時,會遇到多個條件導致用例排列組合爆炸的情況,但是為了產品測試質量,我們又不得不保證用例設計的需求覆蓋度,這樣又會使得測試周期非常長。我們如何平衡效率和測試質量?本文進行了一些…

AI——神經網絡以及TensorFlow使用

文章目錄 一、TensorFlow安裝二、張量、變量及其操作1、張量Tensor2、變量 三、tf.keras介紹1、使用tf.keras構建我們的模型2、激活函數1、sigmoid/logistics函數2、tanh函數3、RELU函數4、LeakReLu5、SoftMax6、如何選擇激活函數 3、參數初始化1、bias偏置初始化2、weight權重…