Spark Streaming的核心功能及其示例PySpark代碼

Spark Streaming是Apache Spark中用于實時流數據處理的模塊。以下是一些常見功能的實用PySpark代碼示例:

  1. 基礎流處理:從TCP套接字讀取數據并統計單詞數量
from pyspark import `SparkContext
from pyspark.streaming import StreamingContext# 創建SparkContext和StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)  # 1秒的批處理間隔# 創建一個DStream,從TCP源讀取數據
lines = ssc.socketTextStream("localhost", 9999)# 對每一行數據進行分詞,映射為(word, 1)的鍵值對,然后按單詞統計數量
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)# 打印每個RDD中的前10個元素
word_counts.pprint()# 啟動流計算
ssc.start()
# 等待流計算結束
ssc.awaitTermination()

在上述代碼中:

  • sc 是 SparkContext ,用于與Spark集群交互。
  • ssc 是 StreamingContext ,定義了批處理間隔。
  • lines 是一個 DStream ,從指定的TCP套接字讀取數據。
  • words 對每行數據進行分詞, word_counts 統計每個單詞出現的次數。
  • pprint 方法打印每個批次的前10個元素。
  1. 使用窗口函數
from pyspark import SparkContext
from pyspark.streaming import StreamingContextsc = SparkContext("local[2]", "WindowedWordCount")
ssc = StreamingContext(sc, 1)lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))# 使用窗口函數,窗口大小為3秒,滑動間隔為1秒
windowed_word_counts = word_counts.reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 3, 1)windowed_word_counts.pprint()ssc.start()
ssc.awaitTermination()

在這個示例中:

  • reduceByKeyAndWindow 方法用于在窗口上進行聚合操作。
  • 第一個參數是用于合并窗口內元素的函數,第二個參數是用于移除窗口外元素的函數。
  1. 狀態更新
from pyspark import SparkContext
from pyspark.streaming import StreamingContextsc = SparkContext("local[2]", "StatefulWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")  # 啟用檢查點def updateFunction(new_values, running_count):if running_count is None:running_count = 0return sum(new_values, running_count)lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))# 使用updateStateByKey進行狀態更新
stateful_word_counts = word_counts.updateStateByKey(updateFunction)stateful_word_counts.pprint()ssc.start()
ssc.awaitTermination()

在上述代碼中:

  • updateStateByKey 方法用于維護每個鍵的狀態。
  • updateFunction 定義了如何根據新值和現有狀態更新狀態。
  1. 與Kafka集成
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtilssc = SparkContext("local[2]", "KafkaWordCount")
ssc = StreamingContext(sc, 1)# Kafka參數
kafkaParams = {"metadata.broker.list": "localhost:9092"}
topics = ["test"]# 創建Kafka輸入DStream
kvs = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)
lines = kvs.map(lambda x: x[1])words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)word_counts.pprint()ssc.start()
ssc.awaitTermination()

在這個示例中:

  • KafkaUtils.createDirectStream 用于從Kafka主題讀取數據。
  • kvs 是一個包含Kafka消息的DStream, lines 提取消息內容。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/66702.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/66702.shtml
英文地址,請注明出處:http://en.pswp.cn/web/66702.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

深度學習系列75:sql大模型工具vanna

1. 概述 vanna是一個可以將自然語言轉為sql的工具。簡單的demo如下: !pip install vanna import vanna from vanna.remote import VannaDefault vn VannaDefault(modelchinook, api_keyvanna.get_api_key(my-emailexample.com)) vn.connect_to_sqlite(https://va…

【線性代數】列主元法求矩陣的逆

列主元方法是一種用于求解矩陣逆的數值方法,特別適用于在計算機上實現。其基本思想是通過高斯消元法將矩陣轉換為上三角矩陣,然后通過回代求解矩陣的逆。以下是列主元方法求解矩陣 A A A 的逆的步驟: [精確算法] 列主元高斯消元法 步驟 1&am…

[0242-06].第06節:SpringBoot對SpringMVC的自動配置

SpringBoot學習大綱 一、基于SpringBoot搭建Web工程: 1.1.編碼實現步驟: a.創建SpringBoot項目 b.選中依賴:選中我們所需要的模塊 1.2.SSM中的WEB開發配置與SpringBoot中WEB開發自動配置對比: a.SSM中的WEB開發: 1…

【21】Word:德國旅游業務?

目錄 題目 NO1.2.3 NO4 NO5.6 NO7 NO8.9.10.11 題目 NO1.2.3 F12:另存為布局→頁面設置→頁邊距:上下左右選中“德國主要城市”→開始→字體對話框→字體/字號→文本效果:段落對話框→對齊方式/字符間距/段落間距 NO4 布局→表對話框…

什么是軟件架構

什么是軟件架構 程序員說,軟件架構是要決定編寫哪些C程序或OO類、使用哪些庫和框架 程序經理說,軟件架構就是模塊的劃分和接口的定義 系統分析員說,軟件架構就是為業務領域對象的關系建模 配置管理員說,軟件架構就是開發出來的…

1/20賽后總結

1/20賽后總結 T1『討論區管理員』的旅行 - BBC編程訓練營 算法:IDA* 分數:0 damn it! Ac_code走丟了~~(主要是沒有寫出來)~~ T2華強買瓜 - BBC編程訓練營 算法:雙向DFS或者DFS剪枝 分數:0 Ac_code…

大數據與AI驅動的商業查詢平臺:企業市場拓展的變革引擎?

在競爭白熱化的商業環境里,企業對準確市場信息的高效獲取能力,直接關系到業務拓展的成敗。商業查詢平臺借助大數據和人工智能技術,為企業提供精準客戶篩選、市場拓展分析以及風險評估服務,正逐漸成為企業市場開拓的得力助手。本文…

redis 各個模式的安裝

一、Redis單機安裝 1、安裝gcc依賴 Redis是C語言編寫的,編譯需要GCC。 Redis6.x.x版本支持了多線程,需要gcc的版本大于4.9,但是CentOS7的默認版本是4.8.5。 升級gcc版本: yum -y install centos-release-scl yum -y install d…

TiDB 的優勢與劣勢

TiDB 的優勢與劣勢 TiDB 作為一款新興的分布式數據庫,在業界逐漸嶄露頭角。它兼具傳統關系型數據庫的特性,又充分利用分布式架構的優勢。那么,TiDB 究竟有怎樣的優缺點呢?今天我們來聊聊 TiDB 的優勢與劣勢,幫你全面了…

藍橋杯算法日常|c\c++常用競賽函數總結備用

一、字符處理相關函數 大小寫判斷函數 islower和isupper:是C標準庫中的字符分類函數,用于檢查一個字符是否為小寫字母或大寫字母,需包含頭文件cctype.h(也可用萬能頭文件包含)。返回布爾類型值。例如: #…

微服務知識——4大主流微服務架構方案

文章目錄 1、微服務聚合模式2、微服務共享模式3、微服務代理模式4、微服務異步消息模式 微服務是大型架構的必經之路,也是大廠重點考察對象,下面我就重點詳解4大主流微服務架構方案。 1、微服務聚合模式 微服務聚合設計模式,解決了如何從多個…

【HTML+CSS】使用HTML與后端技術連接數據庫

目錄 一、概述 1.1 HTML前端 1.2 后端技術 1.3 數據庫 二、HTML表單示例 三、PHP后端示例 3.1 連接數據庫 3.2 接收數據并插入數據庫 四、安全性 4.1 防止SQL注入 4.2 數據驗證與清洗 五、優化 5.1 索引優化 5.2 查詢優化 六、現代Web開發中的最佳實踐 6.1 使用…

T-SQL語言的數據庫編程

T-SQL語言的數據庫編程 1. 引言 在信息化迅速發展的今天,數據庫已經成為數據管理和使用的重要工具。其中,T-SQL(Transact-SQL)作為微軟SQL Server的擴展SQL語言,不僅用于數據查詢和管理,還能夠進行復雜的…

通信協議—WebSocket

一、WebSocket編程概念 1.1 什么是WebSocket WebSocket 是一種全雙工通信協議,允許在客戶端(通常是瀏覽器)和服務器之間建立持久連接,以實現實時的雙向通信。它是 HTML5 標準的一部分,相比傳統的 HTTP 請求&#xff…

cadence筆記--畫PMU6050原理圖和封裝

簡介 本文主要介紹使用Cadence自己畫一個PMU6050的原理圖PCB的實際用例,Cadence使用的是24.1版本。 原理圖 首先獲取PMU6050引腳參數,使用立創商城查詢PMU6050型號,點擊數據手冊如下圖所示: 如下圖所示,左邊是原理圖&…

CSS3 3D 轉換介紹

CSS3 中的 3D 轉換提供了一種在二維屏幕上呈現三維效果的方式,主要包括translate3d、rotate3d、scale3d等轉換函數,下面來詳細介紹: 1. 3D 轉換的基本概念 坐標系 在 CSS3 的 3D 空間中,使用的是右手坐標系。X 軸是水平方向&…

Text2SQL 智能報表方案介紹

0 背景 Text2SQL智能報表方案旨在通過自然語言處理(NLP)技術,使用戶能夠以自然語言的形式提出問題,并自動生成相應的SQL查詢,從而獲取所需的數據報表,用戶可根據得到結果展示分析從而為結論提供支撐&#…

FFmpeg音視頻采集

文章目錄 音視頻采集音頻采集獲取設備信息錄制麥克風錄制聲卡 視頻采集攝像機畫面采集 音視頻采集 DirectShow(簡稱DShow)是一個Windows平臺上的流媒體框架,提供了高質量的多媒體流采集和回放功能,它支持多種多樣的媒體文件格式&…

【漫話機器學習系列】056.F1值(F1 score)

F1值(F1 Score) 定義 F1值是機器學習中一種用于評估模型性能的指標,特別適合用于 不平衡數據集 的分類任務。它是 精確率(Precision) 和 召回率(Recall) 的調和平均值。通過綜合考慮精確率和召…

Mac安裝Homebrew

目錄 安裝修改homeBrew源常用命令安裝卸載軟件升級軟件相關清理相關 安裝 官網 https://brew.sh/不推薦官網安裝方式(很慢很慢或者安裝失敗聯網失敗) 檢測是否安裝homebrewbrew -v執行安裝命令 蘋果電腦 常規安裝腳本 (推薦 完全體 幾分鐘就…