Flink2.0學習筆記:Flink服務器搭建與flink作業提交

一,下載flink:Downloads | Apache Flink,解壓后放入IDE工作目錄:我這里以1.17版本為例

可以看到,flink后期的版本中沒有提供window啟動腳本:start-cluster.bat

所以這里要通過windows自帶的wsl 系統啟動它

打開終端依次運行下列命令完成wsl linux 系統的安裝以及jdk的安裝

wsl --install
wsl.exe -d Ubuntu
sudo apt update
sudo apt install openjdk-11-jdk -y

之后繼續在終端中執行 wsl.exe -d Ubuntu 啟動wsl,wsl 默認系統為:Ubuntu,當然也可以切換其他類型的系統,重要的是:wsl會自動掛載windows 目錄,這就實現了在wsl上運行windows目錄中的項目。

然后 一路cd 到flink bin目錄,啟動flink:


這里啟動前要注意修改flink 的配置:把localhost 統統改為 0.0.0.0,,除jobmanager.rpc.address: 這項要設置為wsl? 的ip,不然flink集群選舉master會失敗: [jobmanager.rpc.address: 172.29.145.42],這樣啟動后,就可以在本機瀏覽器輸入wsl的ip訪問flink服務的web ui了


二,提交flink作業

為了方便測試,這里寫一個程序每隔1秒向本機(192.168.0.39) 端口:9999發送數據:“test flink window hallo word”。

package org.example.demo01;import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;public class PushDataTo9999 {private static final String HOST = "192.168.0.39";private static final int PORT = 9999;private static final String DATA = "test flink window hallo word";public static void main(String[] args) {try {System.out.println("Connecting to " + HOST + ":" + PORT);// 創建到WSL的連接try (Socket socket = new Socket(HOST, PORT);OutputStream outputStream = socket.getOutputStream()) {System.out.println("Connected to " + HOST + ":" + PORT);// 持續發送數據while (!Thread.currentThread().isInterrupted() && !socket.isClosed()) {// 獲取當前系統時間String currentTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));// 每秒發送一次帶時間戳的數據String dataToSend = DATA + " " + currentTime + "\n";outputStream.write(dataToSend.getBytes(StandardCharsets.UTF_8));outputStream.flush();System.out.println("Sent: " + dataToSend.trim());// 等待1秒Thread.sleep(1000);}}} catch (IOException | InterruptedException e) {System.err.println("Error: " + e.getMessage());}}
}

?

然后flink 作業內容為在wsl服務器(172.29.145.42)中 監聽本機(192.168.0.39)端口9999,并實時統計每個單詞出現的次數,這里注意關閉windows 防火墻

package org.example.demo01;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.util.Collector;/*** Hello world!*/
public class App {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 設置為流處理模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);// 基本配置env.setParallelism(1); // 設置并行度為1env.disableOperatorChaining(); // 禁用算子鏈,使執行更清晰// 禁用檢查點,因為是簡單的演示程序env.getCheckpointConfig().disableCheckpointing();// 創建周期性的數據源
//        DataStream<String> dataStream = env
//                .socketTextStream("localhost", 9999) // 從socket讀取數據
//                .name("source-strings")
//                .setParallelism(1);DataStream<String> dataStream = env.addSource(new SocketTextStreamFunction("192.168.0.39", 9999, "\n", 0)).name("socket-source");// 轉換算子 keyBy: 按單詞分組并計數dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) {for (String word : line.split(" ")) {out.collect(new Tuple2<>(word, 1));}}}).name("flatmap-split-words").setParallelism(1).keyBy(tuple -> tuple.f0) // 按單詞分組.sum(1) // 計算每個單詞的出現次數.print().name("printer-word-count");// 執行任務env.execute("Flink Streaming Java API Hello");}
}

注意pom 需要加入flink的打包插件:

<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><!-- Replace this with the main class of your job --><mainClass>org.example.demo01.App</mainClass></transformer><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers></configuration></execution></executions></plugin></plugins></build>

通過maven編譯,打包后,我們把jar包通過web ui上傳到flink 服務端:

點擊我們上傳的jar,進入提交項:

提交了后作業會自動啟動:


作業的print輸出可以在taskmanagers中查看:

?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/93380.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/93380.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/93380.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MySQL鎖的分類

MySQL鎖可以按照多個維度進行分類&#xff0c;下面我用最清晰的方式為你梳理所有分類方式&#xff1a;一、按鎖的粒度分類&#xff08;最常用分類&#xff09;鎖類型作用范圍特點適用引擎示例場景表級鎖整張表開銷小、加鎖快&#xff0c;并發度低MyISAM, MEMORY數據遷移、全表統…

電腦上搭建HTTP服務器在局域網內其它客戶端無法訪問的解決方案

在電腦上開發一套HTTP服務器的程序在調試時&#xff0c;在本機內訪問正常&#xff0c;但是在本機外訪問就不正常&#xff0c;外部客戶端無法訪問或無法連接到本機的服務器的問題&#xff0c;這可能涉及網絡配置、防火墻、端口轉發或服務綁定等問題&#xff0c;在這里提供了解決…

雙指針和codetop2(最短路問題BFS)

雙指針和codetop21.雙指針1.[復寫0](https://leetcode.cn/problems/duplicate-zeros/)2.動態規劃1.[珠寶的最高價值](https://leetcode.cn/problems/li-wu-de-zui-da-jie-zhi-lcof/description/)2.[解碼方法](https://leetcode.cn/problems/decode-ways/)3.[下降路徑最小和](ht…

基于K鄰近算法(KNN)的數據回歸預測模型

一、作品詳細簡介 1.1附件文件夾程序代碼截圖 全部完整源代碼&#xff0c;請在個人首頁置頂文章查看&#xff1a; 學行庫小秘_CSDN博客https://blog.csdn.net/weixin_47760707?spm1000.2115.3001.5343 1.2各文件夾說明 1.2.1 main.m主函數文件 該MATLAB代碼實現了一個基于…

【123頁PPT】化工行業數字化解決方案(附下載方式)

篇幅所限&#xff0c;本文只提供部分資料內容&#xff0c;完整資料請看下面鏈接 https://download.csdn.net/download/2501_92808859/91654005 資料解讀&#xff1a;【123頁PPT】化工行業數字化解決方案 詳細資料請看本解讀文章的最后內容。化工行業作為國民經濟的重要支柱之…

c++--文件頭注釋/doxygen

文件頭注釋 開源項目&#xff1a; /*** file robot_base.cpp* author Mr.Wu* date 2025-05-28* version 1.0.0* brief Robot basic drive to communicate with controller** copyright Copyright (c) 2025 google.** Licensed under the Apache License, Version 2.…

【教程】筆記本安裝FnOS設置合蓋息屏不休眠

重裝FnOS好幾次了&#xff0c;合蓋后屏幕關閉但不休眠的問題每次都要網上找參差不齊的教程&#xff0c;麻煩死了&#xff0c;索性記錄一下方便以后復制粘貼。 使用root登錄 sudo -i修改系統配置文件編輯logind.conf文件&#xff1a; 打開終端&#xff0c;輸入以下命令以編輯log…

深入解析 Monkey OCR:本地化、多語言文本識別的利器與實踐指南

在信息爆炸的時代&#xff0c;從圖片、掃描文檔中高效提取結構化文本的需求日益迫切。OCR&#xff08;光學字符識別&#xff09;技術成為解決這一問題的核心工具。盡管市面上有 Abbyy FineReader、Adobe Acrobat 等商業巨頭&#xff0c;以及 Tesseract、PaddleOCR 等開源方案&a…

動態規劃法 - 53. 最大子數組和

什么是動態規劃法&#xff1f; 簡單說&#xff0c;動態規劃&#xff08;Dynamic Programming&#xff0c;簡稱 DP&#xff09; 是一種**「把復雜問題拆解成小問題&#xff0c;通過解決小問題來解決大問題」**的方法。 核心思路有兩個&#xff1a; 1.拆分問題&#xff1a;把原問…

STM32CUBEMX配置stm32工程

1.新建工程2.選擇芯片3.配置各種片上外設和時鐘4.創建工程5.根據文件內容進行修改工程注意&#xff1a;最好根據工程規范來做&#xff0c;因為有時我們需要更改配置并重新生成&#xff0c;如果不按規范來會導致部分代碼會被系統清除&#xff0c;在工程中中有很多成對的BEGIN和E…

Day07 緩存商品 購物車

緩存菜品問題說明用戶端小程序展示的菜品數據都是通過查詢數據庫獲得&#xff0c;如果用戶端訪問量比較大&#xff0c;數據庫訪問壓力隨之增大。結果&#xff1a;系統響應慢&#xff0c;用戶體驗差實現思路通過 Redis 來緩存菜品數據&#xff0c;減少數據庫查詢操作。緩存邏輯分…

Jenkins(集群與流水線配置)

Jenkins&#xff08;集群與流水線配置&#xff09; Jenkins集群 集群化構建可以提升構建效率&#xff0c;也可以并發在多臺機器上執行構建。 安裝前提&#xff1a;內存至少512MB、Java 17 以上、Maven環境、Git環境 配置集群步驟 配置節點菜單新建節點查看節點配置狀態 新建完節…

深入剖析ROS參數服務器通信機制 ——共享全局數據的“云端倉庫”實現原理

?1. 核心概念&#xff1a;分布式數據共享容器? ?定位?&#xff1a;ROS參數服務器&#xff08;Parameter Server&#xff09;是ROS架構中的全局共享存儲系統&#xff0c;相當于機器人的“云端倉庫”。 ?作用?&#xff1a; 存儲多節點共享的靜態配置參數&#xff08;如機器…

21.AlexNet

雖然LeNet在手寫數字識別上取得了不錯的結果&#xff0c;但是他在對于更大的數據集效果就十分有限。 一方面&#xff0c;對于更大尺寸的圖像效果有限 另一方面&#xff0c;對于更多分類的任務效果有限 自LeNet后的十幾年&#xff0c;計算機視覺領域步入寒冬&#xff0c;神經網絡…

Shell腳本-條件判斷相關參數

一、前言在 Shell 腳本編程中&#xff0c;條件判斷 是實現流程控制的核心機制之一。無論是判斷文件是否存在、字符串是否相等&#xff0c;還是數值大小比較&#xff0c;都離不開條件判斷語句。本文將帶你全面掌握 Shell 腳本中與條件判斷相關的參數和語法&#xff0c;包括&…

何為“低空經濟”?

低空經濟&#xff08;Low-Altitude Economy&#xff09;是指以1000米以下空域&#xff08;部分場景可延伸至3000米&#xff09;為核心&#xff0c;以無人機&#xff08;UAV&#xff09;、電動垂直起降飛行器&#xff08;eVTOL&#xff09;、直升機、通航飛機等航空器為載體&…

線性代數 | 直觀理解一些概念

注&#xff1a;本文為 “線性代數 直觀理解概念” 相關合輯。 英文引文&#xff0c;機翻未校。 中文引文&#xff0c;略作重排。 如有內容異常&#xff0c;請看原文。 直觀理解線性代數的一些概念 2015-03-06 Updated: 2015-05-09 本文介紹矩陣的一些相關概念的直觀理解&…

Spring AI 集成阿里云百煉平臺

Spring AI 集成阿里云百煉平臺 創建API key 在阿里云百煉平臺創建API key設置系統變量。阿里云百煉 api key 創建 API 參考 官方API地址&#xff1a;https://bailian.console.aliyun.com &#xff08;1&#xff09;在阿里云百煉控制臺&#xff0c;選擇API參考菜單。 API…

Codeforces Round 859 (Div. 4) A - D + F - G2 題解

Codeforces Round 859 (Div. 4) A - D F - G2 題解A. Plus or Minus&#xff08;800 分難度&#xff09; 思路&#xff1a; 直接 if - else 判斷。 參考代碼&#xff1a; #include<bits/stdc.h> using namespace std; void solve(){int a, b, c;cin >> a >&g…

【Java web】Servlet 詳解

一、什么是 Servlet&#xff1f;—— 你不知道的 "網頁服務員"想象你走進一家網紅書店&#xff08;比如 "在線 Java 書店"&#xff09;&#xff0c;想買一本《Java 編程思想》。你告訴前臺服務員你的需求&#xff0c;服務員去倉庫找書、包裝、收款&#xf…