赞
踩
Redis client可以说是有很多,不同的Client在使用方式,性能方面都有一些区别。Jedis作为老资格的redis client目前来说对redis的接口算是支持的最好的,也是使用起来最简单。Lettuce使用起来可能是三者之间最复杂的,但是也是性能最高的,特别是lettuce还支持了全异步的连接和连接池,更是加大了性能。vertx-redisClient作为全异步微服务框架vertx的一个组件,也是用在vertx框架中,也是一个全异步的redis-client,使用的话经常用在vertx框架中。
Redis client | 难度 | 同步异步 | 对Redis支持完善度 | 性能 |
Jedis | 低 | 同步 | 高 | 低 |
Lettuce | 高 | 同步/异步 | 中 | 高 |
vertx-redisClient | 中 | 异步 | 低 | 中 |
Jedis:redis.clients:jedis:3.3.0
Lettuce:io.lettuce:lettuce-core:6.0.0.M1
Vertx-redisClient:io.vertx:vertx-redis-client:3.8.5
在此时间段使用的都是各个client最新的版本,这些版本也是最近做了一次比较大的更新,主要是为了支持redis6.0发布的对acl的支持,关于redis acl详细介绍可以看篇博客:https://www.cnblogs.com/zhoujinyi/p/13222464.html。Redis acl Jedis3.3.0已经支持,Lettuce也是发布了一个实验版本M1对acl做了支持,目前vertx-redisClient还是不支持redis acl的。
- private fun creatRedisCluster() = JedisCluster(
- redisConfig.redisNodes.map { HostAndPort(it.host, it.port) }.toSet(),
- redisConfig.timeOut ?: 2000, // set connection time out, the jedis default is 2000ms
- redisConfig.timeOut ?: 2000, // set reconnection time out, the jedis default same as connection time out
- redisConfig.reconnectAttempts ?: 5, // max reconnection times
- redisConfig.username,
- redisConfig.password,
- null, // set client connection name, needn't configuration here, the jedis default is null
- JedisPoolConfig().apply {
- redisConfig.poolConfig.let {
- maxIdle = it.maxIdle
- minIdle = it.minIdle
- maxTotal = it.maxTotal
- }
- }
- )
我们直接使用的是集群模式,其他模式更加简单就不做多说,详细的请参考:GitHub - redis/jedis: Redis Java client。如果你看过Redis Lettuce客户端异步连接池详解就应该知道,Lettuce在集群模式下我们需要给客户端设置cluster拓扑刷新机制,在集群出现问题或者变动的时候客户端能及时的刷新cluster拓扑从而防止各种异常的持续出现。可以看到我们在初始化Jedis Client的时候并没有给设置cluster拓扑刷新之类的属性,然而Jedis也没有给我们提供这样的接口或者配置,难道是Jedis不支持这样的配置么,答案肯定是否定的,之前也说了Jedis目前是对Redis支持的最好的客户端,那么怎么可能不考虑这样的问题呢,其实Jedis在底层中已经自动实现了这样的配置,我们看Jedis源码:
- public T run(String key) {
- return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
- }
在使用Jedis Client的时候,无论你使用那个API最终调的都是这个API,最终会走到runWithRetries这个接口:
- private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {
- if (attempts <= 0) {
- throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");
- }
-
- Jedis connection = null;
- try {
-
- if (redirect != null) {
- connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());
- if (redirect instanceof JedisAskDataException) {
- // TODO: Pipeline asking with the original command to make it faster....
- connection.asking();
- }
- } else {
- if (tryRandomNode) {
- connection = connectionHandler.getConnection();
- } else {
- connection = connectionHandler.getConnectionFromSlot(slot);
- }
- }
-
- return execute(connection);
-
- } catch (JedisNoReachableClusterNodeException jnrcne) {
- throw jnrcne;
- } catch (JedisConnectionException jce) {
- // release current connection before recursion
- releaseConnection(connection);
- connection = null;
-
- if (attempts <= 1) {
- //We need this because if node is not reachable anymore - we need to finally initiate slots
- //renewing, or we can stuck with cluster state without one node in opposite case.
- //But now if maxAttempts = [1 or 2] we will do it too often.
- //TODO make tracking of successful/unsuccessful operations for node - do renewing only
- //if there were no successful responses from this node last few seconds
- this.connectionHandler.renewSlotCache();
- }
-
- return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
- } catch (JedisRedirectionException jre) {
- // if MOVED redirection occurred,
- if (jre instanceof JedisMovedDataException) {
- // it rebuilds cluster's slot cache recommended by Redis cluster specification
- this.connectionHandler.renewSlotCache(connection);
- }
-
- // release current connection before recursion
- releaseConnection(connection);
- connection = null;
-
- return runWithRetries(slot, attempts - 1, false, jre);
- } finally {
- releaseConnection(connection);
- }
- }
可以看到当发生JedisConnectionException或者JedisRedirectionException异常的时候里面都会在某种情况下调renewSlotCache这个接口,其实这个接口实际上就是在做我们刚说的cluster的拓扑刷新,也就说说Jedis他已经实现了这样的功能,只不过是在我们使用Cluster Api的时候发生部分异常的时候会去自己刷新拓扑。至于Jedis具体是怎么刷新拓扑的有兴趣的可以继续往下跟源码,这里就不多做介绍。
因为Jedis的api都是同步的,所以我们只示例一个同步的接口
fun exists(key: String): Boolean = redisCommand.exists(key)
Lettuce Redis Client的初始化就不再这里多说了,之前的Redis Lettuce客户端异步连接池详解。
- suspend inline fun <T, R> AsyncPool<T>.use(block: (T) -> R): R {
- // Await uses future inside a suspend coroutine
- val borrowed = acquire().await()
- try {
- return block(borrowed)
- } finally {
- release(borrowed)
- }
- }
-
- suspend fun exists(key: String): RedisFuture<Long> =
- redisAsyncPool.redisPool.use { conn ->
- return conn.async().exists(key)
- }
这个是一个直接使用lettuce api的接口,但是我们实际调用中,可以有三种实现再去封装,以提供给不同的场景
- // kotlin异步使用,使用suspeng挂起函数,使用await获取Fauter<T>的值
- suspend fun isRevokedAsync(key: String): Boolean =
- exists(key).await().toInt() != 0
-
- // java异步使用,使用java1.8提供的CompletionStage<T>作为返回值类型
- fun isRevokedFuture(key: String): CompletionStage<Boolean> =
- redisAsyncPool.redisPool.let { redisPool ->
- redisPool.acquire().thenCompose { conn ->
- redisPool.use(conn) {
- conn.async().exists(key)
- .thenApply { it.toInt() != 0 }
- }
- }
- }
-
- // 同步
- fun isRevoked(key: String): Boolean =
- redisAsyncPool.redisPool.let { redisPool ->
- val conn = redisPool.acquire().get()
- redisPool.use(conn) {
- conn.sync().exists(key).toInt() != 0
- }
- }
- private fun redisCluster(): RedisAPI {
- val endpoints = redisConfig.redisNodes.mapNotNull {
- SocketAddress.inetSocketAddress(it.port, it.host)
- }.toMutableList()
-
- val redisOptions = RedisOptions()
- .setType(RedisClientType.CLUSTER) // set redis client type: cluster
- .setEndpoints(endpoints)
- .setPassword(redisConfig.password)
- .setUseSlave(RedisSlaves.SHARE) // set SLAVE nodes can randomly
- .setNetClientOptions(
- NetClientOptions()
- .setReconnectAttempts(redisConfig.reconnectAttempts ?: 0) // set reconnection times
- .setReconnectInterval(redisConfig.reconnectInterval
- ?: 1000) // set reconnection interval
- )
- val client = Redis.createClient(vertx, redisOptions)
- return RedisAPI.api(client)
- }
- private fun exists(args: List<String>, handle: (AsyncResult<Response>) -> Unit) {
- redisAPI.exists(args, handle)
- }
-
- // 使用kotlin suspendCoroutine<T>从回调函数中获取想要的返回值
- suspend fun isRevokedAsync(key: String) = suspendCoroutine<Boolean> { cont ->
- exists(listOf(key)) { result ->
- // 成功返回结果
- if (result.succeeded()) {
- cont.resume(result.result().get(0).toBoolean())
- // 否则抛一个Exception
- } else {
- cont.resumeWithException(result.cause())
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。