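Preface
I suspect most of us rarely use PHP's CLI mode in day-to-day development. In some scenarios, though, CLI commands are genuinely useful.
Here is an example.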
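Suppose table tab1 in a MySQL database holds more than 30 million rows, and every row has to be run through a calculation, with the results written to a new table, tab2. With this much data nobody would run the job under php-fpm; it has to be a CLI script with no execution time limit (set_time_limit(0), which is already the default for max_execution_time under the CLI). In testing, processing a single row took about 3 seconds. That means 30 million+ rows * 3 s = more than 90 million seconds, or over 1,000 days, to get through all the data in a single process, which the operations team cannot accept. The obvious move is to switch from single-process to multi-process handling and spread the computation across several processes. So how do we design that?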
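To make the numbers concrete, here is a rough estimate in PHP. It is only a sketch: estimateDays is a hypothetical helper, it assumes the rows split evenly with no contention between processes, and the worker count of 15 is an assumption that corresponds to giving each process 2 million rows.

```php
<?php
// Back-of-the-envelope runtime estimate using the figures quoted above.
// estimateDays() is a hypothetical helper, not part of any library.
function estimateDays(int $rows, float $secondsPerRow, int $workers): float
{
    // Assumes perfect parallelism: each worker gets an equal share of rows.
    $seconds = ceil($rows / $workers) * $secondsPerRow;
    return $seconds / 86400; // 86400 seconds per day
}

$totalRows = 30000000;   // about 30 million rows in tab1
$secondsPerRow = 3;      // measured cost of one row
$workers = 15;           // assumption: 2 million rows per process

printf("1 process: %.1f days\n", estimateDays($totalRows, $secondsPerRow, 1));
printf("%d processes: %.1f days\n", $workers, estimateDays($totalRows, $secondsPerRow, $workers));
```

Under these assumptions a single process needs roughly 1,041.7 days, while 15 processes bring that down to about 69.4 days. Real gains depend on how many CPU cores are available and on whether the database can keep up.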
Program design
Wrapper: a multi-process module built on PHP and swoole_process
<?php

class ProcessPool
{
    /** @var swoole_process Master process that schedules the workers */
    private $process;

    /**
     * Worker processes, keyed by PID
     * @var array
     */
    private $process_list = [];

    /**
     * Busy flag for each worker PID (0 = idle, 1 = in use)
     * @var array
     */
    private $process_use = [];

    /**
     * Minimum number of workers
     * @var int
     */
    private $min_worker_num = 3;

    /**
     * Maximum number of workers
     * @var int
     */
    private $max_worker_num = 6;

    /**
     * Current number of workers
     * @var int
     */
    private $current_num;

    /**
     * @var callable Task callback executed inside each worker
     */
    public $callable;

    public function __construct($callable, $max_worker_num = 6)
    {
        $this->callable = $callable;
        $this->max_worker_num = $max_worker_num;
        $this->process = new swoole_process(array($this, 'run'), false, 2);
        $this->process->start();
        swoole_process::wait();
    }

    public function run()
    {
        $this->current_num = $this->min_worker_num;
        // Create the initial workers
        for ($i = 0; $i < $this->current_num; $i++) {
            $process = new swoole_process(array($this, 'task_run'), false, 2);
            $pid = $process->start();
            $this->process_list[$pid] = $process;
            $this->process_use[$pid] = 0;
        }
        foreach ($this->process_list as $process) {
            $this->watch_worker($process);
        }

        // Once per second, hand the next task index to a worker through its pipe
        swoole_timer_tick(1000, function ($timer_id) {
            static $index = 0;
            $index = $index + 1;
            $flag = true; // whether a new worker has to be created
            foreach ($this->process_use as $pid => $used) {
                if ($used == 0) {
                    $flag = false;
                    // Mark the worker as in use
                    $this->process_use[$pid] = 1;
                    // A write() in the parent is received by read() in the child
                    $this->process_list[$pid]->write($index);
                    break;
                }
            }
            if ($flag && $this->current_num < $this->max_worker_num) {
                // No idle worker: create a new one to handle this task
                $process = new swoole_process(array($this, 'task_run'), false, 2);
                $pid = $process->start();
                $this->process_list[$pid] = $process;
                $this->process_use[$pid] = 1;
                // Listen on the new worker's pipe too, so it can be reused later
                // (the original version only did this for the initial workers)
                $this->watch_worker($process);
                $this->process_list[$pid]->write($index);
                $this->current_num++;
            }
            // var_dump('task #' . $index);
            if ($index >= ($this->max_worker_num + 1)) {
                foreach ($this->process_list as $process) {
                    $process->write("exit");
                }
                swoole_timer_clear($timer_id);
                $this->process->exit();
            }
        });
    }

    /**
     * Listen on a worker's pipe for its "task finished" message
     * @param swoole_process $process
     */
    private function watch_worker($process)
    {
        swoole_event_add($process->pipe, function ($pipe) use ($process) {
            $data = $process->read();
            // var_dump($data . ' idle');
            // The worker reports its PID when it finishes; mark it idle again
            $this->process_use[$data] = 0;
        });
    }

    /**
     * Worker-side handling
     * @param swoole_process $worker
     */
    public function task_run($worker)
    {
        swoole_event_add($worker->pipe, function ($pipe) use ($worker) {
            $data = $worker->read();
            // var_dump($worker->pid . ':' . $data);
            if ($data === 'exit') {
                $worker->exit();
                exit;
            }
            // Simulate a slow task
            // sleep(5);
            call_user_func($this->callable, $worker, $data);
            // Tell the master the task is done:
            // a write() in the child is received by read() in the parent
            $worker->write($worker->pid);
        });
    }
}
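Split the data into ranges and hand each range to a different process. For example, process 1 handles rows 1 to 2,000,000, process 2 handles rows 2,000,001 to 4,000,000, and so on. Keep good logs as well: record which rows failed and why, so they can be dealt with specifically afterwards.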
<?php

$count = 30000000;  // total number of rows: 30 million
$secNum = 2000000;  // rows handled by each process: 2 million
$maxWorkNum = intval(ceil($count / $secNum));

$processPool = new ProcessPool(function (\Swoole\Process $worker, $index) use ($maxWorkNum, $secNum) {
    // $index is the process number, counted from 1; $worker is the process object
    $start = (($index - 1) * $secNum) + 1;
    $end = $index > $maxWorkNum ? 0 : $index * $secNum;
    // Start the job
    $companyJob = new CompanyRelationshipJob($start, $end, $index);
    $companyJob->run();
}, $maxWorkNum);
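The CompanyRelationshipJob class is not shown in this article, so the following is only a sketch of the two ideas a range job needs: working out the range for a given index, and logging failures without aborting the run. Both chunkRange and processRange are hypothetical names, the per-row callback stands in for the real calculation, and the plain-text log format is an assumption.

```php
<?php
/**
 * Hypothetical helper: inclusive [start, end] id range for a 1-based chunk
 * index, or null when the index lies outside the data set.
 */
function chunkRange(int $index, int $chunkSize, int $total): ?array
{
    $start = ($index - 1) * $chunkSize + 1;
    if ($index < 1 || $start > $total) {
        return null;
    }
    // Clamp the last chunk so it never runs past the total row count.
    return [$start, min($index * $chunkSize, $total)];
}

/**
 * Hypothetical stand-in for a range job: calls $handleRow for every id in
 * the range and appends one log line per failure instead of stopping.
 * Returns the number of failed ids.
 */
function processRange(int $start, int $end, callable $handleRow, string $logFile): int
{
    $failed = 0;
    for ($id = $start; $id <= $end; $id++) {
        try {
            $handleRow($id);
        } catch (Throwable $e) {
            $failed++;
            $line = sprintf("%s id=%d error=%s\n", date('c'), $id, $e->getMessage());
            // LOCK_EX keeps lines intact when several processes share one log file.
            file_put_contents($logFile, $line, FILE_APPEND | LOCK_EX);
        }
    }
    return $failed;
}

// Chunk 2 of a 30-million-row table split into 2-million-row shares.
[$start, $end] = chunkRange(2, 2000000, 30000000);
echo "chunk 2 covers ids {$start}..{$end}\n";
```

Returning null for an out-of-range index makes the "nothing to do" case explicit, and clamping the end handles totals that are not an exact multiple of the chunk size. A real job would probably fetch rows in batches rather than one id at a time.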
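Summary
When a business scenario involves a large volume of data, think about how to spread that data evenly across multiple processes, so that the multi-core CPU is put to work and processing finishes fast enough to meet the business requirement.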