這段代碼展示了如何使用 Apache Flink 將數據流寫入 MySQL 數據庫,并使用了?JdbcSink
?來實現自定義的 Sink 邏輯。以下是對代碼的詳細解析和說明:
代碼結構
包聲明:
package sink
定義了代碼所在的包。導入依賴:
導入了必要的 Flink 和 JDBC 相關類庫,包括:java.sql.PreparedStatement
:用于執行 SQL 語句。org.apache.flink.connector.jdbc
:Flink 的 JDBC 連接器相關類。org.apache.flink.streaming.api.scala._
:Flink 流處理 API。
sinkToMysql
?對象:
主程序入口,包含 Flink 流處理邏輯和 MySQL Sink 的配置。
package sinkimport java.sql.PreparedStatementimport org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._/**** @PROJECT_NAME: flink1.13* @PACKAGE_NAME: sink* @author: 趙嘉盟-HONOR* @data: 2023-11-20 15:23* @DESCRIPTION**/
object sinkToMysql {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L))data.addSink( JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {t.setString(1,u.user)t.setString(2,u.url)}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.jdbc.jdbc.Driver").withUsername("root").withPassword("root").build()))env.execute("sinkRedis")}
}
基于scala使用flink將讀取到的數據寫入到Mysql
data.addSink( JdbcSink.sink(...) )
:這行代碼將一個JdbcSink添加到Flink的數據流中,用于將數據寫入到數據庫中。"insert into clicks values(?,?)"
:這是SQL語句,表示將用戶和URL插入到名為clicks的表中。new JdbcStatementBuilder[Event] {...}
:這是一個匿名內部類,用于構建PreparedStatement對象。在這個類中,我們重寫了accept
方法,該方法接受一個PreparedStatement對象和一個Event對象,然后將Event對象的user和url屬性設置到PreparedStatement對象中。new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()...
:這是一個JdbcConnectionOptionsBuilder對象,用于構建數據庫連接選項。在這個對象中,我們設置了數據庫的URL、驅動名稱、用戶名和密碼。.build()
:這是JdbcConnectionOptionsBuilder對象的一個方法,用于構建JdbcConnectionOptions對象。
代碼解析
(1) 主程序入口
def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment
- 創建 Flink 流處理環境?
StreamExecutionEnvironment
。
(2) 定義數據流
val data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L)
)
- 使用?
fromElements
?方法生成一個包含 4 個?Event
?對象的流。
(3) 自定義 MySQL Sink
data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {t.setString(1, u.user)t.setString(2, u.url)}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.jdbc.jdbc.Driver").withUsername("root").withPassword("root").build()
))
- 使用?
JdbcSink.sink
?方法將數據寫入 MySQL:- SQL 語句:
insert into clicks values(?,?)
,插入?user
?和?url
?字段。 JdbcStatementBuilder
:用于將?Event
?對象映射到 SQL 語句的參數。JdbcConnectionOptions
:配置 MySQL 連接信息,包括 URL、驅動名稱、用戶名和密碼。
- SQL 語句:
(4) 執行任務
env.execute("sinkRedis")
- 啟動 Flink 流處理任務,任務名稱為?
sinkRedis
。
優化版本
異常處理
- 在 Sink 中添加異常處理邏輯,避免程序因 MySQL 寫入失敗而崩潰:
data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {try {t.setString(1, u.user)t.setString(2, u.url)} catch {case e: Exception => e.printStackTrace()}}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("root").build() ))
批量寫入
- 如果需要提高寫入性能,可以啟用批量寫入功能:
data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {t.setString(1, u.user)t.setString(2, u.url)}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("root").withBatchSize(1000) // 設置批量大小.build() ))
優化后的代碼
以下是優化后的完整代碼:
package sinkimport java.sql.PreparedStatement
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._object sinkToMysql {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval data = env.fromElements(Event("Mary", "./home", 100L),Event("Sum", "./cart", 500L),Event("King", "./prod", 1000L),Event("King", "./root", 200L))data.addSink(JdbcSink.sink("insert into clicks values(?,?)",new JdbcStatementBuilder[Event] {override def accept(t: PreparedStatement, u: Event): Unit = {try {t.setString(1, u.user)t.setString(2, u.url)} catch {case e: Exception => e.printStackTrace()}}},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://master:3306/test").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("root").withBatchSize(1000) // 啟用批量寫入.build()))env.execute("sinkToMysql")}
}