簡單的服務器間的通信示例
netcat,簡寫為 nc,是 unix 系統下一個強大的命令行網絡通信工具,用于在兩臺主機之間建立 TCP 或者 UDP 連接,并提供豐富的命令進行數據通信。nc 在網絡參考模型屬于應用層。使用 nc 可以做很多事情:建立連接,發送數據包,監聽端口,掃描端口,處理 ip4 和 ip6,和 telnet 不同,nc 會區分錯誤輸出和標準輸出,telnet 則都是標準輸出。
啟動服務端 nc -l 端口號nc -l 6666啟動客戶端nc 服務端IP 6666
Kafka 類似,但更加強大,下面是一個Kafka 生產者生產數據到topic ,消費者(flink)在topic中消費到數據,將數據落地為文件的案例:
第一步:創建一個topic?
1、topic名稱帶有明顯來源和業務的單詞,例如:t_jif_tgcdr
2、topic備份數量小于等于kafka節點數;
3、topic分區數應是備份數的倍數關系;
4、檢查topic是否已經存在,如果存在,需另外起名
kafka-topics.sh \
--bootstrap-server xxx.xxx.xxx.xxx:xxxx \
--create \
--replication-factor 3 \
--partitions 3 \
--topic kfk_big_data_study
也許會出現這個警告,就是建議topic 名稱
?
查看kafka是否創建成功
kafka-topics.sh --list --bootstrap-server?xxx.xxx.xxx.xxx:xxxx |grep study
二、對接表字段
1、對端一定要提供數據的結構;
2、對端要提供數據樣例;
3、通過樣例判斷是txt、json、還是混合數據格式
4、要確定數據是實時、增量、全量問題
5、在數據云調度上創建物理模型并落地hive
6、如需同步行云,需創建物理模型落地行云
這里面我們只介紹自己生產數據, 數據樣例:
結構如:
name|age|kungfu
例如
歐陽鋒|42|蛤蟆功
三、創建生產者
kafka-console-producer.sh --broker-list xxx.xxx.xxx.xxx:xxxx --topic kfk_big_data_study
四、測試消費
kafka-console-consumer.sh --bootstrap-server?xxx.xxx.xxx.xxx:xxxx ?--from-beginning --topic kfk_big_data_study
五、創建Flink來消費Topic中的數據
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/connectors/table/kafka/
set setexecution.checkpointing.interval=30sec;
set parallelism.default=9;
set execution.target=yarn-per-job;
set yarn.application.name=yarn_kfk_big_data_study;
set yarn.application.queue=root|default|hadoop|user-defined;--創建Kafka表
drop table if EXISTS kafka_big_data_study;
CREATE TABLE IF NOT EXISTS kafka_big_data_study(
name string,
age string,
kungfu string
) WITH ('connector' = 'kafka','topic' = 'kfk_big_data_study','properties.group.id'='group_01','properties.bootstrap.servers' = 'xxx.xxx.xxx.xxx:xxxx','scan.startup.mode' = 'earliest-offset','format' = 'csv','csv.ignore-parse-errors' = 'true','csv.allow-comments' = 'true'
);--創建Sink表
drop table IF EXISTS t_big_data_study;
CREATE TABLE t_big_data_study(
name string,
age string,
kungfu string
) WITH ('connector' = 'filesystem','path' = 'hdfs://beh001/gsdx_data/spooldirtohive/study/t_big_data_study/','format' = 'csv','csv.field-delimiter' = '|'
);--從kafka表插入數據到Sink表
insert into t_big_data_study
select
name ,
age ,
kungfu
from kafka_big_data_study;
將以上腳本保存在一個自定義的sql文件中,然后使用下面的命令調用
?
sql-client.sh -f study.sql?
看到這個情況說明flink job已經啟動;
接下來,生產一條消息看看是否會落地到hdfs目錄