Node.js 中 cluster 模塊全部 API 詳解
1. 模塊屬性
const cluster = require('cluster');// 1. isMaster
// 判斷當前進程是否為主進程
console.log('是否為主進程:', cluster.isMaster);// 2. isWorker
// 判斷當前進程是否為工作進程
console.log('是否為工作進程:', cluster.isWorker);// 3. schedulingPolicy
// 獲取或設置調度策略
// SCHED_NONE: 由操作系統調度
// SCHED_RR: 輪詢調度
console.log('當前調度策略:', cluster.schedulingPolicy);
cluster.schedulingPolicy = cluster.SCHED_RR;// 4. workers
// 獲取所有工作進程的引用
console.log('工作進程數量:', Object.keys(cluster.workers).length);
2. 主進程方法
// 1. fork()
// 創建新的工作進程
const worker = cluster.fork();// 2. setupMaster([settings])
// 配置主進程
cluster.setupMaster({exec: 'worker.js', // 工作進程文件args: ['--use', 'http'], // 傳遞給工作進程的參數silent: false, // 是否將工作進程的輸出重定向到主進程stdio: ['pipe', 'pipe', 'pipe', 'ipc'], // 標準輸入輸出配置uid: 1000, // 用戶 IDgid: 1000, // 組 IDinspectPort: 0 // 調試端口
});// 3. disconnect([callback])
// 斷開所有工作進程的連接
cluster.disconnect(() => {console.log('所有工作進程已斷開連接');
});// 4. settings
// 獲取當前配置
console.log('當前配置:', cluster.settings);
3. 工作進程屬性
// 1. worker.id
// 獲取工作進程 ID
console.log('工作進程 ID:', cluster.worker.id);// 2. worker.process
// 獲取工作進程的進程對象
console.log('進程 ID:', cluster.worker.process.pid);// 3. worker.exitedAfterDisconnect
// 判斷工作進程是否在斷開連接后退出
console.log('是否在斷開連接后退出:', cluster.worker.exitedAfterDisconnect);// 4. worker.isDead()
// 判斷工作進程是否已死亡
console.log('是否已死亡:', cluster.worker.isDead());// 5. worker.isConnected()
// 判斷工作進程是否已連接
console.log('是否已連接:', cluster.worker.isConnected());
4. 工作進程方法
// 1. worker.send(message[, sendHandle][, callback])
// 發送消息給主進程
cluster.worker.send('hello from worker', (err) => {if (err) console.error('發送消息失敗:', err);
});// 2. worker.disconnect()
// 斷開工作進程連接
cluster.worker.disconnect();// 3. worker.kill([signal])
// 終止工作進程
cluster.worker.kill('SIGTERM');
5. 事件
5.1 主進程事件
// 1. fork
// 當創建新的工作進程時觸發
cluster.on('fork', (worker) => {console.log('工作進程已創建:', worker.id);
});// 2. online
// 當工作進程上線時觸發
cluster.on('online', (worker) => {console.log('工作進程已上線:', worker.id);
});// 3. listening
// 當工作進程開始監聽時觸發
cluster.on('listening', (worker, address) => {console.log('工作進程正在監聽:', worker.id, address);
});// 4. message
// 當收到工作進程消息時觸發
cluster.on('message', (worker, message, handle) => {console.log('收到工作進程消息:', worker.id, message);
});// 5. disconnect
// 當工作進程斷開連接時觸發
cluster.on('disconnect', (worker) => {console.log('工作進程已斷開連接:', worker.id);
});// 6. exit
// 當工作進程退出時觸發
cluster.on('exit', (worker, code, signal) => {console.log('工作進程已退出:', worker.id, code, signal);
});// 7. error
// 當工作進程發生錯誤時觸發
cluster.on('error', (worker, code, signal) => {console.log('工作進程錯誤:', worker.id, code, signal);
});
5.2 工作進程事件
// 1. message
// 當收到主進程消息時觸發
cluster.worker.on('message', (message, handle) => {console.log('收到主進程消息:', message);
});// 2. disconnect
// 當工作進程斷開連接時觸發
cluster.worker.on('disconnect', () => {console.log('工作進程已斷開連接');
});// 3. error
// 當工作進程發生錯誤時觸發
cluster.worker.on('error', (code, signal) => {console.log('工作進程錯誤:', code, signal);
});// 4. exit
// 當工作進程退出時觸發
cluster.worker.on('exit', (code, signal) => {console.log('工作進程已退出:', code, signal);
});
6. 完整示例
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;if (cluster.isMaster) {console.log(`主進程 ${process.pid} 正在運行`);// 配置主進程cluster.setupMaster({exec: 'worker.js',args: ['--use', 'http'],silent: false});// 啟動工作進程for (let i = 0; i < numCPUs; i++) {cluster.fork();}// 主進程事件處理cluster.on('fork', (worker) => {console.log('工作進程已創建:', worker.id);});cluster.on('online', (worker) => {console.log('工作進程已上線:', worker.id);});cluster.on('listening', (worker, address) => {console.log('工作進程正在監聽:', worker.id, address);});cluster.on('message', (worker, message, handle) => {console.log('收到工作進程消息:', worker.id, message);worker.send('消息已收到');});cluster.on('disconnect', (worker) => {console.log('工作進程已斷開連接:', worker.id);});cluster.on('exit', (worker, code, signal) => {console.log('工作進程已退出:', worker.id, code, signal);// 重啟工作進程cluster.fork();});cluster.on('error', (worker, code, signal) => {console.log('工作進程錯誤:', worker.id, code, signal);});// 定期檢查工作進程狀態setInterval(() => {for (const id in cluster.workers) {const worker = cluster.workers[id];console.log(`工作進程 ${worker.id} 狀態:`, {pid: worker.process.pid,isDead: worker.isDead(),isConnected: worker.isConnected(),exitedAfterDisconnect: worker.exitedAfterDisconnect});}}, 5000);// 優雅關閉process.on('SIGTERM', () => {console.log('收到 SIGTERM 信號,開始優雅關閉');for (const id in cluster.workers) {cluster.workers[id].send('shutdown');cluster.workers[id].disconnect();}});
} else {// 工作進程代碼const server = http.createServer((req, res) => {res.writeHead(200);res.end(`你好世界,來自工作進程 ${process.pid}\n`);});server.listen(8000);// 工作進程事件處理cluster.worker.on('message', (message) => {console.log('收到主進程消息:', message);if (message === 'shutdown') {server.close(() => {console.log(`工作進程 ${process.pid} 已關閉`);process.exit(0);});}});cluster.worker.on('disconnect', () => {console.log('工作進程已斷開連接');});cluster.worker.on('error', (code, signal) => {console.log('工作進程錯誤:', code, signal);});cluster.worker.on('exit', (code, signal) => {console.log('工作進程已退出:', code, signal);});// 定期發送心跳setInterval(() => {cluster.worker.send('heartbeat');}, 30000);
}
7. 高級用法
// 1. 動態調整工作進程數量
const cluster = require('cluster');
const http = require('http');
const os = require('os');if (cluster.isMaster) {let workerCount = os.cpus().length;console.log(`初始工作進程數量: ${workerCount}`);// 啟動工作進程for (let i = 0; i < workerCount; i++) {cluster.fork();}// 動態調整工作進程數量process.on('SIGUSR1', () => {workerCount = Math.max(1, workerCount - 1);console.log(`減少工作進程數量至: ${workerCount}`);const workers = Object.values(cluster.workers);if (workers.length > workerCount) {workers[workers.length - 1].disconnect();}});process.on('SIGUSR2', () => {workerCount = Math.min(os.cpus().length * 2, workerCount + 1);console.log(`增加工作進程數量至: ${workerCount}`);if (Object.keys(cluster.workers).length < workerCount) {cluster.fork();}});
}// 2. 工作進程負載均衡
const cluster = require('cluster');
const http = require('http');
const os = require('os');if (cluster.isMaster) {const workerCount = os.cpus().length;const workers = [];// 啟動工作進程for (let i = 0; i < workerCount; i++) {const worker = cluster.fork();workers.push({worker,load: 0,connections: 0});}// 負載均衡http.createServer((req, res) => {// 選擇負載最低的工作進程const target = workers.reduce((min, w) => w.load < min.load ? w : min, workers[0]);target.connections++;target.load = target.connections / 100; // 簡單的負載計算// 轉發請求target.worker.send('request', { url: req.url });}).listen(8000);// 處理工作進程響應cluster.on('message', (worker, message) => {if (message.type === 'response') {const workerInfo = workers.find(w => w.worker.id === worker.id);if (workerInfo) {workerInfo.connections--;workerInfo.load = workerInfo.connections / 100;}}});
}// 3. 工作進程健康檢查
const cluster = require('cluster');
const http = require('http');if (cluster.isMaster) {const workers = new Map();// 啟動工作進程for (let i = 0; i < 4; i++) {const worker = cluster.fork();workers.set(worker.id, {worker,healthy: true,lastHeartbeat: Date.now()});}// 健康檢查setInterval(() => {const now = Date.now();for (const [id, info] of workers) {if (now - info.lastHeartbeat > 30000) {console.log(`工作進程 ${id} 可能已死亡`);info.healthy = false;info.worker.kill();const newWorker = cluster.fork();workers.set(newWorker.id, {worker: newWorker,healthy: true,lastHeartbeat: now});}}}, 5000);// 處理心跳cluster.on('message', (worker, message) => {if (message === 'heartbeat') {const info = workers.get(worker.id);if (info) {info.healthy = true;info.lastHeartbeat = Date.now();}}});
}
cluster 模塊的主要特點:
- 支持多核 CPU 并行處理
- 提供完整的事件系統
- 支持進程間通信
- 支持動態調整工作進程數量
- 支持負載均衡和健康檢查
使用建議:
- 根據 CPU 核心數合理設置工作進程數量
- 實現完善的錯誤處理和重啟機制
- 使用事件系統進行進程間通信
- 實現健康檢查確保系統穩定性
- 考慮使用更高級的進程管理工具(如 PM2)