Flink 客戶端操作命令及可視化工具

Flink提供了豐富的客戶端操作來提交任務和與任務進行交互。下面主要從Flink命令行、Scala ShellSQL ClientRestful APIWeb五個方面進行整理。

Flink安裝目錄的bin目錄下可以看到flinkstart-scala-shell.shsql-client.sh等文件,這些都是客戶端操作的入口。
[點擊并拖拽以移動] ?

flink 常見操作:可以通過 -help 查看幫助

run 運行任務

-d:以分離模式運行作業
-c:如果沒有在jar包中指定入口類,則需要在這里通過這個參數指定;
-m:指定需要連接的jobmanager(主節點)地址,使用這個參數可以指定一個不同于配置文件中的jobmanager,可以說是yarn集群名稱;
-p:指定程序的并行度。可以覆蓋配置文件中的默認值;
-s:保存點savepoint的路徑以還原作業來自(例如hdfs:///flink/savepoint-1537);

[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID dce7b69ad15e8756766967c46122736f

就可以看到我們提交的JobManager,默認是一個并發。
[點擊并拖拽以移動] ?

點進去就可以看到詳細的信息
[點擊并拖拽以移動] ?

點擊左側TaskManager —Stdout能看到具體輸出的日志信息。
[點擊并拖拽以移動] ?

或者查看TaskManager節點的log目錄下的*.out文件,也能看到具體的輸出信息。
[點擊并拖拽以移動] ?

list 查看任務列表

-mjobmanager<arg>作業管理器(主)的地址連接。

[root@hadoop1 flink-1.10.1]# bin/flink list -m 127.0.0.1:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
09.07.2020 16:44:09 : dce7b69ad15e8756766967c46122736f : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

Stop 停止任務

需要指定jobmanagerip:protjobId。如下報錯可知,一個job能夠被stop要求所有的source都是可以stoppable的,即實現了 StoppableFunction接口。

[root@hadoop1 flink-1.10.1]# bin/flink stop -m 127.0.0.1:8081 dce7b69ad15e8756766967c46122736f
Suspending job "dce7b69ad15e8756766967c46122736f" with a savepoint.------------------------------------------------------------The program finished with the following exception:org.apache.flink.util.FlinkException: Could not stop with a savepoint job "dce7b69ad15e8756766967c46122736f".at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:458)

StoppableFunction接口如下,屬于優雅停止任務。

 /*** @Description 需要 stoppabel 的函數必須實現此接口,例如流式任務 source**               stop() 方法在任務收到 stop信號的時候調用*               source 在接收到這個信號后,必須停止發送新的數據優雅的停止。* @Date 2020/7/9 17:26*/@PublicEvolvingpublic interface StoppableFunction {/*** 停止 source,與 cancel() 不同的是,這是一個讓 source優雅停止的請求。* 等待中的數據可以繼續發送出去,不需要立即停止*/void stop();
}

Cancel 取消任務

如果在conf/flink-conf.yaml里面配置state.savepoints.dir,會保存savepoint,否則不會保存savepoint。(重啟)

state.savepoints.dir: file:///tmp/savepoint

執行 Cancel命令 取消任務

[root@hadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081 -s e8ce0d111262c52bf8228d5722742d47
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job e8ce0d111262c52bf8228d5722742d47 with savepoint to default savepoint directory.
Cancelled job e8ce0d111262c52bf8228d5722742d47. Savepoint stored in file:/tmp/savepoint/savepoint-e8ce0d-f7fa96a085d8.

也可以在停止的時候顯示指定savepoint目錄

1 [root@hadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint f58bb4c49ee5580ab5f27fdb24083353
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job f58bb4c49ee5580ab5f27fdb24083353 with savepoint to /tmp/savepoint.
Cancelled job f58bb4c49ee5580ab5f27fdb24083353. Savepoint stored in file:/tmp/savepoint/savepoint-f58bb4-127b7e84910e.

取消和停止(流作業)的區別如下:
cancel()調用, 立即調用作業算子的cancel()方法,以盡快取消它們。如果算子在接到cancel()調用后沒有停止,Flink將開始定期中斷算子線程的執行,直到所有算子停止為止。
stop()調用 ,是更優雅的停止正在運行流作業的方式。stop()僅適用于source實現了StoppableFunction接口的作業。當用戶請求停止作業時,作業的所有source都將接收stop()方法調用。直到所有source正常關閉時,作業才會正常結束。這種方式,使 作業正常處理完所有作業。

觸發 savepoint

當需要生成savepoint文件時,需要手動觸發savepoint。如下,需要指定正在運行的 JobID 和生成文件的存放目錄。同時,我們也可以看到它會返回給用戶存放的savepoint的文件名稱等信息。

 [root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar Executing TopSpeedWindowing example with default input data set.Use --input to specify file input.Printing result to stdout. Use --output to specify output path.Job has been submitted with JobID 216c427d63e3754eb757d2cc268a448d[root@hadoop1 flink-1.10.1]# bin/flink savepoint -m 127.0.0.1:8081 216c427d63e3754eb757d2cc268a448d /tmp/savepoint/Triggering savepoint for job 216c427d63e3754eb757d2cc268a448d.Waiting for response...Savepoint completed. Path: file:/tmp/savepoint/savepoint-216c42-154a34cf6bfdYou can resume your program from this savepoint with the run command.

savepointcheckpoint的區別:
checkpoint是增量做的,每次的時間較短,數據量較小,只要在程序里面啟用后會自動觸發,用戶無須感知;savepoint是全量做的,每次的時間較長,數據量較大,需要用戶主動去觸發。
checkpoint是作業failover的時候自動使用,不需要用戶指定。savepoint一般用于程序的版本更新,bug修復,A/B Test等場景,需要用戶指定。

從指定 savepoint 中啟動

[root@hadoop1 flink-1.10.1]# bin/flink run -d -s /tmp/savepoint/savepoint-f58bb4-127b7e84910e/ examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 1a5c5ce279e0e4bd8609f541b37652e2

查看JobManager的日志能夠看到Reset the checkpoint ID為我們指定的savepoint文件中的ID
[點擊并拖拽以移動] ?

modify 修改任務并行度

這里修改masterconf/flink-conf.yamltask slot數修改為4。并通過xsync分發到 兩個slave節點上。

taskmanager.numberOfTaskSlots: 4

修改參數后需要重啟集群生效:關閉/啟動集群

[root@hadoop1 flink-1.10.1]# bin/stop-cluster.sh && bin/start-cluster.sh 
Stopping taskexecutor daemon (pid: 8236) on host hadoop2.
Stopping taskexecutor daemon (pid: 8141) on host hadoop3.
Stopping standalonesession daemon (pid: 22633) on host hadoop1.
Starting cluster.
Starting standalonesession daemon on host hadoop1.
Starting taskexecutor daemon on host hadoop2.
Starting taskexecutor daemon on host hadoop3.

啟動任務

[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 2e833a438da7d8052f14d5433910515a

從頁面上能看到Task Slots總計變為了8,運行的Slot1,剩余Slot數量為7
[點擊并拖拽以移動] ?

這時候默認的并行度是1
[點擊并拖拽以移動] ?

Flink1.0版本命令行flink modify已經沒有這個行為了,被移除了。。。Flink1.7上是可以運行的。

[root@hadoop1 flink-1.10.1]# bin/flink modify -p 4 cc22cc3d09f5d65651d637be6fb0a1c3
"modify" is not a valid action.

Info 顯示程序的執行計劃

[root@hadoop1 flink-1.10.1]# bin/flink info examples/streaming/TopSpeedWindowing.jar 
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Timestamps/Watermarks","pact":"Operator","contents":"Timestamps/Watermarks","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","parallelism":1,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------

拷貝輸出的json內容,粘貼到這個網站:http://flink.apache.org/visualizer/可以生成類似如下的執行圖。

[點擊并拖拽以移動] ?

可以與實際運行的物理執行計劃進行對比。
[點擊并拖拽以移動] ?

SQL Client Beta

進入 Flink SQL

[root@hadoop1 flink-1.10.1]# bin/sql-client.sh embedded

Select查詢,按Q退出如下界面;

Flink SQL> select 'hello word';SQL Query Result (Table)Table program finished.                                                                                       Page: Last of 1                                                                                         Updated: 16:37:04.649EXPR$0hello wordQ Quit                                         + Inc Refresh                                  G Goto Page                                    N Next Page                                    O Open Row
R Refresh                                      - Dec Refresh                                  L Last Page                                    P Prev Page

打開http://hadoop1:8081能看到這條select語句產生的查詢任務已經結束了。這個查詢采用的是讀取固定數據集的Custom Source,輸出用的是Stream Collect Sink,且只輸出一條結果。
[點擊并拖拽以移動] ?

[點擊并拖拽以移動] ?

explain 查看 SQL 的執行計劃。

Flink SQL> explain SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
== Abstract Syntax Tree ==         //抽象語法樹
LogicalAggregate(group=[{0}], cnt=[COUNT()])
+- LogicalValues(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])== Optimized Logical Plan ==      //優化后的邏輯執行計劃
GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
+- Exchange(distribution=[hash[name]])+- Values(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])== Physical Execution Plan ==    //物理執行計劃
Stage 13 : Data Sourcecontent : Source: Values(tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])Stage 15 : Operatorcontent : GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])ship_strategy : HASH

結果展示

SQL Client支持兩種模式來維護并展示查詢結果:

table mode

在內存中物化查詢結果,并以分頁table形式展示。用戶可以通過以下命令啟用table mode:例如如下案例;

Flink SQL> SET execution.result-mode=table;
[INFO] Session property has been set.Flink SQL>  SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;SQL Query Result (Table)Table program finished.                                                                                       Page: Last of 1                                                                                         Updated: 16:55:08.589name                       cntAlice                         1Greg                         1Bob                         2Q Quit                                         + Inc Refresh                                  G Goto Page                                    N Next Page                                    O Open Row
R Refresh                                      - Dec Refresh                                  L Last Page                                    P Prev Page

? [點擊并拖拽以移動] ??

? [點擊并拖拽以移動] ??

changelog mode

不會物化查詢結果,而是直接對continuous query產生的添加和撤回retractions結果進行展示:如下案例中的-表示撤回消息

Flink SQL> SET execution.result-mode=changelog;
[INFO] Session property has been set.Flink SQL>  SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;SQL Query Result (Changelog)Table program finished.                                                                                                                                                                                               Updated: 16:58:05.777+/-                      name                       cnt+                       Bob                         1+                     Alice                         1+                      Greg                         1-                       Bob                         1+                       Bob                         2Q Quit                                                                        + Inc Refresh                                                                 O Open Row
R Refresh                                                                     - Dec Refresh

? [點擊并拖拽以移動] ??

? [點擊并拖拽以移動] ??

Environment Files

CREATE TABLE 創建表DDL語句:

Flink SQL> CREATE TABLE pvuv_sink (
>     dt VARCHAR,
>     pv BIGINT,
>     uv BIGINT
> ) ;
[INFO] Table has been created.

SHOW TABLES 查看所有表名

Flink SQL>  show tables;
pvuv_sink

DESCRIBE 表名 查看表的詳細信息;

Flink SQL>  describe pvuv_sink;
root|-- dt: STRING|-- pv: BIGINT|-- uv: BIGINT

插入等操作均與關系型數據庫操作語句一樣,省略N個操作

Restful API

接下來我們演示如何通過rest api來提交jar包和執行任務。
[點擊并拖拽以移動] ?

通過Show Plan可以看到執行圖
[點擊并拖拽以移動] ?

提交之后的操作,取消的話點擊頁面的Cancel Job

? [點擊并拖拽以移動] ??

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/382567.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/382567.shtml
英文地址,請注明出處:http://en.pswp.cn/news/382567.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

ySQL挑戰搭建一個簡易的成績管理系統的數據庫

文章為自己搜索網上資源&#xff0c;再在這里進行整理&#xff0c;所以標注為轉載 [實驗步驟](https://www.shiyanlou.com/courses/reports/1347700) 總結做實驗注意事項&#xff1a; 1.添加主鍵 2.主鍵和外鍵的關系 3.注意自增的書寫添加 mysql 如何修改、添加、刪除表主鍵…

網絡之DNS協議圖解

DNS是計算機域名系統 (Domain Name System) 域名系統采用類似目錄樹的等級結構。 域名服務器是指保存有該網絡中所有主機的域名和對應IP地址&#xff0c;并具有將域名轉換為IP地址功能的服務器。 域名服務器為客戶機/服務器模式中的服務器方&#xff0c;它主要有兩種形式&am…

C++ 謂詞,

#define _CRT_SECURE_NO_WARNINGS #include<iostream> #include <vector> #include <algorithm> using namespace std;class GreaterThen20 { public:bool operator()(int val){return val > 20;} };//一元謂詞 void test01() {vector<int>v;v.push…

網絡之ARP

地址解析協議&#xff0c;即ARP&#xff08;Address Resolution Protocol&#xff09;&#xff0c;是根據IP地址獲取物理地址的一個TCP/IP協議。 主機發送信息時將包含目標IP地址的ARP請求廣播到網絡上的所有主機&#xff0c;并接收返回消息&#xff0c;以此確定目標的物理地址…

C++ 內建函數對象

STL內建了一些函數對象。分為:算數類函數對象,關系運算類函數對象&#xff0c;邏輯運算類仿函數。這些仿函數所產生的對象&#xff0c;用法和一般函數完全相同&#xff0c;當然我們還可以產生無名的臨時對象來履行函數功能。使用內建函數對象&#xff0c;需要引入頭文件 functi…

網絡之ICMP協議

ICMP 主要功能&#xff1a; 確認IP包是否成功送達目標地址通知在發送過程當中IP包被廢棄的具體原因改善網絡設置等 在IP通信中如果某個IP包因為某種原因未到達目標地址&#xff0c;那么這個原因由ICMP通知。 過程&#xff08;圖解TCP/IP&#xff09; ICMP類型 常見的&am…

C++ 常用算法之遍歷

#define _CRT_SECURE_NO_WARNINGS #include<iostream> #include <algorithm> #include <vector> #include <functional> using namespace std;/* 遍歷算法 遍歷容器元素 param beg 開始迭代器 param end 結束迭代器 param _callback 函數回調或者函數…

網絡之NAT協議

由來&#xff1a; 2011年2月3日中國農歷新年&#xff0c; IANA對外宣布&#xff1a;IPv4地址空間最后5個地址塊已經被分配給下屬的5個地區委員會。2011年4月15日&#xff0c;亞太區委員會APNIC對外宣布&#xff0c;除了個別保留地址外&#xff0c;本區域所有的IPv4地址基本耗盡…

C++ 常用查找算法

#define _CRT_SECURE_NO_WARNINGS #include<iostream> #include <algorithm> using namespace std; #include <vector> #include <string> #include <functional> /* find算法 查找元素 param beg 容器開始迭代器 param end 容器結束迭代器 para…

CentOS7卸載并安裝mysql教程

MySQL安裝 先卸載其他 刪除Mysql yum remove mysql mysql-server mysql-libs mysql-server;find / -name mysql 將找到的相關東西delete掉(rm -rf /var/lib/mysql)&#xff1b;rpm -qa|grep mysql(查詢出來的東東yum remove掉) rm /etc/my.cnf查看是否還有mysql軟件&#x…

C++ 常用排序算法

#define _CRT_SECURE_NO_WARNINGS #include<iostream> using namespace std; #include <algorithm> #include <vector> #include <functional> #include <ctime> /* merge算法 容器元素合并&#xff0c;并存儲到另一容器中 這兩個容器 必須也是…

排序穩定性的意義

首先&#xff0c;為什么會有排序算法穩定性的說法&#xff1f;只要能排好不就可以了嗎&#xff1f; 看例子 第1行是數字2 記作 1 2 第2行是數字4 記作 2 4 第3行是數字2 記作 3 2 排序后的結果&#xff08;如果看不懂命令的意思&#xff0c;參照這個博客&#xff09; 那么引入…

C++ 常用拷貝和替換算法

#define _CRT_SECURE_NO_WARNINGS #include<iostream> #include <vector> #include <algorithm> #include <iterator> using namespace std;/* copy算法 將容器內指定范圍的元素拷貝到另一容器中 param beg 容器開始迭代器 param end 容器結束迭代器 p…

防火墻的基礎知識入門

文章目錄防火墻基于實現方式&#xff0c;防火墻的發展分為四個階段:Linux 環境中主要的防火墻形式TCP wrappers~~詳解~~ 粗解Tcp wrappers的認識它的基本過程是這樣的&#xff1a;iptable攻擊和防御DDOS 攻擊常見的可能受到 DDOS 攻擊體現的癥狀有&#xff1a;而常見的 DDOS 攻…

C++ 常用算數生成算法

#define _CRT_SECURE_NO_WARNINGS #include<iostream> #include <vector> using namespace std; #include <algorithm> //不好使 #include <numeric> //好使 #include <iterator> /* accumulate算法 計算容器元素累計總和 param beg 容器開始迭代…

fork()請問下面的程序一共輸出多少個“A”?多少個-?

題目&#xff1a;請問下面的程序一共輸出多少個“-”&#xff1f; #include #include #include int main(void) { int i; for(i0; i<2; i){ fork(); printf("-"); } return 0; } 解析&#xff1a;一共輸出8個。 首先程序一開始&am…

C++ 常用集合算法

#define _CRT_SECURE_NO_WARNINGS #include<iostream> #include <algorithm> #include <vector> #include <iterator> using namespace std;/* set_intersection算法 求兩個set集合的交集 注意:兩個集合必須是有序序列 param beg1 容器1開始迭代器 par…

本能富可敵國,最后卻選擇拯救世界!Bram的Vim和烏干達兒童

他本能富可敵國&#xff0c;最后卻選擇拯救世界 在命令行界面輸入vim會出現一堆文件&#xff0c;但是一直有這么一句話 Help poor children in Uganda! “幫助可憐的烏干達兒童” 查詢了一下這里面相關的歷史背景和知識 在Vim許可證文件結束后的部分翻譯 &#xff0d;如果…

linux 常用命令01

/bin/bash 就是linux默認的shell ls命令 ls -a 顯示所有文件 包含隱藏文件 ls -R 遞歸顯示子目錄 ls -l 顯示詳細信息 ls -lrt 按照時間排序&#xff0c;顯示文件信息 配合通配符使用 ls *.c *匹配任意多個字符 ls xx.? 匹配任意一個字符 cd 命令 cd - 為切換到上次目錄 cd 回…

Linux基礎查漏補缺

文章目錄第二遍重新回顧Linux基礎查看主機名修改主機名查看IP地址Linux的 “--”和“-”根目錄文件的意義和作用alias直接在命令行界面輸入firefox數組越界發生什么命令行光標移動的幾個操作重定向第二遍重新回顧Linux基礎 1.查找忽略的知識點 2.再次記憶一些基礎知識 3.鞏固基…