1、拉取flink鏡像,創建網絡
docker pull flink
docker network create flink-network
2、創建 jobmanager
# 創建 JobManager docker run \-itd \--name=jobmanager \--publish 8081:8081 \--network flink-network \--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \flink:latest jobmanager
3、創建 taskmanager
# 創建 TaskManager docker run \-itd \--name=taskmanager \--network flink-network \--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \flink:latest taskmanager
4、訪問 http://localhost:8081/
4.1 修改Task Slots
默認的Slots num是1,我們可以修改為5:
修改的目錄是jobmanager和taskmanager的/opt/flink/conf
的flink-conf.yaml
文件:
修改taskmanager.numberOfTaskSlots:
即可。
注意:默認的docker容器中沒有vi/vim命令,可以使用docker cp命令,復制出來修改,然后在復制回去,如下:
docker cp taskmanager:/opt/flink/conf/flink-conf.yaml .
docker cp flink-conf.yaml taskmanager:/opt/flink/conf/
5、通過flinksql消費Kafka
確保有一個可用的kafka,如果沒有,可以五分鐘內,Docker搭建一個
Docker安裝kafka 3.5
并且通過python,簡單寫一個生產者
Python生產、消費Kafka
5.1 導入flink-sql-connector-kafka jar包
顧名思義,用于連接flinksql和kafka。
進入flink
docker exec -it jobmanager /bin/bash
進入 flink的bin目錄
cd /opt/flink/bin
查看flink版本:
flink --version
可以看出,我的版本是1.18.0
根據自己的flink版本,下載對應的 flink-sql-connector-kafka jar包
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka
因為我是1.18.0,所以選擇下圖的版本包:
點進去進行下載:
將下載的jar包,分別在jobmanager,taskmanager /opt/flink/lib
目錄下,注意,是兩個都要放,如下圖:
可以使用docker cp test.txt jobmanager:/opt/flink/lib
命令,用戶宿主機和docker容器文件傳輸。把test.txt換成對應的jar包即可
5.2 flinksql消費kafka
進入jobmanager中,執行
cd /opt/flink/bin
sql-client.sh
Flink SQL執行以下語句:
CREATE TABLE KafkaTable (`count_num` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH ('connector' = 'kafka','topic' = 'kafka_demo','properties.bootstrap.servers' = '192.168.10.15:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'json'
);show tables;
select * from KafkaTable;
可以看到Flink在消費kafka數據,如下圖: