? kafka快速入門與知識匯總
一、前言
kafka是一款消息中間件,可以用于傳輸消息和日志收集、監控項目狀況。與其類似的技術棧有rocketmq、rabbitmq等,但這些技術棧大多應用在一些簡單的消息傳輸平臺,而kafka則因其對大量數據的高性能處理在大數據領域受到青睞。
二、為什么要有消息中間件?
1、應用解耦。想象這樣一個場景,有一個服務依賴于另外一個服務,即某個服務需要將數據傳輸給另外一個服務進行處理數據,那么這個服務就必須要等待另一服務處理完數據。如果使用消息中間件,我們可以將數據發送到kafka中,其他服務監聽kafka相關服務,接收消息進行處理,這樣,服務間就不需要互相等待。
2、異步提速。我們在進行上網購物時,需要進行一系列操作,如下訂單–>預減庫存–>支付–>短信發送–>訂單狀態。如果每一步都需要1s時間,那么這一操作就需要花費至少5s時間,這對于用戶來說是不能忍受的。使用消息中間件后,我們只需要下訂單后即可去做其他事情,之間操作異步處理,處理完成返回處理結果即可,響應就快多了
3、削峰填谷。當有大量請求發送到服務器時,如果直接將這些請求交由數據庫處理,會對數據庫造成很大的性能瓶頸。所以可以先將請求發送到kafka中,消費者根據自身處理請求的能力進行消費即可。提高了系統的可用性。
三、架構設計
名詞解釋:
事件:可以理解為發送的一條消息,這條消息包含一些元數據,指明了其發送的地點以及消息內容。
生產者:用于將消息發送出去
消費者:消費消息
主題:可以理解為消息的類型
Broker:kafka協調者,用于管理消息隊列等。
下面用一張圖帶你看看kafka的架構:
生產者producer像broker集群發送消息,在消息到達之前,首先會對消息進行序列化(要在網絡間傳輸),進行分區選擇(確認該消息最終發送到哪個主題的哪個分區下),如果實現了攔截器,則會攔截消息處理后在發送到緩沖區,待消息數量到達一定數量或者時間到了將該批數據發送到 對應的partition分區(只會發送到leader分區,follower分區同步leader分區數據,用于leader分區所在broker宕機后不影響后續操作,這是kafka高可用的一個保證),其中broker集群信息由zookerpeer集群統一管理。每個消費者都屬于一個消費組,消費分區中的消息。
四、基本的命令行操作
將kafka壓縮文件解壓后,我們需要修改一些屬性,在kafka/config包下找到server.properties文件
按需修改以下內容
#broker的全局唯一編號,不能重復
broker.id=0
#刪除topic功能,當前版本此配置默認為true
delete.topic.enable=true
#處理網絡請求的線程數量
num.network.threads=3
#用來處理磁盤IO的線程數量
num.io.threads=8
#發送套接字的緩沖區大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩沖區大小
socket.request.max.bytes=104857600
#topic在當前broker上的分區個數
num.partitions=1
#用來恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時間,超時7*24h將被刪除,單位小時
log.retention.hours=168
#每個segment文件的大小,默認最大 1G
log.segment.bytes=1073741824
# 檢查過期數據的時間,默認5分鐘檢查一次是否數據過期,單位ms
log.retention.check.interval.ms=300000
#配置連接zk集群地址
zookeeper.connect=node2:2181,node3:2181,node4:2181/kafka
啟動kafka服務
kafka-server-start.sh -daemon /你的kafka路勁/config/server.properties
關閉服務
kafka-server-stop.sh
1、操作topic
–bootstrap-server | node3:9092 | 連接的 Kafka Broker 主機名稱和端口號 |
---|---|---|
–topic | <String: topic> 比如:topicA | 操作的 topic 名稱 |
–list | 查看所有主題 | |
–create | 創建主題 | |
–delete | 刪除主題 | |
–alter | 修改主題 | |
–describe | 查看主題詳細描述 | |
–partitions | <Integer: # of partitions> | 設置分區數 |
–replication-factor | <Integer: replication factor> | 設置分區副本 |
–config | <String: name=value> | 更新系統默認的配置 |
–version | 查看當前系統kafka的版本 | |
–help | 輸出幫助信息 |
示例:
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topicA --partition 1 --replication-factor 2
2、發送消息
參數 | 值 | 描述 |
---|---|---|
–bootstrap-server | localhost:9092 | 連接的 Kafka Broker 主機名稱和端口號 |
–topic | topicA | 操作的 topic 名稱 |
–batch-size | <Integer: size> | 生成多少條提交一次 |
–version | 查詢kafka版本 |
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topicA
3、消費消息
參數 | 值 | 描述 |
---|---|---|
–bootstrap-server | localhost:9092 | 連接的 Kafka Broker 主機名稱和端口號 |
–topic | topicA | 操作的 topic 名稱 |
–from-beginning | 從頭開始消費數據 | |
–group | 指定消費者的消費組id | |
–version | 查詢kafka版本 |
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicA
五、生產者
在學習生產者發送消息之前,我們先來了解一下其發送消息的原理。
一條消息通常需要指定其 消息主題(topic)、分區(可選)、key(可選)、value(消息正文)。
消息被包裝完成后,調用生產者的send()方法,此時會檢查是否實現了攔截器(多個則鏈式處理),此時可以攔截消息做一些處理,比如說添加唯一業務編號(防止重復消費),接著經過序列化器,將key和value進行序列化,再經過分區器,選擇發送的分區,經過這一系列操作,消息最終被發送到一個緩沖區中,此時消息不會立即發送到對應的分區,而是會等待消息數量達到設置的值或者時間到了,會將該批數據一起發送給partitioner(提高效率)。消息發送到對應的分區后,會觸發相應的確認應答機制:
如果ack:0 此時消息還沒被leader分區接收就回復發送成功信息,消息不可靠
如果ack:1 此時leader分區接收到消息后應答,但是follower分區還未同步數據
ack:-1或者all 可靠性最強,需要等待leader及其所有follower確認后應答
1、創建maven項目,導入依賴
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.1</version></dependency>
</dependencies>
2、發送同步消息 producer.send(record).get()
// 同步自定義生產者
public class SyncCustomerProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 創建配置對象Properties props = new Properties();// 添加配置屬性props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.99:9092"); // kafka集群地址props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // key序列化props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value序列化// 創建生產者KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);for(int i=0;i<10;i++) {// 創建消息對象ProducerRecord<String, String> record= new ProducerRecord<String, String>("topicA","message_"+i);// 發送同步消息producer.send(record).get();}// 關閉生產者producer.close();}
}
3、發送異步消息 producer.send(record)
// 異步自定義生產者
public class ASyncCustomerProducer {public static void main(String[] args) {Properties prop = new Properties();prop.put("bootstrap.servers","192.168.66.99:9092");prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(prop);for(int i=0;i<5;i++) {producer.send(new ProducerRecord<String, String>("topicA","message_"+i));}producer.close();}}
4、異步發送后回調結果
// 異步回調自定義生產者
public class ASyncCallBackCustomerProducer {public static void main(String[] args) {Properties prop = new Properties();prop.put("bootstrap.servers","192.168.66.99:9092");prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(prop);for(int i=0;i<5;i++) {producer.send(new ProducerRecord<String, String>("topicA","message_"+i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if(exception!=null) { exception.printStackTrace();}else { //異常為空說明發送成功System.out.println("分區:"+metadata.partition()+" 偏移量:"+metadata.offset());}}});}producer.close();}}
5、攔截消息
5.1 自定義攔截器,需要實現ProducerInterceptor接口下的方法
// 消息攔截器
public class MyProducerInterceptor implements ProducerInterceptor {private Integer succectNum = 0;private Integer failNum = 0;//做一些初始化的工作。@Overridepublic void configure(Map<String, ?> map) {}/*它運行在用戶的main線程中,producer確保在消息被序列化以計算分區前調用該方法。用戶可以在該方法中對消息做任何操作,但最好不要修改消息所屬的topic和分區,否則會影響目標分區的計算。*/@Overridepublic ProducerRecord onSend(ProducerRecord producerRecord) {return new ProducerRecord(producerRecord.topic(),producerRecord.partition(),producerRecord.timestamp(),producerRecord.key(),"攔截器處理后的消息:"+producerRecord.value());}/*該方法會在消息被應答之前或消息發送失敗時調用,并且通常都是在回調邏輯觸發之前。該方法運行在producer的I/O線程中,因此不要在該方法中放入很“重”的邏輯,否則會拖慢producer的消息發送效率。*/@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {if(e == null){succectNum++;}else{failNum++;}}//主要用于執行一些資源清理的工作。@Overridepublic void close() {System.out.println("succectNum:"+succectNum);System.out.println("failNum:"+failNum);}}
5.2 應用自定義攔截器,發送消息
// 實現攔截器
public class SyncCustomerProducerInterceptor {public static void main(String[] args) throws ExecutionException, InterruptedException {// 創建配置對象Properties props = new Properties();// 添加配置屬性props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.99:9092"); // kafka集群地址props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // key序列化props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value序列化// 添加攔截器鏈List<String> interceptors = new ArrayList<>();interceptors.add(MyProducerInterceptor.class.getName());// 添加其他的攔截器構成攔截器鏈。。。props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);// 創建生產者KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);for(int i=0;i<10;i++) {// 創建消息對象ProducerRecord<String, String> record= new ProducerRecord<String, String>("topicA","message_"+i);// 發送同步消息producer.send(record).get();}// 關閉生產者producer.close();}
}
6、消息序列化
kafka已經為我們提供了一些內置的序列化類,我們可以直接使用,如果不能滿足需要我們也可以自定義序列化。
6.1 創建消息體對象(即需要被序列化的主體
public class UserVo {private String name;private Integer age;private String address;@Overridepublic String toString() {return "UserVo{" +"name='" + name + '\'' +", age=" + age +", address='" + address + '\'' +'}';}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}
}
6.2自定義序列化器
// 自定義序列化器
public class UserSerializer implements Serializer {private ObjectMapper objectMapper;@Overridepublic void configure(Map configs, boolean isKey) {objectMapper = new ObjectMapper();Serializer.super.configure(configs, isKey);}@Overridepublic byte[] serialize(String s, Object o) {byte[] bytes = null;try {bytes = objectMapper.writeValueAsBytes(o);} catch (Exception e) {e.printStackTrace();}return bytes;}@Overridepublic void close() {objectMapper = null;Serializer.super.close();}
}
6.3 使用自定義的序列化對value進行序列化
// 自定義序列化生產者
public class MySerializerProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 創建配置對象Properties props = new Properties();// 添加配置屬性props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.99:9092"); // kafka集群地址props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // key序列化// 使用自定義的序列化器props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName()); // value序列化// 創建生產者KafkaProducer<String, UserVo> producer = new KafkaProducer<String, UserVo>(props);UserVo userVo = new UserVo();userVo.setName("張三");userVo.setAge(18);userVo.setAddress("北京");// 創建消息對象ProducerRecord<String, UserVo> record= new ProducerRecord<String, UserVo>("topicA",userVo);// 發送同步消息producer.send(record).get();// 關閉生產者producer.close();}}
7、分區
7.1 分區策略:查看源碼可以發現kafka再選擇分區時采用以下規則,如果指定了分區號,則一定選擇該分區,如果沒有分區號,則對key進行hash計算后對分區數取模,如果key也不存在的話,則采用輪詢的方式依次發送到各分區(也可以采用隨機法)。
7.2 發送數據到指定的分區(可以實現消息的有序性)
// 發送給指定的分區
public class PartitionProducer {public static void main(String[] args) {Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.99:9092");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(prop);// 1、情況1:手動指定分區ProducerRecord<String, String> record= new ProducerRecord<>("topicA", 0, null, "Hello World");// 2、情況2:沒指定分區時,當key存在,會根據key的hash值計算分區
// ProducerRecord<String, String> record
// = new ProducerRecord<>("topicA", "key", "Hello World");// 3、情況3:沒指定分區時,當key不存在,會輪詢發送到每個分區
// ProducerRecord<String, String> record
// = new ProducerRecord<>("topicA", "Hello World");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {e.printStackTrace();} else {System.out.println(recordMetadata.topic() + " " + recordMetadata.partition() + " " + recordMetadata.offset());}}});producer.close();}
}
7.3自定義分區規則
// 自定義分區器
public class MyPartitioner implements Partitioner {// 初始化工作,用于分配和創建資源@Overridepublic void configure(Map<String, ?> map) {}/** 計算信息對應的分區* @param topic 主題* @param key 消息的key* @param keyBytes 消息的key序列化后的字節數組* @param value 消息的value* @param valueBytes 消息value序列化后的字節數組* @param cluster 集群元數據 可以獲取分區信息* @return 息對應的分區號*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 根據你的業務邏輯進行分區。。。// 這里使用key的長度作為分區號int len = key.toString().length();// 分區數量int partitionNum = cluster.partitionCountForTopic(topic);return len % partitionNum;}@Overridepublic void close() {}}
// 自定義分區規則
public class MyPartitionerProducer {public static void main(String[] args) {Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.99:9092");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 指定分區規則(如果設置了分區,就不會使用自定義規則)prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.itbaizhan.kafka.producer.partition.MyPartitioner");KafkaProducer<String, String> producer = new KafkaProducer<>(prop);ProducerRecord<String, String> record1 = new ProducerRecord<>("topicA",1, "kkk", "我是長度為一的key");producer.send(record1, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.out.println("發送失敗");} else {System.out.println(metadata.topic() + " ----- " + metadata.partition());}}});producer.close();}}
8、消息去重(冪等性)
思考這樣一個場景,你再進行點外賣時,由于學校的網絡實在是一言難盡,導致你一連發送了幾個支付請求,如果不對該請求進行冪等性處理,你就要花冤枉錢了。。。
于是,我們就希望實現對于用戶發送過來的某些請求,只處理一次,重復的就不再進行處理了。
再kafka中設計了一個 <PID,Partition,SeqNumber>的主鍵,當主鍵相同時認為是同一條數據,不會進行處理,其中PID是每次kafka重啟后都會分配一個新的,Pratition為分區號,SeqNumber為自增的數。所以kafka只能保證單分區單會話消息不重復。
為了實現真正的去重,我們可以使用kafka事務特性
// 事務生產者
public class TransactionProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 創建配置對象Properties props = new Properties();// 添加配置屬性props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.99:9092"); // kafka集群地址props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // key序列化props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value序列化//唯一事務idprops.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id_1");// 創建生產者KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);// 1、初始話事務producer.initTransactions();// 2、開啟事務producer.beginTransaction();// 3、發送消息,成功則提交,失敗回滾(放棄try{for(int i=0;i<10;i++) {// 創建消息對象ProducerRecord<String, String> record= new ProducerRecord<String, String>("topicA","message_"+i);// 發送同步消息producer.send(record).get();}// 提交事務producer.commitTransaction();}catch (Exception e){// 回滾事務producer.abortTransaction();// 拋出異常throw e;}// 關閉生產者producer.close();}}
六、消費組
消費規則:一個分區的數據可以被不同的消費組中的消費者消費,同一個消費組的不能消費同一個分區的數據。
consumer采用pull(拉)模式從broker中讀取數據。pull模式則可以根據consumer的消費能力以適當的速率消費消息。
pull模式不足之處是,如果kafka沒有數據,消費者可能會陷入循環中,一直返回空數據。針對這一點,Kafka的消費者在消費數據時會傳入一個時長參數timeout,如果當前沒有數據可供消費,consumer會等待一段時間之后再返回,這段時長即為timeout。
1、消費某主題下的數據
// 消費指定主題下的消息
public class TopicConsumer {public static void main(String[] args) {Properties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.99:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1"); //設置消費組// 創建消費者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);// 訂閱主題List<String> topics = new ArrayList<>();topics.add("topicA");consumer.subscribe(topics);// 循環監聽主題while (true) {// 每一秒拉取一批數據ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {System.out.println(record.partition()+" "+ record.value());}}}}
2、指定分區消費
// 消費指定主題下指定分區的消息
public class TopicPartitionConsumer {public static void main(String[] args) {Properties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.99:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1"); //設置消費組// 創建消費者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);// 訂閱主題分區List<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("topicA", 0));consumer.assign(topicPartitions);// 循環監聽主題while (true) {// 每一秒拉取一批數據ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {System.out.println(record.partition()+" "+ record.value());}}}}
3、在broker中有一個名為__consumer_offsets的主題,這里面記錄著每個消費者消費的偏移量,以確保消息被正確無遺漏的消費