一、簡介
Kafka
是最初由 Linkedin 公司開發,是一個分布式、分區的、多副本的、多訂閱者,基于 zookeeper
協調的分布式日志系統(也可以當做 MQ
系統),常見可以用于 web/nginx
日志、訪問日志,消息服務等等,Linkedin于2010年12月貢獻給了 Apache基金會 并成為頂級開源項目。
應用特性
- 分布式存儲:數據被自動分區并分布在集群的節點中。
- 消息有序性:
Kafka
能確保從生產者傳到消費者的記錄都是有序的。 - 高容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗)。
- 高吞吐量:
Kafka
支持單機每秒至少處理10萬以上消息,通常可以達到數百萬條消息。 - 易擴展性:支持集群熱擴展。
- 高并發:支持數千個客戶端同時讀寫。
- 持久性:支持消息數據持久化到本地磁盤 并支持數據備份和靈活配置數據的持久化時間。
- 實時處理/低延遲:在數據寫入的同時對進行處理,消息延遲最低只有幾毫秒。
應用場景
Kafka
本質是 支持分布式的消息系統/消息中間件
。分析 Kafka
的應用場景等同于分析 消息中級件
的應用場景。通常,使用 消息系統
的 發布/訂閱模型 功能來連接 生產者
和 消費者
。實現以下三大功能:
- 生產者和消費者的解耦
- 消息持久化 / 消息冗余
- 消息緩沖 / 流量消峰
具體應用場景有:
- 日志收集或數據管道:作為日志收集系統或數據處理管道的一部分,以處理大量的日志數據或實時數據流。
- 負載均衡:如果系統收到大量請求或數據流,可以使用消息隊列把這些任務平均分配給多個處理器或服務,從而實現負載均衡。
- 系統解耦:消息隊列經常用作不同服務間的通信機制,以解耦系統的不同部分。
- 分布式事務:如果一個事務需要跨多個服務進行,可以使用消息隊列來協調不同服務之間的通信,確保事務的原子性。
- 實時流數據處理:比如實時日志分析或者實時數據報警。Kafka 能接收實時數據流并保證它的可靠性和持久性,這樣就可以在上游源源不斷生產數據的同時,下游可以實時地進行分析。
- 通知和實時更新:消息隊列可以用作通知的中介,比如告知用戶完成某個任務,或者在后端數據更新時實時通知前端。
設計目標
- 高性能:以時間復雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間的訪問性能。
- 高吞吐率:即使在非常廉價的商用機器上也能做到單機支持每秒100K條消息的傳輸。
- 消息系統:支持Kafka Server間的消息分區,及分布式消費,同時保證每個partition內的消息順序傳輸。
- 橫向擴展:支持在線水平熱擴展
二、kafka安裝和配置
1. zookeeper安裝配置
需要說明一下, 為了支持 Kafka
的集群功能, Zookeeper
必須使用集群模式部署。
本文以部署 3 個Zookeeper 實例的偽集群為例。具體安裝步驟參閱之前的文章:Zookeeper 安裝教程和使用指南
2. kafka安裝配置
下載鏈接:Kafka Downloads
下載頁面中包含兩種下載方式
- : kafka-[version]-src.tgz:包含 Kafka 源碼和API源碼,需要自己編譯
a) 安裝
[root@Ali ~]# wget https://downloads.apache.org/kafka/3.6.2/kafka_2.12-3.6.2.tgz
[root@Ali ~]# tar xzvf kafka_2.12-3.6.2.tgz
[root@Ali ~]# mv /usr/local/kafka_2.12-3.6.2 /usr/local/kafka
b) 配置實例
配置第一個 Kafka
實例
# broker 編號,集群內必須唯一
broker.id=1
# 監聽所有ip的9091端口,PLAINTEXT表示明文傳輸
listeners=PLAINTEXT://:9091
# 相當于listeners=PLAINTEXT://0.0.0.0:9091
# 消息日志存放地址
log.dirs=/usr/local/kafka/logs
# ZooKeeper 地址,多個用,分隔 /kafka指定在zk上的目錄
zookeeper.connect=localhost:12181/kafka,localhost:22181/kafka
配置第二個 Kafka
實例
# broker 編號,集群內必須唯一
broker.id=1
# 監聽所有ip的9092端口,PLAINTEXT表示明文傳輸
listeners=PLAINTEXT://:9092
# 消息日志存放地址
log.dirs=/opt/kafka/logs
# ZooKeeper 地址,多個用,分隔
zookeeper.connect=localhost:12181/kafka,localhost:22181/kafka
注:兩個客戶端的listeners中的port不能一樣
4) 服務管理
# 啟動服務 -daemon 表示后臺啟動
$KAFKA_HOME/bin/kafka-server-start.sh -daemon config/server.properties# 查看服務
jps -l43330 org.apache.zookeeper.server.quorum.QuorumPeerMain14356 org.elasticsearch.bootstrap.Elasticsearch14583 org.logstash.Logstash45976 kafka.Kafka # kafka服務進程netstat -anlpt | grep 9091tcp6 0 0 :::9091 :::* LISTEN 45976/javatcp6 0 0 192.168.18.128:9091 192.168.18.128:49356 TIME_WAIT -# 關閉服務
$KAFKA_HOME/bin/kafka-server-stop.sh
3. 常用操作
1) 創建topic
#兩條命令效果一樣
bin/kafka-topics.sh --create --bootstrap-server localhost:9091 --partitions 2 --replication-factor 2 --topic yumu
bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --partitions 2 --replication-factor 2 --topic yumu
在kafka1上創建一個topic,會自動同步到其他客戶端
--create
表示創建操作--zookeeper
指定了 Kafka 連接的 ZooKeeper--partitions
表示每個主題4個分區--replication-factor
表示創建每個分區創建2個副本(副本因子)--topic
表示主題名稱。
注:副本因子不能超過存活的broker數量,否則報錯:Replication factor: 20 larger than available brokers: xxx.
2) 查看topic
# 查看topic列表 #兩條命令效果一樣
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka __consumer_offsetstopic-demoyumu# 查看topic詳細信息 #兩條命令效果一樣
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic yumu
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic yumu Topic: yumu PartitionCount: 2 ReplicationFactor: 2 Configs:Topic: yumu Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2Topic: yumu Partition: 1 Leader: 1 Replicas: 2,1 Isr: 1,2
3) 測試通信
# 窗口1,啟動生產者,向yumu主題發送消息
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic yumu# 窗口2,啟動消費者,訂閱yumu主題
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic yumu# 窗口3,啟動消費者,訂閱yumu主題
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu=====結果=====
# 生產者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic yumu
>hello, kafka!
>once again.
>
# 消費者1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic yumu
hello, kafka!
once again.# 消費者2
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu
hello, kafka!
once again.# 查看所有消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu --from-beginning# 刪除topic
bin/kafka-topics.sh --delete --bootstrap-server localhost:9091 --topic yumu
三、遇到的問題
1. 第一次啟動kafka成功后,關閉kafka并修改配置,再次啟動失敗,報錯如下:
[2020-11-07 20:43:00,866] INFO Cluster ID = MChFWWMBT9GJClVEriND5A (kafka.server.KafkaServer)
[2020-11-07 20:43:00,873] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID MChFWWMBT9GJClVEriND5A doesn't match stored clusterId Some(c6QPfvqlS6C3gtsYZptQ8Q) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.at kafka.server.KafkaServer.startup(KafkaServer.scala:235)at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)at kafka.Kafka$.main(Kafka.scala:82)at kafka.Kafka.main(Kafka.scala)
[2020-11-07 20:43:00,875] INFO shutting down (kafka.server.KafkaServer)
[2020-11-07 20:43:00,877] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
[2020-11-07 20:43:00,986] INFO Session: 0x1000da0dde2000c closed (org.apache.zookeeper.ZooKeeper)
[2020-11-07 20:43:00,986] INFO EventThread shut down for session: 0x1000da0dde2000c (org.apache.zookeeper.ClientCnxn)
[2020-11-07 20:43:00,987] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
[2020-11-07 20:43:00,992] INFO shut down completed (kafka.server.KafkaServer)[2020-11-07 20:43:00,992] ERROR Exiting Kafka. (kafka.server.KafkaServerStartable)
[2020-11-07 20:43:00,993] INFO shutting down (kafka.server.KafkaServer)
原因:
kafka啟動之后會生成一些日志和配置,導致這個問題的原因是第一次啟動之后生成了log/meta.properties文件
cat meta.properties
#
#Sat Nov 07 21:43:51 CST 2020
broker.id=1
version=0
cluster.id=MChFWWMBT9GJClVEriND5A
第二次改完配置后再去啟動的時候生成應該會生成一個新的id,新的id和舊的ID不一致導致無法啟動,刪除log/meta.properties文件后重新啟動即可(疑問:是不是我關閉的方法不對呢?)
推薦閱讀:
- Kafka介紹
- ELK介紹
- Kafka安裝
- C語言操作kafka以及安裝librdkafka庫
下一篇:Kafka消息系統原理