GB28181學習(十七)——基于jrtplib實現tcp被動和主動發流

前言

GB/T28181-2022實時流的傳輸方式介紹:https://blog.csdn.net/www_dong/article/details/134255185

基于jrtplib實現tcp被動和主動收流介紹:https://blog.csdn.net/www_dong/article/details/134451387

本文主要介紹下級平臺或設備發流功能,用于對接特定的SIP服務器或上級平臺。

UDP發流

流程圖

在這里插入圖片描述

發送端流程

  • 初始化rtp參數;
  • 裸流數據做PS復用;
  • 組RTP包發送;

設計

  1. 初始化rtp參數
int CUdp::InitRtp_()
{RTPSessionParams sessionParams;sessionParams.SetMinimumRTCPTransmissionInterval(10);sessionParams.SetOwnTimestampUnit(1.0 / 90000.0);sessionParams.SetAcceptOwnPackets(true);sessionParams.SetMaximumPacketSize(1450);RTPUDPv4TransmissionParams transParams;transParams.SetRTPSendBuffer(2*1024*1024);transParams.SetBindIP(m_ip);transParams.SetPortbase((uint16_t)m_port);if (0 != Create(sessionParams, &transParams)){return -1;}SetDefaultPayloadType((uint8_t)m_payload);SetDefaultTimestampIncrement(3600);SetDefaultMark(true);RTPIPv4Address addr(ntohl(inet_addr(m_ip), (uint16_t)m_port);if(0 != AddDestination(addr)){return -1;}return 0;
}
  1. 流數據復用為PS
// 使用ireader開源庫進行ps復用
// 初始化
CData2PS::CData2PS()
{struct ps_muxer_func_t func;func.alloc = Alloc;func.free = Free;func.write = Packet;m_ps = ps_muxer_create(&func, this);// TODO codecid待補充m_ps_stream = ps_muxer_add_stream(m_ps, PSI_STREAM_H264, nullptr, 0);
}// 塞數據
int CData2PS::InputData(void* data, int len)
{if (!m_ps)return -1;uint64_t clock = time64_now();if (0 == m_ps_clock)m_ps_clock = clock;return ps_muxer_input(m_ps, m_ps_stream, 0, (clock - m_ps_clock) * 90, (clock - m_ps_clock) * 90, data, len);
}
  1. 發送rtp包
// 調用jrtplib中SendPacket(data, len);接口發送數據// 以下為SendPacket部分源碼
// 主要流程:
// 1. 構建packet
// 2. 發送rtp數據
int RTPSession::SendPacket(const void *data,size_t len,uint8_t pt,bool mark,uint32_t timestampinc)
{int status;if (!created)return ERR_RTP_SESSION_NOTCREATED;BUILDER_LOCKif ((status = packetbuilder.BuildPacket(data,len,pt,mark,timestampinc)) < 0){BUILDER_UNLOCKreturn status;}if ((status = SendRTPData(packetbuilder.GetPacket(),packetbuilder.GetPacketLength())) < 0){BUILDER_UNLOCKreturn status;}BUILDER_UNLOCKSOURCES_LOCKsources.SentRTPPacket();SOURCES_UNLOCKPACKSENT_LOCKsentpackets = true;PACKSENT_UNLOCKreturn 0;
}// 構建包
int RTPPacketBuilder::PrivateBuildPacket(const void *data,size_t len,uint8_t pt,bool mark,uint32_t timestampinc,bool gotextension,uint16_t hdrextID,const void *hdrextdata,size_t numhdrextwords)
{RTPPacket p(pt,data,len,seqnr,timestamp,ssrc,mark,numcsrcs,csrcs,gotextension,hdrextID,(uint16_t)numhdrextwords,hdrextdata,buffer,maxpacksize,GetMemoryManager());int status = p.GetCreationError();if (status < 0)return status;packetlength = p.GetPacketLength();if (numpackets == 0) // first packet{lastwallclocktime = RTPTime::CurrentTime();lastrtptimestamp = timestamp;prevrtptimestamp = timestamp;}else if (timestamp != prevrtptimestamp){lastwallclocktime = RTPTime::CurrentTime();lastrtptimestamp = timestamp;prevrtptimestamp = timestamp;}numpayloadbytes += (uint32_t)p.GetPayloadLength();numpackets++;timestamp += timestampinc;seqnr++;return 0;
}// 發送包
int RTPSession::SendRTPData(const void *data, size_t len)
{if (!m_changeOutgoingData)return rtptrans->SendRTPData(data, len);void *pSendData = 0;size_t sendLen = 0;int status = 0;status = OnChangeRTPOrRTCPData(data, len, true, &pSendData, &sendLen);if (status < 0)return status;if (pSendData){status = rtptrans->SendRTPData(pSendData, sendLen);OnSentRTPOrRTCPData(pSendData, sendLen, true);}return status;
}// 底層實現
int RTPUDPv4Transmitter::SendRTPData(const void *data,size_t len)	
{if (!init)return ERR_RTP_UDPV4TRANS_NOTINIT;MAINMUTEX_LOCKif (!created){MAINMUTEX_UNLOCKreturn ERR_RTP_UDPV4TRANS_NOTCREATED;}if (len > maxpacksize){MAINMUTEX_UNLOCKreturn ERR_RTP_UDPV4TRANS_SPECIFIEDSIZETOOBIG;}destinations.GotoFirstElement();while (destinations.HasCurrentElement()){// 調用sendto函數實現udp包的發送sendto(rtpsock,(const char *)data,len,0,(const struct sockaddr *)destinations.GetCurrentElement().GetRTPSockAddr(),sizeof(struct sockaddr_in));destinations.GotoNextElement();}MAINMUTEX_UNLOCKreturn 0;
}

tcp passive發流

流程圖

在這里插入圖片描述

發送端流程:

  • 上級平臺或sip服務器以主動方式連接,對于下級平臺或者設備(數據發送端)為被動方式;
  • 下級平臺或者設備(數據發送端)啟動端口監聽;
  • 接收上級平臺或sip服務器tcp連接請求;
  • 向上級平臺或sip服務器發送流數據;

設計

  1. 創建socket、bind、listen,啟動數據接收線程;
// TcpServer為封裝的socket類int CGBTcpServer::Start()
{if (0 != m_localPort || m_tcpServer.get())return 0;int ret = -1;do {m_tcpServer = std::make_shared<TcpServer>(nullptr, this);if (!m_tcpServer.get())break;ret = m_tcpServer->TcpCreate();if (0 != ret)break;ret = m_tcpServer->TcpBind(m_localPort);if (0 != ret)break;ret = m_tcpServer->TcpListen(5);if (0 != ret)break;m_thread = std::thread(TCPData2PSThread, this);return 0;} while (0);Stop();return ret;
}
  1. 在線程內等待連接,連接成功后接收數據并回調至應用層處理
void CGBTcpServer::TCPData2PSWorker()
{if (!m_pspacker)m_pspacker = new(std::nothrow) CData2PS(PSTCPDataCB, this);bool bAccept = false;while (m_running){if (!bAccept){if (0 == m_tcpServer->TcpAccept()){bAccept = true;if (0 != InitRtp_()){break;}}continue;}std::this_thread::sleep_for(std::chrono::seconds(1));}
}
  1. 初始化rtp參數
int CGBTcpServer::InitRtp_()
{const int packetSize = 45678;RTPSessionParams sessionparams;sessionparams.SetProbationType(RTPSources::NoProbation);sessionparams.SetOwnTimestampUnit(1.0 / packetSize);sessionparams.SetMaximumPacketSize(packetSize + 64);m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);m_rtpTcpTransmitter->Init(true);m_rtpTcpTransmitter->Create(65535, 0);int status = Create(sessionparams, m_rtpTcpTransmitter);if (status < 0){return status;}status = AddDestination(RTPTCPAddress(m_tcpServer->GetClientSocket()));if (0 != status)return status;SetDefaultPayloadType(96);SetDefaultMark(false);SetDefaultTimestampIncrement(160);return 0;
}
  1. 將數據復用為PS;
  2. tcp方式發包
// 調用jrtplib中SendPacket(data, len);接口發送數據// 以下為tcp方式SendPacket部分源碼
int RTPTCPTransmitter::SendRTPData(const void *data,size_t len)	
{return SendRTPRTCPData(data, len);
}int RTPTCPTransmitter::SendRTPRTCPData(const void *data, size_t len)
{if (!m_init)return ERR_RTP_TCPTRANS_NOTINIT;MAINMUTEX_LOCKif (!m_created){MAINMUTEX_UNLOCKreturn ERR_RTP_TCPTRANS_NOTCREATED;}// #define RTPTCPTRANS_MAXPACKSIZE							65535if (len > RTPTCPTRANS_MAXPACKSIZE){MAINMUTEX_UNLOCKreturn ERR_RTP_TCPTRANS_SPECIFIEDSIZETOOBIG;}std::map<SocketType, SocketData>::iterator it = m_destSockets.begin();std::map<SocketType, SocketData>::iterator end = m_destSockets.end();vector<SocketType> errSockets;int flags = 0;
#ifdef RTP_HAVE_MSG_NOSIGNALflags = MSG_NOSIGNAL;
#endif // RTP_HAVE_MSG_NOSIGNALwhile (it != end){uint8_t lengthBytes[2] = { (uint8_t)((len >> 8)&0xff), (uint8_t)(len&0xff) };SocketType sock = it->first;// 調用send接口發送數據// 1. 先發送2字節頭(固定格式)// 2. 再發送數據if (send(sock,(const char *)lengthBytes,2,flags) < 0 ||send(sock,(const char *)data,len,flags) < 0)errSockets.push_back(sock);++it;}MAINMUTEX_UNLOCKif (errSockets.size() != 0){for (size_t i = 0 ; i < errSockets.size() ; i++)OnSendError(errSockets[i]);}// Don't return an error code to avoid the poll thread exiting// due to one closed connection for examplereturn 0;
}

tcp active發流

流程圖

在這里插入圖片描述

發送端流程:

  • 上級平臺或sip服務器啟動tcp監聽連接,對于下級平臺或者設備(數據發送端)為主動方式;
  • 下級平臺或者設備(數據發送端)發起tcp連接;
  • 接收上級平臺或sip服務器tcp響應;
  • 向上級平臺或sip服務器發送流數據;

設計

  1. 創建socket、connect、初始化rtp,啟動數據接收線程
// TcpClient為封裝的客戶端socket類int CGBTcpClient::Start()
{if (0 != m_localPort || m_tcpClient.get())return 0;int ret = -1;do{m_tcpClient = std::make_shared<TcpClient>(nullptr, this);if (!m_tcpClient.get() || 0 != m_tcpClient->TcpCreate())break;ret = m_tcpClient->TcpConnectByTime(m_localIP.c_str(), m_localPort, 5);if (0 != ret)break;ret = InitRtp_();if (0 != ret)break;m_thread = std::thread(RTPPackerThread, this);return 0;} while (0);Stop();return ret;
}
  1. 初始化rtp參數
int CGBTcpClient::InitRtp_()
{const int packSize = 45678;RTPSessionParams sessionParams;sessionParams.SetProbationType(RTPSources::NoProbation);sessionParams.SetOwnTimestampUnit(90000.0 / 25.0);sessionParams.SetMaximumPacketSize(packSize + 64);m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);m_rtpTcpTransmitter->Init(true);m_rtpTcpTransmitter->Create(65535, 0);if (0 != Create(sessionParams, m_rtpTcpTransmitter))return -1;if (0 != AddDestination(RTPTCPAddress(m_tcpClient->GetClientSocket())))return -1;return 0;
}
  1. 視音頻數據復用為PS
  2. 發送數據,同tcp passive發流

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/160646.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/160646.shtml
英文地址,請注明出處:http://en.pswp.cn/news/160646.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

生活如果真能像隊列一樣的話

生活如果真能像隊列一樣&#xff0c;那該多好啊。 —————————————————————————————————————————— 背包&#xff0c;隊列 可以先看他們的API&#xff1a;都含有一個無參構造函數&#xff0c;添加單個元素的方法&#xff0c;測試集合…

php項目從寶塔面板切換轉到phpstudy小皮面板

寶塔面板轉phpstudy面板 版本 寶塔面板8.0.1 phpstudy面板8.1.1.3 步驟 1、寶塔面板,找到項目文件夾,打包、下載到本地、解壓 2、本地windows系統安裝phpstudy面板,選擇盡可能一樣的配置 比如寶塔php7.4.33,可能phpstudy面板只有php7.4.3,也行 大環境一定要一致,比如…

力扣算法練習BM46—最小的K個數

題目 給定一個長度為 n 的可能有重復值的數組&#xff0c;找出其中不去重的最小的 k 個數。例如數組元素是4,5,1,6,2,7,3,8這8個數字&#xff0c;則最小的4個數字是1,2,3,4(任意順序皆可)。 數據范圍&#xff1a;0≤k,n≤10000&#xff0c;數組中每個數的大小0≤val≤1000 要…

linux signal 機制

ref&#xff1a; Linux操作系統學習筆記&#xff08;十六&#xff09;進程間通信之信號 | Ty-Chens Home https://www.cnblogs.com/renxinyuan/p/3867593.html 當執行kill -9 PID時系統發生了什么 -

Codeforces Round 910 (Div. 2) D. Absolute Beauty

D. Absolute Beauty 有兩個長度為 n n n 的整數數組 a 1 , a 2 , … , a n a_1,a_2,\ldots,a_n a1?,a2?,…,an? 和 b 1 , b 2 , … , b n b_1,b_2,\ldots,b_n b1?,b2?,…,bn? 。他將數組 b b b 的美麗值定義為 ∑ i 1 n ∣ a i ? b i ∣ . \sum_{i1}^{n} |a_i - b…

基于材料生成算法優化概率神經網絡PNN的分類預測 - 附代碼

基于材料生成算法優化概率神經網絡PNN的分類預測 - 附代碼 文章目錄 基于材料生成算法優化概率神經網絡PNN的分類預測 - 附代碼1.PNN網絡概述2.變壓器故障診街系統相關背景2.1 模型建立 3.基于材料生成優化的PNN網絡5.測試結果6.參考文獻7.Matlab代碼 摘要&#xff1a;針對PNN神…

JDK命令使用總結

目錄 javacjava javac 將源碼(*.java)編譯成字節碼(*.class) javac HelloWorld.javajava 運行字節碼(*.class) 不能加后綴名 java HelloWorld直接運行單文件源碼(*.java) Java11以上才支持 java HelloWorld.java

ROSNS3(一)

https://github.com/malintha/rosns3 第一步&#xff1a;clone和構建rosns3客戶端 第二步&#xff1a;運行 最詳細的ubuntu 安裝 docker教程 - 知乎 1. unable to find source space /home/muta/src 解決方法&#xff1a; 將副將將碰到的bug&#xff0c;解決方法_#include &…

【C++ Primer Plus學習記錄】遞增運算符(++)和遞減運算符(--)

遞增運算符&#xff08;&#xff09;和遞減運算符&#xff08;--&#xff09;&#xff1a;前綴版本位于操作數前面&#xff0c;如x&#xff1b;后綴版本位于操作數后面&#xff0c;如x。兩個版本對操作數的影響是一樣的&#xff0c;但是影響的時間不同。這就像吃飯前買單和吃飯…

Python從零開始快速搭建一個語音對話機器人

文章目錄 01-初心緣由02-準備工作03-語音機器人的搭建思路04-語音生成音頻文件05-音頻文件轉文字STT06-與圖靈機器人對話07-文字轉語音08-語音對話機器人的完整代碼09-結束語10-有問必答關于Python技術儲備一、Python所有方向的學習路線二、Python基礎學習視頻三、精品Python學…

SSH連接遠程服務器報錯:WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED 解決方法

一.錯誤描述 報錯信息里提示了路徑信息/root/.ssh/known_hosts:20 二.解決方案 方法一 輸入以下指令&#xff1a; ssh-keygen -R XXX&#xff08;需要連接遠程服務器的ip&#xff09; 按照我的例子ip:10.165.7.136&#xff0c;會返回以下信息: 重新嘗試連接&#xff1a; 輸…

C++學習 --set

目錄 1&#xff0c; 什么是set 2&#xff0c; 創建set 2-1&#xff0c; 標準數據類型 2-2&#xff0c; 自定義數據類型 2-3&#xff0c; 其他創建方式 3&#xff0c; 操作set 3-1&#xff0c; 賦值 3-2&#xff0c; 添加元素&#xff08;insert&#xff09; 3-2-1&…

MySQL的樂觀鎖和悲觀鎖

1、樂觀鎖&#xff1a; 樂觀鎖在操作數據的時候&#xff0c;是保持一種樂觀的狀態&#xff0c;認為別的線程是不會同時修改數據的&#xff0c;所以是不會上鎖的&#xff0c;但是在更新的時候&#xff0c;會判斷一下在這個期間內是否有別的線程修改過數據。 主要的流程&#x…

規劃類3d全景線上云展館幫助企業輕松拓展海外市場

科技3D線上云展館作為一種基于VR虛擬現實和互聯網技術的新一代展覽平臺。可以在線上虛擬空間中模擬真實的展館&#xff0c;讓觀眾無需親自到場&#xff0c;即可獲得沉浸式的參觀體驗。通過這個展館&#xff0c;您可以充分、全面、立體展示您的產品、服務以及各種創意作品&#…

python運算符重載之成員關系和屬性運算

1 python運算符重載之成員關系和屬性運算 1.1 重載成員關系運算符 1.1.1 contains,iter,getitem python使用成員關系運算符in時&#xff0c; 按優先級調用方法&#xff1a;contains>iter>getitem&#xff1b; class MyIters:def __init__(self,value):self.datavalu…

2023年【安全生產監管人員】考試題及安全生產監管人員找解析

題庫來源&#xff1a;安全生產模擬考試一點通公眾號小程序 安全生產監管人員考試題參考答案及安全生產監管人員考試試題解析是安全生產模擬考試一點通題庫老師及安全生產監管人員操作證已考過的學員匯總&#xff0c;相對有效幫助安全生產監管人員找解析學員順利通過考試。 1、…

【樹莓派】Camera Module 使用

工具 RPI4RPI Camera Module 2 硬件安裝 直接插到板子的相機帶子插口上即可 使用前提 libcamera-hello運行這個命令能夠成功&#xff0c;否則需要裝相應的包 在 RPI4 上測試 libcamera-jpeg -o 00001.jpg -t 2000 --width 640 --height 480t 表示程序運行&#xff08;預…

SA8255 Q+A android 登錄QNX

需要工具busybox 130|gen4_gvm:/ # cd /data/ gen4_gvm:/data # ./busybox telnet 192.168.1.1Entering character mode Escape character is ^].QNX Neutrino (localhost) (ttyp0)login: root No home directory. Logging in with home "/". # 用戶名&#xff1a…

數據結構-棧的實現

1.棧的概念及結構 棧&#xff1a;一種特殊的線性表&#xff0c;其只允許在固定的一端進行插入和刪除元素操作。進行數據插入和刪除操作的一端稱為棧頂&#xff0c;另一端稱為棧底。棧中的數據元素遵守后進先出LIFO&#xff08;Last In First Out&#xff09;的原則。 壓棧&…