本文主要解析一下SRS3.0 service部分源碼,主要和srs_service_st模塊。
srs_service_st
模塊包含了網絡服務的基礎實現,特別是與套接字(sockets)和網絡通信相關的功能。主要功能和特點包括:
(1)初始化和關閉:?srs_st_init?和?srs_close_stfd?函數用于初始化網絡服務和關閉套接字。
(2)錯誤處理: 使用?srs_error_t?類型來處理和報告錯誤。
(3)網絡選項設置: 設置套接字選項。
(4)TCP 和 UDP 監聽:?srs_tcp_listen?和?srs_udp_listen?函數用于創建 TCP 和 UDP 監聽服務。
? ? ? ? ?連接管理:?SrsTcpClient?類提供了 TCP 客戶端連接的管理,包括連接、讀取、寫入和關閉連? ? ? ? ? ?接。套接字操作: 提供了對套接字文件描述符(srs_netfd_t)的操作,如打開、關閉、讀取? ? ? ? ? ? ?和寫入。
頭文件定義:
(1)對線程/協程的API的抽象
// 存儲線程的句柄
typedef void* srs_thread_t;
// 條件變量
typedef void* srs_cond_t;
// 鎖
typedef void* srs_mutex_t;
// 獲取線程/協程句柄
extern srs_thread_t srs_thread_self();// 條件變量操作
// 創建一個新的條件變量實例
extern srs_cond_t srs_cond_new();
// 銷毀給定的條件變量
extern int srs_cond_destroy(srs_cond_t cond);
// 使當前線程等待條件變量cond。線程會釋放鎖并阻塞,直到其他線程通過srs_cond_signal或srs_cond_broadcast喚醒它
extern int srs_cond_wait(srs_cond_t cond);
// 帶超時的等待版本,如果在指定的timeout時間內條件未被滿足,則函數會返回。
extern int srs_cond_timedwait(srs_cond_t cond, srs_utime_t timeout);
// 喚醒一個等待在條件變量cond上的線程。如果沒有線程等待,則什么也不做。
extern int srs_cond_signal(srs_cond_t cond);// 互斥鎖操作
// 創建一個新的互斥鎖實例。
extern srs_mutex_t srs_mutex_new();
// 銷毀給定的互斥鎖
extern int srs_mutex_destroy(srs_mutex_t mutex);
// 加鎖操作,如果鎖已被持有,則調用線程將阻塞,直到鎖可用
extern int srs_mutex_lock(srs_mutex_t mutex);
// 解鎖操作,釋放由當前線程持有的鎖
extern int srs_mutex_unlock(srs_mutex_t mutex);
(2)對網絡相關的API進一步封裝
這些代碼片段來自SRS(Simple Realtime Server)項目,涉及網絡編程的基礎操作,包括初始化、文件描述符操作、TCP/UDP連接與監聽、IO操作以及超時處理等。以下是各部分的注釋說明:```cpp
// 定義網絡文件描述符類型,通常用于表示網絡連接。
typedef void* srs_netfd_t;// 初始化ST(可能是底層事件處理庫,如libevent),要求使用epoll作為事件驅動機制。
extern srs_error_t srs_st_init();// 關閉網絡文件描述符stfd,同時確保底層的文件描述符也被關閉。
extern void srs_close_stfd(srs_netfd_t& stfd);// 設置文件描述符fd的FD_CLOEXEC標志,使得在執行exec函數族時自動關閉該文件描述符。
extern srs_error_t srs_fd_closeexec(int fd);// 設置文件描述符fd的SO_REUSEADDR選項,允許立即綁定到最近使用的且處于TIME_WAIT狀態的端口。
extern srs_error_t srs_fd_reuseaddr(int fd);// 設置文件描述符fd的SO_REUSEPORT選項,允許多個套接字綁定到同一個端口上。
extern srs_error_t srs_fd_reuseport(int fd);// 設置文件描述符fd的SO_KEEPALIVE選項,啟用TCP Keepalive機制,檢測連接是否存活。
extern srs_error_t srs_fd_keepalive(int fd);// 客戶端發起TCP連接到指定服務器,server為服務器地址,port為端口號,tm為連接超時時間,成功后pstfd中存放新連接的網絡文件描述符。
extern srs_error_t srs_tcp_connect(std::string server, int port, srs_utime_t tm, srs_netfd_t* pstfd);// 服務器端監聽TCP端點,ip為監聽地址,port為端口號,成功后pfd中存放監聽的網絡文件描述符。
extern srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd);// 服務器端監聽UDP端點,與srs_tcp_listen類似,但針對UDP協議。
extern srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd);// 獲取網絡文件描述符對應的底層文件描述符。
extern int srs_netfd_fileno(srs_netfd_t stfd);// 暫停當前線程執行指定的微秒數。
extern int srs_usleep(srs_utime_t usecs);// 根據現有的操作系統文件描述符osfd創建一個網絡文件描述符。
extern srs_netfd_t srs_netfd_open_socket(int osfd);// 直接從現有操作系統文件描述符osfd創建一個網絡文件描述符,不特指socket。
extern srs_netfd_t srs_netfd_open(int osfd);// 接收來自stfd的數據,支持UDP,buf為接收緩沖區,len為緩沖區大小,from為發送方地址信息,fromlen為發送方地址長度,tm為超時時間。
extern int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout);// 接受stfd上的連接請求,針對TCP服務器,成功后返回新的連接文件描述符及客戶端地址信息。
extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout);// 從stfd讀取數據,nbyte指定要讀取的字節數,tm為讀取操作的超時時間。
extern ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout);// 判斷給定的超時時間tm是否表示永不超時。
extern bool srs_is_never_timeout(srs_utime_t tm);
```這些函數和類型為SRS框架提供了基礎的網絡通信能力,包括連接建立、數據收發、事件監聽與超時處理,是構建高性能網絡應用不可或缺的部分。
(3)實現自動管理互斥鎖的功能
// 快速創建鎖,##instance是宏參數,會被替換為&instance并且和_srs_auto_free_拼接
#define SrsLocker(instance) \impl__SrsLocker _srs_auto_free_##instance(&instance)class impl__SrsLocker
{
private:srs_mutex_t* lock; // 指向互斥鎖的指針
public:// 構造函數,初始化時加鎖impl__SrsLocker(srs_mutex_t* l) {lock = l;int r0 = srs_mutex_lock(*lock); // 加鎖srs_assert(!r0); // 斷言加鎖成功}// 析構函數,自動解鎖virtual ~impl__SrsLocker() {int r0 = srs_mutex_unlock(*lock); // 解鎖srs_assert(!r0); // 斷言解鎖成功}
};
(4)網絡通信。
// SrsStSocket 類實現了基于ST(Simple Transport)的TCP套接字封裝,
// 提供了同步的socket通信機制,集成于ISrsProtocolReadWriter接口,用于網絡數據的讀寫操作。class SrsStSocket : public ISrsProtocolReadWriter
{
private:// 接收和發送超時時間,單位為微秒(srs_utime_t)。// 使用SRS_UTIME_NO_TIMEOUT表示無超時。srs_utime_t rtm; // 接收超時時間srs_utime_t stm; // 發送超時時間// 已接收和已發送的數據量,單位為字節。int64_t rbytes; // 已接收的總字節數int64_t sbytes; // 已發送的總字節數// 底層ST網絡文件描述符,用于實際的I/O操作。srs_netfd_t stfd;public:// 默認構造函數SrsStSocket();// 析構函數,釋放資源virtual ~SrsStSocket();public:// 初始化函數,使用給定的st網絡文件描述符fd來設置套接字。// 注意:用戶需自行管理fd的生命期。virtual srs_error_t initialize(srs_netfd_t fd);public:// 設置接收超時時間virtual void set_recv_timeout(srs_utime_t tm);// 獲取當前接收超時時間設置virtual srs_utime_t get_recv_timeout();// 設置發送超時時間virtual void set_send_timeout(srs_utime_t tm);// 獲取當前發送超時時間設置virtual srs_utime_t get_send_timeout();// 獲取已接收的總字節數virtual int64_t get_recv_bytes();// 獲取已發送的總字節數virtual int64_t get_send_bytes();public:// 從套接字讀取數據到buf,可指定實際讀取的字節數。// @param buf 目標緩沖區// @param size 緩沖區大小// @param nread 輸出參數,實際讀取的字節數,可選virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);// 確保從套接字完全讀取size個字節到buf。// @param buf 目標緩沖區// @param size 必須讀取的字節數// @param nread 輸出參數,實際讀取的字節數,可選virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);// 向套接字寫入數據,可指定實際寫入的字節數。// @param buf 源數據緩沖區// @param size 要寫入的數據大小// @param nwrite 輸出參數,實際寫入的字節數,可選virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);// 批量寫入io矢量數據到套接字,常用于一次寫多個緩沖區數據。// @param iov 指向io矢量數組的指針// @param iov_size io矢量數組的元素數量// @param nwrite 輸出參數,實際寫入的總字節數,可選virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};// SrsTcpClient 類實現了ISrsProtocolReadWriter接口,專用于創建TCP客戶端連接。
class SrsTcpClient : public ISrsProtocolReadWriter
{
private:// 網絡文件描述符,用于表示與服務器的TCP連接。srs_netfd_t stfd;// 持有一個SrsStSocket實例,用于實際的TCP套接字讀寫操作。SrsStSocket* io;// 服務器的主機名或IP地址。std::string host;// 服務器監聽的端口號。int port;// 連接超時時間,單位為srs_utime_t。srs_utime_t timeout;public:// 構造函數,初始化TCP客戶端實例。// @param h 服務器的IP地址或主機名。// @param p 服務器端口號。// @param tm 連接超時時間。SrsTcpClient(std::string h, int p, srs_utime_t tm);// 析構函數,釋放資源。virtual ~SrsTcpClient();public:// 建立與服務器的TCP連接。// @remark 在嘗試連接前,會先關閉現有的連接(如果有)。virtual srs_error_t connect();private:// 關閉與服務器的連接。// @remark 用戶在調用此方法后不應再使用該客戶端實例。virtual void close();// 以下為ISrsProtocolReadWriter接口的實現
public:// 設置接收數據的超時時間。virtual void set_recv_timeout(srs_utime_t tm);// 獲取當前接收數據的超時時間設置。virtual srs_utime_t get_recv_timeout();// 設置發送數據的超時時間。virtual void set_send_timeout(srs_utime_t tm);// 獲取當前發送數據的超時時間設置。virtual srs_utime_t get_send_timeout();// 獲取已接收的數據字節數。virtual int64_t get_recv_bytes();// 獲取已發送的數據字節數。virtual int64_t get_send_bytes();// 從連接中讀取數據。// @param buf 存儲讀取數據的緩沖區。// @param size 緩沖區大小。// @param nread 實際讀取的字節數,輸出參數。virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);// 確保讀取指定大小的數據到緩沖區。// @param buf 存儲讀取數據的緩沖區。// @param size 需要讀取的字節數。// @param nread 實際讀取的字節數,輸出參數。virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);// 向連接寫入數據。// @param buf 包含待寫數據的緩沖區。// @param size 緩沖區中數據的大小。// @param nwrite 實際寫入的字節數,輸出參數。virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);// 批量寫入數據到連接。// @param iov 指向iovec結構體數組的指針,每個結構體描述一塊緩沖區。// @param iov_size 數組中iovec結構體的數量。// @param nwrite 實際寫入的總字節數,輸出參數。virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};
源文件實現:
檢測是否支持epoll,因為有些老的系統可能不支持epoll
// 設置服務器監聽socket的backlog大小為512,這是一個推薦值,用以保證服務器能夠有效管理待處理的連接請求隊列,
// 特別是在高并發情況下。這個值也是nginx等高性能服務器常用的配置。
#define SERVER_LISTEN_BACKLOG 512// 下面的代碼塊僅在Linux系統上編譯和執行。
#ifdef __linux__// 引入epoll相關的頭文件,epoll是Linux特有的I/O多路復用技術,適用于處理大量并發的文件描述符。
#include <sys/epoll.h>// 檢查當前Linux系統是否支持epoll功能。
// 這個函數通過嘗試執行一個不可能成功的epoll_ctl調用來間接判斷。
// 如果系統支持epoll,調用應該失敗但是錯誤碼不應該是表示函數未實現的ENOSYS。
bool srs_st_epoll_is_supported(void)
{// 初始化一個epoll_event結構體,設置其感興趣的事件為EPOLLIN(表示關心讀事件),// data.ptr字段設置為NULL,因為這次調用只是試探性檢查并不關心事件的具體處理。struct epoll_event ev;ev.events = EPOLLIN;ev.data.ptr = NULL;// 嘗試執行一個無效的epoll_ctl操作:使用-1作為epoll_fd(表示不存在的epoll實例),// 并嘗試添加一個同樣無效的文件描述符(-1)到epoll集合中。// 這樣的操作必然會失敗,但關鍵在于檢查失敗的具體原因。epoll_ctl(-1, EPOLL_CTL_ADD, -1, &ev);// 如果調用失敗且錯誤碼是ENOSYS,說明系統不支持epoll;// 否則,即使調用失敗,只要錯誤碼不是ENOSYS,就認為系統支持epoll。return (errno != ENOSYS);
}#endif // __linux__
協程初始化
// 初始化SRS使用的st庫(simple thread library),并根據操作系統選擇最高效的事件處理系統。
srs_error_t srs_st_init() {
#ifdef __linux__// 檢查Linux系統是否支持epoll。epoll是Linux下用于多路復用I/O的高效接口,// 但一些較舊的Linux版本可能不支持。如果檢測到不支持,則返回錯誤。if (!srs_st_epoll_is_supported()) {return srs_error_new(ERROR_ST_SET_EPOLL, "linux epoll disabled");}
#endif// 根據操作系統選擇最佳的事件處理系統。在Linux上,默認嘗試使用epoll(),// 而在BSD等系統上則可能使用kqueue。st_set_eventsys(ST_EVENTSYS_ALT)嘗試設置為替代的高效事件系統。// 如果設置失敗,則返回錯誤信息。if (st_set_eventsys(ST_EVENTSYS_ALT) == -1) {return srs_error_new(ERROR_ST_SET_EPOLL, "st enable st failed, current is %s", st_get_eventsys_name());}// 調用st_init初始化st庫。如果初始化失敗(返回非零值),則記錄錯誤并返回。int r0 = 0;if((r0 = st_init()) != 0){return srs_error_new(ERROR_ST_INITIALIZE, "st initialize failed, r0=%d", r0);}// 如果一切順利,打印日志表明st初始化成功,并指明當前使用的事件系統。srs_trace("st_init success, use %s", st_get_eventsys_name());// 初始化成功,返回srs_success表示無錯誤。return srs_success;
}// 獲取當前協程句柄
srs_thread_t srs_thread_self()
{return (srs_thread_t)st_thread_self();
}
網絡文件的關閉和重用
/*** 關閉指定的網絡文件描述符并設置錯誤檢查。** @param stfd 待關閉的網絡文件描述符引用。* * 此函數確保安全地關閉給定的網絡文件描述符(stfd)。在關閉前,* 它會檢查stfd是否有效,然后調用st_netfd_close進行關閉操作。* 如果關閉操作失敗(返回-1),則會觸發一個斷言錯誤(srs_assert)。* 成功關閉后,將stfd設置為NULL,以防止重復關閉或誤用。*/
void srs_close_stfd(srs_netfd_t& stfd) {if (stfd) {// 確保關閉操作成功執行int err = st_netfd_close((st_netfd_t)stfd);srs_assert(err != -1); // 如果關閉失敗,觸發斷言stfd = NULL; // 成功關閉后,清空引用}
}/*** 設置文件描述符為close-on-exec并處理錯誤。** @param fd 要設置的文件描述符。* @return srs_error_t 操作狀態,成功時返回srs_success,失敗時返回具體錯誤信息。* * 該函數通過fcntl系統調用來獲取文件描述符(fd)的當前標志,并設置FD_CLOEXEC位,* 確保在執行exec函數族創建新進程時,該文件描述符自動關閉,避免泄露到子進程中。* 如果設置操作失敗,則使用srs_error_new創建并返回一個表示錯誤的srs_error_t對象。*/
srs_error_t srs_fd_closeexec(int fd) {int flags = fcntl(fd, F_GETFD); // 獲取當前文件描述符標志flags |= FD_CLOEXEC; // 設置close-on-exec標志if (fcntl(fd, F_SETFD, flags) == -1) { // 嘗試設置標志// 設置失敗,返回錯誤信息return srs_error_new(ERROR_SOCKET_SETCLOSEEXEC, "無法設置FD_CLOEXEC,文件描述符: %d", fd);}return srs_success; // 操作成功,返回成功狀態
}// 設置套接字選項:允許地址重用,以便快速重啟服務時不被TIME_WAIT狀態阻塞
srs_error_t srs_fd_reuseaddr(int fd) {int v = 1; // 值設為1,啟用選項if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &v, sizeof(int)) == -1) { // 嘗試設置SO_REUSEADDR// 如果設置失敗,返回一個錯誤描述return srs_error_new(ERROR_SOCKET_SETREUSEADDR, "設置SO_REUSEADDR失敗,文件描述符: %d", fd);}return srs_success; // 設置成功,返回成功狀態
}// 嘗試設置套接字選項以啟用端口共享(如果操作系統支持)
srs_error_t srs_fd_reuseport(int fd) {
#if defined(SO_REUSEPORT) // 檢查SO_REUSEPORT是否被當前系統支持int v = 1; // 啟用選項的值if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &v, sizeof(int)) == -1) { // 嘗試設置SO_REUSEPORT// 如果設置失敗,記錄一個警告(而非錯誤),因為并非所有系統都支持此選項srs_warn("設置SO_REUSEPORT失敗,文件描述符: %d", fd);}
#else // 如果SO_REUSEPORT未定義,提示用戶該特性不受支持#warning "您的操作系統不支持SO_REUSEPORT功能"srs_warn("您的操作系統不支持SO_REUSEPORT,該功能在Linux內核3.9及以上版本可用");
#endifreturn srs_success; // 不管是否設置成功(取決于支持情況),都視為操作成功并返回
}
心跳檢測:
/*** 設置文件描述符的TCP Keepalive選項。** @param fd 需要設置Keepalive選項的文件描述符。* @return srs_error_t 函數執行結果,成功返回srs_success,失敗返回相應的錯誤信息。** 此函數嘗試為給定的文件描述符(fd)啟用TCP Keepalive功能。TCP Keepalive是一種機制,* 用于探測對端是否仍然在線以及連接是否活躍。如果連接在一定時間內沒有數據傳輸,* 系統將自動發送Keepalive探針以檢查連接的狀態。這有助于及時發現并關閉空閑或已斷開的連接,* 防止資源的無效占用。** 注意:此功能的可用性依賴于操作系統定義的SO_KEEPALIVE常量。在不支持該選項的平臺上,* 此函數將直接返回成功,因為在那些環境下無法執行Keepalive設置。*/
srs_error_t srs_fd_keepalive(int fd) {
#ifdef SO_KEEPALIVE // 檢查當前平臺是否支持SO_KEEPALIVE選項int v = 1; // 啟用Keepalive的標志值if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &v, sizeof(int)) == -1) { // 嘗試設置SO_KEEPALIVE選項// 如果設置失敗,返回錯誤信息return srs_error_new(ERROR_SOCKET_SETKEEPALIVE, "設置SO_KEEPALIVE失敗,文件描述符: %d", fd);}
#endifreturn srs_success; // 設置成功或在不支持的平臺上直接返回成功
}
建立TCP連接
/*** 建立一個TCP連接到指定的服務器和端口。** @param server 服務器的地址(域名或IP)。* @param port 目標服務器的端口號。* @param tm 連接超時時間(以微秒計),若為SRS_UTIME_NO_TIMEOUT則無超時限制。* @param pstfd 輸出參數,成功連接后存儲連接的文件描述符指針。* @return srs_error_t 操作結果,成功返回srs_success,失敗返回具體的錯誤信息。** 此函數執行以下步驟建立TCP連接:* 1. 根據輸入的超時時間初始化連接超時變量。* 2. 清零并準備用于存儲連接文件描述符的輸出參數。* 3. 將端口號轉換為字符串格式,用于DNS解析。* 4. 初始化addrinfo結構,設置地址族為任意(AF_UNSPEC),套接字類型為SOCK_STREAM。* 5. 調用getaddrinfo進行地址解析,獲取服務器地址信息。* 6. 創建一個TCP套接字,根據解析得到的地址族。* 7. 使用st_netfd_open_socket將原始套接字包裝為SRS使用的網絡文件描述符。* 8. 使用st_connect嘗試連接到目標服務器,應用超時設置。* 9. 若連接成功,將網絡文件描述符存儲到輸出參數*pstfd中;失敗則關閉文件描述符并返回錯誤。*/srs_error_t srs_tcp_connect(string server, int port, srs_utime_t tm, srs_netfd_t* pstfd) {// 定義一個變量來存儲超時時間,初始設為無超時。st_utime_t timeout = ST_UTIME_NO_TIMEOUT;// 如果傳入的超時時間tm不是默認值,則設置超時時間。if (tm != SRS_UTIME_NO_TIMEOUT) {timeout = tm;}// 初始化傳出參數pstfd為NULL。*pstfd = NULL;// 定義一個局部變量stfd,初始設為NULL。srs_netfd_t stfd = NULL;// 定義一個字符數組來存儲端口號的字符串表示。char sport[8];// 使用snprintf函數將端口號格式化為字符串。snprintf(sport, sizeof(sport), "%d", port);// 定義addrinfo結構體變量hints,用于指定getaddrinfo的搜索條件。addrinfo hints;// 將hints清零。memset(&hints, 0, sizeof(hints));// 設置搜索條件為協議族不指定,即IPv4和IPv6都搜索。hints.ai_family = AF_UNSPEC;// 設置搜索條件為流式套接字。hints.ai_socktype = SOCK_STREAM;// 定義addrinfo指針r,用于存儲getaddrinfo的結果。addrinfo* r = NULL;// 使用SrsAutoFree宏自動釋放r指向的內存。SrsAutoFree(addrinfo, r);// 調用getaddrinfo獲取服務器地址信息。if(getaddrinfo(server.c_str(), sport, (const addrinfo*)&hints, &r)) {// 如果失敗,返回錯誤。return srs_error_new(ERROR_SYSTEM_IP_INVALID, "get address info");}// 根據getaddrinfo的結果創建套接字。int sock = socket(r->ai_family, r->ai_socktype, r->ai_protocol);// 如果創建失敗,返回錯誤。if(sock == -1){return srs_error_new(ERROR_SOCKET_CREATE, "create socket");}// 斷言stfd當前是NULL。srs_assert(!stfd);// 打開套接字為網絡文件描述符。stfd = st_netfd_open_socket(sock);// 如果打開失敗,關閉套接字并返回錯誤。if(stfd == NULL){::close(sock);return srs_error_new(ERROR_ST_OPEN_SOCKET, "open socket");}// 嘗試連接到服務器。if (st_connect((st_netfd_t)stfd, r->ai_addr, r->ai_addrlen, timeout) == -1){// 如果連接失敗,關閉網絡文件描述符并返回錯誤。srs_close_stfd(stfd);return srs_error_new(ERROR_ST_CONNECT, "connect to %s:%d", server.c_str(), port);}// 將打開的網絡文件描述符賦值給傳出參數pstfd。*pstfd = stfd;// 如果連接成功,返回成功狀態。return srs_success;
}
設置TCP監聽套接字的參數并開始監聽指定地址上的連接請求
// 定義一個函數,用于在指定的IP地址和端口上創建并設置TCP監聽套接字。
srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd) {// 初始化錯誤變量為成功狀態。srs_error_t err = srs_success;// 定義一個字符數組,用于存儲端口號的字符串表示。char sport[8];// 使用snprintf函數將端口號格式化為字符串。snprintf(sport, sizeof(sport), "%d", port);// 定義addrinfo結構體變量hints,用于指定getaddrinfo的搜索條件。addrinfo hints;// 將hints清零。memset(&hints, 0, sizeof(hints));// 設置搜索條件為協議族不指定,即IPv4和IPv6都搜索。hints.ai_family = AF_UNSPEC;// 設置搜索條件為流式套接字。hints.ai_socktype = SOCK_STREAM;// 設置搜索條件為只接受數值IP地址。hints.ai_flags = AI_NUMERICHOST;// 定義addrinfo指針r,用于存儲getaddrinfo的結果。addrinfo* r = NULL;// 使用SrsAutoFree宏自動釋放r指向的內存。SrsAutoFree(addrinfo, r);// 調用getaddrinfo獲取IP地址信息。if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) {// 如果失敗,創建一個新的錯誤并返回,錯誤信息中包含hints的值。return srs_error_new(ERROR_SYSTEM_IP_INVALID, "getaddrinfo hints=(%d,%d,%d)",hints.ai_family, hints.ai_socktype, hints.ai_flags);}// 定義一個整型變量fd,用于存儲套接字描述符。int fd = 0;// 創建套接字。if ((fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) {// 如果創建失敗,創建一個新的錯誤并返回,錯誤信息中包含套接字的域、類型和協議。return srs_error_new(ERROR_SOCKET_CREATE, "socket domain=%d, type=%d, protocol=%d",r->ai_family, r->ai_socktype, r->ai_protocol);}// 調用do_srs_tcp_listen函數來設置TCP監聽套接字。if ((err = do_srs_tcp_listen(fd, r, pfd)) != srs_success) {// 如果設置失敗,關閉套接字并包裝錯誤信息后返回。::close(fd);return srs_error_wrap(err, "fd=%d", fd);}// 如果所有操作都成功,返回初始設置的成功狀態。return err;
}// 定義一個函數,用于設置TCP監聽套接字并返回操作結果。
srs_error_t do_srs_tcp_listen(int fd, addrinfo* r, srs_netfd_t* pfd) {// 初始化錯誤變量為成功狀態。srs_error_t err = srs_success;// 檢測TCP連接的存活性,參考GitHub上SRs的1044號問題。// @see https://github.com/ossrs/srs/issues/1044 if ((err = srs_fd_keepalive(fd)) != srs_success) {// 如果設置TCP keepalive失敗,返回錯誤。return srs_error_wrap(err, "set keepalive");}// 設置文件描述符在exec系列函數調用后關閉。if ((err = srs_fd_closeexec(fd)) != srs_success) {// 如果設置失敗,返回錯誤。return srs_error_wrap(err, "set closeexec");}// 設置套接字地址重用,允許立即重用本地地址。if ((err = srs_fd_reuseaddr(fd)) != srs_success) {// 如果設置失敗,返回錯誤。return srs_error_wrap(err, "set reuseaddr");}// 設置端口重用,允許多個套接字綁定到同一端口。if ((err = srs_fd_reuseport(fd)) != srs_success) {// 如果設置失敗,返回錯誤。return srs_error_wrap(err, "set reuseport");}// 綁定套接字到指定的地址和端口。if (bind(fd, r->ai_addr, r->ai_addrlen) == -1) {// 如果綁定失敗,返回錯誤。return srs_error_new(ERROR_SOCKET_BIND, "bind");}// 開始監聽傳入的連接請求。if (::listen(fd, SERVER_LISTEN_BACKLOG) == -1) {// 如果監聽失敗,返回錯誤。return srs_error_new(ERROR_SOCKET_LISTEN, "listen");}// 打開套接字為網絡文件描述符。if ((*pfd = srs_netfd_open_socket(fd)) == NULL){// 如果打開失敗,返回錯誤。return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open");}// 如果所有操作都成功,返回初始設置的成功狀態。return err;
}
創建UDP套接字并且監聽端口
// 函數:do_srs_udp_listen
// 作用:為UDP套接字設置監聽相關選項。
srs_error_t do_srs_udp_listen(int fd, addrinfo* r, srs_netfd_t* pfd) {// 初始化錯誤狀態為成功。srs_error_t err = srs_success;// 設置文件描述符在exec()調用后自動關閉。if ((err = srs_fd_closeexec(fd)) != srs_success) {// 如果設置失敗,包裝錯誤并返回。return srs_error_wrap(err, "set closeexec");}// 允許套接字地址重用,忽略TIME_WAIT狀態。if ((err = srs_fd_reuseaddr(fd)) != srs_success) {// 如果設置失敗,包裝錯誤并返回。return srs_error_wrap(err, "set reuseaddr");}// 允許端口重用,用于負載均衡。if ((err = srs_fd_reuseport(fd)) != srs_success) {// 如果設置失敗,包裝錯誤并返回。return srs_error_wrap(err, "set reuseport");}// 將套接字綁定到指定的地址和端口。if (bind(fd, r->ai_addr, r->ai_addrlen) == -1) {// 如果綁定失敗,創建新錯誤并返回。return srs_error_new(ERROR_SOCKET_BIND, "bind");}// 將文件描述符包裝為srs_netfd_t類型的套接字。if ((*pfd = srs_netfd_open_socket(fd)) == NULL){// 如果打開失敗,創建新錯誤并返回。return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open");}// 如果所有設置成功,返回初始成功狀態。return err;
}// 函數:srs_udp_listen
// 作用:創建并初始化UDP監聽套接字。
srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd) {// 初始化錯誤狀態為成功。srs_error_t err = srs_success;// 將端口號轉換為字符串形式。char sport[8];snprintf(sport, sizeof(sport), "%d", port);// 設置getaddrinfo搜索條件。addrinfo hints;memset(&hints, 0, sizeof(hints));hints.ai_family = AF_UNSPEC; // 協議族不指定,支持IPv4和IPv6。hints.ai_socktype = SOCK_DGRAM; // 使用數據報套接字。hints.ai_flags = AI_NUMERICHOST; // 只接受數值IP地址。// 動態分配內存用于存儲getaddrinfo結果。addrinfo* r = NULL;SrsAutoFree(addrinfo, r);// 獲取IP地址信息。if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) {// 如果失敗,創建新錯誤并返回。return srs_error_new(ERROR_SYSTEM_IP_INVALID, "getaddrinfo hints=(%d,%d,%d)",hints.ai_family, hints.ai_socktype, hints.ai_flags);}// 創建套接字。int fd = 0;if ((fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) {// 如果創建失敗,創建新錯誤并返回。return srs_error_new(ERROR_SOCKET_CREATE, "socket domain=%d, type=%d, protocol=%d",r->ai_family, r->ai_socktype, r->ai_protocol);}// 調用do_srs_udp_listen函數設置監聽選項。if ((err = do_srs_udp_listen(fd, r, pfd)) != srs_success) {// 如果設置失敗,關閉套接字,包裝錯誤并返回。::close(fd);return srs_error_wrap(err, "fd=%d", fd);}// 如果所有操作成功,返回初始成功狀態。return err;
}
協程的封裝和網絡文件描述符的相關操作
// 創建一個新的條件變量。
srs_cond_t srs_cond_new() {return (srs_cond_t)st_cond_new(); // 調用st_cond_new創建條件變量并轉換類型。
}// 銷毀條件變量。
int srs_cond_destroy(srs_cond_t cond) {return st_cond_destroy((st_cond_t)cond); // 調用st_cond_destroy銷毀條件變量。
}// 等待條件變量。
int srs_cond_wait(srs_cond_t cond) {return st_cond_wait((st_cond_t)cond); // 調用st_cond_wait等待條件變量。
}// 帶超時的等待條件變量。
int srs_cond_timedwait(srs_cond_t cond, srs_utime_t timeout) {return st_cond_timedwait((st_cond_t)cond, (st_utime_t)timeout); // 調用st_cond_timedwait帶超時等待條件變量。
}// 發送條件變量信號。
int srs_cond_signal(srs_cond_t cond) {return st_cond_signal((st_cond_t)cond); // 調用st_cond_signal發送條件變量信號。
}// 創建一個新的互斥鎖。
srs_mutex_t srs_mutex_new() {return (srs_mutex_t)st_mutex_new(); // 調用st_mutex_new創建互斥鎖并轉換類型。
}// 銷毀互斥鎖。
int srs_mutex_destroy(srs_mutex_t mutex) {if (!mutex) {return 0; // 如果互斥鎖為空,直接返回0。}return st_mutex_destroy((st_mutex_t)mutex); // 調用st_mutex_destroy銷毀互斥鎖。
}// 加鎖互斥鎖。
int srs_mutex_lock(srs_mutex_t mutex) {return st_mutex_lock((st_mutex_t)mutex); // 調用st_mutex_lock加鎖。
}// 解鎖互斥鎖。
int srs_mutex_unlock(srs_mutex_t mutex) {return st_mutex_unlock((st_mutex_t)mutex); // 調用st_mutex_unlock解鎖。
}// 獲取網絡文件描述符的文件編號。
int srs_netfd_fileno(srs_netfd_t stfd) {return st_netfd_fileno((st_netfd_t)stfd); // 調用st_netfd_fileno獲取文件編號。
}// 使當前線程休眠指定的微秒數。
int srs_usleep(srs_utime_t usecs) {return st_usleep((st_utime_t)usecs); // 調用st_usleep使線程休眠。
}// 將操作系統文件描述符包裝為網絡文件描述符。
srs_netfd_t srs_netfd_open_socket(int osfd) {return (srs_netfd_t)st_netfd_open_socket(osfd); // 調用st_netfd_open_socket并轉換類型。
}// 將操作系統文件描述符包裝為通用文件描述符。
srs_netfd_t srs_netfd_open(int osfd) {return (srs_netfd_t)st_netfd_open(osfd); // 調用st_netfd_open并轉換類型。
}// 從網絡文件描述符接收數據。
int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout) {return st_recvfrom((st_netfd_t)stfd, buf, len, from, fromlen, (st_utime_t)timeout); // 調用st_recvfrom接收數據。
}// 接受連接請求。
srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout) {return (srs_netfd_t)st_accept((st_netfd_t)stfd, addr, addrlen, (st_utime_t)timeout); // 調用st_accept并轉換類型。
}// 從網絡文件描述符讀取數據。
ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout) {return st_read((st_netfd_t)stfd, buf, nbyte, (st_utime_t)timeout); // 調用st_read讀取數據。
}// 檢查超時時間是否為永不超時。
bool srs_is_never_timeout(srs_utime_t tm) {return tm == SRS_UTIME_NO_TIMEOUT; // 比較是否為永不超時的標記值。
}// SrsStSocket類的構造函數。
SrsStSocket::SrsStSocket() {// 初始化成員變量。stfd = NULL;stm = rtm = SRS_UTIME_NO_TIMEOUT;rbytes = sbytes = 0;
}// SrsStSocket類的析構函數。
SrsStSocket::~SrsStSocket() {// 清理工作,當前為空。
}// 初始化SrsStSocket對象。
srs_error_t SrsStSocket::initialize(srs_netfd_t fd) {stfd = fd; // 設置網絡文件描述符。return srs_success; // 返回成功狀態。
}// 設置接收超時時間。
void SrsStSocket::set_recv_timeout(srs_utime_t tm) {rtm = tm; // 設置接收超時。
}// 獲取接收超時時間。
srs_utime_t SrsStSocket::get_recv_timeout() {return rtm; // 返回接收超時。
}// 設置發送超時時間。
void SrsStSocket::set_send_timeout(srs_utime_t tm) {stm = tm; // 設置發送超時。
}// 獲取發送超時時間。
srs_utime_t SrsStSocket::get_send_timeout() {return stm; // 返回發送超時。
}// 獲取接收的字節數。
int64_t SrsStSocket::get_recv_bytes() {return rbytes; // 返回接收字節數。
}// 獲取發送的字節數。
int64_t SrsStSocket::get_send_bytes() {return sbytes; // 返回發送字節數。
}
從套接字讀取數據
// SrsStSocket類成員函數,用于從套接字讀取數據。
srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread) {// 初始化錯誤狀態為成功。srs_error_t err = srs_success;// 聲明用于存儲讀取字節數的變量。ssize_t nb_read;// 如果接收超時時間設置為永不超時,則使用ST_UTIME_NO_TIMEOUT。if (rtm == SRS_UTIME_NO_TIMEOUT) {nb_read = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);} else {// 否則使用設置的超時時間。nb_read = st_read((st_netfd_t)stfd, buf, size, rtm);}// 如果nread指針不為空,將讀取的字節數賦值給它。if (nread) {*nread = nb_read;}// 成功時返回實際讀取的非負整數(值為0表示網絡連接關閉或文件結束)。// 失敗時返回-1,并設置errno以指示錯誤。if (nb_read <= 0) {// 參見GitHub上ossrs/srs的200號問題。// 如果讀取失敗,并且errno設置為ETIME,表示超時。if (nb_read < 0 && errno == ETIME) {return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", srsu2msi(rtm));}// 如果讀取的字節數為0,設置errno為ECONNRESET。if (nb_read == 0) {errno = ECONNRESET;}// 創建一個新的錯誤,表示讀取失敗。return srs_error_new(ERROR_SOCKET_READ, "read");}// 將讀取的字節數累加到接收字節計數器。rbytes += nb_read;// 返回初始設置的錯誤狀態。return err;
}
從套接字完全讀取數據大小
// SrsStSocket類成員函數,用于從套接字完全讀取指定大小的數據。
srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) {// 初始化錯誤狀態為成功。srs_error_t err = srs_success;// 聲明用于存儲讀取字節數的變量。ssize_t nb_read;// 如果接收超時時間設置為永不超時,則使用ST_UTIME_NO_TIMEOUT。if (rtm == SRS_UTIME_NO_TIMEOUT) {nb_read = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT);} else {// 否則使用設置的超時時間。nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm);}// 如果nread指針不為空,則將實際讀取的字節數賦值給它。if (nread) {*nread = nb_read;}// 成功時返回實際讀取的非負整數,如果讀取的字節數小于請求的nbyte,// 則表示網絡連接關閉或文件結束。// 失敗時返回-1,并設置errno以指示錯誤。if (nb_read != (ssize_t)size) {// 參見GitHub上ossrs/srs的200號問題。if (nb_read < 0 && errno == ETIME) {// 如果讀取失敗,并且errno設置為ETIME,表示超時。return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", srsu2msi(rtm));}// 如果讀取的字節數大于等于0但不等于請求的大小,則設置errno為ECONNRESET。if (nb_read >= 0) {errno = ECONNRESET;}// 創建一個新的錯誤,表示沒有完全讀取數據。return srs_error_new(ERROR_SOCKET_READ_FULLY, "read fully");}// 將實際讀取的字節數累加到接收字節計數器。rbytes += nb_read;// 返回初始設置的錯誤狀態。return err;
}
TCP連接
// SrsTcpClient類的構造函數,初始化TCP客戶端。
SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm) {// 初始化套接字文件描述符為NULL。stfd = NULL;// 創建SrsStSocket類的實例。io = new SrsStSocket();// 存儲提供的主機和端口。host = h;port = p;// 存儲超時時間。timeout = tm;
}// SrsTcpClient類的析構函數,清理資源。
SrsTcpClient::~SrsTcpClient() {close(); // 關閉連接并清理套接字資源。// 釋放SrsStSocket實例。srs_freep(io);
}// 連接到服務器的成員函數。
srs_error_t SrsTcpClient::connect() {// 初始化錯誤狀態為成功。srs_error_t err = srs_success;close(); // 先關閉任何現有的連接。// 斷言確保套接字文件描述符是NULL。srs_assert(stfd == NULL);// 嘗試通過srs_tcp_connect函數創建TCP連接。if ((err = srs_tcp_connect(host, port, timeout, &stfd)) != srs_success) {// 如果連接失敗,包裝錯誤并返回。return srs_error_wrap(err, "tcp: connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout));}// 使用創建的套接字文件描述符初始化I/O對象。if ((err = io->initialize(stfd)) != srs_success) {return srs_error_wrap(err, "tcp: init socket object");}return err; // 返回錯誤狀態。
}// 關閉TCP連接的成員函數。
void SrsTcpClient::close() {// 如果I/O對象不存在,則忽略關閉操作。if (!io) {return;}// 關閉套接字。srs_close_stfd(stfd);
}// 設置接收超時時間的成員函數。
void SrsTcpClient::set_recv_timeout(srs_utime_t tm) {io->set_recv_timeout(tm); // 委托給I/O對象設置。
}// 獲取接收超時時間的成員函數。
srs_utime_t SrsTcpClient::get_recv_timeout() {return io->get_recv_timeout(); // 委托給I/O對象獲取。
}// 設置發送超時時間的成員函數。
void SrsTcpClient::set_send_timeout(srs_utime_t tm) {io->set_send_timeout(tm); // 委托給I/O對象設置。
}// 獲取發送超時時間的成員函數。
srs_utime_t SrsTcpClient::get_send_timeout() {return io->get_send_timeout(); // 委托給I/O對象獲取。
}// 獲取接收字節數的成員函數。
int64_t SrsTcpClient::get_recv_bytes() {return io->get_recv_bytes(); // 委托給I/O對象獲取。
}// 獲取發送字節數的成員函數。
int64_t SrsTcpClient::get_send_bytes() {return io->get_send_bytes(); // 委托給I/O對象獲取。
}// 從TCP連接中讀取數據的成員函數。
srs_error_t SrsTcpClient::read(void* buf, size_t size, ssize_t* nread) {return io->read(buf, size, nread); // 委托給I/O對象執行讀取。
}// 從TCP連接中完全讀取數據的成員函數。
srs_error_t SrsTcpClient::read_fully(void* buf, size_t size, ssize_t* nread) {return io->read_fully(buf, size, nread); // 委托給I/O對象執行完全讀取。
}// 向TCP連接寫入數據的成員函數。
srs_error_t SrsTcpClient::write(void* buf, size_t size, ssize_t* nwrite) {return io->write(buf, size, nwrite); // 委托給I/O對象執行寫入。
}// 向TCP連接寫入多個緩沖區數據的成員函數。
srs_error_t SrsTcpClient::writev(const iovec *iov, int iov_size, ssize_t* nwrite) {return io->writev(iov, iov_size, nwrite); // 委托給I/O對象執行多緩沖區寫入。
}