系列所有文章
https://zhuanlan.zhihu.com/p/115017849?zhuanlan.zhihu.com在基本熟悉 nom 之后, 這次我們準備用 nom 實現一個 redis 通信協議的解析器. 選擇 redis 是因為 redis 的通信協議易讀且比較簡單.
準備
如果你對 redis 通信協議不熟悉的話可以查閱 通信協議(protocol). 簡單來說 redis 通信協議分為統一請求協議(這里只討論新版請求協議)和回復協議, 請求協議可以方便地通過 Rust 內置的 format!
拼接構成, 而通信協議則使用 nom 解析. redis 協議非常簡單, 這里不再贅述.
首先我們需要一個 redis 服務器, 這里我在開發的機器上用 docker 啟動一個 redis 服務器:
docker run -d --name redis -p 6379:6379 redis redis-server --appendonly yes
測試下 redis 服務
telnet localhost 6379
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
ping
+PONG
出現 +PONG 說明服務器已正常運行
實現基本功能
首先創建項目
cargo new rcli && cd rcli
添加如下依賴
[dependencies]
tokio = { version = "0.2", features = ["full"]}
nom = "5"
bytes = "0.5.4"
structopt = "0.3.14"
structopt 可以幫助我們快速構建命令行工具輸入 redis 命令幫助測試, bytes 則可以幫助我們處理字節, tokio 依賴是上個測試代碼遺留的依賴, 剛好新代碼也需要 tcp 連接, 索性使用 tokio 處理 tcp 連接, nom 自然是用于解析回復.
首先我們需要創建 tcp 連接與 redis 通信, 并且寫入一些數據看看協議是否管用:
use bytes::{BufMut, BytesMut};
use std::error::Error;
use tokio::net::TcpStream;
use tokio::prelude::*;#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {let mut stream = TcpStream::connect("127.0.0.1:6379").await?;let mut buf = [0u8; 1024];let mut resp = BytesMut::with_capacity(1024);let (mut reader, mut writer) = stream.split();// 向服務器發送 PINGwriter.write(b"*1rn$4rnPINGrn").await?;let n = reader.read(&mut buf).await?;resp.put(&buf[0..n]);// 返回結果應該是 PONGprintln!("{:?}", resp);Ok(())
}
如上面代碼展示的, 我們創建一個 tcp 連接和一個緩沖 buf, 在成功連接后根據協議嘗試寫入 *1rn$4rnPINGrn
, 預期結果是服務器返回 "+PONGrn"
.
現在我們可以創建 CLI 實現幾個常用的 redis 命令, 方便我們向服務器發送命令. 創建 commands.rs
文件, 記得在 main.rs
中導入它.
以 rpush
為例, rpush
命令用法為 RPUSH key value [value …]
使用 structopt 可以這樣定義一個枚舉(使用結構體也可以, 但因為將來有很多子命令, 所以枚舉更合適)
use structopt::StructOpt;#[derive(Debug, Clone, StructOpt)]
pub enum Commands {/// push value to listRpush {/// redis keykey: String,/// valuevalues: Vec<String>,},
}
接著在 main.rs
中使用 Commands
解析命令行
use structopt::StructOpt;
mod commands;#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {// 創建 tcp 連接, buf 等...let com = commands::Commands::from_args();// 發送命令 ...
}
運行項目看下效果
cargo run -- helppush value to listUSAGE:rrdis-cli rpush <key> [values]...FLAGS:-h, --help Prints help information-V, --version Prints version informationARGS:<key> redis key<values>... value
接下來要把從命令行傳來的參數轉換為 redis 統一請求. redis 以 rn
為分隔符, redis 請求格式以 *argc
開頭, argc
是此次請求的參數個數, 每個參數先以 $<參數長度>
聲明參數長度, 接著 rn
分割符, 然后是參數數據, 若有多個參數則重復此步驟. 最后以 rn
結尾.
比如上面的 PING
轉換為 *1rn$4rnPINGrn
, 而 GET
轉換為 *2rn$3rnGETrn$3rnkeyrn
.
可以使用一個 builder 幫助我們轉換:
use bytes::{BufMut, BytesMut};#[derive(Debug, Clone)]
struct CmdBuilder {args: Vec<String>,
}impl CmdBuilder {fn new() -> Self {CmdBuilder { args: vec![] }}fn arg(mut self, arg: &str) -> Self {self.args.push(format!("${}", arg.len()));self.args.push(arg.to_string());self}fn add_arg(&mut self, arg: &str) {self.args.push(format!("${}", arg.len()));self.args.push(arg.to_string());}fn to_bytes(&self) -> BytesMut {let mut bytes = BytesMut::new();bytes.put(&format!("*{}rn", self.args.len() / 2).into_bytes()[..]);bytes.put(&self.args.join("rn").into_bytes()[..]);bytes.put(&b"rn"[..]);bytes}
}
CmdBuilder
做的很簡單, 保存通過 arg
或 add_arg
傳入的參數, 在 to_bytes
方法中拼接這些參數為有效的請求.
例如可以通過如下方式構建一個 GET
命令
let cmd = CmdBuilder::new().arg("GET").arg("key").to_bytes()
接下來使用 CmdBuilder
為 Commands
實現 to_bytes
方法
impl Commands {pub fn to_bytes(&self) -> bytes::BytesMut {let cmd = match self {Commands::Rpush { key, values } => {let mut builder = CmdBuilder::new().arg("RPUSH").arg(key);values.iter().for_each(|v| builder.add_arg(v));builder.to_bytes()}};cmd}
}
改寫 main
函數發送構建的請求
// ... 省略
let com = commands::Commands::from_args();
writer.write(&com.to_bytes()).await?;
cargo run -- rpush list a b c d# redis 成功返回響應
:3rn
All is well, 對于其他命令可以通過相同方法實現, 可以在 rrdis-cli/src/commands.rs 看到完整實現.
解析回復
現在終于到 nom 出場了. 新建 reply.rs
文件, 并在 main.rs
導入. 首先導入需要使用的 nom 方法, 接著定義 Reply
, 因為 redis 回復種類有限, 所以用一個枚舉是非常合適的.
use nom::branch::alt;
use nom::bytes::complete::tag;
use nom::bytes::complete::{take_while, take_while1, take_while_m_n};
use nom::combinator::map;
use nom::multi::many_m_n;
use nom::sequence::delimited;
use nom::IResult;#[derive(Debug)]
pub enum Reply {// 狀態回復或單行回復SingleLine(String),// 錯誤回復Err(String),// 整數回復Int(i64),// 批量回復Batch(Option<String>),// 多條批量回復MultiBatch(Option<Vec<Reply>>),// 回復中沒有, 這里是為了方便進行錯誤處理添加的BadReply(String),
}
單行回復
協議中單行回復定義如下:
一個狀態回復(或者單行回復,single line reply)是一段以 "+" 開始、 "rn" 結尾的單行字符串。
所以解析思路是: 如果回復以"+"開頭, 則讀取余下字節存作為回復, 直到 "rn", 偽代碼如下
take_if("+"), take_util_new_line, take_if("rn")
nom 中的 tag
可以完美實現偽代碼中的 take_if
功能, 令人驚喜的是對于"消耗輸入直到不符合某種條件"這個常見解析模式, nom 提供了 take_while
函數, 所以我們的解析函數可以寫成:
fn parse_single_line(i: &str) -> IResult<&str, Reply> {let (i, _) = tag("+")(i)?;let (i, resp) = take_while(|c| c != 'r' && c != 'n')(i)?;let (i, _) = tag("rn")(i)?;Ok((i, Reply::SingleLine(resp.to_string())))
}
tag
和 take_while
讓解析函數的功能非常直觀地展現出來, 這讓它看著想偽代碼, 但它真的能運行!
在函數中只有 take_while
返回的結果是我們想要的, 但兩個 tag
又是不可或缺, 對于這一常見解析模式 nom 提供了 delimited
這個組合子函數, 這個組合子函數接受三個類似 tag("xx")
這樣的基本函數, 依次應用這三個函數, 如果成功, 則返回第二個函數解析的結果.
所以我們的函數可以這樣寫:
fn parse_single_line(i: &str) -> IResult<&str, Reply> {let (i, resp) = delimited(tag("+"),take_while(|c| c != 'r' && c != 'n'),tag("rn"),)(i)?;Ok((i, Reply::SingleLine(String::from(resp))))
}
錯誤回復
錯誤回復定義:
錯誤回復和狀態回復非常相似, 它們之間的唯一區別是, 錯誤回復的第一個字節是 "-" , 而狀態回復的第一個字節是 "+"
所以錯誤回復解析函數和上面的差不多:
fn parse_err(i: &str) -> IResult<&str, Reply> {let (i, resp) = delimited(tag("-"),// take_while1 與 take_while 類似, 但要求至少一個字符符合條件take_while1(|c| c != 'r' && c != 'n'),tag("rn"),)(i)?;Ok((i, Reply::Err(String::from(resp))))
}
整數回復
整數回復就是一個以 ":" 開頭, CRLF 結尾的字符串表示的整數,
整數回復結構與前兩種類似, 區別在于中間是整數, 需要將 take_while1
的返回值轉換為整數.
如果沒有進行類型轉換解析函數可以這樣實現:
fn parse_int(i: &str) -> IResult<&str, Reply> {let (i, int) = delimited(tag(":"),// 注意負數前綴take_while1(|c: char| c.is_digit(10) || c == '-'),tag("rn"),)(i)?;// ... 類型轉換Ok((i, Reply::Int(int)))
}
注意到 nom 提供的基本解析工廠函數如 tag
創建的解析函數返回值都是 IResult
, 它與 Result
類似, 可以應用 map
運算子, 不過這個 map
需使用 nom 提供的
map(take_while1(|c: char| c.is_digit(10) || c == '-'), |int: &str| int.parse::<i64>().unwrap())
通過 nom 的 map 函數可以把返回值從 IResult<&str, &str>
映射為 IResult<&str, i64>
, 最后解析函數可以寫成
fn parse_int(i: &str) -> IResult<&str, Reply> {let (i, int) = delimited(tag(":"),map(take_while1(|c: char| c.is_digit(10) || c == '-'),|int: &str| int.parse::<i64>().unwrap(),),tag("rn"),)(i)?;Ok((i, Reply::Int(int)))
}
批量回復
服務器發送的內容中: - 第一字節為 "$" 符號 - 接下來跟著的是表示實際回復長度的數字值 - 之后跟著一個 CRLF - 再后面跟著的是實際回復數據 - 最末尾是另一個 CRLF
同時批量回復還有特殊情況
如果被請求的值不存在, 那么批量回復會將特殊值 -1 用作回復的長度值, 這種回復稱為空批量回復(NULL Bulk Reply)
此時協議要求客戶端返回空對象, 對于 Rust 則是 None
, 所以 BatchReply
才會被定義為 BatchReply<Option<String>>
.
所以這個函數的解析可能稍微復雜點, 但方法與上面沒有太大差異, 除了新的 take_while_m_n
, take_while_m_n
與 take_while
類似, 不同的是它可以指定消耗輸入最小數和最大數m, n.
如果是空回復則嘗試匹配 rn
, 如果成功, 直接返回, 否則根據拿到的回復長度, 獲取那么多長度的字符, 接著應該碰到 rn
.
fn parse_batch(i: &str) -> IResult<&str, Reply> {let (i, _) = tag("$")(i)?;let (i, len) = (take_while1(|c: char| c.is_digit(10) || c == '-'))(i)?;if len == "-1" {let (i, _) = tag("rn")(i)?;Ok((i, Reply::Batch(None)))} else {let len = len.parse::<usize>().unwrap();let (i, resp) = delimited(tag("rn"), take_while_m_n(len, len, |_| true), tag("rn"))(i)?;Ok((i, Reply::Batch(Some(String::from(resp)))))}
}
多條批量回復
多條批量回復是由多個回復組成的數組, 數組中的每個元素都可以是任意類型的回復, 包括多條批量回復本身。 多條批量回復的第一個字節為 "*" , 后跟一個字符串表示的整數值, 這個值記錄了多條批量回復所包含的回復數量, 再后面是一個 CRLF
多條批量回復其實是對上面四種回復的嵌套, 但需要注意"空白多條批量回復"和"無內容多條批量回復"這兩種特殊情況.
空白多條回復為 "*0rn", 無內容多條批量回復為 "*-1rn", 在解析時需要對這兩種特殊情況進行處理. 在其他情況則可以應用 nom 提供的 alt
組合子服用之前的四個解析函數; alt
即"可選的", 它接受多個解析函數元組, 依次嘗試應用每個函數, 返回第一個成功解析結果或拋出錯誤.
同時對于重復應用某個解析函數 m 到 n 次這種模式, nom 提供了 many_m_n
組合子, 對于 fn parse_item(&str) -> IResult<&str, Reply>
這樣的函數, many_m_n(parse_item, 0, 12)
返回值為 IResult<&str, Vec<Reply>>
.
理清邏輯后解析多條批量回復的解析函數雖然有些長但還是很清晰的:
fn parse_multi_batch(i: &str) -> IResult<&str, Reply> {let (i, count) = delimited(tag("*"),take_while1(|c: char| c.is_digit(10) || c == '-'),tag("rn"),)(i)?;if count == "-1" {let (i, _) = tag("rn")(i)?;Ok((i, Reply::MultiBatch(None)))} else {let count = count.parse::<usize>().unwrap();let (i, responses) = many_m_n(count,count,alt((parse_single_line, parse_err, parse_int, parse_batch)),)(i)?;// 做個嚴格檢查, 檢查解析到的個數與預期的是否一致if responses.len() != count {Ok((i,Reply::BadReply(format!("expect {} items, got {}", count, responses.len())),))} else {Ok((i, Reply::MultiBatch(Some(responses))))}}
}
最后用 alt
做個"匯總"
fn parse(i: &str) -> IResult<&str, Reply> {alt((parse_single_line,parse_err,parse_int,parse_batch,parse_multi_batch,))(i)
}
至此我們我們的解析函數到完成了, 為 Reply
實現 Display
特性后對 redis 返回的消息應用 parse
然后把解析結果打印出來即可驗證解析函數正確性. 完整代碼在
匯總
完整代碼可以在我的 rrdis-cli 查看. 不知道大家對 nom 的評價如何, 我覺得使用 nom
提供的基本函數和一系列組合子從最小元素出發, 搭積木似的構建出更復雜的解析函數, 即降低了開發難度, 熟悉之后代碼邏輯還挺清晰的.
整個 rrdis-cli 項目實現 set, get, incr, lrange, rpush 和 ping 這基本命令, 實現其他命令也是非常簡單; 并且實現了絕大部分(還有一些特殊錯誤情況沒處理)協議解析, 整個項目代碼量如下
tokei .
-------------------------------------------------------------------------------Language Files Lines Code Comments Blanks
-------------------------------------------------------------------------------Markdown 1 4 4 0 0Rust 3 332 284 20 28TOML 1 15 12 1 2
-------------------------------------------------------------------------------Total 5 351 300 21 30
-------------------------------------------------------------------------------
Rust 代碼只有 332 行, 挺簡潔的, 估計比我用 Python 實現都少.
下一篇使用 nom 寫什么還不確定, 隨緣更新吧~
怎么說也是萬字長文, 如果覺得文章可以, 請點個贊, 謝謝~