一、 引言:為什么選擇InfluxDB 3?
- 項目背景:
- 在我們的隧道風機監控系統中,實時數據的采集、存儲和高效查詢是至關重要的核心需求。風機運行產生的振動、傾角、電流、溫度等參數是典型的時序數據,具有高并發寫入、數據量持續增長以及對近期數據查詢性能要求高的特點。傳統關系型數據庫在應對這類場景時,往往面臨性能瓶頸和復雜的查詢優化問題。
- 時序數據庫專門設計用于處理時間序列數據,這類數據通常以極高的頻率生成,例如傳感器數據、日志數據或金融交易數據。
- InfluxDB 3的核心特性吸引點:
InfluxDB 3 Core 在架構和功能上進行了多項改進,與 InfluxDB 1.x/2.x 相比,具有更高的性能、更好的擴展性和更廣泛的數據處理能力。舊版本在處理大規模數據時可能會遇到性能瓶頸,而 3 Core 通過優化存儲引擎和查詢性能,顯著提升了處理能力。此外,3 Core 對 SQL 的支持和 Arrow Flight 的集成,使得它在數據查詢和系統集成方面更具優勢。
InfluxDB 3 Core官方文檔:https://docs.influxdb.org.cn/influxdb3/core/
二、 環境準備與InfluxDB 3 Core的安裝與啟動 (Windows 11)
- 下載與解壓:
- 在官方文檔中獲取InfluxDB 3 Core 壓縮包,并解壓到目錄。
- 我的解壓目錄位于
G:\瀏覽器下載\influxdb3-core-3.0.1-windows_amd64\
- 啟動InfluxDB 3 Core服務:
使用 influxdb3 serve 命令來啟動服務器,并可以通過參數指定對象存儲類型和數據目錄。
我的啟動命令如下:
.\influxdb3.exe serve --object-store file --node-id mywindowsnode --data-dir "G:\瀏覽器下載\influxdb3-core-3.0.1-windows_amd64\data"
參數解釋:
- object-store file
: 指定使用本地文件系統作為對象存儲。這是默認選項,但明確指出總是個好習慣。
- node-id <你的節點ID>
: 為你的InfluxDB節點指定一個唯一的ID,例如 mynode 或 win11node。
- data-dir "<你的數據目錄>"
: 指定數據存儲的目錄。
- 創建管理員令牌 (Admin Token):
在你啟動InfluxDB 3服務器之后,你就可以創建管理員令牌了。管理員令牌擁有對InfluxDB 3實例所有操作的最高權限。
好的,我們來逐一解答你的問題。
1. InfluxDB 3的數據庫與MySQL數據庫的對比
是的,在概念層面,InfluxDB 3中的“數據庫”與MySQL中的“數據庫”是類似的。它們都是一個邏輯上的容器,用來組織和隔離數據。
相似之處:
- 容器:兩者都充當最高級別的數據組織單元。在一個MySQL服務器上可以有多個數據庫,同樣,在一個InfluxDB 3實例中也可以創建多個數據庫。
- 隔離:不同的數據庫通常用于存放不同項目、應用或數據集的數據,實現數據隔離。
- 包含表(Tables):MySQL數據庫包含多個表(Table),InfluxDB 3的數據庫也包含多個表(Table)。
不同之處(關鍵區別):
- 數據模型和用途:
- MySQL:是關系型數據庫(RDBMS),數據以結構化的行和列存儲在表中,非常適合事務處理、復雜關系查詢。
- InfluxDB 3:是時間序列數據庫(TSDB),專門為處理帶有時間戳的數據(如監控指標、傳感器數據、事件日志)而設計和優化。它的核心是時間。
- 表(Table)的概念:
- MySQL:表有預定義的列和數據類型。
- InfluxDB 3:文檔中提到 “A
table
is equivalent to ameasurement
”。measurement
是InfluxDB早期版本的概念。在InfluxDB 3中,一個表通常對應一種類型的時間序列數據(例如,cpu
表存放CPU指標,temperature
表存放溫度讀數)。
- 核心組件:
- InfluxDB 3的表:包含
tags
(標簽,用于索引和快速過濾的元數據,通常是字符串鍵值對)、fields
(字段,實際的度量值,可以是整數、浮點數、布爾值、字符串) 和一個特殊的time
列 (納秒級精度的時間戳)。 - MySQL的表:由列組成,每列有其數據類型,通過主鍵、外鍵等維護關系。
- InfluxDB 3的表:包含
- Schema(模式):
- InfluxDB 3:是 “schema-on-write”(寫入時定義模式)。當你第一次寫入數據到一個新表時,InfluxDB會根據寫入的數據動態創建表的模式(特別是
tag
列的集合和順序,這些一旦創建就不可更改)。之后可以動態添加新的field
列,但不能再為該表添加新的tag
列。 - MySQL:通常是 “schema-on-read” 或更準確地說是預定義模式,你需要在寫入數據前明確定義表的結構(列名、數據類型、約束等)。
- InfluxDB 3:是 “schema-on-write”(寫入時定義模式)。當你第一次寫入數據到一個新表時,InfluxDB會根據寫入的數據動態創建表的模式(特別是
總結:你可以把InfluxDB 3的數據庫理解為一個專門存放時間序列數據的大容器。在這個容器里,你會按數據類型(比如CPU使用率、溫度、訂單量)創建不同的“表”(在InfluxDB的語境下也叫measurement)。
2. 如何創建管理員令牌 (Admin Token)
文檔中提到,在你啟動InfluxDB 3服務器之后,你就可以創建管理員令牌了。管理員令牌擁有對InfluxDB 3實例所有操作的最高權限。
步驟如下:
-
確保InfluxDB 3服務器正在運行:
你在上一步已經啟動了服務器,它應該在第一個命令提示符/PowerShell窗口中運行。 -
打開一個新的命令提示符 (CMD) 或 PowerShell 窗口:
- 不要關閉正在運行服務器的那個窗口。你需要一個新的窗口來執行
influxdb3
的客戶端命令。
- 不要關閉正在運行服務器的那個窗口。你需要一個新的窗口來執行
-
導航到InfluxDB 3的目錄(如果
influxdb3.exe
不在系統PATH中):
在新打開的窗口中,輸入:cd G:\瀏覽器下載\influxdb3-core-3.0.1-windows_amd64
-
執行創建管理員令牌的命令:
根據文檔,使用influxdb3 create token --admin
子命令。你可能還需要指定InfluxDB服務器的地址(如果不是默認的http://localhost:8181
)。.\influxdb3.exe create token --admin --host http://localhost:8181
或者,如果
influxdb3.exe
已經在你的系統路徑 (PATH) 中,或者你就在其目錄下,可以簡化為:influxdb3 create token --admin --host http://localhost:8181
--admin
:表示你正在創建一個管理員級別的令牌。--host http://localhost:8181
:指定了InfluxDB服務器正在監聽的地址和端口。如果你的服務器運行在其他地址或端口,請相應修改。
-
保存令牌:
執行命令后,它會在窗口中直接輸出一個很長的字符串,這就是你的管理員令牌。
非常重要:文檔中強調 “InfluxDB lets you view the token string only when you create the token. Store your token in a secure location, as you cannot retrieve it from the database later.”
這意味著:- 立即復制這個令牌字符串。
- 將它保存在一個安全的地方 (比如密碼管理器或一個受保護的文本文件中)。
- 一旦你關閉了這個窗口或者執行了其他命令,你就無法再次從InfluxDB中找回這個確切的令牌字符串了。InfluxDB內部只存儲令牌的哈希值。
-
(可選但推薦)設置環境變量:
為了方便以后使用influxdb3
CLI而不需要每次都輸入--token
參數,你可以將這個令牌設置到一個環境變量中。文檔推薦的環境變量名是INFLUXDB3_AUTH_TOKEN
。
在新的命令提示符窗口中設置(僅對當前窗口有效):set INFLUXDB3_AUTH_TOKEN=你的令牌字符串粘貼在這里
或者在PowerShell中(僅對當前窗口有效):
$env:INFLUXDB3_AUTH_TOKEN="你的令牌字符串粘貼在這里"
如果你想讓這個環境變量在系統重啟后依然有效,你需要通過系統屬性來設置它(搜索“編輯系統環境變量”)。
現在你就有了一個管理員令牌,可以用它來進行后續的數據庫創建、數據寫入和查詢等操作了。
三、 Java項目集成:Spring Boot (RuoYi) 與 InfluxDB 3的連接
- 選擇合適的Java客戶端庫:
對于InfluxDB 3.x,官方推薦使用influxdb3-java
這個新的、輕量級的、社區維護的客戶端庫。 - Maven依賴配置:
在pom.xml
中添加influxdb3-java
(最新穩定版) 的依賴。
<dependency><groupId>com.influxdb</groupId><artifactId>influxdb3-java</artifactId><version>1.0.0</version> <!-- 請使用最新的穩定版本 --></dependency>
- 關鍵JVM參數配置:
由于 influxdb3-java 底層使用了Apache Arrow Flight,它可能需要訪問一些通常被模塊系統封裝的JDK內部API。你需要在啟動你的Spring Boot應用程序時,添加JVM參數。
在 IntelliJ IDEA 中添加 JVM 參數:
--add-opens=java.base/java.nio=ALL-UNNAMED
在命令行中運行 JAR 包時:
java --add-opens=java.base/java.nio=ALL-UNNAMED -jar your-application.jar
- Spring Boot配置文件 (
application.yml
或.properties
):- 配置InfluxDB 3的連接信息 (
host
,token
,database
)。
- 配置InfluxDB 3的連接信息 (
influxdb:client3:host: http://localhost:8181token: apiv3_60Fa_JuZFuH5563Lfu6Ag63yml9KJnG-GDDa2dCQllf9JTQXebf76OiNIZOHaPnosx78hB61Q_e1ziO26F585gdatabase: tunnel_fengji # 你的InfluxDB 3數據庫名
- 創建InfluxDB服務層 (
IInfluxDBService
接口和InfluxDBService
實現類):InfluxDBService
實現:@PostConstruct
初始化InfluxDBClient.getInstance()
。@PreDestroy
關閉客戶端client.close()
。- 數據庫自動創建機制:解釋InfluxDB 3在首次寫入時會自動創建數據庫和表。
package com.ruoyi.oil.service.impl;import com.influxdb.v3.client.InfluxDBClient;
import com.influxdb.v3.client.Point;
import com.influxdb.v3.client.PointValues;
import com.influxdb.v3.client.query.QueryOptions;
import com.ruoyi.oil.service.IInfluxDBService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import com.fasterxml.jackson.databind.ObjectMapper; // 用于JSON解析
import com.ruoyi.oil.domain.DeviceDataMessage; // 導入你創建的POJO@Service
public class InfluxDBService implements IInfluxDBService {private static final Logger logger = LoggerFactory.getLogger(InfluxDBService.class);private final ObjectMapper objectMapper = new ObjectMapper(); // 用于將JSON字符串轉換為對象@Value("${influxdb.client3.host}")private String host;@Value("${influxdb.client3.token}")private String token;@Value("${influxdb.client3.database}")private String database;private InfluxDBClient client;@PostConstructpublic void init() {logger.info("Initializing InfluxDB 3 native client for host: {}, database: {}", host, database);try {this.client = InfluxDBClient.getInstance(host, token.toCharArray(), database);logger.info("InfluxDB 3 native client initialized successfully.");} catch (Exception e) {logger.error("Failed to initialize InfluxDB 3 native client", e);}}@PreDestroypublic void close() {if (this.client != null) {try {this.client.close();logger.info("InfluxDB 3 native client closed.");} catch (Exception e) {logger.error("Error closing InfluxDB 3 native client", e);}}}/*** 處理并寫入從硬件設備接收到的JSON消息。** @param jsonMessage 收到的JSON字符串* @return 如果處理和寫入至少一個點成功則返回true,否則false*/public boolean processAndWriteDeviceData(String jsonMessage) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot process message.");return false;}try {DeviceDataMessage message = objectMapper.readValue(jsonMessage, DeviceDataMessage.class);logger.info("Parsed device data message: {}", message.getDeviceName());if (message.getItems() == null) {logger.warn("No items found in the message for device: {}", message.getDeviceName());return false;}List<Point> pointsToWrite = new ArrayList<>();String measurement = "fan_data"; // 你可以根據 deviceType 或其他邏輯動態設置// 通用標簽,適用于該消息中的所有數據點Map<String, String> commonTags = new HashMap<>();commonTags.put("iotId", message.getIotId());commonTags.put("productKey", message.getProductKey());commonTags.put("deviceName", message.getDeviceName());if (message.getDeviceType() != null) {commonTags.put("deviceType", message.getDeviceType());}DeviceDataMessage.Items items = message.getItems();// 處理每個itemif (items.getLightCurrent() != null && items.getLightCurrent().getValue() != null) {pointsToWrite.add(createPointFromItemData(measurement, commonTags,"LightCurrent", items.getLightCurrent()));}if (items.getAx() != null && items.getAx().getValue() != null) {pointsToWrite.add(createPointFromItemData(measurement, commonTags,"ax", items.getAx()));}if (items.getRoll() != null && items.getRoll().getValue() != null) {pointsToWrite.add(createPointFromItemData(measurement, commonTags,"roll", items.getRoll()));}if (items.getAy() != null && items.getAy().getValue() != null) {pointsToWrite.add(createPointFromItemData(measurement, commonTags,"ay", items.getAy()));}if (items.getTemperature() != null && items.getTemperature().getValue() != null) {pointsToWrite.add(createPointFromItemData(measurement, commonTags,"temperature", items.getTemperature()));}if (items.getAz() != null && items.getAz().getValue() != null) {pointsToWrite.add(createPointFromItemData(measurement, commonTags,"az", items.getAz()));}if (items.getPitch() != null && items.getPitch().getValue() != null) {pointsToWrite.add(createPointFromItemData(measurement, commonTags,"pitch", items.getPitch()));}if (items.getYaw() != null && items.getYaw().getValue() != null) {pointsToWrite.add(createPointFromItemData(measurement, commonTags,"yaw", items.getYaw()));}if (pointsToWrite.isEmpty()) {logger.warn("No valid data points to write from message for device: {}", message.getDeviceName());return false;}return writePoints(pointsToWrite);} catch (Exception e) {logger.error("Error processing and writing device data: {}", e.getMessage(), e);return false;}}/*** 輔助方法,從ItemData創建InfluxDB Point。*/private Point createPointFromItemData(String measurement, Map<String, String> commonTags,String fieldName, DeviceDataMessage.ItemData itemData) {Point point = Point.measurement(measurement).setTimestamp(Instant.ofEpochMilli(itemData.getTime())); // 從毫秒時間戳創建InstantcommonTags.forEach(point::setTag);point.setField(fieldName, itemData.getValue()); // ItemData中的value是Doublereturn point;}@Overridepublic boolean writePoint(String measurement, Map<String, String> tags, Map<String, Object> fields, Instant timestamp) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot write point.");return false;}try {Point point = Point.measurement(measurement);if (timestamp != null) {point.setTimestamp(timestamp);} else {point.setTimestamp(Instant.now());}if (tags != null) {tags.forEach(point::setTag);}if (fields != null) {fields.forEach((key, value) -> {if (value instanceof Long) point.setField(key, (Long) value);else if (value instanceof Double) point.setField(key, (Double) value);else if (value instanceof Boolean) point.setField(key, (Boolean) value);else if (value instanceof String) point.setField(key, (String) value);else if (value instanceof Integer) point.setField(key, ((Integer)value).longValue());else if (value instanceof Float) point.setField(key, ((Float)value).doubleValue());else {logger.warn("Unsupported field type for key '{}': {}. Converting to string.", key, value.getClass().getName());point.setField(key, value.toString());}});}client.writePoint(point); // 默認寫入到客戶端初始化時指定的databaselogger.debug("Successfully wrote point using influxdb3-java: {}", point.toLineProtocol());return true;} catch (Exception e) {logger.error("Error writing point with influxdb3-java: {}", e.getMessage(), e);return false;}}@Overridepublic boolean writePoints(List<Point> points) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot write points.");return false;}if (points == null || points.isEmpty()) {logger.warn("Point list is empty or null. Nothing to write.");return true;}try {client.writePoints(points); // 默認寫入到客戶端初始化時指定的databaselogger.debug("Successfully wrote {} points using influxdb3-java.", points.size());return true;} catch (Exception e) {logger.error("Error writing points with influxdb3-java: {}", e.getMessage(), e);return false;}}@Overridepublic boolean writeRecord(String lineProtocol) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot write record.");return false;}try {client.writeRecord(lineProtocol); // 默認寫入到客戶端初始化時指定的databaselogger.debug("Successfully wrote line protocol record using influxdb3-java.");return true;} catch (Exception e) {logger.error("Error writing line protocol record with influxdb3-java: {}", e.getMessage(), e);return false;}}@Overridepublic Stream<Object[]> queryRaw(String sqlQuery) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot query.");return Stream.empty();}logger.debug("Executing SQL query (raw Object[]): {}", sqlQuery);try {return client.query(sqlQuery);} catch (Exception e) {logger.error("Error executing SQL query (raw Object[]): {}", e.getMessage(), e);return Stream.empty();}}@Overridepublic Stream<Object[]> queryRaw(String sqlQuery, Map<String, Object> params) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot query.");return Stream.empty();}logger.debug("Executing parametrized SQL query (raw Object[]): {} with params: {}", sqlQuery, params);try {return client.query(sqlQuery, params);} catch (Exception e) {logger.error("Error executing parametrized SQL query (raw Object[]): {}", e.getMessage(), e);return Stream.empty();}}@Overridepublic Stream<Object[]> queryRawWithInfluxQL(String influxQLQuery) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot query.");return Stream.empty();}logger.debug("Executing InfluxQL query (raw Object[]): {}", influxQLQuery);try {return client.query(influxQLQuery, QueryOptions.INFLUX_QL);} catch (Exception e) {logger.error("Error executing InfluxQL query (raw Object[]): {}", e.getMessage(), e);return Stream.empty();}}@Overridepublic Stream<PointValues> queryPoints(String sqlQuery) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot query points.");return Stream.empty();}logger.debug("Executing SQL query for PointValues: {}", sqlQuery);try {return client.queryPoints(sqlQuery);} catch (Exception e) {logger.error("Error executing SQL query for PointValues: {}", e.getMessage(), e);return Stream.empty();}}@Overridepublic Stream<PointValues> queryPoints(String sqlQuery, Map<String, Object> params) {if (client == null) {logger.error("InfluxDB 3 client is not initialized. Cannot query points with params.");return Stream.empty();}logger.warn("Executing parametrized SQL query for PointValues. " +"The influxdb3-java client.queryPoints API in README (1.0.0) " +"does not show direct Map-based parameterization for PointValues stream. " +"This method might require constructing SQL with parameters manually if not supported by API. " +"Falling back to non-parametrized queryPoints for now if params are present and non-empty, or use queryRaw.");// 根據 README, client.query(sql, params) 返回 Stream<Object[]>// client.queryPoints(sql) 返回 Stream<PointValues> 但沒有 params map// 如果確實需要參數化并得到 PointValues,需要檢查庫是否有其他方法,或手動處理if (params != null && !params.isEmpty()) {logger.error("Parameter-map based queryPoints is not directly supported by this example based on README. " +"Use queryRaw(sql, params) and process Object[] or construct SQL string with parameters manually for queryPoints.");// 或者可以嘗試動態構建SQL字符串,但要注意SQL注入風險// String finalSql = replaceQueryParameters(sqlQuery, params); // 你需要實現這個方法// return client.queryPoints(finalSql);return Stream.empty(); // 或者拋出 UnsupportedOperationException}try {return client.queryPoints(sqlQuery);} catch (Exception e) {logger.error("Error executing SQL query for PointValues (fallback non-parametrized): {}", e.getMessage(), e);return Stream.empty();}}// 示例方法 (與之前一致,只是調用的方法現在是基于influxdb3-java的)public void writeFanMetricsExample() {String measurement = "fan_sensor_data";Map<String, String> tags = new HashMap<>();tags.put("tunnel_id", "T002");tags.put("fan_id", "Fan_D01");Map<String, Object> fields = new HashMap<>();fields.put("vibration_x", 0.33);fields.put("temperature_celsius", 31.5);fields.put("active_power", 2.5);writePoint(measurement, tags, fields, Instant.now());}public void queryFanMetricsExample() {String sql = "SELECT time, tunnel_id, fan_id, vibration_x, temperature_celsius, active_power " +"FROM fan_sensor_data WHERE fan_id = 'Fan_D01' AND time >= now() - interval '1 hour' " +"ORDER BY time DESC LIMIT 3";logger.info("Querying with SQL for PointValues stream (Recommended for typed access):");try (Stream<PointValues> stream = queryPoints(sql)) { // 使用實現了的 queryPointsstream.forEach((PointValues p) -> {// 根據你的SELECT語句,你知道這些字段和標簽是存在的System.out.printf("| Time: %-30s | Tunnel: %-8s | Fan: %-8s | VibX: %-8.3f | Temp: %-8.2f | Power: %-8.2f |%n",p.getTimestamp(), // 主時間戳p.getTag("tunnel_id"),p.getTag("fan_id"),p.getField("vibration_x", Double.class),p.getField("temperature_celsius", Double.class),p.getField("active_power", Double.class));});} catch (Exception e) {logger.error("Error in queryFanMetricsExample (queryPoints): ", e);}System.out.println("----------------------------------------------------------------------------------------------------------");logger.info("Querying with SQL for raw Object[] stream (manual handling based on SELECT order):");// 列順序: time, tunnel_id, fan_id, vibration_x, temperature_celsius, active_powertry (Stream<Object[]> stream = queryRaw(sql)) { // 使用實現了的 queryRawstream.forEach(row -> {if (row != null && row.length == 6) {System.out.printf("| Time: %-30s | Tunnel: %-8s | Fan: %-8s | VibX: %-8s | Temp: %-8s | Power: %-8s |%n",row[0], row[1], row[2], row[3], row[4], row[5]);} else {logger.warn("Unexpected row format in raw query: {}", (Object)row);}});} catch (Exception e) {logger.error("Error in queryFanMetricsExample (queryRaw): ", e);}System.out.println("----------------------------------------------------------------------------------------------------------");}
}
IInfluxDBService實現:
package com.ruoyi.oil.service;import com.influxdb.v3.client.Point;
import com.influxdb.v3.client.PointValues;import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;public interface IInfluxDBService {boolean writePoint(String measurement, Map<String, String> tags, Map<String, Object> fields, Instant timestamp);boolean writePoints(List<Point> points);boolean writeRecord(String lineProtocol);/*** 執行SQL查詢并返回原始的Object數組流。* 調用者負責關閉Stream,并根據SQL查詢的SELECT子句解析Object[]中的數據。** @param sqlQuery SQL查詢語句* @return 代表結果行的Object數組流*/Stream<Object[]> queryRaw(String sqlQuery);/*** 執行參數化的SQL查詢并返回原始的Object數組流。* 調用者負責關閉Stream,并根據SQL查詢的SELECT子句解析Object[]中的數據。** @param sqlQuery 參數化的SQL查詢語句 (例如 "SELECT * FROM table WHERE tag1 = $param1")* @param params 參數名和值的Map* @return 代表結果行的Object數組流*/Stream<Object[]> queryRaw(String sqlQuery, Map<String, Object> params);/*** 執行InfluxQL查詢并返回原始的Object數組流。* 調用者負責關閉Stream,并根據InfluxQL查詢的SELECT子句解析Object[]中的數據。** @param influxQLQuery InfluxQL查詢語句* @return 代表結果行的Object數組流*/Stream<Object[]> queryRawWithInfluxQL(String influxQLQuery);boolean processAndWriteDeviceData(String jsonMessage);/*** 執行SQL查詢并返回 PointValues 流,方便按類型獲取字段和標簽。* 調用者負責關閉Stream。** @param sqlQuery SQL查詢語句* @return PointValues對象的流*/Stream<PointValues> queryPoints(String sqlQuery);/*** 執行參數化的SQL查詢并返回 PointValues 流。* 注意:influxdb3-java 1.0.0 的 queryPoints API 可能不直接支持 Map 形式的參數化。* 此方法目前可能回退到非參數化版本或需要調用者自行構造含參數的SQL。** @param sqlQuery 參數化的SQL查詢語句* @param params 參數名和值的Map (其在此方法中的支持取決于客戶端庫的實際能力)* @return PointValues對象的流*/Stream<PointValues> queryPoints(String sqlQuery, Map<String, Object> params);
}
四、 核心操作:數據寫入與查詢
數據模型設計 (針對隧道風機監控)
在InfluxDB 3中,數據組織的核心概念包括數據庫(Database)、表(Table,在InfluxDB語境下也常稱為Measurement)、標簽(Tag)和字段(Field)。時間戳(Time)是每條記錄固有的組成部分。
對于我們的隧道風機監控系統,我們設計了如下的數據模型:
-
數據庫 (Database):我們創建了一個名為
tunnel_fan_monitoring
(或根據實際項目命名)的數據庫,作為所有風機監控數據的統一存儲容器。 -
表/Measurement (Table / Measurement):
- 考慮到風機產生的各類傳感器數據(振動、傾角、電流、溫度等)通常是描述同一設備在相近時間點的狀態,并且我們可能需要將這些數據一起分析,我們決定采用一個統一的表來存儲這些指標。
- 表名:
device_metrics
- 這個表將包含所有風機的各類傳感器讀數。如果未來有特定類型的傳感器數據量極大或查詢模式非常獨立,也可以考慮拆分為更細粒度的表。
-
標簽 (Tags):
標簽用于存儲元數據,是數據點的主要索引維度,常用于WHERE
子句的過濾和GROUP BY
子句的分組。在我們的風機監控場景中,關鍵的標簽包括:iotId
(String): 硬件設備在物聯網平臺上的唯一標識符。productKey
(String): 設備所屬的產品型號標識。deviceName
(String): 設備的自定義名稱,例如 “tunnel-A-fan-01”,這是我們系統中標識具體風機的主要業務ID。deviceType
(String): 設備類型,例如 “FanDevice”, “SensorHub”,用于區分不同類型的硬件。(可選) location_zone
(String): 風機所在的隧道區域或更細分的地理位置標簽,如果需要按區域進行聚合分析。
重要特性:標簽集合與順序的不可變性
InfluxDB 3的一個核心設計是,當數據首次寫入一個新表時,該表中出現的標簽鍵及其順序(InfluxDB內部決定的順序)就被固定下來了。之后,你不能再為這個表添加新的標簽鍵。這意味著在設計初期,必須仔細規劃好一個表需要哪些核心的、用于索引和分組的維度作為標簽。如果后續確實需要新的索引維度,可能需要重新設計表結構或創建新表。 -
字段 (Fields):
字段用于存儲實際的測量值或具體的屬性信息。對于風機監控數據,字段將包括:ax
,ay
,az
(Double): X, Y, Z軸的振動值。roll
,pitch
,yaw
(Double): 翻滾角、俯仰角、偏航角。LightCurrent
(Double): 光照傳感器電流(或根據實際意義命名,如operating_current
)。temperature
(Double): 溫度讀數。(可選) status_message
(String): 風機的詳細狀態描述或錯誤信息(如果不是主要用于過濾或聚合)。(可選) online_status
(Boolean/Integer): 表示設備在線狀態的布爾值或整數值,如果設備上下線事件也作為時序數據記錄。
-
時間戳 (Time):
- 每條數據點都必須有一個時間戳,表示數據采集或事件發生的時間。InfluxDB 3支持納秒級精度,我們在Java客戶端中統一使用
Instant
對象,并以納秒精度寫入。
- 每條數據點都必須有一個時間戳,表示數據采集或事件發生的時間。InfluxDB 3支持納秒級精度,我們在Java客戶端中統一使用
這個數據模型旨在平衡查詢靈活性和InfluxDB的性能特點。通過合理的標簽設計,我們可以高效地根據設備ID、類型或位置篩選數據,并通過字段獲取具體的監控指標。
定義數據POJO類:
- 展示如何為接收到的不同JSON消息結構(設備狀態消息、傳感器數據消息)創建對應的Java POJO類 (DeviceStatusMessage, DeviceDataMessage 及其內部類)。
- 使用Jackson注解 (@JsonProperty) 處理JSON鍵名與Java變量名不一致的情況。
package com.ruoyi.oil.controller;import com.ruoyi.common.annotation.Anonymous;
import com.ruoyi.common.annotation.Log;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.core.page.TableDataInfo;
import com.ruoyi.common.enums.BusinessType;
import com.ruoyi.common.utils.poi.ExcelUtil;
import com.ruoyi.oil.domain.IotAlert;
import com.ruoyi.oil.domain.IotDevice;
import com.ruoyi.oil.domain.dto.DeptsDTO;
import com.ruoyi.oil.domain.dto.DeviceQuery;
import com.ruoyi.oil.domain.dto.DeviceTopic;
import com.ruoyi.oil.domain.dto.DeviceTsDTO;
import com.ruoyi.oil.domain.vo.DeviceInfoVO;
import com.ruoyi.oil.service.IInfluxDBService;
import com.ruoyi.oil.service.IIotDeviceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.http.ResponseEntity;
import com.influxdb.v3.client.PointValues; // 如果直接處理Stream<PointValues>
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.Map;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.LinkedHashMap;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletResponse;
import java.util.List;@Anonymous
@RestController
@RequestMapping("/iot/data")
public class IotDataController extends BaseController {@Autowiredprivate IInfluxDBService influxDBService;/*** 獲取特定風機在指定時間范圍內的所有指標 (使用 queryPoints)*/@Anonymous@GetMapping("/query")public ResponseEntity<List<Map<String, Object>>> getDataByFan(@RequestParam String deviceName,@RequestParam @Nullable @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime startTime,@RequestParam @Nullable @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime endTime,@RequestParam(defaultValue = "100") int limit) {// 基礎SQL查詢語句StringBuilder sqlBuilder = new StringBuilder();sqlBuilder.append("SELECT time, ").append("\"deviceName\", ") // <--- 修改這里.append("\"iotId\", ") // <--- 建議所有tag和field名都用雙引號,特別是如果它們不是純小寫.append("\"productKey\", ").append("ax, ay, az, roll, pitch, yaw, temperature, \"LightCurrent\" "); // LightCurrent也用引號sqlBuilder.append("FROM ").append("fan_data").append(" ");sqlBuilder.append("WHERE \"deviceName\" = '").append(escapeSqlIdentifier(deviceName)).append("'"); // 添加時間過濾條件(如果提供了時間參數)if (startTime != null) {sqlBuilder.append(" AND time >= '").append(startTime.toInstant().toString()).append("'");}if (endTime != null) {sqlBuilder.append(" AND time < '").append(endTime.toInstant().toString()).append("'");}sqlBuilder.append(" ORDER BY time DESC LIMIT ").append(limit);String sqlQuery = sqlBuilder.toString();List<Map<String, Object>> results = new ArrayList<>();try (Stream<PointValues> stream = influxDBService.queryPoints(sqlQuery)) {results = stream.map(pv -> {Map<String, Object> row = new LinkedHashMap<>();if (pv.getTimestamp() != null) row.put("time", pv.getTimestamp());// 根據你的SELECT語句明確提取if (pv.getTag("deviceName") != null) row.put("deviceName", pv.getTag("deviceName"));if (pv.getTag("iotId") != null) row.put("iotId", pv.getTag("iotId"));if (pv.getTag("productKey") != null) row.put("productKey", pv.getTag("productKey"));putFieldIfPresent(pv, row, "ax", Double.class);putFieldIfPresent(pv, row, "ay", Double.class);putFieldIfPresent(pv, row, "az", Double.class);putFieldIfPresent(pv, row, "roll", Double.class);putFieldIfPresent(pv, row, "pitch", Double.class);putFieldIfPresent(pv, row, "yaw", Double.class);putFieldIfPresent(pv, row, "temperature", Double.class);putFieldIfPresent(pv, row, "LightCurrent", Double.class);return row;}).collect(Collectors.toList());} catch (Exception e) {// log errorreturn ResponseEntity.status(500).body(null);}return ResponseEntity.ok(results);}// 輔助方法,用于從 PointValues 安全地獲取字段并放入 Mapprivate <T> void putFieldIfPresent(PointValues pv, Map<String, Object> map, String fieldName, Class<T> type) {try {T value = pv.getField(fieldName, type);if (value != null) {map.put(fieldName, value);}} catch (Exception e) {// 字段不存在或類型不匹配時,getField會拋異常// logger.trace("Field '{}' not found or type mismatch in PointValues", fieldName);}}// 非常基礎的SQL標識符清理,防止簡單注入。生產環境需要更健壯的方案或使用預編譯語句。private String escapeSqlIdentifier(String identifier) {if (identifier == null) return null;return identifier.replace("'", "''");}
}
package com.ruoyi.oil.controller;import com.ruoyi.common.annotation.Anonymous;
import com.ruoyi.common.annotation.Log;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.core.page.TableDataInfo;
import com.ruoyi.common.enums.BusinessType;
import com.ruoyi.common.utils.poi.ExcelUtil;
import com.ruoyi.oil.domain.IotAlert;
import com.ruoyi.oil.domain.IotDevice;
import com.ruoyi.oil.domain.dto.DeptsDTO;
import com.ruoyi.oil.domain.dto.DeviceQuery;
import com.ruoyi.oil.domain.dto.DeviceTopic;
import com.ruoyi.oil.domain.dto.DeviceTsDTO;
import com.ruoyi.oil.domain.vo.DeviceInfoVO;
import com.ruoyi.oil.service.IIotDeviceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;import javax.servlet.http.HttpServletResponse;
import java.util.List;/*** 設備管理Controller* * @author wanglong* @date 2024-04-15*/@RestController
@RequestMapping("/iot/device")
public class IotDeviceController extends BaseController
{@Autowiredprivate IIotDeviceService iotDeviceService;/*** 查詢設備管理列表*/@PreAuthorize("@ss.hasPermi('iot:device:list')")@GetMapping("/list")public TableDataInfo list(DeviceQuery deviceQuery){startPage();List<DeviceInfoVO> list = iotDeviceService.selectIotDeviceDataList(deviceQuery);return getDataTable(list);}@PreAuthorize("@ss.hasPermi('iot:device:list')")@GetMapping("/listF")public TableDataInfo listF(IotDevice iotDevice){startPage();List<IotDevice> list = iotDeviceService.selectIotDeviceDataListF(iotDevice);return getDataTable(list);}/*** 導出設備管理列表*/@PreAuthorize("@ss.hasPermi('iot:device:export')")@PostMapping("/export")public void export(HttpServletResponse response, IotDevice iotDevice){List<IotDevice> list = iotDeviceService.selectIotDeviceList(iotDevice);ExcelUtil<IotDevice> util = new ExcelUtil<IotDevice>(IotDevice.class);util.exportExcel(response, list, "設備管理數據");}/*** 獲取設備管理詳細信息*/@PreAuthorize("@ss.hasPermi('iot:device:query')")@GetMapping(value = "/{deviceId}")public AjaxResult getInfo(@PathVariable("deviceId") Long deviceId){return success(iotDeviceService.selectIotDeviceByDeviceId(deviceId));}/*** 新增設備管理*/@PreAuthorize("@ss.hasPermi('iot:device:add')")@Log(title = "設備管理", businessType = BusinessType.INSERT)@PostMappingpublic AjaxResult add(@RequestBody IotDevice iotDevice){return toAjax(iotDeviceService.insertIotDevice(iotDevice));}/*** 修改設備管理*/@PreAuthorize("@ss.hasPermi('iot:device:edit')")@Log(title = "設備管理", businessType = BusinessType.UPDATE)@PutMapping@Anonymouspublic AjaxResult edit(@RequestBody IotDevice iotDevice){return toAjax(iotDeviceService.updateIotDevice(iotDevice));}/*** 刪除設備管理*/@PreAuthorize("@ss.hasPermi('iot:device:remove')")@Log(title = "設備管理", businessType = BusinessType.DELETE)@DeleteMapping("/{deviceIds}")public AjaxResult remove(@PathVariable Long[] deviceIds){return toAjax(iotDeviceService.deleteIotDeviceByDeviceIds(deviceIds));}/*** 獲取設備警報事件記錄* @param iotAlert* @return*/@GetMapping("/getEvents")public TableDataInfo getEvent(IotAlert iotAlert) {startPage();List<IotAlert> iotAlerts = iotDeviceService.queryEvent(iotAlert);return getDataTable(iotAlerts);}/*** 修改設備運轉周期及電機開關* @param deviceTopic* @return*/@Log(title = "設備管理", businessType = BusinessType.UPDATE)@PreAuthorize("@ss.hasPermi('iot:device:setZhouqi')")@PostMapping("cycle")public AjaxResult cycle(@RequestBody DeviceTopic deviceTopic) {iotDeviceService.updateCycleStatus(deviceTopic);return success();}/*** 激活設備* @param deviceTopic* @return*/@PostMapping("setActive")public AjaxResult activeDevice(@RequestBody DeviceTopic deviceTopic) {iotDeviceService.setActiveCode(deviceTopic);return success();}/*** 導入excel*/@Log(title = "設備管理", businessType = BusinessType.IMPORT)@PreAuthorize("@ss.hasPermi('iot:device:import')")@PostMapping("/importData")public AjaxResult importData(MultipartFile file) throws Exception{ExcelUtil<IotDevice> util = new ExcelUtil(IotDevice.class);List<IotDevice> IotDeviceList = util.importExcel(file.getInputStream());String operName = getUsername();String message = iotDeviceService.importData(IotDeviceList, operName);return success(message);}/*** 導出數據模板* @param response*/@PostMapping("/importTemplate")public void importTemplate(HttpServletResponse response){ExcelUtil<IotDevice> util = new ExcelUtil<IotDevice>(IotDevice.class);util.importTemplateExcel(response, "用戶數據");}@PostMapping("/tempAcc")public TableDataInfo getTempInfo(@RequestBody DeviceTsDTO deviceTsDTO) {return getDataTable(iotDeviceService.selectTempAcc(deviceTsDTO));}/*** 獲取每天全部設備的油量使用量* @param iotDevice* @return*/@GetMapping("/getDayFuel")public AjaxResult getDayFuel(IotDevice iotDevice){return success(iotDeviceService.getDayFue(iotDevice));}/*** 更新部門* @param deptsDTO* @return*/@Log(title = "設備管理", businessType = BusinessType.UPDATE)@PreAuthorize("@ss.hasPermi('iot:device:updateDepts')")@PostMapping("/updateDepts")public AjaxResult updateDepts(@RequestBody DeptsDTO deptsDTO){return toAjax(iotDeviceService.updateDepts(deptsDTO));}/*** 上傳圖片* @param file* @return* @throws Exception*/@PostMapping("/uploadImage")public AjaxResult uploadImage(@RequestParam("file") MultipartFile file) throws Exception{return iotDeviceService.uploadImage(file);}/*** 根據路徑刪除照片* @param path* @return*/@PostMapping("/deletePhoto")public AjaxResult deletePhoto(@RequestParam("path") String path){return toAjax(iotDeviceService.deletePhoto(path));}@GetMapping("/getDeviceInfo")public AjaxResult getDeviceInfo(@RequestParam Long deviceId) {return success(iotDeviceService.getDeviceInfo(deviceId));}
}
Controller層數據讀取示例:
- 構建動態SQL(根據tag篩選、時間范圍篩選)。
/*** 獲取特定風機在指定時間范圍內的所有指標 (使用 queryPoints)*/@Anonymous@GetMapping("/query")public ResponseEntity<List<Map<String, Object>>> getDataByFan(@RequestParam String deviceName,@RequestParam @Nullable @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime startTime,@RequestParam @Nullable @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) OffsetDateTime endTime,@RequestParam(defaultValue = "100") int limit) {// 基礎SQL查詢語句StringBuilder sqlBuilder = new StringBuilder();sqlBuilder.append("SELECT time, ").append("\"deviceName\", ") // <--- 修改這里.append("\"iotId\", ") // <--- 建議所有tag和field名都用雙引號,特別是如果它們不是純小寫.append("\"productKey\", ").append("ax, ay, az, roll, pitch, yaw, temperature, \"LightCurrent\" "); // LightCurrent也用引號sqlBuilder.append("FROM ").append("fan_data").append(" ");sqlBuilder.append("WHERE \"deviceName\" = '").append(escapeSqlIdentifier(deviceName)).append("'"); // 添加時間過濾條件(如果提供了時間參數)if (startTime != null) {sqlBuilder.append(" AND time >= '").append(startTime.toInstant().toString()).append("'");}if (endTime != null) {sqlBuilder.append(" AND time < '").append(endTime.toInstant().toString()).append("'");}sqlBuilder.append(" ORDER BY time DESC LIMIT ").append(limit);String sqlQuery = sqlBuilder.toString();List<Map<String, Object>> results = new ArrayList<>();try (Stream<PointValues> stream = influxDBService.queryPoints(sqlQuery)) {results = stream.map(pv -> {Map<String, Object> row = new LinkedHashMap<>();if (pv.getTimestamp() != null) row.put("time", pv.getTimestamp());// 根據你的SELECT語句明確提取if (pv.getTag("deviceName") != null) row.put("deviceName", pv.getTag("deviceName"));if (pv.getTag("iotId") != null) row.put("iotId", pv.getTag("iotId"));if (pv.getTag("productKey") != null) row.put("productKey", pv.getTag("productKey"));putFieldIfPresent(pv, row, "ax", Double.class);putFieldIfPresent(pv, row, "ay", Double.class);putFieldIfPresent(pv, row, "az", Double.class);putFieldIfPresent(pv, row, "roll", Double.class);putFieldIfPresent(pv, row, "pitch", Double.class);putFieldIfPresent(pv, row, "yaw", Double.class);putFieldIfPresent(pv, row, "temperature", Double.class);putFieldIfPresent(pv, row, "LightCurrent", Double.class);return row;}).collect(Collectors.toList());} catch (Exception e) {// log errorreturn ResponseEntity.status(500).body(null);}return ResponseEntity.ok(results);}
五、 遇到的問題與解決方案(踩坑實錄)
ClassNotFoundException: com.influxdb.v3.client.PointValues
:- 原因分析:Maven依賴問題、IDE緩存。
- 解決方法:強制刷新Maven依賴、清理IDE緩存、檢查
pom.xml
。
java.net.ConnectException: Connection refused
:- 原因分析:InfluxDB服務未運行、配置的URL/端口錯誤、防火墻。
- 排查步驟。
- SQL查詢報錯
Schema error: No field named ...
:- 原因分析:SQL中列名大小寫與實際存儲不一致,未用雙引號包裹駝峰式或特殊標識符。
- 解決方法:在SQL中對這類標識符使用雙引號并保持大小寫。
- 處理不同結構的JSON消息:
- 問題:設備上線消息和數據消息格式不同。
- 解決方案:使用
ObjectMapper.readTree()
預解析判斷關鍵字段,然后反序列化到不同的POJO。
PointValues
API的理解:- 初期可能誤以為有
getTagKeys()
等方法,實際需要按名獲取。 - 如何有效地從
PointValues
提取數據到自定義結構。
- 初期可能誤以為有
- JVM參數
--add-opens
的重要性:不加此參數可能會導致運行時與Arrow Flight相關的底層錯誤。
倉庫地址:https://github.com/dream-one/infulxDB3-JAVA