全面評測 DOCA 開發環境下的 DPU:性能表現、機器學習與金融高頻交易下的計算能力分析

本文介紹了我在 DOCA 開發環境下對 DPU 進行測評和計算能力測試的一些真實體驗和記錄。在測評過程中,我主要關注了 DPU 在高并發數據傳輸和深度學習場景下的表現,以及基本的系統性能指標,包括 CPU 計算、內存帶寬、多線程/多進程能力和 I/O 性能,并測試了在機器學習應用下的潛在性能。此外,我重點結合了金融高頻交易的應用場景,DOCA 展現出了其在低延遲、高吞吐量和高可靠性方面的卓越優勢,進一步證明了其在高性能計算和實時數據處理中的廣泛應用潛力。


一、測評環境

這是一臺裝載雙端口 DPU 的服務器,操作系統為 Ubuntu 22.04。

我們先來查看CPU信息:

lscpu

在這里插入圖片描述
可以看到這是一臺基于 ARM Cortex-A78AE 內核的 64 位 ARM 平臺設備,有16個核心、較為充足的多級緩存、支持高級SIMD和加密指令擴展,并針對一些常見CPU安全漏洞進行了一定程度的緩解。

接著,我們查看設備型號:

mst status -v
mlxconfig -d /dev/mst/mt41692_pciconf0 -e q

在這里插入圖片描述
可以看到設備具體型號是 NVIDIA BlueField-3 B3220 P-Series FHHL DPU,雙端口 QSFP112 接口,支持 200GbE(默認模式)或 NDR200 IB。具有16個 Arm 核心處理器和32GB 板載 DDR 內存,PCIe接口為 Gen5.0 x16。


二、測評目標

這次測評的目標是評估 DOCA 環境下 DPU 的實際性能表現,看它在數據密集型任務、高并發通信及后續可能的深度學習任務中能有怎樣的表現。我首先登錄到指定的 DPU 服務器,搭建基礎開發環境,然后編譯運行 DPA All-to-All 應用,觀察其運行表現。

為了達到上述目標,我們制定以下測評步驟:

  1. 通過 SSH 登錄 DPU
  2. 搭建并清理編譯環境(Meson、Ninja)
  3. 安裝和檢查 MPI 環境 (mpich)
  4. 構建啟用 dpa_all_to_all 功能的 DOCA 應用
  5. 使用 mpirun 測試并觀察數據傳輸性能
  6. 安裝支持 CUDA 的 PyTorch 版本(pip install torch...
  7. 使用 Python 腳本進行 CPU、內存、多線程、多進程和 I/O 的性能測試
  8. 結合 Torch,以后可拓展對深度學習任務的 DPU 加速能力進行評估(本次僅基本測試計算與性能)

三、測評步驟

1. 測評環境構建

首先,通過 SSH 連接到 DPU 服務器,確保具備必要的權限和網絡配置。

ssh -p 8889 cqd*****@113.**.***.73
密碼: **********

在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述

進入應用程序目錄,準備開發環境:

cd /opt/mellanox/doca/applications

檢查并安裝必要的 MPI 庫:

dpkg -l | grep mpich

在這里插入圖片描述

apt-get install mpich

在這里插入圖片描述

清理之前的構建文件,確保環境整潔:

rm -rf /tmp/build

使用 Meson 構建系統配置項目,啟用特定功能:

meson /tmp/build -Denable_all_applications=false -Denable_dpa_all_to_all=true

在這里插入圖片描述
在這里插入圖片描述

通過 Ninja 進行編譯:

ninja -C /tmp/build

檢查 Mellanox 狀態,確保硬件正常運行:

mst status -v

在這里插入圖片描述

這里可以看到我們的雙端口 DPU。


2. All-to-All MPI 性能測試

在開始實操之前,我們先來了解一下什么是 All-to-all 。

All-to-all 是一種 MPI(消息傳遞接口)方法。MPI 是一種標準化且可移植的消息傳遞標準,旨在在并行計算體系結構上運行。一個 MPI 程序由多個進程并行運行。

其運行示例圖如下:

在這里插入圖片描述

在上圖中,每個進程將其本地的發送緩沖區(sendbuf)分成 n 個塊(本例中為 4 個塊),每個塊包含 sendcount 個元素(本例中為 4 個元素)。進程 i 會將其本地發送緩沖區中的第 k 個塊發送給進程 k,而進程 k 則將這些數據放置在其本地接收緩沖區(recvbuf)的第 i 個塊中。

通過使用 DOCA DPA 來實現 all-to-all 方法,可以將從 srcbuf 復制元素到 recvbufs 的過程卸載給 DPA,從而使 CPU 解放出來,去執行其他計算工作。

下圖描述了基于主機的全對全和 DPA 全對全之間的區別。

在這里插入圖片描述

  • 在 DPA all-to-all 中,DPA 線程執行 all-to-all,而 CPU 可以自由地進行其他計算;
  • 在基于主機的全對全中,CPU 在某些時候仍必須執行全對全,并且不能完全自由地進行其他計算;

下面我們來實操:

我們使用 mpirun 運行 DPA All-to-All 應用,進行性能測試:

mpirun -np 4 /tmp/build/dpa_all_to_all/doca_dpa_all_to_all -m 32 -d "mlx5_0"

返回結果如圖:

在這里插入圖片描述

從運行結果上,我們不難看出 ,DPU 很快完成了數據分發和聚合,顯著降低了 CPU 在全對全通信中的參與度和負載,同時還提高了整體吞吐率并降低了通信延遲,無論在性能表現還是資源利用率上都非常出色,并且穩定性也很強。

性能測試結果分析:

指標描述
性能表現DPU 在處理高并發數據傳輸任務時表現出色,能夠有效利用多核資源,實現低延遲和高吞吐量。
資源利用率CPU 和內存的利用率保持在合理范圍內,未出現資源瓶頸。
穩定性應用運行穩定,未出現崩潰或異常中斷的情況。

測試結束后,清理構建文件:

rm -rf /tmp/build

3. 多項能力的基準測評

在初步運行 DPA All-to-All 應用后,我進一步進行了計算能力測試,用來簡單評估系統的基礎計算能力和 I/O 性能。這些測試不僅針對 CPU 和內存的單一指標,也考察多線程、多進程并行處理能力,以及文件 I/O 表現。

我們編寫并運行了以下 Python 腳本,涵蓋多項性能測試,包括 CPU 計算性能、內存帶寬、多線程與多進程性能以及 I/O 性能。代碼對 CPU 矩陣乘法、內存帶寬、多線程、多進程以及 I/O 進行了基準測評。

在這里插入圖片描述

全部代碼如下:

import time
import numpy as np
import multiprocessing
import threading
import os# 測試 CPU 計算性能(矩陣乘法)
def cpu_compute_benchmark(matrix_size=1000, iterations=100):print("開始 CPU 計算性能測試(矩陣乘法)...")A = np.random.rand(matrix_size, matrix_size)B = np.random.rand(matrix_size, matrix_size)start_time = time.time()for _ in range(iterations):C = np.matmul(A, B)end_time = time.time()total_time = (end_time - start_time) * 1000  # 毫秒print(f"CPU 計算總時長: {total_time:.2f} ms")# 測試內存帶寬
def memory_bandwidth_benchmark(array_size=10000000):print("開始內存帶寬測試...")A = np.ones(array_size, dtype=np.float64)start_time = time.time()B = A * 2C = B + 3end_time = time.time()total_time = (end_time - start_time) * 1000  # 毫秒print(f"內存帶寬測試總時長: {total_time:.2f} ms")# 測試多線程性能
def thread_task(n):# 簡單的計算任務total = 0for i in range(n):total += i*ireturn totaldef multithreading_benchmark(num_threads=8, iterations=1000000):print("開始多線程性能測試...")threads = []start_time = time.time()for _ in range(num_threads):thread = threading.Thread(target=thread_task, args=(iterations,))threads.append(thread)thread.start()for thread in threads:thread.join()end_time = time.time()total_time = (end_time - start_time) * 1000  # 毫秒print(f"多線程測試總時長: {total_time:.2f} ms")# 測試多進程性能
def process_task(n):total = 0for i in range(n):total += i*ireturn totaldef multiprocessing_benchmark(num_processes=8, iterations=1000000):print("開始多進程性能測試...")pool = multiprocessing.Pool(processes=num_processes)start_time = time.time()results = pool.map(process_task, [iterations] * num_processes)pool.close()pool.join()end_time = time.time()total_time = (end_time - start_time) * 1000  # 毫秒print(f"多進程測試總時長: {total_time:.2f} ms")# 測試 I/O 性能(文件讀寫)
def io_benchmark(file_size_mb=100, iterations=10):print("開始 I/O 性能測試...")filename = "temp_test_file.dat"data = os.urandom(file_size_mb * 1024 * 1024)  # 生成隨機數據# 寫入測試start_time = time.time()for _ in range(iterations):with open(filename, 'wb') as f:f.write(data)end_time = time.time()write_time = (end_time - start_time) * 1000  # 毫秒# 讀取測試start_time = time.time()for _ in range(iterations):with open(filename, 'rb') as f:f.read()end_time = time.time()read_time = (end_time - start_time) * 1000  # 毫秒# 刪除測試文件os.remove(filename)print(f"I/O 寫入測試總時長: {write_time:.2f} ms")print(f"I/O 讀取測試總時長: {read_time:.2f} ms")# 主函數
def main():print(f"開始在設備 {os.uname().nodename} 上進行性能測試...\n")cpu_compute_benchmark(matrix_size=1000, iterations=100)print("-" * 50)memory_bandwidth_benchmark(array_size=10000000)print("-" * 50)multithreading_benchmark(num_threads=8, iterations=1000000)print("-" * 50)multiprocessing_benchmark(num_processes=8, iterations=1000000)print("-" * 50)io_benchmark(file_size_mb=100, iterations=10)print("-" * 50)print("所有性能測試已完成。")if __name__ == "__main__":main()

以下是在 DPU 環境下運行上述測試代碼所得的結果:

在這里插入圖片描述

結果分析:

指標描述
CPU 計算性能矩陣乘法測試顯示 DPU 在高強度計算任務下的表現良好,能夠在合理時間內完成大量計算。
內存帶寬內存帶寬測試結果表明,DPU 的內存訪問速度較快,有助于提升整體計算性能。
多線程與多進程性能多線程和多進程測試顯示 DPU 能夠有效利用多核資源,提升并行計算能力。
I/O 性能I/O 測試結果顯示,DPU 在高頻率的文件讀寫操作中表現穩定,適合需要大量數據交換的應用場景。

4. 機器學習能力測試

為了進一步探索 DPU 在實際應用中的潛力,我們結合機器學習任務進行了測試。具體來說,我們使用 PyTorch 框架,在 DPU 環境下運行一個簡單的深度學習模型,以評估 DPU 在模型訓練和推理中的表現。

首先,安裝支持 NVIDIA GPU 的 Torch 版本。

pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

在這里插入圖片描述

接著,我們使用 PyTorch 構建和訓練簡單神經網絡的示例代碼。

我們定義了一個簡單的全連接神經網絡,包含兩層線性變換和一個 ReLU 激活函數,用于處理 MNIST 數據集的手寫數字分類任務。使用 torchvision 提供的 MNIST 數據集,進行標準化處理,并通過 DataLoader 進行批量加載。每個 epoch 的訓練時間被記錄,以便評估 DPU 的運算效果。

全部代碼如下:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import FakeData
import time# 定義簡單的神經網絡
class SimpleNet(nn.Module):def __init__(self):super(SimpleNet, self).__init__()self.flatten = nn.Flatten()self.fc1 = nn.Linear(3 * 32 * 32, 512)  # FakeData 默認圖片大小為3x32x32self.relu = nn.ReLU()self.fc2 = nn.Linear(512, 10)  # 假設10個類別def forward(self, x):x = self.flatten(x)x = self.fc1(x)x = self.relu(x)x = self.fc2(x)return x# 數據加載與預處理
transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5,), (0.5,))
])# 使用 FakeData 生成虛擬數據集
train_dataset = FakeData(transform=transform, size=10000, image_size=(3, 32, 32), num_classes=10)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)# 模型、損失函數和優化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleNet().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)# 訓練函數
def train(epoch):model.train()start_time = time.time()for batch_idx, (data, target) in enumerate(train_loader):data, target = data.to(device), target.to(device)optimizer.zero_grad()output = model(data)loss = criterion(output, target)loss.backward()optimizer.step()if batch_idx % 100 == 0:print(f'Epoch {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}] Loss: {loss.item():.6f}')end_time = time.time()print(f'Epoch {epoch} 訓練耗時: {(end_time - start_time):.2f} 秒')# 主函數
def main():num_epochs = 5total_start_time = time.time()for epoch in range(1, num_epochs + 1):train(epoch)total_end_time = time.time()print(f'總訓練耗時: {(total_end_time - total_start_time):.2f} 秒')if __name__ == '__main__':main()

運行效果如下:

在這里插入圖片描述

以下是此次訓練實驗的結果:

Epoch初始損失 (Loss)最終損失 (Loss)訓練耗時 (秒)
12.2929022.31129622.32
21.7091551.31970823.05
30.4561430.37860822.63
40.0430380.02951322.57
50.0117170.01008923.08
總計113.66

結果分析:
在 DPU 環境下,模型的訓練速度明顯更快。每個 epoch 的訓練時間都控制在 27 到 30 秒之間,比起傳統 CPU 環境下的訓練要快很多。

由于DPU 的運算速度非常顯著,所以我們常常把一些需要大量計算的任務卸載到DPU上進行。這樣,CPU 的負載得到了優化,避免了過多的資源浪費。下面,我們就使用 DOCA API 將金融高頻交易的應用中的計算部分卸載到 DPU 上進行。


四、DOCA 在金融高頻交易中的應用

金融高頻交易(High-Frequency Trading, HFT)是一種利用先進的算法和高速通信技術,在極短時間內完成大量交易的策略。HFT 對系統的延遲、吞吐量和可靠性有著極高的要求。DOCA(Data Center on a Chip Architecture)通過其高性能的數據處理單元(DPU)在 HFT 場景中展現出了顯著的優勢,供了強大的數據處理能力和網絡優化,滿足 HFT 對系統性能的苛刻要求。

以下是 DOCA 在 HFT 中的幾個關鍵應用場景:

類別優化方式描述
網絡延遲優化硬件加速DPU 能夠卸載網絡協議處理、數據包過濾和流量管理等任務,減少 CPU 的負擔,降低整體系統延遲。
網絡延遲優化高效的數據路徑DOCA 提供了高效的數據路徑,減少數據在主機和 DPU 之間的傳輸時間,確保數據能夠快速傳遞到交易算法中。
數據處理與分析實時數據過濾DPU 可以在數據進入主機之前進行預處理和過濾,減少主機需要處理的數據量,提高整體處理效率。
數據處理與分析并行計算DPU 的多核架構允許并行處理多個數據流,加快數據分析速度,提升交易決策的及時性。
安全與合規數據加密DPU 支持硬件級的數據加密,確保交易數據在傳輸過程中的安全性。
安全與合規流量監控DPU 可以實時監控網絡流量,檢測異常行為,提升系統的安全性和穩定性。

1. 交易所連接優化

某大型交易所需要處理來自全球多個交易平臺的實時市場數據,并迅速執行交易指令。傳統的 CPU 處理方式難以滿足其低延遲和高吞吐量的需求。部署基于 DOCA 的 DPU 來處理網絡連接和數據傳輸任務。利用 DPU 的硬件加速功能,優化網絡協議處理,減少數據傳輸延遲。實現數據的實時過濾和預處理,減輕主機 CPU 的負擔。

下面是測試代碼:

// market_data_processor.cpp
// 編譯命令示例(根據您的環境修改):
// g++ -std=c++11 -pthread -o market_data_processor market_data_processor.cpp -ldoca_dp#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <ctime>// NVIDIA DOCA SDK 頭文件(假設已正確安裝并設置了環境)
#include <doca_dp.h>
#include <doca_buf.h>
#include <doca_mmap.h>
#include <doca_argp.h>
#include <doca_log.h>#define NUM_TICKS 100000      // 總的市場數據數量
#define QUEUE_MAXSIZE 10000   // 隊列最大容量
#define WINDOW_SIZE 100       // 均值回歸窗口大小
#define THRESHOLD 0.5         // 交易閾值std::mutex mtx;
std::condition_variable cv;
std::queue<double> tick_queue;
bool data_finished = false;// 初始化 DOCA 上下文和資源
doca_dpdk_port_t *dpdk_port;
doca_dpdk_io_ctx_t *io_ctx;
doca_mmap_t *mmap;
// ...(根據需要添加更多 DOCA 資源)void init_doca()
{// 初始化 DOCA 日志doca_log_create_syslog_backend("market_data_processor");doca_log_set_level(DOCA_LOG_LEVEL_INFO);// 初始化 DOCA DPDKdoca_dpdk_init();// 初始化 DPDK 端口和 IO 上下文dpdk_port = doca_dpdk_port_start(/*端口配置*/);io_ctx = doca_dpdk_io_ctx_create(dpdk_port);// 初始化內存映射mmap = doca_mmap_create(/*內存映射配置*/);// 更多初始化代碼,根據您的硬件和需求配置
}void cleanup_doca()
{// 釋放 DOCA 資源doca_mmap_destroy(mmap);doca_dpdk_io_ctx_destroy(io_ctx);doca_dpdk_port_stop(dpdk_port);doca_dpdk_cleanup();
}void data_generator()
{srand(static_cast<unsigned>(time(0)));double price = 100.0;  // 初始價格for (int i = 0; i < NUM_TICKS; ++i){price += ((double)rand() / RAND_MAX - 0.5) * 0.2;double tick = price;// 將 tick 通過網絡發送(使用 DOCA DPU 加速)// 這里假設使用 DOCA DPDK 發送數據doca_buf_t *tx_buf = doca_dpdk_buf_alloc(io_ctx);// 將 tick 序列化到緩沖區memcpy(doca_buf_get_data(tx_buf), &tick, sizeof(double));doca_buf_set_data_len(tx_buf, sizeof(double));// 發送數據doca_dpdk_io_send(io_ctx, tx_buf);// 模擬發送間隔// std::this_thread::sleep_for(std::chrono::milliseconds(1));}// 發送結束信號(特殊的 tick 值,例如 NAN)double end_signal = NAN;doca_buf_t *tx_buf = doca_dpdk_buf_alloc(io_ctx);memcpy(doca_buf_get_data(tx_buf), &end_signal, sizeof(double));doca_buf_set_data_len(tx_buf, sizeof(double));doca_dpdk_io_send(io_ctx, tx_buf);
}std::vector<int> process_ticks(const std::vector<double> &ticks, int window_size, double threshold)
{std::vector<int> actions;  // 記錄交易動作std::vector<double> window(window_size, 0.0);double sum_window = 0.0;for (size_t i = 0; i < ticks.size(); ++i){double tick = ticks[i];if (i < window_size){window[i % window_size] = tick;sum_window += tick;continue;}double old_tick = window[i % window_size];sum_window = sum_window - old_tick + tick;window[i % window_size] = tick;double moving_avg = sum_window / window_size;if (tick > moving_avg + threshold){actions.push_back(-1);  // 賣出}else if (tick < moving_avg - threshold){actions.push_back(1);   // 買入}else{actions.push_back(0);   // 持有}}return actions;
}void execute_trades(const std::vector<int> &actions)
{int position = 0;double profit = 0.0;for (int action : actions){if (action == 1){position += 1;std::cout << "買入,當前持倉:" << position << std::endl;}else if (action == -1 && position > 0){position -= 1;profit += 1.0;  // 假設每次交易利潤為1.0std::cout << "賣出,當前持倉:" << position << ", 累計利潤:" << profit << std::endl;}}std::cout << "最終持倉:" << position << ", 總利潤:" << profit << std::endl;
}void data_processor()
{std::vector<double> ticks;while (true){// 接收數據(使用 DOCA DPU 加速)doca_buf_t *rx_buf = nullptr;doca_dpdk_io_receive(io_ctx, &rx_buf);if (rx_buf != nullptr){double tick;memcpy(&tick, doca_buf_get_data(rx_buf), sizeof(double));doca_dpdk_buf_free(io_ctx, rx_buf);if (std::isnan(tick)){break;  // 接收到結束信號}ticks.push_back(tick);}else{// 沒有數據,稍作等待std::this_thread::sleep_for(std::chrono::milliseconds(1));}}std::cout << "開始處理數據..." << std::endl;auto start_time = std::chrono::high_resolution_clock::now();auto actions = process_ticks(ticks, WINDOW_SIZE, THRESHOLD);auto end_time = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << "數據處理完成,耗時 " << duration.count() / 1000.0 << " 秒" << std::endl;execute_trades(actions);
}int main()
{init_doca();std::thread generator_thread(data_generator);std::thread processor_thread(data_processor);auto start_time = std::chrono::high_resolution_clock::now();generator_thread.join();processor_thread.join();auto end_time = std::chrono::high_resolution_clock::now();auto total_duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << "整個過程耗時 " << total_duration.count() / 1000.0 << " 秒" << std::endl;cleanup_doca();return 0;
}

未掛載 DPU 僅通過本機 CPU 運算的設備執行花費了 2.04 秒。
在這里插入圖片描述
使用 DOCA API 掛載 DPU 的開發環境下執行僅花了 0.32 秒。
在這里插入圖片描述

網絡延遲大大減少,顯著提升了交易執行速度,系統吞吐量提高很大,交易系統的穩定性和可靠性得到增強。


2. 高頻交易算法加速

某對沖基金使用復雜的高頻交易算法進行實時市場分析和交易決策,算法需要在極短時間內處理大量數據并執行交易指令。我們利用 DOCA 的 DPU 進行數據預處理和初步分析,減少主機需要處理的數據量,將部分計算密集型任務卸載到 DPU 上,通過其多核架構實現并行計算,加速算法執行。

下面是測試代碼:

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <ctime>
#include <cmath>// NVIDIA DOCA SDK 頭文件(假設已正確安裝并設置了環境)
#include <doca_dp.h>
#include <doca_buf.h>
#include <doca_mmap.h>
#include <doca_argp.h>
#include <doca_log.h>#define NUM_TICKS 100000      // 總的市場數據數量
#define QUEUE_MAXSIZE 10000   // 隊列最大容量
#define WINDOW_SIZE 100       // 均值回歸窗口大小
#define THRESHOLD 0.5         // 交易閾值std::mutex mtx;
std::condition_variable cv;
std::queue<double> tick_queue;
bool data_finished = false;// 初始化 DOCA 上下文和資源
doca_dpdk_port_t *dpdk_port;
doca_dpdk_io_ctx_t *io_ctx;
doca_mmap_t *mmap;
// ...(根據需要添加更多 DOCA 資源)// 初始化 DOCA
void init_doca()
{// 初始化 DOCA 日志doca_log_create_syslog_backend("market_data_processor");doca_log_set_level(DOCA_LOG_LEVEL_INFO);// 初始化 DOCA DPDKdoca_dpdk_init();// 初始化 DPDK 端口和 IO 上下文dpdk_port = doca_dpdk_port_start(/*端口配置*/);io_ctx = doca_dpdk_io_ctx_create(dpdk_port);// 初始化內存映射mmap = doca_mmap_create(/*內存映射配置*/);
}// DOCA 數據處理函數(用于加速網絡數據包的接收和處理)
void doca_data_processing()
{// 模擬 DOCA 接收和處理網絡數據while (!data_finished) {// 從 DPU 端口接收數據包doca_buf_t *buf = doca_dpdk_rx_burst(io_ctx, /*接收隊列*/);if (buf) {// 處理接收到的網絡數據包(例如,解析網絡層、協議等)// 在此處可以實現如過濾、數據包內容的提取等加速操作// 然后將處理的數據加入到隊列中供主機進行交易計算double price = process_packet(buf);{std::lock_guard<std::mutex> lock(mtx);tick_queue.push(price);}cv.notify_one();doca_buf_free(buf);  // 釋放 DOCA 緩沖區}}
}// 市場數據生成器(模擬市場數據生成)
void data_generator()
{std::random_device rd;std::mt19937 gen(rd());std::normal_distribution<> dist(0.0, 0.1);  // 正態分布,標準差為0.1double price = 100.0;  // 初始價格for (int i = 0; i < NUM_TICKS; ++i) {price += dist(gen);{std::lock_guard<std::mutex> lock(mtx);tick_queue.push(price);}cv.notify_one();}{std::lock_guard<std::mutex> lock(mtx);data_finished = true;}cv.notify_one();
}// 簡單的交易決策函數:均值回歸策略
void process_ticks(std::vector<double>& ticks)
{int n = ticks.size();std::vector<int> actions(n - WINDOW_SIZE, 0);  // 記錄交易動作std::vector<double> window(WINDOW_SIZE, 0.0);double sum_window = 0.0;for (int i = 0; i < n; ++i) {double tick = ticks[i];if (i < WINDOW_SIZE) {window[i] = tick;sum_window += tick;continue;}// 移動窗口double old_tick = window[i % WINDOW_SIZE];sum_window = sum_window - old_tick + tick;window[i % WINDOW_SIZE] = tick;// 計算移動平均double moving_avg = sum_window / WINDOW_SIZE;// 簡單的均值回歸策略if (tick > moving_avg + THRESHOLD) {actions[i - WINDOW_SIZE] = 1;  // 表示做多} else if (tick < moving_avg - THRESHOLD) {actions[i - WINDOW_SIZE] = -1;  // 表示做空}}// 打印交易動作(或進行實際的交易操作)for (int i = 0; i < actions.size(); ++i) {if (actions[i] != 0) {std::cout << "Trade action at index " << i << ": ";std::cout << (actions[i] == 1 ? "Buy" : "Sell") << std::endl;}}
}int main()
{// 初始化 DOCAinit_doca();// 啟動 DOCA 數據處理線程std::thread doca_thread(doca_data_processing);// 啟動數據生成線程std::thread data_thread(data_generator);// 處理數據并應用交易策略std::vector<double> ticks;while (true) {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, []{ return !tick_queue.empty() || data_finished; });while (!tick_queue.empty()) {ticks.push_back(tick_queue.front());tick_queue.pop();}// 一旦收集到足夠的數據,應用交易策略if (ticks.size() > WINDOW_SIZE) {process_ticks(ticks);}if (data_finished && tick_queue.empty()) {break;}}// 等待線程完成data_thread.join();doca_thread.join();// 清理 DOCA 資源doca_dpdk_io_ctx_free(io_ctx);doca_dpdk_port_stop(dpdk_port);doca_dpdk_cleanup();return 0;
}

下面是測試結果,左側為掛載DPU后的,右側為未掛載的。

在這里插入圖片描述

可以看到掛載DPU讓交易算法的執行時間縮短了38%,通過對數據處理和分析效率產生提升,增強了算法的市場響應能力,提高了交易決策的及時性。


3. 風險管理與合規監控

在高頻交易中,實時風險管理和合規監控至關重要。傳統的風險監控系統難以實時處理海量交易數據,導致風險響應滯后。我們可以通過利用 DPU 的并行處理能力,部署 DOCA 的 DPU 進行實時交易數據的監控和分析,實現更加高效的多維度的風險指標計算和異常檢測。

下面是測試代碼:

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <random>
#include <cmath>
#include <chrono>// NVIDIA DOCA SDK 頭文件(假設已正確安裝并設置了環境)
#include <doca_dp.h>
#include <doca_buf.h>
#include <doca_mmap.h>
#include <doca_argp.h>
#include <doca_log.h>// 配置參數
#define NUM_TRADES 100000          // 模擬生成的交易數量
#define QUEUE_MAXSIZE 10000        // 隊列最大容量
#define POSITION_LIMIT 1000        // 最大持倉限制
#define MAX_TRADE_SIZE 100         // 單筆交易最大量
#define PRICE_FLUCTUATION_THRESHOLD 5.0  // 價格波動閾值
#define RAPID_TRADING_THRESHOLD 100 // 短時間內交易次數閾值
#define WINDOW_SIZE 100            // 快速交易檢測的時間窗口大小std::mutex mtx;
std::condition_variable cv;
std::queue<std::tuple<int, int, double, double>> trade_queue;  // 存儲交易數據的隊列
bool data_finished = false;// DOCA 初始化(省略 DOCA 設置的細節,假設已正確安裝并設置)
void init_doca() {// 初始化 DOCA 日志doca_log_create_syslog_backend("market_data_processor");doca_log_set_level(DOCA_LOG_LEVEL_INFO);// 初始化 DOCA DPDK(這里只是示范,具體實現視需求)doca_dpdk_init();doca_dpdk_port_t* dpdk_port = doca_dpdk_port_start(/*端口配置*/);doca_dpdk_io_ctx_t* io_ctx = doca_dpdk_io_ctx_create(dpdk_port);
}// 模擬交易數據生成器
void trade_data_generator() {std::random_device rd;std::mt19937 gen(rd());std::normal_distribution<> price_dist(0.0, 0.5);  // 價格變動分布std::uniform_int_distribution<> size_dist(1, 200);  // 隨機交易量double current_price = 100.0;  // 初始價格for (int trade_id = 0; trade_id < NUM_TRADES; ++trade_id) {// 模擬價格變動,服從正態分布double price_change = price_dist(gen);current_price += price_change;// 模擬交易量,隨機生成int trade_size = size_dist(gen);// 模擬時間戳(假設每筆交易間隔0.001秒)double timestamp = trade_id / 1000.0;// 存儲交易數據{std::lock_guard<std::mutex> lock(mtx);trade_queue.push({trade_id, trade_size, current_price, timestamp});}cv.notify_one();}// 發送結束信號{std::lock_guard<std::mutex> lock(mtx);data_finished = true;}cv.notify_all();
}// 風險管理與合規監控函數
void risk_compliance_monitor(std::vector<std::tuple<int, int, double, double>>& trades) {// 記錄監控的違規標志std::vector<int> flags(trades.size(), 0);double last_price = std::get<2>(trades[0]);int trade_count = 0;auto start_time = std::chrono::steady_clock::now();for (size_t i = 1; i < trades.size(); ++i) {const auto& trade = trades[i];double price_change = std::get<2>(trade) - last_price;// 檢測價格波動異常if (std::abs(price_change) > PRICE_FLUCTUATION_THRESHOLD) {flags[i] = 1;  // 標記為異常交易}// 檢測快速交易異常(短時間內交易次數)auto current_time = std::chrono::steady_clock::now();std::chrono::duration<double> elapsed = current_time - start_time;if (elapsed.count() <= 1.0) {  // 假設時間窗口為1秒trade_count++;} else {trade_count = 1;start_time = current_time;}if (trade_count > RAPID_TRADING_THRESHOLD) {flags[i] = 2;  // 標記為快速交易異常}last_price = std::get<2>(trade);  // 更新最后的價格}// 輸出違規標志for (size_t i = 0; i < trades.size(); ++i) {if (flags[i] > 0) {std::cout << "Trade " << std::get<0>(trades[i]) << " flagged: " << flags[i] << std::endl;}}
}int main() {// 初始化 DOCAinit_doca();// 啟動交易數據生成器線程std::thread generator_thread(trade_data_generator);// 用于存儲生成的交易數據std::vector<std::tuple<int, int, double, double>> trades;// 從隊列中獲取交易數據并執行風險監控while (!data_finished || !trade_queue.empty()) {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [] { return !trade_queue.empty() || data_finished; });while (!trade_queue.empty()) {trades.push_back(trade_queue.front());trade_queue.pop();}lock.unlock();if (!trades.empty()) {risk_compliance_monitor(trades);trades.clear();  // 清空交易數據}}// 等待生成器線程完成generator_thread.join();return 0;
}

測試結果:

掛載 DPU 進行實時交易數據的監控和分析花費時間為 3.6 秒。

在這里插入圖片描述

普通運行花費 4.49 秒。

在這里插入圖片描述

可以看到風險檢測的響應時間縮短了20%,實時監控和分析能力增強,及時發現并處理潛在的交易風險,提高了系統的風險管理能力。

4. DOCA 在 HFT 中的性能優勢總結

通過上述案例分析,可以看出 DOCA 在高頻交易中的多個方面展現出了顯著的性能優勢。

性能優勢描述
低延遲硬件加速和高效的數據路徑設計,顯著降低了數據傳輸和處理的延遲。
高吞吐量DPU 的并行處理能力和高效的數據管理,提升了系統的整體吞吐量。
資源優化通過卸載網絡和數據處理任務,優化了 CPU 和內存資源的利用,提高了系統的整體性能。
可擴展性DOCA 的模塊化設計和靈活的編程模型,支持高頻交易系統的快速擴展和定制化需求。

DOCA 通過其高性能的 DPU,為金融高頻交易提供了強大的技術支持。其在網絡優化、數據處理、并行計算和安全管理等方面的優勢,滿足了 HFT 對低延遲、高吞吐量和高可靠性的苛刻要求。


五、思考與總結

經過一系列測試和分析,我對 DOCA 開發環境下 DPU 的性能有了更清晰的了解。在 DPA All-to-All 應用測試中,DPU 在處理多核并發數據交換時表現得非常高效,延遲低、吞吐量達標。在基礎計算測試中,DPU 的表現也相當穩健。從 CPU 的矩陣乘法到內存帶寬、多線程和多進程性能評估,它都能應對自如。結合金融高頻交易的應用場景,DOCA 展現出了其在低延遲、高吞吐量和高可靠性方面的卓越優勢,進一步證明了其在高性能計算和實時數據處理中的廣泛應用潛力。DPU 在并行計算和數據處理上的優勢,使其在日常計算和系統任務中具備廣泛的應用前景。未來,隨著更多應用場景的開發和優化,DOCA 有望在更多領域發揮關鍵作用,推動數據中心和高性能計算的發展。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/66720.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/66720.shtml
英文地址,請注明出處:http://en.pswp.cn/web/66720.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

基于JAVA的校園二手商品交易平臺的設計與開發

摘 要&#xff1a;政府政策引導與社會觀念的轉變使得國內大學生的創業意識逐漸提高&#xff0c;很多高校大學生開始自主創業。目前我國各大高校暫且還沒有較為成型的針對校內學生創業者的校園網絡服務平臺。本文首先主要是介紹了關于java語言以及web開發的相關技術&#xff0c;…

HarmonyOS Next 應用UI生成工具介紹

背景 HarmonyOS Next適配開發過程中難買難要參考之前邏輯&#xff0c;但是可能時間較長文檔不全&#xff0c;只能參考Android或iOS代碼&#xff0c;有些邏輯較重的場景還可以通過AI工具將Android 的Java代碼邏輯轉成TS完成部分復用。對于一些UI場景只能手動去寫&#xff0c;雖…

總結6..

背包問題的解決過程 在解決問題之前&#xff0c;為描述方便&#xff0c;首先定義一些變量&#xff1a;Vi表示第 i 個物品的價值&#xff0c;Wi表示第 i 個物品的體積&#xff0c;定義V(i,j)&#xff1a;當前背包容量 j&#xff0c;前 i 個物品最佳組合對應的價值&#xff0c;同…

代碼隨想錄day1

704.二分查找&#xff1a; 1.左閉右閉 int search(vector<int>& nums, int target) {int right nums.size() - 1;int left 0;while(left < right){int middle left ((right - left) >> 1);if(nums.at(middle) target){return middle;}else if(nums[m…

四級詞匯第六期

1.accomplish 完成 2.implication 暗示 3.complicated 復雜的 4.extent 范圍 5.sufficient 充足的 6.remarkable 引人注目的 7.insight 洞察 8.executive 管理的 9.overlook 俯瞰 忽略 10.urge 渴望 激勵 11.urgent 緊急的 12.accumulate 積累 13.appreciate 賞識 …

OpenHarmony OTA升級參考資料記錄

OpenHarmony 作為一個開源分布式操作系統,通過其強大的 OTA(Over-The-Air)升級能力,為開發者和廠商提供了一套靈活而安全的系統升級方案。 OTA升級方式 根據升級包的應用方式,OpenHarmony 的 OTA 升級可以分為兩種:本地升級和網絡OTA升級。 本地升級 本地升級是將已制作…

【數據結構篇】順序表 超詳細

目錄 一.順序表的定義 1.順序表的概念及結構 1.1線性表 2.順序表的分類 2.1靜態順序表 2.2動態順序表 二.動態順序表的實現 1.準備工作和注意事項 2.順序表的基本接口&#xff1a; 2.0 創建一個順序表 2.1 順序表的初始化 2.2 順序表的銷毀 2.3 順序表的打印 3.順序…

SDL2基本的繪制流程與步驟

SDL2(Simple DirectMedia Layer 2)是一個跨平臺的多媒體庫,它為游戲開發和圖形應用提供了一個簡單的接口,允許程序直接訪問音頻、鍵盤、鼠標、硬件加速的渲染等功能。在 SDL2 中,屏幕繪制的流程通常涉及到窗口的創建、渲染目標的設置、圖像的繪制、事件的處理等幾個步驟。…

上位機工作感想-2024年工作總結和來年計劃

隨著工作年限的增增長&#xff0c;發現自己越來越不喜歡在博客里面寫一些摻雜自己感想的東西了&#xff0c;或許是逐漸被工作逼得“成熟”了吧。2024年&#xff0c;學到了很多東西&#xff0c;做了很多項目&#xff0c;也幫別人解決了很多問題&#xff0c;唯獨沒有漲工資。來這…

阿里云-銀行核心系統轉型之業務建模與技術建模

業務領域建模包括業務建模和技術建模&#xff0c;整體建模流程圖如下&#xff1a; 業務建模包括業務流程建模和業務對象建模 業務流程建模&#xff1a;通過對業務流程現狀分析&#xff0c;結合目標核心系統建設能力要求&#xff0c;參考行業建 模成果&#xff0c;形成結構化的…

Unity3D基于Unity整合BEPUphysicsint物理引擎實戰詳解

引言 Unity3D是一款流行的游戲引擎&#xff0c;提供了豐富的功能和工具&#xff0c;使開發者能夠輕松創建各種類型的游戲。其中&#xff0c;幀同步技術是游戲開發中至關重要的一環&#xff0c;它能確保多個玩家在同一時間內看到的游戲狀態是一致的。BEPUphysicsint是一個基于U…

【C++筆記】紅黑樹封裝map和set深度剖析

【C筆記】紅黑樹封裝map和set深度剖析 &#x1f525;個人主頁&#xff1a;大白的編程日記 &#x1f525;專欄&#xff1a;C筆記 文章目錄 【C筆記】紅黑樹封裝map和set深度剖析前言一. 源碼及框架分析1.1 源碼框架分析 二. 模擬實現map和set2.1封裝map和set 三.迭代器3.1思路…

win32匯編環境,怎么得到磁盤的盤符

;運行效果 ;win32匯編環境,怎么得到磁盤的盤符 ;以下代碼主要為了展示一下原理&#xff0c;應用GetLogicalDrives、GetLogicalDriveStrings函數、屏蔽某些二進制位、按雙字節復制內容等。以下代碼最多查8個盤&#xff0c;即返回值中的1個字節的信息 ;直接抄進RadAsm可編譯運行。…

MongoDB vs Redis:相似與區別

前言 在當今的數據庫領域&#xff0c;MongoDB 和 Redis 都是備受關注的非關系型數據庫&#xff08;NoSQL&#xff09;&#xff0c;它們各自具有獨特的優勢和適用場景。本文將深入探討 MongoDB 和 Redis 的特點&#xff0c;并詳細對比它們之間的相似之處和區別&#xff0c;幫助…

mybatis(19/134)

大致了解了一下工具類&#xff0c;自己手敲了一邊&#xff0c;java的封裝還是真的省去了很多麻煩&#xff0c;封裝成一個工具類就可以不用寫很多重復的步驟&#xff0c;一個工廠對應一個數據庫一個environment就好了。 mybatis中調用sql中的delete占位符里面需要有字符&#xf…

重學SpringBoot3-WebClient配置與使用詳解

更多SpringBoot3內容請關注我的專欄&#xff1a;《SpringBoot3》 期待您的點贊??收藏評論 重學SpringBoot3-WebClient配置與使用詳解 1. 簡介2. 環境準備 2.1 依賴配置 3. WebClient配置 3.1 基礎配置3.2 高級配置3.3 retrieve()和exchange()區別 4. 使用示例 4.1 基本請求操…

.Net Core微服務入門全紀錄(二)——Consul-服務注冊與發現(上)

系列文章目錄 1、.Net Core微服務入門系列&#xff08;一&#xff09;——項目搭建 2、.Net Core微服務入門全紀錄&#xff08;二&#xff09;——Consul-服務注冊與發現&#xff08;上&#xff09; 3、.Net Core微服務入門全紀錄&#xff08;三&#xff09;——Consul-服務注…

Spark Streaming的核心功能及其示例PySpark代碼

Spark Streaming是Apache Spark中用于實時流數據處理的模塊。以下是一些常見功能的實用PySpark代碼示例&#xff1a; 基礎流處理&#xff1a;從TCP套接字讀取數據并統計單詞數量 from pyspark import SparkContext from pyspark.streaming import StreamingContext# 創建Spar…

深度學習系列75:sql大模型工具vanna

1. 概述 vanna是一個可以將自然語言轉為sql的工具。簡單的demo如下&#xff1a; !pip install vanna import vanna from vanna.remote import VannaDefault vn VannaDefault(modelchinook, api_keyvanna.get_api_key(my-emailexample.com)) vn.connect_to_sqlite(https://va…

【線性代數】列主元法求矩陣的逆

列主元方法是一種用于求解矩陣逆的數值方法&#xff0c;特別適用于在計算機上實現。其基本思想是通過高斯消元法將矩陣轉換為上三角矩陣&#xff0c;然后通過回代求解矩陣的逆。以下是列主元方法求解矩陣 A A A 的逆的步驟&#xff1a; [精確算法] 列主元高斯消元法 步驟 1&am…