物聯網_TDengine_EMQX_性能測試

一、Tdengine接口開發文檔

1、數據庫

1.創建數據庫

  • URL

    /dp/createdb/

  • method

    post

  • 請求示例

    {"db_name":"demo01" // 必填
    }
    
  • 響應示例

    // 成功
    {"code": 1,"data": {"成功創建數據庫": "demo04"},"error": null
    }
    
    // 失敗
    {"code": 0,"data": null,"error": "創建數據庫失敗, 該數據庫已存在!"
    }2.獲取數據庫
    

2.獲取數據庫

  • URL

    /dp/createdb/

  • method

    get

  • 請求示例

    /dp/createdb/?db_name=test

    參數:db_name 數據庫名稱,非必填

    未填數據庫名稱則查詢所有數據庫

    填寫數據庫名稱則查詢數據庫參數

  • 響應示例

    // 查詢所有數據庫
    {"code": 1,"data": ["information_schema","performance_schema","demo01","demo06","test","demo03","demo02"],"error": null
    }
    
    // 查詢數據庫參數
    {"code": 1,"data": {"name": "test","create_time": "2024-02-21T10:33:32.727000","vgroups": 2,"ntables": 2,"replica": 1,"strict": "on","duration": "14400m","keep": "5256000m,5256000m,5256000m","buffer": 256,"pagesize": 4,"pages": 256,"minrows": 100,"maxrows": 4096,"comp": 2,"precision": "ms","status": "ready","retentions": null,"single_stable": false,"cachemodel": "last_row","cachesize": 1,"wal_level": 1,"wal_fsync_period": 3000,"wal_retention_period": 0,"wal_retention_size": 0,"wal_roll_period": 0,"wal_segment_size": 0,"stt_trigger": 1,"table_prefix": 0,"table_suffix": 0,"tsdb_pagesize": 4},"error": null
    }
    

3.修改數據庫參數

  • URL

    /dp/createdb/

  • method

    put

  • 請求示例

    {"db_name":"demo01", // 必填"cachemodel":"none","wal_level":"1"
    }
    
    • CACHEMODEL:表示是否在內存中緩存子表的最近數據。默認為 none。
      • none:表示不緩存。
      • last_row:表示緩存子表最近一行數據。這將顯著改善 LAST_ROW 函數的性能表現。
      • last_value:表示緩存子表每一列的最近的非 NULL 值。這將顯著改善無特殊影響(WHERE、ORDER BY、GROUP BY、INTERVAL)下的 LAST 函數的性能表現。
      • both:表示同時打開緩存最近行和列功能。 Note:CacheModel 值來回切換有可能導致 last/last_row 的查詢結果不準確,請謹慎操作。推薦保持打開。
    • CACHESIZE:表示每個 vnode 中用于緩存子表最近數據的內存大小。默認為 1 ,范圍是[1, 65536],單位是 MB。
    • BUFFER: 一個 VNODE 寫入內存池大小,單位為 MB,默認為 256,最小為 3,最大為 16384。
    • PAGES:一個 VNODE 中元數據存儲引擎的緩存頁個數,默認為 256,最小 64。一個 VNODE 元數據存儲占用 PAGESIZE * PAGES,默認情況下為 1MB 內存。
    • PAGESIZE:一個 VNODE 中元數據存儲引擎的頁大小,單位為 KB,默認為 4 KB。范圍為 1 到 16384,即 1 KB 到 16 MB。
    • REPLICA:表示數據庫副本數,取值為 1 或 3,默認為 1。在集群中使用,副本數必須小于或等于 DNODE 的數目。
    • STT_TRIGGER:表示落盤文件觸發文件合并的個數。默認為 1,范圍 1 到 16。對于少表高頻場景,此參數建議使用默認配置,或較小的值;而對于多表低頻場景,此參數建議配置較大的值。
    • WAL_LEVEL:WAL 級別,默認為 1。
      • 1:寫 WAL,但不執行 fsync。
      • 2:寫 WAL,而且執行 fsync。
    • WAL_FSYNC_PERIOD:當 WAL_LEVEL 參數設置為 2 時,用于設置落盤的周期。默認為 3000,單位毫秒。最小為 0,表示每次寫入立即落盤;最大為 180000,即三分鐘。
    • DURATION:數據文件存儲數據的時間跨度。可以使用加單位的表示形式,如 DURATION 100h、DURATION 10d 等,支持 m(分鐘)、h(小時)和 d(天)三個單位。不加時間單位時默認單位為天,如 DURATION 50 表示 50 天。
    • KEEP:表示數據文件保存的天數,缺省值為 3650,取值范圍 [1, 365000],且必須大于或等于3倍的 DURATION 參數值。數據庫會自動刪除保存時間超過 KEEP 值的數據。KEEP 可以使用加單位的表示形式,如 KEEP 100h、KEEP 10d 等,支持 m(分鐘)、h(小時)和 d(天)三個單位。也可以不寫單位,如 KEEP 50,此時默認單位為天。企業版支持多級存儲功能, 因此, 可以設置多個保存時間(多個以英文逗號分隔,最多 3 個,滿足 keep 0 <= keep 1 <= keep 2,如 KEEP 100h,100d,3650d); 社區版不支持多級存儲功能(即使配置了多個保存時間, 也不會生效, KEEP 會取最大的保存時間)。
    • COMP:表示數據庫文件壓縮標志位,缺省值為 2,取值范圍為[0, 2]。
      • 0:表示不壓縮。
      • 1:表示一階段壓縮。
      • 2:表示兩階段壓縮。
    • SINGLE_STABLE:表示此數據庫中是否只可以創建一個超級表,用于超級表列非常多的情況。
      • 0:表示可以創建多張超級表。
      • 1:表示只可以創建一張超級表。
    • MAXROWS:文件塊中記錄的最大條數,默認為 4096 條。
    • MINROWS:文件塊中記錄的最小條數,默認為 100 條。
    • TSDB_PAGESIZE:一個 VNODE 中時序數據存儲引擎的頁大小,單位為 KB,默認為 4 KB。范圍為 1 到 16384,即 1 KB到 16 MB。
    • WAL_RETENTION_PERIOD: 為了數據訂閱消費,需要WAL日志文件額外保留的最大時長策略。WAL日志清理,不受訂閱客戶端消費狀態影響。單位為 s。默認為 3600,表示在 WAL 保留最近 3600 秒的數據,請根據數據訂閱的需要修改這個參數為適當值。
    • WAL_RETENTION_SIZE:為了數據訂閱消費,需要WAL日志文件額外保留的最大累計大小策略。單位為 KB。默認為 0,表示累計大小無上限。
  • 響應示例

    {"code": 1,"data": {"修改數據庫參數成功": 0},"error": null
    }
    

4.刪除數據庫

  • URL

    /dp/createdb/

  • method

    delete

  • 請求示例

    {// 可以刪除多個"db_names":["demo04","demo03"]
    }
    
  • 響應示例

    {"code": 1,"data": {"刪除數據庫成功": 2},"error": null
    }
    

2、超級表

1.創建超級表

  • URL

    /dp/createstable/

  • method

    post

  • 請求示例

    {"db_name": "demo06", // 必填"stable_name": "st01", // 必填"fields": {"ts": "TIMESTAMP", // 必填"current": "FLOAT", // 最低選填一個"voltage": "INT","phase": "FLOAT"},"tags":{"point":"varchar(100)", // 最低選填一個"status":"varchar(100)"}
    }
    
  • 響應示例

    {"code": 1,"data": {"創建超級表成功": "st04"},"error": null
    }
    

2.查詢超級表

  • URL

    /dp/createstable/

  • method

    get

  • 請求示例

    /dp/createstable/?db_name=demo06&stable_name=st01

    參數:db_name 數據庫名稱,必填

    ? stable_name 超級表名稱,非必填

    未填超級表名稱則查詢該數據庫下的所有超級表

    填寫超級表名稱則查詢該數據庫下的超級表的結構信息

  • 響應示例

    // 查詢該庫下的所有超級表
    {"code": 1,"data": ["st01","st02","st03","st04","demo06"],"error": null
    }
    
    // 查詢該庫下的該超級表的結構信息
    {"code": 1,"data": [{"field": "ts","type": "TIMESTAMP","length": 8,"note": ""},{"field": "current","type": "FLOAT","length": 4,"note": ""},{"field": "voltage","type": "INT","length": 4,"note": ""},{"field": "phase","type": "FLOAT","length": 4,"note": ""},{"field": "point","type": "VARCHAR","length": 100,"note": "TAG"},{"field": "status","type": "VARCHAR","length": 100,"note": "TAG"}],"error": null
    }
    

3.修改超級表

  • URL

    /dp/createstable/

  • method

    put

  • 請求示例

    // 添加表注釋
    {"db_name":"demo06","stable_name":"st01","command":"COMMENT","value": ["This is a table comment."]
    }
    
    // 添加新列
    {"db_name":"demo06","stable_name":"st01","command":"ADD COLUMN","value": ["test","FLOAT"]
    }
    
    // 刪除列
    {"db_name":"demo06","stable_name":"st01","command":"DROP COLUMN","value": ["test"]
    }
    
    // 重命名標簽
    {"db_name":"demo06","stable_name":"st01","command":"RENAME TAG","value": ["test_tag01","test_tag001"]
    }
    
    • “command”:“COMMENT”–添加表注釋
    • “command”:“ADD COLUMN”–添加新列
    • “command”:“DROP COLUMN”–刪除列
    • “command”:“ADD TAG”–添加標簽
    • “command”:“DROP TAG”–刪除標簽
    • “command”:“RENAME TAG”–重命名標簽
  • 響應信息

    {"code": 1,"data": {"修改超級表": "修改成功!"},"error": null
    }
    

4.刪除超級表

  • URL

    /dp/createstable/

  • method

    delete

  • 請求示例

    // 可刪除多個
    {"db_name":"demo06","stable_names":["st06","st05"]
    }
    
  • 響應示例

    {"code": 1,"data": {"刪除超級表成功": 2},"error": null
    }
    

3、子表

1.創建子表

  • URL

    /dp/createtable/

  • method

    post

  • 請求示例

    // 可以創建多個
    {"db_name":"demo06","stable_name":"st01","sub_tables":{"sub01":["辦公樓一層101","正常","1"],"sub02":["辦公樓二層201","正常","2"]}
    }
    
  • 響應示例

    {"code": 1,"data": {"創建子表成功": ["sub03","sub04"]},"error": null
    }
    

2.獲取子表

  • URL

    /dp/createtable/

  • method

    get

  • 請求示例

    /dp/createtable/?db_name=demo06

    參數:db_name 數據庫名稱,必填

    ? sub_table_name 子表名稱,非必填

    未填子表名稱則查詢該數據庫下的所有子表

    填寫子表名稱則查詢該數據庫下的子表的結構信息

  • 響應示例

    // 獲取所有子表
    {"code": 1,"data": ["sub01","sub02","sub03","sub04"],"error": null
    }
    
    // 獲取子表的結構信息
    {"code": 1,"data": [{"field": "ts","type": "TIMESTAMP","length": 8,"note": ""},{"field": "current","type": "FLOAT","length": 4,"note": ""},{"field": "voltage","type": "INT","length": 4,"note": ""},{"field": "phase","type": "FLOAT","length": 4,"note": ""},{"field": "test","type": "INT","length": 4,"note": ""},{"field": "point","type": "VARCHAR","length": 100,"note": "TAG"},{"field": "status","type": "VARCHAR","length": 100,"note": "TAG"},{"field": "test_tag001","type": "FLOAT","length": 4,"note": "TAG"}],"error": null
    }
    

3.修改子表

  • URL

    /dp/createtable/

  • method

    put

  • 請求示例

    // 只能修改子表標簽值
    {"db_name":"test","sub_table_name":"meters_sub3","tag_name": "location","new_tag_value":"beijing"
    }
    
  • 響應示例

    {"code": 1,"data": {"修改子表": "修改成功!"},"error": null
    }
    

4.刪除子表

  • URL

    /dp/createtable/

  • method

    delete

  • 請求示例

    // 可以同時刪除多個表
    {"db_name":"test","sub_table_names"["meters_sub7","meters_sub6"]
    }
    
  • 響應示例

    {"code": 1,"data": {"刪除子表成功": 2},"error": null
    }
    

4、向子表插入數據

1.插入數據

  • URL

    /dp/insertdata/

  • method

    post

  • 請求示例

    // 可以向多個子表根據字段插入多條記錄
    {"db_name":"test","table_data": {"d001":{"fields": ["ts", "current", "voltage", "phase"],"values": [["2021-07-13 14:06:34.630", 10.2, 219, 0.32],["2021-07-13 14:06:35.779", 10.15, 217, 0.33]]},"d1002": {"fields": ["ts", "current", "phase"],"values": [["2021-07-13 14:06:34.255", 10.27, 0.31]]}}
    }
    
  • 響應示例

    {"code": 1,"data": {"插入數據成功": "INSERT INTO d1001 (ts, current, voltage, phase) VALUES ('2021-07-13 14:06:34.630', '10.2', '219', '0.32') ('2021-07-13 14:06:35.779', '10.15', '217', '0.33') d1002 (ts, current, phase) VALUES ('2021-07-13 14:06:34.255', '10.27', '0.31') "},"error": null
    }
    

2.修改數據

  • URL

    /dp/insertdata/

  • method

    put

  • 請求示例

    // 根據時間戳修改數據
    {"db_name":"test","sub_table_name":"d1001","ts_name":"ts","ts_value":"2021-07-13 00:00:00.000","fields":["current","voltage"],"value":["11","2"]
    }
    
  • 響應示例

    {"code": 1,"data": {"修改數據成功": "INSERT INTO d1001 (ts,current, voltage) VALUES ('2021-07-13 00:00:00.000','11', '2')"},"error": null
    }
    

3.刪除數據

  • URL

    /dp/insertdata/

  • method

    delete

  • 請求示例

    // 刪除時間戳等于2021-07-13 00:00:00.000的數據
    {"db_name":"test","sub_table_name":"d1001","ts_name":"ts","ts_value":"2021-07-13 00:00:00.000","operation":"EN"
    }
    
    • “operation”:“GT” – 大于(Greater Than)
    • “operation”:“GE” – 大于等于(Greater Than or Equal)
    • “operation”:“LT” – 小于(Less Than)
    • “operation”:“LE” – 小于等于(Less Than or Equal)
    • “operation”:“EQ” – 等于(Equal)
    • “operation”:“NE” – 不等于(Not Equal)
  • 響應示例

    {"code": 1,"data": {"刪除數據成功": "DELETE FROM d1001 WHERE ts = '2021-07-13 00:00:00.000'"},"error": null
    }
    

二、EMQX斷線重連

1、實現流程

  1. 設置客戶端配置
    • 配置持久化會話 (clean_session=False,并配置唯一ID)。
    • 設置重連策略,包括最大重試次數和重連間隔時間。
  2. 檢測連接狀態
    • 在連接斷開的回調函數中,記錄斷開的原因,并嘗試重連。
  3. 重連處理
    • 使用退避算法進行重連,并在重連成功后重新訂閱主題。

2、斷線重連邏輯

客戶端斷開連接后,自動調用on_disconnect方法,更改連接狀態,嘗試連接(調用reconnect重連方法)。

重連設置最大連接次數,以及使用退避算法,防止短時間內多次重連。

    def on_disconnect(self, client, userdata, rc):self.is_connected = Falseif rc != 0:print("客戶端斷開。嘗試重新連接...")self.reconnect()def reconnect(self):while not self.is_connected and self.reconnect_attempts < self.max_reconnect_attempts:try:wait_time = min(2 ** self.reconnect_attempts, 60) + random.uniform(0, 1)print(f"嘗試重新連接 {self.reconnect_attempts + 1}, 等待 {wait_time:.2f} 秒...")time.sleep(wait_time)self.client.reconnect()  # 使用 reconnect 方法重新連接self.is_connected = True  # 如果成功連接,設置 is_connected 為 Trueexcept Exception as e:print(f"重連失敗: {e}")self.reconnect_attempts += 1if self.reconnect_attempts >= self.max_reconnect_attempts:print("已達到最大重新連接嘗試次數,進入睡眠模式。")# 在這里可以添加進入休眠模式的邏輯

3、整體斷線重連demo代碼


import time
import random
import paho.mqtt.client as mqtt# MQTT Broker 配置
BROKER_ADDRESS = "192.168.101.130" # 服務端
BROKER_PORT = 1883 # 端口
KEEP_ALIVE_INTERVAL = 60  # 保活時間(秒)
TOPIC = "test/topic" # 主題
MAX_RECONNECT_ATTEMPTS = 10  # 最大重連嘗試次數
CLIENT_ID = "test_client_id" # client_id
USERNAME = 'test' # 用戶名
PASSWORD = 'test' # 密碼
QOS = 1
i = 1class MQTTClient:def __init__(self, broker_address, broker_port, topic, keep_alive=60, max_reconnect_attempts=10):self.broker_address = broker_addressself.broker_port = broker_portself.topic = topicself.keep_alive = keep_aliveself.max_reconnect_attempts = max_reconnect_attemptsself.reconnect_attempts = 0self.is_connected = False# 創建 MQTT 客戶端并啟用持久會話self.client = mqtt.Client(clean_session=False, client_id=CLIENT_ID)# 設置用戶名密碼self.client.username_pw_set(USERNAME, PASSWORD)# 設置回調函數self.client.on_connect = self.on_connectself.client.on_message = self.on_messageself.client.on_disconnect = self.on_disconnectdef on_connect(self, client, userdata, flags, rc):if rc == 0:print("連接到 broker ")self.is_connected = Trueself.reconnect_attempts = 0self.client.subscribe(self.topic, qos=QOS)  # 重新訂閱主題else:print(f"連接失敗并返回代碼 {rc}")def on_message(self, client, userdata, msg):print(f"在主題 '{msg.topic}' 收到的消息為: '{msg.payload.decode()}'")def on_disconnect(self, client, userdata, rc):self.is_connected = Falseif rc != 0:print("客戶端斷開。嘗試重新連接...")self.reconnect()def reconnect(self):while not self.is_connected and self.reconnect_attempts < self.max_reconnect_attempts:try:wait_time = min(2 ** self.reconnect_attempts, 60) + random.uniform(0, 1)print(f"嘗試重新連接 {self.reconnect_attempts + 1}, 等待 {wait_time:.2f} 秒...")time.sleep(wait_time)self.client.reconnect()  # 使用 reconnect 方法重新連接self.is_connected = True  # 如果成功連接,設置 is_connected 為 Trueexcept Exception as e:print(f"重連失敗: {e}")self.reconnect_attempts += 1if self.reconnect_attempts >= self.max_reconnect_attempts:print("已達到最大重新連接嘗試次數,進入睡眠模式。")# 在這里可以添加進入休眠模式的邏輯def start(self):# 初始連接try:self.client.connect(self.broker_address, self.broker_port, self.keep_alive)except Exception as e:print(f"初始連接失敗: {e} 。嘗試重新連接...")self.reconnect()# 啟動 MQTT 客戶端的循環self.client.loop_start()def stop(self):# 停止 MQTT 客戶端的循環并斷開連接self.client.loop_stop()self.client.disconnect()def publish(self, message):if self.is_connected:self.client.publish(self.topic, message, qos=QOS)if __name__ == "__main__":# 創建并啟動 MQTT 客戶端mqtt_client = MQTTClient(BROKER_ADDRESS, BROKER_PORT, TOPIC, KEEP_ALIVE_INTERVAL, MAX_RECONNECT_ATTEMPTS)mqtt_client.start()try:while True:message = f"Hello MQTT {i}"mqtt_client.publish(message)i = i + 1time.sleep(1)except KeyboardInterrupt:print("Exiting...")mqtt_client.stop()

三、查看所有的mqtt topic

認證需要API秘鑰

image-20240412171132570

API接口:

http://192.168.101.130:18083/api/v5/topics

代碼:

import urllib.request
import json
import base64username = 'ee26b0dd4af7e749'
password = 'ALsVuG69AjvFVVU1DoUKC3w0CeP1Nuajrd9CyI3brlCyN'# url = 'http://192.168.101.130:18083/api/v5/nodes'
url = "http://192.168.101.130:18083/api/v5/topics"req = urllib.request.Request(url)
req.add_header('Content-Type', 'application/json')auth_header = "Basic " + base64.b64encode((username + ":" + password).encode()).decode()
req.add_header('Authorization', auth_header)with urllib.request.urlopen(req) as response:data = json.loads(response.read().decode())print(data)

輸出結果:

{'data': [{'node': 'emqx@127.0.0.1', 'topic': 't/1'}, {'node': 'emqx@127.0.0.1', 'topic': '$share/c/t/1'}, {'node': 'emqx@127.0.0.1', 'topic': '$share/a/t/1'}, {'node': 'emqx@127.0.0.1', 'topic': '$share/b/t/1'}], 'meta': {'count': 4, 'limit': 100, 'page': 1, 'hasnext': False}}

四、EMQX 到 Tdengine 數據導入性能測試報告**

測試環境:

  • EMQX 版本:5.5.0 (單體)

  • Tdengine 版本:3.2.2.0 (單體)

  • 操作系統:centos7

  • CPU:4核

    image-20240311162549500

  • 內存:3G

測試步驟:

  1. 在 EMQX 中配置數據源或者編寫程序連接TDengine,確保實時數據可以被發送到指定的 Tdengine 數據庫中。
  2. 在 Tdengine 數據庫中創建相應的表結構,確保能夠正確接收和存儲實時數據。
  3. 啟動數據導入功能,讓 EMQX 將實時數據發送到 Tdengine 數據庫中。
  4. 使用監控工具監視系統的 CPU 利用率、內存使用情況以及數據導入速率。

測試一:EMQX直接存儲到TDengine

使用EMQX訂閱EMQX中的Topic,存入到TDengine中

場景一:單線程發布消息

  • 一個客戶端發送數據
  • 每10毫秒發送一次
  • 測試發送一萬條數據
  • 用時0:01:47.116768
測試結果
  • 數據導入TDengine速率:

    平均每秒處理84.5條數據。

    最大處理95.3條數據

    最小處理55.3條數據

    image-20240314132250870

  • EMQX CPU利用率、 內存使用情況:

    CPU利用率最高26.3%,最低13.2%

    內存使用平均1.66GB

    image-20240314132401998

  • Tdengine CPU利用率、 內存使用情況:

    CPU利用率最高7.67%,最低1.83%

    內存平均668MB

    image-20240314132510169

  • 機器信息:

    image-20240314132550134

場景二:多線程發布消息

  • 五個客戶端發送數據
  • 每10毫秒發送一次
  • 測試發送五萬條數據
  • 用時0:01:49.660773
測試結果
  • 數據導入TDengine速率:

    平均每秒處理 360條數據。

    最大處理460條數據

    最小處理68.5條數據

    image-20240314135636248

    image-20240314135649631

  • EMQX CPU利用率、 內存使用情況:

    cpu最高百分之四十,最低百分之十

    內存平均1.32G

    image-20240314135455099

  • Tdengine CPU利用率、 內存使用情況:

    cpu最高百分之十二

    內存平均334M

    image-20240314135508596

  • 機器信息:

    image-20240314135749600

測試二:Python訂閱消息存儲到TDengine

通過Python程序訂閱EMQX中的Topic,存入到TDengine中

場景一:單線程發布消息

  • 一個客戶端發送數據
  • 每10毫秒發送一次
  • 測試發送一萬條數據
  • 用時0:01:48.202740
測試結果:
  • 數據導入TDengine速率:

    平均每秒處理 81.8條數據。

    最大處理87.0條數據

    最小處理77.9條數據

    image-20240315141556759

  • Tdengine CPU利用率、 內存使用情況:

    cpu最高利用率2.17%,平均1.8%

    內存平均M

    image-20240315132225193

  • 機器信息:

    image-20240315141355156

場景二:多線程發布消息

  • 五個客戶端發送數據
  • 每50毫秒發送一次
  • 測試發送五萬條數據
  • 用時0:01:47.998246
測試結果
  • 數據導入TDengine速率:

    平均每秒處理76條數據。

    最大處理83.1條數據

    最小處理63.9條數據

    image-20240315142549985

  • Tdengine CPU利用率、 內存使用情況:

    cpu最高利用率1.3%,平均1.14%

    內存平均643M

    image-20240315142706079

  • 機器信息:

    image-20240315142836804

測試三:Java訂閱消息存儲到TDengine

場景一:單線程發布消息

  • 一個客戶端發送數據
  • 每10毫秒發送一次
  • 測試發送一萬條數據
  • 用時0:01:46.728102
測試結果
  • 數據導入TDengine速率:

    平均每秒處理 79.9條數據。

    最大處理95.5條數據

    最小處理41.8條數據

    image-20240315145341616

  • Tdengine CPU利用率、 內存使用情況:

    cpu最高利用率3.50%,平均2.14%

    內存平均646M

    image-20240315145406259

  • 機器信息:

    image-20240315145429519

場景二:多線程發布消息

  • 五個客戶端發送數據
  • 每10毫秒發送一次
  • 測試發送五萬條數據
  • 用時0:01:46.728102
測試結果
  • 數據導入TDengine速率:

    平均每秒處理 83.6條數據。

    最大處理95.3條數據

    最小處理48.9條數據

    image-20240315151949104

  • Tdengine CPU利用率、 內存使用情況:

    cpu最高利用率3.05%,平均1.86%

    內存平均657M

    image-20240315152002293

  • 機器信息:

    image-20240315152028787

測試四:Go訂閱消息存儲到TDengine

場景一:單線程發布消息

  • 一個客戶端發送數據
  • 每10毫秒發送一次
  • 測試發送一萬條數據
  • 用時0:01:48.395355
測試結果
  • 數據導入TDengine速率:

    平均每秒處理 82.6條數據。

    最大處理95.0條數據

    最小處理50.2條數據

    image-20240318132045758

  • Tdengine CPU利用率、 內存使用情況:

    cpu最高利用率12%,平均4.35%

    內存平均399M

    image-20240318132102394

  • 機器信息:

    image-20240318132135438

場景二:多線程發布消息

  • 五個客戶端發送數據
  • 每10毫秒發送一次
  • 測試發送五萬條數據
  • 用時0:01:46.728102
測試結果
  • 數據導入TDengine速率:

    平均每秒處理 237條數據。

    最大處理239條數據

    最小處理235條數據

    image-20240318134322446

  • Tdengine CPU利用率、 內存使用情況:

    cpu最高利用率3.43%,平均2.62%

    內存平均423M

    image-20240318134342581

  • 機器信息:

    image-20240318134402890

結果:

emqx直接連接TDenginepython訂閱消息存儲到TDengineJava訂閱消息存儲到TDenginego訂閱消息存儲到TDengine
單線程發布消息平均每秒84.5條,最大95.3條平均每秒81.8條,最大87條平均每秒79.9條,最大95.5條平均每秒86.2條,最大95.0條
多線程發布消息平均每秒 360條,最大460條平均每秒76條,最大83.1條平均每秒83.6條,最大95.3條平均每秒237條,最大239條

結論:

? 存儲到TDengine速率:emqx>go>java>python

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/90051.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/90051.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/90051.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

從分析到優化:Amazon Q CLI 助力 EKS 網絡調用鏈剖析與運維實踐

1. 引言 在 Amazon EKS&#xff08;Elastic Kubernetes Service&#xff09;環境中&#xff0c;理解從 ALB&#xff08;Application Load Balancer&#xff09;到 Pod 的完整網絡調用鏈對運維人員至關重要。本文將展示如何利用 Amazon Q CLI 這一 AI 助手工具&#xff0c;通過…

Class10簡潔實現

Class10簡潔實現 import torch from torch import nn from d2l import torch as d2l# 輸入為28*28&#xff0c;輸出為10類&#xff0c;第1、2隱藏層256神經元 num_inputs, num_outputs, num_hiddens1, num_hiddens2 784, 10, 256, 256 # 第1個隱藏層丟棄率為0.2&#xff0c;第…

【多線程篇22】:ConcurrentHashMap的并發安全原理剖析

文章目錄一、HashMap 的“不安全”&#xff1a;問題的根源1. 數據結構回顧 (JDK 1.8)2. 并發下的致命缺陷&#xff1a;put 操作二、ConcurrentHashMap 的安全之道 (JDK 1.8)1. 核心數據結構2. 安全的 put 操作&#xff1a;分場景精細化加鎖3. 安全的 size() 計算&#xff1a;并…

【Java + Vue 實現圖片上傳后 導出圖片及Excel 并壓縮為zip壓縮包】

系統環境&#xff1a; Java JDK&#xff1a;1.8.0_202 Node.js&#xff1a;v12.2.0 Npm&#xff1a;6.9.0 Java后端實現 Controller /*** xxxx-導出* param response 返回信息體* param files 上傳的圖片文件* param param1 參數1* param param2 參數2*/PostMapping("/ex…

安科瑞:能源微電網助力工業園區“綠色”發展

朱以真近日&#xff0c;廈門市工業和信息化局印發工業園區綠色智慧微電網建設&#xff0c;擬開展全市工業園區綠色智慧微電網試點通知&#xff0c;那么對于如何實現綠色園區的建設是今天的話題。對工業園區綠色智慧微電網建設需求&#xff0c;其核心價值體現在“源-網-荷-儲-充…

VUE2 學習筆記3 v-on、事件修飾符、鍵盤事件

事件處理v-on用于事件交互。語法&#xff1a;v-on:要綁定的事件“事件觸發時執行的函數” &#xff08;函數這里可以寫括號&#xff0c;也可以不寫&#xff0c;沒有影響&#xff09;簡寫&#xff1a;:事件觸發時要執行的函數&#xff0c;在Vue配置參數中&#xff0c;通過method…

變換域通訊系統CCSK的matlab仿真

CCSK&#xff08;Cyclic Code Shift Keying&#xff09;通信系統的MATLAB仿真。實現完整的CCSK調制、AWGN信道傳輸和解調過程&#xff0c;并計算了誤碼率&#xff08;BER&#xff09;。 % CCSK通信系統仿真 clear; clc; close all;% 參數設置 L 31; % m序列…

技術演進中的開發沉思-40 MFC系列:多線程協作

今天說說MFC的線程&#xff0c;當年用它實現中間件消息得心應手之時&#xff0c;可以實現一邊實時接收數據&#xff0c;一邊更新界面圖表圖文信息&#xff0c;順滑得讓人想吹聲口哨。 MFC 多線程它像給程序裝上了分身術&#xff0c;讓原本只能 “單任務跑腿” 的代碼&#xff0…

高速公路自動化安全監測主要內容

近年來&#xff0c;隨著社會經濟的快速發展&#xff0c;高速公路的通車里程不斷增加&#xff0c;交通流量日益增大。與此同時&#xff0c;高速公路交通事故數量也呈現出一定的增長趨勢。這些事故不僅造成了大量的人員傷亡和財產損失&#xff0c;還嚴重影響了社會的穩定和經濟的…

完美解決 Ubuntu 中自定義啟動器圖標重復的問題(以 MATLAB 為例)

如果你在 Ubuntu 上為 MATLAB、PyCharm、Android Studio 或其他第三方應用創建了自定義啟動器&#xff0c;并把它固定到了左側的 Dock 欄&#xff0c;你很可能遇到過這個令人抓狂的場景&#xff1a; 當你滿心歡喜地點擊固定的圖標啟動程序后&#xff0c;Dock 欄上竟然“憑空”冒…

一文讀懂深度模型優化器,掌握煉丹工具

深度模型優化器是訓練神經網絡的核心工具&#xff0c;其目標是高效地找到損失函數的最小值。從基礎的隨機梯度下降&#xff08;SGD&#xff09;到結合一階動量修正與兩階段更新的Ranger&#xff0c;優化器的發展始終圍繞著加速收斂、提升穩定性、適應參數差異和增強泛化能力四大…

關于虛擬機部署的k8s集群calico組件在開機恢復快照時的問題

在恢復虛擬機開機快照的時候總會出現各種的網絡問題&#xff0c;連接不到自己的443端口&#xff0c;集群狀態異常等等&#xff0c;繞一圈都發現不了問題&#xff0c;其實根本就沒問題。虛擬機環境再現&#xff0c;恢復快照后的k8s環境&#xff0c;nfs莫名宕掉提示此報錯&#x…

從感知到決策:虛擬仿真系統與視覺算法融合下的多路RTSP視頻接入技術探究

1?? 背景概述 隨著國防信息化和智能化進程的不斷加快&#xff0c;虛擬仿真系統 與 智能視覺算法平臺 正逐步成為現代化裝備研發、測試驗證與戰術訓練的重要技術支撐。相比傳統的靜態建模或離線推演&#xff0c;新一代作戰仿真與智能系統更強調 實時性、感知能力與動態交互&a…

Android app如果不適配Android 15會怎么樣

深度思考完成 用時49秒 首先&#xff0c;用戶的問題是&#xff1a;“Android app如果不適配Android 15會怎么樣&#xff1f;”我需要基于搜索結果來回答。 讓我回顧搜索結果&#xff1a; ?1&#xff1a;標題是“Android 15應用適配指南”&#xff0c;內容強調了不適配Androi…

Vue-21-利用Vue3的axios+Python的flask實現前后端交互功能

文章目錄 1 任務需求 2 前端 2.1 模板template 2.2 JS代碼script 2.3 Calculate.vue(子組件) 2.4 App.vue(根組件) 3 后端 3.1 導入模塊 3.2 創建應用實例 3.3 配置CORS 3.4 定義路由 3.5 處理請求 3.6 main.py 4 附錄 4.1 CORS 4.1.1 全局啟用CORS 4.1.2 限制允許的域名(更安…

動態規劃之最長回文子串

題目&#xff1a;最長回文子串 給你一個字符串 s&#xff0c;找到 s 中最長的 回文 子串。 示例 1&#xff1a; 輸入&#xff1a;s “babad” 輸出&#xff1a;“bab” 解釋&#xff1a;“aba” 同樣是符合題意的答案。 示例 2&#xff1a; 輸入&#xff1a;s “cbbd” 輸…

Linux 編程中的錯誤處理機制詳解 —— `errno` 全解析

文章目錄Linux 編程中的錯誤處理機制詳解 —— errno 全解析一、什么是 errno&#xff1f;?為什么需要 errno&#xff1f;? 它在哪里定義&#xff1f;二、errno 的設置與讀取規則?? errno 不是總是有效&#xff01;?使用 errno 的正確步驟&#xff1a;三、與 errno 配套使…

力扣-最長遞增子序列

簡單記錄學習~給你一個整數數組 nums &#xff0c;找到其中最長嚴格遞增子序列的長度。子序列 是由數組派生而來的序列&#xff0c;刪除&#xff08;或不刪除&#xff09;數組中的元素而不改變其余元素的順序。例如&#xff0c;[3,6,2,7] 是數組 [0,3,1,6,2,2,7] 的子序列。示例…

公司內部網址怎么在外網打開?如何讓外網訪問內網的網站呢?

很多公司內部本地會部署有中小型的服務器&#xff0c;可以很好的方便用于一些辦公業務系統&#xff0c;或測試開發需要。在數字化辦公和生活場景中&#xff0c;除了公司內部局域網內訪問公司系統外&#xff0c;經常會遇到需要讓外網訪問內網網站的情況。比如企業員工遠程辦公時…

有趣的css - 多選立體標簽按鈕

&#x1f36d; 大家好&#xff0c;我是 Just&#xff0c;這里是「設計師工作日常」&#xff0c;今天分享的是一個交互較完整的多選立體標簽按鈕。 最新文章通過公眾號「設計師工作日常」發布。 目錄整體效果核心代碼html 代碼css 部分代碼完整代碼如下html 頁面css 樣式頁面渲…