Kafka
簡介
傳統定義:Kafka是一個分布式的基于發布/訂閱模式的消息隊列,應用于大數據實時處理領域。
Kafka最新定義:Apache Kafka是一個開源分布式事件流平臺,被數千家公司用于高性能數據管道、流分析、數據集成和關鍵任務應用程序
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications
消息隊列的應用場景:
- 緩沖/消峰:有助于控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。比如雙十一用戶發送訪問的請求速度和系統處理的速度也是不一樣的,就需要消息隊列作為緩沖。
- 解耦:允許獨立的擴展或修改兩邊的處理過程,只要保證他們遵守同樣的接口約束。
- 異步通信:允許用戶把一個消息放入隊列,但是不立即處理它,在需要的時候再進行處理。
消息隊列的兩種模式:
- 點對點模式:一個生產者對應一個消費者,進行點對點的傳輸消息,消費者主動拉取數據,消息消費之后消除消息。
- 發布/訂閱模式:相當于可以分成多個不同的主題的消息,消費者只會接受自己想要的主題內的消息,消費者消費數據后不刪除數據,每個消費者之間相互獨立,都可以消費到消息。
基本結構
Kafka由以下幾個部分構成
- 生產者:向kafka發送消息的客戶端稱作生產者
- 消費者:向Kafka獲取消息的客戶端稱作消費者
- Broker:每一臺Kafka服務器稱之為Broker,一個集群由多個Broker組成
- Topic:主題,類似于存儲消息的隊列,生產者和消費者根據對應主題進行發送、接收消息
- Partition:分區,每一個Topic主題可以對應多個分區,也就是將一個大的Topic拆分成幾個小的partiton,可以存儲到不同的Broker上
- Replica:副本,為了讓集群中節點發送故障時,該節點上的分區數據不丟失,每一個Topic的每個分區都有副本,副本之間存在一個Leader(生產者發送消息、消費者消費消息都是通過Leader)和若干Follower(實時從Leader同步數據,保存和Leader數據的同步)
- Controller:Controller可以看作Broker的領導者,用于管理和協調Kafka集群,比如管理集群中所有分區的狀態,當副本的Leader所在節點崩潰,Controller會發起選舉。
環境配置
配置虛擬機
首先創建一個虛擬機,并克隆三臺虛擬機
登錄101虛擬機,進行配置端口
vim /etc/sysconfig/network-scripts/ifcfg-ens33
修改IPADDR
IPADDR="192.168.27.101"
配置主機名
vim /etc/hostname
修改主機名為centos101
centos101
然后reboot
reboot
其余兩臺虛擬機按照101的配置進行設置
安裝JDK
在101虛擬機安裝JDK
首先查看虛擬機有沒有自帶的jdk,如果有需要先刪除再安裝jdk
rpm -qa | grep -i java
獲取jdk安裝包
wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-x64.tar.gz
解壓安裝包
tar -zxvf jdk-8u131-linux-x64.tar.gz
配置環境變量
sudo vim /etc/profile.d/my_env.sh
在my_env.sh中輸入以下內容
#JAVA_HOME
export JAVA_HOME=/mydata/jdk/jdk1.8.0_131
export PATH=$PATH:$JAVA_HOME/bin
執行source命令
source /etc/profile
查看jdk是否安裝完畢
java -version
安裝Hadoop
使用華為云開源鏡像獲取安裝包
https://mirrors.huaweicloud.com/home
wget https://mirrors.huaweicloud.com/apache/hadoop/core/hadoop-3.1.3/hadoop-3.1.3.tar.gz
解壓安裝包
tar -zxvf hadoop-3.1.3.tar.gz
配置環境變量
sudo vim /etc/profile.d/my_env.sh
加入Hadoop的環境變量
#HADOOP_HOME
export HADOOP_HOME=/mydata/hadoop/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
執行source
source /etc/profile
查看是否安裝完成
hadoop version
配置其余兩臺虛擬機
scp安全拷貝命令將安裝好的jdk和hadoop拷貝到102和103
scp -r jdk/ root@192.168.27.102:/mydata
scp -r hadoop/ root@192.168.27.102:/mydata
注意:rsync遠程同步工具可以只對差異文件做更新,而scp是復制所有文件
xsync集群分發腳本可以循環復制文件所有的節點的相同目錄下
為了使用xsync,需要建立一個腳本來實現,腳本需要放在聲明了全局變量的路徑下,因此修改之前創建的my_env.sh,將/mydata/bin聲明全局變量
#MYDATA_HOME
export MYDATA_HOME=/mydata
export PATH=$PATH:$MYDATA_HOME/bin
[root@centos101 mydata]# source /etc/profile
[root@centos101 mydata]# echo $PATH
/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/mydata/jdk/jdk1.8.0_131/bin:/mydata/jdk/jdk1.8.0_131/bin:/mydata/hadoop/hadoop-3.1.3/bin:/mydata/hadoop/hadoop-3.1.3/sbin:/mydata/jdk/jdk1.8.0_131/bin:/mydata/hadoop/hadoop-3.1.3/bin:/mydata/hadoop/hadoop-3.1.3/sbin:/mydata/bin
可以看到現在已經加入成功,進入/mydata/bin,新建一個xsync腳本
[root@centos101 mydata]# cd bin/
[root@centos101 bin]# ls
[root@centos101 bin]# vim xsync
#!/bin/bash#1. 判斷參數個數
if [ $# -lt 1 ]
thenecho Not Enough Arguement!exit;
fi#2. 遍歷集群所有機器
for host in 192.168.27.101 192.168.27.102 192.168.27.103
doecho ==================== $host ====================#3. 遍歷所有目錄,挨個發送for file in $@do#4. 判斷文件是否存在if [ -e $file ]then#5. 獲取父目錄pdir=$(cd -P $(dirname $file); pwd)#6. 獲取當前文件的名稱fname=$(basename $file)ssh $host "mkdir -p $pdir"rsync -av $pdir/$fname $host:$pdirelseecho $file does not exists!fidone
done
配置腳本可執行權限
chmod 777 xsync
然后就可以直接使用xsync將新增的文件同步到其他主機,比如剛剛在101上面建立的/mydata/bin
xsync bin/
如果遇到rsync未找到命令可以先安裝,同步和被同步的主機都需要安裝這個命令
yum -y install rsync
完成之后將配置環境變量的文件my_env.sh同步到其他主機
進入到my_env.sh文件所在目錄下執行命令
xsync my_env.sh
同步完成后,將剛同步的主機執行source命令即可
配置SSH免密
首先先ssh 訪問別的主機,生成.ssh隱藏文件
ssh 192.168.27.102
然后可以在/root目錄下,通過ls -al查看隱藏文件
ls -al
進入.ssh,生成公鑰和私鑰,輸入命令執行三次回車即可
ssh-keygen -t rsa
可以查看公鑰
cat id_rsa.pub
將公鑰拷貝給其他主機
ssh-copy-id 192.168.27.102
這樣就可以不需要密碼訪問別的主機了(自己也可以配置一下免密訪問自己)
可以用xsync命令來測試一下,如果不需要使用密碼就可以同步文件到其他主機則設置成功
安裝Kafka
首先在官網下載Kafka
https://kafka.apache.org/downloads
然后將安裝包放入/mydata/kafka
如果需要直接拖入文件到虛擬機,可以提前執行命令下載相應組件
yum -y install lrzsz
[root@localhost kafka]# yum -y install lrzsz
解壓壓縮包
tar -zxvf kafka_2.12-3.0.0.tgz
進入解壓后的文件夾,配置kafka信息
[root@localhost kafka]# cd kafka_2.12-3.0.0
[root@localhost kafka_2.12-3.0.0]# ls
bin config libs LICENSE licenses NOTICE site-docs
[root@localhost kafka_2.12-3.0.0]# cd config
注意:如果發現vim:未找到命令情況
[root@localhost config]# vim server.properties -bash: vim: 未找到命令
顯示未找到命令,則查看有什么安裝包沒有安裝
[root@localhost config]# rpm -qa | grep vim vim-minimal-7.4.629-7.el7.x86_64
正常情況下應該有以下安裝包
[root@localhost config]# rpm -qa | grep vim vim-minimal-7.4.629-8.el7_9.x86_64 vim-common-7.4.629-8.el7_9.x86_64 vim-filesystem-7.4.629-8.el7_9.x86_64 vim-enhanced-7.4.629-8.el7_9.x86_64
將缺少的安裝包安裝上即可
yum -y install vim-enhanced
執行vim命令,查看server.properties
vim server.properties
其中有幾個地方需要注意
-
broker.id=0 相當于kafka在集群中的唯一標識,不同kafka必須要不同,不能重復
-
log.dirs=/tmp/kafka-logs 日志的路徑默認是在臨時文件夾下,要進行修改,將文件放到kafka的安裝目錄中
log.dirs=/mydata/kafka/datas
-
zookeeper.connect=localhost:2181 配置zookeeper的鏈接地址,后面加/kafka是為了規范管理kafka在zk中存儲的路徑
zookeeper.connect=192.168.27.101:2181,192.168.27.102:2181,192.168.27.103:2181/kafka
將kafka文件夾分發到其他主機
xsync kafka/
然后修改其他主機的brokerid,依次設置為0,1,2
最后設置環境變量
vim /etc/profile.d/my_env.sh
#KAFKA_HOME
export KAFKA_HOME=/mydata/kafka/kafka_2.12-3.0.0
export PATH=$PATH:$KAFKA_HOME/bin
修改完之后執行source命令
source /etc/profile
分發到其他主機上,其他主機也需要執行source命令
xsync /etc/profile.d/my_env.sh
啟動Kafka
先啟動zk,然后在啟動Kafka,zk啟動使用之前在Zookeeper中編寫的zk.sh腳本
zk.sh start
啟動kafka,三臺主機都要執行
bin/kafka-server-start.sh -daemon config/server.properties
查看是否啟動成功
[root@centos101 kafka_2.12-3.0.0]# jps
1954 QuorumPeerMain
3605 Kafka
3641 Jps
為了方便集群控制,編寫腳本
#!/bin/bash
case $1 in
"start")for i in 192.168.27.101 192.168.27.102 192.168.27.103doecho "-------------啟動 $i kafka-------------"ssh $i "/mydata/kafka/kafka_2.12-3.0.0/bin/kafka-server-start.sh -daemon /mydata/kafka/kafka_2.12-3.0.0/config/server.properties"done
;;
"stop")for i in 192.168.27.101 192.168.27.102 192.168.27.103doecho "-------------停止 $i kafka-------------"ssh $i "/mydata/kafka/kafka_2.12-3.0.0/bin/kafka-server-stop.sh"done
;;
esac
在/mydata/bin(自己配的具備環境變量的目錄,專門存放腳本文件)下新建kf.sh,寫入腳本
vim kf.sh
配置權限
chmod 777 kf.sh
[root@centos101 bin]# ll
總用量 12
-rwxrwxrwx. 1 root root 480 7月 18 15:50 kf.sh
-rwxrwxrwx. 1 root root 766 7月 14 15:10 xsync
-rwxrwxrwx. 1 root root 674 7月 17 14:40 zk.sh
注意:如果要關閉時,要先關閉Kafka再關閉Zookeeper,并且要確認kafka已經完全關閉。