Flink使用

Window下啟動支持

下載或復制老版本的放在bin目錄下即可;

flink.bat

@echo off
setlocalSET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\pluginsSET JVM_ARGS=-Xmx512mSET FLINK_JM_CLASSPATH=%FLINK_LIB_DIR%\*java %JVM_ARGS% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.client.cli.CliFrontend %*endlocal

start-cluster.bat

@echo off
setlocal EnableDelayedExpansionSET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins
SET FLINK_CONF_DIR=%FLINK_HOME%\conf
SET FLINK_LOG_DIR=%FLINK_HOME%\logSET JVM_ARGS=-Xms1024m -Xmx1024mSET FLINK_CLASSPATH=%FLINK_LIB_DIR%\*SET logname_jm=flink-%username%-jobmanager.log
SET logname_tm=flink-%username%-taskmanager.log
SET log_jm=%FLINK_LOG_DIR%\%logname_jm%
SET log_tm=%FLINK_LOG_DIR%\%logname_tm%
SET outname_jm=flink-%username%-jobmanager.out
SET outname_tm=flink-%username%-taskmanager.out
SET out_jm=%FLINK_LOG_DIR%\%outname_jm%
SET out_tm=%FLINK_LOG_DIR%\%outname_tm%SET log_setting_jm=-Dlog.file="%log_jm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties"
SET log_setting_tm=-Dlog.file="%log_tm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties":: Log rotation (quick and dirty)
CD "%FLINK_LOG_DIR%"
for /l %%x in (5, -1, 1) do ( 
SET /A y = %%x+1 
RENAME "%logname_jm%.%%x" "%logname_jm%.!y!" 2> nul
RENAME "%logname_tm%.%%x" "%logname_tm%.!y!" 2> nul
RENAME "%outname_jm%.%%x" "%outname_jm%.!y!"  2> nul
RENAME "%outname_tm%.%%x" "%outname_tm%.!y!"  2> nul
)
RENAME "%logname_jm%" "%logname_jm%.0"  2> nul
RENAME "%logname_tm%" "%logname_tm%.0"  2> nul
RENAME "%outname_jm%" "%outname_jm%.0"  2> nul
RENAME "%outname_tm%" "%outname_tm%.0"  2> nul
DEL "%logname_jm%.6"  2> nul
DEL "%logname_tm%.6"  2> nul
DEL "%outname_jm%.6"  2> nul
DEL "%outname_tm%.6"  2> nulfor %%X in (java.exe) do (set FOUND=%%~$PATH:X)
if not defined FOUND (echo java.exe was not found in PATH variablegoto :eof
)echo Starting a local cluster with one JobManager process and one TaskManager process.echo You can terminate the processes via CTRL-C in the spawned shell windows.echo Web interface by default on http://localhost:8081/.start java %JVM_ARGS% %log_setting_jm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint --configDir "%FLINK_CONF_DIR%" > "%out_jm%" 2>&1
start java %JVM_ARGS% %log_setting_tm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir "%FLINK_CONF_DIR%" > "%out_tm%" 2>&1endlocal

Flink實戰(Java/MongoDB/Mysql)

    <properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.6.13</spring-boot.version><flink-version>1.14.6</flink-version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>${flink-version}</version></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version><optional>true</optional></dependency><!-- mongodb --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mongodb</artifactId><version>1.1.0-1.18</version></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.50</version></dependency><!-- Spring Boot Starter MyBatis --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.6</version></dependency><!-- MySQL Driver --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency></dependencies>

Flink mongodb?

Flink CDC在MongoDB的副本集(replica set)上使用了$changeStream操作,而該操作僅支持副本集。如果你的MongoDB是單個實例(standalone),則不支持$changeStream操作。

MongoDB開啟副本集(Replica Set),否則代碼運行會報錯。

Flink報錯記錄

[20210910 10:16:40.107] [DEBUG] [main] [StreamGraph.java:391][addOperator] Vertex: 2
java.lang.IllegalStateException: No ExecutorFactory found to execute the application.at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:88)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1947)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)at com.example.Fink_Kafka_Demo.main(Fink_Kafka_Demo.java:27)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)at org.springframework.boot.loader.Launcher.launch(Launcher.java:108)at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88)

添加

            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink-version}</version></dependency>

[20210910 10:20:02.172] [INFO] [main] [RestServerEndpoint.java:139][start] Starting rest endpoint.
[20210910 10:20:02.203] [DEBUG] [main] [DispatcherRestEndpoint.java:124][initializeWebSubmissionHandlers] Failed to load web based job submission extension.
org.apache.flink.util.FlinkException: The module flink-runtime-web could not be found in the class path. Please add this jar in order to enable web based job submission.at org.apache.flink.runtime.webmonitor.WebMonitorUtils.loadWebSubmissionExtension(WebMonitorUtils.java:197)at org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.initializeWebSubmissionHandlers(DispatcherRestEndpoint.java:112)at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.initializeHandlers(WebMonitorEndpoint.java:268)at org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.initializeHandlers(DispatcherRestEndpoint.java:89)at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:144)at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172)at org.apache.flink.runtime.minicluster.MiniCluster.createDispatcherResourceManagerComponents(MiniCluster.java:463)at org.apache.flink.runtime.minicluster.MiniCluster.setupDispatcherResourceManagerComponents(MiniCluster.java:422)at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:366)at org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:75)at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:85)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1957)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)at com.example.Fink_Kafka_Demo.main(Fink_Kafka_Demo.java:27)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)at org.springframework.boot.loader.Launcher.launch(Launcher.java:108)at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88)
[20210910 10:20:02.535] [DEBUG] [main] [InternalLoggerFactory.java:45][newDefaultFactory] Using SLF4J as the default logging framework
[20210910 10:20:02.536] [DEBUG] [main] [InternalThreadLocalMap.java:56][<clinit>] -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024

?添加

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>${flink-version}</version>
</dependency>

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/65156.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/65156.shtml
英文地址,請注明出處:http://en.pswp.cn/web/65156.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Python基礎知識回顧-數據結構

Tuple 在 Python 中&#xff0c;我們經常使用 Tuple 來將相關數據分組。Tuple 指的是有序且不可變的元素集合。 形式&#xff1a;通常以逗號分隔的元素寫在括號"() "中。 數據類型和索引&#xff1a;包含 String、整數和浮點數&#xff0c;并使用正索引和負索引訪問…

8. C++ 面向對象之特性一(封裝)

面向對象主要包括三大類&#xff1a;封裝&#xff0c;繼承&#xff0c;多態 1.類和對象 c認為&#xff0c;萬物皆為對象&#xff0c;對象上有其屬性和行為 人可以作為對象&#xff0c;屬性有姓名、年齡、身高、體重...&#xff0c;行為有走、跑、跳、吃飯、唱歌... 車也可以作…

WebRtc02:WebRtc架構、目錄結構、運行機制

整體架構 WebRtc主要分為三層&#xff1a; CAPI層&#xff1a;外層調用Session管理核心層&#xff1a;包括視頻引擎、音頻引擎、網絡傳輸 可由使用者重寫視頻引擎&#xff1a;編解碼器、視頻緩存、視頻增強音頻引擎&#xff1a;編解碼器、音頻緩存、回音消除、降噪傳輸&#x…

【Qt】快速添加對應類所需的頭文件包含

快速添加對應類所需的頭文件包含 一&#xff0c;簡介二&#xff0c;操作步驟 一&#xff0c;簡介 本文介紹一下&#xff0c;如何快速添加對應類所需要包含的頭文件&#xff0c;可以提高開發效率&#xff0c;供參考。 二&#xff0c;操作步驟 以QTime類為例&#xff1a; 選中…

Apache MINA 反序列化漏洞CVE-2024-52046

漏洞描述&#xff1a; Apache MINA 是一個功能強大、靈活且高性能的網絡應用框架。它通過抽象網絡層的復雜性&#xff0c;提供了事件驅動架構和靈活的 Filter 鏈機制&#xff0c;使得開發者可以更容易地開發各種類型的網絡應用。 Apache MINA 框架的 ObjectSerializationDeco…

服務器docker配置過程

1.docker安裝 參考官方文檔&#xff1a;https://docker.cadn.net.cn/manuals/engine_install_ubuntu 2.docker鏡像源替換 官方文檔&#xff1a;https://cloud.tencent.com/document/product/1207/45596 鏡像源根據你租了哪家的去找官方文檔即可。

RabbitMQ通過代碼創建交換機和隊列

常見交換機 RabbitMQ提供的交換機一共的四種&#xff0c;分別是&#xff1a; 1. Fanout&#xff1a;采用廣播形式來發送消息&#xff0c;會將消息路由到所有綁定了的隊列之中。 2. Direct&#xff1a;通過Binding Key與隊列綁定&#xff0c;生產者在發送信息的時候會通過Routin…

js es6 reduce函數, 通過規格生成sku

const specs [{ name: 顏色, values: [紅色, 藍色, 綠色] },{ name: 尺寸, values: [S, M, L] } ];function generateSKUs(specs) {return specs.reduce((acc, spec) > {const newAcc [];for (const combination of acc) {for (const value of spec.values) {newAcc.push(…

WPF通過反射機制動態加載控件

Activator.CreateInstance 是 .NET 提供的一個靜態方法&#xff0c;它屬于 System 命名空間。此方法通過反射機制根據提供的類型信息。 寫一個小demo演示一下 要求&#xff1a;在用戶反饋界面點擊建議或者評分按鈕 彈出相應界面 編寫MainWindow.xmal 主窗體 <Window x:C…

寬帶、光貓、路由器、WiFi、光纖之間的關系

1、寬帶&#xff08;Broadband&#xff09; 1.1 寬帶的定義寬帶指的是一種高速互聯網接入技術&#xff0c;通常包括ADSL、光纖、4G/5G等不同類型的接入方式。寬帶的關鍵特點是能夠提供較高的數據傳輸速率&#xff0c;使得用戶可以享受到穩定的上網體驗。 1.2 寬帶的作用寬帶是…

Pytest鉤子函數,測試框架動態切換測試環境

在軟件測試中&#xff0c;測試環境的切換是個令人頭疼的問題。不同環境的配置不同&#xff0c;如何高效切換測試環境成為許多測試開發人員關注的重點。你是否希望在運行測試用例時&#xff0c;能夠動態選擇測試環境&#xff0c;而不是繁瑣地手動修改配置&#xff1f; Pytest 測…

印象筆記07——試一試PDF標注

印象筆記07——試一試PDF標注 [!CAUTION] 根據第六期&#xff0c;我再次查詢了資料&#xff0c;印象筆記還是有一些可圈可點的功能的&#xff08;當然部分有平替&#xff09;&#xff0c;針對會員作用&#xff0c;開發使用場景雖然是逆向的&#xff0c;但我堅信這是一部分人的現…

【Vue】分享一個快速入門的前端框架以及如何搭建

先上效果圖: 登錄 菜單: 下載地址: 鏈接&#xff1a;https://pan.baidu.com/s/1m-ZlBARWU6_2n8jZil_RAQ 提取碼&#xff1a;ui20 … 主要是可以自定義設置token,更改后端請求地址較為方便。 應用設置: 登錄與token設置: 在這里設置不用登錄,可以請求的接口: request.js i…

通過串口通信控制led燈的亮滅

初始化led燈的gpio接口控制燈的亮滅 初始化uart1串口 將gpio9和gpio10設置為復用模式進行串口通信 通過串口的輸入輸出函數實現串口通信控制led燈的亮滅

計算機xinput1_4.dll丟失怎么修復?

電腦運行時常見問題及修復指南 作為軟件開發從業者&#xff0c;深知電腦在日常使用中難免會遇到各種問題&#xff0c;如文件丟失、文件損壞和系統報錯等。這些問題不僅影響工作效率&#xff0c;還可能帶來數據丟失的風險。本文將詳細介紹一些常見問題及其解決辦法&#xff0c;…

DeepSeek V3“報錯家門”:我是ChatGPT

搜 &#xff1a;海訊無雙Ai 要說這兩天大模型圈的頂流話題&#xff0c;那絕對是非DeepSeek V3莫屬了。 不過在網友們紛紛測試之際&#xff0c;有個bug也成了熱議的焦點—— 只是少了一個問號&#xff0c;DeepSeek V3竟然稱自己是ChatGPT。 甚至讓它講個笑話&#xff0c;生成…

C++:范圍for

范圍for&#xff08;range-based for&#xff09;是C的一種循環結構&#xff0c; 是在 C11 這個標準中引入的&#xff0c;這種類型的for循環使得遍歷數組、容器中的元素更加簡便和直觀。 一、范圍for語法 for ( 類型 變量名 : 數組名 ) 語句 //多條語句需要加?括號 示例&#…

C++基礎概念復習

前言 本篇文章作基礎復習用&#xff0c;主要是在C學習中遇到的概念總結&#xff0c;后續會繼續補充。如有不足&#xff0c;請前輩指出&#xff0c;萬分感謝。 1、什么是封裝&#xff0c;有何優點&#xff0c;在C中如何體現封裝這一特性&#xff1f; 封裝是面向對象編程&…

前端工程化之手搓webpack5 --【elpis全棧項目】

前端工程化之手搓webpack5 --【elpis全棧項目】 導讀 基本流程&#xff1a;輸入 – 編譯 – 輸出 #mermaid-svg-V8Gi7RFNikCuEhax {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-V8Gi7RFNikCuEhax .error-icon{fil…

vue3使用setup語法糖組件基礎傳值

(1)&#xff1a;defineProps&#xff1a;傳入要使用的props定義自定義屬性&#xff0c;傳遞過來的值具有響應式&#xff0c;和props一樣&#xff1b; (2)&#xff1a;defineEimts&#xff1a;傳入要自定義的事件&#xff0c;emit實例去傳入自定義事件的值&#xff0c;和$emit或…