Kafka中的Topic

在Kafka中,Topic是消息的邏輯容器,用于組織和分類消息。本文將深入探討Kafka Topic的各個方面,包括創建、配置、生產者和消費者,以及一些實際應用中的示例代碼。

1. 介紹

在Kafka中,Topic是消息的邏輯通道,生產者將消息發布到Topic,而消費者從Topic訂閱消息。每個Topic可以有多個分區(Partitions),每個分區可以在不同的服務器上,以實現橫向擴展。

2. 創建和配置Topic

2.1 創建Topic

使用Kafka提供的命令行工具(kafka-topics.sh)或Kafka的API來創建Topic。下面是一個使用命令行工具創建Topic的示例:

bin/kafka-topics.sh --create --topic my_topic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092

這將創建一個名為my_topic的Topic,有3個分區,復制因子為2。

2.2 配置Topic

Kafka的Topic有各種配置選項,可以通過修改Topic的屬性來滿足不同的需求。例如,可以設置消息保留時間、清理策略等。以下是一個配置Topic屬性的示例:

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my_topic --alter --add-config max.message.bytes=1048576

這將修改my_topic的配置,將最大消息字節數設置為1 MB。

3. 生產者和消費者

3.1 生產者

生產者負責將消息發布到Topic。使用Kafka的Producer API,可以輕松地創建一個生產者。以下是一個簡單的Java示例代碼:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(properties);producer.send(new ProducerRecord<>("my_topic", "key1", "value1"));
producer.close();

3.2 消費者

消費者從Topic中讀取消息。Kafka的Consumer API提供了強大而靈活的方式來實現消費者。

以下是一個簡單的Java示例代碼:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "my_group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("my_topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());}
}

4. 實際應用示例

4.1 實時日志處理

在實時日志處理的場景中,Kafka的Topic可以按照日志類型進行劃分,每個Topic代表一種日志類型。這樣的設計可以使得系統更具可維護性、可擴展性,并且允許不同類型的日志通過獨立的消費者進行處理。以下是一個更詳細的示例代碼,展示如何在實時日志處理中使用Kafka Topic:

4.1.1 創建日志類型Topic

首先,為不同的日志類型創建各自的Topic。以錯誤日志和訪問日志為例:

# 創建錯誤日志Topic
bin/kafka-topics.sh --create --topic error_logs --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092# 創建訪問日志Topic
bin/kafka-topics.sh --create --topic access_logs --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
4.1.2 生產者發布日志消息

在應用中,生成錯誤日志和訪問日志的代碼可能如下:

// 錯誤日志生產者
Producer<String, String> errorLogProducer = new KafkaProducer<>(errorLogProperties);
errorLogProducer.send(new ProducerRecord<>("error_logs", "Error message"));// 訪問日志生產者
Producer<String, String> accessLogProducer = new KafkaProducer<>(accessLogProperties);
accessLogProducer.send(new ProducerRecord<>("access_logs", "Access log message"));
4.1.3 消費者實時處理日志

創建獨立的消費者來處理錯誤日志和訪問日志:

// 錯誤日志消費者
Consumer<String, String> errorLogConsumer = new KafkaConsumer<>(errorLogProperties);
errorLogConsumer.subscribe(Collections.singletonList("error_logs"));while (true) {ConsumerRecords<String, String> records = errorLogConsumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 處理錯誤日志System.out.printf("Error Log - Offset = %d, Value = %s%n", record.offset(), record.value());}
}// 訪問日志消費者
Consumer<String, String> accessLogConsumer = new KafkaConsumer<>(accessLogProperties);
accessLogConsumer.subscribe(Collections.singletonList("access_logs"));while (true) {ConsumerRecords<String, String> records = accessLogConsumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 處理訪問日志System.out.printf("Access Log - Offset = %d, Value = %s%n", record.offset(), record.value());}
}
4.1.4 實時監控和分析

消費者可以通過實時處理日志來進行監控和分析。例如,可以使用流處理框架(如Kafka Streams)對日志進行聚合、過濾或轉換。以下是一個簡化的示例:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> errorLogsStream = builder.stream("error_logs");
KStream<String, String> accessLogsStream = builder.stream("access_logs");// 在這里進行實時處理,如聚合、過濾等// 通過輸出Topic將處理結果發送到下游系統
errorLogsStream.to("processed_error_logs");
accessLogsStream.to("processed_access_logs");KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

通過這種設計,可以根據實際需要擴展不同類型的日志處理,同時確保系統具有高度的靈活性和可擴展性。在實際應用中,可能需要更詳細的配置和處理邏輯,以滿足具體的監控和分析需求。

4.2 事件溯源

在事件驅動的架構中,事件溯源是一種強大的方式,通過創建一個專門的Kafka Topic來記錄每個業務事件的發生,以便隨時追蹤和回溯整個系統的狀態。以下是一個基于Kafka的事件溯源的詳細示例代碼:

4.2.1 創建事件Topic

首先,為每個關鍵的業務事件創建一個專用的Kafka Topic,例如order_createdorder_shipped等:

# 創建訂單創建事件Topic
bin/kafka-topics.sh --create --topic order_created --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092# 創建訂單發貨事件Topic
bin/kafka-topics.sh --create --topic order_shipped --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
4.2.2 發布業務事件

在應用中,當業務事件發生時,將事件發布到相應的Topic。以下是一個訂單創建事件和訂單發貨事件的示例:

// 訂單創建事件生產者
Producer<String, String> orderCreatedProducer = new KafkaProducer<>(orderCreatedProperties);
orderCreatedProducer.send(new ProducerRecord<>("order_created", "order_id", "Order created - Order ID: 123"));// 訂單發貨事件生產者
Producer<String, String> orderShippedProducer = new KafkaProducer<>(orderShippedProperties);
orderShippedProducer.send(new ProducerRecord<>("order_shipped", "order_id", "Order shipped - Order ID: 123"));
4.2.3 事件溯源消費者

為了實現事件溯源,我們需要一個專用的消費者來訂閱所有的事件Topic,并將事件記錄到一個持久化存儲中(如數據庫、日志文件等):

// 事件溯源消費者
Consumer<String, String> eventTraceConsumer = new KafkaConsumer<>(eventTraceProperties);
eventTraceConsumer.subscribe(Arrays.asList("order_created", "order_shipped"));while (true) {ConsumerRecords<String, String> records = eventTraceConsumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 處理事件,可以將事件記錄到數據庫或日志文件中System.out.printf("Event Trace - Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());// 持久化處理邏輯}
}
4.2.4 事件回溯和分析

通過上述設置,可以在任何時候回溯系統中的每個事件,了解事件的發生時間、順序和內容。通過將事件存儲到持久化存儲中,可以建立一個事件溯源系統,支持系統狀態的分析、回滾和審計。

還可以使用流處理來實時分析事件,例如計算每個訂單的處理時間、統計每個事件類型的發生頻率等。以下是一個簡單的流處理示例:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> eventStream = builder.stream(Arrays.asList("order_created", "order_shipped"));// 在這里進行實時處理,如計算處理時間、統計頻率等// 通過輸出Topic將處理結果發送到下游系統
eventStream.to("processed_events");KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

通過這種方式,可以在事件溯源系統中實現強大的監控、分析和管理功能,提高系統的可觀察性和可維護性。

5. 消息處理語義

Kafka支持不同的消息處理語義,包括最多一次、最少一次和正好一次。這些語義由消費者的配置決定,可以根據應用的要求進行選擇。以下是一個使用最多一次語義的消費者示例代碼:

properties.put("enable.auto.commit", "false"); // 禁用自動提交偏移量
properties.put("auto.offset.reset", "earliest"); // 設置偏移量重置策略為最早Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("my_topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 處理消息System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());}consumer.commitSync(); // 手動提交偏移量}
} finally {consumer.close();
}

6. 安全性和權限控制

Kafka提供了安全性特性,包括SSL加密、SASL認證等。在生產環境中,確保適當的安全性設置是至關重要的。

以下是一個使用SSL連接的生產者示例:

properties.put("security.protocol", "SSL");
properties.put("ssl.truststore.location", "/path/to/truststore");
properties.put("ssl.truststore.password", "truststore_password");Producer<String, String> producer = new KafkaProducer<>(properties);

7. 故障容忍和可伸縮性

7.1 多節點分布和分區

在Kafka中,分布式的設計允許數據分布在多個節點上,這提供了高度的可伸縮性。每個Topic可以分成多個分區,而這些分區可以分布在不同的服務器上。這種分布式設計使得Kafka可以輕松地處理大規模數據,并實現水平擴展。

7.1.1 增加分區數

要增加Topic的分區數,可以使用以下命令:

bin/kafka-topics.sh --alter --topic my_topic --partitions 5 --bootstrap-server localhost:9092

這將把my_topic的分區數增加到5,從而提高系統的吞吐量和可伸縮性。

7.2 復制因子

Kafka通過數據的復制來實現容錯性。每個分區可以有多個副本,這些副本分布在不同的節點上。在節點發生故障時,其他副本可以繼續提供服務。

7.2.1 增加復制因子

要增加Topic的復制因子,可以使用以下命令:

bin/kafka-topics.sh --alter --topic my_topic --partitions 3 --replication-factor 3 --bootstrap-server localhost:9092

這將把my_topic的復制因子增加到3,確保每個分區有3個副本。增加復制因子提高了系統的容錯性,因為每個分區都有多個副本,即使一個節點發生故障,其他節點上的副本仍然可用。

7.3 節點故障處理

Kafka能夠處理節點故障,確保系統的可用性。當一個節點發生故障時,Kafka會自動將該節點上的分區重新分配到其他可用節點上,以保持分區的復制因子。

7.3.1 節點故障模擬

為了模擬節點故障,你可以通過停止一個Kafka broker進程來模擬。Kafka會自動感知到該節點的故障,并進行分區的重新分配。

# 停止一個Kafka broker進程
bin/kafka-server-stop.sh config/server-1.properties

7.4 性能調優

在實際應用中,通過監控系統的性能指標,你可以調整Kafka的配置以滿足不同的性能需求。例如,調整日志刷寫頻率、調整內存和磁盤的配置等,都可以對系統的性能產生影響。

總結

Kafka的Topic是構建實時流數據處理系統的核心組件之一。通過深入了解Topic的創建、配置、生產者和消費者,以及實際應用中的示例代碼,可以更好地理解和應用Kafka。在實際項目中,根據具體需求和場景進行靈活配置,以確保系統的可靠性、性能和安全性。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/206703.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/206703.shtml
英文地址,請注明出處:http://en.pswp.cn/news/206703.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【華為數據之道學習筆記】3-2 基礎數據治理

基礎數據用于對其他數據進行分類&#xff0c;在業界也稱作參考數據。基礎數據通常是靜態的&#xff08;如國家、幣種&#xff09;&#xff0c;一般在業務事件發生之前就已經預先定義。它的可選值數量有限&#xff0c;可以用作業務或IT的開關和判斷條件。當基礎數據的取值發生變…

GSAP動畫庫,探究蘋果官網頁面滾動動畫是如何實現的

GSAP動畫庫&#xff0c;探究蘋果官網頁面滾動動畫是如何實現的 前言 每次瀏覽蘋果官網時都在好奇&#xff0c;當我們向下滾動頁面時一個個文字或圖片總能緩緩浮現&#xff0c;往上滾動時又能慢慢收起來&#xff0c;這就究竟是如是實現的呢。在查閱一些資料時發現了Scrollmagi…

基于OpenCV+CNN+IOT+微信小程序智能果實采摘指導系統——深度學習算法應用(含pytho、JS工程源碼)+數據集+模型(五)

目錄 前言總體設計系統整體結構圖系統流程圖 運行環境Python環境TensorFlow 環境Jupyter Notebook環境Pycharm 環境微信開發者工具OneNET云平臺 模塊實現1. 數據預處理2. 創建模型并編譯3. 模型訓練及保存4. 上傳結果5. 小程序開發1&#xff09;查詢圖片2&#xff09;查詢識別結…

paypal貝寶怎么綁卡支付

一、PayPal是什么 PayPal是一個很多國家地區通用的支付渠道&#xff0c;我們可以把它理解為一項在線服務&#xff0c;相當于美國版的支付寶。你可以通過PayPal進行匯款和收款&#xff0c;相比傳統的電匯和西聯那類的匯款方式&#xff0c;PayPal更加簡單和容易&#xff0c;被很…

利用proteus實現串口助手和arduino Mega 2560的串口通信

本例用到的proteus版本為8.13&#xff0c;ardunio IDE版本為2.2.1&#xff0c;虛擬串口vspd版本為7.2&#xff0c;串口助手SSCOM V5.13.1。軟件的下載安裝有很多教程&#xff0c;大家可以自行搜索&#xff0c;本文只介紹如何利用這4種軟件在proteus中實現arduino Mega 2560的串…

Day45| 爬樓梯 (進階)Leetcode 322. 零錢兌換 Leetcode 279. 完全平方數

爬樓梯 &#xff08;進階&#xff09; 題目鏈接 爬樓梯&#xff08;進階版&#xff09; 本題目屬于排列中的背包問題&#xff0c;所以先遍歷背包&#xff0c;后遍歷物品&#xff0c;剩下的就是完全背包的板子了&#xff0c;下面直接上代碼&#xff1a; #include<iostream…

刷題記錄--算法--簡單

第一題 2582. 遞枕頭 已解答 簡單 相關標簽 相關企業 提示 n 個人站成一排&#xff0c;按從 1 到 n 編號。 最初&#xff0c;排在隊首的第一個人拿著一個枕頭。每秒鐘&#xff0c;拿著枕頭的人會將枕頭傳遞給隊伍中的下一個人。一旦枕頭到達隊首或隊尾&#xff0c;傳遞…

高防IP是什么?有什么優勢?

隨著互聯網的普及和快速發展&#xff0c;網絡安全問題日益突出。在眾多安全問題中&#xff0c;DDOS攻擊是一種常見的攻擊手段&#xff0c;它通過發送大量的無效或低效請求&#xff0c;使得目標服務器無法響應正常用戶的請求&#xff0c;從而造成服務不可用的情況。為了解決這個…

部署zabbix

源碼下載地址&#xff1a; Download Zabbix sources nginx: download 防火墻和selinux都需要關閉 1、部署監控服務器 1&#xff09;安裝LNMP環境 Zabbix監控管理控制臺需要通過Web頁面展示出來&#xff0c;并且還需要使用MySQL來存儲數據&#xff0c;因此需要先為Zabbix準備基礎…

vue的el

類型&#xff1a;string | Element 限制&#xff1a; 只在用 new 創建實例時生效。 詳細&#xff1a; 提供一個在頁面上已存在的 DOM 元素作為 Vue 實例的掛載目標。可以是 CSS 選擇器&#xff0c;也可以是一個 HTMLElement 實例。 在實例掛載之后&#xff0c;元素可以用 vm.…

Java創建線程有哪幾種方式?

Java創建線程有哪幾種方式&#xff1f; 在 Java 中&#xff0c;創建線程有多種方式&#xff0c;主要包括使用 Thread 類和實現 Runnable 接口。以下是幾種常見的創建線程的方式&#xff1a; 繼承 Thread 類&#xff1a; 通過繼承 Thread 類并重寫 run 方法來創建線程。 class …

如何使用eXtplorer+cpolar內網穿透搭建個人云存儲實現公網訪問

文章目錄 1. 前言2. eXtplorer網站搭建2.1 eXtplorer下載和安裝2.2 eXtplorer網頁測試2.3 cpolar的安裝和注冊 3.本地網頁發布3.1.Cpolar云端設置3.2.Cpolar本地設置 4.公網訪問測試5.結語 1. 前言 通過互聯網傳輸文件&#xff0c;是互聯網最重要的應用之一&#xff0c;無論是…

關于互聯網安全方面需要了解的一些知識

關于互聯網安全方面需要了解的一些知識 文章目錄 關于互聯網安全方面需要了解的一些知識一、資產掃描二、漏洞掃描三、滲透測試四、POC五、Exp六、代碼規范七、函數命名八、注釋怎么寫 一、資產掃描 資產掃描是一種通過掃描網絡或系統中所有設備、應用程序和服務&#xff0c;識…

PHP escapeshellarg()+escapeshellcmd()繞過

文章目錄 函數利用escapeshellarg()函數escapeshellcmd()函數 exp執行原理攻擊面例題 [BUUCTF 2018]Online Tool例題 [網鼎杯 2020 朱雀組]Nmap 函數利用 escapeshellarg()函數 單引號 ()&#xff1a;轉義為 \。 雙引號 (")&#xff1a;轉義為 \"。 反斜杠 (\)&…

HTTP不同場景下的通信過程和用戶上網認證過程分析

目錄 HTTP不同場景的通信過程 HTTP正常交互過程 HTTP透明加速傳輸過程 HTTP代理服務器場景下交互過程 通過AC對上網用戶不同場景的認證過程 AC上網認證正常交互過程 通過Cookie實現免認證交互過程 代理服務器場景下HTTP密碼認證交互過程 HTTP不同場景的通信過程 HTTP、…

專業130+總分400+云南大學通信847專業基礎綜考研經驗(原專業課827)

今年專業130總分400云南大學通信上岸&#xff0c;整體考研感覺還是比較滿意&#xff0c;期間也付出了很多心血&#xff0c;走過彎路&#xff0c;下面分享一下這一年考研得失&#xff0c;希望大家可以從中有所借鑒。 先說明我在考研報名前更換成云南大學的理由&#xff1a;&…

谷歌正式發布最強 AI 模型 Gemini

2023年12月6日&#xff0c;谷歌公司宣布推出其被認為是規模最大、功能最強大的人工智能模型 Gemini。 Gemini將分為三個不同的套件&#xff1a;Gemini Ultra、Gemini Pro和Gemini Nano。 Gemini Ultra被認為具備最強大的能力&#xff0c;Gemini Pro則可擴展至多任務&#x…

xilinx原語詳解及仿真——ODDR

ODDR位于OLOGIC中&#xff0c;可以把單沿傳輸的數據轉換為雙沿傳輸的數據&#xff0c; 在講解ODDR功能之前&#xff0c;需要先了解OLOGIC的結構及功能。 1、OLOGIC OLOGIC塊位于IOB的內側&#xff0c;FPGA內部信號想要輸出到管腳&#xff0c;都必須經過OLOGIC。OLOGIC資源的類…

CleanMyMac4.16中文最新版本下載

當很多人還在為電腦運行緩慢、工作問題不能快速得到解決而煩惱的時候&#xff0c;我已經使用過了多款系統清理工具&#xff0c;并找到了最適合我的那一款。我的電腦是超耐用的Mac book&#xff0c;接下來給大家介紹三種在眾多蘋果電腦清理軟件的排名較高的軟件。 一、Maintena…

【ET8】0.ET8入門-ET框架介紹

ET8 新特性 多線程多進程架構,架構更加靈活強大&#xff0c;多線程設計詳細內容請看多線程設計課程抽象出纖程(Fiber)的概念&#xff0c;類似erlang的進程&#xff0c;非常輕松的創建多個纖程&#xff0c;利用多核&#xff0c;仍然是單線程開發的體驗纖程調度: 主線程&#xf…