tcp模塊
定義相關類
- Client:表示客戶端連接,包含網絡連接conn、指向服務器的指針Server和Channel指針c。
- server:表示TCP服務器,包含服務器地址address、TLS配置config以及三個回調函數:
- onNewClientCallback:新客戶端連接時調用。
- onClientConnectionClosed:客戶端連接關閉時調用。
- onNewMessage:客戶端接收新消息時調用。
客戶端相關接口
- Client.listen():客戶端監聽方法,讀取連接數據,調用onNewMessage回調。
- Client.Send(message string):發送文本消息給客戶端。
- Client.SendBytes(b []byte):發送字節數據給客戶端。
- Client.Conn():獲取客戶端的網絡連接。
- Client.Close():關閉客戶端的網絡連接。
服務器相關接口
- server.OnNewClient(callback func(c *Client)):設置新客戶端連接的回調。
- server.OnClientConnectionClosed(callback func(c *Client, err error)):設置客戶端連接關閉的回調。
- server.OnNewMessage(callback func(c *Client, message []byte, size int)):設置客戶端接收新消息的回調。
- server.Listen():啟動網絡服務器,監聽連接。
服務器初始化
- NewTcpServer(address string) *server:創建新的TCP服務器實例,不使用TLS。
- NewTcpServerWithTLS(address, certFile, keyFile string) *server:創建帶有TLS功能的TCP服務器實例。
服務器啟動流程
- 使用NewTcpServer或NewTcpServerWithTLS創建服務器實例。
- 設置回調函數,響應新客戶端連接、客戶端連接關閉和接收新消息事件。
- 調用server.Listen()開始監聽連接。
TLS支持
- 如果需要TLS,使用NewTcpServerWithTLS函數,提供證書和密鑰文件路徑。
參考demo
package MediasoupLibimport ("bufio""time""crypto/tls""log""net"
)type Client struct {conn net.ConnServer *serverc *Channel
}
type server struct {address string config *tls.ConfigonNewClientCallback func(c *Client)onClientConnectionClosed func(c *Client, err error)onNewMessage func(c *Client, message []byte, size int)
}
func (c *Client) listen() {fmt.Printf("tcp client listen() ")c.Server.onNewClientCallback(c)reader := bufio.NewReader(c.conn)for {recv := make([]byte, 1500) size, err := reader.Read(recv)if err != nil {c.conn.Close()c.Server.onClientConnectionClosed(c, err)fmt.Printf("tcp client close! %s", err.Error())return}if size == 0 {time.Sleep(time.Millisecond * 250)fmt.Printf("tcp client recv size=0")continue}recv = recv[0:size]c.Server.onNewMessage(c, recv, size)}
}
func (c *Client) Send(message string) error {return c.SendBytes([]byte(message))
}
func (c *Client) SendBytes(b []byte) error {_, err := c.conn.Write(b)if err != nil {c.conn.Close()c.Server.onClientConnectionClosed(c, err)}return err
}func (c *Client) Conn() net.Conn {return c.conn
}func (c *Client) Close() error {return c.conn.Close()
}
func (s *server) OnNewClient(callback func(c *Client)) {s.onNewClientCallback = callback
}
func (s *server) OnClientConnectionClosed(callback func(c *Client, err error)) {s.onClientConnectionClosed = callback
}
func (s *server) OnNewMessage(callback func(c *Client, message []byte, size int)) {s.onNewMessage = callback
}
func (s *server) Listen() {var listener net.Listenervar err errorif s.config == nil {listener, err = net.Listen("tcp", s.address)} else {listener, err = tls.Listen("tcp", s.address, s.config)}if err != nil {fmt.Printf("Error starting TCP server.\r\n", err)}defer listener.Close()for {conn, err := listener.Accept()if err != nil {fmt.Printf("tcpserver listner Accept error:%s", err.Error())}client := &Client{conn: conn,Server: s,}go client.listen()}
}
func NewTcpServer(address string) *server {fmt.Printf("Creating server with address %s", address)server := &server{address: address,}server.OnNewClient(func(c *Client) {c.c = NewChannel(c)})server.OnNewMessage(func(c *Client, message []byte, size int) {c.c.onData(c, message, size)})server.OnClientConnectionClosed(func(c *Client, err error) {c.c.Close()fmt.Printf("OnClientConnectionClosed err = %s", err.Error())})return server
}func NewTcpServerWithTLS(address, certFile, keyFile string) *server {cert, err := tls.LoadX509KeyPair(certFile, keyFile)if err != nil {fmt.Printf("Error loading certificate files. Unable to create TCP server with TLS functionality.\r\n", err)}config := &tls.Config{Certificates: []tls.Certificate{cert},}server := NewTcpServer(address)server.config = configreturn server}
channel模塊
ChannelListener接口定義:
- ChannelListener:定義了一個接口,包含兩個方法OnChannelEvent和OnChannelStringEvent,用于監聽通道事件。
Channel結構體:
- 包含字段如MediasoupClient(指向Client的指針)、PendingSent(一個同步映射,用于存儲待發送的數據)、LastBinaryNotification、ID、Pid、udpAddress、udpPort、queue和messageQueue(兩個循環隊列)、Num和Listeners(一個映射,存儲監聽器)。
- 包含一個互斥鎖mutex,用于并發控制。
Channel的接口:
- NewChannel:構造函數,創建并返回一個新的Channel實例。
- AddListener和RemoveListener:用于添加和移除監聽器。
- processMessage:處理接收到的消息。
- onData:處理接收到的數據。
- handle:一個循環,從隊列中取出項目并處理。
- handleMessage:處理消息隊列中的消息。
- process:根據消息類型進行不同的處理。
- Close:關閉通道,清理資源。
- Request:發送請求并返回一個通道用于接收響應。
- SetUdp:設置UDP地址和端口。
并發處理:
- 使用sync.Map和sync.RWMutex來處理并發,確保數據的一致性和線程安全。
循環隊列:
- 使用MeetGo.CycleQueue作為循環隊列,用于存儲消息和數據。
參考demo
import ("encoding/json""fmt""strconv""sync""time""strings"
)
var REQUEST_TIMEOUT = 30000type SendRequest struct {ID stringMethod stringInternal map[string]interface{}Data map[string]interface{}
}
type SendReponse struct {ID intTargetId intEvent stringAccepted boolRejected boolInternal map[string]interface{}Data map[string]interface{}Reason stringBinary bool
}type AsyncSingal struct {Async chan SendReponse
}
type CycleQueue struct {data []interface{} front int rear int cap int
}func NewCycleQueue(cap int) *CycleQueue {return &CycleQueue{data: make([]interface{}, cap),cap: cap,front: 0,rear: 0,}
}
func (q *CycleQueue) Push(data interface{}) bool {if (q.rear+1)%q.cap == q.front { return false}q.data[q.rear] = data q.rear = (q.rear + 1) % q.cap return true
}
func (q *CycleQueue) Pop() interface{} {if q.rear == q.front {return nil}data := q.data[q.front]q.data[q.front] = nilq.front = (q.front + 1) % q.capreturn data
}
func (q *CycleQueue) QueueLength() int {return (q.rear - q.front + q.cap) % q.cap
}func (q *CycleQueue) FindDataByRequestId(requestId string) string {for i := 0; i < q.QueueLength(); i++ {if strings.Count(q.data[i].(string), requestId) == 1 {emitData := q.data[i].(string)q.data = append(q.data[:i], q.data[i+1:]...)return emitData}}return ""
}
import ("encoding/json""fmt""strconv""sync""time"MeetGo "vrv.meeting.server/MeetGo"
)const NS_MAX_SIZE int = 655350var messageBuffer = make([]byte, NS_MAX_SIZE)
var messageIndex = 0type ChannelListener interface {OnChannelEvent(string, map[string]interface{})OnChannelStringEvent(string, string)
}type Channel struct {MediasoupClient *ClientPendingSent sync.MapLastBinaryNotification interface{}ID intPid intudpAddress stringudpPort intqueue CycleQueuemessageQueue CycleQueueNum intListeners map[string]ChannelListenermutex sync.RWMutex
}func NewChannel(tcpClient *Client) *Channel {channel := new(Channel)channel.MediasoupClient = tcpClientchannel.queue = MeetGo.NewCycleQueue(1000)channel.messageQueue = MeetGo.NewCycleQueue(10000)channel.Num = 0channel.Listeners = make(map[string]ChannelListener, 100)go channel.handle()go channel.handleMessage()return channel
}func (channel *Channel) AddListener(id string, listener ChannelListener) {channel.mutex.Lock()channel.Listeners[id] = listenerchannel.mutex.Unlock()
}func (channel *Channel) RemoveListener(id string) {channel.mutex.Lock()delete(channel.Listeners, id)channel.mutex.Unlock()
}func (channel *Channel) processMessage(message string) {jsonMessage := make(map[string]interface{})err := json.Unmarshal([]byte(message), &jsonMessage)if err != nil {MeetGo.Log.Error("Channel processMessage error:%s", err.Error())return}if jsonMessage["registId"] != nil {MeetGo.Log.Debug("client registId succeeded [id:%s]", jsonMessage["registId"].(string))channel.ID, _ = strconv.Atoi(jsonMessage["registId"].(string))channel.Pid = int(jsonMessage["pid"].(float64))Global_Worker.OnMediasoupWorkerOnline(channel.ID, channel, jsonMessage["registId"].(string))} else if jsonMessage["id"] != nil {idd := int(jsonMessage["id"].(float64))value, ret := channel.PendingSent.Load(idd)if !ret {fmt.Printf("received Response does not match any sent Request")return}channel.PendingSent.Delete(idd)asyncReponse := value.(*MeetGo.AsyncSingal)if jsonMessage["accepted"] != nil && jsonMessage["accepted"].(bool) {MeetGo.Log.Debug("request succeeded [id:%d]", int(jsonMessage["id"].(float64)))sendReponse := MeetGo.SendReponse{ID: idd,Accepted: jsonMessage["accepted"].(bool),Data: jsonMessage["data"].(interface{}).(map[string]interface{}),}asyncReponse.Async <- sendReponse} else {MeetGo.Log.Debug("request failed [id:%d, reason: %s]", int(jsonMessage["id"].(float64)), jsonMessage["reason"].(string))sendReponse := MeetGo.SendReponse{ID: int(jsonMessage["id"].(float64)),Reason: jsonMessage["reason"].(string),}asyncReponse.Async <- sendReponse}} else if jsonMessage["targetId"] != nil && jsonMessage["event"] != nil {if jsonMessage["binary"] != nil {channel.LastBinaryNotification = jsonMessagereturn} else if jsonMessage["data"] != nil {listenerKey := fmt.Sprintf("%d", int(jsonMessage["targetId"].(float64)))channel.mutex.RLock()listener := channel.Listeners[listenerKey]channel.mutex.RUnlock()if listener != nil {listener.OnChannelEvent(jsonMessage["event"].(string), jsonMessage["data"].(map[string]interface{}))}} else {data := make(map[string]interface{})listenerKey := fmt.Sprintf("%d", int(jsonMessage["targetId"].(float64)))channel.mutex.RLock()listener := channel.Listeners[listenerKey]channel.mutex.RUnlock()if listener != nil {listener.OnChannelEvent(jsonMessage["event"].(string), data)}}} else {fmt.Printf("received message is not a Response nor a Notification")return}
}func (channel *Channel) onData(client *Client, message []byte, size int) {for {ret := channel.messageQueue.Push(message)if ret {break} else {time.Sleep(40 * time.Millisecond)}}
}func (channel *Channel) handle() {for {item := channel.queue.Pop()if item == nil {time.Sleep(40 * time.Millisecond)continue}channel.process(item)time.Sleep(1 * time.Millisecond)}
}func (channel *Channel) handleMessage() {ns := NetString{bufLen: 0, length: 0, state: 0}for {item := channel.messageQueue.Pop()if item == nil {time.Sleep(40 * time.Millisecond)continue}message := item.([]byte)var nsPayloads [][]byteerr := ns.NsUnmarshal(message, &nsPayloads)if err != nil {fmt.Printf("Channel handleMessage nsPayload error %s", err.Error())return}for _, nsPayload := range nsPayloads {channel.queue.Push(nsPayload)}time.Sleep(1 * time.Millisecond)}
}func (channel *Channel) process(data interface{}) {nsPayload := data.([]byte)if channel.LastBinaryNotification == nil {switch nsPayload[0] {case 123:channel.processMessage(string(nsPayload))breakcase 68:fmt.Printf(string(nsPayload))breakcase 87:fmt.Printf(string(nsPayload))breakcase 69:fmt.Printf(string(nsPayload))breakdefault:fmt.Printf("unexpected data: %s", string(nsPayload))}} else {msg := channel.LastBinaryNotificationchannel.LastBinaryNotification = niljsonMsg := make(map[string]interface{})err := json.Unmarshal([]byte(msg.(string)), &jsonMsg)if err != nil {panic(err)}listenerKey := fmt.Sprintf("%d", int(jsonMsg["targetId"].(float64)))channel.mutex.RLock()listener := channel.Listeners[listenerKey]channel.mutex.RUnlock()if listener != nil {listener.OnChannelStringEvent(jsonMsg["event"].(string), jsonMsg["data"].(string))}}
}func (channel *Channel) Close() {channel.PendingSent.Range(func(k, v interface{}) bool {channel.PendingSent.Delete(k)return true})registId := strconv.Itoa(channel.ID)Global_Worker.OnMediasoupWorkerOffline(registId)time.Sleep(time.Millisecond * 250) fmt.Printf("channel.MediasoupClient.Close() ")channel.MediasoupClient.Close()
}func (c *Channel) Request(method string, internal,data map[string]interface{}) (chan MeetGo.SendReponse, int64) {id := RandomNumberGenerator(10000000, 99999999)fmt.Printf("MediasoupLib Channel [method:%s, id:%d]", method, id)request := MeetGo.RequestJson{ID: id,Method: method,Internal: internal,Data: data,}requestJson := request.Encode()requestSend := nsEncode(requestJson)fmt.Printf("___requestSend : %s", requestSend)sendReponse := new(MeetGo.AsyncSingal)sendReponse.Async = make(chan MeetGo.SendReponse)if sendReponse != nil {c.PendingSent.Store(int(id), sendReponse)}defer c.MediasoupClient.Send(requestSend)return sendReponse.Async, id
}func (channel *Channel) SetUdp(udpAddress string, udpPort int) {channel.udpAddress = udpAddresschannel.udpPort = udpPort
}