godis源碼分析——Redis協議解析器

前言

redis這個目錄下的所有代碼就是為了一個事情,就是適配redis。

流程

redis下的基本流程

在這里插入圖片描述

源碼

在redis/client/client.go

主要是客戶端處理

package clientconst (created = iotarunningclosed
)type B struct {data   chan stringticker *time.Ticker
}// Client is a pipeline mode redis client
type Client struct {conn net.Conn// 等待發送pendingReqs chan *request // wait to send// 等待響應waitingReqs chan *request // waiting responseticker      *time.Tickeraddr        stringstatus  int32working *sync.WaitGroup // its counter presents unfinished requests(pending and waiting)
}// 這個一個發送到redis的請求結構
// request is a message sends to redis server
type request struct {id        uint64args      [][]bytereply     redis.Replyheartbeat boolwaiting   *wait.Waiterr       error
}const (chanSize = 256maxWait  = 3 * time.Second
)// MakeClient creates a new client
func MakeClient(addr string) (*Client, error) {conn, err := net.Dial("tcp", addr)if err != nil {return nil, err}return &Client{addr:        addr,conn:        conn,pendingReqs: make(chan *request, chanSize),waitingReqs: make(chan *request, chanSize),working:     &sync.WaitGroup{},}, nil
}// 開始啟動異步程序
// Start starts asynchronous goroutines
func (client *Client) Start() {client.ticker = time.NewTicker(10 * time.Second)// 每個方法都會監聽channelgo client.handleWrite()go client.handleRead()go client.heartbeat()atomic.StoreInt32(&client.status, running)
}// 異步關閉客戶端
// Close stops asynchronous goroutines and close connection
func (client *Client) Close() {atomic.StoreInt32(&client.status, closed)client.ticker.Stop()// stop new requestclose(client.pendingReqs)// wait stop processclient.working.Wait()// clean_ = client.conn.Close()close(client.waitingReqs)
}// 重新連接
func (client *Client) reconnect() {logger.Info("reconnect with: " + client.addr)_ = client.conn.Close() // ignore possible errors from repeated closesvar conn net.Connfor i := 0; i < 3; i++ {var err errorconn, err = net.Dial("tcp", client.addr)if err != nil {logger.Error("reconnect error: " + err.Error())time.Sleep(time.Second)continue} else {break}}if conn == nil { // reach max retry, abortclient.Close()return}client.conn = connclose(client.waitingReqs)for req := range client.waitingReqs {req.err = errors.New("connection closed")req.waiting.Done()}client.waitingReqs = make(chan *request, chanSize)// restart handle readgo client.handleRead()
}// 監聽發送心跳
func (client *Client) heartbeat() {for range client.ticker.C {client.doHeartbeat()}
}// 寫入監聽
func (client *Client) handleWrite() {for req := range client.pendingReqs {client.doRequest(req)}
}// 發送一個請求到redis服務器
// Send sends a request to redis server
func (client *Client) Send(args [][]byte) redis.Reply {if atomic.LoadInt32(&client.status) != running {return protocol.MakeErrReply("client closed")}req := &request{args:      args,heartbeat: false,waiting:   &wait.Wait{},}req.waiting.Add(1)client.working.Add(1)defer client.working.Done()// 放入client.pendingReqs <- reqtimeout := req.waiting.WaitWithTimeout(maxWait)if timeout {return protocol.MakeErrReply("server time out")}if req.err != nil {return protocol.MakeErrReply("request failed " + req.err.Error())}return req.reply
}// 心跳
func (client *Client) doHeartbeat() {request := &request{args:      [][]byte{[]byte("PING")},heartbeat: true,waiting:   &wait.Wait{},}request.waiting.Add(1)client.working.Add(1)defer client.working.Done()client.pendingReqs <- requestrequest.waiting.WaitWithTimeout(maxWait)
}func (client *Client) doRequest(req *request) {if req == nil || len(req.args) == 0 {return}// 數據轉換為bytere := protocol.MakeMultiBulkReply(req.args)bytes := re.ToBytes()var err error// 三次重試for i := 0; i < 3; i++ { // only retry, waiting for handleRead_, err = client.conn.Write(bytes)if err == nil ||(!strings.Contains(err.Error(), "timeout") && // only retry timeout!strings.Contains(err.Error(), "deadline exceeded")) {break}}if err == nil {// 成功發送通知client.waitingReqs <- req} else {req.err = errreq.waiting.Done()}
}// 完成請求
func (client *Client) finishRequest(reply redis.Reply) {defer func() {if err := recover(); err != nil {debug.PrintStack()logger.Error(err)}}()request := <-client.waitingReqsif request == nil {return}request.reply = replyif request.waiting != nil {request.waiting.Done()}
}// 處理響應數據
func (client *Client) handleRead() {// 數據轉義ch := parser.ParseStream(client.conn)for payload := range ch {// 檢查消息體有沒有錯誤if payload.Err != nil {status := atomic.LoadInt32(&client.status)if status == closed {return}client.reconnect()return}client.finishRequest(payload.Data)}
}

在redis/conn/conn.go

TCP連接方法管理

import ("net""sync""time""github.com/hdt3213/godis/lib/logger""github.com/hdt3213/godis/lib/sync/wait"
)const (// flagSlave means this a connection with slaveflagSlave = uint64(1 << iota)// flagSlave means this a connection with masterflagMaster// flagMulti means this connection is within a transactionflagMulti
)// Connection represents a connection with a redis-cli
type Connection struct {conn net.Conn// wait until finish sending data, used for graceful shutdownsendingData wait.Wait// lock while server sending responsemu    sync.Mutexflags uint64// subscribing channelssubs map[string]bool// password may be changed by CONFIG command during runtime, so store the passwordpassword string// queued commands for `multi`queue    [][][]bytewatching map[string]uint32txErrors []error// selected dbselectedDB int
}// 連接池
var connPool = sync.Pool{New: func() interface{} {return &Connection{}},
}// 返回遠程地址
// RemoteAddr returns the remote network address
func (c *Connection) RemoteAddr() string {return c.conn.RemoteAddr().String()
}// Close disconnect with the client
func (c *Connection) Close() error {c.sendingData.WaitWithTimeout(10 * time.Second)_ = c.conn.Close()c.subs = nilc.password = ""c.queue = nilc.watching = nilc.txErrors = nilc.selectedDB = 0connPool.Put(c)return nil
}// 創建一個連接實例
// NewConn creates Connection instance
func NewConn(conn net.Conn) *Connection {// 從線程池去c, ok := connPool.Get().(*Connection)if !ok {logger.Error("connection pool make wrong type")return &Connection{conn: conn,}}c.conn = connreturn c
}// Write sends response to client over tcp connection
func (c *Connection) Write(b []byte) (int, error) {if len(b) == 0 {return 0, nil}c.sendingData.Add(1)defer func() {c.sendingData.Done()}()return c.conn.Write(b)
}// 獲取連接名稱
func (c *Connection) Name() string {if c.conn != nil {return c.conn.RemoteAddr().String()}return ""
}// 訂閱放入map
// Subscribe add current connection into subscribers of the given channel
func (c *Connection) Subscribe(channel string) {c.mu.Lock()defer c.mu.Unlock()if c.subs == nil {c.subs = make(map[string]bool)}c.subs[channel] = true
}// 訂閱刪除
// UnSubscribe removes current connection into subscribers of the given channel
func (c *Connection) UnSubscribe(channel string) {c.mu.Lock()defer c.mu.Unlock()if len(c.subs) == 0 {return}delete(c.subs, channel)
}// 獲取訂閱集合長度
// SubsCount returns the number of subscribing channels
func (c *Connection) SubsCount() int {return len(c.subs)
}// GetChannels returns all subscribing channels
func (c *Connection) GetChannels() []string {if c.subs == nil {return make([]string, 0)}channels := make([]string, len(c.subs))i := 0for channel := range c.subs {channels[i] = channeli++}return channels
}// 設置密碼
// SetPassword stores password for authentication
func (c *Connection) SetPassword(password string) {c.password = password
}// 獲取密碼
// GetPassword get password for authentication
func (c *Connection) GetPassword() string {return c.password
}// 獲取可變狀態
// InMultiState tells is connection in an uncommitted transaction
func (c *Connection) InMultiState() bool {return c.flags&flagMulti > 0
}// 設置可變狀態
// SetMultiState sets transaction flag
func (c *Connection) SetMultiState(state bool) {if !state { // reset data when cancel multic.watching = nilc.queue = nilc.flags &= ^flagMulti // clean multi flagreturn}c.flags |= flagMulti
}// 返回當前事務的隊列命令
// GetQueuedCmdLine returns queued commands of current transaction
func (c *Connection) GetQueuedCmdLine() [][][]byte {return c.queue
}// 命令加入隊列
// EnqueueCmd  enqueues command of current transaction
func (c *Connection) EnqueueCmd(cmdLine [][]byte) {c.queue = append(c.queue, cmdLine)
}// AddTxError stores syntax error within transaction
func (c *Connection) AddTxError(err error) {c.txErrors = append(c.txErrors, err)
}// GetTxErrors returns syntax error within transaction
func (c *Connection) GetTxErrors() []error {return c.txErrors
}// ClearQueuedCmds clears queued commands of current transaction
func (c *Connection) ClearQueuedCmds() {c.queue = nil
}// GetWatching returns watching keys and their version code when started watching
func (c *Connection) GetWatching() map[string]uint32 {if c.watching == nil {c.watching = make(map[string]uint32)}return c.watching
}// GetDBIndex returns selected db
func (c *Connection) GetDBIndex() int {return c.selectedDB
}// SelectDB selects a database
func (c *Connection) SelectDB(dbNum int) {c.selectedDB = dbNum
}func (c *Connection) SetSlave() {c.flags |= flagSlave
}func (c *Connection) IsSlave() bool {return c.flags&flagSlave > 0
}func (c *Connection) SetMaster() {c.flags |= flagMaster
}func (c *Connection) IsMaster() bool {return c.flags&flagMaster > 0
}

在redis/conn/fake.go

假連接,用于測試

在redis/parser/parser.go

用于解析客戶端發來的數據

package parserimport ("bufio""bytes""errors""io""runtime/debug""strconv""strings""github.com/hdt3213/godis/interface/redis""github.com/hdt3213/godis/lib/logger""github.com/hdt3213/godis/redis/protocol"
)// 消息體結構
// Payload stores redis.Reply or error
type Payload struct {Data redis.ReplyErr  error
}// 解析從io流的數據
// ParseStream reads data from io.Reader and send payloads through channel
func ParseStream(reader io.Reader) <-chan *Payload {ch := make(chan *Payload)go parse0(reader, ch)return ch
}// 解析byte
// ParseBytes reads data from []byte and return all replies
func ParseBytes(data []byte) ([]redis.Reply, error) {ch := make(chan *Payload)reader := bytes.NewReader(data)go parse0(reader, ch)var results []redis.Replyfor payload := range ch {if payload == nil {return nil, errors.New("no protocol")}if payload.Err != nil {if payload.Err == io.EOF {break}return nil, payload.Err}results = append(results, payload.Data)}return results, nil
}// 解析第一個消息體
// ParseOne reads data from []byte and return the first payload
func ParseOne(data []byte) (redis.Reply, error) {ch := make(chan *Payload)reader := bytes.NewReader(data)go parse0(reader, ch)payload := <-ch // parse0 will close the channelif payload == nil {return nil, errors.New("no protocol")}return payload.Data, payload.Err
}// 私有方法,
func parse0(rawReader io.Reader, ch chan<- *Payload) {// 最后判斷有無錯誤,有則打印日志defer func() {if err := recover(); err != nil {logger.Error(err, string(debug.Stack()))}}()// 解析流reader := bufio.NewReader(rawReader)for {line, err := reader.ReadBytes('\n')if err != nil {// 異常處理ch <- &Payload{Err: err}close(ch)return}// 解析長度length := len(line)// 過短異常if length <= 2 || line[length-2] != '\r' {// there are some empty lines within replication traffic, ignore this error//protocolError(ch, "empty line")continue}line = bytes.TrimSuffix(line, []byte{'\r', '\n'})// 根據不同的字符,做不同的解析方法,ASCII判斷switch line[0] {case '+':content := string(line[1:])ch <- &Payload{Data: protocol.MakeStatusReply(content),}if strings.HasPrefix(content, "FULLRESYNC") {err = parseRDBBulkString(reader, ch)if err != nil {ch <- &Payload{Err: err}close(ch)return}}case '-':ch <- &Payload{Data: protocol.MakeErrReply(string(line[1:])),}case ':':value, err := strconv.ParseInt(string(line[1:]), 10, 64)if err != nil {protocolError(ch, "illegal number "+string(line[1:]))continue}ch <- &Payload{Data: protocol.MakeIntReply(value),}case '$':err = parseBulkString(line, reader, ch)if err != nil {ch <- &Payload{Err: err}close(ch)return}case '*':err = parseArray(line, reader, ch)if err != nil {ch <- &Payload{Err: err}close(ch)return}default:args := bytes.Split(line, []byte{' '})ch <- &Payload{Data: protocol.MakeMultiBulkReply(args),}}}
}// 解析字符串
func parseBulkString(header []byte, reader *bufio.Reader, ch chan<- *Payload) error {strLen, err := strconv.ParseInt(string(header[1:]), 10, 64)if err != nil || strLen < -1 {protocolError(ch, "illegal bulk string header: "+string(header))return nil} else if strLen == -1 {ch <- &Payload{Data: protocol.MakeNullBulkReply(),}return nil}body := make([]byte, strLen+2)_, err = io.ReadFull(reader, body)if err != nil {return err}ch <- &Payload{Data: protocol.MakeBulkReply(body[:len(body)-2]),}return nil
}// RDB和后續AOF之間沒有CRLF,因此需要區別對待
// there is no CRLF between RDB and following AOF, therefore it needs to be treated differently
func parseRDBBulkString(reader *bufio.Reader, ch chan<- *Payload) error {header, err := reader.ReadBytes('\n')header = bytes.TrimSuffix(header, []byte{'\r', '\n'})if len(header) == 0 {return errors.New("empty header")}strLen, err := strconv.ParseInt(string(header[1:]), 10, 64)if err != nil || strLen <= 0 {return errors.New("illegal bulk header: " + string(header))}body := make([]byte, strLen)_, err = io.ReadFull(reader, body)if err != nil {return err}ch <- &Payload{Data: protocol.MakeBulkReply(body[:len(body)]),}return nil
}func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error {nStrs, err := strconv.ParseInt(string(header[1:]), 10, 64)// nStrs > 0為合法if err != nil || nStrs < 0 {protocolError(ch, "illegal array header "+string(header[1:]))return nil} else if nStrs == 0 {ch <- &Payload{Data: protocol.MakeEmptyMultiBulkReply(),}return nil}// 消息合法判斷lines := make([][]byte, 0, nStrs)for i := int64(0); i < nStrs; i++ {var line []byteline, err = reader.ReadBytes('\n')if err != nil {return err}length := len(line)if length < 4 || line[length-2] != '\r' || line[0] != '$' {protocolError(ch, "illegal bulk string header "+string(line))break}strLen, err := strconv.ParseInt(string(line[1:length-2]), 10, 64)if err != nil || strLen < -1 {protocolError(ch, "illegal bulk string length "+string(line))break} else if strLen == -1 {lines = append(lines, []byte{})} else {body := make([]byte, strLen+2)_, err := io.ReadFull(reader, body)if err != nil {return err}lines = append(lines, body[:len(body)-2])}}// 合法消息裝入通道ch <- &Payload{Data: protocol.MakeMultiBulkReply(lines),}return nil
}func protocolError(ch chan<- *Payload, msg string) {err := errors.New("protocol error: " + msg)ch <- &Payload{Err: err}
}

在redis/protocol/asserts/asserts.go

用于測試檢查

在redis/protocol/consts.go

定義的一些常量

在redis/protocol/errors.go

定義的一些錯誤

在redis/protocol/reply.go

協議消息返回

在redis/server/server.go

TCP服務接收到連接后,異步拉起服務,用于客戶端的消息處理

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/44893.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/44893.shtml
英文地址,請注明出處:http://en.pswp.cn/web/44893.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Docker安裝RabbitMQ(帶web管理端)

1.拉取帶web管理的鏡像 可以拉取rabbitmq對應版本的web管理端&#xff0c;比如&#xff1a;rabbitmq:3.9.11-management&#xff0c;也可以直接拉取帶web管理端的最新版本 rabbitmq:management. docker pull rabbitmq:3.9.11-management 注意&#xff1a;如果docker pull ra…

sqlalchemy使用with_entities返回指定數據列

sqlalchemy使用with_entities返回指定數據列 在 SQLAlchemy 中,with_entities 方法用于指定查詢語句返回的實體(Entity)或列(Column)。它允許你限制查詢的返回結果,只包含你感興趣的特定字段或實體 使用方法 假設有一個名為 User 的 SQLAlchemy 模型類,包含以下字段:…

Unity3D中如何降低游戲的Drawcall詳解

在Unity3D游戲開發中&#xff0c;Drawcall是一個至關重要的性能指標&#xff0c;它指的是CPU通知GPU繪制一個物體的命令次數。過多的Drawcall會導致游戲性能下降&#xff0c;因此優化Drawcall的數量是提高游戲性能的關鍵。本文將詳細介紹Unity3D中降低Drawcall的幾種主要方法&a…

設計模式使用場景實現示例及優缺點(行為型模式——模板方法模式)

模板方法模式&#xff08;Template Method Pattern&#xff09; 模板方法模式&#xff08;Template Method Pattern&#xff09;是一種行為設計模式&#xff0c;它定義了一個操作中的算法的骨架&#xff0c;將算法的一些步驟延遲到子類中。這樣可以在不改變算法的結構的前提下…

Git使用介紹教程

Git使用介紹教程 小白第一次寫博客,內容寫的可能不是很詳細,僅供參考,大家一起努力 gitee網址:https://gitee.com 大部分的開發團隊都以 Git 作為自己的版本控制工具,需要對 Git 的使用非常的熟悉。這篇文章中本人整理了自己在開發過程中經常使用到的 Git 命令,方便在偶…

jenkins系列-06.harbor

https://github.com/goharbor/harbor/releases?page2 https://github.com/goharbor/harbor/releases/download/v2.3.4/harbor-offline-installer-v2.3.4.tgz harbor官網&#xff1a;https://goharbor.io/ 點擊 Download now 鏈接&#xff0c;會自動跳轉到上述github頁面&am…

C++ | Leetcode C++題解之第233題數字1的個數

題目&#xff1a; 題解&#xff1a; class Solution { public:int countDigitOne(int n) {// mulk 表示 10^k// 在下面的代碼中&#xff0c;可以發現 k 并沒有被直接使用到&#xff08;都是使用 10^k&#xff09;// 但為了讓代碼看起來更加直觀&#xff0c;這里保留了 klong l…

Redis系列命令更新--Redis哈希命令

一、設置密碼驗證&#xff1a; 使用文本編輯器&#xff0c;這里使用Notepad&#xff0c;打開Redis服務配置文件。 注意&#xff1a;不要找錯了&#xff0c;通常為redis.windows-service.conf&#xff0c;而不是redis.windows.conf。后者是以非系統服務方式啟動程序使用的配置…

使用個人p12證書請求https接口數據

依賴 <dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.3</version></dependency>code package com.hexin.cbas.test;import org.apache.commons.net.util.TrustM…

《BASeg: Boundary aware semantic segmentation for autonomous driving》論文解讀

期刊&#xff1a;Neural Networks | Journal | ScienceDirect.com by Elsevier 年份&#xff1a;2023 代碼&#xff1a;https://github.com/Lature-Yang/BASeg 摘要 語義分割是自動駕駛領域街道理解任務的重要組成部分。現有的各種方法要么專注于通過聚合全局或多尺度上下文…

曠野之間20 - Google 研究的推測 RAG

為什么選擇 RAG 新興能力 直到最近&#xff0c;人們發現 LLM 具有新興能力&#xff0c;即在與用戶或任務交互過程中出現的意外功能。 這些功能的示例包括&#xff1a; 解決問題&#xff1a; LLM 可以利用其語言理解和推理能力&#xff0c;為未經過明確培訓的任務提供富有洞…

js的原型鏈

原型鏈: 1.如何構成原型鏈&#xff1f; 2.原型鏈上屬性的增刪改查。 3.絕大多數對象的最終都會繼承自Object.prototype (var obj Object.create(null或者undefined)沒有原型)。 4.Object.create(原型)。 構成原型鏈和操作原型鏈屬性&#xff1a; //最頂的原型是Object.pro…

性能優化篇:SQL數據庫查表速度優化

SQL數據庫查詢的性能優化是確保數據庫能夠快速響應和高效處理請求的關鍵。以下是一些常見的SQL數據庫查詢性能優化方法: 索引優化: 創建適當的索引:為經常在WHERE子句中使用的列、JOIN操作涉及的列以及排序操作涉及的列創建索引。避免過多的索引:雖然索引可以提高查詢速度,…

python的字符串

字符串 簡單操作 創建 利用 ‘ ’ 或 “ ” 將字符或數字包裹起來的都為字符串 a"你好" 格式化字符串 元組的字符格式化 字符串格式化函數 srt.format() f格式化 方法 split()//指定分割符經行分割 strip()//指定移除字符頭尾的字符 join()//指定序列中的字符連接成新…

【Perl】Perl 語言入門

1. Perl語言介紹 Perl 是一種高級、解釋型、動態編程語言&#xff0c;由Larry Wall在1987年發布。Perl 以其強大的文本處理能力而聞名&#xff0c;特別是在處理報告生成、文件轉換、系統管理任務等方面。它吸收了C、Shell腳本語言、AWK、sed等語言的特性&#xff0c;并加入了大…

Go:常量運算符流程控制

目錄 一、常量 1.1 常量基本定義 1.2 常量組的定義 1.3 常量枚舉 二、運算符 2.1 算數運算符 2.2 關系運算符 2.3 邏輯運算符 2.4 位運算符 2.5 賦值運算符 2.6 指針運算符 2.7 運算符優先級 三、流程控制 3.1 if-else 條件語句 3.2 switch-case語句 3.3 for 循…

5、 測試

這里寫目錄標題 1、自動化測試簡介&#xff08;1&#xff09;自動化測試是什么&#xff08;2&#xff09;為什么要寫測試測試節約你的時間發現錯誤&#xff0c;預防錯誤測試使得代碼更有吸引力 2、基礎測試策略3、開始寫第一個測試&#xff08;1&#xff09;首先得有個bug&…

Not Invented Here 不是在這里發明的 / Proudly found elsewhere 自豪地在其他地方找到

注&#xff1a; 機翻&#xff0c;未校對。 兩篇關于創新管理的小文章 Not Invented Here 不是在這里發明的 In the history of organizational success, the enterprises that dominate tend to flawlessly execute on ideas that were created elsewhere. Examine just abo…

智慧水利解決方案:從理論到實踐的全面跨越,展示其在水資源管理、水災害預警、水生態保護等方面的創新應用

目錄 一、引言&#xff1a;智慧水利的時代背景與意義 二、智慧水利的理論框架與技術體系 1、理論框架 2、技術體系 三、智慧水利在水資源管理中的應用 1、水資源優化配置 2、水量水質協同管理 四、智慧水利在水災害預警中的應用 1、洪水預警與應急響應 2、干旱監測與評…

git 創建分支--命令行

在Git中創建分支是一個相對簡單且重要的操作&#xff0c;它允許開發者在不影響主代碼庫的情況下進行開發或修復工作。以下是創建Git分支的步驟&#xff1a; 一、基本步驟 打開命令行終端&#xff1a; 首先&#xff0c;需要打開命令行終端&#xff08;在Windows上可以是CMD、Po…