1. 了解 Kafka
Apache Kafka 是一個分布式流處理平臺,核心功能包括:
- 發布/訂閱消息系統:解耦生產者和消費者
- 分布式存儲:持久化、容錯的消息存儲
- 流處理:實時處理數據流
核心概念:
概念 | 說明 |
---|---|
Broker | Kafka 集群中的單個服務器節點 |
Topic | 消息的邏輯分類(如:user_activity ) |
Partition | Topic 的分區(并行處理單位),消息按順序存儲 |
Producer | 向 Topic 發布消息的客戶端 |
Consumer | 訂閱 Topic 并處理消息的客戶端 |
Consumer Group | 多個消費者協同消費同一 Topic(每個分區只被組內一個消費者消費) |
Offset | 消息在分區中的唯一位置標識 |
2. 了解 rdkafka
rdkafka(librdkafka)是 Kafka 的 C/C++ 客戶端庫,提供:
- 高性能生產/消費 API(支持 C/C++/Python 等)
- 特性:
  - 異步/同步發送模式
  - 自動負載均衡
  - 消息壓縮(gzip, snappy, lz4)
  - SASL 認證
  - 精確一次語義(EOS)
- 開源地址:edenhill/librdkafka
3. 代碼實現
以下是使用 librdkafka 的 C++ 接口操作 Kafka 的生產者和消費者完整實現:
生產者代碼 (producer.cpp)
#include <iostream>
#include <string>
#include <sstream>
#include <librdkafka/rdkafkacpp.h>class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:void dr_cb(RdKafka::Message &message) {if (message.err()) {std::cerr << "消息發送失敗: " << message.errstr() << std::endl;} else {std::cout << "消息發送成功: " << message.topic_name() << " [" << message.partition() << "] @ " << message.offset() << std::endl;}}
};int main() {// 1. 創建配置對象RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);std::string errstr;// 2. 設置配置參數if (conf->set("bootstrap.servers", "localhost:9092", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置錯誤: " << errstr << std::endl;return 1;}// 設置消息確認模式 (all = 最高可靠性)if (conf->set("acks", "all", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置錯誤: " << errstr << std::endl;return 1;}// 3. 創建生產者實例ProducerDeliveryReportCb delivery_cb;if (conf->set("dr_cb", &delivery_cb, errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置回調錯誤: " << errstr << std::endl;return 1;}RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);if (!producer) {std::cerr << "創建生產者失敗: " << errstr << std::endl;return 1;}delete conf;// 4. 創建Topic對象RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);RdKafka::Topic *topic = RdKafka::Topic::create(producer,"cpp_test_topic",tconf,errstr);if (!topic) {std::cerr << "創建Topic失敗: " << errstr << std::endl;delete tconf;return 1;}delete tconf;// 5. 生產消息for (int i = 0; i < 10; ++i) {std::string key = "key-" + std::to_string(i);std::string payload = "Message #" + std::to_string(i);// 發送消息RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA, // 自動分區分配RdKafka::Producer::RK_MSG_COPY,const_cast<char*>(payload.c_str()), payload.size(),const_cast<char*>(key.c_str()), key.size(),NULL);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "生產消息失敗: " << RdKafka::err2str(resp) << std::endl;} else {std::cout << "已發送: " << payload << std::endl;}// 處理事件隊列producer->poll(0);}// 6. 等待所有消息完成發送while (producer->outq_len() > 0) {std::cout << "等待發送隊列: " << producer->outq_len() << std::endl;producer->poll(100);}// 7. 清理資源delete topic;delete producer;return 0;
}
消費者代碼 (consumer.cpp)
#include <iostream>
#include <string>
#include <csignal>
#include <vector>
#include <librdkafka/rdkafkacpp.h>bool running = true;void stop(int sig) {running = false;
}class ConsumerEventCb : public RdKafka::EventCb {
public:void event_cb(RdKafka::Event &event) {switch (event.type()) {case RdKafka::Event::EVENT_ERROR:std::cerr << "錯誤: " << RdKafka::err2str(event.err()) << std::endl;break;case RdKafka::Event::EVENT_LOG:std::cout << "日志: " << event.str() << std::endl;break;default:std::cout << "事件: " << event.type() << ": " << event.str() << std::endl;break;}}
};int main() {// 注冊信號處理signal(SIGINT, stop);signal(SIGTERM, stop);// 1. 創建配置對象RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);std::string errstr;// 2. 設置配置參數if (conf->set("bootstrap.servers", "localhost:9092", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置錯誤: " << errstr << std::endl;return 1;}// 設置消費組if (conf->set("group.id", "cpp_consumer_group", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置錯誤: " << errstr << std::endl;return 1;}// 從最早的消息開始消費if (conf->set("auto.offset.reset", "earliest", errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "配置錯誤: " << errstr << std::endl;return 1;}// 3. 設置事件回調ConsumerEventCb event_cb;if (conf->set("event_cb", &event_cb, errstr) != RdKafka::Conf::CONF_OK) {std::cerr << "設置回調失敗: " << errstr << std::endl;return 1;}// 4. 創建消費者實例RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);if (!consumer) {std::cerr << "創建消費者失敗: " << errstr << std::endl;return 1;}delete conf;// 5. 訂閱Topicstd::vector<std::string> topics;topics.push_back("cpp_test_topic");RdKafka::ErrorCode resp = consumer->subscribe(topics);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "訂閱失敗: " << RdKafka::err2str(resp) << std::endl;return 1;}// 6. 消費消息while (running) {// 等待消息 (1000ms超時)RdKafka::Message *msg = consumer->consume(1000);switch (msg->err()) {case RdKafka::ERR__TIMED_OUT:break; // 超時繼續case RdKafka::ERR_NO_ERROR:// 成功消費到消息std::cout << "收到消息: "<< "主題: " << msg->topic_name() << " | 分區: [" << msg->partition() << "]"<< " | 偏移量: " << msg->offset() << std::endl;if (msg->key()) {std::cout << "鍵: " << *msg->key() << " => ";}std::cout << "值: " << static_cast<const char*>(msg->payload()) << std::endl;break;default:std::cerr << "消費錯誤: " << msg->errstr() << std::endl;break;}// 手動提交偏移量consumer->commitAsync(msg);delete msg;}// 7. 關閉消費者consumer->close();delete consumer;return 0;
}
編譯運行
# 編譯生產者
g++ -o producer producer.cpp -lrdkafka++ -lrdkafka -lpthread -lz -ldl
# 編譯消費者
g++ -o consumer consumer.cpp -lrdkafka++ -lrdkafka -lpthread -lz -ldl