通聯數據依托于金融大數據,結合人工智能技術為投資者提供個性化、智能化、專業化投資服務, MDL 則是通聯數據提供的高頻行情數據服務。DolphinDB 提供了能夠從 MDL 服務器獲取高頻行情數據的 DolphinDB MDL 插件,幫助用戶方便地通過 DolphinDB 腳本語言將實時行情數據接入 DolphinDB 中,以便進行后續的計算或存儲。
本文主要介紹如何通過 MDL 插件將實時行情數據寫入分布式數據庫。本文全部代碼需要運行在 2.00.11 或者更高版本的 DolphinDB Server 以及插件上,目前僅支持 Linux 系統。
1. DolphinDB MDL 行情插件介紹
MDL 插件基于 MDL 官方提供的行情數據服務 C++ SDK(即 TCP 版本 MDL)實現。MDL 插件最核心的功能是實現了行情回調函數,每次接收到行情時會寫入訂閱時所指定的 DolphinDB 共享流表中。DolphinDB MDL 插件目前已經支持了包括上交所、深交所、中金所 、鄭商所、上期能源、大商所、廣期所數據源的 50 余種數據類型,訂閱使用所需指定的參數詳情見下文。具體MDL 插件的接口介紹見?DolphinDB MDL Plugin 使用說明。
2. 基本使用介紹
2.1 安裝插件
MDL 插件目前可以在 2.00.11 版本及以后的 DolphinDB Server 通過插件市場進行安裝。節點啟動后,連接節點并在 GUI(或 VS Code、Web UI)等?DolphinDB 客戶端中執行?installPlugin
?函數,則可以下載到與當前 server 版本適配的 MDL 插件文件,插件文件包括插件描述文件及插件的二進制文件。
login("admin", "123456")
installPlugin("MDL")
installPlugin
?函數若正常返回,則代表下載成功,其返回值為插件描述文件(PluginMDL.txt)的安裝路徑,如:
/path_to_dolphindb_server/server/plugins/MDL/PluginMDL.txt
installPlugin
?函數實際上是完成從遠程文件服務器拉取插件文件到 DolphinDB Server 所在的服務器,壓縮后的 MDL 包大小約為 60 MB,下載需要一定的耗時。
2.2 加載插件
在腳本中調用插件相關的接口前,需要先加載插件。在 GUI(或 VS Code、Web UI)等客戶端中執行?loadPlugin("MDL")
。以下示例中使用了相對路徑,也可以使用 2.1 中返回的絕對路徑?/path_to_dolphindb_server/server/plugins/MDL/PluginMDL.txt。
loadPlugin("./plugins/MDL/PluginMDL.txt")
loadPlugin
?函數正常返回則插件加載成功,以 VS Code 為例,首次加載成功后返回的部分信息如下,返回值內容是 MDL 插件所提供的函數:
3. 行情的接入與存儲
本章以訂閱滬深兩市的全市場股票的逐筆數據、實時寫入 DolphinDB 分布式數據庫為例,對 MDL 插件的使用進行說明。
3.1 數據接入方案
數據接入的流程如下:
- 通過 MDL 插件訂閱深圳市場的股票逐筆委托、逐筆成交,以及上海市場的逐筆合并數據逐筆數據寫入 DolphinDB 的三個持久化流數據表。持久化流數據表是具備發布訂閱功能的內存表。
- 訂閱持久化流數據表寫入 DolphinDB 分布式數據庫,將數據存儲到磁盤上。
注意:請勿使用 MDL 插件將行情數據直接寫入分布式數據庫。因為分布式數據庫并不適用于此類高頻的流式寫入。建議在使用時,借助流數據表及其發布訂閱功能以實現部分的批處理。這樣既可以提高寫入的吞吐量,也有助于降低時延。

下面分步驟介紹關鍵的 DolphinDB 代碼實現,完整腳本見附錄。
3.2 數據接入方案實施步驟
3.2.1 流表和分布式表規劃
注意:為保證后續的?enableTableShareAndPersistence
?函數能夠正常執行,需要節點啟動之前在配置文件中(單節點:dolohindb.cfg,集群:cluster.cfg)指定配置參數?persistenceDir?,配置參考功能配置。
獲取表結構
調用?MDL::getSchema?方法可以獲取行情數據各個表的表結構。運行以下代碼可以獲取上海股票逐筆合并、深圳股票逐筆成交和逐筆委托三張表的表結構:
// 深交所股票逐筆成交
transactionSchema = MDL::getSchema(`MDLSID_MDL_SZL2, 36);
// 深交所股票逐筆委托
orderSchema = MDL::getSchema(`MDLSID_MDL_SZL2, 33);
// 上交所股票逐筆合并
ngtsSchema = MDL::getSchema(`MDLSID_MDL_SHL2, 24);
創建持久化流數據表
得到行情數據的表結構后,使用該表結構創建持久化流數據表。
cacheSize = 1000000enableTableShareAndPersistence(table=streamTable(cacheSize:0, transactionSchema[`name], transactionSchema[`type]), tableName=`transactionTable_sz, cacheSize=cacheSize)
enableTableShareAndPersistence(table=streamTable(cacheSize:0, orderSchema[`name], orderSchema[`type]), tableName=`orderTable_sz, cacheSize=cacheSize)
enableTableShareAndPersistence(table=streamTable(cacheSize:0, ngtsSchema[`name], ngtsSchema[`type]), tableName=`ngtsTable_sh, cacheSize=cacheSize)
代碼第二行的?cacheSize
?變量控制了在建表時預分配內存的大小、以及流數據表可占用的最大內存,其單位是行,設置較大的?cacheSize
?可以降低出現峰值時延的頻率。具體大小可以根據實際的可使用的內存大小決定。具體優化原理可參考?DolphinDB 流計算時延統計與性能優化。
創建分布式數據庫
為將行情數據存入分布式數據庫,需要根據之前得到的行情數據表結構來創建分布式庫表,分區規則參考自《基于 DolphinDB 存儲金融數據的分區方案最佳實踐》。
注意:由于 MDL 沒有當天的日期字段,因此需要手動增加日期字段以便于分區。
transactionColName = transactionSchema[`name]
transactionColName.append!("Date")
transactionColType = transactionSchema[`type]
transactionColType.append!(DATE)
orderColName = orderSchema[`name]
orderColName.append!("Date")
orderColType = orderSchema[`type]
orderColType.append!(DATE)
ngtsColName = ngtsSchema[`name]
ngtsColName.append!("Date")
ngtsColType = ngtsSchema[`type]
ngtsColType.append!(DATE)dbName = "dfs://Stock_TB"
transactionTbName = "transactionTable_sz"
orderTbName = "orderTable_sz"
ngtsTbName = "ngtsTable_sh"dbDate = database(, partitionType=VALUE, partitionScheme=2024.01.01..2025.01.01)
dbID = database(, partitionType=HASH, partitionScheme=[SYMBOL, 25])
db = database(directory=dbName, partitionType=COMPO, partitionScheme=[dbDate, dbID],engine='TSDB',atomic='CHUNK')tbSchema = table(1:0, transactionColName, transactionColType)
db.createPartitionedTable(table=tbSchema,tableName=transactionTbName,partitionColumns=`Date`SecurityID,sortColumns=`SecurityID`TransactTime)
tbSchema = table(1:0, orderColName, orderColType)
db.createPartitionedTable(table=tbSchema,tableName=orderTbName,partitionColumns=`Date`SecurityID,sortColumns=`SecurityID`TransactTime)
tbSchema = table(1:0, ngtsColName, ngtsColType)
db.createPartitionedTable(table=tbSchema,tableName=ngtsTbName,partitionColumns=`Date`SecurityID,sortColumns=`SecurityID`TickTime)
3.2.2 訂閱 MDL 行情將增量數據到流數據表
建立 MDL 連接句柄
用戶配置 MDL 的連接信息及驗證 TOKEN,使用?MDL::createHandle?函數創建連接的句柄。注意創建連接句柄并沒有連接,只有在執行?MDL::connectMDL?后才會真正進行連接、接收數據。
因為通聯 MDL 不支持同時訂閱在不同服務器的數據源,也就是當同時訂閱上交所 L2 和深交所 L2 時,只能收到其中一個數據源的行情。為了解決這個問題,需要創建兩個句柄,分別訂閱上交所 L2 和深交所 L2 的數據。
需要注意的是,在?createHandle
?建立連接時可以指定?workerNum
?線程數,如果指定了大于 1 的線程數,則 MDL 的回調會在多個線程內進行,因此寫入 DolphinDB 流表的過程也是多線程的。下面的例子中指定線程數為 1,即單線程進行回調。
// 配置連接
HOST = ["mdl-sse01.datayes.com","mdl01.datayes.com","mdl02.datayes.com","mdl01.datayes.com","mdl02.datayes.com","mdl01.datayes.com","mdl02.datayes.com","mdl01.datayes.com","mdl02.datayes.com","mdl-cloud-bj.datayes.com","mdl-cloud-sz.datayes.com","mdl01.datayes.com","mdl02.datayes.com"]
PORT = [19010,19011,19011,19010,19010,19013,19013,19012,19012,19012,19012,19018,19018]
// TOKEN 根據實際需要進行替換
USERNAME = "97887ADLJFKAJLSDF98976WRUJD0KJDFLAKJDS"handle_sh = MDL::createHandle(`handle_sh, HOST, PORT, USERNAME, 1)
handle_sz = MDL::createHandle(`handle_sz, HOST, PORT, USERNAME, 1)
訂閱行情數據
使用?MDL::subscribe?函數進行 MDL 行情訂閱,在訂閱時需要傳入要訂閱的數據服務 ID 和版本號,并指定消息 ID。
// 深交所股票數據
MDL::subscribe(handle_sz, orderTable_sz, `MDLSID_MDL_SZL2, `MDLVID_MDL_SZL2, 33)
MDL::subscribe(handle_sz, transactionTable_sz, `MDLSID_MDL_SZL2, `MDLVID_MDL_SZL2, 36)
// 上交所股票數據
MDL::subscribe(handle_sh, ngtsTable_sh, `MDLSID_MDL_SHL2, `MDLVID_MDL_SHL2, 24)
有關 MDL 插件對數據品類的支持情況和相關訂閱參數,參考附錄 3.。
3.2.3 訂閱流表寫入分布式庫
訂閱 3.2 節中的三個持久化流數據表,將增量數據實時寫入分布式數據庫。
Transaction = loadTable(database=dbName, tableName=transactionTbName)
Order = loadTable(database=dbName, tableName=orderTbName)
Ngts = loadTable(database=dbName, tableName=ngtsTbName)def handleInsert(tb, mutable msg) {msg.addColumn("Date", DATE)msg.replaceColumn!("Date", take(date(now()), msg.size()))tableInsert(tb, msg)
}subscribeTable(tableName="transactionTable_sz", actionName="transactionTableInsert", offset=-1, handler=handleInsert{Transaction}, msgAsTable=true, batchSize=20000, throttle=1, reconnect=true)
subscribeTable(tableName="orderTable_sz", actionName="orderTableInsert", offset=-1, handler=handleInsert{Order}, msgAsTable=true, batchSize=20000, throttle=1, reconnect=true)
subscribeTable(tableName="ngtsTable_sh", actionName="ngtsTableInsert", offset=-1, handler=handleInsert{Ngts}, msgAsTable=true, batchSize=20000, throttle=1, reconnect=true)
handleInsert
?自定義函數會手動新增一列新的 Date 數據,并寫入當天的日期。- 通過調整?subscribeTable?函數中的?batchSize?和?throttle?參數可以控制寫入分布式數據庫的頻率。
- batchSize=20000 表示當未處理消息的數量達到 20000 時,handler 才會處理消息。
- throttle=1 表示繼上次 handler 處理消息之后,若未處理消息的數量還沒有達到 20000 ,但是時間間隔 1s 后也會處理消息。
- 因此,達到?batchSize?設置的條件或者達到?throttle?設置的條件,才會向分布式數據庫寫入一次。
3.2.4 啟動 MDL 連接,開啟數據寫入
由于 MDL 需要在連接前設置完數據的訂閱,連接后將無法添加訂閱。所以在訂閱后,使用函數?MDL::connectMDL?連接 MDL,行情數據將進入流數據表。
MDL::connectMDL(handle_sh)
MDL::connectMDL(handle_sz)
3.3 MDL 運行狀態監控
運行過程中,可以查看 MDL 行情的接收情況。可以使用?MDL::getHandleStatus?查詢 MDL 行情的接收情況。
stat = MDL::getHandleStatus()
select HandleName,CreateTime,IsConnect,SubscribeInfo from stat
返回結果如下,可以看到在本例提交了 MDL 訂閱,訂閱了深交所的逐筆成交和逐筆委托,以及上交所的逐筆合成,目前的連接狀態為 true。
查詢流表的訂閱,可以看到發布訂閱都正常進行。
getStreamingStat().subWorkers
查詢 dfs 表中某一只股票的數據,觀察數據。這里查詢?600100
?這支上交所股票的數據,執行以下命令,從 dfs 表中獲取了從連接之后的所有該股票的逐筆數據。
select * from loadTable("dfs://Stock_TB", "ngtsTable_sh") where SecurityID=`600100
至此,已經成功訂閱上交所和深交所的逐筆數據并將它落盤到 DolphinDB 分布式表中。
附錄
1. 訂閱落庫的腳本文件
MDL落庫腳本.dos
2. 常見問題
如果重復執行?loadPlugin
?加載插件,會拋出模塊已經被使用的錯誤提示,因為節點啟動后,只允許加載一次 MDL 插件,即可在任意會話中調用該插件提供的函數。錯誤提示如下:
The module [MDL] is already in use.
可以通過?try-cach
?語句捕獲這個錯誤,避免因為插件已加載而中斷后續腳本代碼的執行:
try{ loadPlugin("./plugins/MDL/PluginMDL.txt") }catch(ex){print ex}
3. MDL 插件數據品類參數表
MDL 插件目前支持了下表中出現的數據品類。在訂閱時可以查詢該表,找到想要訂閱的數據入參。表中為空的項,對應字段在訂閱時填空即可。
行情源品種 | svrID | svrVersion | 行情數據類型 | msgID | extraOrderLevel |
---|---|---|---|---|---|
上交所L2 | “MDLSID_MDL_SHL2” | “MDLVID_MDL_SHL2” | 市場行情 (mdl.4.4) | 4 | |
上交所L2 | “MDLSID_MDL_SHL2” | “MDLVID_MDL_SHL2” | 市場行情 (mdl.4.4) 帶10檔買賣方向委托隊列數據 | 4 | 1-10 (僅支持上交所、深交所快照。視具體需要的委托隊列檔位而指定, 下同。) |
上交所L2 | “MDLSID_MDL_SHL2” | “MDLVID_MDL_SHL2” | 指數行情 (mdl.4.6) | 6 | |
上交所L2 | “MDLSID_MDL_SHL2” | “MDLVID_MDL_SHL2” | 盤后固定價格行情消息 (mdl.4.16) | 16 | |
上交所L2 | “MDLSID_MDL_SHL2” | “MDLVID_MDL_SHL2” | 盤后固定價格交易逐筆成交消息 (mdl.4.17) | 17 | |
上交所L2 | “MDLSID_MDL_SHL2” | “MDLVID_MDL_SHL2” | 競價逐筆合并行情 (mdl.4.24) | 24 | |
orderbookSnapshotEngine 上交所L2股票行情快照實時合成 | "SHL2_ORDER_AND_TRANSACTION" | 非通聯MDL原始行情, 用于獲取實時快照合成的數據 | |||
深交所L2 | “MDLSID_MDL_SZL2” | “MDLVID_MDL_SZL2” | 市場行情 (mdl.6.28) | 28 | |
深交所L2 | “MDLSID_MDL_SZL2” | “MDLVID_MDL_SZL2” | 市場行情 (mdl.6.28) 帶10檔買賣方向委托隊列數據 | 28 | 1-10 |
深交所L2 | “MDLSID_MDL_SZL2” | “MDLVID_MDL_SZL2” | 指數行情 (mdl.6.29) | 29 | |
深交所L2 | “MDLSID_MDL_SZL2” | “MDLVID_MDL_SZL2” | 成交量統計指標行情快照 (mdl.6.30) | 30 | |
深交所L2 | “MDLSID_MDL_SZL2” | “MDLVID_MDL_SZL2” | 盤后定價交易業務行情快照 (mdl.6.31) | 31 | |
深交所L2 | “MDLSID_MDL_SZL2” | “MDLVID_MDL_SZL2” | 逐筆委托行情 (mdl.6.33) | 33 | |
深交所L2 | “MDLSID_MDL_SZL2” | “MDLVID_MDL_SZL2” | 逐筆成交行情 (mdl.6.36) | 36 | |
orderbookSnapshotEngine 深交所 L2股票行情快照實時合成 | "SZL2_ORDER_AND_TRANSACTION" | 非通聯MDL原始行情, 用于獲取實時快照合成的數據 | |||
中金所 L2 | “MDLSID_MDL_CFFEXL2” | “MDLVID_MDL_CFFEXL2“ | 期貨行情 (mdl.21.1) | 1 | |
鄭商所 L2 | “MDLSID_MDL_CZCEL2“ | “MDLVID_MDL_CZCEL2“ | 期貨行情 (mdl.23.1) | 1 | |
鄭商所 L2 | “MDLSID_MDL_CZCEL2“ | “MDLVID_MDL_CZCEL2“ | 期貨組合行情 (mdl.23.5) | 5 | |
上期能源 L2 | “MDLSID_MDL_SHFEL2“ | “MDLVID_MDL_SHFEL2“ | 上期期貨 (mdl.22.1) | 1 | |
上期能源 L2 | “MDLSID_MDL_SHFEL2“ | “MDLVID_MDL_SHFEL2“ | 原油期貨 (mdl.22.3) | 3 | |
大商所 L2 | “MDLSID_MDL_DCEL2” | “MDLVID_MDL_DCEL2” | 期貨行情 (mdl.24.1) | 1 | |
大商所 L2 | “MDLSID_MDL_DCEL2” | “MDLVID_MDL_SHFEL2“ | 期貨成交量統計 (mdl.24.3) | 3 | |
大商所 L2 | “MDLSID_MDL_DCEL2” | “MDLVID_MDL_SHFEL2“ | 期貨組合行情 (mdl.24.5) | 5 | |
大商所 L2 | “MDLSID_MDL_DCEL2” | “MDLVID_MDL_SHFEL2“ | 期貨最優價十筆委托 (mdl.24.7) | 7 | |
廣期所 L2 | “MDLSID_MDL_GFEXL2“ | “MDLVID_MDL_GFEXL2“ | 期貨行情 (mdl.26.1) | 1 | |
廣期所 L2 | “MDLSID_MDL_GFEXL2“ | “MDLVID_MDL_GFEXL2“ | 期貨成交量統計 (mdl.26.3) | 3 | |
廣期所 L2 | “MDLSID_MDL_GFEXL2“ | “MDLVID_MDL_GFEXL2“ | 期貨組合行情 (mdl.26.5) | 5 | |
廣期所 L2 | “MDLSID_MDL_GFEXL2“ | “MDLVID_MDL_GFEXL2“ | 期貨最優價十筆委托 (mdl.26.7) | 7 |
注:行情快照合成類型是為了對接 DolphinDB 快照合成流計算引擎的特殊處理,指將逐筆成交和逐筆委托處理為同構數據寫入一張 DolphinDB 表中。(可聯系 DolphinDB 小助手 dolphindb1,以進一步了解)