- 目錄
- 1、Kafka集群部署
- 2、Kafka常用操作命令
目錄
1、Kafka集群部署
1.1、下載安裝包
http://kafka.apache.org/downloads.html
在linux中使用wget命令下載安裝包
wget http://mirrors.hust.edu.cn/apache/kafka/0.8.2.2/kafka_2.11-0.8.2.2.tgz
1.2、解壓安裝包
tar -zxvf /export/software/kafka_2.11-0.8.2.2.tgz -C /export/servers/
1.3、修改配置文件
vi /export/servers/kafka/config/server.properties
輸入以下內容:
#broker的全局唯一編號,不能重復
broker.id=0#用來監聽鏈接的端口,producer或consumer將在此端口建立連接
port=9092#處理網絡請求的線程數量
num.network.threads=3#用來處理磁盤IO的線程數量
num.io.threads=8#發送套接字的緩沖區大小
socket.send.buffer.bytes=102400#接受套接字的緩沖區大小
socket.receive.buffer.bytes=102400#請求套接字的緩沖區大小
socket.request.max.bytes=104857600#kafka運行日志存放的路徑
log.dirs=/root/kafkalog#topic在當前broker上的分片個數
num.partitions=2#用來恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168#滾動生成新的segment文件的最大時間
log.roll.hours=168#日志文件中每個segment的大小,默認為1G
log.segment.bytes=1073741824#周期性檢查文件大小的時間
log.retention.check.interval.ms=300000#日志清理是否打開
log.cleaner.enable=true#broker需要使用zookeeper保存meta數據
zookeeper.connect=shizhan:2181,mini2:2181,mini3:2181#zookeeper鏈接超時時間
zookeeper.connection.timeout.ms=6000#partion buffer中,消息的條數達到閾值,將觸發flush到磁盤
log.flush.interval.messages=10000#消息buffer的時間,達到閾值,將觸發flush到磁盤
log.flush.interval.ms=3000#刪除topic需要server.properties中設置delete.topic.enable=true否則只是標記刪除
delete.topic.enable=true#此處的host.name為本機IP(重要),如果不改,則客戶端會拋出:Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=192.168.112.200
1.4、分發安裝包
scp -r /export/servers/kafka_2.11-0.8.2.2 kafka02:/export/servers
1.5、再次修改配置文件(重要)
依次修改各服務器上配置文件的的broker.id,分別是0,1,2不得重復。
對應的host的IP地址更改為各個主機的ip地址
另外將產生的log文件的輸出地址更改下
zk的地址更改為自己機器的地址
1.6、啟動集群
依次在各節點上啟動kafka
bin/kafka-server-start.sh config/server.properties
2、Kafka常用操作命令
- 查看當前服務器中的所有topic
bin/kafka-topics.sh –list –zookeeper zk01:2181 - 創建topic
./kafka-topics.sh –create –zookeeper mini1:2181 –replication-factor 1 –partitions 3 –topic first - 刪除topic
sh bin/kafka-topics.sh –delete –zookeeper zk01:2181 –topic test
需要server.properties中設置delete.topic.enable=true否則只是標記刪除或者直接重啟。 - 通過shell命令發送消息
kafka-console-producer.sh –broker-list kafka01:9092 –topic itheima - 通過shell消費消息
sh bin/kafka-console-consumer.sh –zookeeper zk01:2181 –from-beginning –topic test1 - 查看消費位置
sh kafka-run-class.sh kafka.tools.ConsumerOffsetChecker –zookeeper zk01:2181 –group testGroup - 查看某個Topic的詳情
sh kafka-topics.sh –topic test –describe –zookeeper zk01:2181