大綱
- 新建工程
- 新增依賴
- 數據對象
- 序列化器
- 接入數據源
- 測試
- 修改Slot個數
- 打包、提交、運行
- 工程代碼
在《Java版Flink使用指南——從RabbitMQ中隊列中接入消息流》一文中,我們從RabbitMQ隊列中讀取了字符串型數據。如果我們希望讀取的數據被自動化轉換為一個對象,則需要定制序列化器。本文我們就將講解數據源序列化器的定制方法。
新建工程
我們在IntelliJ中新建一個工程SourceSerializer。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入與Flink的版本:1.19.1
新增依賴
在pom.xml中新增RabbitMQ連接器
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>
新增Json庫依賴
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.17.1</version></dependency>
新增lombok庫,主要是為了使用它的一些注解
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.32</version><scope>provided</scope></dependency>
數據對象
我們新建一個簡單的數據對象SampleData
src/main/java/org/example/vo/SampleData.java
package org.example.vo;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class SampleData {private Long id;private String name;private int age;private Boolean married;private Double salary;public String toJson() throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.writeValueAsString(this);}public static SampleData fromJson(String json) throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.readValue(json, SampleData.class);}
}
這個方法包含兩個方法,一個是將SampleData 轉換成字符串,另一個是將字符串轉成SampleData 對象。
序列化器
我們定義的數據源序列化器要實現AbstractDeserializationSchema接口,主要是通過deserialize方法將二進制數組轉換成SampleData 對象。
src/main/java/org/example/serializer/SampleDataRabbitMQSourceSerializer.java
package org.example.serializer;import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.example.vo.SampleData;import java.io.IOException;public class SampleDataRabbitMQSourceSerializer extends AbstractDeserializationSchema<SampleData> {@Overridepublic SampleData deserialize(byte[] message) throws IOException {return SampleData.fromJson(new String(message));}@Overridepublic boolean isEndOfStream(SampleData nextElement) {return false;}@Overridepublic TypeInformation<SampleData> getProducedType() {return TypeInformation.of(SampleData.class);}
}
接入數據源
我們在《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》一文中,往data.to.rbtmq對了寫入了大量SampleData 數據。這次我們將其作為數據源來做測試
這次我們在創建RMQSource時傳入序列化器SampleDataRabbitMQSourceSerializer。它會將從RabbitMQ獲取的數據轉換成SampleData對象。
然后我們獲取所有“已婚”(filter.getMarried() == true)的數據,將其打印到日志中。
String queueName = "data.to.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<SampleData> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SampleDataRabbitMQSourceSerializer());final DataStream<SampleData> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.filter(filter -> filter.getMarried() == true).print().name(username + "'s sink to stdout").setParallelism(parallelism);
測試
修改Slot個數
由于我們要運行兩個流式計算任務,于是需要兩個Slot。
vim conf/config.yaml
將numberOfTaskSlots的值改成2。
打包、提交、運行
我們將本例和《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》中的包都提交運行
然后在日志中可以看到“已婚”的數據都在輸出
tail -f log/*
工程代碼
https://github.com/f304646673/FlinkDemo