(七)消息隊列-Kafka 序列化avro(傳遞)

(七)消息隊列-Kafka 序列化avro(傳遞)

客從遠方來,遺我雙鯉魚。呼兒烹鯉魚,中有尺素書。
——佚名《飲馬長城窟行》

在這里插入圖片描述

本文已同步CSDN、掘金平臺、知乎等多個平臺,圖片依然保持最初發布的水印(如CSDN水印)。(以后屬于本人原創均以新建狀態在多個平臺分享發布)

前言

多年前,由于工作的性質,發現這系列沒有寫完,想了想,做人做事還是要有始有終。🤣實在是借口太多了,太不像話了…由于時間過得太久了,這篇開始,可能很多技術以最新或最近的幾個版本為主了。

問題背景

在Kafka中,生產者與消費者之間傳輸消息時,通常需要對數據進行序列化和反序列化。常見的序列化方式如JSON或String存在以下問題:

  1. 數據冗余:字段名重復存儲,占用帶寬;
  2. 兼容性差:新增或刪除字段時容易導致上下游解析失敗;
  3. 類型安全缺失:動態解析易引發運行時錯誤。

而Avro作為一種高效的二進制序列化框架,通過Schema定義數據結構,可實現緊湊存儲動態兼容性強類型校驗,成為Kafka生態中推薦的序列化方案27。


核心原理
  1. Schema驅動
    Avro要求所有數據必須與預定義的Schema文件(.avsc)匹配。Schema以JSON格式描述數據結構,例如:

    {"type": "record","name": "User","namespace": "com.example.avro","fields": [{"name": "id", "type": "int"},{"name": "name", "type": "string"}]
    }
    

    然后使用 avro-maven-plugin 生成 Java 類:

    <plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.11.0</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals></execution></executions>
    </plugin>
    

    執行 mvn clean compile 后,com.example.avro.User 類會被自動生成。

    生產者與消費者需共享同一Schema,確保序列化與反序列化的一致性。

  2. 二進制編碼
    Avro將數據轉換為緊湊的二進制格式,相比JSON減少約30%-50%的存儲與傳輸開銷。例如,整型字段直接以二進制存儲,無需字段名冗余7。

  3. Schema Registry
    為實現Schema動態管理,通常搭配Schema Registry(如Confluent或Apicurio)使用。其核心功能包括:

    • Schema版本控制與兼容性檢查;
    • 通過唯一ID關聯消息與Schema,避免傳輸完整Schema帶來的性能損耗。

實現步驟

以下以Java代碼為例,展示Kafka集成Avro的配置方法:

1. 添加依賴
<dependencies><!-- Spring Kafka 依賴--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Avro 依賴 --><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId></dependency><!-- Schema Registry 依賴 --><dependency><groupId>io.confluent</groupId><artifactId>kafka-avro-serializer</artifactId><version>7.2.1</version></dependency>
</dependencies>

運行 HTML

2. 配置生產者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081"); // Schema Registry地址Producer<String, GenericRecord> producer = new KafkaProducer<>(props);// 構建Avro消息
GenericRecord user = new GenericData.Record(schema);
user.put("id", 1);
user.put("name", "Alice");producer.send(new ProducerRecord<>("user-topic", user));------ SpringBoot框架 直接用配置application.yml 和生產者服務類--------------
spring:kafka:bootstrap-servers: localhost:9092properties:schema.registry.url: http://localhost:8081producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer@Service
public class UserProducer {private final KafkaTemplate<String, User> kafkaTemplate;@Value("${kafka.topic.user}")private String topic;public UserProducer(KafkaTemplate<String, User> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendUser(User user) {kafkaTemplate.send(topic, user.getId().toString(), user);}
}在 Spring Boot 啟動后,我們可以使用以下代碼發送一個 User 消息:
User user = User.newBuilder().setId(1).setName("Alice").build();
userProducer.sendUser(user);控制臺應該能夠看到消費者成功接收到 User 數據
3. 配置消費者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "avro-consumer");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-topic"));while (true) {ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, GenericRecord> record : records) {System.out.println("Received: " + record.value().get("name"));}
}------ SpringBoot框架 直接用配置application.yml 和消費者服務類--------------
在 application.yml 中配置消費者參數:spring:kafka:consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializerproperties:specific.avro.reader: true然后編寫 Kafka 消費者代碼:@Service
@KafkaListener(topics = "user_topic", groupId = "user_group")
public class UserConsumer {@KafkaHandlerpublic void consume(User user) {System.out.println("Received user: " + user.getName());}
}

常見問題與解決方案
  1. Schema兼容性錯誤
    • 現象:生產者更新Schema后消費者無法解析舊數據。
    • 解決:在Schema Registry中配置兼容性策略(如BACKWARD),允許新增字段并設置默認值7。
  2. ClassNotFoundException
    • 現象:反序列化時提示Avro生成的類不存在。
    • 解決:通過Maven插件avro-maven-plugin自動生成Java類,并確保生成路徑在編譯范圍內2。
  3. 性能瓶頸
    • 現象:高吞吐場景下序列化延遲較高。
    • 優化:復用DatumWriterDatumReader實例,避免重復初始化開銷7。

總結

Avro通過Schema定義與二進制編碼,為Kafka提供了高效、類型安全的序列化方案。結合Schema Registry可實現動態兼容性管理,適用于復雜業務場景下的數據演進需求。實踐中需注意Schema版本控制與性能調優,具體工具鏈配置可參考Confluent官方文檔27。


引用說明

  • 代碼結構參考自SpringBoot RestTemplate配置方案,通過替換默認組件實現功能增強。
  • Schema兼容性問題分析借鑒了MAT工具中內存對象關聯性的排查思路。

后續

下期預告,敬請關注:
(八)消息隊列-Kafka 生產者

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/70953.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/70953.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/70953.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

PXE批量網絡裝機與Kickstart自動化安裝工具

目錄 一、系統裝機的原理 1.1、系統裝機方式 1.2、系統安裝過程 二、PXE批量網絡裝機 2.1、PXE實現原理 2.2、搭建PXE實際案例 2.2.1、安裝必要軟件 2.2.2、搭建DHCP服務器 2.2.3、搭建TFTP服務器 2.2.4、掛載鏡像并拷貝引導文件到tftp服務啟動引導文件夾下 2.2.5、編…

【全棧開發】從0開始搭建一個圖書管理系統【一】框架搭建

【全棧開發】從0開始搭建一個圖書管理系統【一】框架搭建 前言 現在流行降本增笑&#xff0c;也就是不但每個人都要有事干不能閑著&#xff0c;更重要的是每個人都要通過報功的方式做到平日的各項工作異常飽和&#xff0c;實現1.5人的支出干2人的活計。單純的數據庫開發【膚淺…

部署Flink1.20.1

1、設置環境變量 export JAVA_HOME/cluster/jdk export CLASSPATH.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jarp #export HIVE_HOME/cluster/hive export MYSQL_HOME/cluster/mysql export HADOOP_HOME/cluster/hadoop3 export HADOOP_CONF_DIR$HADOOP_HOME/etc/hadoop …

【超詳細】神經網絡的可視化解釋

《------往期經典推薦------》 一、AI應用軟件開發實戰專欄【鏈接】 項目名稱項目名稱1.【人臉識別與管理系統開發】2.【車牌識別與自動收費管理系統開發】3.【手勢識別系統開發】4.【人臉面部活體檢測系統開發】5.【圖片風格快速遷移軟件開發】6.【人臉表表情識別系統】7.【…

深入了解 Python 中的 MRO(方法解析順序)

文章目錄 深入了解 Python 中的 MRO&#xff08;方法解析順序&#xff09;什么是 MRO&#xff1f;如何計算 MRO&#xff1f;C3 算法的合并規則C3 算法的合并步驟示例&#xff1a;合并過程解析 MRO 解析失敗的場景使用 mro() 方法查看 MRO示例 1&#xff1a;基本用法 菱形繼承與…

數字化賦能:制造業如何突破低效生產的瓶頸?

隨著全球經濟的快速發展與市場需求的變化&#xff0c;制造業面臨著前所未有的壓力與挑戰。生產效率、資源管理、品質控制、成本控制等方面的問題日益突出&#xff0c;尤其是低效生產成為了許多制造企業亟待解決的瓶頸。在這種背景下&#xff0c;數字化轉型成為提升制造業效率的…

Element-Plus,使用 El-form中 的 scroll-to-error 沒有效果問題記錄

因業務需要表單組件中嵌套著表格列表&#xff0c;內容比較多&#xff1b; 所以需要表單校驗不通過時&#xff0c;自動定位到不通過的節點&#xff1b; 但發現這個像是沒有起到效果一樣&#xff0c;后面就是排查的思路了&#xff1a; 容器高度問題&#xff1a;如果表單容器的高度…

基于Javase的停車場收費管理系統

基于Javase的停車場收費管理系統 停車場管理系統開發文檔 項目概述 1.1 項目背景 隨著現代化城市的不斷發展&#xff0c;車輛數量不斷增加&#xff0c;停車難問題也日益突出。為了更好地管理停車場資 源&#xff0c;提升停車效率&#xff0c;需要一個基于Java SE的停車場管理…

網絡協議 HTTP、HTTPS、HTTP/1.1、HTTP/2 對比分析

1. 基本定義 HTTP&#xff08;HyperText Transfer Protocol&#xff09; 應用層協議&#xff0c;用于客戶端與服務器之間的數據傳輸&#xff08;默認端口 80&#xff09;。 HTTP/1.0&#xff1a;早期版本&#xff0c;每個請求需單獨建立 TCP 連接&#xff0c;效率低。HTTP/1.1&…

DeepSeek掘金——調用DeepSeek API接口 實現智能數據挖掘與分析

調用DeepSeek API接口:實現智能數據挖掘與分析 在當今數據驅動的時代,企業和開發者越來越依賴高效的數據挖掘與分析工具來獲取有價值的洞察。DeepSeek作為一款先進的智能數據挖掘平臺,提供了強大的API接口,幫助用戶輕松集成其功能到自己的應用中。本文將詳細介紹如何調用D…

LabVIEW同步數據采集功能

VI通過使用數據采集&#xff08;DAQ&#xff09;硬件系統&#xff0c;進行多通道同步采集&#xff0c;實時獲取模擬信號數據。它利用外部時鐘信號觸發數據采集&#xff0c;支持連續采樣模式&#xff0c;并將采集到的數據實時顯示在波形圖上&#xff0c;方便用戶進行數據監控和分…

SpringDataJPA使用deleteAllInBatch方法邏輯刪除失效

概述 在使用Spring Boot JPA時&#xff0c;執行批量刪除操作時&#xff0c;遇到邏輯刪除失效的問題。具體而言&#xff0c;當使用deleteAllInBatch方法時&#xff0c;數據會被物理刪除&#xff0c;而不是進行邏輯刪除&#xff1b;但是當使用deleteAll時&#xff0c;邏輯刪除操…

【Docker】使用Docker搭建-MySQL數據庫服務

零、更換Docker鏡像源 因為國內現在封鎖了Docker默認拉取鏡像的站點&#xff08;DockerHub&#xff09;&#xff0c;而且國內大部分Docker鏡像站已全部下線&#xff0c;導致現在很多朋友在拉取鏡像的時候會出現無法拉取的現象&#xff0c;這時候就需要進行更換Docker鏡像源。 可…

人類駕駛的人腦兩種判斷模式(反射和預判)-->自動駕駛兩種AI模式

一種模式是直覺模式&#xff0c;判斷基于條件反射&#xff0c;視覺感知 觸發到 直接條件反射&#xff08;從經歷中沉淀形成的神經信息閉環&#xff09;&#xff0c;類似現在自動駕駛技術的傳統AI模式&#xff1b;另一種是圖式推理模式&#xff0c;判斷是基于預判&#xff0c;人…

3.17 AI Agent 場景革命:解鎖企業級應用的 15 個黃金賽道

AI Agent 場景革命:解鎖企業級應用的 15 個黃金賽道 關鍵詞:AI Agent 應用場景, 企業級智能體案例, 多模態 Agent 實現, 工具鏈自動化, 智能決策系統 1. 企業級 Agent 場景分類圖譜 #mermaid-svg-UjUmmToEKigfdlFf {font-family:"trebuchet ms",verdana,arial,san…

Docker基礎-常見命令

docker images -查看所有的本地鏡像。 docker pull -把遠端鏡像拉取到本地。 docker rmi -刪除鏡像。 docker push -推到鏡像倉庫。 docker run -創建并運行容器&#xff08;自動化&#xff0c;如果發現鏡像不存在會先去拉取&#xff0c; 拉取完了以后再去自動創建容器&am…

TinyEngine v2.2版本發布:支持頁面嵌套路由,提升多層級路由管理能力開發分支調整

2025年春節假期已過&#xff0c;大家都帶著慢慢的活力回到了工作崗位。為了讓大家在新的一年繼續感受到 Tiny Engine 的成長與變化&#xff0c;我們很高興地宣布&#xff1a;TinyEngine v2.2版本正式發布&#xff01;本次更新帶來了重要的功能增強------頁面支持嵌套路由&#…

LSTM長短期記憶網絡-原理分析

1 簡介 概念 LSTM&#xff08;Long Short-Term Memory&#xff09;也稱為長短期記憶網絡&#xff0c;是一種改進的循環神經網絡&#xff08;RNN&#xff09;&#xff0c;專門設計用于解決傳統RNN的梯度消失問題和長程依賴問題。LSTM通過引入門機制和細胞狀態&#xff0c;能夠更…

SQL Server 中遇到的常見問題集

SQL Server 中遇到的常見問題集 問題一&#xff1a; 無法創建關系“FK_Research_Teacher”。 ALTER TABLE 語句與 FOREIGN KEY 約束"FK_Research_Teacher"沖突 解決方法&#xff1a; 外鍵表中的數據主鍵表中是有的&#xff0c;并且不能刪除主外鍵表中數據 1&…

神經網絡中感受野的概念和作用

在神經網絡中&#xff0c;感受野&#xff08;Receptive Field&#xff09;是指某個神經單元&#xff08;神經元或者卷積核&#xff09;關注的輸入特征區域的大小。它決定了神經網絡對輸入數據的特定區域的感知能力。 感受野的形成過程 在卷積神經網絡中&#xff0c;卷積層是感受…