使用ParameterTool讀取配置文件
Flink讀取參數的對象
Commons-cli
: Apache提供的,需要引入依賴ParameterTool
:Flink內置
ParameterTool 比 Commons-cli 使用上簡便;
ParameterTool能避免Jar包的依賴沖突
建議使用第二種
使用ParameterTool對象可以直接獲取配置文件中的信息,需要如下依賴
<!-- Flink基礎依賴 【ParameterTool類 在該依賴中】 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId></dependency><!-- Flink流批處理依賴 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId></dependency>
Java讀取資源的方式
Class.getResourceAsStream(Path)
:Path 必須以 “/”,表示從ClassPath的根路徑讀取資源Class.getClassLoader().getResourceAsStream(Path)
:Path 無須以 “/”,默認從ClassPath的根路徑讀取資源
推薦使用第2種,以類加載器的方式獲取靜態資源文件,不要通過ClassPath的相對路徑查找
最基本的工具類
public class ParameterUtil {// 創建 ParameterTool 對象public static ParameterTool getParameters() {// 讀取 resources 文件夾下 "flink.properties" 文件InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);try {return ParameterTool.fromPropertiesFile(inputStream);} catch (Exception e) {throw new FlinkPropertiesException(FlinkPropertiesExceptionInfo.PROPERTIES_NULL);}}
}
可以通過
ParameterUtil.getParameters().get("redis.port")
直接讀取key對應的value值
Flink寫入Redis方式
- 繼承RichSinkFunction (Flink-Stream)
- 使用第3方的包 (Apache-Bahir-Flink)
Apache-Bahir-Flink 的 Redis-Connector的缺點:
- 使用Jedis, 沒有使用Lettuce
- 沒有對 Flink Table/SQL Api 的支持
不少基于bahir二開的例子解決了上述問題
gitee地址:https://gitee.com/jeff-zou/flink-connector-redis?_from=gitee_search
github地址:https://github.com/apache/bahir-flink
bahir 集成了許多連接器,其中就包含Redis
Flink官網上也可以看到bahir的影子
方便起見,接下來就基于bahir,Flink寫入Redis集群
基于巴希爾(Bahir)-Flink寫入Redis集群
引入connector連接器依賴
<!-- Flink-Connector-Redis --><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_${scala.binary.version}</artifactId></dependency>
依賴版本定義在父模塊中
實現RedisMapper接口自定義Sink
首先實現RedisMapper接口
并指定泛型——處理元素的類型
/*** 基于apache bachir flink的RedisSink,作用于Redis String數據類型*/
public class RedisSinkByBahirWithString implements RedisMapper<Tuple2<String, String>> {/*** 指定Redis的命令*/@Overridepublic RedisCommandDescription getCommandDescription() {/* ************************ 如果Redis的數據類型是 hash 或 z-Set* RedisCommandDescription 的構造方法必須傳入 additionalKey* additionalKey就是Redis的鍵** *********************/return new RedisCommandDescription(RedisCommand.SET);}/*** 從數據流里獲取Key值*/@Overridepublic String getKeyFromData(Tuple2<String, String> input) {return input.f0;}/*** 從數據流里獲取Value值*/@Overridepublic String getValueFromData(Tuple2<String, String> input) {return input.f1;}
}
寫入Redis工具類
public class RedisWriteUtil {/* ************************ FlinkJedisClusterConfig:集群模式* FlinkJedisPoolConfig:單機模式* FlinkJedisSentinelConfig:哨兵模式** *********************/// Jedis配置private static final FlinkJedisClusterConfig JEDIS_CONF;static {ParameterTool parameterTool = ParameterUtil.getParameters();String host = parameterTool.get("redis.host");String port = parameterTool.get("redis.port");/* ************************ InetSocketAddress 是Java的套接字** *********************/InetSocketAddress inetSocketAddress = new InetSocketAddress(host, Integer.parseInt(port));Set<InetSocketAddress> set = new HashSet<>();set.add(inetSocketAddress);JEDIS_CONF = new FlinkJedisClusterConfig.Builder().setNodes(set).build();}/*** 基于Bahir寫入Redis,Redis的數據是String類型*/public static void writeByBahirWithString(DataStream<Tuple2<String, String>> input) {input.addSink(new RedisSink<>(JEDIS_CONF, new RedisSinkByBahirWithString()));}}
測試一下
class RedisWriteUtilTest {@DisplayName("測試基于Bahir寫入Redis,Redis數據類型是String類型")@Testvoid writeByBahirWithString() throws Exception {LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();DataStreamSource<Tuple2<String, String>> dataStream = env.fromElements(Tuple2.of("k", "v"));RedisWriteUtil.writeByBahirWithString(dataStream);env.execute();}
}
非常完美!寫入成功
Flink讀取Redis方式
- 繼承RichSourceFunction (實現自定義Source)
- 繼承RichParallelSourceFunction (實現自定義Source)【可以指定并行度】
- 實現SourceFunction接口 (實現自定義Source)
RichParallelSourceFunction 和 RichSourceFunction區別
RichParallelSourceFunction 可以設置并行度
RichParallelSourceFunction 和 RichSourceFunction 代碼是可以互相套用
RichParallelSourceFunction 默認的并行度是cpu 的 核心數(core數)
RichSourceFunction 的并行度只能是1
繼承RichSourceFunction類-Flink讀取Redis集群
前置準備
定義枚舉類
Redis數據類型枚舉類
@Getter
public enum RedisDataType {STRING,HASH,LIST,SET,SORTED_SET,;RedisDataType() {}
}
定義Redis命令的枚舉類
,便于Source判斷操作
@Getter
public enum RedisCommand {// get stringGET(RedisDataType.STRING);private final RedisDataType redisDataType;RedisCommand(RedisDataType redisDataType) {this.redisDataType = redisDataType;}
}
Jedis配置類
bahir依賴中自帶jedis依賴一般不用,自行引入jedis,jedis依賴版本要與巴希爾中jedis版本保持一致
public class JedisConf {public static JedisCluster getJedisCluster() throws IOException {ParameterTool parameterTool =ParameterUtil.getParameters();String host = parameterTool.get("redis.host");String port = parameterTool.get("redis.port");/* *********************** Jedis對象** JedisPool : 用于redis單機版* JedisCluster: 用于redis集群** JedisCluster對象能夠自動發現正常的redis節點** *********************/HostAndPort hostAndPort = new HostAndPort(host,Integer.parseInt(port));Set<HostAndPort> nodes = new HashSet<>();nodes.add(hostAndPort);return new JedisCluster(nodes);}
}
封裝Jedis對象的redis方法
封裝Jedis對象的redis方法,方便統一調用和維護
public class JedisBuilder {private JedisCluster jedis = null;public JedisBuilder(JedisCluster jedisCluster) {this.jedis = jedisCluster;}public void close() {if (this.jedis != null) {this.jedis.close();}}/*** Redis的Get方法*/public String get(String key) {return jedis.get(key);}
}
自定義Source
Redis數據的映射對象
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RedisPO implements Serializable {private String data;}
Flink 自定義Redis Source讀取Redis
/* *********************** 【富函數類】 比函數類提供了更多函數生命周期,提供了獲取上下文的方法* 富函數類通常是抽象類* *********************/
public class RedisSource extends RichSourceFunction<RedisPO> {/*** Jedis對象*/private JedisBuilder jedisBuilder;/*** Redis命令枚舉對象*/private final RedisCommand redisCommand;/*** redis key*/private final String key;public RedisSource(RedisCommand redisCommand, String key) {this.redisCommand = redisCommand;this.key = key;}/*** volatile 修飾的變量,它的更新都會通知其他線程.*/private volatile boolean isRunning = true;/*** Redis的連接初始化*/@Overridepublic void open(Configuration parameters) throws Exception {JedisCluster jedisCluster = JedisConf.getJedisCluster();jedisBuilder = new JedisBuilder(jedisCluster);}/*** Redis數據的讀取*/@Overridepublic void run(SourceContext<RedisPO> output) throws Exception {/* ************************ 一直監聽Redis數據的讀取** *********************/String data = null;// while (isRunning) {switch (redisCommand.getRedisDataType()) {case STRING:data = jedisBuilder.get(key);}output.collect(new RedisPO(data));// }}@Overridepublic void cancel() {this.isRunning = false;}}
讀取Redis工具類
public class RedisReadUtil {public static DataStream<RedisPO> read(StreamExecutionEnvironment env,RedisCommand redisCommand,String key) {return env.addSource(new RedisSource(redisCommand, key));}
}
測試一下
class RedisReadUtilTest {@DisplayName("測試自定義Source讀取Redis,Redis數據類型是String類型")@Testvoid testReadByCustomSourceWithString() throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<RedisPO> dataStream = RedisReadUtil.read(env,RedisCommand.GET,"k");dataStream.print();env.execute();}
}
測試成功!