目錄
- 讀取文本數據
- 讀取端口數據
事實上Flink本身是流批統一的處理架構,批量的數據集本質上也是流,沒有必要用兩套不同的API來實現。所以從Flink 1.12開始,官方推薦的做法是直接使用DataStream API,在提交任務時通過將執行模式設為BATCH來進行批處理:
$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
對於Flink而言,流才是整個處理邏輯的底層核心,所以流批統一之後的DataStream API更加強大,可以直接處理批處理和流處理的所有場景。
讀取文本數據
需要處理數據如下:
hello flink
hello java
hello world
package com.tsg.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class BoundedStreamWordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("input/word.txt");SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = dataStreamSource.flatMap((String value, Collector<Tuple2<String, Long>> out) -> {String[] split = value.split(" ");for (String s : split) {out.collect(Tuple2.of(s, 1L));}}).returns(Types.TUPLE(Types.STRING,Types.LONG));
// KeyedStream<Tuple2<String, Long>, Tuple> tuple2TupleKeyedStream = wordAndOne.keyBy(0);KeyedStream<Tuple2<String, Long>, String> tuple2TupleKeyedStream = wordAndOne.keyBy(data->data.f0);SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2TupleKeyedStream.sum(1);sum.print();env.execute();}
}
讀取端口數據
package com.tsg.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class StreamWordCount {public static void main(String[] args) throws Exception {// 創建流式執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 從參數中提取主機名和端口號ParameterTool parameterTool = ParameterTool.fromArgs(args);String hostname = parameterTool.get("host");int port = parameterTool.getInt("port");DataStreamSource<String> lineStream = env.socketTextStream(hostname,port);
// DataStreamSource<String> lineStream = env.socketTextStream("master", 7777);SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = lineStream.flatMap((String str, Collector<Tuple2<String, Long>> out) -> {// 注意這里的Collector是org.apache.flink.util.Collector;String[] split = str.split(" ");for (String s : split) {out.collect(Tuple2.of(s, 1L));}}).returns(Types.TUPLE(Types.STRING,Types.LONG));KeyedStream<Tuple2<String, Long>, Tuple> tuple2TupleKeyedStream = tuple2SingleOutputStreamOperator.keyBy(0);SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2TupleKeyedStream.sum(1);sum.print();env.execute();}
}