項目github地址:
由于我們項目的需要,我就研究了一下關于websocket的相關內容,去實現一個聊天室的功能。
經過幾天的探索,現在使用Gin框架實現了一個完整的聊天室+消息實時通知系統。有什么不完善的地方還請大佬指正。
用到的技術
websocket、gin、mysql、redis、協程、通道
實現思路
說到聊天室可以有多種方法實現,例如:使用單純的MySQL也可以實現,但是為什么要選擇使用websocket去實現呢?有什么優勢呢?
websocket是基于TCP/IP,獨立的HTTP協議的雙向通信協議,這就使實時的消息通知成為可能, 同時又符合Go高效處理高并發的語言特點,結合聊天室又是高并發的,所以采取的室websocket進行消息的轉接,MySQL持久化聊天消息,redis用于做一些判斷。
首先用戶在進入App時,客戶端和服務端建立一個websocket連接,并開啟一個通道。
當服務端收到客戶端的消息后,將消息寫入通道里,服務端監聽通道的消息,并將消息取出,使用接收人的websocket連接將消息廣播到接收人那里。
實現代碼
下面開始實現:
創建模型,用于關系的確立及數據的傳輸
//數據庫存儲消息結構體,用于持久化歷史記錄
type ChatMessage struct {gorm.ModelDirection string //這條消息是從誰發給誰的SendID int //發送者idRecipientID int //接受者idGroupID string //群id,該消息要發到哪個群里面去Content string //內容Read bool //是否讀了這條消息
}//群聊結構體
type Group struct {ID string ` gorm:"primaryKey"` //群idCreatedAt time.TimeUpdatedAt time.TimeDeletedAt gorm.DeletedAt `gorm:"index"`GroupName string `json:"group_name"` //群名GroupContent string `json:"group_content"` //群簽名GroupIcon string `json:"group_icon"` //群頭像GroupNum int //群人數GroupOwnerId int //群主idUsers []User `gorm:"many2many:users_groups;"` //群成員
}type UsersGroup struct {GroupId string `json:"group_id"`UserId int `json:"user_id"`
}// 用于處理請求后返回一些數據
type ReplyMsg struct {From string `json:"from"`Code int `json:"code"`Content string `json:"content"`
}// 發送消息的類型
type SendMsg struct {Type int `json:"type"`RecipientID int `json:"recipient_id"` //接受者idContent string `json:"content"`
}// 用戶類
type Client struct {ID string //消息的去向RecipientID int //接受者idSendID int //發送人的idGroupID string //群聊idSocket *websocket.Conn //websocket連接對象Send chan []byte //發送消息用的管道
}// 廣播類,包括廣播內容和源用戶
type Broadcast struct {Client *ClientMessage []byteType int
}// 用戶管理,用于管理用戶的連接及斷開連接
type ClientManager struct {Clients map[string]*ClientBroadcast chan *BroadcastReply chan *ClientRegister chan *ClientUnregister chan *Client
}//創建一個用戶管理對象
var Manager = ClientManager{Clients: make(map[string]*Client), // 參與連接的用戶,出于性能的考慮,需要設置最大連接數Broadcast: make(chan *Broadcast),Register: make(chan *Client), //新建立的連接訪放入這里面Reply: make(chan *Client),Unregister: make(chan *Client), //新斷開的連接放入這里面
}
創建連接
func WsHandle(c *gin.Context) {myid := c.Query("myid")userid, err := strconv.Atoi(myid)if err != nil {zap.L().Error("轉換失敗", zap.Error(err))ResponseError(c, CodeParamError)}//將http協議升級為ws協議conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {return true}}).Upgrade(c.Writer, c.Request, nil)if err != nil {http.NotFound(c.Writer, c.Request)return}//創建一個用戶客戶端實例,用于記錄該用戶的連接信息client := new(model.Client)client = &model.Client{ID: myid + "->",SendID: userid,Socket: conn,Send: make(chan []byte),}//使用管道將實例注冊到用戶管理上model.Manager.Register <- client//開啟兩個協程用于讀寫消息go Read(client)go Write(client)
}//用于讀管道中的數據
func Read(c *model.Client) {//結束把通道關閉defer func() {model.Manager.Unregister <- c//關閉連接_ = c.Socket.Close()}()for {//先測試一下連接能不能連上c.Socket.PongHandler()sendMsg := new(model.SendMsg)err := c.Socket.ReadJSON(sendMsg)c.RecipientID = sendMsg.RecipientIDif err != nil {zap.L().Error("數據格式不正確", zap.Error(err))model.Manager.Unregister <- c_ = c.Socket.Close()return}//根據要發送的消息類型去判斷怎么處理//消息類型的后端調度switch sendMsg.Type {case 1: //私信SingleChat(c, sendMsg)case 2: //獲取未讀消息UnreadMessages(c)case 3: //拉取歷史消息記錄HistoryMsg(c, sendMsg)case 4: //群聊消息廣播GroupChat(c, sendMsg)}}
}//用于將數據寫進管道中
func Write(c *model.Client) {defer func() {_ = c.Socket.Close()}()for {select {//讀取管道里面的信息case message, ok := <-c.Send://連接不到就返回消息if !ok {_ = c.Socket.WriteMessage(websocket.CloseMessage, []byte{})return}fmt.Println(c.ID+"接收消息:", string(message))replyMsg := model.ReplyMsg{Code: int(CodeConnectionSuccess),Content: fmt.Sprintf("%s", string(message)),}msg, _ := json.Marshal(replyMsg)//將接收的消息發送到對應的websocket連接里rwLocker.Lock()_ = c.Socket.WriteMessage(websocket.TextMessage, msg)rwLocker.Unlock()}}
}
后端調度
//聊天的后端調度邏輯
//單聊
func SingleChat(c *model.Client, sendMsg *model.SendMsg) {//獲取當前用戶發出到固定用戶的消息r1, _ := redis.REDIS.Get(context.Background(), c.ID).Result()//從redis中取出固定用戶發給當前用戶的消息id := CreateId(strconv.Itoa(c.RecipientID), strconv.Itoa(c.SendID))r2, _ := redis.REDIS.Get(context.Background(), id).Result()//根據redis的結果去做未關注聊天次數限制if r2 >= "3" && r1 == "" {ResponseWebSocket(c.Socket, CodeLimiteTimes, "未相互關注,限制聊天次數")return} else {//將消息寫入redisredis.REDIS.Incr(context.Background(), c.ID)//設置消息的過期時間_, _ = redis.REDIS.Expire(context.Background(), c.ID, time.Hour*24*30*3).Result()}fmt.Println(c.ID+"發送消息:", sendMsg.Content)//將消息廣播出去model.Manager.Broadcast <- &model.Broadcast{Client: c,Message: []byte(sendMsg.Content),}
}//查看未讀消息
func UnreadMessages(c *model.Client) {//獲取數據庫中的未讀消息msgs, err := mysql.GetMessageUnread(c.SendID)if err != nil {ResponseWebSocket(c.Socket, CodeServerBusy, "服務繁忙")}for i, msg := range msgs {replyMsg := model.ReplyMsg{From: msg.Direction,Content: msg.Content,}message, _ := json.Marshal(replyMsg)_ = c.Socket.WriteMessage(websocket.TextMessage, message)//發送完后將消息設為已讀msgs[i].Read = trueerr := mysql.UpdateMessage(&msgs[i])if err != nil {ResponseWebSocket(c.Socket, CodeServerBusy, "服務繁忙")}}
}//拉取歷史消息記錄
func HistoryMsg(c *model.Client, sendMsg *model.SendMsg) {//拿到傳過來的時間timeT := TimeStringToGoTime(sendMsg.Content)//查找聊天記錄//做一個分頁處理,一次查詢十條數據,根據時間去限制次數//別人發給當前用戶的direction := CreateId(strconv.Itoa(c.RecipientID), strconv.Itoa(c.SendID))//當前用戶發出的id := CreateId(strconv.Itoa(c.SendID), strconv.Itoa(c.RecipientID))msgs, err := mysql.GetHistoryMsg(direction, id, timeT, 10)if err != nil {ResponseWebSocket(c.Socket, CodeServerBusy, "服務繁忙")}//把消息寫給用戶for _, msg := range *msgs {replyMsg := model.ReplyMsg{From: msg.Direction,Content: msg.Content,}message, _ := json.Marshal(replyMsg)_ = c.Socket.WriteMessage(websocket.TextMessage, message)//發送完后將消息設為已讀if err != nil {ResponseWebSocket(c.Socket, CodeServerBusy, "服務繁忙")}}
}//群聊消息廣播
func GroupChat(c *model.Client, sendMsg *model.SendMsg) {//根據消息類型判斷是否為群聊消息//先去數據庫查詢該群下的所有用戶users, err := mysql.GetAllGroupUser(strconv.Itoa(sendMsg.RecipientID))if err != nil {ResponseWebSocket(c.Socket, CodeServerBusy, "服務繁忙")}//向群里面的用戶廣播消息for _, user := range users {//獲取群里每個用戶的連接if int(user.ID) == c.SendID {continue}c.ID = strconv.Itoa(c.SendID) + "->"c.GroupID = strconv.Itoa(sendMsg.RecipientID)c.RecipientID = int(user.ID)model.Manager.Broadcast <- &model.Broadcast{Client: c,Message: []byte(sendMsg.Content),}}
}
轉發消息
//用于在啟動時進行監聽
func Start(manager *model.ClientManager) {for {fmt.Println("<-----監聽通信管道----->")select {//監測model.Manager.Register這個的變化,有新的東西加入管道時會被監聽到,從而建立連接case conn := <-model.Manager.Register:fmt.Println("建立新連接:", conn.ID)//將新建立的連接加入到用戶管理的map中,用于記錄連接對象,以連接人的id為鍵,以連接對象為值model.Manager.Clients[conn.ID] = conn//返回成功信息controller.ResponseWebSocket(conn.Socket, controller.CodeConnectionSuccess, "已連接至服務器")//斷開連接,監測到變化,有用戶斷開連接case conn := <-model.Manager.Unregister:fmt.Println("連接失敗:", conn.ID)if _, ok := model.Manager.Clients[conn.ID]; ok {controller.ResponseWebSocket(conn.Socket, controller.CodeConnectionBreak, "連接已斷開")}//關閉當前用戶使用的管道//close(conn.Send)//刪除用戶管理中的已連接的用戶delete(model.Manager.Clients, conn.ID)case broadcast := <-model.Manager.Broadcast: //廣播消息message := broadcast.MessagerecipientID := broadcast.Client.RecipientID//給一個變量用于確定狀態flag := falsecontentid := createId(strconv.Itoa(broadcast.Client.SendID), strconv.Itoa(recipientID))rID := strconv.Itoa(recipientID) + "->"//遍歷客戶端連接map,查找該用戶有沒有在線,判斷的是對方的連接例如:1要向2發消息,我現在是用戶1,那么我需要判斷2->1是否存在在用戶管理中for id, conn := range model.Manager.Clients {//如果找不到就說明用戶不在線,與接收人的id比較if id != rID {continue}//走到這一步,就說明用戶在線,就把消息放入管道里面select {case conn.Send <- message:flag = truedefault: //否則就把該連接從用戶管理中刪除close(conn.Send)delete(model.Manager.Clients, conn.ID)}}//判斷完之后就把將消息發給用戶if flag {fmt.Println("用戶在線應答")controller.ResponseWebSocket(model.Manager.Clients[rID].Socket, controller.CodeConnectionSuccess, string(message))//把消息插到數據庫中msg := model.ChatMessage{Direction: contentid,SendID: broadcast.Client.SendID,RecipientID: recipientID,GroupID: broadcast.Client.GroupID,Content: string(message),Read: true,}err := mysql.DB.Create(&msg).Errorif err != nil {zap.L().Error("在線發送消息出現了錯誤", zap.Error(err))}} else { //如果不在線controller.ResponseWebSocket(broadcast.Client.Socket, controller.CodeConnectionSuccess, "對方不在線")//把消息插到數據庫中msg := model.ChatMessage{Direction: contentid,SendID: broadcast.Client.SendID,RecipientID: recipientID,GroupID: broadcast.Client.GroupID,Content: string(message),Read: false,}err := mysql.DB.Create(&msg).Errorif err != nil {zap.L().Error("不在線發送消息出現了錯誤", zap.Error(err))}}}}}func createId(uid, toUid string) string {return uid + "->" + toUid
}