目錄
- 獨立消費者案例(訂閱主題)
- 獨立消費者案例(訂閱分區)
- 消費者組案例
獨立消費者案例(訂閱主題)
package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> topics = new ArrayList<String>();topics.add("first");kafkaConsumer.subscribe(topics);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}
獨立消費者案例(訂閱分區)
package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerPartition {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("first",0));kafkaConsumer.assign(topicPartitions);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}
消費者組案例
測試同一個主題的分區數據,只能由于一個消費者組中的一個一個消費
消費者1
package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> topics = new ArrayList<String>();topics.add("four");kafkaConsumer.subscribe(topics);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}
消費者2
package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer1 {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> topics = new ArrayList<String>();topics.add("four");kafkaConsumer.subscribe(topics);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}
消費者3
package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer2 {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> topics = new ArrayList<String>();topics.add("four");kafkaConsumer.subscribe(topics);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}
三個消費者的組ID相同,會形成消費者組,每個消費者消費一個分區數據
生產者發送數據
package com.tsg.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();// 連接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");// 指定對應的key和value的序列化類型properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 設置acksproperties.put(ProducerConfig.ACKS_CONFIG,"all");// 重試次數retries,默認是int最大值,2^31 - 1properties.put(ProducerConfig.RETRIES_CONFIG,3);properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.tsg.kafka.producer.MyPartitioner");// 創建kafka生產者對象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 發送數據for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<String, String>("four",2,"","分區2"), new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null){System.out.println("主題:" +metadata.topic() + " 分區: " +metadata.partition());}}});}// 關閉資源kafkaProducer.close();}
}
package com.tsg.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();// 連接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");// 指定對應的key和value的序列化類型properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 設置acksproperties.put(ProducerConfig.ACKS_CONFIG,"all");// 重試次數retries,默認是int最大值,2^31 - 1properties.put(ProducerConfig.RETRIES_CONFIG,3);properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.tsg.kafka.producer.MyPartitioner");// 創建kafka生產者對象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 發送數據for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<String, String>("four",1,"","分區2"), new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null){System.out.println("主題:" +metadata.topic() + " 分區: " +metadata.partition());}}});}// 關閉資源kafkaProducer.close();}
}
```c
package com.tsg.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();// 連接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");// 指定對應的key和value的序列化類型properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 設置acksproperties.put(ProducerConfig.ACKS_CONFIG,"all");// 重試次數retries,默認是int最大值,2^31 - 1properties.put(ProducerConfig.RETRIES_CONFIG,3);properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.tsg.kafka.producer.MyPartitioner");// 創建kafka生產者對象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 發送數據for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<String, String>("four",0,"","分區2"), new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null){System.out.println("主題:" +metadata.topic() + " 分區: " +metadata.partition());}}});}// 關閉資源kafkaProducer.close();}
}