在金融市場中,大額訂單的一次性交易可能會對市場價格產生較大沖擊,導致交易成本增加。例如,大額買入訂單可能會迅速推高股價,使后續買入成本上升;大額賣出訂單則可能打壓股價,造成資產賤賣。拆單算法通過將大額訂單拆分成多個小額訂單,在指定時間段的不同時間點進行小額訂單交易,避免了一次性大額交易對市場價格的過度影響。
本教程使用了?DolphinDB?的 CEP (復雜事件處理引擎用)產品,系統的講解了基于 CEP 引擎搭建拆單調度系統的全流程,包括 TWAP 和 VWAP 兩種拆單算法。讀者將學習以下內容:
- CEP 的基礎介紹。
- 如何基于 CEP 引擎實現 TWAP 算法拆單。
- 如何基于 CEP 引擎實現?VWAP 算法拆單
1. CEP 引擎介紹
復雜事件處理(Complex Event Processing,簡稱 CEP )引擎是一款用于實時處理和分析事件流中復雜事件的引擎。其主要功能包括接收實時數據流,定義事件并從事件流中檢測特定事件,進而對滿足指定規則的事件執行預設的操作。詳細功能介紹如下:
- 事件捕捉和過濾: 從大量實時數據流中找到特定事件。
- 事件模式:識別指定的事件模式,這些模式可以涉及多個事件的組合,形成具有特定含義的事件序列。
- 復雜事件處理:執行復雜的事件處理邏輯,包括篩選、聚合、轉換等操作,以識別關鍵信息或發現特定的業務模式。
?圖 1-1 CEP 引擎架構圖
由圖 1-1 可以看出,一個完整的 CEP 應用包括以下幾個部分:事件流序列化器、事件流反序列器、事件分發器、CEP 子引擎。事件是貫穿這些部分的基本元素,詳細介紹請查閱官網。
2. TWAP 拆單算法
本章將介紹如何使用 CEP 引擎實現 TWAP 拆單算法。
2.1 算法思想
TWAP(Time Weighted Average Price),時間加權平均價格算法,是一種最簡單的拆單算法交易策略,主要適用于流動性較好的市場和訂單規模較小的交易。該模型將交易時間進行均勻分割,并在每個分割節點上等量拆分訂單進行提交。例如,可以將某個交易日的交易時間平均分為 n 段,TWAP 算法會將該交易日需要執行的訂單均勻分配在這 n 個時間段上去執行,從而使得交易均價跟蹤 TWAP 。TWAP 模型設計的目的是使交易對市場影響減小的同時提供一個較低的平均成交價格,從而達到減小交易成本的目的。TWAP 算法的公式如下:
其中,n?為時間段的個數,pricei?為分割節點上拆分訂單的價格。但使用 TWAP 算法進行拆單存在一些問題:
- 在訂單規模很大的情況下,均勻分配到每個節點上的下單量仍然較大,當市場流動性不足時仍可能對市場造成一定的沖擊。
- 傳統的 TWAP 算法對交易時間和訂單規模進行均勻拆分,這種有規律的交易模式容易被市場其他參與者察覺和預測。一旦其他交易者發現了這種模式,就可以根據其規律提前布局,增加交易成本。
本例基于上述問題,對傳統 TWAP 算法做了如下改進:
- 子單規模、下單時間間隔在用戶指定的范圍內隨機,使得交易模式更加隱蔽,避免被其他市場參與者察覺和針對。
- 對大額母單的拆單狀態進行實時管理,包括暫停下單、恢復下單和終止下單。用戶可以根據實時市場形勢,對母單狀態進行管理,增加風險容錯。
2.2 功能模塊
TWAP 算法邏輯實現主要依賴 DolphinDB 如下產品和功能:
- CEP 引擎:核心組件,把所有流式數據包括行情和訂單都看作是事件流,自定義事件流的處理規則。
- 數據回放功能:模擬實時快照行情。
- 流表的發布訂閱功能:解耦用戶母單發布和拆單 CEP 引擎,促進系統內模塊間的通信。
TWAP 的算法邏輯實現流程圖如下所示:
圖 2-1 TWAP 算法流程圖
本例中 CEP 引擎的策略啟動事件是母單事件?ParentOrder
?,母單事件進入引擎后,將啟動對母單狀態管理事件?OrderAlterAction
?的監聽。行情快照數據通過回放功能進入鍵值內存表后,供拆單核心函數?PlaceOrder
?讀取,以確定子單下單的價格。下面將分模塊介紹它們的主要功能。
1)數據回放
數據回放?Replay
?是 DolphinDB 中常用于高頻策略回測場景的方法,它可以根據指定的回放模式,按時間順序將一個或多個數據表的數據回放到某個數據表或引擎,模擬實時數據的寫入。
由于行情快照由交易所按固定時間間隔推送,?Replay
?可以很好地將這種數據按時間戳排序后輸出到流數據表。而行情快照數據中包含了交易所中多種基金的大量的歷史行情,因此直接提供給 CEP 引擎會導致慢查詢。在現實市場中,確定子單價格只需要該基金最新的快照盤口數據,因此一支基金只需要保留最新的行情快照記錄,DolphinDB 提供的鍵值內存表可以實現上述思路。有關鍵值內存表的說明見官方文檔?。
本例中鍵值內存表以基金 ID 為主鍵,快照行情數據(包含買賣盤口十檔價格)為非主鍵列,就可以向 CEP 引擎提供多支基金的實時行情快照數據,以模擬實時行情快照寫入。
2)流表發布訂閱
DolphinDB 的流表訂閱采用經典的發布-訂閱(Pub-Sub)通信模型,通過消息隊列實現流數據的發布與訂閱,從而將流數據生產者(發布者)與消費者(訂閱者)解耦。異構流數據表接收流數據的輸入,當新的流數據注入到該流數據表時,發布者會將這些數據發送到所有訂閱者的消費隊列,并通知所有訂閱方,從消費隊列中獲取數據進行增量處理。
內存表可以通過?subscribeTable
?函數訂閱流數據表,上文中的鍵值內存表訂閱流數據表,需要自定義回調函數,并在回調函數中將接收到的增量數據插入到鍵值內存表中。最終鍵值內存表中記錄了所有基金的最新行情快照數據。如下圖所示。
?
圖2-2 鍵值對內存表功能示意圖
CEP引擎可以通過?subscribeTable
?函數訂閱流數據表,需要指定?handler?為 CEP 引擎的句柄,通過?getStreamEngine
?獲得對應 CEP 引擎的句柄。CEP 引擎訂閱異構流數據表,當流表中出現增量?ParentOrder
?和?OrderAlterAction
?事件時,將事件注入 CEP 引擎,CEP 引擎添加對應的監視器和回調函數進行事件處理,完成拆單下單和母單狀態管理的過程。
3)CEP 引擎
CEP 引擎模塊是拆單系統中最重要,也是最復雜的部分。在本案例中,采用了動態啟動策略的方式:當引擎內的事件監聽器(Event Listener)捕獲策略啟動事件?ParentOrder
?時,才會設置對?OrderAlterAction
?的監聽。行情快照通過數據回放進入流數據表,流數據表將數據發布到鍵值內存表中,供 CEP 引擎查詢基金最新的盤口價格。
- 拆單下單:
ParentOrder
?注入 CEP 引擎后,調用核心函數?PlaceOrder
?進行拆單下單。PlaceOrder
首先判斷母單狀態,若處于初始化/下單中,則進行拆單。PlaceOrder
?根據用戶指定的子單股數范圍,使用 DolphinDB 的?rand
?函數隨機選取子單股數。從鍵值內存表中讀取當前基金的最新盤口價格,確定子單價格。確定子單參數后,將子單輸出到子單流表中即下單。下單后判斷是否下單完畢,若下單完畢則注銷監視器;否則使用?rand
?函數隨機選取間隔時間后,定時啟動下一次拆單下單。 - 母單狀態修改:策略啟動后,CEP 引擎設置對?
OrderAlterAction
?事件的監聽,當?OrderAlterAction
注入 CEP 引擎,監視器根據母單當前狀態和目標狀態對母單進行操作。例如將處于下單狀態的母單設置為暫停下單,此時 CEP 引擎會暫停拆單下單的過程,監聽下一個?OrderAlterAction
?事件注入引擎,從而恢復下單。OrderAlterAction
?事件輸出到狀態修改流表中。 - 可視化:Dashboard 是 DolphinDB 提供的一個強大的數據可視化和分析工具,旨在幫助用戶更好地理解和利用數據。將子單流表和狀態修改流表中的數據輸出到 Dashboard ,可以實時觀察母單的拆單下單過程,以及母單的狀態修改情況。
2.3 代碼實現
在本節中我們將具體介紹 TWAP 拆單系統代碼的實現,包括定義事件類、鍵值內存表訂閱流表、回放行情數據、定義監視器、創建 CEP 引擎、CEP 引擎訂閱流表、啟動策略實例等功能模塊,完整代碼見文末附錄。
step1-定義事件類
DolphinDB 將事件定義為類,首先使用 DolphinDB 腳本語言將母單信息、母單狀態變更信息定義為類。完整的事件類代碼見附件。
- 母單類?
ParentOrder
?:除了母單的基本信息,例如母單 ID、批次、基金代碼、證券代碼、執行人、母單股數、買賣方向等,需要定義為母單類的成員變量外,還需要將拆單的核心參數定義為母單類的成員變量:
fundCode :: STRING // 基金IDtradeQuantity :: LONG //母單股數tradeDirection :: STRING // 買賣方向,"B"對應買,"S"對應賣//拆單參數splitMethod :: STRING // 拆單算法priceOption :: INT // 買一還是賣一價格startTime :: TIMESTAMP // 拆單開始時間endTime :: TIMESTAMP // 拆單結束時間lowChildOrderQty :: INT // 子單最小股數highChildOrderQty :: INT // 子單最大股數lowSplitInterval :: INT // 最小拆單間隔(秒)highSplitInterval :: INT // 最大拆單間隔(秒)orderStatus :: STRING // 拆單狀態
splitMethod
?指定算法類型,這里為 TWAP;priceOption
?指定子單價格使用行情快照中的買一價格還是賣一價格;startTime
?、endTime
?指定拆單時間范圍;lowChildOrderQty
?、highChildOrderQty
?指定子單下單股數范圍;lowSplitInterval
?、highSplitInterval
?指定子單拆單時間間隔范圍;?orderStatus
?記錄母單狀態,例如初始化、下單中、暫停等。
- 母單狀態變更類?
OrderAlterAction
?:將對母單狀態變更操作的信息定義為母單狀態變更類,包含以下成員變量。
splitOrderId :: STRING //操作的母單號eventType :: STRING //事件類型operation :: STRING //操作類型batchId :: STRING //批次ID(母單的唯一ID)handlerEmpid :: STRING //執行人handlerName :: STRING // 執行人eventTime :: TIMESTAMP // 下達變更單時間
核心成員變量是?operation
?指定此次狀態變更的操作類型,例如暫停、恢復、終止下單等。CEP 引擎的監視器根據操作類型對母單的?orderStatus
?進行變更。
step2-創建母單、子單記錄內存表
創建母單記錄內存表?parentOrderManage
?,修改訂單記錄內存表?alterOrderManage
?,子單接收流表?subOrderStream
?以及 CEP 引擎訂閱的異構流數據表?orderBlobStream
?。
//創建母單記錄內存表
colNames=["splitOrderId","eventType","batchId","tagBatchId","sortNum","fundCode","fundName","assetId","assetName","combinationNo","combinationName","stockCode","stockName","tradeDate","tradeQuantity","tradeDirection","market","handlerEmpid","handlerName","splitMethod","orderType","price","startTime","endTime","splitInterval","orderStatus","splitOrderAmount","eventTime","lastUpdateTime"]
colTypes=[STRING,SYMBOL,STRING,STRING,INT,SYMBOL,SYMBOL,STRING,STRING,STRING,STRING,SYMBOL,SYMBOL,STRING,LONG,SYMBOL,SYMBOL,STRING,STRING,SYMBOL,SYMBOL,DOUBLE,TIMESTAMP,TIMESTAMP,INT,SYMBOL,INT,TIMESTAMP,TIMESTAMP]
share table(1:0,colNames,colTypes) as parentOrderManage//創建修改訂單記錄內存表
colNames=`splitOrderId`eventType`operation`batchId`handlerEmpid`handlerName`eventTime
colTypes=[STRING,STRING,STRING,STRING,STRING,STRING,TIMESTAMP]
share table(1:0, colNames, colTypes) as alterOrderManage// 創建子單接收流數據表
colNames=["splitOrderId","batchId","tagBatchId","sortNum","fundCode","fundName","assetId","assetName","combinationNo","combinationName","stockCode","stockName","tradeDate","tradeQuantity","tradeDirection","market","handlerEmpid","handlerName","orderType","price","lastUpdateTime"]
colTypes=[STRING,STRING,STRING,INT,SYMBOL,SYMBOL,STRING,STRING,STRING,STRING,SYMBOL,SYMBOL,STRING,LONG,SYMBOL,SYMBOL,STRING,STRING,SYMBOL,DOUBLE,TIMESTAMP]
share streamTable(1:0, colNames, colTypes) as subOrderStream// 創建異構流數據表,被CEP引擎訂閱
share(streamTable(100:0,`timestamp`eventType`blob`splitOrderId, [TIMESTAMP, STRING,BLOB,STRING]),"orderBlobStream")
step3-鍵值內存表訂閱流表
使用鍵值內存表?snapshotOutputKeyedTb
?訂閱行情快照流表?snapshotStream
?,首先定義兩張表。
- 表結構定義:定義行情快照流表?
snapshotStream
?,用于接收回放的行情快照數據;定義鍵值內存表snapshotOutputKeyedTb
?,用于存儲每支基金的最新行情快照。
// 定義行情快照流表,這個流表用于接收回放的行情快照數據
colNames = `Market`TradeDate`TradeTime`SecurityID`OfferPrice`BidPrice`OfferOrderQty`BidOrderQty
coltypes = [SYMBOL,DATE,TIME,SYMBOL,DOUBLE[],DOUBLE[],INT[],INT[]]
share streamTable(1:0,colNames,coltypes) as snapshotStream// 創建鍵值對表,該表訂閱snapshotStream,存儲每個基金的買盤和賣盤,每個基金的買盤和賣盤只有一條記錄
snapshotOutputKeyedTbTmp = keyedTable(`SecurityID,1:0,colNames,coltypes)
share snapshotOutputKeyedTbTmp as snapshotOutputKeyedTb
snapshotStream
?中,SecurityID
?為基金唯一 ID ,OfferPrice
?為賣盤十檔價格,OfferOrderQty
?為賣盤對應的十檔委托量;BidPrice
?為買盤十檔價格,BidOrderQty
?為買盤對應的十檔委托量。
snapshotOutputKeyedTb
?中,主鍵為?SecurityID
?,字段和?snapshotStream
?一致。最終每支基金只會保存一條最新的行情快照記錄。
- 訂閱:
snapshotOutputKeyedTb
?訂閱?snapshotStream
?中的增量行情快照數據。
// 訂閱snapshotStream回調函數
def handleSnapshot(msg) {// 拿到所有數據data = exec * from msg// 向snapshotOutputKeyedTb表寫入insert into snapshotOutputKeyedTb values(data[`Market],data[`TradeDate],data[`TradeTime],data[`SecurityID],data[`OfferPrice],data[`BidPrice],data[`OfferOrderQty],data[`BidOrderQty])
}
// 訂閱
subscribeTable(tableName = `snapshotStream,actionName=`handleSnapshot,handler = handleSnapshot,msgAsTable=true,batchSize = 1)
使用?subscribeTable
?函數訂閱?snapshotStream
?,回調函數?handleSnapshot
?接收到的增量行情快照數據插入到?snapshotOutputKeyedTb
?。
step4-定義監視器
CEP 引擎內部監視器的配置是拆單系統實現中最關鍵的步驟。監視器內封裝了整個拆單策略,其結構大致如下。
class SplitMonitor:CEPMonitor{def SplitMonitor() {//本例中,初始monitor 不需要傳值, 在克隆復制任務monitor 時進行設置。}//初始記錄母單記錄信息def initPOrderManageInfo(pOrder){...}//更新母單記錄信息def updatePOrderManageInfo(pOrder,opTime){...}
}//TWAP 算法下單監聽 monitor,繼承關系
class TWAPSplitMonitor:SplitMonitor {// 記錄子單總下股數的變量subOrderQtys :: INT// 當前母單parentOrder :: ParentOrderdef TWAPSplitMonitor() {//本例中,初始monitor 不需要傳值, 在克隆復制任務monitor 時進行設置。}// 在范圍內選取隨機數,被時間浮動和單量浮動調用def randNum(lowNum, highNum){...}//TWAP 下單方法def placeOrder(){...}//母單拆單狀態變更操作def orderAlter(oaAction){...}//初始化parentOrder,進行拆單下單,設置OrderAlterAction事件監聽def startPlaceOrder(pOrder){...}//創建母單下單monitor實例def forkParentOrderMonitor(pOrder){...}//初始任務def onload(){ addEventListener(forkParentOrderMonitor, "ParentOrder", ,"all")}
}
成員變量?subOrderQtys
?:由于子單是給定股數范圍內隨機股數下單,使用?subOrderQtys
?記錄當前已下子單的股數和,避免超出母單股數。
成員變量?parentOrder
?:記錄當前母單的參數,包括基本信息,拆單參數,和拆單狀態。
下面將按照 CEP 引擎運作的邏輯順序,依次介紹監視器中各個成員方法的具體內容。
- onload 初始任務
創建引擎并實例化監視器后,將首先調用其內部的?onload
?函數。回顧上文, CEP 引擎工作流的源頭是監聽策略啟動事件?ParentOrder
?,一旦監聽到?ParentOrder
?注入才進行下一步的操作。因此,在?onload
?函數中,只需考慮設置相關的事件監聽器以便啟動策略即可。使用?addEventListener
?函數監聽?ParentOrder
?事件注入,指定回調函數為?forkParentOrderMonitor
?,事件類型是?ParentOrder
?,設置監聽次數為持續監聽。
//初始任務def onload(){ addEventListener(forkParentOrderMonitor, "ParentOrder", ,"all")}
onload
?方法設置了一個事件監聽器,監聽所有的母單事件?ParentOrder
?。當監聽到該類型事件時,下一步將啟動整個拆單下單過程。為了控制母單拆單下單的線程安全,為每個母單創建不同的 Monitor 實例,因此在對應的回調函數?forkParentOrderMonitor
?中需要包含對 Monitor 實例的創建和母單參數傳遞等步驟。從?onload
?方法開始,函數調用的流程與實現的功能可以分為四個模塊,如下圖所示。
圖2-3 函數調用模塊圖
其中,?startPlaceOrder
?函數是后三個模塊的啟動函數,啟動順序如上圖所示。模塊 3 、模塊 4 中,函數調用的流程與實現的功能如下圖所示。
圖 2-4 模塊 3 、模塊 4 流程圖
接下來從策略啟動事件對應的回調函數?forkParentOrderMonitor
?開始來介紹具體的代碼實現。
- forkParentOrderMonitor 生成監視器實例
在現實的交易市場中,系統會同時接到多個需要拆分的大額委托單。若 CEP 引擎內只使用一個 Monitor 實例來操作當前委托單的拆單下單,由于一個 Monitor 實例只有一個?parentOrder
?成員變量,則會使得成員變量?parentOrder
?的屬性值一直被新注入的母單事件修改,引起線程安全問題。
為了解決上面的問題,CEP 引擎內的初始 Monitor 只負責監控策略啟動事件注入,參考上文的?onload
?函數。每當?onload
?函數監測到新的?ParentOrder
?事件注入,調用?forkParentOrderMonitor
?生成一個新的 sub-Monitor 實例,進行當前委托單的拆單下單操作。
//生成一個母單下單monitor實例def forkParentOrderMonitor(pOrder){name = "母單下單["+pOrder.splitOrderId +"]"spawnMonitor(name,startPlaceOrder, pOrder)}
- startPlaceOrder 啟動核心模塊
startPlaceOrder
?函數中,包含模塊 2 、模塊 3 和模塊 4 的啟動步驟,函數定義如下。
//模塊啟動函數def startPlaceOrder(pOrder){// 設置當前子任務 monitor 對象的內部母單變量parentOrder = pOrder // 對子單量總股數初始化為0subOrderQtys = 0//TWAP 拆單初始化parentOrder.setAttr(`orderStatus,'初始化')parentOrder.setAttr(`sortNum,0) //拆單順序號//記錄母單狀態到內存表initPOrderManageInfo(parentOrder)//計算母單開始拆單下單時間if(parentOrder.startTime == null|| now()>=parentOrder.startTime){//初始下單時間為空,或者初始下單時間早于現在,則立即開始下單placeOrder()}else {//下單等待時間,按指定startTime 時間開始下單//當前時間到開始下單時間間隔,計算出來的是毫秒數,轉成秒period_wait = round((parentOrder.startTime - now())\1000 ,0)//指定在 period_wait 秒后開始啟動一次下單addEventListener(placeOrder,,,1,,duration(period_wait+"s"))}//創建母單變更操作監聽器,持續監聽addEventListener(orderAlter, "OrderAlterAction", <OrderAlterAction.splitOrderId = pOrder.splitOrderId>,"all")}
函數邏輯如下:
- 先進行當前 Monitor 母單成員變量?
parentOrder
?、子單總股數成員變量?subOrderQtys
?的初始化。然后調用?initPOrderManageInfo
?函數,將當前母單事件記錄到內存表?parentOrderManage
?中,對應模塊 2 。 - 初始化完成,檢查當前時間是否到達開始拆單時間。若已經超過開始下單時間,則調用?
placeOrder
?函數對當前委托單進行拆單下單;否則等待到達開始時間后,調用?placeOrder
?函數。對應模塊 3 中的拆單開始時間判斷。 - 拆單啟動完成,使用?
addEventListener
?函數啟動對?OrderAlterAction
?事件的監聽。對應模塊 4 中的初始監聽。
- initPOrderManageInfo 記錄母單信息
initPOrderManageInfo
?函數的定義十分簡單,將監聽到的母單事件記錄到對應內存表?parentOrderManage
?中。
//初始記錄母單記錄信息def initPOrderManageInfo(pOrder){parentOrderManage=objByName('parentOrderManage')insert into parentOrderManage values(pOrder.splitOrderId,pOrder.eventType,pOrder.batchId,pOrder.tagBatchId,pOrder.sortNum,pOrder.fundCode,pOrder.fundName,pOrder.assetId,pOrder.assetName,pOrder.combinationNo,pOrder.combinationName,pOrder.stockCode,pOrder.stockName,pOrder.tradeDate,pOrder.tradeQuantity,pOrder.tradeDirection,pOrder.market,pOrder.handlerEmpid,pOrder.handlerName,pOrder.splitMethod,pOrder.orderType,pOrder.price,pOrder.startTime,pOrder.endTime,0,pOrder.orderStatus,0,pOrder.eventTime,now())}
- randNum 生成范圍內隨機數
randNum
?函數用于在給定范圍內生成隨機整數并返回。上文提到,用戶可以在母單事件中指定子單股數范圍為?lowChildOrderQty
?~?highChildOrderQty
?,拆單時間間隔范圍為?lowSplitInterval
?~?highSplitInterval
?。randNum
?函數用于在給定范圍內隨機生成子單股數和時間間隔。
// 在范圍內選取隨機數,被時間浮動和單量浮動調用def randNum(lowNum, highNum){if(lowNum == highNum){return lowNum}// 向量保存可以選擇的浮動值nums = array(INT, 0).append!(lowNum..highNum)// 隨機下標,范圍是0~highnum-lownum,這里返回的是數組,因此要帶上索引indexNum = highNum-lowNumindex = rand(indexNum, 1)[0];// 取出浮動值return nums[index];}
randNum
?函數先生成一個數組保存給定范圍內的所有整數,然后生成隨機下標,根據隨機下標訪問并返回。
- updatePOrderManageInfo 更新母單最后修改時間
updatePOrderManageInfo
?函數用于修改母單的最后修改時間。
//更新母單記錄信息def updatePOrderManageInfo(pOrder,opTime){parentOrderManage=objByName('parentOrderManage')update parentOrderManage set sortNum = pOrder.sortNum,orderStatus=pOrder.orderStatus, lastUpdateTime = opTime where splitOrderId = pOrder.splitOrderId}
- placeOrder 拆單核心函數
placeOrder
?函數是拆單下單的核心函數,對應模塊 3 中的后半部分。
//TWAP 下單方法def placeOrder(){//判斷是否超過下單時限if(now()>= parentOrder.endTime){ //當前時間大于下單結束時間,則不再下單parentOrder.setAttr(`orderStatus,'時限中止') updatePOrderManageInfo(parentOrder,now())return}//判斷當前母單狀態是否是可以下單狀態,不是則退出if(!(parentOrder.orderStatus in ['初始化','下單中'])){return}// 計算子單已經下過的單數totalQty = subOrderQtys//計算剩余待下單股數 = 母單股數 - 所有子單數remainQty = parentOrder.tradeQuantity - totalQty// 計算當前應該下的子單數//若剩余股數大于等于最小子單股數, 下隨機子單股數if(remainQty >= parentOrder.lowChildOrderQty){//下單股數,分兩個區間,如果剩余股數在low-high之間,則下的股數在low-remain之間隨機,否則在low-high之間隨機if(remainQty < parentOrder.highChildOrderQty){subOrderQty = randNum(parentOrder.lowChildOrderQty, remainQty)}else{subOrderQty = randNum(parentOrder.lowChildOrderQty, parentOrder.highChildOrderQty)}}else{//否則下單剩余股數subOrderQty = remainQty}// 更新子單股數subOrderQtys = subOrderQtys+subOrderQty// 更新剩余單數remainQty = remainQty-subOrderQty// 拿到母單的基金代碼v_securityId = parentOrder.fundCode// 直接從分布式表中進行查詢,定義一個函數if(parentOrder.priceOption == 0){//從買一價格獲取,從BidPrice[0]獲取價格// 從鍵值對表中獲取BidPrice = exec BidPrice from snapshotOutputKeyedTb where SecurityID = v_securityId// 買一價格subOrderPrice = BidPrice[0]}else{//從賣一價格獲取,從OfferPrice[0]獲取價格// 從鍵值對表中獲取OfferPrice = exec OfferPrice from snapshotOutputKeyedTb where SecurityID = v_securityId// 賣一價格subOrderPrice = OfferPrice[0]}//構建子單//創建子單時間subOrderPlaceTime = now()//構建下達子單到流表subOrderStream = objByName('subOrderStream')// 插入子單流數據表insert into subOrderStream values(parentOrder.splitOrderId,parentOrder.batchId,parentOrder.splitOrderId+'_'+(parentOrder.sortNum+1),parentOrder.sortNum+1,parentOrder.fundCode,parentOrder.fundName,parentOrder.assetId,parentOrder.assetName,parentOrder.combinationNo,parentOrder.combinationName,parentOrder.stockCode,parentOrder.stockName,parentOrder.tradeDate,subOrderQty,parentOrder.tradeDirection,parentOrder.market,parentOrder.handlerEmpid,parentOrder.handlerName,parentOrder.orderType,subOrderPrice,subOrderPlaceTime);//設置下單次數parentOrder.setAttr(`sortNum,parentOrder.sortNum+1) //判斷是否還需繼續下單if(remainQty>0){ parentOrder.setAttr(`orderStatus,'下單中') //保存母單信息updatePOrderManageInfo(parentOrder,subOrderPlaceTime)//設置下次下單的監聽,等待時間為拆單策略參數中設定 // addEventListener(handler=placeOrder, wait=duration(parentOrder.splitInterval+"s"), times=1) // 等待時間進行浮動realTime = randNum(parentOrder.lowSplitInterval, parentOrder.highSplitInterval)addEventListener(placeOrder,,,1,,duration(realTime+"s"))}else{//最后一次下單,銷毀下單監聽parentOrder.setAttr(`orderStatus,'下單完畢') //保存母單信息updatePOrderManageInfo(parentOrder,now())//下單完結,銷毀監視器destroyMonitor()}}
placeOrder
?函數邏輯如下:
- 判斷當前時間是否超過拆單結束時間,若超過結束時間,則設置母單狀態為?
'時限中止'
?,并調用updatePOrderManageInfo
?函數更新母單最后修改時間。 - 判斷當前母單狀態是否處于?
'初始化'
?或?'下單中'
?,如果不滿足,則停止拆單。 - 根據成員變量?
subOrderQtys
?和母單屬性?tradeQuantity
?計算剩余待下單股數?remainQty
?,調用?randNum
?函數確定子單的股數?subOrderQty
?,并對?subOrderQtys
?和?remainQty
?進行更新。 - 根據母單屬性?
priceOption
?確定子單價格為買一還是賣一價格,并從鍵值內存表?snapshotOutputKeyedTb
?中查詢。 - 構建子單,并插入子單流表?
subOrderStream
?。 - 根據剩余股數?
remainQty
?判斷是否還需要下單。若還需要下單,則保存母單狀態和修改時間,調用?randNum
?函數確定下單間隔,并定時再次調用?placeOrder
?函數重復上述步;若已經沒有剩余股數,則保存母單狀態和修改時間,最后摧毀監視器。
- orderAlter 母單狀態管理
startPlaceOrder
?函數中設置了對?OrderAlterAction
?事件的監聽。當?OrderAlterAction
?事件注入,調用?orderAlter
?函數對母單狀態進行修改。
//母單拆單變更操作def orderAlter(oaAction){alterOrderManage=objByName('alterOrderManage')insert into alterOrderManage values (oaAction.splitOrderId,oaAction.eventType,oaAction.operation,oaAction.batchId,oaAction.handlerEmpid,oaAction.handlerName,now()) if(oaAction.operation=='暫停'){ parentOrder.setAttr(`orderStatus,'暫停')updatePOrderManageInfo(parentOrder,now())}else if(oaAction.operation=='恢復'&& parentOrder.orderStatus=='暫停'){parentOrder.setAttr(`orderStatus,'下單中')updatePOrderManageInfo(parentOrder,now())//重新開始下單 placeOrder()}else if(oaAction.operation=='終止母單'){parentOrder.setAttr(`orderStatus,'終止母單') //保存母單信息updatePOrderManageInfo(parentOrder,now())//下單完結,銷毀監視器 destroyMonitor()}}
函數邏輯如下:
- 將狀態修改記錄保存到狀態修改流表?
alterOrderManage
?中。 - 根據?
OrderAlterAction
?事件的?operation
?屬性對母單做對應的操作。若?operation
?為?'暫停'
?,則將母單狀態設置為暫停,并更新最后修改時間,此時?placeOrder
?中的狀態校驗不通過,不會進行拆單操作;若?operation
?為?'恢復'
?并且母單狀態為?'暫停'
?,則將母單狀態重置為下單中,并更新最后修改時間,此時?placeOrder
?中的狀態校驗通過,將繼續拆單下單過程;若?operation
?為?'終止母單'
?,則將母單狀態設置為終止母單,更新最后修改時間,并摧毀監視器,結束拆單下單過程。
step5.創建 CEP 引擎并訂閱異構流表
使用?createCEPEngine
?函數創建 CEP 引擎,并使用?subscribeTable
?函數使 CEP 引擎訂閱異構流表?orderBlobStream
?,orderBlobStream
?中接收?ParentOrder
?和?OrderAlterAction
?事件流。
//創建下單任務引擎,表示TwapSplitMonitor引擎訂閱的流數據表類型(母單類型和變更操作類型,多余的字段壓縮到BLOB中)
dummy = table(1:0, `timestamp`eventType`blobs`splitOrderId, `TIMESTAMP`STRING`BLOB`STRING)
//創建cep 監聽引擎
engine = createCEPEngine(name='TwapSplitMonitor', monitors=<TWAPSplitMonitor()>, dummyTable=dummy, eventSchema=[ParentOrder,OrderAlterAction],timeColumn=`timestamp)// 訂閱異構流表
subscribeTable(tableName="orderBlobStream", actionName="orderBlobStream",handler=getStreamEngine("TwapSplitMonitor"),msgAsTable=true)
step6-回放行情數據
使用?replay
?函數,向行情快照流表?snapshotStream
?中回放分布式表中的歷史行情快照數據,模擬實時行情寫入。
// 往行情流表里回放行情數據
snapshotTb = loadTable("dfs://l2TLDB_TWAP","snapshot")
// 回放前十支股票就行,選擇需要的列
replayData = select Market,TradeDate,TradeTime,SecurityID,OfferPrice,BidPrice,OfferOrderQty,BidOrderQty from snapshotTb where SecurityID in (select distinct SecurityID from snapshotTb limit 10) and TradeTime>=09:30m
// 每秒20條回放
submitJob("replay_snapshot", "snapshot", replay{replayData, snapshotStream, `TradeDate, `TradeTime, 20, true})
本例中歷史行情快照數據保存在分布式表?snapshot
?中,由于基金行情快照數據太多,這里僅回放 10 條基金的行情數據。snapshotStream
?接收到回放數據后,自動將增量數據發布到?snapshotOutputKeyedTb
?。最終?snapshotOutputKeyedTb
?中,保存 10 支基金的最新行情快照數據。
step7-啟動策略實例
使用 DolphinDB 提供的 JAVA API ,將?ParentOrder
?事件放入?orderBlobStream
?,啟動拆單策略;隨后將?OrderAlterAction
?事件放入?orderBlobStream
?,觀察 CEP 引擎對母單狀態的管理。核心函數?putOrder
?如下。完整的啟動策略項目文件見附錄。
public static void putOrder() throws IOException, InterruptedException {// 連接dolphindb數據庫DBConnection conn = DBUtil.getConnection();// 封裝母單訂閱流表EventSender sender1 = EventSenderHelperTWAP.createEventSender(conn);// 拿到拆單參數mapHashMap<String, Object> map = getMap();// 定義母單DolphinDbParentSplitParamsVo dolphinDbParentVo1 = new DolphinDbParentSplitParamsVo("SP_001_2025030500001", // splitOrderId: 母單拆單操作單元唯一ID"ParentOrder", // eventType: 事件類型"2025030500001", // batchId: 母單唯一ID"", // tagBatchId: 子單唯一ID0, // sortNum: 拆單順序號(從1開始)"501019", // fundCode: 基金代碼"藍籌精選混合", // fundName: 基金名稱"A123456789", // assetId: 資產單元ID"量化投資組合", // assetName: 資產單元名稱"C789", // combinationNo: 組合編號"全天候策略", // combinationName: 組合名稱"600000", // stockCode: 證券代碼"浦發銀行", // stockName: 證券名稱"20231010", // tradeDate: 交易日期(yyyyMMdd)48000L, // tradeQuantity: 交易量(注意L后綴)"B", // tradeDirection: 交易方向(1-買入)"SSE", // market: 交易市場"E1001", // handlerEmpid: 執行人工號"王強", // handlerName: 執行人姓名(String) map.get("splitMethod"), // splitMethod: 拆單算法(String) map.get("orderType"), // orderType: 訂單類型(Double) map.get("price"), // 子單下單價格(Integer) map.get("priceOption"), //選擇賣一價格下子單(LocalDateTime) map.get("startTime"), // startTime: 拆單開始時間(LocalDateTime) map.get("endTime"), // endTime: 拆單結束時間(Integer) map.get("lowChildOrderQty"), //子單數量范圍(Integer) map.get("highChildOrderQty"),(Integer) map.get("lowSplitInterval"), // splitInterval: 拆單間隔范圍(秒)(Integer) map.get("highSplitInterval"),"", // orderStatus: 拆單狀態LocalDateTime.now() // eventTime: 事件下達時間);// 定義暫停操作DolphinDbOrderActionVo orderAlterAction = new DolphinDbOrderActionVo("SP_001_2025030500001", // splitOrderId: 母單拆單操作單元唯一ID"subOrder", // eventType"暫停", // operation"2025030500001", // batchId"OOOO1", // handlerEmpid"王強", // handlerNameLocalDateTime.now() // eventTime);// 定義恢復操作DolphinDbOrderActionVo orderAlterAction1 = new DolphinDbOrderActionVo("SP_001_2025030500001", // splitOrderId: 母單拆單操作單元唯一ID"subOrder", // eventType"恢復", // operation"2025030500001", // batchId"OOOO1", // handlerEmpid"王強", // handlerNameLocalDateTime.now() // eventTime);//發送母單,將母單放入訂閱流表,供CEP引擎消費sender1.sendEvent(dolphinDbParentVo1.getEventType(), dolphinDbParentVo1.toEntities());System.out.println("母單放入母單訂閱流表");Thread.sleep(5000);//下達暫停,將母單狀態暫停單放入訂閱流表,供CEP引擎消費sender1.sendEvent(orderAlterAction.getEventType(), orderAlterAction.toEntities());System.out.println("暫停單放入訂閱流表");Thread.sleep(5000);//下達恢復,將母單狀態恢復單放入訂閱流表,供CEP引擎消費sender1.sendEvent(orderAlterAction1.getEventType(), orderAlterAction1.toEntities());System.out.println("恢復單放入訂閱流表");}
母單中的拆單參數,通過?HashMap
?傳入,模擬現實系統中的用戶自定義拆單參數傳遞。
2.4 結果檢視
本小節通過查看輸出事件展示拆單系統運行的結果。DolphinDB Web 端提供了強大的數據可視化和分析工具——數據面板(Dashboard),旨在幫助用戶更好地理解和利用數據。在本例中,母單事件、母單狀態修改事件注入 CEP 引擎都分別記錄到對應的內存表,拆分子單進行下單記錄到子單接收流表,如此在 Dashboard 中便可以選取需要的數據進行可視化。
step1-JAVA 環境準備
配置系統的 maven 、jdk 環境,本例中的 jdk 和 maven 版本如下:
jdk - java version "1.8.0_441"
maven - Apache Maven 3.8.6
step2-數據準備
下載附錄中的 TWAP 算法代碼并解壓。將?data/snapshot_twap.csv
?放到?dolphindb/server
?目錄下。運行導入腳本?data/data_input.dos
?進行建庫建表,并將測試數據導入建好的分布式表中。將?data/dashboard.TWAP 拆單監控.json
?導入到 Dashboard。
step3-系統環境準備
運行腳本?01 clearEnv.dos
?、?02 Event.dos
?、?03 createTable.dos
?、?04 subscribeSnapshot.dos
?。01 clearEnv.dos
?腳本將系統中已存在的內存共享表、訂閱信息、流式引擎等進行清除,確保不會重復定義;?02 Event.dos
?、?03 createTable.dos
?、?04 subscribeSnapshot.dos
?腳本分別對應上文介紹的定義事件類、創建母單子單記錄內存表、鍵值內存表訂閱流表的功能。
step4-CEP引擎創建
運行腳本?05 Monitor.dos
?、06 createCEPEngine.dos
?,分別對應上文中定義監視器、創建CEP引擎訂閱異構流表的功能。
step5-回放行情快照數據
運行腳本?07 replaySnapshot.dos
?,將快照數據回放到快照流表?snapshotStream
?中。由于鍵值內存表?snapshotOutputKeyedTb
?訂閱了?snapshotStream
?,數據會被自動發布到?snapshotOutputKeyedTb
?中。回放后使用如下語句查詢?snapshotOutputKeyedTb
?中的數據:
select * from snapshotOutputKeyedTb
查詢到十支基金的最新的行情數據如下:
圖2-5 鍵值內存表數據示意圖
其中前四列分別表示基金所屬市場、交易日期、交易時間和基金唯一代碼;?OfferPrice
?和?BidPrice
?表示市場中賣盤和買盤十檔價格;OfferOrderQty
?和?BidOrderQty
?表示市場中對應的賣盤和買盤十檔委托量。
step6-啟動策略
下載附錄中的 JAVA API 策略啟動代碼并解壓。修改?common/DBUtil.java
?中的數據庫配置為用戶自己的環境。運行?startTWAP.java
?,將母單事件、母單狀態修改事件放入異構流表中,觀察 Dashboard 中,母單監控、子單監控、變更單監控中的對應輸出如下:
圖2-6 Dashboard輸出結果圖
本例中,startTWAP.java
?中指定:母單總股數為 48000 ,子單的下單數量在 10000 ~ 12000 間浮動,下單間隔在 7s ~ 9s 間浮動,母單的基金代碼是?501019
?,?priceOption
?指定為 0 即選擇子單價格為買一價格。
可以觀察到,子單的下單數量在給定范圍內隨機;從子單的下單時間也能推理出,下單間隔也在給定范圍內隨機;子單的下單價格為?snapshotOutputKeyedTb
?中基金代碼為?501019
?的買一價格。
2.5 總結
本章通過循序漸進的方式,介紹了如何使用 DolphinDB 的 CEP引擎實現 TWAP 拆單算法。首先說明了算法思想;然后模塊化介紹了系統的功能;其次詳細介紹了系統的實現流程和代碼,其中最復雜的是監視器定義的部分,詳細闡述了各個函數之間的調用關系;最后將系統的拆單過程,在結果檢視部分使用 Dashboard 進行展示。
3. VWAP 拆單算法
本章將介紹如何使用 CEP 引擎實現 VWAP 拆單算法。
3.1 算法思想
VWAP(Volume Weighted Average Price,成交量加權平均價格)算法是一種廣泛使用的拆單交易策略,主要適用于大額訂單執行。該模型通過分析歷史成交量分布模式,將大訂單按成交量比例拆分到各個時間段執行,使成交均價盡可能接近市場 VWAP 基準。VWAP 的計算公式如下:
其中,pricei?為分割節點上拆分訂單的價格,volumei?為分割節點上拆分訂單的股數。
與 TWAP 均勻分配不同,VWAP 會根據市場典型的成交量分布特征動態調整下單量,在成交量大的時段分配更多訂單,在成交量小的時段分配較少訂單。這種設計既考慮了時間因素,又充分尊重市場的流動性分布規律。
3.2.功能模塊
VWAP 算法的功能模塊與上文中的 TWAP 相似。算法邏輯通過 CEP 引擎實現;通過數據回放功能模擬實時行情快照寫入;使用流表訂閱功能解耦用戶母單發布和拆單 CEP 引擎。大致流程如下圖所示。
圖3-1 VWAP算法流程圖
與 TWAP 算法不同的是:
- VWAP 拆單的子單股數不再是給定范圍內隨機,而是根據前一天的逐筆交易數據,先計算出每個時段的交易量占全天總交易量的比例,子單股數為前一天當前時段的交易量比例與母單股數的乘積。本例中時段長度為一分鐘。
- VWAP 拆單的時間間隔不再是給定范圍內隨機,而是固定的時段間隔,即一分鐘。
3.3 代碼實現
在本節中我們介紹 VWAP 算法的代碼實現,也主要圍繞與上文 TWAP 算法的區別來闡述。
step1-定義事件類
定義事件類代碼與 TWAP 算法?step1-定義事件類?類似,不同的是,母單類中不再有子單股數范圍上下限、拆單間隔范圍上下限四個參數:
//拆單參數開始splitMethod :: STRING // 拆單算法orderType :: STRING // 訂單類型(限價/市價)price :: DOUBLE // 限定價格priceOption :: INT // 買一還是賣一價格startTime :: TIMESTAMP // 拆單開始時間endTime :: TIMESTAMP // 拆單結束時間orderStatus :: STRING // 拆單狀態//拆單參數結束eventTime :: TIMESTAMP // 事件下達時間
step2-創建內存表
- 創建母單記錄內存表?
parentOrderManage
?,修改訂單記錄內存表?alterOrderManage
?,子單接收流表?subOrderStream
?以及 CEP 引擎訂閱的異構流數據表?orderBlobStream
?。代碼同 TWAP 算法。 - 創建一個內存表,用于存儲基金一天每分鐘的交易量。代碼如下:
// 創建內存表,用于存儲每分鐘的交易量
trade = loadTable("dfs://l2TLDB","trade")
tradeData = select * from trade where SecurityID = "300041" and TradeDate = 2023.02.01
QtyTB =select * from (select sum(TradeQty) as TradeQty from tradeData group by minute(TradeTime) as TradeTimeInterval order by TradeTimeInterval) where TradeTimeInterval > 09:29m
share QtyTB as QtyTb
最終?QtyTb
?表中記錄了基金一天每分鐘的交易量,如圖所示。
圖3-2 歷史交易量表
step3-鍵值內存表訂閱流表
鍵內存表訂閱流表代碼同 TWAP 算法?step3-鍵值內存表訂閱流表?。
step4-定義監視器
VWAP 拆單算法中,子單股數和拆單時間間隔不需要在給定范圍內隨機,因此監視器類中不需要?rand
?函數。監視器類的結構如下:
class SplitMonitor:CEPMonitor{def SplitMonitor() {//本例中,初始monitor 不需要傳值, 在克隆復制任務monitor 時進行設置。}//初始記錄母單記錄信息def initPOrderManageInfo(pOrder){...}//更新母單記錄信息def updatePOrderManageInfo(pOrder,opTime){...}
}//vwap 算法下單監聽 monitor,繼承關系
class VWAPSplitMonitor:SplitMonitor {// 記錄子單總下股數的變量subOrderQtys :: INT// 母單parentOrder :: ParentOrder// 開始拆單與歷史交易表對應的時間(分鐘)splitStartTime :: MINUTEdef VWAPSplitMonitor() {//本例中,初始monitor 不需要傳值, 在克隆復制任務monitor 時進行設置。}//VWAP 下單方法def placeOrder(){...}//初始化parentOrder,進行拆單下單,設置OrderAlterAction事件監聽 def startPlaceOrder(pOrder){...}//創建母單下單monitor實例def forkParentOrderMonitor(pOrder){}//初始任務def onload(){addEventListener(forkParentOrderMonitor, "ParentOrder", ,"all")}
}
成員變量?subOrderQtys
?:使用?subOrderQtys
?記錄當前已下子單的股數和,避免超出母單股數。
成員變量?parentOrder
?:記錄當前母單的參數,包括基本信息,拆單參數,和拆單狀態。
成員變量?splitStartTime
?:記錄當前子單的時間,與歷史交易表時段對應。
下面將按照 CEP 引擎運作的邏輯順序,依次介紹監視器中各個成員方法的具體內容。
- onload 初始任務
創建引擎并實例化監視器后,將首先調用其內部的?onload
?函數。在?onload
?函數中,使用?addEventListener
?函數監聽?ParentOrder
?事件注入,指定回調函數為?forkParentOrderMonitor
?,事件類型是?ParentOrder
?,設置監聽次數為持續監聽。
//初始任務def onload(){ addEventListener(forkParentOrderMonitor, "ParentOrder", ,"all")}
onload
?方法設置了一個事件監聽器,監聽所有的母單事件?ParentOrder
?。當監聽到該類型事件時,下一步將啟動整個拆單下單過程。為了控制母單拆單下單的線程安全,為每個母單創建不同的 Monitor 實例,因此在對應的回調函數?forkParentOrderMonitor
?中需要包含對 Monitor 實例的創建和母單參數傳遞等步驟。從?onload
?方法開始,函數調用的流程與實現的功能可以分為三個模塊,如下圖所示。
圖3-3 函數調用模塊圖
其中,?startPlaceOrder
?函數是兩個模塊的啟動函數,啟動順序如上圖所示。模塊 3 中,函數調用的流程與實現的功能如下圖所示。
圖3-4 模塊三函數調用流程圖
接下來從策略啟動事件對應的回調函數?forkParentOrderMonitor
?開始來介紹具體的代碼實現。
- forkParentOrderMonitor 生成監視器實例
forkParentOrderMonitor
?函數代碼實現同 TWAP 算法?step5.定義監視器?。
- startPlaceOrder 啟動核心模塊
startPlaceOrder
?函數代碼實現大致同 TWAP 算法?step5.定義監視器?。但需要對拆單時間進行初始化。
// 交易開始時間初始化
?splitStartTime =
?minute(09:30m)
即從上午 9:30 開始拆單。
- initPOrderManageInfo 記錄母單信息
initPOrderManageInfo
?函數代碼實現同 TWAP 算法?step5.定義監視器。
- updatePOrderManageInfo 更新母單最后修改時間
updatePOrderManageInfo
?函數代碼實現同 TWAP 算法?step5.定義監視器。
- placeOrder 拆單核心函數
placeOrder
?函數是拆單下單的核心函數,對應模塊 3 中的后半部分。
//VWAP 下單方法def placeOrder(){writeLog("============開始下單==========")//判斷是否超過下單時限if(now()>= parentOrder.endTime){ //當前時間大于下單結束時間,則不再下單parentOrder.setAttr(`orderStatus,'時限中止') updatePOrderManageInfo(parentOrder,now())// 摧毀監視器destroyMonitor()return}//判斷當前母單狀態是否是可以下單狀態,不是則退出if(!(parentOrder.orderStatus in ['初始化','下單中'])){return}//計算剩余待下單股數 = 母單股數 - 所有子單數remainQty = parentOrder.tradeQuantity - subOrderQtys// 計算當前應該下的子單數// 查找歷史交易表,計算總交易量totalQty = exec sum(TradeQty) from QtyTb// 跳過中間的空白區域if(splitStartTime==11:31m){splitStartTime = 13:00m}// 將當前拆單時間轉為分鐘nowTime = minute(splitStartTime)// 查找歷史交易表,當前時段(一分鐘),是否有交易量nowQtyVector = exec TradeQty from QtyTb where TradeTimeInterval=nowTime// 確定子單股數if(nowQtyVector.size()==0){//當前時段沒有交易記錄,不下單,直接設置下一次下單的監聽parentOrder.setAttr(`orderStatus,'下單中') //保存母單信息updatePOrderManageInfo(parentOrder,now())// 將拆單時間更新splitStartTime = temporalAdd(splitStartTime,1,"m")//設置下次下單的監聽,等待時間為10s addEventListener(placeOrder,,,1,,duration(10+"s"))return}else{// 拿到當前時段總交易量nowQty = nowQtyVector[0]totalNum = parentOrder.tradeQuantity// 根據比例計算,這里類型轉換是為了計算小數比例,否則為0,最后向上取整subOrderQty = ceil((double(nowQty)/totalQty)*totalNum)// subOrderQty可能會超過剩余單數subOrderQty = (subOrderQty>remainQty) ? remainQty : subOrderQty}// 將拆單時間更新splitStartTime = temporalAdd(splitStartTime,1,"m")// 更新子單股數subOrderQtys = subOrderQtys+subOrderQty// 更新剩余單數remainQty = remainQty-subOrderQty// 拿到母單的基金代碼v_securityId = parentOrder.fundCode// 直接從分布式表中進行查詢,定義一個函數if(parentOrder.priceOption == 0){//從買一價格獲取,從BidPrice[0]獲取價格// 從鍵值對表中獲取BidPrice = exec BidPrice from snapshotOutputKeyedTb where SecurityID = v_securityId// 買一價格subOrderPrice = BidPrice[0]}else{//從賣一價格獲取,從OfferPrice[0]獲取價格// 從鍵值對表中獲取OfferPrice = exec OfferPrice from snapshotOutputKeyedTb where SecurityID = v_securityId// 賣一價格subOrderPrice = OfferPrice[0]}//構建子單//創建子單時間subOrderPlaceTime = now()//構建下達子單到流表subOrderStream = objByName('subOrderStream') // 插入子單流數據表insert into subOrderStream values(parentOrder.splitOrderId,parentOrder.batchId,parentOrder.splitOrderId+'_'+(parentOrder.sortNum+1),parentOrder.sortNum+1,parentOrder.fundCode,parentOrder.fundName,parentOrder.assetId,parentOrder.assetName,parentOrder.combinationNo,parentOrder.combinationName,parentOrder.stockCode,parentOrder.stockName,parentOrder.tradeDate,subOrderQty,parentOrder.tradeDirection,parentOrder.market,parentOrder.handlerEmpid,parentOrder.handlerName,parentOrder.orderType,subOrderPrice,subOrderPlaceTime);//設置下單次數parentOrder.setAttr(`sortNum,parentOrder.sortNum+1) //更新剩余股數updateRemainQty()//判斷是否還需繼續下單if(remainQty>0){ parentOrder.setAttr(`orderStatus,'下單中') //保存母單信息updatePOrderManageInfo(parentOrder,subOrderPlaceTime)//設置下次下單的監聽,等待時間為10s addEventListener(placeOrder,,,1,,duration(10+"s"))}else{//最后一次下單,銷毀下單監聽parentOrder.setAttr(`orderStatus,'下單完畢') //保存母單信息updatePOrderManageInfo(parentOrder,now())//下單完結,銷毀監視器destroyMonitor()}}
placeOrder
?函數邏輯如下:
- 判斷當前時間是否超過拆單結束時間,若超過結束時間,則設置母單狀態為?
'時限中止'
?,并調用updatePOrderManageInfo
?函數更新母單最后修改時間。 - 判斷當前母單狀態是否處于?
'初始化'
?或?'下單中'
?,如果不滿足,則停止拆單。 - 根據成員變量?
subOrderQtys
?和母單屬性?tradeQuantity
?計算剩余待下單股數?remainQty
?,查詢歷史交易表?QtyTb
?,根據當前時段的歷史交易比例和母單股數,計算確定子單的股數?subOrderQty
?,并對?subOrderQtys
?和?remainQty
?進行更新。 - 根據母單屬性?
priceOption
?確定子單價格為買一還是賣一價格,并從鍵值內存表?snapshotOutputKeyedTb
?中查詢。 - 構建子單,并插入子單流表?
subOrderStream
?。 - 根據剩余股數?
remainQty
?判斷是否還需要下單。若還需要下單,則保存母單狀態和修改時間,并定時再次調用?placeOrder
?函數重復上述步;若已經沒有剩余股數,則保存母單狀態和修改時間,最后摧毀監視器。
step5.創建 CEP 引擎訂閱異構流表
創建 CEP 引擎訂閱異構流表代碼同 TWAP 算法?step6.創建 CEP 引擎并訂閱異構流表?。
step6-回放行情數據
回放行情數據代碼同 TWAP 算法?step4-回放行情數據?。
step7.啟動策略實例
啟動策略實例代碼同 TWAP 算法?step7.啟動策略實例?。
3.4 結果檢視
本小節通過查看 Dashboard 中輸出事件展示拆單系統運行的結果。在本例中,母單事件、母單狀態修改事件注入 CEP 引擎都分別記錄到對應的內存表,拆分子單進行下單記錄到子單接收流表,如此在 Dashboard 中便可以選取需要的數據進行可視化。
step1-JAVA環境準備
配置系統的 maven 、jdk 環境,本例中的 jdk 和 maven 版本如下:
jdk - java version "1.8.0_441"
maven - Apache Maven 3.8.6
step2-數據準備
下載附錄中的 VWAP 算法代碼并解壓。將?data/snapshot_vwap.csv
?、data/trade.csv
放到?dolphindb/server
?目錄下。運行導入腳本?data/data_input_snapshot.dos
?進行建庫建表,并將測試數據導入建好的分布式表中。運行導入腳本?data/data_input_trade.dos
?進行建庫建表,并將測試數據導入建好的分布式表中。將?data/dashboard.VWAP 拆單監控.json
?導入到 Dashboard 。
step3-系統環境準備
運行腳本?01 clearEnv.dos
?、?02 Event.dos
?、?03 createTable.dos
?、?04 subscribeSnapshot.dos
?。01 clearEnv.dos
?腳本將系統中已存在的內存共享表、訂閱信息、流式引擎等進行清除,確保不會重復定義;?02 Event.dos
?、?03 createTable.dos
?、?04 subscribeSnapshot.dos
?腳本分別對應上文介紹的定義事件類、創建內存表、鍵值內存表訂閱流表的功能。
step4-CEP 引擎創建
運行腳本?05 Monitor.dos
?、06 createCEPEngine.dos
?,分別對應上文中定義監視器、創建CEP引擎訂閱異構流表的功能。
step5-回放行情快照數據
運行腳本?07 replaySnapshot.dos
?,將快照數據回放到快照流表?snapshotStream
?中。由于鍵值內存表?snapshotOutputKeyedTb
?訂閱了?snapshotStream
?,數據會被自動發布到?snapshotOutputKeyedTb
?中。
step6-啟動策略
下載附錄中的 JAVA API 策略啟動代碼并解壓。修改?common/DBUtil.java
?中的數據庫配置為用戶自己的環境。運行?startVWAP.java
?,將母單事件放入異構流表中,觀察 Dashboard 中的輸出。
- 子單監控中的對應輸出如下:
圖3-5 子單監控輸出
本例中,startVWAP.java
?中指定:母單的基金代碼是?300041
?,總股數為48000,?priceOption
?指定為 0 即選擇子單價格為買一價格。
可以觀察到,子單的下單數量根據當前時段的歷史交易比例計算得到,下單時間間隔為 10s;子單的下單價格為?snapshotOutputKeyedTb
?中基金代碼為?300041
?的買一價格。
3.5 總結
本章通過循序漸進的方式,介紹了如何使用 DolphinDB 的 CEP引擎實現 VWAP 拆單算法。首先說明了算法思想;然后模塊化介紹了系統的功能;其次詳細介紹了系統的實現流程和代碼,其中最復雜的是監視器定義的部分,詳細闡述了各個函數之間的調用關系;最后將系統的拆單過程,在結果檢視部分使用 Dashboard 進行展示。
4. 總結
本文基于 CEP 引擎構建了一套完整的算法拆單調度框架。通過??事件驅動架構??和??動態事件監聽器??來實現訂單下達、拆單的監聽與處理。同時利用??鍵值內存表緩存實時行情快照,為子單價格計算提供毫秒級響應支持。在功能實現上,文中提供了了 TWAP 和 VWAP 拆單算法實現 Demo (隨機浮動參數避免市場預測) ,并通過????可視化監控面板??進行了實時展示訂單狀態等信息。
5. 附錄
5.1 TWAP 算法
demo1_TWAP
5.2 VWAP 算法
demo2_VWAP.zip
5.3 JAVA API 下達母單啟動代碼
cepSplitDemo(基礎)