單節點:
1、解壓kafka壓縮包到安裝目錄(自己指定);
2、進入kafka目錄并執行命令?
> bin/zookeeper-server-start.sh config/zookeeper.properties
#如果報錯,修改kafka-run-class.sh,將 -XX:+UseCompressedOops 刪除;
3、執行命令> bin/kafka-start.sh config/server.properties 1 >/dev/null 2>&1 &
4、創建topic ?
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
> bin/kafka-topics.sh --list --zookeeper localhost:2181
5、發送消息
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
6、接收消息
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
?
集群搭建:
hdp0 ?hdp1 ?hdp2 ?hdp3
zookeeper配置在0 1 2上,并已啟動。
1、在hdp0上修改配置文件
> vi config/server.properties:
broker.id=1
zookeeper.connect=hdp0:2181,hdp1:2181,hdp2:2181
(注意:log.dir=/tmp/kafka-logs ?#生產環境中不用tmp目錄)
?
2、將kafka目錄傳到1 2 3上
3、修改各自的broker.id
4、在4臺主機,kafka目錄下分別執行命令> bin/kafka-start.sh config/server.properties &?
5、創建topic ?
> bin/kafka-topics.sh --create --zookeeper hdp0:2181 --replication-factor 3 --partitions 1 --topic mytopic
> bin/kafka-topics.sh --create --zookeeper hdp0:2181 --replication-factor 3 --partitions 1 --topic mytopic1
> bin/kafka-topics.sh --list --zookeeper hdp0:2181
?
6、發送消息
hdp1主機:
> bin/kafka-console-producer.sh --broker-list hdp0:9092 --topic mytopic
?
7、接受消息
hdp0主機:
> bin/kafka-console-consumer.sh --zookeeper hdp1:2181 --from-beginning --topic mytopic
?
8、查看topic的狀態信息
> bin/kafka-topics.sh --describe --zookeeper hdp0:2181 --topic mytopic
?
?
?