
Rewriting the code that connects Flink to a Redis cluster (Java; fixes Flink failing to connect to a Redis cluster on private IPs)


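1 Background

My team needs to feed real-time (hour-level) data to business systems.
The data flows like this: real-time data -> Kafka -> Flink -> Redis (cluster and sentinel).
Flink, however, has no native Redis connector,
so a third party provides one: https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
When I actually used it, I hit a problem: the Redis cluster connection could not be initialized. The exception is shown below.
The official Bahir documentation did not help with this problem.
So I dug into the Bahir source (more than once) and set out to rewrite the Redis cluster connection.
A small detour: I first planned to extend the FlinkJedisClusterConfig class,
but its constructor is private, so it cannot be subclassed, and that road was closed.
I took another route instead: rewrite the Redis cluster connection code based on Bahir and upgrade the Redis client version.
I hope this helps other readers avoid the same pitfall.
Details follow (including the rewritten code).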

  • Exception message
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[?:1.8.0_262]
    at java.lang.Integer.parseInt(Integer.java:580) ~[?:1.8.0_262]
    at java.lang.Integer.valueOf(Integer.java:766) ~[?:1.8.0_262]
    at redis.clients.util.ClusterNodeInformationParser.getHostAndPortFromNodeLine(ClusterNodeInformationParser.java:39) ~[blob_p-a436c38c6f05b2f134b34bb28d2ce7eaba02a621-50f891edc883b56f49b72ec65d36ab09:?]
    at redis.clients.util.ClusterNodeInformationParser.parse(ClusterNodeInformationParser.java:14) ~[blob_p-a436c38c6f05b2f134b34bb28d2ce7eaba02a621-50f891edc883b56f49b72ec65d36ab09:?]

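The trace above ends in Integer.parseInt, called from Jedis's ClusterNodeInformationParser. One thing worth checking, as an observation rather than a confirmed diagnosis of this setup: Redis 4.0 and later report node addresses in CLUSTER NODES as ip:port@cport, and a parser that splits on ':' and parses the remainder as an integer fails on such a value. The sketch below is illustrative only and is not Jedis's actual code; the class name, method names, and sample address are made up.

```java
/** Illustrative only: two ways of reading the port from a CLUSTER NODES address field. */
final class NodeAddressParsing {

    /** Splits on ':' and parses the remainder as an int, so "9001@19001" is rejected. */
    static int naivePort(String address) {
        String[] parts = address.split(":");
        return Integer.valueOf(parts[1]); // NumberFormatException for "9001@19001"
    }

    /** Drops an optional "@cport" suffix before parsing the client port. */
    static int tolerantPort(String address) {
        int at = address.indexOf('@');
        String hostAndPort = at >= 0 ? address.substring(0, at) : address;
        return Integer.parseInt(hostAndPort.substring(hostAndPort.lastIndexOf(':') + 1));
    }

    public static void main(String[] args) {
        String address = "192.168.1.12:9001@19001"; // made-up sample value
        System.out.println(tolerantPort(address));
        try {
            naivePort(address);
        } catch (NumberFormatException e) {
            System.out.println("naive parsing failed: " + e.getMessage());
        }
    }
}
```

If this is what happens in your environment, upgrading the Jedis client (section 3) is the part of the fix that addresses it.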

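2 Why can't the third-party (Bahir) connector reach the Redis cluster?

The connector configures the cluster nodes as InetSocketAddress objects.
As far as I could tell, this is where the private IP gets lost: when InetSocketAddress.getHostName is called for a Linux host's private IP, it does not return the IP address, so the connection to Redis fails. The source that configures the cluster nodes is:

Location: org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig.Builder#setNodes
[Screenshot: source of FlinkJedisClusterConfig.Builder#setNodes]
So the cluster node addresses are configured as InetSocketAddress. Oddly, when the nodes are read back, each InetSocketAddress is converted to a HostAndPort (source below). Why not configure the nodes as HostAndPort from the start? That would avoid the extra conversion, and it would avoid the failure to connect to a Redis cluster on private IPs. I do not understand the original author's choice. Complaints aside, the point is to solve the problem;
how to connect Flink to a Redis cluster on private IPs is covered below.
Location: org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig#getNodes
[Screenshot: source of FlinkJedisClusterConfig#getNodes]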
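To make the difference concrete: on the JDK side, InetSocketAddress.getHostName() may perform a reverse name lookup when the address was created from an IP literal, whereas getHostString() returns the literal without any lookup. The sketch below uses only the JDK; the class and method names are hypothetical, and it shows the JDK behaviour only, not Bahir's code.

```java
import java.net.InetSocketAddress;

/** Illustrative only: keeps an IP literal intact when formatting a node address. */
final class HostStringDemo {

    /** Formats a node as "host:port" without triggering any name lookup. */
    static String literalHostAndPort(String host, int port) {
        // createUnresolved never contacts a resolver; getHostString returns the text as given
        InetSocketAddress address = InetSocketAddress.createUnresolved(host, port);
        return address.getHostString() + ":" + address.getPort();
    }

    public static void main(String[] args) {
        System.out.println(literalHostAndPort("192.168.1.12", 9001));
    }
}
```

Passing the IP and port straight to HostAndPort, as the rewritten configuration in section 4.3 does, sidesteps the conversion entirely.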

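3 Complete dependencies

The complete dependencies for connecting Flink to Redis from Java are listed below
and can be used as-is. Two points to note:

  • Add Jedis client 3.5.1
    The stock connector flink-connector-redis_2.11 depends on Jedis 2.8.0,
    which cannot connect to a Redis cluster on private IPs, so that artifact is excluded and a newer Jedis client is added.

  • Add maven-assembly-plugin
    The dependencies have to be bundled into the jar at packaging time, hence maven-assembly-plugin,
    which puts every dependency of the project into the final jar. Flink itself already ships some dependencies,
    but it has no official Redis connector and therefore no Redis-related dependencies,
    so to use Redis from Flink the Redis-related artifacts must be packaged into the jar.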

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>bigdata-tutorial</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>bigdata-tutorial</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.14.5</flink.version>
    <scala.version>2.12</scala.version>
    <redis.version>3.5.1</redis.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.0</version>
      <exclusions>
        <exclusion>
          <groupId>redis.clients</groupId>
          <artifactId>jedis</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>${redis.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>8</source>
          <target>8</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-assembly-plugin -->
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.3.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>
                com.monkey.App
              </mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

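4 Rewritten code

Three parts were rewritten:

  • Sink entry point
  • Redis connection factory
  • Redis cluster connection configuration

4.1 Sink entry point

To write to systems such as Kafka, MySQL, or Redis, Flink needs a matching Sink,
so writing to Redis from Flink means rewriting the Redis Sink.
In Flink's design, the Redis Sink extends RichSinkFunction.
The rewritten code: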

package com.monkey.examples.redis.myredis;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Objects;

/**
 * Entry point for writing from Flink to Redis (a working description, to be refined).
 *
 * @author xindaqi
 * @since 2022-07-16 17:57
 */
public class MyRedisSink<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    // Log under this class rather than Bahir's RedisSink
    private static final Logger LOG = LoggerFactory.getLogger(MyRedisSink.class);

    /**
     * Extra key used by {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
     * Other {@link RedisDataType} values need only two variables, e.g. a list name and a value,
     * but {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} need three.
     * <p>For {@link RedisDataType#HASH} these are the hash name, the field, and the value;
     * {@code additionalKey} is the name of the {@link RedisDataType#HASH}.
     * <p>For {@link RedisDataType#SORTED_SET} these are the set name, the element, and its score;
     * {@code additionalKey} is the name of the {@link RedisDataType#SORTED_SET}.
     */
    private String additionalKey;
    private RedisMapper<IN> redisSinkMapper;
    private RedisCommand redisCommand;

    private FlinkJedisConfigBase flinkJedisConfigBase;
    private RedisCommandsContainer redisCommandsContainer;

    /**
     * Creates a {@link MyRedisSink} that connects to the Redis server.
     *
     * @param flinkJedisConfigBase Redis configuration, see {@link FlinkJedisConfigBase}
     * @param redisSinkMapper      produces the Redis command, key, and value.
     */
    public MyRedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) {
        Objects.requireNonNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
        Objects.requireNonNull(redisSinkMapper, "Redis Mapper can not be null");
        Objects.requireNonNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null");
        this.flinkJedisConfigBase = flinkJedisConfigBase;
        this.redisSinkMapper = redisSinkMapper;
        RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
        this.redisCommand = redisCommandDescription.getCommand();
        this.additionalKey = redisCommandDescription.getAdditionalKey();
    }

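    /**
     * Called for every record that reaches the sink; forwards the record to Redis.
     * The command used depends on the Redis data type (see {@link RedisDataType}).
     * Available commands: RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD, HSET, ZADD, ZREM.
     *
     * @param input the incoming record
     */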
    @Override
    public void invoke(IN input) throws Exception {
        String key = redisSinkMapper.getKeyFromData(input);
        String value = redisSinkMapper.getValueFromData(input);

        switch (redisCommand) {
            case RPUSH:
                this.redisCommandsContainer.rpush(key, value);
                break;
            case LPUSH:
                this.redisCommandsContainer.lpush(key, value);
                break;
            case SADD:
                this.redisCommandsContainer.sadd(key, value);
                break;
            case SET:
                this.redisCommandsContainer.set(key, value);
                break;
            case PFADD:
                this.redisCommandsContainer.pfadd(key, value);
                break;
            case PUBLISH:
                this.redisCommandsContainer.publish(key, value);
                break;
            case ZADD:
                this.redisCommandsContainer.zadd(this.additionalKey, value, key);
                break;
            case ZREM:
                this.redisCommandsContainer.zrem(this.additionalKey, key);
                break;
            case HSET:
                this.redisCommandsContainer.hset(this.additionalKey, key, value);
                break;
            default:
                throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
        }
    }

    /**
     * Initializes the Redis connection: single node, sentinel, or cluster.
     *
     * @throws IllegalArgumentException if the configuration is not a supported pool, cluster, or sentinel config
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            this.redisCommandsContainer = MyRedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
            this.redisCommandsContainer.open();
        } catch (Exception e) {
            LOG.error("Redis initialization failed: ", e);
            throw e;
        }
    }

    /**
     * Closes commands container.
     *
     * @throws IOException if command container is unable to close.
     */
    @Override
    public void close() throws IOException {
        if (redisCommandsContainer != null) {
            redisCommandsContainer.close();
        }
    }
}
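To make the role of additionalKey in invoke() concrete, here is a small self-contained sketch in which plain in-memory maps stand in for Redis. Everything in it (CommandDispatchSketch, FakeRedis, the two-command enum) is hypothetical and exists only for illustration; it mirrors the shape of the switch above for SET and HSET, not the connector's real classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: how SET uses (key, value) while HSET also needs the hash name. */
final class CommandDispatchSketch {

    enum Command { SET, HSET }

    /** Stand-in for Redis: plain string keys plus named hashes. */
    static final class FakeRedis {
        final Map<String, String> strings = new HashMap<>();
        final Map<String, Map<String, String>> hashes = new HashMap<>();
    }

    static void write(FakeRedis redis, Command command, String additionalKey, String key, String value) {
        switch (command) {
            case SET:
                // Two variables: key and value
                redis.strings.put(key, value);
                break;
            case HSET:
                // Three variables: hash name (additionalKey), field (key), and value
                redis.hashes.computeIfAbsent(additionalKey, name -> new HashMap<>()).put(key, value);
                break;
            default:
                throw new IllegalArgumentException("Cannot process such data type: " + command);
        }
    }

    public static void main(String[] args) {
        FakeRedis redis = new FakeRedis();
        write(redis, Command.HSET, "flink-cluster", "xiaohua", "2");
        System.out.println(redis.hashes);
    }
}
```

With the mapper from section 5.1, each record therefore becomes an HSET on the hash flink-cluster, with the word as the field and the count as the value.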

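4.2 Redis connection factory

Redis today is most commonly deployed in three forms: single node, sentinel, and cluster.
For compatibility, the design has to cover all three,
so a connection factory creates the matching connection according to the type of the configuration object.
Implementation: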

package com.monkey.examples.redis.myredis;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
import org.apache.flink.streaming.connectors.redis.common.container.RedisClusterContainer;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
import org.apache.flink.streaming.connectors.redis.common.container.RedisContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisSentinelPool;

import java.util.Objects;

/**
 * Factory for Redis connection containers.
 *
 * @author xindaqi
 * @since 2022-07-16 17:59
 */
public class MyRedisCommandsContainerBuilder {

    private static final Logger LOG = LoggerFactory.getLogger(MyRedisCommandsContainerBuilder.class);

    /**
     * Initializes the Redis connection for the given configuration type: single node, cluster, or sentinel.
     * See {@link RedisCommandsContainer}.
     *
     * @param flinkJedisConfigBase Redis configuration
     * @return the matching Redis commands container
     * @throws IllegalArgumentException if the configuration is not a pool, sentinel, or cluster config
     */
    public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase){
        if(flinkJedisConfigBase instanceof FlinkJedisPoolConfig){
            FlinkJedisPoolConfig flinkJedisPoolConfig = (FlinkJedisPoolConfig) flinkJedisConfigBase;
            return MyRedisCommandsContainerBuilder.build(flinkJedisPoolConfig);
        } else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
            FlinkJedisSentinelConfig flinkJedisSentinelConfig = (FlinkJedisSentinelConfig) flinkJedisConfigBase;
            return MyRedisCommandsContainerBuilder.build(flinkJedisSentinelConfig);
        } else if (flinkJedisConfigBase instanceof MyRedisClusterConfig) {
            MyRedisClusterConfig flinkJedisClusterConfig = (MyRedisClusterConfig) flinkJedisConfigBase;
            return MyRedisCommandsContainerBuilder.build(flinkJedisClusterConfig);
        } else {
            throw new IllegalArgumentException("Jedis configuration not found");
        }
    }

    /**
     * Builds the container for a single Redis node.
     *
     * @param jedisPoolConfig JedisPool configuration
     * @return container for the single-node connection
     * @throws NullPointerException if jedisPoolConfig is null
     */
    public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
        Objects.requireNonNull(jedisPoolConfig, "Redis pool config should not be Null");

        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
        genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
        genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());

        JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
                jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
                jedisPoolConfig.getDatabase());
        return new RedisContainer(jedisPool);
    }


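    /**
     * Builds the container for a Redis cluster.
     *
     * @param jedisClusterConfig Redis cluster configuration
     * @return container for the cluster connection
     * @throws NullPointerException if jedisClusterConfig is null
     */
    public static RedisCommandsContainer build(MyRedisClusterConfig jedisClusterConfig) {
        Objects.requireNonNull(jedisClusterConfig, "Redis cluster config should not be Null");
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        // Pool sizes come from the cluster config; the earlier hard-coded values
        // (maxTotal 1, maxIdle 10) were inconsistent and ignored the Builder settings
        jedisPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal());
        jedisPoolConfig.setMaxIdle(jedisClusterConfig.getMaxIdle());
        jedisPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle());
        // Maximum time to wait for a connection from the pool, in milliseconds
        jedisPoolConfig.setMaxWaitMillis(3000);
        jedisPoolConfig.setTestOnBorrow(Boolean.TRUE);
        try {
            JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(),
                    jedisClusterConfig.getConnectionTimeout(), jedisClusterConfig.getMaxRedirections(),
                    jedisPoolConfig);
            return new RedisClusterContainer(jedisCluster);
        } catch (RuntimeException ex) {
            // Rethrow instead of returning null, which would only surface later as a NullPointerException in open()
            LOG.error(">>>>>>>>MyCluster initialization failed:", ex);
            throw ex;
        }
    }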

    /**
     * Builds the container for a sentinel deployment.
     *
     * @param jedisSentinelConfig sentinel configuration
     * @return container for the sentinel connection
     * @throws NullPointerException if jedisSentinelConfig is null
     */
    public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
        Objects.requireNonNull(jedisSentinelConfig, "Redis sentinel config should not be Null");

        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle());
        genericObjectPoolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal());
        genericObjectPoolConfig.setMinIdle(jedisSentinelConfig.getMinIdle());

        JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
                jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
                jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(),
                jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
        return new RedisContainer(jedisSentinelPool);
    }
}
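Stripped of the Jedis details, the factory above is a type-based dispatch: inspect the concrete configuration class, pick the matching builder, and reject anything unknown. A minimal sketch of that pattern follows; the config classes here are hypothetical stand-ins, not the Bahir or Jedis types.

```java
/** Illustrative only: type-based dispatch as used by the connection factory. */
final class FactoryDispatchSketch {

    static class BaseConfig { }
    static final class SingleConfig extends BaseConfig { }
    static final class SentinelConfig extends BaseConfig { }
    static final class ClusterConfig extends BaseConfig { }

    /** Returns the deployment form for a config, or fails fast for an unsupported type. */
    static String describe(BaseConfig config) {
        if (config instanceof SingleConfig) {
            return "single";
        } else if (config instanceof SentinelConfig) {
            return "sentinel";
        } else if (config instanceof ClusterConfig) {
            return "cluster";
        }
        throw new IllegalArgumentException("Jedis configuration not found");
    }

    public static void main(String[] args) {
        System.out.println(describe(new ClusterConfig()));
    }
}
```

The practical consequence for the real factory is that a cluster config must be a MyRedisClusterConfig; Bahir's own FlinkJedisClusterConfig is not one of the accepted types and ends up in the IllegalArgumentException branch.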

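4.3 Redis cluster connection configuration

The cluster configuration has two parts:
1. the cluster node addresses (IP and port);
2. the connection pool parameters, such as maximum connections, maximum wait time, and timeout.
The rewritten cluster configuration uses HostAndPort directly, so the cluster is reached by the IP and port exactly as given.
Implementation: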

package com.monkey.examples.redis.myredis;

import org.apache.flink.streaming.connectors.redis.common.Util;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import redis.clients.jedis.HostAndPort;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/**
 * Redis cluster connection configuration.
 *
 * @author xindaqi
 * @since 2022-07-16 15:21
 */
public class MyRedisClusterConfig extends FlinkJedisConfigBase {

    private static final long serialVersionUID = 1L;
    /**
     * Redis cluster node addresses: IP and port
     */
    private final Set<HostAndPort> nodes;

    /**
     * Maximum number of redirections
     */
    private final int maxRedirections;

    public MyRedisClusterConfig(Set<HostAndPort> nodes, int connectionTimeout, int maxRedirections, int maxTotal, int maxIdle, int minIdle) {
        super(connectionTimeout, maxTotal, maxIdle, minIdle);
        Objects.requireNonNull(nodes, "Node information should be presented");
        Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
        // Defensive copy so later changes to the caller's set do not affect this config
        this.nodes = new HashSet<>(nodes);
        this.maxRedirections = maxRedirections;
    }

    public Set<HostAndPort> getNodes() {
        return nodes;
    }

    public int getMaxRedirections() {
        return this.maxRedirections;
    }

    @Override
    public String toString() {
        return "JedisClusterConfig{nodes=" + this.nodes + ", timeout=" + this.connectionTimeout + ", maxRedirections=" + this.maxRedirections + ", maxTotal=" + this.maxTotal + ", maxIdle=" + this.maxIdle + ", minIdle=" + this.minIdle + '}';
    }

    public static class Builder {
        private Set<HostAndPort> nodes;
        private int timeout = 2000;
        private int maxRedirections = 5;
        private int maxTotal = 8;
        private int maxIdle = 8;
        private int minIdle = 0;

        public Builder() {
        }

        public Builder setNodes(Set<HostAndPort> nodes) {
            this.nodes = nodes;
            return this;
        }

        public Builder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        public Builder setMaxRedirections(int maxRedirections) {
            this.maxRedirections = maxRedirections;
            return this;
        }

        public Builder setMaxTotal(int maxTotal) {
            this.maxTotal = maxTotal;
            return this;
        }

        public Builder setMaxIdle(int maxIdle) {
            this.maxIdle = maxIdle;
            return this;
        }

        public Builder setMinIdle(int minIdle) {
            this.minIdle = minIdle;
            return this;
        }

        public MyRedisClusterConfig build() {
            return new MyRedisClusterConfig(this.nodes, this.timeout, this.maxRedirections, this.maxTotal, this.maxIdle, this.minIdle);
        }
    }
}
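In a real job the node list usually comes from configuration rather than from hard-coded calls. The sketch below parses a comma-separated "ip:port" string into host and port pairs using only the JDK. NodeListParser and its method are hypothetical names; each resulting pair would then be turned into a redis.clients.jedis.HostAndPort and handed to MyRedisClusterConfig.Builder#setNodes.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative only: parses "ip:port,ip:port" into (host, port) pairs. */
final class NodeListParser {

    static Set<Map.Entry<String, Integer>> parse(String nodes) {
        Set<Map.Entry<String, Integer>> result = new LinkedHashSet<>();
        for (String node : nodes.split(",")) {
            String trimmed = node.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate trailing commas and blank entries
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(new SimpleImmutableEntry<>(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("192.168.1.12:9001, 192.168.1.12:9002"));
    }
}
```

Because the host part is kept as a plain string, a private IP passes through unchanged.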

5 Testing

5.1 Redis command configuration

Defines the Redis command and data type, and how the key and value are read from each record.

package com.monkey.examples.redis.cluster;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Redis command configuration for the cluster test.
 *
 * @author xindaqi
 * @since 2022-07-16 10:49
 */
public class ClusterCommandConfig implements RedisMapper<Tuple2<String, Integer>> {

    @Override
    public RedisCommandDescription getCommandDescription() {
        // hset
        return new RedisCommandDescription(RedisCommand.HSET,"flink-cluster");
    }

    @Override
    public String getKeyFromData(Tuple2<String, Integer> stringIntegerTuple2) {
        return stringIntegerTuple2.f0;
    }

    @Override
    public String getValueFromData(Tuple2<String, Integer> stringIntegerTuple2) {
        return stringIntegerTuple2.f1.toString();
    }
}

5.2 Test data

The data to be written to Redis.

package com.monkey.examples.redis;

/**
 * Test data to be written to Redis.
 *
 * @author xindaqi
 * @since 2022-07-15 16:48
 */
public class FakeData {

    public static final String[] DATA =
            new String[] {
                    "xiaohua, xiaohong",
                    "xiaohong, xiaolan",
                    "xiaohua, xiaoxiao"
            };
}

5.3 Data statistics

A simple word-frequency count,
meant only as a minimal test case.

package com.monkey.examples.redis;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Splits each line into words and emits (word, 1) for every word.
 *
 * @author xindaqi
 * @since 2022-07-15 16:45
 */
public class LineSplitter implements FlatMapFunction<String, Tuple2<String,Integer>> {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
        // Lower-case the line and split on runs of non-word characters
        String[] tokens = s.toLowerCase().split("\\W+");
        for(String token : tokens){
            if(token.length() > 0){
                collector.collect(new Tuple2<String,Integer>(token,1));
            }
        }
    }
}
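As a quick check of what the job should end up writing, the same tokenization can be run outside Flink. The sketch below is a plain-JDK preview, not part of the job: WordCountPreview is a hypothetical name and the input is a copy of the FakeData array from section 5.2. In the streaming job, sum(1) emits a running total per word, so each hash field is overwritten until it holds the final count.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: word counts for the test data, computed without Flink. */
final class WordCountPreview {

    static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            // Same tokenization as LineSplitter: lower-case, split on non-word runs
            for (String token : line.toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] data = {"xiaohua, xiaohong", "xiaohong, xiaolan", "xiaohua, xiaoxiao"};
        // prints {xiaohong=2, xiaohua=2, xiaolan=1, xiaoxiao=1}
        System.out.println(count(data));
    }
}
```

These four fields and values are what the flink-cluster hash should contain once the job has finished.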

5.4 Connecting to Redis and writing data

The main program: it connects to the Redis cluster and writes the data.
Build the jar with Maven and run it on Flink.

package com.monkey.examples.redis.cluster;

import com.monkey.examples.redis.FakeData;
import com.monkey.examples.redis.LineSplitter;
import com.monkey.examples.redis.myredis.MyRedisClusterConfig;
import com.monkey.examples.redis.myredis.MyRedisSink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import redis.clients.jedis.HostAndPort;

import java.util.HashSet;
import java.util.Set;

/**
 * Connects to the Redis cluster and stores the data.
 *
 * @author xindaqi
 * @since 2022-07-15 18:26
 */
public class ClusterOps {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = executionEnvironment.fromElements(FakeData.DATA);
        DataStream<Tuple2<String,Integer>> counts = text.flatMap(new LineSplitter()).keyBy(value -> value.f0).sum(1);
        // Print to the console
        counts.print().setParallelism(1);
        // Redis cluster node addresses
        Set<HostAndPort> clusterNodes = new HashSet<>(6);
        clusterNodes.add(new HostAndPort("192.168.1.12", 9001));
        clusterNodes.add(new HostAndPort("192.168.1.12", 9002));
        clusterNodes.add(new HostAndPort("192.168.1.12", 9003));
        clusterNodes.add(new HostAndPort("192.168.1.12", 9004));
        clusterNodes.add(new HostAndPort("192.168.1.12", 9005));
        clusterNodes.add(new HostAndPort("192.168.1.12", 9006));

        MyRedisClusterConfig myConf = new MyRedisClusterConfig.Builder().setNodes(clusterNodes).build();
        System.out.println(">>>>>>Nodes:" + myConf.getNodes());
        counts.addSink(new MyRedisSink<>(myConf,new ClusterCommandConfig()));
        executionEnvironment.execute();
    }
}

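5.5 Submitting the job

Submit the job on the Flink platform;
the steps are shown in the screenshot below:
[Screenshot: job submission steps]

  • Successful run
    [Screenshot: job running successfully]
  • Checking the data
    The Redis connection and the write both succeed.
    [Screenshot: data written to Redis]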

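6 Summary

Key points:
(1) The third-party Redis connector Bahir configures cluster node IPs and ports as InetSocketAddress and cannot connect to a Redis cluster on private IPs;
(2) The rewritten cluster connection code sets node IPs and ports through HostAndPort, so a cluster on private IPs can be reached as well;
(3) Flink has no native Redis connector, so jobs that use Redis must bundle the Redis-related dependencies into the jar;
(4) Exclude the Jedis client from the third-party connector's dependencies and add a newer Jedis version, such as 3.5.1.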
