In the previous section we consumed Kafka data with FlinkKafkaConsumer and printed it to the console. This section continues from there and writes the computed results to Redis.
pom.xml
Add the Redis connector dependency to the pom:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.maven</groupId>
            <artifactId>maven-plugin-api</artifactId>
            <version>2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.maven.plugin-tools</groupId>
            <artifactId>maven-plugin-annotations</artifactId>
            <version>3.2</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.plexus</groupId>
            <artifactId>plexus-utils</artifactId>
            <version>3.0.8</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.8.2</version>
            <scope>test</scope>
        </dependency>
        <!-- mybatis -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.4.5</version>
        </dependency>
        <!-- MySQL driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>
        <!-- Redis connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Build a fat jar; signature files are excluded so the shaded jar is not rejected -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
KafkaTestProducer.java: produce data into Kafka
Same as the previous section; the full code:
package org.example.snow.demo5;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @author snowsong
 */
public class KafkaTestProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Initial connection address of the Kafka cluster
        props.put("bootstrap.servers", "172.16.1.173:9092");
        // Serializers: convert Java objects to byte arrays
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Message loop
        for (int i = 0; i < 50; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("xue", key, value);
            producer.send(record);
            System.out.println("send: " + key);
            Thread.sleep(200);
        }
        // Close the producer
        producer.close();
    }
}
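The producer.send(record) call above is fire-and-forget, so delivery failures can go unnoticed. As a sketch, the send inside the loop can take a delivery callback (the standard kafka-clients Callback interface) to make results visible:

// Same send, but with a delivery callback so per-record failures are logged
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        System.err.println("send failed for " + key + ": " + exception.getMessage());
    } else {
        System.out.println("sent " + key + " to partition " + metadata.partition()
                + " at offset " + metadata.offset());
    }
});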
Startup class
Flink consumes the Kafka topic and writes the results to Redis.
Set up FlinkJedisPoolConfig
// Configure the Redis connection pool: set the server address and port, then build the config object
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
        .setHost(REDIS_SERVER)
        .setPort(REDIS_PORT)
        .build();
// Create the RedisSink that writes data to Redis
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
// Attach the RedisSink to the data stream as its sink
wordData.addSink(redisSink);
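If the Redis instance requires authentication or a non-default database, the same builder exposes further options. A minimal sketch, assuming the connector's FlinkJedisPoolConfig.Builder; every value below is a placeholder:

// Sketch: additional pool options (all values are placeholders)
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
        .setHost(REDIS_SERVER)
        .setPort(REDIS_PORT)
        .setPassword("your-password")  // only if Redis has requirepass set
        .setDatabase(0)                // logical database index
        .setTimeout(5000)              // connection timeout in ms
        .build();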
MyRedisMapper
MyRedisMapper maps each Tuple2 in the Flink data stream to a Redis command. It implements the RedisMapper interface to customize how data is written to Redis.
public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {

    /**
     * Gets the description of the Redis command to run.
     *
     * @return the command description; here the command type is LPUSH
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.LPUSH);
    }

    /**
     * Extracts the key from the given Tuple2.
     *
     * @param data a Tuple2 holding two strings
     * @return the first element of the tuple, used as the Redis key
     */
    @Override
    public String getKeyFromData(Tuple2<String, String> data) {
        return data.f0;
    }

    /**
     * Extracts the value from the given tuple.
     *
     * @param data a Tuple2 holding two strings
     * @return the second element of the tuple, used as the Redis value
     */
    @Override
    public String getValueFromData(Tuple2<String, String> data) {
        return data.f1;
    }
}
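LPUSH pushes every record onto a single Redis list. If you would rather keep one current value per key, a hash-based mapper is a small variant. A sketch, assuming the same connector API; the hash name l_words_hash is made up for illustration:

// Sketch: write records into a Redis hash instead of a list (hash name is illustrative)
public static class MyRedisHashMapper implements RedisMapper<Tuple2<String, String>> {

    @Override
    public RedisCommandDescription getCommandDescription() {
        // HSET takes the hash's name as the additional key
        return new RedisCommandDescription(RedisCommand.HSET, "l_words_hash");
    }

    @Override
    public String getKeyFromData(Tuple2<String, String> data) {
        // First tuple element becomes the field inside the hash
        return data.f0;
    }

    @Override
    public String getValueFromData(Tuple2<String, String> data) {
        // Second tuple element becomes that field's value
        return data.f1;
    }
}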
The complete code of StartApp is as follows:
package org.example.snow.demo5;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

import java.util.Properties;

/**
 * @author snowsong
 */
public class StartApp {

    private static final String REDIS_SERVER = "0.0.0.0";
    private static final Integer REDIS_PORT = 6379;

    public static void main(String[] args) throws Exception {
        // Initialize the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure the Kafka client connection
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "172.16.1.173:9092");
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("xue",
                new SimpleStringSchema(), properties);
        DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer);

        // Map each incoming record to a two-element tuple
        SingleOutputStreamOperator<Tuple2<String, String>> wordData =
                dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {
                    /**
                     * Maps the input string to a Tuple2.
                     *
                     * @param value the input string
                     * @return a Tuple2 whose first element is "l_words" and whose second element is the input string
                     * @throws Exception if an error occurs
                     */
                    @Override
                    public Tuple2<String, String> map(String value) throws Exception {
                        return new Tuple2<>("l_words", value);
                    }
                });

        // Configure the Redis connection pool: set the server address and port, then build the config object
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost(REDIS_SERVER)
                .setPort(REDIS_PORT)
                .build();
        // Create the RedisSink that writes data to Redis
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        // Attach the RedisSink to the data stream as its sink
        wordData.addSink(redisSink);

        env.execute();
    }

    /**
     * MyRedisMapper maps each Tuple2 in the data stream to a Redis command.
     * It implements the RedisMapper interface to customize how data is written to Redis.
     */
    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {

        /**
         * Gets the description of the Redis command to run.
         *
         * @return the command description; here the command type is LPUSH
         */
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }

        /**
         * Extracts the key from the given Tuple2.
         *
         * @param data a Tuple2 holding two strings
         * @return the first element of the tuple, used as the Redis key
         */
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        /**
         * Extracts the value from the given tuple.
         *
         * @param data a Tuple2 holding two strings
         * @return the second element of the tuple, used as the Redis value
         */
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }
    }
}
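One practical note: as written, the job runs without checkpointing, and FlinkKafkaConsumer commits its offsets back to Kafka only when a checkpoint completes. A minimal sketch, placed right after the environment is created; the 5-second interval is an arbitrary example value:

// Commit Kafka offsets on completed checkpoints (interval is an arbitrary example)
env.enableCheckpointing(5000);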
Run result
The data is stored in Redis.
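Because MyRedisMapper issues LPUSH, the values accumulate in the list l_words; running LRANGE l_words 0 -1 in redis-cli shows them. The same check in Java with Jedis (pulled in transitively by the Redis connector) is sketched below; host and port are placeholders:

import redis.clients.jedis.Jedis;

import java.util.List;

/**
 * Sketch: read back the list written by the Flink job (host/port are placeholders).
 */
public class RedisCheck {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
            // LPUSH prepends, so index 0 holds the most recently written value
            List<String> words = jedis.lrange("l_words", 0, -1);
            words.forEach(System.out::println);
        }
    }
}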