# Seata 分布式事务

# 概述

  • 分布式事务指事务的操作位于不同的服务节点上,因此需要服务与服务之间远程协作才能完成事务操作,这种分布式系统环境下由不同的服务之间通过网络远程协作完成事务称之为分布式事务。

  • Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 为用户提供了 ATTCCXA 和 SAGA(使用较复杂,不建议使用) 四种事务模式,为用户打造一站式的分布式解决方案。

  • Seata术语

    • TC (Transaction Coordinator) - 事务协调者

    负责维护全局和分支事务的状态,驱动全局事务提交或回滚,即seata-server端服务。

    • TM (Transaction Manager) - 事务管理器

    定义全局事务的范围:开始全局事务、提交或回滚全局事务,即请求发起方。

    • RM (Resource Manager) - 资源管理器

    管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚,即请求参与方。

  • 工作流程

seata_0 1.TM 请求 TC,开始一个新的全局事务,TC 会为这个全局事务生成一个 XID。

2.XID 通过微服务的调用链传递到其他微服务。

3.RM 把本地事务作为这个XID的分支事务注册到TC。

4.TM 请求 TC 对这个 XID 进行提交或回滚。

5.TC 指挥这个 XID 下面的所有分支事务进行提交、回滚。

下面详细介绍下每种模式的使用情况。

# AT模式

# AT模式介绍

  • 运行机制

    基于两阶段提交协议的演变,一阶段记录业务数据和回滚日志并在同一个本地事务中提交,释放本地锁和连接资源;二阶段异步提交及清理回滚日志或通过一阶段的回滚日志进行反向补偿回滚。

  • 适用场景

    基于支持本地 ACID 事务的关系型数据库,适用于公司内部绝大部分业务场景。

  • 优势

    业务无侵入,使用简单(使用注解即可)。

  • 劣势

    在本地事务提交前,要尝试先拿到该记录的全局锁,对性能有一定影响;AT 需要做 SQL 解析,在 SQL 支持上不及利用本地事务的XA模式,具体 SQL 限制参考官方文档 SQL限制 (opens new window)

# 服务端配置

# windows 版本-适用于本地调试

1.下载安装包点击去下载 (opens new window),下载完成后解压

2.进入解压后的seata-server-1.6.1conf目录,修改application.yml文件,这是seata服务端的核心配置文件,可以通过该文件配置服务注册方式、配置读取方式等。

seata_6

配置如下(需要重点关注注释的配置)

server:
  port: 7091

spring:
  ## seata服务的应用名
  application:
    name: seata-server
logging:
  config: classpath:logback-spring.xml
  file:
    path: ${log.home:${user.home}/logs/seata}

console:
  user:
    username: seata
    password: seata
seata:
  config:
    # support: nacos, consul, apollo, zk, etcd3
    type: nacos
    nacos:
      ## 配置中心地址
      server-addr: 114.251.235.6:31728
      ## 配置中心命名空间ID
      namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
      ## 配置中心群组名称
      group: SEATA_GROUP
      ## nacos用户名
      username: lizimin_local
      ## nacos密码
      password: lizimin_local
      context-path:
      ## seata配置文件的dataId
      data-id: seataServer.properties
  registry:
    # support: nacos, eureka, redis, zk, consul, etcd3, sofa
    type: nacos
    preferred-networks: 30.240.*
    nacos:
      ## seata服务的应用名
      application: seata-server
      ## seata服务注册地址
      server-addr: 114.251.235.6:31728
      ## seata服务注册群组
      group: SEATA_GROUP
      ## seata服务的命名空间ID
      namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
      ## seata服务集群名称
      cluster: default
      ## nacos用户名
      username: lizimin_local
      ## nacos密码
      password: lizimin_local
      context-path:
  server:
    service-port: 8091 #If not configured, the default is '${server.port} + 1000'
    max-commit-retry-timeout: -1
    max-rollback-retry-timeout: -1
    rollback-retry-timeout-unlock-enable: false
    enable-check-auth: true
    enable-parallel-request-handle: true
    retry-dead-threshold: 130000
    xaer-nota-retry-timeout: 60000
    enableParallelRequestHandle: true
    recovery:
      committing-retry-period: 1000
      async-committing-retry-period: 1000
      rollbacking-retry-period: 1000
      timeout-retry-period: 1000
    undo:
      log-save-days: 7
      log-delete-period: 86400000
    session:
      branch-async-queue-size: 5000 #branch async remove queue size
      enable-branch-async-remove: false #enable to asynchronous remove branchSession
  store:
    # support: file 、 db 、 redis
    mode: db
    db:
      datasource: druid
      db-type: kingbase
      driver-class-name: com.kingbase8.Driver
      url: jdbc:kingbase8://114.251.235.9:8721/seata
      user: system
      password: aq1unKiemrWatdt2
      min-conn: 10
      max-conn: 100
      global-table: global_table
      branch-table: branch_table
      lock-table: lock_table
      distributed-lock-table: distributed_lock
      query-limit: 1000
      max-wait: 5000
    redis:
      mode: single
      database: 0
      min-conn: 10
      max-conn: 100
      password:
      max-total: 100
      query-limit: 1000
      single:
        host: 127.0.0.1
        port: 6379
      sentinel:
        master-name:
        sentinel-hosts:
  metrics:
    enabled: false
    registry-type: compact
    exporter-list: prometheus
    exporter-prometheus-port: 9898
  transport:
    rpc-tc-request-timeout: 15000
    enable-tc-server-batch-send-response: false
    shutdown:
      wait: 3
    thread-factory:
      boss-thread-prefix: NettyBoss
      worker-thread-prefix: NettyServerNIOWorker
      boss-thread-size: 1
  #  server:
  #    service-port: 8091 #If not configured, the default is '${server.port} + 1000'
  security:
    secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
    tokenValidityInMilliseconds: 1800000
    ignore:
      urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login

3.在nacos上新建命名空间,名称为seata,ID与步骤2中nacos.namespace保持一致

seata_7

seata命名空间下新建配置,Data ID与步骤2中config.nacos.dataId保持一致;Group与config.nacos.group保持一致

seata_8

拷贝下述配置到新建的配置文件中,重点可关注注释部分的配置,完整配置描述请参考官网参数配置 (opens new window)

transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
#事务群组,my_test_tx_group为分组(可自定义),配置项值为TC集群名(也可自定义)
service.vgroupMapping.my_test_tx_group=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=20
client.rm.lock.retryTimes=60
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=true
client.rm.sagaBranchRegisterEnable=true
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
#事务会话信息存储方式为,file本地文件(不支持HA),db数据库(支持HA)
store.mode=db
#store.publicKey=
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
#事务会话信息存储数据库类型
store.db.dbType=mysql
#事务会话信息存储数据库驱动
store.db.driverClassName=com.mysql.cj.jdbc.Driver
#事务会话信息存储数据库地址,创建脚本见后续步骤
store.db.url=jdbc:mysql://localhost:3306/seata?useUnicode=true&rewriteBatchedStatements=true
#事务会话信息存储数据库用户名
store.db.user=root
#事务会话信息存储数据库密码
store.db.password=root
store.db.minConn=5
store.db.maxConn=100
#全局事务的表名,创建脚本见后续步骤
store.db.globalTable=global_table
#分支事务的表名,创建脚本见后续步骤
store.db.branchTable=branch_table
store.db.queryLimit=100
##全局锁的表名,创建脚本见后续步骤
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.mode=single
store.redis.single.host=127.0.0.1
store.redis.single.port=6379
#store.redis.sentinel.masterName=
#store.redis.sentinel.sentinelHosts=
store.redis.maxConn=10
store.redis.minConn=1
store.redis.maxTotal=100
store.redis.database=0
#store.redis.password=
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=kryo
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

4.在步骤3中配置的store.db.url所指定的schema下创建以下表

  • 全局事务表
--Mysql
CREATE TABLE IF NOT EXISTS `global_table`
(
  `xid`                       VARCHAR(128) NOT NULL,
  `transaction_id`            BIGINT,
  `status`                    TINYINT      NOT NULL,
  `application_id`            VARCHAR(32),
  `transaction_service_group` VARCHAR(32),
  `transaction_name`          VARCHAR(128),
  `timeout`                   INT,
  `begin_time`                BIGINT,
  `application_data`          VARCHAR(2000),
  `gmt_create`                DATETIME,
  `gmt_modified`              DATETIME,
  PRIMARY KEY (`xid`),
  KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
  KEY `idx_transaction_id` (`transaction_id`)
  ) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

--Oracle/kingbase
CREATE TABLE global_table
(
  xid                       VARCHAR2(128) NOT NULL,
  transaction_id            NUMBER(19),
  status                    NUMBER(3)     NOT NULL,
  application_id            VARCHAR2(32),
  transaction_service_group VARCHAR2(32),
  transaction_name          VARCHAR2(128),
  timeout                   NUMBER(10),
  begin_time                NUMBER(19),
  application_data          VARCHAR2(2000),
  gmt_create                TIMESTAMP(0),
  gmt_modified              TIMESTAMP(0),
  PRIMARY KEY (xid)
);

CREATE INDEX idx_status_gmt_modified ON global_table (status, gmt_modified);
CREATE INDEX idx_transaction_id ON global_table (transaction_id);
  • 分支事务表
--Mysql
CREATE TABLE IF NOT EXISTS `branch_table`
(
  `branch_id`         BIGINT       NOT NULL,
  `xid`               VARCHAR(128) NOT NULL,
  `transaction_id`    BIGINT,
  `resource_group_id` VARCHAR(32),
  `resource_id`       VARCHAR(256),
  `branch_type`       VARCHAR(8),
  `status`            TINYINT,
  `client_id`         VARCHAR(64),
  `application_data`  VARCHAR(2000),
  `gmt_create`        DATETIME(6),
  `gmt_modified`      DATETIME(6),
  PRIMARY KEY (`branch_id`),
  KEY `idx_xid` (`xid`)
  ) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

--Oracle/kingbase
CREATE TABLE branch_table
(
  branch_id         NUMBER(19)    NOT NULL,
  xid               VARCHAR2(128) NOT NULL,
  transaction_id    NUMBER(19),
  resource_group_id VARCHAR2(32),
  resource_id       VARCHAR2(256),
  branch_type       VARCHAR2(8),
  status            NUMBER(3),
  client_id         VARCHAR2(64),
  application_data  VARCHAR2(2000),
  gmt_create        TIMESTAMP(6),
  gmt_modified      TIMESTAMP(6),
  PRIMARY KEY (branch_id)
);

CREATE INDEX idx_xid ON branch_table (xid);
  • 全局锁表
--Mysql
CREATE TABLE IF NOT EXISTS `lock_table`
(
  `row_key`        VARCHAR(128) NOT NULL,
  `xid`            VARCHAR(128),
  `transaction_id` BIGINT,
  `branch_id`      BIGINT       NOT NULL,
  `resource_id`    VARCHAR(256),
  `table_name`     VARCHAR(32),
  `pk`             VARCHAR(36),
  `status`         TINYINT      NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
  `gmt_create`     DATETIME,
  `gmt_modified`   DATETIME,
  PRIMARY KEY (`row_key`),
  KEY `idx_status` (`status`),
  KEY `idx_branch_id` (`branch_id`),
  KEY `idx_xid` (`xid`)
  ) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

--Oracle/kingbase
CREATE TABLE lock_table
(
  row_key        VARCHAR2(128) NOT NULL,
  xid            VARCHAR2(128),
  transaction_id NUMBER(19),
  branch_id      NUMBER(19)    NOT NULL,
  resource_id    VARCHAR2(256),
  table_name     VARCHAR2(32),
  pk             VARCHAR2(36),
  status         NUMBER(3)     NOT NULL DEFAULT 0,
  gmt_create     TIMESTAMP(0),
  gmt_modified   TIMESTAMP(0),
  PRIMARY KEY (row_key)
);

comment on column lock_table.status is '0:locked ,1:rollbacking';
CREATE INDEX idx_branch_id ON lock_table (branch_id);
CREATE INDEX lock_table_idx_xid ON lock_table (xid);
CREATE INDEX idx_status ON lock_table (status);
  • 分支事锁表
--Mysql
CREATE TABLE IF NOT EXISTS `distributed_lock`
(
  `lock_key`       CHAR(20) NOT NULL,
  `lock_value`     VARCHAR(20) NOT NULL,
  `expire`         BIGINT,
  primary key (`lock_key`)
  ) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);

--Oracle/kingbase
CREATE TABLE distributed_lock (
   lock_key   VARCHAR2(20)  NOT NULL,
   lock_value   VARCHAR2(20)  NOT NULL,
   expire    DECIMAL(18)   NOT NULL,
   PRIMARY KEY (lock_key)
);

INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);

5.进入seata-server-1.6.1bin目录,双击seata-server.bat即可启动seata-server服务。

seata_9

效果如下即启动成功

seata_10

# linux 版本-正式环境

1.下载安装包

下载安装包后手动上传到服务器点击去下载 (opens new window)

下载完成后解压

tar -xvf seata-server-1.6.1.tar.gz

解压后目录如下

seata_13

2.进入目录/seata/seata/conf,修改application.yml文件,具体参考windows版步骤2

3.在nacos上新建命名空间与配置文件,创建过程参考windows版步骤3

4.创建数据库表,脚本参考windows版步骤4。

5.进入目录/seata/bin,执行以下脚本命令启动服务,详细启动参数可参考支持的启动参数 (opens new window)

nohup sh seata-server.sh -p 8092 -h 127.0.0.1 -m db > ../logs/start.out 2>&1 &

查看logs/start.out日志效果如下即启动成功

seata_14

# 客户端配置

1.各业务数据库执行脚本

  • Mysql数据库
CREATE TABLE IF NOT EXISTS `undo_log`
(
  `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
  `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
  `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
  `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
  `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
  `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
  `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
  UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
  ) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
  • Oracle/KingBase数据库
CREATE TABLE undo_log
(
  id            NUMBER(19)    NOT NULL,
  branch_id     NUMBER(19)    NOT NULL,
  xid           VARCHAR2(128) NOT NULL,
  context       VARCHAR2(128) NOT NULL,
  rollback_info BLOB          NOT NULL,
  log_status    NUMBER(10)    NOT NULL,
  log_created   TIMESTAMP(0)  NOT NULL,
  log_modified  TIMESTAMP(0)  NOT NULL,
  PRIMARY KEY (id),
  CONSTRAINT ux_undo_log UNIQUE (xid, branch_id)
);

COMMENT ON TABLE undo_log IS 'AT transaction mode undo table';

-- Generate ID using sequence and trigger
CREATE SEQUENCE UNDO_LOG_SEQ START WITH 1 INCREMENT BY 1;

2.各服务添加seata相关配置

#seata相关配置
seata:
  #是否开启spring-boot自动装配,默认true
  enabled: true
  #是否开启数据源自动代理,默认true,如果使用多租户动态数据源功能则需关闭此按钮
  enableAutoDataSourceProxy: true
  #数据源代理模式 可选值AT、XA,默认为AT
  data-source-proxy-mode: AT
  #事务分组配置项,程序会拼接[service.vgroupMapping.事务分组配置项]这样一个完整配置去配置中心上的seata-server
  #配置文件seataServer.properties中查找该完整配置的值,其值就是TC集群的名称,然后就可以根据集群名称获取真实的
  #TC服务列表。所以要求该配置项必须与seataServer.properties中[service.vgroupMapping.xxx=集群名称]的xxx相同
  tx-service-group: my_test_tx_group
  registry:
    type: nacos
    nacos:
      #seata服务端应用名称,与registry.conf中保持一致
      application: seata-server
      #seata服务端在注册中心上的地址,与registry.conf中保持一致
      server-addr: 114.242.246.250:8040
      #seata服务端在注册中心上的分组,与registry.conf中保持一致
      group : SEATA_GROUP
      #seata服务端在注册中心上的命名空间,与registry.conf中保持一致
      namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
      #seata服务端所属注册中心用户名,与registry.conf中保持一致
      username: nacos
      #seata服务端所属注册中心密码,与registry.conf中保持一致
      password: nacos
      #TC集群名称,与registry.conf中保持一致
      cluster: default
  config:
    type: nacos
    nacos:
      #seata服务端在配置中心的地址,与registry.conf中保持一致
      server-addr: 114.242.246.250:8040
      #seata服务端在配置中心的分组,与registry.conf中保持一致
      group : SEATA_GROUP
      #seata服务端在配置中心的命名空间,与registry.conf中保持一致
      namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
      #seata服务端在配置中心的配置文件名称,与registry.conf中保持一致
      dataId: seataServer.properties
      #seata服务端所属配置中心用户名,与registry.conf中保持一致
      username: nacos
      #seata服务端所属配置中心密码,与registry.conf中保持一致
      password: nacos

3.关闭 Ribbon 的重试机制

# v4.3 之前版本需设置,之后 ribbon 已经移除,无需设置
ribbon:
  MaxAutoRetriesNextServer: 0

为什么要关闭服务调用的重试?远程业务调用失败有两种可能:(1)远程业务执行失败 (2)远程业务执行成功,网络失败。对于第2种事务场景可能会进行重试,从而导致某个业务执行两次。如果业务上能够控制某个事务接口的幂等性,则不用关闭重试。

4.添加seata依赖

<dependency>
    <groupId>com.mediway.hos</groupId>
    <artifactId>hos-framework-seata-starter</artifactId>
</dependency>

5.发起方服务的方法上添加@GlobalTransactional注解即可。

# 注意事项

  • @GlobalTransactional注解只需要配置在发起方服务上即可,被调用服务无需配置。

  • 异常需要能够保证被发起方服务感知到,如果发生异常但中途被捕获截断则 seata 无法发起回滚操作。

# AT模式使用示例

模拟员工成功签订合同给员工发放奖励的场景,服务间调用链为合同服务->员工服务->账户服务,合同服务为发起方。

# 准备测试数据

分别在合同服务、员工服务、账户服务所在的数据库创建合同表、员工信息表、账户表

  • 合同表
CREATE TABLE `contract` (
  `id` varchar(32) NOT NULL COMMENT 'ID',
  `name` varchar(32) DEFAULT NULL COMMENT '名称',
  `signer` varchar(32) DEFAULT NULL COMMENT '合同签订人',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT NULL COMMENT '更新时间',
  `is_deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否删除',
  `tenant_id` varchar(32) DEFAULT NULL COMMENT '租户ID',
  `remark` text COMMENT '备注',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='合同表'
  • 员工信息表
CREATE TABLE `staff` (
  `id` varchar(32) NOT NULL COMMENT '主键',
  `name` varchar(500) DEFAULT NULL COMMENT '姓名',
  `gender` varchar(2) DEFAULT NULL COMMENT '性别',
  `age` int DEFAULT NULL COMMENT '年龄',
  `email` varchar(60) DEFAULT NULL COMMENT '邮箱',
  `phone` varchar(500) DEFAULT NULL COMMENT '手机号',
  `tenant_id` varchar(32) DEFAULT NULL COMMENT '租户id',
  `org_id` varchar(32) DEFAULT NULL COMMENT '部门id',
  `description` varchar(255) DEFAULT NULL COMMENT '描述',
  `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  `is_deleted` int DEFAULT NULL,
  `business_key` varchar(64) DEFAULT NULL COMMENT '业务流水号',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='员工信息表'
  • 账户表
CREATE TABLE `account` (
  `id` varchar(32) NOT NULL COMMENT 'ID',
  `staff_id` varchar(32) DEFAULT NULL COMMENT '员工id',
  `amount` decimal(20,2) NOT NULL DEFAULT '0.00' COMMENT '账户余额',
  `status` tinyint(1) NOT NULL DEFAULT '1' COMMENT '状态 1-正常  2-冻结  3-已补偿',
  `business_key` varchar(64) DEFAULT NULL COMMENT '业务流水号',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT NULL COMMENT '更新时间',
  `is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '是否删除',
  `tenant_id` varchar(32) DEFAULT NULL COMMENT '租户ID',
  `remark` text COMMENT '备注',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='账户表'
  • 分别为员工信息表和账户表添加一条测试数据
insert into `staff` (`id`, `name`, `gender`, `age`, `email`, `phone`, `tenant_id`, `org_id`, `description`, `create_time`, `update_time`, `is_deleted`, `business_key`) values('e2ab1960cd737111154d46878b5bccbb','丽丽','女','28','lili@qq.com','13201122530','111111',NULL,'jingjing','2023-05-08 11:34:48','2023-05-08 11:34:51',NULL,NULL);
insert into `account` (`id`, `staff_id`, `amount`, `status`, `business_key`, `create_time`, `update_time`, `is_deleted`, `tenant_id`, `remark`) values('b43efca9f12b546df2a87adae09846c5','e2ab1960cd737111154d46878b5bccbb','0.00','1',NULL,'2023-05-08 11:34:48','2023-05-08 11:34:51','0','111111',NULL);

# 代码示例

1.合同服务controller

/**
 * 签订合同-seata-AT模式演示demo
 *
 * @param staffId 员工id
 * @param name    合同名称
 * @param signer  合同签订者
 * @param amount  奖励金额
 * @return
 */
@ApiOperation(value = "签订合同")
@PostMapping("/signContract")
public BaseResponse<Contract> signContract(
        @RequestParam("staffId") String staffId,
        @RequestParam("name") String name,
        @RequestParam("signer") String signer,
        @RequestParam("amount") BigDecimal amount) {
    return BaseResponse.success(contractService.signContract(staffId, name, signer, amount));
    }

2.合同服务service接口及实现类

public interface ContractService extends BaseService<Contract> {
    
    /**
     * 签订合同-AT模式
     *
     * @param staffId 员工id
     * @param name    合同名称
     * @param signer  合同签订者
     * @param amount  合同奖励金额
     * @return
     */
    Contract signContract(String staffId, String name, String signer, BigDecimal amount);
}
@Slf4j
@Service
public class ContractServiceImpl extends BaseServiceImpl<ContractMapper, Contract> implements ContractService {

    @Autowired
    private StaffFeignClient staffFeignClient;

    /**
     * 签订合同-AT模式
     *
     * @param staffId 员工id
     * @param name    合同名称
     * @param signer  合同签订者
     * @param amount  合同奖励金额
     * @return
     */
    @Override
    @GlobalTransactional(rollbackFor = Exception.class,name = "signContract")
    public Contract signContract(String staffId, String name, String signer, BigDecimal amount) {
        log.info("[签订合同]开始,请求入参,staffId:{},name:{},signer:{},amount:{}", staffId, name, signer, amount);
        log.info("开始全局事务,XID = " + RootContext.getXID());

        // 增加一条合同数据
        Contract contract = addContract(name, signer);

        // 给用户发放签订合同成功奖励
        BaseResponse<Staff> staffBaseResponse = staffFeignClient.awardForSignContract(staffId, amount);
        log.info("[给用户发放签订合同成功奖励]返回结果:{}", staffBaseResponse);

        //此处需要加此判断,否则下游异常会被全局异常处理机制捕获,无法向上传递到TM,导致不回滚
        if (!staffBaseResponse.isSuccess()) {
            throw new RuntimeException("[给用户发放签订合同成功奖励]失败");
        }

        log.info("[签订合同]结束。。。。。。。。。。。]");
        return contract;
    }


    /**
     * 增加合同记录
     *
     * @param name	 合同名称
     * @param signer 合同签订者
     * @return       新增的合同记录
     */
    private Contract addContract(String name, String signer) {
        Contract contract = new Contract();
        contract.setName(name);
        contract.setSigner(signer);
        contract.setIsDeleted(0);
        insert(contract);
        return contract;
    }

}

3.员工服务FeignClient

@FeignClient(value = "oa-user-service", path = "/user/staff")
public interface StaffFeignClient {

    /**
     * 给用户发放签订合同成功奖励-AT模式
     *
     * @param staffId 员工id
     * @param amount  奖励金额
     * @return
     */
    @PostMapping("/awardForSignContract")
    BaseResponse<Staff> awardForSignContract(@RequestParam String staffId, @RequestParam BigDecimal amount);
    
}

4.员工服务controller

@Slf4j
@Api(tags = "员工信息")
@RestController
@RequestMapping("/user/staff")
public class StaffController extends BaseController<Staff> {

    @Autowired
    private StaffService staffService;

    /**
     * 给用户发放签订合同成功奖励-AT模式
     *
     * @param staffId 员工id
     * @param amount  奖励金额
     * @return
     */
    @ApiOperation(value = "给用户发放签订合同成功奖励-AT模式")
    @PostMapping("/awardForSignContract")
    public BaseResponse<Staff> awardForSignContract(@RequestParam("staffId") String staffId, @RequestParam("amount") BigDecimal amount) {
        return BaseResponse.success(staffService.awardForSignContract(staffId, amount));
    }

}

5.员工服务service接口及实现类

@Service
public interface StaffService extends BaseService<Staff> {

    /**
     * 给用户发放签订合同成功奖励-AT模式
     *
     * @param staffId 员工id
     * @param amount  奖励金额
     * @return
     */
    Staff awardForSignContract(String staffId, BigDecimal amount);
}
@Slf4j
@Service
public class StaffServiceImpl extends BaseServiceImpl<StaffMapper, Staff> implements StaffService {

    @Autowired
    private AccountFeignClient accountFeignClient;

    /**
     * 给用户发放签订合同成功奖励-AT模式
     *
     * @param staffId 员工id
     * @param amount  奖励金额
     * @return
     */
    @Override
    public Staff awardForSignContract(String staffId, BigDecimal amount) {
        log.info("[给用户发放签订合同成功奖励]开始,请求入参,staffId:{},amount:{}", staffId, amount);
        log.info("开始全局事务,XID = " + RootContext.getXID());

        // 修改员工描述为已签订合同
        Staff staff = updateStaffDescForContract(staffId);

        // 增加员工账户金额
        BaseResponse<Account> response = accountFeignClient.addAccountAmount(staffId, amount);
        log.info("[给账户增加余额]返回结果:{}", response);

        //此处需要加此判断,否则下游异常会被全局异常处理机制捕获,无法向上传递到TM,导致不回滚
        if (!response.isSuccess()) {
            throw new RuntimeException("[给账户增加余额]失败");
        }

        log.info("[给用户发放签订合同成功奖励]结束。。。。。。。。。。。]");
        return staff;
    }


    /**
     * 更新员工描述为已签订合同
     *
     * @param staffId 员工id
     * @return
     */
    private Staff updateStaffDescForContract(String staffId) {
        Staff staff = selectById(staffId);
        if (staff == null) {
            throw new RuntimeException("员工信息为空");
        }

        staff.setDescription("已成功签订合同");
        staff.setUpdateTime(new Date());
        updateById(staff);

        return staff;
    }

}

6.账户服务FeignClient

@FeignClient(value = "hos-account-service", path = "/account")
public interface AccountFeignClient {

    /**
     * 添加账户余额-AT模式
     *
     * @param staffId   员工id
     * @param addAmount 增加金额
     * @return          操作账户
     */
    @PostMapping("/addAccountAmount")
    BaseResponse<Account> addAccountAmount(@RequestParam String staffId, @RequestParam BigDecimal addAmount);
}

7.账户服务controller

@Slf4j
@RestController
@RequestMapping("/account")
@Api(value = "账户管理")
public class AccountController extends BaseController<Account> {

    @Autowired
    private AccountService accountService;

    /**
     * 添加账户余额-AT模式
     *
     * @param staffId   员工id
     * @param addAmount 增加金额
     * @return          操作账户
     */
    @PostMapping("/addAccountAmount")
    public BaseResponse<Account> addAccountAmount(@RequestParam("staffId") String staffId, @RequestParam("addAmount") BigDecimal addAmount) {
        return BaseResponse.success(accountService.addAccountAmount(staffId, addAmount));
    }

}

8.账户服务service接口及实现类

public interface AccountService extends BaseService<Account> {

    /**
     * 给账户增加余额-AT模式
     *
     * @param staffId	员工id
     * @param addAmount	增加的金额
     * @return
     */
    Account addAccountAmount(String staffId, BigDecimal addAmount);

}
@Slf4j
@Service
public class AccountServiceImpl extends BaseServiceImpl<AccountMapper, Account> implements AccountService {

    @Autowired
    private AccountMapper accountMapper;

    /**
     * 给账户增加余额-AT/XA模式
     *
     * @param staffId	员工id
     * @param addAmount	增加的金额
     * @return
     */
    @Override
    public Account addAccountAmount(String staffId, BigDecimal addAmount) {
        log.info("[添加账户余额]开始,请求入参,staffId:{},amount:{}", staffId, addAmount);
        log.info("开始全局事务,XID = " + RootContext.getXID());

        Account account = accountMapper.selectOne(Wrappers.<Account>lambdaQuery().eq(Account::getStaffId, staffId));
        if (account == null) {
            throw new RuntimeException("待操作账户不存在");
        }
        account.setAmount(account.getAmount().add(addAmount));
        account.setUpdateTime(new Date());
        updateById(account);

        // todo 模拟失败
        //int i = 10/0;

        log.info("[添加账户余额]结束。。。。。。。。。。。]");
        return account;
    }

}

9.启动三个业务服务及网关服务,正常请求接口

http://localhost:7100/contract/contract/signContract?staffId=e2ab1960cd737111154d46878b5bccbb&name=20亿超级订单合同&signer=丽丽&amount=300.00
method:POST

接口响应结果

{
    "code": "200",
    "msg": "success",
    "data": {
        "id": "72941a3c269a37a4577c646699984d1b",
        "createTime": "2022-06-02 09:59:32",
        "updateTime": "2022-06-02 09:59:32",
        "current": 0,
        "size": 0,
        "name": "20亿超级订单合同",
        "signer": "丽丽",
        "tenantId": null,
        "isDeleted": 0,
        "remark": null
    },
    "success": true
}

数据库结果,数据已经正确入库

seata_1

10.模拟失败情况

修改AccountServiceImpl,放开//int i = 10/0前的注释,清空之前测试数据,再次发起请求。

接口响应结果

{
    "code": "99001009",
    "msg": "业务处理异常",
    "data": null,
    "success": false
}

代码日志

seata_3

数据库效果

seata_2 数据没有入库,进行了回滚,说明分布式事务生效。至此AT模式示例演示完成。

# TCC模式

# TCC模式介绍

  • 运行机制

    seata_tcc_1

    TCC是Try-Confirm-Cancel的简称,分别对应seata中的预处理Prepare、确认 Confirm、撤销Rollback。Prepare 操作做业务检查及资源预留,Confirm 做业务确认操作,Rollback 实现一个与 Prepare 相反的操作即回滚操作。一阶段执行自定义的 Prepare 操作,如果执行成功则二阶段会执行自定义的 Confirm 操作,否则执行 Rollback 操作进行回滚。

  • 适用场景:

    对于无法完全依赖于数据库事务特性的分布式事务,如涉及非关系型数据库与中间件的操作、跨公司服务的调用、跨语言的应用调用就可使用TCC模式。

  • 优势:

    使用灵活,不依赖于数据库的事务特性来实现两阶段提交,而是采用代码来实现;同时不需要对数据加全局锁,允许多个事务同时操作数据,因此性能很高。

  • 劣势:

    所有分支事务都要手动实现 Prepare、Confirm、Rollback 三个方法, 对业务代码侵入性很强;需要在代码中处理各种异常,所以要将各种情况考虑全面,因为在分布式环境中,出现网络超时、重发,机器宕机等一系列的异常,一旦这些异常情况没有处理或者处理不当,就可能导致业务数据错误。

# 服务端配置

配置流程同AT模式

# 客户端配置

1.各服务添加seata相关配置

#seata相关配置
seata:
  #是否开启spring-boot自动装配,默认true
  enabled: true
  #是否开启数据源自动代理,默认true,如果使用多租户动态数据源功能则需关闭此按钮
  enableAutoDataSourceProxy: true
  #数据源代理模式 可选值AT、XA,默认为AT
  data-source-proxy-mode: AT
  #事务分组配置项,程序会拼接[service.vgroupMapping.事务分组配置项]这样一个完整配置去配置中心上的seata-server
  #配置文件seataServer.properties中查找该完整配置的值,其值就是TC集群的名称,然后就可以根据集群名称获取真实的
  #TC服务列表。所以要求该配置项必须与seataServer.properties中[service.vgroupMapping.xxx=集群名称]的xxx相同
  tx-service-group: my_test_tx_group
  registry:
    type: nacos
    nacos:
      #seata服务端应用名称,与registry.conf中保持一致
      application: seata-server
      #seata服务端在注册中心上的地址,与registry.conf中保持一致
      server-addr: 114.242.246.250:8040
      #seata服务端在注册中心上的分组,与registry.conf中保持一致
      group : SEATA_GROUP
      #seata服务端在注册中心上的命名空间,与registry.conf中保持一致
      namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
      #seata服务端所属注册中心用户名,与registry.conf中保持一致
      username: nacos
      #seata服务端所属注册中心密码,与registry.conf中保持一致
      password: nacos
      #TC集群名称,与registry.conf中保持一致
      cluster: default
  config:
    type: nacos
    nacos:
      #seata服务端在配置中心的地址,与registry.conf中保持一致
      server-addr: 114.242.246.250:8040
      #seata服务端在配置中心的分组,与registry.conf中保持一致
      group : SEATA_GROUP
      #seata服务端在配置中心的命名空间,与registry.conf中保持一致
      namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
      #seata服务端在配置中心的配置文件名称,与registry.conf中保持一致
      dataId: seataServer.properties
      #seata服务端所属配置中心用户名,与registry.conf中保持一致
      username: nacos
      #seata服务端所属配置中心密码,与registry.conf中保持一致
      password: nacos

2.添加seata依赖

<dependency>
    <groupId>com.mediway.hos</groupId>
    <artifactId>hos-framework-seata-starter</artifactId>
</dependency>

3.定义被调用服务

3.1.TCC接口

TCC模式的接口上需要添加@LocalTCC注解,表明这是一个TCC接口

示例:

@LocalTCC
public interface AccountTccService {

}

3.2.接口方法

接口需要定义三个方法,分别对应Try、Confirm 和 Cancel三个操作。其中对应Try操作的方法上需要添加@TwoPhaseBusinessAction注解,该注解含有三个属性如下

  • name:业务操作唯一标识

  • commitMethod:提交时执行的方法,与对应Confirm操作的方法名称保持一致,默认值为commit

  • rollbackMethod:回滚时执行的方法,与对应Cancel操作的方法名称保持一致,默认值为rollback

示例:

/**
 * 给账户增加余额-TCC-prepare方法
 *
 * @param staffId   员工id
 * @param addAmount 增加的金额
 * @return
 */
@TwoPhaseBusinessAction(name = "addAccountAmountPrepare", commitMethod = "addAccountAmountCommit", rollbackMethod = "addAccountAmountRollBack")
void addAccountAmountPrepare(
        @BusinessActionContextParameter(paramName = "staffId") String staffId,
        @BusinessActionContextParameter(paramName = "addAmount") BigDecimal addAmount);

/**
 * 给账户增加余额-TCC-commit方法
 *
 * @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
 * @return
 */
boolean addAccountAmountCommit(BusinessActionContext context);

/**
 * 给账户增加余额-TCC-rollback方法(与prepare方法相反的操作)
 *
 * @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
 * @return
 */
boolean addAccountAmountRollBack(BusinessActionContext context);

3.3.参数注解@BusinessActionContextParameter

该注解用来修饰 Try 方法的入参,被修饰的入参可以在 Commit 方法和 Rollback 方法中通过 BusinessActionContext 获取。

3.4.TCC事务上下文对象BusinessActionContext

TCC事务上下文对象,可以通过该对象获取到 Try 方法的入参

4.发起方服务的方法上添加@GlobalTransactional注解。

# 注意事项

  • @GlobalTransactional注解只需要配置在发起方服务上即可,被调用服务无需配置。
  • 异常需要能够保证被发起方服务感知到,如果发生异常但中途被捕获截断则 seata 无法发起回滚操作。
  • Confirm 和 Cancel方法一定要保证执行成功,如果执行异常或返回结果为false,则seata会不断进行重试,直到执行成功。所以建议如果这两个方法一直执行失败,需要人工参与解决失败问题。
  • Try、Confirm 和 Cancel三个操作都要保证接口幂等性
  • 空回滚问题:在没用调用Try方法的情况下,先调用了第二阶段的Cancel方法。解决思路是Cancel方法需要识别出这是一个空回滚,然后直接返回成功即可。具体方法例如可以在Try方法执行时在操作记录表中添加一条记录,如果执行Cancel方法时查询该表没有记录,则表明Try方法没有执行,即当前就是空回滚,此时直接返回成功即可。
  • 悬挂问题:第二阶段的Cancel方法比第一阶段的Try方法先执行。由于允许空回滚,在Cancel方法先执行后,此时如果再执行Try方法,那么Try方法预留的业务资源后续无人能够处理,导致资源悬挂。解决方法是在执行Try方法时判断操作记录表是否已被Cancel方法修改过,如果被修改过则不再执行后续操作,直接返回即可。

# TCC模式使用示例

模拟员工成功签订合同给员工发放奖励的场景,服务间调用链为合同业务分别调用合同服务(本模块调用)员工服务(跨模块调用)账户服务(跨模块调用),合同服务为发起方。

1.合同服务controller

@Slf4j
@RestController
@RequestMapping("/contract")
@Api(value = "合同管理")
public class ContractController extends BaseController<Contract> {

    @Autowired
    private ContractService contractService;

    @Autowired
    private ContractBiz contractBiz;

    /**
     * 签订合同-seata-TCC模式演示demo
     *
     * @param staffId 员工id
     * @param name    合同名称
     * @param signer  合同签订者
     * @param amount  奖励金额
     * @return
     */
    @ApiOperation(value = "签订合同")
    @PostMapping("/signContractTcc")
    public BaseResponse signContractTcc(
            @RequestParam("staffId") String staffId,
            @RequestParam("name") String name,
            @RequestParam("signer") String signer,
            @RequestParam("amount") BigDecimal amount) {
        contractBiz.signContractTcc(staffId, name, signer, amount);
        return BaseResponse.success();
    }
}
    

2.合同业务biz

@Slf4j
@Component
public class ContractBiz {

    @Autowired
    private ContractTccService contractTccService;

    @Autowired
    private StaffFeignClient staffFeignClient;

    @Autowired
    private AccountFeignClient accountFeignClient;

    /**
     * 签订合同-TCC模式-TM(服务发起方)
     *
     * @param staffId 员工id
     * @param name    合同名称
     * @param signer  合同签订者
     * @param amount  合同奖励金额
     * @return
     */
    @GlobalTransactional(rollbackFor = Exception.class, name = "signContractTcc")
    public void signContractTcc(String staffId, String name, String signer, BigDecimal amount) {
        log.info("[签订合同-TCC模式]开始执行,staffId:{},name:{},signer:{},amount:{}", staffId, name, signer, amount);
        // 合同业务
        contractTccService.signContractPrepare(name, signer);

        // 员工业务
        BaseResponse staffBaseResponse = staffFeignClient.awardForSignContractTcc(staffId);
        log.info("[给用户发放签订合同成功奖励]返回结果:{}", staffBaseResponse);
        if (!staffBaseResponse.isSuccess()) {
            throw new RuntimeException("[给用户发放签订合同成功奖励]失败");
        }

        // 账户业务
        BaseResponse accountBaseResponse = accountFeignClient.addAccountAmountTcc(staffId, amount);
        log.info("[给账户增加余额]返回结果:{}", accountBaseResponse);
        if (!accountBaseResponse.isSuccess()) {
            throw new RuntimeException("[给账户增加余额]失败");
        }

        log.info("[签订合同-TCC模式]执行结束,staffId:{},name:{},signer:{},amount:{}", staffId, name, signer, amount);
    }
}

3.合同服务service接口及实现类

@LocalTCC
public interface ContractTccService {

    /**
     * 签订合同-TCC-prepare方法
     *
     * @param name    合同名称
     * @param signer  合同签订者
     * @return
     */
    @TwoPhaseBusinessAction(name = "signContractPrepare", commitMethod = "signContractCommit", rollbackMethod = "signContractRollBack")
    void signContractPrepare(
            @BusinessActionContextParameter(paramName = "name") String name,
            @BusinessActionContextParameter(paramName = "signer") String signer);

    /**
     * 签订合同-TCC-commit方法
     *
     * @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
     * @return
     */
    boolean signContractCommit(BusinessActionContext context);

    /**
     * 签订合同-TCC-rollback方法(与prepare方法相反的操作)
     *
     * @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
     * @return
     */
    boolean signContractRollBack(BusinessActionContext context);

}
@Slf4j
@Service
public class ContractTccServiceImpl implements ContractTccService {

    @Autowired
    private ContractMapper contractMapper;

    /**
     * 签订合同-TCC-prepare方法
     *
     * @param name    合同名称
     * @param signer  合同签订者
     * @return
     */
    @Override
    public void signContractPrepare(String name, String signer) {
        log.info("--------->XID =" + RootContext.getXID() + " 合同服务prepare操作准备执行!");
        // 全局事务id
        String xid = RootContext.getXID();
        // 幂等性校验
        Contract contract = contractMapper.selectOne(Wrappers.<Contract>lambdaQuery()
                .eq(Contract::getName, name).eq(Contract::getSigner, signer).eq(Contract::getRemark, xid));
        if (contract != null) {
            return;
        }

        // 添加合同记录
        contract = new Contract();
        contract.setName(name);
        contract.setSigner(signer);
        contract.setIsDeleted(0);
        contract.setCreateTime(new Date());
        // 备注字段存储全局事务id
        contract.setRemark(xid);
        contractMapper.insert(contract);

        log.info("--------->XID =" + RootContext.getXID() + " 合同服务prepare成功!");
    }

    /**
     * 签订合同-TCC-commit方法
     *
     * @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
     * @return
     */
    @Override
    public boolean signContractCommit(BusinessActionContext context) {
        log.info("--------->XID =" + RootContext.getXID() + " 合同服务commit操作准备执行!");
        // 合同名称
        String name = context.getActionContext("name").toString();
        // 合同签订者
        String signer = context.getActionContext("signer").toString();
        // 全局事务id
        String xid = context.getXid();
        Contract contract = contractMapper.selectOne(Wrappers.<Contract>lambdaQuery()
                .eq(Contract::getName, name).eq(Contract::getSigner, signer).eq(Contract::getRemark, xid));
        if (contract != null) {
            log.info("--------->XID =" + context.getXid() + " 合同服务commit成功!");
        }

        return contract != null;
    }

    /**
     * 签订合同-TCC-rollback方法(与prepare方法相反的操作)
     *
     * @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
     * @return
     */
    @Override
    public boolean signContractRollBack(BusinessActionContext context) {
        log.info("--------->XID =" + RootContext.getXID() + " 合同服务rollback操作准备执行!");
        // 合同名称
        String name = context.getActionContext("name").toString();
        // 合同签订者
        String signer = context.getActionContext("signer").toString();
        // 全局事务id
        String xid = context.getXid();

        // 幂等性校验+空回滚校验
        LambdaQueryWrapper<Contract> queryWrapper = Wrappers.<Contract>lambdaQuery()
                .eq(Contract::getName, name).eq(Contract::getSigner, signer).eq(Contract::getRemark, xid);
        Contract contract = contractMapper.selectOne(queryWrapper);
        if (contract == null) {
            return true;
        }

        // 删除合同记录
        contractMapper.delete(queryWrapper);

        log.info("--------->XID =" + context.getXid() + " 合同服务rollback成功!");
        return true;
    }

}

4.员工服务FeignClient

@FeignClient(value = "oa-user-service", path = "/user/staff")
public interface StaffFeignClient {

    /**
     * 给用户发放签订合同成功奖励-TCC模式
     *
     * @param staffId 员工id
     * @return
     */
    @PostMapping("/awardForSignContractTcc")
    BaseResponse awardForSignContractTcc(@RequestParam String staffId);
    
}

5.员工服务controller

@Slf4j
@Api(tags = "员工信息")
@RestController
@RequestMapping("/user/staff")
public class StaffController extends BaseController<Staff> {

    @Autowired
    private StaffTccService staffTccService;

    /**
     * 给用户发放签订合同成功奖励-TCC模式
     *
     * @param staffId 员工id
     * @return
     */
    @ApiOperation(value = "给用户发放签订合同成功奖励-TCC模式")
    @PostMapping("/awardForSignContractTcc")
    public BaseResponse awardForSignContractTcc(@RequestParam("staffId") String staffId) {
        staffTccService.awardForSignContractPrepare(staffId);
        return BaseResponse.success();
    }

}

6.员工服务service接口及实现类

@Service
@LocalTCC
public interface StaffTccService {

    /**
     * 给用户发放签订合同成功奖励-TCC-prepare方法
     *
     * @param staffId 员工id
     * @return
     */
    @TwoPhaseBusinessAction(name = "awardForSignContractPrepare", commitMethod = "awardForSignContractCommit", rollbackMethod = "awardForSignContractRollBack")
    void awardForSignContractPrepare(@BusinessActionContextParameter(paramName = "staffId") String staffId);

    /**
     * 给用户发放签订合同成功奖励-TCC-commit方法
     *
     * @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
     * @return
     */
    boolean awardForSignContractCommit(BusinessActionContext context);

    /**
     * 给用户发放签订合同成功奖励-TCC-rollback方法(与prepare方法相反的操作)
     *
     * @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
     * @return
     */
    boolean awardForSignContractRollBack(BusinessActionContext context);
}
@Slf4j
@Service
public class StaffTccServiceImpl implements StaffTccService {

    @Autowired
    private StaffMapper staffMapper;


    /**
     * 给用户发放签订合同成功奖励-TCC-prepare方法
     *
     * @param staffId 员工id
     * @return
     */
    @Override
    public void awardForSignContractPrepare(String staffId) {
        log.info("--------->XID =" + RootContext.getXID() + " 员工服务prepare操作准备执行!");

        Staff staff = staffMapper.selectById(staffId);
        if (staff == null) {
            throw new RuntimeException("员工不存在");
        }

        // 幂等性校验
        if (StringUtil.isNotBlank(staff.getDescription())) {
            return;
        }

        // 更新员工描述为已签订合同
        staff.setDescription("已成功签订合同");
        staff.setUpdateTime(new Date());
        staffMapper.updateById(staff);

        log.info("--------->XID =" + RootContext.getXID() + " 员工服务prepare成功!");
    }

    /**
     * 给用户发放签订合同成功奖励-TCC-commit方法
     *
     * @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
     * @return
     */
    @Override
    public boolean awardForSignContractCommit(BusinessActionContext context) {
        log.info("--------->XID =" + RootContext.getXID() + " 员工服务commit操作准备执行!");
        // 员工名称
        String staffId = context.getActionContext("staffId").toString();
        Staff staff = staffMapper.selectById(staffId);
        if (staff == null) {
            throw new RuntimeException("员工不存在");
        }

        if (StringUtil.isNotBlank(staff.getDescription())) {
            log.info("--------->XID =" + RootContext.getXID() + " 员工服务commit成功!");
        }

        return StringUtil.isNotBlank(staff.getDescription());
    }

    /**
     * 给用户发放签订合同成功奖励-TCC-rollback方法(与prepare方法相反的操作)
     *
     * @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
     * @return
     */
    @Override
    public boolean awardForSignContractRollBack(BusinessActionContext context) {
        log.info("--------->XID =" + RootContext.getXID() + " 员工服务rollback操作准备执行!");
        // 员工名称
        String staffId = context.getActionContext("staffId").toString();
        Staff staff = staffMapper.selectById(staffId);
        if (staff == null) {
            throw new RuntimeException("员工不存在");
        }
        // 幂等性校验+空回滚校验
        if (StringUtil.isBlank(staff.getDescription())) {
            return true;
        }

        // 更新员工描述为""
        staff.setDescription("");
        staff.setUpdateTime(new Date());
        staffMapper.updateById(staff);

        log.info("--------->XID =" + RootContext.getXID() + " 员工服务rollback成功!");
        return true;
    }

}

7.账户服务FeignClient

@FeignClient(value = "hos-account-service", path = "/account")
public interface AccountFeignClient {

    /**
     * 添加账户余额-TCC模式
     *
     * @param staffId   员工id
     * @param addAmount 增加金额
     */
    @PostMapping("/addAccountAmountTcc")
    BaseResponse addAccountAmountTcc(@RequestParam String staffId, @RequestParam BigDecimal addAmount);
}

8.账户服务controller

@Slf4j
@RestController
@RequestMapping("/account")
@Api(value = "账户管理")
public class AccountController extends BaseController<Account> {

    @Autowired
    private AccountTccService accountTccService;

    /**
     * 添加账户余额-TCC模式
     *
     * @param staffId   员工id
     * @param addAmount 增加金额
     * @return          操作账户
     */
    @PostMapping("/addAccountAmountTcc")
    public BaseResponse addAccountAmountTcc(@RequestParam("staffId") String staffId, @RequestParam("addAmount") BigDecimal addAmount) {
        accountTccService.addAccountAmountPrepare(staffId, addAmount);
        return BaseResponse.success();
    }

}

9.账户服务service接口及实现类

@LocalTCC
public interface AccountTccService {

    /**
     * 给账户增加余额-TCC-prepare方法
     *
     * @param staffId   员工id
     * @param addAmount 增加的金额
     * @return
     */
    @TwoPhaseBusinessAction(name = "addAccountAmountPrepare", commitMethod = "addAccountAmountCommit", rollbackMethod = "addAccountAmountRollBack")
    void addAccountAmountPrepare(
            @BusinessActionContextParameter(paramName = "staffId") String staffId,
            @BusinessActionContextParameter(paramName = "addAmount") BigDecimal addAmount);

    /**
     * 给账户增加余额-TCC-commit方法
     *
     * @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
     * @return
     */
    boolean addAccountAmountCommit(BusinessActionContext context);

    /**
     * 给账户增加余额-TCC-rollback方法(与prepare方法相反的操作)
     *
     * @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
     * @return
     */
    boolean addAccountAmountRollBack(BusinessActionContext context);
}
@Slf4j
@Service
public class AccountTccServiceImpl implements AccountTccService {

    @Autowired
    private AccountMapper accountMapper;


    /**
     * 给账户增加余额-TCC-prepare方法
     *
     * @param staffId   员工id
     * @param addAmount 增加的金额
     * @return
     */
    @Override
    public void addAccountAmountPrepare(String staffId, BigDecimal addAmount) {
        log.info("--------->XID =" + RootContext.getXID() + " 账户服务prepare操作准备执行!");
        Account account = accountMapper.selectOne(Wrappers.<Account>lambdaQuery().eq(Account::getStaffId, staffId));
        if (account == null) {
            throw new RuntimeException("待操作账户不存在");
        }

        //幂等校验,账户状态为正常状态
        if (account.getStatus() == 1) {
            // 冻结账户
            account.setStatus(2);
            // 给账户增加金额
            account.setAmount(account.getAmount().add(addAmount));
            account.setUpdateTime(new Date());
            accountMapper.updateById(account);
        }

        //todo 模拟失败
        //int i = 10/0;

        log.info("--------->XID =" + RootContext.getXID() + " 账户服务prepare成功!");
    }

    /**
     * 给账户增加余额-TCC-commit方法
     *
     * @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
     * @return
     */
    @Override
    public boolean addAccountAmountCommit(BusinessActionContext context) {
        log.info("--------->XID =" + RootContext.getXID() + " 账户服务commit操作准备执行!");
        // 员工id
        String staffId = context.getActionContext("staffId").toString();
        Account account = accountMapper.selectOne(Wrappers.<Account>lambdaQuery().eq(Account::getStaffId, staffId));
        if (account == null) {
            throw new RuntimeException("待操作账户不存在");
        }
        // 幂等校验,判断账户如果处在非冻结状态(当前方法重复执行)则直接返回
        if (account.getStatus() != 2) {
            return true;
        }

        // 更新账户状态为正常
        account.setStatus(1);
        account.setUpdateTime(new Date());
        accountMapper.updateById(account);



        log.info("--------->XID =" + context.getXid() + " 账户服务commit成功!");
        return true;
    }

    /**
     * 给账户增加余额-TCC-rollback方法(与prepare方法相反的操作)
     *
     * @param context 可从该参数中获取到prepare方法中的@BusinessActionContextParameter中的参数
     * @return
     */
    @Override
    public boolean addAccountAmountRollBack(BusinessActionContext context) {
        log.info("--------->XID =" + RootContext.getXID() + " 账户服务rollback操作准备执行!");
        // 员工id
        String staffId = context.getActionContext("staffId").toString();
        // 奖励金额
        BigDecimal addAmount = new BigDecimal(context.getActionContext("addAmount").toString());

        Account account = accountMapper.selectOne(Wrappers.<Account>lambdaQuery().eq(Account::getStaffId, staffId));
        if (account == null) {
            throw new RuntimeException("待操作账户不存在");
        }
        // 幂等+空回滚校验,判断账户如果处在非冻结状态(没有执行prepare方法或当前方法重复执行)则直接返回
        if (account.getStatus() != 2) {
            return true;
        }

        // 更新账户状态为正常
        account.setStatus(1);
        // 给账户减去奖励金额
        account.setAmount(account.getAmount().subtract(addAmount));
        account.setUpdateTime(new Date());
        accountMapper.updateById(account);

        log.info("--------->XID =" + context.getXid() + " 账户服务rollback成功!");
        return true;
    }

}

10.启动三个业务服务及网关服务,正常请求接口

http://localhost:7100/contract/contract/signContractTcc?staffId=e2ab1960cd737111154d46878b5bffdd&name=2022年北京大厦建筑合同&signer=黎明&amount=10000.50
method:POST

接口响应结果

{
    "code": "200",
    "msg": "success",
    "data": null,
    "success": true
}

数据库结果,数据已经正确入库

seata_15

seata日志显示全局事务提交成功

seata_16

11.模拟失败情况

修改AccountTccServiceImpl,放开//int i = 10/0前的注释,清空之前测试数据,再次发起请求。

接口响应结果

{
    "code": "99001009",
    "msg": "业务处理异常",
    "data": null,
    "success": false
}

seata日志显示全局回滚成功

seata_17

业务服务日志

seata_18

数据库效果

seata_19 数据没有入库,进行了回滚,至此TCC模式示例演示完成。

# XA模式

# XA模式介绍

  • XA介绍

    XA 规范是 X/Open 组织定义的分布式事务处理(DTP,Distributed Transaction Processing)标准。

    XA 规范描述了全局的事务管理器与局部的资源管理器之间的接口。 XA规范的目的是允许多个资源(如数据库,应用服务器,消息队列等)在同一事务中访问,这样可以使 ACID 属性跨越应用程序而保持有效。

    XA 规范使用两阶段提交(2PC,Two-Phase Commit)来保证所有资源同时提交或回滚任何特定的事务。

  • 运行机制

    在 Seata 定义的分布式事务框架内,利用事务资源(数据库、消息服务等)对 XA 协议的支持,以 XA 协议的机制来管理分支事务的一种事务模式。执行阶段业务 SQL 操作放在 XA 分支中进行,完成后执行 XA prepare,由资源对 XA 协议的支持来保证可回滚和持久化;完成阶段执行 XA 分支的提交和回滚操作。

    seata_xa_1

  • 适用场景:

    适用于对数据一致性要求较高的场景,要求数据库能够支持 XA 协议(Mysql和Oracle都支持)。

  • 优势:

    业务无侵入,使用简单(使用注解即可);对数据一致性有较高的保障。

  • 劣势:

    数据在整个事务处理过程结束前都被锁定,锁的粒度大导致可能锁定更多无辜数据,并且由于协议阻塞容易产生死锁,性能较差。

# 服务端配置

配置流程同AT模式

# 客户端配置

1.各服务添加seata相关配置

#seata相关配置
seata:
  #是否开启spring-boot自动装配,默认true
  enabled: true
  #是否开启数据源自动代理,默认true,如果使用多租户动态数据源功能则需关闭此按钮
  enableAutoDataSourceProxy: true
  #数据源代理模式 可选值AT、XA,默认为AT
  data-source-proxy-mode: XA
  #事务分组配置项,程序会拼接[service.vgroupMapping.事务分组配置项]这样一个完整配置去配置中心上的seata-server
  #配置文件seataServer.properties中查找该完整配置的值,其值就是TC集群的名称,然后就可以根据集群名称获取真实的
  #TC服务列表。所以要求该配置项必须与seataServer.properties中[service.vgroupMapping.xxx=集群名称]的xxx相同
  tx-service-group: my_test_tx_group
  registry:
    type: nacos
    nacos:
      #seata服务端应用名称,与registry.conf中保持一致
      application: seata-server
      #seata服务端在注册中心上的地址,与registry.conf中保持一致
      server-addr: 114.242.246.250:8040
      #seata服务端在注册中心上的分组,与registry.conf中保持一致
      group : SEATA_GROUP
      #seata服务端在注册中心上的命名空间,与registry.conf中保持一致
      namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
      #seata服务端所属注册中心用户名,与registry.conf中保持一致
      username: nacos
      #seata服务端所属注册中心密码,与registry.conf中保持一致
      password: nacos
      #TC集群名称,与registry.conf中保持一致
      cluster: default
  config:
    type: nacos
    nacos:
      #seata服务端在配置中心的地址,与registry.conf中保持一致
      server-addr: 114.242.246.250:8040
      #seata服务端在配置中心的分组,与registry.conf中保持一致
      group : SEATA_GROUP
      #seata服务端在配置中心的命名空间,与registry.conf中保持一致
      namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
      #seata服务端在配置中心的配置文件名称,与registry.conf中保持一致
      dataId: seataServer.properties
      #seata服务端所属配置中心用户名,与registry.conf中保持一致
      username: nacos
      #seata服务端所属配置中心密码,与registry.conf中保持一致
      password: nacos

2.关闭 Ribbon 的重试机制

# v4.3 之前版本需设置,之后 ribbon 已经移除,无需设置
ribbon:
  MaxAutoRetriesNextServer: 0

为什么要关闭服务调用的重试?远程业务调用失败有两种可能:(1)远程业务执行失败 (2)远程业务执行成功,网络失败。对于第2种事务场景可能会进行重试,从而导致某个业务执行两次。如果业务上能够控制某个事务接口的幂等性,则不用关闭重试。

3.添加seata依赖

<dependency>
    <groupId>com.mediway.hos</groupId>
    <artifactId>hos-framework-seata-starter</artifactId>
</dependency>

4.发起方服务的方法上添加@GlobalTransactional注解。

# 注意事项

  • 注意事项参考AT模式

# XA模式使用示例

代码示例同AT模式一致

1.启动三个业务服务及网关服务,正常请求接口

http://localhost:7100/contract/contract/signContract?staffId=e2ab1960cd737111154d46878b5bccbb&name=20亿超级订单合同&signer=丽丽&amount=300.00
method:POST

接口响应结果

{
    "code": "200",
    "msg": "success",
    "data": {
        "id": "723dd2c05959a9011265c32f2c3952b2",
        "createTime": "2022-06-06 16:45:14",
        "updateTime": "2022-06-06 16:45:14",
        "current": 0,
        "size": 0,
        "name": "20亿超级订单合同",
        "signer": "丽丽",
        "tenantId": null,
        "isDeleted": 0,
        "remark": null
    },
    "success": true
}

数据库结果,数据已经正确入库

seata_20

seata日志显示全局事务提交成功

seata_21

2.模拟失败情况

修改AccountServiceImpl,放开//int i = 10/0前的注释,清空之前测试数据,再次发起请求。

接口响应结果

{
    "code": "99001009",
    "msg": "业务处理异常",
    "data": null,
    "success": false
}

seata日志显示全局回滚成功

seata_22

业务服务日志

seata_23

数据库效果

seata_24

数据没有入库,进行了回滚,至此XA模式示例演示完成。

# 与多数据源结合使用

# 概述

seata可以和多数据源结合使用,能够对租户绑定的数据源所产生的分布式事务进行控制。大体流程为当请求到达服务时,根据当前租户绑定的数据源进行切换,seata对切换后的数据源进行代理,实现分布式事务的控制。需要注意的是只有AT模式和XA模式支持与多数据源结合使用,TCC模式结合多数据源使用时应用程序会直接报错,这一点需要注意。

# 服务端配置

配置流程同普通AT模式一致

# 客户端配置

1.在各个微服务的主数据源(yml配置文件配置的数据源)中创建租户表和数据源表,并将租户绑定各自数据源

  • Mysql数据库
-- 租户表
CREATE TABLE `tenant` (
  `id` CHAR(32) NOT NULL COMMENT '租户ID',
  `tenant_name` VARCHAR (200) DEFAULT NULL COMMENT '租户名称',
  `datasource_id` CHAR(32) DEFAULT NULL COMMENT '数据源主键',
  `create_time` DATETIME DEFAULT NULL COMMENT '创建时间',
  `update_time` DATETIME DEFAULT NULL COMMENT '更新时间',
  `is_deleted` SMALLINT DEFAULT NULL COMMENT '是否删除',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = INNODB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = DYNAMIC;

-- 数据源表
CREATE TABLE `hos_tenant_datasource` (
  `id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '主键',
  `name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '名称',
  `driver_class` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '驱动类',
  `url` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '连接地址',
  `username` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '用户名',
  `password` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '密码',
  `remark` varchar(2000) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '备注',
  `create_by` char(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '创建人主键',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_by` char(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '更新人主键',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  `is_deleted` smallint DEFAULT NULL COMMENT '是否已删除',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='数据源配置表';

  • Oracle数据库
-- 租户表
CREATE TABLE "TENANT" (
  "id" char(32) NOT NULL,
  "tenant_name" varchar2 (200),
  "datasource_id" char(32),
  "create_time" date,
  "update_time" date,
  "is_deleted" number(1),
  PRIMARY KEY ("id")
);

-- 数据源表
CREATE TABLE "HOS_TENANT_DATASOURCE" (
  "id" char(32) NOT NULL,
  "name" varchar2(200),
  "driver_class" varchar2(200),
  "url" varchar2(500),
  "username" varchar2(200),
  "password" varchar2(200),
  "remark" varchar2(5000),
  "create_by" char(32),
  "create_time" date,
  "update_by" char(32),
  "update_time" date ,
  "is_deleted" number(1),
  PRIMARY KEY ("id")
);

将租户表中的datasource_id指定不同的数据源表的id,这样即完成了租户与数据源的绑定。

seata_25

2.在租户绑定的数据源中初始化seata的undo_log脚本(仅AT模式需要)

  • Mysql数据库
-- MySql
CREATE TABLE `undo_log` (
  `branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
  `xid` varchar(128) NOT NULL COMMENT 'global transaction id',
  `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
  `rollback_info` longblob NOT NULL COMMENT 'rollback info',
  `log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',
  `log_created` datetime(6) NOT NULL COMMENT 'create datetime',
  `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='AT transaction mode undo table';
  • Oracle数据库
CREATE TABLE undo_log
(
    id            NUMBER(19)    NOT NULL,
    branch_id     NUMBER(19)    NOT NULL,
    xid           VARCHAR2(128) NOT NULL,
    context       VARCHAR2(128) NOT NULL,
    rollback_info BLOB          NOT NULL,
    log_status    NUMBER(10)    NOT NULL,
    log_created   TIMESTAMP(0)  NOT NULL,
    log_modified  TIMESTAMP(0)  NOT NULL,
    PRIMARY KEY (id),
    CONSTRAINT ux_undo_log UNIQUE (xid, branch_id)
);

COMMENT ON TABLE undo_log IS 'AT transaction mode undo table';

-- Generate ID using sequence and trigger
CREATE SEQUENCE UNDO_LOG_SEQ START WITH 1 INCREMENT BY 1;

3.各服务添加seata与多数据源相关配置

#多数据源相关配置
spring:
  datasource:
    dynamic:
      #是否开启seata支持
      seata: true
      #使用seata事务模式,支持AT/XA,默认为AT模式
      seata-mode: AT

#多租户相关配置
framework:
  multi-tenant:
    #多租户开关,默认关闭
    enable: true
    #租户条件列名,默认为tenant_id
    column: tenant_id
    #忽略表名,默认为空,多个以中划线分隔
    ignore-table:
      #此处需要忽略数据源表
      - hos_tenant_datasource
      #- contract
      #- organization 
    #动态数据源开关,默认关闭
    dynamic-datasource: true

#seata相关配置
seata:
  #是否开启spring-boot自动装配,默认true
  enabled: true
  #是否开启数据源自动代理,默认true,如果使用多租户动态数据源功能则需关闭此按钮
  enableAutoDataSourceProxy: false
  #数据源代理模式 可选值AT、XA(与多数据源结合使用也仅支持这两种模式),默认为AT
  data-source-proxy-mode: AT
  #事务分组配置项,程序会拼接[service.vgroupMapping.事务分组配置项]这样一个完整配置去配置中心上的seata-server
  #配置文件seataServer.properties中查找该完整配置的值,其值就是TC集群的名称,然后就可以根据集群名称获取真实的
  #TC服务列表。所以要求该配置项必须与seataServer.properties中[service.vgroupMapping.xxx=集群名称]的xxx相同
  tx-service-group: my_test_tx_group
  registry:
    type: nacos
    nacos:
      #seata服务端应用名称,与registry.conf中保持一致
      application: seata-server
      #seata服务端在注册中心上的地址,与registry.conf中保持一致
      server-addr: 114.242.246.250:8040
      #seata服务端在注册中心上的分组,与registry.conf中保持一致
      group : SEATA_GROUP
      #seata服务端在注册中心上的命名空间,与registry.conf中保持一致
      namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
      #seata服务端所属注册中心用户名,与registry.conf中保持一致
      username: nacos
      #seata服务端所属注册中心密码,与registry.conf中保持一致
      password: nacos
      #TC集群名称,与registry.conf中保持一致
      cluster: default
  config:
    type: nacos
    nacos:
      #seata服务端在配置中心的地址,与registry.conf中保持一致
      server-addr: 114.242.246.250:8040
      #seata服务端在配置中心的分组,与registry.conf中保持一致
      group : SEATA_GROUP
      #seata服务端在配置中心的命名空间,与registry.conf中保持一致
      namespace: 928cc136-e6e5-4189-a164-9693bc5618c1
      #seata服务端在配置中心的配置文件名称,与registry.conf中保持一致
      dataId: seataServer.properties
      #seata服务端所属配置中心用户名,与registry.conf中保持一致
      username: nacos
      #seata服务端所属配置中心密码,与registry.conf中保持一致
      password: nacos

# 关闭ribbon失败重试,v4.3 之前版本需设置,之后 ribbon 已经移除,无需设置
ribbon:
  MaxAutoRetriesNextServer: 0

4.各服务添加seata与多数据源依赖

<dependency>
    <groupId>com.mediway.hos</groupId>
    <artifactId>hos-framework-seata-starter</artifactId>
</dependency>

<dependency>
    <groupId>com.mediway.hos</groupId>
    <artifactId>hos-framework-tenant-starter</artifactId>
</dependency>

下面步骤5、6、7、8皆是多租户模块hos-framework-tenant-starter所需配置,如之前服务已引入过该模块并对步骤5、6、7、8进行过实现,则此处无需再次配置。

5.定义租户ID过滤器(内容仅供参考)

@WebFilter(urlPatterns = "/*")
public class TenantIdFilter implements Filter {

    @Override
    public void init(FilterConfig filterConfig) throws ServletException {

    }

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HttpServletRequest req = (HttpServletRequest) servletRequest;
        String tenantId = req.getHeader("tenantId");
        if (StringUtil.isNotBlank(tenantId)) {
            TenantUtil.setTenantId(tenantId);
        }
        filterChain.doFilter(servletRequest, servletResponse);

    }

    @Override
    public void destroy() {

    }
}

应用程序需要事先定义好过滤器将租户id从请求header中取出放到上下文中,在单体版只需拦截单个应用放入一次即可,而微服务版每个服务都要将租户id放入上下文,所以需要定义公共的过滤器被所有微服务引用。

6.各服务启动类添加注解@ServletComponentScan使定义的过滤器生效

7.定义租户实体类Tenant(内容仅供参考,但是必须含有数据源ID字段)

@Data
public class Tenant {

    private String id;

    private String tenantName;

    private String datasourceId;

    private Date createTime;

    private String updateTime;

    private Integer isDeleted;
}

8.定义DataSourceIdProvider(内容仅供参考)

@Component
public class DemoDataSourceIdProvider implements DataSourceIdProvider {

    @Autowired
    private JdbcTemplate jdbcTemplate;


    /**
     * 根据租户id获取数据源id
     *
     * @param tenantId 租户id
     * @return 数据源id
     */
    @Override
    public String getDataSourceId(String tenantId) throws Exception {
        //查询sql
        String queryDataSourceByIdSql = "SELECT datasource_id AS datasourceId FROM tenant WHERE id = ?";
        List<Tenant> dataSources = this.jdbcTemplate.query(queryDataSourceByIdSql, new String[]{tenantId}, new BeanPropertyRowMapper(Tenant.class));
        if (CollectionUtil.isEmpty(dataSources)) {
            return null;
        }
        return dataSources.get(0).getDatasourceId();
    }
}

9.发起方服务的方法上添加@GlobalTransactional注解

# 注意事项

  • 各个微服务的主数据源(配置在nacos中的数据源)应该保持一致,数据源表hos_tenant_datasource及租户表tenant都存于主数据源中。

  • 异常需要能够保证被发起方服务感知到,如果发生异常但中途被捕获截断则 seata 无法发起回滚操作。

# 使用示例

我们以租户请求合同签订接口为例演示seata的AT模式结合多数据源如何使用。

1.数据准备

  • 在主数据源中执行下述sql,分别新建两个数据源和两个租户,并将租户1绑定数据源1,租户2绑定数据源2。
INSERT INTO `hos_tenant_datasource` (`id`, `name`, `driver_class`, `url`, `username`, `password`, `remark`, `create_by`, `create_time`, `update_by`, `update_time`, `is_deleted`) VALUES('1','测试数据源Mysql001','com.mysql.cj.jdbc.Driver','jdbc:mysql://localhost:3306/demo_dynamic?useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT%2B8','root','123456',NULL,'admin','2023-05-09 17:07:59','admin','2023-05-09 17:08:01','0');
INSERT INTO `hos_tenant_datasource` (`id`, `name`, `driver_class`, `url`, `username`, `password`, `remark`, `create_by`, `create_time`, `update_by`, `update_time`, `is_deleted`) VALUES('2','测试数据源Oracle002','oracle.jdbc.driver.OracleDriver','jdbc:oracle:thin:@114.242.246.250:8521/XE','HOS_OPEN_DEV','wvB27Mdkw',NULL,'admin','2023-05-09 17:11:07','admin','2023-05-09 17:11:12','0');

insert into `tenant` (`id`, `tenant_name`, `datasource_id`, `create_time`, `update_time`, `is_deleted`) values('111111','测试租户1','1','2023-05-09 17:01:16','2023-05-09 17:01:18','0');
insert into `tenant` (`id`, `tenant_name`, `datasource_id`, `create_time`, `update_time`, `is_deleted`) values('222222','测试租户2','2','2023-05-09 17:02:48','2023-05-09 17:02:50','0');
  • 在数据源1、数据源2中分别新建undo_log表(脚本见客户端配置步骤2),结果如下

seata_27

seata_28

  • 在数据源1中准备租户1的业务数据(MySql数据库)

合同表

CREATE TABLE `contract` (
  `id` varchar(32) NOT NULL COMMENT 'ID',
  `name` varchar(32) DEFAULT NULL COMMENT '名称',
  `signer` varchar(32) DEFAULT NULL COMMENT '合同签订人',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT NULL COMMENT '更新时间',
  `is_deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否删除',
  `tenant_id` varchar(32) DEFAULT NULL COMMENT '租户ID',
  `remark` text COMMENT '备注',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='合同表';

员工信息表

CREATE TABLE `staff` (
  `id` varchar(32) NOT NULL COMMENT '主键',
  `name` varchar(500) DEFAULT NULL COMMENT '姓名',
  `gender` varchar(2) DEFAULT NULL COMMENT '性别',
  `age` int DEFAULT NULL COMMENT '年龄',
  `email` varchar(60) DEFAULT NULL COMMENT '邮箱',
  `phone` varchar(500) DEFAULT NULL COMMENT '手机号',
  `tenant_id` varchar(32) DEFAULT NULL COMMENT '租户id',
  `org_id` varchar(32) DEFAULT NULL COMMENT '部门id',
  `description` varchar(255) DEFAULT NULL COMMENT '描述',
  `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  `is_deleted` int DEFAULT NULL,
  `business_key` varchar(64) DEFAULT NULL COMMENT '业务流水号',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='员工信息表';

账户表

CREATE TABLE `account` (
  `id` varchar(32) NOT NULL COMMENT 'ID',
  `staff_id` varchar(32) DEFAULT NULL COMMENT '员工id',
  `amount` decimal(20,2) NOT NULL DEFAULT '0.00' COMMENT '账户余额',
  `status` tinyint(1) NOT NULL DEFAULT '1' COMMENT '状态 1-正常  2-冻结  3-已补偿',
  `business_key` varchar(64) DEFAULT NULL COMMENT '业务流水号',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT NULL COMMENT '更新时间',
  `is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '是否删除',
  `tenant_id` varchar(32) DEFAULT NULL COMMENT '租户ID',
  `remark` text COMMENT '备注',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='账户表';

分别为员工信息表和账户表添加一条测试数据

insert into `staff` (`id`, `name`, `gender`, `age`, `email`, `phone`, `tenant_id`, `org_id`, `description`, `create_time`, `update_time`, `is_deleted`, `business_key`) values('e2ab1960cd737111154d46878b5bccbb','丽丽','女','28','lili@qq.com','13201122530','111111',NULL,'jingjing','2023-05-08 11:34:48','2023-05-08 11:34:51',NULL,NULL);
insert into `account` (`id`, `staff_id`, `amount`, `status`, `business_key`, `create_time`, `update_time`, `is_deleted`, `tenant_id`, `remark`) values('b43efca9f12b546df2a87adae09846c5','e2ab1960cd737111154d46878b5bccbb','0.00','1',NULL,'2023-05-08 11:34:48','2023-05-08 11:34:51','0','111111',NULL);

seata_29

  • 在数据源2中准备租户2的业务数据(Oracle数据库)

合同表

CREATE TABLE "CONTRACT" 
   (	"ID" VARCHAR2(32) NOT NULL ENABLE, 
	"NAME" VARCHAR2(32) DEFAULT NULL, 
	"SIGNER" VARCHAR2(32) DEFAULT NULL, 
	"CREATE_TIME" DATE DEFAULT CURRENT_DATE NOT NULL ENABLE, 
	"UPDATE_TIME" DATE DEFAULT CURRENT_DATE NOT NULL ENABLE, 
	"IS_DELETED" NUMBER(4,0) DEFAULT 0 NOT NULL ENABLE, 
	"TENANT_ID" VARCHAR2(32) DEFAULT NULL,
    "REMARK" VARCHAR2(2000) DEFAULT NULL,
    CHECK ("ID" IS NOT NULL) ENABLE, 
    PRIMARY KEY ("ID")
);

员工信息表

CREATE TABLE "STAFF" 
   (	"ID" VARCHAR2(32) NOT NULL ENABLE, 
	"NAME" VARCHAR2(60) DEFAULT NULL, 
	"GENDER" VARCHAR2(3) DEFAULT NULL, 
	"AGE" NUMBER(2,0) DEFAULT NULL, 
	"EMAIL" VARCHAR2(60) DEFAULT NULL, 
	"PHONE" VARCHAR2(60) DEFAULT NULL, 
	"ORG_ID" VARCHAR2(32) DEFAULT NULL, 
	"DESCRIPTION" VARCHAR2(255) DEFAULT NULL, 
	"CREATE_TIME" DATE DEFAULT CURRENT_DATE, 
	"UPDATE_TIME" DATE DEFAULT CURRENT_DATE, 
	"TENANT_ID" VARCHAR2(32) DEFAULT NULL, 
	"IS_DELETED" NUMBER(1,0) DEFAULT NULL, 
	 CHECK ("ID" IS NOT NULL) ENABLE, 
	 PRIMARY KEY ("ID")
);

账户表

CREATE TABLE "ACCOUNT" 
   (	"ID" VARCHAR2(32) NOT NULL ENABLE, 
	"STAFF_ID" VARCHAR2(32) DEFAULT NULL, 
	"AMOUNT" NUMBER(20,0) DEFAULT '0.00', 
	"STATUS" NUMBER(1,0) DEFAULT NULL, 
	"BUSINESS_KEY" VARCHAR2(64) DEFAULT NULL, 
	"CREATE_TIME" DATE DEFAULT CURRENT_DATE, 
	"UPDATE_TIME" DATE DEFAULT CURRENT_DATE, 
	"TENANT_ID" VARCHAR2(32) DEFAULT NULL, 
	"IS_DELETED" NUMBER(1,0) DEFAULT NULL, 
    "REMARK" VARCHAR2(2000) DEFAULT NULL,
	 CHECK ("ID" IS NOT NULL) ENABLE, 
	 PRIMARY KEY ("ID")
);

分别为员工信息表和账户表添加一条测试数据

INSERT INTO STAFF
(ID, NAME, GENDER, AGE, EMAIL, PHONE, ORG_ID, DESCRIPTION, CREATE_TIME, UPDATE_TIME, TENANT_ID, IS_DELETED)
VALUES('d5e1137514be473c873bf30e14325e0e', '黎明', '男', 22, 'liming@qq.com', '13555554432', '1', NULL, TIMESTAMP '2022-01-14 00:00:00.000000', TIMESTAMP '2022-02-16 00:00:00.000000', '222222', 0);
INSERT INTO ACCOUNT
(ID, STAFF_ID, AMOUNT, STATUS, BUSINESS_KEY, CREATE_TIME, UPDATE_TIME, TENANT_ID, IS_DELETED, REMARK)
VALUES('0accd19660a2bed1008cb6cfec635db6', 'd5e1137514be473c873bf30e14325e0e', 0, 1, NULL, TIMESTAMP '2022-01-14 00:00:00.000000', TIMESTAMP '2022-01-14 00:00:00.000000', '222222', 0, NULL);

seata_30

2.配置

按照客户端配置模块配置即可。

3.代码示例

AT模式使用示例代码一致。

4.启动三个业务服务及网关服务,我们以租户1的身份来正常请求接口

http://localhost:7100/contract/contract/signContract?staffId=e2ab1960cd737111154d46878b5bccbb&name=20亿超级订单合同&signer=丽丽&amount=300.00
header:tenantId-111111
method:POST

接口响应结果

{
    "code": "200",
    "msg": "success",
    "data": {
        "id": "4953ed1abbc55de0d3410916e5a7d720",
        "createTime": "2022-06-07 14:20:24",
        "updateTime": "2022-06-07 14:20:24",
        "current": 0,
        "size": 0,
        "name": "20亿超级订单合同",
        "signer": "丽丽",
        "tenantId": null,
        "isDeleted": 0,
        "remark": null
    },
    "success": true
}

租户1绑定的Mysql数据库demo_dynamic结果显示数据已经正确入库

seata_31

seata日志显示AT模式全局事务提交成功

seata_32

应用程序日志显示确实使用了数据源demo_dynamic,同时也开启了多租户的字段隔离机制

seata_33

我们换租户2来请求其绑定的数据源看下效果

http://localhost:7100/contract/contract/signContract?staffId=e2ab1960cd737111154d46878b5bffdd&name=粮食订单合同&signer=黎明&amount=12000.00
header:tenantId-222222
method:POST

接口响应结果

{
    "code": "200",
    "msg": "success",
    "data": {
        "id": "0d423ce4917b1782c36dd460168c0185",
        "createTime": "2022-06-07 14:35:52",
        "updateTime": "2022-06-07 14:35:52",
        "current": 0,
        "size": 0,
        "name": "粮食订单合同",
        "signer": "黎明",
        "tenantId": null,
        "isDeleted": 0,
        "remark": null
    },
    "success": true
}

租户2绑定的Oracle数据库HOS结果显示数据已经正确入库

seata_34

seata日志显示AT模式全局事务提交成功

seata_35

应用程序日志显示确实使用了数据源HOS,同时也开启了多租户的字段隔离机制

seata_36

我们发现seata的AT模式与多数据源结合使用时正向流程是没有问题的,下面演示下异常情况,验证事务是否会回滚。

5.模拟失败情况

修改AccountServiceImpl,放开//int i = 10/0前的注释,清空之前测试数据,再次以租户1发起请求

http://localhost:7100/contract/contract/signContract?staffId=e2ab1960cd737111154d46878b5bccbb&name=20亿超级订单合同&signer=丽丽&amount=300.00
header:tenantId-111111
method:POST

接口响应结果

{
    "code": "500",
    "msg": "[给用户发放签订合同成功奖励]失败",
    "data": null,
    "success": false
}

seata日志显示全局回滚成功

seata_38

业务服务日志

seata_39

数据库效果

seata_40

数据没有入库,进行了回滚,我们再以租户2请求一次

http://localhost:7100/contract/contract/signContract?staffId=e2ab1960cd737111154d46878b5bffdd&name=粮食订单合同&signer=黎明&amount=12000.00
header:tenantId-222222
method:POST

接口响应结果

{
    "code": "500",
    "msg": "[给用户发放签订合同成功奖励]失败",
    "data": null,
    "success": false
}

seata日志显示全局回滚成功

seata_41

业务服务日志

seata_42

数据库效果

seata_43

我们发现无论在Mysql数据库下还是Oracle数据库下,seata的AT模式都是支持与多数据源结合使用的,至于XA模式大家可以自己尝试下效果。