關于flink兩階段提交高并發下程序卡住問題

先拋出代碼

package com.dpf.flink;import com.dpf.flink.sink.MysqlSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;public class MysqlTwoPhaseCommit {//topicprivate static final String topic_ExactlyOnce = "TwoPhaseCommit";public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//設置并行度,為了方便測試,查看消息的順序,這里設置為1,可以更改為多并行度env.setParallelism(1);//checkpoint的設置//每隔10s進行啟動一個檢查點【設置checkpoint的周期】env.enableCheckpointing(30000);//設置模式為:exactly_one,僅一次語義env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//確保檢查點之間有1s的時間間隔【checkpoint最小間隔】env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);//檢查點必須在10s之內完成,或者被丟棄【checkpoint超時時間】env.getCheckpointConfig().setCheckpointTimeout(10000);//同一時間只允許進行一次檢查點env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//表示一旦Flink程序被cancel后,會保留checkpoint數據,以便根據實際需要恢復到指定的checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//設置statebackend,將檢查點保存在hdfs上面,默認保存在內存中。這里先保存到本地env.setStateBackend(new FsStateBackend("file:///Users/david.dong/tmp/flink/checkpoint"));//設置kafka消費參數Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, topic_ExactlyOnce);/*SimpleStringSchema可以獲取到kafka消息,JSONKeyValueDeserializationSchema可以獲取都消息的key,value,metadata:topic,partition,offset等信息*/FlinkKafkaConsumer<String> kafkaConsumer011 = new FlinkKafkaConsumer<>(topic_ExactlyOnce,new SimpleStringSchema(),properties);//加入kafka數據源DataStreamSource<String> streamSource = env.addSource(kafkaConsumer011);SingleOutputStreamOperator<Tuple2<String, Integer>> tupleStream = streamSource.map(str -> Tuple2.of(str, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));tupleStream.print();//數據傳輸到下游tupleStream.addSink(new MysqlSink()).name("MySqlTwoPhaseCommitSink");//觸發執行env.execute("StreamDemoKafka2Mysql");}
}
package com.dpf.flink.sink;import com.dpf.flink.utils.DBConnectUtil;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;public class MysqlSink extends TwoPhaseCommitSinkFunction<Tuple2<String,Integer>, Connection,Void> {private static final Logger log = LoggerFactory.getLogger(MysqlSink.class);public MysqlSink() {super(new KryoSerializer<>(Connection.class,new ExecutionConfig()), VoidSerializer.INSTANCE);}/*** 執行數據庫入庫操作  task初始化的時候調用* @param connection* @param tuple* @param context* @throws Exception*/@Overrideprotected void invoke(Connection connection, Tuple2<String, Integer> tuple, Context context) throws Exception {log.info("start invoke...");String value = tuple.f0;Integer total = tuple.f1;String sql = "update student set name = 'aaa' where id = 1";log.info("====執行SQL:{}===",sql);PreparedStatement ps = connection.prepareStatement(sql);ps.setString(1, value);ps.setInt(2, total);ps.setLong(3, System.currentTimeMillis());log.info("要插入的數據:{}----{}",value,total);if (ps != null) {String sqlStr = ps.toString().substring(ps.toString().indexOf(":")+2);log.error("執行的SQL語句:{}",sqlStr);}//執行insert語句ps.execute();}/*** 獲取連接,開啟手動提交事物(getConnection方法中)* @return* @throws Exception*/@Overrideprotected Connection beginTransaction() throws Exception {log.info("start beginTransaction.......");String url = "jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";Connection connection = DBConnectUtil.getConnection(url, "root", "12345678");return connection;}/***預提交,這里預提交的邏輯在invoke方法中* @param connection* @throws Exception*/@Overrideprotected void preCommit(Connection connection) throws Exception {log.info("start preCommit...");}/*** 如果invoke方法執行正常,則提交事務* @param connection*/@Overrideprotected void commit(Connection connection) {log.info("start commit...");DBConnectUtil.commit(connection);}/*** 如果invoke執行異常則回滾事物,下一次的checkpoint操作也不會執行* @param connection*/@Overrideprotected void abort(Connection connection) {log.info("start abort rollback...");DBConnectUtil.rollback(connection);}
}
package com.dpf.flink.utils;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;/*** 數據庫連接工具類*/
public class DBConnectUtil {private static final Logger log = LoggerFactory.getLogger(DBConnectUtil.class);/*** 獲取連接** @param url* @param user* @param password* @return* @throws SQLException*/public static Connection getConnection(String url, String user, String password) throws SQLException {Connection conn = null;try {Class.forName("com.mysql.jdbc.Driver");} catch (ClassNotFoundException e) {log.error("獲取mysql.jdbc.Driver失敗");e.printStackTrace();}try {conn = DriverManager.getConnection(url, user, password);log.info("獲取連接:{" + conn + "} 成功...");} catch (Exception e) {log.error("獲取連接失敗,url:" + url + ",user:" + user);}//設置手動提交conn.setAutoCommit(false);return conn;}/*** 提交事務*/public static void commit(Connection conn) {if (conn != null) {try {conn.commit();} catch (SQLException e) {log.error("提交事務失敗,Connection:" + conn);e.printStackTrace();} finally {close(conn);}}}/*** 事務回滾** @param conn*/public static void rollback(Connection conn) {if (conn != null) {try {conn.rollback();} catch (SQLException e) {log.error("事務回滾失敗,Connection:" + conn);e.printStackTrace();} finally {close(conn);}}}/*** 關閉連接** @param conn*/public static void close(Connection conn) {if (conn != null) {try {conn.close();} catch (SQLException e) {log.error("關閉連接失敗,Connection:" + conn);e.printStackTrace();}}}
}

這部分代碼網上抄的,但是大致不差

前提:

1.source消息密集,全據并行度設置1

2.sink就執行update操作,并且就update同一條數據,為了更好驗證問題

結果:

這邊我嘗試了很多次,中間有時候能順利執行,但是有時候程序在sink這里卡住了,過一段時間就報錯socket interrupt異常。

我的分析:

1.首先設置ck的間隔是10秒一次,那么當ck barrier到達sink算子的時候,就會進行預提交,并且立刻開啟一個新事物用來處理后續的消息。那么這里就會出現多事務同時存在的情況,比如預提交的事務A,和新開啟的事務B

2.新事物B開啟后立刻就可以繼續處理后續到來的消息。

3.那么此時如果事務A預提交后,他需要等待來自JobManager的complete指令,到代碼層面也就是調用notifyComplete方法來進行commit。那么加入在這個期間,就是還沒有收到complete指令的時候,事務B已經執行到了? ps.execute();這里,此時事務B就會卡住,因為他們都是操作同一條數據,那么問題來了,此時程序已經卡住了,也就是線程卡住了,那么此時就算JobManager發送complete指令了,然后調用notifyComplete方法,但是,此時沒有線程執行這個方法!(因為主線程卡在了(ps.execute();這里)所以整個程序就徹底卡住了。

然后flnik dag上看到的是sink紅了,前面的節點都黑了,就是背壓瞬間就很嚴重了

以上是我對flink兩階段提交存在的問題的分析,我的source消息大概100多萬,我分析是這個原因,如果分析的不對,還請大佬幫我分析下我哪里理解不對?或者為什么會卡住?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/77800.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/77800.shtml
英文地址,請注明出處:http://en.pswp.cn/web/77800.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

html css js網頁制作成品——HTML+CSS+js美甲店網頁設計(5頁)附源碼

美甲店 目錄 一、&#x1f468;?&#x1f393;網站題目 二、??網站描述 三、&#x1f4da;網站介紹 四、&#x1f310;網站效果 五、&#x1fa93; 代碼實現 &#x1f9f1;HTML 六、&#x1f947; 如何讓學習不再盲目 七、&#x1f381;更多干貨 一、&#x1f468;?&a…

LeetCode[347]前K個高頻元素

思路&#xff1a; 使用小頂堆&#xff0c;最小的元素都出去了&#xff0c;省的就是大&#xff0c;高頻的元素了&#xff0c;所以要維護一個小頂堆&#xff0c;使用map存元素高頻變化&#xff0c;map存堆里&#xff0c;然后輸出堆的東西就行了 代碼&#xff1a; class Solution…

2024年網站開發語言選擇指南:PHP/Java/Node.js/Python如何選型?

2024年網站開發語言選擇指南&#xff1a;PHP/Java/Node.js/Python如何選型&#xff1f; 一、8大主流Web開發語言技術對比 1. PHP開發&#xff1a;中小型網站的首選方案 最新版本&#xff1a;PHP 8.3&#xff08;2023年11月發布&#xff09;核心優勢&#xff1a; 全球78%的網站…

從數據結構說起(一)

1 揭開數據結構神奇的面紗 1.1 初識數據結構 在C的標準庫模板&#xff08;Standard Template Library,STL&#xff09;課程上&#xff0c;我初次結識了《數據結構》。C語言提供的標準庫模板是面向對象程序設計與泛型程序設計思想相結合的典范。所謂的泛型編程就是編寫不依賴于具…

JAVA--- 關鍵字static

之前我們學習了JAVA 面向對象的一些基本知識&#xff0c;今天來進階一下&#xff01;&#xff01;&#xff01; static關鍵字 static表示靜態&#xff0c;是JAVA中的一個修飾符&#xff0c;可以修飾成員方法&#xff0c;成員變量&#xff0c;可用于修飾類的成員&#xff08;變…

4.27比賽總結

文章目錄 T1T2法一&#xff1a;倍增求 LCA法二&#xff1a;Dijkstra 求最短路法三&#xff1a;dfs 求深度 T3T4總結 T1 一道非常簡單的題&#xff0c;結果我因為一句話沒寫掛了 80pts…… 題目中沒寫 a a a 數組要按照 b b b 數組的順序&#xff0c;所以對于最大方案&#x…

數據一致性巡檢總結:基于分桶采樣的設計與實現

數據一致性巡檢總結&#xff1a;基于分桶采樣的設計與實現 背景 在分布式系統中&#xff0c;緩存&#xff08;如 Redis&#xff09;與數據庫&#xff08;如 MySQL&#xff09;之間的數據一致性問題是一個常見的挑戰。由于緩存的引入&#xff0c;數據在緩存和數據庫之間可能存…

SpringBoot與Druid整合,實現主從數據庫同步

通過引入主從數據庫同步系統&#xff0c;可以顯著提升平臺的性能和穩定性&#xff0c;同時保證數據的一致性和安全性。Druid連接池也提供了強大的監控和安全防護功能&#xff0c;使得整個系統更加健壯和可靠。 我們為什么選擇Druid&#xff1f; 高效的連接管理&#xff1a;Dru…

在Linux系統中安裝MySQL,二進制包版

1、檢查是否已安裝數據庫&#xff08;rpm軟件包管理器&#xff09; rpm -qa | grep mysql rpm -qa | grep mariadb #centOS7自帶mariadb與mysql數據庫沖突2、刪除已有數據庫 rpm -e –nodeps 軟件名稱 3、官網下載MySQL包 4、上傳 # 使用FinalShell或Xshell工具上傳&#…

【含文檔+PPT+源碼】基于SpringBoot電腦DIY裝機教程網站的設計與實現

項目介紹 本課程演示的是一款 基于SpringBoot電腦DIY裝機教程網站的設計與實現&#xff0c;主要針對計算機相關專業的正在做畢設的學生與需要項目實戰練習的 Java 學習者。 1.包含&#xff1a;項目源碼、項目文檔、數據庫腳本、軟件工具等所有資料 2.帶你從零開始部署運行本套…

Spring Boot 緩存機制:從原理到實踐

文章目錄 一、引言二、Spring Boot 緩存機制原理2.1 緩存抽象層2.2 緩存注解2.3 緩存管理器 三、入門使用3.1 引入依賴3.2 配置緩存3.3 啟用緩存3.4 使用緩存注解3.5 實體類 四、踩坑記錄4.1 緩存鍵生成問題4.2 緩存過期與更新問題4.3 事務與緩存的一致性問題 五、心得體會5.1 …

Spark讀取Apollo配置

--conf spark.driver.extraJavaOptions-Dapp.idapollo的app.id -Denvfat -Dapollo.clusterfat -Dfat_metaapollo的meta地址 --conf spark.executor.extraJavaOptions-Dapp.idapollo的app.id -Denvfat -Dapollo.clusterfat -Dfat_metaapollo的meta地址 在spark的提交命令中&…

[逆向工程]如何理解小端序?逆向工程中的字節序陷阱與實戰解析

[逆向工程]如何理解小端序&#xff1f;逆向工程中的字節序陷阱與實戰解析 關鍵詞&#xff1a;逆向工程、小端序、字節序、二進制分析、數據解析 引言&#xff1a;為什么字節序是逆向工程師的必修課&#xff1f; 在逆向工程中&#xff0c;分析二進制數據是最基礎的任務之一。…

項目三 - 任務2:創建筆記本電腦類(一爹多叔)

在本次實戰中&#xff0c;我們通過Java的單根繼承和多接口實現特性&#xff0c;設計了一個筆記本電腦類。首先創建了Computer抽象類&#xff0c;提供計算的抽象方法&#xff0c;模擬電腦的基本功能。接著定義了NetCard和USB兩個接口&#xff0c;分別包含連接網絡和USB設備的抽象…

ElasticSearch深入解析(六):集群核心配置

1.開發模式和生產模式 Elasticsearch默認運行在開發模式下&#xff0c;此模式允許節點在配置存在錯誤時照常啟動&#xff0c;僅將警告信息寫入日志文件。而生產模式則更為嚴格&#xff0c;一旦檢測到配置錯誤&#xff0c;節點將無法啟動&#xff0c;這是一種保障系統穩定性的安…

【Prometheus-MySQL Exporter安裝配置指南,開機自啟】

目錄 1. 創建 MySQL 監控用戶2. 配置 MySQL 認證文件3. 安裝 mysqld_exporter4. 配置 Systemd 服務5. 啟動并驗證服務6. 修改Prometheus配置常見錯誤排查錯誤現象排查步驟 6. 驗證監控數據關鍵注意事項 7. Grafana看板 1. 創建 MySQL 監控用戶 mysql -uroot -p123456 # 登錄M…

redis未授權訪問漏洞學習

一、Redis常見用途 1. Redis介紹 全稱與起源: Redis全稱Remote Dictionary Service(遠程字典服務)&#xff0c;最初由antirez在2009年開發&#xff0c;用于解決網站訪問記錄統計的性能問題。發展歷程: 從最初僅支持列表功能的內存數據庫&#xff0c;經過十余年發展已支持多種…

4.27搭建用戶界面

更新 router下面的index.js添加新的children 先區分一下views文件夾下的不同vue文件&#xff1a; Home.vue是繪制home頁面的所有的表格。 Main.vue是架構頭部和左側目錄的框架的。 研究一下這個routes對象&#xff0c;就可以發現重定向redirect的奧妙所在&#xff0c;我們先把…

【MySQL】(8) 聯合查詢

一、聯合查詢的作用 由于范式的規則&#xff0c;數據分到多個表中&#xff0c;想要查詢完整的信息&#xff0c;就需要聯合查詢多張表。比如查詢學生的學生信息和所在班級的信息&#xff0c;就需要聯合查詢學生表和班級表。 二、聯合查詢過程 案例&#xff1a;查詢學生姓名為孫…

圖漾官網Sample_V1版本C++語言完整參考例子---單相機版本

文章目錄 1.參考例子 主要梳理了圖漾官網Sample_V1版本的例子 1.參考例子 主要增加了從storage區域讀取相機參數的設置&#xff0c;使用圖漾PercipioViewer軟件&#xff0c;如何將相機參數保存到srorage區&#xff0c;可參考鏈接&#xff1a;保存相機參數操作 保存參數設置 注…