首先自行安裝docker,通過docker容器安裝kafka
CentOS 系統 docker安裝地址
1.pom.xml和application.properties或者application.yml文件配置
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
spring:
  kafka:
    bootstrap-servers: [kafka地址1,kafka地址2,....]
    # producer序列化設置
    producer:
      # key序列化設置,設置成json對象
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # val序列化設置,設置成json對象
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
2.博主安裝了kafka ui插件,就直接創建主題了
當前一個集群,因為博主只搭建了一臺服務器,也可以稱為一個節點
創建主題
沒有安裝kafka ui,就在main那里啟動項目時創建
package com.atguigu.boot3_08_kafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.TopicBuilder;@EnableKafka //掃描kafka注解,開啟基于注解的模式
@SpringBootApplication
public class Boot308KafkaApplication {public static void main(String[] args) {SpringApplication.run(Boot308KafkaApplication.class, args);TopicBuilder.name("my-new-topic")//主題.partitions(3)//分區.replicas(2)//副本.build();}}
副本就是備份,有幾個節點最多就可以創建幾個副本;副本數量不能超過節點(broker)數量,只有一個節點時就是N個分區1個副本
3.在main 加上這個注解@EnableKafka
package com.atguigu.boot3_08_kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

/**
 * Application entry point.
 *
 * <p>{@code @EnableKafka} scans Kafka annotations and turns on the
 * annotation-driven listener container mode.
 */
@EnableKafka
@SpringBootApplication
public class Boot308KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(Boot308KafkaApplication.class, args);
    }
}
4.生產者發送消息
package com.atguigu.boot3_08_kafka.controller;import com.atguigu.boot3_08_kafka.entity.Person;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Autowiredprivate KafkaTemplate kafkaTemplate;@GetMapping("/jjj") public String hello() {kafkaTemplate.send("tach", 0,"hello","急急急132");//send("主題", 分區號,"key","val")return "ok";}@GetMapping("/odj")public String odj() {kafkaTemplate.send("tach", 0,"hello",new Person(1L,"odj",19));//對象json需要序列化,可用配置文件配置,也可以在對象中序列化對象return "OK";}
}
5.消費者監聽消息
package com.atguigu.boot3_08_kafka.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

/**
 * Kafka message consumers for the "tach" topic.
 */
@Component
public class MykafkaListener {

    /**
     * Default listener: consumption starts at the log end, so it only picks
     * up NEW messages, never historical ones.
     *
     * <p>{@code @KafkaListener(topics = "topic", groupId = "consumer group")};
     * the {@link ConsumerRecord} carries the message plus its metadata.
     *
     * @param record the consumed record (key, value, and metadata)
     */
    @KafkaListener(topics = "tach", groupId = "teach")
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.println("收到值key:" + record.key() + "收到值val:" + record.value());
    }

    /**
     * To replay historical (or all) messages, the starting offset must be
     * pinned explicitly:
     * {@code @KafkaListener(groupId = ..., topicPartitions = {...})} with
     * {@code @TopicPartition(topic = ..., partitionOffsets = ...)} and
     * {@code @PartitionOffset(partition = "which", initialOffset = "start")}.
     *
     * @param record the consumed record (key, value, and metadata)
     */
    @KafkaListener(groupId = "teach", topicPartitions = {
            @TopicPartition(topic = "tach", partitionOffsets = {
                    @PartitionOffset(partition = "0", initialOffset = "0")})})
    public void listens(ConsumerRecord<?, ?> record) {
        System.out.println("收到值key:" + record.key() + "收到值val:" + record.value());
    }
}
最后查看結果
最后補充一個小知識
groupId = "用戶組"
組里的成員是競爭模式
用戶組和用戶組之間是發布/訂閱模式
由zookeeper(新版Kafka則由KRaft)負責分配管理
好了可以和面試官吹牛逼了
課外話
如果傳輸的是對象,JSON需要序列化;也可以在創建對象時實現Serializable來序列化,但不推薦——這種方式太原始,而且很占資源
因為開始我們都配置好了,有對象就會自動序列化
package com.atguigu.boot3_08_kafka.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

// Message payload sent to Kafka as a JSON-serialized value.
// NOTE(review): implementing Serializable is not recommended here — the
// configured JsonSerializer already handles (de)serialization, so Java
// native serialization is unnecessary and wasteful.
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Person implements Serializable { // not recommended: implements Serializable
    private Long id;      // identifier
    private String name;  // display name
    private Integer age;  // age in years
}