赞
踩
在分布式系统下,一个业务跨越多个服务或数据源,每个服务都是一个分支事务,要保证所有分支事务最终状态一致,这样的事务就是分布式事务。
1998年,加州大学的计算机科学家 Eric Brewer 提出,分布式系统有三个指标:
Eric Brewer 说,分布式系统无法同时满足这三个指标。这个结论就叫做 CAP 定理。
- 简述CAP定理内容? 分布式系统节点通过网络连接,一定会出现分区问题(P) 当分区出现时,系统的一致性(C)和可用性(A)就无法同时满足
- 思考:elasticsearch集群是CP还是AP? ES集群出现分区时,故障节点会被剔除集群,数据分片会重新分配到其它节点,保证数据一致。因此是低可用性,高一致性,属于CP
BASE理论是对CAP的一种解决思路,包含三个思想:
临时
的不一致状态。而分布式事务最大的问题是各个子事务的一致性问题,因此可以借鉴CAP定理和BASE理论:
最终一致
。强一致
。但事务等待过程中,处于弱可用状态。解决分布式事务,各个子系统之间必须能感知到彼此的事务状态,才能保证状态一致,因此需要一个事务协调者
来协调每一个事务的参与者(子系统事务)。
这里的子系统事务,称为分支事务
;有关联的各个分支事务在一起称为全局事务
- 简述BASE理论三个思想:基本可用、软状态、最终一致
- 解决分布式事务的思想和模型: 全局事务:整个分布式事务 分支事务:分布式事务中包含的每个子系统的事务 最终一致思想:各分支事务分别执行并提交,如果有不一致的情况,再想办法恢复数据
强一致思想:各分支事务执行完业务不要提交,等待彼此结果。而后统一提交或回滚
Seata是 2019 年 1 月份蚂蚁金服和阿里巴巴共同开源的分布式事务解决方案。致力于提供高性能和简单易用的分布式事务服务,为用户打造一站式的分布式解决方案。
官网地址:http://seata.io/,其中的文档、播客中提供了大量的使用说明、源码分析。
Seata提供了四种不同的分布式事务解决方案:
XA模式:强一致性分阶段事务模式,牺牲了一定的可用性,无业务侵入
TCC模式:最终一致的分阶段事务模式,有业务侵入
AT模式:最终一致的分阶段事务模式,无业务侵入,也是Seata的默认模式
SAGA模式:长事务模式,有业务侵入
注意:serverAddr = "127.0.0.1:8848"地址要是nacos的启动地址
registry.conf
registry { # tc服务的注册中心类,这里选择nacos,也可以是eureka、zookeeper等 type = "nacos" nacos { # seata tc 服务注册到 nacos的服务名称,可以自定义 application = "seata-tc-server" serverAddr = "127.0.0.1:8848" group = "DEFAULT_GROUP" namespace = "" cluster = "SH" username = "nacos" password = "nacos" } } config { # 读取tc服务端的配置文件的方式,这里是从nacos配置中心读取,这样如果tc是集群,可以共享配置 type = "nacos" # 配置nacos地址等信息 nacos { serverAddr = "127.0.0.1:8848" namespace = "" group = "SEATA_GROUP" username = "nacos" password = "nacos" dataId = "seataServer.properties" } }
# 数据存储方式,db代表数据库 store.mode=db store.db.datasource=druid store.db.dbType=mysql store.db.driverClassName=com.mysql.cj.jdbc.Driver store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=UTC store.db.user=root store.db.password=123456 store.db.minConn=5 store.db.maxConn=30 store.db.globalTable=global_table store.db.branchTable=branch_table store.db.queryLimit=100 store.db.lockTable=lock_table store.db.maxWait=5000 # 事务、日志等配置 server.recovery.committingRetryPeriod=1000 server.recovery.asynCommittingRetryPeriod=1000 server.recovery.rollbackingRetryPeriod=1000 server.recovery.timeoutRetryPeriod=1000 server.maxCommitRetryTimeout=-1 server.maxRollbackRetryTimeout=-1 server.rollbackRetryTimeoutUnlockEnable=false server.undo.logSaveDays=7 server.undo.logDeletePeriod=86400000 # 客户端与服务端传输方式 transport.serialization=seata transport.compressor=none # 关闭metrics功能,提高性能 metrics.enabled=false metrics.registryType=compact metrics.exporterList=prometheus metrics.exporterPrometheusPort=9898
其中的数据库地址、用户名、密码都需要修改成你自己的数据库信息。
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- 分支事务表 -- ---------------------------- DROP TABLE IF EXISTS `branch_table`; CREATE TABLE `branch_table` ( `branch_id` bigint(20) NOT NULL, `xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `transaction_id` bigint(20) NULL DEFAULT NULL, `resource_group_id` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `branch_type` varchar(8) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `status` tinyint(4) NULL DEFAULT NULL, `client_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `application_data` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `gmt_create` datetime(6) NULL DEFAULT NULL, `gmt_modified` datetime(6) NULL DEFAULT NULL, PRIMARY KEY (`branch_id`) USING BTREE, INDEX `idx_xid`(`xid`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact; -- ---------------------------- -- 全局事务表 -- ---------------------------- DROP TABLE IF EXISTS `global_table`; CREATE TABLE `global_table` ( `xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `transaction_id` bigint(20) NULL DEFAULT NULL, `status` tinyint(4) NOT NULL, `application_id` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `transaction_service_group` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `transaction_name` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `timeout` int(11) NULL DEFAULT NULL, `begin_time` bigint(20) NULL DEFAULT NULL, `application_data` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `gmt_create` datetime NULL DEFAULT NULL, `gmt_modified` datetime NULL DEFAULT NULL, PRIMARY KEY (`xid`) USING BTREE, INDEX `idx_gmt_modified_status`(`gmt_modified`, `status`) USING BTREE, INDEX `idx_transaction_id`(`transaction_id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact; SET FOREIGN_KEY_CHECKS = 1;
进入bin目录,运行其中的seata-server.bat
即可:
启动成功后,seata-server应该已经注册到nacos注册中心了。
打开浏览器,访问nacos地址:http://localhost:8848,然后进入服务列表页面,可以看到seata-tc-server的信息:
第一步:引入相关依赖
<!--seata--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> <exclusions> <!--版本较低,因此排除--> <exclusion> <artifactId>seata-spring-boot-starter</artifactId> <groupId>io.seata</groupId> </exclusion> </exclusions> </dependency> <!--seata starter采用1.4.2版本--> <dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>${seata.version}</version> </dependency>
第二步:配置yml文件,让微服务通过注册中心找到seate-tc-server
第三步:在seata-server.bat控制台看到注册成功
测试服务,当订单数超过库存时,事务会进行回滚,如下所示:
简述AT模式与XA模式最大的区别是什么?
XA模式一阶段不提交事务,锁定资源;AT模式一阶段直接提交,不锁定资源。
XA模式依赖数据库机制实现回滚;AT模式利用数据快照实现数据回滚。
XA模式强一致;AT模式最终一致
AT模式存在脏写问题
:一个事务A修改完数据释放DB锁,事务B又获得DB锁修改数据后释放锁,此时A出错,获得DB锁根据快照恢复数据,会导致事务B对数据的修改丢失
上述问题可以通过全局锁
来解决:当事务2执行业务后想要获取全局锁,而事务1回滚也需要DB锁,这样会形成死锁问题;解决办法是事务2等待一段时间无法获取全局锁,就回滚并释放DB锁;这样就解决了脏写和死锁问题。
但是全局锁是由seata管理的,而非seata管理的事务要和seata管理的事物修改一个数据时,就不需要使用全局锁,这样又会回到脏写问题。AT模式其实对这个问题也会处理,由于保存快照时会生成两个快照【修改前快照、修改后快照】,当事务1要回滚时比对修改后快照,发现数据被非seata管理的事物修改了,就给通知人工处理。
总结:AT模式虽然隔离性比较差,但是可以利用全局锁实现读写隔离
前两种模式都要加锁来保证隔离性,性能会变差;TCC模式通过人工编码大大提高了效率。
第一步:创建冻结金额表【包括:冻结金额、事务状态等】
第二步:编写业务【try、confrim、cancel】逻辑,判断空回滚和业务悬挂
AccountTCCService.java
@LocalTCC
public interface AccountTCCService {
@TwoPhaseBusinessAction(name = "deduct", commitMethod = "confirm", rollbackMethod = "cancel")
void deduct(@BusinessActionContextParameter(paramName = "userId") String userId,
@BusinessActionContextParameter(paramName = "money")int money);
boolean confirm(BusinessActionContext ctx);
boolean cancel(BusinessActionContext ctx);
}
AccountTCCServiceImpl.java
@Service @Slf4j public class AccountTCCServiceImpl implements AccountTCCService { @Autowired private AccountMapper accountMapper; @Autowired private AccountFreezeMapper freezeMapper; @Override @Transactional public void deduct(String userId, int money) { // 0.获取事务id = RootContext.getXID(); String xid = RootContext.getXID(); /** 1. 避免业务悬挂:即在try之前,不能有cancel * 判断freeze中是否有记录,如果有,表示cancel执行过, * 不能执行业务try */ if(freezeMapper.selectById(xid) != null){ // cancel执行过了,拒绝业务 return; } // 2.扣减可用余额 accountMapper.deduct(userId, money); // 3.记录冻结金额,事务状态 AccountFreeze freeze = new AccountFreeze(); freeze.setUserId(userId); freeze.setFreezeMoney(money); freeze.setState(AccountFreeze.State.TRY); freeze.setXid(xid); freezeMapper.insert(freeze); } @Override public boolean confirm(BusinessActionContext ctx) { // 1.获取事务id String xid = ctx.getXid(); // 2.根据id删除冻结记录 int count = freezeMapper.deleteById(xid); return count == 1; } @Override public boolean cancel(BusinessActionContext ctx) { // 0.查询冻结记录 String xid = ctx.getXid(); AccountFreeze freeze = freezeMapper.selectById(xid); /** 1.空回滚判断 * 判断freeze若为null,证明try没执行,需要空回滚 */ if (freeze == null) { // 从上下文获取userid String userId = ctx.getActionContext("userId").toString(); // 插入一条空数据 AccountFreeze freeze1 = new AccountFreeze(); freeze1.setUserId(userId); freeze1.setFreezeMoney(0); freeze1.setState(AccountFreeze.State.CANCEL); freeze1.setXid(xid); freezeMapper.insert(freeze); return true; } /** 2. 判断幂等(cancel不能执行多次) * 看状态是否为cancel */ if (freeze.getState() == 3) { // 已经处理一次cancel,无需再次cancel return true; } // 3.恢复可用余额 accountMapper.refund(freeze.getUserId(), freeze.getFreezeMoney()); // 4.将冻结金额清零,状态改为CANCEL freeze.setFreezeMoney(0); freeze.setState(AccountFreeze.State.CANCEL); int count = freezeMapper.updateById(freeze); return count == 1; } }
第三步:测试
服务找到集群?
方式一:通过代码中的配置文件,但是一旦集群变了就要修改代码不方便;
方式二:nacos的热更新,将配置文件放到nacos的配置中心,随时更新。
节点名称 | ip地址 | 端口号 | 集群名称 |
---|---|---|---|
seata | 127.0.0.1 | 8091 | SH |
seata2 | 127.0.0.1 | 8092 | HZ |
之前我们已经启动了一台seata服务,端口是8091,集群名为SH。
现在,将seata目录复制一份,起名为seata2
修改seata2/conf/registry.conf内容如下:直接修改集群即可
registry { # tc服务的注册中心类,这里选择nacos,也可以是eureka、zookeeper等 type = "nacos" nacos { # seata tc 服务注册到 nacos的服务名称,可以自定义 application = "seata-tc-server" serverAddr = "127.0.0.1:8848" group = "DEFAULT_GROUP" namespace = "" cluster = "HZ" username = "nacos" password = "nacos" } } config { # 读取tc服务端的配置文件的方式,这里是从nacos配置中心读取,这样如果tc是集群,可以共享配置 type = "nacos" # 配置nacos地址等信息 nacos { serverAddr = "127.0.0.1:8848" namespace = "" group = "SEATA_GROUP" username = "nacos" password = "nacos" dataId = "seataServer.properties" } }
进入seata2/bin目录,然后运行命令:
seata-server.bat -p 8092
打开nacos控制台,查看服务列表:
点进详情查看:
接下来,我们需要将tx-service-group与cluster的映射关系都配置到nacos配置中心。
新建一个配置:
配置的内容如下:
# 事务组映射关系 service.vgroupMapping.seata-demo=SH service.enableDegrade=false service.disableGlobalTransaction=false # 与TC服务的通信配置 transport.type=TCP transport.server=NIO transport.heartbeat=true transport.enableClientBatchSendRequest=false transport.threadFactory.bossThreadPrefix=NettyBoss transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler transport.threadFactory.shareBossWorker=false transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector transport.threadFactory.clientSelectorThreadSize=1 transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread transport.threadFactory.bossThreadSize=1 transport.threadFactory.workerThreadSize=default transport.shutdown.wait=3 # RM配置 client.rm.asyncCommitBufferLimit=10000 client.rm.lock.retryInterval=10 client.rm.lock.retryTimes=30 client.rm.lock.retryPolicyBranchRollbackOnConflict=true client.rm.reportRetryCount=5 client.rm.tableMetaCheckEnable=false client.rm.tableMetaCheckerInterval=60000 client.rm.sqlParserType=druid client.rm.reportSuccessEnable=false client.rm.sagaBranchRegisterEnable=false # TM配置 client.tm.commitRetryCount=5 client.tm.rollbackRetryCount=5 client.tm.defaultGlobalTransactionTimeout=60000 client.tm.degradeCheck=false client.tm.degradeCheckAllowTimes=10 client.tm.degradeCheckPeriod=2000 # undo日志配置 client.undo.dataValidation=true client.undo.logSerialization=jackson client.undo.onlyCareUpdateColumns=true client.undo.logTable=undo_log client.undo.compress.enable=true client.undo.compress.type=zip client.undo.compress.threshold=64k client.log.exceptionRate=100
接下来,需要修改每一个微服务的application.yml文件,让微服务读取nacos中的client.properties文件:
seata:
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
username: nacos
password: nacos
group: SEATA_GROUP
data-id: client.properties
重启微服务,现在微服务到底是连接tc的SH集群,还是tc的HZ集群,都统一由nacos的client.properties来决定了。由于client.properties设置的集群是HZ,所以重启服务后,三个服务都到了HZ(端口号8091)中。
现在修改client.properties设置的集群是SH,所以重启服务后,三个服务都到了SH(端口号8092)中。
此时还可以看到orderApplication等从SH(8091端口)集群转到了HZ(8092端口)集群
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。