目錄
- 源算子(Source)
- 從集合中讀取數據
- 從文件讀取數據
- 從Socket讀取數據
- 從Kafka讀取數據
- 從數據生成器讀取數據
- Flink支持的數據類型
- Flink的類型系統
- Flink支持的數據類型
- 類型提示(Type Hints)
源算子(Source)
? Flink可以從各種來源獲取數據,然后構建DataStream進行轉換處理。一般將數據的輸入來源稱為數據源(data source),而讀取數據的算子就是源算子(source operator)。所以,source就是我們整個處理程序的輸入端。
? 在Flink1.12以前,舊的添加source的方式,是調用執行環境的addSource()方法:
DataStream stream = env.addSource(…);
方法傳入的參數是一個“源函數”(source function),需要實現SourceFunction接口。
? 從Flink1.12開始,主要使用流批統一的新Source架構:
DataStreamSource stream = env.fromSource(…)
Flink直接提供了很多預實現的接口,此外還有很多外部連接工具也幫我們實現了對應的Source,通常情況下足以應對我們的實際需求。
從集合中讀取數據
? 最簡單的讀取數據的方式,就是在代碼中直接創建一個Java集合,然后調用執行環境的fromCollection方法進行讀取。這相當于將數據臨時存儲到內存中,形成特殊的數據結構后,作為數據源使用,一般用于測試。
package env;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class CollectionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 從集合讀
// DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 22, 3));// 直接填寫元素DataStreamSource<Integer> source = env.fromElements(1, 22, 3);source.print();env.execute();}
}
從文件讀取數據
? 真正的實際應用中,自然不會直接將數據寫在代碼中。通常情況下,我們會從存儲介質中獲取數據,一個比較常見的方式就是讀取日志文件。這也是批處理中最常見的讀取方式。
讀取文件,需要添加文件連接器依賴:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version>
</dependency>
package source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FileSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);FileSource.FileSourceBuilder<String> builder = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/word.txt"));FileSource<String> fileSource = builder.build();env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"filesource").print();env.execute();}
}
說明:
- 參數可以是目錄,也可以是文件;還可以從HDFS目錄下讀取,使用路徑hdfs://…;
- 路徑可以是相對路徑,也可以是絕對路徑;
- 相對路徑是從系統屬性user.dir獲取路徑:idea下是project的根目錄,standalone模式下是集群節點根目錄;
從Socket讀取數據
? 不論從集合還是文件,我們讀取的其實都是有界數據。在流處理的場景中,數據往往是無界的。
我們之前用到的讀取socket文本流,就是流處理場景。但是這種方式由于吞吐量小、穩定性較差,一般也是用于測試。
DataStream<String> stream = env.socketTextStream("localhost", 7777);
從Kafka讀取數據
? Flink官方提供了連接工具flink-connector-kafka,直接幫我們實現了一個消費者FlinkKafkaConsumer,它就是用來讀取Kafka數據的SourceFunction。
? 所以想要以Kafka作為數據源獲取數據,我們只需要引入Kafka連接器的依賴。Flink官方提供的是一個通用的Kafka連接器,它會自動跟蹤最新版本的Kafka客戶端。目前最新版本只支持0.10.0版本以上的Kafka。這里我們需要導入的依賴如下。
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version>
</dependency>
package source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KafkaSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("master:9092,slave1:9092,slave2:9092").setGroupId("kafkasource").setTopics("topic1").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"").print();env.execute();}
}
Kafka生產數據
Flink消費數據
從數據生成器讀取數據
? Flink從1.11開始提供了一個內置的DataGen 連接器,主要是用于生成一些隨機數,用于在沒有數據源的時候,進行流任務的測試以及性能測試等。1.17提供了新的Source寫法,需要導入依賴:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version>
</dependency>
package source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DataGeneratorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);/*** 數據生成器Source,四個參數* 第一個:GeneratorFunction接口,需要實現,重寫map方法,輸入類型固定是Long* 第二個,Long類型,自動生成的數字序列(從0自增)的最大值(小于),達到這個值就停止了* 第三個,限速策略,比如 每秒生成幾條數據* 第四個:返回的類型*/DataGeneratorSource<String> source = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) {return "Number:" + value;}}, Long.MAX_VALUE, RateLimiterStrategy.perSecond(100), Types.STRING);env.fromSource(source,WatermarkStrategy.noWatermarks(),"").print();env.execute();}
}
Flink支持的數據類型
Flink的類型系統
Flink使用“類型信息”(TypeInformation)來統一表示數據類型。TypeInformation類是Flink中所有類型描述符的基類。它涵蓋了類型的一些基本屬性,并為每個數據類型生成特定的序列化器、反序列化器和比較器。
Flink支持的數據類型
對于常見的Java和Scala數據類型,Flink都是支持的。Flink在內部,Flink對支持不同的類型進行了劃分,這些類型可以在Types工具類中找到:
(1)基本類型
所有Java基本類型及其包裝類,再加上Void、String、Date、BigDecimal和BigInteger。
(2)數組類型
包括基本類型數組(PRIMITIVE_ARRAY)和對象數組(OBJECT_ARRAY)。
(3)復合數據類型
- Java元組類型(TUPLE):這是Flink內置的元組類型,是Java
API的一部分。最多25個字段,也就是從Tuple0~Tuple25,不支持空字段。 - Scala 樣例類及Scala元組:不支持空字段。
- 行類型(ROW):可以認為是具有任意個字段的元組,并支持空字段。
- POJO:Flink自定義的類似于Java bean模式的類。
(4)輔助類型
Option、Either、List、Map等。
(5)泛型類型(GENERIC)
Flink支持所有的Java類和Scala類。不過如果沒有按照上面POJO類型的要求來定義,就會被Flink當作泛型類來處理。Flink會把泛型類型當作黑盒,無法獲取它們內部的屬性;它們也不是由Flink本身序列化的,而是由Kryo序列化的。
在這些類型中,元組類型和POJO類型最為靈活,因為它們支持創建復雜類型。而相比之下,POJO還支持在鍵(key)的定義中直接使用字段名,這會讓我們的代碼可讀性大大增加。所以,在項目實踐中,往往會將流處理程序中的元素類型定為Flink的POJO類型。
Flink對POJO類型的要求如下:
- 類是公有(public)的
- 有一個無參的構造方法
- 所有屬性都是公有(public)的
- 所有屬性的類型都是可以序列化的
類型提示(Type Hints)
Flink還具有一個類型提取系統,可以分析函數的輸入和返回類型,自動獲取類型信息,從而獲得對應的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情況下(比如Lambda表達式中),自動提取的信息是不夠精細的——只告訴Flink當前的元素由“船頭、船身、船尾”構成,根本無法重建出“大船”的模樣;這時就需要顯式地提供類型信息,才能使應用程序正常工作或提高其性能。
為了解決這類問題,Java API提供了專門的“類型提示”(type hints)。
回憶一下之前的word count流處理程序,我們在將String類型的每個詞轉換成(word, count)二元組后,就明確地用returns指定了返回的類型。因為對于map里傳入的Lambda表達式,系統只能推斷出返回的是Tuple2類型,而無法得到Tuple2<String, Long>。只有顯式地告訴系統當前的返回類型,才能正確地解析出完整數據。
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
Flink還專門提供了TypeHint類,它可以捕獲泛型的類型信息,并且一直記錄下來,為運行時提供足夠的信息。我們同樣可以通過.returns()方法,明確地指定轉換之后的DataStream里元素的類型。
returns(new TypeHint<Tuple2<Integer, SomeType>>(){})