当前位置:   article > 正文

Redis入门--头歌实验Redis事务与流水线

Redis入门--头歌实验Redis事务与流水线
任务描述

本关任务:编写一个商品交易平台的后端处理逻辑。

相关知识

手机、互联网普遍的当下,系统会同时处理多客户端的请求,而在多客户端同时处理相同的数据时,数据一致性就变得十分重要,稍不谨慎的操作就会导致数据出错。本关卡以用户购买商品这一实际应用场景为背景,实现使用 Redis 事务保证数据一致性。

用户购买商品依托一个商品交易平台进行,该平台中定义了一些数据结构:

  • 用户信息存储在哈希键 users:* 中(其中*是用户ID),记录了两个属性:
    • 用户姓名(name
    • 用户余额(funds
  • 用户仓库用集合键 inventory:* 保存(其中*是用户ID),其中元素为:
    • 商品的唯一标识。
  • 如下所示:

  • 同时我们使用一个有序集合 market 存储商品买卖信息:

  • 成员为:
    • 由商品 ID 和卖家 ID 通过英文字符 . 拼接而成。
    • 例如:ItemO.27
  • 分值为:商品售价。

为了完成本关任务,你需要掌握:1.Redis事务的特性,2.将商品加入平台的实现方式,3.购买商品的实现方式,4.非事务性流水线。

Redis事务的特性
Redis事务概述

Redis 中的事务是一组命令的集合,事务和命令一样,是 Redis 的最小执行单位。事务保证这组命令要么都执行,要么都不执行(All or Nothing)。

事务的原理是将一组命令发送给 Redis,然后再让 Redis 依次执行这些命令:

  1. 127.0.0.1:6379> MULTI
  2. OK
  3. 127.0.0.1:6379> SADD test:set:1 1
  4. QUEUED
  5. 127.0.0.1:6379> SADD test:set:2 2
  6. QUEUED
  7. 127.0.0.1:6379> EXEC
  8. 1) (integer) 1
  9. 2) (integer) 1

Redis 在接收到 EXEC 命令后才会将事务队列中的所有命令依次执行,并获取其执行的结果(返回值)。在事务执行完毕前,用户无法根据事务中命令的结果来做不同处理(提交或回滚)。

由于 Redis 事务是最小执行单位,所以它保证一个事务内的命令不被其他命令插入,在事务执行完毕后,Redis 才会响应其他请求。

Redis事务错误处理

Redis 事务执行遇到错误时,会根据错误原因做不同处理:

  • 语法错误(命令不存在/命令参数错误)
    • 事务的所有命令均不执行
    • 返回错误。
  • 运行中错误(使用不合适的命令操作键等)
    • 除了出现错误的命令不执行,其他均执行
    • 返回每个命令的执行结果。

Redis 事务的错误处理机制可以看出,Redis 不提供关系型数据库的回滚(ROLLBACK)功能,在运行出错时,需要自己将数据库复原到事务执行前的状态。保证事务要么都执行,要么都不执行的特性。

Redis数据一致性保证

要保证数据一致性,就是要防止多个进程同时操作同一个数据时产生资源争抢。加锁是确保数据一致性的方法之一,一般分为乐观锁和悲观锁。

乐观锁与悲观锁

悲观锁(pessimistic locking,关系型数据库会对被访问的数据行进行加悲观锁,直到事务被提交(Commit)或回滚(ROLLBACK)时解除,此时如果其他客户端试图对被加锁数据进行访问,则会被阻塞到第一个事务执行完毕。

悲观锁会导致其他客户端的长时间等待,所以 Redis 采用了乐观锁(optimisitic locking的方式,只在数据被其他客户端修改的情况下,通知所有加锁的客户端,这样客户端不需要等待取得锁的客户端执行完毕,只需要在得到通知时进行重试即可。

所以在 Redis 中,我们通过乐观锁来确保数据一致性。

将商品加入平台的实现方式

将商品放到平台上销售遵循以下规则:

  • 卖家拥有该商品。
    • 扣除卖家商品成功
      • 将商品加入平台。
    • 扣除卖家商品失败
      • 不允许将商品加入平台。
  • 卖家不拥有该商品。
    • 不允许将商品加入平台。

        如果按照传统的 Redis 事务,简单地将扣除卖家商品与将商品加入商品买卖信息有序集合放到一个事务中。当用户通过多终端同时执行操作,或者用户在短时间内多次执行操作时,就会引发数据一致性问题。多个进程同时验证到卖家拥有该商品,并开始扣除卖家商品,由于操作的原子性,第二次及之后的扣除操作会失败,但事务中的其他命令会继续执行,导致卖家将一个商品多次加入平台,从而引发数据出错。

        这时我们需要使用 WATCH 命令(乐观锁)来解决数据一致性问题。WATCH 命令可以监控一或多个键,一旦其中有一个键被修改/删除时,之后的事务都不会被执行。WATCH 命令的语法如下:

WATCH key [key ...]

当监视的 key 被其他命令改动时,事务将被打断并返回一个错误 WatchError,用户可以根据自身需求选择重试或者取消事务。在这里我们就需要对商品所在的用户仓库进行监视,并在捕获到错误时进行重试,重试时间为 5 秒,如果重试失败,则应该取消事务。

此时,将商品加入平台的步骤变为如下:

  • 对用户仓库加乐观锁,监视其变化。
    • 无变化。
      • 用户拥有该商品。
        • 从用户仓库中扣除该商品。
        • 将商品加入到商品买卖信息有序集合中。
      • 用户不拥有该商品。
        • 取消对用户仓库的乐观锁,取消监控。
    • 有变化
      • 进行重试。
      • 限定时间为 5 秒。
  1. import time
  2. import redis
  3. # 函数定义:将商品添加到市场
  4. def add_item_to_market(itemid, sellerid, price):
  5. # 拼接卖家的库存键名
  6. repertory = "inventory:" + sellerid
  7. # 拼接商品键名
  8. item = itemid + "." + sellerid
  9. # 设置超时时间为5秒
  10. end = time.time() + 5
  11. # 创建 Redis 连接的 pipeline
  12. pipe = conn.pipeline()
  13. # 在规定时间内执行以下操作
  14. while time.time() < end:
  15. try:
  16. # 监视卖家的库存是否发生变化
  17. pipe.watch(repertory)
  18. # 检查卖家是否仍持有该商品
  19. if not pipe.sismember(repertory, itemid):
  20. # 如果卖家不再持有该商品,则解除对卖家库存的监视
  21. pipe.unwatch()
  22. # 返回空值,表示加入失败(与重试失败区分)
  23. return None
  24. # 开始事务操作
  25. pipe.multi()
  26. # 将商品添加到市场
  27. pipe.zadd("market", item, price)
  28. # 从卖家的库存中移除该商品
  29. pipe.srem(repertory, itemid)
  30. # 执行事务
  31. pipe.execute()
  32. # 返回 True 表示成功将商品加入市场
  33. return True
  34. except redis.exceptions.WatchError:
  35. # 如果捕获到 WatchError,说明卖家的库存发生了变化,需要重试
  36. pass
  37. # 如果超时或重试失败,返回 False
  38. return False

我们在发现卖家不再拥有该商品时,就意味着不能再对用户仓库进行扣除商品操作,商品不能被加入到平台中,所以就没有必要继续对用户仓库进行监控。此时,我们使用 UNWATCH 命令取消了对用户仓库的监视,并取消将商品加入平台的操作。UNWATCH 命令语法如下:

UNWATCH

需要注意的是UNWATCH 命令会解除对**所有 key **的监视。

当我们确定卖家拥有该商品,且同一时刻没有其他进程对用户仓库进行扣除操作时,可以将商品加入平台,将扣除卖家商品与将商品加入商品买卖信息有序集合放到一个事务中执行。

购买商品的实现方式

成功购买商品需要满足三个条件:

  • 买家用户余额足够购买该商品。
  • 买家用户没有同时在购买其他商品(也就是说:买家余额不变化)
  • 该商品没有被其他用户买走。

通过分析上述条件,可以发现购买商品过程中存在两个独占资源

  • 商品买卖信息中的某商品条目(对应的键:商品买卖信息有序集合)
  • 买家余额(对应的键:买家用户信息)

所以我们需要对这两个独占资源所属的键加乐观锁(进行监视):

  1. # 生成买家键名
  2. buyer = "users:" + buyerid
  3. # 创建 Redis 连接的 pipeline
  4. pipe = conn.pipeline()
  5. # 监视市场和买家键,以便在事务中对它们进行操作
  6. pipe.watch("market", buyer)

乐观锁帮助我们确保后两个条件成立,接下来需要判断该用户余额是否足够购买该商品:

  1. # 获取该商品价格
  2. price = pipe.zscore("market", itemid)
  3. # 获取买家用户余额
  4. funds = int(pipe.hget(buyer, "funds"))
  5. if funds < price:
  6. pipe.unwatch()

当用户余额不足以购买该商品时,意味着交易不能继续进行,所以需要解除对商品买卖信息和买家个人信息的监视,同时终止交易。

在满足了上述三个条件后,购买商品的过程就可以顺利的进行了:

  • 买家用户余额减去商品价格的数值。
  • 卖家用户余额增加商品价格的数值。
  • 从商品买卖信息有序集合中移除该商品。
  • 为买家用户仓库增加该商品。
  1. # itemid 的格式为 `itemX.userX`,其中:
  2. # 前半部分为商品ID
  3. # 后半部分为卖家ID
  4. # 所以可以使用 split 方法将其分为两段
  5. item, sellerid = itemid.split(".")
  6. seller = "users:" + sellerid
  7. repertory = "inventory:" + buyerid
  8. # 开始 Redis 事务操作
  9. pipe.multi()
  10. # 增加卖家的资金
  11. pipe.hincrby(seller, "funds", int(price))
  12. # 减少买家的资金
  13. pipe.hincrby(buyer, "funds", int(-price))
  14. # 将商品添加到买家的库存
  15. pipe.sadd(repertory, item)
  16. # 从市场中移除该商品
  17. pipe.zrem("market", itemid)
  18. # 执行事务
  19. pipe.execute()

通过对商品买卖信息和买家用户信息加锁,确保了商品被其他买家买走,或者买家账户正在支付其他商品时,程序会阻止用户进行交易,保证了被竞争资源的独占性,避免数据出错,维护了系统数据一致性。

非事务性流水线

使用事务的好处除了在执行时不会被其他命令中断外,还可以通过使用流水线加快事务执行的速度。实际上,在不使用事务的情况下,我们也可以通过使用流水线提高命令的执行效率。

流水线通过一次发送所有命令来减少通信次数,降低通信延迟带来的时间开销,创建流水线的方式在之前就已经使用过:

pipe = conn.pipeline()

按上述方式调用 pipeline 方法时会默认传入 True 参数指定使用事务的方式提交命令,客户端将会使用 MULTIEXEC 命令将所有命令包裹起来,延迟命令的执行。更为重要的是,MULTIEXEC 命令会消耗一定的资源。

所以当我们只需要使用流水线的情况下,我们可以传入 False 参数:

pipe_without_transaction = conn.pipeline(False)

通过使用流水线一次性发送多条命令,可以提高 Redis 的整体性能。

通过下图,我们可以对比不使用流水线(左侧)使用流水线(右侧)的通信过程:

可以明显地看出,通过使用流水线,通信往返次数降低到了原来的三分之一,大大降低了通信时间开销,如果 Redis 和应用服务器通过局域网相连,这样的修改则可以减少24毫秒的时间开销。

  1. #!/usr/bin/env python
  2. #-*- coding:utf-8 -*-
  3. import time
  4. import redis
  5. conn = redis.Redis()
  6. # 将商品放到市场上
  7. def add_item_to_market(itemid, sellerid, price):
  8. # 构建相关键
  9. repertory = "inventory:" + sellerid # 卖家的仓库键
  10. item = itemid + "." + sellerid # 商品键
  11. end = time.time() + 5 # 设置超时时间为5秒
  12. pipe = conn.pipeline()
  13. # 在超时时间内尝试执行操作
  14. while time.time() < end:
  15. try:
  16. pipe.watch(repertory) # 监视卖家的仓库
  17. if not pipe.sismember(repertory, itemid): # 如果商品不在卖家的仓库中
  18. pipe.unwatch()
  19. return None
  20. pipe.multi() # 开启事务
  21. pipe.zadd("market", item, price) # 将商品添加到市场
  22. pipe.srem(repertory, itemid) # 从卖家的仓库中移除商品
  23. pipe.execute() # 执行事务
  24. return True
  25. except redis.exceptions.WatchError: # 处理 WatchError 异常
  26. pass
  27. return False
  28. # 购买商品
  29. def purchase(buyerid, itemid):
  30. item, sellerid = itemid.split(".") # 解析商品和卖家id
  31. buyer = "users:" + buyerid # 买家键
  32. seller = "users:" + sellerid # 卖家键
  33. repertory = "inventory:" + buyerid # 买家的仓库键
  34. end = time.time() + 10 # 设置超时时间为10秒
  35. pipe = conn.pipeline()
  36. # 在超时时间内尝试执行操作
  37. while time.time() < end:
  38. try:
  39. pipe.watch("market", buyer) # 监视市场和买家键
  40. price = pipe.zscore("market", itemid) # 获取商品价格
  41. funds = int(pipe.hget(buyer, "funds")) # 获取买家资金
  42. if funds < price: # 如果买家资金不足
  43. pipe.unwatch()
  44. return None
  45. pipe.multi() # 开启事务
  46. pipe.hincrby(seller, "funds", int(price)) # 增加卖家资金
  47. pipe.hincrby(buyer, "funds", int(-price)) # 减少买家资金
  48. pipe.sadd(repertory, item) # 将商品添加到买家的仓库
  49. pipe.zrem("market", itemid) # 从市场中移除商品
  50. pipe.execute() # 执行事务
  51. return True
  52. except redis.exceptions.WatchError: # 处理 WatchError 异常
  53. pass
  54. return False

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/384825
推荐阅读
相关标签
  

闽ICP备14008679号