Centos 7下使用C++使用Rdkafka庫實現生產者消費者

1. 了解 Kafka

Apache Kafka 是一個分布式流處理平臺,核心功能包括:

  • 發布/訂閱消息系統:解耦生產者和消費者

  • 分布式存儲:持久化、容錯的消息存儲

  • 流處理:實時處理數據流

核心概念

概念說明
BrokerKafka 集群中的單個服務器節點
Topic消息的邏輯分類(如:user_activity
PartitionTopic 的分區(并行處理單位),消息按順序存儲
Producer向 Topic 發布消息的客戶端
Consumer訂閱 Topic 并處理消息的客戶端
Consumer Group多個消費者協同消費同一 Topic(每個分區只被組內一個消費者消費)
Offset消息在分區中的唯一位置標識

2. 了解 rdkafka

rdkafka?是 Kafka 的 C/C++ 客戶端庫,提供:

  • 高性能生產/消費 API(支持 C/C++/Python 等)

  • 特性:

    • 異步/同步發送模式

    • 自動負載均衡

    • 消息壓縮(gzip, snappy, lz4)

    • SASL 認證

    • 精確一次語義(EOS)

  • 開源地址:edenhill/librdkafka

3. 代碼實現

以下是使用 librdkafka 的 C++ 接口操作 Kafka 的生產者和消費者完整實現:

生產者代碼 (producer.cpp)
#include <iostream>
#include <string>
#include <sstream>
#include <librdkafka/rdkafkacpp.h>class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:void dr_cb(RdKafka::Message &message) {if (message.err()) {std::cerr << "消息發送失敗: " << message.errstr() << std::endl;} else {std::cout << "消息發送成功: " << message.topic_name() << " [" << message.partition() << "] @ " << message.offset() << std::endl;}}
};int main() {// 1. 創建配置對象RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);std::string errstr;// 2. 設置配置參數if (conf->set("bootstrap.servers", "localhost:9092", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置錯誤: " << errstr << std::endl;return 1;}// 設置消息確認模式 (all = 最高可靠性)if (conf->set("acks", "all", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置錯誤: " << errstr << std::endl;return 1;}// 3. 創建生產者實例ProducerDeliveryReportCb delivery_cb;if (conf->set("dr_cb", &delivery_cb, errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置回調錯誤: " << errstr << std::endl;return 1;}RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);if (!producer) {std::cerr << "創建生產者失敗: " << errstr << std::endl;return 1;}delete conf;// 4. 創建Topic對象RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);RdKafka::Topic *topic = RdKafka::Topic::create(producer,"cpp_test_topic",tconf,errstr);if (!topic) {std::cerr << "創建Topic失敗: " << errstr << std::endl;delete tconf;return 1;}delete tconf;// 5. 生產消息for (int i = 0; i < 10; ++i) {std::string key = "key-" + std::to_string(i);std::string payload = "Message #" + std::to_string(i);// 發送消息RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA, // 自動分區分配RdKafka::Producer::RK_MSG_COPY,const_cast<char*>(payload.c_str()), payload.size(),const_cast<char*>(key.c_str()), key.size(),NULL);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "生產消息失敗: " << RdKafka::err2str(resp) << std::endl;} else {std::cout << "已發送: " << payload << std::endl;}// 處理事件隊列producer->poll(0);}// 6. 等待所有消息完成發送while (producer->outq_len() > 0) {std::cout << "等待發送隊列: " << producer->outq_len() << std::endl;producer->poll(100);}// 7. 清理資源delete topic;delete producer;return 0;
}
消費者代碼 (consumer.cpp)
#include <iostream>
#include <string>
#include <csignal>
#include <vector>
#include <librdkafka/rdkafkacpp.h>bool running = true;void stop(int sig) {running = false;
}class ConsumerEventCb : public RdKafka::EventCb {
public:void event_cb(RdKafka::Event &event) {switch (event.type()) {case RdKafka::Event::EVENT_ERROR:std::cerr << "錯誤: " << RdKafka::err2str(event.err()) << std::endl;break;case RdKafka::Event::EVENT_LOG:std::cout << "日志: " << event.str() << std::endl;break;default:std::cout << "事件: " << event.type() << ": " << event.str() << std::endl;break;}}
};int main() {// 注冊信號處理signal(SIGINT, stop);signal(SIGTERM, stop);// 1. 創建配置對象RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);std::string errstr;// 2. 設置配置參數if (conf->set("bootstrap.servers", "localhost:9092", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置錯誤: " << errstr << std::endl;return 1;}// 設置消費組if (conf->set("group.id", "cpp_consumer_group", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置錯誤: " << errstr << std::endl;return 1;}// 從最早的消息開始消費if (conf->set("auto.offset.reset", "earliest", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置錯誤: " << errstr << std::endl;return 1;}// 3. 設置事件回調ConsumerEventCb event_cb;if (conf->set("event_cb", &event_cb, errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "設置回調失敗: " << errstr << std::endl;return 1;}// 4. 創建消費者實例RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);if (!consumer) {std::cerr << "創建消費者失敗: " << errstr << std::endl;return 1;}delete conf;// 5. 訂閱Topicstd::vector<std::string> topics;topics.push_back("cpp_test_topic");RdKafka::ErrorCode resp = consumer->subscribe(topics);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "訂閱失敗: " << RdKafka::err2str(resp) << std::endl;return 1;}// 6. 消費消息while (running) {// 等待消息 (1000ms超時)RdKafka::Message *msg = consumer->consume(1000);switch (msg->err()) {case RdKafka::ERR__TIMED_OUT:break;  // 超時繼續case RdKafka::ERR_NO_ERROR:// 成功消費到消息std::cout << "收到消息: "<< "主題: " << msg->topic_name() << " | 分區: [" << msg->partition() << "]"<< " | 偏移量: " << msg->offset() << std::endl;if (msg->key()) {std::cout << "鍵: " << *msg->key() << " => ";}std::cout << "值: " << static_cast<const char*>(msg->payload()) << std::endl;break;default:std::cerr << "消費錯誤: " << msg->errstr() << std::endl;break;}// 手動提交偏移量consumer->commitAsync(msg);delete msg;}// 7. 關閉消費者consumer->close();delete consumer;return 0;
}
編譯運行

# 編譯生產者
g++ -o producer producer.cpp -lrdkafka++ -lrdkafka -lpthread -lz -ldl# 編譯消費者
g++ -o consumer consumer.cpp -lrdkafka++ -lrdkafka -lpthread -lz -ldl

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/88461.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/88461.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/88461.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

UE5多人MOBA+GAS 13、添加死亡、復活邏輯以及布娃娃含物理資產的修改調整

文章目錄使用GE為角色添加定時的Tag控制死亡時間1、添加死亡Tag2、創建死亡GE&#xff0c;并完成相關配置3、在AbilitySystemComponent中監聽屬性的變化&#xff0c;調用GE來添加Tag到角色上4、在角色中監聽ASC傳入的Tag以及Tag的層數&#xff0c;來響應不同的函數添加死亡、復…

Jiasou TideFlow重塑AI SEO全鏈路自動化新標桿

引言 在Google日均處理85億次搜索請求的數字化浪潮中&#xff0c;傳統SEO工作流面臨三大致命瓶頸&#xff1a;人工拓詞效率低下、跨部門協作成本高企、數據監控鏈路斷裂。因此諸如Jiasou AI SEO這樣專門為AI SEO而生的Agent就應運而生了。 背景 Jiasou AIGC不僅僅可以批量生成…

CentOs 7 MySql8.0.23之前的版本主從復制

準備倆臺虛擬機并啟動倆臺虛擬機都開啟mysql后查看二進制日志是否開啟先登錄mysqlmysql -u root -r輸入sql命令show variables like %log_bin%;如果log_bin 的value為OFF則是沒有開啟&#xff0c;跟著下面步驟開啟二進制日志退出mysqlexitvim /etc/my.cnf在最底下添加log_binmy…

Leetcode 3607. Power Grid Maintenance

Leetcode 3607. Power Grid Maintenance 1. 解題思路2. 代碼實現 題目鏈接&#xff1a;3607. Power Grid Maintenance 1. 解題思路 這一題思路上首先是一個DSU的思路&#xff0c;將所有的連通網絡計算出來&#xff0c;并對每一個網絡的節點進行歸類。然后我們需要對每一個網…

開源 python 應用 開發(三)python語法介紹

最近有個項目需要做視覺自動化處理的工具&#xff0c;最后選用的軟件為python&#xff0c;剛好這個機會進行系統學習。短時間學習&#xff0c;需要快速開發&#xff0c;所以記錄要點步驟&#xff0c;防止忘記。 鏈接&#xff1a; 開源 python 應用 開發&#xff08;一&#xf…

1-Kafka介紹及常見應用場景

Kafka 介紹 Apache Kafka 是一個開源的 分布式流處理平臺&#xff0c;最初由 LinkedIn 開發&#xff0c;后捐贈給 Apache 軟件基金會。它被設計用于高吞吐量、低延遲、可水平擴展地處理實時數據流。官網地址是&#xff1a;https://kafka.apache.org/ 以下是 Kafka 的核心介紹…

CH9121T電路及配置詳解

目錄1. CH9121T簡介2. 原理圖及接口2.1 參考電路2.2 CH9121T評估板2.3 差分端口2.4 網口燈顯示2.5 晶振2.6 其他接口3. 使用手冊及說明3.1 配置介紹3.2 默認參數3.3 串口波特率3.4 配置指令3.5 應用示例1. CH9121T簡介 CH9121 是一款網絡串口透傳芯片&#xff0c;自帶 10/100M…

科研數據可視化核心技術:基于 AI 與 R 語言的熱圖、火山圖及網絡圖繪制實踐指南

在學術研究競爭日趨激烈的背景下&#xff0c;高質量的數據可視化已成為科研成果呈現與學術傳播的關鍵要素。據統計&#xff0c;超過 60% 的學術稿件拒稿原因與圖表質量存在直接關聯&#xff0c;而傳統繪圖工具在處理組學數據、復雜關聯數據時&#xff0c;普遍存在效率低下、規范…

Windows體驗macOS完整指南

一、虛擬機安裝macOS專業方案1. 環境準備階段硬件檢測&#xff1a;進入BIOS&#xff08;開機時按Del/F2鍵&#xff09;確認開啟VT-x/AMD-V虛擬化選項建議配置&#xff1a;i5十代以上CPU/16GB內存/256GB SSD軟件準備&#xff1a;官網下載VMware Workstation 17 Pro獲取Unlocker補…

【普及/提高?】洛谷P1577 ——切繩子

見&#xff1a;P1577 切繩子 - 洛谷 題目描述 有 N 條繩子&#xff0c;它們的長度分別為 Li?。如果從它們中切割出 K 條長度相同的繩子&#xff0c;這 K 條繩子每條最長能有多長&#xff1f;答案保留到小數點后 2 位(直接舍掉 2 位后的小數)。 輸入格式 第一行兩個整數 N …

imx6ull-裸機學習實驗16——I2C 實驗

目錄 前言 I2C簡介 基本特性?? I2C 協議 起始位 停止位 數據傳輸 應答信號 I2C 寫時序 I2C 讀時序 I.MX6U I2C 簡介 寄存器 地址寄存器I2Cx_IADR(x1~4) 分頻寄存器I2Cx_IFDR 控制寄存器I2Cx_I2CR 狀態寄存器I2Cx_I2SR 數據寄存器I2Cx_I2DR AP3216C 簡介 …

【TCP/IP】5. IP 協議

5. IP 協議5. IP 協議5.1 概述5.2 IP 數據報格式5.3 無連接數據報傳輸5.3.1 首部校驗5.3.2 數據分片與重組5.4 IP 數據報選項5.4.1 選項格式5.4.2 選項類型5.5 IP 模塊的結構本章要點5. IP 協議 5.1 概述 IP 協議是 TCP/IP 協議簇的核心協議&#xff0c;位于網絡層&#xff0…

Linux 服務器挖礦病毒深度處理與防護指南

在 Linux 服務器運維中&#xff0c;挖礦病毒是常見且危害較大的安全威脅。此類病毒通常會隱蔽占用大量 CPU 資源進行加密貨幣挖礦&#xff0c;導致服務器性能驟降、能耗激增&#xff0c;甚至被黑客遠程控制。本文將從病毒特征識別、應急處理流程、深度防護措施三個維度&#xf…

MySQL數據表設計 系統的營銷功能 優惠券、客戶使用優惠券的設計

系統的營銷功能營銷功能概述&#xff1a;系統的營銷功能主要是&#xff1a;市場活動管理、營銷自動化、銷售線索管理以及數據分析和報告等。?ToC?&#xff08;Consumer&#xff09;&#xff1a;面向個人消費者&#xff0c;滿足日常消費需求。?優惠券的種類&#xff1a;ToC的…

讓 3 個線程串行的幾種方式

1、通過join()的方式 子線程調用join()的時候&#xff0c;主線程等待子線程執行完再執行。如果讓多個線程順序執行的話&#xff0c;那么需要他們按順序調用start()。/*** - 第一個迭代&#xff08;i0&#xff09;&#xff1a;* 啟動線程t1 -> 然后調用t1.join()。* …

在 Vue 項目中關閉 ESLint 規則

在 Vue 2 項目中關閉 ESLint 規則有以下幾種方法&#xff0c;根據您的需求選擇合適的方式&#xff1a; 1. 完全禁用 ESLint 修改 vue.config.js&#xff08;推薦&#xff09; module.exports {// 關閉 ESLintlintOnSave: false }或修改 package.json {"scripts": {&…

電腦息屏工具,一鍵黑屏超方便

軟件介紹 今天為大家推薦一款實用的PC端屏幕管理工具——CloseDsp。這款"息屏小能手"能一鍵關閉顯示器&#xff0c;解決各種場景下的屏幕管理需求。 核心功能 CloseDsp最突出的特點是能瞬間關閉顯示器屏幕。只需點擊"關閉顯示器"按鈕&#xff0c;屏幕…

嵌入式調試LOG日志輸出(以STM32為例)

引言在嵌入式系統開發中&#xff0c;調試是貫穿整個生命周期的關鍵環節。與傳統PC端程序不同&#xff0c;嵌入式設備資源受限&#xff08;如內存、存儲、處理器性能&#xff09;&#xff0c;且運行環境復雜&#xff08;無顯示器、鍵盤&#xff09;&#xff0c;傳統的斷點調試或…

Zephyr的設備驅動模型

默認配置默認配置 boards/arm/nucleo_f401re/ ├── nucleo_f401re.dts ← 板卡設備樹主入口 ├── nucleo_f401re_defconfig ← 默認 Kconfig 配置 ├── board.cmake ← CMake 構建入口overlay1.新增加驅動需要修改對應板的設備樹文件&#xf…

Mysql字段沒有索引,通過where x = 3 for update是使用什么級別的鎖

沒有索引時&#xff0c;FOR UPDATE 會鎖住整個表 現在&#xff0c;你正在一本一本地翻看所有書&#xff0c;尋找“維修中”的書&#xff0c;并且你對管理員說&#xff1a;“在我清點和修改完之前&#xff0c;別人不能動這些書&#xff0c;也不能往這個范圍里加新書&#xff01;…