(七)消息隊列-Kafka 序列化avro(傳遞)
客從遠方來,遺我雙鯉魚。呼兒烹鯉魚,中有尺素書。
——佚名《飲馬長城窟行》
本文已同步CSDN、掘金平臺、知乎等多個平臺,圖片依然保持最初發布的水印(如CSDN水印)。(以后屬于本人原創均以新建狀態在多個平臺分享發布)
前言
多年前,由于工作的性質,發現這系列沒有寫完,想了想,做人做事還是要有始有終。🤣實在是借口太多了,太不像話了…由于時間過得太久了,這篇開始,可能很多技術以最新或最近的幾個版本為主了。
問題背景
在Kafka中,生產者與消費者之間傳輸消息時,通常需要對數據進行序列化和反序列化。常見的序列化方式如JSON或String存在以下問題:
- 數據冗余:字段名重復存儲,占用帶寬;
- 兼容性差:新增或刪除字段時容易導致上下游解析失敗;
- 類型安全缺失:動態解析易引發運行時錯誤。
而Avro作為一種高效的二進制序列化框架,通過Schema定義數據結構,可實現緊湊存儲、動態兼容性和強類型校驗,成為Kafka生態中推薦的序列化方案27。
核心原理
-
Schema驅動
Avro要求所有數據必須與預定義的Schema文件(.avsc)匹配。Schema以JSON格式描述數據結構,例如:{"type": "record","name": "User","namespace": "com.example.avro","fields": [{"name": "id", "type": "int"},{"name": "name", "type": "string"}] }
然后使用
avro-maven-plugin
生成 Java 類:<plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.11.0</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals></execution></executions> </plugin>
執行
mvn clean compile
后,com.example.avro.User
類會被自動生成。生產者與消費者需共享同一Schema,確保序列化與反序列化的一致性。
-
二進制編碼
Avro將數據轉換為緊湊的二進制格式,相比JSON減少約30%-50%的存儲與傳輸開銷。例如,整型字段直接以二進制存儲,無需字段名冗余7。 -
Schema Registry
為實現Schema動態管理,通常搭配Schema Registry(如Confluent或Apicurio)使用。其核心功能包括:- Schema版本控制與兼容性檢查;
- 通過唯一ID關聯消息與Schema,避免傳輸完整Schema帶來的性能損耗。
實現步驟
以下以Java代碼為例,展示Kafka集成Avro的配置方法:
1. 添加依賴
<dependencies><!-- Spring Kafka 依賴--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Avro 依賴 --><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId></dependency><!-- Schema Registry 依賴 --><dependency><groupId>io.confluent</groupId><artifactId>kafka-avro-serializer</artifactId><version>7.2.1</version></dependency>
</dependencies>
運行 HTML
2. 配置生產者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081"); // Schema Registry地址Producer<String, GenericRecord> producer = new KafkaProducer<>(props);// 構建Avro消息
GenericRecord user = new GenericData.Record(schema);
user.put("id", 1);
user.put("name", "Alice");producer.send(new ProducerRecord<>("user-topic", user));------ SpringBoot框架 直接用配置application.yml 和生產者服務類--------------
spring:kafka:bootstrap-servers: localhost:9092properties:schema.registry.url: http://localhost:8081producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer@Service
public class UserProducer {private final KafkaTemplate<String, User> kafkaTemplate;@Value("${kafka.topic.user}")private String topic;public UserProducer(KafkaTemplate<String, User> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendUser(User user) {kafkaTemplate.send(topic, user.getId().toString(), user);}
}在 Spring Boot 啟動后,我們可以使用以下代碼發送一個 User 消息:
User user = User.newBuilder().setId(1).setName("Alice").build();
userProducer.sendUser(user);控制臺應該能夠看到消費者成功接收到 User 數據
3. 配置消費者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "avro-consumer");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-topic"));while (true) {ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, GenericRecord> record : records) {System.out.println("Received: " + record.value().get("name"));}
}------ SpringBoot框架 直接用配置application.yml 和消費者服務類--------------
在 application.yml 中配置消費者參數:spring:kafka:consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializerproperties:specific.avro.reader: true然后編寫 Kafka 消費者代碼:@Service
@KafkaListener(topics = "user_topic", groupId = "user_group")
public class UserConsumer {@KafkaHandlerpublic void consume(User user) {System.out.println("Received user: " + user.getName());}
}
常見問題與解決方案
- Schema兼容性錯誤
- 現象:生產者更新Schema后消費者無法解析舊數據。
- 解決:在Schema Registry中配置兼容性策略(如
BACKWARD
),允許新增字段并設置默認值7。
- ClassNotFoundException
- 現象:反序列化時提示Avro生成的類不存在。
- 解決:通過Maven插件
avro-maven-plugin
自動生成Java類,并確保生成路徑在編譯范圍內2。
- 性能瓶頸
- 現象:高吞吐場景下序列化延遲較高。
- 優化:復用
DatumWriter
和DatumReader
實例,避免重復初始化開銷7。
總結
Avro通過Schema定義與二進制編碼,為Kafka提供了高效、類型安全的序列化方案。結合Schema Registry可實現動態兼容性管理,適用于復雜業務場景下的數據演進需求。實踐中需注意Schema版本控制與性能調優,具體工具鏈配置可參考Confluent官方文檔27。
引用說明
- 代碼結構參考自SpringBoot RestTemplate配置方案,通過替換默認組件實現功能增強。
- Schema兼容性問題分析借鑒了MAT工具中內存對象關聯性的排查思路。
后續
下期預告,敬請關注:
(八)消息隊列-Kafka 生產者