引言:為什么選擇 TDengine 與 Rust?
TDengine 是一款專為物聯網、車聯網、工業互聯網等時序數據場景優化設計的開源時序數據庫,支持高并發寫入、高效查詢及流式計算,通過“一個數據采集點一張表”與“超級表”的概念顯著提升性能。
Rust 作為一門系統級編程語言,近年來在數據庫、嵌入式系統、分布式服務等領域迅速崛起,以其內存安全、高性能著稱,與 TDengine 的高效特性天然契合,適合構建高可靠、高性能的數據處理系統。
TDengine Rust 連接器的設計與架構
TDengine Rust 連接器的主要目標是提供一個高效、安全且易于使用的接口,讓開發者能夠通過 Rust 語言與 TDengine 數據庫進行高效的交互。連接器的設計充分考慮了 Rust 語言的優勢,如內存安全、并發處理和高性能,同時確保與 TDengine 數據庫之間的通信可靠且高效。
主要模塊間的架構如下圖:
解釋:
-
taos-query 模塊作為核心,負責定義公共接口和數據結構。
-
taos-optin 模塊和 taos-ws 模塊分別實現了這些公共接口,負責原生連接和 WebSocket 連接的業務邏輯實現。
-
taos 模塊封裝了 taos-optin 和 taos-ws 模塊中的實現,同時將 taos-query 模塊所定義的公共接口和數據結構予以暴露。
-
最終,用戶只需依賴 taos 模塊,即可輕松使用整個系統,而無需了解內部復雜的實現細節。
快速入門:從安裝到代碼開發
1.在 Cargo.toml 中添加以下內容:
[dependencies]
anyhow = "1.0.96"
chrono = "0.4.39"
serde = "1.0.217"
taos = "0.12.3"
tokio = "1.43.0"
2.代碼演示,數據寫入與查詢:
use chrono::{DateTime, Local};
use taos::*;#[tokio::main]
async fn main() -> anyhow::Result<()> {// Establish native connectionlet dsn = "taos://localhost:6030";let taos = TaosBuilder::from_dsn(dsn)?.build().await?;let db = "power";// Prepare the databasetaos.exec_many([format!("DROP DATABASE IF EXISTS {db}"),format!("CREATE DATABASE {db}"),format!("USE {db}"),]).await?;let inserted = taos.exec_many([// Create a super table"CREATE STABLE meters(ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) \TAGS (group_id INT, location varchar(20))",// Create a subtable"CREATE TABLE d0 USING meters TAGS(0, 'Beijing Chaoyang')",// Write one record at a time"INSERT INTO d0 VALUES(now, 10.15, 217, 0.33)",// Write Null values"INSERT INTO d0 VALUES(now, NULL, NULL, NULL)",// Automatically create table on insert"INSERT INTO d1 USING meters TAGS(1, 'Beijing Haidian') VALUES(now, 10.1, 119, 0.32)",// Write multiple records at once"INSERT INTO d1 VALUES(now+1s, 10.22, 120, 0.33) (now+2s, 11.23, 121, 0.34) (now+3s, 12.23, 118, 0.32)",]).await?;assert_eq!(inserted, 6);let mut result = taos.query("SELECT * FROM meters").await?;for field in result.fields() {println!("Get the field: {}", field.name());}println!();// Query option 1, using row stream.let mut rows = result.rows();while let Some(row) = rows.try_next().await? {for (name, value) in row {println!("Get the value of {}: {}", name, value);}println!()}// Query option 2, deserialize using serde.#[derive(Debug, serde::Deserialize)]#[allow(dead_code)]struct Record {// Deserialize timestamp to chrono::DateTime<Local>ts: DateTime<Local>,// Deserialize float to f32current: Option<f32>,// Deserialize int to i32voltage: Option<i32>,// Deserialize float to f32phase: Option<f32>,// Deserialize int to i32group_id: i32,// Deserialize varchar to Stringlocation: String,}let records: Vec<Record> = taos.query("SELECT * FROM meters").await?.deserialize().try_collect().await?;println!("Get records: {:?}", records);Ok(())
}
高級功能與最佳實踐
連接池優化:提升高并發性能
頻繁創建和銷毀數據庫連接會帶來顯著的開銷,尤其是在高并發場景(如物聯網設備高頻上報數據)中。通過連接池復用連接,可減少 TCP 握手、認證等重復操作,提升整體吞吐量。
代碼示例:
use taos::*;#[tokio::main]
async fn main() -> anyhow::Result<()> {// Create a connection poollet dsn = "taos://localhost:6030";let pool = TaosBuilder::from_dsn(dsn)?.pool()?;let taos = pool.get().await?;let db = "power";// Prepare the databasetaos.exec_many([format!("DROP DATABASE IF EXISTS {db}"),format!("CREATE DATABASE {db}"),format!("USE {db}"),]).await?;let inserted = taos.exec_many([// Create a super table"CREATE STABLE meters(ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) \TAGS (group_id INT, location varchar(20))",// Create a subtable"CREATE TABLE d0 USING meters TAGS(0, 'Beijing Chaoyang')",// Write one record at a time"INSERT INTO d0 VALUES(now, 10.15, 217, 0.33)",// Write Null values"INSERT INTO d0 VALUES(now, NULL, NULL, NULL)",// Automatically create table on insert"INSERT INTO d1 USING meters TAGS(1, 'Beijing Haidian') VALUES(now, 10.1, 119, 0.32)",// Write multiple records at once"INSERT INTO d1 VALUES(now+1s, 10.22, 120, 0.33) (now+2s, 11.23, 121, 0.34) (now+3s, 12.23, 118, 0.32)",]).await?;assert_eq!(inserted, 6);Ok(())
}
原生連接與 WebSocket 連接
對于原生連接和 WebSocket 連接這兩種方式,除了建立連接所使用的 DSN 不同外,其余接口調用沒有差異。
代碼示例:
use taos::*;#[tokio::main]
async fn main() -> anyhow::Result<()> {// let dsn = "taosws://localhost:6041"; // WebSocket connectionlet dsn = "taos://localhost:6030"; // Native connectionlet _taos = TaosBuilder::from_dsn(dsn)?.build().await?;Ok(())
}
同步接口和異步接口
TDengine Rust 連接器的接口分為同步接口和異步接口。通常情況下,同步接口基于異步接口實現,二者的方法簽名,除了異步接口多了 async 關鍵字外,基本一致。
異步接口的代碼示例可查閱第 3 章節,同步接口的代碼示例如下:
use taos::sync::*;fn main() -> anyhow::Result<()> {// Establish native connectionlet dsn = "taos://localhost:6030";let taos = TaosBuilder::from_dsn(dsn)?.build()?;let db = "power";// Prepare the databasetaos.exec_many([format!("DROP DATABASE IF EXISTS {db}"),format!("CREATE DATABASE {db}"),format!("USE {db}"),])?;let inserted = taos.exec_many([// Create a super table"CREATE STABLE meters(ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) \TAGS (group_id INT, location varchar(20))",// Create a subtable"CREATE TABLE d0 USING meters TAGS(0, 'Beijing Chaoyang')",// Write one record at a time"INSERT INTO d0 VALUES(now, 10.15, 217, 0.33)",// Write Null values"INSERT INTO d0 VALUES(now, NULL, NULL, NULL)",// Automatically create table on insert"INSERT INTO d1 USING meters TAGS(1, 'Beijing Haidian') VALUES(now, 10.1, 119, 0.32)",// Write multiple records at once"INSERT INTO d1 VALUES(now+1s, 10.22, 120, 0.33) (now+2s, 11.23, 121, 0.34) (now+3s, 12.23, 118, 0.32)",])?;assert_eq!(inserted, 6);Ok(())
}
結語:開源社區的星辰大海
TDengine Rust 連接器的成長史,是一個典型的技術民主化故事——從社區萌芽到官方支持,從邊緣工具到核心生態。它證明了兩個事實:
-
開源協作的效率:在數據庫領域,社區貢獻者的“需求反哺”比閉門造車更能擊中痛點。
-
Rust 的生態潛力:作為系統級編程語言的 Rust,正在時序數據處理這樣的垂直領域開辟新戰場。
我們期待你的參與,歡迎提交 PR,一起推動 TDengine Rust 連接器的進步,攜手開創更加美好的開源未來!
附錄
-
https://docs.taosdata.com/
-
https://github.com/taosdata/taos-connector-rust