本篇我們將主要介紹如何在 Ubuntu 22.04.2 LTS 環境下,實現一個Kafka+Telegraf+CnosDB 同步實時獲取流數據并存儲的方案。在本次操作中,CnosDB 版本是2.3.0,Kafka 版本是2.5.1,Telegraf 版本是1.27.1?
隨著越來越多的應用程序架構轉向微服務或無服務器結構,應用程序和服務的數量每天都在增加。用戶既可以通過實時聚合,也可以通過輸出為測量或指標的計算,來處理數量不斷增加的時間序列數據。面對產生的海量數據,用戶可以通過多種方式來捕獲和觀察系統中數據的變化,在云原生環境中,最流行的一種是使用事件。
Apache Kafka是一個耐用、高性能的消息系統,也被認為是分布式流處理平臺。它可應用于許多用例,包括消息傳遞、數據集成、日志聚合和指標。而就指標而言,僅有消息主干或代理是不夠的。雖然 Apache Kafka 很耐用,但它并不是為運行指標和監控查詢而設計的。這恰恰正是 CnosDB 的長處。
架構方案
通過將這Kafka、Telegraf和CnosDB 三者結合起來,可以實現數據的完整流程:
- 數據生成:使用傳感器、設備或其他數據源產生數據,并將其發送到Kafka主題。
- Kafka 消息隊列:Kafka 接收并存儲數據流,確保數據安全和可靠性。
- Telegraf 消費者:Telegraf 作為 Kafka 的消費者,訂閱 Kafka 主題并獲取數據流。
- CnosDB 數據存儲:經過預處理的數據由 Telegraf 發送到 CnosDB 中進行時序數據的存儲。
整體的應用程序架構如圖所示:
Kafka
Apache Kafka 是一個開源分布式流處理平臺,它被設計用于處理實時數據流,具有高可靠性、高吞吐量和低延遲的特點,目前已經被大多數公司使用。它的使用方式非常多樣化,包括:
- 流處理:它通過存儲實時事件以進行聚合、豐富和處理來提供事件主干。
- 指標:Apache Kafka 成為許多分布式組件或應用程序(例如微服務)的集中聚合點。這些應用程序可以發送實時指標以供其他平臺使用,包括 CnosDB。
- 數據集成:可以捕獲數據和事件更改并將其發送到 Apache Kafka,任何需要對這些更改采取行動的應用程序都可以使用它們。
- 日志聚合:Apache Kafka 可以充當日志流平臺的消息主干,將日志塊轉換為數據流。
幾個核心概念
- 實例(Broker):Kafka的Broker是Kafka集群中的服務器節點,負責存儲和轉發消息,提供高可用性、容錯性和可靠性。
- 主題(Topic):Apache Kafka 中的 topic ,是邏輯存儲單元,就像關系數據庫的表一樣。主題通過分區通過代理進行分發,提供可擴展性和彈性。
- 生產者(Producer):生產者將消息發布到Kafka的指定主題。生產者可以選擇將消息發送到特定的分區,也可以讓Kafka自動決定分配策略。
- 消費者(Consumer):消費者從指定主題的一個或多個分區中讀取消息。消費者可以以不同的方式進行組織,如單播、多播、消費者組等。
- 發布-訂閱模式:是指生產者將消息發布到一個或多個主題,而消費者可以訂閱一個或多個主題,從中接收并處理消息。
簡單來說就是,當客戶端將數據發送到 Apache Kafka 集群實例時,它必須將其發送到某個主題。
此外,當客戶端從 Apache Kafka 集群讀取數據時,它必須從主題中讀取。向 Apache Kafka 發送數據的客戶端成為生產者,而從 Kafka 集群讀取數據的客戶端則成為消費者。數據流向示意圖如下:
注:這里沒有引入更復雜的概念,如topic分區、偏移量、消費者組等,用戶可自行參考官方指導文檔學習:
Kafka:【https://kafka.apache.org/documentation/#gettingStarted】
部署 Kafka
下載并安裝Kafka【https://kafka.apache.org/】
1.前提:需確保有 JDK 環境和 Zookeeper 環境,如果沒有可以使用下面的命令進行安裝:
sudo apt install openjdk-8-jdk
sudo apt install zookeeper
2.下載 Kafka 安裝包并解壓
wget https://archive.apache.org/dist/kafka/2.5.1/kafka_2.12-2.5.1.tgz
tar -zxvf kafka_2.12-2.5.1.tgz
3.進入解壓后的 Kafka 目錄
cd kafka_2.12-2.5.1
4.修改$KAFKA_HOME/config/server.properties的配置文件(可按需修改端口、日志路徑等配置信息)
5.保存并關閉編輯器。運行下面的命令來啟動Kafka:
bin/kafka-server-start.sh config/server.properties
Kafka 將在后臺運行,并通過默認的 9092 端口監聽連接。
Telegraf
Telegraf 是一個開源的服務器代理程序,用于收集、處理和傳輸系統和應用程序的指標數據。Telegraf 支持多種輸入插件和輸出插件,并且能夠與各種不同類型的系統和服務進行集成。它可以從系統統計、日志文件、API 接口、消息隊列等多個來源采集指標數據,并將其發送到各種目標,如 CnosDB 、Elasticsearch、Kafka、Prometheus 等。這使得 Telegraf 非常靈活,可適應不同的監控和數據處理場景。
- 輕量級:Telegraf被設計為一個輕量級的代理程序,對系統資源的占用相對較小,可以高效運行在各種環境中。
- 插件驅動:Telegraf使用插件來支持各種輸入和輸出功能。它提供了豐富的插件生態系統,涵蓋了眾多的系統和服務。用戶可以根據自己的需求選擇合適的插件來進行指標數據的采集和傳輸。
- 數據處理和轉換:Telegraf具有靈活的數據處理和轉換功能,可以通過插件鏈(Plugin Chain)來對采集到的指標數據進行過濾、處理、轉換和聚合,從而提供更加精確和高級的數據分析。
部署 Telegraf
1.安裝 Telegraf
sudo apt-get update && sudo apt-get install telegraf
2.切換到 Telegraf 的默認配置文件所處目錄 /etc/telegraf 下
3.在配置文件 telegraf.config 中添加目標 OUTPUT PLUGIN
[[outputs.http]]url = "http://127.0.0.1:8902/api/v1/write?db=telegraf"timeout = "5s"method = "POST"username = "root"password = ""data_format = "influx"use_batch_format = truecontent_encoding = "identity"idle_conn_timeout = 10
按需修改的參數:
url:CnosDB 地址和端口
username:連接 CnosDB 的用戶名
password:連接 CnosDB 的用戶名對應的密碼
注:其余參數可與上述配置示例中保持一致
4.在配置文件中將下面的配置注釋放開,可按需修改
[[inputs.kafka_consumer]]
brokers = ["127.0.0.1:9092"]
topics = ["oceanic"]
data_format = "json"
參數:
brokers:Kafka 的 broker list
topics:指定寫入 Kafka 目標的 topic
data_format:寫入數據的格式
注:其余參數可與上述配置示例中保持一致
5.啟動 Telegraf
telegraf -config /etc/telegraf/telegraf.conf
CnosDB
部署 CnosDB
詳細操作請參考: CnosDB 安裝
【https://docs.cnosdb.com/zh/latest/start/install.html】
整合
Kafka創建topic
1.進入 kafka 的 bin 文件夾下
2.執行命令,創建 topic
./kafka-topics.sh --create --topic oceanic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Python 模擬寫入數據到Kakfa
1.編寫代碼:
import time
import json
import randomfrom kafka import KafkaProducerdef random_pressure():return round(random.uniform(0, 10), 1)def random_tempreture():return round(random.uniform(0, 100), 1)def random_visibility():return round(random.uniform(0, 100), 1)def get_json_data():data = {}data["pressure"] = random_pressure()data["temperature"] = random_temp_cels()data["visibility"] = random_visibility()return json.dumps(data) def main():producer = KafkaProducer(bootstrap_servers=['ip:9092'])for _ in rang(2000):json_data = get_json_data()producer.send('oceanic', bytes(f'{json_data}','UTF-8'))print(f"Sensor data is sent: {json_data}")time.sleep(5)if __name__ == "__main__":main()
2.運行Python腳本
python3 test.py
查看 kafka topic 中的數據
1.執行下面查看指定 topic 數據的命令
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic oceanic --from-beginning
查看同步到 CnosDB 中的數據
1.使用工具連接到CnosDB
cnosdb-cli
2.切換到指定庫
\c public
3.查看數據
select * from kafka_consumer;
補充閱讀
1.使用 Telegraf 采集數據并寫入 CnosDB:
https://docs.cnosdb.com/zh/latest/versatility/collect/telegraf.html
2.Python 連接器:
https://docs.cnosdb.com/zh/latest/reference/connector/python.html
3.CnosDB 快速開始:
https://docs.cnosdb.com/zh/latest/start/quick_start.html