Flink之JDBCSink連接MySQL

輸出到MySQL

  1. 添加依賴
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version>
</dependency>
<dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.32</version>
</dependency>
  1. 啟動MySQL, 在test庫下建表clicks
CREATE TABLE `clicks` (`user` VARCHAR(100) NOT NULL,`url` VARCHAR(100) DEFAULT NULL,`ts` BIGINT DEFAULT NULL
) ENGINE=INNODB DEFAULT CHARSET=utf8
  1. 示例代碼
public class Flink04_JdbcSink {public static void main(String[] args) {//1.創建運行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默認是最大并行度env.setParallelism(1);DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);//SinkFunction<Event> sink = JdbcSink.sink("insert into clicks(user, url, ts) values (?,?,?)", new JdbcStatementBuilder<Event>() {@Overridepublic void accept(PreparedStatement preparedStatement, Event event) throws SQLException {//給SQL的占位符賦值preparedStatement.setString(1, event.getUser());preparedStatement.setString(2, event.getUrl());preparedStatement.setLong(3, event.getTs());}},JdbcExecutionOptions.builder().withBatchSize(5).withBatchIntervalMs(10000).withMaxRetries(3).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("000000").withUrl("jdbc:mysql://hadoop102:3306/flink").build());ds.addSink(sink);try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

MySQL的冪等性處理

  1. 將插入關鍵字替換為replace,如果主鍵重復,將除了主鍵外的所有字段都替換。
  2. 使用on duplicate key update 字段名 = values(字段名)語法,如果主鍵重復,可以選擇部分字段進行替換,其余字段保持不變。
  3. 示例代碼
public class Flink05_JdbcSinkReplace {public static void main(String[] args) {//1.創建運行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默認是最大并行度env.setParallelism(1);DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);SingleOutputStreamOperator<WordCount> countDs =ds.map(event -> new WordCount(event.getUrl(), 1)).keyBy(WordCount::getWord).sum("count");//SinkFunction<WordCount> sink = JdbcSink.sink(
//                "replace into url_count(url, cnt) values (?,?)""insert into url_count(url, cnt) values(?,?) on duplicate key update cnt = values(cnt)",new JdbcStatementBuilder<WordCount>() {@Overridepublic void accept(PreparedStatement preparedStatement, WordCount wordCount) throws SQLException {//注意:這里的起始下標是1preparedStatement.setString(1, wordCount.getWord());preparedStatement.setInt(2, wordCount.getCount());}},JdbcExecutionOptions.builder().withBatchSize(5).withBatchIntervalMs(10000).withMaxRetries(3).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("000000").withUrl("jdbc:mysql://hadoop102:3306/flink").build());countDs.addSink(sink);try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/208009.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/208009.shtml
英文地址,請注明出處:http://en.pswp.cn/news/208009.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

在pytorch中自定義dataset讀取數據

這篇是我對嗶哩嗶哩up主 霹靂吧啦Wz 的視頻的文字版學習筆記 感謝他對知識的分享 有關我們數據讀取預訓練 以及如何將它打包成一個一個batch輸入我們的網絡的 首先我們來看一下之前我們在講resnet網絡時所使用的源碼 我們去使用了官方實現的image folder去讀取我們的圖像數據 然…

xilinx的XVC協議

文章目錄 概述JTAG工作方式XVC協議 其他Debug Bridge IP 概述 JTAG工作方式 XVC協議 其他 Debug Bridge IP

Python正則表達式指南

正則表達式指南 摘要 本文是關于在 Python 中通過 re 模塊使用正則表達式的入門教程。它提供了比“標準庫參考”的相關章節更平易的介紹。 引言 正則表達式&#xff08;Regular expressions&#xff0c;也叫 REs、 regexs 或 regex patterns&#xff09;&#xff0c;本質上…

設計模式基礎——概述(1/2)

目錄 一、設計模式的定義 二、設計模式的三大類別 三、設計模式的原則 四、主要設計模式目錄 4.1 創建型模式&#xff08;Creational Patterns&#xff09; 4.2 結構型模式&#xff08;Structural Patterns&#xff09; 4.3 行為型模式&#xff08;Behavioral Patterns&…

Vue腳手架 生命周期 組件化開發

Vue腳手架 & 生命周期 & 組件化開發 一、今日目標 1.生命周期 生命周期介紹生命周期的四個階段生命周期鉤子聲明周期案例 2.綜合案例-小黑記賬清單 列表渲染添加/刪除餅圖渲染 3.工程化開發入門 工程化開發和腳手架項目運行流程組件化組件注冊 4.綜合案例-小兔…

yolov8 pose coco2yolo

import os import json from tqdm import tqdm import argparseparser argparse.ArgumentParser() # 這里根據自己的json文件位置&#xff0c;換成自己的就行 parser.add_argument(--json_path,defaultrC:\Users\k167\Desktop\dataset\person_dataset/instances_val2017_perso…

Echarts運用之柱狀圖常見問題及案例代碼

前言 ECharts 是一個開源的 JavaScript 可視化庫,用于生成各種類型的圖形和圖表。其中,柱狀圖(Bar chart)是一種常見的圖表類型,用于表示不同類別之間的數值比較。 初學者,可參考下我的另外一篇文章,從基礎到深入,解讀柱狀圖的運用。 Echarts之柱狀圖 常見問題及案例…

MQTT協議對比TCP網絡性能測試模擬弱網測試

MQTT正常外網壓測數據---時延diff/ms如下圖&#xff1a; MQTT弱網外網壓測數據 TCP正常外網壓測數據 TCP弱網外網壓測數據 結論&#xff1a; 在弱網場景下&#xff0c;MQTT和TCP的網絡性能表現會有所不同。下面是它們在弱網環境中的對比&#xff1a; 連接建立&#xff1a;M…

python文件讀寫

文章目錄 讀文件python2&python3差異示例代碼 文件路徑問題處理&#xff1a;字符編碼報錯 讀文件 python2&python3差異 普通模式&#xff08;python2、python3通用&#xff09; f open(fileName, moder)open函數在python2和python3差異點&#xff1a; python3支持…

【代碼隨想錄刷題】Day20 二叉樹06

文章目錄 1.【654】最大二叉樹1.1 題目描述1.2 解題思路1.3 java代碼實現1.4 總結 2.【617】合并二叉樹2.1 題目描述2.2 解題思路2.3 java代碼實現 3.【700】二叉搜索樹中的搜索3.1 題目描述3.2 解題思路3.3 java代碼實現 4.【98】驗證二叉搜索樹4.1 題目描述4.2 解題思路4.3 j…

盤點11月Sui生態發展,了解Sui的近期成長歷程!

11月是Web3的“回暖期”&#xff0c;行業持續展現增長趨勢。Sui緊隨行業腳步&#xff0c;開展了一系列生態活動。其中歷時一個多月的Quest 3游戲活動順利結束并公布獎勵&#xff0c;在多地區成功舉辦Move和Sui生態黑客松&交流會&#xff0c;還有針對中文社區開發者教育的星…

MQTT協議對比QUIC網絡性能測試模擬弱網測試

MQTT正常外網壓測數據---時延diff/ms如下圖&#xff1a; MQTT弱網外網壓測數據 QUIC正常外網壓測數據 QUIC弱網外網壓測數據 結論&#xff1a; 在弱網情況下&#xff0c;MQTT和QUIC&#xff08;Quick UDP Internet Connections&#xff09;這兩種協議的網絡性能表現也會有…

Axure原型圖表組件庫,數據可視化元件(Axure9大屏組件)

針對Axure制作的大屏圖表元件庫&#xff0c;幫助產品經理更高效地制作高保真圖表原型&#xff0c;是產品經理必備元件工具。現分享完整的組件庫&#xff0c;大家一起學習。 本組件庫的圖表模塊&#xff0c;已包含所有常用的圖表&#xff0c;以下為部分組件截圖示意。文末可下載…

頁面初始化后,需要滾動到某個元素的位置,但是該元素尚未渲染完成。

vue方式 <template><div class"doc"><!-- 判斷是否還在渲染期間 --><div class"fixed" v-show"loading">頁面仍在渲染中&#xff0c;請稍后</div><div class"green" v-show"!loading">…

TA-Lib學習研究筆記(九)——Pattern Recognition (2)

TA-Lib學習研究筆記&#xff08;九&#xff09;——Pattern Recognition &#xff08;2&#xff09; 形態識別的函數的應用&#xff0c;通過使用A股實際的數據&#xff0c;驗證形態識別函數&#xff0c;用K線顯示出現標志的形態走勢&#xff0c;由于入口參數基本上是open, hig…

反向傳播算法

反向傳播算法的數學解釋 反向傳播算法是深度學習中用于訓練神經網絡的核心算法。它通過計算損失函數相對于網絡權重的梯度來更新權重&#xff0c;從而最小化損失。 反向傳播的基本原理 反向傳播算法基于鏈式法則&#xff0c;它按層反向傳遞誤差&#xff0c;從輸出層開始&…

寒冬不再寒冷:氣膜體育館如何打造溫馨運動天地

取暖季即將來臨&#xff0c;隨著氣溫逐漸下降&#xff0c;人們在寒冷的冬季里如何保持運動熱情和身體的健康成為了一項挑戰。而在這個時候&#xff0c;氣膜體育館成為了運動愛好者們的理想場所&#xff0c;提供如春般溫暖舒適的運動環境。那么&#xff0c;讓我們一起揭秘氣膜體…

2024年SEO策略:如何優化您的知識庫?

如今很多人在遇到問題時都會求助于谷歌。谷歌已經成為提供解決方案不可或缺的工具。作為全球搜索引擎的巨頭&#xff0c;擁有大量用戶流量。這就是為什么確保您的產品和服務在谷歌搜索結果中排名靠前是至關重要的&#xff0c;如果您想獲得更多的客戶&#xff0c;SEO是一個非常關…

Filed II 繪制超聲 3D/2D 點擴散函數

點擴散函數可以較好地描述超聲對成像目標分辨能力,利用 filed II 仿真工具實現點擴算函數 PSF 的 3D 和 2D 繪制。 定義換能器基本參數 f0=5e6; % Transducer center frequency [Hz] fs=100e6; % Sampling frequency [Hz] c=1540; % Speed of sound [m/s] width=0.15/1000

<Linux> 文件系統

目錄 前言&#xff1a; 一、 磁盤 &#xff08;一&#xff09;磁盤的物理結構 &#xff08;二&#xff09;磁盤的物理存儲結構 1. 數據存儲 2. 存儲結構 二、磁盤的邏輯抽象 三、磁盤信息 &#xff08;一&#xff09;具體結構 &#xff08;二&#xff09;重新認識目錄…