文章目錄
- 首先是解決如何運行gatewayworker
- 調試gatewayworker程序
- 向指定客戶端發送消息
- 在TP框架中調用Gateway的API
- 總結說明
測試環境 windows10;PHP7.2;TP5.1;
這里只介紹如何使用TP集成的workerman擴展庫think-worker
,原生workerman的使用請參考官方文檔
TP5.1集成了workerman,使用composer require topthink/think-worker=2.0.*
安裝即可。
TP5.1只能安裝think-worker2.0版本,最新的think-worker3.0版本是給TP6.0用的,但依賴安裝workerman的版本是最新的。
雖然集成了,但是在windows下使用還是有許多問題,比如直接運行命令php think woker:gateway
會報錯GatewayWorker Not Support On Windows.
windows解決方案 ,Linux下可以直接運行(應該吧~)。
官方的使用文檔也不夠詳細,只列舉了worker
和worker:server
兩種運行方式的簡單示列。但是大部分使用workerman都是奔著GatewayWorker去的,畢竟自己用workerman完全搭建還是需要技術和時間的。
單純的使用workerman,直接運行php think worker
或php think worker:server
就可以,調試也非常簡單,TP官方文檔有說明就不贅述了,重點是gatewayworker。
首先是解決如何運行gatewayworker
根據workerman的文檔,windows下不能在同一個php文件中運行多個worker,所以需要修改tinkphp的命令行
新建自定義命令行文件application\common\command\Workerman.php
<?phpnamespace app\common\command;use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use think\facade\Config;
use Workerman\Worker;/*** Worker 命令行*/
class Workerman extends Command
{protected function configure(){$this->setName('workerman')->addArgument('service', Argument::OPTIONAL, 'workerman service: gateway|register|businessworker', null)->addOption('host', 'H', Option::VALUE_OPTIONAL, 'the host of workerman server', null)->addOption('port', 'P', Option::VALUE_OPTIONAL, 'the port of workerman server', null)->addOption('daemon', 'd', Option::VALUE_OPTIONAL, 'Run the workerman server in daemon mode.')->setDescription('workerman Server for ThinkPHP');}public function execute(Input $input, Output $output){$service = $input->getArgument('service');$option = Config::pull('gateway_worker');if ($input->hasOption('host')) {$host = $input->getOption('host');} else {$host = !empty($option['host']) ? $option['host'] : '0.0.0.0';}if ($input->hasOption('port')) {$port = $input->getOption('port');} else {$port = !empty($option['port']) ? $option['port'] : '2347';}$registerAddress = !empty($option['registerAddress']) ? $option['registerAddress'] : '127.0.0.1:1236';switch ($service) {case 'register':$this->register($registerAddress);break;case 'businessworker':$this->businessWorker($registerAddress, isset($option['businessWorker']) ? $option['businessWorker'] : []);break;case 'gateway':$this->gateway($registerAddress, $host, $port, $option);break;default:$output->writeln("<error>Invalid argument action:{$service}, Expected gateway|register|businessworker .</error>");exit(1);break;}Worker::runAll();}/*** 啟動register* @access public* @param string $registerAddress* @return void*/public function register($registerAddress){// 初始化registernew Register('text://' . $registerAddress);}/*** 啟動businessWorker* @access public* @param string $registerAddress registerAddress* @param array $option 參數* @return void*/public function businessWorker($registerAddress, $option = []){// 初始化 bussinessWorker 進程$worker = new BusinessWorker();$this->option($worker, $option);$worker->registerAddress = $registerAddress;}/*** 啟動gateway* @access public* @param string $registerAddress registerAddress* @param string $host 服務地址* @param integer $port 監聽端口* @param array $option 參數* @return void*/public function gateway($registerAddress, $host, $port, $option = []){// 初始化 gateway 進程if (!empty($option['socket'])) {$socket = $option['socket'];unset($option['socket']);} else {$protocol = !empty($option['protocol']) ? $option['protocol'] : 'websocket';$socket = $protocol . '://' . $host . ':' . $port;unset($option['host'], $option['port'], $option['protocol']);}$gateway = new Gateway($socket, isset($option['context']) ? $option['context'] : []);// 以下設置參數都可以在配置文件中重新定義覆蓋$gateway->name = 'Gateway';$gateway->count = 4;$gateway->lanIp = '127.0.0.1';$gateway->startPort = 2000;$gateway->pingInterval = 30;$gateway->pingNotResponseLimit = 0;$gateway->pingData = '{"type":"ping"}';$gateway->registerAddress = $registerAddress;// 全局靜態屬性設置foreach ($option as $name => $val) {if (in_array($name, ['stdoutFile', 'daemonize', 'pidFile', 'logFile'])) {Worker::${$name} = $val;unset($option[$name]);}}$this->option($gateway, $option);}/*** 設置參數* @access protected* @param Worker $worker Worker對象* @param array $option 參數* @return void*/protected function option($worker, array $option = []){// 設置參數if (!empty($option)) {foreach ($option as $key => $val) {$worker->$key = $val;}}}
}
在application\command.php
命令行參數配置文件中添加
return ['workerman' => '\\app\\common\\command\\Workerman',
];
打開三個cmd命令窗口,分別運行
php think workerman register
php think workerman businessworker
php think workerman gateway
運行結果
調試gatewayworker程序
添加Events監聽事件文件application\workerman\Events.php
,這里偷懶直接復制了官方的Events文件,自己寫的話,方法沒寫全運行時會報錯退出,所以干脆直接全部復制,修改一下命名空間即可。
<?phpnamespace app\workerman;use GatewayWorker\Lib\Gateway;
use think\worker\Application;
use Workerman\Worker;/*** Worker 命令行服務類*/
class Events
{/*** onWorkerStart 事件回調* 當businessWorker進程啟動時觸發。每個進程生命周期內都只會觸發一次** @access public* @param \Workerman\Worker $businessWorker* @return void*/public static function onWorkerStart(Worker $businessWorker){$app = new Application;$app->initialize();}/*** onConnect 事件回調* 當客戶端連接上gateway進程時(TCP三次握手完畢時)觸發** @access public* @param int $client_id* @return void*/public static function onConnect($client_id){Gateway::sendToCurrentClient("Your client_id is $client_id");}/*** onWebSocketConnect 事件回調* 當客戶端連接上gateway完成websocket握手時觸發** @param integer $client_id 斷開連接的客戶端client_id* @param mixed $data* @return void*/public static function onWebSocketConnect($client_id, $data){var_export($data);}/*** onMessage 事件回調* 當客戶端發來數據(Gateway進程收到數據)后觸發** @access public* @param int $client_id* @param mixed $data* @return void*/public static function onMessage($client_id, $data){Gateway::sendToAll($data);}/*** onClose 事件回調 當用戶斷開連接時觸發的方法** @param integer $client_id 斷開連接的客戶端client_id* @return void*/public static function onClose($client_id){GateWay::sendToAll("client[$client_id] logout\n");}/*** onWorkerStop 事件回調* 當businessWorker進程退出時觸發。每個進程生命周期內都只會觸發一次。** @param \Workerman\Worker $businessWorker* @return void*/public static function onWorkerStop(Worker $businessWorker){echo "WorkerStop\n";}
}
修改配置監聽文件config\gateway_worker.php
// BusinsessWorker配置'businessWorker' => ['name' => 'BusinessWorker','count' => 1,'eventHandler' => '\app\workerman\Events', // 原來是\think\worker\Events,改成自己的監聽文件位置],
添加前端測試文件,這里使用的是vue,關于前端如何使用webSocket,網上到處都是,也很簡單。
// vue測試代碼片段
export default {data () {return {websocket: null}},mounted () {this.websocket = new WebSocket('ws://127.0.0.1:2348') // 使用gateway的地址this.websocket.onmessage = evt => {console.log(evt.data) // 打印接收的消息}}
}
重啟businessworker服務,運行vue,前端控制臺會打印
Your client_id is 7f00000107d000000001
這是在Events文件監聽事件onConnect中的程序,當客戶端連接時,向當前客戶端發送信息,多開幾個窗口,測試多客戶端連接時的效果。
向指定客戶端發送消息
首先需要明確的是,gatewayworker只能通過client_id識別客戶端,每產生一次連接,就會生成一個client_id,即便是同一個頁面,發生了多次連接,gatewayworker也會認為是不同的客戶端。
實際業務中客戶端往往是以用戶id或其他形式的id作為區分,所以實際業務中需要將client_id和業務id進行綁定并做判斷,這里做測試就不深入討論了,直接用client_id進行測試
修改前端文件
// vue模板代碼片段
<template><el-row type="flex"><el-select v-model="selectClientId"><el-option v-for="(item, index) in clients" :key="index" :value="item" :label="item" /></el-select><el-input v-model="message"></el-input><el-button @click="submit">發送</el-button></el-row>
</template>
// vue js代碼片段data () {return {websocket: null,clients: [], // client用戶列表selectClientId: '', // 選擇的用戶message: '' // 需要發送的消息}},methods: {submit () {const data = {client_id: this.selectClientId, // 指定的客戶端idmessage: this.message}this.websocket.send(JSON.stringify(data))}},mounted () {this.websocket = new WebSocket('ws://127.0.0.1:2348') // 使用gateway的地址this.websocket.onmessage = evt => {const data = JSON.parse(evt.data)if (data.type === 'login') {this.clients.push(data.client_id)}console.log(data.message)}}
修改監聽文件,修改了onConnect和onMessage兩個監聽回調
# ...public static function onConnect($client_id){// Gateway::sendToCurrentClient("Your client_id is $client_id");$message = ['type' => 'login','client_id' => $client_id,'message' => 'user ' . $client_id . ' is login',];Gateway::sendToAll(json_encode($message));}# ...public static function onMessage($client_id, $data){// Gateway::sendToAll($data);$data = json_decode($data, true);$form_client = $client_id;$to_client = $data['client_id'];$message = $data['message'];$send_message = ['type' => 'message','message' => "user {$form_client} send {$message} to you",];if ($to_client) {// 如果有指定用戶,則發送給指定用戶Gateway::sendToClient($to_client, json_encode($send_message));} else {// 沒有指定用戶,發送給全部Gateway::sendToAll($data);}}
重啟worker服務,測試效果
workerman的官方文檔中明確指出不建議直接通過客戶端發送消息,而是通過原來的框架處理業務邏輯
與ThinkPHP等框架結合
總體原則:
現有mvc框架項目與GatewayWorker獨立部署互不干擾
所有的業務邏輯都由網站頁面post/get到mvc框架中完成
GatewayWorker不接受客戶端發來的數據,即GatewayWorker不處理任何業務邏輯,GatewayWorker僅僅當做一個單向的推送通道
僅當mvc框架需要向瀏覽器主動推送數據時才在mvc框架中調用Gateway的API(GatewayClient)完成推送
在TP框架中調用Gateway的API
workerman官方文檔建議使用GatewayClient
提供的API發送數據,這個需要額外安裝composer require workerman/gatewayclient
,使用方法在官方文檔中有說明,和使用gateway一樣。但在TP的實際測試中,無需安裝也可以正常使用,這里使用的是GatewayWorker\Lib\Gateway
,也不需要配置參數,可以直接使用。
TP處理業務邏輯的控制器
<?phpnamespace app\index\controller;use GatewayWorker\Lib\Gateway;
use think\Controller;class Index extends Controller
{public function index(){$client_id = $this->request->get('client_id');$send_message = $this->request->get('message');$message = ['type' => 'message','message' => $this->request->get('message'),];Gateway::sendToClient($client_id, json_encode($message));}
}
瀏覽器直接訪問或ajax訪問效果一致,運行結果
至此,TP5.1中使用think-worker調試基本通過,剩下的就是根據實際業務邏輯進行處理了。
總結說明
在官方文檔 與ThinkPHP等框架結合 的使用說明中和案例中發現,不需要在Events監聽文件中寫業務邏輯和判斷,所有的業務邏輯都可以在TP框架中完成,Events的作用僅僅是將client_id
告訴客戶端。
而在tink-worker原來的Events文件中,當客戶端連接時,就向當前客戶端發送過一條信息"Your client_id is 7f00000107d000000001"
,使用正則匹配就能拿到client_id,無需更改文件。
那么TP5.1的think-worker的使用可以簡化如下
- windows下修改gatewayworker的啟動方式,Linux無需更改(我也沒有測試)。
- php業務邏輯中使用
GatewayWorker\Lib\Gateway
調用gateway的API給客戶端發送消息。
所以不需要過多的更改gateway的配置文件,也不需要額外的建立監聽文件,就可以直接使用gateway了,當然windows環境下因為機制問題,所以更改了啟動方式。饒了一大圈回來,發現think-worker的使用方式是如此簡單,所以官方文檔是覺得太簡單了所以沒有給使用說明的必要么😓