大數據量下的數據修復與回寫Spark on Hive 的大數據量主鍵沖突排查:COUNT(DISTINCT) 的陷阱

背景與問題概述

????????這一周(2025-05-26-2026-05-30)我在搞數據擬合修復優化的任務,有大量的數據需要進行數據處理及回寫,大概一個表一天一分區有五六千萬數據,大約一百多列的字段。 ????????具體是這樣的我先取檔案,關聯對應表hive對應分區的數據,然后進行算法一系列邏輯處理后,將結果輸出到hive,然后再從hive回寫一份到oracle里面。 ????????

????????spark資源大概我給了不小,數據大概一天40左右吧,大概12個excutor,每一個12G內存,2core吧,擬合完數據,將數據入hive時候,進行了整體去重。 包括且不限于如下操作 ??????

1、.distinct(), ????????

2、對應主鍵的去重.dropDuplicates(id), ????????

3、row_number對id,type主鍵字段開窗取first ????????

4、對id,type主鍵字段開窗,取后續字段的max()

????????經過以上操作,我的數據得以在沒有主鍵沖突的情況下順利的入庫到hive中,并且我對入庫數據進行group by?id,type having count(1) >1時數據也沒有出現重復的情況。 ???????

????????OK。鬼知道我對上述數據驗證進行多少次跑批總結出來的上面的操作。以上是我寫入hive的操作。 下面即將是從hive入到oracle艱辛的探索之路。 正常來講經過上面的數據操作,我從hive入到oracle是不應該出現主鍵沖突的情況了,因為我有一部分表已經處理入庫了,但有一個表就是死活入不進去,我impala都快查爛了,資源監控的同事都給我致電了。 ????????

????????為什么調了一天呢,因為跑一個 程序就要個吧小時,代碼都快被我調抑郁了。

Hive數據寫入階段的去重策略

經過多次實驗和驗證,我總結出一套有效的去重方法,確保數據在寫入Hive時不出現主鍵沖突:

1. 整體去重 - distinct()

val distinctDF = originalDF.distinct()

這種方法簡單直接,但性能開銷較大,適合小數據集或初步去重。

2. 基于主鍵的去重 - dropDuplicates()

val dedupByKeyDF = originalDF.dropDuplicates("id")

比整體去重更高效,只針對指定列進行去重。

3. 開窗函數取第一條記錄

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._val windowSpec = Window.partitionBy("id", "type").orderBy("timestamp")
val firstRecordDF = originalDF.withColumn("rn", row_number().over(windowSpec)).filter("rn = 1").drop("rn")

這種方法在有多條相同主鍵記錄時,可以按指定排序條件保留一條。

4. 開窗函數取最大值記錄

val maxValueDF = originalDF.groupBy("id", "type").agg(max("value1").as("value1"), max("value2").as("value2"),/* 其他字段的max操作 */)

對于需要保留最大值的場景,這種聚合方式非常有效。

Hive到Oracle的數據的遷移問題結局

盡管Hive中的數據已經嚴格去重,但在遷移到Oracle時仍遇到了兩個主要問題:

問題1:NULL值導致的主鍵沖突

-- 問題發現查詢
SELECT id, type, COUNT(1) 
FROM hive_table 
WHERE id IS NULL 
GROUP BY id, type 
HAVING COUNT(1) > 1;

解決方案

// 在寫入Oracle前增加NULL值處理
val cleanDF = processedDF.na.fill("NULL", Seq("id")).filter("id IS NOT NULL") // 或者直接過濾

問題2:資源不足導致的作業失敗

最初配置:

  • 12個Executor

  • 每個Executor 12G內存,2個核心

  • 一個表一天的分區大概處理約40GB數據

作業在運行10-20分鐘后失敗,經過多次調整,最終穩定運行的配置:

  • 每個Executor 45G內存,這個我覺得得看集群資源,我們集群資源很緊張,大概10TB的內存,都不太夠用

  • 適當增加核心數(根據集群情況)我一般都設置2

性能優化經驗總結

1. 內存配置黃金法則

對于大規模數據處理,Executor內存配置應遵循:

  • 基礎內存 = 數據分區大小 × 安全系數(2-3)

  • 考慮序列化開銷和中間數據結構

2. 高效去重策略選擇

方法適用場景優點缺點
distinct()小數據集或全字段去重簡單性能差
dropDuplicates()已知主鍵字段高效僅針對指定列
開窗函數需要按條件保留記錄靈活可控計算開銷大
聚合函數需要保留極值高效只能處理數值字段

3. NULL值處理最佳實踐

  • 在數據處理的早期階段識別和處理NULL值

  • 對于主鍵字段,NULL值應被替換或過濾

  • 考慮使用COALESCE或NVL函數提供默認值

4. 資源監控與調優技巧

  • 觀察GC時間和頻率,內存不足時GC會頻繁發生

  • 監控Executor心跳丟失情況

  • 適當增加spark.memory.fraction(默認0.6)

  • 考慮啟用spark.memory.offHeap.enabled使用堆外內存

優化Demo示例代碼

  /*** @date 2025-05-30* @author hebei_xidaocun_laoli*/
// 1. 讀取原始數據
val rawDF = spark.table("source_table").where("dt = '20250530'") // 按分區過濾// 2. 多階段去重處理
val stage1DF = rawDF.dropDuplicates("id") // 初步去重val windowSpec = Window.partitionBy("id", "type").orderBy(col("update_time").desc)
val stage2DF = stage1DF.withColumn("rn", row_number().over(windowSpec)).filter("rn = 1").drop("rn")// 3. NULL值處理
val cleanDF = stage2DF.na.fill(Map("id" -> "NULL_ID","type" -> "DEFAULT"
)).filter("id != 'NULL_ID'") // 或者保留但確保不沖突// 4. 寫入Hive
cleanDF.write.mode("overwrite").partitionBy("dt").saveAsTable("result_hive_table")// 5. 配置優化后寫入Oracle
cleanDF.write.format("jdbc").option("url", "jdbc:oracle:thin:@//host:port/service").option("dbtable", "target_table").option("user", "username").option("password", "password").option("batchsize", 10000) // 調整批量大小.option("isolationLevel", "NONE") // 對于大數據量寫入可提高性能.mode("append").save()

通過這次項目,總結了以下經驗:

  1. 數據質量優先:在數據處理早期階段解決NULL值、重復數據等問題

  2. 漸進式調優:從較小資源開始,逐步增加直至作業穩定運行

  3. 監控驅動:密切監控作業執行情況,特別是GC和內存使用指標

  4. 文檔記錄:記錄每次調整的參數和效果,形成知識庫

????????大數據處理中的問題往往不是單一因素導致的,需要綜合考慮數據特性、處理邏輯和集群資源。希望諸君避免類似的"坑",更高效地完成大數據處理任務。

????????這個資源調優是真的惡心,代碼沒問題,就是和資源有問題,跑著跑著就突然報錯了,唉,還好這個端午節前解決了

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/83360.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/83360.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/83360.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

基于 AUTOSAR 的域控產品軟件開發:從 CP 到 AP 的跨越

基于 AUTOSAR 的域控產品軟件開發:從 CP 到 AP 的跨越 一、AUTOSAR AP 架構解析:面向智能汽車的自適應框架 (一)引言 隨著汽車智能化向 L3 演進,傳統 AUTOSAR CP(經典平臺)在實時性、動態性和…

Nacos 配置管理案例:nacos-spring-cloud-config-example詳解

一、結構說明:基于Spring Cloud Alibaba的微服務示例 nacos-spring-cloud-config-example : 服務提供者 二、技術棧:Spring BootSpring CloudSpring Cloud Alibaba Nacos Actuator(可選:監控) 三、使用環境 安裝…

BUUCTF[ACTF2020 新生賽]Include 1題解

BUUCTF[ACTF2020 新生賽]Include 1題解 題目分析:知識準備:php://filter 過濾器參數說明常用過濾器功能對照表 開始解題:原理解析構造payload 總結 題目分析: 生成靶機,打開網址,查看源碼,抓包…

vscode + cmake + ninja+ gcc 搭建MCU開發環境

vscode cmake ninja gcc 搭建MCU開發環境 文章目錄 vscode cmake ninja gcc 搭建MCU開發環境1. 前言2. 工具安裝及介紹2.1 gcc2.1.1 gcc 介紹2.1.2 gcc 下載及安裝 2.2 ninja2.2.1 ninja 介紹2.2 ninja 安裝 2.3 cmake2.3.1 cmake 介紹2.3.2 cmake 安裝 2.4 VScode 3. 上手…

九(1). 引用作為函數參數的使用

引用作為參數使用 在 C 中,引用作為函數參數是一種高效且靈活的參數傳遞方式,它避免了拷貝開銷,同時允許函數直接操作原始數據。 以下是關于引用作為參數的詳細使用指南和最佳實踐: 1. 引用作為參數的基本用法 (1) 普通引用&…

Linux多路TTS混音播放:讓多個語音同時清晰可聽

Linux多路TTS混音播放:讓多個語音同時清晰可聽 為什么需要多路混音播放?技術原理概述第一步:配置ALSA dmix混音插件為什么需要dmix?具體配置步驟第二步:生成TTS語音文件為什么需要格式轉換?Python生成腳本第三步:實現多路同時播放播放器設計原理Python實現代碼多路同時播…

Spring AI 1.0 GA 深度解析:構建企業級AI應用的全棧實踐指南

目錄 Spring AI 1.0 核心架構解析統一接口與多模型支持檢索增強生成(RAG)全流程實戰對話記憶與工具調用進階模型評估與可觀測性體系企業級應用案例與最佳實踐未來演進與技術展望1. Spring AI 1.0 核心架構解析 1.1 技術架構演進 #mermaid-svg-ymTZMAaxOwd4OAMu {font-family…

Docker 安裝 Redis 容器

系列文章目錄 文章目錄 系列文章目錄前言1 獲取redis鏡像2 創建和部署redis容器3 查看redis是否啟動成功4 使用Redis客戶端驗證連接總結 前言 搭建環境: ubuntu22.04.05 docker redis: 7.0.10 測試環境: windows: win11 Redis測試客戶端:Ti…

學習vue3階段性復習(插槽,Pinia,生命周期)

目錄 插槽(匿名插槽,具名插槽) 插槽概述 匿名插槽 具名插槽 Pinia(統一管理,共享數據) pinia概述 安裝和使用Pinia 1 使用命令下載Pinia 2 再main.js中導入,注冊到vue框架中 3使用pinia 持久化存儲插件 1 第一步&…

嵌入式Linux 期末復習指南(上)

鑒于互聯網上針對本科目相關復習視頻及資料過少, 撰寫本篇期末復習指南用作期末復習知識點掃盲,以應對本科期末考試及格之用。 由于任課老師并透露考試范圍或任何有關試卷的相關信息,本篇指南基于教材、上機實驗報告及作者經驗編寫&#xff0…

VScode ios 模擬器安裝cocoapods

使用 Homebrew 安裝(推薦) 如果你有 Homebrew,直接用它安裝更穩定: brew install cocoapods

Python趣學篇:用Pygame打造絢爛流星雨動畫

名人說:路漫漫其修遠兮,吾將上下而求索。—— 屈原《離騷》 創作者:Code_流蘇(CSDN)(一個喜歡古詩詞和編程的Coder😊) 專欄介紹:《Python星球日記》 目錄 一、項目簡介與效果展示二、技術棧與核…

可視化大屏通用模板Axure原型設計案例

本文將介紹一款基于Axure設計的可視化大屏通用模板,適用于城市、網絡安全、園區、交通、社區、工業、醫療、能源等多個領域。 模板概述 這款Axure可視化大屏通用模板集成了多種數據展示模塊和組件,旨在為用戶提供一個靈活、可定制的數據展示平臺。無論…

20250530-C#知識:萬物之父Object

C#知識:萬物之父Object Object類(即object)是所有類的基類,這里面的方法還是需要好好了解一下。 1、Object類 是頂級父類,其他類默認都是Object類的子類(自定義類也會默認繼承Object類)可以用O…

蘋果應用開發詳細教程(2025最新版)

蘋果應用開發詳細教程(2025最新版) 第一階段:開發環境搭建 硬件準備 Mac電腦(macOS Monterey 12或更高版本)iPhone/iPad(真機調試建議iOS 16+)軟件安裝 # 通過App Store安裝Xcode xcode-select --installXcode 15+(包含Swift 5.9編譯器)安裝CocoaPods(依賴管理工具)…

flutter項目遷移空安全

重中之重 備份好項目文件,甚至連已經加載好的flutter庫也可以備份。環境包升級 2.1 不要直接換成flutter:3.0以上的版本,這樣做既有基本的庫兼容問題,又有空安全下的語法問題(整個項目中需要增加 late、?、!的語法錯誤,一片報錯的…

架構師面試題整理

以下是從提供的HTML代碼中提取的所有class"title-txt"的文本內容,已排除重復項并按順序整理: 緩存專題 實戰解決大規模緩存擊穿導致線上數據庫壓力暴增面試常問的緩存穿透是怎么回事基于DCL機制解決突發性熱點緩存并發重建問題實戰Redis分布…

pytest 中 fixture 與類繼承交互導致的問題

文章目錄 問題分析將屬性綁定到 **類** 上使用 scopefunction 解決方法為什么有兩個不同的對象核心原因:fixture 的執行上下文scopefunction 的情況scopeclass 的情況 為什么 pytest 要這樣做?這是 pytest 的設計局限 總結 本文探討 Pytest 中 fixture 作…

uniapp+ts模擬popup彈出框(下拉框)

效果圖&#xff08;未展開的樣子&#xff09;&#xff1a; 效果圖&#xff08;展開的樣子&#xff09;&#xff1a; 子組件代碼&#xff1a; <!--* Date: 2024-04-26 14:30:00* LastEditTime: 2025-05-29 09:01:06* Description: 技術服務 --> <template><view …

中小型企業大數據平臺全棧搭建:Hive+HDFS+YARN+Hue+ZooKeeper+MySQL+Sqoop+Azkaban 保姆級配置指南

目錄 背景?一、環境規劃與依賴準備?1. 服務器規劃(3節點集群)2. 系統與依賴?3. Hadoop生態組件版本與下載路徑4. 架構圖二、Hadoop(HDFS+YARN)安裝與配置?1. 下載與解壓(所有節點)2. HDFS高可用配置3. YARN資源配置?4. 啟動Hadoop集群三、MySQL安裝與Hive元數據配置…