Flink數據流高效寫入MySQL實戰

這段代碼展示了如何使用 Apache Flink 將數據流寫入 MySQL 數據庫,并使用了?JdbcSink?來實現自定義的 Sink 邏輯。以下是對代碼的詳細解析和說明:

代碼結構

  • 包聲明package sink
    定義了代碼所在的包。

  • 導入依賴
    導入了必要的 Flink 和 JDBC 相關類庫,包括:

    • java.sql.PreparedStatement:用于執行 SQL 語句。
    • org.apache.flink.connector.jdbc:Flink 的 JDBC 連接器相關類。
    • org.apache.flink.streaming.api.scala._:Flink 流處理 API。
  • sinkToMysql?對象
    主程序入口,包含 Flink 流處理邏輯和 MySQL Sink 的配置。

package sinkimport java.sql.PreparedStatementimport org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._/**** @PROJECT_NAME: flink1.13* @PACKAGE_NAME: sink* @author: 趙嘉盟-HONOR* @data: 2023-11-20 15:23* @DESCRIPTION**/
object sinkToMysql {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L))data.addSink( JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {t.setString(1,u.user)t.setString(2,u.url)}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.jdbc.jdbc.Driver").withUsername("root").withPassword("root").build()))env.execute("sinkRedis")}
}

基于scala使用flink將讀取到的數據寫入到Mysql

  1. data.addSink( JdbcSink.sink(...) ):這行代碼將一個JdbcSink添加到Flink的數據流中,用于將數據寫入到數據庫中。

  2. "insert into clicks values(?,?)":這是SQL語句,表示將用戶和URL插入到名為clicks的表中。

  3. new JdbcStatementBuilder[Event] {...}:這是一個匿名內部類,用于構建PreparedStatement對象。在這個類中,我們重寫了accept方法,該方法接受一個PreparedStatement對象和一個Event對象,然后將Event對象的user和url屬性設置到PreparedStatement對象中。

  4. new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()...:這是一個JdbcConnectionOptionsBuilder對象,用于構建數據庫連接選項。在這個對象中,我們設置了數據庫的URL、驅動名稱、用戶名和密碼。

  5. .build():這是JdbcConnectionOptionsBuilder對象的一個方法,用于構建JdbcConnectionOptions對象。

代碼解析

(1) 主程序入口
def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment
  • 創建 Flink 流處理環境?StreamExecutionEnvironment
(2) 定義數據流
val data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L)
)
  • 使用?fromElements?方法生成一個包含 4 個?Event?對象的流。
(3) 自定義 MySQL Sink
data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {t.setString(1, u.user)t.setString(2, u.url)}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.jdbc.jdbc.Driver").withUsername("root").withPassword("root").build()
))
  • 使用?JdbcSink.sink?方法將數據寫入 MySQL:
    • SQL 語句insert into clicks values(?,?),插入?user?和?url?字段。
    • JdbcStatementBuilder:用于將?Event?對象映射到 SQL 語句的參數。
    • JdbcConnectionOptions:配置 MySQL 連接信息,包括 URL、驅動名稱、用戶名和密碼。
(4) 執行任務
env.execute("sinkRedis")
  • 啟動 Flink 流處理任務,任務名稱為?sinkRedis

優化版本

異常處理
  • 在 Sink 中添加異常處理邏輯,避免程序因 MySQL 寫入失敗而崩潰:
    data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {try {t.setString(1, u.user)t.setString(2, u.url)} catch {case e: Exception => e.printStackTrace()}}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("root").build()
    ))
批量寫入
  • 如果需要提高寫入性能,可以啟用批量寫入功能:
    data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {t.setString(1, u.user)t.setString(2, u.url)}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("root").withBatchSize(1000) // 設置批量大小.build()
    ))

優化后的代碼

以下是優化后的完整代碼:

package sinkimport java.sql.PreparedStatement
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._object sinkToMysql {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L))data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {try {t.setString(1, u.user)t.setString(2, u.url)} catch {case e: Exception => e.printStackTrace()}}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("root").withBatchSize(1000) // 啟用批量寫入.build()))env.execute("sinkToMysql")}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/89299.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/89299.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/89299.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MATLAB下載安裝教程(附安裝包)2025最新版(MATLAB R2024b)

文章目錄前言一、MATLAB R2024b下載二、MATLAB下載安裝教程前言 MATLAB R2024b 的推出,進一步提升了其在工程實踐中的實用性和專業性。它不僅提供了更多針對特定工程領域的解決方案,還在性能和兼容性方面進行了顯著改進。 本教程將一步一步引導完成 MA…

Linux 基礎命令學習,立即上手Linux操作

Linux?基礎命令學習本文挑選最常用、最容易上手的 Linux 命令。每條都附帶一句話說明 真實示例,直接復制即可練習,零基礎也能跟得上。1? 先掌握 目錄導航:pwd?/?ls?/?cdpwd – 顯示當前所在目錄 pwd # 輸出示例 /home/yournamels??a…

Android構建流程與Transform任務

1. 完整構建流程概覽 1.1 主要構建階段 預構建階段 → 代碼生成階段 → 資源處理階段 → 編譯階段 → Transform階段 → 打包階段1.2 詳細任務執行順序 ┌─────────────────────────────────────────────────────────…

CKS認證 | Day6 監控、審計和運行時安全 sysdig、falco、審計日志

一、分析容器系統調用:Sysdig Sysdig:定位是系統監控、分析和排障的工具,在 linux 平臺上,已有很多這方面的工具 如tcpdump、htop、iftop、lsof、netstat,它們都能用來分析 linux 系統的運行情況,而且還有…

Redis:持久化配置深度解析與實踐指南

🧠 1、簡述 Redis 是一款基于內存的高性能鍵值數據庫,為了防止數據丟失,Redis 提供了兩種主要的持久化機制:RDB(快照)和 AOF(追加日志)。本文將從原理到配置,再到實際項目…

共創 Rust 十年輝煌時刻:RustChinaConf 2025 贊助與演講征集正式啟動

🚀 共創 Rust 十年輝煌時刻:RustChinaConf 2025 贊助與演講征集正式啟動2025年,是 Rust 編程語言誕生十周年的里程碑時刻。在這個具有歷史意義的節點,RustChinaConf 2025 攜手 RustGlobal 首次登陸中國,聯合 GOSIM HAN…

EMS4100芯祥科技USB3.1高速模擬開關芯片規格介紹

EMS4100一款適用于USB Type-C應用的二通道差分2:1/1:2 USB 3.1高速雙向被動開關。該器件支持USB 3.1 Gen 1和Gen 2數據速率,具有高帶寬、低串擾、寬供電電壓范圍等特點。EMS4100芯片內部框架:EMS4100主要特性:2-獨立頻道1:2/2:1 M…

HTML 常用語義標簽與常見搭配詳解

一、什么是語義標簽&#xff1f; 語義標簽是 HTML5 引入的一組具有特定含義的標簽&#xff0c;用于描述頁面中不同部分的內容類型&#xff0c;如頁眉、導航欄、主內容區域、側邊欄、頁腳等。相比傳統的 <div> 和 <span>&#xff0c;語義標簽更具表達力和結構化。 …

遷移學習的概念和案例

遷移學習概念 預訓練模型 定義: 簡單來說別人訓練好的模型。一般預訓練模型具備復雜的網絡模型結構&#xff1b;一般是在大量的語料下訓練完成的。 預訓練語言模型的類別&#xff1a; 現在我們接觸到的預訓練語言模型&#xff0c;基本上都是基于transformer這個模型迭代而來…

DAOS系統架構-RDB

1. 概述 基于Raft共識算法和強大的領導地位策略&#xff0c;pool service和container service可以通過復制其內部的元數據來實現高可用。通過這種方法實現具有副本能力的服務可以容忍少數副本中的任何一個出現故障。通過將每個服務的副本分布在容災域中&#xff0c;pool servic…

深入GPU硬件架構及運行機制

轉自深入GPU硬件架構及運行機制 - 0向往0 - 博客園&#xff0c;基本上是其理解。 一、GPU概述 1.1 GPU是什么&#xff1f; GPU全稱是Graphics Processing Unit&#xff0c;圖形處理單元。它的功能最初與名字一致&#xff0c;是專門用于繪制圖像和處理圖元數據的特定芯片&…

數值計算庫:Eigen與Boost.Multiprecision全方位解析

在科學計算、工程模擬、機器學習等領域&#xff0c;高效的數值計算能力是構建高性能應用的基石。C作為性能優先的編程語言&#xff0c;擁有眾多優秀的數值計算庫&#xff0c;其中Eigen和Boost.Multiprecision是兩個極具代表性的工具。本文將深入探討這兩個庫的核心特性、使用場…

第十八節:第三部分:java高級:反射-獲取構造器對象并使用

Class提供的獲取類構造器的方法以及獲取類構造器的作用代碼&#xff1a;掌握獲取類的構造器&#xff0c;并對其進行操作 Cat類 package com.itheima.day9_reflect;public class Cat {private String name;private int age;private Cat(String name, int age) {this.name name;…

集中打印和轉換Office 批量打印精靈:Word/Excel/PDF 全兼容,效率翻倍

各位辦公小能手們&#xff01;你們平時辦公的時候&#xff0c;是不是經常要打印一堆文件&#xff0c;煩得要命&#xff1f;別慌&#xff0c;今天我給大家介紹一款超厲害的神器——Office批量打印精靈&#xff01; 軟件下載地址安裝包 這玩意兒啊&#xff0c;是專門為高效辦公設…

docker的搭建

一、安裝docker使用以下命令進行安裝dockerapt-get install docker.io docker-compose使用以下命令進行查看docker是否開啟systemctl status docker由此可見&#xff0c;docker沒有打開&#xff0c;進行使用命令打開。systemctl start docker再次查看是否開啟。肉眼可見&#x…

數據庫管理-第349期 Oracle DB 23.9新特性一覽(20250717)

數據庫管理349期 2025-07-17數據庫管理-第349期 Oracle DB 23.9新特性一覽&#xff08;20250717&#xff09;1 JavaScript過程和函數的編譯時語法檢查2 不再需要JAVASCRIPT上的EXECUTE權限3 GROUP BY ALL4 使用SQL創建并測試UUID5 IVF索引在線重組6 JSON到二元性遷移器&#xf…

將CSDN文章導出為PDF

作者&#xff1a;翟天保Steven 版權聲明&#xff1a;著作權歸作者所有&#xff0c;商業轉載請聯系作者獲得授權&#xff0c;非商業轉載請注明出處前言在日常學習和技術積累過程中&#xff0c;我們經常會在 CSDN 等技術博客平臺上閱讀高質量的技術文章。然而&#xff0c;網頁閱讀…

macOS - Chrome 關閉自動更新

進入 Google 相關資源文件夾 刪除 GoogleSoftwareUpdate 文件夾 open ~/Library/Google 部分教程推薦&#xff0c;在 chrome://flags/ 頁面設置&#xff0c;但最近沒看到 自動更新相關開關。2025-07-13&#xff08;日&#xff09;

Python 模塊化編程全解析:模塊、包與第三方庫管理指南

模塊與包 模塊化編程是什么&#xff1f;用生活例子秒懂 想象你在搭樂高積木&#xff1a; 每個小積木塊都有特定功能&#xff08;比如輪子、窗戶、墻壁&#xff09;—— 這就像模塊&#xff08;一個.py 文件&#xff0c;封裝了函數或類&#xff09;。把相關的積木塊裝進一個盒…

小白學Python,網絡爬蟲篇(2)——selenium庫

前言 selenium 庫是一種用于 Web 應用程序測試的工具&#xff0c;它可以驅動瀏覽器執行特定操作&#xff0c;自動按照腳本代碼做出單擊、輸入、打開、驗證等操作&#xff0c;支持的瀏覽器包括 IE、Firefox、Safari、Chrome、Opera 等。 與 requests 庫不同的是&#xff0c;se…