此前dbpystream庫是用python開發 web api。今天在rust中試用一下protobuf。
一、 protobuf編譯器下載
具體見相關文章。沒有編譯器,protobuf無法運行。
windows參見:
https://blog.csdn.net/wowotuo/article/details/139458846?spm=1001.2014.3001.5502。
二、proto文件的準備
proto文件中主要模擬了一個dbpystream中一個get_price函數的輸入和輸出的格式,輸入HistoryBarRequest ,輸出HistoryBarResponse。HistoryBarResponse中,有代碼名稱,日期,開盤價,最高價等。
在格式中,包括了string,TimeStamp,double; 其中repeated就是vec格式。
syntax = "proto3";
package dbdata;
import public "google/protobuf/timestamp.proto";
service DataService {rpc query (HistoryBarRequest) returns (HistoryBarRequest) {}
}
service Login{rpc auth (Auth) returns (Response) {}
}
message Auth{string id =1; string password=2;
}
message HistoryBarRequest {string security = 1;string frequency = 2;FieldParam fields = 3;google.protobuf.Timestamp start_date = 4;//收集時間google.protobuf.Timestamp end_date = 5;//收集時間bool is_fq =6 ;
}
message HistoryBarResponse{repeated string securitycode = 1;repeated google.protobuf.Timestamp datetime =2;repeated double open = 3;repeated double high = 4;repeated double close = 5;repeated double low =6;repeated double volume=7;repeated double amount=8;repeated sint64 is_fq = 9;
}message FieldParam{bool is_all = 1;
}message Response {bool status = 1;bytes msg = 2;string error = 3;
}
三、toml文件、文件目錄結構、build.rs
1、toml文件有
[package]
name = "clap-2"
version = "0.1.0"
edition = "2021"# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html[dependencies]
axum = "0.7.5" # web 服務器
anyhow = "1" # 錯誤處理
reqwest = { version = "0.12.4", features = ["json"] } # HTTP 客戶端
tokio = { version = "1", features = ["full"] } # 異步處理庫
prost = "0.12.6"
# Only necessary if using Protobuf well-known types:
prost-types = "0.12.6"
serde = { version = "1", features = ["derive"] } # 序列化/反序列化數據
polars = { version = "0.39.0", features = ["json"]}
chrono = { version = "0.4", features = ["unstable-locales"] }
[build-dependencies]
prost-build = "0.12.6" # 編譯 protobuf
上面polars,chrono,prost-types,prost-build,prost是關鍵庫,其它暫時可以不看。
2、目錄結構
具體如下:
PS D:\my_program\clap-2> tree /F
卷 新加卷 的文件夾 PATH 列表
卷序列號為 D855-8BFE
D:.
│ .gitignore
│ build.rs
│ Cargo.lock
│ Cargo.toml
│ dbdata.proto
│
└─src│ main.rs│└─pbdbdata.rsmod.rs
可見,在src/目錄下,創建了一個pb文件夾,存放未來生成的dbdata.proto文件。
3、build.rs
fn main() {prost_build::Config::new().out_dir("src/pb")//設置proto輸出目錄.compile_protos(&["dbdata.proto"], &["."])//我們要處理的proto文件.unwrap();
}
運行cargo build,即生成了dbdata.proto.
四、原始數據、main.rs
1、原始數據準備
這個原始數據的格式,即收到request后,將發送這個數據內容出去。
文件名稱是"C:\Users\Desktop\test.csv"。
這里采用了polars來讀取csv文件。
2、main.rs
下面的main.rs模擬了收到resquest,發送response的過程。這個過程可以用web框架,如axum,也可以用grpc框架。這部分不是今天的重點。
use pb::dbdata::{self, HistoryBarResponse};
mod pb;
use prost_types::Timestamp;
use std::time::{Duration, SystemTime};
use polars::prelude::*;
use chrono::{NaiveDate, NaiveDateTime,NaiveTime};
fn main() ->Result<(),PolarsError>{let request = dbdata::HistoryBarRequest {security: String::from("600036.XSHG"),frequency: String::from("1minute"),fields: Some(dbdata::FieldParam {is_all:true}),start_date: Some(prost_types::Timestamp::from(SystemTime::now()-Duration::from_secs(3600*12*250))),end_date:Some(prost_types::Timestamp::from(SystemTime::now())),is_fq:true,};println!("模擬收到request:{:?}",request);println!("模擬開始進行相應的數據處理.....");let file = r"C:\Users\Desktop\test.csv";let df: DataFrame = CsvReader::from_path(file)?.has_header(true).finish().unwrap();println!("starting...");println!("df: {:?}",df);let res = HistoryBarResponse{securitycode : df.column("securitycode")?.str()?.into_no_null_iter().map(|s|String::from(s)).collect(),datetime:df.column("date")?.str()?.into_no_null_iter().map(|t| convert(t)).collect(),open:df.column("open")?.f64()?.into_no_null_iter().collect(),high:df.column("high")?.f64()?.into_no_null_iter().collect(),close:df.column("close")?.f64()?.into_no_null_iter().collect(),low:df.column("low")?.f64()?.into_no_null_iter().collect(),volume:df.column("volume")?.i64()?.into_no_null_iter().map(|v|v as f64).collect(),amount:df.column("amount")?.f64()?.into_no_null_iter().collect(),is_fq:df.column("is_fq")?.i64()?.into_no_null_iter().collect(),};//println!("{:?}", res);let encoded = prost::Message::encode_to_vec(&res);let decoded = < pb::dbdata::HistoryBarResponse as prost::Message>::decode(&encoded[..]).unwrap();println!("模擬發送相應的數據: {:?}", &decoded.securitycode[0]);Ok(())
}
// 簡單由&str生成Timestamp,這里格式是"%Y/%m/%d",只是模擬代碼。
fn convert(dt_str:&str) ->Timestamp {let naive_date = NaiveDate::parse_from_str(dt_str, "%Y/%m/%d").unwrap();let nano_second = NaiveTime::from_hms_milli_opt(0, 0, 0, 0).unwrap();let dt: NaiveDateTime = naive_date.and_time(nano_second );Timestamp{seconds:dt.and_utc().timestamp(),nanos:0,}
}
運行如下:
模擬收到request:HistoryBarRequest { security: "600036.XSHG", frequency: "1minute", fields: Some(FieldParam {
is_all: true }), start_date: Some(Timestamp { seconds: 1707035277, nanos: 595181300 }), end_date: Some(Timestamp { seconds: 1717835277, nanos: 595183100 }), is_fq: true }
模擬開始進行相應的數據處理.....
starting...
df: shape: (482, 9)
┌──────────────┬───────────┬────────┬────────┬───┬────────┬────────┬─────────────┬───────┐
│ securitycode ┆ date ┆ open ┆ high ┆ … ┆ close ┆ volume ┆ amount ┆ is_fq │
│ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ f64 ┆ f64 ┆ ┆ f64 ┆ i64 ┆ f64 ┆ i64 │
╞══════════════╪═══════════╪════════╪════════╪═══╪════════╪════════╪═════════════╪═══════╡
│ 600036.XSHG ┆ 2021/2/3 ┆ 1210.4 ┆ 1222.3 ┆ … ┆ 1221.5 ┆ 12234 ┆ 1.4943831e7 ┆ 1 │
│ 600037.XSHG ┆ 2021/2/4 ┆ 1210.5 ┆ 1222.4 ┆ … ┆ 1221.6 ┆ 12235 ┆ 1.4946276e7 ┆ 1 │
│ 600038.XSHG ┆ 2021/2/5 ┆ 1210.6 ┆ 1222.5 ┆ … ┆ 1221.7 ┆ 12236 ┆ 1.4949e7 ┆ 1 │
│ 600039.XSHG ┆ 2021/2/6 ┆ 1210.7 ┆ 1222.6 ┆ … ┆ 1221.8 ┆ 12237 ┆ 1.4951e7 ┆ 1 │
│ 600040.XSHG ┆ 2021/2/7 ┆ 1210.8 ┆ 1222.7 ┆ … ┆ 1221.9 ┆ 12238 ┆ 1.4954e7 ┆ 1 │
│ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │
│ 600513.XSHG ┆ 2022/5/26 ┆ 1258.1 ┆ 1270.0 ┆ … ┆ 1269.2 ┆ 12711 ┆ 1.6133e7 ┆ 1 │
│ 600514.XSHG ┆ 2022/5/27 ┆ 1258.2 ┆ 1270.1 ┆ … ┆ 1269.3 ┆ 12712 ┆ 1.6135e7 ┆ 1 │
│ 600515.XSHG ┆ 2022/5/28 ┆ 1258.3 ┆ 1270.2 ┆ … ┆ 1269.4 ┆ 12713 ┆ 1.6138e7 ┆ 1 │
│ 600516.XSHG ┆ 2022/5/29 ┆ 1258.4 ┆ 1270.3 ┆ … ┆ 1269.5 ┆ 12714 ┆ 1.6140423e7 ┆ 1 │
│ 600517.XSHG ┆ 2022/5/30 ┆ 1258.5 ┆ 1270.4 ┆ … ┆ 1269.6 ┆ 12715 ┆ 1.6142964e7 ┆ 1 │
└──────────────┴───────────┴────────┴────────┴───┴────────┴────────┴─────────────┴───────┘
模擬發送相應的數據: "600036.XSHG"