前言
Swoole官方文檔:Swoole 文檔
Swoole 使 PHP 開發人員可以編寫高性能高并發的 TCP、UDP、Unix Socket、HTTP、 WebSocket 等服務,讓 PHP 不再局限于 Web 領域。Swoole4 協程的成熟將 PHP 帶入了前所未有的時期, 為性能的提升提供了獨一無二的可能性。Swoole 可以廣泛應用于互聯網、移動通信、云計算、 網絡游戲、物聯網(IOT)、車聯網、智能家居等領域。使用 PHP + Swoole 可以使企業 IT 研發團隊的效率大大提升,更加專注于開發創新產品。
Java 在運行前需要先編譯,而 PHP 則可以直接將文件丟到服務器上就能運行,這就是解釋執行與編譯執行的區別。Java這類的語言擁有固定、明確的變量類型,被稱為靜態語言;而 PHP 這類的語言則可以歸結為動態語言,特點是變量不用指定類型。
對于這兩種語言編譯運行方式來說,類似 Java 語言會將代碼一次加載到內存,效率明顯會提升不少,畢竟內存和硬盤的速度差距還是蠻大的。而且會一次性將很多初始對象,類模板文件加載,調用的時候不用重新再加載實例化,性能就會有更進一步的上升空間。但是,類似 Java 語言通常都是需要編譯成一個可執行的中間文件的,如果有代碼的更新,則必須重啟整個程序。
解釋執行語言優缺點很明顯就和編譯執行語言是反過來的了。解釋執行語言每一次運行一個腳本,就需要將所有相關的文件全部加載一次,而且如果沒別的優化的話(比如 OPcache),所有的相關文件都要從硬盤讀取、加載內存、實例化這些步驟中從頭走一遍。可想而知,他的效率和性能是完全無法與靜態語言相比的。但是,優點也很明確,隨時修改一個文件就可以隨時上線,線上業務不用中斷。
Swoole 是如何來解決效率性能問題的?它就是通過直接將代碼加載到內存的方式,就像 Java 一樣來啟動一個進程,實現 PHP 代碼的高性能執行。同時,盡量保持代碼還是可以按照傳統的方式來寫,為 PHP 提供了一個高性能的解決方案。?
安裝
初學者建議直接在寶塔上安裝PHP環境及Swoole拓展。注意:swoole程序只能在Linux上運行。
HTTP 服務器
創建http_server.php文件。
<?php$server = new swoole\Http\Server('0.0.0.0',1234);$server->set(['enable_coroutine'=>true]);// $server->on 以上的代碼塊在程序啟動時會執行一次,且后續請求不會再觸發,可以放上框架的初始化代碼,這樣就可以做到只在程序啟動時初始化。$server->on('Request',function($request,$response){ // 有請求進入會執行此代碼塊static $a = 1;$a++;list($controller,$action) = explode('/',trim($request->server['request_uri'],'/'));$response->header('Content-Tpye','text/html;charset=utf-8');$response->end("<h1>hello!Swoole.controller is {$controller},action is {$action},a is {$a}</h1>");return;});$server->start();?>
將文件上傳到服務器后,在目錄下執行 php?http_server.php 以啟動服務。
瀏覽器訪問 localhost:1234/index/test , 會得到以下輸出:
hello!Swoole.controller is index,action is test,a is 2
可以發現,在不斷請求接口下,$a 的值會一直累加(常駐內存),而不像之前傳統的PHP開發中,變量每次都會被釋放掉。
Coroutine 協程創建
<?php
$server = new swoole\Http\Server('0.0.0.0',1234);
$server->set(['enable_coroutine'=>true]);
$server->on('Request',function($request,$response){Coroutine::create(function(){Coroutine::sleep(2);var_dump('協程1');var_dump(time());});Coroutine::create(function(){Coroutine::sleep(2);var_dump('協程2');var_dump(time());});});
$server->start();?>
在服務器上執行?php?http_server.php 重啟服務,并訪問之前的地址,控制臺輸出如下。
[root@VM-12-13-centos swoole]# php server_test.php
string(7) "協程1"
int(1709349803)
string(7) "協程2"
int(1709349803)
可以發現,打印出來的時間并沒有相差兩秒,協程間是同步執行的,并不會進行阻塞。而在rpm模式代碼是從上往下同步執行的。
Websocket服務
創建 Websocket_test.php 文件,并上傳到服務器。
<?php// 初始化 WebSocket 服務器,在本地監聽 1234端口
$server = new Swoole\WebSocket\Server("localhost", 1234);// 建立連接時觸發
$server->on('open', function (Swoole\WebSocket\Server $server, $request) {echo "server: handshake success with fd{$request->fd}\n";
});// 收到消息時觸發推送
$server->on('message', function (Swoole\WebSocket\Server $server, $frame) {echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";$server->push($frame->fd, "from:{$frame->fd}:{$frame->data}");
});// 關閉 WebSocket 連接時觸發
$server->on('close', function ($ser, $fd) {echo "client {$fd} closed\n";
});// 啟動 WebSocket 服務器
$server->start();?>
?本地創建 websocket_client.html 。
<!DOCTYPE html>
<html>
<head><meta charset="UTF-8"><title>Chat Client</title>
</head>
<body>
<script>window.onload = function () {var nick = prompt("Enter your nickname");var input = document.getElementById("input");input.focus();// 初始化客戶端套接字并建立連接var socket = new WebSocket("ws://ip:1234");// 連接建立時觸發socket.onopen = function (event) {console.log("Connection open ..."); }// 接收到服務端推送時執行socket.onmessage = function (event) {var msg = event.data;var node = document.createTextNode(msg);var div = document.createElement("div");div.appendChild(node);document.body.insertBefore(div, input);input.scrollIntoView();};// 連接關閉時觸發socket.onclose = function (event) {console.log("Connection closed ..."); }input.onchange = function () {var msg = nick + ": " + input.value;// 將輸入框變更信息通過 send 方法發送到服務器socket.send(msg);input.value = "";};}
</script>
<input id="input" style="width: 100%;">
</body>
</html>
在?Websocket_test.php 所在目錄執行 php?Websocket_test.php 以啟動服務。本地打開websocket_client.html網頁。
輸入昵稱后,在輸入框發送消息。
控制臺輸出:
[root@VM-55-23-centos swoole]# php websocket_test.php
server: handshake success with fd1
receive from 1:Hola: hello,world~,opcode:1,fin:1
receive from 1:Hola: 你好,opcode:1,fin:1
執行異步任務 (Task)*
在 Server 程序中如果需要執行很耗時的操作,比如一個聊天服務器發送廣播,Web 服務器中發送郵件。如果直接去執行這些函數就會阻塞當前進程,導致服務器響應變慢。
Swoole 提供了異步任務處理的功能,可以投遞一個異步任務到 TaskWorker 進程池中執行,不影響當前請求的處理速度。
創建 task_test.php 文件,并上傳到服務器。
<?php$serv = new Swoole\Server('127.0.0.1', 1234);// 設置工作進程數量。
$serv->set(['work_num' =>2, // worker_num是用來處理請求邏輯的進程數'task_worker_num' => 2 // task_num是異步任務投遞進程,專門處理異步任務的執行,類似 fast-cgi
]);// 接收到數據時回調此函數,此回調函數在worker進程中執行。
$serv->on('Receive', function($serv, $fd, $reactor_id, $data) {//投遞異步任務$task_id = $serv->task($data);// 線程間通信,向其他工作進程發送消息$serv->sendMessage('hello task process',$serv->worker_id);
});// 當工作進程收到由 $server->sendMessage() 發送的消息時會觸發 onPipeMessage 事件。worker/task 進程都可能會觸發 onPipeMessage 事件
$serv->on('pipeMessage',function($serv,$src_worker_id,$data){echo "#{$serv->worker_id} message from #$src_worker_id: $data\n";
})// 處理異步任務(此回調函數在task進程中執行)。
$serv->on('Task', function ($serv, $task_id, $reactor_id, $data) {echo "New AsyncTask[id={$task_id}]";// 返回任務執行的結果$serv->finish("{$data} -> OK");
});//處理異步任務的結果(此回調函數在worker進程中執行)。
$serv->on('Finish', function ($serv, $task_id, $data) {echo "AsyncTask[{$task_id}] Finish: {$data}".PHP_EOL;
});$serv->start();?>
使用 php task_test.php 運行后,再另開窗口使用 telnet 127.0.0.1 1234?連接此 TCP 服務,并發送消息。
[root@VM-55-23-centos ~]# telnet 127.0.0.1 1234
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
hello,world // 發送的消息
回到swoole程序窗口,可以看到當服務接收到數據(onReceive)后,會向task投遞異步任務,在onTask中處理任務。
[root@VM-55-23-centos swoole]# php task_test.php
Dispatch AsyncTask: id=0
New AsyncTask[id=0]
AsyncTask[0] Finish: hello,world-> OK
調用?$serv->task()
?后,程序立即返回,繼續向下執行代碼。onTask 回調函數 Task 進程池內被異步執行。執行完成后調用?$serv->finish()
?返回結果。
更多事件參考:事件 | Swoole4 文檔
當我們使用?ps -aux | grep task_test 命令查看進程:
- swoole啟動的主進程是master進程負責全局管理,然后master進程會再fork一個manager進程。
- manager進程開始統一管理進程創建回收管理。
- manager進程根據設置的worker_num和task_worker_num來創建work進程和task進程。
因此啟動swoole我們能看到的進程數是:2+worker_num+task_worker_num,2中包含manager進程和master進程
毫秒定時器
創建 timer_test.php 文件,并上傳到服務器。
<?php
use Swoole\Coroutine;// 創建協程容器
Coroutine\run(function(){// 創建定時器,2000ms執行一次,一直執行Swoole\Timer::tick(2000,function(int $timer_id , $parma1 , $parma2){echo "timer by tick,timer id is #$timer_id , after 2000ms , parma1:$parma1 , parma2:$parma2,".PHP_EOL;// 在定時器中創建定時器,4000ms執行一次,一直執行Swoole\Timer::tick(4000,function(int $timer_id){echo "timer tick by timer tick,timer id is {$timer_id} , after 4000ms,".PHP_EOL;// 清除指定id的定時器Swoole\Timer::clear($timer_id);});},"A","B");// 創建定時器,3000ms執行一次,只會執行一次Swoole\Timer::after(3000,function(){echo "timer tick by after , after 3000ms,".PHP_EOL;});// 獲取定時器列表,循環輸出定時器信息foreach(Swoole\Timer::list() as $timer_id){var_dump("timer info:");var_dump(Swoole\Timer::info($timer_id));};// 清除所有定時器// Swoole\Timer::clearAll();});?>
執行 php?timer_server.php 以啟動服務。
輸出如下:
string(11) "timer info:"
array(5) {["exec_msec"]=>int(3000)["exec_count"]=>int(0)["interval"]=>int(0)["round"]=>int(0)["removed"]=>bool(false)
}
string(11) "timer info:"
array(5) {["exec_msec"]=>int(2000)["exec_count"]=>int(0)["interval"]=>int(2000)["round"]=>int(0)["removed"]=>bool(false)
}
timer by tick,timer id is #1 , after 2000ms , parma1:A , parma2:B,
timer tick by after , after 3000ms,
timer by tick,timer id is #1 , after 2000ms , parma1:A , parma2:B,
timer by tick,timer id is #1 , after 2000ms , parma1:A , parma2:B,
timer tick by timer tick,timer id is 3 , after 4000ms,
timer by tick,timer id is #1 , after 2000ms , parma1:A , parma2:B,
timer tick by timer tick,timer id is 4 , after 4000ms,
timer by tick,timer id is #1 , after 2000ms , parma1:A , parma2:B,
timer tick by timer tick,timer id is 5 , after 4000ms,
timer by tick,timer id is #1 , after 2000ms , parma1:A , parma2:B,
timer tick by timer tick,timer id is 6 , after 4000ms,
timer by tick,timer id is #1 , after 2000ms , parma1:A , parma2:B,
timer tick by timer tick,timer id is 7 , after 4000ms,
定時器常用方法:?
// 設置一個間隔時鐘定時器。
Swoole\Timer::tick(int $msec, callable $callback_function, ...$params): int// 在指定的時間后執行函數。Swoole\Timer::after 函數是一個一次性定時器,執行完成后就會銷毀。
Swoole\Timer::after(int $msec, callable $callback_function, ...$params): int// 使用定時器 ID 來刪除定時器。
Swoole\Timer::clear(int $timer_id): bool// 清除當前 Worker 進程內的所有定時器。
Swoole\Timer::clearAll(): bool// 返回 timer 的信息。
Swoole\Timer::info(int $timer_id): array// 返回定時器迭代器,可使用 foreach 遍歷當前 Worker 進程內所有 timer 的 id
Swoole\Timer::list(): Swoole\Timer\Iterator
高性能共享內存 Table
創建 timer_test.php 文件,并上傳到服務器。
<?php
// 創建內存表并設置表大小、表字段
$table = new Swoole\Table(256);
$table->column('id', Swoole\Table::TYPE_INT);
$table->column('name', Swoole\Table::TYPE_STRING, 64);
$table->create();$serv = new Swoole\Server('127.0.0.1', 1234);// 設置數據包分發策略(分發給Woker進程)為輪詢模式
$serv->set(['dispatch_mode' => 1]);
$serv->table = $table;// 接收到數據時觸發
$serv->on('receive', function ($serv, $id, $reactor_id, $data) {$cmd = explode(" ", trim($data));//getif ($cmd[0] == 'get'){if (count($cmd) < 2){$cmd[1] = $id;}$get_id = intval($cmd[1]);// 從內存表中獲取數據$info = $serv->table->get($get_id);$serv->send($id, var_export($info, true)."\n");}//setelseif ($cmd[0] == 'set'){// 往內存表中存放數據$ret = $serv->table->set($id, array('id' => $cmd[1], 'name' => $cmd[2]));if ($ret === false){$serv->send($id, "ERROR\n");}else{$serv->send($id, "OK\n");}}else{$serv->send($id, "command error.\n");}
});$serv->start();?>
使用 php task_test.php 運行后,再另開窗口使用 telnet 127.0.0.1234?連接此 TCP 服務,并發送數據。
[root@VM-55-23-centos ~]# telnet 127.0.0.1 1234
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
get 2
false
set 1 sam
OK
get 1
array ('id' => 1,'name' => 'sam',
)
當服務關閉時,內存表也會被釋放。
協程
協程創建的常規方式
創建 coroutine_test.php 文件,并上傳到服務器。
<?php
use Swoole\Coroutine;
use function Swoole\Coroutine\run;// 必須使用run創建協程容器,才能使用協程。框架能直接使用go是因為框架啟動時已創建了協程容器
run(function(){// 1.go()創建協程,開啟短命名可用(默認開啟)go(function(){// 使用協程中的Sleep才不會同步阻塞Coroutine::sleep(2);echo 'this is a coroutine by go'.PHP_EOL;});// 2.Coroutine::create() 原生創建協程Coroutine::create(function(){Coroutine::sleep(2);echo 'this is a coroutine by Coroutine::create'.PHP_EOL;});echo 'first-'.PHP_EOL;
});// 當執行完協程容器的代碼塊才會執行到這
echo 'end-'.PHP_EOL;?>
執行 php?coroutine_test.php 以啟動服務,得到以下輸出:
first-
this is a coroutine by go
this is a coroutine by Coroutine::create
end-
協程的創建方式:
1.go();
2.Coroutine::create();
協程間是同步執行的,并不會進行阻塞。而在rpm模式代碼是從上往下同步執行的。
并發執行協程
并發執行多個協程,并且通過數組,返回這些協程方法的返回值。
<?php
use Swoole\Coroutine;
use function Swoole\Coroutine\run;Coroutine\run(function(){// 并發執行多個協程,并且通過數組,返回這些協程方法的返回值。$result = Coroutine\batch(['name' => function(){Coroutine\System::sleep(2);return 'Hola'; // 返回結果},'area' => function(){Coroutine\System::sleep(2);return 'China'; // 返回結果},]);var_dump($result);
});?>
輸出:
array(2) {["name"]=>string(4) "Hola"["area"]=>string(5) "China"
}
協程間通信?
Channel
通道,用于協程間通訊,支持多生產者協程和多消費者協程。底層自動實現了協程的切換和調度。
1.通道與?PHP?的?Array?類似,僅占用內存,沒有其他額外的資源申請,所有操作均為內存操作,無?IO?消耗
2.底層使用?PHP?引用計數實現,無內存拷貝。即使是傳遞巨大字符串或數組也不會產生額外性能消耗
3.channel?基于引用計數實現,是零拷貝的
創建 timer_test.php 文件,并上傳到服務器。
<?php
use Swoole\Coroutine;
use function Swoole\Coroutine\run;
use Swoole\Coroutine\Channel;run(function(){// 創建channel(相當于一個隊列)$channel = new Channel(1);// 每隔1s向channel中寫入一條數據,寫五次go(function() use ($channel) {for($i = 0; $i < 5; $i++){Coroutine::sleep(1);//向通道中寫入數據$channel->push(['rand' => rand(1000,9999) , 'index' => $i ]);}});// 一直輪詢管道,有數據則輸出Coroutine::create(function() use ($channel) {while(1){// 從通道中讀取數據$data = $channel->pop(1);if($data){var_dump($data);}else{var_dump($channel->errCode === SWOOLE_CHANNEL_TIMEOUT);break;}}});});?>
執行 php?channel_test.php 以啟動服務,得到以下輸出:
array(2) {["rand"]=>int(5746)["index"]=>int(0)
}
array(2) {["rand"]=>int(8235)["index"]=>int(1)
}
array(2) {["rand"]=>int(2584)["index"]=>int(2)
}
array(2) {["rand"]=>int(6474)["index"]=>int(3)
}
array(2) {["rand"]=>int(6893)["index"]=>int(4)
}
array(2) {["rand"]=>int(3986)["index"]=>int(5)
}
WaitGroup
創建 waitGroup_test.php 文件,并上傳到服務器。
<?php
use Swoole\Coroutine;
use Swoole\Coroutine\WaitGroup;
use Swoole\Coroutine\Http\Client;
use function Swoole\Coroutine\run;run(function () {$wg = new WaitGroup();$result = [];$wg->add();//啟動第一個協程Coroutine::create(function () use ($wg, &$result) {Coroutine::sleep(2);$result[] = 123;$wg->done();});$wg->add();//啟動第二個協程Coroutine::create(function () use ($wg, &$result) {Coroutine::sleep(2);$result[] = 321;$wg->done();});//掛起當前協程,等待所有任務完成后恢復$wg->wait();//這里 $result 包含了 2 個任務執行結果var_dump($result);
});?>
執行 php wiatGroup_test.php 以啟動服務,得到以下輸出:
array(2) {[0]=>int(123)[1]=>int(321)
}
可以看到,輸出內容包含了兩個協程的結果。
1.add 方法增加計數
2.done 表示任務已完成
3.wait 等待所有任務完成恢復當前協程的執行
4.WaitGroup 對象可以復用,add、done、wait 之后可以再次使用
Barrier?
在?Swoole Library?中底層提供了一個更便捷的協程并發管理工具:
Coroutine\Barrier
?協程屏障,或者叫協程柵欄。基于?PHP
?引用計數和?Coroutine API
?實現。相比于?Coroutine\WaitGroup,
Coroutine\Barrier
?使用更簡單一些,只需通過參數傳遞或者閉包的?use
?語法,引入子協程函數上即可。
創建 barrier_test.php 文件,并上傳到服務器。
<?php
use Swoole\Coroutine\Barrier;
use function Swoole\Coroutine\run;
use Swoole\Coroutine;run(function () {// 創建一個新的協程屏障$barrier = Barrier::make();$count = 0;Coroutine::create(function () use ($barrier, &$count) {Coroutine::sleep(0.5);$count++;});Coroutine::create(function () use ($barrier, &$count) {Coroutine::sleep(1);$count++;});// 會自動掛起當前協程,等待引用該協程屏障的子協程退出Barrier::wait($barrier);var_dump($count);
});?>
執行 php barrier_test.php 以啟動服務,得到以下輸出:
int(2)
協程內異常處理
try/catch
在協程編程中可直接使用 try/catch?處理異常。但必須在協程內捕獲,不得跨協程捕獲異常。
不僅是應用層 throw 的 Exception,底層的一些錯誤也是可以被捕獲的,如 function、class、method 不存在
<?php
use function Swoole\Coroutine\run;
use Swoole\Coroutine;run(function(){Coroutine::create(function () use ($barrier, &$count) {try{// 不存在的方法xxx();}catch(\Throwable $e){var_dump($e->getMessage());}});// 協程1的錯誤并不會影響協程2Coroutine::create(function () use ($barrier, &$count) {Coroutine::sleep(2);echo 'go go go';});});?>
?輸出:
string(32) "Call to undefined function xxx()"
go go go
錯誤必須捕獲。
register_shutdown_function
由于 Swoole 是常駐內存的,所以禁止在 Swoole 中使用?exit/die,會導致當前工作的 Worker 進程、Task 進程、User 進程、以及 Swoole\Process 進程會立即退出。
Server 運行期一旦發生致命錯誤,那客戶端連接將無法得到回應。如 Web 服務器,如果有致命錯誤應當向客戶端發送 HTTP 500 錯誤信息。
在 PHP 中可以通過 register_shutdown_function + error_get_last 2 個函數來捕獲致命錯誤,并將錯誤信息發送給客戶端連接。
致命錯誤捕獲示例代碼:
<?php$http = new Swoole\Http\Server('127.0.0.1', 1234);
$http->on('request', function ($request, $response) {register_shutdown_function(function () use ($response) {$error = error_get_last();var_dump($error);switch ($error['type'] ?? null) {case E_ERROR :case E_PARSE :case E_CORE_ERROR :case E_COMPILE_ERROR :// log or send:// error_log($message);// $server->send($fd, $error['message']);$response->status(500);$response->end($error['message']);break;}});exit(0);
});
$http->start();?>
協程調度?
用戶的每個請求都會創建一個協程,請求結束后協程結束,如果同時有成千上萬的并發請求,某一時刻某個進程內部會存在成千上萬的協程,那么 CPU 資源是有限的,到底執行哪個協程的代碼?
決定到底讓 CPU 執行哪個協程的代碼的決斷過程就是協程調度
,Swoole
?的調度策略又是怎么樣的呢?
-
首先,在執行某個協程代碼的過程中發現這行代碼遇到了?Co::sleep()?或者產生了網絡?IO,例如?MySQL->query(),這肯定是一個耗時的過程,Swoole?就會把這個 MySQL 連接的 Fd 放到?EventLoop?中。
- 然后讓出這個協程的 CPU 給其他協程使用:即?yield(掛起)
- 等待 MySQL 數據返回后再繼續執行這個協程:即?resume(恢復)
-
其次,如果協程的代碼有 CPU 密集型代碼,可以開啟?enable_preemptive_scheduler,Swoole 會強行讓這個協程讓出 CPU。
協程調度創建協程
創建文件 coroutine_scheduler_test.php 文件,并上傳到服務器。
<?php
use Swoole\Coroutine;$scheduler = new Coroutine\Scheduler();
// 設置可創建的最大協程數為200
$scheduler->set(['max_coroutine'=>200]);// 添加任務。等待調用 start 方法時,一起啟動并執行。
$scheduler->add(function($a,$b){Coroutine\System::sleep(1);var_dump($a);var_dump(time());var_dump($b);var_dump(time());
},'aaa','bbb');// 添加任務。等待調用 start 方法時,一起啟動并執行。
$scheduler->add(function($c){Coroutine\System::sleep(1);var_dump($c);var_dump(time());
},'ccc');// 創建并行協程。在 start 時會同時啟動 $num 個 $fn 協程,并行地執行。
$scheduler->parallel(2,function($c){Coroutine\System::sleep(1);echo "$c cid is ".Coroutine::getCid().'\n';var_dump(time());
},'ccc');var_dump('start...');
// 啟動程序。遍歷 add 和 parallel 方法添加的協程任務,并執行。
$scheduler->start();
var_dump('end...');
執行 php coroutine_scheduler_test.php 以啟動服務,得到以下輸出:
string(8) "start..."
string(3) "aaa"
int(1709353338)
string(3) "bbb"
int(1709353338)
ccc cid is 4
int(1709353338)
ccc cid is 3
int(1709353338)
string(3) "ccc"
int(1709353338)
string(6) "end..."
其中:
1.$scheduler->add(); // 向協程調度中添加一個任務
2.$scheduler->parallel(); // 創建并行協程。在 start 時會同時啟動 $num 個 $fn 協程,并行地執行。
3.$scheduler->start(); // 遍歷 add 和? parallel?方法添加的協程任務,并執行。
同時可以基于?$scheduler->parallel()?可以做到創建N個協程執行相同的任務:
<?php
use Swoole\Coroutine;
use function Swoole\Coroutine\run;Coroutine\run(function(){$result = [];Coroutine\parallel(3,function() use (&$result){Coroutine\System::sleep(2);$result[] = 333;});var_dump($result);});
輸出:
array(3) {[0]=>int(333)[1]=>int(333)[2]=>int(333)
}
協程的掛起和恢復
代碼示例:
<?php
use Swoole\Coroutine;function test(){var_dump('this is a test function');
}// 創建協程容器
Coroutine\run(function(){// 協程創建后返回協程id$cid1 = Coroutine::create(function(){echo "co 1 start\n";// 手動讓出當前協程的執行權。Coroutine::yield();echo "co 1 end\n";}); // 協程創建后返回協程id$cid2 = Coroutine::create(function(){test();}); var_dump('--------');Coroutine::create(function() use ($cid1,$cid2){var_dump("co 2 start\n");Coroutine::sleep(1);// 喚醒當前協程Coroutine::resume($cid1);echo "co 2 end\n";}); // 會在協程關閉之前 (即協程函數執行完畢時) 進行調用Cortoutine::defer(function(){var_dump('coroutine end');});var_dump('=======');$coList = Coroutine::listCoroutines();foreach($coList as $cid){var_dump(Coroutine::getBackTrace($cid));}var_dump('++++++++');
});?>
?啟動服務,會得到以下輸出:
co 1 start
string(23) "this is a test function"
string(8) "--------"
string(11) "co 2 start
"
string(7) "======="
array(1) {[0]=>array(6) {["line"]=>int(58)["function"]=>string(5) "sleep"["class"]=>string(16) "Swoole\Coroutine"["type"]=>string(2) "::"["args"]=>array(1) {[0]=>int(1)}}
}
array(1) {[0]=>array(6) {["line"]=>int(46)["function"]=>string(5) "yield"["class"]=>string(16) "Swoole\Coroutine"["type"]=>string(2) "::"["args"]=>array(0) {}}
}
array(1) {[0]=>array(6) {["line"]=>int(68)["function"]=>string(12) "getBackTrace"["class"]=>string(16) "Swoole\Coroutine"["type"]=>string(2) "::"["args"]=>array(1) {[0]=>int(1)}}
}
string(8) "++++++++"
co 1 end
co 2 end
yield():手動讓出當前協程的執行權。而不是基于 IO 的協程調度。
必須與 Coroutine::resume() 方法成對使用。該協程 yield 以后,必須由其他外部協程 resume,否則將會造成協程泄漏,被掛起的協程永遠不會執行。
?resume():手動恢復某個協程,使其繼續運行,不是基于 IO 的協程調度。當前協程處于掛起狀態時,另外的協程中可以使用 resume 再次喚醒當前協程
協程系統雜項函數
<?php
use Swoole\Coroutine;Coroutine\run(function(){Coroutine::create(function(){// 執行一條 shell 指令。底層自動進行協程調度。$ret = Coroutine\System::exec('mkdir test_dir');var_dump($ret);});Coroutine::create(function(){// 將域名解析為 IP。基于同步的線程池模擬實現,底層自動進行協程調度。$ret = Coroutine\system::gethostbyname("www.baidu.com", AF_INET);var_dump($ret);});Coroutine::create(function(){// 進行 DNS 解析,查詢域名對應的 IP 地址。$ret = Coroutine\System::getaddrinfo("www.baidu.com");var_dump($ret);});Coroutine::create(function(){// 域名地址查詢。$ret = Coroutine\System::dnsLookup("www.baidu.com");var_dump($ret);});
});?>
進程池及進程間通信
進程池,基于 Swoole\Server 的 Manager 管理進程模塊實現。可管理多個工作進程。該模塊的核心功能為進程管理,相比 Process 實現多進程,Process\Pool 更加簡單,封裝層次更高。
創建進程池及進程間通信
創建文件 process_pool_test.php,并上傳服務器:
<?php
use Swoole\Process;
use Swoole\Coroutine;
use Swoole\Process\Pool;
/*** 創建進程池參數:* 1.設置工作進程數量* 2.設置通信方式* 3.當通信方式為隊列(SWOOLE_IPC_MSGQUEUE)時,需要設置隊列的key* 4.是否開啟協程*/
$pool = new Swoole\Process\Pool(2,SWOOLE_IPC_UNIXSOCK,0, true);// 子進程啟動,自動創建協程容器及協程
$pool->on('workerstart', function(Swoole\Process\Pool $pool, int $workerId){var_dump($workerId);// 獲取當前工作進程對象。返回 Swoole\Process 對象。$process = $pool->getProcess(0);// 導出socket對象,實現 Worker 進程間通信$socket = $process->exportsocket();if($workerId ==0){var_dump('000');// recv() 暫時掛起,等待數據到來恢復echo $socket->recv();// 向socket發送數據$socket->send("hello proc1\n");echo "proce stop\n";} else {var_dump('111');$socket->send("hello proc0\n");// recv() 暫時掛起,等待數據到來恢復echo $socket->recv();// 向socket發送數據echo "proc1 stop\n";// 不關閉pool的情況下,會有兩個協程循環收發消息$pool->shutdown();}
});$pool->start();?>
啟動服務,輸出如下:
int(0)
string(3) "000"
int(1)
string(3) "111"
hello proc0
proce stop
hello proc1
proc1 stop
Swoole\Process\Pool 一共提供了三種進程間通信的方式:
1.消息隊列:SWOOLE_IPC_MSGQUEUE,需設置隊列key。
2.Socket 通信:SWOOLE_IPC_SOCKET,如果客戶端與服務端不在同一服務器可使用該方式。
3.UnixSocket:SWOOLE_IPC_UNIXSOCKET,推薦
進程管理器
進程管理器,基于?Process\Pool? 實現。可以管理多個進程。相比與 Process\Pool ,可以非常方便的創建多個執行不同任務的進程,并且可以控制每一個進程是否要處于協程環境。
use Swoole\Process\Manager;
use Swoole\Process\Pool;$pm = new Manager();for ($i = 0; $i < 2; $i++) {// 增加一個工作進程$pm->add(function (Pool $pool, int $workerId) {});
}$pm->start();
子進程的創建以及回收
例如我們啟動的一個服務就可以理解為一個進程,當服務結束也意味著進程結束。而在主進程所另外創建的進程就被稱為“子進程”。
而當主進程結束時,子進程還未結束(回收),子進程則會變成僵尸進程,所以主進程結束前需保證子進程全部結束。
<?php
use Swoole\Process;// 循環創建三個子進程
for ($n = 1; $n <= 3; $n++) {// 創建子進程$process = new Process(function () use ($n) {echo 'Child #' . getmypid() . " start and sleep {$n}s" . PHP_EOL;sleep($n);echo 'Child #' . getmypid() . ' exit' . PHP_EOL;});$process->start();
}// 主進程必須等待回收子進程,否則會讓子進程變成僵尸進程// 阻塞等待,會阻塞主進程,等待子進程結束
for ($n = 3; $n--;) {$status = Process::wait(true); // 等待阻塞echo "Recycled #{$status['pid']}, code={$status['code']}, signal={$status['signal']}" . PHP_EOL;
}?>
結果:
Child #18130 start and sleep 1s
Child #18131 start and sleep 2s
Child #18132 start and sleep 3s
Child #18130 exit
Recycled #18130, code=0, signal=0
Child #18131 exit
Recycled #18131, code=0, signal=0
Child #18132 exit
Recycled #18132, code=0, signal=0
回收子進程也可以通過另一個方式:
<?php
use Swoole\Process;
use Swoole\Coroutine;
use function Swoole\Coroutine\run;// 循環創建三個子進程
for ($n = 1; $n <= 3; $n++) {// 創建子進程$process = new Process(function () use ($n) {echo 'Child #' . getmypid() . " start and sleep {$n}s" . PHP_EOL;sleep($n);echo 'Child #' . getmypid() . ' exit' . PHP_EOL;});$process->start();
}Coroutine\run(functiom(){while($ret = Swoole\Coroutine\System::wait(5)){ // 等待子進程全部退出echo "子進程結束:子進程為{$ret['pid']}".PHP_EOL;}
});?>
結果:
Child #20899 start and sleep 1s
Child #20900 start and sleep 2s
Child #20901 start and sleep 3s
Child #20899 exit
子進程結束:子進程為20899
Child #20900 exit
子進程結束:子進程為20900
Child #20901 exit
子進程結束:子進程為20901
示例:創建子進程,完成父子通信并監聽子進程退出后對其進行回收。
<?php
use Swoole\Process;
use Swoole\Coroutine;
use Swoole\Timer;
use function Swoole\Coroutine\run;// 創建子進程,并每隔2s向主進程發送消息
$process = new Process(function($proc){Timer::tick(2000,function() use ($proc){$socket = $proc->exportSocket(); $socket->send("hello master,this is child , send 2000ms");var_dump($socket->recv());});
},false,1,true);// 啟動子進程
$process->start();// 主進程創建協程
Coroutine\run(function() use ($process){// 非阻塞監聽子進程退出,監聽到后解除監聽、并清除所有定時器Process::signal(SIGCHLD,function(){while($ret = Process::wait(false)){Process::signal(SIGCHLD , null); // 解除監聽Timer::clearAll(); // 清除所有定時器,包括父進程}});$socket = $process->exportsocket(); // 主進程每隔一秒向子進程發送消息Timer::tick(1000,function() use ($socket){echo "this is parent tick".PHP_EOL;$socket->send("hello child,this is master, send 1000ms");});$count = 2;while($count){$count--;var_dump($socket->recv());if($count == 0){// 殺死子進程Process::kill($process->pid);}};});?>
結果:
this is parent tick
this is parent tick
string(78) "hello child,this is master, send 1000mshello child,this is master, send 1000ms"
string(40) "hello master,this is child , send 2000ms"
this is parent tick
this is parent tick
string(78) "string(40) "hello child,this is master, send 1000mshello child,this is master, send 1000mshello master,this is child , send 2000ms"
鎖
進程間鎖 Lock
PHP 代碼中可以很方便地創建一個鎖,用來實現數據同步。Lock 類支持 5 種鎖的類型
鎖類型 | 說明 |
---|---|
SWOOLE_MUTEX | 互斥鎖 |
SWOOLE_RWLOCK | 讀寫鎖 |
SWOOLE_SPINLOCK | 自旋鎖 |
SWOOLE_FILELOCK | 文件鎖 (廢棄) |
SWOOLE_SEM | 信號量 (廢棄) |
進程間鎖示例:
<?php$lock = new Swoole\Lock(SWOOLE_MUTEX);
echo "[Master]create lock\n";
$lock->lock();
if (pcntl_fork() > 0)
{sleep(1);$lock->unlock();
}
else
{echo "[Child] Wait Lock\n";$lock->lock();echo "[Child] Get Lock\n";$lock->unlock();exit("[Child] exit\n");
}
echo "[Master]release lock\n";
unset($lock);
sleep(1);
echo "[Master]exit\n";?>
?輸出結果:
[Master]create lock
[Child] Wait Lock
[Master]release lock
[Child] Get Lock
[Child] exit
[Master]exit
無法在協程中石油鎖。?
進程間無鎖計數器 Atomic
Atomic 是 Swoole 底層提供的原子計數操作類,可以方便整數的無鎖原子增減。
當有并發請求對計數器進行操作時,Atomic會自帶鎖。
<?php$atomic = new Swoole\Atomic();$serv = new Swoole\Server('127.0.0.1', '1234');$serv->set(['worker_num' => 1,'log_file' => '/dev/null'
]);
$serv->atomic = $atomic;
$serv->on("receive", function ($serv, $fd, $reactor_id, $data) {$cmd = explode(" ", trim($data));$cmd[1] = $cmd[1] ?? 1;if($cmd[0] == 'add'){$serv->atomic->add($cmd[1]);$serv->send($fd,'add ok,now num is '.$serv->atomic->get().PHP_EOL);}else if($cmd[0] == 'sub'){$serv->atomic->sub($cmd[1]);$serv->send($fd,'sub ok,now num is '.$serv->atomic->get().PHP_EOL);}else{$serv->send($fd,"unkown command {$cmd[0]}".PHP_EOL);}
});$serv->start();?>
啟動發我,再另開窗口使用 telnet 127.0.0.1 1234?連接此 TCP 服務,并發送消息。
[root@VM-55-23-centos ~]# telnet 127.0.0.1 1234
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
add 1
add ok,now num is 1
add 10
add ok,now num is 11
sub 5
sub ok,now num is 6
協程實際IO操作
在之前的示例中,大部分是利用 Couroutine::sleep 來模擬 IO 操作。接下來使用協程進行文件寫入、數據庫、網絡請求操作。
<?php
use function Swoole\Coroutine\run;
use Swoole\Coroutine;// 設置協程化類型
Coroutine::set(['hook_flags' => ' SWOOLE_HOOK_TPC | SWOOLE_HOOK_FILE | SWOOLE_HOOK_CURL ',
]);run(function(){var_dump('====first====');Coroutine::Create(function(){$fp = fopen('./test_dir/test.log','a+');fwrite($fp,str_repeat('A',1024));fwrite($fp,str_repeat('B',1024));var_dump('wirte file success');});var_dump('====2====');Coroutine::Create(function(){$mysqli = @new mysqli('127.0.0.1','root','123456','db_name',3306);if($mysqli->connect_errno != 0){var_dump('數據庫連接失敗:'.$mysqli->connect_errno.'--'.$mysqli->connect_error);return;}$mysqli->query("set name 'utf8'");$res = $mysqli->query('select * from user');if($res){while($row = $res->fetch_assoc()){echo json_encode($row) . "\n";}}$res->free();var_dump('read mysql success');});var_dump('====3====');Coroutine::Create(function(){$ch = curl_init();curl_setopt($ch , CURLOPT_URL , 'http://www.baidu.com/');curl_setopt($ch , CURLOPT_HEADER , false);curl_setopt($ch , CURLOPT_RETURNTRANSFER , 1);$result = curl_exec($ch);curl_close($ch);var_dump('curl success');});var_dump('====end====');});?>
輸出結果:
string(13) "====first===="
string(18) "wirte file success"
string(9) "====2===="
{"id":"1","name":"Ho","age":"1"}
{"id":"2","name":"La","age":"2"}
string(18) "read mysql success"
string(9) "====3===="
string(12) "curl success"
string(11) "====end===="
在協程的使用中,由于協程間是同時進行的,且任務的執行是由CPU進行調度的,任務的執行順序無法保證。
實現MySQL連接對象單例模式
新建文件 mysql/pool.php ,用于連接池相關初始化工作。
<?php
use Swoole\Coroutine;
use function Swoole\Coroutine\run;
use Swoole\Coroutine\Channel;class Pool
{// 連接池,用于存放連接private $pool = null;// 連接配置信息private static $config;// 單例模式private static $instance;// 唯一公開的方法,用于獲取單例public static function getInstance(array $config){if(empty(self::$instance)){if(empty($config)){throw new RuntimeException('Config is empty.');}else{self::$config = $config;}self::$instance = new static($config);}return self::$instance;}// 初始化連接池private function __construct($config){if(empty($this->$pool)){// 一鍵協程化Coroutine::set(['hook_flags' => 'SWOOLE_HOOK_TPC' ]);run(function() use ($config){$this->pool = new Channel($config['pool_size']);for($i = 0 ; $i < $config['pool_size'] ; $i++){go(function() use ($config){try{// 獲取連接對象,放入連接池中$mysqli = @new mysqli($config['host'],$config['username'],$config['password'],$config['db_name'],$config['port']);$this->pool->push($mysqli,$config['time_out']);}catch(Throwable $e){var_dump($e);throw new RuntimeException('MySQL connet error:'.$mysqli->errow , $mysqli->errno);}});}});}}// 從連接池獲取MySQL連接public function getDBManager(){go(function(){if($this->pool->length() > 0){$this->$mysqli = $this->pool->pop(self::$config['time_out']);if($this->$mysqli === false){throw new RuntimeException('get MySQL failed.');}}else{throw new RuntimeException('MySQL pool is empty.');}});return $this->$mysqli;}// 獲取當前連接池中的剩余連接數public function getPoolSize(){return $this->pool->length();}// 向連接池歸還連接public function push($mysqli){$this->pool->push($mysqli , $config['time_out']);}// 防止被克隆private function _clone(){}}?>
新建文件 pool_test.php。
<?php
use Swoole\Coroutine;
use Swoole\Event;
var_dump('start...');// 定義數據庫連接信息
$config = ['pool_size'=> 5,'host' => '127.0.0.1','username' => 'root','password' => '123456','db_name' => 'test','port' => '3306','time_out' => 1
];
include('./mysql/Pool.php');// 獲取連接池實例
$mysqlPool = Pool::getInstance($config);
var_dump('當前連接池內剩余連接數:'.$mysqlPool->getPoolSize());$lock = new Swoole\Lock(SWOOLE_MUTEX);// 獲取連接前加鎖,防止破壞單例
$lock->lock();// 從連接池中獲取一個連接
$mysqli = $mysqlPool->getDBManager();// 獲取連接成功后解鎖
$lock->unlock();var_dump('當前連接池內剩余連接數:'.$mysqlPool->getPoolSize());// 創建協程
Coroutine::Create(function() use ($mysqli , $mysqlPool){$list = $mysqli->query('select * from user');if($list){var_dump('查詢結果:');while($row = $list->fetch_assoc()){echo json_encode($row) . "\n";}}// 釋放變量$list->free();// 協程結束前調用該方法Coroutine::defer(function() use ($mysqli , $mysqlPool){var_dump('歸還連接...');// 將連接對象push進連接池$mysqlPool->push($mysqli);var_dump('當前連接池內剩余連接數:'.$mysqlPool->getPoolSize());});});var_dump('end...');
Event::wait();
?>
輸出結果:
string(8) "start..."
string(35) "當前連接池內剩余連接數:5"
string(35) "當前連接池內剩余連接數:4"
string(13) "查詢結果:"
{"id":"1","name":"Ho","age":"1"}
{"id":"2","name":"La","age":"2"}
string(15) "歸還連接..."
string(35) "當前連接池內剩余連接數:5"
string(6) "end..."