前言
以采集數據處理邏輯為例,數據采集分為不同種類如:MQTT、MODBUS、HTTP等,不同的采集數據有不同的解析處理邏輯。但
總體解析處理步驟是固定的
。可以使用工廠方法設計模式簡化代碼,讓代碼變得更加優雅。
代碼實踐
抽象類
總體步驟一致,先聲明一個抽象類包含所有處理步驟,具體處理步驟由不同子類自行實現。【大體處理框架
】
public abstract class AbstractCollectService {protected abstract Boolean handleAlarm(CollectDataMessage message);protected abstract Boolean handleCollect(CollectDataMessage message);public CollectDataMessage parseKafkaMessage(String kafkaMessage, CollectTypeEnum collectTypeEnum){// 工廠方法return CollectFactory.getInstance(collectTypeEnum).parseKafkaMessage2DataMessage(kafkaMessage);}protected abstract CollectDataMessage parseKafkaMessage2DataMessage(String kafkaMessage);public Boolean doHandle(CollectDataMessage message) {Boolean ret;switch (message.getHandleTypeEnum()){case ALARM:ret = handleAlarm(message) ;break ;case COLLECT:ret = handleCollect(message) ;break ;default:ret = false ;}return ret ;}}
枚舉類
采集數據枚舉類與子實現類一一對應
:
public enum CollectTypeEnum {MQTT,MODBUS,HTTP;
}
子實現類
不同采集數據的子類處理邏輯,各自實現抽象類中抽象方法(核心邏輯)。
工廠方法
定義工廠方法,使用枚舉做判斷條件,真正處理不同邏輯時,需要顯示地傳出對應枚舉參數
以便得到對應實現類對象。
public class CollectFactory {public static AbstractCollectService getInstance(CollectTypeEnum collectTypeEnum) {switch (collectTypeEnum) {case MQTT:return MqttCollectService.getInstance();case MODBUS:return ModbusCollectService.getInstance();case HTTP:return HttpCollectService.getInstance();default:throw new IllegalArgumentException("Unknown collect type");}}
}
具體子類對象,都是采用【基于類初始化】獲取的單例對象
。隨便一個為例,其他子類同理。
簡單的工廠方法設計模式就這樣實現了~
最終使用
顯示指定枚舉參數
處理函數:
public class KafkaMsg2CollectMsgRichMapFunction extends RichMapFunction<String, CollectDataMessage> {private static final Logger log = LoggerFactory.getLogger(KafkaMsg2CollectMsgRichMapFunction.class) ;private final CollectTypeEnum collectTypeEnum;public KafkaMsg2CollectMsgRichMapFunction(CollectTypeEnum collectTypeEnum) {this.collectTypeEnum = collectTypeEnum;}@Overridepublic void open(Configuration parameters) throws Exception {}@Overridepublic CollectDataMessage map(String kafkaMessage) {try {// 根據顯示指定的枚舉類,獲取對應子類實現相應邏輯AbstractCollectService collectService = CollectFactory.getInstance(collectTypeEnum);return collectService.parseKafkaMessage(kafkaMessage, collectTypeEnum);} catch (RuntimeException e) {log.info("解析采集數據異常", e);throw new RuntimeException(e);}}@Overridepublic void close() throws Exception {}
}