基于apache flink的流處理實時模型
44元
包郵
(需用券)
去購買 >
以前的數據分析項目(版本1.4.2),對從Kafka讀取的原始數據流,調用split接口實現分流.
新項目決定使用Flink 1.7.2,使用split接口進行分流的時候,發現接口被標記為depracted(后續可能會被移除).
搜索相關文檔,發現新版本Flink中推薦使用帶外數據進行分流.
預先建立OutputTag實例(LogEntity是從kafka讀取的日志實例類).
private static final OutputTag APP_LOG_TAG = new OutputTag<>("appLog", TypeInformation.of(LogEntity.class));
private static final OutputTag ANALYZE_METRIC_TAG = new OutputTag<>("analyzeMetricLog", TypeInformation.of(LogEntity.class));
對kafka讀取的原始數據,通過process接口,打上相應標記.
private static SingleOutputStreamOperator sideOutStream(DataStream rawLogStream) {
return rawLogStream
.process(new ProcessFunction() {
@Override
public void processElement(LogEntity entity, Context ctx, Collector out) throws Exception {
// 根據日志等級,給對象打上不同的標記
if (entity.getLevel().equals(ANALYZE_LOG_LEVEL)) {
ctx.output(ANALYZE_METRIC_TAG, entity);
} else {
ctx.output(APP_LOG_TAG, entity);
}
}
})
.name("RawLogEntitySplitStream");
}
// 調用函數,對原始數據流中的對象進行標記
SingleOutputStreamOperator sideOutLogStream = sideOutStream(rawLogStream);
// 根據標記,獲取不同的數據流,以便后續進行進一步分析
DataStream appLogStream = sideOutLogStream.getSideOutput(APP_LOG_TAG);
DataStream rawAnalyzeMetricLogStream = sideOutLogStream.getSideOutput(ANALYZE_METRIC_TAG);
通過以上步驟,就實現了數據流的切分.
PS:
如果您覺得我的文章對您有幫助,請關注我的微信公眾號,謝謝!
原文鏈接:https://www.cnblogs.com/jason1990/p/11610130.html
java 11官方入門(第8版)教材
79.84元
包郵
(需用券)
去購買 >