Seata

Seata 介绍

Seata 是从两段提交演变而来的一种分布式事务解决方案,提供了 ATTCCSAGAXA 等事务模式,这里重点介绍 AT模式。

既然 Seata 是两段提交,那我们看看它在每个阶段都做了点啥?下边我们还以下单扣库存、扣余额举例。

部分介绍转载于seata官方文档等:http://seata.io/zh-cn/docs/overview/what-is-seata.html | https://zhuanlan.zhihu.com/p/344184343 | https://zhuanlan.zhihu.com/p/315164700

先介绍Seata分布式事务的几种角色:

  1. Transaction Coordinator(TC): 全局事务协调者,用来协调全局事务和各个分支事务(不同服务)的状态, 驱动全局事务和各个分支事务的回滚或提交。
  2. Transaction Manager(TM): 事务管理者,业务层中用来开启/提交/回滚一个整体事务(在调用服务的方法中用注解开启事务)。
  3. Resource Manager(RM): 资源管理者,一般指业务数据库代表了一个分支事务(Branch Transaction),管理分支事务与TC进行协调注册分支事务并且汇报分支事务的状态,驱动分支事务的提交或回滚。

Seata 实现分布式事务,设计了一个关键角色 UNDO_LOG (回滚日志记录表),我们在每个应用分布式事务的业务库中创建这张表,这个表的核心作用就是,将业务数据在更新前后的数据镜像组织成回滚日志,备份在 UNDO_LOG 表中,以便业务异常能随时回滚。

第一个阶段

比如:下边我们更新 user 表的 name 字段。

update user set name = '锦泉' where name = 'ArthurJQ'

首先 Seata 的 JDBC 数据源代理通过对业务 SQL 解析,提取 SQL 的元数据,也就是得到 SQL 的类型(UPDATE),表(user),条件(where name = 'ArthurJQ')等相关的信息。

先查询数据前镜像,根据解析得到的条件信息,生成查询语句,定位一条数据。

select  name from user where name = 'ArthurJQ'
ID NAME USER_ID
1 ArthurJQ 9527

紧接着执行业务 SQL,根据前镜像数据主键查询出后镜像数据

select name from user where id = 1
ID NAME USER_ID
1 锦泉 9527

把业务数据在更新前后的数据镜像组织成回滚日志,将业务数据的更新和回滚日志在同一个本地事务中提交,分别插入到业务表和 UNDO_LOG 表中。

回滚记录数据格式如下:包括 afterImage 前镜像、beforeImage 后镜像、 branchId 分支事务ID、xid 全局事务ID

{
    "branchId":641789253,
    "xid":"xid:xxx",
    "undoItems":[
        {
            "afterImage":{
                "rows":[
                    {
                        "fields":[
                            {
                                "name":"id",
                                "type":4,
                                "value":1
                            }
                        ]
                    }
                ],
                "tableName":"product"
            },
            "beforeImage":{
                "rows":[
                    {
                        "fields":[
                            {
                                "name":"id",
                                "type":4,
                                "value":1
                            }
                        ]
                    }
                ],
                "tableName":"product"
            },
            "sqlType":"UPDATE"
        }
    ]
}

这样就可以保证,任何提交的业务数据的更新一定有相应的回滚日志。

在本地事务提交前,各分支事务需向 全局事务协调者 TC 注册分支 ( Branch Id) ,为要修改的记录申请 全局锁 ,要为这条数据加锁,利用 SELECT FOR UPDATE 语句。而如果一直拿不到锁那就需要回滚本地事务。TM 开启事务后会生成全局唯一的 XID,会在各个调用的服务间进行传递。

有了这样的机制,本地事务分支(Branch Transaction)便可以在全局事务的第一阶段提交,并马上释放本地事务锁定的资源。相比于传统的 XA 事务在第二阶段释放资源,Seata 降低了锁范围提高效率,即使第二阶段发生异常需要回滚,也可以快速 从UNDO_LOG 表中找到对应回滚数据并反解析成 SQL 来达到回滚补偿

最后本地事务提交,业务数据的更新和前面生成的 UNDO LOG 数据一并提交,并将本地事务提交的结果上报给全局事务协调者 TC。

第二个阶段

第二阶段是根据各分支的决议做提交或回滚:

如果决议是全局提交,此时各分支事务已提交并成功,这时 全局事务协调者(TC) 会向分支发送第二阶段的请求。收到 TC 的分支提交请求,该请求会被放入一个异步任务队列中,并马上返回提交成功结果给 TC。异步队列中会异步和批量地根据 Branch ID 查找并删除相应 UNDO LOG 回滚记录。并把相关事务信息如:行锁删除,之后让因为在竞争锁被阻塞的事务顺利进行。

如果二阶段是回滚,过程比全局提交麻烦一点,

首先在 Client 端收到 TC 告知的二阶段是回滚时,会去查到对应的事务的 undolog,取出后镜像,对比当前的数据(因为 SeataAT 是从业务应用层面进行保护分布式事务,如果此时在数据库层面直接修改了库内信息,这个时候 SeataAT 的行锁不起隔离性作用),如果出现了在全局事务以外的数据修改,此时判定为脏写,而 Seata 因为无法感知这个脏写如何发生,此时只能打印日志和触发异常通知,告知用户需要人工介入(规范修改数据入口可避免脏写)。

而如果没有发生脏写就比较简单了,拿出前镜像,众所皆知事务是需要有原子性的,要么一起发生,要么都不发生,此时前镜像记录了发生之前的数据,进行回滚后,就达到了类似本地事务那样的原子性效果。回滚后,再把事务相关信息,如 undolog,行锁进行删除。二阶段回滚算是告一段落了。

注意:这里删除回滚日志记录操作,一定是在本地业务事务执行之后

AT 在 Seata 的分布式事务框架

可以看到,AT 与其它事务模式在 Seata 事务框架中,会多出一个 undolog 的表(相对其它模式的入侵点),但是除此之外,对业务来说,几乎是零入侵性,这也就是为什么 AT 模式在 Seata 中受众广泛的原因。

写隔离

  • 一阶段本地事务提交前,需要确保先拿到 全局锁
  • 拿不到 全局锁 ,不能提交本地事务。
  • 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。

以一个示例来说明:

两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。

tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁

tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。

如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。

此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。

因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。

读隔离

在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted)

如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。

SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。

出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。

Seata Server 搭建

Seata 是一个需独立部署的中间件,所以先搭 Seata Server

seata下载:https://seata.io/zh-cn/blog/download.html

seata源码和seata server两个都要下载,分别解压,如下:

file.conf

seata\conf\目录下的file.conf 文件用于配置持久化事务日志的模式,目前提供 filedbredis 三种方式。例如使用 db 方式修改file.conf如下:

## transaction log store, only used in seata-server
store {
  ## store mode: file、db、redis
  mode = "db"

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://10.0.1.110:3308/seata?serverTimezone=Asia/Shanghai"
    user = "mysql"
    password = "mysql"
    minConn = 5
    maxConn = 100
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }

}

注意:在选择 db 方式后,需要在对应数据库创建 globalTable(持久化全局事务)、branchTable(持久化各提交分支的事务)、 lockTable(持久化各分支锁定资源事务)三张表。

seata server数据库创建

里边有对应的 globalTable(持久化全局事务)、branchTable(持久化各提交分支的事务)、 lockTable(持久化各分支锁定资源事务)三张表,粘出来去对应的数据库创建就行了。比如mysql.sql:

-- the table to store GlobalSession data
-- 持久化全局事务
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
-- 持久化各提交分支的事务
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store lock data
-- 持久化每个分支锁表事务
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(96),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

seata server配置文件推送到nacos

打开config.txt文件,因为要推送到nacos(方便统一配置),把需要配置的参数保留就好,修改配置如下:

service.vgroupMapping.my_test_tx_group=default
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.cj.jdbc.Driver
store.db.url=jdbc:mysql://10.0.1.110:3308/seata?serverTimezone=Asia/Shanghai
store.db.user=#你创建表的数据库登录的用户名
store.db.password=#数据库登录密码
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000

进入nacos目录,git执行命令

sh nacos-config.sh -h 10.0.1.104 -p 8848 -g SEATA_GROUP -t 6e802a92-d7a4-498f-8f14-5625988e1608 -u nacos -w nacos

上面指令执行成功后nacos配置中心能看到推送过去的配置,如下

registry.conf

registry.conf 文件设置 注册中心 和 配置中心:

目前注册中心支持 nacoseurekarediszkconsuletcd3sofa 七种,这里我使用的 nacos作为注册中心 ; 配置中心支持 nacosapollozkconsuletcd3 五种方式。

将seata server注册到nacos,打开\seata\conf目录下的registry.conf,修改部分配置如下:

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"
  loadBalance = "RandomLoadBalance"
  loadBalanceVirtualNodes = 10

  nacos {
    application = "seata-server"
    serverAddr = "10.0.1.104:8848"
    group = "SEATA_GROUP"
    namespace = "" #nacos服务空间
    cluster = "default"
    username = "" #nacos用户名
    password = "" #nacos密码
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"

  nacos {
    serverAddr = "10.0.1.104:8848"
    namespace = "" #nacos服务空间
    group = "SEATA_GROUP"
    username = "" #nacos用户名
    password = "" #nacos密码
  }
}

配置完以后在 \seata\bin 目录下启动 seata-server 即可,到这 Seata 的服务端就搭建好了。

Seata Client

Seata Server 环境搭建完,接下来我们新建三个服务 order-server(下单服务)、storage-server(扣减库存服务)、account-server(账户金额服务),分别服务注册到 nacos。

应用接入Seata Server

添加seata依赖:

    <artifactId>framework-seata</artifactId>

    <dependencies>
        <!-- alibaba-seata -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-spring-boot-starter</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- spring-boot-seata -->
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>1.4.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <!-- 指定项目编译时的java版本和编码方式 -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin.version}</version>
                <configuration>
                    <target>${java.version}</target>
                    <source>${java.version}</source>
                    <encoding>${encoding}</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

每个服务创建application.yml文件:

# Seata 配置项,对应 SeataProperties 类
seata:
  enabled: true
  tx-service-group: my_test_tx_group # Seata 事务组编号,用于 TC 集群名
  # Seata 服务配置项,对应 ServiceProperties 类
  config:
    type: nacos
    nacos:
      group: SEATA_GROUP
      namespace: 09dd3d79-92c0-47fe-bd1d-0f1b7f8dc928
      server-addr: 10.0.1.104:8848
      username: ""
      password: ""
  # Seata 注册中心配置项,对应 RegistryProperties 类
  registry:
    type: nacos # 注册中心类型,默认为 file
    nacos:
      application: seata-server # Nacos 服务名
      cluster: default # 使用的 Nacos 机器名
      group: SEATA_GROUP # Nacos 组
      namespace: 09dd3d79-92c0-47fe-bd1d-0f1b7f8dc928 # Nacos 命名空间
      serverAddr: 10.0.1.104:8848 # Nacos 服务地址
      username: ""
      password: ""

举个栗子

业务大致流程:用户发起下单请求,本地 order 订单服务创建订单记录,并通过 RPC 远程调用 storage 扣减库存服务和 account 扣账户余额服务,只有三个服务同时执行成功,才是一个完整的下单流程。如果某个服执行失败,则其他服务全部回滚。

Seata 对业务代码的侵入性非常小,代码中使用只需用 @GlobalTransactional 注解开启一个全局事务即可

@Override
@GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
public void create(Order order) {

    String xid = RootContext.getXID();

    LOGGER.info("------->交易开始");
    //本地方法
    orderDao.create(order);

    //远程方法 扣减库存
    storageApi.decrease(order.getProductId(), order.getCount());

    //远程方法 扣减账户余额
    LOGGER.info("------->扣减账户开始order中");
    accountApi.decrease(order.getUserId(), order.getMoney());
    LOGGER.info("------->扣减账户结束order中");

    LOGGER.info("------->交易结束");
    LOGGER.info("全局事务 xid: {}", xid);
}

seata client 表创建

进入都seata源码里,选择客户端,再选择at模式:

打开mysql.sql(选择自己对应的数据库,我是用的mysql),前边说过 Seata AT 模式实现分布式事务,必须在相关的业务库中创建 undo_log 表来存数据回滚日志,表结构如下:

-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

我们在每个应用分布式事务的业务库中创建这张表,这个表的核心作用就是,将业务数据在更新前后的数据镜像组织成回滚日志,备份在 UNDO_LOG 表中,以便业务异常能随时回滚。

模块接入seata

将上一步的操作,新建了一个模块framework-seata,所以其他业务模块想接入seata,只需要引入该模块即可(不要忘记每个client都需要新建undo_log表)。

<dependency>
  <groupId>cn.wisefly</groupId>
  <artifactId>framework-seata</artifactId>
  <version>${wisefly.version}</version>
</dependency>

注意事项

  1. 开启分布式事务需要在入口方法加上@GlobalTransactional注解
  2. 不建议方法有try-catch,如果有需要在catch里面加上GlobalTransactionContext.reload(RootContext.getXID()).rollback();
  3. 由于seata在一阶段提交的时候就commit,所以存在脏读可能。强烈建议在涉及分布式事务的查询语句后面加上for update

   转载规则


《Seata》 锦泉 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录