大數據-玩轉數據-Flink 自定義Sink(Mysql)

一、說明

如果Flink沒有提供給我們可以直接使用的連接器,那我們如果想將數據存儲到我們自己的存儲設備中,mysql 的安裝使用請參考
mysql-玩轉數據-centos7下mysql的安裝
創建表

CREATE TABLE `sensor` (`id` int(10)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

二、pom.xml 導入驅動

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version>
</dependency>

三、編寫程序

package com.lyh.flink06;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class SinkMysql {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer value) throws Exception {return value.intValue();}});keyedStream.addSink(new MysqlSink());env.execute();}public static class  MysqlSink extends RichSinkFunction<Integer>{private Connection sunbo;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("com.mysql.cj.jdbc.Driver");sunbo = DriverManager.getConnection("jdbc:mysql://192.168.220.100:3306/test?useSSL=false", "sunbo", "Mysql123456#");}@Overridepublic void close() throws Exception {if (sunbo != null) {sunbo.close();}}@Overridepublic void invoke(Integer value, Context context) throws Exception {String sql = "insert into sensor(id)values(?)";PreparedStatement ps = sunbo.prepareStatement(sql);ps.setInt(1,value.intValue());ps.execute();ps.close();}}
}

四、運行測試

在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/35776.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/35776.shtml
英文地址,請注明出處:http://en.pswp.cn/news/35776.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

二 根據用戶行為數據創建ALS模型并召回商品

二 根據用戶行為數據創建ALS模型并召回商品 2.0 用戶行為數據拆分 方便練習可以對數據做拆分處理 pandas的數據分批讀取 chunk 厚厚的一塊 相當大的數量或部分 import pandas as pd reader pd.read_csv(behavior_log.csv,chunksize100,iteratorTrue) count 0; for chunk in …

DNS協議及其工作原理

DNS是域名系統&#xff08;Domain Name System&#xff09;的縮寫&#xff0c;它是一種用于將域名轉換為IP地址的分布式數據庫系統。它是因特網的基石&#xff0c;能夠使人們通過域名方便地訪問互聯網&#xff0c;而無需記住復雜的IP地址。 DNS的歷史可以追溯到1983年&#xf…

4個簡化IT服務臺任務的ChatGPT功能

最近幾個月&#xff0c;ChatGPT 風靡全球&#xff0c;這是一個 AI 聊天機器人&#xff0c;使用戶能夠生成腳本、文章、鍛煉圖表等。這項技術在各行各業都有無窮無盡的應用&#xff0c;在本文中&#xff0c;我們將研究這種現代技術如何幫助服務臺團隊增強服務交付和客戶體驗。 什…

最佳實踐:如何優雅地提交一個 Amazon EMR Serverless 作業?

《大數據平臺架構與原型實現&#xff1a;數據中臺建設實戰》一書由博主歷時三年精心創作&#xff0c;現已通過知名IT圖書品牌電子工業出版社博文視點出版發行&#xff0c;點擊《重磅推薦&#xff1a;建大數據平臺太難了&#xff01;給我發個工程原型吧&#xff01;》了解圖書詳…

章節7:XSS檢測和利用

章節7&#xff1a;XSS檢測和利用 測試payload <script>alert(XSS)</script> <script>alert(document.cookie)</script> ><script>alert(document.cookie)</script> ><script>alert(document.cookie)</script> &qu…

元宇宙之經濟(02)理解NFT

1 NFT是什么&#xff1f; 想象一下&#xff0c;你小時候曾經在操場上集齊過各種不同的貼紙&#xff0c;然后和朋友們交換&#xff0c;這些貼紙有著獨特的圖案和價值。NFT的概念與此類似&#xff0c;但在數字世界中運作。NFT是一種基于區塊鏈技術的數字資產&#xff0c;每個NFT…

golang—面試題大全

目錄標題 sliceslice和array的區別slice擴容機制slice是否線程安全slice分配到棧上還是堆上擴容過程中是否重新寫入go深拷貝發生在什么情況下&#xff1f;切片的深拷貝是怎么做的copy和左值進行初始化區別slice和map的區別 mapmap介紹map的key的類型map對象如何比較map的底層原…

《Java極簡設計模式》第03章:工廠方法模式(FactoryMethod)

作者&#xff1a;冰河 星球&#xff1a;http://m6z.cn/6aeFbs 博客&#xff1a;https://binghe.gitcode.host 文章匯總&#xff1a;https://binghe.gitcode.host/md/all/all.html 源碼地址&#xff1a;https://github.com/binghe001/java-simple-design-patterns/tree/master/j…

無法正確識別車牌(Python、OpenCv、Tesseract)

我正在嘗試識別車牌&#xff0c;但出現了錯誤&#xff0c;例如錯誤/未讀取字符 以下是每個步驟的可視化&#xff1a; 從顏色閾值變形關閉獲得遮罩 以綠色突出顯示的車牌輪廓過濾器 將板輪廓粘貼到空白遮罩上 Tesseract OCR的預期結果 BP 1309 GD 但我得到的結果是 BP 1309…

騰訊云標準型CVM云服務器詳細介紹

騰訊云CVM服務器標準型實例的各項性能參數平衡&#xff0c;標準型云服務器適用于大多數常規業務&#xff0c;例如&#xff1a;web網站及中間件等&#xff0c;常見的標準型云服務器有CVM標準型S5、S6、SA3、SR1、S5se等規格&#xff0c;騰訊云服務器網來詳細說下云服務器CVM標準…

NAS搭建指南一——服務器的選擇與搭建

一、服務器的選擇 有自己的本地的公網 IP 的請跳過此篇文章按需求選擇一個云服務器&#xff0c;目的就是為了進行 frp 的搭建&#xff0c;完成內網穿透我選擇的是騰訊云服務器&#xff0c;我的配置如下&#xff0c;僅供參考&#xff1a; 4. 騰訊云服務器官網地址 二、服務器…

docker 鏡像的導出與導入 save 與 load

一、鏡像導出 docker save 導出 將系統中的鏡像保存為壓縮包&#xff0c;進行文件傳輸。使用 docker save --help 查看命令各參數&#xff0c;或者去docker官網查看.以 hello-world鏡像為例。 A&#xff1a;將鏡像保存為tar包 docker save image > package.tar docker sa…

day9 10-牛客67道劍指offer-JZ66、19、20、75、23、76、8、28、77、78

文章目錄 1. JZ66 構建乘積數組暴力解法雙向遍歷 2. JZ19 正則表達式匹配3. JZ20 表示數值的字符串有限狀態機遍歷 4. JZ75 字符流中第一個不重復的字符5. JZ23 鏈表中環的入口結點快慢指針哈希表 6. JZ76 刪除鏈表中重復的結點快慢指針三指針如果只保留一個重復結點 7. JZ8 二…

gitblit-使用

1.登入GitBlit服務器 默認用戶和密碼: admin/admin 2.創建一個新的版本庫 點擊圖中的“版本庫”&#xff0c;然后點擊圖中“創建版本庫” 填寫名稱和描述&#xff0c;注意名稱最后一定要加 .git選擇限制查看、克隆和推送勾選“加入README”和“加入.gitignore文件”在圖中的1處…

使用IIS服務器部署Flask python Web項目

參考文章 ""D:\Program Files (x86)\Python310\python310.exe"|"D:\Program Files (x86)\Python310\lib\site-packages\wfastcgi.py"" can now be used as a FastCGI script processor參考文章 請求路徑填寫*&#xff0c;模塊選擇FastCgiModule&…

一鍵部署 Umami 統計個人網站訪問數據

談到網站統計&#xff0c;大家第一時間想到的肯定是 Google Analytics。然而&#xff0c;我們都知道 Google Analytics 會收集所有用戶的信息&#xff0c;對數據沒有任何控制和隱私保護。 Google Analytics 收集的指標實在是太多了&#xff0c;有很多都是不必要的&#xff0c;…

Javascript 深入了解map

map() 是 JavaScript 數組提供的一個高階函數&#xff0c;它用于對數組中的每個元素執行指定的函數&#xff0c;并返回一個新的數組&#xff0c;新數組中的元素是原數組中的每個元素經過函數處理后的結果。 map() 函數的語法如下&#xff1a; javascript array.map(callback(…

Multi-object navigation in real environments using hybrid policies 論文閱讀

論文信息 題目&#xff1a;Multi-object navigation in real environments using hybrid policies 作者&#xff1a;Assem Sadek, Guillaume Bono 來源&#xff1a;CVPR 時間&#xff1a;2023 Abstract 機器人技術中的導航問題通常是通過 SLAM 和規劃的結合來解決的。 最近…

優化堆排序(Java 實例代碼)

目錄 優化堆排序 Java 實例代碼 src/runoob/heap/HeapSort.java 文件代碼&#xff1a; 優化堆排序 上一節的堆排序&#xff0c;我們開辟了額外的空間進行構造堆和對堆進行排序。這一小節&#xff0c;我們進行優化&#xff0c;使用原地堆排序。 對于一個最大堆&#xff0c;首…

【設計模式】-策略模式:優雅處理條件邏輯

Java 策略模式之優雅處理條件邏輯 前言 在軟件開發中&#xff0c;我們經常會遇到根據不同的條件執行不同邏輯的情況。這時&#xff0c;策略模式是一種常用的設計模式&#xff0c;能夠使代碼結構清晰、易于擴展和維護。 本文將詳細介紹策略模式的概念及其在Java中的應用&#x…