?producer:發布消息的對象,稱為消息產生者 (Kafka topic producer)
topic:Kafka將消息分門別類,每一個消息稱為一個主題(topic)
consumer:訂閱消息并處理發布消息的對象稱為消費者(consumer)
broker:已發布的消息保存在一組服務器中,稱為kafka集群,集群中的每一個服務器都是一個代理(broker),消費者(consumer)可以訂閱一個或者多個主題(topic),并從broker中拉取數據,從而消費這些已發布的信息。
1、Kafka對zookeeper是一個強依賴,保存Kafka相關的節點數據,所以安裝kafka之前要先安裝zookeeper
下載鏡像
docker pull zookeeper:3.4.14
創建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
下載鏡像
docker pull wurstmeister/kafka:2.12-2.3.1
創建容器
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1
2、入門案例
①創建kafka-demo工程并引入依賴
<!--kafka--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>
②創建ProducerQuickStart生產者類并實現
package com.heima.kafkademo.sample;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*生產者*/
public class ProducerQuickStart {public static void main(String[] args) {/*1、kafka配置信息*/Properties properties = new Properties();//kafka連接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//發送失敗,失敗的重試次數properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,5);//key和value的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");/*2、生產對象*/KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);//封裝發送消息的對象ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");/*3、發送消息*/producer.send(record);/*4、關閉通道,負責消息發送不成功*/producer.close();}
}
③創建ConsumerQuickStart消費者類并實現
package com.heima.kafkademo.sample;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*消費者*/
public class ConsumerQuickStart {public static void main(String[] args) {/*1、kafka配置信息*/Properties properties = new Properties();//kafka連接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//發送失敗,失敗的重試次數properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,5);//key和value的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");/*2、消費者對象*/KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);/*3、訂閱主題*/consumer.subscribe(Collections.singletonList("itheima-topic"));//當前線程處于一直監聽狀態while (true){//4、獲取消息ConsumerRecords<String,String> consumerRecords =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : consumerRecords) {System.out.println(record.key());System.out.println(record.value());}}}
}
④運行測試
? ? ? ? 成功接收到消息
總結
-
生產者發送消息,多個消費者訂閱同一個主題,只能有一個消費者收到消息(一對一)
-
生產者發送消息,多個消費者訂閱同一個主題,所有消費者都能收到消息(一對多)
下一篇:?springboot集成kafka收發消息