hedfs和hive數據遷移后校驗腳本

先談論校驗方法,本人騰訊云大數據工程師。

1、hdfs的校驗

這個通常就是distcp校驗,hdfs通過distcp遷移到另一個集群,怎么校驗你的對不對。

有人會說,默認會有校驗CRC校驗。我們關閉了,為什么關閉?全量遷移,如果當前表再寫數據,開自動校驗就會失敗。數據量大(PB級)遷移流程是先遷移全量,后面在定時補最近幾天增量,再找個時間點,進行業務割接

那么怎么知道你遷移的hdfs是否有問題呢?

2個文件,一個是腳本,一個是需要校驗的目錄

data_checksum.py

# -*- coding: utf-8 -*-
# @Time    : 2025/1/16 22:52
# @Author  : fly-wlx
# @Email   : xxx@163.com
# @File    : data_compare.py
# @Software: PyCharmimport subprocess#output_file = 'data_checksum_result.txt'
def load_file_paths_from_conf(conf_file):file_list = []with open(conf_file, 'r') as file:lines = file.readlines()for line in lines:path = line.strip()if path and not path.startswith('#'):  # 跳過空行和注釋full_path = f"{path}"file_list.append(full_path)return file_list#def write_sizes_to_file(filepath,source_namenode,source_checksum,target_namenode,target_checksum,status, output_file):
#    with open(output_file, 'w') as file:
#file.write(f"{source_namenode}/{filepath},{source_checksum},{target_namenode}/{filepath},{target_checksum},{status}\n")def write_sizes_to_file(source_path, src_info, destination_path, target_info, status,output_file):with open(output_file, 'a') as file:file.write(f"{source_path},{src_info},{destination_path}, {target_info}, {status}\n")
def run_hadoop_command(command):"""運行 Hadoop 命令并返回輸出"""try:result = subprocess.check_output(command, shell=True, text=True)return result.strip()except subprocess.CalledProcessError as e:print(f"Command failed: {e}")return Nonedef get_hdfs_count(hdfs_filepath):"""獲取 HDFS 路徑的文件和目錄統計信息"""command = f"hadoop fs -count {hdfs_filepath}"output = run_hadoop_command(command)if output:parts = output.split()if len(parts) >= 3:dir_count, file_count, content_size = parts[-3:]return dir_count, file_count, content_sizereturn None, None, Nonedef get_hdfs_size(hdfs_filepath):"""獲取 HDFS 路徑的總文件大小"""command = f"hadoop fs -du -s {hdfs_filepath}"output = run_hadoop_command(command)if output:parts = output.split()if len(parts) >= 1:return parts[0]return Nonedef validate_hdfs_data(source_namenode, target_namenode,filepath):output_file = 'data_checksum_result.txt'source_path=f"{source_namenode}/{filepath}"destination_path = f"{target_namenode}/{filepath}""""校驗 HDFS 源路徑和目標路徑的數據一致性"""print("Fetching source path statistics...")src_dir_count, src_file_count, src_content_size = get_hdfs_count(source_path)src_total_size = get_hdfs_size(source_path)print("Fetching destination path statistics...")dest_dir_count, dest_file_count, dest_content_size = get_hdfs_count(destination_path)dest_total_size = get_hdfs_size(destination_path)src_info={}src_info["src_dir_count"] = src_dir_countsrc_info["src_file_count"] = src_file_count#src_info["src_content_size"] = src_content_sizesrc_info["src_total_size"] = src_total_sizetarget_info = {}target_info["src_dir_count"] = dest_dir_counttarget_info["src_file_count"] = dest_file_count#target_info["src_content_size"] = dest_content_sizetarget_info["src_total_size"] = dest_total_sizeprint("\nValidation Results:")if (src_dir_count == dest_dir_count andsrc_file_count == dest_file_count and# src_content_size == dest_content_size andsrc_total_size == dest_total_size):print("? Source and destination paths are consistent!")write_sizes_to_file(source_path, src_info, destination_path,target_info, 0,output_file)else:print("? Source and destination paths are inconsistent!")write_sizes_to_file(source_path, src_info, destination_path, target_info, 1,output_file)#print(f"Source: DIR_COUNT={src_dir_count}, FILE_COUNT={src_file_count}, CONTENT_SIZE={src_content_size}, TOTAL_SIZE={src_total_size}")#print(f"Destination: DIR_COUNT={dest_dir_count}, FILE_COUNT={dest_file_count}, CONTENT_SIZE={dest_content_size}, TOTAL_SIZE={dest_total_size}")# 設置源路徑和目標路徑
#source_path = "hdfs://namenode1:8020/"
#destination_path = "hdfs://namenode2:8020/path/to/destination"
# 定義源和目標集群的 namenode 地址
source_namenode = "hdfs://10.xx.xx.6:8020"
target_namenode= "hdfs://10.xx.xx.106:4007"def main():# 配置文件路徑和輸出文件路徑conf_file = 'distcp_paths.conf'# 定義源和目標集群的 namenode 地址# 設置源路徑和目標路徑#source_namenode = "hdfs://source-namenode:8020"#target_namenode = "hdfs://target-namenode:8020"# 文件列表file_paths = load_file_paths_from_conf(conf_file)# 對每個目錄進行校驗for filepath in file_paths:validate_hdfs_data(source_namenode, target_namenode, filepath)if __name__ == "__main__":main()# 執行校驗
#validate_hdfs_data(source_path, destination_path)

distcp_paths.conf

/apps/hive/warehouse/xx.db/dws_ixx_features
/apps/hive/warehouse/xx.db/dwd_xx_df

用法

直接python3 data_checksum.py(需要改為自己的)

他會實時打印對比結果,并且將結果生成到一個文件中(data_checksum_result.txt)

2、hive文件內容比對

最終客戶要的是任務的數據對得上,而不是管你遷移怎么樣,所以驗證任務的方式:兩邊同時跑同多個Hive任務流的任務,查看表數據內容是否一致。(因為跑出來的hdfs的文件大小由于mapreduce原因,肯定是不一致的,校驗實際數據一致就行了)

方法是先對比表字段,然后對比count數,然后將每行拼起來對比md5

涉及3個文件,單檢測腳本,批量入口腳本,需要批量檢測的表文件

check_script.sh

#!/bin/bash
#owner:clark.shi
#date:2025/1/22
#背景:用于hive從源端任務和目標端任務,兩邊跑完結果表的內容校驗(因為mapreduce和小文件不同,所以要用數據內容校驗)
#     --用trino(presto)會更好,因為可以跨集群使用,目前客戶因為資源情況沒裝,此為使用hive引擎,將數據放到本地進行比對#輸入:源端表,目標表,分區名,分區值
#$0是腳本本身,最低從1開始#限制腳本運行內存大小,30gb
#ulimit -v 30485760#---注意,要保證,2個表的字段順序是一樣的(md5是根據順序拼接的)
echo "================"
echo "注意"
echo "要保證,2個表的字段順序是一樣的(md5是根據順序拼接的)"
echo "要保證,這2個表是存在的"
echo "要保證,雙端是可以互相訪問"
echo "要保證,2個hive集群的MD5算法相同"
echo "禁止表,一個分區數據量超過本地磁盤,此腳本會寫入本地磁盤(雙端數據),對比后刪除"
echo "注意,如果分區字段是數字不用加引號,如果是字符串需要加引號,搜partition_value,這里分區是int如20250122是沒有引號"
echo "================"a_table=$1
b_table=$2
partition_column=$3
partition_value=$4if [ $# -ne 4 ]; thenecho "錯誤:必須輸入 4 個參數,源端表,目標表,分區名,分區值"exit 1
fi#------------函數check_value() {# 第一個參數是布爾值,第二個參數是要 echo 的內容local value=$1local message=$2# 檢查第一個參數的值if [ "$value" == "false" ]; thenecho "校驗失敗:$message" >> rs.txtexit fi
}#-----------函數結束echo "需要對比表的數據內容是$a_table和$b_table--,需要對比分區$partition_column是$partition_value--"sleep 2
echo "===============開始校驗============="
#todo改成自己的,kerbers互信認證(也可以用ldap)
`kinit -kt /root/s_xx_tbds.keytab s_xx_tbds@TBDS-V12X10CS`#校驗字段類型
echo "1.開始校驗字段類型"#todo這里要改成自己的beeline -u "jdbc:hive2://10.xx.xx.4:10001/XXdatabase;principal=hive/tbds-10-xx-xx-4.hadooppdt.xxjin.srv@TBDS-V12X10CS;transportMode=http;httpPath=cliservice" -e "DESCRIBE $b_table" > 1_a_column.txtbeeline -u "jdbc:hive2://10.xx.xx.104:7001/XXdatabase;principal=hadoop/10.xx.xx.104@TBDS-09T7KXLE" -e "DESCRIBE $a_table" > 1_b_column.txtif diff 1_a_column.txt 1_b_column.txt > /dev/null; thenecho "表結構一致"elseecho "表結構不一致"check_value false "$a_table和$b_table字段類型不一致"fi echo "------------1.表字段,校驗完畢,通過-------------"#校驗count數
echo "2.開始count校驗"beeline -u "jdbc:hive2://10.xx.xx.4:10001/XXdatabase;principal=hive/tbds-10-xx-xx-4.hadooppdt.xxjin.srv@TBDS-V12X10CS;transportMode=http;httpPath=cliservice" -e "select count(*) from $b_table where $partition_column=$partition_value" > 2_a_count.txtbeeline -u "jdbc:hive2://10.xx.xx.104:7001/XXdatabase;principal=hadoop/10.xx.xx.104@TBDS-09T7KXLE" -e "select count(*) from $a_table where $partition_column=$partition_value" > 2_b_count.txtif diff 2_a_count.txt 2_b_count.txt > /dev/null; thenecho "數據行一致"elseecho "數據行不一致"check_value false "$a_table和$b_table的數據行不一致"fiecho "------------2.數據行,校驗完畢,通過-------------"#拼接每一行的值,作為唯一值,創建2個臨時表
echo "3.生成每條數據唯一標識"#1.獲取表列名#使用awk,去除第一行字段名,,刪除#字號以及他后面的內容(一般是分區的描述),根據分隔符|取第一列數據,去掉空的行beeline -u "jdbc:hive2://10.xx.xx.104:7001/XXdatabase;principal=hadoop/10.xx.xx.104@TBDS-09T7KXLE" --outputformat=dsv -e "DESCRIBE $a_table" |awk 'NR > 1' |awk '!/^#/ {print} /^#/ {exit}'|awk 'BEGIN {FS="|"} {print $1}'|awk 'NF > 0' > 3_table_field_name.txt#2.拼接表列名,生成md5的表 (第一步已經檢測過雙方的表結構了,這里用同一個拼接字段即可)# 使用 while 循環逐行讀取文件內容name_fields=""while IFS= read -r line; doif [ -z "$name_fields" ]; thenname_fields="$line"elsename_fields="$name_fields,$line"fidone < "3_table_field_name.txt"echo "$name_fields"#將每行數據進行拼接,并且生成含一個字段的md5表md5_sql="SELECT distinct(MD5(CONCAT($name_fields))) AS md5_value "a_md5_sql="$md5_sql from (select * from dim_user_profile_df where $partition_column=$partition_value  limit 100)a;"b_md5_sql="$md5_sql from $a_table where $partition_column=$partition_value;"echo "a表的sql是:$a_md5_sql"echo "b表的sql是:$b_md5_sql"#源端是生產環境,這里做了特殊處理,源端就取100條(沒使用order by rand(),客戶主要是檢測函數,order by 會占用他們集群資源)beeline -u "jdbc:hive2://10.xx.xx.4:10001/XXdatabase;principal=hive/tbds-10-xx-xx-4.hadooppdt.xxjin.srv@TBDS-V12X10CS;transportMode=http;httpPath=cliservice" --outputformat=dsv -e "$a_md5_sql" > 4_a_md5_data.txtbeeline -u "jdbc:hive2://10.xx.xx.104:7001/XXdatabase;principal=hadoop/10.xx.xx.104@TBDS-09T7KXLE" --outputformat=dsv -e "$b_md5_sql" > 4_b_md5_data.txt#3.(由于不是同集群,需要下載到本地,再進行導入--如果耗費資源時長太長,再導入到hive,否則直接shell腳本搞定)# 設置large_file和small_file的路徑large_file="4_b_md5_data.txt"small_file="4_a_md5_data.txt"# 遍歷small_file中的每一行while IFS= read -r line; do# 檢查line是否存在于large_file中if grep -qxF "$line" "$large_file"; then# 如果line存在于large_file中,輸出1#echo "1"a=1else# 如果line不存在于large_file中,輸出2echo "2"check_value false "$a_table和$b_table抽樣存在數據內容不一致"fidone < "$small_file"echo echo "------------3.數據內容,校驗完畢,通過-------------"
#抽樣核對md5(取數據時已抽樣,否則數據太大容易跑掛生產環境) 

input_file.txt需要校驗的表文件

源端表名,目標端表名,分區字段(寫1級分區就可以),分區值

ods_xxnfo_di ods_xxnfo_dii dt 20250106

ods_asxx_log_di ods_asxx_log_dii dt 20250106

ods_xxog_di ods_xxog_di dt 20250106

dwd_xxx dwd_xxx dt 20250106

run.sh

#!/bin/bash# 設置文件路徑
input_file="input_file.txt"# 遍歷文件中的每一行
while IFS= read -r line; do# 調用另一個腳本并傳遞當前行的參數echo $line./check_script.sh $line# 在每次執行完后間隔一小段時間,避免系統過載(可選)sleep 1
done < "$input_file"

使用方法

sh run.sh(需要把check_scripe和run里的內容改成自己的哈)

他會把不通過的,生成一個rs.txt

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/67077.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/67077.shtml
英文地址,請注明出處:http://en.pswp.cn/web/67077.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

61,【1】BUUCTF WEB BUU XSS COURSE 11

進入靶場 左邊是吐槽&#xff0c;右邊是登錄&#xff0c;先登錄試試 admin 123456 admiin# 123456 admin"# 123456 不玩了&#xff0c;先去回顧下xss 回顧完就很尷尬了&#xff0c;我居然用SQL的知識去做xss的題 重來 吐槽這里有一個輸入框&#xff0c;容易出現存儲型…

海外問卷調查如何影響企業的經營?在品牌建設中有何指導意義?

市場調查的定義&#xff1a;通過科學的方法&#xff0c;有目的地、系統地搜集整理一些市場信息&#xff0c;其目的在于了解當下市場現狀和發展前景&#xff0c;為企業生產和品牌打造提供一些科學的指導意見&#xff0c;這是任何大企業、中小企業、初創企業都必須重視的一個重要…

STM32新建不同工程的方式

新建工程的方式 1. 安裝開發工具 MDK5 / keil52. CMSIS 標準3. 新建工程3.1 寄存器版工程3.2 標準庫版工程3.3 HAL/LL庫版工程3.4 HAL庫、LL庫、標準庫和寄存器對比3.5 庫開發和寄存器的關系 4. STM32CubeMX工具的作用 1. 安裝開發工具 MDK5 / keil5 MDK5 由兩個部分組成&#…

idea maven本地有jar包,但還要從遠程下載

idea 中&#xff0c;java 工程執行 maven reimport&#xff0c;報jar報無法下載。 我奇了個怪&#xff0c;我明明在本地倉庫有啊&#xff0c;你非得從遠程下載&#xff1f; 我從供應商那里拿來的&#xff0c;遠程當然沒有了。 這太奇葩了吧&#xff0c;折騰好久不行。 后來…

springboot 調用 c++生成的so庫文件

一、創建c文件 SoTest.h #pragma once class SoTest {int Add(int a,int b); };SoTest.cpp #include "SoTest.h"int SoTest::Add(int a, int b) {return a b; }二、創建so文件 /home/ubuntu/projects/SoTest/bin/x64/Debug/libSoTest.so 三、java代碼 Maven依…

Windows 靶機常見服務、端口及枚舉工具與方法全解析:SMB、LDAP、NFS、RDP、WinRM、DNS

在滲透測試中&#xff0c;Windows 靶機通常會運行多種服務&#xff0c;每種服務都有其默認端口和常見的枚舉工具及方法。以下是 Windows 靶機常見的服務、端口、枚舉工具和方法的詳細說明&#xff1a; 1. SMB&#xff08;Server Message Block&#xff09; 端口 445/TCP&…

250125-package

1. 定義 包就是文件夾&#xff0c;作用是在大型項目中&#xff0c;避免不同人的編寫的java文件出現同名進而導致報錯&#xff1b;想象一個場景&#xff0c;在一個根目錄中&#xff0c;每一個人都有自己的一個java文件夾&#xff0c;他可以將自己編寫的文件放在該文件夾里&…

系統思考—動態問題分析

“不是解決問題&#xff0c;而是根本改變它的方式&#xff0c;才能真正創造持久的成功。”——彼得德魯克 在很多情況下&#xff0c;企業面對問題時&#xff0c;總會急于尋找解決方案&#xff0c;但這些方案往往只是暫時的“應急措施”。它們看似有效&#xff0c;卻難以從根本…

ASP.NET Core WebAPI的異步及返回值

目錄 Action方法的異步 Action方法參數 捕捉URL占位符 捕捉QueryString的值 JSON報文體 其他方式 Action方法的異步 Action方法既可以同步也可以異步。異步Action方法的名字一般不需要以Async結尾。Web API中Action方法的返回值如果是普通數據類型&#xff0c;那么返回值…

系統架構設計師教材:信息系統及信息安全

信息系統 信息系統的5個基本功能&#xff1a;輸入、存儲、處理、輸出和控制。信息系統的生命周期分為4個階段&#xff0c;即產生階段、開發階段、運行階段和消亡階段。 信息系統建設原則 1. 高層管理人員介入原則&#xff1a;只有高層管理人員才能知道企業究竟需要什么樣的信…

Golang Gin系列-5:數據模型和數據庫

在這篇Gin教程的博客中&#xff0c;我們將探索如何將模型和數據庫與Gin框架無縫集成&#xff0c;使你能夠構建健壯且可擴展的web應用程序。通過利用流行的庫并遵循最佳實踐&#xff0c;你將學習如何定義模型、建立數據庫連接、執行CRUD操作以及確保基于gin的項目中的數據完整性…

Moretl FileSync增量文件采集工具

永久免費: <下載> <使用說明> 我們希望Moretl FileSync是一款通用性很好的文件日志采集工具,解決工廠環境下,通過共享目錄采集文件,SMB協議存在的安全性,兼容性的問題. 同時,我們發現工廠設備日志一般為增量,為方便MES,QMS等后端系統直接使用數據,我們推出了增量采…

SWPU 2022 新生賽--web題

奇妙的MD5 進入靶場 然我們輸入一個特殊的字符串&#xff0c;然后我到處翻了翻&#xff0c;發現有提示 在MD5中有兩個特殊的字符串 0e215962017 //MD5加密后弱比較等于自身 ffifdyop //MD5加密后變成萬能密碼 這里明顯就是萬能密碼了 輸入之后就來到了這個頁…

PyQt6醫療多模態大語言模型(MLLM)實用系統框架構建初探(上.文章部分)

一、引言 1.1 研究背景與意義 在數字化時代,醫療行業正經歷著深刻的變革,智能化技術的應用為其帶來了前所未有的發展機遇。隨著醫療數據的指數級增長,傳統的醫療診斷和治療方式逐漸難以滿足現代醫療的需求。據統計,全球醫療數據量預計每年以 48% 的速度增長,到 2025 年將…

怎么樣把pdf轉成圖片模式(不能復制文字)

貴但好用的wps&#xff0c; 轉換——轉為圖片型pdf —————————————————————————————————————————— 轉換前&#xff1a; 轉換后&#xff1a; 肉眼可見&#xff0c;模糊了&#xff0c;且不能復制。 其他免費辦法&#xff0c;參考&…

C# OpenCV機器視覺:利用CNN實現快速模板匹配

在一個陽光燦爛的周末&#xff0c;阿強正癱在沙發上&#xff0c;百無聊賴地換著電視頻道。突然&#xff0c;一則新聞吸引了他的注意&#xff1a;某博物館里一幅珍貴的古畫離奇失蹤&#xff0c;警方懷疑是被一伙狡猾的盜賊偷走了&#xff0c;現場只留下一些模糊不清的監控畫面&a…

智能電動汽車系列 --- 智能汽車向車載軟件轉型

我是穿拖鞋的漢子,魔都中堅持長期主義的汽車電子工程師。 老規矩,分享一段喜歡的文字,避免自己成為高知識低文化的工程師: 簡單,單純,喜歡獨處,獨來獨往,不易合同頻過著接地氣的生活,除了生存溫飽問題之外,沒有什么過多的欲望,表面看起來很高冷,內心熱情,如果你身…

YOLOv8改進,YOLOv8檢測頭融合DynamicHead,并添加小目標檢測層(四頭檢測),適合目標檢測、分割等,全網獨發

摘要 作者提出一種新的檢測頭,稱為“動態頭”,旨在將尺度感知、空間感知和任務感知統一在一起。如果我們將骨干網絡的輸出(即檢測頭的輸入)視為一個三維張量,其維度為級別 空間 通道,這樣的統一檢測頭可以看作是一個注意力學習問題,直觀的解決方案是對該張量進行全自…

[ Spring ] Spring Cloud Gateway 2025 Comprehensive Overview

文章目錄 Spring Gateway ArchitectureProject Level DependencyService CenterService ProviderGateway ServiceLaunch All Service Spring Gateway Architecture Service Center : register and find service providerService Provider : programs that provide actual serv…

GitLab配置免密登錄和常用命令

SSH 免密登錄 Windows免密登錄 刪除現有Key 訪問目錄&#xff1a;C:\Users\Administrator\ .ssh&#xff0c;刪除公鑰&#xff1a;id_rsa.pub &#xff0c;私鑰&#xff1a;id_rsa 2.生成.ssh 秘鑰 運行命令生成.ssh 秘鑰目錄&#xff08; ssh-keygen -t rsa -C xxxxxx126.…