面向管理后臺的系統中,經常會有文件導入的需求。常規的做法就是同步等待,但在業務關系復雜(多表數據校驗)、數據量較大的情況下,管理人員只能等結果,也可能會等到超時。
使用異步的話,將導入數據的功能與后端接口解耦,避免接口超時導致的任務中止,也無需前端只為了拿個結果一直保持連接等待。
前端在上傳文件后,后端接口將導入任務推送(MQ、管道...)出去,然后直接返回前端。導入服務接到任務執行導入,并根據需求將實時導入狀態維護到緩存中。前端查詢/輪詢后端從緩存取出當前導入狀態。
流程圖如下:
異步導入.png
簡單的PHP + Swoole后端代碼示例(實際就兩個接口方法upFile、importStatus,和Task的導入處理):
/**
* Created by PhpStorm.
* User: wen
* Date: 2018/12/8
* Time: 11:09 PM
*/
require 'vendor/autoload.php';
use Swoole\Http\Server;
const BASE_DIR = __DIR__;
// 路由定義
$router = [
'GET' => [
'/importStatus' => 'importStatus' // 查詢導入狀態
],
'POST' => [
'/upFile' => 'upFile' // 上傳導入文件
]
];
// ----SWOOLE-HTTP服務設置部分
$http = new Server("127.0.0.1", 9501);
$http->set([
'worker_num' => 2,
'task_worker_num' => 4,
]);
$http->on('request', function ($request, $response) use ($router, $http) {
$funName = $router[$request->server['request_method']][$request->server['request_uri']] ?? 'NotFound';
if (!function_exists($funName)){
return backJson($response, null, 404, 'ROUTER NOT FOUND');
}
try{
$funName($request, $response, $http);
}catch (Exception $e){
return backJson($response, null, 500, $e->getMessage());
}
});
// 實際導入操作
$http->on('Task', function (swoole_server $serv, $task_id, $from_id, $data) {
$redis = getNewRedis();
$status = [
'step' => 1, // 文件準備處理
'progressRate' => '',
'info' => [],
];
$redis->set($data, json_encode($status, JSON_UNESCAPED_UNICODE));
// 讀取文件 使用了PhpOffice\PhpSpreadsheet解析EXCEL
$spreadsheet = \PhpOffice\PhpSpreadsheet\IOFactory::load(BASE_DIR . '/' . $data);
$sheetData = $spreadsheet->getActiveSheet()->toArray(null, true, true, true);
$count = count($sheetData);
$status['step'] = 2; // 文件解析完成
$status['progressRate'] = "解析到文件數據{$count}條";
$redis->set($data, json_encode($status, JSON_UNESCAPED_UNICODE));
$names = [];
var_dump($sheetData);
foreach ($sheetData as $k => $item){
if (1==$k) continue; // 第一行為表頭
if (empty($item['A'])) {
unset($sheetData[$k]);
$status['info'][] = "第{$k}行姓名為空";
continue;
}
$names[] = $item['A'];
}
$redis->set($data, json_encode($status, JSON_UNESCAPED_UNICODE));
// TODO: 驗證數據庫name是否已存在 插入等業務處理...(此處代碼省略)
// TODO: 將進度維護到redis
});
$http->on('Finish', function () {});
// ----基礎函數部分
function getNewRedis(){
($redis = new \Redis())->connect('127.0.0.1');
return $redis;
}
function backJson($response, $content, $statusCode=200, $msg=''){
$response->header('Content-Type', 'application/json');
$jsonData = [
'statusCode' => $statusCode,
'content' => $content,
'msg' => $msg,
];
$response->end(json_encode($jsonData, JSON_UNESCAPED_UNICODE));
return true;
}
function NotFound($request, $response){
return backJson($response, null, 404, 'ROUTER NOT FOUND');
}
// ----接口方法
// 上傳文件
function upFile($request, $response, $server){
$file = $request->files['file'] ?? null;
if (empty($file)) { throw new Exception('未收到上傳文件'); }
$importSN = md5($file['tmp_name'] . time()) . '.' . pathinfo($file['name'])['extension'];
$bol = move_uploaded_file($file['tmp_name'], BASE_DIR . '/' . $importSN);
if (false === $bol) { throw new Exception('文件處理異常'); }
$status = [
'step' => '0',
'progressRate' => '',
'info' => [],
];
getNewRedis()->set($importSN, json_encode($status, JSON_UNESCAPED_UNICODE));
$server->task($importSN);
return backJson($response, ['importSN'=>$importSN]);
}
// 查詢導入狀態
function importStatus($request, $response){
$importSN = $request->get['importSN'] ?? null;
if (!$importSN){ throw new Exception('導入任務編號不正確'); }
$redis = getNewRedis();
$content = $redis->get($importSN);
if (!$content){ throw new Exception('未查詢到任務'); }
return backJson($response, json_decode($content));
}
$http->start();
PostMan訪問示例:
屏幕快照 2018-12-09 上午4.06.17.png
屏幕快照 2018-12-09 上午4.04.16.png
屏幕快照 2018-12-09 上午4.04.50.png
這里主要是任務投遞的渠道,如Channel、MQ服務、Unix Socket等。
Channel:最簡單好用,同服務進程內通信,進程掛了就都gg
MQ服務:獨立服務,簡單通用,可以多服務器,可靠性高
Unix Socket:單服務器內進程間通信,偏底層