主要功能和場景
-
柔性調溫策略:這個類主要用于管理一個溫度調節流程,通過不同的策略(如策略1和策略2)來調節溫度,確保設備或環境中的溫度達到預設的目標。
-
緊急停止機制:在流程執行過程中,如果需要緊急停止,可以通過設置一個標志來立即停止所有正在進行的任務。
-
定時任務管理:使用Java的
ScheduledExecutorService
來管理定時任務,如定期檢查溫度、執行特定的溫度調節策略等。 -
Redis集成:使用Redisson客戶端與Redis數據庫交互,存儲和檢索緊急停止的狀態。
代碼詳細描述
-
類結構:
- 使用
@Slf4j
注解來自動生成日志記錄器。 - 使用
@RequiredArgsConstructor
注解來自動注入對象
- 使用
-
常量定義:
STRATEGY_DURATION
:定義了策略的持續時間,這里是5分鐘減去30秒。EMERGENCY_STOPPED_KEY
:在Redis中存儲緊急停止狀態的鍵。EMERGENCY_STOPPED
和RESTART_STOPPED
:分別代表緊急停止和重啟的狀態。
-
方法:
startProcessChain(Long planId)
:啟動整個流程,首先檢查是否需要緊急停止,然后開始執行柔性調溫策略1。applyFlexibleTempStrategy1(Long planId)
和applyFlexibleTempStrategy2(Long planId)
:分別實現策略1和策略2,包括下發溫度調節指令和召測命令,以及根據召測結果調整策略。waitFor5MinutesAfterStrategy1(Long planId)
和waitFor10MinutesAfterStrategy2(Long planId)
:在策略1和策略2之后等待一定時間,然后執行下一步。applyTargetPower(Long planId)
:在策略2之后執行目標功率控制。waitFor10MinutesBeforeStop(Long planId)
:在停止前等待10分鐘。emergencyStop(Long planId)
和restart(Long planId)
:分別用于緊急停止流程和重啟流程。isEmergencyStopped(Long planId)
和setEmergencyStopped(Long planId)
:用于檢查和設置緊急停止狀態。
使用場景
這個類適用于需要精確控制溫度的場景,如數據中心、實驗室或工業生產環境,其中溫度的精確控制對于設備的正常運行至關重要。通過這個流程管理器,可以確保在各種情況下都能有效地調節溫度,同時提供緊急停止機制以應對突發情況。
在ProcessManager
類中,每個流程都創建了一個ScheduledExecutorService
實例,通過Executors.newScheduledThreadPool(1)
創建了一個大小為1的線程池。這種設計有幾個好處:
-
資源控制:通過限制線程池的大小為1,可以確保每個流程在其生命周期內只使用一個線程。這有助于防止資源過度消耗,特別是在高并發環境中,可以避免因創建過多線程而導致的系統資源耗盡。
-
任務串行執行:由于線程池大小為1,所有提交給該線程池的任務將按順序串行執行。這意味著一個流程中的所有任務都是順序執行的,不會并發執行,這有助于簡化任務之間的同步和數據依賴問題。
-
簡化同步:在某些情況下,流程中的任務可能需要訪問共享資源或狀態,串行執行可以減少或消除對這些資源進行復雜同步的需求,因為任務不會并發地訪問這些資源。
-
避免競態條件:在單線程環境中,不會出現競態條件(race conditions),因為任務是按順序執行的。這對于確保流程的正確性和可預測性非常重要。
-
易于管理:單線程池使得任務的管理和監控更加簡單。例如,如果需要取消所有任務,只需調用
scheduler.shutdown()
即可。 -
適合定時任務:
ScheduledExecutorService
特別適合執行定時任務,如周期性任務或延遲任務。通過使用單線程池,可以確保這些任務按照預定的時間表執行,而不會因為線程爭用而產生時間偏差。
總之,為每個流程創建一個單線程的ScheduledExecutorService
可以提供一個簡單、可控且高效的方式來管理流程中的定時任務,同時確保流程的穩定性和可預測性。
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.hutool.core.text.CharSequenceUtil;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;/*** 能力認定流程manager*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ProcessManager {/*** 5分鐘,單位為毫秒(策略持續時間),30秒執行一次,減少30秒是由于第一次進入默認執行一次召測*/private static final long STRATEGY_DURATION = 5 * 60 * 1000 - 30000;/*** Redis中存儲緊急停止值*/private static final String EMERGENCY_STOPPED_KEY = "IOT:EMERGENCY:STOPPED:VAL:";/*** 緊急停止值*/private static final String EMERGENCY_STOPPED = "1";/*** 重啟*/private static final String RESTART_STOPPED = "0";/*** Redisson客戶端*/private final RedissonClient redissonClient;/*** 啟動任務*/public void startProcessChain(Long planId) {// 否緊急停止if (isEmergencyStopped(planId)) {return;}ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);log.info("Step 0 startProcessChain with task ID: " + planId);// 立即執行柔性調溫策略1scheduler.schedule(() -> applyFlexibleTempStrategy1(planId), 0, TimeUnit.SECONDS);// 立馬取消當前任務scheduler.shutdown();}/*** 柔性調溫策略1*/private void applyFlexibleTempStrategy1(Long planId) {// 緊急停止if (isEmergencyStopped(planId)) {return;}log.info("Step 1 下發遙調指令task ID: " + planId);// 下發遙調指令String tempValue = "18";// 下發遙調任務失敗,流程結束if (packageCommandTempIssuance()) {return;}long startTime = System.currentTimeMillis();// 整個流程持續五分鐘,30秒執行一次ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {// 緊急停止if (isEmergencyStopped(planId)) {return;}log.info("Step 1 下發召測命令 task ID: " + planId);int unConformNum = packageCallForTest();// 不滿足數量=0時表示都滿足18° ± 0.5°,立即執行下一步if (unConformNum == 0) {log.info("Step 1 策略1條件已滿足,執行下一步等待 task ID: " + planId);// 滿足條件立馬執行:"等待五分鐘"策略scheduler.schedule(() -> waitFor5MinutesAfterStrategy1(planId), 0, TimeUnit.SECONDS);// 立馬取消當前任務scheduler.shutdown();}long elapsedTime = System.currentTimeMillis() - startTime;// 5分鐘內所有通道溫度未達到18° ± 0.5°,任務終止if (unConformNum != 0 && elapsedTime >= STRATEGY_DURATION) {log.info("Step 1 下發召測命令不滿足要求:planId:{},不滿足數量:{} ", planId, unConformNum);emergencyStop(planId);scheduler.shutdown();}}, 0, 30, TimeUnit.SECONDS);}/*** 柔性調溫策略1之后等待五分鐘:配置碼值動態獲取*/private void waitFor5MinutesAfterStrategy1(Long planId) {// 緊急停止if (isEmergencyStopped(planId)) {return;}String value = "5";log.info("After step 1 waiting for " + value + " minutes for task ID: " + planId);ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);// 五分鐘之后執行applyFlexibleTempStrategy2scheduler.schedule(() -> applyFlexibleTempStrategy2(planId), Long.parseLong(value),TimeUnit.MINUTES);// 立馬取消當前任務scheduler.shutdown();}/*** 柔性調溫策略2*/private void applyFlexibleTempStrategy2(Long planId) {// 緊急停止if (isEmergencyStopped(planId)) {return;}log.info("Step 2 策略2下發遙調指令 task ID: " + planId);// 下發遙調指令String tempValue = "26";// 下發遙調任務失敗,流程結束if (packageCommandTempIssuance()) {emergencyStop(planId);return;}long startTime = System.currentTimeMillis();// 整個流程持續五分鐘ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {// 緊急停止if (isEmergencyStopped(planId)) {return;}log.info("Step 2 策略2下發召測指令 task ID: " + planId);int unConformNum = packageCallForTest();// 不滿足數量=0時表示都滿足26° ± 0.5°,立即執行下一步if (unConformNum == 0) {log.info("Step 2 策略2條件已滿足,執行下一步等待 task ID: " + planId);// 滿足條件立馬執行:"等待五分鐘"策略scheduler.schedule(() -> waitFor10MinutesAfterStrategy2(planId), 0, TimeUnit.SECONDS);// 立馬取消當前任務scheduler.shutdown();}long elapsedTime = System.currentTimeMillis() - startTime;// 5分鐘內所有通道溫度未達到26° ± 0.5°,任務終止if (unConformNum != 0 && elapsedTime >= STRATEGY_DURATION) {log.info("Step 2 下發召測命令不滿足要求:planId:{},不滿足數量:{} ", planId, unConformNum);emergencyStop(planId);// 立馬取消當前任務scheduler.shutdown();}}, 0, 30, TimeUnit.SECONDS);}/*** 柔性調溫策略2之后等10分鐘:配置碼值動態獲取*/private void waitFor10MinutesAfterStrategy2(Long planId) {if (isEmergencyStopped(planId)) {return;}String value = "10";log.info("After step 2 Waiting for " + value + " minutes task: " + planId);ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);// 等10分鐘目標功率控制scheduler.schedule(() -> applyTargetPower(planId), Long.parseLong(value), TimeUnit.MINUTES);// 立馬取消當前任務scheduler.shutdown();}/*** 目標功率控制*/private void applyTargetPower(Long planId) {if (isEmergencyStopped(planId)) {return;}log.info("Step 3 目標功率控制 task ID: " + planId);// 下發遙調任務失敗,流程結束if (packageCommandTempIssuance()) {emergencyStop(planId);return;}ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);// 立即執行停止前等待10分鐘scheduler.schedule(() -> waitFor10MinutesBeforeStop(planId), 0,TimeUnit.SECONDS);// 立馬取消當前任務scheduler.shutdown();}/*** 停止前等待10分鐘:配置碼值動態獲取*/private void waitFor10MinutesBeforeStop(Long planId) {// 緊急停止if (isEmergencyStopped(planId)) {return;}String tempValue = "10";log.info("After step 3 wait for " + tempValue + " minutes task ID: " + planId);ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.schedule(() -> {log.info("waitFor10MinutesBeforeStop step 4 stop task ID: " + planId);// 立馬取消當前任務scheduler.shutdown();}, Long.parseLong(tempValue), TimeUnit.MINUTES);}/*** 緊急停止*/public void emergencyStop(Long planId) {log.info("Emergency stopping the process with task ID: " + planId);setEmergencyStopped(planId);}/*** 重啟一鍵能力認證流程*/public void restart(Long planId) {// 1:true,0:falseredissonClient.getBucket(EMERGENCY_STOPPED_KEY + planId).set(RESTART_STOPPED);}/*** 是否緊急停止*/private boolean isEmergencyStopped(Long planId) {RBucket < String > bucket = redissonClient.getBucket(EMERGENCY_STOPPED_KEY + planId);String isEmergencyStopped = bucket.get();return isEmergencyStopped != null && CharSequenceUtil.equals(isEmergencyStopped, EMERGENCY_STOPPED);}/*** 設置緊急停止值*/private void setEmergencyStopped(Long planId) {// 1:true,0:falseredissonClient.getBucket(EMERGENCY_STOPPED_KEY + planId).set(EMERGENCY_STOPPED);}/*** 下發遙調指令*/private boolean packageCommandTempIssuance() {// 下發遙調指令,示意代碼return true;}/*** 下發召測命令** @return 不滿足數量*/private Integer packageCallForTest() {// 下召測命令,拿內機數據,示意代碼return 0;}
}