golang實現mediasoup的tcp服務及channel通道

tcp模塊

定義相關類

  • Client:表示客戶端連接,包含網絡連接conn、指向服務器的指針Server和Channel指針c。
  • server:表示TCP服務器,包含服務器地址address、TLS配置config以及三個回調函數:
    • onNewClientCallback:新客戶端連接時調用。
    • onClientConnectionClosed:客戶端連接關閉時調用。
    • onNewMessage:客戶端接收新消息時調用。

客戶端相關接口

  • Client.listen():客戶端監聽方法,讀取連接數據,調用onNewMessage回調。
  • Client.Send(message string):發送文本消息給客戶端。
  • Client.SendBytes(b []byte):發送字節數據給客戶端。
  • Client.Conn():獲取客戶端的網絡連接。
  • Client.Close():關閉客戶端的網絡連接。

服務器相關接口

  • server.OnNewClient(callback func(c *Client)):設置新客戶端連接的回調。
  • server.OnClientConnectionClosed(callback func(c *Client, err error)):設置客戶端連接關閉的回調。
  • server.OnNewMessage(callback func(c *Client, message []byte, size int)):設置客戶端接收新消息的回調。
  • server.Listen():啟動網絡服務器,監聽連接。

服務器初始化

  • NewTcpServer(address string) *server:創建新的TCP服務器實例,不使用TLS。
  • NewTcpServerWithTLS(address, certFile, keyFile string) *server:創建帶有TLS功能的TCP服務器實例。

服務器啟動流程

  1. 使用NewTcpServer或NewTcpServerWithTLS創建服務器實例。
  2. 設置回調函數,響應新客戶端連接、客戶端連接關閉和接收新消息事件。
  3. 調用server.Listen()開始監聽連接。

TLS支持

  • 如果需要TLS,使用NewTcpServerWithTLS函數,提供證書和密鑰文件路徑。

參考demo

package MediasoupLibimport ("bufio""time""crypto/tls""log""net"
)// Client holds info about connectiontype Client struct {conn net.ConnServer *serverc *Channel
}// TCP server
type server struct {address string // Address to open connection: localhost:9999config *tls.ConfigonNewClientCallback func(c *Client)onClientConnectionClosed func(c *Client, err error)onNewMessage func(c *Client, message []byte, size int)
}// Read client data from channel
func (c *Client) listen() {fmt.Printf("tcp client listen() ")c.Server.onNewClientCallback(c)reader := bufio.NewReader(c.conn)for {recv := make([]byte, 1500) //MTU 1500size, err := reader.Read(recv)if err != nil {c.conn.Close()c.Server.onClientConnectionClosed(c, err)fmt.Printf("tcp client close! %s", err.Error())return}if size == 0 {time.Sleep(time.Millisecond * 250)fmt.Printf("tcp client recv size=0")continue}recv = recv[0:size]c.Server.onNewMessage(c, recv, size)}
}// Send text message to client
func (c *Client) Send(message string) error {return c.SendBytes([]byte(message))
}// Send bytes to client
func (c *Client) SendBytes(b []byte) error {_, err := c.conn.Write(b)if err != nil {c.conn.Close()c.Server.onClientConnectionClosed(c, err)}return err
}func (c *Client) Conn() net.Conn {return c.conn
}func (c *Client) Close() error {return c.conn.Close()
}// Called right after server starts listening new client
func (s *server) OnNewClient(callback func(c *Client)) {s.onNewClientCallback = callback
}// Called right after connection closed
func (s *server) OnClientConnectionClosed(callback func(c *Client, err error)) {s.onClientConnectionClosed = callback
}// Called when Client receives new message
func (s *server) OnNewMessage(callback func(c *Client, message []byte, size int)) {s.onNewMessage = callback
}// Listen starts network server
func (s *server) Listen() {var listener net.Listenervar err errorif s.config == nil {listener, err = net.Listen("tcp", s.address)} else {listener, err = tls.Listen("tcp", s.address, s.config)}if err != nil {fmt.Printf("Error starting TCP server.\r\n", err)}defer listener.Close()for {conn, err := listener.Accept()if err != nil {fmt.Printf("tcpserver listner Accept error:%s", err.Error())}client := &Client{conn: conn,Server: s,}go client.listen()}
}// Creates new tcp server instance
func NewTcpServer(address string) *server {fmt.Printf("Creating server with address %s", address)server := &server{address: address,}server.OnNewClient(func(c *Client) {c.c = NewChannel(c)})server.OnNewMessage(func(c *Client, message []byte, size int) {c.c.onData(c, message, size)})server.OnClientConnectionClosed(func(c *Client, err error) {c.c.Close()fmt.Printf("OnClientConnectionClosed   err = %s", err.Error())})return server
}func NewTcpServerWithTLS(address, certFile, keyFile string) *server {cert, err := tls.LoadX509KeyPair(certFile, keyFile)if err != nil {fmt.Printf("Error loading certificate files. Unable to create TCP server with TLS functionality.\r\n", err)}config := &tls.Config{Certificates: []tls.Certificate{cert},}server := NewTcpServer(address)server.config = configreturn server}

channel模塊

ChannelListener接口定義:

  • ChannelListener:定義了一個接口,包含兩個方法OnChannelEvent和OnChannelStringEvent,用于監聽通道事件。

Channel結構體:

  • 包含字段如MediasoupClient(指向Client的指針)、PendingSent(一個同步映射,用于存儲待發送的數據)、LastBinaryNotification、ID、Pid、udpAddress、udpPort、queue和messageQueue(兩個循環隊列)、Num和Listeners(一個映射,存儲監聽器)。
  • 包含一個互斥鎖mutex,用于并發控制。

Channel的接口:

  • NewChannel:構造函數,創建并返回一個新的Channel實例。
  • AddListener和RemoveListener:用于添加和移除監聽器。
  • processMessage:處理接收到的消息。
  • onData:處理接收到的數據。
  • handle:一個循環,從隊列中取出項目并處理。
  • handleMessage:處理消息隊列中的消息。
  • process:根據消息類型進行不同的處理。
  • Close:關閉通道,清理資源。
  • Request:發送請求并返回一個通道用于接收響應。
  • SetUdp:設置UDP地址和端口。

并發處理:

  • 使用sync.Map和sync.RWMutex來處理并發,確保數據的一致性和線程安全。

循環隊列:

  • 使用MeetGo.CycleQueue作為循環隊列,用于存儲消息和數據。

參考demo

import ("encoding/json""fmt""strconv""sync""time""strings"
)//###########SendRequest begin############/
var REQUEST_TIMEOUT = 30000type SendRequest struct {ID       stringMethod   stringInternal map[string]interface{}Data     map[string]interface{}
}//async chan SendReponse
type SendReponse struct {ID       intTargetId intEvent    stringAccepted boolRejected boolInternal map[string]interface{}Data     map[string]interface{}Reason   stringBinary   bool
}type AsyncSingal struct {Async chan SendReponse
}
###########SendRequest End############/////###########CycleQueue begin############//
type CycleQueue struct {data  []interface{} //存儲空間front int           //前指針,前指針負責彈出數據移動rear  int           //尾指針,后指針負責添加數據移動cap   int           //設置切片最大容量
}func NewCycleQueue(cap int) *CycleQueue {return &CycleQueue{data:  make([]interface{}, cap),cap:   cap,front: 0,rear:  0,}
}//入隊操作
//判斷隊列是否隊滿,隊滿則不允許添加數據
func (q *CycleQueue) Push(data interface{}) bool {//check queue is fullif (q.rear+1)%q.cap == q.front { //隊列已滿時,不執行入隊操作return false}q.data[q.rear] = data         //將元素放入隊列尾部q.rear = (q.rear + 1) % q.cap //尾部元素指向下一個空間位置,取模運算保證了索引不越界(余數一定小于除數)return true
}//出隊操作
//需要考慮: 隊隊為空沒有數據返回了
func (q *CycleQueue) Pop() interface{} {if q.rear == q.front {return nil}data := q.data[q.front]q.data[q.front] = nilq.front = (q.front + 1) % q.capreturn data
}//因為是循環隊列, 后指針減去前指針 加上最大值, 然后與最大值 取余
func (q *CycleQueue) QueueLength() int {return (q.rear - q.front + q.cap) % q.cap
}func (q *CycleQueue) FindDataByRequestId(requestId string) string {for i := 0; i < q.QueueLength(); i++ {if strings.Count(q.data[i].(string), requestId) == 1 {emitData := q.data[i].(string)q.data = append(q.data[:i], q.data[i+1:]...)return emitData}}return ""
}
///###########CycleQueue############import ("encoding/json""fmt""strconv""sync""time"MeetGo "vrv.meeting.server/MeetGo"
)const NS_MAX_SIZE int = 655350var messageBuffer = make([]byte, NS_MAX_SIZE)
var messageIndex = 0type ChannelListener interface {OnChannelEvent(string, map[string]interface{})OnChannelStringEvent(string, string)
}type Channel struct {MediasoupClient        *ClientPendingSent            sync.MapLastBinaryNotification interface{}ID                     intPid                    intudpAddress             stringudpPort                intqueue                  CycleQueuemessageQueue          CycleQueueNum                    intListeners              map[string]ChannelListenermutex                  sync.RWMutex
}func NewChannel(tcpClient *Client) *Channel {channel := new(Channel)channel.MediasoupClient = tcpClientchannel.queue = MeetGo.NewCycleQueue(1000)channel.messageQueue = MeetGo.NewCycleQueue(10000)channel.Num = 0channel.Listeners = make(map[string]ChannelListener, 100)go channel.handle()go channel.handleMessage()return channel
}func (channel *Channel) AddListener(id string, listener ChannelListener) {channel.mutex.Lock()channel.Listeners[id] = listenerchannel.mutex.Unlock()
}func (channel *Channel) RemoveListener(id string) {channel.mutex.Lock()delete(channel.Listeners, id)channel.mutex.Unlock()
}func (channel *Channel) processMessage(message string) {jsonMessage := make(map[string]interface{})err := json.Unmarshal([]byte(message), &jsonMessage)if err != nil {MeetGo.Log.Error("Channel processMessage error:%s", err.Error())return}if jsonMessage["registId"] != nil {MeetGo.Log.Debug("client registId succeeded [id:%s]", jsonMessage["registId"].(string))channel.ID, _ = strconv.Atoi(jsonMessage["registId"].(string))channel.Pid = int(jsonMessage["pid"].(float64))Global_Worker.OnMediasoupWorkerOnline(channel.ID, channel, jsonMessage["registId"].(string))} else if jsonMessage["id"] != nil {idd := int(jsonMessage["id"].(float64))value, ret := channel.PendingSent.Load(idd)if !ret {fmt.Printf("received Response does not match any sent Request")return}channel.PendingSent.Delete(idd)asyncReponse := value.(*MeetGo.AsyncSingal)if jsonMessage["accepted"] != nil && jsonMessage["accepted"].(bool) {MeetGo.Log.Debug("request succeeded [id:%d]", int(jsonMessage["id"].(float64)))sendReponse := MeetGo.SendReponse{ID:       idd,Accepted: jsonMessage["accepted"].(bool),Data:     jsonMessage["data"].(interface{}).(map[string]interface{}),}asyncReponse.Async <- sendReponse} else {MeetGo.Log.Debug("request failed [id:%d, reason: %s]", int(jsonMessage["id"].(float64)), jsonMessage["reason"].(string))sendReponse := MeetGo.SendReponse{ID:     int(jsonMessage["id"].(float64)),Reason: jsonMessage["reason"].(string),}asyncReponse.Async <- sendReponse}} else if jsonMessage["targetId"] != nil && jsonMessage["event"] != nil {if jsonMessage["binary"] != nil {channel.LastBinaryNotification = jsonMessagereturn} else if jsonMessage["data"] != nil {listenerKey := fmt.Sprintf("%d", int(jsonMessage["targetId"].(float64)))channel.mutex.RLock()listener := channel.Listeners[listenerKey]channel.mutex.RUnlock()if listener != nil {listener.OnChannelEvent(jsonMessage["event"].(string), jsonMessage["data"].(map[string]interface{}))}} else {data := make(map[string]interface{})listenerKey := fmt.Sprintf("%d", int(jsonMessage["targetId"].(float64)))channel.mutex.RLock()listener := channel.Listeners[listenerKey]channel.mutex.RUnlock()if listener != nil {listener.OnChannelEvent(jsonMessage["event"].(string), data)}}} else {fmt.Printf("received message is not a Response nor a Notification")return}
}func (channel *Channel) onData(client *Client, message []byte, size int) {for {ret := channel.messageQueue.Push(message)if ret {break} else {time.Sleep(40 * time.Millisecond)}}
}func (channel *Channel) handle() {for {item := channel.queue.Pop()if item == nil {time.Sleep(40 * time.Millisecond)continue}channel.process(item)time.Sleep(1 * time.Millisecond)}
}func (channel *Channel) handleMessage() {ns := NetString{bufLen: 0, length: 0, state: 0}for {item := channel.messageQueue.Pop()if item == nil {time.Sleep(40 * time.Millisecond)continue}message := item.([]byte)var nsPayloads [][]byteerr := ns.NsUnmarshal(message, &nsPayloads)if err != nil {fmt.Printf("Channel handleMessage nsPayload error %s", err.Error())return}for _, nsPayload := range nsPayloads {channel.queue.Push(nsPayload)}time.Sleep(1 * time.Millisecond)}
}func (channel *Channel) process(data interface{}) {nsPayload := data.([]byte)if channel.LastBinaryNotification == nil {switch nsPayload[0] {// 123 = '{' (a Channel JSON messsage).case 123:channel.processMessage(string(nsPayload))break// 68 = 'D' (a debug log).case 68:fmt.Printf(string(nsPayload))break// 87 = 'W' (a warning log).case 87:fmt.Printf(string(nsPayload))break// 69 = 'E' (an error log).case 69:fmt.Printf(string(nsPayload))breakdefault:fmt.Printf("unexpected data: %s", string(nsPayload))}} else {msg := channel.LastBinaryNotificationchannel.LastBinaryNotification = niljsonMsg := make(map[string]interface{})err := json.Unmarshal([]byte(msg.(string)), &jsonMsg)if err != nil {panic(err)}listenerKey := fmt.Sprintf("%d", int(jsonMsg["targetId"].(float64)))channel.mutex.RLock()listener := channel.Listeners[listenerKey]channel.mutex.RUnlock()if listener != nil {listener.OnChannelStringEvent(jsonMsg["event"].(string), jsonMsg["data"].(string))}}
}func (channel *Channel) Close() {channel.PendingSent.Range(func(k, v interface{}) bool {channel.PendingSent.Delete(k)return true})registId := strconv.Itoa(channel.ID)Global_Worker.OnMediasoupWorkerOffline(registId)time.Sleep(time.Millisecond * 250) //?fmt.Printf("channel.MediasoupClient.Close() ")channel.MediasoupClient.Close()
}func (c *Channel) Request(method string, internal,data map[string]interface{}) (chan MeetGo.SendReponse, int64) {id := RandomNumberGenerator(10000000, 99999999)fmt.Printf("MediasoupLib Channel [method:%s, id:%d]", method, id)request := MeetGo.RequestJson{ID:       id,Method:   method,Internal: internal,Data:     data,}requestJson := request.Encode()requestSend := nsEncode(requestJson)fmt.Printf("___requestSend : %s", requestSend)sendReponse := new(MeetGo.AsyncSingal)sendReponse.Async = make(chan MeetGo.SendReponse)if sendReponse != nil {c.PendingSent.Store(int(id), sendReponse)}defer c.MediasoupClient.Send(requestSend)return sendReponse.Async, id
}func (channel *Channel) SetUdp(udpAddress string, udpPort int) {channel.udpAddress = udpAddresschannel.udpPort = udpPort
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/16671.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/16671.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/16671.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

最大連續1的個數(滑動窗口)

算法原理&#xff1a; 這道題大眼一看是關于翻轉多少個0的問題&#xff0c;但是&#xff0c;如果你按照這種思維去做題&#xff0c;肯定不容易。所以我們要換一種思維去做&#xff0c;這種思維不是一下就能想到的&#xff0c;所以想不到也情有可原。 題目是&#xff1a;給定一…

Vue3:動態路由+子頁面(新增、詳情頁)動態路由配置(代碼全注釋)

文章目錄 實現思路調用后端接口獲取用戶權限獲取頁面權限動態綁定到路由對象中動態添加子頁面路由 實現思路 emm&#xff0c;項目中使用動態路由實現根據后端返回的用戶詳情信息&#xff0c;動態將該用戶能夠訪問的頁面信息&#xff0c;動態生成并且綁定到路由對象中。但是后…

如何從清空的回收站中恢復已刪除的Excel文件?

“嗨&#xff0c;幾天前我刪除了很多沒有備份的Excel文件。回收站已清空。當我意識到我犯了一個大錯誤時&#xff0c;所有的Excel文件都消失了&#xff0c;回收站里什么都沒有。清空回收站后是否可以恢復已刪除的 Excel 文件&#xff1f; 回收站是一種工具&#xff0c;可讓您在…

LeetCode 343. 整數拆分 (dp動態規劃)

343. 整數拆分 力扣題目鏈接(opens new window) 給定一個正整數 n&#xff0c;將其拆分為至少兩個正整數的和&#xff0c;并使這些整數的乘積最大化。 返回你可以獲得的最大乘積。 示例 1: 輸入: 2輸出: 1解釋: 2 1 1, 1 1 1。 示例 2: 輸入: 10輸出: 36解釋: 10 3 …

【openlayers系統學習】4.2Mapbox 樣式渲染圖層

二、Mapbox 樣式渲染圖層 顯然我們目前的地圖需要一些樣式。 VectorTile? 圖層的樣式與 Vector? 圖層的樣式工作方式完全相同。那里描述的樣式在這里也適用。 對于這樣的地圖&#xff0c;創建數據驅動的樣式&#xff08;對矢量圖層操作&#xff09;非常簡單。但矢量切片也用…

單兵組網設備+指揮中心:集群系統技術詳解

一、單兵設備功能特點 單兵組網設備是現代通信技術的重要成果&#xff0c;旨在為單個作戰或工作單元提供高效的通信和數據傳輸能力。其主要功能特點包括&#xff1a; 1. 便攜性&#xff1a;設備輕巧&#xff0c;便于單兵攜帶和使用&#xff0c;適應各種復雜環境。 2. 通信能…

簡述vue-router 組件復用導致路由參數失效怎么辦

當使用Vue Router時&#xff0c;組件復用可能會導致路由參數失效的問題。為了解決這個問題&#xff0c;我們可以采取以下策略&#xff1a; 1. 監聽路由變化 在Vue組件中&#xff0c;我們可以使用watch屬性來監聽$route對象的變化。當路由發生變化時&#xff0c;如果目標組件是…

第 8 章 機器人實體導航實現_路徑規劃(自學二刷筆記)

重要參考&#xff1a; 課程鏈接:https://www.bilibili.com/video/BV1Ci4y1L7ZZ 講義鏈接:Introduction Autolabor-ROS機器人入門課程《ROS理論與實踐》零基礎教程 9.3.5 導航實現05_路徑規劃 路徑規劃仍然使用 navigation 功能包集中的 move_base 功能包。 5.1編寫launch文…

PHP之fastadmin系統配置分組增加配置和使用

目錄 一、實現功能&#xff1a;fasttadmin實現添加系統配置分組和添加參數、使用 二、添加分組 三、配置分組參數 四、最終存儲位置 五、獲取配置參數 一、實現功能&#xff1a;fasttadmin實現添加系統配置分組和添加參數、使用 二、添加分組 在字典配置中找到分組對應鍵值…

linux系統——top資源管理器

在linux系統中&#xff0c;有類似于windows系統中的資源管理器&#xff0c;top用于實時的監控系統的任務執行狀態以及硬件配置信息 在linux中&#xff0c;輸入top命令&#xff0c;可以進入相應界面&#xff0c;在此界面可以使用一些指令進行操作 如&#xff0c;輸入z 可以改變…

終端安全管理系統、天銳DLP(數據泄露防護系統)| 數據透明加密保護,防止外泄!

終端作為企業員工日常辦公、數據處理和信息交流的關鍵工具&#xff0c;承載著企業運營的核心信息資產。一旦終端安全受到威脅&#xff0c;企業的敏感數據將面臨泄露風險&#xff0c;業務流程可能遭受中斷&#xff0c;甚至整個企業的運營穩定性都會受到嚴重影響。 因此&#xff…

【EVI】Hume AI 初探

寫在前面的話 Hume AI宣布已在B輪融資中籌集5000萬美元&#xff0c;由前Google DeepMind研究員Alan Cowen創立并擔任CEO。該AI模型專注于理解人類情感&#xff0c;并發布了「共情語音界面」演示&#xff0c;通過語音對話實現互動。從 Hume AI 官網展示的信息&#xff0c;EVI 能…

計算機視覺與深度學習實戰:以Python為工具,基于深度學習的汽車目標檢測

隨著人工智能技術的飛速發展,計算機視覺與深度學習已經成為當今科技領域的熱點。其中,汽車目標檢測作為自動駕駛、智能交通等系統的核心技術,受到了廣泛關注。本文將以Python為工具,探討基于深度學習的汽車目標檢測方法及其實戰應用。 一、計算機視覺與深度學習基礎 計算機…

力扣刷題--747. 至少是其他數字兩倍的最大數【簡單】

題目描述 給你一個整數數組 nums &#xff0c;其中總是存在 唯一的 一個最大整數 。 請你找出數組中的最大元素并檢查它是否 至少是數組中每個其他數字的兩倍 。如果是&#xff0c;則返回 最大元素的下標 &#xff0c;否則返回 -1 。 示例 1&#xff1a; 輸入&#xff1a;n…

Python-opencv通過距離變換提取圖像骨骼

文章目錄 距離變換distanceTransform函數 距離變換 如果把二值圖像理解成地形&#xff0c;黑色表示海洋&#xff0c;白色表示陸地&#xff0c;那么陸地上任意一點&#xff0c;到海洋都有一個最近的距離&#xff0c;如下圖所示&#xff0c;對于左側二值圖像來說&#xff0c;【d…

Gitee的原理及應用詳解(三)

本系列文章簡介&#xff1a; Gitee是一款開源的代碼托管平臺&#xff0c;是國內最大的代碼托管平臺之一。它基于Git版本控制系統&#xff0c;提供了代碼托管、項目管理、協作開發、代碼審查等功能&#xff0c;方便團隊協作和項目管理。Gitee的出現&#xff0c;在國內的開發者社…

漂流瓶掛機項目,聊天腳本賺錢新玩法,號稱單機30-50+ (教程+軟件)

一、項目簡介&#xff1a; 漂流瓶掛機項目主要是通過使用探遇漂流瓶、音麥漂流瓶等聊天軟件&#xff0c;為用戶提供一個聊天賺錢的平臺。男性用戶需要充值后才能發送消息&#xff0c;而女性用戶則可以通過接收消息賺取分紅。男性用戶發送給女性用戶的消息費用大約在.1-.2元之間…

VScode中對git的學習筆記

1.git是什么&#xff1f; Git是一個功能強大的分布式版本控制系統&#xff0c;由Linux內核的創始人Linus Torvalds在2005年創建。它以其速度、數據完整性和支持大型項目的能力而聞名&#xff0c;被廣泛應用于軟件開發中。Git允許開發者在本地機器上擁有完整的代碼庫副本&#x…

讀書筆記分享

1.蘇格拉底只在需要的時候才索取&#xff0c;那樣便能以最少的物質滿足自身的要求。他認為每個人都天生體質脆弱&#xff0c;只有在貧乏的環境中才會鍛煉地強壯起來。生活中的大多數人認為&#xff0c;奢華才是幸福的生活。無休止的物質積聚&#xff0c;讓人們每天生活在一個內…

2024-05-27 blue-vh-問題點

摘要: 2024-05-27 思考-日記-問題點 問題點: 一. 同步接口的并發問題 接口調用是在客戶端的的上下文&#xff0c;無論是線程&#xff0c;協程&#xff0c;是在客戶端的執行上下文里面同步的話&#xff0c;是同步客戶端的調用接口的上下文&#xff0c;阻塞的是客戶端的上下文&a…