簡介
driver-go
是 TDengine 的官方 Go 語言連接器,實現了 Go 語言 database/sql 包的接口。Go 開發人員可以通過它開發存取 TDengine 集群數據的應用軟件。
Go 版本兼容性
支持 Go 1.14 及以上版本。
支持的平臺
- 原生連接支持的平臺和 TDengine 客戶端驅動支持的平臺一致。
- WebSocket/REST 連接支持所有能運行 Go 的平臺。
版本歷史
driver-go 版本 | 主要變化 | TDengine 版本 |
---|---|---|
v3.7.0 | 支持 decimal 類型 | 3.3.6.0 及更高版本 |
v3.6.0 | stmt2 原生接口,DSN 支持密碼包含特殊字符(url.QueryEscape) | 3.3.5.0 及更高版本 |
v3.5.8 | 修復空指針異常 | - |
v3.5.7 | taosWS 和 taosRestful 支持傳入 request id | - |
v3.5.6 | 提升 websocket 查詢和寫入性能 | 3.3.2.0 及更高版本 |
v3.5.5 | restful 支持跳過 ssl 證書檢查 | - |
v3.5.4 | 兼容 TDengine 3.3.0.0 tmq raw data | - |
v3.5.3 | 重構 taosWS | - |
v3.5.2 | websocket 壓縮和優化消息訂閱性能 | 3.2.3.0 及更高版本 |
v3.5.1 | 原生 stmt 查詢和 geometry 類型支持 | 3.2.1.0 及更高版本 |
v3.5.0 | 獲取消費進度及按照指定進度開始消費 | 3.0.5.0 及更高版本 |
v3.3.1 | 基于 websocket 的 schemaless 協議寫入 | 3.0.4.1 及更高版本 |
v3.1.0 | 提供貼近 kafka 的訂閱 api | - |
v3.0.4 | 新增 request id 相關接口 | 3.0.2.2 及更高版本 |
v3.0.3 | 基于 websocket 的 statement 寫入 | - |
v3.0.2 | 基于 websocket 的數據查詢和寫入 | 3.0.1.5 及更高版本 |
v3.0.1 | 基于 websocket 的消息訂閱 | - |
v3.0.0 | 適配 TDengine 3.0 查詢和寫入 | 3.0.0.0 及更高版本 |
異常和錯誤碼
如果是 TDengine 錯誤可以通過以下方式獲取錯誤碼和錯誤信息。
// import "github.com/taosdata/driver-go/v3/errors"if err != nil {tError, is := err.(*errors.TaosError)if is {fmt.Println("errorCode:", int(tError.Code))fmt.Println("errorMessage:", tError.ErrStr)} else {fmt.Println(err.Error())}}
TDengine 其他功能模塊的報錯,請參考 錯誤碼
數據類型映射
TDengine DataType | Go Type |
---|---|
TIMESTAMP | time.Time |
TINYINT | int8 |
SMALLINT | int16 |
INT | int32 |
BIGINT | int64 |
TINYINT UNSIGNED | uint8 |
SMALLINT UNSIGNED | uint16 |
INT UNSIGNED | uint32 |
BIGINT UNSIGNED | uint64 |
FLOAT | float32 |
DOUBLE | float64 |
BOOL | bool |
BINARY | string |
NCHAR | string |
JSON | []byte |
GEOMETRY | []byte |
VARBINARY | []byte |
DECIMAL | string |
注意:JSON 類型僅在 tag 中支持。
GEOMETRY 類型是 little endian 字節序的二進制數據,符合 WKB 規范。詳細信息請參考 數據類型
WKB 規范請參考Well-Known Binary (WKB)
示例程序匯總
示例程序源碼請參考:示例程序
常見問題
-
database/sql 中 stmt(參數綁定)相關接口崩潰
REST 不支持參數綁定相關接口,建議使用
db.Exec
和db.Query
。 -
使用
use db
語句后執行其他語句報錯[0x217] Database not specified or available
在 REST 接口中 SQL 語句的執行無上下文關聯,使用
use db
語句不會生效,解決辦法見上方使用限制章節。 -
使用 taosSql 不報錯使用 taosRestful 報錯
[0x217] Database not specified or available
因為 REST 接口無狀態,使用
use db
語句不會生效,解決辦法見上方使用限制章節。 -
readBufferSize
參數調大后無明顯效果readBufferSize
調大后會減少獲取結果時syscall
的調用。如果查詢結果的數據量不大,修改該參數不會帶來明顯提升,如果該參數修改過大,瓶頸會在解析 JSON 數據。如果需要優化查詢速度,需要根據實際情況調整該值來達到查詢效果最優。 -
disableCompression
參數設置為false
時查詢效率降低當
disableCompression
參數設置為false
時查詢結果會使用gzip
壓縮后傳輸,拿到數據后要先進行gzip
解壓。 -
go get
命令無法獲取包,或者獲取包超時
設置 Go 代理 go env -w GOPROXY=https://goproxy.cn,direct
。
API 參考
database/sql 驅動
driver-go
實現了 Go 的 database/sql/driver
接口,可以直接使用 Go 的 database/sql
包。提供了三個驅動:github.com/taosdata/driver-go/v3/taosSql
、github.com/taosdata/driver-go/v3/taosRestful
和 github.com/taosdata/driver-go/v3/taosWS
分別對應 原生連接
、REST 連接
和 WebSocket 連接
。
DSN 規范
數據源名稱具有通用格式,例如 PEAR DB,但沒有類型前綴(方括號表示可選):
[username[:password]@][protocol[(address)]]/[dbname][?param1=value1&...¶mN=valueN]
完整形式的 DSN:
username:password@protocol(address)/dbname?param=value
當密碼中包含特殊字符時,需要使用 url.QueryEscape
進行轉義。
原生連接
導入驅動:
import ("database/sql"_ "github.com/taosdata/driver-go/v3/taosSql"
)
使用 taosSql
作為 driverName
并且使用一個正確的 DSN 作為 dataSourceName
如下:
var taosUri = "root:taosdata@tcp(localhost:6030)/"
taos, err := sql.Open("taosSql", taosUri)
支持的 DSN 參數:
cfg
指定 taos.cfg 目錄cgoThread
指定 cgo 同時執行的數量,默認為系統核數cgoAsyncHandlerPoolSize
指定異步函數的 handle 大小,默認為 10000
Rest 連接
導入驅動:
import ("database/sql"_ "github.com/taosdata/driver-go/v3/taosRestful"
)
使用 taosRestful
作為 driverName
并且使用一個正確的 DSN 作為 dataSourceName
如下:
var taosUri = "root:taosdata@http(localhost:6041)/"
taos, err := sql.Open("taosRestful", taosUri)
支持的 DSN 參數:
disableCompression
是否接受壓縮數據,默認為 true 不接受壓縮數據,如果傳輸數據使用 gzip 壓縮設置為 false。readBufferSize
讀取數據的緩存區大小默認為 4K(4096),當查詢結果數據量多時可以適當調大該值。token
連接云服務時使用的 token。skipVerify
是否跳過證書驗證,默認為 false 不跳過證書驗證,如果連接的是不安全的服務設置為 true。
WebSocket 連接
導入驅動:
import ("database/sql"_ "github.com/taosdata/driver-go/v3/taosWS"
)
使用 taosWS
作為 driverName
并且使用一個正確的 DSN 作為 dataSourceName
如下:
var taosUri = "root:taosdata@ws(localhost:6041)/"
taos, err := sql.Open("taosWS", taosUri)
支持的 DSN 參數:
enableCompression
是否發送壓縮數據,默認為 false 不發送壓縮數據,如果傳輸數據使用壓縮設置為 true。readTimeout
讀取數據的超時時間,默認為 5m。writeTimeout
寫入數據的超時時間,默認為 10s。
:::note
- 與原生連接方式不同,REST 接口是無狀態的。在使用 REST 連接時,需要在 SQL 中指定表、超級表的數據庫名稱。
- 如果在 DSN 中指定了 dbname,那么,REST 連接會默認使用/rest/sql/dbname 作為 restful 請求的 url,在 SQL 中不需要指定 dbname。
:::
連接功能
Go 驅動支持創建連接,返回支持 sql/driver
標準的 Connector
接口的對象,還提供了 af
包,擴充了一些無模式寫入接口。
標準接口
database/sql
包中創建連接的接口
func Open(driverName, dataSourceName string) (*DB, error)
- 接口說明:(
database/sql
) 連接數據庫 - 參數說明:
driverName
:驅動名稱。dataSourceName
:連接參數 DSN。
- 返回值:連接對象,錯誤信息。
- 接口說明:(
擴展接口
af
包中創建連接的接口
func Open(host, user, pass, db string, port int) (*Connector, error)
- 接口說明:連接數據庫。
- 參數說明:
host
:主機地址。user
:用戶名。pass
:密碼。db
:數據庫名稱。port
:端口號。
- 返回值:連接對象,錯誤信息。
無模式寫入
af
包中使用原生連接進行無模式寫入的接口。
-
func (conn *Connector) InfluxDBInsertLines(lines []string, precision string) error
- 接口說明:無模式寫入 influxDB 格式數據。
- 參數說明:
lines
:寫入的數據。precision
:時間精度。
- 返回值:錯誤信息。
-
func (conn *Connector) OpenTSDBInsertJsonPayload(payload string) error
- 接口說明:無模式寫入 OpenTSDB JSON 格式數據。
- 參數說明:
payload
:寫入的數據。
- 返回值:錯誤信息。
-
func (conn *Connector) OpenTSDBInsertTelnetLines(lines []string) error
- 接口說明:無模式寫入 OpenTSDB Telnet 格式數據。
- 參數說明:
lines
:寫入的數據。
- 返回值:錯誤信息。
ws/schemaless
包中使用 WebSocket 無模式寫入的接口
func (s *Schemaless) Insert(lines string, protocol int, precision string, ttl int, reqID int64) error
- 接口說明:無模式寫入數據。
- 參數說明:
lines
:寫入的數據。protocol
:寫入的數據協議支持的協議InfluxDBLineProtocol = 1
OpenTSDBTelnetLineProtocol = 2
OpenTSDBJsonFormatProtocol = 3
。precision
:時間精度。ttl
:數據過期時間,0 表示不過期。reqID
:請求 ID。
- 返回值:錯誤信息。
執行 SQL
Go 驅動提供了符合 database/sql
標準的接口,支持以下功能:
- 執行 SQL 語句:執行靜態 SQL 語句,并返回其生成的結果對象。
- 查詢執行:可以執行返回數據集的查詢(
SELECT
語句)。 - 更新執行:可以執行影響行數的 SQL 語句,如
INSERT
、UPDATE
、DELETE
等。 - 獲取結果:可以獲取查詢執行后返回的結果集,并遍歷查詢返回的數據。
- 獲取更新計數:對于非查詢 SQL 語句,可以獲取執行后影響的行數。
- 關閉資源:釋放數據庫資源。
標準接口
-
func (db *DB) Close() error
- 接口說明:關閉連接。
- 返回值:錯誤信息。
-
func (db *DB) Exec(query string, args ...any) (Result, error)
- 接口說明:執行查詢但不返回任何行。
- 參數說明:
query
:要執行的命令。args
:命令參數。
- 返回值:Result 對象(只有影響行數),錯誤信息。
-
func (db *DB) Query(query string, args ...any) (*Rows, error)
- 接口說明:執行查詢并返回行的結果。
- 參數說明:
query
:要執行的命令。args
:命令參數。
- 返回值:Rows 對象,錯誤信息。
-
func (db *DB) QueryRow(query string, args ...any) *Row
- 接口說明:執行查詢并返回一行結果。
- 參數說明:
query
:要執行的命令。args
:命令參數。
- 返回值:Row 對象。
擴展接口
-
func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (Result, error)
- 接口說明:執行查詢但不返回任何行。
- 參數說明:
ctx
:上下文,使用 Value 傳遞請求 id 進行鏈路追蹤,key 為taos_req_id
value 為 int64 類型值。query
:要執行的命令。args
:命令參數。
- 返回值:結果 Result 對象(只有影響行數),錯誤信息。
-
func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)
- 接口說明:執行查詢并返回行結果。
- 參數說明:
ctx
:上下文,使用 Value 傳遞請求 id 進行鏈路追蹤,key 為taos_req_id
value 為 int64 類型值。query
:要執行的命令。args
:命令參數。
- 返回值:結果集 Rows 對象,錯誤信息。
-
func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) *Row
- 接口說明:執行查詢并返回一行結果,錯誤信息會在掃描 Row 時延遲返回。
- 參數說明:
ctx
:上下文,使用 Value 傳遞請求 id 進行鏈路追蹤,key 為taos_req_id
value 為 int64 類型值。query
:要執行的命令。args
:命令參數。
- 返回值:單行結果 Row 對象。
結果獲取
Go 驅動支持獲取查詢結果集,以及對應的結果集元數據,提供了用于讀取結果集中元數據和數據的方法。
結果集
通過 Rows
對象獲取查詢結果集,提供了以下方法:
-
func (rs *Rows) Next() bool
- 接口說明:準備下一行數據。
- 返回值:是否有下一行數據。
-
func (rs *Rows) Columns() ([]string, error)
- 接口說明:返回列名。
- 返回值:列名,錯誤信息。
-
func (rs *Rows) Scan(dest ...any) error
- 接口說明:將當前行的列值復制到 dest 指向的值中。
- 參數說明:
dest
:目標值。
- 返回值:錯誤信息。
-
func (rs *Rows) Close() error
- 接口說明:關閉行。
- 返回值:錯誤信息。
-
func (r *Row) Scan(dest ...any) error
- 接口說明:將當前行的列值復制到 dest 指向的值中。
- 參數說明:
dest
:目標值。
- 返回值:錯誤信息。
通過 Result
對象獲取更新結果集,提供了以下方法:
func (dr driverResult) RowsAffected() (int64, error)
- 接口說明:返回受影響的行數。
- 返回值:受影響的行數,錯誤信息。
結果集元數據
通過 Rows
對象獲取查詢結果集元數據,提供了以下方法:
-
func (rs *Rows) ColumnTypes() ([]*ColumnType, error)
- 接口說明:返回列類型。
- 返回值:列類型,錯誤信息。
-
func (ci *ColumnType) Name() string
- 接口說明:返回列名。
- 返回值:列名。
-
func (ci *ColumnType) Length() (length int64, ok bool)
- 接口說明:返回列長度。
- 返回值:列長度,是否有長度。
-
func (ci *ColumnType) ScanType() reflect.Type
- 接口說明:返回列類型對應的 Go 類型。
- 返回值:列類型。
-
func (ci *ColumnType) DatabaseTypeName() string
- 接口說明:返回列類型數據庫名稱。
- 返回值:列類型名稱。
參數綁定
Prepare 允許使用預編譯的 SQL 語句,可以提高性能并提供參數化查詢的能力,從而增加安全性。
標準接口
使用 sql/driver
的 Conn
接口中的 Prepare
方法準備一個與此連接綁定的準備好的語句,返回 Stmt
對象,使用。
-
Prepare(query string) (Stmt, error)
- 接口說明:準備返回一個與此連接綁定的準備好的語句 (statement)。
- 參數說明:
query
:要進行參數綁定的語句。
- 返回值:Stmt 對象,錯誤信息。
-
func (s *Stmt) Exec(args ...any) (Result, error)
- 接口說明:使用給定的參數執行準備好的語句并返回總結該語句效果的結果(只可以綁定列值,不支持綁定表名和 tag)。
- 參數說明:
args
:命令參數,Go 原始類型會自動轉換數據庫類型,類型不匹配可能會丟精度,建議使用與數據庫相同的類型,時間類型使用 int64 或RFC3339Nano
格式化后的字符串。
- 返回值:結果 Result 對象(只有影響行數),錯誤信息。
-
func (s *Stmt) Query(args ...any) (*Rows, error)
- 接口說明:使用給定的參數執行準備好的語句并返回行的結果。
- 參數說明:
args
:命令參數,Go 原始類型會自動轉換數據庫類型,類型不匹配可能會丟精度,建議使用與數據庫相同的類型,時間類型使用 int64 或RFC3339Nano
格式化后的字符串。
- 返回值:結果集 Rows 對象,錯誤信息。
-
func (s *Stmt) Close() error
- 接口說明:關閉語句。
- 返回值:錯誤信息。
擴展接口
af
包中提供了使用原生連接進行參數綁定的更多接口
-
func (conn *Connector) Stmt() *Stmt
- 接口說明:返回一個與此連接綁定的 Stmt 對象。
- 返回值:Stmt 對象。
-
func (s *Stmt) Prepare(sql string) error
- 接口說明:準備一個 sql。
- 參數說明:
sql
:要進行參數綁定的語句。
- 返回值:錯誤信息。
-
func (s *Stmt) NumParams() (int, error)
- 接口說明:返回參數數量。
- 返回值:參數數量,錯誤信息。
-
func (s *Stmt) SetTableNameWithTags(tableName string, tags *param.Param)
- 接口說明:設置表名和 tag。
- 參數說明:
tableName
:表名。tags
:tag。
- 返回值:錯誤信息。
-
func (s *Stmt) SetTableName(tableName string) error
- 接口說明:設置表名。
- 參數說明:
tableName
:表名。
- 返回值:錯誤信息。
-
func (s *Stmt) BindRow(row *param.Param) error
- 接口說明:綁定行。
- 參數說明:
row
:行數據。
- 返回值:錯誤信息。
-
func (s *Stmt) GetAffectedRows() int
- 接口說明:獲取受影響的行數。
- 返回值:受影響的行數。
-
func (s *Stmt) AddBatch() error
- 接口說明:添加批處理。
- 返回值:錯誤信息。
-
func (s *Stmt) Execute() error
- 接口說明:執行批處理。
- 返回值:錯誤信息。
-
func (s *Stmt) UseResult() (driver.Rows, error)
- 接口說明:使用結果。
- 返回值:結果集 Rows 對象,錯誤信息。
-
func (s *Stmt) Close() error
- 接口說明:關閉語句。
- 返回值:錯誤信息。
從 3.6.0 版本開始,提供 stmt2 綁定參數的接口
func (conn *Connector) Stmt2(reqID int64, singleTableBindOnce bool) *Stmt2
- 接口說明:從連接創建 stmt2。
- 參數說明:
reqID
:請求 ID。singleTableBindOnce
:單個子表在單次執行中只有一次數據綁定。
- 返回值:stmt2 對象。
func (s *Stmt2) Prepare(sql string) error
- 接口說明:綁定 sql 語句。
- 參數說明:
sql
:要綁定的 sql 語句。
- 返回值:錯誤信息。
func (s *Stmt2) Bind(params []*stmt.TaosStmt2BindData) error
- 接口說明:綁定數據。
- 參數說明:
- params 要綁定的數據。
- 返回值:錯誤信息。
func (s *Stmt2) Execute() error
- 接口說明:執行語句。
- 返回值:錯誤信息。
func (s *Stmt2) GetAffectedRows() int
- 接口說明:獲取受影響行數(只在插入語句有效)。
- 返回值:受影響行數。
func (s *Stmt2) UseResult() (driver.Rows, error)
- 接口說明:獲取結果集(只在查詢語句有效)。
- 返回值:結果集 Rows 對象,錯誤信息。
func (s *Stmt2) Close() error
- 接口說明:關閉 stmt2。
- 返回值:錯誤信息。
ws/stmt
包提供了通過 WebSocket 進行參數綁定的接口
-
func (c *Connector) Init() (*Stmt, error)
- 接口說明:初始化。
- 返回值:Stmt 對象,錯誤信息。
-
func (s *Stmt) Prepare(sql string) error
- 接口說明:準備一個 sql。
- 參數說明:
sql
:要進行參數綁定的語句。
- 返回值:錯誤信息。
-
func (s *Stmt) SetTableName(name string) error
- 接口說明:設置表名。
- 參數說明:
name
:表名。
- 返回值:錯誤信息。
-
func (s *Stmt) SetTags(tags *param.Param, bindType *param.ColumnType)
- 接口說明:設置 tag。
- 參數說明:
tags
:tag。bindType
:類型信息。
- 返回值:錯誤信息。
-
func (s *Stmt) BindParam(params []*param.Param, bindType *param.ColumnType) error
- 接口說明:綁定參數。
- 參數說明:
params
:參數。bindType
:類型信息。
- 返回值:錯誤信息。
-
func (s *Stmt) AddBatch() error
- 接口說明:添加批處理。
- 返回值:錯誤信息。
-
func (s *Stmt) Exec() error
- 接口說明:執行批處理。
- 返回值:錯誤信息。
-
func (s *Stmt) GetAffectedRows() int
- 接口說明:獲取受影響的行數。
- 返回值:受影響的行數。
-
func (s *Stmt) UseResult() (*Rows, error)
- 接口說明:使用結果。
- 返回值:Rows 對象,錯誤信息。
-
func (s *Stmt) Close() error
- 接口說明:關閉語句。
- 返回值:錯誤信息。
Rows 行結果參考 sql/driver
包中的 Rows
接口,提供以下接口
-
func (rs *Rows) Columns() []string
- 接口說明:返回列名。
- 返回值:列名。
-
func (rs *Rows) ColumnTypeDatabaseTypeName(i int) string
- 接口說明:返回列類型數據庫名稱。
- 參數說明:
i
:列索引。
- 返回值:列類型名稱。
-
func (rs *Rows) ColumnTypeLength(i int) (length int64, ok bool)
- 接口說明:返回列長度。
- 參數說明:
i
:列索引。
- 返回值:列長度,是否有長度。
-
func (rs *Rows) ColumnTypeScanType(i int) reflect.Type
- 接口說明:返回列類型對應的 Go 類型。
- 參數說明:
i
:列索引。
- 返回值:列類型。
-
func (rs *Rows) Next(dest []driver.Value) error
- 接口說明:準備下一行數據,并賦值給目標。
- 參數說明:
dest
:目標值。
- 返回值:錯誤信息。
-
func (rs *Rows) Close() error
- 接口說明:關閉行。
- 返回值:錯誤信息。
common/param
包中提供了參數綁定數據結構
以下是按照偏移設置參數的接口:
-
func NewParam(size int) *Param
- 接口說明:創建一個參數綁定數據結構。
- 參數說明:
size
:參數數量。
- 返回值:Param 對象。
-
func (p *Param) SetBool(offset int, value bool)
- 接口說明:設置布爾值。
- 參數說明:
offset
:偏移量 (列或標簽)。value
:布爾值。
-
func (p *Param) SetNull(offset int)
- 接口說明:設置空值。
- 參數說明:
offset
:偏移量 (列或標簽)。
-
func (p *Param) SetTinyint(offset int, value int)
- 接口說明:設置 Tinyint 值。
- 參數說明:
offset
:偏移量 (列或標簽)。value
:Tinyint 值。
-
func (p *Param) SetSmallint(offset int, value int)
- 接口說明:設置 Smallint 值。
- 參數說明:
offset
:偏移量 (列或標簽)。value
:Smallint 值。
-
func (p *Param) SetInt(offset int, value int)
- 接口說明:設置 Int 值。
- 參數說明:
offset
:偏移量 (列或標簽)。value
:Int 值。
-
func (p *Param) SetBigint(offset int, value int)
- 接口說明:設置 Bigint 值。
- 參數說明:
offset
:偏移量 (列或標簽)。value
:Bigint 值。
-
func (p *Param) SetUTinyint(offset int, value uint)
- 接口說明:設置 UTinyint 值。
- 參數說明:
offset
:偏移量 (列或標簽)。value
:UTinyint 值。
-
func (p *Param) SetUSmallint(offset int, value uint)
- 接口說明:設置 USmallint 值。
- 參數說明:
offset
:偏移量 (列或標簽)。value
:USmallint 值。
-
func (p *Param) SetUInt(offset int, value uint)
- 接口說明:設置 UInt 值。
- 參數說明:
offset
:偏移量 (列或標簽)。value
:UInt 值。
-
func (p *Param) SetUBigint(offset int, value uint)
- 接口說明:設置 UBigint 值。
- 參數說明:
offset
:偏移量 (列或標簽)。value
:UBigint 值。
-
func (p *Param) SetFloat(offset int, value float32)
- 接口說明:設置 Float 值。
- 參數說明:
offset
:偏移量 (列或標簽)。value
:Float 值。
-
func (p *Param) SetDouble(offset int, value float64)
- 接口說明:設置 Double 值。
- 參數說明:
offset
:偏移量 (列或標簽)。value
:Double 值。
-
func (p *Param) SetBinary(offset int, value []byte)
- 接口說明:設置 Binary 值。
- 參數說明:
offset
:偏移量 (列或標簽)。value
:Binary 值。
-
func (p *Param) SetVarBinary(offset int, value []byte)
- 接口說明:設置 VarBinary 值。
- 參數說明:
offset
:偏移量 (列或標簽)。value
:VarBinary 值。
-
func (p *Param) SetNchar(offset int, value string)
- 接口說明:設置 Nchar 值。
- 參數說明:
offset
:偏移量 (列或標簽)。value
:Nchar 值。
-
func (p *Param) SetTimestamp(offset int, value time.Time, precision int)
- 接口說明:設置 Timestamp 值。
- 參數說明:
offset
:偏移量 (列或標簽)。value
:Timestamp 值。precision
:時間精度。
-
func (p *Param) SetJson(offset int, value []byte)
- 接口說明:設置 Json 值。
- 參數說明:
offset
:偏移量 (列或標簽)。value
:Json 值。
-
func (p *Param) SetGeometry(offset int, value []byte)
- 接口說明:設置 Geometry 值。
- 參數說明:
offset
:偏移量 (列或標簽)。value
:Geometry 值。
以下是鏈式調用設置參數的接口:
func (p *Param) AddBool(value bool) *Param
- 接口說明:添加布爾值。
- 參數說明:
value
:布爾值。
- 返回值:Param 對象。
其他類型與布爾值類似,具體接口如下:
- AddNull
- AddTinyint
- AddSmallint
- AddInt
- AddBigint
- AddUTinyint
- AddUSmallint
- AddUInt
- AddUBigint
- AddFloat
- AddDouble
- AddBinary
- AddVarBinary
- AddNchar
- AddTimestamp
- AddJson
- AddGeometry
以下是設置列類型信息的接口:
-
func NewColumnType(size int) *ColumnType
- 接口說明:創建一個列類型信息數據結構。
- 參數說明:
size
:列數量。
- 返回值:ColumnType 對象。
-
func (c *ColumnType) AddBool() *ColumnType
- 接口說明:添加布爾類型。
- 返回值:ColumnType 對象。
其他類型與布爾類型類似,具體接口如下:
- AddTinyint
- AddSmallint
- AddInt
- AddBigint
- AddUTinyint
- AddUSmallint
- AddUInt
- AddUBigint
- AddFloat
- AddDouble
- AddBinary
- AddVarBinary
- AddNchar
- AddTimestamp
- AddJson
- AddGeometry
數據訂閱
Go 驅動支持數據訂閱功能,提供了基于原生連接和 WebSocket 連接的數據訂閱接口。原生實現在 af/tmq
包中,WebSocket 實現在 ws/tmq
包中。
消費者
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
- 接口說明:創建一個消費者。
- 參數說明:
conf
:配置信息。
- 返回值:Consumer 對象,錯誤信息。
配置信息定義為:
type ConfigValue interface{}
type ConfigMap map[string]ConfigValue
創建消費者支持屬性列表:
ws.url
:WebSocket 連接地址。ws.message.channelLen
:WebSocket 消息通道緩存長度,默認 0。ws.message.timeout
:WebSocket 消息超時時間,默認 5m。ws.message.writeWait
:WebSocket 寫入消息超時時間,默認 10s。ws.message.enableCompression
:WebSocket 是否啟用壓縮,默認 false。ws.autoReconnect
:WebSocket 是否自動重連,默認 false。ws.reconnectIntervalMs
:WebSocket 重連間隔時間毫秒,默認 2000。ws.reconnectRetryCount
:WebSocket 重連重試次數,默認 3。
其他參數請參考:Consumer 參數列表,注意 TDengine 服務端自 3.2.0.0 版本開始消息訂閱中的 auto.offset.reset 默認值發生變化。
-
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
- 接口說明:訂閱主題。
- 參數說明:
topic
:主題。rebalanceCb
:平衡回調(未使用)。
- 返回值:錯誤信息。
-
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
- 接口說明:訂閱主題列表。
- 參數說明:
topics
:主題列表。rebalanceCb
:平衡回調(未使用)。
- 返回值:錯誤信息。
-
func (c *Consumer) Unsubscribe() error
- 接口說明:取消訂閱。
- 返回值:錯誤信息。
-
func (c *Consumer) Poll(timeoutMs int) tmq.Event
- 接口說明:輪詢事件。
- 參數說明:
timeoutMs
:超時時間。
- 返回值:事件。
-
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
- 接口說明:提交偏移量。
- 返回值:TopicPartition 列表,錯誤信息。
-
func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error)
- 接口說明:獲取分配信息。
- 返回值:TopicPartition 列表,錯誤信息。
-
func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error
- 接口說明:跳轉到偏移量。
- 參數說明:
partition
:分區和偏移信息。ignoredTimeoutMs
:超時時間(未使用)。
- 返回值:錯誤信息。
-
func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (offsets []tmq.TopicPartition, err error)
- 接口說明:獲取提交的偏移量。
- 參數說明:
partitions
:分區列表。timeoutMs
:超時時間。
- 返回值:TopicPartition 列表,錯誤信息。
-
func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicPartition, error)
- 接口說明:提交偏移量。
- 參數說明:
offsets
:偏移量列表。
- 返回值:TopicPartition 列表,錯誤信息。
-
func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.TopicPartition, err error)
- 接口說明:獲取當前偏移量。
- 參數說明:
partitions
:分區列表。
- 返回值:TopicPartition 列表,錯誤信息。
-
func (c *Consumer) Close() error
- 接口說明:關閉消費者。
- 返回值:錯誤信息。
消費記錄
當 Poll
返回 tmq.Event
事件時,可以通過判斷 tmq.Event
的類型獲取消費記錄或錯誤信息。當類型為 *tmq.DataMessage
時,可以獲取消費記錄。
-
func (m *DataMessage) Topic() string
- 接口說明:獲取主題。
- 返回值:主題。
-
func (m *DataMessage) DBName() string
- 接口說明:獲取數據庫名稱。
- 返回值:數據庫名稱。
-
func (m *DataMessage) Offset() Offset
- 接口說明:獲取偏移量。
- 返回值:偏移量。
-
func (m *DataMessage) Value() interface{}
- 接口說明:獲取值,具體值為
[]*tmq.data
。 - 返回值:消費到的值。
- 接口說明:獲取值,具體值為
tmq.data 結構如下:
type Data struct {TableName stringData [][]driver.Value
}
- TableName 為表名
- Data 為數據,每個元素為一行數據,每行數據為一個數組,數組元素為列值。
當 Poll 返回類型為 tmq.Error
時,可以使用 func (e Error) Error() string
獲取錯誤信息。
分區信息
當消費到數據類型為 *tmq.DataMessage
時,可以從 TopicPartition
屬性中獲取分區信息。
type TopicPartition struct {Topic *stringPartition int32Offset OffsetMetadata *stringError error
}
Topic
:主題。Partition
:分區。Offset
:偏移量。Metadata
:元數據(未使用)。Error
:錯誤信息。
可以使用 func (p TopicPartition) String() string
獲取分區信息。
偏移量元數據
從 TopicPartition
中獲取的偏移量信息,可以通過 Offset
屬性獲取偏移量元數據。當偏移量為 -2147467247
時表示未設置偏移量。
反序列化
當消費到數據類型為 *tmq.DataMessage
時,可以使用 func (m *DataMessage) Value() interface{}
獲取數據,數據類型為 []*tmq.data
。
附錄
- driver-go 文檔。
- 視頻教程。
訪問官網
更多內容歡迎訪問 TDengine 官網