生產環境Spark Structured Streaming實時數據處理應用實踐分享

封面圖片

生產環境Spark Structured Streaming實時數據處理應用實踐分享

一、業務場景描述

我們所在的電商平臺需要實時監控用戶行為數據(如點擊、下單、支付等),基于事件級別的流式數據進行實時統計、會話聚合、漏斗分析,并將結果推送到Dashboard和報表存儲。原有系統使用的Storm+Kafka方案在高并發時存在容錯難、狀態管理復雜、維護成本高的問題。

核心需求:

  • 低延遲:端到端處理延遲控制在2秒以內。
  • 可伸縮:能水平擴展,應對峰值10萬條/秒消息吞吐。
  • 容錯性:任務失敗自動重啟且保證端到端數據不丟失。
  • 狀態管理:支持有狀態聚合(窗口、會話)和超大狀態存儲。

二、技術選型過程

我們對主流實時計算框架進行了對比:

| 框架 | 延遲 | 狀態管理 | 易用性 | 擴展性 | 社區成熟度 | | ---- | ---- | ---- | ---- | ---- | ---- | | Apache Storm | 500ms~1s | 需自行實現State Store | 開發復雜 | 高 | 高 | | Apache Flink | 200ms~500ms | 內置強大狀態管理 | 編程模型復雜 | 高 | 高 | | Spark Structured Streaming | 1s~2s | 使用Checkpoint and WAL,可容錯 | API友好,基于Spark SQL | 高 | 高 | | Apache Kafka Streams | <1s | 基于RocksDB,狀態管理受限 | 與Kafka耦合高 | 中 | 中 |

綜合考慮團隊技術棧和運維成本,我們最終選定Spark Structured Streaming:

  • 與現有Spark Batch集群共用資源。
  • 編程模型統一,SQL/DS/Lambda API支持靈活。
  • Checkpoint與WAL機制簡化狀態管理,集成HDFS持久化狀態。

三、實現方案詳解

3.1 項目結構

├── pom.xml
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com.company.streaming
│   │   │       ├── App.java
│   │   │       └── utils
│   │   │           └── KafkaOffsetManager.java
│   │   └── resources
│   │       └── application.conf
└── README.md

3.2 核心配置(application.conf)

spark.app.name=RealTimeUserBehavior
spark.master=yarn
spark.sql.shuffle.partitions=200
spark.streaming.backpressure.enabled=true
spark.checkpoint.dir=hdfs://namenode:8020/app/checkpoints/structured-streaming
kafka.bootstrap.servers=broker1:9092,broker2:9092
kafka.topic.user=topic_user_behavior
kafka.group.id=user_behavior_group

3.3 主入口代碼(App.java)

package com.company.streaming;import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;public class App {public static void main(String[] args) throws Exception {SparkSession spark = SparkSession.builder().appName("RealTimeUserBehavior").getOrCreate();// 從Kafka讀取原始數據Dataset<Row> raw = spark.readStream().format("kafka").option("kafka.bootstrap.servers", spark.sparkContext().getConf().get("kafka.bootstrap.servers")).option("subscribe", spark.sparkContext().getConf().get("kafka.topic.user")).option("startingOffsets", "latest").load();// 解析JSON并選取字段Dataset<Row> userEvents = raw.selectExpr("CAST(value AS STRING) as json").select(org.apache.spark.sql.functions.from_json(org.apache.spark.sql.functions.col("json"),DataSchema.eventSchema()).as("data")).select("data.*");// 實時會話聚合:10分鐘無操作認為會話結束Dataset<Row> sessions = userEvents.withWatermark("eventTime", "2 minutes").groupBy(org.apache.spark.sql.functions.window(org.apache.spark.sql.functions.col("eventTime"),"10 minutes", "5 minutes"),org.apache.spark.sql.functions.col("userId")).agg(org.apache.spark.sql.functions.count("eventType").alias("eventCount"),org.apache.spark.sql.functions.min("eventTime").alias("startTime"),org.apache.spark.sql.functions.max("eventTime").alias("endTime"));// 輸出到HDFS OR 更新到外部系統sessions.writeStream().outputMode(OutputMode.Update()).trigger(Trigger.ProcessingTime("30 seconds")).option("path", "hdfs://namenode:8020/app/output/user_sessions").option("checkpointLocation", spark.sparkContext().getConf().get("spark.checkpoint.dir") + "/sessions").start().awaitTermination();}
}

3.4 關鍵工具類(KafkaOffsetManager.java)

package com.company.streaming.utils;// 省略:管理Kafka手動提交offset、讀寫Zookeeper存儲偏移量

四、踩過的坑與解決方案

  1. 狀態膨脹導致Checkpoint文件過大:

    • 方案:定期做State TTL清理,結合Spark 3.1.1+的state cleanup策略。
  2. Kafka消費位點重復或丟失:

    • 方案:使用KafkaOffsetManager手動管理,結合冪等寫入目標系統保證At-Least-Once語義。
  3. 延遲抖動:

    • 方案:開啟backpressure,限制最大并行度,并合理調整Trigger頻率。
  4. Driver內存溢出:

    • 方案:提升driver內存,拆分業務流程;或將部分輕量計算遷移至Executors。

五、總結與最佳實踐

  • 合理規劃Checkpoint和WAL存儲目錄,避免與業務數據混淆。
  • 利用Spark監控UI實時觀察批次時長、shuffle寫入、延遲指標。
  • 結合PeriodicStateCleanup+Watermark確保有狀態算子狀態可控。
  • 抽象共通工具類(KafkaOffsetManager、JSON解析、公用Schema),提高代碼可維護性。
  • 復雜業務可拆分成多個流式子作業,下游合并結果,增強可擴展性。

通過以上實踐,我們成功將平臺數據實時處理延遲穩定在1.2秒左右,作業穩定運行10+節點集群一個季度零故障。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/97593.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/97593.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/97593.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

海康相機開發---HCNetSDK

HCNetSDK&#xff08;Hikvision Network Software Development Kit&#xff09;是海康威視專為旗下安防監控設備打造的二次開發工具包&#xff0c;是連接上層應用與海康設備的核心橋梁。其封裝了設備底層通信協議&#xff08;包括私有協議與部分標準協議&#xff09;&#xff0…

構建無廣告私人圖書館Reader與cpolar讓電子書庫隨身攜帶

文章目錄前言&#xff1a;告別書荒&#xff0c;拯救靈魂的“摸魚神器”1、關于Reader&#xff1a;小而美的開源在線閱讀器2、Docker部署3、簡單使用reader和添加書源4.群暉安裝Cpolar工具5.創建reader閱讀器的公網地址6.配置固定公網地址前言&#xff1a;告別書荒&#xff0c;拯…

amd cpu是x86架構嗎

是的&#xff0c;AMD CPU屬于x86架構?&#xff0c;其64位擴展&#xff08;x86-64&#xff09;最初由AMD設計并成為行業標準。? ?AMD與x86架構的關系? ?技術淵源?&#xff1a;AMD自1976年起通過技術授權成為x86架構的合法制造商&#xff0c;與英特爾共同主導x86市場。2003…

vercel上線資源無法加載

背景&#xff1a;在本地跑開發服務器沒問題&#xff0c;但是部署到 vercel 上就有問題上一次出現類似問題是在更新游戲引擎方法后本地可以跑但是上線沒有成功&#xff0c;當時是因為 runner.html 是在部署時通過腳本從遠端倉庫拉取的&#xff0c;所以解決方案&#xff1a;1.更新…

Node.js 的模塊化規范是什么?CommonJS 和 ES6 模塊有什么區別?

目錄 一、為什么需要模塊化&#xff1f; 二、Node.js 的模塊化規范 三、CommonJS 模塊化 1. 基本語法 2. 特點 3. 缺點 四、ES6 模塊&#xff08;ESM&#xff09; 1. 基本語法 2. 特點 3. 在 Node.js 中的使用 五、CommonJS 和 ES6 模塊的區別 六、實際開發中的選擇…

設計模式:代理模式(Proxy Pattern)

文章目錄一、代理模式的定義二、實例分析三、示例代碼一、代理模式的定義 代理模式是一種結構型設計模式&#xff0c;它為某個對象提供一個代理或占位符&#xff0c;以控制對這個對象的訪問。簡單來說代理對象在客戶端和目標對象之間起到中介作用&#xff0c;客戶端并不會直接操…

數據類型序列化-封裝

/// <summary> /// 定義泛型接口 /// </summary> /// <typeparam name"T">T</typeparam> public interface ISettingValue<T> {/// <summary>/// value/// </summary>T DoubleValue { get; }/// <summary>/// key//…

PitVis-2023挑戰賽:內鏡下垂體瘤手術視頻中的手術流程識別|文獻速遞-深度學習人工智能醫療圖像

Title題目PitVis-2023 challenge: Workflow recognition in videos of endoscopic pituitary surgeryPitVis-2023挑戰賽&#xff1a;內鏡下垂體瘤手術視頻中的手術流程識別01文獻速遞介紹內鏡視覺挑戰賽與PitVis-2023挑戰賽背景及核心內容 “內鏡視覺&#xff08;EndoVis&#…

2025年8月個人工作生活總結

本文為 2025年8月工作生活總結。研發編碼 無處不在的AI 現在很多地方都在推AI&#xff0c;廣西的人工智能走在前列&#xff0c;要賦能各行各業。至于我&#xff0c;主要就是在寫點代碼&#xff0c;寫點交差的文檔。其實現在我已經有點分析哪些代碼哪些文字是AI寫的了。我工作用…

Dubbo常見面試題

1、默認使用的是什么通信框架&#xff0c;還有別的選擇嗎? 默認也推薦使用netty框架&#xff0c;還有mina。 2、服務調用是阻塞的嗎&#xff1f; 默認是阻塞的&#xff0c;可以異步調用&#xff0c;沒有返回值的可以這么做。 3、一般使用什么注冊中心&#xff1f;還有別的…

簡單的加密算法

// 加密函數&#xff08;32位版本&#xff09; //這里的 data 是ID&#xff0c; dword encrypt(dword data, dword key, int shift) {data ^ key; // 第一步&#xff1a;異或混淆// 循環左移&#xff08;shift范圍1-31&#xff09;return (data << sh…

升級的MS9125S USB投屏控制芯片(VGAHD輸出)

MS9125S是一款USB單芯片投屏器&#xff0c;內部集成了USB 2.0控制器和數據收發模塊、視頻DAC、HD接口和音視頻處理模塊&#xff0c;支持壓縮視頻傳輸。MS9125S可以通過USB接口顯示或者擴展PC、智能手機、平板電腦的顯示信息到更大尺寸的顯示設備上&#xff0c;支持VGA和HD視頻接…

求歐拉回路:Hierholzer算法圖解模擬

代碼模板&#xff1a;List<Integer> resultList new ArrayList<>();List<Integer> hierholzer() {dfs(0);resultList.add(0);// 數組反轉Collections.reverse(resultList);return resultList; }void dfs(int start) {for(int end : G[start]) {if(!vis[star…

Kafka面試精講 Day 2:Topic、Partition與Replica機制

【Kafka面試精講 Day 2】Topic、Partition與Replica機制 在“Kafka面試精講”系列的第二天&#xff0c;我們將深入剖析Kafka最核心的三大數據組織機制&#xff1a;Topic&#xff08;主題&#xff09;、Partition&#xff08;分區&#xff09;與Replica&#xff08;副本&#x…

【備戰2025數模國賽】(三)數模常見賽題類型及解決辦法

在進行數學建模競賽時&#xff0c;很多同學面臨的第一個挑戰是如何對賽題進行歸類&#xff0c;并選擇合適的模型。本篇梳理了數學建模中最常見的幾類賽題&#xff0c;并針對每類題型提供了基本的解決思路&#xff0c;幫助大家快速選擇合適的解題方法&#xff0c;高效完成模型構…

LabVIEW測斜設備承壓試驗臺

為保障煤礦井下地質勘探鉆孔中測斜裝備的可靠運行&#xff0c;設計基于 LabVIEW的鉆孔測斜設備承壓性能試驗臺。該試驗臺以氣動增壓泵為壓力執行元件&#xff0c;結合虛擬儀器與 PLC 控制技術&#xff0c;可精準模擬井下壓力環境&#xff0c;完成水壓、疲勞等試驗&#xff0c;實…

四、練習1:Git基礎操作

練習1&#xff1a;Git基礎操作 練習目標 通過實際操作掌握Git的基本命令&#xff0c;包括初始化倉庫、添加文件、提交更改等。 練習步驟 步驟1&#xff1a;環境準備 確保已安裝Git配置用戶信息&#xff08;如果未配置&#xff09; # 檢查Git版本 git --version# 配置用戶信息 g…

RK3399內核驅動實戰:獲取設備號控制LED的四種方法(由淺入深、代碼注釋詳盡)

RK3399 內核驅動實戰&#xff1a;獲取設備號控制 LED 的四種方法&#xff08;由淺入深、代碼注釋詳盡&#xff09; 在 Linux 字符設備驅動開發中&#xff0c;設備號&#xff08;major minor&#xff09;是內核與用戶空間溝通的橋梁。文章圍繞設備號這一條線展開&#xff0c;從…

2025年AI智能體開源技術棧全面解析:從基礎框架到垂直應用

2025年&#xff0c;開源AI智能體技術正以前所未有的速度重塑人工智能領域&#xff0c;從單一任務處理到復雜多智能體協作&#xff0c;開源生態已成為技術創新的核心驅動力。一、開源AI智能體生態概述 1.1 技術演進與發展歷程 AI智能體技術經歷了從規則式智能體&#xff08;2015…

Empire: LupinOne靶場滲透

Empire: LupinOne 來自 <https://www.vulnhub.com/entry/empire-lupinone,750/#top> 1&#xff0c;將兩臺虛擬機網絡連接都改為NAT模式 2&#xff0c;攻擊機上做namp局域網掃描發現靶機 nmap -sn 192.168.23.0/24 那么攻擊機IP為192.168.23.128&#xff0c;靶場IP192.16…