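Our team needs to supply real-time data (hour-level granularity) to business systems.
The data flows like this: real-time data -> Kafka -> Flink -> Redis (cluster and sentinel).
However, Flink does not natively support Redis connections,
so third parties provide connectors, for example https://bahir.apache.org/docs/flink/current/flink-streaming-redis/.
When I actually used it, I hit a problem: the Redis cluster connection could not be initialized (the exception is listed below).
The official bahir documentation did not help with this problem.
So I dug into the bahir source code (more than once) and set out to rewrite the Redis cluster connection.
A small detour: my first plan was to extend the FlinkJedisClusterConfig class and rewrite the cluster connection there,
but the author made its constructor private, which rules out subclassing, so that path was a dead end.
I took another route: based on bahir, I rewrote the code related to the Redis cluster connection and upgraded the Redis client version.
To help other readers avoid the same pitfall,
I am sharing the result below (including the rewritten code).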
Exception details
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[?:1.8.0_262]
at java.lang.Integer.parseInt(Integer.java:580) ~[?:1.8.0_262]
at java.lang.Integer.valueOf(Integer.java:766) ~[?:1.8.0_262]
at redis.clients.util.ClusterNodeInformationParser.getHostAndPortFromNodeLine(ClusterNodeInformationParser.java:39) ~[blob_p-a436c38c6f05b2f134b34bb28d2ce7eaba02a621-50f891edc883b56f49b72ec65d36ab09:?]
at redis.clients.util.ClusterNodeInformationParser.parse(ClusterNodeInformationParser.java:14) ~[blob_p-a436c38c6f05b2f134b34bb28d2ce7eaba02a621-50f891edc883b56f49b72ec65d36ab09:?]
Screenshot of the exception (image not included)
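The stack trace ends in Integer.parseInt, called from ClusterNodeInformationParser.getHostAndPortFromNodeLine, so the failure happens while a port is being parsed out of a cluster node line. One plausible trigger, stated here as an assumption rather than something the trace proves, is a node address of the form ip:port@cport, which Redis 4.0 and later report in CLUSTER NODES output. The sketch below uses only the JDK. The class name, method names, and sample address are hypothetical, and this is not the Jedis code.

```java
final class NodeLinePortDemo {

    /** Naive parse: everything after the last ':' must be a plain integer. */
    static int naivePort(String hostAndPort) {
        String portPart = hostAndPort.substring(hostAndPort.lastIndexOf(':') + 1);
        return Integer.parseInt(portPart);
    }

    /** Tolerant parse: drop an optional "@cport" suffix before parsing. */
    static int tolerantPort(String hostAndPort) {
        String portPart = hostAndPort.substring(hostAndPort.lastIndexOf(':') + 1);
        int at = portPart.indexOf('@');
        if (at >= 0) {
            portPart = portPart.substring(0, at);
        }
        return Integer.parseInt(portPart);
    }

    public static void main(String[] args) {
        // Hypothetical node address in the ip:port@cport form
        String address = "192.168.1.12:9001@19001";
        try {
            naivePort(address);
        } catch (NumberFormatException e) {
            System.out.println("naive parse failed: " + e.getMessage());
        }
        System.out.println("tolerant parse: " + tolerantPort(address));
    }
}
```

If this is what happens in your environment, it would be consistent with the fix used later in this post, where the Jedis client bundled with the connector is replaced by a newer one.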
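The connector configures the cluster nodes as InetSocketAddress.
In my experience this approach cannot obtain a private IP: when InetSocketAddress.getHostName is called for a Linux host's private IP, the IP address does not come back, so the connection to Redis fails. The source code that configures the Redis cluster nodes is at:
Location: org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig.Builder#setNodes
As that location shows, the Redis cluster node addresses are configured through InetSocketAddress. Oddly, when the nodes are read back, each InetSocketAddress is converted to HostAndPort (see the location below). Why not configure the cluster nodes as HostAndPort from the start? That would avoid a conversion every time the nodes are read, and in my case the conversion is what kept Flink from connecting to a Redis cluster on private IPs. I do not understand why the original author did it this way. Complaints aside, the goal is still to solve the problem.
How to make Flink connect to a Redis cluster on private IPs is described in the rest of this post.
Location: org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig#getNodes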
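To make the InetSocketAddress-to-host-string conversion concrete, here is a minimal JDK-only sketch. It relies on two documented behaviours of java.net.InetSocketAddress: getHostName() may trigger a reverse name lookup when the address was created from a literal IP, while getHostString() returns the literal without a lookup. The class and method names are hypothetical, and this is not the bahir implementation.

```java
import java.net.InetSocketAddress;

final class HostStringDemo {

    /** Formats a node as "host:port" without triggering a reverse DNS lookup. */
    static String toHostPort(InetSocketAddress node) {
        // getHostString() returns the literal host or IP the address was built with;
        // getHostName() may instead perform a reverse lookup for a literal IP.
        return node.getHostString() + ":" + node.getPort();
    }

    public static void main(String[] args) {
        // createUnresolved() keeps the literal and never contacts a name service
        InetSocketAddress node = InetSocketAddress.createUnresolved("192.168.1.12", 9001);
        System.out.println(toHostPort(node));
    }
}
```

Configuring the nodes as HostAndPort, as the rewritten code in this post does, sidesteps the conversion entirely.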
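The complete set of dependencies for connecting to and operating Redis from Flink in Java is listed below
and can be used as is. Two points are worth noting.
Add Jedis client 3.5.1
The stock Redis connector, flink-connector-redis_2.11, depends on Jedis client 2.8.0,
which could not connect to the Redis cluster on private IPs, so that artifact is excluded and a newer Jedis client is added instead.
Integrate maven-assembly-plugin
The dependencies have to be packaged into the jar, so maven-assembly-plugin is added
to put all of the project's dependencies into the final jar. Normally Flink already ships some of them,
but Flink does not officially support Redis connections and therefore has no Redis-related dependencies.
To operate Redis from Flink, the Redis-related packages must be bundled into the jar.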
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>bigdata-tutorial</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>bigdata-tutorial</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.14.5</flink.version>
    <scala.version>2.12</scala.version>
    <redis.version>3.5.1</redis.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.0</version>
      <exclusions>
        <exclusion>
          <groupId>redis.clients</groupId>
          <artifactId>jedis</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>${redis.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>8</source>
          <target>8</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-assembly-plugin -->
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.3.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>com.monkey.App</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
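Three parts of the code were rewritten: the Redis sink, the Redis connection factory, and the Redis cluster configuration.
To connect Flink to systems such as Kafka, MySQL, or Redis, a corresponding Sink has to be added and configured,
so operating Redis from Flink requires rewriting the Redis Sink.
Following Flink's design model, the Redis Sink must extend RichSinkFunction.
The rewritten code is as follows: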
package com.monkey.examples.redis.myredis;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Objects;

/**
 * Entry point from Flink into Redis (a working description, to be refined later).
 *
 * @author xindaqi
 * @since 2022-07-16 17:57
 */
public class MyRedisSink<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    // Log under this class rather than bahir's RedisSink
    private static final Logger LOG = LoggerFactory.getLogger(MyRedisSink.class);

    /**
     * Additional key for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
     * Other {@link RedisDataType} values need only two variables, e.g. a list name and a value,
     * but {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} need three.
     * <p>{@link RedisDataType#HASH} needs a name, a key, and an element (value).
     * {@code additionalKey} is used as the name of the {@link RedisDataType#HASH}.
     * <p>{@link RedisDataType#SORTED_SET} needs a name, an element, and a score.
     * {@code additionalKey} is used as the name of the {@link RedisDataType#SORTED_SET}.
     */
    private String additionalKey;
    private RedisMapper<IN> redisSinkMapper;
    private RedisCommand redisCommand;

    private FlinkJedisConfigBase flinkJedisConfigBase;
    private RedisCommandsContainer redisCommandsContainer;

    /**
     * Creates a {@link MyRedisSink} that connects to the Redis server.
     *
     * @param flinkJedisConfigBase Redis configuration, see {@link FlinkJedisConfigBase}
     * @param redisSinkMapper      produces the Redis command and the key/value from each record
     */
    public MyRedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) {
        Objects.requireNonNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
        Objects.requireNonNull(redisSinkMapper, "Redis Mapper can not be null");
        Objects.requireNonNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null");

        this.flinkJedisConfigBase = flinkJedisConfigBase;

        this.redisSinkMapper = redisSinkMapper;
        RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
        this.redisCommand = redisCommandDescription.getCommand();
        this.additionalKey = redisCommandDescription.getAdditionalKey();
    }

    /**
     * Called for every record that reaches the sink; forwards the record to Redis.
     * Which command is used depends on the Redis data type (see {@link RedisDataType}).
     * Available commands: RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD, HSET, ZADD, ZREM.
     *
     * @param input the incoming record
     */
    @Override
    public void invoke(IN input) throws Exception {
        String key = redisSinkMapper.getKeyFromData(input);
        String value = redisSinkMapper.getValueFromData(input);

        switch (redisCommand) {
            case RPUSH:
                this.redisCommandsContainer.rpush(key, value);
                break;
            case LPUSH:
                this.redisCommandsContainer.lpush(key, value);
                break;
            case SADD:
                this.redisCommandsContainer.sadd(key, value);
                break;
            case SET:
                this.redisCommandsContainer.set(key, value);
                break;
            case PFADD:
                this.redisCommandsContainer.pfadd(key, value);
                break;
            case PUBLISH:
                this.redisCommandsContainer.publish(key, value);
                break;
            case ZADD:
                this.redisCommandsContainer.zadd(this.additionalKey, value, key);
                break;
            case ZREM:
                this.redisCommandsContainer.zrem(this.additionalKey, key);
                break;
            case HSET:
                this.redisCommandsContainer.hset(this.additionalKey, key, value);
                break;
            default:
                throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
        }
    }

    /**
     * Initializes the Redis connection: standalone, sentinel, or cluster.
     *
     * @throws IllegalArgumentException if jedisPoolConfig, jedisClusterConfig and jedisSentinelConfig are all null
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            this.redisCommandsContainer = MyRedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
            this.redisCommandsContainer.open();
        } catch (Exception e) {
            LOG.error("Redis initialization failed: ", e);
            throw e;
        }
    }

    /**
     * Closes the commands container.
     *
     * @throws IOException if the commands container cannot be closed
     */
    @Override
    public void close() throws IOException {
        if (redisCommandsContainer != null) {
            redisCommandsContainer.close();
        }
    }
}
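The role of additionalKey in the sink's switch statement is easy to miss, so here is a minimal JDK-only sketch of the same dispatch idea. The in-memory maps stand in for Redis, and the class, enum, and method names are hypothetical. Only three commands are modelled, and this is an illustration rather than the connector's code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SinkDispatchSketch {

    /** Hypothetical subset of commands, mirroring the switch in invoke(). */
    enum Command { SET, RPUSH, HSET }

    // In-memory stand-ins for Redis strings, lists, and hashes
    final Map<String, String> strings = new HashMap<>();
    final Map<String, List<String>> lists = new HashMap<>();
    final Map<String, Map<String, String>> hashes = new HashMap<>();

    /** Routes one record; additionalKey is only used by HSET, as the hash name. */
    void write(Command command, String additionalKey, String key, String value) {
        switch (command) {
            case SET:
                strings.put(key, value);
                break;
            case RPUSH:
                lists.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
                break;
            case HSET:
                hashes.computeIfAbsent(additionalKey, k -> new HashMap<>()).put(key, value);
                break;
            default:
                throw new IllegalArgumentException("Cannot process such data type: " + command);
        }
    }

    public static void main(String[] args) {
        SinkDispatchSketch sink = new SinkDispatchSketch();
        sink.write(Command.HSET, "flink-cluster", "xiaohua", "1");
        // A later record for the same field overwrites the earlier value
        sink.write(Command.HSET, "flink-cluster", "xiaohua", "2");
        System.out.println(sink.hashes);
    }
}
```

With HSET, every record lands in one hash named by additionalKey. The record key becomes the hash field, which is how the word-count example later in this post ends up in a single hash called flink-cluster.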
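Redis is most commonly deployed in three forms: standalone, sentinel, and cluster.
For compatibility, the design has to handle connections for all three,
so a connection factory is used here that creates the matching connection based on the type of the configuration object.
The implementation is as follows: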
package com.monkey.examples.redis.myredis;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
import org.apache.flink.streaming.connectors.redis.common.container.RedisClusterContainer;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
import org.apache.flink.streaming.connectors.redis.common.container.RedisContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisSentinelPool;

import java.util.Objects;

/**
 * Factory for Redis connection containers.
 *
 * @author xindaqi
 * @since 2022-07-16 17:59
 */
public class MyRedisCommandsContainerBuilder {

    private static final Logger LOG = LoggerFactory.getLogger(MyRedisCommandsContainerBuilder.class);

    /**
     * Initializes the Redis connection by configuration type: standalone, cluster, or sentinel.
     * See {@link RedisCommandsContainer}.
     *
     * @param flinkJedisConfigBase Redis configuration
     * @return the matching Redis commands container
     * @throws IllegalArgumentException if the configuration is not a pool, sentinel, or cluster configuration
     */
    public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase) {
        if (flinkJedisConfigBase instanceof FlinkJedisPoolConfig) {
            FlinkJedisPoolConfig flinkJedisPoolConfig = (FlinkJedisPoolConfig) flinkJedisConfigBase;
            return MyRedisCommandsContainerBuilder.build(flinkJedisPoolConfig);
        } else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
            FlinkJedisSentinelConfig flinkJedisSentinelConfig = (FlinkJedisSentinelConfig) flinkJedisConfigBase;
            return MyRedisCommandsContainerBuilder.build(flinkJedisSentinelConfig);
        } else if (flinkJedisConfigBase instanceof MyRedisClusterConfig) {
            MyRedisClusterConfig flinkJedisClusterConfig = (MyRedisClusterConfig) flinkJedisConfigBase;
            return MyRedisCommandsContainerBuilder.build(flinkJedisClusterConfig);
        } else {
            throw new IllegalArgumentException("Jedis configuration not found");
        }
    }

    /**
     * Standalone Redis connection.
     *
     * @param jedisPoolConfig JedisPool configuration
     * @return container for a standalone Redis connection
     * @throws NullPointerException if jedisPoolConfig is null
     */
    public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
        Objects.requireNonNull(jedisPoolConfig, "Redis pool config should not be Null");

        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
        genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
        genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());

        JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
                jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
                jedisPoolConfig.getDatabase());
        return new RedisContainer(jedisPool);
    }

    /**
     * Redis cluster connection.
     *
     * @param jedisClusterConfig Redis cluster configuration
     * @return container for a Redis cluster connection
     * @throws NullPointerException if jedisClusterConfig is null
     */
    public static RedisCommandsContainer build(MyRedisClusterConfig jedisClusterConfig) {
        Objects.requireNonNull(jedisClusterConfig, "Redis cluster config should not be Null");

        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        // Pool sizes come from the cluster configuration instead of hard-coded values
        jedisPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal());
        jedisPoolConfig.setMaxIdle(jedisClusterConfig.getMaxIdle());
        jedisPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle());
        // Maximum time to wait for a connection from the pool, in milliseconds
        jedisPoolConfig.setMaxWaitMillis(3000);
        jedisPoolConfig.setTestOnBorrow(Boolean.TRUE);

        try {
            JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(),
                    jedisClusterConfig.getConnectionTimeout(), jedisClusterConfig.getMaxRedirections(),
                    jedisPoolConfig);
            return new RedisClusterContainer(jedisCluster);
        } catch (RuntimeException ex) {
            // Rethrow instead of returning null, so the sink fails with the real cause
            LOG.error("Redis cluster initialization failed:", ex);
            throw ex;
        }
    }

    /**
     * Sentinel connection.
     *
     * @param jedisSentinelConfig sentinel configuration
     * @return container for a sentinel connection
     * @throws NullPointerException if jedisSentinelConfig is null
     */
    public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
        Objects.requireNonNull(jedisSentinelConfig, "Redis sentinel config should not be Null");

        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle());
        genericObjectPoolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal());
        genericObjectPoolConfig.setMinIdle(jedisSentinelConfig.getMinIdle());

        JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
                jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
                jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(),
                jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
        return new RedisContainer(jedisSentinelPool);
    }
}
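Redis cluster configuration has two main parts.
First, the cluster node addresses, i.e. IP and port;
second, the connection pool parameters, such as maximum number of connections, maximum wait time, and timeout.
The rewritten Redis cluster configuration uses HostAndPort directly, so the Redis cluster can be reached directly by IP and port.
The implementation is as follows: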
package com.monkey.examples.redis.myredis;

import org.apache.flink.streaming.connectors.redis.common.Util;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import redis.clients.jedis.HostAndPort;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/**
 * Redis cluster connection configuration.
 *
 * @author xindaqi
 * @since 2022-07-16 15:21
 */
public class MyRedisClusterConfig extends FlinkJedisConfigBase {

    private static final long serialVersionUID = 1L;

    /**
     * Redis cluster node addresses: IP and port
     */
    private final Set<HostAndPort> nodes;

    /**
     * Maximum number of redirections
     */
    private final int maxRedirections;

    public MyRedisClusterConfig(Set<HostAndPort> nodes, int connectionTimeout, int maxRedirections,
                                int maxTotal, int maxIdle, int minIdle) {
        super(connectionTimeout, maxTotal, maxIdle, minIdle);
        Objects.requireNonNull(nodes, "Node information should be presented");
        Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
        // Defensive copy, typed instead of a raw HashSet
        this.nodes = new HashSet<>(nodes);
        this.maxRedirections = maxRedirections;
    }

    public Set<HostAndPort> getNodes() {
        return nodes;
    }

    public int getMaxRedirections() {
        return this.maxRedirections;
    }

    @Override
    public String toString() {
        return "JedisClusterConfig{nodes=" + this.nodes
                + ", timeout=" + this.connectionTimeout
                + ", maxRedirections=" + this.maxRedirections
                + ", maxTotal=" + this.maxTotal
                + ", maxIdle=" + this.maxIdle
                + ", minIdle=" + this.minIdle + '}';
    }

    public static class Builder {
        private Set<HostAndPort> nodes;
        private int timeout = 2000;
        private int maxRedirections = 5;
        private int maxTotal = 8;
        private int maxIdle = 8;
        private int minIdle = 0;

        public Builder() {
        }

        public Builder setNodes(Set<HostAndPort> nodes) {
            this.nodes = nodes;
            return this;
        }

        public Builder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        public Builder setMaxRedirections(int maxRedirections) {
            this.maxRedirections = maxRedirections;
            return this;
        }

        public Builder setMaxTotal(int maxTotal) {
            this.maxTotal = maxTotal;
            return this;
        }

        public Builder setMaxIdle(int maxIdle) {
            this.maxIdle = maxIdle;
            return this;
        }

        public Builder setMinIdle(int minIdle) {
            this.minIdle = minIdle;
            return this;
        }

        public MyRedisClusterConfig build() {
            return new MyRedisClusterConfig(this.nodes, this.timeout, this.maxRedirections,
                    this.maxTotal, this.maxIdle, this.minIdle);
        }
    }
}
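In practice the node addresses often come from a configuration string rather than from hard-coded values. The sketch below is one possible way to turn a comma-separated "host:port" list into host/port pairs using only the JDK. The NodeListParser class, the input format, and the use of Map.Entry as the pair type are all assumptions made for illustration; none of it is part of bahir or Jedis.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

final class NodeListParser {

    /** Parses "host:port,host:port" into an ordered set of (host, port) pairs. */
    static Set<Map.Entry<String, Integer>> parse(String nodeList) {
        Set<Map.Entry<String, Integer>> nodes = new LinkedHashSet<>();
        for (String item : nodeList.split(",")) {
            String trimmed = item.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray or trailing commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            nodes.add(new SimpleImmutableEntry<>(host, port));
        }
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("Redis cluster hosts should not be empty");
        }
        return nodes;
    }

    public static void main(String[] args) {
        System.out.println(parse("192.168.1.12:9001, 192.168.1.12:9002"));
    }
}
```

Each pair can then be wrapped as new HostAndPort(host, port) and collected into the set passed to MyRedisClusterConfig.Builder#setNodes.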
The mapper below sets the Redis command and data type, and defines how the key and value are read from each record.
package com.monkey.examples.redis.cluster;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Redis command configuration for the cluster.
 *
 * @author xindaqi
 * @since 2022-07-16 10:49
 */
public class ClusterCommandConfig implements RedisMapper<Tuple2<String, Integer>> {

    @Override
    public RedisCommandDescription getCommandDescription() {
        // HSET into the hash named "flink-cluster"
        return new RedisCommandDescription(RedisCommand.HSET, "flink-cluster");
    }

    @Override
    public String getKeyFromData(Tuple2<String, Integer> stringIntegerTuple2) {
        return stringIntegerTuple2.f0;
    }

    @Override
    public String getValueFromData(Tuple2<String, Integer> stringIntegerTuple2) {
        return stringIntegerTuple2.f1.toString();
    }
}
Test data to be written to Redis.
package com.monkey.examples.redis;

/**
 * Test data to be written to Redis.
 *
 * @author xindaqi
 * @since 2022-07-15 16:48
 */
public class FakeData {

    public static final String[] DATA = new String[] {
            "xiaohua, xiaohong",
            "xiaohong, xiaolan",
            "xiaohua, xiaoxiao"
    };
}
A simple word count,
provided as a minimal test example to get things started.
package com.monkey.examples.redis;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Splits each line into words.
 *
 * @author xindaqi
 * @since 2022-07-15 16:45
 */
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
        // Lowercase the line and split on runs of non-word characters
        String[] tokens = s.toLowerCase().split("\\W+");

        for (String token : tokens) {
            if (token.length() > 0) {
                // Emit (word, 1) for each non-empty token
                collector.collect(new Tuple2<>(token, 1));
            }
        }
    }
}
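To see what the job should leave in Redis, the tokenization can be replayed without Flink. The sketch below uses only the JDK, with the same lowercase-and-split rule as LineSplitter and the same three test lines as FakeData. The WordCountSketch class is hypothetical, and it computes only the final counts, whereas Flink's keyed sum emits a running count per word.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class WordCountSketch {

    /** Counts tokens the way LineSplitter tokenizes: lowercase, split on non-word characters. */
    static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    // Add 1 to the existing count, or start at 1
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] data = {"xiaohua, xiaohong", "xiaohong, xiaolan", "xiaohua, xiaoxiao"};
        // prints {xiaohua=2, xiaohong=2, xiaolan=1, xiaoxiao=1}
        System.out.println(count(data));
    }
}
```

Because each HSET overwrites the previous value of a field, the hash in Redis should end up holding these final counts once all running updates have been written.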
The final main function connects to the Redis cluster and writes data to Redis.
Package it into a jar with Maven and run it on the Flink platform.
package com.monkey.examples.redis.cluster;

import com.monkey.examples.redis.FakeData;
import com.monkey.examples.redis.LineSplitter;
import com.monkey.examples.redis.myredis.MyRedisClusterConfig;
import com.monkey.examples.redis.myredis.MyRedisSink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import redis.clients.jedis.HostAndPort;

import java.util.HashSet;
import java.util.Set;

/**
 * Connects to the Redis cluster and stores data.
 *
 * @author xindaqi
 * @since 2022-07-15 18:26
 */
public class ClusterOps {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = executionEnvironment.fromElements(FakeData.DATA);
        DataStream<Tuple2<String, Integer>> counts =
                text.flatMap(new LineSplitter()).keyBy(value -> value.f0).sum(1);

        // Print to the console
        counts.print().setParallelism(1);

        // Redis cluster node addresses
        Set<HostAndPort> clusterNodes = new HashSet<>(6);
        clusterNodes.add(new HostAndPort("192.168.1.12", 9001));
        clusterNodes.add(new HostAndPort("192.168.1.12", 9002));
        clusterNodes.add(new HostAndPort("192.168.1.12", 9003));
        clusterNodes.add(new HostAndPort("192.168.1.12", 9004));
        clusterNodes.add(new HostAndPort("192.168.1.12", 9005));
        clusterNodes.add(new HostAndPort("192.168.1.12", 9006));

        MyRedisClusterConfig myConf = new MyRedisClusterConfig.Builder().setNodes(clusterNodes).build();
        System.out.println(">>>>>>Nodes:" + myConf.getNodes());

        counts.addSink(new MyRedisSink<>(myConf, new ClusterCommandConfig()));
        executionEnvironment.execute();
    }
}
Submit the job on the Flink platform
(the screenshots of the steps are not included here).
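Key points:
(1) The third-party Redis connector bahir configures the Redis cluster node IP and port with InetSocketAddress, and could not connect to a Redis cluster on private IPs;
(2) Rewrite the Redis cluster connection code to set the node IP and port directly through HostAndPort, so that a Redis cluster on private IPs can be reached as well;
(3) Flink does not natively support Redis connections, so jobs that use Redis need the Redis-related dependencies bundled into the jar;
(4) Exclude the jedis client from the third-party Redis connector's dependencies and add a newer Jedis version, such as 3.5.1.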