高效數據傳輸:輕松上手將Kafka實時數據接入CnosDB

本篇我們將主要介紹如何在 Ubuntu 22.04.2 LTS 環境下,實現一個Kafka+Telegraf+CnosDB 同步實時獲取流數據并存儲的方案。在本次操作中,CnosDB 版本是2.3.0,Kafka 版本是2.5.1,Telegraf 版本是1.27.1?

隨著越來越多的應用程序架構轉向微服務或無服務器結構,應用程序和服務的數量每天都在增加。用戶既可以通過實時聚合,也可以通過輸出為測量或指標的計算,來處理數量不斷增加的時間序列數據。面對產生的海量數據,用戶可以通過多種方式來捕獲和觀察系統中數據的變化,在云原生環境中,最流行的一種是使用事件。

Apache Kafka是一個耐用、高性能的消息系統,也被認為是分布式流處理平臺。它可應用于許多用例,包括消息傳遞、數據集成、日志聚合和指標。而就指標而言,僅有消息主干或代理是不夠的。雖然 Apache Kafka 很耐用,但它并不是為運行指標和監控查詢而設計的。這恰恰正是 CnosDB 的長處。

架構方案

通過將這Kafka、Telegraf和CnosDB 三者結合起來,可以實現數據的完整流程:

  1. 數據生成:使用傳感器、設備或其他數據源產生數據,并將其發送到Kafka主題。
  2. Kafka 消息隊列:Kafka 接收并存儲數據流,確保數據安全和可靠性。
  3. Telegraf 消費者:Telegraf 作為 Kafka 的消費者,訂閱 Kafka 主題并獲取數據流。
  4. CnosDB 數據存儲:經過預處理的數據由 Telegraf 發送到 CnosDB 中進行時序數據的存儲。

整體的應用程序架構如圖所示:

圖片

Kafka

Apache Kafka 是一個開源分布式流處理平臺,它被設計用于處理實時數據流,具有高可靠性、高吞吐量和低延遲的特點,目前已經被大多數公司使用。它的使用方式非常多樣化,包括:

  • 流處理:它通過存儲實時事件以進行聚合、豐富和處理來提供事件主干。
  • 指標:Apache Kafka 成為許多分布式組件或應用程序(例如微服務)的集中聚合點。這些應用程序可以發送實時指標以供其他平臺使用,包括 CnosDB。
  • 數據集成:可以捕獲數據和事件更改并將其發送到 Apache Kafka,任何需要對這些更改采取行動的應用程序都可以使用它們。
  • 日志聚合:Apache Kafka 可以充當日志流平臺的消息主干,將日志塊轉換為數據流。

幾個核心概念

  1. 實例(Broker):Kafka的Broker是Kafka集群中的服務器節點,負責存儲和轉發消息,提供高可用性、容錯性和可靠性。
  2. 主題(Topic):Apache Kafka 中的 topic ,是邏輯存儲單元,就像關系數據庫的表一樣。主題通過分區通過代理進行分發,提供可擴展性和彈性。
  3. 生產者(Producer):生產者將消息發布到Kafka的指定主題。生產者可以選擇將消息發送到特定的分區,也可以讓Kafka自動決定分配策略。
  4. 消費者(Consumer):消費者從指定主題的一個或多個分區中讀取消息。消費者可以以不同的方式進行組織,如單播、多播、消費者組等。
  5. 發布-訂閱模式:是指生產者將消息發布到一個或多個主題,而消費者可以訂閱一個或多個主題,從中接收并處理消息。

簡單來說就是,當客戶端將數據發送到 Apache Kafka 集群實例時,它必須將其發送到某個主題。

此外,當客戶端從 Apache Kafka 集群讀取數據時,它必須從主題中讀取。向 Apache Kafka 發送數據的客戶端成為生產者,而從 Kafka 集群讀取數據的客戶端則成為消費者。數據流向示意圖如下:

圖片

注:這里沒有引入更復雜的概念,如topic分區、偏移量、消費者組等,用戶可自行參考官方指導文檔學習:

Kafka:【https://kafka.apache.org/documentation/#gettingStarted】

部署 Kafka

下載并安裝Kafka【https://kafka.apache.org/】

1.前提:需確保有 JDK 環境和 Zookeeper 環境,如果沒有可以使用下面的命令進行安裝:

sudo apt install openjdk-8-jdk
sudo apt install zookeeper

2.下載 Kafka 安裝包并解壓

wget https://archive.apache.org/dist/kafka/2.5.1/kafka_2.12-2.5.1.tgz
tar -zxvf kafka_2.12-2.5.1.tgz

3.進入解壓后的 Kafka 目錄

cd  kafka_2.12-2.5.1

4.修改$KAFKA_HOME/config/server.properties的配置文件(可按需修改端口、日志路徑等配置信息)

5.保存并關閉編輯器。運行下面的命令來啟動Kafka:

bin/kafka-server-start.sh config/server.properties

Kafka 將在后臺運行,并通過默認的 9092 端口監聽連接。

Telegraf

Telegraf 是一個開源的服務器代理程序,用于收集、處理和傳輸系統和應用程序的指標數據。Telegraf 支持多種輸入插件和輸出插件,并且能夠與各種不同類型的系統和服務進行集成。它可以從系統統計、日志文件、API 接口、消息隊列等多個來源采集指標數據,并將其發送到各種目標,如 CnosDB 、Elasticsearch、Kafka、Prometheus 等。這使得 Telegraf 非常靈活,可適應不同的監控和數據處理場景。

  • 輕量級:Telegraf被設計為一個輕量級的代理程序,對系統資源的占用相對較小,可以高效運行在各種環境中。
  • 插件驅動:Telegraf使用插件來支持各種輸入和輸出功能。它提供了豐富的插件生態系統,涵蓋了眾多的系統和服務。用戶可以根據自己的需求選擇合適的插件來進行指標數據的采集和傳輸。
  • 數據處理和轉換:Telegraf具有靈活的數據處理和轉換功能,可以通過插件鏈(Plugin Chain)來對采集到的指標數據進行過濾、處理、轉換和聚合,從而提供更加精確和高級的數據分析。

部署 Telegraf

1.安裝 Telegraf

sudo apt-get update && sudo apt-get install telegraf

2.切換到 Telegraf 的默認配置文件所處目錄 /etc/telegraf 下

3.在配置文件 telegraf.config 中添加目標 OUTPUT PLUGIN

[[outputs.http]]url = "http://127.0.0.1:8902/api/v1/write?db=telegraf"timeout = "5s"method = "POST"username = "root"password = ""data_format = "influx"use_batch_format = truecontent_encoding = "identity"idle_conn_timeout = 10

按需修改的參數:

url:CnosDB 地址和端口
username:連接 CnosDB 的用戶名
password:連接 CnosDB 的用戶名對應的密碼

注:其余參數可與上述配置示例中保持一致

4.在配置文件中將下面的配置注釋放開,可按需修改

[[inputs.kafka_consumer]]
brokers = ["127.0.0.1:9092"]
topics = ["oceanic"]
data_format = "json"

參數:

brokers:Kafka 的 broker list 
topics:指定寫入 Kafka 目標的 topic
data_format:寫入數據的格式

注:其余參數可與上述配置示例中保持一致

5.啟動 Telegraf

telegraf -config /etc/telegraf/telegraf.conf

CnosDB

部署 CnosDB

詳細操作請參考: CnosDB 安裝

【https://docs.cnosdb.com/zh/latest/start/install.html】

整合

Kafka創建topic

1.進入 kafka 的 bin 文件夾下

2.執行命令,創建 topic

./kafka-topics.sh --create --topic oceanic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Python 模擬寫入數據到Kakfa

1.編寫代碼:

import time
import json
import randomfrom kafka import KafkaProducerdef random_pressure():return round(random.uniform(0, 10), 1)def random_tempreture():return round(random.uniform(0, 100), 1)def random_visibility():return round(random.uniform(0, 100), 1)def get_json_data():data = {}data["pressure"] = random_pressure()data["temperature"] = random_temp_cels()data["visibility"] = random_visibility()return json.dumps(data) def main():producer = KafkaProducer(bootstrap_servers=['ip:9092'])for _ in rang(2000):json_data = get_json_data()producer.send('oceanic', bytes(f'{json_data}','UTF-8'))print(f"Sensor data is sent: {json_data}")time.sleep(5)if __name__ == "__main__":main()

2.運行Python腳本

python3 test.py

查看 kafka topic 中的數據

1.執行下面查看指定 topic 數據的命令

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic oceanic --from-beginning

圖片

查看同步到 CnosDB 中的數據

1.使用工具連接到CnosDB

cnosdb-cli

2.切換到指定庫

\c public

3.查看數據

select * from kafka_consumer;

圖片

補充閱讀

1.使用 Telegraf 采集數據并寫入 CnosDB:

https://docs.cnosdb.com/zh/latest/versatility/collect/telegraf.html

2.Python 連接器:

https://docs.cnosdb.com/zh/latest/reference/connector/python.html

3.CnosDB 快速開始:

https://docs.cnosdb.com/zh/latest/start/quick_start.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/35078.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/35078.shtml
英文地址,請注明出處:http://en.pswp.cn/news/35078.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

無涯教程-Perl - redo函數

描述 此函數將重新啟動當前循環,而不會強制判斷控制語句。塊中不再執行任何語句。如果存在繼續塊,將不會執行。如果指定了LABEL,則在LABEL標識的循環開始時重新開始執行。 語法 以下是此函數的簡單語法- redo LABELredo返回值 此函數不返回任何值。 例 以下是顯示其基本…

用友時空KSOA SQL注入漏洞復現(HW0day)

0x01 產品簡介 用友時空KSOA是建立在SOA理念指導下研發的新一代產品,是根據流通企業最前沿的I需求推出的統一的IT基礎架構,它可以讓流通企業各個時期建立的IT系統之間彼此輕松對話,幫助流通企業保護原有的IT投資,簡化IT管理&#…

以商業大數據技術助力數據合規流通體系建立,合合信息參編《數據經紀從業人員評價規范》團標

經國務院批準,由北京市人民政府、國家發展和改革委員會、工業和信息化部、商務部、國家互聯網信息辦公室、中國科學技術協會共同主辦的2023 全球數字經濟大會于近期隆重召開。由數交數據經紀(深圳)有限公司為主要發起單位,合合信息…

深度剖析堆棧指針

為什么打印root的值與&root->value的值是一樣的呢 測試結果: *號一個變量到底取出來的是什么? 以前我寫過一句話,就是說,如果看到一個*變量,那就是直逼這個變量所保存的內存地址,然后取出里面保存的…

Skeleton-Aware Networks for Deep Motion Retargeting

Skeleton-Aware Networks for Deep Motion Retargeting解析 摘要1. 簡介2. Related Work2.1 運動重定向(Motion Retargeting)2.2 Neural Motion Processing 3. 概述(Overview)4. 骨骼感知深度運動處理4.1 運動表征4.2 骨架卷積4.3…

Spring Boot + Vue3前后端分離實戰wiki知識庫系統<十二>--用戶管理單點登錄開發一

目標: 在上一次Spring Boot Vue3前后端分離實戰wiki知識庫系統<十一>--文檔管理功能開發三我們已經完成了文檔管理的功能模塊開發,接下來則開啟新模塊的學習---用戶登錄,這塊還是有不少知識點值得學習的,…

指針與引用:C語言中的內存魔法

開始本篇文章之前先推薦一個好用的學習工具,AIRIght,借助于AI助手工具,學習事半功倍。歡迎訪問:http://airight.fun/。 也把我學習過程中搜集的資料分享給大家,希望可以幫助大家少走彎路,鏈接:h…

機器人CPP編程基礎-02變量Variables

機器人CPP編程基礎-01第一個程序Hello World 基礎代碼都可以借助人工智能工具進行學習。 C #include<iostream>using namespace std;main() {//Declaring an integer type variable A, allocates 4 bytes of memory.int A4;cout<<A <<endl;//Prints the a…

Matlab繪制圓形(rectangle函數、viscircles函數和圓的參數方程)

基于matlab繪制圓形 一、rectangle函數 對于繪制圓心坐標為&#xff08;x&#xff0c;y&#xff09;半徑為r的圓形&#xff0c;函數為&#xff1a; x0; y0; r1; rectangle(Position, [x-r,y-r,2*r,2*r], Curvature, [1 1],EdgeColor, r); axis equalEdgeColor表示顏色 二、…

多版本node環境搭建切換管理NVM

Node.js NVM 全名 Node Version Management 一、Node 模塊對象 參考博客 Node 模塊對象 二、Node 多版本管理NVM &#xff08;1&#xff09;參考 Node 多版本管理 &#xff08;2&#xff09;github上NVM工具 nvm-windows mirrors / coreybutler / nvm-windows GitCode…

消息隊列(12) - 定義服務器類

目錄 前言設計思想 前言 之前,我們寫了通信協議的具體設計,接下來我們設計服務器類 設計思想 我們先只考慮一個虛擬主機的情況下, 在一個虛擬主機的情況下,我們需要有一個session會話來幫助我們存儲信息,并且既然是網絡通信,那么socket關鍵字肯定也必不可少,我們在引入一個線…

解決lldb調試時可能出現的personality set failed: Function not implemented

最近在嘗試使用Visual Studio 2022遠程連接Linux進行C/C的開發&#xff0c;由于CentOS風波不斷&#xff0c;所以現在的開發基本上都是使用ubuntu了&#xff0c;但是目前VS2022有一些BUG&#xff0c;就是遠程調試時&#xff0c;如果目標系統是ubuntu則會出現啟動調試器很慢的問題…

mysql高并發下主鍵自增打來的問題

在一般情況下&#xff0c;在新增領域對象后&#xff0c;都需要獲取對應的主鍵值。使用應用層來維護主鍵&#xff0c;在一定程度上有利于程序性能的優化和應用移植性的提高。在采用數據庫自增主鍵的方案里&#xff0c;如果JDBC驅動不能綁定新增記錄對應的主鍵&#xff0c;就需要…

LeetCode 1281. 整數的各位積和之差

【LetMeFly】1281.整數的各位積和之差 力扣題目鏈接&#xff1a;https://leetcode.cn/problems/subtract-the-product-and-sum-of-digits-of-an-integer/ 給你一個整數 n&#xff0c;請你幫忙計算并返回該整數「各位數字之積」與「各位數字之和」的差。 示例 1&#xff1a; …

學習筆記整理-JS-03-表達式和運算符

[[toc]] 一、表達式和運算符 1. 表達式 表達式種類 算術、關系、邏輯、賦值、綜合 二、JS基本表達式 1. 算術運算符 意義運算符加減-乘*除/取余% 加減乘除 加減的符號和數學一致&#xff0c;乘號是*號&#xff0c;除法是/號默認情況&#xff0c;乘除法的優先級高于加法和…

安卓源碼分析(10)Lifecycle實現組件生命周期管理

參考&#xff1a; https://developer.android.google.cn/topic/libraries/architecture/lifecycle?hlzh-cn#java https://developer.android.google.cn/reference/androidx/lifecycle/Lifecycle 文章目錄 1、概述2、LifeCycle類3、LifecycleOwner類4、LifecycleObserver類 1、…

數據庫字段命名導致的SQL報錯

1.表設計 create table variables (id bigint not null comment 主鍵,business_key varchar(128) null comment 業務key,key varchar(128) null comment Map中的key,value varchar(255) null comment…

Centos yum命令大全

1.使用YUM查找軟件包 $ yum search python 2.列出所有可安裝的軟件包 $ yum list | grep python 3.列出所有可更新的軟件包 $ yum list updates 4.列出所有已安裝的軟件包 $ yum list installed | grep python

[GIN-debug] [ERROR] listen tcp: address 8080: missing port in address

學習Golang_gin框架的第一天 遇到一下報錯 : [GIN-debug] [ERROR] listen tcp: address 8080: missing port in address 錯誤代碼 : package mainimport "github.com/gin-gonic/gin"func main() {router : gin.Default()router.GET("/index", func…

910數據結構(2014年真題)

算法設計題 問題1 已知一個帶頭結點的單鏈表head&#xff0c;假設結點中的元素為整數&#xff0c;試編寫算法&#xff1a;按遞增次序輸出單鏈表中各個結點的數據元素&#xff0c;并釋放結點所占的存儲空間。要求&#xff1a;(1)用文字給出你的算法思想&#xff1b;(2)不允許使…