之前在工作中使用 Laravel Octane 的 concurrently 處理并發時,發現在隊列和定時任務中不會觸發并發效果。經過分析,作了如下猜測:隊列和定時任務都屬于一個獨立的進程,與 Octane 服務無關,而 Octane concurrently 恰恰需要在 Octane 環境下才能運行。
后來通過代碼進行環境檢測和查看 php 的進程,證明猜想成立。
info('check env', ['served by octane' => isset($_SERVER['LARAVEL_OCTANE']) && ((int)$_SERVER['LARAVEL_OCTANE'] === 1),'on swoole server' => (extension_loaded('swoole') || extension_loaded('openswoole')) && app()->bound(Server::class)
]);
為了能夠在任意代碼中實現并發,我們研究參考了 Hyperf 框架關于協程的代碼,然后抽取了如下兩個類:
<?phpnamespace App\Services;use App\Exceptions\ParallelExecutionException;
use Laravel\Octane\Facades\Octane;
use Throwable;
use Swoole\Coroutine as Co;class Parallel
{protected array $callbacks = [];protected array $results = [];/*** @var Throwable[]*/protected array $throwables = [];public function add(callable $callable, $key = null): void{if (is_null($key)) {$this->callbacks[] = $callable;} else {$this->callbacks[$key] = $callable;}}public function wait(bool $throw = true): array{if (isset($_SERVER['LARAVEL_OCTANE']) && ((int)$_SERVER['LARAVEL_OCTANE'] === 1)) {return Octane::concurrently($this->callbacks, 300000);}app('log')->useLoggingLoopDetection(false);Co\run(function () {foreach ($this->callbacks as $key => $callback) {Co::create(function () use ($callback, $key) {try {$this->results[$key] = $callback();} catch (Throwable $throwable) {$this->throwables[$key] = $throwable;unset($this->results[$key]);}});}});if ($throw && ($throwableCount = count($this->throwables)) > 0) {$message = 'Detecting ' . $throwableCount . ' throwable occurred during parallel execution:' . PHP_EOL . $this->formatThrowAbles($this->throwables);$executionException = new ParallelExecutionException($message);$executionException->setResults($this->results);$executionException->setThrowAbles($this->throwables);unset($this->results, $this->throwables);throw $executionException;}app('log')->useLoggingLoopDetection(true);return $this->results;}private function formatThrowAbles(array $throwables): string{$output = '';foreach ($throwables as $key => $value) {$output .= sprintf('(%s) %s: %s' . PHP_EOL . '%s' . PHP_EOL, $key, get_class($value), $value->getMessage(), $value->getTraceAsString());}return $output;}
}
<?phpnamespace App\Exceptions;use RuntimeException;class ParallelExecutionException extends RuntimeException
{protected array $results = [];protected array $throwables = [];public function getResults(): array{return $this->results;}public function setResults(array $results): void{$this->results = $results;}public function getThrowAbles(): array{return $this->throwables;}public function setThrowAbles(array $throwables): array{return $this->throwables = $throwables;}
}
其中,第一個類的作用是檢測系統是否運行在 Octane 環境下,是則調用Octane concurrently,否則就執行 Swoole 協程代碼,使用起來也比較簡單:
$parallel = new Parallel();
$parallel->add(fn() => $this->analysisStructure(), 'structure');
$parallel->add(fn() => $this->analysisHabit(), 'habit');
['structure' => $structure,'habit' => $habit,
] = $parallel->wait();
之所以沒有完全使用 Swoole 協程,是因為相比之下,Octane 代碼更加優雅,我們在期待著某天更新后,Octane concurrently 也能直接在隊列中運行使用。
第二個類的作用比較簡單,就是對協程中異常的一個定義。
另外在分析過程中,我們也發現了一個比較有意思的事情:
如圖所示,當我在路由中運行檢測代碼時,Octane 和 Swoole Server 都為 true;在控制器中運行檢測代碼時,又只有 Octane 為true;為什么會有這樣的區分?我個人猜測是 Octane 在將框架代碼讀進內存時,特意跳過了控制器中的代碼,以避免數據更新不及時等問題。
有知道具體原因的小伙伴,歡迎留言探討。