Online服務器的第三部分就是數據層,send_request_to_db開始了數據層的處理邏輯:
int send_request_to_db(int cmd, sprite_t* p, int body_len, const void* body_buf, uint32_t id);
在該函數里首先以懶惰的方式連接數據庫服務器,獲取一個網絡連接,注意參數p,如果該參數為空,那么就說明不關心數據庫代理服務器返回的數據:
if (!p) pkg->seq = 0;
else pkg->seq = (sprite_fd (p) << 16) | p->waitcmd;
注意以上的代碼,如果不關心返回數據,那么直接將pkg的seq字段設置為0即可,如果關心返回結果,就需要用這個seq字段保存一些信息了,比如當前處理的業務協議是什么,還有就是這個客戶端實體p的對應的父進程的套結字描述符是多少,然后將這個pkg連同信息體一起發送給數據庫代理服務器,等到代理服務器返回的時候會進入worker_handle_net中處理,注意handle_process函數里關于子進程的執行路線:
int worker_handle_net(int fd, void* buf, int len, sprite_t** u)
{
assert( len >= sizeof(server_proto_t) );
server_proto_t* dbpkg = buf;
…
} else if (fd == proxysvr_fd) {
return handle_db_return(fd, dbpkg, len, u);
}
return 0;
}
執行流進入handle_db_return:
static int handle_db_return(int fd, server_proto_t* dbpkg, int len, sprite_t** sp)
{
int waitcmd = dbpkg->seq & 0xFFFF;
int conn = dbpkg->seq >> 16;
if (!dbpkg->seq || waitcmd == PROTO_LOGOUT) //如果不關心返回數據,則在send_request_to_db就已經將seq設置成了0,于是直接返回,否則取出保存的fd信息
return 0;
if (!(*sp = get_sprite_by_fd(conn)) || (*sp)->waitcmd != waitcmd) {
//出錯
}
…
int err = -1;
switch (dbpkg->ret) {
case 0:
break; //成功
…//處理各種錯誤碼
在處理各種錯誤碼的時候可以根據不同的協議進行不同的動作,協議保存在sprite_t的waitcmd字段中。在沒有錯誤的情況下就會進入數據層的回調處理:
#define DO_MESSAGE(n, func) /
case n: err = func(*sp, dbpkg->id, dbpkg->body, len - sizeof (server_proto_t)); break
和協議層的處理十分類似,也是回調函數的形式,只不過這里沒有提前注冊,只是簡單的封裝了一下switch-case開關。對于前面的例子就是:
DO_MESSAGE(SVR_PROTO_RACE_SIGN, race_sign_callback);
int race_sign_callback(sprite_t* p, uint32_t id, char* buf, int len)
{
uint32_t itms[2] = {12999, 12998};
CHECK_BODY_LEN(len, 4);
p->teaminfo.team = *(uint32_t*)buf;
if (p->teaminfo.team != 1 && p->teaminfo.team != 2) {
ERROR_RETURN(("race failed/t[%u %u]", p->id, p->teaminfo.team), -1);
}
db_single_item_op(0, p->id, itms[p->teaminfo.team - 1], 1, 1);
response_proto_uint32(p, p->waitcmd, p->teaminfo.team, 0); //一定要向客戶端回應,否則客戶端將掛起
return 0;
}
一定要返回給客戶端一個數據,因為客戶端和服務器是一問多答式的,服務器的應答可以分好幾部分來返回給客戶端,比如一共需要返回5次,那么在這5次全部返回之間,服務器是不接受同一個客戶端的別的請求的,必然是一問多答,而不是多問多答。注意send_to_self的最后一個參數的意義:
int send_to_self(sprite_t *p, uint8_t *buffer, int len, int completed)
如果completed為1,那么在該函數中就會將p的waitcmd設置為0,代表當前的這個協議已經處理完畢,online可以處理下一個協議請求了,否則就意味著當前的協議還沒有處理完畢,online不接收新的協議請求,這個在dispatch_protocol中體現:
if (p->waitcmd != 0) {
send_to_self_error(p, cmd, -ERR_system_busy, 0);
WARN_RETURN(("wait for cmd=%d, id=%u, new cmd=%d", p->waitcmd, p->id, cmd), 0);
}
Onlien服務器通過這種方式解決了一些同步問題,一條協議沒有處理完是不接受另外的協議的。關于數據同步,其實online服務器使用了另外的方案,并沒有使用傳統的鎖之類的,而是使用了一個全局變量,并且onlien中不存在線程的概念,因此基本不存在處理數據時的數據共享訪問,因此一個子進程同時只能處理一個客戶的請求,因此全局變量msg被定義出來,用來保存需要返回給客戶端的消息,注意包含協議頭部。最后的問題就是請求和回應時的數據組織了,對于請求包,用UNPKG_UINT32來解析包的內容,j是游標號,需要在外部定義然后在外部使用,初始值就是需要開始解析的位置距離包(也就是b)開始的以字節為單位的大小,比如一個buffer,協議頭為8個字節,我們需要解析協議體,也就是有效載荷,那么我們需要如下代碼:
Int j = 8, v = count;
UNPKG_UINT32(buffer, count, j);
只要看看下面的定義就一目了然了:
#define UNPKG_UINT32(b, v, j) /
do { /
(v) = ntohl( *(uint32_t*)((b)+(j)) ); (j) += 4; /
} while (0)
對于封包同樣的方式,只是將流程反過來了:
#define PKG_UINT32(b, v, j) /
do { /
*(uint32_t*)((b)+(j)) = htonl(v); (j) += 4; /
} while (0)
在往客戶端返回包的時候,封包的過程就是用的PKG_UINT32,如果連包頭一起封裝,那么就是下面的流程:
int j = sizeof(protocol_t); //空余了包頭的空間
PKG_UINT32(msg, intbuf1, j); //從包頭的下一個字節開始打包
PKG_UINT32(msg, intbuf2, j); //繼續打包
…
關于事件處理器是和數據庫相關的處理器并列的邏輯處理器,這個處理器主要處理系統事件的,由于事件分為好多種,如果寫進一個協議處理回調函數會使得這個函數的職責太多,不明確,如果每個事件作為一個協議封裝,那么又會使整個協議處理器的架構主次不分,很含糊,因此就專門為事件處理單獨列一個更低級的層次進行處理,也就是和協議處理不在一個層次,而專門為所有事件單獨封裝一個協議處理回調函數,然后為了協議處理的清晰,在這個協議處理鉤子中將事件分發到不同的事件處理器中,如此一來,事件處理就單獨成了一個子層次