Seata
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
官网&下载地址:http://seata.io/zh-cn/
是采用两阶段提交协议来执行的具体如下:
-
一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
-
二阶段:
- 提交异步化,非常快速地完成。
- 回滚通过一阶段的回滚日志进行反向补偿。
- TC 事务协调器,维护全局和分支事务的状态,驱动全局事务提交或回滚。(单独部署)
- TM 事务管理器 定义全局事务的范围:开始全局事务、提交或回滚全局事务。(统称为服务发起者)
- RM 资源管理器 管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。(服务参与者)
逻辑说明:
1.事务发起者TM事务管理器,通过RPC与TC通讯,请求得到一个事务分组(group)和全局事务ID,所有服务相关的参与者都在这一个事务分组中
2.在调用其他微服务的时候,TC协同服务会生成事务回滚的undo_log日志,再来提交事务,并通知到TC。如果没有问题,则继续下一个服务调用。
3.期间任何服务的分支事务回滚,都会通知TC,TC在通知全局事务包含所有已完成一阶段提交的分支事务回滚。
4.如果所有分支事务都正常,最后由全局事务发起者通知TC协调者,TC再通知RM删除undo_log日志。
在这个过程中为了解决写隔离的问题,都会涉及到TC的全局锁。
详细使用配置说明
这里用用户购买商品的业务逻辑为例来讲解Seata的使用。整个业务逻辑由3个微服务提供支持:
- 仓储服务:对给定的商品扣除仓储数量。
- 订单服务:根据采购需求创建订单。
- 帐户服务:从用户帐户中扣除余额。
配置说明:(TC端配置)
1.下载运行包,下载地址:https://github.com/seata/seata/releases
Seata分TC、TM和RM三个角色,TC(Server端)为单独服务端部署,TM和RM(Client端)由业务系统集成。
2.修改seata-server-1.1.0\seata\conf\file.conf
主要修改:事务日志存储模式为db,连接数据库
修改配置seata/conf/file.conf文件
## transaction log store, only used in seata-server
store {## store mode: file、db、redismode = "db" ## 改为db模式## file store propertyfile {## store location dirdir = "sessionStore"# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptionsmaxBranchSessionSize = 16384# globe session size , if exceeded throws exceptionsmaxGlobalSessionSize = 512# file buffer size , if exceeded allocate new bufferfileWriteBufferCacheSize = 16384# when recover batch read sizesessionReloadReadSize = 100# async, syncflushDiskMode = async}## database store propertydb {## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.datasource = "druid"## mysql/oracle/postgresql/h2/oceanbase etc.## 修改数据库连接dbType = "mysql"driverClassName = "com.mysql.cj.jdbc.Driver"url = "jdbc:mysql://127.0.0.1:3306/seata?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&tinyInt1isBit=false&allowMultiQueries=true&serverTimezone=Asia/Shanghai"user = "root"password = "123456"minConn = 5maxConn = 30globalTable = "global_table"branchTable = "branch_table"lockTable = "lock_table"queryLimit = 100maxWait = 5000}## redis store propertyredis {host = "127.0.0.1"port = "6379"password = ""database = "0"minConn = 1maxConn = 10queryLimit = 100}}
修改配置 seata/conf/file.conf.example 文件
transport {# tcp udt unix-domain-sockettype = "TCP"#NIO NATIVEserver = "NIO"#enable heartbeatheartbeat = true# the client batch send request enableenableClientBatchSendRequest = false#thread factory for nettythreadFactory {bossThreadPrefix = "NettyBoss"workerThreadPrefix = "NettyServerNIOWorker"serverExecutorThreadPrefix = "NettyServerBizHandler"shareBossWorker = falseclientSelectorThreadPrefix = "NettyClientSelector"clientSelectorThreadSize = 1clientWorkerThreadPrefix = "NettyClientWorkerThread"# netty boss thread size,will not be used for UDTbossThreadSize = 1#auto default pin or 8workerThreadSize = "default"}shutdown {# when destroy server, wait secondswait = 3}serialization = "seata"compressor = "none"
}## transaction log store, only used in server side
store {## store mode: file、dbmode = "db" ## 改为db模式## file store propertyfile {## store location dirdir = "sessionStore"# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptionsmaxBranchSessionSize = 16384# globe session size , if exceeded throws exceptionsmaxGlobalSessionSize = 512# file buffer size , if exceeded allocate new bufferfileWriteBufferCacheSize = 16384# when recover batch read sizesessionReloadReadSize = 100# async, syncflushDiskMode = async}## database store propertydb {## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.datasource = "druid"## mysql/oracle/postgresql/h2/oceanbase etc.## 修改数据连接dbType = "mysql"driverClassName = "com.mysql.cj.jdbc.Driver"url = "jdbc:mysql://127.0.0.1:3306/seata?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&tinyInt1isBit=false&allowMultiQueries=true&serverTimezone=Asia/Shanghai"user = "root"password = "123456"minConn = 5maxConn = 30globalTable = "global_table"branchTable = "branch_table"lockTable = "lock_table"queryLimit = 100}
}
## server configuration, only used in server side
server {recovery {#schedule committing retry period in millisecondscommittingRetryPeriod = 1000#schedule asyn committing retry period in millisecondsasynCommittingRetryPeriod = 1000#schedule rollbacking retry period in millisecondsrollbackingRetryPeriod = 1000#schedule timeout retry period in millisecondstimeoutRetryPeriod = 1000}undo {logSaveDays = 7#schedule delete expired undo_log in millisecondslogDeletePeriod = 86400000}#check authenableCheckAuth = true#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanentmaxCommitRetryTimeout = "-1"maxRollbackRetryTimeout = "-1"rollbackRetryTimeoutUnlockEnable = false
}## metrics configuration, only used in server side
metrics {enabled = falseregistryType = "compact"# multi exporters use comma dividedexporterList = "prometheus"exporterPrometheusPort = 9898
}
修改配置 seata/conf/registry.conf
registry {# file 、nacos 、eureka、redis、zk、consul、etcd3、sofatype = "nacos" ## 改为nacos注册中心nacos {# 修改本地nacos配置application = "seata-server"serverAddr = "127.0.0.1:8848"group = "SEATA_GROUP"namespace = ""cluster = "default"username = ""password = ""}eureka {serviceUrl = "http://localhost:8761/eureka"application = "default"weight = "1"}redis {serverAddr = "localhost:6379"db = 0password = ""cluster = "default"timeout = 0}zk {cluster = "default"serverAddr = "127.0.0.1:2181"sessionTimeout = 6000connectTimeout = 2000username = ""password = ""}consul {cluster = "default"serverAddr = "127.0.0.1:8500"}etcd3 {cluster = "default"serverAddr = "http://localhost:2379"}sofa {serverAddr = "127.0.0.1:9603"application = "default"region = "DEFAULT_ZONE"datacenter = "DefaultDataCenter"cluster = "default"group = "SEATA_GROUP"addressWaitTime = "3000"}file {name = "file.conf"}
}config {# file、nacos 、apollo、zk、consul、etcd3type = "file"nacos {serverAddr = "127.0.0.1:8848"namespace = ""group = "SEATA_GROUP"username = ""password = ""}consul {serverAddr = "127.0.0.1:8500"}apollo {appId = "seata-server"apolloMeta = "http://192.168.1.204:8801"namespace = "application"}zk {serverAddr = "127.0.0.1:2181"sessionTimeout = 6000connectTimeout = 2000username = ""password = ""}etcd3 {serverAddr = "http://localhost:2379"}file {name = "file.conf"}
}
3.在mysql中新建库,并配置预装表
注意库要和上面url配置保持一致,默认为:seata
注意Seata1.0以后server包中没有sql表了,具体如下:
本地数据库新建seata库
/*Navicat Premium Data TransferSource Server : 本地Source Server Type : MySQLSource Server Version : 80016Source Host : localhost:3306Source Schema : seataTarget Server Type : MySQLTarget Server Version : 80016File Encoding : 65001Date: 18/05/2020 16:26:39
*/SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;-- ----------------------------
-- Table structure for branch_table
-- ----------------------------
DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table` (`branch_id` bigint(20) NOT NULL,`xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,`transaction_id` bigint(20) NULL DEFAULT NULL,`resource_group_id` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,`resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,`lock_key` varchar(128) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,`branch_type` varchar(8) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,`status` tinyint(4) NULL DEFAULT NULL,`client_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,`application_data` varchar(2000) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,`gmt_create` datetime(0) NULL DEFAULT NULL,`gmt_modified` datetime(0) NULL DEFAULT NULL,PRIMARY KEY (`branch_id`) USING BTREE,INDEX `idx_xid`(`xid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;-- ----------------------------
-- Table structure for global_table
-- ----------------------------
DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table` (`xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,`transaction_id` bigint(20) NULL DEFAULT NULL,`status` tinyint(4) NOT NULL,`application_id` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,`transaction_service_group` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,`transaction_name` varchar(128) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,`timeout` int(11) NULL DEFAULT NULL,`begin_time` bigint(20) NULL DEFAULT NULL,`application_data` varchar(2000) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,`gmt_create` datetime(0) NULL DEFAULT NULL,`gmt_modified` datetime(0) NULL DEFAULT NULL,PRIMARY KEY (`xid`) USING BTREE,INDEX `idx_gmt_modified_status`(`gmt_modified`, `status`) USING BTREE,INDEX `idx_transaction_id`(`transaction_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;-- ----------------------------
-- Table structure for lock_table
-- ----------------------------
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table` (`row_key` varchar(128) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,`xid` varchar(96) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,`transaction_id` mediumtext CHARACTER SET utf8 COLLATE utf8_bin NULL,`branch_id` mediumtext CHARACTER SET utf8 COLLATE utf8_bin NULL,`resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,`table_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,`pk` varchar(36) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL,`gmt_create` datetime(0) NULL DEFAULT NULL,`gmt_modified` datetime(0) NULL DEFAULT NULL,PRIMARY KEY (`row_key`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;SET FOREIGN_KEY_CHECKS = 1;
5.server配置完成。上传配置好的文件到centos,并启动
启动完成后,会自动注册到nacos.
分布式业务说明
这里有三个服务,一个订单服务,一个库存服务,一个账户服务
当用户下单时,会在订单服务创建一个订单;通过远程服务调用减库存;并通过远程调用来扣减账户余额;最后在订单服务中修改订单完成状态。这里跨越三个数据库,有二次调用服务。
1.业务建库
create database seata_order;create database seata_storage;create database seata_account;
2.在对应的库中建表
--订单库建立订单表
DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order` (`int` bigint(11) NOT NULL AUTO_INCREMENT,`user_id` bigint(20) DEFAULT NULL COMMENT '用户id',`product_id` bigint(11) DEFAULT NULL COMMENT '产品id',`count` int(11) DEFAULT NULL COMMENT '数量',`money` decimal(11, 0) DEFAULT NULL COMMENT '金额',`status` int(1) DEFAULT NULL COMMENT '订单状态: 0:创建中 1:已完结',PRIMARY KEY (`int`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '订单表' ROW_FORMAT = Dynamic;--账户库建立账户表
CREATE TABLE `t_account` (`id` bigint(11) NOT NULL COMMENT 'id',`user_id` bigint(11) DEFAULT NULL COMMENT '用户id',`total` decimal(10, 0) DEFAULT NULL COMMENT '总额度',`used` decimal(10, 0) DEFAULT NULL COMMENT '已用余额',`residue` decimal(10, 0) DEFAULT NULL COMMENT '剩余可用额度',PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '账户表' ROW_FORMAT = Dynamic;INSERT INTO `t_account` VALUES (1, 1, 1000, 0, 1000);--库存库建立库存表
DROP TABLE IF EXISTS `t_storage`;
CREATE TABLE `t_storage` (`int` bigint(11) NOT NULL AUTO_INCREMENT,`product_id` bigint(11) DEFAULT NULL COMMENT '产品id',`total` int(11) DEFAULT NULL COMMENT '总库存',`used` int(11) DEFAULT NULL COMMENT '已用库存',`residue` int(11) DEFAULT NULL COMMENT '剩余库存',PRIMARY KEY (`int`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '库存' ROW_FORMAT = Dynamic;
INSERT INTO `t_storage` VALUES (1, 1, 100, 0, 100);
3.并在每个库中建立回滚日志表
CREATE TABLE `undo_log` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`branch_id` bigint(20) NOT NULL,`xid` varchar(100) NOT NULL,`context` varchar(128) NOT NULL,`rollback_info` longblob NOT NULL,`log_status` int(11) NOT NULL,`log_created` datetime NOT NULL,`log_modified` datetime NOT NULL,`ext` varchar(100) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
最终server和客户端表结构如下:
4.新建工程和配置工程
pom文件内容,在spring cloud 项目基础上增加seata包
<!--Seata 包--><dependency><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId><version>1.3.0</version></dependency>
工程目录结构如下:一共4个服务,business-service为业务集成
各个服务增加seata配置,nacos配置参考之前的文章
# seata配置
spring.cloud.alibaba.seata.tx-service-group=lockie-test-group
# Seata 应用名称,默认使用 ${spring.application.name}
seata.application-id=${spring.application.name}
# Seata 事务组, 高版本没找到相关配置, 是否可配置未知 选用默认default
seata.tx-service-group=default
# 服务配置
# 此处Key对应 tx-service-group 的 Value, 此处 value 默认 default
seata.service.vgroup-mapping.default=default
# 分组和 Seata 服务的映射 默认端口 8091
seata.service.grouplist.default=127.0.0.1:8091
对二阶段提交的补充说明:
一阶段数据加载
在一阶段的时候,Seata会拦截参与服务的“业务SQL”
1.解析业务SQL,找到“业务SQL”要更新的业务数据,在业务更新前,将其保存成“Before image”
2.执行“业务SQL”更新业务逻辑数据
3.在业务更新数据之后,保存成为"After image",最后生成行锁
以上3个操作就保障了一个数据库事务内完成,这样就保证了第一阶段操作的原子性(类似于AOP的前置+后置逻辑处理)
二阶段提交
如果在二阶段提交顺利的话,因为业务SQL已经提交到数据库中,所以Seata框架只需要讲一阶段保存的数据快照和行锁删除,完成数据清理即可
二阶段回滚
二阶段服务在执行的过程中遇到任何异常,Seata就需要回滚一阶段已经执行的业务SQL,还原业务数据。
回滚方式就是用Before image 还原业务数据;但是在还原前会验证数据是否有脏读,对比当前数据和After image是否一致,如果完全一致,表示没有脏读,可以还原业务,如果不一致就表示有脏读,就需要人工干预处理
调用,模仿购物下单,扣减库存,扣减用户金额
调用方:添加事务注解@GlobalTransactional(rollbackFor = Exception.clas),被调用方不需要增加
@Service
public class BusinessService
{private static final Logger LOGGER = LoggerFactory.getLogger(BusinessService.class);@Autowiredprivate StorageFeignClient storageClient;@Autowiredprivate OrderFeignClient orderClient;/*** 减库存,下订单**@GlobalTransactional 一个注解搞定全局事务* @param userId* @param commodityCode* @param orderCount*/@GlobalTransactional(rollbackFor = Exception.class)public void purchase(String userId, String commodityCode, int orderCount){LOGGER.info("BusinessService purchase begin ... xid: {}" ,RootContext.getXID());storageClient.deduct(commodityCode, orderCount);orderClient.create(userId, commodityCode, orderCount);}
}
订单接口
@FeignClient("order-service")
public interface OrderFeignClient
{@GetMapping("/order/create")public void create(@RequestParam("userId") String userId, @RequestParam("productId") String productId, @RequestParam("count") Integer count);}
库存接口
@FeignClient("storage-service")
public interface StorageFeignClient
{@GetMapping("/storage/deduct")public void deduct(@RequestParam("productId") String productId, @RequestParam("count") Integer count);}
正常测试我们输入正常数据,异常测试调整库存使下单报库存不足的错误就可以。