背景與問題概述
????????這一周(2025-05-26-2026-05-30)我在搞數據擬合修復優化的任務,有大量的數據需要進行數據處理及回寫,大概一個表一天一分區有五六千萬數據,大約一百多列的字段。 ????????具體是這樣的我先取檔案,關聯對應表hive對應分區的數據,然后進行算法一系列邏輯處理后,將結果輸出到hive,然后再從hive回寫一份到oracle里面。 ????????
????????spark資源大概我給了不小,數據大概一天40左右吧,大概12個excutor,每一個12G內存,2core吧,擬合完數據,將數據入hive時候,進行了整體去重。 包括且不限于如下操作 ??????
1、.distinct(), ????????
2、對應主鍵的去重.dropDuplicates(id), ????????
3、row_number對id,type主鍵字段開窗取first ????????
4、對id,type主鍵字段開窗,取后續字段的max()
????????經過以上操作,我的數據得以在沒有主鍵沖突的情況下順利的入庫到hive中,并且我對入庫數據進行group by?id,type having count(1) >1時數據也沒有出現重復的情況。 ???????
????????OK。鬼知道我對上述數據驗證進行多少次跑批總結出來的上面的操作。以上是我寫入hive的操作。 下面即將是從hive入到oracle艱辛的探索之路。 正常來講經過上面的數據操作,我從hive入到oracle是不應該出現主鍵沖突的情況了,因為我有一部分表已經處理入庫了,但有一個表就是死活入不進去,我impala都快查爛了,資源監控的同事都給我致電了。 ????????
????????為什么調了一天呢,因為跑一個 程序就要個吧小時,代碼都快被我調抑郁了。
Hive數據寫入階段的去重策略
經過多次實驗和驗證,我總結出一套有效的去重方法,確保數據在寫入Hive時不出現主鍵沖突:
1. 整體去重 - distinct()
val distinctDF = originalDF.distinct()
這種方法簡單直接,但性能開銷較大,適合小數據集或初步去重。
2. 基于主鍵的去重 - dropDuplicates()
val dedupByKeyDF = originalDF.dropDuplicates("id")
比整體去重更高效,只針對指定列進行去重。
3. 開窗函數取第一條記錄
import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._val windowSpec = Window.partitionBy("id", "type").orderBy("timestamp") val firstRecordDF = originalDF.withColumn("rn", row_number().over(windowSpec)).filter("rn = 1").drop("rn")
這種方法在有多條相同主鍵記錄時,可以按指定排序條件保留一條。
4. 開窗函數取最大值記錄
val maxValueDF = originalDF.groupBy("id", "type").agg(max("value1").as("value1"), max("value2").as("value2"),/* 其他字段的max操作 */)
對于需要保留最大值的場景,這種聚合方式非常有效。
Hive到Oracle的數據的遷移問題結局
盡管Hive中的數據已經嚴格去重,但在遷移到Oracle時仍遇到了兩個主要問題:
問題1:NULL值導致的主鍵沖突
-- 問題發現查詢
SELECT id, type, COUNT(1)
FROM hive_table
WHERE id IS NULL
GROUP BY id, type
HAVING COUNT(1) > 1;
解決方案:
// 在寫入Oracle前增加NULL值處理
val cleanDF = processedDF.na.fill("NULL", Seq("id")).filter("id IS NOT NULL") // 或者直接過濾
問題2:資源不足導致的作業失敗
最初配置:
-
12個Executor
-
每個Executor 12G內存,2個核心
-
一個表一天的分區大概處理約40GB數據
作業在運行10-20分鐘后失敗,經過多次調整,最終穩定運行的配置:
-
每個Executor 45G內存,這個我覺得得看集群資源,我們集群資源很緊張,大概10TB的內存,都不太夠用
-
適當增加核心數(根據集群情況)我一般都設置2
性能優化經驗總結
1. 內存配置黃金法則
對于大規模數據處理,Executor內存配置應遵循:
-
基礎內存 = 數據分區大小 × 安全系數(2-3)
-
考慮序列化開銷和中間數據結構
2. 高效去重策略選擇
方法 | 適用場景 | 優點 | 缺點 |
---|---|---|---|
distinct() | 小數據集或全字段去重 | 簡單 | 性能差 |
dropDuplicates() | 已知主鍵字段 | 高效 | 僅針對指定列 |
開窗函數 | 需要按條件保留記錄 | 靈活可控 | 計算開銷大 |
聚合函數 | 需要保留極值 | 高效 | 只能處理數值字段 |
3. NULL值處理最佳實踐
-
在數據處理的早期階段識別和處理NULL值
-
對于主鍵字段,NULL值應被替換或過濾
-
考慮使用COALESCE或NVL函數提供默認值
4. 資源監控與調優技巧
-
觀察GC時間和頻率,內存不足時GC會頻繁發生
-
監控Executor心跳丟失情況
-
適當增加
spark.memory.fraction
(默認0.6) -
考慮啟用
spark.memory.offHeap.enabled
使用堆外內存
優化Demo示例代碼
/*** @date 2025-05-30* @author hebei_xidaocun_laoli*/
// 1. 讀取原始數據
val rawDF = spark.table("source_table").where("dt = '20250530'") // 按分區過濾// 2. 多階段去重處理
val stage1DF = rawDF.dropDuplicates("id") // 初步去重val windowSpec = Window.partitionBy("id", "type").orderBy(col("update_time").desc)
val stage2DF = stage1DF.withColumn("rn", row_number().over(windowSpec)).filter("rn = 1").drop("rn")// 3. NULL值處理
val cleanDF = stage2DF.na.fill(Map("id" -> "NULL_ID","type" -> "DEFAULT"
)).filter("id != 'NULL_ID'") // 或者保留但確保不沖突// 4. 寫入Hive
cleanDF.write.mode("overwrite").partitionBy("dt").saveAsTable("result_hive_table")// 5. 配置優化后寫入Oracle
cleanDF.write.format("jdbc").option("url", "jdbc:oracle:thin:@//host:port/service").option("dbtable", "target_table").option("user", "username").option("password", "password").option("batchsize", 10000) // 調整批量大小.option("isolationLevel", "NONE") // 對于大數據量寫入可提高性能.mode("append").save()
通過這次項目,總結了以下經驗:
-
數據質量優先:在數據處理早期階段解決NULL值、重復數據等問題
-
漸進式調優:從較小資源開始,逐步增加直至作業穩定運行
-
監控驅動:密切監控作業執行情況,特別是GC和內存使用指標
-
文檔記錄:記錄每次調整的參數和效果,形成知識庫
????????大數據處理中的問題往往不是單一因素導致的,需要綜合考慮數據特性、處理邏輯和集群資源。希望諸君避免類似的"坑",更高效地完成大數據處理任務。
????????這個資源調優是真的惡心,代碼沒問題,就是和資源有問題,跑著跑著就突然報錯了,唉,還好這個端午節前解決了