How Flink can expose the operation type (op / RowKind) when reading debezium-json data.
The op field has three values: c (create), u (update), and d (delete).
In the example from the official documentation below, "op": "u" is the operation type.
{
  "before": {"id": 111, "name": "scooter", "description": "Big 2-wheel scooter", "weight": 5.18},
  "after": {"id": 111, "name": "scooter", "description": "Big 2-wheel scooter", "weight": 5.15},
  "source": {...},
  "op": "u",
  "ts_ms": 1589362330904,
  "transaction": null
}
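As a quick sanity check outside of Flink, the op field can be pulled from a raw debezium-json record with plain JSON parsing (a minimal sketch, using a trimmed version of the example above with "source" omitted; this is not part of the Flink change itself):

```python
import json

# A trimmed debezium-json change event, following the example record above.
record = json.dumps({
    "before": {"id": 111, "name": "scooter", "weight": 5.18},
    "after": {"id": 111, "name": "scooter", "weight": 5.15},
    "op": "u",
    "ts_ms": 1589362330904,
})

event = json.loads(record)
print(event["op"])  # -> u
```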
Approach: add a custom metadata reader.
Concrete steps: download the Flink source from GitHub (open the flink repository, switch to Tags, and pick the version you need), import it into IDEA, then modify org.apache.flink.formats.json.debezium.DebeziumJsonDecodingFormat.
Add a new constant to the ReadableMetadata enum in that class.
Note: the metadata key operation.type can be renamed as needed, and opudf can also be renamed, but neither may be op, which would conflict with the existing field.
Add the following code:

OPERATION_TYPE(
        "operation.type",
        DataTypes.STRING().nullable(),
        true,
        DataTypes.FIELD("opudf", DataTypes.STRING()),
        new MetadataConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(GenericRowData row, int pos) {
                // index 2 of the parsed debezium record is the "op" field
                // (before = 0, after = 1, op = 2)
                return row.getString(2);
            }
        })
Build the project:

# package only the flink-json module
mvn clean package -DskipTests "-Dmaven.test.skip=true" "-Pfast" -pl flink-formats/flink-json

# install the jar into the local Maven repository
mvn install:install-file "-DgroupId=org.apache.flink" "-DartifactId=flink-json-udf" "-Dversion=1.13.6" -Dpackaging=jar "-Dfile=D:\projects\flink-release-1.13.6\flink-formats\flink-json\target\flink-json-1.13.6.jar"

# Gradle cannot load the jar via implementation(files("libs/flink-json-1.13.6.jar")); the jar must be loaded into a repository directly, or into the local repository under user/.m2.
# The following path cannot be copied as-is; it is only for reference — find the corresponding hash directory in your own Gradle cache.
copy D:\projects\flink-release-1.13.6\flink-formats\flink-json\target\flink-json-1.13.6.jar D:\env\Gradle_Repository\caches\modules-2\files-2.1\org.apache.flink\flink-json\1.13.6\cb4daaf018e210faa54e76488cbb11e179502728
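Once the jar is in a repository, a Gradle build can reference it as a normal dependency (a sketch; the coordinates follow the mvn install:install-file command above, where flink-json-udf is the artifactId chosen at install time):

```
dependencies {
    // coordinates from the install:install-file step above
    implementation("org.apache.flink:flink-json-udf:1.13.6")
}
```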
Usage:
Add a column definition:
opt_type STRING METADATA FROM 'value.operation.type' VIRTUAL,
Full example:
create table test.some_debezium
(
    event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
    origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
    origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
    opt_type STRING METADATA FROM 'value.operation.type' VIRTUAL,
    id BIGINT,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'your_topic',
    'properties.bootstrap.servers' = 'host1:6667,host2:6667,host3:6667',
    'properties.group.id' = 'your_group_id',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.kerberos.service.name' = 'kafka',
    'format' = 'debezium-json',
    'debezium-json.timestamp-format.standard' = 'ISO-8601',
    'debezium-json.schema-include' = 'true',
    'debezium-json.ignore-parse-errors' = 'true'
);
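With the metadata column declared, the operation type can be used like any ordinary column, for example to inspect only delete events (a sketch; table and column names follow the DDL above):

```
SELECT id, name, opt_type
FROM test.some_debezium
WHERE opt_type = 'd';
```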