版本:kafka_2.11-0.10.1.0 ?(之前安裝2.10-0.10.0.0,一直出問題)
?
- 安裝
- Springboot結合Kafka的使用
?
安裝
- 下載并解壓代碼
wget http://mirrors.cnnic.cn/apache/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz #http://kafka.apache.org/downloadstar -zxvf kafka_2.10-0.10.0.0.tgz
cd kafka_2.10-0.10.0.0
?
- 修改每個broker安裝目錄下的配置文件
# $targetID默認是0,每個broker的broker.id必須要唯一 broker.id=$targetID#默認是注釋的,$IP改成當前節點的IP即可。如果不改該配置項,在節點通過命令行可以收發消息,而在其他機器是無法通過IP去訪問隊列的 #在之前的版本不叫listeners,而是advertised.host.name和host.name listeners=PLAINTEXT://$IP:9092
?
- 啟動服務
#kafka依賴于zookeeper #如果沒有的話,可以通過kafka提供的腳本快速創建一個單節點zookeeper實例: #bin/zookeeper-server-start.sh config/zookeeper.properties#確認zookeeper服務已經啟動后,啟動kafka服務 nohup bin/kafka-server-start.sh config/server.properties &
?
- 創建一個名為test,有一份備份,一個分區的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test #查看所有topicbin/kafka-topics.sh --list --zookeeper localhost:2181
?
- 發送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
?
- 開啟一個消費者接收消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
?
- 查看topic信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
?
Springboot結合Kafka的使用?
1.在pom文件添加依賴
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>
?
2.在application.properties中添加配置
# APACHE KAFKA (KafkaProperties) spring.kafka.bootstrap-servers=192.168.0.155:9092,192.168.0.156:9092 spring.kafka.client-id=K1
spring.kafka.consumer.auto-offset-reset= earliest spring.kafka.consumer.enable-auto-commit= true spring.kafka.consumer.group-id= test-consumer-group
spring.kafka.producer.batch-size=2 spring.kafka.producer.bootstrap-servers= 192.168.0.155:9092,192.168.0.156:9092 spring.kafka.producer.client-id= P1 spring.kafka.producer.retries=3 spring.kafka.template.default-topic= test
?
3.創建消費者類(訂閱消息的對象)
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;@Component public class ListenerBean {@KafkaListener(topics = "myTopic")public void processMessage(String content) {System.out.println("you have a new message:" + content);// ... } }
?
4.創建生產者類(發布消息的對象)
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController;@Component @RestController @RequestMapping("/send") @EnableAutoConfiguration public class SendMsgBean {private final KafkaTemplate<String,String> kafkaTemplate;@Autowiredpublic SendMsgBean(KafkaTemplate<String,String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}@RequestMapping(path="/{msg}",method=RequestMethod.GET)public String send(@PathVariable("msg") String msg) {System.out.println("==sending msg:" + msg);kafkaTemplate.send("test","test-"+msg);return "message has been sent!";} }
?
?
只需這4步,就可以在springboot中使用kafka了,現在我們訪問 http://localhost:8080/send/mymessage ?就可以在控制臺看到信息了。
源碼下載
?
參考:
- Kafka producer程序本地運行時發送信息失敗解決方案