一: Set up Kafka.
1. Run the following commands on all three machines.
cd /opt
wget https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
tar zxvf kafka_2.13-3.6.1.tgz
cd kafka_2.13-3.6.1/config
vim server.properties
Modify the following two items (a sketch of the resulting entries follows the list):
1. Assign each of the three machines its own broker.id.
2. Configure the ZooKeeper connection (zookeeper.connect).
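A minimal sketch of the relevant server.properties entries, assuming the three hosts are named kafka1, kafka2, and kafka3 (placeholder hostnames; adjust to your environment):

# unique per machine: 0, 1, 2
broker.id=0
# all three ZooKeeper nodes
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181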
3. Start and test.
3.1 Start in the background.
Step 1: start ZooKeeper.
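If you use the ZooKeeper bundled with the Kafka distribution, a sketch of a background start (assumes the default zookeeper.properties; a standalone ZooKeeper install would use its own scripts):

/opt/kafka_2.13-3.6.1/bin/zookeeper-server-start.sh -daemon /opt/kafka_2.13-3.6.1/config/zookeeper.properties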
Step 2: run the start command.
nohup /opt/kafka_2.13-3.6.1/bin/kafka-server-start.sh /opt/kafka_2.13-3.6.1/config/server.properties > /dev/null 2>&1 &
3.2 Test.
Run the topic-creation command on one machine:
/opt/kafka_2.13-3.6.1/bin/kafka-topics.sh --create --topic my-topic-kraft --bootstrap-server localhost:9092
Run the topic-list command on another machine:
/opt/kafka_2.13-3.6.1/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
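Optionally, verify end-to-end delivery with the bundled console producer and consumer (a sketch; run on any broker, in two terminals):

/opt/kafka_2.13-3.6.1/bin/kafka-console-producer.sh --topic my-topic-kraft --bootstrap-server localhost:9092
/opt/kafka_2.13-3.6.1/bin/kafka-console-consumer.sh --topic my-topic-kraft --from-beginning --bootstrap-server localhost:9092

Type a few lines into the producer; they should appear in the consumer.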
二: Set up Flink.
1. Download Flink on all three machines.
cd /opt
Download flink-1.13.6-bin-scala_2.12.tgz from the mirror page below, then extract it:
https://www.apache.org/dyn/closer.lua/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz
tar zxvf flink-1.13.6-bin-scala_2.12.tgz
2. Modify the configuration parameters.
- Modify flink-conf.yaml on all three machines.
cd /opt/flink-1.13.6/conf
vim flink-conf.yaml
Fill in the master (JobManager) node address.
Update the ZooKeeper addresses for high availability (see the sketch after the two entries below).
high-availability.storageDir: hdfs://10.15.250.196/flink/ha/
state.checkpoints.dir: hdfs://10.15.250.196/flink-checkpoints
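A minimal sketch of the entries the two notes above refer to, assuming 10.15.250.196 is the master and ZooKeeper runs on all three nodes (the other two hostnames are placeholders):

jobmanager.rpc.address: 10.15.250.196
high-availability: zookeeper
high-availability.zookeeper.quorum: 10.15.250.196:2181,<node2>:2181,<node3>:2181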
- Modify masters.
vim masters
3. Modify workers.
vim workers
Fill in the addresses of the other two machines here, one hostname per line (file contents are sketched below).
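A sketch of the two files' contents, with the worker hostnames as placeholders (masters takes host:webui-port entries; workers takes one worker hostname per line):

masters:
10.15.250.196:8081

workers:
<node2>
<node3>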
4. Add the required JARs to the lib directory (the hdfs:// paths above need Hadoop on the classpath; a sketch follows).
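A hedged sketch of what typically goes into lib for this setup; the artifact names and versions here are assumptions, so verify them against your own cluster:

cd /opt/flink-1.13.6/lib
# Hadoop uber jar so Flink can reach HDFS (alternatively, export HADOOP_CLASSPATH)
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
# Kafka SQL connector matching Flink 1.13.6
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.6/flink-sql-connector-kafka_2.12-1.13.6.jar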
5. Start Flink.
cd /opt/flink-1.13.6/bin
./start-cluster.sh
Check the web UI: the IP is the master node's address, the port is 8081.
三: Set up Dinky.
1. Initialize MySQL.
mysql -uroot -p123456
create database dinky;
grant all privileges on dinky.* to 'dinky'@'%' identified by 'dinky' with grant option;
flush privileges;
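Note: the grant ... identified by form above only works on MySQL 5.7 and earlier. On MySQL 8+, a sketch of the equivalent:

create user 'dinky'@'%' identified by 'dinky';
grant all privileges on dinky.* to 'dinky'@'%';
flush privileges;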
2. Upload Dinky.
Upload the installation package to /opt.
tar -zxvf dlink-release-0.6.6.tar.gz
mv dlink-release-0.6.6 dinky    # rename the extracted directory, not the tarball
cd dinky
# First, log in to MySQL
mysql -udinky -pdinky
mysql>use dinky;
mysql>source /opt/dinky/sql/dlink.sql
3. Configure the MySQL connection.
cd config/
vim application.yml
spring:
  datasource:
    url: jdbc:mysql://xxxx:3306/dinky?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
    username: dinky
    password: dinky
4. Add JARs.
mkdir /opt/dinky/plugins
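Dinky loads Flink dependencies from the plugins directory; a sketch of populating it, assuming you reuse the local Flink installation's libraries (verify the exact set against the Dinky 0.6 documentation):

cp /opt/flink-1.13.6/lib/*.jar /opt/dinky/plugins/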
5. Start the service.
cd /opt/dinky
sh auto.sh start 1.13    # start Dinky against the Flink 1.13 dependencies
sh auto.sh stop          # stop the service
jps                      # check that the process is running
Address: http://192.168.50.60:8888/#/datastudio
Account: admin
Password: admin
四: A small real-time computation example.
1. Request YARN resources for Flink (start a YARN session).
/opt/flink-1.13.6/bin/yarn-session.sh -tm 1024m -s 2 &
# note: the old -n (fixed TaskManager count) option was removed in recent Flink releases; TaskManagers are allocated on demand
2. Create the Kafka topic (the same one created during the test above).
/opt/kafka_2.13-3.6.1/bin/kafka-topics.sh --create --topic my-topic-kraft --bootstrap-server localhost:9092
3. Write SQL in Dinky to generate data and stream it into Kafka in real time.
set execution.checkpointing.interval = 30s;
set state.checkpoints.dir=hdfs://192.168.50.60:8020/cluster/flink/checkpointes_;
set state.savepoints.dir=hdfs://192.168.50.60:8020/cluster/flink/savepointkes_;

CREATE TABLE source_table (
    age INT,
    sex STRING,
    t_insert_time AS localtimestamp,
    WATERMARK FOR t_insert_time AS t_insert_time
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.age.min' = '1',
    'fields.age.max' = '1000',
    'fields.sex.length' = '10'
);

CREATE TABLE KafkaTable (
    `age` INT,
    `sex` STRING,
    t_insert_time TIMESTAMP
) WITH (
    'connector' = 'kafka',
    'topic' = 'my-topic-kraft',
    'properties.bootstrap.servers' = '192.168.50.60:9092',
    -- 'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
insert into KafkaTable
select age,sex,t_insert_time from source_table;
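To confirm rows are landing in the topic, a quick console-consumer check (run on any broker):

/opt/kafka_2.13-3.6.1/bin/kafka-console-consumer.sh --topic my-topic-kraft --bootstrap-server 192.168.50.60:9092 --max-messages 10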
4. Extract the data from Kafka, compute a cumulative aggregation, and insert it into MySQL.
set execution.checkpointing.interval = 30s;
SET execution.type = streaming;
set state.checkpoints.dir=hdfs://192.168.50.60:8020/cluster/flink/checkpointes_;
set state.savepoints.dir=hdfs://192.168.50.60:8020/cluster/flink/savepointkes_;

CREATE TABLE MyUserTable (
    window_end_time TIMESTAMP,
    create_time TIMESTAMP,
    window_proctime_time TIMESTAMP,
    age INT,
    count_sum BIGINT,
    PRIMARY KEY (age) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.50.60:3306/test',
    'table-name' = 'test_kafka',
    'username' = 'root',
    'password' = '123456',
    'sink.buffer-flush.max-rows' = '1'
);
-- select * from MyUserTable;

CREATE TABLE KafkaTable (
    `age` INT,
    `sex` STRING,
    t_insert_time TIMESTAMP,
    `ts1` AS CAST(t_insert_time AS TIMESTAMP_LTZ(3)),
    WATERMARK FOR ts1 AS ts1 - INTERVAL '5' SECOND  -- define a 5-second-delay watermark on ts1
) WITH (
    'connector' = 'kafka',
    'topic' = 'my-topic-kraft',
    'properties.bootstrap.servers' = '192.168.50.60:9092',
    -- 'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

insert into MyUserTable
select
    window_end AS window_end_time,
    window_start AS create_time,
    PROCTIME() AS window_proctime_time,
    age,
    count(*) AS count_sum
FROM TABLE(CUMULATE(TABLE KafkaTable, DESCRIPTOR(ts1), INTERVAL '5' MINUTES, INTERVAL '1' DAY))
group by
    window_end,
    window_start,
    age;
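The JDBC sink writes into an existing MySQL table, so test_kafka must be created up front; a minimal sketch of matching DDL (the column types are my assumptions):

use test;
create table test_kafka (
    window_end_time datetime,
    create_time datetime,
    window_proctime_time datetime,
    age int,
    count_sum bigint,
    primary key (age)
);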