在物聯網系統中時序數據庫和關系型數據庫如何使用?

在物聯網系統中,時序數據庫(TSDB)和關系型數據庫(RDBMS)的存儲順序設計需要根據數據特性、業務需求和系統架構綜合考慮。以下是典型的設計方案和邏輯順序:


1. 常見存儲順序方案

方案一:先寫時序數據庫,后異步同步到關系型數據庫

適用場景:高頻傳感器數據為主,業務數據可容忍短暫延遲。
流程

  1. MQTT Broker 接收設備原始數據(如 devices/A/temperature)。
  2. 數據首先寫入時序數據庫(如InfluxDB):
    • 存儲原始時間序列數據(高吞吐、低延遲)。
  3. 異步處理層(如Kafka/Flink)消費數據,處理后寫入關系型數據庫:
    • 提取關鍵狀態(如最新溫度值)寫入MySQL的device_status表。
    • 關聯設備元數據(如設備所屬用戶)。

優點

  • 確保傳感器數據的寫入性能最大化。
  • 避免高頻寫入拖累關系型數據庫。

示例代碼(偽代碼):

# MQTT回調處理
def on_mqtt_message(topic, payload):# 1. 原始數據寫入InfluxDBinflux.write({"measurement": "sensor_data","tags": {"device_id": topic.split('/')[1]},"fields": {"temperature": payload.temp},"time": payload.timestamp})# 2. 異步推送至Kafka,后續處理kafka.produce("device_updates", key=device_id, value=payload)# Kafka消費者處理業務邏輯
def kafka_consumer():for message in kafka.consume():# 3. 關聯設備元數據并寫入MySQLdevice = mysql.query("SELECT * FROM devices WHERE id = ?", message.device_id)mysql.execute("UPDATE device_status SET last_temp = ?, updated_at = NOW() WHERE device_id = ?",message.temp, message.device_id)

示例代碼(以下是使用Java實現的等效代碼,包含MQTT回調處理、InfluxDB寫入和通過Kafka異步處理寫入MySQL的邏輯):

import org.eclipse.paho.client.mqttv3.*;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import java.sql.*;
import java.time.Instant;
import java.util.Properties;public class MqttDataProcessor {// InfluxDB 配置private final InfluxDBClient influxDBClient;// Kafka 生產者private final KafkaProducer<String, DeviceData> kafkaProducer;// MySQL 連接private final Connection mysqlConnection;public MqttDataProcessor(InfluxDBClient influxDBClient, KafkaProducer<String, DeviceData> kafkaProducer,Connection mysqlConnection) {this.influxDBClient = influxDBClient;this.kafkaProducer = kafkaProducer;this.mysqlConnection = mysqlConnection;}// MQTT回調處理public IMqttMessageListener createMqttListener() {return (topic, message) -> {try {// 解析payloadDeviceData data = parsePayload(topic, message.getPayload());// 1. 原始數據寫入InfluxDBwriteToInfluxDB(data);// 2. 異步推送至KafkasendToKafka(data);} catch (Exception e) {e.printStackTrace();}};}private DeviceData parsePayload(String topic, byte[] payload) {// 這里應該是你的實際payload解析邏輯String deviceId = topic.split("/")[1];// 示例: 假設payload是JSON格式 {"temp": 25.5, "timestamp": 123456789}String json = new String(payload);// 實際項目中可以使用Gson/Jackson等庫double temp = Double.parseDouble(json.split("\"temp\":")[1].split(",")[0]);long timestamp = Long.parseLong(json.split("\"timestamp\":")[1].split("}")[0]);return new DeviceData(deviceId, temp, Instant.ofEpochSecond(timestamp));}private void writeToInfluxDB(DeviceData data) {try (WriteApi writeApi = influxDBClient.getWriteApi()) {Point point = Point.measurement("sensor_data").addTag("device_id", data.getDeviceId()).addField("temperature", data.getTemp()).time(data.getTimestamp(), WritePrecision.S);writeApi.writePoint(point);}}private void sendToKafka(DeviceData data) {ProducerRecord<String, DeviceData> record = new ProducerRecord<>("device_updates", data.getDeviceId(), data);kafkaProducer.send(record, (metadata, exception) -> {if (exception != null) {exception.printStackTrace();}});}// Kafka消費者處理業務邏輯public void startKafkaConsumer() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "device-data-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.your.package.DeviceDataDeserializer"); // 需要自定義try (KafkaConsumer<String, DeviceData> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(List.of("device_updates"));while (true) {ConsumerRecords<String, DeviceData> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, DeviceData> record : records) {// 3. 關聯設備元數據并寫入MySQLupdateMySQL(record.value());}}}}private void updateMySQL(DeviceData data) {String query = "SELECT * FROM devices WHERE id = ?";String update = "UPDATE device_status SET last_temp = ?, updated_at = NOW() WHERE device_id = ?";try (PreparedStatement selectStmt = mysqlConnection.prepareStatement(query);PreparedStatement updateStmt = mysqlConnection.prepareStatement(update)) {// 查詢設備元數據selectStmt.setString(1, data.getDeviceId());ResultSet rs = selectStmt.executeQuery();if (rs.next()) {// 更新設備狀態updateStmt.setDouble(1, data.getTemp());updateStmt.setString(2, data.getDeviceId());updateStmt.executeUpdate();}} catch (SQLException e) {e.printStackTrace();}}// 設備數據DTOpublic static class DeviceData {private String deviceId;private double temp;private Instant timestamp;// 構造器、getter和setterpublic DeviceData(String deviceId, double temp, Instant timestamp) {this.deviceId = deviceId;this.temp = temp;this.timestamp = timestamp;}// 省略getter和setter...}
}// 使用示例
public class Main {public static void main(String[] args) throws Exception {// 初始化InfluxDB客戶端InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", "token".toCharArray(),"org", "bucket");// 初始化Kafka生產者Properties kafkaProps = new Properties();kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.your.package.DeviceDataSerializer"); // 需要自定義KafkaProducer<String, DeviceData> kafkaProducer = new KafkaProducer<>(kafkaProps);// 初始化MySQL連接Connection mysqlConn = DriverManager.getConnection("jdbc:mysql://localhost:3306/iot_db", "user", "password");// 創建處理器MqttDataProcessor processor = new MqttDataProcessor(influxDBClient, kafkaProducer, mysqlConn);// 啟動Kafka消費者線程new Thread(processor::startKafkaConsumer).start();// 配置MQTT客戶端MqttClient mqttClient = new MqttClient("tcp://broker.example.com:1883", "java-client");mqttClient.connect();// 訂閱主題并設置回調mqttClient.subscribe("devices/+/data", 0, processor.createMqttListener());}
}

注意事項:
依賴庫:需要添加以下依賴:

MQTT: org.eclipse.paho.client.mqttv3

InfluxDB: com.influxdb.influxdb-client-java

Kafka: org.apache.kafka.kafka-clients

MySQL: mysql.mysql-connector-java

序列化:需要為Kafka實現DeviceData的序列化器和反序列化器。

錯誤處理:實際項目中需要更完善的錯誤處理和重試機制。

資源管理:確保正確關閉所有連接和資源。

線程安全:如果高并發場景,需要考慮線程安全問題。


方案二:雙寫(時序庫+關系庫)

適用場景:數據一致性要求高,且寫入壓力可控。
流程

  1. MQTT消息同時寫入時序數據庫和關系型數據庫(需事務或最終一致性保證)。
  2. 關系型數據庫僅存儲關鍵狀態快照(如設備最新狀態),而非全部原始數據。

優點

  • 數據實時一致,適合關鍵業務狀態(如設備告警閾值)。

挑戰

  • 需處理寫入沖突(如使用分布式事務或補償機制)。

方案三:關系型數據庫為主,定期歸檔到時序庫

適用場景:歷史數據分析需求明確,但實時查詢以業務數據為主。
流程

  1. 數據先寫入MySQL的device_realtime表。
  2. 定時任務將過期數據批量遷移至InfluxDB,MySQL中僅保留近期數據。

優點

  • 簡化實時業務查詢(所有數據在MySQL中)。
  • 降低MySQL存儲壓力。

2. 存儲順序設計原則

(1)根據數據特性分層
數據層級存儲目標數據庫選擇示例
原始時序數據高頻寫入、長期存儲時序數據庫每秒溫度讀數
狀態快照最新狀態查詢關系型數據庫設備當前溫度、在線狀態
業務元數據關聯查詢、事務操作關系型數據庫設備所屬用戶、地理位置
(2)寫入路徑優化
  • 高頻數據路徑:MQTT → 時序數據庫 → (可選)異步聚合后寫入關系庫。
  • 低頻元數據路徑:業務系統直接CRUD操作關系型數據庫。
(3)一致性保證
  • 最終一致性:通過消息隊列(如Kafka)解耦,確保數據最終同步。
  • 強一致性:使用分布式事務(如XA協議),但性能較低。

3. 典型物聯網架構示例

在這里插入圖片描述

關鍵點

  1. 實時性要求高的數據(如傳感器讀數)直連時序數據庫。
  2. 需要業務關聯的數據(如“設備所屬用戶”)通過流處理關聯后寫入MySQL。
  3. 歷史數據分析直接從時序數據庫查詢。

4. 選擇建議

  • 優先時序數據庫:若90%以上的查詢是基于時間范圍的聚合(如“過去24小時溫度趨勢”)。
  • 優先關系型數據庫:若需頻繁JOIN查詢(如“查詢設備A的所有者手機號”)。
  • 混合使用:大多數生產環境會同時使用兩者,通過寫入順序設計平衡性能與功能需求。

通過合理設計存儲順序,可以同時滿足物聯網場景的高性能寫入復雜業務查詢需求。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/89244.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/89244.shtml
英文地址,請注明出處:http://en.pswp.cn/web/89244.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

django安裝、跨域、緩存、令牌、路由、中間件等配置

注意&#xff1a;如果是使用 PyCharm 編程工具就不用創建虛擬化&#xff0c;直接打開 PyCharm 選擇新建的目錄直接調過下面的步驟11. 項目初始化如果不是用 PyCharm 編輯器就需要手動創建虛擬環境在項目目錄cmd&#xff0c;自定義名稱的虛擬環境# 激活虛擬環境 python -m venv …

時間的弧線,邏輯的航道——標準單元延遲(cell delay)的根與源

時序弧 在這篇文章中&#xff0c;我們將討論影響標準單元延遲的因素。在開始討論之前&#xff0c;我們需要先了解一下什么是時序弧 (Timing Arcs)&#xff1a; 時序弧 (Timing Arcs)&#xff1a; 時序弧代表了信號從一個輸入流向一個輸出的方向。它存在于組合邏輯和時序邏輯中&…

《透視定軸:CSS 3D魔方中視覺層級的秩序法則》

當CSS的代碼編織出一個能自由旋轉的3D魔方&#xff0c;六個色彩各異的面在空間中翻轉、重疊時&#xff0c;最考驗技術的并非旋轉動畫的流暢度&#xff0c;而是每個面在任意角度下都能保持符合現實邏輯的前后關系。為何有時某個面會突兀地“穿透”另一個面&#xff1f;為何旋轉到…

RTL編程中常用的幾種語言對比

以下是RTL&#xff08;寄存器傳輸級&#xff09;編程中常用的幾種硬件描述語言&#xff08;HDL&#xff09;及其核心差異的對比分析。RTL編程主要用于數字電路設計&#xff0c;通過描述寄存器間的數據傳輸和邏輯操作實現硬件功能。以下內容綜合了行業主流語言的技術特性與應用場…

前端面試題(HTML、CSS、JavaScript)

目錄 一、HTML src與href區別 對html語義化理解 語義化標簽有哪些&#xff1f; script中的defer與async區別 行內元素與塊級元素有哪些&#xff1f; canvas與svg區別 SEO優化 html5新特性 二、CSS 盒模型 選擇器優先級 偽元素與偽類 隱藏元素幾種方式 水平/垂直…

Linux-線程控制

線程等待pthread_join()pthread_join 是 Linux 系統中用于線程同步的重要函數&#xff0c;主要作用是等待指定線程結束并回收其資源。基本功能- 阻塞當前調用線程&#xff0c;直到目標線程執行結束。 - 回收目標線程的資源&#xff0c;避免產生“僵尸線程”。 - 可選地獲取目標…

RAG優化秘籍:基于Tablestore的知識庫答疑系統架構設計

目錄一、技術架構設計二、雙流程圖解析橫向架構對比縱向核心流程三、企業級代碼實現Python檢索核心TypeScript前端接入YAML部署配置四、性能對比驗證五、生產級部署方案六、技術前瞻分析附錄&#xff1a;完整技術圖譜一、技術架構設計 原創架構圖 #mermaid-svg-3Ktoc4oH4xlbD6…

i.mx8 RTC問題

項目場景&#xff1a;需要增加外置RTC&#xff0c;保證時間的精準。問題描述&#xff1a;基本情況&#xff0c;外置i2c接口的RTC&#xff0c;注冊、讀寫都正常&#xff0c;但是偶發性重啟后&#xff0c;系統時間是2022&#xff0c;rtc時間是1970&#xff0c;都像是恢復了默認時…

數據集相關類代碼回顧理解 | utils.make_grid\list comprehension\np.transpose

目錄 utils.make_grid list comprehension np.transpose utils.make_grid x_gridutils.make_grid(x_grid, nrow4, padding2) make_grid 函數來自torchvision的utils模塊&#xff0c;用于圖像數據可視化&#xff0c;將一批圖像排列成一個網格。 x_grid&#xff1a;四維圖像…

C#中Static關鍵字解析

本文僅作為參考大佬們文章的總結。 Static關鍵字是C#語言中一個基礎而強大的特性&#xff0c;它能夠改變類成員的行為方式和生命周期。本文系統性總結static關鍵字的各類用法、核心特性、適用場景以及需要注意的問題&#xff0c;以幫助掌握這一重要概念。 一、Static關鍵字概…

通用綜合文字識別聯動 MES 系統:OCR 是數據流通的核心

制造業的 MES 系統需實時整合生產數據以調控流程&#xff0c;但車間的工單、物料標簽、質檢報告等多為紙質或圖片形式&#xff0c;傳統人工錄入不僅滯后&#xff0c;還易出錯&#xff0c;導致 MES 系統數據斷層。通用綜合文字識別借助 OCR 技術&#xff0c;成為連接這些信息與 …

【Linux 學習指南】網絡編程基礎:從 IP、端口到 Socket 與 TCP/UDP 協議詳解

文章目錄&#x1f4dd;理解源IP地址和目的IP地址&#x1f320; 認識端口號&#x1f309;端口號范圍劃分&#x1f309;理解"端口號"和"進程ID"&#x1f309;理解源端口號和目的端口號&#x1f309;理解socket&#x1f320;傳輸層的典型代表&#x1f309;認識…

React+Next.js+Tailwind CSS 電商 SEO 優化

一、項目背景與技術選型?1. 原始痛點?項目最初基于純 React 開發&#xff08;SPA 架構&#xff09;&#xff0c;存在三個致命問題&#xff1a;?搜索引擎爬蟲無法有效抓取動態渲染的商品詳情、分類頁內容&#xff1b;?單頁面應用 難以實現頁面級的 meta 定制&#xff0c;關鍵…

Process Lasso:提升電腦性能的得力助手

在日常使用電腦的過程中&#xff0c;我們常常會遇到這樣的問題&#xff1a;電腦運行緩慢、程序響應遲緩、多任務處理時卡頓不斷。這些問題不僅影響工作效率&#xff0c;還讓人感到非常煩躁。其實&#xff0c;這些問題很多時候是因為電腦的進程管理不夠優化。而Process Lasso正是…

AI驅動的大前端內容創作與個性化推送:資訊類應用實戰指南

在信息爆炸的時代&#xff0c;資訊類應用面臨兩大核心挑戰&#xff1a;一是如何高效生產海量優質內容&#xff0c;二是如何讓用戶從海量信息中快速獲取感興趣的內容。AI技術的介入正在重構資訊類應用的開發模式&#xff0c;從內容生產到用戶觸達形成全鏈路智能化。本文將從開發…

2025/7/16——java學習總結

Java IO 流全體系總結&#xff1a;從基礎到實戰的完整突破&#xff08;重寫&#xff09;一、基礎核心&#xff1a;字節流與字符流的底層邏輯&#xff08;一&#xff09;字節流&#xff1a;二進制數據的讀寫基礎操作字節輸入流&#xff1a;掌握 FileInputStream 單字節讀取細節&…

書籍自然數數組的排序(8)0715

題目給定一個長度為N的整型數組arr&#xff0c;其中有N個互不相等的自然數1~N&#xff0c;請實現arr的排序&#xff0c;但是不要把下標0~N-1位置上的數通過直接賦值的方式替換成1~N。解答 arr在調整之后應該事下標從0到N-1的位置上依次放著1~N&#xff0c;即arr[index] index …

【08】MFC入門到精通——MFC模態對話框 和 非模態對話框 解析 及 實例演示

文章目錄八、模態對話框 和 非模態對話框 創建及顯示8.1 對話框是怎樣彈出的8.2 模態對話框的創建及顯示8.3 非模態對話框的創建及顯示8.4 完整代碼下載八、模態對話框 和 非模態對話框 創建及顯示 Windows對話框分為兩類&#xff1a;模態對話框 和 非模態對話框。 模態對話框…

github上傳大文件(多種解決方案)

之前一直用vscode的上傳項目方法&#xff0c;這個方便之處在于不用打開git終端輸入各種命令&#xff0c;不過麻煩的是我一直無法拉取github上的遠程倉庫提交&#xff0c;每次只能更新已有的倉庫并且上傳的文件還不能太大&#xff0c;應該是不能超過100MB&#xff0c;而且直接在…

生活污水深度除磷的方法

生活污水中磷含量過多的危害大家都知道總磷是水質檢測的重要指標之一&#xff0c;在污水處理中生活污水往往都會出現總磷超標的現象。生活污水磷超標的危害是多方面的主要包括水體富營養化、危害水生生物、影響人類健康&#xff0c;以及可能引發藍藻水華等問題。除磷方法污水的…