当前位置:   article > 正文

Redis客户端Jedis,Lettuce和vertx的使用比较及部分源码解析_redisclient客户端

redisclient客户端

简介

Redis client可以说是有很多,不同的Client在使用方式,性能方面都有一些区别。Jedis作为老资格的redis client目前来说对redis的接口算是支持的最好的,也是使用起来最简单。Lettuce使用起来可能是三者之间最复杂的,但是也是性能最高的,特别是lettuce还支持了全异步的连接和连接池,更是加大了性能。vertx-redisClient作为全异步微服务框架vertx的一个组件,也是用在vertx框架中,也是一个全异步的redis-client,使用的话经常用在vertx框架中。

Redis client难度同步异步对Redis支持完善度性能
Jedis同步
Lettuce同步/异步
vertx-redisClient异步

客户端版本

Jedis:redis.clients:jedis:3.3.0

Lettuce:io.lettuce:lettuce-core:6.0.0.M1

Vertx-redisClient:io.vertx:vertx-redis-client:3.8.5

在此时间段使用的都是各个client最新的版本,这些版本也是最近做了一次比较大的更新,主要是为了支持redis6.0发布的对acl的支持,关于redis acl详细介绍可以看篇博客:https://www.cnblogs.com/zhoujinyi/p/13222464.html。Redis acl Jedis3.3.0已经支持,Lettuce也是发布了一个实验版本M1对acl做了支持,目前vertx-redisClient还是不支持redis acl的。

Jedis客户端

初始化redis client

  1. private fun creatRedisCluster() = JedisCluster(
  2. redisConfig.redisNodes.map { HostAndPort(it.host, it.port) }.toSet(),
  3. redisConfig.timeOut ?: 2000, // set connection time out, the jedis default is 2000ms
  4. redisConfig.timeOut ?: 2000, // set reconnection time out, the jedis default same as connection time out
  5. redisConfig.reconnectAttempts ?: 5, // max reconnection times
  6. redisConfig.username,
  7. redisConfig.password,
  8. null, // set client connection name, needn't configuration here, the jedis default is null
  9. JedisPoolConfig().apply {
  10. redisConfig.poolConfig.let {
  11. maxIdle = it.maxIdle
  12. minIdle = it.minIdle
  13. maxTotal = it.maxTotal
  14. }
  15. }
  16. )

我们直接使用的是集群模式,其他模式更加简单就不做多说,详细的请参考:GitHub - redis/jedis: Redis Java client。如果你看过Redis Lettuce客户端异步连接池详解就应该知道,Lettuce在集群模式下我们需要给客户端设置cluster拓扑刷新机制,在集群出现问题或者变动的时候客户端能及时的刷新cluster拓扑从而防止各种异常的持续出现。可以看到我们在初始化Jedis Client的时候并没有给设置cluster拓扑刷新之类的属性,然而Jedis也没有给我们提供这样的接口或者配置,难道是Jedis不支持这样的配置么,答案肯定是否定的,之前也说了Jedis目前是对Redis支持的最好的客户端,那么怎么可能不考虑这样的问题呢,其实Jedis在底层中已经自动实现了这样的配置,我们看Jedis源码:

  1. public T run(String key) {
  2. return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null);
  3. }

在使用Jedis Client的时候,无论你使用那个API最终调的都是这个API,最终会走到runWithRetries这个接口:

  1. private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) {
  2. if (attempts <= 0) {
  3. throw new JedisClusterMaxAttemptsException("No more cluster attempts left.");
  4. }
  5. Jedis connection = null;
  6. try {
  7. if (redirect != null) {
  8. connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode());
  9. if (redirect instanceof JedisAskDataException) {
  10. // TODO: Pipeline asking with the original command to make it faster....
  11. connection.asking();
  12. }
  13. } else {
  14. if (tryRandomNode) {
  15. connection = connectionHandler.getConnection();
  16. } else {
  17. connection = connectionHandler.getConnectionFromSlot(slot);
  18. }
  19. }
  20. return execute(connection);
  21. } catch (JedisNoReachableClusterNodeException jnrcne) {
  22. throw jnrcne;
  23. } catch (JedisConnectionException jce) {
  24. // release current connection before recursion
  25. releaseConnection(connection);
  26. connection = null;
  27. if (attempts <= 1) {
  28. //We need this because if node is not reachable anymore - we need to finally initiate slots
  29. //renewing, or we can stuck with cluster state without one node in opposite case.
  30. //But now if maxAttempts = [1 or 2] we will do it too often.
  31. //TODO make tracking of successful/unsuccessful operations for node - do renewing only
  32. //if there were no successful responses from this node last few seconds
  33. this.connectionHandler.renewSlotCache();
  34. }
  35. return runWithRetries(slot, attempts - 1, tryRandomNode, redirect);
  36. } catch (JedisRedirectionException jre) {
  37. // if MOVED redirection occurred,
  38. if (jre instanceof JedisMovedDataException) {
  39. // it rebuilds cluster's slot cache recommended by Redis cluster specification
  40. this.connectionHandler.renewSlotCache(connection);
  41. }
  42. // release current connection before recursion
  43. releaseConnection(connection);
  44. connection = null;
  45. return runWithRetries(slot, attempts - 1, false, jre);
  46. } finally {
  47. releaseConnection(connection);
  48. }
  49. }

可以看到当发生JedisConnectionException或者JedisRedirectionException异常的时候里面都会在某种情况下调renewSlotCache这个接口,其实这个接口实际上就是在做我们刚说的cluster的拓扑刷新,也就说说Jedis他已经实现了这样的功能,只不过是在我们使用Cluster Api的时候发生部分异常的时候会去自己刷新拓扑。至于Jedis具体是怎么刷新拓扑的有兴趣的可以继续往下跟源码,这里就不多做介绍。

API实例

因为Jedis的api都是同步的,所以我们只示例一个同步的接口

fun exists(key: String): Boolean = redisCommand.exists(key)

Lettuce客户端

初始化Redis Cleint

Lettuce Redis Client的初始化就不再这里多说了,之前的Redis Lettuce客户端异步连接池详解

API实例

  1. suspend inline fun <T, R> AsyncPool<T>.use(block: (T) -> R): R {
  2. // Await uses future inside a suspend coroutine
  3. val borrowed = acquire().await()
  4. try {
  5. return block(borrowed)
  6. } finally {
  7. release(borrowed)
  8. }
  9. }
  10. suspend fun exists(key: String): RedisFuture<Long> =
  11. redisAsyncPool.redisPool.use { conn ->
  12. return conn.async().exists(key)
  13. }

这个是一个直接使用lettuce api的接口,但是我们实际调用中,可以有三种实现再去封装,以提供给不同的场景

  • 提供给kotlin的异步
  • 提供给java的异步
  • 同步
  1. // kotlin异步使用,使用suspeng挂起函数,使用await获取Fauter<T>的值
  2. suspend fun isRevokedAsync(key: String): Boolean =
  3. exists(key).await().toInt() != 0
  4. // java异步使用,使用java1.8提供的CompletionStage<T>作为返回值类型
  5. fun isRevokedFuture(key: String): CompletionStage<Boolean> =
  6. redisAsyncPool.redisPool.let { redisPool ->
  7. redisPool.acquire().thenCompose { conn ->
  8. redisPool.use(conn) {
  9. conn.async().exists(key)
  10. .thenApply { it.toInt() != 0 }
  11. }
  12. }
  13. }
  14. // 同步
  15. fun isRevoked(key: String): Boolean =
  16. redisAsyncPool.redisPool.let { redisPool ->
  17. val conn = redisPool.acquire().get()
  18. redisPool.use(conn) {
  19. conn.sync().exists(key).toInt() != 0
  20. }
  21. }

Vertx-redisClient客户端

初始化vertx-redisClient

  1. private fun redisCluster(): RedisAPI {
  2. val endpoints = redisConfig.redisNodes.mapNotNull {
  3. SocketAddress.inetSocketAddress(it.port, it.host)
  4. }.toMutableList()
  5. val redisOptions = RedisOptions()
  6. .setType(RedisClientType.CLUSTER) // set redis client type: cluster
  7. .setEndpoints(endpoints)
  8. .setPassword(redisConfig.password)
  9. .setUseSlave(RedisSlaves.SHARE) // set SLAVE nodes can randomly
  10. .setNetClientOptions(
  11. NetClientOptions()
  12. .setReconnectAttempts(redisConfig.reconnectAttempts ?: 0) // set reconnection times
  13. .setReconnectInterval(redisConfig.reconnectInterval
  14. ?: 1000) // set reconnection interval
  15. )
  16. val client = Redis.createClient(vertx, redisOptions)
  17. return RedisAPI.api(client)
  18. }

API实例

  1. private fun exists(args: List<String>, handle: (AsyncResult<Response>) -> Unit) {
  2. redisAPI.exists(args, handle)
  3. }
  4. // 使用kotlin suspendCoroutine<T>从回调函数中获取想要的返回值
  5. suspend fun isRevokedAsync(key: String) = suspendCoroutine<Boolean> { cont ->
  6. exists(listOf(key)) { result ->
  7. // 成功返回结果
  8. if (result.succeeded()) {
  9. cont.resume(result.result().get(0).toBoolean())
  10. // 否则抛一个Exception
  11. } else {
  12. cont.resumeWithException(result.cause())
  13. }
  14. }
  15. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/923170
推荐阅读
相关标签
  

闽ICP备14008679号