flink sql讀hive catalog數據,將string類型的時間戳數據排序后寫入kafka,如何保障寫入kafka的數據是有序的

在 Flink SQL 中,要確保從 Hive 讀取的 STRING 類型時間戳數據排序后有序寫入 Kafka,需要結合 批處理模式時間類型轉換單分區寫入Kafka 生產者配置。以下是完整解決方案:

一、核心解決方案

1. 批處理模式 + 全局排序

將作業設置為批處理模式,并對字符串時間戳進行類型轉換后排序:

-- 設置為批處理模式(關鍵!)
SET 'execution.runtime-mode' = 'batch';-- 從 Hive 讀取數據
CREATE TABLE hive_source (id STRING,ts_str STRING,  -- 字符串類型的時間戳,如 '2024-01-01 12:00:00' 或 '1718524800000'value DOUBLE
) WITH ('connector' = 'hive','catalog-name' = 'myhive','database-name' = 'default','table-name' = 'my_table'
);-- 創建 Kafka 目標表(單分區)
CREATE TABLE kafka_sink (id STRING,ts_str STRING,value DOUBLE
) WITH ('connector' = 'kafka','topic' = 'output_topic','properties.bootstrap.servers' = 'kafka:9092','properties.max.in.flight.requests.per.connection' = '1',  -- 確保生產者按順序發送'properties.acks' = 'all',  -- 等待所有副本確認'format' = 'json'
);-- 轉換時間戳類型并全局排序后寫入 Kafka
INSERT INTO kafka_sink
SELECT id,ts_str,value
FROM hive_source
ORDER BY CASE WHEN REGEXP_EXTRACT(ts_str, '^\\d{4}-\\d{2}-\\d{2}', 0) != '' THEN TO_TIMESTAMP(ts_str)  -- 處理 'yyyy-MM-dd HH:mm:ss' 格式ELSE TO_TIMESTAMP_LTZ(CAST(ts_str AS BIGINT), 3)  -- 處理毫秒時間戳END ASC;  -- 按時間升序排列

2. 強制寫入單 Kafka 分區

通過 固定分區鍵 確保所有數據寫入同一 Kafka 分區:

-- 創建帶分區鍵的 Kafka 表
CREATE TABLE kafka_sink (id STRING,ts_str STRING,value DOUBLE,partition_key STRING  -- 用于分區的字段
) WITH ('connector' = 'kafka','topic' = 'output_topic','properties.bootstrap.servers' = 'kafka:9092','format' = 'json','sink.partitioner' = 'fixed'  -- 使用固定分區器
);-- 寫入時指定相同分區鍵(確保所有數據在同一分區內有序)
INSERT INTO kafka_sink
SELECT id,ts_str,value,'fixed_key' AS partition_key  -- 固定分區鍵,所有數據寫入同一分區
FROM (SELECT *,CASE WHEN REGEXP_EXTRACT(ts_str, '^\\d{4}-\\d{2}-\\d{2}', 0) != '' THEN TO_TIMESTAMP(ts_str) ELSE TO_TIMESTAMP_LTZ(CAST(ts_str AS BIGINT), 3) END AS ts_time  -- 轉換為時間類型FROM hive_source
)
ORDER BY ts_time ASC;  -- 按轉換后的時間排序

二、關鍵配置說明

配置項作用
execution.runtime-mode = 'batch'啟用批處理模式,支持全局排序(流模式僅支持時間屬性字段排序)
properties.max.in.flight.requests.per.connection = '1'限制 Kafka 生產者并發請求數,確保消息按順序發送
properties.acks = 'all'等待所有 Kafka 副本確認,保證消息不丟失
sink.partitioner = 'fixed'使用固定分區器,結合相同分區鍵,確保所有數據寫入同一分區

三、注意事項

  1. 時間戳格式適配

    • 代碼示例中通過 REGEXP_EXTRACT 自動判斷格式(字符串日期或毫秒),需根據實際數據調整。
    • 若格式固定,可簡化為單一轉換函數(如 TO_TIMESTAMP(ts_str))。
  2. 性能與有序性權衡

    • 單分區寫入會導致吞吐量下降,適合對順序要求極高但數據量較小的場景。
    • 若數據量大,可考慮按時間窗口分組,每個窗口內有序寫入不同分區。
  3. Kafka 主題配置

    • 確保 Kafka 主題的分區數至少為 1。若需更高吞吐量,可增加分區但需接受不同分區間可能亂序。

四、驗證方法

  1. 檢查 Kafka 消息順序

    kafka-console-consumer.sh \--bootstrap-server kafka:9092 \--topic output_topic \--from-beginning | jq -r '.ts_str'  # 使用 jq 解析 JSON 中的時間戳字段
    
  2. 在 Flink WebUI 中觀察

    • 訪問 http://jobmanager-host:8081,查看作業是否正常完成,以及 sink 算子的并行度是否為 1(若設置)。

五、總結

要保障寫入 Kafka 的數據有序,需同時滿足:

  1. 批處理模式:確保全局排序生效。
  2. 類型轉換:將字符串時間戳正確轉換為 TIMESTAMPTIMESTAMP_LTZ 類型。
  3. 單分區寫入:通過固定分區鍵將所有數據路由到同一 Kafka 分區。
  4. 生產者配置:限制并發請求,確保消息按順序發送和確認。

通過以上步驟,可實現從 Hive 到 Kafka 的有序數據傳輸。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/89188.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/89188.shtml
英文地址,請注明出處:http://en.pswp.cn/web/89188.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

7.17 滑動窗口 |assign |memo

lcp56. memo優化tle或者改用bfsclass Solution {int m, n;int dx[4] {0, 0, 1, -1};int dy[4] {1, -1, 0, 0};public:int conveyorBelt(vector<string>& matrix, vector<int>& start, vector<int>& end) {int ret INT_MAX;m matrix.size();n…

統計功效是什么?

統計功效的通俗理解可以把“統計功效”想象成偵探破案的能力——它代表統計檢驗&#xff08;偵探&#xff09;在犯罪事實確實存在&#xff08;真實效應存在&#xff09;時&#xff0c;成功發現真相&#xff08;檢測出效應&#xff09;的概率。核心比喻假設你是一個偵探&#xf…

大語言模型(LLM)訓練的教師強制(Teacher Forcing)方法

大語言模型&#xff08;LLM&#xff09;在訓練時使用一種名為“教師強制&#xff08;Teacher Forcing&#xff09;”的方法&#xff0c;而不是它們在推理&#xff08;生成文本&#xff09;時使用的“自回歸&#xff08;Autoregressive&#xff09;”方法 。闡明關于LLM訓練的一…

歸一化與激活函數:深度學習的雙引擎

歸一化和激活函數區別 歸一化和激活函數是深度學習中兩個不同但又存在關聯的技術,前者聚焦于“數據分布的調整”,后者聚焦于“引入非線性與輸出轉換”。 Softmax 既可以被視為一種歸一化操作,也屬于激活函數 因為它同時滿足兩者的核心特征,只是從不同角度定義:從“輸出…

C# --- 單例類錯誤初始化 + 沒有釋放資源導致線程泄漏

C# --- 單例類錯誤初始化 沒有釋放資源導致線程泄漏Background原因分析問題一&#xff1a; 錯誤初始化&#xff08;使用了箭頭函數&#xff09;問題一&#xff1a; 沒有Dispose資源Background 背景: service A的其中一個Api會向mq發送消息問題&#xff1a;線上發現這個服務經常…

MySQL基礎學習之DML,DQL(二)

這里寫目錄標題一、DML1、INSERT語句1)、給指定列添加數據2)、給全部列添加數據3)、批量數據添加數據4)、操作2、UPDATE語句3、DELETE語句二、DQL1、單表查詢1&#xff09;查詢語法2&#xff09;查詢全部3&#xff09;查詢部分4&#xff09;條件查詢5&#xff09;聚合函數6&…

在 Linux 系統中實現 Spring Boot 程序自動啟動的最佳實踐

在實際部署 Spring Boot 項目的生產環境中&#xff0c;如何確保服務自動啟動&#xff08;如開機自動運行、宕機自動恢復&#xff09;是一項基礎而關鍵的運維能力。本文將系統介紹如何在 Linux 中將 Spring Boot 應用注冊為 systemd 服務&#xff0c;實現進程守護與自動啟動。&a…

如何建立項目團隊的自驅力文化?

建立項目團隊的自驅力文化&#xff0c;關鍵在于賦權機制、目標共創、持續反饋、內在激勵、價值認同。 其中&#xff0c;“目標共創”尤其重要。項目成員若未參與目標制定&#xff0c;僅被動接受任務&#xff0c;將很難激發責任感和參與熱情。反之&#xff0c;通過共創目標&…

【React Native】布局文件-底部TabBar

布局文件-底部tabBar 內容配置 export default function Layout() {return (<Tabs />); }默認會將布局文件是將與它在同一個目錄的所有文件&#xff0c;包括下級目錄的文件&#xff0c;全都配置成Tab了。&#xff1a; 這樣做顯然不對&#xff0c;正確的做法是 在app目…

CompareFace使用

CompareFace 使用 CompareFace 有三種服務&#xff0c;分別是人臉識別&#xff08;RECOGNITION&#xff09;、人臉驗證&#xff08;VERIFICATION&#xff09;、人臉檢測&#xff08;DETECTION&#xff09;。 人臉識別其實就是人臉身份識別(每張照片只有一個人臉)&#xff0c;…

APP測試之Monkey壓力測試

&#xff08;一&#xff09;Monkey簡介 Monkey意指猴子&#xff0c;頑皮淘氣。所以Monkey測試&#xff0c;顧名思義也就像猴子一樣在軟件上亂敲按鍵&#xff0c;猴子什么都不懂&#xff0c;就愛搗亂。 Monkey 是 Android SDK 自帶的命令行工具&#xff0c;它通過向系統發送偽…

時序大模型為時序數據庫帶來的變革與機遇

時序數據&#xff08;Time Series Data&#xff09;作為記錄系統狀態隨時間變化的重要數據類型&#xff0c;在物聯網、金融交易、工業監控等領域呈爆炸式增長。傳統時序數據庫專注于高效存儲和查詢時序數據&#xff0c;而時序大模型&#xff08;Time Series Foundation Models&…

深入核心:理解Spring Boot的三大基石:起步依賴、自動配置與內嵌容器

深入核心&#xff1a;理解Spring Boot的三大基石&#xff1a;起步依賴、自動配置與內嵌容器 摘要&#xff1a;在上一章&#xff0c;我們領略了Spring Boot帶來的革命性開發體驗。但魔法的背后&#xff0c;必有其科學的支撐。本章將帶你深入Spring Boot的內核&#xff0c;系統性…

達夢數據庫配置兼容MySQL

前言 作為一名數據庫管理員或開發者&#xff0c;當項目需要從MySQL遷移到達夢數據庫時&#xff0c;最關心的莫過于兼容性問題。達夢作為國產數據庫的佼佼者&#xff0c;提供了良好的MySQL兼容模式&#xff0c;今天我就來分享一下如何配置達夢數據庫以實現對MySQL的兼容。 一、為…

js與vue基礎學習

vue創建項目 安裝node安裝node、npm、cnpm node -v npm -v #npm服務器位置處于國外&#xff0c;下載包的速度會比較緩慢。阿里為國內用戶提供的cnpm&#xff0c;他是npm的鏡像&#xff0c;下載第三方包時&#xff0c;們完全可以使用cnpm來替代npm。 cnpm -v在node中執行JavaScr…

【開源.NET】一個 .NET 開源美觀、靈活易用、功能強大的圖表庫

文章目錄一、項目介紹二、適用場景三、功能模塊四、功能特點五、效果展示六、開源地址一、項目介紹 LiveCharts2 是一個開源、簡單、靈活、交互式且功能強大的 .NET 圖表庫。LiveCharts2 現在幾乎可以在任何地方運行&#xff1a;Maui、Uno Platform、Blazor-wasm、WPF、WinFor…

使用Whistle自定義接口返回內容:Mock流式JSON數據全解析

一.mock接口返回數據流程 定位目標接口 在Whistle的Network面板中找到需要Mock的接口&#xff0c;右鍵點擊請求信息&#xff0c;選擇COPY -> URL復制完整URL&#xff0c;確保URL路徑精確到具體接口。準備Mock數據 點擊對應接口&#xff0c;在右側面板切換到response標簽頁&a…

【前端】富文本編輯器插件 wangEditor 5 基本使用(Vue2)

https://www.wangeditor.com/v5 一、安裝 首先安裝editor yarn add wangeditor/editor # 或者 npm install wangeditor/editor --save安裝Vue2組件 yarn add wangeditor/editor-for-vue # 或者 npm install wangeditor/editor-for-vue --save或者Vue3 yarn add wangeditor/…

自適應哈希索引 和 日志緩沖區

目錄 1. 自適應哈希索引在內存中的位置 2. 自適應哈希索引的作用 3. 為什么要創建自適應哈希索引 4. 適應哈希索引的Key -Value如何設置&#xff1f; 5. 日志緩沖區在內存中的位置 6. 日志緩沖區的作用 7. 日志不通過LogBuffer直接寫入磁盤不行嗎&#xff1f; 1. 自適應哈…

中國旅行社協會在京召開“文旅人工智能應用研討會”,助力文旅創新發展

7月15日&#xff0c;由中國旅行社協會數字經濟專業委員會和在線旅行服務商分會聯合主辦的“人工智能技術在文旅產業中的應用”研討會在北京舉行。中國旅行社協會副會長、秘書長孫桂珍出席并致辭&#xff0c;中國工程院外籍院士、具身智能機器人專家張建偉、北京第二外國語學院旅…