【flink】 flink 讀取debezium-json數據獲取數據操作類型op/rowkind方法

flink 讀取debezium-json數據獲取數據操作類型op/rowkind方法。
op類型有c(create),u(update),d(delete)
參考官網案例:此處的"op": "u",就是操作類型。

{"before": {"id": 111,"name": "scooter","description": "Big 2-wheel scooter","weight": 5.18},"after": {"id": 111,"name": "scooter","description": "Big 2-wheel scooter","weight": 5.15},"source": {...},"op": "u","ts_ms": 1589362330904,"transaction": null
}

思路:添加自定義metareader
具體修改方法:去github flink 下載flink源碼,選擇tags,后按照自己需要的版本下載。
至idea中。修改org.apache.flink.formats.json.debezium.DebeziumJsonDecodingFormat

enum ReadableMetadata枚舉類中添加枚舉對象:
注意:operation.type可以按需更改,opudf也能按需更改,但不能是op,否則原有的沖突。
添加如下代碼:

        OPERATION_TYPE("operation.type",DataTypes.STRING().nullable(),true,DataTypes.FIELD("opudf", DataTypes.STRING()),new MetadataConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(GenericRowData row, int pos) {return row.getString(2);}})

編譯項目:

# install
mvn clean package -DskipTests "-Dmaven.test.skip=true" "-Pfast" -pl flink-formats/flink-json
# maven install 到本地
mvn install:install-file "-DgroupId=org.apache.flink" "-DartifactId=flink-json-udf" "-Dversion=1.13.6" -Dpackaging=jar "-Dfile=D:\projects\flink-release-1.13.6\flink-formats\flink-json\target\flink-json-1.13.6.jar"# gradle 不能使用 implement(files("libs/flink-json-1.13.6.jar")) 的方式。需要直接加載到倉庫。或者user/.m2目錄下的臨時倉庫。
# 如下路徑不能直接復制,可供參考,自己找下。
copy D:\projects\flink-release-1.13.6\flink-formats\flink-json\target\flink-json-1.13.6.jar D:\env\Gradle_Repository\caches\modules-2\files-2.1\org.apache.flink\flink-json\1.13.6\cb4daaf018e2
10faa54e76488cbb11e179502728

使用方法:
添加列定義:
opt_type STRING METADATA FROM 'value.operation.type' VIRTUAL,

create table test.some_debezium
(event_time      TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,origin_table    STRING METADATA FROM 'value.source.table' VIRTUAL,opt_type        STRING METADATA FROM 'value.operation.type' VIRTUAL,id            BIGINT,name          STRING,) WITH('connector'='kafka','topic'='your_topic','properties.bootstrap.servers'='host1:6667,host2:6667,host3:6667','properties.group.id'='your_group_id','properties.security.protocol'='SASL_PLAINTEXT','properties.sasl.kerberos.service.name'='kafka','format'='debezium-json','debezium-json.timestamp-format.standard'='ISO-8601','debezium-json.schema-include'='true','debezium-json.ignore-parse-errors'='true'
);

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/85739.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/85739.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/85739.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

某手游cocos2dlua反編譯

一、獲取加載的luac文件 通過frida hook libccos2dlua.so 的luaL_loadbuffer函數對luac進行dump js代碼如下,得到dump后的lua文件 // 要加載的目標庫名 var targetLibrary "libcocos2dlua.so"; var dlopen Module.findExportByName(null, "dlope…

`toRaw` 與 `markRaw`:Vue3 響應式系統的細粒度控制

🤍 前端開發工程師、技術日更博主、已過CET6 🍨 阿珊和她的貓_CSDN博客專家、23年度博客之星前端領域TOP1 🕠 牛客高級專題作者、打造專欄《前端面試必備》 、《2024面試高頻手撕題》、《前端求職突破計劃》 🍚 藍橋云課簽約作者、…

Python文件遷移之Shutil庫詳解

Shutil是一個Python內置的用來高效處理文件和目錄遷移任務的庫。Shutil不僅支持基本的文件復制、移動和刪除操作,還具備處理大文件、批量遷移目錄、以及跨平臺兼容性等特性。通過使用Shutil,我們可以更加輕松地實現文件系統的管理和維護,本文…

【服務器R環境架構】基于 micromamba下載 R 庫包

目錄 準備工作:下載并安裝R環境下載并安裝R環境方式1:下載 .tar.bz2 壓縮包進行解壓執行(官方推薦)方式2: 創建并激活R環境 下載R庫包安裝CRAN包在 micromamba 中安裝 GitHub 包(如 BPST) 參考 …

基于 Apache POI 實現的 Word 操作工具類

基于 Apache POI 實現的 Word 操作工具類 這個工具類是讓 AI 寫的,已覆蓋常用功能。 如不滿足場景的可以讓 AI 繼續加功能。 已包含的功能: 文本相關: 添加文本、 設置字體顏色、 設置字體大小、 設置對齊方式、 設置字符間距、 設置字體加粗…

時間序列預測、分類 | 圖神經網絡開源代碼分享(上)

本期結合《時間序列圖神經網絡(GNN4TS)綜述》,整理了關于圖神經網絡在時間序列預測、分類等任務上的開源代碼和學習資料以供大家學習、研究。 參考論文:《A Survey on Graph Neural Networks for Time Series: Forecasting, Classification, Imputation,…

Vue 添加水印(防篡改: 刪除水印元素節點、修改水印元素的樣式)

MutationObserver_API: 觀察某一個元素的變化// index.vue<template><div class="container"><Watermark text="版權所有" style="background: #28c848"><!-- 可給圖片、視頻、div...添加水印 --><div class=&quo…

如何處理開發不認可測試發現的問題

解決方案 第一步&#xff1a;收集確鑿證據 確保有完整的復現結果準備詳細的記錄材料&#xff1a; 截屏錄屏操作步驟記錄 帶著這些證據與開發人員進行溝通 第二步&#xff1a;多角度驗證 如果與開發人員溝通無果&#xff1a; 競品分析&#xff1a;查看市場上同類產品如何…

linux生產環境下根據關鍵字搜索指定日志文件命令

grep -C 100 "error" server.log 用于在 server.log 文件中查找包含 “error” 的行&#xff0c;并同時顯示該行前后100行的上下文。這是排查日志問題的常用技巧&#xff0c;解釋一下&#xff1a; 命令參數詳解 grep&#xff1a;文本搜索工具&#xff0c;用于在文件…

用vue和echarts怎么寫一個甘特圖,并且是分段式瀑布流

vue echarts 甘特圖功能 index.vue <template><div ref"echart" id"echart" class"echart"></div> </template><script setup>import { nextTick, onMounted, ref } from "vue";import * as echarts f…

Pandas使用教程:從入門到實戰的數據分析利器

一、Pandas基礎入門 1.1 什么是Pandas Pandas是Python生態中核心的數據分析庫&#xff0c;提供高效的數據結構&#xff08;Series/DataFrame&#xff09;和數據分析工具。其名稱源于"Panel Data"&#xff08;面板數據&#xff09;和"Python Data Analysis"…

NuttX Socket 源碼學習

概述 NuttX 的 socket 實現是一個精心設計的網絡編程接口&#xff0c;提供了標準的 BSD socket API。該實現采用分層架構設計&#xff0c;支持多種網絡協議族&#xff08;如 TCP/IP、UDP、Unix域套接字等&#xff09;&#xff0c;具有良好的可擴展性和模塊化特性。 整體架構設…

基于YOLO的語義分割實戰(以豬的分割為例)

數據集準備 數據集配置文件 其實語義分割和目標檢測類似&#xff0c;包括數據集制備、存放格式基本一致像這樣放好即可。 然后需要編寫一個data.yaml文件&#xff0c;對應的是數據的配置文件。 train: C:\圖標\dan\語義分割pig\dataset\train\images #絕對路徑即可 val: C:\…

釘釘智能會議室集成指紋密碼鎖,臨時開門密碼自動下發

在當今快節奏的工作環境中&#xff0c;會議室的高效管理和使用成為了企業提升工作效率的關鍵一環。湖南某知名企業近期成功升級了原有使用的釘釘智能會議室系統&#xff0c;并配套使用了啟辰智慧聯網指紋密碼鎖&#xff0c;實現了會議室管理的智能化升級&#xff0c;提升了會議…

C++講解—類(1)

類 在 C 中&#xff0c;類是一個關鍵概念&#xff0c;憑借其封裝和繼承的特性&#xff0c;能夠助力程序員之間實現高效的分工協作&#xff0c;共同完成復雜的大型項目。我們先從最簡單的概念入手&#xff0c;再進行更深層次的了解和應用。 1. 類的定義 類是用戶自定義的一種…

什么是Hadoop Yarn

Hadoop YARN&#xff1a;分布式集群資源管理系統詳解 1. 什么是YARN&#xff1f; YARN&#xff08;Yet Another Resource Negotiator&#xff09;是 Apache Hadoop 生態系統中的資源管理和作業調度系統&#xff0c;最初在 Hadoop 2.0 中引入&#xff0c;取代了 Hadoop 1.0 的…

項目開發中途遇到困難的解決方案

1. 正視困難&#xff0c;避免逃避 開發遇阻時&#xff0c;退縮會帶來雙重損失&#xff1a;既成為"失敗者逃兵"&#xff0c;又損害職業信心1。 行動建議&#xff1a; 立即向團隊透明化問題&#xff08;如進度延遲、技術瓶頸&#xff09;&#xff0c;避免問題滾雪球…

Blender硬表面建模篇收集學習建模過程中的Demo

c 齒輪 創建一個圓柱體&#xff0c;選擇側面的所有&#xff0c;然后進行隔斷選擇&#xff0c;兩次擠出面&#xff0c;一次縮放面&#xff0c;通過圓柱面三次插入面縮放擠出得到齒輪中心&#xff0c;選中齒輪的鋸齒中間&#xff0c;然后進行相同周長選擇行選擇齒與齒中間的面&…

Chromium 136 編譯指南 macOS篇:獲取源代碼(四)

1. 引言 在現代軟件開發的宏大版圖中&#xff0c;源代碼的獲取往往標志著從理論探索向實踐應用的關鍵轉折。對于Chromium 136這樣一個擁有超過2500萬行代碼、涉及數百個第三方庫的超大規模開源項目而言&#xff0c;源代碼的獲取不僅僅是簡單的文件下載&#xff0c;更是一個涉及…

OpenCV C++ 邊緣檢測與圖像分割

一、邊緣檢測 在數字圖像處理領域&#xff0c;邊緣檢測是一項至關重要的基礎技術。它如同為圖像賦予 “骨架”&#xff0c;幫助計算機快速識別圖像中的物體輪廓、形狀與結構&#xff0c;廣泛應用于目標識別、圖像分割、圖像配準等多個領域。 1.1 概念 邊緣檢測的核心目標是找…