1. Comparison with MySQL
| InfluxDB | MySQL | Meaning |
|---|---|---|
| Bucket | Database | The database |
| Measurement | Table | The table |
| Tag | Indexed Column | An indexed column |
| Field | Column | A regular (non-indexed) column |
| Point | Row | A single row of data |
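To make the mapping concrete, a single write (using the line protocol covered in section 3 below) touches every concept in the table at once. The annotated breakdown here is only illustrative:

```
# measurement "user"       ≈ the table
# tag name=jack            ≈ an indexed column value
# field age=11             ≈ a regular column value
# timestamp 1748264631     ≈ the row's time (optional)
user,name=jack age=11 1748264631
```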
2. Installing InfluxDB

Update Homebrew and install InfluxDB (Homebrew installs the 2.x version by default):

```shell
brew update
brew install influxdb
```

Check the InfluxDB version:

```shell
influxd version   # InfluxDB 2.7.11 (git: fbf5d4ab5e) build_date: 2024-11-26T18:06:07Z
```

Start InfluxDB:

```shell
influxd
```

Open the web UI at http://localhost:8086/, configure the initial user, and save the generated token:

```
L5IeK5vutRmkCuyzbz781GVKj4fR6fKGQdl3CaWAPNEKmigrI0Yt8IlEN5_qkO9Lgb80BpcISK0U4WSkWDcqIQ==
```
3. Writing data with the line protocol

Per the official specification:
- First comes the `measurementName`, which plays the same role as specifying a table name in MySQL.
- Next come the `Tag`s, which act like MySQL indexed columns; multiple tags are separated by commas.
- Then come the `Field`s, which act like regular MySQL columns; multiple fields are separated by commas, and the fields are separated from the tags by a space.
- Last is the timestamp (optional; in the test below the precision is seconds).
Test write:

```
user,name=jack age=11 1748264631
```

Result:
4. Querying data with Flux

- from: which Bucket to read data from.
- range: filters data by time. Supported units are ms (milliseconds), s (seconds), m (minutes), h (hours), d (days), w (weeks), mo (months), and y (years). For example, `range(start: -1d, stop: now())` selects data from the past day; `stop: now()` is the default and can be omitted.
- filter: filters data by column.
Example with explanation:

```flux
from(bucket: "demo")                                  // read data from the "demo" bucket
  |> range(start: -1d, stop: now())                   // filter by time range
  |> filter(fn: (r) => r["_measurement"] == "user")   // query the "user" measurement (the "table")
  // equality filter on a tag, equivalent to a MySQL WHERE condition; InfluxDB
  // uses the inverted index on this tag to speed up the query
  |> filter(fn: (r) => r["name"] == "jack")
  // like selecting a specific column in MySQL, except that when a point has
  // several fields they are split into several rows, one row per field
  |> filter(fn: (r) => r["_field"] == "age")
```
關于r["_field"] == "age"
的問題:為什么需要這么查詢?因為Field如果有多個,就會被拆成多行
比如我們插入數據時是這樣的:user,name=jack age=18,height=180 1716715200000000000
,雖然這是一個數據點Point,但是由于有兩個Field
,那么查詢到的數據其實是兩行,如果加了r["_field"] == "age"
,就只會出現第一條數據,注意Tag不會被拆分為多行
| _measurement | name | _field | _value | _time |
|---|---|---|---|---|
| user | jack | age | 18 | 2024-05-26 00:00:00Z |
| user | jack | height | 180 | 2024-05-26 00:00:00Z |
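If you would rather get one row per point instead of one row per field, Flux's built-in `pivot()` function folds the `_field`/`_value` pairs back into columns. A minimal sketch, continuing the query from above:

```flux
from(bucket: "demo")
  |> range(start: -1d, stop: now())
  |> filter(fn: (r) => r["_measurement"] == "user")
  |> filter(fn: (r) => r["name"] == "jack")
  // fold the per-field rows back into one row per timestamp,
  // producing columns named "age" and "height"
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
```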
5. Spring Boot integration
5.1 Add dependencies
```xml
<dependency>
    <groupId>com.influxdb</groupId>
    <artifactId>influxdb-client-java</artifactId>
    <version>6.9.0</version>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-stdlib</artifactId>
    <version>1.8.20</version>
</dependency>
```
5.2 Inserting data
5.2.1 Basic configuration
```java
private final static String token = "L5IeK5vutRmkCuyzbz781GVKj4fR6fKGQdl3CaWAPNEKmigrI0Yt8IlEN5_qkO9Lgb80BpcISK0U4WSkWDcqIQ==";
private final static String org = "test";
private final static String bucket = "demo";
private final static String url = "http://127.0.0.1:8086";
```
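Each example below creates its own `InfluxDBClient` for brevity. In a real Spring Boot application you would more likely register a single client as a bean and inject it wherever it is needed. A minimal sketch, assuming the same connection settings as above (the `InfluxConfig` class name and shortened token are placeholders):

```java
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class InfluxConfig {

    // Same connection settings as in 5.2.1 (token shortened here).
    private static final String URL = "http://127.0.0.1:8086";
    private static final String TOKEN = "<your-token>";
    private static final String ORG = "test";
    private static final String BUCKET = "demo";

    // One shared client per application; Spring calls close() on shutdown.
    @Bean(destroyMethod = "close")
    public InfluxDBClient influxDBClient() {
        // This factory overload binds a default org and bucket to the client.
        return InfluxDBClientFactory.create(URL, TOKEN.toCharArray(), ORG, BUCKET);
    }
}
```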
5.2.2 Inserting via the line protocol
```java
private static void writeDataByLine() {
    InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray());
    WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
    String data = "user,name=tom age=18 1748270504";
    writeApi.writeRecord(bucket, org, WritePrecision.S, data);
}
```
5.2.3 Inserting via Point
```java
private static void writeDataByPoint() {
    InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray());
    WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
    Point point = Point.measurement("user")
            .addTag("name", "jerry")
            .addField("age", 20f)
            .time(Instant.now(), WritePrecision.S);
    writeApi.writePoint(bucket, org, point);
}
```
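When writing many points, it is usually better to send them as a batch rather than one call per point. A minimal sketch using `WriteApiBlocking.writePoints` with the same connection settings (the method name `writePointsInBatch` is just for illustration):

```java
private static void writePointsInBatch() {
    InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray());
    WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();

    // Build a small batch of points and send them in a single request.
    List<Point> points = Arrays.asList(
            Point.measurement("user").addTag("name", "jerry").addField("age", 21f).time(Instant.now(), WritePrecision.S),
            Point.measurement("user").addTag("name", "tom").addField("age", 22f).time(Instant.now(), WritePrecision.S));
    writeApi.writePoints(bucket, org, points);
}
```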
5.2.4 Inserting via a POJO class
```java
import com.influxdb.annotations.Column;
import com.influxdb.annotations.Measurement;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;

import java.time.Instant;

@Measurement(name = "user")
@NoArgsConstructor
@AllArgsConstructor
public class InfluxData {
    @Column(tag = true)
    String name;
    @Column
    Float age;
    @Column(timestamp = true)
    Instant time;
}
```
```java
private static void writeDataByPojo() {
    InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray());
    WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
    InfluxData influxData = new InfluxData("cat", 30f, Instant.now());
    writeApi.writeMeasurement(bucket, org, WritePrecision.S, influxData);
}
```
5.3 Querying data
```java
private static void queryData() {
    InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray());
    String query = "from(bucket: \"demo\")\n" +
            " |> range(start: -1d, stop:now())\n" +
            " |> filter(fn: (r) => r[\"_measurement\"] == \"user\")";
    List<FluxTable> fluxTables = influxDBClient.getQueryApi().query(query, org);
    for (FluxTable fluxTable : fluxTables) {                // one table per group (grouped by the indexed tag columns)
        for (FluxRecord record : fluxTable.getRecords()) {  // the records within that group
            System.out.println(record.getValues());
        }
        System.out.println();
    }
}
```
Final result:
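`record.getValues()` dumps the whole row as a `Map`. If you need individual columns with their types, `FluxRecord` also exposes typed accessors; as a small sketch, the inner loop above could be written like this (the column names follow the earlier examples):

```java
for (FluxTable fluxTable : fluxTables) {
    for (FluxRecord record : fluxTable.getRecords()) {
        System.out.printf("measurement=%s name=%s field=%s value=%s time=%s%n",
                record.getMeasurement(),          // the "_measurement" column
                record.getValueByKey("name"),     // a tag column, looked up by name
                record.getField(),                // the "_field" column
                record.getValue(),                // the "_value" column
                record.getTime());                // the "_time" column, as an Instant
    }
}
```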
5.4 A more flexible query
Custom query parameters with time-range filtering:
```java
@Data
public class InfluxDataQuery {
    private String plcName;
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime startTime;
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime stopTime;
    private String topic;
}
```
```java
public List<Map<String, Object>> queryData(InfluxDataQuery queryParams) {
    String plcName = queryParams.getPlcName();
    LocalDateTime startTime = queryParams.getStartTime(), stopTime = queryParams.getStopTime();
    String topic = queryParams.getTopic();
    if (startTime == null) {
        throw new RuntimeException("startTime must not be null");
    }
    InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray());
    StringBuilder sb = new StringBuilder();
    sb.append("\nfrom(bucket: \"").append(bucket).append("\")\n");
    if (stopTime == null) {
        stopTime = LocalDateTime.now();
    }
    sb.append(" |> range(start:").append(startTime.atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))
      .append(",stop:").append(stopTime.atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)).append(")\n");
    if (StringUtils.hasText(plcName)) {
        sb.append(" |> filter(fn: (r) => r[\"plcName\"] == \"").append(plcName).append("\")\n");
    }
    if (StringUtils.hasText(topic)) {
        sb.append(" |> filter(fn: (r) => r[\"_measurement\"] == \"").append(topic).append("\")\n");
    }
    log.info("query: {}", sb);
    List<FluxTable> fluxTables = influxDBClient.getQueryApi().query(sb.toString(), org);
    List<Map<String, Object>> dataList = new ArrayList<>();
    for (FluxTable fluxTable : fluxTables) {               // one table per group (grouped by tags)
        for (FluxRecord record : fluxTable.getRecords()) { // the records within that group
            dataList.add(record.getValues());
        }
    }
    return dataList;
}
```
The assembled Flux query looks roughly like the following.
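With hypothetical parameters `plcName = "plc1"`, `topic = "user"`, `startTime = 2025-05-26 00:00:00`, and no `stopTime` (so the current time is substituted), the code above would log something along these lines:

```flux
from(bucket: "demo")
 |> range(start:2025-05-26T00:00:00Z,stop:2025-05-27T08:00:00Z)
 |> filter(fn: (r) => r["plcName"] == "plc1")
 |> filter(fn: (r) => r["_measurement"] == "user")
```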