赞
踩
在了解了Seata 的工作原理后,如何在springboot 项目中进行整合使用,本文进行阐述。
提示:本文使用 seata 版本 :1.5 ;在整合之前 你需要先要部署seate 的服务
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.14</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>spring-seata-storage</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-seata-storage</name> <description>spring-seata-storage</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> <optional>true</optional> <version>3.1.6</version> </dependency> <!-- mysql --> <!-- https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-boot-starter --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.5.2</version> </dependency> <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.21</version> <!-- <version>5.1.49</version>--> </dependency> <!-- https://mvnrepository.com/artifact/net.sf.jsqlparser/jsqlparser --> <dependency> <groupId>net.sf.jsqlparser</groupId> <artifactId>jsqlparser</artifactId> <version>0.8.0</version> </dependency> <dependency> <groupId>io.swagger</groupId> <artifactId>swagger-annotations</artifactId> <version>1.6.2</version> </dependency> <!-- https://mvnrepository.com/artifact/io.seata/seata-spring-boot-starter --> <!-- <dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>1.5.2</version> </dependency>--> <!--处理 在SpringBoot 2.4.x的版本之后,对于bootstrap.properties/bootstrap.yaml配置文件(我们合起来成为Bootstrap配置文件)的支持, 需要导入如下的依赖--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bootstrap</artifactId> <version>3.1.6</version> </dependency> <!-- seata 使用nacos 作为配置中心和 注册中心时 使用 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> <version>2021.0.5.0</version> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <version>2021.0.5.0</version> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <!-- 里面已经集成服务间调用X-id的传递,包括FeignClient的重写,如果在之前自定义封装过Feign,注意启动冲突--> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> <version>2021.0.5.0</version> <exclusions> <!--去除低版本--> <exclusion> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> </exclusion> </exclusions> </dependency> <!-- https://mvnrepository.com/artifact/io.seata/seata-spring-boot-starter --> <dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>1.6.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <mainClass>com.example.springseata.springseatastorage</mainClass> </configuration> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> <resources> <resource> <directory>src/main/resources</directory> <filtering>true</filtering> </resource> </resources> </build> </project>
jar 目的:
springcloud ,springboot,springcloud-alibaba 版本对应:版本说明
目的: 了解seat 客户端和服务端的 关系,清楚其配置的含义;
seata server 的存储模式:
存储模式指定了Seata Server用于存储事务和元数据的方式,这个只有在seata 服务端才有。Seata Server支持三种存储模式:file、db和Redis。列如 当开始事务时需要生成全局事务id Xid ,在生成后就需要Seata 服务进行存储,所以他是Seata Server的重要组成部分;
seata server 的配置中心:
配置中心的作用:用于配置 Seata Server的相关参数, Seata Server 端在启动的时候也可以配置很多参数,如果把这些参数都放到 Seata Server 中,每次修改度需要连接到Seata Server 的服务器,然后对其配置文件进行修改,如果把这个参数放到 配置中心如config ,zk,nacos 就可以在配置中心进行修改,更加的方便;
seata server 的注册中心:
Seata Server 的注册中心是用于存储和管理 Seata 事务参与者的信息的组件;多个Seata Server 可以成为一个集群,然后注册到zk,eureka,nacos 里,这样方便集群的管理;
seata client 的配置中心:
同seata-sever 端相同,客户端在启动的时候也可以进行参数的配置,所以可以在客户端我们也可以 将参数放到 config ,zk,nacos 进行配置;
seata client 的注册中心:
seata 客户端的注册中心,用于服务端的发现,一个重要的信息就是,可以将seata 客户端和seata 服务端 注册到相同的地方,已nacos 为例:服务端和客户端 注册到相同命名空间,相同分组,相同应用,此时客户端会动态的发现 seata 服务的地址,此时不需要在显示的设置 seata 服务断的地址;
Seata 客户端在启动时会从 Nacos 服务获取 service.vgroupMapping.your-service-group
定义的服务群组实例,这需要和 Seata 服务端的配置保持一致。以下三个参数需要和 Seata 服务端保持一致:application
:, group
, namespace
目的:让 seata 接管数据源,然后才能协调各个微服务的本地事务;
seata:
enabled: true # spring 自动装配
enable-auto-data-source-proxy: true # 开启数据源自动代理
transport:
enable-client-batch-send-request: true #客户端事务消息请求是否批量合并发送,默认true,false单条发送
tx-service-group: my_tx_group # 事务分组
application-id: storage
service:
vgroup-mapping:
my_tx_group: default
# grouplist:
# default: localhost:8091
这中连接模式只在 seata 客户端为 file 模式下生效
; 其他方式如nacos ,需要配置使得 seata-server 和seata-client 都注册在相同命名空间,相同分组,相同application 下,这样seata-client 就可以动态的发现seata-server 的实例地址;目的: 了解本地springboot 服务如何连接到 seata server 端
客户端和服务端 seata 都是默认已file 作为注册中心:
file
seata-server 服务端 注册中心使用的file 本地模式,则seata 客户端 也需要使用file注册模式:
此时需要配置 seata.service.grouplist.default=127.0.0.1:8091 用于客户端连接 seata 服务端;
nacos
seata-server 服务端 注册中心使用的nacos,则seata 客户端 也需要使用nacos注册模式:
seata-server 服务端 注册中心使用的nacos,则seata 客户端 也需要使用nacos注册模式: seata: enabled: true # spring 自动装配 enable-auto-data-source-proxy: true # 开启数据源自动代理 transport: enable-client-batch-send-request: true #客户端事务消息请求是否批量合并发送,默认true,false单条发送 tx-service-group: my_tx_group application-id: storage service: vgroup-mapping: my_tx_group: default # grouplist: # default: localhost:8091 registry: # type: file type: nacos nacos: application: seata-server server-addr: localhost:8848 group: SEATA_GROUP namespace: 7f2765bc-d8c1-4b49-b706-8c292d2ffac9 username: nacos password: nacos
注意 application,server-addr,group,namespace,username,password 客户端和服务端报纸一致;
目的: 了解本地springboot 如何对seata 进行参数配置
客户端和服务端 seata 都是默认已file 作为配置中心,他们的只是用作参数的配置使用,所以他们各自是独立的;
seata: enabled: true # spring 自动装配 enable-auto-data-source-proxy: true # 开启数据源自动代理 transport: enable-client-batch-send-request: true #客户端事务消息请求是否批量合并发送,默认true,false单条发送 tx-service-group: my_tx_group application-id: storage service: vgroup-mapping: my_tx_group: default # grouplist: # default: localhost:8091 config: type: nacos # type: file nacos: server-addr: localhost:8848 namespace: c0befc56-f31b-4de6-929b-6f75604992d1 # namespace: 7f2765bc-d8c1-4b49-b706-8c292d2ffac9 group: DEFAULT_GROUP username: nacos password: nacos ##if use MSE Nacos with auth, mutex with username/password attribute #access-key: "" #secret-key: "" data-id: seata.properties
本文测试时虽然在 nacos 中创建了seata.properties 的配置文件但是其 配置的参数没有起作用;
会提示无法找到service.vgroupMapping.my_tx_group(my_tx_group 为定义的分组名称)的集群,通过创建 service.vgroupMapping.my_tx_group 配置文件 并且定义default 得以解决,目前没有找到其它解决办法;
seata:
enabled: true # spring 自动装配
enable-auto-data-source-proxy: true # 开启数据源自动代理
所以需要去重 手动加入的代理,防止冲突,导致 事务失效
<dependency>
<groupId>com.alibaba.cloud</groupId>
<!-- 里面已经集成服务间调用X-id的传递,包括FeignClient的重写,如果在之前自定义封装过Feign,注意启动冲突-->
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2021.0.5.0</version>
<exclusions>
<!--去除低版本-->
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
所以可以去除 手动增加的对于feign 接口xid 的特殊处理防止冲突:
seata 客户端的事务模式默认使用AT 模式;通过以下两个步骤配置就可使用:
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8;
SHOW VARIABLES LIKE 'innodb_support_xa';
如果没有开通,则打开 MySQL 配置文件 my.cnf 或 my.ini。在文件中找到 [mysqld] 部分,并在该部分下添加以下行:
[mysqld]
innodb_support_xa = 1
保存配置文件并重启 MySQL 服务;
seata.data-source-proxy-mode=XA
根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction Mode 和 TCC (Branch) Transaction Mode.
AT 模式(参考链接 TBD)基于 支持本地 ACID 事务 的 关系型数据库:
一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
二阶段 commit 行为:马上成功结束,自动 异步批量清理回滚日志。
二阶段 rollback 行为:通过回滚日志,自动 生成补偿操作,完成数据回滚。
相应的,TCC 模式,不依赖于底层数据资源的事务支持:
一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
二阶段 commit 行为:调用 自定义 的 commit 逻辑。
二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。
CREATE TABLE `tcc_fence_log` (
`xid` varchar(128) NOT NULL COMMENT 'global id',
`branch_id` bigint(20) NOT NULL COMMENT 'branch id',
`action_name` varchar(64) NOT NULL COMMENT 'action name',
`status` tinyint(4) NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)',
`gmt_create` datetime(3) NOT NULL COMMENT 'create time',
`gmt_modified` datetime(3) NOT NULL COMMENT 'update time',
PRIMARY KEY (`xid`,`branch_id`),
KEY `idx_gmt_modified` (`gmt_modified`),
KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
public interface IOrderService extends IService<Order> {
Boolean orderCreate(OrderReqDto reqDto);
}
实现类:
@Override @GlobalTransactional(rollbackFor = Exception.class) public Boolean orderCreate(OrderReqDto reqDto) { log.error("Seata全局事务id=================>{}", RootContext.getXID()); // stock服务未报错,order服务报错 try { // 扣减库存-通过fegin 接口进行调用 storageFeignApi.tccSumGoodNum(reqDto.getByGoods()); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } // 订单创建 Order order = new Order(); order.setAccountId("123").setMoney(reqDto.getOrderMoney()); this.save(order); return true; }
controller:
@ResponseBody
@PostMapping("/tcc/sure")
public Boolean orderCreate(@RequestBody OrderReqDto reqDto) throws SQLException {
return orderService.orderCreate(reqDto);
}
@ResponseBody
@PostMapping(value = "/tcc/sumGoodNum")
public boolean tccSumGoodNum(@RequestBody List<GoodDto> param){
return tccStorageService.deduct(param);
}
ITccStorageService:
package com.example.springseatastorage.service; import com.example.springseatastorage.controller.dto.GoodDto; import io.seata.rm.tcc.api.BusinessActionContext; import io.seata.rm.tcc.api.BusinessActionContextParameter; import io.seata.rm.tcc.api.LocalTCC; import io.seata.rm.tcc.api.TwoPhaseBusinessAction; import java.util.List; @LocalTCC public interface ITccStorageService { /** * 减库存 * useTCCFence=true 为开启防悬挂 * @param param */ @TwoPhaseBusinessAction(name = "deduct", commitMethod = "busCommit", rollbackMethod = "busRollback", useTCCFence = true) boolean deduct(@BusinessActionContextParameter(paramName = "param") List<GoodDto>param); /** * 提交事务 * * @param actionContext * @return */ boolean busCommit(BusinessActionContext actionContext); /** * 回滚事务 * * @param actionContext * @return */ boolean busRollback(BusinessActionContext actionContext); }
ITccStorageServicempl:
package com.example.springseatastorage.service.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.example.springseatastorage.controller.dto.GoodDto; import com.example.springseatastorage.entity.Storage; import com.example.springseatastorage.service.IStorageService; import com.example.springseatastorage.service.ITccStorageService; import io.seata.core.context.RootContext; import io.seata.rm.tcc.api.BusinessActionContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @Slf4j @Service public class ITccStorageServicempl implements ITccStorageService { @Autowired private IStorageService storageService; @Override @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED) public boolean deduct(List<GoodDto> param) { log.error("Seata全局事务id=================>{}", RootContext.getXID()); Map<String, Integer> good = param.stream().collect(Collectors.toMap(GoodDto::getGoodId, GoodDto::getBuyNum)); LambdaQueryWrapper<Storage> wrapper = new LambdaQueryWrapper<>(); wrapper.in(Storage::getGoodId, param.stream().map(e -> e.getGoodId()).collect(Collectors.toList())); List<Storage> storages = storageService.list(wrapper); storages.stream().forEach(e -> { e.setGoodNum(e.getGoodNum() - good.get(e.getGoodId())); }); storageService.updateBatchById(storages); return true; } @Override public boolean busCommit(BusinessActionContext actionContext) { log.info("busCommit xid = " + actionContext.getXid() + "提交成功"); return true; } @Override public boolean busRollback(BusinessActionContext actionContext) { // 编写对应的业务数据进行回滚 List<GoodDto> param = actionContext.getActionContext("param", List.class); // int count = actionContext.getActionContext("count", Integer.class); // LambdaQueryChainWrapper<Stock> eq = lambdaQuery().eq(Stock::getCommodityCode, commodityCode); // Long count1 = eq.one().getCount(); // // 扣了多少数,需要重新添加回去 // lambdaUpdate().set(Stock::getCount, count + count1) // .eq(Stock::getCommodityCode, commodityCode) // .update(); log.info("busRollback xid = " + actionContext.getXid() + "回滚成功"); return true; } }
本文通过springboot 项目整合seata 然后 通过 AT,XA,TCC 事务模式,模拟实现分布式事务。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。