使用 PySpark 從 Kafka 讀取數據流并處理為表

使用 PySpark 從 Kafka 讀取數據流并處理為表

下面是一個完整的指南,展示如何通過 PySpark 從 Kafka 消費數據流,并將其處理為可以執行 SQL 查詢的表。

1. 環境準備

確保已安裝:

  • Apache Spark (包含Spark SQL和Spark Streaming)
  • Kafka
  • PySpark
  • 對應的Kafka連接器 (通常已包含在Spark發行版中)

2. 完整代碼示例

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, expr
from pyspark.sql.types import StructType, StructField, StringType, IntegerType# 初始化SparkSession,啟用Kafka支持
spark = SparkSession.builder \.appName("KafkaToSparkSQL") \.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \.getOrCreate()# 定義數據的schema (根據你的實際數據結構調整)
schema = StructType([StructField("user_id", StringType()),StructField("item_id", StringType()),StructField("price", IntegerType()),StructField("timestamp", StringType())
])# 1. 從Kafka讀取數據流
kafka_df = spark.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("subscribe", "your_topic_name") \.option("startingOffsets", "latest") \.load()# 2. 將Kafka的value從二進制轉為字符串,然后解析JSON
parsed_df = kafka_df \.selectExpr("CAST(value AS STRING)") \.select(from_json(col("value"), schema).alias("data")) \.select("data.*")# 3. 注冊為臨時視圖以便執行SQL查詢
def process_batch(df, epoch_id):# 注冊為臨時視圖df.createOrReplaceTempView("kafka_stream_table")# 執行SQL查詢result_df = spark.sql("""SELECT user_id, item_id, price,timestamp,COUNT(*) OVER (PARTITION BY user_id) as user_purchase_countFROM kafka_stream_tableWHERE price > 100""")# 輸出結果 (可根據需要改為其他sink)result_df.show(truncate=False)# 4. 啟動流處理
query = parsed_df.writeStream \.foreachBatch(process_batch) \.outputMode("update") \.start()# 5. 等待終止
query.awaitTermination()

3. 詳細步驟說明

3.1 配置Kafka連接

.option("kafka.bootstrap.servers", "localhost:9092")  # Kafka broker地址
.option("subscribe", "your_topic_name")              # 訂閱的topic
.option("startingOffsets", "latest")                 # 從最新offset開始

3.2 數據解析

  1. Kafka消息的value通常是JSON格式的二進制數據
  2. 先轉換為字符串,再用定義好的schema解析為結構化數據

3.3 流式表處理

  • 使用foreachBatch可以在每個微批次中將DataFrame注冊為臨時表
  • 然后在臨時表上執行SQL查詢
  • 這種模式稱為"Stream-static join"

3.4 輸出模式

.outputMode("update") 表示只輸出有變化的行,其他選項包括:

  • append: 只添加新行
  • complete: 輸出所有結果(用于聚合操作)

4. 高級用法

4.1 與靜態表join

# 假設有一個靜態的user_profile表
user_profile_df = spark.read.parquet("hdfs://path/to/user_profiles")
user_profile_df.createOrReplaceTempView("user_profiles")# 在process_batch函數中可以這樣join
result_df = spark.sql("""SELECT k.user_id, u.user_name,k.item_id, k.priceFROM kafka_stream_table kJOIN user_profiles u ON k.user_id = u.user_id
""")

4.2 窗口聚合

result_df = spark.sql("""SELECT user_id,window(timestamp, '5 minutes') as window,SUM(price) as total_spent,COUNT(*) as purchase_countFROM kafka_stream_tableGROUP BY user_id, window(timestamp, '5 minutes')
""")

4.3 寫入其他系統

# 寫入Hive表
query = result_df.writeStream \.outputMode("complete") \.format("hive") \.option("checkpointLocation", "/path/to/checkpoint") \.start()# 或寫入Kafka
query = result_df.writeStream \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("topic", "output_topic") \.start()

5. 部署注意事項

  1. 確保所有worker節點都能訪問Kafka集群
  2. 設置合理的checkpoint位置以支持故障恢復
  3. 根據數據量調整微批處理間隔:
    .trigger(processingTime='10 seconds')  # 每10秒處理一次
    
  4. 監控消費延遲:
    .option("maxOffsetsPerTrigger", 10000)  # 每次最多處理10000條
    

6. 性能優化建議

  1. 使用更高效的序列化格式(Avro/Protobuf代替JSON)
  2. 增加分區數提高并行度
  3. 調整spark.sql.shuffle.partitions參數
  4. 對頻繁查詢的字段建立索引(如果使用Delta Lake等)

希望這個完整指南能幫助你實現從Kafka到Spark SQL的流式處理!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/81577.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/81577.shtml
英文地址,請注明出處:http://en.pswp.cn/web/81577.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

第十天的嘗試

目錄 一、每日一言 二、練習題 三、效果展示 四、下次題目 五、總結 一、每日一言 哈哈,十天缺了兩天,我寫的文章現在質量不高,所以我可能考慮,應該一星期或者三四天出點高質量的文章,同時很開心大家能夠學到知識&a…

mediapipe標注視頻姿態關鍵點(基礎版加進階版)

前言 手語視頻流的識別有兩種大的分類,一種是直接將視頻輸入進網絡,一種是識別了關鍵點之后再進入網絡。所以這篇文章我就要來講講如何用mediapipe對手語視頻進行關鍵點標注。 代碼 需要直接使用代碼的,我就放這里了。環境自己配置一下吧&…

Redis數據遷移方案及持久化機制詳解

#作者:任少近 文章目錄 前言Redis的持久化機制RDBAOF Redis save和bgsave的區別redis數據遷移redis單機-單機數據遷移redis 主從-主從數據遷移redis 單機-cluster數據遷移redis cluster –redis cluster數據遷移 前言 Redis數據遷移是常見需求,主要包括…

圖論回溯

圖論 200.島嶼數量DFS 給你一個由 ‘1’(陸地)和 ‘0’(水)組成的的二維網格,請你計算網格中島嶼的數量。島嶼總是被水包圍,并且每座島嶼只能由水平方向和/或豎直方向上相鄰的陸地連接形成。此外&#xff…

真實網絡項目中交換機常用的配置與解析

一、配置三層鏈路聚合增加鏈路帶寬 1.組網需求 某企業有多個部門分布在不同的地區,由于業務發展的需要,不同區域的部門與部門之間有進行帶有VLAN Tag的報文的傳輸需求。采用透明網橋的遠程橋接和QinQ功能,可以實現企業在不同區域部門之間進…

【Redis】過期鍵刪除策略,LRU和LFU在redis中的實現,緩存與數據庫雙寫一致性問題,go案例

一、Redis 中的過期鍵刪除策略有哪些? 采用了 惰性刪除 和 定期刪除 兩種策略處理過期鍵: 1. 惰性刪除(Lazy Deletion) 機制:只有在訪問 key 時才檢查是否過期,如果已過期則立刻刪除。優點:對…

為什么單張表索引數量建議控制在 6 個以內

單張表索引數量建議控制在6個以內的主要原因包括以下幾點?: ?性能影響?:索引會占用額外的磁盤空間。如果索引數量過多,會占用大量的磁盤空間,尤其是在數據量較大的情況下,索引占用的空間可能會超過數據本身。此外&…

深度學習實戰109-智能醫療隨訪與健康管理系統:基于Qwen3(32B)、LangChain框架、MCP協議和RAG技術研發

大家好,我是微學AI,今天給大家介紹一下深度學習實戰109-智能醫療隨訪與健康管理系統:基于Qwen3(32B)、LangChain框架、MCP協議和RAG技術研發。在當今醫療信息化快速發展的背景下,醫療隨訪與健康管理面臨著數據分散、信息整合困難、個性化方案生成效率低等挑戰。傳統的醫療隨…

聊一聊 .NET Dump 中的 Linux信號機制

一:背景 1. 講故事 當 .NET程序 在Linux上崩潰時,我們可以配置一些參考拿到對應程序的core文件,拿到core文件后用windbg打開,往往會看到這樣的一句信息 Signal SIGABRT code SI_USER (Sent by kill, sigsend, raise)&#xff0c…

如何在uniapp H5中實現路由守衛

目錄 Vue3 app.config.globalProperties 1. 創建 Vue 應用實例 2. 添加全局屬性或方法 3. 在組件中使用全局屬性或方法 beforeEach在uniapp的注冊 1、在H5中這兩個對象是都存在的。「router:route」但是功能并不全面,具體可參考下圖。 2、剛剛測試了一下,在微信小程序…

無人機降落傘設計要點難點及原理!

一、設計要點 1. 傘體結構與折疊方式 傘體需采用輕量化且高強度的材料(如抗撕裂尼龍或芳綸纖維),并通過多重折疊設計(如三重折疊縫合)減少展開時的阻力,同時增強局部承力區域的強度。 傘衣的幾何參數&am…

AI時代新詞-AI增強現實(AI - Enhanced Reality)

一、什么是AI增強現實(AI - Enhanced Reality)? AI增強現實(AI - Enhanced Reality)是指將人工智能(AI)技術與增強現實(Augmented Reality,簡稱AR)技術相結合…

基于Matlab實現各種光譜數據預處理

在IT領域,尤其是在數據分析和科學研究中,光譜數據的預處理是至關重要的步驟。光譜數據通常包含了豐富的信息,但往往受到噪聲、雜散光、背景信號等因素的影響,需要通過預處理來提取有效信號,提高分析的準確性和可靠性。…

用 commitizen-go 來實現標準化你的Git提交信息 【windows 版】

前言 團隊中有部分人的 commit 信息比較隨意,因此想用工具來進行約束, web 項目可以使用 commitizen 來實現, 但是 golang 又該用什么來約束呢, 在 Github 上找到 commitizen-go 可以做為 commitizen 平替,但該說明文…

為什么共現矩陣是高維稀疏的

為什么共現矩陣是高維稀疏的? 共現矩陣(Co-occurrence Matrix)的高維稀疏性是其固有特性,主要由以下原因導致: 1. 高維性的根本原因 詞匯表大小決定維度: 共現矩陣的維度為 ( V \times V ),其…

OpenLayers 加載鼠標位置控件

注:當前使用的是 ol 5.3.0 版本,天地圖使用的key請到天地圖官網申請,并替換為自己的key 地圖控件是一些用來與地圖進行簡單交互的工具,地圖庫預先封裝好,可以供開發者直接使用。OpenLayers具有大部分常用的控件&#x…

知識宇宙-學習篇:學編程為什么從C語言開始學起?

名人說:博觀而約取,厚積而薄發。——蘇軾《稼說送張琥》 創作者:Code_流蘇(CSDN)(一個喜歡古詩詞和編程的Coder😊) 目錄 一、C語言的歷史地位與影響力1. 編程語言的"鼻祖"2. 現代技術的基礎 二、…

手機IP地址更換的影響與操作指南

在移動互聯網時代,IP地址如同手機的“網絡身份證”,其變更可能對上網體驗、隱私安全及服務訪問產生連鎖反應。無論是為了繞過地域限制、保護隱私,還是解決網絡沖突,了解IP更換的影響與正確操作方法都至關重要。本文將系統分析影響…

基于Alibaba Cloud Linux + 寶塔面板安裝 LibreOffice 全攻略流程

LibreOffice 是一款功能強大的辦公軟件,默認使用開放文檔格式 (OpenDocument Format , ODF), 并支持 *.docx, *.xlsx, *.pptx 等其他格式。 官網:https://www.libreoffice.org/ 或 https://zh-cn.libreoffice.org/ Alibaba Cloud Linux 3(Soaring Falcon) 是阿里云自主研發…

UniApp 微信小程序綁定動態樣式 :style 避坑指南

在使用 UniApp 開發跨端應用時,綁定動態樣式 :style 是非常常見的操作。然而,很多開發者在編譯為 微信小程序 時會遇到一個奇怪的問題: 原本在 H5 中可以正常渲染的樣式,在微信小程序中卻不生效! 讓我們通過一個示例來…