使用java遠程提交flink任務到yarn集群

使用java遠程提交flink任務到yarn集群

背景

由于業務需要,使用命令行的方式提交flink任務比較麻煩,要么將后端任務部署到大數據集群,要么弄一個提交機,感覺都不是很離線。經過一些調研,發現可以實現遠程的任務發布。接下來就記錄一下實現過程。這里用flink on yarn 的Application模式實現

環境準備

  • 大數據集群,只要有hadoop就行
  • 后端服務器,linux mac都行,windows不行

正式開始

1. 上傳flink jar包到hdfs

去flink官網下載你需要的版本,我這里用的是flink-1.18.1,把flink lib目錄下的jar包傳到hdfs中。

在這里插入圖片描述
其中flink-yarn-1.18.1.jar需要大家自己去maven倉庫下載。

2. 編寫一段flink代碼

隨便寫一段flink代碼就行,我們目的是測試

package com.azt;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;
import java.util.concurrent.TimeUnit;public class WordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {@Overridepublic void run(SourceContext<String> ctx) throws Exception {String[] words = {"spark", "flink", "hadoop", "hdfs", "yarn"};Random random = new Random();while (true) {ctx.collect(words[random.nextInt(words.length)]);TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {}});source.print();env.execute();}
}

3. 打包第二步的代碼,上傳到hdfs

在這里插入圖片描述

4. 拷貝配置文件

  • 拷貝flink conf下的所有文件到java項目的resource中
  • 拷貝hadoop配置文件到到java項目的resource中

具體看截圖
在這里插入圖片描述

5. 編寫java遠程提交任務的程序

這一步有個注意的地方就是,如果你跟我一樣是windows電腦,那么本地用idea提交會報錯;如果你是mac或者linux,那么可以直接在idea中提交任務。

package com.test;import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;/*** @date :2021/5/12 7:16 下午*/
public class Main {public static void main(String[] args) throws Exception {///home/root/flink/lib/libSystem.setProperty("HADOOP_USER_NAME","root");
//        String configurationDirectory = "C:\\project\\test_flink_mode\\src\\main\\resources\\conf";String configurationDirectory = "/export/server/flink-1.18.1/conf";org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());String flinkLibs = "hdfs://node1.itcast.cn/flink/lib";String userJarPath = "hdfs://node1.itcast.cn/flink/user-lib/original.jar";String flinkDistJar = "hdfs://node1.itcast.cn/flink/lib/flink-yarn-1.18.1.jar";YarnClient yarnClient = YarnClient.createYarnClient();YarnConfiguration yarnConfiguration = new YarnConfiguration();yarnClient.init(yarnConfiguration);yarnClient.start();YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever.create(yarnClient);//獲取flink的配置Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);flinkConfiguration.set(PipelineOptions.JARS,Collections.singletonList(userJarPath));YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration,configurationDirectory);Path remoteLib = new Path(flinkLibs);flinkConfiguration.set(YarnConfigOptions.PROVIDED_LIB_DIRS,Collections.singletonList(remoteLib.toString()));flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR,flinkDistJar);//設置為application模式flinkConfiguration.set(DeploymentOptions.TARGET,YarnDeploymentTarget.APPLICATION.getName());//yarn application nameflinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "jobname");//設置配置,可以設置很多flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));flinkConfiguration.set(TaskManagerOptions.NUM_TASK_SLOTS, 4);flinkConfiguration.setInteger("parallelism.default", 4);ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();//		設置用戶jar的參數和主類ApplicationConfiguration appConfig = new ApplicationConfiguration(args,"com.azt.WordCount");YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(flinkConfiguration,yarnConfiguration,yarnClient,clusterInformationRetriever,true);ClusterClientProvider<ApplicationId> clusterClientProvider = null;try {clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(clusterSpecification,appConfig);} catch (ClusterDeploymentException e){e.printStackTrace();}ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();System.out.println(clusterClient.getWebInterfaceURL());ApplicationId applicationId = clusterClient.getClusterId();System.out.println(applicationId);Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();int counts = 30;while (jobStatusMessages.size() == 0 && counts > 0) {Thread.sleep(1000);counts--;jobStatusMessages = clusterClient.listJobs().get();if (jobStatusMessages.size() > 0) {break;}}if (jobStatusMessages.size() > 0) {List<String> jids = new ArrayList<>();for (JobStatusMessage jobStatusMessage : jobStatusMessages) {jids.add(jobStatusMessage.getJobId().toHexString());}System.out.println(String.join(",",jids));}}
}

由于我這里是windows電腦,所以我打包放到服務器上去運行
執行命令 :

java -cp test_flink_mode-1.0-SNAPSHOT.jar com.test.Main

不出以外的話,會打印如下日志

log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
http://node2:33811
application_1715418089838_0017
6d4d6ed5277a62fc9a3a274c4f34a468

復制打印的url連接,就可以打開flink的webui了,在yarn的前端頁面中也可以看到flink任務。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/11847.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/11847.shtml
英文地址,請注明出處:http://en.pswp.cn/web/11847.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

LOTO示波器軟件PC緩存(波形錄制與回放)功能

當打開PC緩存功能后, 軟件將采用先進先出的原則排隊對示波器采集的每一幀數據, 進行幀緩存。 當發現屏幕中有感興趣的波形掠過時, 鼠標點擊軟件的(暫停)按鈕, 可以選擇回看某一幀的波形。一幀數據的量 是 當前用戶選擇時基檔位緩沖區總數據大小。不同時基檔位緩沖區大小不同&am…

談談std::map的lower_bound

我們知道std::map內部是一個紅黑樹&#xff0c;放到std::map里的數據等有一個能比較大小的方法。它相當于java里面的TreeMap。 它里面有個lower_bound方法&#xff0c;返回一個迭代器&#xff0c;它指向map里第一個大于等于參數的元素。 方法的簽名很簡單&#xff0c;但是在不同…

富格林:有效預防黑幕阻撓被騙

富格林指出&#xff0c;在投資領域&#xff0c;現貨黃金是一種備受推崇的貴金屬投資品種。倘若能有效預防黑幕阻撓被騙的情況&#xff0c;事實上現貨黃金是很多投資者的“理想型”。然而要想有效地預防黑幕阻撓被騙&#xff0c;就需要掌握足夠多的投資技巧。為此&#xff0c;富…

Milvus 基本概念

Milvus 是一個開源的向量數據庫&#xff0c;專門用于高效地存儲、管理和檢索大規模向量數據。它基于 Apache 許可證 2.0 版本發布&#xff0c;由 Zilliz 公司開源并維護。 Milvus 的設計理念是為了解決向量數據存儲和檢索的挑戰。在許多應用中&#xff0c;向量數據是一種重要的…

強化學習——馬爾可夫過程的理解

目錄 一、馬爾可夫過程1.隨機過程2.馬爾可夫性質3.馬爾可夫過程4.馬爾可夫過程示例 參考文獻 一、馬爾可夫過程 1.隨機過程 隨機過程是概率論的“動態”版本。普通概率論研究的是固定不變的隨機現象&#xff0c;而隨機過程則專注于那些隨時間不斷變化的情況&#xff0c;比如天…

C# 使用channel 實現Plc 異步任務之間的通信

channel 通信的例子: using ConsoleApp2; using System.Collections.Concurrent; using System.Threading.Channels;var queue = new BlockingCollection<Message>(new ConcurrentQueue<Message>());var opt = new BoundedChannelOptions(10) {FullMode = BoundedC…

Linux環境快速部署mysql5.7

1 網絡下載rpm包 wget -c https://repo.huaweicloud.com/mysql/Downloads/MySQL-5.7/mysql-5.7.37-1.el7.x86_64.rpm-bundle.tar2 解壓 tar xf mysql-5.7.37-1.el7.x86_64.rpm-bundle.tar3 數據庫之間會沖突因此需要卸載mariadb-libs yum remove mariadb-libs4 安裝 如果沒有…

R語言兩種方法實現隨機分層抽樣

為了減少數據分布的不平衡&#xff0c;提供高樣本的代表性&#xff0c;可將數據按特征分層一定的層次&#xff0c;在每個層次抽取一定量的樣本&#xff0c;為分層抽樣。分層抽樣的特點是將科學分組法與抽樣法結合在一起&#xff0c;分組減小了各抽樣層變異性的影響&#xff0c;…

HTTP協議及Python實現

最近的項目需要頻繁在前后端之間傳輸數據&#xff0c;本篇主要介紹HTTP協議以及數據傳輸方法。 1 HTTP協議 1.1 http協議簡介 HTTP(Hypertext Transfer Protocol)是一種用于傳輸超文本數據的應用層協議。它是萬維網上數據交換的基礎&#xff0c;定義了客戶端和服務器之間進行通…

C語言指針詳解(三)

目錄 前言 一. 回調函數是什么&#xff1f; 1.定義 2. 代碼示例&#xff1a;計數器 2.1 使用回調函數改造前 2.2 使用回調函數改造后 二. qsort使用舉例 1. qsort介紹 2. 使用qsort函數排序整型數據 3. 使用qsort排序結構體數據 三. qsort函數的模擬實現 四. sizeo…

代碼隨想錄:螺旋矩陣II相關題目推薦(54、LCR146)

59.螺旋矩陣II 題目 給你一個正整數 n &#xff0c;生成一個包含 1 到 n2 所有元素&#xff0c;且元素按順時針順序螺旋排列的 n x n 正方形矩陣 matrix 。 示例 1&#xff1a; 輸入&#xff1a;n 3 輸出&#xff1a;[[1,2,3],[8,9,4],[7,6,5]] 代碼&#xff08;新解法&am…

MyBatis——MyBatis 參數處理

一、單個簡單類型參數 簡單類型包括&#xff1a; byte short int long float double char Byte Short Integer Long Float Double Character String java.util.Date java.sql.Date parameterType 屬性&#xff1a;告訴 MyBatis 參數的類型 MyBatis 自帶類型自動推斷機制…

LLM應用-prompt提示:生成搜索相關問題、生成回答格式包含參考資料

參考: https://isou.chat/ (AI回答與相關問題都是根據問題的搜索引擎結果結合大模型生成的) prompt參考: https://github.com/yokingma/search_with_ai/blob/6d32aa8f05f5f6ee12b5204787035b3f7797c22a/src/prompt.ts#L8 ##rag 根據搜索結果知識回答RagQueryPrompt = ` …

在Go語言中,可以這樣使用Json

在Go語言中&#xff0c;處理JSON數據通常涉及編碼&#xff08;將Go結構體轉換為JSON字符串&#xff09;和解碼&#xff08;將JSON字符串轉換為Go結構體&#xff09;。Go標準庫中的encoding/json包提供了這些功能。第三方插件可以使用"github.com/goccy/go-json"也有同…

Git | git log 和 git status 的區別

如是我聞&#xff1a; git log和git status是Git中的兩個非常有用的命令&#xff0c;它們用于不同的目的&#xff0c;并提供不同類型的信息。 git log git log命令用于顯示一個或多個分支的提交歷史記錄。這個命令會列出提交歷史&#xff0c;包括每次提交的SHA-1哈希值、提交…

程控水冷阻性負載主要工作方式

程控水冷阻性負載是一種先進的電力設備&#xff0c;主要用于電力系統的測試和研究。它的主要工作方式是通過控制水冷系統的溫度&#xff0c;來模擬不同的阻性負載條件&#xff0c;從而對電力設備進行各種性能測試。 首先&#xff0c;我們需要了解什么是阻性負載。阻性負載是指那…

博弈智能的特點

博弈智能是指通過算法和模型對博弈過程進行分析和決策的智能系統。在博弈中&#xff0c;各方參與者追求自身利益和目標&#xff0c;會采取各種策略來達到自己的目標。其中&#xff0c;包括了一些不正當手段&#xff0c;如詭計和欺騙&#xff08;詭&#xff09;&#xff08;詐&a…

代碼隨想錄算法訓練營Day 42| 動態規劃part04 | 01背包問題理論基礎I、01背包問題理論基礎II、416. 分割等和子集

代碼隨想錄算法訓練營Day 42| 動態規劃part04 | 01背包問題理論基礎I、01背包問題理論基礎II、416. 分割等和子集 文章目錄 代碼隨想錄算法訓練營Day 42| 動態規劃part04 | 01背包問題理論基礎I、01背包問題理論基礎II、416. 分割等和子集01背包問題理論基礎一、01背包問題二、…

WSL設置啟動時自動啟動docker服務或其他服務

方式一: Windows系統的WSL,當windows關機再開機后,WSL等于是重新開機的,默認情況下,不會啟動Docker服務。例如在Ubuntu 22.04中,需要使用命令 service docker start來啟動。由于我習慣關機斷電,因此每天開機打開WSL后都要手動輸入這個命令,非常麻煩。所以找了一個方法…

Redis教程——哨兵

在上篇文章我們學習了Redis教程——主從復制&#xff0c;這篇文章我們學習Redis教程——哨兵監控。 在主從復制中如果主機發生宕機&#xff0c;從機Redis會一直等到主機的恢復&#xff0c;這樣會導致只能進行讀操作&#xff0c;不能進行寫操作&#xff0c;這大大降低了系統的高…