
原因
進程作為程序執行過程中資源分配的基本單位,擁有獨立的地址空間,同一進程的線程可以共享本進程的全局變量,靜態變量等數據和地址空間,但進程之間資源相互獨立。
由于PHP語言不支持多線程,因此Swoole使用多進程模式,再多進程模式下就存在進程內存隔離,進程間通信與數據共享問題。
swoole中master主進程
會創建manager管理進程
和reactor線程
,真正的工作進程為worker進程
。
manager
是創建和管理worker
進程,reactor
進程測試監聽socket
,接受數據任務,發送給worker
進程去工作,因此所有業務邏輯最終都是在worker
進程中進行,worker
進程之間的數據共享與通信必不可少。
swoole中設置選項worker_num
啟動的worker
進程數,默認設置為CPU核數
。
例如:
$server = new swoole_server('127.0.0.1',9898);
$server->set(array('worker_num' => 4, //設置啟動的Worker進程數。
));
如上面說描述,進程存在進程隔離:
$fds = array();
$server->on('connect', function ($server, $fd){echo "connection open: {$fd}n";global $fds;$fds[] = $fd;var_dump($fds);
});
$fds
雖然是全局變量,但是只在但前的進程內有效,swoole服務器底層會創建多個worker
進程,此處打印出來的只有部分連接的fd
。
解決方法:
swoole為我們提供了兩種有效的解決方法,都是基于多進程內存型數據庫,代替單進程PHP變量來存儲fd
。
第一種為:swoole_redis
,特點是使用簡單,跟PHP原生的redis
用法幾乎一致。
第二種為:swoole_table
,這是swoole官方研制的一款內存型數據庫,比redis
的可擴展性要強許多,單機器的情況下牛牛推薦大家使用這種方法。
Swoole_Tbale
swoole_redis
沒什么好談的,因為redis
都是個老活了,用法都一個鳥樣。
下面我們就重點搞下swoole_table
的用法。
一波官方說明襲來:
Table
一個基于共享內存和鎖實現的超高性能,并發數據結構。用于解決多進程/多線程數據共享和同步加鎖問題。
請謹慎使用數組方式讀寫Table, 建議使用文檔中提供的API來進行操作
數組方式取出的SwooleTableRow對象為一次性對象, 請勿依賴其進行過多操作
優勢
性能強悍,單線程每秒可讀寫200萬次
應用代碼無需加鎖,Table內置行鎖自旋鎖,所有操作均是多線程/多進程安全。用戶層完全不需要考慮數據同步問題。
支持多進程,Table可以用于多進程之間共享數據
使用行鎖,而不是全局鎖,僅當2個進程在同一CPU時間,并發讀取同一條數據才會進行發生搶鎖
Table的內存容量不受PHP的memory_limit控制
官方文檔地址:https://wiki.swoole.com/wiki/page/p-table.html
多進程數據共享的WebSocket例子:
<?php
// +----------------------------------------------------------------------
// 小黃牛blog - Swoole 即時通訊交互處理
// +----------------------------------------------------------------------
// Copyright (c) 2018 https://xiuxian.junphp.com All rights reserved.
// +----------------------------------------------------------------------
// Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// Author: 小黃牛 <1731223728@qq.com>
// +----------------------------------------------------------------------
class Server{/*** 客戶端身份存儲器*/private $_table = []; /*** WS的啟動實例*/private $_ws;/*** host-IP,0.0.0.0表示允許接收所有請求*/private $_host = '0.0.0.0';/*** 端口號*/private $_port = '9502';/*** 最大服務端心跳重連次數*/private $_max = 3;/*** 強制心跳重連啟動狀態*/private $_status = true;/*** 這是啟動服務端的入口*/public function run() { $this->start_service(); $this->start_table(); $this->start_handshake();$this->start_message();$this->end();}/*** ①啟動websocker服務*/private function start_service() {# 創建websocket服務器對象,監聽0.0.0.0:9502端口$this->_ws = new swoole_websocket_server($this->_host, $this->_port);$this->_ws->set(['worker_num' => 4,// 開4個工作進程]);}/*** ①創建Table服務*/private function start_table() {# 創建最大只能存儲1024個用戶的數據$this->_table = new swoole_table(1024);# 創建字段$this->_table->column('fd', swoole_table::TYPE_INT, 8); // FD$this->_table->column('status', swoole_table::TYPE_INT, 8); // 離線狀態$this->_table->column('heartbeat', swoole_table::TYPE_INT, 8); // 心跳重連數$this->_table->column('user_id', swoole_table::TYPE_STRING, 32); // 會員ID$this->_table->column('user_nice', swoole_table::TYPE_STRING, 32); // 會員名稱$this->_table->create();# 將表附加到ws實例里,方便后續使用$this->_ws->user = $this->_table;}/*** ②監聽WebSocket握手申請*/private function start_handshake() {# 監聽WebSocket連接打開事件$this->_ws->on('open', function ($ws, $request){# 這里可以做些鑒權驗證之類的});}/*** ③監聽客戶端消息發送請求*/private function start_message() {# 監聽WebSocket消息事件$this->_ws->on('message', function ($ws, $frame) {$data = json_decode($frame->data, true);$user_id = $data['user_id'];# 加入存儲器$this->_ws->user->set($user_id, ['fd' => $frame->fd, # FD'status' => 1, # 設置上線狀態'heartbeat' => 0, # 重置心跳重連數'user_id' => $data['user_id'], # 用戶ID'user_nice' => $data['user_nice'], # 用戶昵稱]);# 登錄廣播處理if ($data['code'] == 1) {# 發送廣播上線消息$data['content'] = '【'.$data['user_nice'].'】騎著小黃牛上線啦~!';$this->broadcast($ws, $this->json($data), $user_id);# 心跳重連檢測} else if ($data['code'] == 4) {$this->broadcast($ws, $frame->data, $user_id);$this->timer();# 其他請求} else {# 廣播消息$this->broadcast($ws, $frame->data, $user_id);}});} /*** ④監聽客戶端退出事件*/private function end() {# 這里加入了unset,清除open存儲器,防止存儲器無限增大# 監聽WebSocket連接關閉事件$this->_ws->on('close', function ($ws, $fd) {# 這塊提取用戶信息還有優化空間,實際開發中這樣for會消耗內存$user = null;foreach ($this->_ws->user as $k=>$v) {if ($v['fd'] == $fd) {$user = $v;break;}}# 獲取用戶ID$user_id = $user['user_id'];# 獲取用戶nice$user_nice = $user['user_nice'];# 設置離線狀態$this->_ws->user->set($user_id, ['status' => 0, # 設置離線狀態]);$data = ['code' => 2,'user_id' => $user_id,'user_nice' => $user_nice,'content' => '【'.$user_nice.'】騎著小掃帚灰溜溜的走了~~!'];# 廣播消息$this->broadcast($ws, $this->json($data));});$this->_ws->start();}/*** 廣播消息* @todo 無* @author 小黃牛* @version v1.0.0.1 + 2018.11.12* @deprecated 暫不棄用* @global 無* @param object $wx 實例* @param string $content 廣播內容* @param string $id 用戶的userid* @param bool $status 是否做心跳限制 * @return void*/private function broadcast($ws, $content, $id=null, $status=false) {# 向所有人廣播foreach ($this->_ws->user as $k=>$v) {# 不向自己廣播,并且要在線的# 注意,這里一定要有上線狀態的限制,否則假設用戶已經退出,但你的進程還開著,實際上已經關閉,這時候push就會報錯# 只有正常在線的用戶才能接收到廣播# 加入心跳檢測限制if ($k != $id && $v['status'] == 1 && $status == true) {$ws->push($v['fd'], $content);} else if ($v['user_id'] != $id && $v['status'] == 1 && $v['heartbeat'] == 0) {$ws->push($v['fd'], $content);}}}/*** 數組轉json* @todo 無* @author 小黃牛* @version v1.0.0.1 + 2018.11.08* @deprecated 暫不棄用* @global 無* @param array $array 數組* @return json*/private function json($array) {return json_encode($array, JSON_UNESCAPED_UNICODE);}/*** 服務端定時強制心跳檢測* @todo 無* @author 小黃牛* @version v1.0.0.1 + 2018.11.08* @deprecated 暫不棄用* @global 無* @return void*/private function timer() {# 注意強制心跳觸發器不能放在open事件里,因為那時候用戶還沒有提交登錄請求,是還沒有userID的# 還有,強制心跳定時器只能觸發一次,否則會出現生成多個定時器的情況if ($this->_status) {$this->_status = false;/*** ⑤服務端強制心跳檢測* 每隔1分鐘發送1次,如果連續3次強制心跳檢測未通過,服務端將強制斷開連接*/$obj = $this;swoole_timer_tick(60000, function ($timer_id) use (&$obj) {# 廣播消息$obj->broadcast($obj->_ws, $obj->json(['code' => 5]), null, true);# 所有人的心跳次數+1foreach ($this->_ws->user as $k=>$v) {if (empty($v['heartbeat'])) {# 重置心跳次數$this->_ws->user->set($v['user_id'], ['heartbeat' => 0,]);}# 心跳次數累加$this->_ws->user->set($v['user_id'], ['heartbeat' => $v['heartbeat']+1]);# 心跳次數大于等于_max && 在線的 的連接關閉if ($v['heartbeat'] >= $obj->_max && $v['status'] == 1) {$data = $v;# 發送強制掉線廣播$data['code'] = 6;$data['content'] = '【'.$data['user_nice'].'】已被服務端強制下線!';$obj->broadcast($obj->_ws, $obj->json($data), null, true);# 這里不需要unset連接,因為在close事件中,已經將這個連接設置為離線了# 主動關閉連接k$obj->_ws->close($v['fd']);}}});}}
}
$socketServer = new Server();
$socketServer->run();
最后推薦大家可以用下我開源的一個基于Swoole4.5+研發的PHP框架。該框架基于注解實現了很多好玩的功能,很適合新人快速上手Swoole擴展。
SW-X框架-專注高性能便捷開發而生的PHP-SwooleX框架?www.sw-x.cn