網關服務器
所謂網關,其實就是維持玩家客戶端的連接,將玩家發的游戲請求轉發到具體后端服務的服務器,具有以下幾個功能點:
- 長期運行,必須具有較高的穩定性和性能
- 對外開放,即客戶端需要知道網關的IP和端口,才能連接上來
- 多協議支持
- 統一入口,架構中可能存在很多后端服務,如果沒有一個統一入口,則客戶端需要知道每個后端服務的IP和端口
- 請求轉發,由于統一了入口,所以網關必須能將客戶端的請求轉發到準確的服務上,需要提供路由
- 無感更新,由于玩家連接的是網關服務器,只要連接不斷;更新后端服務器對玩家來說是無感知的,或者感知很少(根據實現方式不同)
- 業務無關(對于游戲服務器網關不可避免的可能會有一點業務)
對于http請求來說,micro框架本身已經實現了api網關,可以參閱之前的博客
牌類游戲使用微服務重構筆記(二): micro框架簡介:micro toolkit
但是對于游戲服務器,一般都是需要長鏈接的,需要我們自己實現
連接協議
網關本身應該是支持多協議的,這里就以websocket舉例說明我重構過程中的思路,其他協議類似。首先選擇提供websocket連接的庫 推薦使用melody,基于websocket庫,語法非常簡單,數行代碼即可實現websocket服務器。我們的游戲需要websocket網關的原因在于客戶端不支持HTTP2,不能和grpc服務器直連
package mainimport ("github.com/micro/go-web""gopkg.in/olahol/melody.v1""log""net/http"
)func main() {// New web serviceservice := web.NewService(web.Name("go.micro.api.gateway"))// parse command lineservice.Init()// new melodym := melody.New()// Handle websocket connectionservice.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {_ = m.HandleRequest(w, r)})// handle connection with new sessionm.HandleConnect(func(session *melody.Session) {})// handle disconnectionm.HandleDisconnect(func(session *melody.Session) {})// handle messagem.HandleMessage(func(session *melody.Session, bytes []byte) {})// run serviceif err := service.Run(); err != nil {log.Fatal("Run: ", err)}
}
復制代碼
請求轉發
網關可以收取或發送數據,并且數據結構比較統一都是[]byte
,這一點是不是很像grpc stream
,因此就可以使用protobuf
的oneof
特性來定義請求和響應,可參照上期博客
牌類游戲使用微服務重構筆記(六): protobuf爬坑
定義gateway.basic.proto
,對網關收/發的消息進行歸類
message Message {oneof message {Req req = 1; // 客戶端請求 要求響應Rsp rsp = 2; // 服務端響應Notify notify = 3; // 客戶端推送 不要求響應Event event = 4; // 服務端推送Stream stream = 5; // 雙向流請求Ping ping = 6; // pingPong pong = 7;// pong}
}
復制代碼
對于req
、notify
都是客戶端的無狀態請求,對應后端的無狀態服務器,這里僅需要實現自己的路由規則即可,比如
message Req {string serviceName = 1; // 服務名string method = 2; // 方法名bytes args = 3; // 參數google.protobuf.Timestamp timestamp = 4; // 時間戳...
}
復制代碼
- serviceName 調用rpc服務器的服務名
- method 調用rpc服務器的方法名
- args 調用參數
- timestamp 請求時間戳,用于客戶端對服務端響應做匹配識別,模擬http請求
req-rsp
思路與micro toolkit
的api網關類似(rpc 處理器),比較簡單,可參照之前的博客。
我們的項目對于此類請求都走http了,并沒有通過這個網關, 僅有一些基本的req
,比如authReq
處理session
認證。主要考慮是http簡單、無狀態、好維護,再加上此類業務對實時性要求也不高。
grpc stream轉發
游戲服務器一般都是有狀態的、雙向的、實時性要求較高,req-rsp
模式并不適合,就需要網關進行轉發。每添加一種grpc后端服務器,僅需要在oneof
中添加一個stream來拓展
message Stream {oneof stream {room.basic.Message roomMessage = 1; // 房間服務器game.basic.Message gameMessage = 2; // 游戲服務器mate.basic.Message mateMessage = 3; // 匹配服務器}
}
復制代碼
并且需要定義一個對應的路由請求,來處理轉發到哪一臺后端服務器上(實現不同也可以不需要),這里會涉及到一點業務,例如
message JoinRoomStreamReq {room.basic.RoomType roomType = 1;string roomNo = 2;
}
復制代碼
這里根據客戶端的路由請求的房間號和房間類型,網關來選擇正確的房間服務器(甚至可能鏈接到舊版本的房間服務器上)
選擇正確的服務器后,建立stream 雙向流
address := "xxxxxxx" // 選擇后的服務器地址
ctx := context.Background() // 頂層context
m := make(map[string]string) // some metadata
streamCtx, cancelFunc := context.WithCancel(ctx) // 復制一個子context// 建立stream 雙向流
stream, err := xxxClient.Stream(metadata.NewContext(streamCtx, m), client.WithAddress(address))// 存儲在session上
session.Set("stream", stream)
session.Set("cancelFunc", cancelFunc)// 啟動一個goroutine 收取stream消息并轉發
go func(c context.Context, s pb.xxxxxStream) {// 退出時關閉 streamdefer func() {session.Set("stream", nil)session.Set("cancelFunc", nil)if err := s.Close(); err != nil {// do something with close err}}()for {select {case <-c.Done():// do something with ctx cancelreturndefault:res, err := s.Recv()if err != nil {// do something with recv errreturn}// send to session 這里可以通過oneof包裝告知客戶端是哪個stream發來的消息...}}
}(streamCtx, stream)
復制代碼
轉發就比較簡單了,直接上代碼
對于某個stream的請求
message Stream {oneof stream {room.basic.Message roomMessage = 1; // 房間服務器game.basic.Message gameMessage = 2; // 游戲服務器mate.basic.Message mateMessage = 3; // 匹配服務器}
}
復制代碼
添加轉發代碼
s, exits := session.Get("stream")
if !exits {return
}if stream, ok := s.(pb.xxxxStream); ok {err := stream.Send(message)if err != nil {log.Println("send err:", err)return}
}
復制代碼
當需要關閉某個stream時, 只需要調用對應的cancelFunc
即可
if v, e := session.Get("cancelFunc"); e {if c, ok := v.(context.CancelFunc); ok {c()}
}
復制代碼
使用oneOf的好處
由于接收請求的入口統一,使用oneof
就可以一路switch case
,每添加一個req
或者一種stream
只需要添加一個case, 代碼看起來還是比較簡單、清爽的
func HandleMessageBinary(session *melody.Session, bytes []byte) {var msg pb.Messageif err := proto.Unmarshal(bytes, &msg); err != nil {// do somethingreturn}defer func() {err := recover()if err != nil {// do something with panic}}()switch x := msg.Message.(type) {case *pb.Message_Req:handleReq(session, x.Req)case *pb.Message_Stream:handleStream(session, x.Stream)case *pb.Message_Ping:handlePing(session, x.Ping)default:log.Println("unknown req type")}
}func handleStream(session *melody.Session, message *pb.Stream) {switch x := message.Stream.(type) {case *pb.Stream_RoomMessage:handleRoomStream(session, x.RoomMessage)case *pb.Stream_GameMessage:handleGameStream(session, x.GameMessage)case *pb.Stream_MateMessage:handleMateStream(session, x.MateMessage)}
}
復制代碼
熱更新
對于游戲熱更新不停服還是挺重要的,我的思路將會在之后的博客里介紹,可以關注一波 嘿嘿
坑!
- 這樣的網關,看似沒什么問題,然而跑上一段時間使用
pprof
觀測會發現goroutine
和內存都在緩慢增長,也就是存在goroutine leak!
,原因在于 micro源碼在包裝grpc時,沒有對關閉stream完善,只有收到io.EOF
的錯誤時才會關閉grpc的conn連接
func (g *grpcStream) Recv(msg interface{}) (err error) {defer g.setError(err)if err = g.stream.RecvMsg(msg); err != nil {if err == io.EOF {// #202 - inconsistent gRPC stream behavior// the only way to tell if the stream is done is when we get a EOF on the Recv// here we should close the underlying gRPC ClientConncloseErr := g.conn.Close()if closeErr != nil {err = closeErr}}}return
}
復制代碼
并且有一個TODO
// Close the gRPC send stream
// #202 - inconsistent gRPC stream behavior
// The underlying gRPC stream should not be closed here since the
// stream should still be able to receive after this function call
// TODO: should the conn be closed in another way?
func (g *grpcStream) Close() error {return g.stream.CloseSend()
}
復制代碼
解決方法也比較簡單,自己fork一份源碼改一下關閉stream的時候同時關閉conn(我們的業務是可以的因為在grpc stream客戶端和服務端均實現收到err后關閉stream),或者等作者更新用更科學的方式關閉
- melody的session在
get
和set
數據時會發生map的讀寫競爭而panic,可以查看issue,解決方法也比較簡單
一起學習
本人學習golang、micro、k8s、grpc、protobuf等知識的時間較短,如果有理解錯誤的地方,歡迎批評指正,可以加我微信一起探討學習