1. 基础概念介绍
1.1 什么是分布式事务
在微服务架构中,一个业务操作常常需要调用多个服务来完成。例如,在电商系统中下单时,需要同时操作订单服务和库存服务。这种跨服务的操作就需要分布式事务来保证数据一致性。
分布式事务面临以下挑战:
- 多个服务间如何协调
- 服务失败如何回滚
- 如何保证最终一致性
- 网络故障、服务不可用等问题处理
1.2 什么是DTM
DTM (Distributed Transaction Manager) 是一个开源的分布式事务管理器,它支持多种事务模式,如TCC、SAGA、XA等。DTM的特点包括:
- 简单易用的接口
- 支持多种事务模式
- 支持多种编程语言
- 高可用、高性能设计
DTM让开发者可以像使用本地事务一样简单地使用分布式事务,大大降低了开发难度。
1.3 分布式事务模式简介
分布式事务有多种实现模式,本教程将介绍两种常用模式:
SAGA模式:
将一个大事务拆分为多个小事务,每个小事务都有对应的补偿操作。如果某个小事务失败,则执行已完成事务的补偿操作,实现回滚。
TCC模式:
将一个事务分为三个阶段:Try(尝试)、Confirm(确认)、Cancel(取消)。Try阶段进行资源检查和预留,Confirm阶段确认执行,Cancel阶段在Try失败后进行取消操作。
这两种模式各有优缺点,适用于不同的业务场景。
1.4分布式事务模式对比
让我们了解一下DTM支持的几种事务模式:
事务模式 | 使用场景 | 优点 | 缺点 |
---|---|---|---|
SAGA | 长事务、低一致性要求 | 实现简单,可靠性高 | 最终一致性,补偿逻辑复杂 |
TCC | 对一致性要求高 | 强一致性,性能好 | 实现复杂,需要多个接口 |
XA | 标准事务处理 | 强一致性,使用简单 | 性能较差,锁定资源时间长 |
2阶段(2pc)消息 | 可靠消息投递 | 简单易用,适合异步 | 最终一致性 |
2. 项目概述
2.1 业务场景
本教程将实现一个简化的电商下单场景,涉及两个核心服务:
- 订单服务:负责创建和管理订单
- 库存服务:负责管理商品库存
业务流程如下:
- 用户下单时,需要创建订单并扣减库存
- 如果任一操作失败,需要回滚所有操作,保证数据一致性
我们将使用两种不同的分布式事务模式来实现这个场景:
- 使用TCC模式处理订单服务
- 使用SAGA模式处理库存服务
3. 环境准备
3.1 前置环境
首先,要先把以下环境部署好:
- Go(1.16+)
- Go-Zero:
- MySQL(5.7+):用于存储数据和DTM事务信息
- etcd:用于服务发现
3.2DTM服务部署
Docker快速启动
DTM 服务可以通过 Docker 快速启动:
# 拉取 DTM 镜像
docker pull yedf/dtm:latest# 启动 DTM 服务
docker run -it --name dtm -p 36789:36789 -p 36790:36790 -e STORE_DRIVER=mysql -e STORE_HOST=mysql:3306 -e STORE_USER=root -e STORE_PASSWORD=123456 -e STORE_PORT=3306 yedf/dtm:latest
注意:如果环境都是使用docker部署,网络要使用同一个。
本地部署
或者拉取源码启动:
git clone https://github.com/dtm-labs/dtm
源码拉取成功后,我们需要修改conf.sample.yml
文件,把服务注册到ETCD,把下面的设置取消注释:
MicroService: # gRPC/HTTP based microservice configDriver: 'dtm-driver-gozero' # name of the driver to handle register/discoverTarget: 'etcd://localhost:2379/dtmservice' # register dtm server to this urlEndPoint: 'localhost:36790'Store:Driver: 'mysql'Host: 'localhost'User: 'root'Password: '123456'Port: 3306DB: 'dtm'
然后使用这个配置启动服务
go run mian.go -c conf.sample.yml
部署完成后,可以通过浏览器访问 http://localhost:36789 ,来查看管理页面 ,这个页面可以很直观的查看到,分布式事务是否成功,以及使用的哪些事务模式。
4.使用SAGA模式
我们先从简单的SAGA模式开始,实现库存服务的扣减和补偿操作。
4.1 SAGA模式介绍
SAGA模式是一种分布式事务解决方案,将一个大事务拆分为多个小事务,每个小事务都有对应的补偿操作。
SAGA的流程如下:
- 执行一系列正向操作(如扣减库存)
- 如果任一操作失败,则执行已完成操作的补偿操作(如增加库存)
SAGA适用于可以通过补偿操作回滚的业务场景。
4.2 数据库设计
为了演示分布式事务,我们需要创建三个数据库:订单数据库、库存数据库和DTM数据库。
订单数据库:
CREATE DATABASE IF NOT EXISTS dtm_order;
USE dtm_order;CREATE TABLE `orders` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`user_id` bigint(20) NOT NULL COMMENT '用户ID',`product_id` bigint(20) NOT NULL COMMENT '产品ID',`amount` int(11) NOT NULL COMMENT '数量',`money` decimal(10,2) NOT NULL COMMENT '金额',`status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '状态: 0-创建 1-完成',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
库存数据库:
CREATE DATABASE IF NOT EXISTS dtm_stock;
USE dtm_stock;CREATE TABLE `stock` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`product_id` bigint(20) NOT NULL COMMENT '产品ID',`amount` int(11) NOT NULL COMMENT '库存数量',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY `uniq_product_id` (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;-- 插入测试数据
INSERT INTO `stock` (`product_id`, `amount`) VALUES (1, 100);
INSERT INTO `stock` (`product_id`, `amount`) VALUES (2, 200);
DTM数据库:
CREATE DATABASE IF NOT EXISTS dtm;
DTM会自动创建所需的表,无需手动创建。
注意: 创建的数据库要和dtm配置文件中指定的数据库要一致
4.3 库存服务实现
首先,创建库存服务的Proto定义文件stock/stock.proto
:
syntax = "proto3";package stock;
option go_package="./stock";// 库存服务
service Stock {// 扣减库存rpc DeductStock(DeductStockReq) returns (DeductStockResp);// 补偿库存rpc CompensateStock(CompensateStockReq) returns (CompensateStockResp);
}// 扣减库存请求
message DeductStockReq {int64 productId = 1; // 产品IDint32 amount = 2; // 数量
}// 扣减库存响应
message DeductStockResp {bool success = 1; // 是否成功string message = 2; // 消息
}// 补偿库存请求
message CompensateStockReq {int64 productId = 1; // 产品IDint32 amount = 2; // 数量
}// 补偿库存响应
message CompensateStockResp {bool success = 1; // 是否成功string message = 2; // 消息
}
切换到 rpc/stock/目录下,使用goctl生成代码:
cd stock
goctl rpc protoc stock.proto --go_out=. --go-grpc_out=. --zrpc_out=.
修改库存服务的配置文件stock/etc/stock.yaml
:
Name: stock.rpc
ListenOn: 0.0.0.0:9002
Etcd:Hosts:- 127.0.0.1:2379Key: stock.rpcDB:DataSource: root:123456@tcp(localhost:3306)/dtm_stock?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghaiDTM: etcd://localhost:2379/dtmservice
修改配置结构stock/internal/config/config.go
:
package configimport ("github.com/zeromicro/go-zero/zrpc"
)type Config struct {zrpc.RpcServerConf//增加数据库字段DB struct {DataSource string}DTM string // DTM服务地址
}
修改服务上下文stock/internal/svc/service_context.go
:
package svcimport ("github.com/Mikaelemmmm/gozerodtm/stock/internal/config""github.com/Mikaelemmmm/gozerodtm/stock/internal/model""github.com/zeromicro/go-zero/core/stores/sqlx"
)type ServiceContext struct {Config config.ConfigStockModel model.StockModel // 库存模型
}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config: c,StockModel: model.NewStockModel(sqlx.NewMysql(c.DB.DataSource)), // 初始化库存模型}
}
切换到 stock/model
目录下,使用goctl工具生成model代码:
goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:33069)/dtm_stock" -table="stock" -dir .
生成代码后,到stock/model/stockmodel.go
,增加DeductStock
和AddStock
方法:
package modelimport ("database/sql""fmt""time""github.com/zeromicro/go-zero/core/stores/sqlx"
)var _ StockModel = (*customStockModel)(nil)/*
....
*/type (// StockModel 是stock表的模型StockModel interface {// DeductStock 扣减库存DeductStock(productId int64, amount int32) error//AddStock 增加库存(用于补偿)AddStock(productId int64, amount int32) error}// DeductStock 扣减库存
func (m *customStockModel) DeductStock(productId int64, amount int32) error {query := fmt.Sprintf("update %s set amount = amount - ? where product_id = ? and amount >= ?", m.table)result, err := m.conn.Exec(query, amount, productId, amount)if err != nil {return err}affected, err := result.RowsAffected()if err != nil {return err}if affected == 0 {return fmt.Errorf("库存不足")}return nil
}// AddStock 增加库存(用于补偿)
func (m *customStockModel) AddStock(productId int64, amount int32) error {query := fmt.Sprintf("update %s set amount = amount + ? where product_id = ?", m.table)_, err := m.conn.Exec(query, amount, productId)return err
}
实现扣减库存逻辑stock/internal/logic/deduct_stock_logic.go
:
package logicimport ("context""github.com/dtm-labs/dtm/client/dtmcli""google.golang.org/grpc/codes""google.golang.org/grpc/status""dtm_demo/rpc/stock/internal/svc""dtm_demo/rpc/stock/stock""github.com/zeromicro/go-zero/core/logx"
)
/*
....
*/
// 扣减库存
func (l *DeductStockLogic) DeductStock(in *stock.DeductStockReq) (*stock.DeductStockResp, error) {// todo: add your logic here and delete this lineerr := l.svcCtx.StockModel.DeductStock(in.ProductId, in.Amount)if err != nil {logx.Errorf("扣减库存失败: %v", err)return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) // 直接返回错误}return &stock.DeductStockResp{Success: true,Message: "扣减库存成功",}, nil
}
需要注意的是, 如果事务执行失败,返回的是RPC的状态码 codes.Aborted
和 事务结果dtmcli.ResultFailure
,如果直接返回错误的,DTM是不会进行事务补偿的。
return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
-
Aborted
:表示操作被中止,通常是由于并发问题,如序列检查失败、事务中止等。 -
ResultFailure
(dtmimp.ResultFailure
)表示事务或事务分支执行失败。当事务或事务分支在执行过程中遇到错误,例如数据库插入失败、服务调用超时等,会返回失败结果。这个结果会触发全局事务管理器进行相应的处理,如回滚事务。
后面会单独介绍下gRPC状态码和dtmcli的事务状态。
实现补偿库存逻辑stock/internal/logic/compensate_stock_logic.go
:
package logicimport ("context""fmt""dtm_demo/rpc/stock/internal/svc""dtm_demo/rpc/stock/stock""github.com/zeromicro/go-zero/core/logx"
)/*
....
*/// 补偿库存
func (l *CompensateStockLogic) CompensateStock(in *stock.CompensateStockReq) (*stock.CompensateStockResp, error) {// todo: add your logic here and delete this linefmt.Println("收到补偿库存请求:", in.ProductId, in.Amount)err := l.svcCtx.StockModel.AddStock(in.ProductId, in.Amount)if err != nil {logx.Errorf("补偿库存失败: %v", err)return &stock.CompensateStockResp{Success: false,Message: err.Error(),}, nil}return &stock.CompensateStockResp{Success: true,Message: "补偿库存成功",}, nil
}
4.4 SAGA调用示例
创建test_saga.go
,演示如何使用SAGA模式调用库存服务:
package mainimport ("dtm_demo/rpc/stock/stock""log""github.com/dtm-labs/client/dtmgrpc""github.com/zeromicro/go-zero/core/discov""github.com/zeromicro/go-zero/zrpc"
)func main() {// 初始化客户端stockRpcConf := zrpc.RpcClientConf{Etcd: discov.EtcdConf{Hosts: []string{"localhost:2379"},Key: "stock.rpc",},}client := zrpc.MustNewClient(stockRpcConf)client.Conn()// DTM服务地址dtmServer := "etcd://localhost:2379/dtmservice"// 创建请求stockReq := &stock.DeductStockReq{ProductId: 1,Amount: 10, //设置库存}// 生成全局事务IDgid := dtmgrpc.MustGenGid(dtmServer)// 创建SAGA事务saga := dtmgrpc.NewSagaGrpc(dtmServer, gid)//stockRpcBusiServer 实际上就是自动获取库存服务地址 etcd://localhost:2379/stock.rpcstockRpcBusiServer, err := stockRpcConf.BuildTarget()if err != nil {log.Fatal(err)}saga.Add(stockRpcBusiServer+"/stock.Stock/DeductStock", stockRpcBusiServer+"/stock.Stock/CompensateStock", stockReq)// 提交事务err = saga.Submit()if err != nil {log.Fatalf("SAGA事务提交失败: %v", err)}
}
需要注意的是 /stock.Stock/DeductStock
和/stock.Stock/CompensateStock
这个路径是区分大小写的,一定要和goctl工具生成的一致
启动库存服务:
go run stock.go
运行测试程序:
go run test_saga.go
如果一切正常,库存将被成功扣减。如果服务出现异常(例如库存不足),DTM会自动执行补偿操作,增加库存。
当我把 Amount
设置为 100 ,库存服务提示库存不足,并自动进行补偿
5. 使用TCC模式
接下来,我们使用更复杂的TCC模式实现订单服务。
5.1 TCC模式介绍
TCC(Try-Confirm-Cancel)是一种更灵活的分布式事务模式,它将一个事务分为三个阶段:
- Try:资源检查和预留,但不真正执行业务操作
- Confirm:确认执行业务操作,只有当所有Try都成功时执行
- Cancel:取消操作,当任一Try失败时执行
TCC模式比SAGA模式更为灵活,但也更复杂,需要业务自行实现三个阶段的处理逻辑。
5.2 数据库设计
order
和stock
,仍然使用SAGA演示中的数据库表,在dtm_stock数据库中添加库存冻结表stock_tcc.sql
:
USE dtm_stock;
CREATE TABLE stock_tcc (id bigint(20) NOT NULL AUTO_INCREMENT,product_id bigint(20) NOT NULL COMMENT '产品ID',freeze_amount int(11) NOT NULL COMMENT '冻结数量',transaction_id varchar(64) NOT NULL COMMENT '事务ID',create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY uniq_trans_product (`transaction_id`, `product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
5.3 Tcc库存服务实现
因为我们现在要实现TCC模式,需要使用到 try , confirm ,cancel 三个方法,所以需要重新定义库存服务:
syntax = "proto3";package pb;
option go_package="./pb";// 库存查询请求
message ProductStockReq {int64 product_id = 1;
}// 库存查询响应
message ProductStockResp {int64 product_id = 1;int32 amount = 2;
}// 扣减库存请求
message DeductStockReq {int64 product_id = 1;int32 amount = 2;
}// 扣减库存响应
message DeductStockResp {bool success = 1;
}// TCC事务相关
message TccReq {string transaction_id = 1;int64 product_id = 2;int32 amount = 3;float money = 4;
}message TccResp {bool success = 1;
}// 库存服务
service Stock {// 查询商品库存rpc GetProductStock(ProductStockReq) returns (ProductStockResp);// 普通扣减库存(非TCC模式)//rpc DeductStock(DeductStockReq) returns (DeductStockResp);// TCC模式相关接口rpc TryDeductStock(TccReq) returns (TccResp); // Try阶段rpc ConfirmDeductStock(TccReq) returns (TccResp); // Confirm阶段rpc CancelDeductStock(TccReq) returns (TccResp); // Cancel阶段
}
切换到/stock下,使用goctl生成代码:
goctl rpc protoc stock.proto --go_out=. --go-grpc_out=. --zrpc_out=.
切换到 stock_svr/internal/model
目录下,生成model代码:
goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:33069)/dtm_stock" -table="stock" -dir .goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:33069)/dtm_stock" -table="stock_tcc" -dir .
修改stock.yaml
:
Name: stock.rpc
ListenOn: 0.0.0.0:8080
Etcd:Hosts:- 127.0.0.1:2379Key: stock.rpcDB:DataSource: root:123456@tcp(127.0.0.1:33069)/dtm_stock?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghai
修改stock_svr/internal/config/config.go
:
type Config struct {zrpc.RpcServerConfDB struct {DataSource string}
}
修改stock_svr/internal/svc/servicecontext.go
:
type ServiceContext struct {Config config.ConfigStockModel model.StockModel // 库存模型StockTccModel model.StockTccModel
}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config: c,StockModel: model.NewStockModel(sqlx.NewMysql(c.DB.DataSource)), // 初始化库存模型StockTccModel: model.NewStockTccModel(sqlx.NewMysql(c.DB.DataSource)), // 初始化库存模型}
}
修改stock_svr/internal/model/stocktccmodel.go
增加新方法:
type (StockTccModel interface {stockTccModelwithSession(session sqlx.Session) StockTccModel//增加DeleteByTransProductId 方法DeleteByTransProductId(ctx context.Context, transactionId string, productId int64) error}
/***/// DeleteByTransProductId 根据事务ID和商品ID删除记录
func (m *customStockTccModel) DeleteByTransProductId(ctx context.Context, transactionId string, productId int64) error {query := fmt.Sprintf("delete from %s where transaction_id = ? and product_id = ?", m.table)_, err := m.conn.ExecCtx(ctx, query, transactionId, productId)return err
}
修改stock_svr/internal/logic/trydeductstocklogic.go
实现Try接口:
// TCC模式相关接口
func (l *TryDeductStockLogic) TryDeductStock(in *pb.TccReq) (*pb.TccResp, error) {// todo: add your logic here and delete this linefmt.Println("尝试扣减库存")//查询商品库存stockInfo, err := l.svcCtx.StockModel.FindOneByProductId(l.ctx, in.ProductId)if err != nil && err != model.ErrNotFound {//return &pb.TccResp{Success: false}, fmt.Errorf("没有该商品")//return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)//!一般数据库不会错误不需要dtm回滚,就让他一直重试//这时候就不要返回codes.Aborted, dtmcli.ResultFailure ,具体自己把控!!!return nil, status.Error(codes.Internal, err.Error())}// 检查库存是否足够if stockInfo.Amount < int64(in.Amount) {//return &pb.TccResp{Success: false}, fmt.Errorf("库存不足")//如果库存不足,直接返回失败,让dtm回滚//返回 codes.Aborted, dtmcli.ResultFailure 才能回滚return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}//如果钱小于100,直接返回失败,让dtm回滚if in.Money < 100 {return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}// 扣减库存stockInfo.Amount -= int64(in.Amount)err = l.svcCtx.StockModel.Update(l.ctx, stockInfo)if err != nil {//return &pb.TccResp{Success: false}, fmt.Errorf("库存扣除失败")//如果库存扣除失败,直接返回失败,不要让它一直重试,让dtm回滚 ,return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}// 记录冻结库存,用于回滚freeze := &model.StockTcc{ProductId: in.ProductId,FreezeAmount: int64(in.Amount),TransactionId: in.TransactionId,}_, err = l.svcCtx.StockTccModel.Insert(l.ctx, freeze)if err != nil {// 如果冻结记录失败,回滚库存stockInfo.Amount += int64(in.Amount)_ = l.svcCtx.StockModel.Update(l.ctx, stockInfo)//return &pb.TccResp{Success: false}, fmt.Errorf("记录冻结库存失败: %v", err)return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}return &pb.TccResp{Success: true}, nil
}
修改stock_svr/internal/logic/confirmdeductstocklogic.go
,实现 confirm 接口:
// Confirm阶段:确认扣减库存
func (l *ConfirmDeductStockLogic) ConfirmDeductStock(in *pb.TccReq) (*pb.TccResp, error) {// todo: add your logic here and delete this linefmt.Println("Confirm阶段:确认扣减库存")err := l.svcCtx.StockTccModel.DeleteByTransProductId(l.ctx, in.TransactionId, in.ProductId)if err != nil {logx.Errorf("确认扣减库存失败: %v", err)// 这里返回失败,DTM会进行重试//return &pb.TccResp{Success: false}, errreturn nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}return &pb.TccResp{Success: true}, nil
}
修改stock_svr/internal/logic/canceldeductstocklogic.go
,实现 cancel方法:
func (l *CancelDeductStockLogic) CancelDeductStock(in *pb.TccReq) (*pb.TccResp, error) {// todo: add your logic here and delete this linefmt.Println("取消扣减库存")// 查找冻结记录freeze, err := l.svcCtx.StockTccModel.FindOneByTransactionIdProductId(l.ctx, in.TransactionId, in.ProductId)if err != nil {if err == model.ErrNotFound {// 如果没有找到冻结记录,可能是已经处理过了,直接返回成功return &pb.TccResp{Success: true}, nil}/*在 TCC(Try-Confirm-Cancel)事务模式里,Cancel 方法本身的作用就是进行事务回滚,所以一般不需要再在 Cancel 方法里额外执行事务回滚操作*/return &pb.TccResp{Success: false}, fmt.Errorf("查询冻结记录失败: %v", err)//return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)}// 恢复库存stockInfo, err := l.svcCtx.StockModel.FindOneByProductId(l.ctx, in.ProductId)if err != nil {return &pb.TccResp{Success: false}, fmt.Errorf("查询商品库存失败: %v", err)}stockInfo.Amount += freeze.FreezeAmounterr = l.svcCtx.StockModel.Update(l.ctx, stockInfo)if err != nil {return &pb.TccResp{Success: false}, fmt.Errorf("恢复库存失败: %v", err)}// 删除冻结记录err = l.svcCtx.StockTccModel.Delete(l.ctx, freeze.Id)if err != nil {logx.Errorf("删除冻结记录失败: %v", err)// 虽然删除冻结记录失败,但库存已经恢复,下次重试时不会重复恢复库存}return &pb.TccResp{Success: true}, nil
}
注意错误返回方法:
- codes.Aborted 一般表示操作被中止,而 dtmcli.ResultFailure 告知 DTM 操作失败,需要回滚整个事务。当 Confirm 阶段出现问题,且希望 DTM 回滚整个事务时,适合用这个返回。
适用场景:
确认操作无法完成,比如数据库记录不存在、数据状态不一致等,这种情况下需要 DTM 触发 Cancel 阶段来回滚事务。
- status.Error(codes.Internal, err.Error())
codes.Internal 表示服务器内部出错,一般意味着系统出现了临时故障,DTM 会持续重试该操作,直至成功或者达到最大重试次数。
适用场景:
操作失败是由临时问题引起的,像数据库连接暂时中断、网络抖动等,这种情况下希望 DTM 重试操作,直至成功。
5.4 订单服务Api实现
首先,创建订单服务的Proto定义文件order/order.api
:
```syntax = "v1"type (CreateOrderReq {UserId int64 `json:"userId"`ProductId int64 `json:"productId"`Amount int `json:"amount"`Money float64 `json:"money"`}CreateOrderResp {OrderId int64 `json:"orderId"`}OrderInfo {Id int64 `json:"id"`UserId int64 `json:"userId"`ProductId int64 `json:"productId"`Amount int `json:"amount"`Money float64 `json:"money"`Status int `json:"status"`CreateTime string `json:"createTime"`}GetOrderReq {OrderId int64 `json:"orderId"`}GetOrderResp {Order OrderInfo `json:"order"`}
)service Order {@handler CreateOrderpost /api/order/create (CreateOrderReq) returns (CreateOrderResp)@handler GetOrderpost /api/order/get (GetOrderReq) returns (GetOrderResp)
}
切换到api/order/下,使用goctl生成代码:
goctl api go -api .\order.api -dir .
切换到api/order/internal/model
下, 生成model代码:
goctl model mysql datasource -url="root:123456@tcp(127.0.0.1:33069)/dtm_order" -table="orders" -dir .
修改order.yaml
:
Name: Order
Host: 0.0.0.0
Port: 8888StockRpcConf:Etcd:Hosts:- 127.0.0.1:2379Key: stock.rpcDB:DataSource: root:123456@tcp(127.0.0.1:33069)/dtm_order?charset=utf8mb4&parseTime=true&loc=Asia%2FShanghaiDTM: etcd://127.0.0.1:2379/dtmservice
修改order/internal/config/config.go
:
type Config struct {rest.RestConfStockRpcConf zrpc.RpcClientConfDTM stringDB struct {DataSource string}
}
修改order/internal/svc/servicecontext.go
:
type ServiceContext struct {Config config.ConfigOrderModel model.OrdersModelStockRpc stock.Stock
}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config: c,OrderModel: model.NewOrdersModel(sqlx.NewMysql(c.DB.DataSource)),StockRpc: stock.NewStock(zrpc.MustNewClient(c.StockRpcConf)),}
}
修改order/internal/logic/createorderlogic.go
:
func (l *CreateOrderLogic) CreateOrder(req *types.CreateOrderReq) (resp *types.CreateOrderResp, err error) {// todo: add your logic here and delete this line//使用tcc,让try去检查并扣减库存,confirm和cancel去处理库存的回滚/*// 查询商品库存stockResp, err := l.svcCtx.StockRpc.GetProductStock(l.ctx, &pb.ProductStockReq{ProductId: req.ProductId,})if err != nil && !errors.Is(err, model.ErrNotFound) {return nil, fmt.Errorf("查询商品库存失败: %v", err)}fmt.Println(stockResp.Amount)if stockResp.Amount < int32(req.Amount) {return nil, fmt.Errorf("库存不足")}*/order := &model.Orders{UserId: req.UserId,ProductId: req.ProductId,Amount: int64(req.Amount),Money: req.Money,Status: 0, // 0-创建中}result, err := l.svcCtx.OrderModel.Insert(l.ctx, order)if err != nil {return nil, fmt.Errorf("创建订单失败: %v", err)}orderId, err := result.LastInsertId()if err != nil {return nil, fmt.Errorf("获取订单ID失败: %v", err)}// 4. 使用DTM的TCC模式进行分布式事务处理dtmServer := l.svcCtx.Config.DTM // DTM服务地址stockServer, err := l.svcCtx.Config.StockRpcConf.BuildTarget()if err != nil {}// 生成全局事务IDgid := dtmgrpc.MustGenGid(dtmServer)// 创建TCC事务err = dtmgrpc.TccGlobalTransaction(dtmServer, gid, func(tcc *dtmgrpc.TccGrpc) error {req := &pb.TccReq{ProductId: req.ProductId,Amount: int32(req.Amount),Money: float32(req.Money),}r := &emptypb.Empty{} // 用于接收响应err := tcc.CallBranch(req,stockServer+"/pb.Stock/TryDeductStock",stockServer+"/pb.Stock/ConfirmDeductStock",stockServer+"/pb.Stock/CancelDeductStock",r)return err})if err != nil {return nil, fmt.Errorf("创建订单失败: %v", err)}// 5. 更新订单状态为已创建orderUpdate, err := l.svcCtx.OrderModel.FindOne(l.ctx, orderId)if err != nil {logx.Errorf("查询订单失败: %v", err)// 这里不返回错误,因为已经扣减了库存,订单状态可以通过其他方式修复} else {orderUpdate.Status = 1 // 1-已完成err = l.svcCtx.OrderModel.Update(l.ctx, orderUpdate)if err != nil {logx.Errorf("更新订单状态失败: %v", err)// 同上,不返回错误}}return &types.CreateOrderResp{OrderId: orderId,}, nil
}
5.5 测试TCC
先启动 stock 服务
go run stock.go
再启动order api
go run order.go
创建请求:
curl -X POST -H "Content-Type: application/json" -d '{"userId": 1, "productId": 1, "amount": 1 , "money" :200.10}' http://localhost:8888/api/order/create
去stock服务那边查看日志,可以看到 先启动try ,然后使用confirm
解下模拟失败请求,钱设置为99.9:
curl -X POST -H "Content-Type: application/json" -d '{"userId": 1, "productId": 1, "amount": 999, "money" :99.9}' http://localhost:8888/api/order/create
事务启动cancel 进行回滚
6.DTM屏障 ,Barrier机制
6.1 Barrier机制介绍
DTM的Barrier机制主要解决以下问题:
- 空补偿问题:在Saga模式中,如果正向操作还未执行,补偿操作就执行了,这是空补偿。
- 空回滚问题:在TCC模式中,如果Try操作未执行,Cancel操作就执行了,这是空回滚。
- 悬挂问题:在分布式事务中,可能由于网络延迟等原因,先收到了Cancel/Confirm操作,后收到Try操作,此时Try操作应该被拒绝,这是悬挂。
- 幂等: 由于任何一个请求都可能出现网络异常,出现重复请求,所有的分布式事务分支操作,都需要保证幂等性
Barrier的主要原理是在数据库中添加一个barrier表,并使用数据库事务保证原子性,记录每个全局事务、分支事务、操作类型的执行情况,从而防止以上问题的发生。
用Dtm官方提供的dtmcli.barrier.mysql.sql
sql语句来创建dtm_barrier
表
create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(id bigint(22) PRIMARY KEY AUTO_INCREMENT,trans_type varchar(45) default '',gid varchar(128) default '',branch_id varchar(128) default '',op varchar(45) default '',barrier_id varchar(45) default '',reason varchar(45) default '' comment 'the branch type who insert this record',create_time datetime DEFAULT now(),update_time datetime DEFAULT now(),key(create_time),key(update_time),UNIQUE key(gid, branch_id, op, barrier_id)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
6.2 使用Barrier改进服务
我们以之前的sage的例子,做简单的修改。
需要注意的是,在rpc的业务中,如果使用了barrier的话,那么在model中与db交互时候必须要用事务,并且一定要跟barrier用同一个事务
,到stock/model/stockmodel.go
,修改DeductStock
和AddStock
方法:
package modelimport ("database/sql""fmt""time""github.com/zeromicro/go-zero/core/stores/sqlx"
)var _ StockModel = (*customStockModel)(nil)/*
....
*/type (// StockModel 是stock表的模型StockModel interface {// DeductStock 扣减库存DeductStock(tx *sql.Tx, productId int64, amount int32) (sql.Result, error)//AddStock 增加库存(用于补偿)AddStock(tx *sql.Tx, productId int64, amount int32) (sql.Result, error)}// DeductStock 扣减库存
func (m *customStockModel) DeductStock(tx *sql.Tx, productId int64, amount int32) (sql.Result, error) {query := fmt.Sprintf("update %s set amount = amount - ? where product_id = ? and amount >= ?", m.table)return tx.Exec(query, amount, productId, amount)
}// AddStock 增加库存(用于补偿)
func (m *customStockModel) AddStock(tx *sql.Tx, productId int64, amount int32) (sql.Result, error){query := fmt.Sprintf("update %s set amount = amount + ? where product_id = ?", m.table)return tx.Exec(query, amount, productId)}
实现扣减库存逻辑stock/internal/logic/deduct_stock_logic.go
:
package logicimport ("context""github.com/dtm-labs/dtm/client/dtmcli""google.golang.org/grpc/codes""google.golang.org/grpc/status""dtm_demo/rpc/stock/internal/svc""dtm_demo/rpc/stock/stock""github.com/zeromicro/go-zero/core/logx"
)
/*
....
*/
// 扣减库存func (l *DeductStockLogic) DeductStock(in *stock.DeductStockReq) (*stock.DeductStockResp, error) {// todo: add your logic here and delete this line//barrier防止空补偿、空悬挂等具体看dtm官网即可,别忘记加barrier表在当前库中,因为判断补偿与要执行的sql一起本地事务barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()if err != nil {//!!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!return nil, status.Error(codes.Internal, err.Error())}if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {_,err := l.svcCtx.StockModel.DeductStock(tx,in.ProductId, in.Amount)if err != nil {logx.Errorf("扣减库存失败: %v", err)return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) // 直接返回错误}return nil}); err != nil {//!!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!return nil, status.Error(codes.Internal, err.Error())}return &stock.DeductStockResp{Success: true,Message: "扣减库存成功",}, nil
}
修改补偿库存逻辑stock/internal/logic/compensate_stock_logic.go
:
package logicimport ("context""fmt""dtm_demo/rpc/stock/internal/svc""dtm_demo/rpc/stock/stock""github.com/zeromicro/go-zero/core/logx"
)/*
....
*/// 补偿库存
func (l *CompensateStockLogic) CompensateStock(in *stock.CompensateStockReq) (*stock.CompensateStockResp, error) {// todo: add your logic here and delete this linefmt.Println("收到补偿库存请求:", in.ProductId, in.Amount)//barrier防止空补偿、空悬挂等具体看dtm官网即可,别忘记加barrier表在当前库中,因为判断补偿与要执行的sql一起本地事务barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()if err != nil {//!!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!return nil, status.Error(codes.Internal, err.Error())}if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {_, err := l.svcCtx.StockModel.AddStock(tx, in.ProductId, in.Amount)if err != nil {logx.Errorf("补偿库存失败: %v", err)return nil, status.Error(codes.Aborted, dtmcli.ResultFailure) // 直接返回错误}return nil}); err != nil {//!!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!return nil, status.Error(codes.Internal, err.Error())}return &stock.CompensateStockResp{Success: true,Message: "补偿库存成功",}, nil
}