生產者代碼:?
package com.kafka.test;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;public class KafkaProducerTest {public static void main(String[] args) {//創建配置對象Map<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//對生產者的數據k v 進行序列化的操作config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//創建生產者對象 生產者需要設定泛型:數據的類型約束KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);//創建數據ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("test", "hello", "Hello Kafka!");//通過生產者對象發送數據到kafkaproducer.send(producerRecord);//關閉生產者對象producer.close();}
}
?消費者代碼
package com.kafka.test;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;public class KafkaConsumerTest {public static void main(String[] args) {//創建配置對象Map<String, Object> config = new HashMap<>();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//對消費者的數據k v 進行反序列化的操作config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());config.put(ConsumerConfig.GROUP_ID_CONFIG,"Test");//取哪個group里的消息//創建消費者對象 消費者需要設定泛型:數據的類型約束KafkaConsumer<String, String> consumer =new KafkaConsumer<String,String>(config);//訂閱主題,可以多個 所以這里是listconsumer.subscribe(Collections.singletonList("Test"));//從kafka拉取數據ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(2));for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}
}