今天這篇是接上上篇RPC原理之后這篇是講如何使用go本身自帶的標準庫RPC。這篇篇幅會比較短。重點在于上一章對的補充。
文章目錄
- RPC包的概念
- 使用RPC包
- 服務器代碼分析
- 如何實現的?
- 總結
- Server還提供了兩個注冊服務的方法
- 客戶端代碼分析
- 如何實現的?
- 如何異步編程同步?
- 總結
- codec/序列化框架
- 使用JSON協議的RPC
RPC包的概念
回顧RPC原理
看完回顧后其實就可以繼續需了解并使用go中所提供的包。
Go語言的 rpc
包提供對通過網絡或其他i/o
連接導出的對象方法的訪問,服務器注冊一個對象,并把它作為服務對外可見(服務名稱就是類型名稱)。
注冊后,對象的導出方法將支持遠程訪問。服務器可以注冊不同類型的多個對象(服務) ,但是不支持注冊同一類型的多個對象。
Go官方提供了一個RPC庫: net/rpc
。
包rpc
提供了通過網絡訪問一個對象的輸出方法的能力。
服務器需要注冊對象,通過對象的類型名暴露這個服務。
注冊后這個對象的輸出方法就可以遠程調用,這個庫封裝了底層傳輸的細節,包括序列化(默認GOB
序列化器)。
對象的方法要能遠程訪問,它們必須滿足一定的條件,否則這個對象的方法會被忽略:
- 方法的
類型
是可輸出的 - 方法
本身
是可輸出的 - 方法必須由兩個參數,必須是輸出類型或者是內建類型
- 方法的第二個參數必須是
指針類型
- 方法返回類型為
error
所以一個輸出方法的格式如下:
func (t *T) MethodName(argType T1, replyType *T2) error
這里的T
、T1
、T2
能夠被encoding/gob
序列化,即使使用其它的序列化框架
,將來這個需求可能回被弱化。
- 第一個參數
(T1)
代表調用者(client)提供的參數 - 第二個參數
(*T2)
代表要返回給調用者的計算結果 - 方法的返回值如果不為空, 那么它作為一個
字符串
返回給調用者(所以需要一個序列化框架) - 如果返回
error
,則reply
參數不會返回給調用者
使用RPC包
簡單例子,是一個非常簡單的服務。
我們在這個里面就搞1
和12
就好:
在這個例子中定義了一個簡單的RPC服務器和客戶端,使用的方法是一個
第一步
需要定義傳入參數和返回參數的數據結構
type Args struct {A, B int
}
type Quotient struct {Quo, Rem int
}
第二步
定義一個服務對象,這個服務對象可以很簡單。
比如類型是int
或者是interface{}
,重要的是它輸出的方法。
type Arith int
第三步
實現這個類型的兩個方法, 乘法和除法:
func (t *Arith) Multiply(args *Args, reply *int) error {*reply = args.A * args.Breturn nil
}
func (t *Arith) Divide(args *Args, quo *Quotient) error {if args.B == 0 {return errors.New("divide by zero")}quo.Quo = args.A / args.Bquo.Rem = args.A % args.Breturn nil
}
第四步
實現RPC服務器: 基于tcp實現
生成了一個Arith
對象,并使用rpc.Register
注冊這個服務,然后通過HTTP暴露出來
arith := new(Arith)
rpc.Register(arith)
rpc.HandleHTTP()
l, e := net.Listen("tcp", ":9091")
if e != nil {log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
select{
}
客戶端可以看到服務Arith
以及它的兩個方法Arith.Multiply
和Arith.Divide
第五步
創建一個客戶端,建立客戶端和服務器端的連接: 分為同步調用和異步調用(都是遠程調用)
同步調用:
client, err := rpc.DialHTTP("tcp", "127.0.0.1:9091")
if err != nil {log.Fatal("dialing:", err)
}args := &server.Args{7,8}
var reply int
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)
異步調用:
client, err := rpc.DialHTTP("tcp", "127.0.0.1:9091")
if err != nil {log.Fatal("dialing:", err)
}
quotient := new(Quotient)
divCall := client.Go("Arith.Divide", args, quotient, nil)
replyCall := <-divCall.Done // will be equal to divCall
// check errors, print, etc.
完整的例子:
- 創建一個
service.go
的文件用來保存結構體對象以及方法
package mainimport "errors"type Args struct {A, B int
}type Quotient struct {Quo, Rem int
}type Arith intfunc (t *Arith) Multiply(args *Args, reply *int) error {*reply = args.A * args.Breturn nil
}
func (t *Arith) Divide(args *Args, quo *Quotient) error {if args.B == 0 {return errors.New("divide by zero")}quo.Quo = args.A / args.Bquo.Rem = args.A % args.Breturn nil
}
- 創建一個
RPC
服務端,server.go
package mainimport ("log""net""net/http""net/rpc"
)func main() {arith := new(Arith)rpc.Register(arith)rpc.HandleHTTP()l, e := net.Listen("tcp", ":9091")if e != nil {log.Fatal("listen error:", e)}go http.Serve(l, nil)select {}
}
- 創建一個客戶端,
client.go
package mainimport ("fmt""log""net/rpc"
)func main() {// 建立HTTP連接client, err := rpc.DialHTTP("tcp", "127.0.0.1:9091")if err != nil {log.Fatal("dialing:", err)}// 同步調用args := &Args{7, 8}var reply interr = client.Call("Arith.Multiply", args, &reply)if err != nil {log.Fatal("arith error:", err)}fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)// 異步調用quotient := new(Quotient)divCall := client.Go("Arith.Divide", args, quotient, nil)replyCall := <-divCall.Done // will be equal to divCall// check errors, print, etc.fmt.Println(replyCall.Error)fmt.Println(quotient)
}
打開終端:
先啟動服務器:
go run server.go service.go
在打開一個終端:
最后啟動一個客戶端:
go run client.go service.go
結果為:
服務器代碼分析
Server
的很多方法
你可以直接調用,這對于一個簡單的Server
的實現更方便,但是你如果需要配置不同的Server,
比如不同的監聽地址或端口,就需要自己生成Server:
var DefaultServer = NewServer()
Server
有多種Socket監聽
的方式:
func (server *Server) Accept(lis net.Listener)func (server *Server) HandleHTTP(rpcPath, debugPath string)func (server *Server) ServeCodec(codec ServerCodec)func (server *Server) ServeConn(conn io.ReadWriteCloser)func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)func (server *Server) ServeRequest(codec ServerCodec) error
ServeHTTP
: 實現了處理http
請求的業務邏輯,它首先處理http
的CONNECT
請求, 接收后就Hijacker
這個連接conn
, 然后調用ServeConn
在這個連接上處理這個客戶端的請求。- 其實是實現了
http.Handler
接口,我們一般不直接調用這個方法。Server.HandleHTTP
設置rpc的上下文路徑rpc.HandleHTTP
使用默認的上下文路徑`DefaultRPCPath
DefaultDebugPath
- 當你啟動一個
http server
的時候http.ListenAndServe
,面設置的上下文將用作RPC傳輸,這個上下文的請求會教給ServeHTTP
來處理 - 以上是
RPC over http
的實現,可以看出net/rpc
只是利用http CONNECT
建立連接,這和普通的RESTful api
還是不一樣的。 - (源碼)
- 其實是實現了
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {if req.Method != "CONNECT" {w.Header().Set("Content-Type", "text/plain; charset=utf-8")w.WriteHeader(http.StatusMethodNotAllowed)io.WriteString(w, "405 must CONNECT\n")return}conn, _, err := w.(http.Hijacker).Hijack()if err != nil {log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())return}io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")server.ServeConn(conn)
}
如何實現的?
Accept
用來處理一個監聽器,一直在監聽客戶端的連接,一旦監聽器接收了一個連接,則還是交給ServeConn
在另外一個goroutine
中去處理:(源碼)
//Accept接受偵聽器上的連接并提供請求
//每個傳入連接。接受阻塞,直到監聽器
//返回非nil錯誤。對象中調用Accept
//go語句
func (server *Server) Accept(lis net.Listener) {for {conn, err := lis.Accept()if err != nil {log.Print("rpc.Serve: accept:", err.Error())return}go server.ServeConn(conn)}
}
協程進入ServeConn
可以看出,很重要的一個方法就是ServeConn
// ServeConn在單連接上運行服務器。
// ServeConn阻塞,服務連接,直到客戶端掛起。
//調用者通常在go語句中調用ServeConn。
// ServeConn使用gob連接格式(參見包gob)
//連接。要使用備用編解碼器,請使用ServeCodec。
//有關并發訪問的信息,請參閱NewClient的注釋。.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {buf := bufio.NewWriter(conn)srv := &gobServerCodec{rwc: conn,dec: gob.NewDecoder(conn),enc: gob.NewEncoder(buf),encBuf: buf,}server.ServeCodec(srv)
}
連接其實是交給一個ServerCodec
去處理,這里默認
使用gobServerCodec
去處理,這是一個未輸出默認的編解碼器,可以使用其它的編解碼器。
// ServeCodec類似于ServeConn,但使用指定的編解碼器來
//解碼請求和編碼響應。
func (server *Server) ServeCodec(codec ServerCodec) {sending := new(sync.Mutex)wg := new(sync.WaitGroup)for {service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)if err != nil {if debugLog && err != io.EOF {log.Println("rpc:", err)}if !keepReading {break}// send a response if we actually managed to read a header.if req != nil {server.sendResponse(sending, req, invalidRequest, codec, err.Error())server.freeRequest(req)}continue}wg.Add(1)go service.call(server, sending, wg, mtype, req, argv, replyv, codec)}//我們已經看到沒有更多的請求。//在關閉編解碼器之前等待響應。wg.Wait()codec.Close()
}
它其實一直從連接中讀取請求,然后調用go service.call在另外的goroutine中處理服務調用。
總結
-
對象重用。
Request
和Response
都是可重用的,通過Lock
處理競爭。這在大并發
的情況下很有效。 -
使用了大量的
goroutine
。如果使用一定數量的goroutine
作為worker池
去處理這個case,可能還會有些性能的提升,但是更復雜了。使用goroutine
可以獲得了非常好的性能。 -
業務處理是異步的,服務的執行不會阻塞其它消息的讀取。
-
一個
codec實例
必然和一個connnection
相關,因為它需要從connection
中讀取request和發送response
。
go的rpc官方庫
的消息(request
和response
)的定義很簡單, 就是消息頭(header
)+內容體(body
)。
消息體是reply類型的序列化后的值。
type Request struct {ServiceMethod string // format: "Service.Method"Seq uint64 // 客戶端選擇的序列號// 包含過濾或未導出的字段
}
Server還提供了兩個注冊服務的方法
第二個方法為服務起一個別名,否則服務名已它的類型命名
func (server *Server) Register(rcvr interface{}) errorfunc (server *Server) RegisterName(name string, rcvr interface{}) error
它們倆底層調用register進行服務的注冊(這里的源碼太多就不放了)
func (server *Server) register(rcvr interface{}, name string, useName bool) error
受限于Go
語言的特點,我們不可能在接到客戶端的請求的時候,根據反射動態的創建一個對象,就是Java
那樣。
因此在Go語言中,我們需要預先創建一個服務map這是在編譯的時候完成的
說白了這里需要建立一個注冊名與服務之間的映射關系
server.serviceMap = make(map[string]*service)
同時每個服務還有一個方法map: map[string]*methodType
,通過suitableMethods
建立映射:
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType
這樣rpc
在讀取請求header
,通過查找這兩個map
,就可以得到要調用的服務及它的對應方法了。
func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {if wg != nil {defer wg.Done()}mtype.Lock()mtype.numCalls++mtype.Unlock()function := mtype.method.Func// 調用該方法,為應答提供一個新值。returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})// 該方法的返回值是一個錯誤。.errInter := returnValues[0].Interface()errmsg := ""if errInter != nil {errmsg = errInter.(error).Error()}server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)server.freeRequest(req)
}
客戶端代碼分析
客戶端要建立和服務器的連接
func Dial(network, address string) (*Client, error)func DialHTTP(network, address string) (*Client, error)func DialHTTPPath(network, address, path string) (*Client, error)func NewClient(conn io.ReadWriteCloser) *Clientfunc NewClientWithCodec(codec ClientCodec) *Client
如何實現的?
DialHTTP
和 DialHTTPPath
是通過HTTP的方式和服務器建立連接,他倆的區別之在于是否設置上下文路徑:
// DialHTTPPath連接HTTP RPC服務器在指定的網絡地址和路徑上
func DialHTTPPath(network, address, path string) (*Client, error) {conn, err := net.Dial(network, address)if err != nil {return nil, err}io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")// 在切換到RPC協議之前,需要成功的HTTP響應resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})if err == nil && resp.Status == connected {return NewClient(conn), nil}if err == nil {err = errors.New("unexpected HTTP response: " + resp.Status)}conn.Close()return nil, &net.OpError{Op: "dial-http",Net: network + " " + address,Addr: nil,Err: err,}
}
首先發送 CONNECT
請求,如果連接成功則通過NewClient(conn)
創建client
。
Dial
則通過TCP
直接連接服務器
// Dial連接到指定網絡地址的RPC服務器
func Dial(network, address string) (*Client, error) {conn, err := net.Dial(network, address)if err != nil {return nil, err}return NewClient(conn), nil
}
注意:根據服務是over HTTP還是 over TCP選擇合適的連接方式
NewClient
則創建一個缺省codec
為glob序列化庫
的客戶端
// NewClient返回一個新的Client來處理到連接另一端的服務集合。
//在連接的寫端添加一個緩沖區,報頭和有效載荷作為一個單元發送。
//
//連接的讀寫部分是獨立序列化的,不需要聯鎖。然而,每一半都可以訪問并發,所以conn的實現應該防止,并發讀或并發寫。
func NewClient(conn io.ReadWriteCloser) *Client {encBuf := bufio.NewWriter(conn)client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}return NewClientWithCodec(client)
}
如果想用其它的序列化庫,你可以調用NewClientWithCodec
方法 一般用來做RPC框架的
// NewClientWithCodec類似于NewClient,但使用指定的編碼請求和解碼響應。
func NewClientWithCodec(codec ClientCodec) *Client {client := &Client{codec: codec,pending: make(map[uint64]*Call),}go client.input()return client
}
重要的是input
方法,它以一個死循環的方式不斷
地從連接中讀取response
,然后調用map
中讀取等待的Call.Done channel
通知完成。(這個其實有點令牌掃描的作用,消息隊列中有說)
消息的結構和服務器一致,都是Header+Body
的方式
客戶端的調用有兩個方法: Go
和 Call
Go方法
是異步的,它返回一個Call指針對象
, 它的Done
是一個channel
,如果服務返回,Done
就可以得到返回的對象(實際是Call對象
,包含Reply
和error信息
)。Call 方法
是同步的,它實際是調用Go實現的。
如何異步編程同步?
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Donereturn call.Error
}
從一個Channel
中讀取對象會被阻塞住,直到有對象可以讀取,這種實現很簡單,也很方便。
總結
從源碼中:我們還可以學到鎖Lock的一種實用方式,也就是盡快的釋放鎖,而不是defer mu.Unlock
直到函數執行到最后才釋放,那樣鎖
占用的時間太長了。
codec/序列化框架
rpc框架默認使用gob序列化庫,很多情況下我們追求更好的效率的情況下,或者追求更通用的序列化格式,我們可能采用其它的序列化方式, 比如protobuf, json, xml等。
市面上也有許多序列化框架。速度快而且非常好用。gRPC
是互聯網后臺常用的RPC
框架,其內部是由protobuf
協議完成通訊。這個后面再講。
(JDK Serializable
、FST
、Kryo
、Protobuf
、Thrift
、Hession
和Avro
,Fury
)
Fury
是最新的序列化框架:號稱比jdk 快170倍,后面會講的 支持多種語言
Go官方庫實現了JSON-RPC 1.0
。JSON-RPC
是一個通過JSON格式
進行消息傳輸的RPC規范
,因此可以進行跨語言的調用。
Go的net/rpc/jsonrpc
庫可以將JSON-RPC
的請求轉換成自己內部的格式:
func (c *serverCodec) ReadRequestHeader(r *rpc.Request) error {c.req.reset()if err := c.dec.Decode(&c.req); err != nil {return err}r.ServiceMethod = c.req.Methodc.mutex.Lock()c.seq++c.pending[c.seq] = c.req.Idc.req.Id = nilr.Seq = c.seqc.mutex.Unlock()return nil
}
使用JSON協議的RPC
rpc
包默認使用的是 gob 協議
對傳輸數據進行序列化/反序列化
,比較有局限性
。
將例子進行修改:
服務器端:
package mainimport ("log""net""net/rpc""net/rpc/jsonrpc"
)func main() {arith := new(Arith)rpc.Register(arith)l, e := net.Listen("tcp", ":9091")if e != nil {log.Fatal("listen error:", e)}for {conn, _ := l.Accept()// 使用JSON協議rpc.ServeCodec(jsonrpc.NewServerCodec(conn))}
}
客戶端:
package mainimport ("fmt""log""net""net/rpc""net/rpc/jsonrpc"
)func main() {// 建立HTTP連接conn, err := net.Dial("tcp", "127.0.0.1:9091")if err != nil {log.Fatal("dialing:", err)}client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))// 同步調用args := &Args{7, 8}var reply interr = client.Call("Arith.Multiply", args, &reply)if err != nil {log.Fatal("arith error:", err)}fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)// 異步調用quotient := new(Quotient)divCall := client.Go("Arith.Divide", args, quotient, nil)replyCall := <-divCall.Done // will be equal to divCall// check errors, print, etc.fmt.Println(replyCall.Error)fmt.Println(quotient)
}
如何使用與上面的例子一致。
社區中各式RPC框架(grpc、thrift等)就是為了讓RPC調用更方便。